From 76dcda8259d60a822a2617cab6ebdd4e12d3fcb5 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 14 Apr 2015 14:16:29 +0200 Subject: [PATCH] Revert "Ensure channelReadComplete() is called only when necessary" This reverts commit 8b2fb2b985cd969719f23da689eb3dc67282070a. --- .../AbstractChannelHandlerContext.java | 122 +++++++----------- .../channel/DefaultChannelPipelineTest.java | 10 +- 2 files changed, 52 insertions(+), 80 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index cad17c84eb..8b63e318be 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -39,27 +39,13 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme private final AbstractChannel channel; private final DefaultChannelPipeline pipeline; private final String name; - - /** - * Set when a user calls {@link #fireChannelRead(Object)} on this context. - * Cleared when a user calls {@link #fireChannelReadComplete()} on this context. - * - * See {@link #fireChannelReadComplete()} to understand how this flag is used. - */ - private volatile boolean firedChannelRead; - - /** - * Set when a user calls {@link #read()} on this context. - * Cleared when a user calls {@link #fireChannelReadComplete()} on this context. - * - * See {@link #fireChannelReadComplete()} to understand how this flag is used. - */ - private volatile boolean invokedRead; - - /** - * {@code true} if and only if this context has been removed from the pipeline. - */ private boolean removed; + // This does not need to be volatile as we always check and set this flag from EventExecutor thread. This means + // that at worse we will submit a task for channelReadComplete() that may do nothing if nextChannelReadInvoked + // is false. This is prefered to introduce another volatile flag because often fireChannelRead(...) and + // fireChannelReadComplete() are triggered from the EventExecutor thread anyway. + private boolean nextChannelReadInvoked; + private boolean readInvoked; // Will be set to null if no child executor should be used, otherwise it will be set to the // child executor. @@ -308,16 +294,15 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme throw new NullPointerException("msg"); } - firedChannelRead = true; final AbstractChannelHandlerContext next = findContextInbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { - next.invokeChannelRead(msg); + invokeNextChannelRead(next, msg); } else { executor.execute(new OneTimeTask() { @Override public void run() { - next.invokeChannelRead(msg); + invokeNextChannelRead(next, msg); } }); } @@ -332,61 +317,29 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } } + private void invokeNextChannelRead(AbstractChannelHandlerContext next, Object msg) { + nextChannelReadInvoked = true; + next.invokeChannelRead(msg); + } + @Override public ChannelHandlerContext fireChannelReadComplete() { - /** - * If the handler of this context did not produce any messages via {@link #fireChannelRead(Object)}, - * there's no reason to trigger {@code channelReadComplete()} even if the handler called this method. - * - * This is pretty common for the handlers that transform multiple messages into one message, - * such as byte-to-message decoder and message aggregators. - */ - if (firedChannelRead) { - // The handler of this context produced a message, so we are OK to trigger this event. - firedChannelRead = false; - invokedRead = false; - - final AbstractChannelHandlerContext next = findContextInbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelReadComplete(); - } else { - Runnable task = next.invokeChannelReadCompleteTask; - if (task == null) { - next.invokeChannelReadCompleteTask = task = new Runnable() { - @Override - public void run() { - next.invokeChannelReadComplete(); - } - }; - } - executor.execute(task); - } - return this; - } - - /** - * At this point, we are sure the handler of this context did not produce anything and - * we suppressed the {@code channelReadComplete()} event. - * - * If the next handler invoked {@link #read()} to read something but nothing was produced - * by the handler of this context, someone has to issue another {@link #read()} operation - * until the handler of this context produces something. - * - * Why? Because otherwise the next handler will not receive {@code channelRead()} nor - * {@code channelReadComplete()} event at all for the {@link #read()} operation it issued. - */ - if (invokedRead && !channel().config().isAutoRead()) { - /** - * The next (or upstream) handler invoked {@link #read()}, but it didn't get any - * {@code channelRead()} event. We should read once more, so that the handler of the current - * context have a chance to produce something. - */ - read(); + final AbstractChannelHandlerContext next = findContextInbound(); + EventExecutor executor = next.executor(); + if (executor.inEventLoop()) { + invokeNextChannelReadComplete(next); } else { - invokedRead = false; + Runnable task = next.invokeChannelReadCompleteTask; + if (task == null) { + next.invokeChannelReadCompleteTask = task = new Runnable() { + @Override + public void run() { + invokeNextChannelReadComplete(next); + } + }; + } + executor.execute(task); } - return this; } @@ -398,6 +351,24 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } } + private void invokeNextChannelReadComplete(AbstractChannelHandlerContext next) { + if (nextChannelReadInvoked) { + nextChannelReadInvoked = false; + readInvoked = false; + + next.invokeChannelReadComplete(); + } else if (readInvoked && !channel().config().isAutoRead()) { + // As this context not belongs to the last handler in the pipeline and autoRead is false we need to + // trigger read again as otherwise we may stop reading before a full message was passed on to the + // pipeline. This is especially true for all kind of decoders that usually buffer bytes/messages until + // they are able to compose a full message that is passed via fireChannelRead(...) and so be consumed + // be the rest of the handlers in the pipeline. + read(); + } else { + readInvoked = false; + } + } + @Override public ChannelHandlerContext fireChannelWritabilityChanged() { final AbstractChannelHandlerContext next = findContextInbound(); @@ -637,7 +608,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme @Override public ChannelHandlerContext read() { - invokedRead = true; final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { @@ -654,11 +624,13 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } executor.execute(task); } + return this; } private void invokeRead() { try { + readInvoked = true; ((ChannelOutboundHandler) handler()).read(this); } catch (Throwable t) { notifyHandlerException(t); diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index 93ec1ad18c..18032e3a67 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -540,19 +540,19 @@ public class DefaultChannelPipelineTest { } @Test - public void testSupressChannelReadComplete() throws Exception { - testSupressChannelReadComplete0(false); + public void testSurpressChannelReadComplete() throws Exception { + testSurpressChannelReadComplete0(false); } @Test - public void testSupressChannelReadCompleteDifferentExecutors() throws Exception { - testSupressChannelReadComplete0(true); + public void testSurpressChannelReadCompleteDifferentExecutors() throws Exception { + testSurpressChannelReadComplete0(true); } // See: // https://github.com/netty/netty/pull/3263 // https://github.com/netty/netty/pull/3272 - private static void testSupressChannelReadComplete0(boolean executors) throws Exception { + private static void testSurpressChannelReadComplete0(boolean executors) throws Exception { final AtomicInteger read1 = new AtomicInteger(); final AtomicInteger read2 = new AtomicInteger(); final AtomicInteger readComplete1 = new AtomicInteger();