From 5d5c60bdd30efbf4aa311f09b2fc788d629910e2 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Tue, 23 Apr 2013 13:35:32 +0900 Subject: [PATCH] Fix a bug where fireInboundBufferUpdated() and flush() swallow the event too early - Fixes #1292 - Replace DefaultChannelPipeline.inbound/outboundShutdown flag with per-context flags - Update the flags in free() / freeInbound() / freeOutbound() for clarity --- .../channel/DefaultChannelHandlerContext.java | 80 +++++++++---------- .../netty/channel/DefaultChannelPipeline.java | 19 ----- 2 files changed, 40 insertions(+), 59 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index d18cbab6b6..a9302406d6 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -40,6 +40,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements private static final int FLAG_REMOVED = 1; private static final int FLAG_FREED = 2; + private static final int FLAG_FREED_INBOUND = 4; + private static final int FLAG_FREED_OUTBOUND = 8; volatile DefaultChannelHandlerContext next; volatile DefaultChannelHandlerContext prev; @@ -431,10 +433,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements private void freeHandlerBuffersAfterRemoval() { int flags = this.flags; if ((flags & FLAG_REMOVED) != 0 && (flags & FLAG_FREED) == 0) { // Removed, but not freed yet - this.flags |= FLAG_FREED; - final ChannelHandler handler = handler(); - try { if (handler instanceof ChannelInboundHandler) { try { @@ -457,12 +456,15 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } private void free() { + flags |= FLAG_FREED; freeInbound(); freeOutbound(); } private void freeInbound() { // Release the bridge feeder + flags |= FLAG_FREED_INBOUND; + NextBridgeFeeder feeder; feeder = nextInBridgeFeeder; if (feeder != null) { @@ -482,6 +484,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements private void freeOutbound() { // Release the bridge feeder + flags |= FLAG_FREED_OUTBOUND; + NextBridgeFeeder feeder = nextOutBridgeFeeder; if (feeder != null) { feeder.release(); @@ -895,25 +899,21 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } private void fireInboundBufferUpdated0(final DefaultChannelHandlerContext next) { - if (!pipeline.isInboundShutdown()) { - feedNextInBridge(); - // This comparison is safe because this method is always executed from the executor. - if (next.executor == executor) { - next.invokeInboundBufferUpdated(); - } else { - Runnable task = next.invokeInboundBufferUpdatedTask; - if (task == null) { - next.invokeInboundBufferUpdatedTask = task = new Runnable() { - @Override - public void run() { - if (!pipeline.isInboundShutdown()) { - next.invokeInboundBufferUpdated(); - } - } - }; - } - next.executor().execute(task); + feedNextInBridge(); + // This comparison is safe because this method is always executed from the executor. + if (next.executor == executor) { + next.invokeInboundBufferUpdated(); + } else { + Runnable task = next.invokeInboundBufferUpdatedTask; + if (task == null) { + next.invokeInboundBufferUpdatedTask = task = new Runnable() { + @Override + public void run() { + next.invokeInboundBufferUpdated(); + } + }; } + next.executor().execute(task); } } @@ -925,6 +925,10 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } private void invokeInboundBufferUpdated() { + if ((flags & FLAG_FREED_INBOUND) != 0) { + return; + } + ChannelStateHandler handler = (ChannelStateHandler) handler(); if (handler instanceof ChannelInboundHandler) { for (;;) { @@ -937,16 +941,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } catch (Throwable t) { notifyHandlerException(t); } finally { - if ((flags & FLAG_FREED) == 0) { - if (handler instanceof ChannelInboundByteHandler && !pipeline.isInboundShutdown()) { - try { - ((ChannelInboundByteHandler) handler).discardInboundReadBytes(this); - } catch (Throwable t) { - notifyHandlerException(t); - } + if (handler instanceof ChannelInboundByteHandler && (flags & FLAG_FREED_INBOUND) == 0) { + try { + ((ChannelInboundByteHandler) handler).discardInboundReadBytes(this); + } catch (Throwable t) { + notifyHandlerException(t); } - freeHandlerBuffersAfterRemoval(); } + freeHandlerBuffersAfterRemoval(); } } } else { @@ -1261,11 +1263,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } private void invokePrevFlush(ChannelPromise promise, Thread currentThread, DefaultChannelHandlerContext prev) { - if (pipeline.isOutboundShutdown()) { - promise.setFailure(new ChannelPipelineException( - "Unable to flush as outbound buffer of next handler was freed already")); - return; - } feedNextOutBridge(); prev.invokeFlush(promise, currentThread); } @@ -1294,8 +1291,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } private void invokeFlush0(ChannelPromise promise) { + if ((flags & FLAG_FREED_OUTBOUND) != 0) { + promise.setFailure(new ChannelPipelineException( + "Unable to flush as outbound buffer of next handler was freed already")); + return; + } + Channel channel = channel(); - if (!channel.isRegistered() && !channel.isActive()) { + if (!channel.isActive() && !channel.isRegistered()) { promise.setFailure(new ClosedChannelException()); return; } @@ -1310,7 +1313,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } catch (Throwable t) { notifyHandlerException(t); } finally { - if (handler instanceof ChannelOutboundByteHandler && !pipeline.isOutboundShutdown()) { + if (handler instanceof ChannelOutboundByteHandler && (flags & FLAG_FREED_OUTBOUND) == 0) { try { ((ChannelOutboundByteHandler) handler).discardOutboundReadBytes(this); } catch (Throwable t) { @@ -1432,7 +1435,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements return; } - if (pipeline.isOutboundShutdown()) { + if ((flags & FLAG_FREED_OUTBOUND) != 0) { promise.setFailure(new ChannelPipelineException( "Unable to write as outbound buffer of next handler was freed already")); return; @@ -1458,7 +1461,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements executor.execute(new Runnable() { @Override public void run() { - pipeline.shutdownInbound(); invokeFreeInboundBuffer0(); } }); @@ -1491,13 +1493,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements EventExecutor executor = executor(); if (next == null) { if (executor.inEventLoop()) { - pipeline.shutdownOutbound(); invokeFreeOutboundBuffer0(); } else { executor.execute(new Runnable() { @Override public void run() { - pipeline.shutdownOutbound(); invokeFreeOutboundBuffer0(); } }); diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 314dbff362..813c74a690 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -69,9 +69,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { final Map childExecutors = new IdentityHashMap(); - private boolean inboundShutdown; - private boolean outboundShutdown; - public DefaultChannelPipeline(Channel channel) { if (channel == null) { throw new NullPointerException("channel"); @@ -776,22 +773,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { return tail.nextOutboundByteBuffer(); } - boolean isInboundShutdown() { - return inboundShutdown; - } - - boolean isOutboundShutdown() { - return outboundShutdown; - } - - void shutdownInbound() { - inboundShutdown = true; - } - - void shutdownOutbound() { - outboundShutdown = true; - } - @Override public ChannelPipeline fireChannelRegistered() { head.initHeadHandler();