diff --git a/handler/src/main/java/io/netty/handler/queue/BufferedWriteHandler.java b/handler/src/main/java/io/netty/handler/queue/BufferedWriteHandler.java index 97ceabc0f0..9c8ab56acd 100644 --- a/handler/src/main/java/io/netty/handler/queue/BufferedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/queue/BufferedWriteHandler.java @@ -15,6 +15,8 @@ */ package io.netty.handler.queue; +import java.io.IOException; +import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.List; import java.util.Queue; @@ -29,6 +31,7 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelStateEvent; import io.netty.channel.Channels; +import io.netty.channel.LifeCycleAwareChannelHandler; import io.netty.channel.MessageEvent; import io.netty.channel.SimpleChannelHandler; import io.netty.channel.socket.nio.NioSocketChannelConfig; @@ -156,7 +159,7 @@ import io.netty.util.internal.QueueFactory; * {@link HashedWheelTimer} every second. * @apiviz.landmark */ -public class BufferedWriteHandler extends SimpleChannelHandler { +public class BufferedWriteHandler extends SimpleChannelHandler implements LifeCycleAwareChannelHandler { private final Queue queue; private final boolean consolidateOnFlush; @@ -351,4 +354,74 @@ public class BufferedWriteHandler extends SimpleChannelHandler { ctx.sendDownstream(e); } } + + /** + * Fail all buffered writes that are left. See #308 for more details + */ + @Override + public void afterRemove(ChannelHandlerContext ctx) throws Exception { + Throwable cause = null; + for (;;) { + MessageEvent ev = queue.poll(); + + if (ev == null) { + break; + } + + if (cause == null) { + cause = new IOException("Unable to flush message"); + } + ev.getFuture().setFailure(cause); + + } + if (cause != null) { + Channels.fireExceptionCaughtLater(ctx.getChannel(), cause); + } + } + + }