Do not suppress channelReadComplete() when a handler was just added

Related:
- 27a25e29f7

Motivation:

The commit mentioned above introduced a regression where
channelReadComplete() event is swallowed by a handler which was added
dynamically.

Modifications:

Do not suppress channelReadComplete() if the current handler's
channelRead() method was not invoked at all, so that a just-added
handler does not suppress channelReadComplete().

Result:

Regression is gone, and channelReadComplete() is invoked when necessary.
This commit is contained in:
Trustin Lee 2015-02-07 22:49:52 +09:00
parent ed98ce27e1
commit 720faa4df1
4 changed files with 27 additions and 11 deletions

View File

@ -200,11 +200,11 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
ByteBuf bytes = buf.readBytes(readable); ByteBuf bytes = buf.readBytes(readable);
buf.release(); buf.release();
ctx.fireChannelRead(bytes); ctx.fireChannelRead(bytes);
ctx.fireChannelReadComplete();
} else { } else {
buf.release(); buf.release();
} }
cumulation = null; cumulation = null;
ctx.fireChannelReadComplete();
handlerRemoved0(ctx); handlerRemoved0(ctx);
} }

View File

@ -36,13 +36,22 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
private final DefaultChannelPipeline pipeline; private final DefaultChannelPipeline pipeline;
private final String name; private final String name;
/**
* Set when the {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)} of
* this context's handler is invoked.
* Cleared when a user calls {@link #fireChannelReadComplete()} on this context.
*
* See {@link #fireChannelReadComplete()} to understand how this flag is used.
*/
boolean invokedThisChannelRead;
/** /**
* Set when a user calls {@link #fireChannelRead(Object)} on this context. * Set when a user calls {@link #fireChannelRead(Object)} on this context.
* Cleared when a user calls {@link #fireChannelReadComplete()} on this context. * Cleared when a user calls {@link #fireChannelReadComplete()} on this context.
* *
* See {@link #fireChannelReadComplete()} to understand how this flag is used. * See {@link #fireChannelReadComplete()} to understand how this flag is used.
*/ */
private volatile boolean firedChannelRead; private volatile boolean invokedNextChannelRead;
/** /**
* Set when a user calls {@link #read()} on this context. * Set when a user calls {@link #read()} on this context.
@ -50,7 +59,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
* *
* See {@link #fireChannelReadComplete()} to understand how this flag is used. * See {@link #fireChannelReadComplete()} to understand how this flag is used.
*/ */
private volatile boolean invokedRead; private volatile boolean invokedPrevRead;
/** /**
* {@code true} if and only if this context has been removed from the pipeline. * {@code true} if and only if this context has been removed from the pipeline.
@ -174,7 +183,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
public ChannelHandlerContext fireChannelRead(Object msg) { public ChannelHandlerContext fireChannelRead(Object msg) {
AbstractChannelHandlerContext next = findContextInbound(); AbstractChannelHandlerContext next = findContextInbound();
ReferenceCountUtil.touch(msg, next); ReferenceCountUtil.touch(msg, next);
firedChannelRead = true; invokedNextChannelRead = true;
next.invoker().invokeChannelRead(next, msg); next.invoker().invokeChannelRead(next, msg);
return this; return this;
} }
@ -187,11 +196,16 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
* *
* This is pretty common for the handlers that transform multiple messages into one message, * This is pretty common for the handlers that transform multiple messages into one message,
* such as byte-to-message decoder and message aggregators. * such as byte-to-message decoder and message aggregators.
*
* Only one exception is when nobody invoked the channelRead() method of this context's handler.
* It means the handler has been added later dynamically.
*/ */
if (firedChannelRead) { if (invokedNextChannelRead || // The handler of this context produced a message, or
// The handler of this context produced a message, so we are OK to trigger this event. !invokedThisChannelRead) { // it is not required to produce a message to trigger the event.
firedChannelRead = false;
invokedRead = false; invokedNextChannelRead = false;
invokedPrevRead = false;
AbstractChannelHandlerContext next = findContextInbound(); AbstractChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelReadComplete(next); next.invoker().invokeChannelReadComplete(next);
return this; return this;
@ -208,7 +222,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
* Why? Because otherwise the next handler will not receive {@code channelRead()} nor * Why? Because otherwise the next handler will not receive {@code channelRead()} nor
* {@code channelReadComplete()} event at all for the {@link #read()} operation it issued. * {@code channelReadComplete()} event at all for the {@link #read()} operation it issued.
*/ */
if (invokedRead && !channel().config().isAutoRead()) { if (invokedPrevRead && !channel().config().isAutoRead()) {
/** /**
* The next (or upstream) handler invoked {@link #read()}, but it didn't get any * The next (or upstream) handler invoked {@link #read()}, but it didn't get any
* {@code channelRead()} event. We should read once more, so that the handler of the current * {@code channelRead()} event. We should read once more, so that the handler of the current
@ -216,7 +230,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
*/ */
read(); read();
} else { } else {
invokedRead = false; invokedPrevRead = false;
} }
return this; return this;
@ -306,7 +320,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
@Override @Override
public ChannelHandlerContext read() { public ChannelHandlerContext read() {
AbstractChannelHandlerContext next = findContextOutbound(); AbstractChannelHandlerContext next = findContextOutbound();
invokedRead = true; invokedPrevRead = true;
next.invoker().invokeRead(next); next.invoker().invokeRead(next);
return this; return this;
} }

View File

@ -80,6 +80,7 @@ public final class ChannelHandlerInvokerUtil {
public static void invokeChannelReadNow(final ChannelHandlerContext ctx, final Object msg) { public static void invokeChannelReadNow(final ChannelHandlerContext ctx, final Object msg) {
try { try {
((AbstractChannelHandlerContext) ctx).invokedThisChannelRead = true;
((ChannelInboundHandler) ctx.handler()).channelRead(ctx, msg); ((ChannelInboundHandler) ctx.handler()).channelRead(ctx, msg);
} catch (Throwable t) { } catch (Throwable t) {
notifyHandlerException(ctx, t); notifyHandlerException(ctx, t);

View File

@ -584,6 +584,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try { try {
ctx.invokedThisChannelRead = false;
ctx.handler().handlerAdded(ctx); ctx.handler().handlerAdded(ctx);
} catch (Throwable t) { } catch (Throwable t) {
boolean removed = false; boolean removed = false;