Fix for issue #2765 relative to unstable trafficshaping test procedure

Motivation:

The test procedure is unstable due to not enough precise timestamping
during the check.

Modifications:

Reducing the test cases and cibling "stable" test ("timestamp-able")
bring more stability to the tests.

Result:

Tests for TrafficShapingHandler seem more stable (whatever using JVM 6,
7 or 8).
This commit is contained in:
Trustin Lee 2014-08-15 10:18:05 -07:00
parent 02643e41fb
commit 88c273364d

View File

@ -49,20 +49,27 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class TrafficShapingTest extends AbstractSocketTest { public class TrafficShapingHandlerTest extends AbstractSocketTest {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficShapingTest.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficShapingHandlerTest.class);
private static final InternalLogger loggerServer = InternalLoggerFactory.getInstance("ServerTSH");
private static final InternalLogger loggerClient = InternalLoggerFactory.getInstance("ClientTSH");
static final int messageSize = 1024; static final int messageSize = 1024;
static final int bandwidthFactor = 15; static final int bandwidthFactor = 12;
static final int minfactor = bandwidthFactor - (bandwidthFactor / 2) - 1; static final int minfactor = 3;
static final int maxfactor = bandwidthFactor + (bandwidthFactor / 2); static final int maxfactor = bandwidthFactor + (bandwidthFactor / 2);
static final long stepms = 1000 / bandwidthFactor; static final long stepms = 1000 / bandwidthFactor;
static final long minimalms = Math.max(stepms / 2, 20) / 10 * 10; static final long minimalms = Math.max(stepms / 2, 20) / 10 * 10;
static final long check = Math.max(Math.min(100, minimalms / 2) / 10 * 10, 20); static final long check = 10;
private static final Random random = new Random(); private static final Random random = new Random();
static final byte[] data = new byte[messageSize]; static final byte[] data = new byte[messageSize];
private static final String TRAFFIC = "traffic";
private static String TESTNAME;
private static int TESTRUN;
private static EventExecutorGroup group; private static EventExecutorGroup group;
private static EventExecutorGroup groupForGlobal;
private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(10); private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
static { static {
random.nextBytes(data); random.nextBytes(data);
@ -70,17 +77,20 @@ public class TrafficShapingTest extends AbstractSocketTest {
@BeforeClass @BeforeClass
public static void createGroup() { public static void createGroup() {
logger.info("Bandwidth: " + minfactor + " <= " + bandwidthFactor + " <= " + maxfactor +
" StepMs: " + stepms + " MinMs: " + minimalms + " CheckMs: " + check);
InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory()); InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
Logger logger = (Logger) LoggerFactory.getLogger("ROOT"); Logger logger = (Logger) LoggerFactory.getLogger("ROOT");
logger.setLevel(Level.INFO); logger.setLevel(Level.INFO);
logger.info("Bandwidth: " + minfactor + " <= " + bandwidthFactor + " <= " + maxfactor +
" StepMs: " + stepms + " MinMs: " + minimalms + " CheckMs: " + check);
group = new DefaultEventExecutorGroup(8); group = new DefaultEventExecutorGroup(8);
groupForGlobal = new DefaultEventExecutorGroup(8);
} }
@AfterClass @AfterClass
public static void destroyGroup() throws Exception { public static void destroyGroup() throws Exception {
group.shutdownGracefully().sync(); group.shutdownGracefully().sync();
groupForGlobal.shutdownGracefully().sync();
executor.shutdown();
} }
private static long[] computeWaitRead(int[] multipleMessage) { private static long[] computeWaitRead(int[] multipleMessage) {
@ -100,9 +110,27 @@ public class TrafficShapingTest extends AbstractSocketTest {
return minimalWaitBetween; return minimalWaitBetween;
} }
private static long[] computeWaitAutoRead(int []autoRead) {
long [] minimalWaitBetween = new long[autoRead.length + 1];
minimalWaitBetween[0] = 0;
for (int i = 0; i < autoRead.length; i++) {
if (autoRead[i] != 0) {
if (autoRead[i] > 0) {
minimalWaitBetween[i + 1] = -1;
} else {
minimalWaitBetween[i + 1] = check;
}
} else {
minimalWaitBetween[i + 1] = 0;
}
}
return minimalWaitBetween;
}
@Test(timeout = 10000) @Test(timeout = 10000)
public void testNoTrafficShapping() throws Throwable { public void testNoTrafficShapping() throws Throwable {
logger.info("TEST NO TRAFFIC"); TESTNAME = "TEST NO TRAFFIC";
TESTRUN = 0;
run(); run();
} }
@ -113,35 +141,24 @@ public class TrafficShapingTest extends AbstractSocketTest {
testTrafficShapping0(sb, cb, false, false, false, false, autoRead, minimalWaitBetween, multipleMessage); testTrafficShapping0(sb, cb, false, false, false, false, autoRead, minimalWaitBetween, multipleMessage);
} }
@Test(timeout = 15000)
public void testExecNoTrafficShapping() throws Throwable {
logger.info("TEST EXEC NO TRAFFIC");
run();
}
public void testExecNoTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = null;
int[] multipleMessage = { 1, 2, 1 };
long[] minimalWaitBetween = null;
testTrafficShapping0(sb, cb, true, false, false, false, autoRead, minimalWaitBetween, multipleMessage);
}
@Test(timeout = 10000) @Test(timeout = 10000)
public void testWriteTrafficShapping() throws Throwable { public void testWriteTrafficShapping() throws Throwable {
logger.info("TEST WRITE"); TESTNAME = "TEST WRITE";
TESTRUN = 0;
run(); run();
} }
public void testWriteTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testWriteTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = null; int[] autoRead = null;
int[] multipleMessage = { 1, 2, 1 }; int[] multipleMessage = { 1, 2, 1, 1 };
long[] minimalWaitBetween = computeWaitWrite(multipleMessage); long[] minimalWaitBetween = computeWaitWrite(multipleMessage);
testTrafficShapping0(sb, cb, false, false, true, false, autoRead, minimalWaitBetween, multipleMessage); testTrafficShapping0(sb, cb, false, false, true, false, autoRead, minimalWaitBetween, multipleMessage);
} }
@Test(timeout = 10000) @Test(timeout = 10000)
public void testReadTrafficShapping() throws Throwable { public void testReadTrafficShapping() throws Throwable {
logger.info("TEST READ"); TESTNAME = "TEST READ";
TESTRUN = 0;
run(); run();
} }
@ -154,7 +171,8 @@ public class TrafficShapingTest extends AbstractSocketTest {
@Test(timeout = 10000) @Test(timeout = 10000)
public void testWrite1TrafficShapping() throws Throwable { public void testWrite1TrafficShapping() throws Throwable {
logger.info("TEST WRITE"); TESTNAME = "TEST WRITE";
TESTRUN = 0;
run(); run();
} }
@ -167,7 +185,8 @@ public class TrafficShapingTest extends AbstractSocketTest {
@Test(timeout = 10000) @Test(timeout = 10000)
public void testRead1TrafficShapping() throws Throwable { public void testRead1TrafficShapping() throws Throwable {
logger.info("TEST READ"); TESTNAME = "TEST READ";
TESTRUN = 0;
run(); run();
} }
@ -178,48 +197,24 @@ public class TrafficShapingTest extends AbstractSocketTest {
testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage); testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
} }
@Test(timeout = 15000)
public void testExecWriteTrafficShapping() throws Throwable {
logger.info("TEST EXEC WRITE");
run();
}
public void testExecWriteTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = null;
int[] multipleMessage = { 1, 2, 1 };
long[] minimalWaitBetween = computeWaitWrite(multipleMessage);
testTrafficShapping0(sb, cb, true, false, true, false, autoRead, minimalWaitBetween, multipleMessage);
}
@Test(timeout = 10000)
public void testExecReadTrafficShapping() throws Throwable {
logger.info("TEST EXEC READ");
run();
}
public void testExecReadTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = null;
int[] multipleMessage = { 1, 2, 1, 1 };
long[] minimalWaitBetween = computeWaitRead(multipleMessage);
testTrafficShapping0(sb, cb, true, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
}
@Test(timeout = 10000) @Test(timeout = 10000)
public void testWriteGlobalTrafficShapping() throws Throwable { public void testWriteGlobalTrafficShapping() throws Throwable {
logger.info("TEST GLOBAL WRITE"); TESTNAME = "TEST GLOBAL WRITE";
TESTRUN = 0;
run(); run();
} }
public void testWriteGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testWriteGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = null; int[] autoRead = null;
int[] multipleMessage = { 1, 2, 1 }; int[] multipleMessage = { 1, 2, 1, 1 };
long[] minimalWaitBetween = computeWaitWrite(multipleMessage); long[] minimalWaitBetween = computeWaitWrite(multipleMessage);
testTrafficShapping0(sb, cb, false, false, true, true, autoRead, minimalWaitBetween, multipleMessage); testTrafficShapping0(sb, cb, false, false, true, true, autoRead, minimalWaitBetween, multipleMessage);
} }
@Test(timeout = 10000) @Test(timeout = 10000)
public void testReadGlobalTrafficShapping() throws Throwable { public void testReadGlobalTrafficShapping() throws Throwable {
logger.info("TEST GLOBAL READ"); TESTNAME = "TEST GLOBAL READ";
TESTRUN = 0;
run(); run();
} }
@ -232,52 +227,32 @@ public class TrafficShapingTest extends AbstractSocketTest {
@Test(timeout = 10000) @Test(timeout = 10000)
public void testAutoReadTrafficShapping() throws Throwable { public void testAutoReadTrafficShapping() throws Throwable {
logger.info("TEST AUTO READ"); TESTNAME = "TEST AUTO READ";
TESTRUN = 0;
run(); run();
} }
public void testAutoReadTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testAutoReadTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 }; int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; int[] multipleMessage = new int[autoRead.length];
long[] minimalWaitBetween = computeWaitRead(multipleMessage); Arrays.fill(multipleMessage, 1);
long[] minimalWaitBetween = computeWaitAutoRead(autoRead);
testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage); testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
} }
@Test(timeout = 10000) @Test(timeout = 10000)
public void testAutoReadGlobalTrafficShapping() throws Throwable { public void testAutoReadGlobalTrafficShapping() throws Throwable {
logger.info("TEST AUTO READ GLOBAL"); TESTNAME = "TEST AUTO READ GLOBAL";
TESTRUN = 0;
run(); run();
} }
public void testAutoReadGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testAutoReadGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 }; int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; int[] multipleMessage = new int[autoRead.length];
long[] minimalWaitBetween = computeWaitRead(multipleMessage); Arrays.fill(multipleMessage, 1);
long[] minimalWaitBetween = computeWaitAutoRead(autoRead);
testTrafficShapping0(sb, cb, false, true, false, true, autoRead, minimalWaitBetween, multipleMessage); testTrafficShapping0(sb, cb, false, true, false, true, autoRead, minimalWaitBetween, multipleMessage);
} }
@Test(timeout = 10000)
public void testAutoReadExecTrafficShapping() throws Throwable {
logger.info("TEST AUTO READ EXEC");
run();
}
public void testAutoReadExecTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
long[] minimalWaitBetween = computeWaitRead(multipleMessage);
testTrafficShapping0(sb, cb, true, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
}
@Test(timeout = 10000)
public void testAutoReadExecGlobalTrafficShapping() throws Throwable {
logger.info("TEST AUTO READ EXEC GLOBAL");
run();
}
public void testAutoReadExecGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
long[] minimalWaitBetween = computeWaitRead(multipleMessage);
testTrafficShapping0(sb, cb, true, true, false, true, autoRead, minimalWaitBetween, multipleMessage);
}
/** /**
* *
@ -303,23 +278,25 @@ public class TrafficShapingTest extends AbstractSocketTest {
private static void testTrafficShapping0(ServerBootstrap sb, Bootstrap cb, final boolean additionalExecutor, private static void testTrafficShapping0(ServerBootstrap sb, Bootstrap cb, final boolean additionalExecutor,
final boolean limitRead, final boolean limitWrite, final boolean globalLimit, int[] autoRead, final boolean limitRead, final boolean limitWrite, final boolean globalLimit, int[] autoRead,
long[] minimalWaitBetween, int[] multipleMessage) throws Throwable { long[] minimalWaitBetween, int[] multipleMessage) throws Throwable {
logger.info("Exec: " + additionalExecutor + " Read: " + limitRead + " Write: " + limitWrite + " Global: " TESTRUN ++;
logger.info("TEST: " + TESTNAME + " RUN: " + TESTRUN +
" Exec: " + additionalExecutor + " Read: " + limitRead + " Write: " + limitWrite + " Global: "
+ globalLimit); + globalLimit);
final ValidTimestampedHandler sh = new ValidTimestampedHandler(autoRead, multipleMessage); final ServerHandler sh = new ServerHandler(autoRead, multipleMessage);
Promise<Boolean> promise = group.next().newPromise(); Promise<Boolean> promise = group.next().newPromise();
final ClientTrafficHandler ch = new ClientTrafficHandler(promise, minimalWaitBetween, multipleMessage, final ClientHandler ch = new ClientHandler(promise, minimalWaitBetween, multipleMessage,
autoRead); autoRead);
final AbstractTrafficShapingHandler handler; final AbstractTrafficShapingHandler handler;
if (limitRead) { if (limitRead) {
if (globalLimit) { if (globalLimit) {
handler = new GlobalTrafficShapingHandler(group, 0, bandwidthFactor * messageSize, check); handler = new GlobalTrafficShapingHandler(groupForGlobal, 0, bandwidthFactor * messageSize, check);
} else { } else {
handler = new ChannelTrafficShapingHandler(0, bandwidthFactor * messageSize, check); handler = new ChannelTrafficShapingHandler(0, bandwidthFactor * messageSize, check);
} }
} else if (limitWrite) { } else if (limitWrite) {
if (globalLimit) { if (globalLimit) {
handler = new GlobalTrafficShapingHandler(group, bandwidthFactor * messageSize, 0, check); handler = new GlobalTrafficShapingHandler(groupForGlobal, bandwidthFactor * messageSize, 0, check);
} else { } else {
handler = new ChannelTrafficShapingHandler(bandwidthFactor * messageSize, 0, check); handler = new ChannelTrafficShapingHandler(bandwidthFactor * messageSize, 0, check);
} }
@ -331,35 +308,19 @@ public class TrafficShapingTest extends AbstractSocketTest {
@Override @Override
protected void initChannel(SocketChannel c) throws Exception { protected void initChannel(SocketChannel c) throws Exception {
if (limitRead) { if (limitRead) {
if (additionalExecutor) { c.pipeline().addLast(TRAFFIC, handler);
c.pipeline().addLast(group, handler);
} else {
c.pipeline().addLast(handler);
} }
}
if (additionalExecutor) {
c.pipeline().addLast(group, sh);
} else {
c.pipeline().addLast(sh); c.pipeline().addLast(sh);
} }
}
}); });
cb.handler(new ChannelInitializer<SocketChannel>() { cb.handler(new ChannelInitializer<SocketChannel>() {
@Override @Override
protected void initChannel(SocketChannel c) throws Exception { protected void initChannel(SocketChannel c) throws Exception {
if (limitWrite) { if (limitWrite) {
if (additionalExecutor) { c.pipeline().addLast(TRAFFIC, handler);
c.pipeline().addLast(group, handler);
} else {
c.pipeline().addLast(handler);
} }
}
if (additionalExecutor) {
c.pipeline().addLast(group, ch);
} else {
c.pipeline().addLast(ch); c.pipeline().addLast(ch);
} }
}
}); });
Channel sc = sb.bind().sync().channel(); Channel sc = sb.bind().sync().channel();
@ -381,7 +342,8 @@ public class TrafficShapingTest extends AbstractSocketTest {
assertTrue("Error during exceution of TrafficShapping: " + promise.cause(), promise.isSuccess()); assertTrue("Error during exceution of TrafficShapping: " + promise.cause(), promise.isSuccess());
float average = (totalNb * messageSize) / (float) (stop - start); float average = (totalNb * messageSize) / (float) (stop - start);
logger.info("Average of traffic: " + average + " compare to " + bandwidthFactor); logger.info("TEST: " + TESTNAME + " RUN: " + TESTRUN +
" Average of traffic: " + average + " compare to " + bandwidthFactor);
sh.channel.close().sync(); sh.channel.close().sync();
ch.channel.close().sync(); ch.channel.close().sync();
sc.close().sync(); sc.close().sync();
@ -419,7 +381,7 @@ public class TrafficShapingTest extends AbstractSocketTest {
} }
} }
private static class ClientTrafficHandler extends SimpleChannelInboundHandler<ByteBuf> { private static class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
volatile Channel channel; volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int step; volatile int step;
@ -430,7 +392,7 @@ public class TrafficShapingTest extends AbstractSocketTest {
private final int[] autoRead; private final int[] autoRead;
final Promise<Boolean> promise; final Promise<Boolean> promise;
ClientTrafficHandler(Promise<Boolean> promise, long[] minimalWaitBetween, int[] multipleMessage, ClientHandler(Promise<Boolean> promise, long[] minimalWaitBetween, int[] multipleMessage,
int[] autoRead) { int[] autoRead) {
this.minimalWaitBetween = minimalWaitBetween; this.minimalWaitBetween = minimalWaitBetween;
this.multipleMessage = Arrays.copyOf(multipleMessage, multipleMessage.length); this.multipleMessage = Arrays.copyOf(multipleMessage, multipleMessage.length);
@ -446,6 +408,7 @@ public class TrafficShapingTest extends AbstractSocketTest {
@Override @Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception { public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
long lastTimestamp = 0; long lastTimestamp = 0;
loggerClient.debug("Step: " + step + " Read: " + (in.readableBytes() / 8) + " blocks");
while (in.isReadable()) { while (in.isReadable()) {
lastTimestamp = in.readLong(); lastTimestamp = in.readLong();
multipleMessage[step]--; multipleMessage[step]--;
@ -459,16 +422,9 @@ public class TrafficShapingTest extends AbstractSocketTest {
if (autoRead != null) { if (autoRead != null) {
if (step > 0 && autoRead[step - 1] != 0) { if (step > 0 && autoRead[step - 1] != 0) {
ar = autoRead[step - 1]; ar = autoRead[step - 1];
if (ar > 0) {
minimalWait = -1;
} else {
minimalWait = minimalms / 3;
}
} else {
minimalWait = 0;
} }
} }
logger.info("Step: " + step + " Interval: " + (lastTimestamp - currentLastTime) + " compareTo " loggerClient.info("Step: " + step + " Interval: " + (lastTimestamp - currentLastTime) + " compareTo "
+ minimalWait + " (" + ar + ")"); + minimalWait + " (" + ar + ")");
assertTrue("The interval of time is incorrect:" + (lastTimestamp - currentLastTime) + " not> " assertTrue("The interval of time is incorrect:" + (lastTimestamp - currentLastTime) + " not> "
+ minimalWait, lastTimestamp - currentLastTime >= minimalWait); + minimalWait, lastTimestamp - currentLastTime >= minimalWait);
@ -495,14 +451,14 @@ public class TrafficShapingTest extends AbstractSocketTest {
} }
} }
private static class ValidTimestampedHandler extends SimpleChannelInboundHandler<ByteBuf> { private static class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final int[] autoRead; private final int[] autoRead;
private final int[] multipleMessage; private final int[] multipleMessage;
volatile Channel channel; volatile Channel channel;
volatile int step; volatile int step;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
ValidTimestampedHandler(int[] autoRead, int[] multipleMessage) { ServerHandler(int[] autoRead, int[] multipleMessage) {
this.autoRead = autoRead; this.autoRead = autoRead;
this.multipleMessage = Arrays.copyOf(multipleMessage, multipleMessage.length); this.multipleMessage = Arrays.copyOf(multipleMessage, multipleMessage.length);
} }
@ -515,34 +471,41 @@ public class TrafficShapingTest extends AbstractSocketTest {
@Override @Override
public void channelRead0(final ChannelHandlerContext ctx, ByteBuf in) throws Exception { public void channelRead0(final ChannelHandlerContext ctx, ByteBuf in) throws Exception {
byte[] actual = new byte[in.readableBytes()]; byte[] actual = new byte[in.readableBytes()];
int nb = actual.length / messageSize;
loggerServer.info("Step: " + step + " Read: " + nb + " blocks");
in.readBytes(actual); in.readBytes(actual);
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
int nb = actual.length / messageSize;
int isAutoRead = 0; int isAutoRead = 0;
int laststep = step; int laststep = step;
for (int i = 0; i < nb; i++) { for (int i = 0; i < nb; i++) {
multipleMessage[step]--; multipleMessage[step]--;
if (multipleMessage[step] == 0) { if (multipleMessage[step] == 0) {
// setAutoRead test
if (autoRead != null) { if (autoRead != null) {
isAutoRead = autoRead[step]; isAutoRead = autoRead[step];
} }
step++; step++;
} }
} }
if (laststep != step && autoRead != null && isAutoRead != 2) { if (laststep != step) {
// setAutoRead test
if (autoRead != null && isAutoRead != 2) {
if (isAutoRead != 0) { if (isAutoRead != 0) {
logger.info("Set AutoRead: " + (isAutoRead > 0) + " Step: " + step); loggerServer.info("Step: " + step + " Set AutoRead: " + (isAutoRead > 0));
channel.config().setAutoRead(isAutoRead > 0); channel.config().setAutoRead(isAutoRead > 0);
} else { } else {
logger.info("AutoRead: NO Step:" + step); loggerServer.info("Step: " + step + " AutoRead: NO");
} }
} }
logger.debug("Get: " + actual.length + " TS " + timestamp + " NB: " + nb); }
loggerServer.debug("Step: " + step + " Write: " + nb);
for (int i = 0; i < nb; i++) { for (int i = 0; i < nb; i++) {
channel.write(Unpooled.copyLong(timestamp)); channel.write(Unpooled.copyLong(timestamp));
} }
channel.flush(); channel.flush();
if (laststep != step && isAutoRead != 0) { if (laststep != step) {
// setAutoRead test
if (isAutoRead != 0) {
if (isAutoRead < 0) { if (isAutoRead < 0) {
final int exactStep = step; final int exactStep = step;
long wait = (isAutoRead == -1) ? minimalms : stepms + minimalms; long wait = (isAutoRead == -1) ? minimalms : stepms + minimalms;
@ -551,16 +514,17 @@ public class TrafficShapingTest extends AbstractSocketTest {
} }
executor.schedule(new Runnable() { executor.schedule(new Runnable() {
public void run() { public void run() {
logger.info("Reset AutoRead: Step " + exactStep); loggerServer.info("Step: " + exactStep + " Reset AutoRead");
channel.config().setAutoRead(true); channel.config().setAutoRead(true);
} }
}, wait, TimeUnit.MILLISECONDS); }, wait, TimeUnit.MILLISECONDS);
} else { } else {
if (isAutoRead > 1) { if (isAutoRead > 1) {
logger.info("Will Set AutoRead: Rrue, Step: " + step); loggerServer.debug("Step: " + step + " Will Set AutoRead: True");
final int exactStep = step;
executor.schedule(new Runnable() { executor.schedule(new Runnable() {
public void run() { public void run() {
logger.info("Set AutoRead: Rrue, Step: " + step); loggerServer.info("Step: " + exactStep + " Set AutoRead: True");
channel.config().setAutoRead(true); channel.config().setAutoRead(true);
} }
}, stepms + minimalms, TimeUnit.MILLISECONDS); }, stepms + minimalms, TimeUnit.MILLISECONDS);
@ -568,6 +532,7 @@ public class TrafficShapingTest extends AbstractSocketTest {
} }
} }
} }
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {