Make sure ChannelHandler.handlerRemoved() is always invoked

- Fixes #1366: No elegant way to free non-in/outbound buffers held by a handler
- handlerRemoved() is now also invoked when a channel is deregistered, as well as when a handler is removed from a pipeline.
- A little bit of clean-up for readability
- Fix a bug in forwardBufferContentAndRemove() where the handler buffers are not freed (mainly because we were relying on channel.isRegistered() to determine if the handler has been removed from inside the handler.
- ChunkedWriteHandler.handlerRemoved() is unnecessary anymore because ChannelPipeline now always forwards the content of the buffer.
This commit is contained in:
Trustin Lee 2013-05-16 19:32:39 +09:00
parent 670d3f53a8
commit dc13b68632
3 changed files with 214 additions and 174 deletions

View File

@ -19,7 +19,6 @@ import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
@ -99,14 +98,6 @@ public class ChunkedWriteHandler
this.ctx = ctx;
}
// This method should not need any synchronization as the ChunkedWriteHandler will not receive any new events
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// Fail all promised that are queued. This is needed because otherwise we would never notify the
// ChannelFuture and the registered FutureListener. See #304
discard(ctx, new ChannelException(ChunkedWriteHandler.class.getSimpleName() + " removed from pipeline."));
}
private boolean isWritable() {
return pendingWrites.get() < maxPendingWrites;
}

View File

@ -60,8 +60,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private final ByteBuf inByteBuf;
private MessageBuf<Object> outMsgBuf;
private ByteBuf outByteBuf;
private int flags;
private short callDepth;
private short flags;
// When the two handlers run in a different thread and they are next to each other,
// each other's buffers can be accessed at the same time resulting in a race condition.
@ -200,51 +200,58 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
void forwardBufferContentAndRemove(
final DefaultChannelHandlerContext forwardPrev, final DefaultChannelHandlerContext forwardNext) {
try {
boolean flush = false;
boolean inboundBufferUpdated = false;
if (hasOutboundByteBuffer() && outboundByteBuffer().isReadable()) {
ByteBuf forwardPrevBuf;
if (forwardPrev.hasOutboundByteBuffer()) {
forwardPrevBuf = forwardPrev.outboundByteBuffer();
} else {
forwardPrevBuf = forwardPrev.nextOutboundByteBuffer();
}
forwardPrevBuf.writeBytes(outboundByteBuffer());
flush = true;
}
if (hasOutboundMessageBuffer() && !outboundMessageBuffer().isEmpty()) {
MessageBuf<Object> forwardPrevBuf;
if (forwardPrev.hasOutboundMessageBuffer()) {
forwardPrevBuf = forwardPrev.outboundMessageBuffer();
} else {
forwardPrevBuf = forwardPrev.nextOutboundMessageBuffer();
}
if (outboundMessageBuffer().drainTo(forwardPrevBuf) > 0) {
if (!isOutboundFreed()) {
if (hasOutboundByteBuffer() && outboundByteBuffer().isReadable()) {
ByteBuf forwardPrevBuf;
if (forwardPrev.hasOutboundByteBuffer()) {
forwardPrevBuf = forwardPrev.outboundByteBuffer();
} else {
forwardPrevBuf = forwardPrev.nextOutboundByteBuffer();
}
forwardPrevBuf.writeBytes(outboundByteBuffer());
flush = true;
}
}
if (hasInboundByteBuffer() && inboundByteBuffer().isReadable()) {
ByteBuf forwardNextBuf;
if (forwardNext.hasInboundByteBuffer()) {
forwardNextBuf = forwardNext.inboundByteBuffer();
} else {
forwardNextBuf = forwardNext.nextInboundByteBuffer();
if (hasOutboundMessageBuffer() && !outboundMessageBuffer().isEmpty()) {
MessageBuf<Object> forwardPrevBuf;
if (forwardPrev.hasOutboundMessageBuffer()) {
forwardPrevBuf = forwardPrev.outboundMessageBuffer();
} else {
forwardPrevBuf = forwardPrev.nextOutboundMessageBuffer();
}
if (outboundMessageBuffer().drainTo(forwardPrevBuf) > 0) {
flush = true;
}
}
forwardNextBuf.writeBytes(inboundByteBuffer());
inboundBufferUpdated = true;
}
if (hasInboundMessageBuffer() && !inboundMessageBuffer().isEmpty()) {
MessageBuf<Object> forwardNextBuf;
if (forwardNext.hasInboundMessageBuffer()) {
forwardNextBuf = forwardNext.inboundMessageBuffer();
} else {
forwardNextBuf = forwardNext.nextInboundMessageBuffer();
}
if (inboundMessageBuffer().drainTo(forwardNextBuf) > 0) {
if (!isInboundFreed()) {
if (hasInboundByteBuffer() && inboundByteBuffer().isReadable()) {
ByteBuf forwardNextBuf;
if (forwardNext.hasInboundByteBuffer()) {
forwardNextBuf = forwardNext.inboundByteBuffer();
} else {
forwardNextBuf = forwardNext.nextInboundByteBuffer();
}
forwardNextBuf.writeBytes(inboundByteBuffer());
inboundBufferUpdated = true;
}
if (hasInboundMessageBuffer() && !inboundMessageBuffer().isEmpty()) {
MessageBuf<Object> forwardNextBuf;
if (forwardNext.hasInboundMessageBuffer()) {
forwardNextBuf = forwardNext.inboundMessageBuffer();
} else {
forwardNextBuf = forwardNext.nextInboundMessageBuffer();
}
if (inboundMessageBuffer().drainTo(forwardNextBuf) > 0) {
inboundBufferUpdated = true;
}
}
}
if (flush) {
EventExecutor executor = executor();
Thread currentThread = Thread.currentThread();
@ -260,6 +267,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
});
}
}
if (inboundBufferUpdated) {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
@ -275,11 +283,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
} finally {
flags |= FLAG_REMOVED;
// Free all buffers before completing removal.
if (!channel.isRegistered()) {
freeHandlerBuffersAfterRemoval();
}
freeAllIfRemoved();
}
}
@ -430,34 +434,105 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
return nextBufferHadEnoughRoom;
}
private void freeHandlerBuffersAfterRemoval() {
int flags = this.flags;
private boolean isInboundFreed() {
return (flags & FLAG_FREED_INBOUND) != 0;
}
private boolean isOutboundFreed() {
return (flags & FLAG_FREED_OUTBOUND) != 0;
}
private void freeAllIfRemoved() {
if (callDepth != 0) {
// Free only when the current context's handler is not being called.
return;
}
final int flags = this.flags;
if ((flags & FLAG_REMOVED) != 0 && (flags & FLAG_FREED) == 0) { // Removed, but not freed yet
try {
freeBuffer(inByteBuf);
freeBuffer(inMsgBuf);
freeBuffer(outByteBuf);
freeBuffer(outMsgBuf);
safeFree(inByteBuf);
safeFree(inMsgBuf);
safeFree(outByteBuf);
safeFree(outMsgBuf);
} finally {
flags |= FLAG_FREED | FLAG_FREED_INBOUND | FLAG_FREED_OUTBOUND;
this.flags = (short) (flags | FLAG_FREED | FLAG_FREED_INBOUND | FLAG_FREED_OUTBOUND);
freeNextInboundBridgeFeeder();
freeNextOutboundBridgeFeeder();
}
}
}
private void freeBuffer(Buf buf) {
if (buf != null) {
try {
buf.release();
} catch (Exception e) {
notifyHandlerException(e);
}
void freeInbound() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
freeInbound0();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
freeInbound0();
}
});
}
}
private boolean isInboundFreed() {
return (flags & FLAG_FREED_INBOUND) != 0;
private void freeInbound0() {
try {
safeFree(inByteBuf);
safeFree(inMsgBuf);
} finally {
flags |= FLAG_FREED_INBOUND;
freeNextInboundBridgeFeeder();
}
if (next != null) {
DefaultChannelHandlerContext nextCtx = findContextInbound();
nextCtx.freeInbound();
} else {
// Freed all inbound buffers. Remove all handlers from the pipeline one by one from tail (exclusive)
// to head (inclusive) to trigger handlerRemoved(). If the removed handler has an outbound buffer, free it,
// too. Note that the tail handler is excluded because it's neither an outbound buffer and it doesn't
// do anything in handlerRemoved().
pipeline.tail.prev.freeOutboundAndRemove();
}
}
/** Invocation initiated by {@link #freeInbound0()} after freeing all inbound buffers. */
private void freeOutboundAndRemove() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
freeOutboundAndRemove0();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
freeOutboundAndRemove0();
}
});
}
}
private void freeOutboundAndRemove0() {
if (handler instanceof ChannelOperationHandler) {
// Outbound handler - free the buffers / bridge feeders
try {
safeFree(outByteBuf);
safeFree(outMsgBuf);
} finally {
// We also OR FLAG_FREED because at this point we are sure both inbound and outbound were freed.
flags |= FLAG_FREED | FLAG_FREED_OUTBOUND;
freeNextOutboundBridgeFeeder();
}
}
DefaultChannelHandlerContext prev = this.prev;
if (prev != null) {
synchronized (pipeline) {
pipeline.remove0(this, false);
}
prev.freeOutboundAndRemove();
}
}
private void freeNextInboundBridgeFeeder() {
@ -479,10 +554,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
}
private boolean isOutboundFreed() {
return (flags & FLAG_FREED_OUTBOUND) != 0;
}
private void freeNextOutboundBridgeFeeder() {
// Release the bridge feeder
NextBridgeFeeder feeder = nextOutBridgeFeeder;
@ -500,6 +571,16 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
}
private static void safeFree(Buf buf) {
if (buf != null) {
try {
buf.release();
} catch (Exception e) {
logger.warn("Failed to release a handler buffer.", e);
}
}
}
@Override
public Channel channel() {
return channel;
@ -710,12 +791,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
private void invokeChannelRegistered() {
callDepth ++;
try {
((ChannelStateHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
} finally {
freeHandlerBuffersAfterRemoval();
callDepth --;
freeAllIfRemoved();
}
}
@ -737,10 +820,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
private void invokeChannelUnregistered() {
callDepth ++;
try {
((ChannelStateHandler) handler()).channelUnregistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
} finally {
callDepth --;
}
}
@ -762,12 +848,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
private void invokeChannelActive() {
callDepth ++;
try {
((ChannelStateHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
} finally {
freeHandlerBuffersAfterRemoval();
callDepth --;
freeAllIfRemoved();
}
}
@ -789,12 +877,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
private void invokeChannelInactive() {
callDepth ++;
try {
((ChannelStateHandler) handler()).channelInactive(this);
} catch (Throwable t) {
notifyHandlerException(t);
} finally {
freeHandlerBuffersAfterRemoval();
callDepth --;
freeAllIfRemoved();
}
}
@ -831,6 +921,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private void invokeExceptionCaught0(Throwable cause) {
ChannelHandler handler = handler();
callDepth ++;
try {
handler.exceptionCaught(this, cause);
} catch (Throwable t) {
@ -840,7 +931,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
"exceptionCaught() method while handling the following exception:", cause);
}
} finally {
freeHandlerBuffersAfterRemoval();
callDepth --;
freeAllIfRemoved();
}
}
@ -868,12 +960,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private void invokeUserEventTriggered(Object event) {
ChannelStateHandler handler = (ChannelStateHandler) handler();
callDepth ++;
try {
handler.userEventTriggered(this, event);
} catch (Throwable t) {
notifyHandlerException(t);
} finally {
freeHandlerBuffersAfterRemoval();
callDepth --;
freeAllIfRemoved();
}
}
@ -931,6 +1025,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
ChannelStateHandler handler = (ChannelStateHandler) handler();
if (handler instanceof ChannelInboundHandler) {
for (;;) {
callDepth ++;
try {
boolean flushedAll = flushInboundBridge();
handler.inboundBufferUpdated(this);
@ -941,6 +1036,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
notifyHandlerException(t);
break;
} finally {
callDepth --;
if (handler instanceof ChannelInboundByteHandler && !isInboundFreed()) {
try {
((ChannelInboundByteHandler) handler).discardInboundReadBytes(this);
@ -948,14 +1044,21 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
notifyHandlerException(t);
}
}
freeHandlerBuffersAfterRemoval();
freeAllIfRemoved();
}
if (isInboundFreed()) {
break;
}
}
} else {
callDepth ++;
try {
handler.inboundBufferUpdated(this);
} catch (Throwable t) {
notifyHandlerException(t);
} finally {
callDepth --;
}
}
}
@ -982,12 +1085,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
private void invokeChannelReadSuspended() {
callDepth ++;
try {
((ChannelStateHandler) handler()).channelReadSuspended(this);
} catch (Throwable t) {
notifyHandlerException(t);
} finally {
freeHandlerBuffersAfterRemoval();
callDepth --;
freeAllIfRemoved();
}
}
@ -1056,12 +1161,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
private void invokeBind0(SocketAddress localAddress, ChannelPromise promise) {
callDepth ++;
try {
((ChannelOperationHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyHandlerException(t);
} finally {
freeHandlerBuffersAfterRemoval();
callDepth --;
freeAllIfRemoved();
}
}
@ -1097,12 +1204,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
private void invokeConnect0(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
callDepth ++;
try {
((ChannelOperationHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyHandlerException(t);
} finally {
freeHandlerBuffersAfterRemoval();
callDepth --;
freeAllIfRemoved();
}
}
@ -1136,12 +1245,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
private void invokeDisconnect0(ChannelPromise promise) {
callDepth ++;
try {
((ChannelOperationHandler) handler()).disconnect(this, promise);
} catch (Throwable t) {
notifyHandlerException(t);
} finally {
freeHandlerBuffersAfterRemoval();
callDepth --;
freeAllIfRemoved();
}
}
@ -1168,12 +1279,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
private void invokeClose0(ChannelPromise promise) {
callDepth ++;
try {
((ChannelOperationHandler) handler()).close(this, promise);
} catch (Throwable t) {
notifyHandlerException(t);
} finally {
freeHandlerBuffersAfterRemoval();
callDepth --;
freeAllIfRemoved();
}
}
@ -1200,12 +1313,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
private void invokeDeregister0(ChannelPromise promise) {
callDepth ++;
try {
((ChannelOperationHandler) handler()).deregister(this, promise);
} catch (Throwable t) {
notifyHandlerException(t);
} finally {
freeHandlerBuffersAfterRemoval();
callDepth --;
freeAllIfRemoved();
}
}
@ -1233,12 +1348,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
private void invokeRead0() {
callDepth ++;
try {
((ChannelOperationHandler) handler()).read(this);
} catch (Throwable t) {
notifyHandlerException(t);
} finally {
freeHandlerBuffersAfterRemoval();
callDepth --;
freeAllIfRemoved();
}
}
@ -1308,11 +1425,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
flushOutboundBridge();
}
callDepth ++;
try {
handler.flush(this, promise);
} catch (Throwable t) {
notifyHandlerException(t);
} finally {
callDepth --;
if (handler instanceof ChannelOutboundByteHandler && !isOutboundFreed()) {
try {
((ChannelOutboundByteHandler) handler).discardOutboundReadBytes(this);
@ -1320,7 +1439,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
notifyHandlerException(t);
}
}
freeHandlerBuffersAfterRemoval();
freeAllIfRemoved();
}
}
@ -1360,12 +1479,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
flushOutboundBridge();
}
callDepth ++;
try {
handler.sendFile(this, region, promise);
} catch (Throwable t) {
notifyHandlerException(t);
} finally {
freeHandlerBuffersAfterRemoval();
callDepth --;
freeAllIfRemoved();
}
}
@ -1453,80 +1574,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
invokeFlush0(promise);
}
void freeInbound() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
freeInbound0();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
freeInbound0();
}
});
}
}
private void freeInbound0() {
try {
freeBuffer(inByteBuf);
freeBuffer(inMsgBuf);
} finally {
flags |= FLAG_FREED_INBOUND;
freeNextInboundBridgeFeeder();
}
if (next != null) {
DefaultChannelHandlerContext nextCtx = findContextInbound();
nextCtx.freeInbound();
} else {
// Freed all inbound buffers. Free all outbound buffers in a reverse order.
findContextOutbound().freeOutbound();
}
}
/** Invocation initiated by {@link #freeInbound0()} after freeing all inbound buffers. */
private void freeOutbound() {
EventExecutor executor = executor();
if (next == null) {
if (executor.inEventLoop()) {
freeOutbound0();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
freeOutbound0();
}
});
}
} else {
if (executor.inEventLoop()) {
freeOutbound0();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
freeOutbound0();
}
});
}
}
}
private void freeOutbound0() {
try {
freeBuffer(outByteBuf);
freeBuffer(outMsgBuf);
} finally {
flags |= FLAG_FREED_OUTBOUND;
freeNextOutboundBridgeFeeder();
}
if (prev != null) {
findContextOutbound().freeOutbound();
}
}
private void notifyHandlerException(Throwable cause) {
if (inExceptionCaught(cause)) {
if (logger.isWarnEnabled()) {

View File

@ -331,14 +331,14 @@ final class DefaultChannelPipeline implements ChannelPipeline {
synchronized (this) {
if (!ctx.channel().isRegistered() || ctx.executor().inEventLoop()) {
remove0(ctx);
remove0(ctx, true);
return ctx;
} else {
future = ctx.executor().submit(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
remove0(ctx);
remove0(ctx, true);
}
}
});
@ -354,14 +354,14 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return context;
}
private void remove0(DefaultChannelHandlerContext ctx) {
void remove0(DefaultChannelHandlerContext ctx, boolean forward) {
DefaultChannelHandlerContext prev = ctx.prev;
DefaultChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
name2ctx.remove(ctx.name());
callHandlerRemoved(ctx, prev, next);
callHandlerRemoved(ctx, prev, next, forward);
}
@Override
@ -462,7 +462,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
// because callHandlerRemoved() will trigger inboundBufferUpdated() or flush() on newHandler and those
// event handlers must be called after handlerAdded().
callHandlerAdded(newCtx);
callHandlerRemoved(oldCtx, newCtx, newCtx);
callHandlerRemoved(oldCtx, newCtx, newCtx, true);
}
private static void checkMultiplicity(ChannelHandlerContext ctx) {
@ -519,31 +519,33 @@ final class DefaultChannelPipeline implements ChannelPipeline {
private void callHandlerRemoved(
final DefaultChannelHandlerContext ctx, final DefaultChannelHandlerContext ctxPrev,
final DefaultChannelHandlerContext ctxNext) {
final DefaultChannelHandlerContext ctxNext, final boolean forward) {
if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) {
ctx.executor().execute(new Runnable() {
@Override
public void run() {
callHandlerRemoved0(ctx, ctxPrev, ctxNext);
callHandlerRemoved0(ctx, ctxPrev, ctxNext, forward);
}
});
return;
}
callHandlerRemoved0(ctx, ctxPrev, ctxNext);
callHandlerRemoved0(ctx, ctxPrev, ctxNext, forward);
}
private void callHandlerRemoved0(
final DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext ctxPrev,
DefaultChannelHandlerContext ctxNext) {
DefaultChannelHandlerContext ctxNext, boolean forward) {
final ChannelHandler handler = ctx.handler();
// Finish removal by forwarding buffer content and freeing the buffers.
try {
ctx.forwardBufferContentAndRemove(ctxPrev, ctxNext);
} catch (Throwable t) {
fireExceptionCaught(new ChannelPipelineException(
"failed to forward buffer content of " + ctx.handler().getClass().getName(), t));
if (forward) {
try {
ctx.forwardBufferContentAndRemove(ctxPrev, ctxNext);
} catch (Throwable t) {
fireExceptionCaught(new ChannelPipelineException(
"failed to forward buffer content of " + ctx.handler().getClass().getName(), t));
}
}
// Notify the complete removal.