Fix a bug where fireInboundBufferUpdated() and flush() swallow the event too early

- Fixes #1292
- Replace DefaultChannelPipeline.inbound/outboundShutdown flag with per-context flags
- Update the flags in free() / freeInbound() / freeOutbound() for clarity
This commit is contained in:
Trustin Lee 2013-04-23 13:35:32 +09:00
parent 8e2e22c270
commit 5d5c60bdd3
2 changed files with 40 additions and 59 deletions

View File

@ -40,6 +40,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private static final int FLAG_REMOVED = 1; private static final int FLAG_REMOVED = 1;
private static final int FLAG_FREED = 2; private static final int FLAG_FREED = 2;
private static final int FLAG_FREED_INBOUND = 4;
private static final int FLAG_FREED_OUTBOUND = 8;
volatile DefaultChannelHandlerContext next; volatile DefaultChannelHandlerContext next;
volatile DefaultChannelHandlerContext prev; volatile DefaultChannelHandlerContext prev;
@ -431,10 +433,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private void freeHandlerBuffersAfterRemoval() { private void freeHandlerBuffersAfterRemoval() {
int flags = this.flags; int flags = this.flags;
if ((flags & FLAG_REMOVED) != 0 && (flags & FLAG_FREED) == 0) { // Removed, but not freed yet if ((flags & FLAG_REMOVED) != 0 && (flags & FLAG_FREED) == 0) { // Removed, but not freed yet
this.flags |= FLAG_FREED;
final ChannelHandler handler = handler(); final ChannelHandler handler = handler();
try { try {
if (handler instanceof ChannelInboundHandler) { if (handler instanceof ChannelInboundHandler) {
try { try {
@ -457,12 +456,15 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} }
private void free() { private void free() {
flags |= FLAG_FREED;
freeInbound(); freeInbound();
freeOutbound(); freeOutbound();
} }
private void freeInbound() { private void freeInbound() {
// Release the bridge feeder // Release the bridge feeder
flags |= FLAG_FREED_INBOUND;
NextBridgeFeeder feeder; NextBridgeFeeder feeder;
feeder = nextInBridgeFeeder; feeder = nextInBridgeFeeder;
if (feeder != null) { if (feeder != null) {
@ -482,6 +484,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private void freeOutbound() { private void freeOutbound() {
// Release the bridge feeder // Release the bridge feeder
flags |= FLAG_FREED_OUTBOUND;
NextBridgeFeeder feeder = nextOutBridgeFeeder; NextBridgeFeeder feeder = nextOutBridgeFeeder;
if (feeder != null) { if (feeder != null) {
feeder.release(); feeder.release();
@ -895,7 +899,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} }
private void fireInboundBufferUpdated0(final DefaultChannelHandlerContext next) { private void fireInboundBufferUpdated0(final DefaultChannelHandlerContext next) {
if (!pipeline.isInboundShutdown()) {
feedNextInBridge(); feedNextInBridge();
// This comparison is safe because this method is always executed from the executor. // This comparison is safe because this method is always executed from the executor.
if (next.executor == executor) { if (next.executor == executor) {
@ -906,16 +909,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
next.invokeInboundBufferUpdatedTask = task = new Runnable() { next.invokeInboundBufferUpdatedTask = task = new Runnable() {
@Override @Override
public void run() { public void run() {
if (!pipeline.isInboundShutdown()) {
next.invokeInboundBufferUpdated(); next.invokeInboundBufferUpdated();
} }
}
}; };
} }
next.executor().execute(task); next.executor().execute(task);
} }
} }
}
private void feedNextInBridge() { private void feedNextInBridge() {
NextBridgeFeeder feeder = nextInBridgeFeeder; NextBridgeFeeder feeder = nextInBridgeFeeder;
@ -925,6 +925,10 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} }
private void invokeInboundBufferUpdated() { private void invokeInboundBufferUpdated() {
if ((flags & FLAG_FREED_INBOUND) != 0) {
return;
}
ChannelStateHandler handler = (ChannelStateHandler) handler(); ChannelStateHandler handler = (ChannelStateHandler) handler();
if (handler instanceof ChannelInboundHandler) { if (handler instanceof ChannelInboundHandler) {
for (;;) { for (;;) {
@ -937,8 +941,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} catch (Throwable t) { } catch (Throwable t) {
notifyHandlerException(t); notifyHandlerException(t);
} finally { } finally {
if ((flags & FLAG_FREED) == 0) { if (handler instanceof ChannelInboundByteHandler && (flags & FLAG_FREED_INBOUND) == 0) {
if (handler instanceof ChannelInboundByteHandler && !pipeline.isInboundShutdown()) {
try { try {
((ChannelInboundByteHandler) handler).discardInboundReadBytes(this); ((ChannelInboundByteHandler) handler).discardInboundReadBytes(this);
} catch (Throwable t) { } catch (Throwable t) {
@ -948,7 +951,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
freeHandlerBuffersAfterRemoval(); freeHandlerBuffersAfterRemoval();
} }
} }
}
} else { } else {
try { try {
handler.inboundBufferUpdated(this); handler.inboundBufferUpdated(this);
@ -1261,11 +1263,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} }
private void invokePrevFlush(ChannelPromise promise, Thread currentThread, DefaultChannelHandlerContext prev) { private void invokePrevFlush(ChannelPromise promise, Thread currentThread, DefaultChannelHandlerContext prev) {
if (pipeline.isOutboundShutdown()) {
promise.setFailure(new ChannelPipelineException(
"Unable to flush as outbound buffer of next handler was freed already"));
return;
}
feedNextOutBridge(); feedNextOutBridge();
prev.invokeFlush(promise, currentThread); prev.invokeFlush(promise, currentThread);
} }
@ -1294,8 +1291,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} }
private void invokeFlush0(ChannelPromise promise) { private void invokeFlush0(ChannelPromise promise) {
if ((flags & FLAG_FREED_OUTBOUND) != 0) {
promise.setFailure(new ChannelPipelineException(
"Unable to flush as outbound buffer of next handler was freed already"));
return;
}
Channel channel = channel(); Channel channel = channel();
if (!channel.isRegistered() && !channel.isActive()) { if (!channel.isActive() && !channel.isRegistered()) {
promise.setFailure(new ClosedChannelException()); promise.setFailure(new ClosedChannelException());
return; return;
} }
@ -1310,7 +1313,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} catch (Throwable t) { } catch (Throwable t) {
notifyHandlerException(t); notifyHandlerException(t);
} finally { } finally {
if (handler instanceof ChannelOutboundByteHandler && !pipeline.isOutboundShutdown()) { if (handler instanceof ChannelOutboundByteHandler && (flags & FLAG_FREED_OUTBOUND) == 0) {
try { try {
((ChannelOutboundByteHandler) handler).discardOutboundReadBytes(this); ((ChannelOutboundByteHandler) handler).discardOutboundReadBytes(this);
} catch (Throwable t) { } catch (Throwable t) {
@ -1432,7 +1435,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
return; return;
} }
if (pipeline.isOutboundShutdown()) { if ((flags & FLAG_FREED_OUTBOUND) != 0) {
promise.setFailure(new ChannelPipelineException( promise.setFailure(new ChannelPipelineException(
"Unable to write as outbound buffer of next handler was freed already")); "Unable to write as outbound buffer of next handler was freed already"));
return; return;
@ -1458,7 +1461,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
pipeline.shutdownInbound();
invokeFreeInboundBuffer0(); invokeFreeInboundBuffer0();
} }
}); });
@ -1491,13 +1493,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
EventExecutor executor = executor(); EventExecutor executor = executor();
if (next == null) { if (next == null) {
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
pipeline.shutdownOutbound();
invokeFreeOutboundBuffer0(); invokeFreeOutboundBuffer0();
} else { } else {
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
pipeline.shutdownOutbound();
invokeFreeOutboundBuffer0(); invokeFreeOutboundBuffer0();
} }
}); });

View File

@ -69,9 +69,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
final Map<EventExecutorGroup, EventExecutor> childExecutors = final Map<EventExecutorGroup, EventExecutor> childExecutors =
new IdentityHashMap<EventExecutorGroup, EventExecutor>(); new IdentityHashMap<EventExecutorGroup, EventExecutor>();
private boolean inboundShutdown;
private boolean outboundShutdown;
public DefaultChannelPipeline(Channel channel) { public DefaultChannelPipeline(Channel channel) {
if (channel == null) { if (channel == null) {
throw new NullPointerException("channel"); throw new NullPointerException("channel");
@ -776,22 +773,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return tail.nextOutboundByteBuffer(); return tail.nextOutboundByteBuffer();
} }
boolean isInboundShutdown() {
return inboundShutdown;
}
boolean isOutboundShutdown() {
return outboundShutdown;
}
void shutdownInbound() {
inboundShutdown = true;
}
void shutdownOutbound() {
outboundShutdown = true;
}
@Override @Override
public ChannelPipeline fireChannelRegistered() { public ChannelPipeline fireChannelRegistered() {
head.initHeadHandler(); head.initHeadHandler();