From 3ac6a827df4de91ac843d027075844087c845de9 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 11 Aug 2020 08:54:00 +0200 Subject: [PATCH] Revert #10326 due regression in FlowControlHandler Motivation: This reverts commit b559711f3e1b712c3f1dfc77d5504cd1ca58aa38 due regression introduced by it. Modification: Revert commit Result: Fixes https://github.com/netty/netty/issues/10464 --- .../handler/flow/FlowControlHandler.java | 13 ++-- .../handler/flow/FlowControlHandlerTest.java | 61 +------------------ 2 files changed, 6 insertions(+), 68 deletions(-) diff --git a/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java b/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java index 7c6665c81f..72377c2c20 100644 --- a/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java +++ b/handler/src/main/java/io/netty/handler/flow/FlowControlHandler.java @@ -74,17 +74,12 @@ public class FlowControlHandler extends ChannelDuplexHandler { private ChannelConfig config; - private int readRequestCount; + private boolean shouldConsume; public FlowControlHandler() { this(true); } - /** - * @param releaseMessages If {@code false}, the handler won't release the buffered messages - * when the handler is removed. - * - */ public FlowControlHandler(boolean releaseMessages) { this.releaseMessages = releaseMessages; } @@ -145,7 +140,7 @@ public class FlowControlHandler extends ChannelDuplexHandler { // It seems no messages were consumed. We need to read() some // messages from upstream and once one arrives it need to be // relayed to downstream to keep the flow going. - ++readRequestCount; + shouldConsume = true; ctx.read(); } } @@ -161,8 +156,8 @@ public class FlowControlHandler extends ChannelDuplexHandler { // We just received one message. Do we need to relay it regardless // of the auto reading configuration? The answer is yes if this // method was called as a result of a prior read() call. - int minConsume = Math.min(readRequestCount, queue.size()); - readRequestCount -= minConsume; + int minConsume = shouldConsume ? 1 : 0; + shouldConsume = false; dequeue(ctx, minConsume); } diff --git a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java index c1498ff31c..9201093a12 100644 --- a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java @@ -304,7 +304,7 @@ public class FlowControlHandlerTest { // channelRead(2) peer.config().setAutoRead(true); setAutoReadLatch1.countDown(); - assertTrue(msgRcvLatch2.await(1L, SECONDS)); + assertTrue(msgRcvLatch1.await(1L, SECONDS)); // channelRead(3) peer.config().setAutoRead(true); @@ -353,7 +353,7 @@ public class FlowControlHandlerTest { // Write the message client.writeAndFlush(newOneMessage()) - .syncUninterruptibly(); + .syncUninterruptibly(); // channelRead(1) peer.read(); @@ -373,63 +373,6 @@ public class FlowControlHandlerTest { } } - /** - * The {@link FlowControlHandler} will keep track of read calls when - * when read is called multiple times when the FlowControlHandler queue is empty. - */ - @Test - public void testTrackReadCallCount() throws Exception { - final Exchanger peerRef = new Exchanger(); - final CountDownLatch msgRcvLatch1 = new CountDownLatch(1); - final CountDownLatch msgRcvLatch2 = new CountDownLatch(2); - final CountDownLatch msgRcvLatch3 = new CountDownLatch(3); - - ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() { - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - ctx.fireChannelActive(); - peerRef.exchange(ctx.channel(), 1L, SECONDS); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - msgRcvLatch1.countDown(); - msgRcvLatch2.countDown(); - msgRcvLatch3.countDown(); - } - }; - - FlowControlHandler flow = new FlowControlHandler(); - Channel server = newServer(false, flow, handler); - Channel client = newClient(server.localAddress()); - try { - // The client connection on the server side - Channel peer = peerRef.exchange(null, 1L, SECONDS); - - // Confirm that the queue is empty - assertTrue(flow.isQueueEmpty()); - // Request read 3 times - peer.read(); - peer.read(); - peer.read(); - - // Write the message - client.writeAndFlush(newOneMessage()) - .syncUninterruptibly(); - - // channelRead(1) - assertTrue(msgRcvLatch1.await(1L, SECONDS)); - // channelRead(2) - assertTrue(msgRcvLatch2.await(1L, SECONDS)); - // channelRead(3) - assertTrue(msgRcvLatch3.await(1L, SECONDS)); - assertTrue(flow.isQueueEmpty()); - } finally { - client.close(); - server.close(); - } - } - @Test public void testReentranceNotCausesNPE() throws Throwable { final Exchanger peerRef = new Exchanger();