Fix for issue #2765 relative to unstable trafficshaping test procedure
Motivation: The test procedure is unstable due to not enough precise timestamping during the check. Modifications: Reducing the test cases and cibling "stable" test ("timestamp-able") bring more stability to the tests. Result: Tests for TrafficShapingHandler seem more stable (whatever using JVM 6, 7 or 8).
This commit is contained in:
parent
929f1bad80
commit
7327d681bb
@ -49,24 +49,27 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
public class TrafficShapingTest extends AbstractSocketTest {
|
public class TrafficShapingHandlerTest extends AbstractSocketTest {
|
||||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficShapingTest.class);
|
private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficShapingHandlerTest.class);
|
||||||
private static final InternalLogger loggerServer = InternalLoggerFactory.getInstance(ValidTimestampedHandler.class);
|
private static final InternalLogger loggerServer = InternalLoggerFactory.getInstance("ServerTSH");
|
||||||
private static final InternalLogger loggerClient = InternalLoggerFactory.getInstance(ClientTrafficHandler.class);
|
private static final InternalLogger loggerClient = InternalLoggerFactory.getInstance("ClientTSH");
|
||||||
|
|
||||||
static final int messageSize = 1024;
|
static final int messageSize = 1024;
|
||||||
static final int bandwidthFactor = 15;
|
static final int bandwidthFactor = 12;
|
||||||
static final int minfactor = bandwidthFactor - (bandwidthFactor / 2);
|
static final int minfactor = 3;
|
||||||
static final int maxfactor = bandwidthFactor + (bandwidthFactor / 2);
|
static final int maxfactor = bandwidthFactor + (bandwidthFactor / 2);
|
||||||
static final long stepms = 1000 / bandwidthFactor;
|
static final long stepms = 1000 / bandwidthFactor;
|
||||||
static final long minimalms = Math.max(stepms / 2, 20) / 10 * 10;
|
static final long minimalms = Math.max(stepms / 2, 20) / 10 * 10;
|
||||||
static final long check = Math.max(Math.min(100, minimalms / 2) / 10 * 10, 20);
|
static final long check = 10;
|
||||||
private static final Random random = new Random();
|
private static final Random random = new Random();
|
||||||
static final byte[] data = new byte[messageSize];
|
static final byte[] data = new byte[messageSize];
|
||||||
|
|
||||||
private static final String TRAFFIC = "traffic";
|
private static final String TRAFFIC = "traffic";
|
||||||
|
private static String TESTNAME;
|
||||||
|
private static int TESTRUN;
|
||||||
|
|
||||||
private static EventExecutorGroup group;
|
private static EventExecutorGroup group;
|
||||||
|
private static EventExecutorGroup groupForGlobal;
|
||||||
private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
|
private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
|
||||||
static {
|
static {
|
||||||
random.nextBytes(data);
|
random.nextBytes(data);
|
||||||
@ -80,11 +83,14 @@ public class TrafficShapingTest extends AbstractSocketTest {
|
|||||||
Logger logger = (Logger) LoggerFactory.getLogger("ROOT");
|
Logger logger = (Logger) LoggerFactory.getLogger("ROOT");
|
||||||
logger.setLevel(Level.INFO);
|
logger.setLevel(Level.INFO);
|
||||||
group = new DefaultEventExecutorGroup(8);
|
group = new DefaultEventExecutorGroup(8);
|
||||||
|
groupForGlobal = new DefaultEventExecutorGroup(8);
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void destroyGroup() throws Exception {
|
public static void destroyGroup() throws Exception {
|
||||||
group.shutdownGracefully().sync();
|
group.shutdownGracefully().sync();
|
||||||
|
groupForGlobal.shutdownGracefully().sync();
|
||||||
|
executor.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static long[] computeWaitRead(int[] multipleMessage) {
|
private static long[] computeWaitRead(int[] multipleMessage) {
|
||||||
@ -104,9 +110,27 @@ public class TrafficShapingTest extends AbstractSocketTest {
|
|||||||
return minimalWaitBetween;
|
return minimalWaitBetween;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static long[] computeWaitAutoRead(int []autoRead) {
|
||||||
|
long [] minimalWaitBetween = new long[autoRead.length + 1];
|
||||||
|
minimalWaitBetween[0] = 0;
|
||||||
|
for (int i = 0; i < autoRead.length; i++) {
|
||||||
|
if (autoRead[i] != 0) {
|
||||||
|
if (autoRead[i] > 0) {
|
||||||
|
minimalWaitBetween[i + 1] = -1;
|
||||||
|
} else {
|
||||||
|
minimalWaitBetween[i + 1] = check;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
minimalWaitBetween[i + 1] = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return minimalWaitBetween;
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
public void testNoTrafficShapping() throws Throwable {
|
public void testNoTrafficShapping() throws Throwable {
|
||||||
logger.info("TEST NO TRAFFIC");
|
TESTNAME = "TEST NO TRAFFIC";
|
||||||
|
TESTRUN = 0;
|
||||||
run();
|
run();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -117,35 +141,24 @@ public class TrafficShapingTest extends AbstractSocketTest {
|
|||||||
testTrafficShapping0(sb, cb, false, false, false, false, autoRead, minimalWaitBetween, multipleMessage);
|
testTrafficShapping0(sb, cb, false, false, false, false, autoRead, minimalWaitBetween, multipleMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 20000)
|
|
||||||
public void testExecNoTrafficShapping() throws Throwable {
|
|
||||||
logger.info("TEST EXEC NO TRAFFIC");
|
|
||||||
run();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testExecNoTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
|
||||||
int[] autoRead = null;
|
|
||||||
int[] multipleMessage = { 1, 2, 1 };
|
|
||||||
long[] minimalWaitBetween = null;
|
|
||||||
testTrafficShapping0(sb, cb, true, false, false, false, autoRead, minimalWaitBetween, multipleMessage);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
public void testWriteTrafficShapping() throws Throwable {
|
public void testWriteTrafficShapping() throws Throwable {
|
||||||
logger.info("TEST WRITE");
|
TESTNAME = "TEST WRITE";
|
||||||
|
TESTRUN = 0;
|
||||||
run();
|
run();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testWriteTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
public void testWriteTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
int[] autoRead = null;
|
int[] autoRead = null;
|
||||||
int[] multipleMessage = { 1, 2, 1 };
|
int[] multipleMessage = { 1, 2, 1, 1 };
|
||||||
long[] minimalWaitBetween = computeWaitWrite(multipleMessage);
|
long[] minimalWaitBetween = computeWaitWrite(multipleMessage);
|
||||||
testTrafficShapping0(sb, cb, false, false, true, false, autoRead, minimalWaitBetween, multipleMessage);
|
testTrafficShapping0(sb, cb, false, false, true, false, autoRead, minimalWaitBetween, multipleMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
public void testReadTrafficShapping() throws Throwable {
|
public void testReadTrafficShapping() throws Throwable {
|
||||||
logger.info("TEST READ");
|
TESTNAME = "TEST READ";
|
||||||
|
TESTRUN = 0;
|
||||||
run();
|
run();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -158,7 +171,8 @@ public class TrafficShapingTest extends AbstractSocketTest {
|
|||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
public void testWrite1TrafficShapping() throws Throwable {
|
public void testWrite1TrafficShapping() throws Throwable {
|
||||||
logger.info("TEST WRITE");
|
TESTNAME = "TEST WRITE";
|
||||||
|
TESTRUN = 0;
|
||||||
run();
|
run();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -171,7 +185,8 @@ public class TrafficShapingTest extends AbstractSocketTest {
|
|||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
public void testRead1TrafficShapping() throws Throwable {
|
public void testRead1TrafficShapping() throws Throwable {
|
||||||
logger.info("TEST READ");
|
TESTNAME = "TEST READ";
|
||||||
|
TESTRUN = 0;
|
||||||
run();
|
run();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -182,48 +197,24 @@ public class TrafficShapingTest extends AbstractSocketTest {
|
|||||||
testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
|
testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 20000)
|
|
||||||
public void testExecWriteTrafficShapping() throws Throwable {
|
|
||||||
logger.info("TEST EXEC WRITE");
|
|
||||||
run();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testExecWriteTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
|
||||||
int[] autoRead = null;
|
|
||||||
int[] multipleMessage = { 1, 2, 1 };
|
|
||||||
long[] minimalWaitBetween = computeWaitWrite(multipleMessage);
|
|
||||||
testTrafficShapping0(sb, cb, true, false, true, false, autoRead, minimalWaitBetween, multipleMessage);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
|
||||||
public void testExecReadTrafficShapping() throws Throwable {
|
|
||||||
logger.info("TEST EXEC READ");
|
|
||||||
run();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testExecReadTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
|
||||||
int[] autoRead = null;
|
|
||||||
int[] multipleMessage = { 1, 2, 1, 1 };
|
|
||||||
long[] minimalWaitBetween = computeWaitRead(multipleMessage);
|
|
||||||
testTrafficShapping0(sb, cb, true, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
public void testWriteGlobalTrafficShapping() throws Throwable {
|
public void testWriteGlobalTrafficShapping() throws Throwable {
|
||||||
logger.info("TEST GLOBAL WRITE");
|
TESTNAME = "TEST GLOBAL WRITE";
|
||||||
|
TESTRUN = 0;
|
||||||
run();
|
run();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testWriteGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
public void testWriteGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
int[] autoRead = null;
|
int[] autoRead = null;
|
||||||
int[] multipleMessage = { 1, 2, 1 };
|
int[] multipleMessage = { 1, 2, 1, 1 };
|
||||||
long[] minimalWaitBetween = computeWaitWrite(multipleMessage);
|
long[] minimalWaitBetween = computeWaitWrite(multipleMessage);
|
||||||
testTrafficShapping0(sb, cb, false, false, true, true, autoRead, minimalWaitBetween, multipleMessage);
|
testTrafficShapping0(sb, cb, false, false, true, true, autoRead, minimalWaitBetween, multipleMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
public void testReadGlobalTrafficShapping() throws Throwable {
|
public void testReadGlobalTrafficShapping() throws Throwable {
|
||||||
logger.info("TEST GLOBAL READ");
|
TESTNAME = "TEST GLOBAL READ";
|
||||||
|
TESTRUN = 0;
|
||||||
run();
|
run();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -236,52 +227,32 @@ public class TrafficShapingTest extends AbstractSocketTest {
|
|||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
public void testAutoReadTrafficShapping() throws Throwable {
|
public void testAutoReadTrafficShapping() throws Throwable {
|
||||||
logger.info("TEST AUTO READ");
|
TESTNAME = "TEST AUTO READ";
|
||||||
|
TESTRUN = 0;
|
||||||
run();
|
run();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testAutoReadTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
public void testAutoReadTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
|
int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
|
||||||
int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
|
int[] multipleMessage = new int[autoRead.length];
|
||||||
long[] minimalWaitBetween = computeWaitRead(multipleMessage);
|
Arrays.fill(multipleMessage, 1);
|
||||||
|
long[] minimalWaitBetween = computeWaitAutoRead(autoRead);
|
||||||
testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
|
testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
|
||||||
}
|
}
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
public void testAutoReadGlobalTrafficShapping() throws Throwable {
|
public void testAutoReadGlobalTrafficShapping() throws Throwable {
|
||||||
logger.info("TEST AUTO READ GLOBAL");
|
TESTNAME = "TEST AUTO READ GLOBAL";
|
||||||
|
TESTRUN = 0;
|
||||||
run();
|
run();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testAutoReadGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
public void testAutoReadGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
|
int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
|
||||||
int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
|
int[] multipleMessage = new int[autoRead.length];
|
||||||
long[] minimalWaitBetween = computeWaitRead(multipleMessage);
|
Arrays.fill(multipleMessage, 1);
|
||||||
|
long[] minimalWaitBetween = computeWaitAutoRead(autoRead);
|
||||||
testTrafficShapping0(sb, cb, false, true, false, true, autoRead, minimalWaitBetween, multipleMessage);
|
testTrafficShapping0(sb, cb, false, true, false, true, autoRead, minimalWaitBetween, multipleMessage);
|
||||||
}
|
}
|
||||||
@Test(timeout = 10000)
|
|
||||||
public void testAutoReadExecTrafficShapping() throws Throwable {
|
|
||||||
logger.info("TEST AUTO READ EXEC");
|
|
||||||
run();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testAutoReadExecTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
|
||||||
int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
|
|
||||||
int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
|
|
||||||
long[] minimalWaitBetween = computeWaitRead(multipleMessage);
|
|
||||||
testTrafficShapping0(sb, cb, true, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
|
|
||||||
}
|
|
||||||
@Test(timeout = 10000)
|
|
||||||
public void testAutoReadExecGlobalTrafficShapping() throws Throwable {
|
|
||||||
logger.info("TEST AUTO READ EXEC GLOBAL");
|
|
||||||
run();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testAutoReadExecGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
|
||||||
int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
|
|
||||||
int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
|
|
||||||
long[] minimalWaitBetween = computeWaitRead(multipleMessage);
|
|
||||||
testTrafficShapping0(sb, cb, true, true, false, true, autoRead, minimalWaitBetween, multipleMessage);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
@ -307,23 +278,25 @@ public class TrafficShapingTest extends AbstractSocketTest {
|
|||||||
private static void testTrafficShapping0(ServerBootstrap sb, Bootstrap cb, final boolean additionalExecutor,
|
private static void testTrafficShapping0(ServerBootstrap sb, Bootstrap cb, final boolean additionalExecutor,
|
||||||
final boolean limitRead, final boolean limitWrite, final boolean globalLimit, int[] autoRead,
|
final boolean limitRead, final boolean limitWrite, final boolean globalLimit, int[] autoRead,
|
||||||
long[] minimalWaitBetween, int[] multipleMessage) throws Throwable {
|
long[] minimalWaitBetween, int[] multipleMessage) throws Throwable {
|
||||||
logger.info("Exec: " + additionalExecutor + " Read: " + limitRead + " Write: " + limitWrite + " Global: "
|
TESTRUN ++;
|
||||||
|
logger.info("TEST: " + TESTNAME + " RUN: " + TESTRUN +
|
||||||
|
" Exec: " + additionalExecutor + " Read: " + limitRead + " Write: " + limitWrite + " Global: "
|
||||||
+ globalLimit);
|
+ globalLimit);
|
||||||
final ValidTimestampedHandler sh = new ValidTimestampedHandler(autoRead, multipleMessage);
|
final ServerHandler sh = new ServerHandler(autoRead, multipleMessage);
|
||||||
Promise<Boolean> promise = group.next().newPromise();
|
Promise<Boolean> promise = group.next().newPromise();
|
||||||
final ClientTrafficHandler ch = new ClientTrafficHandler(promise, minimalWaitBetween, multipleMessage,
|
final ClientHandler ch = new ClientHandler(promise, minimalWaitBetween, multipleMessage,
|
||||||
autoRead);
|
autoRead);
|
||||||
|
|
||||||
final AbstractTrafficShapingHandler handler;
|
final AbstractTrafficShapingHandler handler;
|
||||||
if (limitRead) {
|
if (limitRead) {
|
||||||
if (globalLimit) {
|
if (globalLimit) {
|
||||||
handler = new GlobalTrafficShapingHandler(group, 0, bandwidthFactor * messageSize, check);
|
handler = new GlobalTrafficShapingHandler(groupForGlobal, 0, bandwidthFactor * messageSize, check);
|
||||||
} else {
|
} else {
|
||||||
handler = new ChannelTrafficShapingHandler(0, bandwidthFactor * messageSize, check);
|
handler = new ChannelTrafficShapingHandler(0, bandwidthFactor * messageSize, check);
|
||||||
}
|
}
|
||||||
} else if (limitWrite) {
|
} else if (limitWrite) {
|
||||||
if (globalLimit) {
|
if (globalLimit) {
|
||||||
handler = new GlobalTrafficShapingHandler(group, bandwidthFactor * messageSize, 0, check);
|
handler = new GlobalTrafficShapingHandler(groupForGlobal, bandwidthFactor * messageSize, 0, check);
|
||||||
} else {
|
} else {
|
||||||
handler = new ChannelTrafficShapingHandler(bandwidthFactor * messageSize, 0, check);
|
handler = new ChannelTrafficShapingHandler(bandwidthFactor * messageSize, 0, check);
|
||||||
}
|
}
|
||||||
@ -335,34 +308,18 @@ public class TrafficShapingTest extends AbstractSocketTest {
|
|||||||
@Override
|
@Override
|
||||||
protected void initChannel(SocketChannel c) throws Exception {
|
protected void initChannel(SocketChannel c) throws Exception {
|
||||||
if (limitRead) {
|
if (limitRead) {
|
||||||
if (additionalExecutor) {
|
c.pipeline().addLast(TRAFFIC, handler);
|
||||||
c.pipeline().addLast(group, TRAFFIC, handler);
|
|
||||||
} else {
|
|
||||||
c.pipeline().addLast(TRAFFIC, handler);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (additionalExecutor) {
|
|
||||||
c.pipeline().addLast(group, sh);
|
|
||||||
} else {
|
|
||||||
c.pipeline().addLast(sh);
|
|
||||||
}
|
}
|
||||||
|
c.pipeline().addLast(sh);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
cb.handler(new ChannelInitializer<SocketChannel>() {
|
cb.handler(new ChannelInitializer<SocketChannel>() {
|
||||||
@Override
|
@Override
|
||||||
protected void initChannel(SocketChannel c) throws Exception {
|
protected void initChannel(SocketChannel c) throws Exception {
|
||||||
if (limitWrite) {
|
if (limitWrite) {
|
||||||
if (additionalExecutor) {
|
c.pipeline().addLast(TRAFFIC, handler);
|
||||||
c.pipeline().addLast(group, TRAFFIC, handler);
|
|
||||||
} else {
|
|
||||||
c.pipeline().addLast(TRAFFIC, handler);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (additionalExecutor) {
|
|
||||||
c.pipeline().addLast(group, ch);
|
|
||||||
} else {
|
|
||||||
c.pipeline().addLast(ch);
|
|
||||||
}
|
}
|
||||||
|
c.pipeline().addLast(ch);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -385,7 +342,8 @@ public class TrafficShapingTest extends AbstractSocketTest {
|
|||||||
assertTrue("Error during exceution of TrafficShapping: " + promise.cause(), promise.isSuccess());
|
assertTrue("Error during exceution of TrafficShapping: " + promise.cause(), promise.isSuccess());
|
||||||
|
|
||||||
float average = (totalNb * messageSize) / (float) (stop - start);
|
float average = (totalNb * messageSize) / (float) (stop - start);
|
||||||
logger.info("Average of traffic: " + average + " compare to " + bandwidthFactor);
|
logger.info("TEST: " + TESTNAME + " RUN: " + TESTRUN +
|
||||||
|
" Average of traffic: " + average + " compare to " + bandwidthFactor);
|
||||||
sh.channel.close().sync();
|
sh.channel.close().sync();
|
||||||
ch.channel.close().sync();
|
ch.channel.close().sync();
|
||||||
sc.close().sync();
|
sc.close().sync();
|
||||||
@ -423,7 +381,7 @@ public class TrafficShapingTest extends AbstractSocketTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class ClientTrafficHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
private static class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
||||||
volatile Channel channel;
|
volatile Channel channel;
|
||||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||||
volatile int step;
|
volatile int step;
|
||||||
@ -434,7 +392,7 @@ public class TrafficShapingTest extends AbstractSocketTest {
|
|||||||
private final int[] autoRead;
|
private final int[] autoRead;
|
||||||
final Promise<Boolean> promise;
|
final Promise<Boolean> promise;
|
||||||
|
|
||||||
ClientTrafficHandler(Promise<Boolean> promise, long[] minimalWaitBetween, int[] multipleMessage,
|
ClientHandler(Promise<Boolean> promise, long[] minimalWaitBetween, int[] multipleMessage,
|
||||||
int[] autoRead) {
|
int[] autoRead) {
|
||||||
this.minimalWaitBetween = minimalWaitBetween;
|
this.minimalWaitBetween = minimalWaitBetween;
|
||||||
this.multipleMessage = Arrays.copyOf(multipleMessage, multipleMessage.length);
|
this.multipleMessage = Arrays.copyOf(multipleMessage, multipleMessage.length);
|
||||||
@ -450,6 +408,7 @@ public class TrafficShapingTest extends AbstractSocketTest {
|
|||||||
@Override
|
@Override
|
||||||
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||||
long lastTimestamp = 0;
|
long lastTimestamp = 0;
|
||||||
|
loggerClient.debug("Step: " + step + " Read: " + (in.readableBytes() / 8) + " blocks");
|
||||||
while (in.isReadable()) {
|
while (in.isReadable()) {
|
||||||
lastTimestamp = in.readLong();
|
lastTimestamp = in.readLong();
|
||||||
multipleMessage[step]--;
|
multipleMessage[step]--;
|
||||||
@ -463,13 +422,6 @@ public class TrafficShapingTest extends AbstractSocketTest {
|
|||||||
if (autoRead != null) {
|
if (autoRead != null) {
|
||||||
if (step > 0 && autoRead[step - 1] != 0) {
|
if (step > 0 && autoRead[step - 1] != 0) {
|
||||||
ar = autoRead[step - 1];
|
ar = autoRead[step - 1];
|
||||||
if (ar > 0) {
|
|
||||||
minimalWait = -1;
|
|
||||||
} else {
|
|
||||||
minimalWait = minimalms / 3;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
minimalWait = 0;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
loggerClient.info("Step: " + step + " Interval: " + (lastTimestamp - currentLastTime) + " compareTo "
|
loggerClient.info("Step: " + step + " Interval: " + (lastTimestamp - currentLastTime) + " compareTo "
|
||||||
@ -499,14 +451,14 @@ public class TrafficShapingTest extends AbstractSocketTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class ValidTimestampedHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
private static class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
||||||
private final int[] autoRead;
|
private final int[] autoRead;
|
||||||
private final int[] multipleMessage;
|
private final int[] multipleMessage;
|
||||||
volatile Channel channel;
|
volatile Channel channel;
|
||||||
volatile int step;
|
volatile int step;
|
||||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||||
|
|
||||||
ValidTimestampedHandler(int[] autoRead, int[] multipleMessage) {
|
ServerHandler(int[] autoRead, int[] multipleMessage) {
|
||||||
this.autoRead = autoRead;
|
this.autoRead = autoRead;
|
||||||
this.multipleMessage = Arrays.copyOf(multipleMessage, multipleMessage.length);
|
this.multipleMessage = Arrays.copyOf(multipleMessage, multipleMessage.length);
|
||||||
}
|
}
|
||||||
@ -519,14 +471,16 @@ public class TrafficShapingTest extends AbstractSocketTest {
|
|||||||
@Override
|
@Override
|
||||||
public void channelRead0(final ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
public void channelRead0(final ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||||
byte[] actual = new byte[in.readableBytes()];
|
byte[] actual = new byte[in.readableBytes()];
|
||||||
|
int nb = actual.length / messageSize;
|
||||||
|
loggerServer.info("Step: " + step + " Read: " + nb + " blocks");
|
||||||
in.readBytes(actual);
|
in.readBytes(actual);
|
||||||
long timestamp = System.currentTimeMillis();
|
long timestamp = System.currentTimeMillis();
|
||||||
int nb = actual.length / messageSize;
|
|
||||||
int isAutoRead = 0;
|
int isAutoRead = 0;
|
||||||
int laststep = step;
|
int laststep = step;
|
||||||
for (int i = 0; i < nb; i++) {
|
for (int i = 0; i < nb; i++) {
|
||||||
multipleMessage[step]--;
|
multipleMessage[step]--;
|
||||||
if (multipleMessage[step] == 0) {
|
if (multipleMessage[step] == 0) {
|
||||||
|
// setAutoRead test
|
||||||
if (autoRead != null) {
|
if (autoRead != null) {
|
||||||
isAutoRead = autoRead[step];
|
isAutoRead = autoRead[step];
|
||||||
}
|
}
|
||||||
@ -534,21 +488,23 @@ public class TrafficShapingTest extends AbstractSocketTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (laststep != step) {
|
if (laststep != step) {
|
||||||
|
// setAutoRead test
|
||||||
if (autoRead != null && isAutoRead != 2) {
|
if (autoRead != null && isAutoRead != 2) {
|
||||||
if (isAutoRead != 0) {
|
if (isAutoRead != 0) {
|
||||||
loggerServer.info("Set AutoRead: " + (isAutoRead > 0) + " Step: " + step);
|
loggerServer.info("Step: " + step + " Set AutoRead: " + (isAutoRead > 0));
|
||||||
channel.config().setAutoRead(isAutoRead > 0);
|
channel.config().setAutoRead(isAutoRead > 0);
|
||||||
} else {
|
} else {
|
||||||
loggerServer.info("AutoRead: NO Step:" + step);
|
loggerServer.info("Step: " + step + " AutoRead: NO");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
loggerServer.debug("Step: " + step + " Get: " + actual.length + " TS " + timestamp + " NB: " + nb);
|
loggerServer.debug("Step: " + step + " Write: " + nb);
|
||||||
for (int i = 0; i < nb; i++) {
|
for (int i = 0; i < nb; i++) {
|
||||||
channel.write(Unpooled.copyLong(timestamp));
|
channel.write(Unpooled.copyLong(timestamp));
|
||||||
}
|
}
|
||||||
channel.flush();
|
channel.flush();
|
||||||
if (laststep != step) {
|
if (laststep != step) {
|
||||||
|
// setAutoRead test
|
||||||
if (isAutoRead != 0) {
|
if (isAutoRead != 0) {
|
||||||
if (isAutoRead < 0) {
|
if (isAutoRead < 0) {
|
||||||
final int exactStep = step;
|
final int exactStep = step;
|
||||||
@ -558,16 +514,17 @@ public class TrafficShapingTest extends AbstractSocketTest {
|
|||||||
}
|
}
|
||||||
executor.schedule(new Runnable() {
|
executor.schedule(new Runnable() {
|
||||||
public void run() {
|
public void run() {
|
||||||
loggerServer.info("Reset AutoRead: Step " + exactStep);
|
loggerServer.info("Step: " + exactStep + " Reset AutoRead");
|
||||||
channel.config().setAutoRead(true);
|
channel.config().setAutoRead(true);
|
||||||
}
|
}
|
||||||
}, wait, TimeUnit.MILLISECONDS);
|
}, wait, TimeUnit.MILLISECONDS);
|
||||||
} else {
|
} else {
|
||||||
if (isAutoRead > 1) {
|
if (isAutoRead > 1) {
|
||||||
loggerServer.info("Will Set AutoRead: Rrue, Step: " + step);
|
loggerServer.debug("Step: " + step + " Will Set AutoRead: True");
|
||||||
|
final int exactStep = step;
|
||||||
executor.schedule(new Runnable() {
|
executor.schedule(new Runnable() {
|
||||||
public void run() {
|
public void run() {
|
||||||
loggerServer.info("Set AutoRead: Rrue, Step: " + step);
|
loggerServer.info("Step: " + exactStep + " Set AutoRead: True");
|
||||||
channel.config().setAutoRead(true);
|
channel.config().setAutoRead(true);
|
||||||
}
|
}
|
||||||
}, stepms + minimalms, TimeUnit.MILLISECONDS);
|
}, stepms + minimalms, TimeUnit.MILLISECONDS);
|
Loading…
Reference in New Issue
Block a user