diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index 2a1f7a2c3f..d7064531ff 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -17,6 +17,7 @@ package io.netty.handler.stream; import static io.netty.channel.Channels.*; +import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.util.Queue; @@ -32,6 +33,7 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelStateEvent; import io.netty.channel.ChannelUpstreamHandler; import io.netty.channel.Channels; +import io.netty.channel.LifeCycleAwareChannelHandler; import io.netty.channel.MessageEvent; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; @@ -70,7 +72,7 @@ import io.netty.util.internal.QueueFactory; * @apiviz.landmark * @apiviz.has io.netty.handler.stream.ChunkedInput oneway - - reads from */ -public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler { +public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler, LifeCycleAwareChannelHandler { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChunkedWriteHandler.class); @@ -283,4 +285,63 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns } } } + + + public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + // nothing to do + + } + + public void afterAdd(ChannelHandlerContext ctx) throws Exception { + // nothing to do + + } + + public void beforeRemove(ChannelHandlerContext ctx) throws Exception { + // try to flush again a last time. + // + // See #304 + flush(ctx, false); + } + + // This method should not need any synchronization as the ChunkedWriteHandler will not receive any new events + public void afterRemove(ChannelHandlerContext ctx) throws Exception { + // Fail all MessageEvent's that are left. This is needed because otherwise we would never notify the + // ChannelFuture and the registered FutureListener. See #304 + // + Throwable cause = null; + boolean fireExceptionCaught = false; + + for (;;) { + MessageEvent currentEvent = this.currentEvent; + + if (this.currentEvent == null) { + currentEvent = queue.poll(); + } else { + this.currentEvent = null; + } + + if (currentEvent == null) { + break; + } + + Object m = currentEvent.getMessage(); + if (m instanceof ChunkedInput) { + closeInput((ChunkedInput) m); + } + + // Create exception + if (cause == null) { + cause = new IOException("Unable to flush event, discarding"); + } + currentEvent.getFuture().setFailure(cause); + fireExceptionCaught = true; + + currentEvent = null; + } + + if (fireExceptionCaught) { + Channels.fireExceptionCaughtLater(ctx.getChannel(), cause); + } + } }