diff --git a/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java
index 3d96d43a75..4b8f03d53e 100644
--- a/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java
+++ b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java
@@ -1,12 +1,9 @@
/*
* Copyright 2011 The Netty Project
- *
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
- *
* http://www.apache.org/licenses/LICENSE-2.0
- *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -22,6 +19,8 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
+import io.netty.util.internal.logging.InternalLogger;
+import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.concurrent.TimeUnit;
@@ -38,20 +37,29 @@ import java.util.concurrent.TimeUnit;
* the read/write limit or the check interval, several methods allow that for you:
*
A TrafficCounter counts the read and written bytes such that the - * {@link AbstractTrafficShapingHandler} can limit the traffic, globally or per channel.
+ *+ * A TrafficCounter counts the read and written bytes such that the {@link AbstractTrafficShapingHandler} + * can limit the traffic, globally or per channel. + *
* - *It computes the statistics for both read and written every {@link #checkInterval}, and calls - * back to its parent {@link AbstractTrafficShapingHandler#doAccounting} method. If the checkInterval - * is set to 0, no accounting will be done and statistics will only be computed at each receive or - * write operation.
+ *+ * It computes the statistics for both read and written every {@link #checkInterval}, and calls back to its parent + * {@link AbstractTrafficShapingHandler#doAccounting} method. If the checkInterval is set to 0, no accounting will be + * done and statistics will only be computed at each receive or write operation. + *
*/ public class TrafficCounter { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficCounter.class); + /** * Current written bytes */ @@ -83,11 +88,30 @@ public class TrafficCounter { */ private long lastReadBytes; + /** + * Last non 0 written bytes number during last check interval + */ + private long lastNonNullWrittenBytes; + + /** + * Last time written bytes with non 0 written bytes + */ + private long lastNonNullWrittenTime; + + /** + * Last time read bytes with non 0 written bytes + */ + private long lastNonNullReadTime; + + /** + * Last non 0 read bytes number during last check interval + */ + private long lastNonNullReadBytes; + /** * Delay between two captures */ - final AtomicLong checkInterval = new AtomicLong( - AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL); + final AtomicLong checkInterval = new AtomicLong(AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL); // default 1 s @@ -135,12 +159,12 @@ public class TrafficCounter { private final TrafficCounter counter; /** - * @param trafficShapingHandler The parent handler to which this task needs to callback to for accounting - * @param counter The parent TrafficCounter that we need to reset the statistics for + * @param trafficShapingHandler + * The parent handler to which this task needs to callback to for accounting + * @param counter + * The parent TrafficCounter that we need to reset the statistics for */ - protected TrafficMonitoringTask( - AbstractTrafficShapingHandler trafficShapingHandler, - TrafficCounter counter) { + protected TrafficMonitoringTask(AbstractTrafficShapingHandler trafficShapingHandler, TrafficCounter counter) { trafficShapingHandler1 = trafficShapingHandler; this.counter = counter; } @@ -156,7 +180,7 @@ public class TrafficCounter { trafficShapingHandler1.doAccounting(counter); } counter.scheduledFuture = counter.executor.schedule(this, counter.checkInterval.get(), - TimeUnit.MILLISECONDS); + TimeUnit.MILLISECONDS); } } @@ -171,8 +195,7 @@ public class TrafficCounter { if (checkInterval.get() > 0) { monitorActive.set(true); monitor = new TrafficMonitoringTask(trafficShapingHandler, this); - scheduledFuture = - executor.schedule(monitor, checkInterval.get(), TimeUnit.MILLISECONDS); + scheduledFuture = executor.schedule(monitor, checkInterval.get(), TimeUnit.MILLISECONDS); } } @@ -196,7 +219,8 @@ public class TrafficCounter { /** * Reset the accounting on Read and Write * - * @param newLastTime the millisecond unix timestamp that we should be considered up-to-date for + * @param newLastTime + * the millisecond unix timestamp that we should be considered up-to-date for */ synchronized void resetAccounting(long newLastTime) { long interval = newLastTime - lastTime.getAndSet(newLastTime); @@ -204,24 +228,40 @@ public class TrafficCounter { // nothing to do return; } + if (logger.isDebugEnabled() && interval > 2 * checkInterval()) { + logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name); + } lastReadBytes = currentReadBytes.getAndSet(0); lastWrittenBytes = currentWrittenBytes.getAndSet(0); lastReadThroughput = lastReadBytes / interval * 1000; // nb byte / checkInterval in ms * 1000 (1s) lastWriteThroughput = lastWrittenBytes / interval * 1000; // nb byte / checkInterval in ms * 1000 (1s) + if (lastWrittenBytes > 0) { + lastNonNullWrittenBytes = lastWrittenBytes; + lastNonNullWrittenTime = newLastTime; + } + if (lastReadBytes > 0) { + lastNonNullReadBytes = lastReadBytes; + lastNonNullReadTime = newLastTime; + } } /** * Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the Timer to use, its * name, the checkInterval between two computations in millisecond - * @param trafficShapingHandler the associated AbstractTrafficShapingHandler - * @param executor the underlying executor service for scheduling checks - * @param name the name given to this monitor - * @param checkInterval the checkInterval in millisecond between two computations + * + * @param trafficShapingHandler + * the associated AbstractTrafficShapingHandler + * @param executor + * the underlying executor service for scheduling checks + * @param name + * the name given to this monitor + * @param checkInterval + * the checkInterval in millisecond between two computations */ - public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler, - ScheduledExecutorService executor, String name, long checkInterval) { + public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler, ScheduledExecutorService executor, + String name, long checkInterval) { this.trafficShapingHandler = trafficShapingHandler; this.executor = executor; this.name = name; @@ -232,7 +272,8 @@ public class TrafficCounter { /** * Change checkInterval between two computations in millisecond * - * @param newcheckInterval The new check interval (in milliseconds) + * @param newcheckInterval + * The new check interval (in milliseconds) */ public void configure(long newcheckInterval) { long newInterval = newcheckInterval / 10 * 10; @@ -313,9 +354,9 @@ public class TrafficCounter { } /** - * - * @return the current number of bytes read since the last checkInterval - */ + * + * @return the current number of bytes read since the last checkInterval + */ public long currentReadBytes() { return currentReadBytes.get(); } @@ -351,7 +392,7 @@ public class TrafficCounter { /** * @return the lastCumulativeTime in millisecond as of System.currentTimeMillis() - * when the cumulative counters were reset to 0. + * when the cumulative counters were reset to 0. */ public long lastCumulativeTime() { return lastCumulativeTime; @@ -373,15 +414,124 @@ public class TrafficCounter { return name; } + /** + * Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait + * time + * + * @param size + * the recv size + * @param limitTraffic + * the traffic limit in bytes per second + * @param maxTime + * the max time in ms to wait in case of excess of traffic + * @return the current time to wait (in ms) if needed for Read operation + */ + public synchronized long readTimeToWait(final long size, final long limitTraffic, final long maxTime) { + final long now = System.currentTimeMillis(); + bytesRecvFlowControl(size); + if (limitTraffic == 0) { + return 0; + } + long sum = currentReadBytes.get(); + long interval = now - lastTime.get(); + // Short time checking + if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT && sum > 0) { + long time = (sum * 1000 / limitTraffic - interval) / 10 * 10; + if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Time: " + time + ":" + sum + ":" + interval); + } + return time > maxTime ? maxTime : time; + } + return 0; + } + // long time checking + if (lastNonNullReadBytes > 0 && lastNonNullReadTime + AbstractTrafficShapingHandler.MINIMAL_WAIT < now) { + long lastsum = sum + lastNonNullReadBytes; + long lastinterval = now - lastNonNullReadTime; + long time = (lastsum * 1000 / limitTraffic - lastinterval) / 10 * 10; + if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval); + } + return time > maxTime ? maxTime : time; + } + } else { + // final "middle" time checking in case resetAccounting called very recently + sum += lastReadBytes; + long lastinterval = AbstractTrafficShapingHandler.MINIMAL_WAIT; + long time = (sum * 1000 / limitTraffic - lastinterval) / 10 * 10; + if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Time: " + time + ":" + sum + ":" + lastinterval); + } + return time > maxTime ? maxTime : time; + } + } + return 0; + } + + /** + * Returns the time to wait (if any) for the given length message, using the given limitTraffic and + * the max wait time + * + * @param size + * the write size + * @param limitTraffic + * the traffic limit in bytes per second + * @param maxTime + * the max time in ms to wait in case of excess of traffic + * @return the current time to wait (in ms) if needed for Write operation + */ + public synchronized long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) { + bytesWriteFlowControl(size); + if (limitTraffic == 0) { + return 0; + } + long sum = currentWrittenBytes.get(); + final long now = System.currentTimeMillis(); + long interval = now - lastTime.get(); + if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT && sum > 0) { + long time = (sum * 1000 / limitTraffic - interval) / 10 * 10; + if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Time: " + time + ":" + sum + ":" + interval); + } + return time > maxTime ? maxTime : time; + } + return 0; + } + if (lastNonNullWrittenBytes > 0 && lastNonNullWrittenTime + AbstractTrafficShapingHandler.MINIMAL_WAIT < now) { + long lastsum = sum + lastNonNullWrittenBytes; + long lastinterval = now - lastNonNullWrittenTime; + long time = (lastsum * 1000 / limitTraffic - lastinterval) / 10 * 10; + if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval); + } + return time > maxTime ? maxTime : time; + } + } else { + sum += lastWrittenBytes; + long lastinterval = AbstractTrafficShapingHandler.MINIMAL_WAIT + Math.abs(interval); + long time = (sum * 1000 / limitTraffic - lastinterval) / 10 * 10; + if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Time: " + time + ":" + sum + ":" + lastinterval); + } + return time > maxTime ? maxTime : time; + } + } + return 0; + } + /** * String information */ @Override public String toString() { - return "Monitor " + name + " Current Speed Read: " + - (lastReadThroughput >> 10) + " KB/s, Write: " + - (lastWriteThroughput >> 10) + " KB/s Current Read: " + - (currentReadBytes.get() >> 10) + " KB Current Write: " + - (currentWrittenBytes.get() >> 10) + " KB"; + return "Monitor " + name + " Current Speed Read: " + (lastReadThroughput >> 10) + " KB/s, Write: " + + (lastWriteThroughput >> 10) + " KB/s Current Read: " + (currentReadBytes.get() >> 10) + + " KB Current Write: " + (currentWrittenBytes.get() >> 10) + " KB"; } } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/TrafficShapingTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/TrafficShapingTest.java new file mode 100644 index 0000000000..01c4d7178c --- /dev/null +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/TrafficShapingTest.java @@ -0,0 +1,588 @@ +/* + * Copyright 2012 The Netty Project + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.traffic.AbstractTrafficShapingHandler; +import io.netty.handler.traffic.ChannelTrafficShapingHandler; +import io.netty.handler.traffic.GlobalTrafficShapingHandler; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.Promise; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; +import io.netty.util.internal.logging.Slf4JLoggerFactory; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.*; + +public class TrafficShapingTest extends AbstractSocketTest { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficShapingTest.class); + private static final InternalLogger loggerServer = InternalLoggerFactory.getInstance(ValidTimestampedHandler.class); + private static final InternalLogger loggerClient = InternalLoggerFactory.getInstance(ClientTrafficHandler.class); + + static final int messageSize = 1024; + static final int bandwidthFactor = 15; + static final int minfactor = bandwidthFactor - (bandwidthFactor / 2); + static final int maxfactor = bandwidthFactor + (bandwidthFactor / 2); + static final long stepms = 1000 / bandwidthFactor; + static final long minimalms = Math.max(stepms / 2, 20) / 10 * 10; + static final long check = Math.max(Math.min(100, minimalms / 2) / 10 * 10, 20); + private static final Random random = new Random(); + static final byte[] data = new byte[messageSize]; + + private static final String TRAFFIC = "traffic"; + + private static EventExecutorGroup group; + private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(10); + static { + random.nextBytes(data); + } + + @BeforeClass + public static void createGroup() { + logger.info("Bandwidth: " + minfactor + " <= " + bandwidthFactor + " <= " + maxfactor + + " StepMs: " + stepms + " MinMs: " + minimalms + " CheckMs: " + check); + InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory()); + Logger logger = (Logger) LoggerFactory.getLogger("ROOT"); + logger.setLevel(Level.INFO); + group = new DefaultEventExecutorGroup(8); + } + + @AfterClass + public static void destroyGroup() throws Exception { + group.shutdownGracefully().sync(); + } + + private static long[] computeWaitRead(int[] multipleMessage) { + long[] minimalWaitBetween = new long[multipleMessage.length + 1]; + minimalWaitBetween[0] = 0; + for (int i = 0; i < multipleMessage.length; i++) { + minimalWaitBetween[i + 1] = (multipleMessage[i] - 1) * stepms + minimalms; + } + return minimalWaitBetween; + } + + private static long[] computeWaitWrite(int[] multipleMessage) { + long[] minimalWaitBetween = new long[multipleMessage.length + 1]; + for (int i = 0; i < multipleMessage.length; i++) { + minimalWaitBetween[i] = (multipleMessage[i] - 1) * stepms + minimalms; + } + return minimalWaitBetween; + } + + @Test(timeout = 10000) + public void testNoTrafficShapping() throws Throwable { + logger.info("TEST NO TRAFFIC"); + run(); + } + + public void testNoTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1 }; + long[] minimalWaitBetween = null; + testTrafficShapping0(sb, cb, false, false, false, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 20000) + public void testExecNoTrafficShapping() throws Throwable { + logger.info("TEST EXEC NO TRAFFIC"); + run(); + } + + public void testExecNoTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1 }; + long[] minimalWaitBetween = null; + testTrafficShapping0(sb, cb, true, false, false, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testWriteTrafficShapping() throws Throwable { + logger.info("TEST WRITE"); + run(); + } + + public void testWriteTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1 }; + long[] minimalWaitBetween = computeWaitWrite(multipleMessage); + testTrafficShapping0(sb, cb, false, false, true, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testReadTrafficShapping() throws Throwable { + logger.info("TEST READ"); + run(); + } + + public void testReadTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testWrite1TrafficShapping() throws Throwable { + logger.info("TEST WRITE"); + run(); + } + + public void testWrite1TrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 1, 1 }; + long[] minimalWaitBetween = computeWaitWrite(multipleMessage); + testTrafficShapping0(sb, cb, false, false, true, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testRead1TrafficShapping() throws Throwable { + logger.info("TEST READ"); + run(); + } + + public void testRead1TrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 20000) + public void testExecWriteTrafficShapping() throws Throwable { + logger.info("TEST EXEC WRITE"); + run(); + } + + public void testExecWriteTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1 }; + long[] minimalWaitBetween = computeWaitWrite(multipleMessage); + testTrafficShapping0(sb, cb, true, false, true, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testExecReadTrafficShapping() throws Throwable { + logger.info("TEST EXEC READ"); + run(); + } + + public void testExecReadTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, true, true, false, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testWriteGlobalTrafficShapping() throws Throwable { + logger.info("TEST GLOBAL WRITE"); + run(); + } + + public void testWriteGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1 }; + long[] minimalWaitBetween = computeWaitWrite(multipleMessage); + testTrafficShapping0(sb, cb, false, false, true, true, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testReadGlobalTrafficShapping() throws Throwable { + logger.info("TEST GLOBAL READ"); + run(); + } + + public void testReadGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, false, true, false, true, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testAutoReadTrafficShapping() throws Throwable { + logger.info("TEST AUTO READ"); + run(); + } + + public void testAutoReadTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 }; + int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage); + } + @Test(timeout = 10000) + public void testAutoReadGlobalTrafficShapping() throws Throwable { + logger.info("TEST AUTO READ GLOBAL"); + run(); + } + + public void testAutoReadGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 }; + int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, false, true, false, true, autoRead, minimalWaitBetween, multipleMessage); + } + @Test(timeout = 10000) + public void testAutoReadExecTrafficShapping() throws Throwable { + logger.info("TEST AUTO READ EXEC"); + run(); + } + + public void testAutoReadExecTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 }; + int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, true, true, false, false, autoRead, minimalWaitBetween, multipleMessage); + } + @Test(timeout = 10000) + public void testAutoReadExecGlobalTrafficShapping() throws Throwable { + logger.info("TEST AUTO READ EXEC GLOBAL"); + run(); + } + + public void testAutoReadExecGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 }; + int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, true, true, false, true, autoRead, minimalWaitBetween, multipleMessage); + } + + /** + * + * @param sb + * @param cb + * @param additionalExecutor + * shall the pipeline add the handler using an additionnal executor + * @param limitRead + * True to set Read Limit on Server side + * @param limitWrite + * True to set Write Limit on Client side + * @param globalLimit + * True to change Channel to Global TrafficShapping + * @param autoRead + * @param minimalWaitBetween + * time in ms that should be waited before getting the final result (note: for READ the values are + * right shifted once, the first value being 0) + * @param multipleMessage + * how many message to send at each step (for READ: the first should be 1, as the two last steps to + * ensure correct testing) + * @throws Throwable + */ + private static void testTrafficShapping0(ServerBootstrap sb, Bootstrap cb, final boolean additionalExecutor, + final boolean limitRead, final boolean limitWrite, final boolean globalLimit, int[] autoRead, + long[] minimalWaitBetween, int[] multipleMessage) throws Throwable { + logger.info("Exec: " + additionalExecutor + " Read: " + limitRead + " Write: " + limitWrite + " Global: " + + globalLimit); + final ValidTimestampedHandler sh = new ValidTimestampedHandler(autoRead, multipleMessage); + Promise