SocketGatheringWriteTest improvements
Motivation: SocketGatherWriteTest has been observed to fail and it has numerous issues which when resolved may help reduce the test failures. Modifications: - A volatile counter and a spin/sleep loop is used to trigger test termination. Incrementing a volatile is generally bad practice and can be avoided in this situation. This mechanism can be replaced by a promise. This mechanism should also trigger upon exception or channel inactive. - The TestHandler maintains an internal buffer, but it is not released. We now only create a buffer on the server side and release it after comparing the expected results. - The composite buffer creation logic can be simplified, also the existing composite buffer doesn't take into account the buffer's reader index when building buf2. Result: Cleaner test.
This commit is contained in:
parent
1ba592049c
commit
0bf614d9e9
@ -26,6 +26,8 @@ import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.testsuite.util.TestUtils;
|
||||
import io.netty.util.concurrent.ImmediateEventExecutor;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Rule;
|
||||
@ -36,7 +38,11 @@ import java.io.IOException;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static io.netty.buffer.Unpooled.compositeBuffer;
|
||||
import static io.netty.buffer.Unpooled.wrappedBuffer;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class SocketGatheringWriteTest extends AbstractSocketTest {
|
||||
|
||||
@ -108,7 +114,8 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
|
||||
sb.childOption(ChannelOption.AUTO_READ, autoRead);
|
||||
cb.option(ChannelOption.AUTO_READ, autoRead);
|
||||
|
||||
final TestHandler sh = new TestHandler(autoRead);
|
||||
Promise<Void> serverDonePromise = ImmediateEventExecutor.INSTANCE.newPromise();
|
||||
final TestServerHandler sh = new TestServerHandler(autoRead, serverDonePromise, data.length);
|
||||
final TestHandler ch = new TestHandler(autoRead);
|
||||
|
||||
cb.handler(ch);
|
||||
@ -119,18 +126,14 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
|
||||
|
||||
for (int i = 0; i < data.length;) {
|
||||
int length = Math.min(random.nextInt(1024 * 8), data.length - i);
|
||||
ByteBuf buf = Unpooled.wrappedBuffer(data, i, length);
|
||||
if (composite && i % 2 == 0) {
|
||||
int split = buf.readableBytes() / 2;
|
||||
int size = buf.readableBytes() - split;
|
||||
int oldIndex = buf.writerIndex();
|
||||
buf.writerIndex(split);
|
||||
ByteBuf buf2 = Unpooled.buffer(size).writeBytes(buf, split, oldIndex - split);
|
||||
CompositeByteBuf comp = Unpooled.compositeBuffer();
|
||||
comp.addComponent(true, buf).addComponent(true, buf2);
|
||||
int firstBufLength = length / 2;
|
||||
CompositeByteBuf comp = compositeBuffer();
|
||||
comp.addComponent(true, wrappedBuffer(data, i, firstBufLength))
|
||||
.addComponent(true, wrappedBuffer(data, i + firstBufLength, length - firstBufLength));
|
||||
cc.write(comp);
|
||||
} else {
|
||||
cc.write(buf);
|
||||
cc.write(wrappedBuffer(data, i, length));
|
||||
}
|
||||
i += length;
|
||||
}
|
||||
@ -146,19 +149,7 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
|
||||
throw t;
|
||||
}
|
||||
|
||||
while (sh.counter < data.length) {
|
||||
if (sh.exception.get() != null) {
|
||||
break;
|
||||
}
|
||||
if (ch.exception.get() != null) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(50);
|
||||
} catch (InterruptedException e) {
|
||||
// Ignore.
|
||||
}
|
||||
}
|
||||
serverDonePromise.sync();
|
||||
sh.channel.close().sync();
|
||||
ch.channel.close().sync();
|
||||
sc.close().sync();
|
||||
@ -175,49 +166,84 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
|
||||
if (ch.exception.get() != null) {
|
||||
throw ch.exception.get();
|
||||
}
|
||||
assertEquals(0, ch.counter);
|
||||
assertEquals(Unpooled.wrappedBuffer(data), sh.received);
|
||||
ByteBuf expected = wrappedBuffer(data);
|
||||
assertEquals(expected, sh.received);
|
||||
expected.release();
|
||||
sh.received.release();
|
||||
}
|
||||
|
||||
private static final class TestServerHandler extends TestHandler {
|
||||
private final int expectedBytes;
|
||||
private final Promise<Void> doneReadingPromise;
|
||||
final ByteBuf received = Unpooled.buffer();
|
||||
|
||||
TestServerHandler(boolean autoRead, Promise<Void> doneReadingPromise, int expectedBytes) {
|
||||
super(autoRead);
|
||||
this.doneReadingPromise = doneReadingPromise;
|
||||
this.expectedBytes = expectedBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||
received.writeBytes(in);
|
||||
if (received.readableBytes() >= expectedBytes) {
|
||||
doneReadingPromise.setSuccess(null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
void handleException(ChannelHandlerContext ctx, Throwable cause) {
|
||||
doneReadingPromise.tryFailure(cause);
|
||||
super.handleException(ctx, cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
doneReadingPromise.tryFailure(new IllegalStateException("server closed!"));
|
||||
super.channelInactive(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
||||
private final boolean autoRead;
|
||||
volatile Channel channel;
|
||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
volatile int counter;
|
||||
final ByteBuf received = Unpooled.buffer();
|
||||
|
||||
TestHandler(boolean autoRead) {
|
||||
this.autoRead = autoRead;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx)
|
||||
throws Exception {
|
||||
public final void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
channel = ctx.channel();
|
||||
if (!autoRead) {
|
||||
ctx.read();
|
||||
}
|
||||
super.channelInactive(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||
counter += in.readableBytes();
|
||||
received.writeBytes(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||
public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||
if (!autoRead) {
|
||||
ctx.read();
|
||||
}
|
||||
super.channelReadComplete(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx,
|
||||
Throwable cause) throws Exception {
|
||||
public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
if (exception.compareAndSet(null, cause)) {
|
||||
ctx.close();
|
||||
handleException(ctx, cause);
|
||||
}
|
||||
super.exceptionCaught(ctx, cause);
|
||||
}
|
||||
|
||||
void handleException(ChannelHandlerContext ctx, Throwable cause) {
|
||||
ctx.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user