diff --git a/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java b/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java index 72377c2c20..7c6665c81f 100644 --- a/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java +++ b/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java @@ -74,12 +74,17 @@ public class FlowControlHandler extends ChannelDuplexHandler { private ChannelConfig config; - private boolean shouldConsume; + private int readRequestCount; public FlowControlHandler() { this(true); } + /** + * @param releaseMessages If {@code false}, the handler won't release the buffered messages + * when the handler is removed. + * + */ public FlowControlHandler(boolean releaseMessages) { this.releaseMessages = releaseMessages; } @@ -140,7 +145,7 @@ public class FlowControlHandler extends ChannelDuplexHandler { // It seems no messages were consumed. We need to read() some // messages from upstream and once one arrives it need to be // relayed to downstream to keep the flow going. - shouldConsume = true; + ++readRequestCount; ctx.read(); } } @@ -156,8 +161,8 @@ public class FlowControlHandler extends ChannelDuplexHandler { // We just received one message. Do we need to relay it regardless // of the auto reading configuration? The answer is yes if this // method was called as a result of a prior read() call. - int minConsume = shouldConsume ? 1 : 0; - shouldConsume = false; + int minConsume = Math.min(readRequestCount, queue.size()); + readRequestCount -= minConsume; dequeue(ctx, minConsume); } diff --git a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java index 9201093a12..c1498ff31c 100644 --- a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java @@ -304,7 +304,7 @@ public class FlowControlHandlerTest { // channelRead(2) peer.config().setAutoRead(true); setAutoReadLatch1.countDown(); - assertTrue(msgRcvLatch1.await(1L, SECONDS)); + assertTrue(msgRcvLatch2.await(1L, SECONDS)); // channelRead(3) peer.config().setAutoRead(true); @@ -353,7 +353,7 @@ public class FlowControlHandlerTest { // Write the message client.writeAndFlush(newOneMessage()) - .syncUninterruptibly(); + .syncUninterruptibly(); // channelRead(1) peer.read(); @@ -373,6 +373,63 @@ public class FlowControlHandlerTest { } } + /** + * The {@link FlowControlHandler} will keep track of read calls when + * when read is called multiple times when the FlowControlHandler queue is empty. + */ + @Test + public void testTrackReadCallCount() throws Exception { + final Exchanger peerRef = new Exchanger(); + final CountDownLatch msgRcvLatch1 = new CountDownLatch(1); + final CountDownLatch msgRcvLatch2 = new CountDownLatch(2); + final CountDownLatch msgRcvLatch3 = new CountDownLatch(3); + + ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelActive(); + peerRef.exchange(ctx.channel(), 1L, SECONDS); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + msgRcvLatch1.countDown(); + msgRcvLatch2.countDown(); + msgRcvLatch3.countDown(); + } + }; + + FlowControlHandler flow = new FlowControlHandler(); + Channel server = newServer(false, flow, handler); + Channel client = newClient(server.localAddress()); + try { + // The client connection on the server side + Channel peer = peerRef.exchange(null, 1L, SECONDS); + + // Confirm that the queue is empty + assertTrue(flow.isQueueEmpty()); + // Request read 3 times + peer.read(); + peer.read(); + peer.read(); + + // Write the message + client.writeAndFlush(newOneMessage()) + .syncUninterruptibly(); + + // channelRead(1) + assertTrue(msgRcvLatch1.await(1L, SECONDS)); + // channelRead(2) + assertTrue(msgRcvLatch2.await(1L, SECONDS)); + // channelRead(3) + assertTrue(msgRcvLatch3.await(1L, SECONDS)); + assertTrue(flow.isQueueEmpty()); + } finally { + client.close(); + server.close(); + } + } + @Test public void testReentranceNotCausesNPE() throws Throwable { final Exchanger peerRef = new Exchanger();