Ensure channelReadComplete() is called only when necessary
Related:
- 375b9e1307
Motivation:
Even if a handler called ctx.fireChannelReadComplete(), the next handler
should not get its channelReadComplete() invoked if fireChannelRead()
was not invoked before.
Modifications:
- Ensure channelReadComplete() is invoked only when the handler of the
current context actually produced a message, because otherwise there's
no point of triggering channelReadComplete().
i.e. channelReadComplete() must follow channelRead().
- Fix a bug where ctx.read() was not called if the handler of the
current context did not produce any message, making the connection
stall. Read the new comment for more information.
Result:
- channelReadComplete() is invoked only when it makes sense.
- No stale connection
This commit is contained in:
parent
1c01dd1efc
commit
8b2fb2b985
@ -39,13 +39,27 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
|||||||
private final AbstractChannel channel;
|
private final AbstractChannel channel;
|
||||||
private final DefaultChannelPipeline pipeline;
|
private final DefaultChannelPipeline pipeline;
|
||||||
private final String name;
|
private final String name;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set when a user calls {@link #fireChannelRead(Object)} on this context.
|
||||||
|
* Cleared when a user calls {@link #fireChannelReadComplete()} on this context.
|
||||||
|
*
|
||||||
|
* See {@link #fireChannelReadComplete()} to understand how this flag is used.
|
||||||
|
*/
|
||||||
|
private volatile boolean firedChannelRead;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set when a user calls {@link #read()} on this context.
|
||||||
|
* Cleared when a user calls {@link #fireChannelReadComplete()} on this context.
|
||||||
|
*
|
||||||
|
* See {@link #fireChannelReadComplete()} to understand how this flag is used.
|
||||||
|
*/
|
||||||
|
private volatile boolean invokedRead;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@code true} if and only if this context has been removed from the pipeline.
|
||||||
|
*/
|
||||||
private boolean removed;
|
private boolean removed;
|
||||||
// This does not need to be volatile as we always check and set this flag from EventExecutor thread. This means
|
|
||||||
// that at worse we will submit a task for channelReadComplete() that may do nothing if nextChannelReadInvoked
|
|
||||||
// is false. This is prefered to introduce another volatile flag because often fireChannelRead(...) and
|
|
||||||
// fireChannelReadComplete() are triggered from the EventExecutor thread anyway.
|
|
||||||
private boolean nextChannelReadInvoked;
|
|
||||||
private boolean readInvoked;
|
|
||||||
|
|
||||||
// Will be set to null if no child executor should be used, otherwise it will be set to the
|
// Will be set to null if no child executor should be used, otherwise it will be set to the
|
||||||
// child executor.
|
// child executor.
|
||||||
@ -294,15 +308,16 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
|||||||
throw new NullPointerException("msg");
|
throw new NullPointerException("msg");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
firedChannelRead = true;
|
||||||
final AbstractChannelHandlerContext next = findContextInbound();
|
final AbstractChannelHandlerContext next = findContextInbound();
|
||||||
EventExecutor executor = next.executor();
|
EventExecutor executor = next.executor();
|
||||||
if (executor.inEventLoop()) {
|
if (executor.inEventLoop()) {
|
||||||
invokeNextChannelRead(next, msg);
|
next.invokeChannelRead(msg);
|
||||||
} else {
|
} else {
|
||||||
executor.execute(new OneTimeTask() {
|
executor.execute(new OneTimeTask() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
invokeNextChannelRead(next, msg);
|
next.invokeChannelRead(msg);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -317,29 +332,61 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void invokeNextChannelRead(AbstractChannelHandlerContext next, Object msg) {
|
|
||||||
nextChannelReadInvoked = true;
|
|
||||||
next.invokeChannelRead(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandlerContext fireChannelReadComplete() {
|
public ChannelHandlerContext fireChannelReadComplete() {
|
||||||
final AbstractChannelHandlerContext next = findContextInbound();
|
/**
|
||||||
EventExecutor executor = next.executor();
|
* If the handler of this context did not produce any messages via {@link #fireChannelRead(Object)},
|
||||||
if (executor.inEventLoop()) {
|
* there's no reason to trigger {@code channelReadComplete()} even if the handler called this method.
|
||||||
invokeNextChannelReadComplete(next);
|
*
|
||||||
} else {
|
* This is pretty common for the handlers that transform multiple messages into one message,
|
||||||
Runnable task = next.invokeChannelReadCompleteTask;
|
* such as byte-to-message decoder and message aggregators.
|
||||||
if (task == null) {
|
*/
|
||||||
next.invokeChannelReadCompleteTask = task = new Runnable() {
|
if (firedChannelRead) {
|
||||||
@Override
|
// The handler of this context produced a message, so we are OK to trigger this event.
|
||||||
public void run() {
|
firedChannelRead = false;
|
||||||
invokeNextChannelReadComplete(next);
|
invokedRead = false;
|
||||||
}
|
|
||||||
};
|
final AbstractChannelHandlerContext next = findContextInbound();
|
||||||
|
EventExecutor executor = next.executor();
|
||||||
|
if (executor.inEventLoop()) {
|
||||||
|
next.invokeChannelReadComplete();
|
||||||
|
} else {
|
||||||
|
Runnable task = next.invokeChannelReadCompleteTask;
|
||||||
|
if (task == null) {
|
||||||
|
next.invokeChannelReadCompleteTask = task = new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
next.invokeChannelReadComplete();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
executor.execute(task);
|
||||||
}
|
}
|
||||||
executor.execute(task);
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* At this point, we are sure the handler of this context did not produce anything and
|
||||||
|
* we suppressed the {@code channelReadComplete()} event.
|
||||||
|
*
|
||||||
|
* If the next handler invoked {@link #read()} to read something but nothing was produced
|
||||||
|
* by the handler of this context, someone has to issue another {@link #read()} operation
|
||||||
|
* until the handler of this context produces something.
|
||||||
|
*
|
||||||
|
* Why? Because otherwise the next handler will not receive {@code channelRead()} nor
|
||||||
|
* {@code channelReadComplete()} event at all for the {@link #read()} operation it issued.
|
||||||
|
*/
|
||||||
|
if (invokedRead && !channel().config().isAutoRead()) {
|
||||||
|
/**
|
||||||
|
* The next (or upstream) handler invoked {@link #read()}, but it didn't get any
|
||||||
|
* {@code channelRead()} event. We should read once more, so that the handler of the current
|
||||||
|
* context have a chance to produce something.
|
||||||
|
*/
|
||||||
|
read();
|
||||||
|
} else {
|
||||||
|
invokedRead = false;
|
||||||
|
}
|
||||||
|
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -351,24 +398,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void invokeNextChannelReadComplete(AbstractChannelHandlerContext next) {
|
|
||||||
if (nextChannelReadInvoked) {
|
|
||||||
nextChannelReadInvoked = false;
|
|
||||||
readInvoked = false;
|
|
||||||
|
|
||||||
next.invokeChannelReadComplete();
|
|
||||||
} else if (readInvoked && !channel().config().isAutoRead()) {
|
|
||||||
// As this context not belongs to the last handler in the pipeline and autoRead is false we need to
|
|
||||||
// trigger read again as otherwise we may stop reading before a full message was passed on to the
|
|
||||||
// pipeline. This is especially true for all kind of decoders that usually buffer bytes/messages until
|
|
||||||
// they are able to compose a full message that is passed via fireChannelRead(...) and so be consumed
|
|
||||||
// be the rest of the handlers in the pipeline.
|
|
||||||
read();
|
|
||||||
} else {
|
|
||||||
readInvoked = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandlerContext fireChannelWritabilityChanged() {
|
public ChannelHandlerContext fireChannelWritabilityChanged() {
|
||||||
final AbstractChannelHandlerContext next = findContextInbound();
|
final AbstractChannelHandlerContext next = findContextInbound();
|
||||||
@ -608,6 +637,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandlerContext read() {
|
public ChannelHandlerContext read() {
|
||||||
|
invokedRead = true;
|
||||||
final AbstractChannelHandlerContext next = findContextOutbound();
|
final AbstractChannelHandlerContext next = findContextOutbound();
|
||||||
EventExecutor executor = next.executor();
|
EventExecutor executor = next.executor();
|
||||||
if (executor.inEventLoop()) {
|
if (executor.inEventLoop()) {
|
||||||
@ -624,13 +654,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
|||||||
}
|
}
|
||||||
executor.execute(task);
|
executor.execute(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void invokeRead() {
|
private void invokeRead() {
|
||||||
try {
|
try {
|
||||||
readInvoked = true;
|
|
||||||
((ChannelOutboundHandler) handler()).read(this);
|
((ChannelOutboundHandler) handler()).read(this);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
notifyHandlerException(t);
|
notifyHandlerException(t);
|
||||||
|
@ -540,19 +540,19 @@ public class DefaultChannelPipelineTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSurpressChannelReadComplete() throws Exception {
|
public void testSupressChannelReadComplete() throws Exception {
|
||||||
testSurpressChannelReadComplete0(false);
|
testSupressChannelReadComplete0(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSurpressChannelReadCompleteDifferentExecutors() throws Exception {
|
public void testSupressChannelReadCompleteDifferentExecutors() throws Exception {
|
||||||
testSurpressChannelReadComplete0(true);
|
testSupressChannelReadComplete0(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
// See:
|
// See:
|
||||||
// https://github.com/netty/netty/pull/3263
|
// https://github.com/netty/netty/pull/3263
|
||||||
// https://github.com/netty/netty/pull/3272
|
// https://github.com/netty/netty/pull/3272
|
||||||
private static void testSurpressChannelReadComplete0(boolean executors) throws Exception {
|
private static void testSupressChannelReadComplete0(boolean executors) throws Exception {
|
||||||
final AtomicInteger read1 = new AtomicInteger();
|
final AtomicInteger read1 = new AtomicInteger();
|
||||||
final AtomicInteger read2 = new AtomicInteger();
|
final AtomicInteger read2 = new AtomicInteger();
|
||||||
final AtomicInteger readComplete1 = new AtomicInteger();
|
final AtomicInteger readComplete1 = new AtomicInteger();
|
||||||
|
Loading…
Reference in New Issue
Block a user