diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index c35fa7518b..99dd761c37 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -19,7 +19,6 @@ import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; @@ -99,14 +98,6 @@ public class ChunkedWriteHandler this.ctx = ctx; } - // This method should not need any synchronization as the ChunkedWriteHandler will not receive any new events - @Override - public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - // Fail all promised that are queued. This is needed because otherwise we would never notify the - // ChannelFuture and the registered FutureListener. See #304 - discard(ctx, new ChannelException(ChunkedWriteHandler.class.getSimpleName() + " removed from pipeline.")); - } - private boolean isWritable() { return pendingWrites.get() < maxPendingWrites; } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 1991069de8..31f4d9bbbf 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -60,8 +60,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements private final ByteBuf inByteBuf; private MessageBuf outMsgBuf; private ByteBuf outByteBuf; - - private int flags; + private short callDepth; + private short flags; // When the two handlers run in a different thread and they are next to each other, // each other's buffers can be accessed at the same time resulting in a race condition. @@ -200,51 +200,58 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements void forwardBufferContentAndRemove( final DefaultChannelHandlerContext forwardPrev, final DefaultChannelHandlerContext forwardNext) { + try { boolean flush = false; boolean inboundBufferUpdated = false; - if (hasOutboundByteBuffer() && outboundByteBuffer().isReadable()) { - ByteBuf forwardPrevBuf; - if (forwardPrev.hasOutboundByteBuffer()) { - forwardPrevBuf = forwardPrev.outboundByteBuffer(); - } else { - forwardPrevBuf = forwardPrev.nextOutboundByteBuffer(); - } - forwardPrevBuf.writeBytes(outboundByteBuffer()); - flush = true; - } - if (hasOutboundMessageBuffer() && !outboundMessageBuffer().isEmpty()) { - MessageBuf forwardPrevBuf; - if (forwardPrev.hasOutboundMessageBuffer()) { - forwardPrevBuf = forwardPrev.outboundMessageBuffer(); - } else { - forwardPrevBuf = forwardPrev.nextOutboundMessageBuffer(); - } - if (outboundMessageBuffer().drainTo(forwardPrevBuf) > 0) { + if (!isOutboundFreed()) { + if (hasOutboundByteBuffer() && outboundByteBuffer().isReadable()) { + ByteBuf forwardPrevBuf; + if (forwardPrev.hasOutboundByteBuffer()) { + forwardPrevBuf = forwardPrev.outboundByteBuffer(); + } else { + forwardPrevBuf = forwardPrev.nextOutboundByteBuffer(); + } + forwardPrevBuf.writeBytes(outboundByteBuffer()); flush = true; } - } - if (hasInboundByteBuffer() && inboundByteBuffer().isReadable()) { - ByteBuf forwardNextBuf; - if (forwardNext.hasInboundByteBuffer()) { - forwardNextBuf = forwardNext.inboundByteBuffer(); - } else { - forwardNextBuf = forwardNext.nextInboundByteBuffer(); + if (hasOutboundMessageBuffer() && !outboundMessageBuffer().isEmpty()) { + MessageBuf forwardPrevBuf; + if (forwardPrev.hasOutboundMessageBuffer()) { + forwardPrevBuf = forwardPrev.outboundMessageBuffer(); + } else { + forwardPrevBuf = forwardPrev.nextOutboundMessageBuffer(); + } + if (outboundMessageBuffer().drainTo(forwardPrevBuf) > 0) { + flush = true; + } } - forwardNextBuf.writeBytes(inboundByteBuffer()); - inboundBufferUpdated = true; } - if (hasInboundMessageBuffer() && !inboundMessageBuffer().isEmpty()) { - MessageBuf forwardNextBuf; - if (forwardNext.hasInboundMessageBuffer()) { - forwardNextBuf = forwardNext.inboundMessageBuffer(); - } else { - forwardNextBuf = forwardNext.nextInboundMessageBuffer(); - } - if (inboundMessageBuffer().drainTo(forwardNextBuf) > 0) { + + if (!isInboundFreed()) { + if (hasInboundByteBuffer() && inboundByteBuffer().isReadable()) { + ByteBuf forwardNextBuf; + if (forwardNext.hasInboundByteBuffer()) { + forwardNextBuf = forwardNext.inboundByteBuffer(); + } else { + forwardNextBuf = forwardNext.nextInboundByteBuffer(); + } + forwardNextBuf.writeBytes(inboundByteBuffer()); inboundBufferUpdated = true; } + if (hasInboundMessageBuffer() && !inboundMessageBuffer().isEmpty()) { + MessageBuf forwardNextBuf; + if (forwardNext.hasInboundMessageBuffer()) { + forwardNextBuf = forwardNext.inboundMessageBuffer(); + } else { + forwardNextBuf = forwardNext.nextInboundMessageBuffer(); + } + if (inboundMessageBuffer().drainTo(forwardNextBuf) > 0) { + inboundBufferUpdated = true; + } + } } + if (flush) { EventExecutor executor = executor(); Thread currentThread = Thread.currentThread(); @@ -260,6 +267,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements }); } } + if (inboundBufferUpdated) { EventExecutor executor = executor(); if (executor.inEventLoop()) { @@ -275,11 +283,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } } finally { flags |= FLAG_REMOVED; - - // Free all buffers before completing removal. - if (!channel.isRegistered()) { - freeHandlerBuffersAfterRemoval(); - } + freeAllIfRemoved(); } } @@ -430,34 +434,105 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements return nextBufferHadEnoughRoom; } - private void freeHandlerBuffersAfterRemoval() { - int flags = this.flags; + private boolean isInboundFreed() { + return (flags & FLAG_FREED_INBOUND) != 0; + } + + private boolean isOutboundFreed() { + return (flags & FLAG_FREED_OUTBOUND) != 0; + } + + private void freeAllIfRemoved() { + if (callDepth != 0) { + // Free only when the current context's handler is not being called. + return; + } + + final int flags = this.flags; if ((flags & FLAG_REMOVED) != 0 && (flags & FLAG_FREED) == 0) { // Removed, but not freed yet try { - freeBuffer(inByteBuf); - freeBuffer(inMsgBuf); - freeBuffer(outByteBuf); - freeBuffer(outMsgBuf); + safeFree(inByteBuf); + safeFree(inMsgBuf); + safeFree(outByteBuf); + safeFree(outMsgBuf); } finally { - flags |= FLAG_FREED | FLAG_FREED_INBOUND | FLAG_FREED_OUTBOUND; + this.flags = (short) (flags | FLAG_FREED | FLAG_FREED_INBOUND | FLAG_FREED_OUTBOUND); freeNextInboundBridgeFeeder(); freeNextOutboundBridgeFeeder(); } } } - private void freeBuffer(Buf buf) { - if (buf != null) { - try { - buf.release(); - } catch (Exception e) { - notifyHandlerException(e); - } + void freeInbound() { + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + freeInbound0(); + } else { + executor.execute(new Runnable() { + @Override + public void run() { + freeInbound0(); + } + }); } } - private boolean isInboundFreed() { - return (flags & FLAG_FREED_INBOUND) != 0; + private void freeInbound0() { + try { + safeFree(inByteBuf); + safeFree(inMsgBuf); + } finally { + flags |= FLAG_FREED_INBOUND; + freeNextInboundBridgeFeeder(); + } + + if (next != null) { + DefaultChannelHandlerContext nextCtx = findContextInbound(); + nextCtx.freeInbound(); + } else { + // Freed all inbound buffers. Remove all handlers from the pipeline one by one from tail (exclusive) + // to head (inclusive) to trigger handlerRemoved(). If the removed handler has an outbound buffer, free it, + // too. Note that the tail handler is excluded because it's neither an outbound buffer and it doesn't + // do anything in handlerRemoved(). + pipeline.tail.prev.freeOutboundAndRemove(); + } + } + + /** Invocation initiated by {@link #freeInbound0()} after freeing all inbound buffers. */ + private void freeOutboundAndRemove() { + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + freeOutboundAndRemove0(); + } else { + executor.execute(new Runnable() { + @Override + public void run() { + freeOutboundAndRemove0(); + } + }); + } + } + + private void freeOutboundAndRemove0() { + if (handler instanceof ChannelOperationHandler) { + // Outbound handler - free the buffers / bridge feeders + try { + safeFree(outByteBuf); + safeFree(outMsgBuf); + } finally { + // We also OR FLAG_FREED because at this point we are sure both inbound and outbound were freed. + flags |= FLAG_FREED | FLAG_FREED_OUTBOUND; + freeNextOutboundBridgeFeeder(); + } + } + + DefaultChannelHandlerContext prev = this.prev; + if (prev != null) { + synchronized (pipeline) { + pipeline.remove0(this, false); + } + prev.freeOutboundAndRemove(); + } } private void freeNextInboundBridgeFeeder() { @@ -479,10 +554,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } } - private boolean isOutboundFreed() { - return (flags & FLAG_FREED_OUTBOUND) != 0; - } - private void freeNextOutboundBridgeFeeder() { // Release the bridge feeder NextBridgeFeeder feeder = nextOutBridgeFeeder; @@ -500,6 +571,16 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } } + private static void safeFree(Buf buf) { + if (buf != null) { + try { + buf.release(); + } catch (Exception e) { + logger.warn("Failed to release a handler buffer.", e); + } + } + } + @Override public Channel channel() { return channel; @@ -710,12 +791,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } private void invokeChannelRegistered() { + callDepth ++; try { ((ChannelStateHandler) handler()).channelRegistered(this); } catch (Throwable t) { notifyHandlerException(t); } finally { - freeHandlerBuffersAfterRemoval(); + callDepth --; + freeAllIfRemoved(); } } @@ -737,10 +820,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } private void invokeChannelUnregistered() { + callDepth ++; try { ((ChannelStateHandler) handler()).channelUnregistered(this); } catch (Throwable t) { notifyHandlerException(t); + } finally { + callDepth --; } } @@ -762,12 +848,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } private void invokeChannelActive() { + callDepth ++; try { ((ChannelStateHandler) handler()).channelActive(this); } catch (Throwable t) { notifyHandlerException(t); } finally { - freeHandlerBuffersAfterRemoval(); + callDepth --; + freeAllIfRemoved(); } } @@ -789,12 +877,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } private void invokeChannelInactive() { + callDepth ++; try { ((ChannelStateHandler) handler()).channelInactive(this); } catch (Throwable t) { notifyHandlerException(t); } finally { - freeHandlerBuffersAfterRemoval(); + callDepth --; + freeAllIfRemoved(); } } @@ -831,6 +921,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements private void invokeExceptionCaught0(Throwable cause) { ChannelHandler handler = handler(); + callDepth ++; try { handler.exceptionCaught(this, cause); } catch (Throwable t) { @@ -840,7 +931,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements "exceptionCaught() method while handling the following exception:", cause); } } finally { - freeHandlerBuffersAfterRemoval(); + callDepth --; + freeAllIfRemoved(); } } @@ -868,12 +960,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements private void invokeUserEventTriggered(Object event) { ChannelStateHandler handler = (ChannelStateHandler) handler(); + callDepth ++; try { handler.userEventTriggered(this, event); } catch (Throwable t) { notifyHandlerException(t); } finally { - freeHandlerBuffersAfterRemoval(); + callDepth --; + freeAllIfRemoved(); } } @@ -931,6 +1025,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelStateHandler handler = (ChannelStateHandler) handler(); if (handler instanceof ChannelInboundHandler) { for (;;) { + callDepth ++; try { boolean flushedAll = flushInboundBridge(); handler.inboundBufferUpdated(this); @@ -941,6 +1036,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements notifyHandlerException(t); break; } finally { + callDepth --; if (handler instanceof ChannelInboundByteHandler && !isInboundFreed()) { try { ((ChannelInboundByteHandler) handler).discardInboundReadBytes(this); @@ -948,14 +1044,21 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements notifyHandlerException(t); } } - freeHandlerBuffersAfterRemoval(); + freeAllIfRemoved(); + } + + if (isInboundFreed()) { + break; } } } else { + callDepth ++; try { handler.inboundBufferUpdated(this); } catch (Throwable t) { notifyHandlerException(t); + } finally { + callDepth --; } } } @@ -982,12 +1085,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } private void invokeChannelReadSuspended() { + callDepth ++; try { ((ChannelStateHandler) handler()).channelReadSuspended(this); } catch (Throwable t) { notifyHandlerException(t); } finally { - freeHandlerBuffersAfterRemoval(); + callDepth --; + freeAllIfRemoved(); } } @@ -1056,12 +1161,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } private void invokeBind0(SocketAddress localAddress, ChannelPromise promise) { + callDepth ++; try { ((ChannelOperationHandler) handler()).bind(this, localAddress, promise); } catch (Throwable t) { notifyHandlerException(t); } finally { - freeHandlerBuffersAfterRemoval(); + callDepth --; + freeAllIfRemoved(); } } @@ -1097,12 +1204,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } private void invokeConnect0(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + callDepth ++; try { ((ChannelOperationHandler) handler()).connect(this, remoteAddress, localAddress, promise); } catch (Throwable t) { notifyHandlerException(t); } finally { - freeHandlerBuffersAfterRemoval(); + callDepth --; + freeAllIfRemoved(); } } @@ -1136,12 +1245,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } private void invokeDisconnect0(ChannelPromise promise) { + callDepth ++; try { ((ChannelOperationHandler) handler()).disconnect(this, promise); } catch (Throwable t) { notifyHandlerException(t); } finally { - freeHandlerBuffersAfterRemoval(); + callDepth --; + freeAllIfRemoved(); } } @@ -1168,12 +1279,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } private void invokeClose0(ChannelPromise promise) { + callDepth ++; try { ((ChannelOperationHandler) handler()).close(this, promise); } catch (Throwable t) { notifyHandlerException(t); } finally { - freeHandlerBuffersAfterRemoval(); + callDepth --; + freeAllIfRemoved(); } } @@ -1200,12 +1313,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } private void invokeDeregister0(ChannelPromise promise) { + callDepth ++; try { ((ChannelOperationHandler) handler()).deregister(this, promise); } catch (Throwable t) { notifyHandlerException(t); } finally { - freeHandlerBuffersAfterRemoval(); + callDepth --; + freeAllIfRemoved(); } } @@ -1233,12 +1348,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } private void invokeRead0() { + callDepth ++; try { ((ChannelOperationHandler) handler()).read(this); } catch (Throwable t) { notifyHandlerException(t); } finally { - freeHandlerBuffersAfterRemoval(); + callDepth --; + freeAllIfRemoved(); } } @@ -1308,11 +1425,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements flushOutboundBridge(); } + callDepth ++; try { handler.flush(this, promise); } catch (Throwable t) { notifyHandlerException(t); } finally { + callDepth --; if (handler instanceof ChannelOutboundByteHandler && !isOutboundFreed()) { try { ((ChannelOutboundByteHandler) handler).discardOutboundReadBytes(this); @@ -1320,7 +1439,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements notifyHandlerException(t); } } - freeHandlerBuffersAfterRemoval(); + freeAllIfRemoved(); } } @@ -1360,12 +1479,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements flushOutboundBridge(); } + callDepth ++; try { handler.sendFile(this, region, promise); } catch (Throwable t) { notifyHandlerException(t); } finally { - freeHandlerBuffersAfterRemoval(); + callDepth --; + freeAllIfRemoved(); } } @@ -1453,80 +1574,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements invokeFlush0(promise); } - void freeInbound() { - EventExecutor executor = executor(); - if (executor.inEventLoop()) { - freeInbound0(); - } else { - executor.execute(new Runnable() { - @Override - public void run() { - freeInbound0(); - } - }); - } - } - - private void freeInbound0() { - try { - freeBuffer(inByteBuf); - freeBuffer(inMsgBuf); - } finally { - flags |= FLAG_FREED_INBOUND; - freeNextInboundBridgeFeeder(); - } - - if (next != null) { - DefaultChannelHandlerContext nextCtx = findContextInbound(); - nextCtx.freeInbound(); - } else { - // Freed all inbound buffers. Free all outbound buffers in a reverse order. - findContextOutbound().freeOutbound(); - } - } - - /** Invocation initiated by {@link #freeInbound0()} after freeing all inbound buffers. */ - private void freeOutbound() { - EventExecutor executor = executor(); - if (next == null) { - if (executor.inEventLoop()) { - freeOutbound0(); - } else { - executor.execute(new Runnable() { - @Override - public void run() { - freeOutbound0(); - } - }); - } - } else { - if (executor.inEventLoop()) { - freeOutbound0(); - } else { - executor.execute(new Runnable() { - @Override - public void run() { - freeOutbound0(); - } - }); - } - } - } - - private void freeOutbound0() { - try { - freeBuffer(outByteBuf); - freeBuffer(outMsgBuf); - } finally { - flags |= FLAG_FREED_OUTBOUND; - freeNextOutboundBridgeFeeder(); - } - - if (prev != null) { - findContextOutbound().freeOutbound(); - } - } - private void notifyHandlerException(Throwable cause) { if (inExceptionCaught(cause)) { if (logger.isWarnEnabled()) { diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index fdb2a6c8d0..aff6ab7cb7 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -331,14 +331,14 @@ final class DefaultChannelPipeline implements ChannelPipeline { synchronized (this) { if (!ctx.channel().isRegistered() || ctx.executor().inEventLoop()) { - remove0(ctx); + remove0(ctx, true); return ctx; } else { future = ctx.executor().submit(new Runnable() { @Override public void run() { synchronized (DefaultChannelPipeline.this) { - remove0(ctx); + remove0(ctx, true); } } }); @@ -354,14 +354,14 @@ final class DefaultChannelPipeline implements ChannelPipeline { return context; } - private void remove0(DefaultChannelHandlerContext ctx) { + void remove0(DefaultChannelHandlerContext ctx, boolean forward) { DefaultChannelHandlerContext prev = ctx.prev; DefaultChannelHandlerContext next = ctx.next; prev.next = next; next.prev = prev; name2ctx.remove(ctx.name()); - callHandlerRemoved(ctx, prev, next); + callHandlerRemoved(ctx, prev, next, forward); } @Override @@ -462,7 +462,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { // because callHandlerRemoved() will trigger inboundBufferUpdated() or flush() on newHandler and those // event handlers must be called after handlerAdded(). callHandlerAdded(newCtx); - callHandlerRemoved(oldCtx, newCtx, newCtx); + callHandlerRemoved(oldCtx, newCtx, newCtx, true); } private static void checkMultiplicity(ChannelHandlerContext ctx) { @@ -519,31 +519,33 @@ final class DefaultChannelPipeline implements ChannelPipeline { private void callHandlerRemoved( final DefaultChannelHandlerContext ctx, final DefaultChannelHandlerContext ctxPrev, - final DefaultChannelHandlerContext ctxNext) { + final DefaultChannelHandlerContext ctxNext, final boolean forward) { if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) { ctx.executor().execute(new Runnable() { @Override public void run() { - callHandlerRemoved0(ctx, ctxPrev, ctxNext); + callHandlerRemoved0(ctx, ctxPrev, ctxNext, forward); } }); return; } - callHandlerRemoved0(ctx, ctxPrev, ctxNext); + callHandlerRemoved0(ctx, ctxPrev, ctxNext, forward); } private void callHandlerRemoved0( final DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext ctxPrev, - DefaultChannelHandlerContext ctxNext) { + DefaultChannelHandlerContext ctxNext, boolean forward) { final ChannelHandler handler = ctx.handler(); // Finish removal by forwarding buffer content and freeing the buffers. - try { - ctx.forwardBufferContentAndRemove(ctxPrev, ctxNext); - } catch (Throwable t) { - fireExceptionCaught(new ChannelPipelineException( - "failed to forward buffer content of " + ctx.handler().getClass().getName(), t)); + if (forward) { + try { + ctx.forwardBufferContentAndRemove(ctxPrev, ctxNext); + } catch (Throwable t) { + fireExceptionCaught(new ChannelPipelineException( + "failed to forward buffer content of " + ctx.handler().getClass().getName(), t)); + } } // Notify the complete removal.