diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index a206542333..97ecb4638e 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -200,11 +200,11 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter ByteBuf bytes = buf.readBytes(readable); buf.release(); ctx.fireChannelRead(bytes); - ctx.fireChannelReadComplete(); } else { buf.release(); } cumulation = null; + ctx.fireChannelReadComplete(); handlerRemoved0(ctx); } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index 0b2a66e4eb..cd7d286284 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -36,22 +36,13 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R private final DefaultChannelPipeline pipeline; private final String name; - /** - * Set when the {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)} of - * this context's handler is invoked. - * Cleared when a user calls {@link #fireChannelReadComplete()} on this context. - * - * See {@link #fireChannelReadComplete()} to understand how this flag is used. - */ - boolean invokedThisChannelRead; - /** * Set when a user calls {@link #fireChannelRead(Object)} on this context. * Cleared when a user calls {@link #fireChannelReadComplete()} on this context. * * See {@link #fireChannelReadComplete()} to understand how this flag is used. */ - private volatile boolean invokedNextChannelRead; + private volatile boolean firedChannelRead; /** * Set when a user calls {@link #read()} on this context. @@ -59,7 +50,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R * * See {@link #fireChannelReadComplete()} to understand how this flag is used. */ - private volatile boolean invokedPrevRead; + private volatile boolean invokedRead; /** * {@code true} if and only if this context has been removed from the pipeline. @@ -183,7 +174,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R public ChannelHandlerContext fireChannelRead(Object msg) { AbstractChannelHandlerContext next = findContextInbound(); ReferenceCountUtil.touch(msg, next); - invokedNextChannelRead = true; + firedChannelRead = true; next.invoker().invokeChannelRead(next, msg); return this; } @@ -196,16 +187,11 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R * * This is pretty common for the handlers that transform multiple messages into one message, * such as byte-to-message decoder and message aggregators. - * - * Only one exception is when nobody invoked the channelRead() method of this context's handler. - * It means the handler has been added later dynamically. */ - if (invokedNextChannelRead || // The handler of this context produced a message, or - !invokedThisChannelRead) { // it is not required to produce a message to trigger the event. - - invokedNextChannelRead = false; - invokedPrevRead = false; - + if (firedChannelRead) { + // The handler of this context produced a message, so we are OK to trigger this event. + firedChannelRead = false; + invokedRead = false; AbstractChannelHandlerContext next = findContextInbound(); next.invoker().invokeChannelReadComplete(next); return this; @@ -222,7 +208,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R * Why? Because otherwise the next handler will not receive {@code channelRead()} nor * {@code channelReadComplete()} event at all for the {@link #read()} operation it issued. */ - if (invokedPrevRead && !channel().config().isAutoRead()) { + if (invokedRead && !channel().config().isAutoRead()) { /** * The next (or upstream) handler invoked {@link #read()}, but it didn't get any * {@code channelRead()} event. We should read once more, so that the handler of the current @@ -230,7 +216,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R */ read(); } else { - invokedPrevRead = false; + invokedRead = false; } return this; @@ -320,7 +306,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R @Override public ChannelHandlerContext read() { AbstractChannelHandlerContext next = findContextOutbound(); - invokedPrevRead = true; + invokedRead = true; next.invoker().invokeRead(next); return this; } diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerInvokerUtil.java b/transport/src/main/java/io/netty/channel/ChannelHandlerInvokerUtil.java index 265e837775..86e3e269b4 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerInvokerUtil.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerInvokerUtil.java @@ -80,7 +80,6 @@ public final class ChannelHandlerInvokerUtil { public static void invokeChannelReadNow(final ChannelHandlerContext ctx, final Object msg) { try { - ((AbstractChannelHandlerContext) ctx).invokedThisChannelRead = true; ((ChannelInboundHandler) ctx.handler()).channelRead(ctx, msg); } catch (Throwable t) { notifyHandlerException(ctx, t); diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index dc43390e1c..cd665d1e9b 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -584,7 +584,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { try { - ctx.invokedThisChannelRead = false; ctx.handler().handlerAdded(ctx); } catch (Throwable t) { boolean removed = false;