From f66412c84c57f77915accb003dec8dc9b3896afe Mon Sep 17 00:00:00 2001 From: feijermu Date: Tue, 19 May 2020 15:46:12 +0800 Subject: [PATCH] Dequeue all cached messages and destroy the queue instance after removing the FlowControlHandler from channel pipeline. (#10304) Motivation: The `FlowControlHandler` may cache the received messages in a queue in order to do the flow control. However, if this handler is manually removed from pipeline during runtime, those cached messages might not be passed to the next channel handler forever. Modification: Dequeue all these cached messages and call `ChannelHandlerContext.fireChannelRead(...)` in `handlerRemoved(...)` method. Result: Avoid losing the received messages. --- .../handler/flow/FlowControlHandler.java | 9 +++ .../handler/flow/FlowControlHandlerTest.java | 59 +++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java b/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java index c79c36a580..72377c2c20 100644 --- a/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java +++ b/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java @@ -119,6 +119,15 @@ public class FlowControlHandler extends ChannelDuplexHandler { config = ctx.channel().config(); } + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + super.handlerRemoved(ctx); + if (!isQueueEmpty()) { + dequeue(ctx, queue.size()); + } + destroy(); + } + @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { destroy(); diff --git a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java index 5876137d50..e1f053bc51 100644 --- a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java @@ -479,6 +479,65 @@ public class FlowControlHandlerTest { assertFalse(channel.finish()); } + @Test + public void testRemoveFlowControl() throws Exception { + final CountDownLatch latch = new CountDownLatch(3); + + ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + //do the first read + ctx.read(); + super.channelActive(ctx); + } + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + latch.countDown(); + super.channelRead(ctx, msg); + } + }; + + FlowControlHandler flow = new FlowControlHandler() { + private int num; + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { + super.channelRead(ctx, msg); + ++num; + if (num >= 3) { + //We have received 3 messages. Remove myself later + final ChannelHandler handler = this; + ctx.channel().eventLoop().execute(new Runnable() { + @Override + public void run() { + ctx.pipeline().remove(handler); + } + }); + } + } + }; + ChannelInboundHandlerAdapter tail = new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + //consume this msg + ReferenceCountUtil.release(msg); + } + }; + + Channel server = newServer(false /* no auto read */, flow, handler, tail); + Channel client = newClient(server.localAddress()); + try { + // Write one message + client.writeAndFlush(newOneMessage()).sync(); + + // We should receive 3 messages + assertTrue(latch.await(1L, SECONDS)); + assertTrue(flow.isQueueEmpty()); + } finally { + client.close(); + server.close(); + } + } + /** * This is a fictional message decoder. It decodes each {@code byte} * into three strings.