Do not suppress channelReadComplete() when a handler was just added
Related:
- 14d64d0966
Motivation:
The commit mentioned above introduced a regression where
channelReadComplete() event is swallowed by a handler which was added
dynamically.
Modifications:
Do not suppress channelReadComplete() if the current handler's
channelRead() method was not invoked at all, so that a just-added
handler does not suppress channelReadComplete().
Result:
Regression is gone, and channelReadComplete() is invoked when necessary.
This commit is contained in:
parent
14d64d0966
commit
c4483c25e4
@ -201,11 +201,11 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
|
|||||||
ByteBuf bytes = buf.readBytes(readable);
|
ByteBuf bytes = buf.readBytes(readable);
|
||||||
buf.release();
|
buf.release();
|
||||||
ctx.fireChannelRead(bytes);
|
ctx.fireChannelRead(bytes);
|
||||||
|
ctx.fireChannelReadComplete();
|
||||||
} else {
|
} else {
|
||||||
buf.release();
|
buf.release();
|
||||||
}
|
}
|
||||||
cumulation = null;
|
cumulation = null;
|
||||||
ctx.fireChannelReadComplete();
|
|
||||||
handlerRemoved0(ctx);
|
handlerRemoved0(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -214,13 +214,22 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
|
|||||||
private final DefaultChannelPipeline pipeline;
|
private final DefaultChannelPipeline pipeline;
|
||||||
private final String name;
|
private final String name;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set when the {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)} of
|
||||||
|
* this context's handler is invoked.
|
||||||
|
* Cleared when a user calls {@link #fireChannelReadComplete()} on this context.
|
||||||
|
*
|
||||||
|
* See {@link #fireChannelReadComplete()} to understand how this flag is used.
|
||||||
|
*/
|
||||||
|
boolean invokedThisChannelRead;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set when a user calls {@link #fireChannelRead(Object)} on this context.
|
* Set when a user calls {@link #fireChannelRead(Object)} on this context.
|
||||||
* Cleared when a user calls {@link #fireChannelReadComplete()} on this context.
|
* Cleared when a user calls {@link #fireChannelReadComplete()} on this context.
|
||||||
*
|
*
|
||||||
* See {@link #fireChannelReadComplete()} to understand how this flag is used.
|
* See {@link #fireChannelReadComplete()} to understand how this flag is used.
|
||||||
*/
|
*/
|
||||||
private volatile boolean firedChannelRead;
|
private volatile boolean invokedNextChannelRead;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set when a user calls {@link #read()} on this context.
|
* Set when a user calls {@link #read()} on this context.
|
||||||
@ -228,7 +237,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
|
|||||||
*
|
*
|
||||||
* See {@link #fireChannelReadComplete()} to understand how this flag is used.
|
* See {@link #fireChannelReadComplete()} to understand how this flag is used.
|
||||||
*/
|
*/
|
||||||
private volatile boolean invokedRead;
|
private volatile boolean invokedPrevRead;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@code true} if and only if this context has been removed from the pipeline.
|
* {@code true} if and only if this context has been removed from the pipeline.
|
||||||
@ -376,7 +385,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
|
|||||||
public ChannelHandlerContext fireChannelRead(Object msg) {
|
public ChannelHandlerContext fireChannelRead(Object msg) {
|
||||||
AbstractChannelHandlerContext next = findContextInbound();
|
AbstractChannelHandlerContext next = findContextInbound();
|
||||||
ReferenceCountUtil.touch(msg, next);
|
ReferenceCountUtil.touch(msg, next);
|
||||||
firedChannelRead = true;
|
invokedNextChannelRead = true;
|
||||||
next.invoker().invokeChannelRead(next, msg);
|
next.invoker().invokeChannelRead(next, msg);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
@ -389,11 +398,16 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
|
|||||||
*
|
*
|
||||||
* This is pretty common for the handlers that transform multiple messages into one message,
|
* This is pretty common for the handlers that transform multiple messages into one message,
|
||||||
* such as byte-to-message decoder and message aggregators.
|
* such as byte-to-message decoder and message aggregators.
|
||||||
|
*
|
||||||
|
* Only one exception is when nobody invoked the channelRead() method of this context's handler.
|
||||||
|
* It means the handler has been added later dynamically.
|
||||||
*/
|
*/
|
||||||
if (firedChannelRead) {
|
if (invokedNextChannelRead || // The handler of this context produced a message, or
|
||||||
// The handler of this context produced a message, so we are OK to trigger this event.
|
!invokedThisChannelRead) { // it is not required to produce a message to trigger the event.
|
||||||
firedChannelRead = false;
|
|
||||||
invokedRead = false;
|
invokedNextChannelRead = false;
|
||||||
|
invokedPrevRead = false;
|
||||||
|
|
||||||
AbstractChannelHandlerContext next = findContextInbound();
|
AbstractChannelHandlerContext next = findContextInbound();
|
||||||
next.invoker().invokeChannelReadComplete(next);
|
next.invoker().invokeChannelReadComplete(next);
|
||||||
return this;
|
return this;
|
||||||
@ -410,7 +424,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
|
|||||||
* Why? Because otherwise the next handler will not receive {@code channelRead()} nor
|
* Why? Because otherwise the next handler will not receive {@code channelRead()} nor
|
||||||
* {@code channelReadComplete()} event at all for the {@link #read()} operation it issued.
|
* {@code channelReadComplete()} event at all for the {@link #read()} operation it issued.
|
||||||
*/
|
*/
|
||||||
if (invokedRead && !channel().config().isAutoRead()) {
|
if (invokedPrevRead && !channel().config().isAutoRead()) {
|
||||||
/**
|
/**
|
||||||
* The next (or upstream) handler invoked {@link #read()}, but it didn't get any
|
* The next (or upstream) handler invoked {@link #read()}, but it didn't get any
|
||||||
* {@code channelRead()} event. We should read once more, so that the handler of the current
|
* {@code channelRead()} event. We should read once more, so that the handler of the current
|
||||||
@ -418,7 +432,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
|
|||||||
*/
|
*/
|
||||||
read();
|
read();
|
||||||
} else {
|
} else {
|
||||||
invokedRead = false;
|
invokedPrevRead = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
return this;
|
return this;
|
||||||
@ -508,7 +522,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
|
|||||||
@Override
|
@Override
|
||||||
public ChannelHandlerContext read() {
|
public ChannelHandlerContext read() {
|
||||||
AbstractChannelHandlerContext next = findContextOutbound();
|
AbstractChannelHandlerContext next = findContextOutbound();
|
||||||
invokedRead = true;
|
invokedPrevRead = true;
|
||||||
next.invoker().invokeRead(next);
|
next.invoker().invokeRead(next);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
@ -80,6 +80,7 @@ public final class ChannelHandlerInvokerUtil {
|
|||||||
|
|
||||||
public static void invokeChannelReadNow(final ChannelHandlerContext ctx, final Object msg) {
|
public static void invokeChannelReadNow(final ChannelHandlerContext ctx, final Object msg) {
|
||||||
try {
|
try {
|
||||||
|
((AbstractChannelHandlerContext) ctx).invokedThisChannelRead = true;
|
||||||
ctx.handler().channelRead(ctx, msg);
|
ctx.handler().channelRead(ctx, msg);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
notifyHandlerException(ctx, t);
|
notifyHandlerException(ctx, t);
|
||||||
|
@ -589,6 +589,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
|
|
||||||
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
|
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
|
||||||
try {
|
try {
|
||||||
|
ctx.invokedThisChannelRead = false;
|
||||||
ctx.handler().handlerAdded(ctx);
|
ctx.handler().handlerAdded(ctx);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
boolean removed = false;
|
boolean removed = false;
|
||||||
|
Loading…
Reference in New Issue
Block a user