From b5706d54f738524baf87ac483bd1f3592717a7d3 Mon Sep 17 00:00:00 2001 From: norman Date: Fri, 4 May 2012 13:47:30 +0200 Subject: [PATCH] Make sure the ChannelFuture's of the MessageEvent's are notified on channelClosed(..) event and on removal of the handler from the ChannelPipeline. See #308 --- .../handler/queue/BufferedWriteHandler.java | 71 ++++++++++++++++++- 1 file changed, 70 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/jboss/netty/handler/queue/BufferedWriteHandler.java b/src/main/java/org/jboss/netty/handler/queue/BufferedWriteHandler.java index 8b3c36b317..d5d4489389 100644 --- a/src/main/java/org/jboss/netty/handler/queue/BufferedWriteHandler.java +++ b/src/main/java/org/jboss/netty/handler/queue/BufferedWriteHandler.java @@ -15,6 +15,8 @@ */ package org.jboss.netty.handler.queue; +import java.io.IOException; +import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.List; import java.util.Queue; @@ -29,6 +31,7 @@ import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.LifeCycleAwareChannelHandler; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig; @@ -156,7 +159,7 @@ import org.jboss.netty.util.internal.QueueFactory; * {@link HashedWheelTimer} every second. * @apiviz.landmark */ -public class BufferedWriteHandler extends SimpleChannelHandler { +public class BufferedWriteHandler extends SimpleChannelHandler implements LifeCycleAwareChannelHandler { private final Queue queue; private final boolean consolidateOnFlush; @@ -350,4 +353,70 @@ public class BufferedWriteHandler extends SimpleChannelHandler { ctx.sendDownstream(e); } } + + /** + * Fail all buffered writes that are left. See #308 for more details + */ + public void afterRemove(ChannelHandlerContext ctx) throws Exception { + Throwable cause = null; + for (;;) { + MessageEvent ev = queue.poll(); + + if (ev == null) { + break; + } + + if (cause == null) { + cause = new IOException("Unable to flush message"); + } + ev.getFuture().setFailure(cause); + + } + if (cause != null) { + Channels.fireExceptionCaughtLater(ctx.getChannel(), cause); + } + } + + }