FlowControlHandlerTest synchronization issues

Motivation:
2b65258568 only partially addressed the synchronization issues that are present in FlowControlHandlerTest. A few tests are attempting to validate state changes made across an EventLoop thread and the JUnit thread but are not properly synchronized.

Modifications:
- Ensure that conditions which verify expectations set in another thread have synchronization gates to ensure the event has actually occurred.
- Remove the message counter verification in favor of using individual CountDownLatch objects

Result:
FLowControlHanderTest has less race conditions which may lead to test failures.
This commit is contained in:
Scott Mitchell 2016-06-10 11:31:42 -07:00 committed by Norman Maurer
parent eb1d932466
commit a7496ed83d

View File

@ -47,6 +47,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static java.util.concurrent.TimeUnit.*;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -145,7 +146,7 @@ public class FlowControlHandlerTest {
// We received three messages even through auto reading // We received three messages even through auto reading
// was turned off after we received the first message. // was turned off after we received the first message.
assertTrue(latch.await(1L, TimeUnit.SECONDS)); assertTrue(latch.await(1L, SECONDS));
} finally { } finally {
client.close(); client.close();
server.close(); server.close();
@ -169,7 +170,7 @@ public class FlowControlHandlerTest {
ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() { ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() {
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
peerRef.exchange(ctx.channel(), 1L, TimeUnit.SECONDS); peerRef.exchange(ctx.channel(), 1L, SECONDS);
ctx.fireChannelActive(); ctx.fireChannelActive();
} }
@ -185,7 +186,7 @@ public class FlowControlHandlerTest {
try { try {
// The client connection on the server side // The client connection on the server side
Channel peer = peerRef.exchange(null, 1L, TimeUnit.SECONDS); Channel peer = peerRef.exchange(null, 1L, SECONDS);
// Write the message // Write the message
client.writeAndFlush(newOneMessage()) client.writeAndFlush(newOneMessage())
@ -197,7 +198,7 @@ public class FlowControlHandlerTest {
// We received all three messages but hoped that only one // We received all three messages but hoped that only one
// message was read because auto reading was off and we // message was read because auto reading was off and we
// invoked the read() method only once. // invoked the read() method only once.
assertTrue(latch.await(1L, TimeUnit.SECONDS)); assertTrue(latch.await(1L, SECONDS));
} finally { } finally {
client.close(); client.close();
server.close(); server.close();
@ -228,7 +229,7 @@ public class FlowControlHandlerTest {
.syncUninterruptibly(); .syncUninterruptibly();
// We should receive 3 messages // We should receive 3 messages
assertTrue(latch.await(1L, TimeUnit.SECONDS)); assertTrue(latch.await(1L, SECONDS));
assertTrue(flow.isQueueEmpty()); assertTrue(flow.isQueueEmpty());
} finally { } finally {
client.close(); client.close();
@ -243,28 +244,48 @@ public class FlowControlHandlerTest {
@Test @Test
public void testFlowToggleAutoRead() throws Exception { public void testFlowToggleAutoRead() throws Exception {
final Exchanger<Channel> peerRef = new Exchanger<Channel>(); final Exchanger<Channel> peerRef = new Exchanger<Channel>();
final AtomicReference<CountDownLatch> latchRef final CountDownLatch msgRcvLatch1 = new CountDownLatch(1);
= new AtomicReference<CountDownLatch>(new CountDownLatch(1)); final CountDownLatch msgRcvLatch2 = new CountDownLatch(1);
final CountDownLatch msgRcvLatch3 = new CountDownLatch(1);
final AtomicInteger counter = new AtomicInteger(); final CountDownLatch setAutoReadLatch1 = new CountDownLatch(1);
final CountDownLatch setAutoReadLatch2 = new CountDownLatch(1);
ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() { ChannelInboundHandlerAdapter handler = new ChannelInboundHandlerAdapter() {
private int msgRcvCount;
private int expectedMsgCount;
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
peerRef.exchange(ctx.channel(), 1L, TimeUnit.SECONDS); peerRef.exchange(ctx.channel(), 1L, SECONDS);
ctx.fireChannelActive(); ctx.fireChannelActive();
} }
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {
ReferenceCountUtil.release(msg); ReferenceCountUtil.release(msg);
// Disable auto reading after each message // Disable auto reading after each message
counter.incrementAndGet();
ctx.channel().config().setAutoRead(false); ctx.channel().config().setAutoRead(false);
CountDownLatch latch = latchRef.get(); if (msgRcvCount++ != expectedMsgCount) {
latch.countDown(); return;
}
switch (msgRcvCount) {
case 1:
msgRcvLatch1.countDown();
if (setAutoReadLatch1.await(1L, SECONDS)) {
++expectedMsgCount;
}
break;
case 2:
msgRcvLatch2.countDown();
if (setAutoReadLatch2.await(1L, SECONDS)) {
++expectedMsgCount;
}
break;
default:
msgRcvLatch3.countDown();
break;
}
} }
}; };
@ -273,29 +294,23 @@ public class FlowControlHandlerTest {
Channel client = newClient(server.localAddress()); Channel client = newClient(server.localAddress());
try { try {
// The client connection on the server side // The client connection on the server side
Channel peer = peerRef.exchange(null, 1L, TimeUnit.SECONDS); Channel peer = peerRef.exchange(null, 1L, SECONDS);
client.writeAndFlush(newOneMessage()) client.writeAndFlush(newOneMessage())
.syncUninterruptibly(); .syncUninterruptibly();
// channelRead(1) // channelRead(1)
assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS)); assertTrue(msgRcvLatch1.await(1L, SECONDS));
assertFalse(peer.config().isAutoRead());
assertEquals(1, counter.get());
// channelRead(2) // channelRead(2)
latchRef.set(new CountDownLatch(1));
peer.config().setAutoRead(true); peer.config().setAutoRead(true);
assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS)); setAutoReadLatch1.countDown();
assertFalse(peer.config().isAutoRead()); assertTrue(msgRcvLatch1.await(1L, SECONDS));
assertEquals(2, counter.get());
// channelRead(3) // channelRead(3)
latchRef.set(new CountDownLatch(1));
peer.config().setAutoRead(true); peer.config().setAutoRead(true);
assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS)); setAutoReadLatch2.countDown();
assertFalse(peer.config().isAutoRead()); assertTrue(msgRcvLatch3.await(1L, SECONDS));
assertEquals(3, counter.get());
assertTrue(flow.isQueueEmpty()); assertTrue(flow.isQueueEmpty());
} finally { } finally {
client.close(); client.close();
@ -311,23 +326,22 @@ public class FlowControlHandlerTest {
@Test @Test
public void testFlowAutoReadOff() throws Exception { public void testFlowAutoReadOff() throws Exception {
final Exchanger<Channel> peerRef = new Exchanger<Channel>(); final Exchanger<Channel> peerRef = new Exchanger<Channel>();
final AtomicReference<CountDownLatch> latchRef final CountDownLatch msgRcvLatch1 = new CountDownLatch(1);
= new AtomicReference<CountDownLatch>(new CountDownLatch(1)); final CountDownLatch msgRcvLatch2 = new CountDownLatch(2);
final AtomicInteger counter = new AtomicInteger(); final CountDownLatch msgRcvLatch3 = new CountDownLatch(3);
ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() { ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() {
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
peerRef.exchange(ctx.channel(), 1L, TimeUnit.SECONDS);
ctx.fireChannelActive(); ctx.fireChannelActive();
peerRef.exchange(ctx.channel(), 1L, SECONDS);
} }
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { public void channelRead(ChannelHandlerContext ctx, Object msg) {
counter.incrementAndGet(); msgRcvLatch1.countDown();
msgRcvLatch2.countDown();
CountDownLatch latch = latchRef.get(); msgRcvLatch3.countDown();
latch.countDown();
} }
}; };
@ -336,7 +350,7 @@ public class FlowControlHandlerTest {
Channel client = newClient(server.localAddress()); Channel client = newClient(server.localAddress());
try { try {
// The client connection on the server side // The client connection on the server side
Channel peer = peerRef.exchange(null, 1L, TimeUnit.SECONDS); Channel peer = peerRef.exchange(null, 1L, SECONDS);
// Write the message // Write the message
client.writeAndFlush(newOneMessage()) client.writeAndFlush(newOneMessage())
@ -344,20 +358,15 @@ public class FlowControlHandlerTest {
// channelRead(1) // channelRead(1)
peer.read(); peer.read();
assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS)); assertTrue(msgRcvLatch1.await(1L, SECONDS));
assertEquals(1, counter.get());
// channelRead(2) // channelRead(2)
latchRef.set(new CountDownLatch(1));
peer.read(); peer.read();
assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS)); assertTrue(msgRcvLatch2.await(1L, SECONDS));
assertEquals(2, counter.get());
// channelRead(3) // channelRead(3)
latchRef.set(new CountDownLatch(1));
peer.read(); peer.read();
assertTrue(latchRef.get().await(1L, TimeUnit.SECONDS)); assertTrue(msgRcvLatch3.await(1L, SECONDS));
assertEquals(3, counter.get());
assertTrue(flow.isQueueEmpty()); assertTrue(flow.isQueueEmpty());
} finally { } finally {
client.close(); client.close();