Fix for issue #2765 relative to unstable test

Fix for issue #2765 relative to unstable trafficshaping test procedure

Motivation:
The test procedure is unstable due to not enough precise timestamping during the check.

Modifications:
Reducing the test cases and cibling "stable" test ("timestamp-able") bring more stability to the tests.

Result:
Tests for TrafficShapingHandler seem more stable (whatever using JVM 6, 7 or 8).
Same version as in 4.0, 4.1 and Master.
This commit is contained in:
Frédéric Brégier 2014-08-16 18:24:35 +02:00
parent f6ce33f9a5
commit 7ff9a43271

View File

@ -29,8 +29,6 @@ import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import org.jboss.netty.handler.traffic.AbstractTrafficShapingHandler; import org.jboss.netty.handler.traffic.AbstractTrafficShapingHandler;
import org.jboss.netty.handler.traffic.ChannelTrafficShapingHandler; import org.jboss.netty.handler.traffic.ChannelTrafficShapingHandler;
import org.jboss.netty.handler.traffic.GlobalTrafficShapingHandler; import org.jboss.netty.handler.traffic.GlobalTrafficShapingHandler;
@ -55,23 +53,27 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class TrafficShapingTest { public class TrafficShapingHandlerTest {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficShapingTest.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficShapingHandlerTest.class);
private static final InternalLogger loggerServer = InternalLoggerFactory.getInstance("ServerTSH");
private static final InternalLogger loggerClient = InternalLoggerFactory.getInstance("ClientTSH");
static final int messageSize = 1024; static final int messageSize = 1024;
static final int bandwidthFactor = 8; static final int bandwidthFactor = 12;
static final int minfactor = bandwidthFactor - (bandwidthFactor / 2); static final int minfactor = 3;
static final int maxfactor = bandwidthFactor + (bandwidthFactor / 2); static final int maxfactor = bandwidthFactor + (bandwidthFactor / 2);
static final long stepms = (1000 / bandwidthFactor - 10) / 10 * 10; static final long stepms = (1000 / bandwidthFactor - 10) / 10 * 10;
static final long minimalms = Math.max(stepms / 2, 40) / 10 * 10; static final long minimalms = Math.max(stepms / 2, 20) / 10 * 10;
static final long check = Math.max(Math.min(100, minimalms / 2) / 10 * 10, 40); static final long check = 10;
private static final Random random = new Random(); private static final Random random = new Random();
static final byte[] data = new byte[messageSize]; static final byte[] data = new byte[messageSize];
private static final String TRAFFIC = "traffic";
private static String TESTNAME;
private static int TESTRUN;
private static ExecutorService group = Executors.newCachedThreadPool(); private static ExecutorService group = Executors.newCachedThreadPool();
private static Timer timer = new HashedWheelTimer(20, TimeUnit.MILLISECONDS); private static Timer timer = new HashedWheelTimer(20, TimeUnit.MILLISECONDS);
private static OrderedMemoryAwareThreadPoolExecutor executorHandler =
new OrderedMemoryAwareThreadPoolExecutor(10, 1000000, 10000000);
static { static {
random.nextBytes(data); random.nextBytes(data);
} }
@ -129,15 +131,15 @@ public class TrafficShapingTest {
private static int []autoRead; private static int []autoRead;
private static int []multipleMessage; private static int []multipleMessage;
private static long []minimalWaitBetween; private static long []minimalWaitBetween;
private static boolean limitRead, globalLimit, limitWrite, additionalExecutor; private static boolean limitRead, globalLimit, limitWrite;
private static ChannelFuture promise; private static ChannelFuture promise;
private static ValidTimestampedHandler sh; private static ServerHandler sh;
private static ClientTrafficHandler ch; private static ClientHandler ch;
private static AbstractTrafficShapingHandler handler; private static AbstractTrafficShapingHandler handler;
private static ChannelPipeline getPipelineTraffic(boolean server) { private static ChannelPipeline getPipelineTraffic(boolean server) {
if (server) { if (server) {
sh = new ValidTimestampedHandler(autoRead, multipleMessage); sh = new ServerHandler(autoRead, multipleMessage);
ChannelPipeline p = Channels.pipeline(); ChannelPipeline p = Channels.pipeline();
if (limitRead) { if (limitRead) {
if (globalLimit) { if (globalLimit) {
@ -145,16 +147,13 @@ public class TrafficShapingTest {
} else { } else {
handler = new ChannelTrafficShapingHandler(timer, 0, bandwidthFactor * messageSize, check); handler = new ChannelTrafficShapingHandler(timer, 0, bandwidthFactor * messageSize, check);
} }
p.addLast("traffic", handler); p.addLast(TRAFFIC, handler);
}
if (additionalExecutor) {
p.addLast("executor", new ExecutionHandler(executorHandler));
} }
p.addLast("handler", sh); p.addLast("handler", sh);
logger.info("Server Pipeline: "+p); logger.info("Server Pipeline: "+p);
return p; return p;
} else { } else {
ch = new ClientTrafficHandler(promise, minimalWaitBetween, multipleMessage, ch = new ClientHandler(promise, minimalWaitBetween, multipleMessage,
autoRead); autoRead);
ChannelPipeline p = Channels.pipeline(); ChannelPipeline p = Channels.pipeline();
if (limitWrite) { if (limitWrite) {
@ -163,10 +162,7 @@ public class TrafficShapingTest {
} else { } else {
handler = new ChannelTrafficShapingHandler(timer, bandwidthFactor * messageSize, 0, check); handler = new ChannelTrafficShapingHandler(timer, bandwidthFactor * messageSize, 0, check);
} }
p.addLast("traffic", handler); p.addLast(TRAFFIC, handler);
}
if (additionalExecutor) {
p.addLast("executor", new ExecutionHandler(executorHandler));
} }
p.addLast("handler", ch); p.addLast("handler", ch);
logger.info("Client Pipeline: "+p); logger.info("Client Pipeline: "+p);
@ -174,6 +170,7 @@ public class TrafficShapingTest {
} }
} }
private static long[] computeWaitRead(int[] multipleMessage) { private static long[] computeWaitRead(int[] multipleMessage) {
long[] minimalWaitBetween = new long[multipleMessage.length + 1]; long[] minimalWaitBetween = new long[multipleMessage.length + 1];
minimalWaitBetween[0] = 0; minimalWaitBetween[0] = 0;
@ -196,13 +193,30 @@ public class TrafficShapingTest {
minimalWaitBetween[i] = 10; minimalWaitBetween[i] = 10;
} }
} }
return minimalWaitBetween;
}
private static long[] computeWaitAutoRead(int []autoRead) {
long [] minimalWaitBetween = new long[autoRead.length + 1];
minimalWaitBetween[0] = 0; minimalWaitBetween[0] = 0;
for (int i = 0; i < autoRead.length; i++) {
if (autoRead[i] != 0) {
if (autoRead[i] > 0) {
minimalWaitBetween[i + 1] = -1;
} else {
minimalWaitBetween[i + 1] = check;
}
} else {
minimalWaitBetween[i + 1] = 0;
}
}
return minimalWaitBetween; return minimalWaitBetween;
} }
@Test(timeout = 10000) @Test(timeout = 10000)
public void testNoTrafficShapping() throws Throwable { public void testNoTrafficShapping() throws Throwable {
logger.info("TEST NO TRAFFIC"); TESTNAME = "TEST NO TRAFFIC";
TESTRUN = 0;
testNoTrafficShapping(bootstrapServer, bootstrapCient); testNoTrafficShapping(bootstrapServer, bootstrapCient);
} }
@ -213,35 +227,24 @@ public class TrafficShapingTest {
testTrafficShapping0(sb, cb, false, false, false, false, autoRead, minimalWaitBetween, multipleMessage); testTrafficShapping0(sb, cb, false, false, false, false, autoRead, minimalWaitBetween, multipleMessage);
} }
@Test(timeout = 10000)
public void testExecNoTrafficShapping() throws Throwable {
logger.info("TEST EXEC NO TRAFFIC");
testExecNoTrafficShapping(bootstrapServer, bootstrapCient);
}
public void testExecNoTrafficShapping(ServerBootstrap sb, ClientBootstrap cb) throws Throwable {
int[] autoRead = null;
int[] multipleMessage = { 1, 2, 1 };
long[] minimalWaitBetween = null;
testTrafficShapping0(sb, cb, true, false, false, false, autoRead, minimalWaitBetween, multipleMessage);
}
@Test(timeout = 10000) @Test(timeout = 10000)
public void testWriteTrafficShapping() throws Throwable { public void testWriteTrafficShapping() throws Throwable {
logger.info("TEST WRITE"); TESTNAME = "TEST WRITE";
TESTRUN = 0;
testWriteTrafficShapping(bootstrapServer, bootstrapCient); testWriteTrafficShapping(bootstrapServer, bootstrapCient);
} }
public void testWriteTrafficShapping(ServerBootstrap sb, ClientBootstrap cb) throws Throwable { public void testWriteTrafficShapping(ServerBootstrap sb, ClientBootstrap cb) throws Throwable {
int[] autoRead = null; int[] autoRead = null;
int[] multipleMessage = { 1, 1, 2, 1 }; int[] multipleMessage = { 1, 2, 1, 1 };
long[] minimalWaitBetween = computeWaitWrite(multipleMessage); long[] minimalWaitBetween = computeWaitWrite(multipleMessage);
testTrafficShapping0(sb, cb, false, false, true, false, autoRead, minimalWaitBetween, multipleMessage); testTrafficShapping0(sb, cb, false, false, true, false, autoRead, minimalWaitBetween, multipleMessage);
} }
@Test(timeout = 10000) @Test(timeout = 10000)
public void testReadTrafficShapping() throws Throwable { public void testReadTrafficShapping() throws Throwable {
logger.info("TEST READ"); TESTNAME = "TEST READ";
TESTRUN = 0;
testReadTrafficShapping(bootstrapServer, bootstrapCient); testReadTrafficShapping(bootstrapServer, bootstrapCient);
} }
@ -254,20 +257,22 @@ public class TrafficShapingTest {
@Test(timeout = 10000) @Test(timeout = 10000)
public void testWrite1TrafficShapping() throws Throwable { public void testWrite1TrafficShapping() throws Throwable {
logger.info("TEST WRITE"); TESTNAME = "TEST WRITE";
TESTRUN = 0;
testWrite1TrafficShapping(bootstrapServer, bootstrapCient); testWrite1TrafficShapping(bootstrapServer, bootstrapCient);
} }
public void testWrite1TrafficShapping(ServerBootstrap sb, ClientBootstrap cb) throws Throwable { public void testWrite1TrafficShapping(ServerBootstrap sb, ClientBootstrap cb) throws Throwable {
int[] autoRead = null; int[] autoRead = null;
int[] multipleMessage = { 1, 1, 1, 1 }; int[] multipleMessage = { 1, 1, 1 };
long[] minimalWaitBetween = computeWaitWrite(multipleMessage); long[] minimalWaitBetween = computeWaitWrite(multipleMessage);
testTrafficShapping0(sb, cb, false, false, true, false, autoRead, minimalWaitBetween, multipleMessage); testTrafficShapping0(sb, cb, false, false, true, false, autoRead, minimalWaitBetween, multipleMessage);
} }
@Test(timeout = 10000) @Test(timeout = 10000)
public void testRead1TrafficShapping() throws Throwable { public void testRead1TrafficShapping() throws Throwable {
logger.info("TEST READ"); TESTNAME = "TEST READ";
TESTRUN = 0;
testRead1TrafficShapping(bootstrapServer, bootstrapCient); testRead1TrafficShapping(bootstrapServer, bootstrapCient);
} }
@ -278,48 +283,24 @@ public class TrafficShapingTest {
testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage); testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
} }
@Test(timeout = 10000)
public void testExecWriteTrafficShapping() throws Throwable {
logger.info("TEST EXEC WRITE");
testExecWriteTrafficShapping(bootstrapServer, bootstrapCient);
}
public void testExecWriteTrafficShapping(ServerBootstrap sb, ClientBootstrap cb) throws Throwable {
int[] autoRead = null;
int[] multipleMessage = { 1, 1, 2, 1 };
long[] minimalWaitBetween = computeWaitWrite(multipleMessage);
testTrafficShapping0(sb, cb, true, false, true, false, autoRead, minimalWaitBetween, multipleMessage);
}
@Test(timeout = 10000)
public void testExecReadTrafficShapping() throws Throwable {
logger.info("TEST EXEC READ");
testExecReadTrafficShapping(bootstrapServer, bootstrapCient);
}
public void testExecReadTrafficShapping(ServerBootstrap sb, ClientBootstrap cb) throws Throwable {
int[] autoRead = null;
int[] multipleMessage = { 1, 2, 1, 1 };
long[] minimalWaitBetween = computeWaitRead(multipleMessage);
testTrafficShapping0(sb, cb, true, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
}
@Test(timeout = 10000) @Test(timeout = 10000)
public void testWriteGlobalTrafficShapping() throws Throwable { public void testWriteGlobalTrafficShapping() throws Throwable {
logger.info("TEST GLOBAL WRITE"); TESTNAME = "TEST GLOBAL WRITE";
TESTRUN = 0;
testWriteGlobalTrafficShapping(bootstrapServer, bootstrapCient); testWriteGlobalTrafficShapping(bootstrapServer, bootstrapCient);
} }
public void testWriteGlobalTrafficShapping(ServerBootstrap sb, ClientBootstrap cb) throws Throwable { public void testWriteGlobalTrafficShapping(ServerBootstrap sb, ClientBootstrap cb) throws Throwable {
int[] autoRead = null; int[] autoRead = null;
int[] multipleMessage = { 1, 1, 2, 1 }; int[] multipleMessage = { 1, 2, 1, 1 };
long[] minimalWaitBetween = computeWaitWrite(multipleMessage); long[] minimalWaitBetween = computeWaitWrite(multipleMessage);
testTrafficShapping0(sb, cb, false, false, true, true, autoRead, minimalWaitBetween, multipleMessage); testTrafficShapping0(sb, cb, false, false, true, true, autoRead, minimalWaitBetween, multipleMessage);
} }
@Test(timeout = 10000) @Test(timeout = 10000)
public void testReadGlobalTrafficShapping() throws Throwable { public void testReadGlobalTrafficShapping() throws Throwable {
logger.info("TEST GLOBAL READ"); TESTNAME = "TEST GLOBAL READ";
TESTRUN = 0;
testReadGlobalTrafficShapping(bootstrapServer, bootstrapCient); testReadGlobalTrafficShapping(bootstrapServer, bootstrapCient);
} }
@ -332,52 +313,32 @@ public class TrafficShapingTest {
@Test(timeout = 10000) @Test(timeout = 10000)
public void testAutoReadTrafficShapping() throws Throwable { public void testAutoReadTrafficShapping() throws Throwable {
logger.info("TEST AUTO READ"); TESTNAME = "TEST AUTO READ";
TESTRUN = 0;
testAutoReadTrafficShapping(bootstrapServer, bootstrapCient); testAutoReadTrafficShapping(bootstrapServer, bootstrapCient);
} }
public void testAutoReadTrafficShapping(ServerBootstrap sb, ClientBootstrap cb) throws Throwable { public void testAutoReadTrafficShapping(ServerBootstrap sb, ClientBootstrap cb) throws Throwable {
int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 }; int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; int[] multipleMessage = new int[autoRead.length];
long[] minimalWaitBetween = computeWaitRead(multipleMessage); Arrays.fill(multipleMessage, 1);
long[] minimalWaitBetween = computeWaitAutoRead(autoRead);
testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage); testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
} }
@Test(timeout = 10000) @Test(timeout = 10000)
public void testAutoReadGlobalTrafficShapping() throws Throwable { public void testAutoReadGlobalTrafficShapping() throws Throwable {
logger.info("TEST AUTO READ GLOBAL"); TESTNAME = "TEST AUTO READ GLOBAL";
testAutoReadTrafficShapping(bootstrapServer, bootstrapCient); TESTRUN = 0;
testAutoReadGlobalTrafficShapping(bootstrapServer, bootstrapCient);
} }
public void testAutoReadGlobalTrafficShapping(ServerBootstrap sb, ClientBootstrap cb) throws Throwable { public void testAutoReadGlobalTrafficShapping(ServerBootstrap sb, ClientBootstrap cb) throws Throwable {
int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 }; int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; int[] multipleMessage = new int[autoRead.length];
long[] minimalWaitBetween = computeWaitRead(multipleMessage); Arrays.fill(multipleMessage, 1);
long[] minimalWaitBetween = computeWaitAutoRead(autoRead);
testTrafficShapping0(sb, cb, false, true, false, true, autoRead, minimalWaitBetween, multipleMessage); testTrafficShapping0(sb, cb, false, true, false, true, autoRead, minimalWaitBetween, multipleMessage);
} }
@Test(timeout = 10000)
public void testAutoReadExecTrafficShapping() throws Throwable {
logger.info("TEST AUTO READ EXEC");
testAutoReadTrafficShapping(bootstrapServer, bootstrapCient);
}
public void testAutoReadExecTrafficShapping(ServerBootstrap sb, ClientBootstrap cb) throws Throwable {
int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
long[] minimalWaitBetween = computeWaitRead(multipleMessage);
testTrafficShapping0(sb, cb, true, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
}
@Test(timeout = 10000)
public void testAutoReadExecGlobalTrafficShapping() throws Throwable {
logger.info("TEST AUTO READ EXEC GLOBAL");
testAutoReadTrafficShapping(bootstrapServer, bootstrapCient);
}
public void testAutoReadExecGlobalTrafficShapping(ServerBootstrap sb, ClientBootstrap cb) throws Throwable {
int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
long[] minimalWaitBetween = computeWaitRead(multipleMessage);
testTrafficShapping0(sb, cb, true, true, false, true, autoRead, minimalWaitBetween, multipleMessage);
}
/** /**
* *
@ -403,16 +364,17 @@ public class TrafficShapingTest {
private static void testTrafficShapping0(ServerBootstrap sb, ClientBootstrap cb, final boolean additionalExecutor, private static void testTrafficShapping0(ServerBootstrap sb, ClientBootstrap cb, final boolean additionalExecutor,
final boolean limitRead, final boolean limitWrite, final boolean globalLimit, int[] autoRead, final boolean limitRead, final boolean limitWrite, final boolean globalLimit, int[] autoRead,
long[] minimalWaitBetween, int[] multipleMessage) throws Throwable { long[] minimalWaitBetween, int[] multipleMessage) throws Throwable {
logger.info("Exec: " + additionalExecutor + " Read: " + limitRead + " Write: " + limitWrite + " Global: " TESTRUN ++;
logger.info("TEST: " + TESTNAME + " RUN: " + TESTRUN +
" Exec: " + additionalExecutor + " Read: " + limitRead + " Write: " + limitWrite + " Global: "
+ globalLimit); + globalLimit);
TrafficShapingTest.additionalExecutor = additionalExecutor; TrafficShapingHandlerTest.autoRead = autoRead;
TrafficShapingTest.autoRead = autoRead; TrafficShapingHandlerTest.globalLimit = globalLimit;
TrafficShapingTest.globalLimit = globalLimit; TrafficShapingHandlerTest.limitRead = limitRead;
TrafficShapingTest.limitRead = limitRead; TrafficShapingHandlerTest.limitWrite = limitWrite;
TrafficShapingTest.limitWrite = limitWrite; TrafficShapingHandlerTest.minimalWaitBetween = minimalWaitBetween;
TrafficShapingTest.minimalWaitBetween = minimalWaitBetween; TrafficShapingHandlerTest.multipleMessage = multipleMessage;
TrafficShapingTest.multipleMessage = multipleMessage; TrafficShapingHandlerTest.promise = new DefaultChannelFuture(null, true);
TrafficShapingTest.promise = new DefaultChannelFuture(null, true);
Channel cc = cb.connect(serverSocketAddress).await().getChannel(); Channel cc = cb.connect(serverSocketAddress).await().getChannel();
@ -431,7 +393,8 @@ public class TrafficShapingTest {
assertTrue("Error during exceution of TrafficShapping: " + promise.getCause(), promise.isSuccess()); assertTrue("Error during exceution of TrafficShapping: " + promise.getCause(), promise.isSuccess());
float average = (totalNb * messageSize) / (float) (stop - start); float average = (totalNb * messageSize) / (float) (stop - start);
logger.info("Average of traffic: " + average + " compare to " + bandwidthFactor); logger.info("TEST: " + TESTNAME + " RUN: " + TESTRUN +
" Average of traffic: " + average + " compare to " + bandwidthFactor);
sh.channel.close().await(); sh.channel.close().await();
ch.channel.close().await(); ch.channel.close().await();
@ -472,18 +435,18 @@ public class TrafficShapingTest {
} }
} }
private static class ClientTrafficHandler extends SimpleChannelHandler { private static class ClientHandler extends SimpleChannelHandler {
volatile Channel channel; volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int step; volatile int step;
// first message will always be validated // first message will always be validated
private long currentLastTime = System.currentTimeMillis(); private long currentLastTime = System.currentTimeMillis() - minimalms;
private final long[] minimalWaitBetween; private final long[] minimalWaitBetween;
private final int[] multipleMessage; private final int[] multipleMessage;
private final int[] autoRead; private final int[] autoRead;
final ChannelFuture promise; final ChannelFuture promise;
ClientTrafficHandler(ChannelFuture promise, long[] minimalWaitBetween, int[] multipleMessage, ClientHandler(ChannelFuture promise, long[] minimalWaitBetween, int[] multipleMessage,
int[] autoRead) { int[] autoRead) {
this.minimalWaitBetween = minimalWaitBetween; this.minimalWaitBetween = minimalWaitBetween;
if (multipleMessage != null) { if (multipleMessage != null) {
@ -503,9 +466,10 @@ public class TrafficShapingTest {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
channel = ctx.getChannel();
long lastTimestamp = 0; long lastTimestamp = 0;
channel = ctx.getChannel();
ChannelBuffer in = (ChannelBuffer) e.getMessage(); ChannelBuffer in = (ChannelBuffer) e.getMessage();
loggerClient.debug("Step: " + step + " Read: " + (in.readableBytes() / 8) + " blocks");
while (in.readable()) { while (in.readable()) {
lastTimestamp = in.readLong(); lastTimestamp = in.readLong();
multipleMessage[step]--; multipleMessage[step]--;
@ -517,22 +481,11 @@ public class TrafficShapingTest {
long minimalWait = (minimalWaitBetween != null) ? minimalWaitBetween[step] : 0; long minimalWait = (minimalWaitBetween != null) ? minimalWaitBetween[step] : 0;
int ar = 0; int ar = 0;
if (autoRead != null) { if (autoRead != null) {
// When autoRead: special timer
if (step > 0 && autoRead[step - 1] != 0) { if (step > 0 && autoRead[step - 1] != 0) {
ar = autoRead[step - 1]; ar = autoRead[step - 1];
if (ar > 0) {
// No limit
minimalWait = -1;
} else {
// minimal limit
minimalWait = minimalms;
}
} else {
// No limit
minimalWait = 0;
} }
} }
logger.info("Step: " + step + " Interval: " + (lastTimestamp - currentLastTime) + " compareTo " loggerClient.info("Step: " + step + " Interval: " + (lastTimestamp - currentLastTime) + " compareTo "
+ minimalWait + " (" + ar + ")"); + minimalWait + " (" + ar + ")");
assertTrue("The interval of time is incorrect:" + (lastTimestamp - currentLastTime) + " not> " assertTrue("The interval of time is incorrect:" + (lastTimestamp - currentLastTime) + " not> "
+ minimalWait, lastTimestamp - currentLastTime >= minimalWait); + minimalWait, lastTimestamp - currentLastTime >= minimalWait);
@ -558,14 +511,14 @@ public class TrafficShapingTest {
} }
} }
private static class ValidTimestampedHandler extends SimpleChannelHandler { private static class ServerHandler extends SimpleChannelHandler {
private final int[] autoRead; private final int[] autoRead;
private final int[] multipleMessage; private final int[] multipleMessage;
volatile Channel channel; volatile Channel channel;
volatile int step; volatile int step;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
ValidTimestampedHandler(int[] autoRead, int[] multipleMessage) { ServerHandler(int[] autoRead, int[] multipleMessage) {
this.autoRead = autoRead; this.autoRead = autoRead;
if (multipleMessage != null) { if (multipleMessage != null) {
this.multipleMessage = Arrays.copyOf(multipleMessage, multipleMessage.length); this.multipleMessage = Arrays.copyOf(multipleMessage, multipleMessage.length);
@ -585,39 +538,48 @@ public class TrafficShapingTest {
ChannelBuffer in = (ChannelBuffer) e.getMessage(); ChannelBuffer in = (ChannelBuffer) e.getMessage();
channel = ctx.getChannel(); channel = ctx.getChannel();
byte[] actual = new byte[in.readableBytes()]; byte[] actual = new byte[in.readableBytes()];
int nb = actual.length / messageSize;
loggerServer.info("Step: " + step + " Read: " + nb + " blocks");
in.readBytes(actual); in.readBytes(actual);
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
int nb = actual.length / messageSize;
int isAutoRead = 0; int isAutoRead = 0;
int laststep = step;
for (int i = 0; i < nb; i++) { for (int i = 0; i < nb; i++) {
multipleMessage[step]--; multipleMessage[step]--;
if (multipleMessage[step] == 0) { if (multipleMessage[step] == 0) {
// setAutoRead test
if (autoRead != null) { if (autoRead != null) {
isAutoRead = autoRead[step]; isAutoRead = autoRead[step];
} }
step++; step++;
} }
} }
if (laststep != step) {
// setAutoRead test
if (autoRead != null && isAutoRead != 2) { if (autoRead != null && isAutoRead != 2) {
if (isAutoRead != 0) { if (isAutoRead != 0) {
logger.info("Set AutoRead: " + (isAutoRead > 0) + " Step: " + step); loggerServer.info("Step: " + step + " Set AutoRead: " + (isAutoRead > 0));
channel.setReadable(isAutoRead > 0); channel.setReadable(isAutoRead > 0);
} else { } else {
logger.info("AutoRead: NO Step:" + step); loggerServer.info("Step: " + step + " AutoRead: NO");
} }
} }
logger.debug("Get: " + actual.length + " TS " + timestamp + " NB: " + nb); }
Thread.sleep(10);
loggerServer.debug("Step: " + step + " Write: " + nb);
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(); ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
for (int i = 0; i < nb; i++) { for (int i = 0; i < nb; i++) {
buf.writeLong(timestamp); buf.writeLong(timestamp);
} }
channel.write(buf); channel.write(buf);
if (laststep != step) {
// setAutoRead test
if (isAutoRead != 0) { if (isAutoRead != 0) {
if (isAutoRead < 0) { if (isAutoRead < 0) {
final int exactStep = step; final int exactStep = step;
long wait = (isAutoRead == -1) ? minimalms : stepms + minimalms; long wait = (isAutoRead == -1) ? minimalms : stepms + minimalms;
if (isAutoRead == -3) { if (isAutoRead == -3) {
wait = stepms * 2; wait = stepms * 3;
} }
timer.newTimeout(new TimerTask() { timer.newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
@ -627,9 +589,11 @@ public class TrafficShapingTest {
}, wait, TimeUnit.MILLISECONDS); }, wait, TimeUnit.MILLISECONDS);
} else { } else {
if (isAutoRead > 1) { if (isAutoRead > 1) {
loggerServer.debug("Step: " + step + " Will Set AutoRead: True");
final int exactStep = step;
timer.newTimeout(new TimerTask() { timer.newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
logger.info("AutoRead: True, Step " + step); logger.info("AutoRead: True, Step " + exactStep);
channel.setReadable(true); channel.setReadable(true);
} }
}, stepms + minimalms, TimeUnit.MILLISECONDS); }, stepms + minimalms, TimeUnit.MILLISECONDS);
@ -637,6 +601,7 @@ public class TrafficShapingTest {
} }
} }
} }
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {