diff --git a/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java b/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java index c79c36a580..72377c2c20 100644 --- a/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java +++ b/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java @@ -119,6 +119,15 @@ public class FlowControlHandler extends ChannelDuplexHandler { config = ctx.channel().config(); } + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + super.handlerRemoved(ctx); + if (!isQueueEmpty()) { + dequeue(ctx, queue.size()); + } + destroy(); + } + @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { destroy(); diff --git a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java index 5876137d50..e1f053bc51 100644 --- a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java @@ -479,6 +479,65 @@ public class FlowControlHandlerTest { assertFalse(channel.finish()); } + @Test + public void testRemoveFlowControl() throws Exception { + final CountDownLatch latch = new CountDownLatch(3); + + ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + //do the first read + ctx.read(); + super.channelActive(ctx); + } + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + latch.countDown(); + super.channelRead(ctx, msg); + } + }; + + FlowControlHandler flow = new FlowControlHandler() { + private int num; + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { + super.channelRead(ctx, msg); + ++num; + if (num >= 3) { + //We have received 3 messages. Remove myself later + final ChannelHandler handler = this; + ctx.channel().eventLoop().execute(new Runnable() { + @Override + public void run() { + ctx.pipeline().remove(handler); + } + }); + } + } + }; + ChannelInboundHandlerAdapter tail = new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + //consume this msg + ReferenceCountUtil.release(msg); + } + }; + + Channel server = newServer(false /* no auto read */, flow, handler, tail); + Channel client = newClient(server.localAddress()); + try { + // Write one message + client.writeAndFlush(newOneMessage()).sync(); + + // We should receive 3 messages + assertTrue(latch.await(1L, SECONDS)); + assertTrue(flow.isQueueEmpty()); + } finally { + client.close(); + server.close(); + } + } + /** * This is a fictional message decoder. It decodes each {@code byte} * into three strings.