diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index a206542333..97ecb4638e 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -200,11 +200,11 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter ByteBuf bytes = buf.readBytes(readable); buf.release(); ctx.fireChannelRead(bytes); - ctx.fireChannelReadComplete(); } else { buf.release(); } cumulation = null; + ctx.fireChannelReadComplete(); handlerRemoved0(ctx); } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index d1f37ef1af..cad17c84eb 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -40,22 +40,13 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme private final DefaultChannelPipeline pipeline; private final String name; - /** - * Set when the {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)} of - * this context's handler is invoked. - * Cleared when a user calls {@link #fireChannelReadComplete()} on this context. - * - * See {@link #fireChannelReadComplete()} to understand how this flag is used. - */ - boolean invokedThisChannelRead; - /** * Set when a user calls {@link #fireChannelRead(Object)} on this context. * Cleared when a user calls {@link #fireChannelReadComplete()} on this context. * * See {@link #fireChannelReadComplete()} to understand how this flag is used. */ - private volatile boolean invokedNextChannelRead; + private volatile boolean firedChannelRead; /** * Set when a user calls {@link #read()} on this context. @@ -63,7 +54,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme * * See {@link #fireChannelReadComplete()} to understand how this flag is used. */ - private volatile boolean invokedPrevRead; + private volatile boolean invokedRead; /** * {@code true} if and only if this context has been removed from the pipeline. @@ -317,7 +308,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme throw new NullPointerException("msg"); } - invokedNextChannelRead = true; + firedChannelRead = true; final AbstractChannelHandlerContext next = findContextInbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { @@ -334,7 +325,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } private void invokeChannelRead(Object msg) { - invokedThisChannelRead = true; try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { @@ -350,15 +340,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme * * This is pretty common for the handlers that transform multiple messages into one message, * such as byte-to-message decoder and message aggregators. - * - * Only one exception is when nobody invoked the channelRead() method of this context's handler. - * It means the handler has been added later dynamically. */ - if (invokedNextChannelRead || // The handler of this context produced a message, or - !invokedThisChannelRead) { // it is not required to produce a message to trigger the event. - - invokedNextChannelRead = false; - invokedPrevRead = false; + if (firedChannelRead) { + // The handler of this context produced a message, so we are OK to trigger this event. + firedChannelRead = false; + invokedRead = false; final AbstractChannelHandlerContext next = findContextInbound(); EventExecutor executor = next.executor(); @@ -390,7 +376,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme * Why? Because otherwise the next handler will not receive {@code channelRead()} nor * {@code channelReadComplete()} event at all for the {@link #read()} operation it issued. */ - if (invokedPrevRead && !channel().config().isAutoRead()) { + if (invokedRead && !channel().config().isAutoRead()) { /** * The next (or upstream) handler invoked {@link #read()}, but it didn't get any * {@code channelRead()} event. We should read once more, so that the handler of the current @@ -398,7 +384,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme */ read(); } else { - invokedPrevRead = false; + invokedRead = false; } return this; @@ -651,7 +637,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme @Override public ChannelHandlerContext read() { - invokedPrevRead = true; + invokedRead = true; final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index b8b533dbb8..80146ff243 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -67,7 +67,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { final Map childExecutors = new IdentityHashMap(); - DefaultChannelPipeline(AbstractChannel channel) { + public DefaultChannelPipeline(AbstractChannel channel) { if (channel == null) { throw new NullPointerException("channel"); } @@ -469,7 +469,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } } - private void callHandlerAdded(final AbstractChannelHandlerContext ctx) { + private void callHandlerAdded(final ChannelHandlerContext ctx) { if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) { ctx.executor().execute(new Runnable() { @Override @@ -482,14 +482,13 @@ final class DefaultChannelPipeline implements ChannelPipeline { callHandlerAdded0(ctx); } - private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { + private void callHandlerAdded0(final ChannelHandlerContext ctx) { try { - ctx.invokedThisChannelRead = false; ctx.handler().handlerAdded(ctx); } catch (Throwable t) { boolean removed = false; try { - remove(ctx); + remove((AbstractChannelHandlerContext) ctx); removed = true; } catch (Throwable t2) { if (logger.isWarnEnabled()) { @@ -1030,7 +1029,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.warn( "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " + - "It usually means the last handler in the pipeline did not handle the exception.", cause); + "It usually means the last handler in the pipeline did not handle the exception.", cause); } @Override @@ -1052,7 +1051,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { private static final String HEAD_NAME = generateName0(HeadContext.class); - final Unsafe unsafe; + protected final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true);