Revert "Do not suppress channelReadComplete() when a handler was just added"

This reverts commit a41b46ff43.
This commit is contained in:
Norman Maurer 2015-04-14 14:16:10 +02:00
parent 2e1325aa9d
commit f35158f402
3 changed files with 17 additions and 32 deletions

View File

@ -200,11 +200,11 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
ByteBuf bytes = buf.readBytes(readable);
buf.release();
ctx.fireChannelRead(bytes);
ctx.fireChannelReadComplete();
} else {
buf.release();
}
cumulation = null;
ctx.fireChannelReadComplete();
handlerRemoved0(ctx);
}

View File

@ -40,22 +40,13 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
private final DefaultChannelPipeline pipeline;
private final String name;
/**
* Set when the {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)} of
* this context's handler is invoked.
* Cleared when a user calls {@link #fireChannelReadComplete()} on this context.
*
* See {@link #fireChannelReadComplete()} to understand how this flag is used.
*/
boolean invokedThisChannelRead;
/**
* Set when a user calls {@link #fireChannelRead(Object)} on this context.
* Cleared when a user calls {@link #fireChannelReadComplete()} on this context.
*
* See {@link #fireChannelReadComplete()} to understand how this flag is used.
*/
private volatile boolean invokedNextChannelRead;
private volatile boolean firedChannelRead;
/**
* Set when a user calls {@link #read()} on this context.
@ -63,7 +54,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
*
* See {@link #fireChannelReadComplete()} to understand how this flag is used.
*/
private volatile boolean invokedPrevRead;
private volatile boolean invokedRead;
/**
* {@code true} if and only if this context has been removed from the pipeline.
@ -317,7 +308,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
throw new NullPointerException("msg");
}
invokedNextChannelRead = true;
firedChannelRead = true;
final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
@ -334,7 +325,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
}
private void invokeChannelRead(Object msg) {
invokedThisChannelRead = true;
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
@ -350,15 +340,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
*
* This is pretty common for the handlers that transform multiple messages into one message,
* such as byte-to-message decoder and message aggregators.
*
* Only one exception is when nobody invoked the channelRead() method of this context's handler.
* It means the handler has been added later dynamically.
*/
if (invokedNextChannelRead || // The handler of this context produced a message, or
!invokedThisChannelRead) { // it is not required to produce a message to trigger the event.
invokedNextChannelRead = false;
invokedPrevRead = false;
if (firedChannelRead) {
// The handler of this context produced a message, so we are OK to trigger this event.
firedChannelRead = false;
invokedRead = false;
final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
@ -390,7 +376,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
* Why? Because otherwise the next handler will not receive {@code channelRead()} nor
* {@code channelReadComplete()} event at all for the {@link #read()} operation it issued.
*/
if (invokedPrevRead && !channel().config().isAutoRead()) {
if (invokedRead && !channel().config().isAutoRead()) {
/**
* The next (or upstream) handler invoked {@link #read()}, but it didn't get any
* {@code channelRead()} event. We should read once more, so that the handler of the current
@ -398,7 +384,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
*/
read();
} else {
invokedPrevRead = false;
invokedRead = false;
}
return this;
@ -651,7 +637,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
@Override
public ChannelHandlerContext read() {
invokedPrevRead = true;
invokedRead = true;
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {

View File

@ -67,7 +67,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
final Map<EventExecutorGroup, EventExecutor> childExecutors =
new IdentityHashMap<EventExecutorGroup, EventExecutor>();
DefaultChannelPipeline(AbstractChannel channel) {
public DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
@ -469,7 +469,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
}
private void callHandlerAdded(final AbstractChannelHandlerContext ctx) {
private void callHandlerAdded(final ChannelHandlerContext ctx) {
if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) {
ctx.executor().execute(new Runnable() {
@Override
@ -482,14 +482,13 @@ final class DefaultChannelPipeline implements ChannelPipeline {
callHandlerAdded0(ctx);
}
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
private void callHandlerAdded0(final ChannelHandlerContext ctx) {
try {
ctx.invokedThisChannelRead = false;
ctx.handler().handlerAdded(ctx);
} catch (Throwable t) {
boolean removed = false;
try {
remove(ctx);
remove((AbstractChannelHandlerContext) ctx);
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
@ -1030,7 +1029,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.warn(
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.", cause);
"It usually means the last handler in the pipeline did not handle the exception.", cause);
}
@Override
@ -1052,7 +1051,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
private static final String HEAD_NAME = generateName0(HeadContext.class);
final Unsafe unsafe;
protected final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);