Fail all queued writes if the ChunkedWriteHandler is removed from the ChannelPipeline. See #304

This commit is contained in:
norman 2012-05-04 10:26:25 +02:00 committed by Trustin Lee
parent d308fa8fe1
commit c0bb070876

View File

@ -17,6 +17,7 @@ package io.netty.handler.stream;
import static io.netty.channel.Channels.*;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.Queue;
@ -32,6 +33,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ChannelUpstreamHandler;
import io.netty.channel.Channels;
import io.netty.channel.LifeCycleAwareChannelHandler;
import io.netty.channel.MessageEvent;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
@ -70,7 +72,7 @@ import io.netty.util.internal.QueueFactory;
* @apiviz.landmark
* @apiviz.has io.netty.handler.stream.ChunkedInput oneway - - reads from
*/
public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler {
public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler, LifeCycleAwareChannelHandler {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
@ -283,4 +285,63 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
}
}
}
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// nothing to do
}
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// nothing to do
}
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// try to flush again a last time.
//
// See #304
flush(ctx, false);
}
// This method should not need any synchronization as the ChunkedWriteHandler will not receive any new events
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// Fail all MessageEvent's that are left. This is needed because otherwise we would never notify the
// ChannelFuture and the registered FutureListener. See #304
//
Throwable cause = null;
boolean fireExceptionCaught = false;
for (;;) {
MessageEvent currentEvent = this.currentEvent;
if (this.currentEvent == null) {
currentEvent = queue.poll();
} else {
this.currentEvent = null;
}
if (currentEvent == null) {
break;
}
Object m = currentEvent.getMessage();
if (m instanceof ChunkedInput) {
closeInput((ChunkedInput) m);
}
// Create exception
if (cause == null) {
cause = new IOException("Unable to flush event, discarding");
}
currentEvent.getFuture().setFailure(cause);
fireExceptionCaught = true;
currentEvent = null;
}
if (fireExceptionCaught) {
Channels.fireExceptionCaughtLater(ctx.getChannel(), cause);
}
}
}