Fix flacky test introduced by af2f343648

Motivation:

af2f343648 introduced a test-case which was flacky due of multiple problems:

- we called writeAndFlush(...) in channelRead(...) and assumed it will only be called once. This is true most of the times but it may be called multile times if the data is fragemented.
- we didnt guard against the possibility that channelRead(...) is called with an empty buffer

Modifications:

- Call writeAndFlush(...) in channelActive(...) so we are sure its only called once and close the channel once we wrote the data
- only compare the data after we received a close so we are sure there isnt anything extra received
- check for exception and if we catched one fail the test.

Result:

No flacky test anymore and easier to debug issues that accour because of a catched exception.
This commit is contained in:
Norman Maurer 2017-12-15 11:09:40 +01:00
parent af2f343648
commit 661bd86829

View File

@ -21,12 +21,14 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.util.ReferenceCountUtil;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
@ -45,15 +47,15 @@ public class CompositeBufferGatheringWriteTest extends AbstractSocketTest {
Channel clientChannel = null;
try {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<ByteBuf> clientReceived = new AtomicReference<ByteBuf>();
final AtomicReference<Object> clientReceived = new AtomicReference<Object>();
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
ctx.writeAndFlush(newCompositeBuffer(ctx.alloc()));
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(newCompositeBuffer(ctx.alloc()))
.addListener(ChannelFutureListener.CLOSE);
}
});
}
@ -73,16 +75,35 @@ public class CompositeBufferGatheringWriteTest extends AbstractSocketTest {
try {
if (msg instanceof ByteBuf) {
aggregator.writeBytes((ByteBuf) msg);
if (aggregator.readableBytes() == EXPECTED_BYTES) {
if (clientReceived.compareAndSet(null, aggregator)) {
latch.countDown();
}
}
}
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// IOException is fine as it will also close the channel and may just be a connection reset.
if (!(cause instanceof IOException)) {
clientReceived.set(cause);
latch.countDown();
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (clientReceived.compareAndSet(null, aggregator)) {
try {
assertEquals(EXPECTED_BYTES, aggregator.readableBytes());
} catch (Throwable cause) {
aggregator.release();
aggregator = null;
clientReceived.set(cause);
} finally {
latch.countDown();
}
}
}
});
}
});
@ -91,12 +112,17 @@ public class CompositeBufferGatheringWriteTest extends AbstractSocketTest {
clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel();
ByteBuf expected = newCompositeBuffer(clientChannel.alloc());
clientChannel.writeAndFlush(expected.retainedSlice());
latch.await();
ByteBuf actual = clientReceived.get();
Object received = clientReceived.get();
if (received instanceof ByteBuf) {
ByteBuf actual = (ByteBuf) received;
assertEquals(expected, actual);
expected.release();
actual.release();
} else {
expected.release();
throw (Throwable) received;
}
} finally {
if (clientChannel != null) {
clientChannel.close().sync();