Revert #10326 due regression in FlowControlHandler

Motivation:

This reverts commit b559711f3e due regression introduced by it.

Modification:

Revert commit

Result:

Fixes https://github.com/netty/netty/issues/10464
This commit is contained in:
Norman Maurer 2020-08-11 08:54:00 +02:00
parent 330cdebf7c
commit 1208f27070
2 changed files with 6 additions and 68 deletions

View File

@ -72,17 +72,12 @@ public class FlowControlHandler implements ChannelHandler {
private ChannelConfig config; private ChannelConfig config;
private int readRequestCount; private boolean shouldConsume;
public FlowControlHandler() { public FlowControlHandler() {
this(true); this(true);
} }
/**
* @param releaseMessages If {@code false}, the handler won't release the buffered messages
* when the handler is removed.
*
*/
public FlowControlHandler(boolean releaseMessages) { public FlowControlHandler(boolean releaseMessages) {
this.releaseMessages = releaseMessages; this.releaseMessages = releaseMessages;
} }
@ -142,7 +137,7 @@ public class FlowControlHandler implements ChannelHandler {
// It seems no messages were consumed. We need to read() some // It seems no messages were consumed. We need to read() some
// messages from upstream and once one arrives it need to be // messages from upstream and once one arrives it need to be
// relayed to downstream to keep the flow going. // relayed to downstream to keep the flow going.
++readRequestCount; shouldConsume = true;
ctx.read(); ctx.read();
} }
} }
@ -158,8 +153,8 @@ public class FlowControlHandler implements ChannelHandler {
// We just received one message. Do we need to relay it regardless // We just received one message. Do we need to relay it regardless
// of the auto reading configuration? The answer is yes if this // of the auto reading configuration? The answer is yes if this
// method was called as a result of a prior read() call. // method was called as a result of a prior read() call.
int minConsume = Math.min(readRequestCount, queue.size()); int minConsume = shouldConsume ? 1 : 0;
readRequestCount -= minConsume; shouldConsume = false;
dequeue(ctx, minConsume); dequeue(ctx, minConsume);
} }

View File

@ -302,7 +302,7 @@ public class FlowControlHandlerTest {
// channelRead(2) // channelRead(2)
peer.config().setAutoRead(true); peer.config().setAutoRead(true);
setAutoReadLatch1.countDown(); setAutoReadLatch1.countDown();
assertTrue(msgRcvLatch2.await(1L, SECONDS)); assertTrue(msgRcvLatch1.await(1L, SECONDS));
// channelRead(3) // channelRead(3)
peer.config().setAutoRead(true); peer.config().setAutoRead(true);
@ -351,7 +351,7 @@ public class FlowControlHandlerTest {
// Write the message // Write the message
client.writeAndFlush(newOneMessage()) client.writeAndFlush(newOneMessage())
.syncUninterruptibly(); .syncUninterruptibly();
// channelRead(1) // channelRead(1)
peer.read(); peer.read();
@ -371,63 +371,6 @@ public class FlowControlHandlerTest {
} }
} }
/**
* The {@link FlowControlHandler} will keep track of read calls when
* when read is called multiple times when the FlowControlHandler queue is empty.
*/
@Test
public void testTrackReadCallCount() throws Exception {
final Exchanger<Channel> peerRef = new Exchanger<Channel>();
final CountDownLatch msgRcvLatch1 = new CountDownLatch(1);
final CountDownLatch msgRcvLatch2 = new CountDownLatch(2);
final CountDownLatch msgRcvLatch3 = new CountDownLatch(3);
ChannelHandler handler = new ChannelHandler() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
peerRef.exchange(ctx.channel(), 1L, SECONDS);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
msgRcvLatch1.countDown();
msgRcvLatch2.countDown();
msgRcvLatch3.countDown();
}
};
FlowControlHandler flow = new FlowControlHandler();
Channel server = newServer(false, flow, handler);
Channel client = newClient(server.localAddress());
try {
// The client connection on the server side
Channel peer = peerRef.exchange(null, 1L, SECONDS);
// Confirm that the queue is empty
assertTrue(flow.isQueueEmpty());
// Request read 3 times
peer.read();
peer.read();
peer.read();
// Write the message
client.writeAndFlush(newOneMessage())
.syncUninterruptibly();
// channelRead(1)
assertTrue(msgRcvLatch1.await(1L, SECONDS));
// channelRead(2)
assertTrue(msgRcvLatch2.await(1L, SECONDS));
// channelRead(3)
assertTrue(msgRcvLatch3.await(1L, SECONDS));
assertTrue(flow.isQueueEmpty());
} finally {
client.close();
server.close();
}
}
@Test @Test
public void testReentranceNotCausesNPE() throws Throwable { public void testReentranceNotCausesNPE() throws Throwable {
final Exchanger<Channel> peerRef = new Exchanger<Channel>(); final Exchanger<Channel> peerRef = new Exchanger<Channel>();