Revert "Ensure channelReadComplete() is called only when necessary"

This reverts commit 8b2fb2b985.
This commit is contained in:
Norman Maurer 2015-04-14 14:16:29 +02:00
parent f35158f402
commit 76dcda8259
2 changed files with 52 additions and 80 deletions

View File

@ -39,27 +39,13 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
private final AbstractChannel channel; private final AbstractChannel channel;
private final DefaultChannelPipeline pipeline; private final DefaultChannelPipeline pipeline;
private final String name; private final String name;
/**
* Set when a user calls {@link #fireChannelRead(Object)} on this context.
* Cleared when a user calls {@link #fireChannelReadComplete()} on this context.
*
* See {@link #fireChannelReadComplete()} to understand how this flag is used.
*/
private volatile boolean firedChannelRead;
/**
* Set when a user calls {@link #read()} on this context.
* Cleared when a user calls {@link #fireChannelReadComplete()} on this context.
*
* See {@link #fireChannelReadComplete()} to understand how this flag is used.
*/
private volatile boolean invokedRead;
/**
* {@code true} if and only if this context has been removed from the pipeline.
*/
private boolean removed; private boolean removed;
// This does not need to be volatile as we always check and set this flag from EventExecutor thread. This means
// that at worse we will submit a task for channelReadComplete() that may do nothing if nextChannelReadInvoked
// is false. This is prefered to introduce another volatile flag because often fireChannelRead(...) and
// fireChannelReadComplete() are triggered from the EventExecutor thread anyway.
private boolean nextChannelReadInvoked;
private boolean readInvoked;
// Will be set to null if no child executor should be used, otherwise it will be set to the // Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor. // child executor.
@ -308,16 +294,15 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
throw new NullPointerException("msg"); throw new NullPointerException("msg");
} }
firedChannelRead = true;
final AbstractChannelHandlerContext next = findContextInbound(); final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
next.invokeChannelRead(msg); invokeNextChannelRead(next, msg);
} else { } else {
executor.execute(new OneTimeTask() { executor.execute(new OneTimeTask() {
@Override @Override
public void run() { public void run() {
next.invokeChannelRead(msg); invokeNextChannelRead(next, msg);
} }
}); });
} }
@ -332,61 +317,29 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
} }
} }
private void invokeNextChannelRead(AbstractChannelHandlerContext next, Object msg) {
nextChannelReadInvoked = true;
next.invokeChannelRead(msg);
}
@Override @Override
public ChannelHandlerContext fireChannelReadComplete() { public ChannelHandlerContext fireChannelReadComplete() {
/** final AbstractChannelHandlerContext next = findContextInbound();
* If the handler of this context did not produce any messages via {@link #fireChannelRead(Object)}, EventExecutor executor = next.executor();
* there's no reason to trigger {@code channelReadComplete()} even if the handler called this method. if (executor.inEventLoop()) {
* invokeNextChannelReadComplete(next);
* This is pretty common for the handlers that transform multiple messages into one message,
* such as byte-to-message decoder and message aggregators.
*/
if (firedChannelRead) {
// The handler of this context produced a message, so we are OK to trigger this event.
firedChannelRead = false;
invokedRead = false;
final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelReadComplete();
} else {
Runnable task = next.invokeChannelReadCompleteTask;
if (task == null) {
next.invokeChannelReadCompleteTask = task = new Runnable() {
@Override
public void run() {
next.invokeChannelReadComplete();
}
};
}
executor.execute(task);
}
return this;
}
/**
* At this point, we are sure the handler of this context did not produce anything and
* we suppressed the {@code channelReadComplete()} event.
*
* If the next handler invoked {@link #read()} to read something but nothing was produced
* by the handler of this context, someone has to issue another {@link #read()} operation
* until the handler of this context produces something.
*
* Why? Because otherwise the next handler will not receive {@code channelRead()} nor
* {@code channelReadComplete()} event at all for the {@link #read()} operation it issued.
*/
if (invokedRead && !channel().config().isAutoRead()) {
/**
* The next (or upstream) handler invoked {@link #read()}, but it didn't get any
* {@code channelRead()} event. We should read once more, so that the handler of the current
* context have a chance to produce something.
*/
read();
} else { } else {
invokedRead = false; Runnable task = next.invokeChannelReadCompleteTask;
if (task == null) {
next.invokeChannelReadCompleteTask = task = new Runnable() {
@Override
public void run() {
invokeNextChannelReadComplete(next);
}
};
}
executor.execute(task);
} }
return this; return this;
} }
@ -398,6 +351,24 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
} }
} }
private void invokeNextChannelReadComplete(AbstractChannelHandlerContext next) {
if (nextChannelReadInvoked) {
nextChannelReadInvoked = false;
readInvoked = false;
next.invokeChannelReadComplete();
} else if (readInvoked && !channel().config().isAutoRead()) {
// As this context not belongs to the last handler in the pipeline and autoRead is false we need to
// trigger read again as otherwise we may stop reading before a full message was passed on to the
// pipeline. This is especially true for all kind of decoders that usually buffer bytes/messages until
// they are able to compose a full message that is passed via fireChannelRead(...) and so be consumed
// be the rest of the handlers in the pipeline.
read();
} else {
readInvoked = false;
}
}
@Override @Override
public ChannelHandlerContext fireChannelWritabilityChanged() { public ChannelHandlerContext fireChannelWritabilityChanged() {
final AbstractChannelHandlerContext next = findContextInbound(); final AbstractChannelHandlerContext next = findContextInbound();
@ -637,7 +608,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
@Override @Override
public ChannelHandlerContext read() { public ChannelHandlerContext read() {
invokedRead = true;
final AbstractChannelHandlerContext next = findContextOutbound(); final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
@ -654,11 +624,13 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
} }
executor.execute(task); executor.execute(task);
} }
return this; return this;
} }
private void invokeRead() { private void invokeRead() {
try { try {
readInvoked = true;
((ChannelOutboundHandler) handler()).read(this); ((ChannelOutboundHandler) handler()).read(this);
} catch (Throwable t) { } catch (Throwable t) {
notifyHandlerException(t); notifyHandlerException(t);

View File

@ -540,19 +540,19 @@ public class DefaultChannelPipelineTest {
} }
@Test @Test
public void testSupressChannelReadComplete() throws Exception { public void testSurpressChannelReadComplete() throws Exception {
testSupressChannelReadComplete0(false); testSurpressChannelReadComplete0(false);
} }
@Test @Test
public void testSupressChannelReadCompleteDifferentExecutors() throws Exception { public void testSurpressChannelReadCompleteDifferentExecutors() throws Exception {
testSupressChannelReadComplete0(true); testSurpressChannelReadComplete0(true);
} }
// See: // See:
// https://github.com/netty/netty/pull/3263 // https://github.com/netty/netty/pull/3263
// https://github.com/netty/netty/pull/3272 // https://github.com/netty/netty/pull/3272
private static void testSupressChannelReadComplete0(boolean executors) throws Exception { private static void testSurpressChannelReadComplete0(boolean executors) throws Exception {
final AtomicInteger read1 = new AtomicInteger(); final AtomicInteger read1 = new AtomicInteger();
final AtomicInteger read2 = new AtomicInteger(); final AtomicInteger read2 = new AtomicInteger();
final AtomicInteger readComplete1 = new AtomicInteger(); final AtomicInteger readComplete1 = new AtomicInteger();