diff --git a/src/main/java/org/jboss/netty/handler/stream/ChunkedWriteHandler.java b/src/main/java/org/jboss/netty/handler/stream/ChunkedWriteHandler.java index bad00834b3..86a46bedac 100644 --- a/src/main/java/org/jboss/netty/handler/stream/ChunkedWriteHandler.java +++ b/src/main/java/org/jboss/netty/handler/stream/ChunkedWriteHandler.java @@ -17,6 +17,7 @@ package org.jboss.netty.handler.stream; import static org.jboss.netty.channel.Channels.*; +import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.util.Queue; @@ -32,6 +33,7 @@ import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelUpstreamHandler; import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.LifeCycleAwareChannelHandler; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; @@ -70,7 +72,7 @@ import org.jboss.netty.util.internal.QueueFactory; * @apiviz.landmark * @apiviz.has org.jboss.netty.handler.stream.ChunkedInput oneway - - reads from */ -public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler { +public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler, LifeCycleAwareChannelHandler { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChunkedWriteHandler.class); @@ -80,13 +82,6 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns private volatile ChannelHandlerContext ctx; private MessageEvent currentEvent; - /** - * Creates a new instance. - */ - public ChunkedWriteHandler() { - super(); - } - /** * Continues to fetch the chunks from the input. */ @@ -299,4 +294,62 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns } } } + + public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + // nothing to do + + } + + public void afterAdd(ChannelHandlerContext ctx) throws Exception { + // nothing to do + + } + + public void beforeRemove(ChannelHandlerContext ctx) throws Exception { + // try to flush again a last time. + // + // See #304 + flush(ctx, false); + } + + // This method should not need any synchronization as the ChunkedWriteHandler will not receive any new events + public void afterRemove(ChannelHandlerContext ctx) throws Exception { + // Fail all MessageEvent's that are left. This is needed because otherwise we would never notify the + // ChannelFuture and the registered FutureListener. See #304 + // + Throwable cause = null; + boolean fireExceptionCaught = false; + + for (;;) { + MessageEvent currentEvent = this.currentEvent; + + if (this.currentEvent == null) { + currentEvent = queue.poll(); + } else { + this.currentEvent = null; + } + + if (currentEvent == null) { + break; + } + + Object m = currentEvent.getMessage(); + if (m instanceof ChunkedInput) { + closeInput((ChunkedInput) m); + } + + // Create exception + if (cause == null) { + cause = new IOException("Unable to flush event, discarding"); + } + currentEvent.getFuture().setFailure(cause); + fireExceptionCaught = true; + + currentEvent = null; + } + + if (fireExceptionCaught) { + Channels.fireExceptionCaughtLater(ctx.getChannel(), cause); + } + } }