Do not suppress channelReadComplete() when a handler was just added
Related:
- 8b2fb2b985
Motivation:
The commit mentioned above introduced a regression where
channelReadComplete() event is swallowed by a handler which was added
dynamically.
Modifications:
Do not suppress channelReadComplete() if the current handler's
channelRead() method was not invoked at all, so that a just-added
handler does not suppress channelReadComplete().
Result:
Regression is gone, and channelReadComplete() is invoked when necessary.
This commit is contained in:
parent
585ce1593f
commit
a41b46ff43
@ -200,11 +200,11 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
||||
ByteBuf bytes = buf.readBytes(readable);
|
||||
buf.release();
|
||||
ctx.fireChannelRead(bytes);
|
||||
ctx.fireChannelReadComplete();
|
||||
} else {
|
||||
buf.release();
|
||||
}
|
||||
cumulation = null;
|
||||
ctx.fireChannelReadComplete();
|
||||
handlerRemoved0(ctx);
|
||||
}
|
||||
|
||||
|
@ -40,13 +40,22 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
private final DefaultChannelPipeline pipeline;
|
||||
private final String name;
|
||||
|
||||
/**
|
||||
* Set when the {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)} of
|
||||
* this context's handler is invoked.
|
||||
* Cleared when a user calls {@link #fireChannelReadComplete()} on this context.
|
||||
*
|
||||
* See {@link #fireChannelReadComplete()} to understand how this flag is used.
|
||||
*/
|
||||
boolean invokedThisChannelRead;
|
||||
|
||||
/**
|
||||
* Set when a user calls {@link #fireChannelRead(Object)} on this context.
|
||||
* Cleared when a user calls {@link #fireChannelReadComplete()} on this context.
|
||||
*
|
||||
* See {@link #fireChannelReadComplete()} to understand how this flag is used.
|
||||
*/
|
||||
private volatile boolean firedChannelRead;
|
||||
private volatile boolean invokedNextChannelRead;
|
||||
|
||||
/**
|
||||
* Set when a user calls {@link #read()} on this context.
|
||||
@ -54,7 +63,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
*
|
||||
* See {@link #fireChannelReadComplete()} to understand how this flag is used.
|
||||
*/
|
||||
private volatile boolean invokedRead;
|
||||
private volatile boolean invokedPrevRead;
|
||||
|
||||
/**
|
||||
* {@code true} if and only if this context has been removed from the pipeline.
|
||||
@ -308,7 +317,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
throw new NullPointerException("msg");
|
||||
}
|
||||
|
||||
firedChannelRead = true;
|
||||
invokedNextChannelRead = true;
|
||||
final AbstractChannelHandlerContext next = findContextInbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (executor.inEventLoop()) {
|
||||
@ -325,6 +334,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
}
|
||||
|
||||
private void invokeChannelRead(Object msg) {
|
||||
invokedThisChannelRead = true;
|
||||
try {
|
||||
((ChannelInboundHandler) handler()).channelRead(this, msg);
|
||||
} catch (Throwable t) {
|
||||
@ -340,11 +350,15 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
*
|
||||
* This is pretty common for the handlers that transform multiple messages into one message,
|
||||
* such as byte-to-message decoder and message aggregators.
|
||||
*
|
||||
* Only one exception is when nobody invoked the channelRead() method of this context's handler.
|
||||
* It means the handler has been added later dynamically.
|
||||
*/
|
||||
if (firedChannelRead) {
|
||||
// The handler of this context produced a message, so we are OK to trigger this event.
|
||||
firedChannelRead = false;
|
||||
invokedRead = false;
|
||||
if (invokedNextChannelRead || // The handler of this context produced a message, or
|
||||
!invokedThisChannelRead) { // it is not required to produce a message to trigger the event.
|
||||
|
||||
invokedNextChannelRead = false;
|
||||
invokedPrevRead = false;
|
||||
|
||||
final AbstractChannelHandlerContext next = findContextInbound();
|
||||
EventExecutor executor = next.executor();
|
||||
@ -376,7 +390,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
* Why? Because otherwise the next handler will not receive {@code channelRead()} nor
|
||||
* {@code channelReadComplete()} event at all for the {@link #read()} operation it issued.
|
||||
*/
|
||||
if (invokedRead && !channel().config().isAutoRead()) {
|
||||
if (invokedPrevRead && !channel().config().isAutoRead()) {
|
||||
/**
|
||||
* The next (or upstream) handler invoked {@link #read()}, but it didn't get any
|
||||
* {@code channelRead()} event. We should read once more, so that the handler of the current
|
||||
@ -384,7 +398,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
*/
|
||||
read();
|
||||
} else {
|
||||
invokedRead = false;
|
||||
invokedPrevRead = false;
|
||||
}
|
||||
|
||||
return this;
|
||||
@ -637,7 +651,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext read() {
|
||||
invokedRead = true;
|
||||
invokedPrevRead = true;
|
||||
final AbstractChannelHandlerContext next = findContextOutbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (executor.inEventLoop()) {
|
||||
|
@ -67,7 +67,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
final Map<EventExecutorGroup, EventExecutor> childExecutors =
|
||||
new IdentityHashMap<EventExecutorGroup, EventExecutor>();
|
||||
|
||||
public DefaultChannelPipeline(AbstractChannel channel) {
|
||||
DefaultChannelPipeline(AbstractChannel channel) {
|
||||
if (channel == null) {
|
||||
throw new NullPointerException("channel");
|
||||
}
|
||||
@ -469,7 +469,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
}
|
||||
}
|
||||
|
||||
private void callHandlerAdded(final ChannelHandlerContext ctx) {
|
||||
private void callHandlerAdded(final AbstractChannelHandlerContext ctx) {
|
||||
if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) {
|
||||
ctx.executor().execute(new Runnable() {
|
||||
@Override
|
||||
@ -482,13 +482,14 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
callHandlerAdded0(ctx);
|
||||
}
|
||||
|
||||
private void callHandlerAdded0(final ChannelHandlerContext ctx) {
|
||||
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
|
||||
try {
|
||||
ctx.invokedThisChannelRead = false;
|
||||
ctx.handler().handlerAdded(ctx);
|
||||
} catch (Throwable t) {
|
||||
boolean removed = false;
|
||||
try {
|
||||
remove((AbstractChannelHandlerContext) ctx);
|
||||
remove(ctx);
|
||||
removed = true;
|
||||
} catch (Throwable t2) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
@ -1029,7 +1030,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
logger.warn(
|
||||
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
|
||||
"It usually means the last handler in the pipeline did not handle the exception.", cause);
|
||||
"It usually means the last handler in the pipeline did not handle the exception.", cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -1051,7 +1052,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
|
||||
private static final String HEAD_NAME = generateName0(HeadContext.class);
|
||||
|
||||
protected final Unsafe unsafe;
|
||||
final Unsafe unsafe;
|
||||
|
||||
HeadContext(DefaultChannelPipeline pipeline) {
|
||||
super(pipeline, null, HEAD_NAME, false, true);
|
||||
|
Loading…
Reference in New Issue
Block a user