Make sure the ChannelFuture's of the MessageEvent's are notified on channelClosed(..) event and on removal of the handler from the ChannelPipeline. See #308

This commit is contained in:
norman 2012-05-04 13:56:34 +02:00
parent d509425b90
commit 21a61ce632

View File

@ -15,6 +15,8 @@
*/ */
package io.netty.handler.queue; package io.netty.handler.queue;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
@ -29,6 +31,7 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelStateEvent; import io.netty.channel.ChannelStateEvent;
import io.netty.channel.Channels; import io.netty.channel.Channels;
import io.netty.channel.LifeCycleAwareChannelHandler;
import io.netty.channel.MessageEvent; import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelHandler; import io.netty.channel.SimpleChannelHandler;
import io.netty.channel.socket.nio.NioSocketChannelConfig; import io.netty.channel.socket.nio.NioSocketChannelConfig;
@ -156,7 +159,7 @@ import io.netty.util.internal.QueueFactory;
* {@link HashedWheelTimer} every second. * {@link HashedWheelTimer} every second.
* @apiviz.landmark * @apiviz.landmark
*/ */
public class BufferedWriteHandler extends SimpleChannelHandler { public class BufferedWriteHandler extends SimpleChannelHandler implements LifeCycleAwareChannelHandler {
private final Queue<MessageEvent> queue; private final Queue<MessageEvent> queue;
private final boolean consolidateOnFlush; private final boolean consolidateOnFlush;
@ -351,4 +354,74 @@ public class BufferedWriteHandler extends SimpleChannelHandler {
ctx.sendDownstream(e); ctx.sendDownstream(e);
} }
} }
/**
* Fail all buffered writes that are left. See <a href="https://github.com/netty/netty/issues/308>#308</a> for more details
*/
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
Throwable cause = null;
for (;;) {
MessageEvent ev = queue.poll();
if (ev == null) {
break;
}
if (cause == null) {
cause = new ClosedChannelException();
}
ev.getFuture().setFailure(cause);
}
if (cause != null) {
Channels.fireExceptionCaught(ctx.getChannel(), cause);
}
super.channelClosed(ctx, e);
}
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// Nothing to do
}
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// Nothing to do
}
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// flush a last time before remove the handler
flush(consolidateOnFlush);
}
/**
* Fail all buffered writes that are left. See <a href="https://github.com/netty/netty/issues/308>#308</a> for more details
*/
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
Throwable cause = null;
for (;;) {
MessageEvent ev = queue.poll();
if (ev == null) {
break;
}
if (cause == null) {
cause = new IOException("Unable to flush message");
}
ev.getFuture().setFailure(cause);
}
if (cause != null) {
Channels.fireExceptionCaughtLater(ctx.getChannel(), cause);
}
}
} }