diff --git a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java index f6cea33c3f..6070e9bf46 100644 --- a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java @@ -47,6 +47,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static java.util.concurrent.TimeUnit.*; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -145,7 +146,7 @@ public class FlowControlHandlerTest { // We received three messages even through auto reading // was turned off after we received the first message. - assertTrue(latch.await(1L, TimeUnit.SECONDS)); + assertTrue(latch.await(1L, SECONDS)); } finally { client.close(); server.close(); @@ -169,7 +170,7 @@ public class FlowControlHandlerTest { ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - peerRef.exchange(ctx.channel(), 1L, TimeUnit.SECONDS); + peerRef.exchange(ctx.channel(), 1L, SECONDS); ctx.fireChannelActive(); } @@ -185,7 +186,7 @@ public class FlowControlHandlerTest { try { // The client connection on the server side - Channel peer = peerRef.exchange(null, 1L, TimeUnit.SECONDS); + Channel peer = peerRef.exchange(null, 1L, SECONDS); // Write the message client.writeAndFlush(newOneMessage()) @@ -197,7 +198,7 @@ public class FlowControlHandlerTest { // We received all three messages but hoped that only one // message was read because auto reading was off and we // invoked the read() method only once. - assertTrue(latch.await(1L, TimeUnit.SECONDS)); + assertTrue(latch.await(1L, SECONDS)); } finally { client.close(); server.close(); @@ -228,7 +229,7 @@ public class FlowControlHandlerTest { .syncUninterruptibly(); // We should receive 3 messages - assertTrue(latch.await(1L, TimeUnit.SECONDS)); + assertTrue(latch.await(1L, SECONDS)); assertTrue(flow.isQueueEmpty()); } finally { client.close(); @@ -243,28 +244,48 @@ public class FlowControlHandlerTest { @Test public void testFlowToggleAutoRead() throws Exception { final Exchanger peerRef = new Exchanger(); - final AtomicReference latchRef - = new AtomicReference(new CountDownLatch(1)); - - final AtomicInteger counter = new AtomicInteger(); + final CountDownLatch msgRcvLatch1 = new CountDownLatch(1); + final CountDownLatch msgRcvLatch2 = new CountDownLatch(1); + final CountDownLatch msgRcvLatch3 = new CountDownLatch(1); + final CountDownLatch setAutoReadLatch1 = new CountDownLatch(1); + final CountDownLatch setAutoReadLatch2 = new CountDownLatch(1); ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() { + private int msgRcvCount; + private int expectedMsgCount; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - peerRef.exchange(ctx.channel(), 1L, TimeUnit.SECONDS); + peerRef.exchange(ctx.channel(), 1L, SECONDS); ctx.fireChannelActive(); } @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { + public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException { ReferenceCountUtil.release(msg); // Disable auto reading after each message - counter.incrementAndGet(); ctx.channel().config().setAutoRead(false); - CountDownLatch latch = latchRef.get(); - latch.countDown(); + if (msgRcvCount++ != expectedMsgCount) { + return; + } + switch (msgRcvCount) { + case 1: + msgRcvLatch1.countDown(); + if (setAutoReadLatch1.await(1L, SECONDS)) { + ++expectedMsgCount; + } + break; + case 2: + msgRcvLatch2.countDown(); + if (setAutoReadLatch2.await(1L, SECONDS)) { + ++expectedMsgCount; + } + break; + default: + msgRcvLatch3.countDown(); + break; + } } }; @@ -273,29 +294,23 @@ public class FlowControlHandlerTest { Channel client = newClient(server.localAddress()); try { // The client connection on the server side - Channel peer = peerRef.exchange(null, 1L, TimeUnit.SECONDS); + Channel peer = peerRef.exchange(null, 1L, SECONDS); client.writeAndFlush(newOneMessage()) .syncUninterruptibly(); // channelRead(1) - assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS)); - assertFalse(peer.config().isAutoRead()); - assertEquals(1, counter.get()); + assertTrue(msgRcvLatch1.await(1L, SECONDS)); // channelRead(2) - latchRef.set(new CountDownLatch(1)); peer.config().setAutoRead(true); - assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS)); - assertFalse(peer.config().isAutoRead()); - assertEquals(2, counter.get()); + setAutoReadLatch1.countDown(); + assertTrue(msgRcvLatch1.await(1L, SECONDS)); // channelRead(3) - latchRef.set(new CountDownLatch(1)); peer.config().setAutoRead(true); - assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS)); - assertFalse(peer.config().isAutoRead()); - assertEquals(3, counter.get()); + setAutoReadLatch2.countDown(); + assertTrue(msgRcvLatch3.await(1L, SECONDS)); assertTrue(flow.isQueueEmpty()); } finally { client.close(); @@ -311,23 +326,22 @@ public class FlowControlHandlerTest { @Test public void testFlowAutoReadOff() throws Exception { final Exchanger peerRef = new Exchanger(); - final AtomicReference latchRef - = new AtomicReference(new CountDownLatch(1)); - final AtomicInteger counter = new AtomicInteger(); + final CountDownLatch msgRcvLatch1 = new CountDownLatch(1); + final CountDownLatch msgRcvLatch2 = new CountDownLatch(2); + final CountDownLatch msgRcvLatch3 = new CountDownLatch(3); ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - peerRef.exchange(ctx.channel(), 1L, TimeUnit.SECONDS); ctx.fireChannelActive(); + peerRef.exchange(ctx.channel(), 1L, SECONDS); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { - counter.incrementAndGet(); - - CountDownLatch latch = latchRef.get(); - latch.countDown(); + msgRcvLatch1.countDown(); + msgRcvLatch2.countDown(); + msgRcvLatch3.countDown(); } }; @@ -336,7 +350,7 @@ public class FlowControlHandlerTest { Channel client = newClient(server.localAddress()); try { // The client connection on the server side - Channel peer = peerRef.exchange(null, 1L, TimeUnit.SECONDS); + Channel peer = peerRef.exchange(null, 1L, SECONDS); // Write the message client.writeAndFlush(newOneMessage()) @@ -344,20 +358,15 @@ public class FlowControlHandlerTest { // channelRead(1) peer.read(); - assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS)); - assertEquals(1, counter.get()); + assertTrue(msgRcvLatch1.await(1L, SECONDS)); // channelRead(2) - latchRef.set(new CountDownLatch(1)); peer.read(); - assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS)); - assertEquals(2, counter.get()); + assertTrue(msgRcvLatch2.await(1L, SECONDS)); // channelRead(3) - latchRef.set(new CountDownLatch(1)); peer.read(); - assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS)); - assertEquals(3, counter.get()); + assertTrue(msgRcvLatch3.await(1L, SECONDS)); assertTrue(flow.isQueueEmpty()); } finally { client.close();