diff --git a/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java
index c1e9e75451..5eb44650de 100644
--- a/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java
+++ b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java
@@ -22,6 +22,8 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
+import io.netty.util.internal.logging.InternalLogger;
+import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.concurrent.TimeUnit;
@@ -43,15 +45,24 @@ import java.util.concurrent.TimeUnit;
*
*/
public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler {
+ private static final InternalLogger logger =
+ InternalLoggerFactory.getInstance(AbstractTrafficShapingHandler.class);
/**
* Default delay between two checks: 1s
*/
public static final long DEFAULT_CHECK_INTERVAL = 1000;
+ /**
+ * Default max delay in case of traffic shaping
+ * (during which no communication will occur).
+ * Shall be less than TIMEOUT. Here half of "standard" 30s
+ */
+ public static final long DEFAULT_MAX_TIME = 15000;
+
/**
* Default minimal time to wait
*/
- private static final long MINIMAL_WAIT = 10;
+ static final long MINIMAL_WAIT = 10;
/**
* Traffic Counter
@@ -68,6 +79,11 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
*/
private long readLimit;
+ /**
+ * Max delay in wait
+ */
+ protected long maxTime = DEFAULT_MAX_TIME; // default 15 s
+
/**
* Delay between two performance snapshots
*/
@@ -94,12 +110,29 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
+ * @param maxTime
+ * The maximum delay to wait in case of traffic excess
*/
protected AbstractTrafficShapingHandler(long writeLimit, long readLimit,
- long checkInterval) {
+ long checkInterval, long maxTime) {
this.writeLimit = writeLimit;
this.readLimit = readLimit;
this.checkInterval = checkInterval;
+ this.maxTime = maxTime;
+ }
+
+ /**
+ * @param writeLimit
+ * 0 or a limit in bytes/s
+ * @param readLimit
+ * 0 or a limit in bytes/s
+ * @param checkInterval
+ * The delay between two computations of performances for
+ * channels or 0 if no stats are to be computed
+ */
+ protected AbstractTrafficShapingHandler(long writeLimit, long readLimit,
+ long checkInterval) {
+ this(writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME);
}
/**
@@ -111,14 +144,14 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
* 0 or a limit in bytes/s
*/
protected AbstractTrafficShapingHandler(long writeLimit, long readLimit) {
- this(writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
+ this(writeLimit, readLimit, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
}
/**
* Constructor using NO LIMIT and default Check Interval
*/
protected AbstractTrafficShapingHandler() {
- this(0, 0, DEFAULT_CHECK_INTERVAL);
+ this(0, 0, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
}
/**
@@ -129,7 +162,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
* channels or 0 if no stats are to be computed
*/
protected AbstractTrafficShapingHandler(long checkInterval) {
- this(0, 0, checkInterval);
+ this(0, 0, checkInterval, DEFAULT_MAX_TIME);
}
/**
@@ -171,6 +204,73 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
}
}
+ /**
+ * @return the writeLimit
+ */
+ public long getWriteLimit() {
+ return writeLimit;
+ }
+
+ /**
+ * @param writeLimit the writeLimit to set
+ */
+ public void setWriteLimit(long writeLimit) {
+ this.writeLimit = writeLimit;
+ if (trafficCounter != null) {
+ trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
+ }
+ }
+
+ /**
+ * @return the readLimit
+ */
+ public long getReadLimit() {
+ return readLimit;
+ }
+
+ /**
+ * @param readLimit the readLimit to set
+ */
+ public void setReadLimit(long readLimit) {
+ this.readLimit = readLimit;
+ if (trafficCounter != null) {
+ trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
+ }
+ }
+
+ /**
+ * @return the checkInterval
+ */
+ public long getCheckInterval() {
+ return checkInterval;
+ }
+
+ /**
+ * @param checkInterval the checkInterval to set
+ */
+ public void setCheckInterval(long checkInterval) {
+ this.checkInterval = checkInterval;
+ if (trafficCounter != null) {
+ trafficCounter.configure(checkInterval);
+ }
+ }
+
+ /**
+ *
+ * @param maxTime
+ * Max delay in wait, shall be less than TIME OUT in related protocol
+ */
+ public void setMaxTimeWait(long maxTime) {
+ this.maxTime = maxTime;
+ }
+
+ /**
+ * @return the max delay in wait
+ */
+ public long getMaxTimeWait() {
+ return maxTime;
+ }
+
/**
* Called each time the accounting is computed from the TrafficCounters.
* This method could be used for instance to implement almost real time accounting.
@@ -178,7 +278,6 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
* @param counter
* the TrafficCounter that computes its performance
*/
- @SuppressWarnings("unused")
protected void doAccounting(TrafficCounter counter) {
// NOOP by default
}
@@ -192,107 +291,115 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
this.ctx = ctx;
}
- @Override
public void run() {
- ctx.attr(READ_SUSPENDED).set(false);
- ctx.read();
+ if (!ctx.channel().config().isAutoRead() && isHandlerActive(ctx)) {
+ // If AutoRead is False and Active is True, user make a direct setAutoRead(false)
+ // Then Just reset the status
+ if (logger.isDebugEnabled()) {
+ logger.debug("Not Unsuspend: " + ctx.channel().config().isAutoRead() + ":" + isHandlerActive(ctx));
+ }
+ ctx.attr(READ_SUSPENDED).set(false);
+ } else {
+ // Anything else allows the handler to reset the AutoRead
+ if (logger.isDebugEnabled()) {
+ if (ctx.channel().config().isAutoRead() && !isHandlerActive(ctx)) {
+ logger.debug("Unsuspend: " + ctx.channel().config().isAutoRead() + ":" + isHandlerActive(ctx));
+ } else {
+ logger.debug("Normal Unsuspend: " + ctx.channel().config().isAutoRead() + ":"
+ + isHandlerActive(ctx));
+ }
+ }
+ ctx.attr(READ_SUSPENDED).set(false);
+ ctx.channel().config().setAutoRead(true);
+ ctx.channel().read();
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Unsupsend final status => " + ctx.channel().config().isAutoRead() + ":"
+ + isHandlerActive(ctx));
+ }
}
}
- /**
- * @return the time that should be necessary to wait to respect limit. Can be negative time
- */
- private static long getTimeToWait(long limit, long bytes, long lastTime, long curtime) {
- long interval = curtime - lastTime;
- if (interval <= 0) {
- // Time is too short, so just lets continue
- return 0;
- }
- return (bytes * 1000 / limit - interval) / 10 * 10;
- }
-
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
long size = calculateSize(msg);
- long curtime = System.currentTimeMillis();
-
- if (trafficCounter != null) {
- trafficCounter.bytesRecvFlowControl(size);
- if (readLimit == 0) {
- // no action
- ctx.fireChannelRead(msg);
-
- return;
- }
+ if (size > 0 && trafficCounter != null) {
// compute the number of ms to wait before reopening the channel
- long wait = getTimeToWait(readLimit,
- trafficCounter.currentReadBytes(),
- trafficCounter.lastTime(), curtime);
+ long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime);
if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
- // time in order to
- // try to limit the traffic
- if (!isSuspended(ctx)) {
+ // time in order to try to limit the traffic
+ // Only AutoRead AND HandlerActive True means Context Active
+ if (logger.isDebugEnabled()) {
+ logger.debug("Read Suspend: " + wait + ":" + ctx.channel().config().isAutoRead() + ":"
+ + isHandlerActive(ctx));
+ }
+ if (ctx.channel().config().isAutoRead() && isHandlerActive(ctx)) {
+ ctx.channel().config().setAutoRead(false);
ctx.attr(READ_SUSPENDED).set(true);
-
// Create a Runnable to reactive the read if needed. If one was create before it will just be
// reused to limit object creation
- Attribute attr = ctx.attr(REOPEN_TASK);
+ Attribute attr = ctx.attr(REOPEN_TASK);
Runnable reopenTask = attr.get();
if (reopenTask == null) {
reopenTask = new ReopenReadTimerTask(ctx);
attr.set(reopenTask);
}
- ctx.executor().schedule(reopenTask, wait,
- TimeUnit.MILLISECONDS);
+ ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Suspend final status => " + ctx.channel().config().isAutoRead() + ":"
+ + isHandlerActive(ctx) + " will reopened at: " + wait);
+ }
}
}
}
ctx.fireChannelRead(msg);
}
- @Override
- public void read(ChannelHandlerContext ctx) {
- if (!isSuspended(ctx)) {
- ctx.read();
- }
+ protected static boolean isHandlerActive(ChannelHandlerContext ctx) {
+ Boolean suspended = ctx.attr(READ_SUSPENDED).get();
+ return suspended == null || Boolean.FALSE.equals(suspended);
}
- private static boolean isSuspended(ChannelHandlerContext ctx) {
- Boolean suspended = ctx.attr(READ_SUSPENDED).get();
- return !(suspended == null || Boolean.FALSE.equals(suspended));
+ @Override
+ public void read(ChannelHandlerContext ctx) {
+ if (isHandlerActive(ctx)) {
+ // For Global Traffic (and Read when using EventLoop in pipeline) : check if READ_SUSPENDED is False
+ ctx.read();
+ }
}
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
throws Exception {
- long curtime = System.currentTimeMillis();
long size = calculateSize(msg);
- if (size > -1 && trafficCounter != null) {
- trafficCounter.bytesWriteFlowControl(size);
- if (writeLimit == 0) {
- ctx.write(msg, promise);
- return;
- }
- // compute the number of ms to wait before continue with the
- // channel
- long wait = getTimeToWait(writeLimit,
- trafficCounter.currentWrittenBytes(),
- trafficCounter.lastTime(), curtime);
+ if (size > 0 && trafficCounter != null) {
+ // compute the number of ms to wait before continue with the channel
+ long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime);
if (wait >= MINIMAL_WAIT) {
- ctx.executor().schedule(new Runnable() {
- @Override
- public void run() {
- ctx.write(msg, promise);
- }
- }, wait, TimeUnit.MILLISECONDS);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Write suspend: " + wait + ":" + ctx.channel().config().isAutoRead() + ":"
+ + isHandlerActive(ctx));
+ }
+ /*
+ * Option 2: but issue with ctx.executor().schedule()
+ * Thread.sleep(wait);
+ * System.out.println("Write unsuspended");
+ * Option 1: use an ordered list of messages to send
+ * Warning of memory pressure!
+ */
+ submitWrite(ctx, msg, wait, promise);
return;
}
}
- ctx.write(msg, promise);
+ // to maintain order of write (if not using option 2)
+ submitWrite(ctx, msg, 0, promise);
}
+ protected abstract void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long delay,
+ final ChannelPromise promise);
+
/**
*
* @return the current TrafficCounter (if
diff --git a/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java
index f296f6f691..0e4d73893c 100644
--- a/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java
+++ b/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java
@@ -15,7 +15,13 @@
*/
package io.netty.handler.traffic;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
/**
* This implementation of the {@link AbstractTrafficShapingHandler} is for channel
@@ -38,11 +44,32 @@ import io.netty.channel.ChannelHandlerContext;
* it is recommended to set a positive value, even if it is high since the precision of the
* Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
* the less precise the traffic shaping will be. It is suggested as higher value something close
- * to 5 or 10 minutes.
+ * to 5 or 10 minutes.
+ *
+ * maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.
*
*
*/
public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
+ private List messagesQueue = new LinkedList();
+
+ /**
+ * Create a new instance
+ *
+ * @param writeLimit
+ * 0 or a limit in bytes/s
+ * @param readLimit
+ * 0 or a limit in bytes/s
+ * @param checkInterval
+ * The delay between two computations of performances for
+ * channels or 0 if no stats are to be computed
+ * @param maxTime
+ * The maximum delay to wait in case of traffic excess
+ */
+ public ChannelTrafficShapingHandler(long writeLimit, long readLimit,
+ long checkInterval, long maxTime) {
+ super(writeLimit, readLimit, checkInterval, maxTime);
+ }
/**
* Create a new instance
@@ -93,9 +120,57 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
}
@Override
- public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+ public synchronized void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
if (trafficCounter != null) {
trafficCounter.stop();
}
+ for (ToSend toSend : messagesQueue) {
+ if (toSend.toSend instanceof ByteBuf) {
+ ((ByteBuf) toSend.toSend).release();
+ }
+ }
+ messagesQueue.clear();
+ }
+
+ private static final class ToSend {
+ final long date;
+ final Object toSend;
+ final ChannelPromise promise;
+
+ private ToSend(final long delay, final Object toSend, final ChannelPromise promise) {
+ this.date = System.currentTimeMillis() + delay;
+ this.toSend = toSend;
+ this.promise = promise;
+ }
+ }
+
+ @Override
+ protected synchronized void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long delay,
+ final ChannelPromise promise) {
+ if (delay == 0 && messagesQueue.isEmpty()) {
+ ctx.write(msg, promise);
+ return;
+ }
+ final ToSend newToSend = new ToSend(delay, msg, promise);
+ messagesQueue.add(newToSend);
+ ctx.executor().schedule(new Runnable() {
+ @Override
+ public void run() {
+ sendAllValid(ctx);
+ }
+ }, delay, TimeUnit.MILLISECONDS);
+ }
+
+ private synchronized void sendAllValid(ChannelHandlerContext ctx) {
+ while (!messagesQueue.isEmpty()) {
+ ToSend newToSend = messagesQueue.remove(0);
+ if (newToSend.date <= System.currentTimeMillis()) {
+ ctx.write(newToSend.toSend, newToSend.promise);
+ } else {
+ messagesQueue.add(0, newToSend);
+ break;
+ }
+ }
+ ctx.flush();
}
}
diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java
index d5fc40dce4..a73c6d3978 100644
--- a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java
+++ b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java
@@ -15,10 +15,19 @@
*/
package io.netty.handler.traffic;
+import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.EventExecutor;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
/**
* This implementation of the {@link AbstractTrafficShapingHandler} is for global
@@ -43,7 +52,9 @@ import java.util.concurrent.ScheduledExecutorService;
* it is recommended to set a positive value, even if it is high since the precision of the
* Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
* the less precise the traffic shaping will be. It is suggested as higher value something close
- * to 5 or 10 minutes.
+ * to 5 or 10 minutes.
+ *
+ * maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.
*
*
*
@@ -52,6 +63,8 @@ import java.util.concurrent.ScheduledExecutorService;
*/
@Sharable
public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
+ private Map> messagesQueues = new HashMap>();
+
/**
* Create the global TrafficCounter
*/
@@ -65,6 +78,27 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
tc.start();
}
+ /**
+ * Create a new instance
+ *
+ * @param executor
+ * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}
+ * @param writeLimit
+ * 0 or a limit in bytes/s
+ * @param readLimit
+ * 0 or a limit in bytes/s
+ * @param checkInterval
+ * The delay between two computations of performances for
+ * channels or 0 if no stats are to be computed
+ * @param maxTime
+ * The maximum delay to wait in case of traffic excess
+ */
+ public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit,
+ long checkInterval, long maxTime) {
+ super(writeLimit, readLimit, checkInterval, maxTime);
+ createGlobalTrafficCounter(executor);
+ }
+
/**
* Create a new instance
*
@@ -132,4 +166,74 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
trafficCounter.stop();
}
}
+
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ Integer key = ctx.channel().hashCode();
+ List mq = new LinkedList();
+ messagesQueues.put(key, mq);
+ }
+
+ @Override
+ public synchronized void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+ Integer key = ctx.channel().hashCode();
+ List mq = messagesQueues.remove(key);
+ if (mq != null) {
+ for (ToSend toSend : mq) {
+ if (toSend.toSend instanceof ByteBuf) {
+ ((ByteBuf) toSend.toSend).release();
+ }
+ }
+ mq.clear();
+ }
+ }
+
+ private static final class ToSend {
+ final long date;
+ final Object toSend;
+ final ChannelPromise promise;
+
+ private ToSend(final long delay, final Object toSend, final ChannelPromise promise) {
+ this.date = System.currentTimeMillis() + delay;
+ this.toSend = toSend;
+ this.promise = promise;
+ }
+ }
+
+ @Override
+ protected synchronized void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long delay,
+ final ChannelPromise promise) {
+ Integer key = ctx.channel().hashCode();
+ List messagesQueue = messagesQueues.get(key);
+ if (delay == 0 && (messagesQueue == null || messagesQueue.isEmpty())) {
+ ctx.write(msg, promise);
+ return;
+ }
+ final ToSend newToSend = new ToSend(delay, msg, promise);
+ if (messagesQueue == null) {
+ messagesQueue = new LinkedList();
+ messagesQueues.put(key, messagesQueue);
+ }
+ messagesQueue.add(newToSend);
+ final List mqfinal = messagesQueue;
+ ctx.executor().schedule(new Runnable() {
+ @Override
+ public void run() {
+ sendAllValid(ctx, mqfinal);
+ }
+ }, delay, TimeUnit.MILLISECONDS);
+ }
+
+ private synchronized void sendAllValid(final ChannelHandlerContext ctx, final List messagesQueue) {
+ while (!messagesQueue.isEmpty()) {
+ ToSend newToSend = messagesQueue.remove(0);
+ if (newToSend.date <= System.currentTimeMillis()) {
+ ctx.write(newToSend.toSend, newToSend.promise);
+ } else {
+ messagesQueue.add(0, newToSend);
+ break;
+ }
+ }
+ ctx.flush();
+ }
}
diff --git a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java
index 5d6d873d25..cf0ac5f691 100644
--- a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java
+++ b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java
@@ -15,12 +15,16 @@
*/
package io.netty.handler.traffic;
+import io.netty.util.internal.logging.InternalLogger;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+
/**
* TrafficCounter is associated with {@link AbstractTrafficShapingHandler}.
*
@@ -33,6 +37,9 @@ import java.util.concurrent.atomic.AtomicLong;
* write operation.
*/
public class TrafficCounter {
+ private static final InternalLogger logger =
+ InternalLoggerFactory.getInstance(TrafficCounter.class);
+
/**
* Current written bytes
*/
@@ -83,6 +90,26 @@ public class TrafficCounter {
*/
private long lastReadBytes;
+ /**
+ * Last non 0 written bytes number during last check interval
+ */
+ private long lastNonNullWrittenBytes;
+
+ /**
+ * Last time written bytes with non 0 written bytes
+ */
+ private long lastNonNullWrittenTime;
+
+ /**
+ * Last time read bytes with non 0 written bytes
+ */
+ private long lastNonNullReadTime;
+
+ /**
+ * Last non 0 read bytes number during last check interval
+ */
+ private long lastNonNullReadBytes;
+
/**
* Delay between two captures
*/
@@ -204,12 +231,23 @@ public class TrafficCounter {
// nothing to do
return;
}
+ if (logger.isDebugEnabled() && (interval > 2 * checkInterval())) {
+ logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name);
+ }
lastReadBytes = currentReadBytes.getAndSet(0);
lastWrittenBytes = currentWrittenBytes.getAndSet(0);
lastReadThroughput = lastReadBytes / interval * 1000;
// nb byte / checkInterval in ms * 1000 (1s)
lastWriteThroughput = lastWrittenBytes / interval * 1000;
// nb byte / checkInterval in ms * 1000 (1s)
+ if (lastWrittenBytes > 0) {
+ lastNonNullWrittenBytes = lastWrittenBytes;
+ lastNonNullWrittenTime = newLastTime;
+ }
+ if (lastReadBytes > 0) {
+ lastNonNullReadBytes = lastReadBytes;
+ lastNonNullReadTime = newLastTime;
+ }
}
/**
@@ -373,6 +411,117 @@ public class TrafficCounter {
return name;
}
+ /**
+ * Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait
+ * time
+ *
+ * @param size
+ * the recv size
+ * @param limitTraffic
+ * the traffic limit in bytes per second
+ * @param maxTime
+ * the max time in ms to wait in case of excess of traffic
+ * @return the current time to wait (in ms) if needed for Read operation
+ */
+ public synchronized long readTimeToWait(final long size, final long limitTraffic, final long maxTime) {
+ final long now = System.currentTimeMillis();
+ bytesRecvFlowControl(size);
+ if (limitTraffic == 0) {
+ return 0;
+ }
+ long sum = currentReadBytes.get();
+ long interval = now - lastTime.get();
+ // Short time checking
+ if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT && sum > 0) {
+ long time = (sum * 1000 / limitTraffic - interval) / 10 * 10;
+ if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Time: " + time + ":" + sum + ":" + interval);
+ }
+ return time > maxTime ? maxTime : time;
+ }
+ return 0;
+ }
+ // long time checking
+ if (lastNonNullReadBytes > 0 && lastNonNullReadTime + AbstractTrafficShapingHandler.MINIMAL_WAIT < now) {
+ long lastsum = sum + lastNonNullReadBytes;
+ long lastinterval = now - lastNonNullReadTime;
+ long time = (lastsum * 1000 / limitTraffic - lastinterval) / 10 * 10;
+ if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval);
+ }
+ return time > maxTime ? maxTime : time;
+ }
+ } else {
+ // final "middle" time checking in case resetAccounting called very recently
+ sum += lastReadBytes;
+ long lastinterval = AbstractTrafficShapingHandler.MINIMAL_WAIT;
+ long time = (sum * 1000 / limitTraffic - lastinterval) / 10 * 10;
+ if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Time: " + time + ":" + sum + ":" + lastinterval);
+ }
+ return time > maxTime ? maxTime : time;
+ }
+ }
+ return 0;
+ }
+
+ /**
+ * Returns the time to wait (if any) for the given length message, using the given limitTraffic and
+ * the max wait time
+ *
+ * @param size
+ * the write size
+ * @param limitTraffic
+ * the traffic limit in bytes per second
+ * @param maxTime
+ * the max time in ms to wait in case of excess of traffic
+ * @return the current time to wait (in ms) if needed for Write operation
+ */
+ public synchronized long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) {
+ bytesWriteFlowControl(size);
+ if (limitTraffic == 0) {
+ return 0;
+ }
+ long sum = currentWrittenBytes.get();
+ final long now = System.currentTimeMillis();
+ long interval = now - lastTime.get();
+ if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT && sum > 0) {
+ long time = (sum * 1000 / limitTraffic - interval) / 10 * 10;
+ if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Time: " + time + ":" + sum + ":" + interval);
+ }
+ return time > maxTime ? maxTime : time;
+ }
+ return 0;
+ }
+ if (lastNonNullWrittenBytes > 0 && lastNonNullWrittenTime + AbstractTrafficShapingHandler.MINIMAL_WAIT < now) {
+ long lastsum = sum + lastNonNullWrittenBytes;
+ long lastinterval = now - lastNonNullWrittenTime;
+ long time = (lastsum * 1000 / limitTraffic - lastinterval) / 10 * 10;
+ if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval);
+ }
+ return time > maxTime ? maxTime : time;
+ }
+ } else {
+ sum += lastWrittenBytes;
+ long lastinterval = AbstractTrafficShapingHandler.MINIMAL_WAIT + Math.abs(interval);
+ long time = (sum * 1000 / limitTraffic - lastinterval) / 10 * 10;
+ if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Time: " + time + ":" + sum + ":" + lastinterval);
+ }
+ return time > maxTime ? maxTime : time;
+ }
+ }
+ return 0;
+ }
+
/**
* String information
*/
diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/TrafficShapingTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/TrafficShapingTest.java
new file mode 100644
index 0000000000..6ad0499901
--- /dev/null
+++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/TrafficShapingTest.java
@@ -0,0 +1,580 @@
+/*
+ * Copyright 2012 The Netty Project
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.testsuite.transport.socket;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.traffic.AbstractTrafficShapingHandler;
+import io.netty.handler.traffic.ChannelTrafficShapingHandler;
+import io.netty.handler.traffic.GlobalTrafficShapingHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.internal.logging.InternalLogger;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+import io.netty.util.internal.logging.Slf4JLoggerFactory;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.*;
+
+public class TrafficShapingTest extends AbstractSocketTest {
+ private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficShapingTest.class);
+
+ static final int messageSize = 1024;
+ static final int bandwidthFactor = 15;
+ static final int minfactor = bandwidthFactor - (bandwidthFactor / 2) - 1;
+ static final int maxfactor = bandwidthFactor + (bandwidthFactor / 2);
+ static final long stepms = 1000 / bandwidthFactor;
+ static final long minimalms = Math.max(stepms / 2, 20) / 10 * 10;
+ static final long check = Math.max(Math.min(100, minimalms / 2) / 10 * 10, 20);
+ private static final Random random = new Random();
+ static final byte[] data = new byte[messageSize];
+
+ private static EventExecutorGroup group;
+ private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
+ static {
+ random.nextBytes(data);
+ }
+
+ @BeforeClass
+ public static void createGroup() {
+ InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
+ Logger logger = (Logger) LoggerFactory.getLogger("ROOT");
+ logger.setLevel(Level.INFO);
+ logger.info("Bandwidth: " + minfactor + " <= " + bandwidthFactor + " <= " + maxfactor +
+ " StepMs: " + stepms + " MinMs: " + minimalms + " CheckMs: " + check);
+ group = new DefaultEventExecutorGroup(8);
+ }
+
+ @AfterClass
+ public static void destroyGroup() throws Exception {
+ group.shutdownGracefully().sync();
+ }
+
+ private static long[] computeWaitRead(int[] multipleMessage) {
+ long[] minimalWaitBetween = new long[multipleMessage.length + 1];
+ minimalWaitBetween[0] = 0;
+ for (int i = 0; i < multipleMessage.length; i++) {
+ minimalWaitBetween[i + 1] = (multipleMessage[i] - 1) * stepms + minimalms;
+ }
+ return minimalWaitBetween;
+ }
+
+ private static long[] computeWaitWrite(int[] multipleMessage) {
+ long[] minimalWaitBetween = new long[multipleMessage.length + 1];
+ for (int i = 0; i < multipleMessage.length; i++) {
+ minimalWaitBetween[i] = (multipleMessage[i] - 1) * stepms + minimalms;
+ }
+ return minimalWaitBetween;
+ }
+
+ @Test(timeout = 10000)
+ public void testNoTrafficShapping() throws Throwable {
+ logger.info("TEST NO TRAFFIC");
+ run();
+ }
+
+ public void testNoTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
+ int[] autoRead = null;
+ int[] multipleMessage = { 1, 2, 1 };
+ long[] minimalWaitBetween = null;
+ testTrafficShapping0(sb, cb, false, false, false, false, autoRead, minimalWaitBetween, multipleMessage);
+ }
+
+ @Test(timeout = 15000)
+ public void testExecNoTrafficShapping() throws Throwable {
+ logger.info("TEST EXEC NO TRAFFIC");
+ run();
+ }
+
+ public void testExecNoTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
+ int[] autoRead = null;
+ int[] multipleMessage = { 1, 2, 1 };
+ long[] minimalWaitBetween = null;
+ testTrafficShapping0(sb, cb, true, false, false, false, autoRead, minimalWaitBetween, multipleMessage);
+ }
+
+ @Test(timeout = 10000)
+ public void testWriteTrafficShapping() throws Throwable {
+ logger.info("TEST WRITE");
+ run();
+ }
+
+ public void testWriteTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
+ int[] autoRead = null;
+ int[] multipleMessage = { 1, 2, 1 };
+ long[] minimalWaitBetween = computeWaitWrite(multipleMessage);
+ testTrafficShapping0(sb, cb, false, false, true, false, autoRead, minimalWaitBetween, multipleMessage);
+ }
+
+ @Test(timeout = 10000)
+ public void testReadTrafficShapping() throws Throwable {
+ logger.info("TEST READ");
+ run();
+ }
+
+ public void testReadTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
+ int[] autoRead = null;
+ int[] multipleMessage = { 1, 2, 1, 1 };
+ long[] minimalWaitBetween = computeWaitRead(multipleMessage);
+ testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
+ }
+
+ @Test(timeout = 10000)
+ public void testWrite1TrafficShapping() throws Throwable {
+ logger.info("TEST WRITE");
+ run();
+ }
+
+ public void testWrite1TrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
+ int[] autoRead = null;
+ int[] multipleMessage = { 1, 1, 1 };
+ long[] minimalWaitBetween = computeWaitWrite(multipleMessage);
+ testTrafficShapping0(sb, cb, false, false, true, false, autoRead, minimalWaitBetween, multipleMessage);
+ }
+
+ @Test(timeout = 10000)
+ public void testRead1TrafficShapping() throws Throwable {
+ logger.info("TEST READ");
+ run();
+ }
+
+ public void testRead1TrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
+ int[] autoRead = null;
+ int[] multipleMessage = { 1, 1, 1 };
+ long[] minimalWaitBetween = computeWaitRead(multipleMessage);
+ testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
+ }
+
+ @Test(timeout = 15000)
+ public void testExecWriteTrafficShapping() throws Throwable {
+ logger.info("TEST EXEC WRITE");
+ run();
+ }
+
+ public void testExecWriteTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
+ int[] autoRead = null;
+ int[] multipleMessage = { 1, 2, 1 };
+ long[] minimalWaitBetween = computeWaitWrite(multipleMessage);
+ testTrafficShapping0(sb, cb, true, false, true, false, autoRead, minimalWaitBetween, multipleMessage);
+ }
+
+ @Test(timeout = 10000)
+ public void testExecReadTrafficShapping() throws Throwable {
+ logger.info("TEST EXEC READ");
+ run();
+ }
+
+ public void testExecReadTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
+ int[] autoRead = null;
+ int[] multipleMessage = { 1, 2, 1, 1 };
+ long[] minimalWaitBetween = computeWaitRead(multipleMessage);
+ testTrafficShapping0(sb, cb, true, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
+ }
+
+ @Test(timeout = 10000)
+ public void testWriteGlobalTrafficShapping() throws Throwable {
+ logger.info("TEST GLOBAL WRITE");
+ run();
+ }
+
+ public void testWriteGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
+ int[] autoRead = null;
+ int[] multipleMessage = { 1, 2, 1 };
+ long[] minimalWaitBetween = computeWaitWrite(multipleMessage);
+ testTrafficShapping0(sb, cb, false, false, true, true, autoRead, minimalWaitBetween, multipleMessage);
+ }
+
+ @Test(timeout = 10000)
+ public void testReadGlobalTrafficShapping() throws Throwable {
+ logger.info("TEST GLOBAL READ");
+ run();
+ }
+
+ public void testReadGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
+ int[] autoRead = null;
+ int[] multipleMessage = { 1, 2, 1, 1 };
+ long[] minimalWaitBetween = computeWaitRead(multipleMessage);
+ testTrafficShapping0(sb, cb, false, true, false, true, autoRead, minimalWaitBetween, multipleMessage);
+ }
+
+ @Test(timeout = 10000)
+ public void testAutoReadTrafficShapping() throws Throwable {
+ logger.info("TEST AUTO READ");
+ run();
+ }
+
+ public void testAutoReadTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
+ int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
+ int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
+ long[] minimalWaitBetween = computeWaitRead(multipleMessage);
+ testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
+ }
+ @Test(timeout = 10000)
+ public void testAutoReadGlobalTrafficShapping() throws Throwable {
+ logger.info("TEST AUTO READ GLOBAL");
+ run();
+ }
+
+ public void testAutoReadGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
+ int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
+ int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
+ long[] minimalWaitBetween = computeWaitRead(multipleMessage);
+ testTrafficShapping0(sb, cb, false, true, false, true, autoRead, minimalWaitBetween, multipleMessage);
+ }
+ @Test(timeout = 10000)
+ public void testAutoReadExecTrafficShapping() throws Throwable {
+ logger.info("TEST AUTO READ EXEC");
+ run();
+ }
+
+ public void testAutoReadExecTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
+ int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
+ int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
+ long[] minimalWaitBetween = computeWaitRead(multipleMessage);
+ testTrafficShapping0(sb, cb, true, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
+ }
+ @Test(timeout = 10000)
+ public void testAutoReadExecGlobalTrafficShapping() throws Throwable {
+ logger.info("TEST AUTO READ EXEC GLOBAL");
+ run();
+ }
+
+ public void testAutoReadExecGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
+ int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
+ int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
+ long[] minimalWaitBetween = computeWaitRead(multipleMessage);
+ testTrafficShapping0(sb, cb, true, true, false, true, autoRead, minimalWaitBetween, multipleMessage);
+ }
+
+ /**
+ *
+ * @param sb
+ * @param cb
+ * @param additionalExecutor
+ * shall the pipeline add the handler using an additionnal executor
+ * @param limitRead
+ * True to set Read Limit on Server side
+ * @param limitWrite
+ * True to set Write Limit on Client side
+ * @param globalLimit
+ * True to change Channel to Global TrafficShapping
+ * @param autoRead
+ * @param minimalWaitBetween
+ * time in ms that should be waited before getting the final result (note: for READ the values are
+ * right shifted once, the first value being 0)
+ * @param multipleMessage
+ * how many message to send at each step (for READ: the first should be 1, as the two last steps to
+ * ensure correct testing)
+ * @throws Throwable
+ */
+ private static void testTrafficShapping0(ServerBootstrap sb, Bootstrap cb, final boolean additionalExecutor,
+ final boolean limitRead, final boolean limitWrite, final boolean globalLimit, int[] autoRead,
+ long[] minimalWaitBetween, int[] multipleMessage) throws Throwable {
+ logger.info("Exec: " + additionalExecutor + " Read: " + limitRead + " Write: " + limitWrite + " Global: "
+ + globalLimit);
+ final ValidTimestampedHandler sh = new ValidTimestampedHandler(autoRead, multipleMessage);
+ Promise promise = group.next().newPromise();
+ final ClientTrafficHandler ch = new ClientTrafficHandler(promise, minimalWaitBetween, multipleMessage,
+ autoRead);
+
+ final AbstractTrafficShapingHandler handler;
+ if (limitRead) {
+ if (globalLimit) {
+ handler = new GlobalTrafficShapingHandler(group, 0, bandwidthFactor * messageSize, check);
+ } else {
+ handler = new ChannelTrafficShapingHandler(0, bandwidthFactor * messageSize, check);
+ }
+ } else if (limitWrite) {
+ if (globalLimit) {
+ handler = new GlobalTrafficShapingHandler(group, bandwidthFactor * messageSize, 0, check);
+ } else {
+ handler = new ChannelTrafficShapingHandler(bandwidthFactor * messageSize, 0, check);
+ }
+ } else {
+ handler = null;
+ }
+
+ sb.childHandler(new ChannelInitializer() {
+ @Override
+ protected void initChannel(SocketChannel c) throws Exception {
+ if (limitRead) {
+ if (additionalExecutor) {
+ c.pipeline().addLast(group, handler);
+ } else {
+ c.pipeline().addLast(handler);
+ }
+ }
+ if (additionalExecutor) {
+ c.pipeline().addLast(group, sh);
+ } else {
+ c.pipeline().addLast(sh);
+ }
+ }
+ });
+ cb.handler(new ChannelInitializer() {
+ @Override
+ protected void initChannel(SocketChannel c) throws Exception {
+ if (limitWrite) {
+ if (additionalExecutor) {
+ c.pipeline().addLast(group, handler);
+ } else {
+ c.pipeline().addLast(handler);
+ }
+ }
+ if (additionalExecutor) {
+ c.pipeline().addLast(group, ch);
+ } else {
+ c.pipeline().addLast(ch);
+ }
+ }
+ });
+
+ Channel sc = sb.bind().sync().channel();
+ Channel cc = cb.connect().sync().channel();
+
+ int totalNb = 0;
+ for (int i = 1; i < multipleMessage.length; i++) {
+ totalNb += multipleMessage[i];
+ }
+ Long start = System.currentTimeMillis();
+ int nb = multipleMessage[0];
+ for (int i = 0; i < nb; i++) {
+ cc.write(cc.alloc().buffer().writeBytes(data));
+ }
+ cc.flush();
+
+ promise.await();
+ Long stop = System.currentTimeMillis();
+ assertTrue("Error during exceution of TrafficShapping: " + promise.cause(), promise.isSuccess());
+
+ float average = (totalNb * messageSize) / (float) (stop - start);
+ logger.info("Average of traffic: " + average + " compare to " + bandwidthFactor);
+ sh.channel.close().sync();
+ ch.channel.close().sync();
+ sc.close().sync();
+ if (autoRead != null) {
+ // for extra release call in AutoRead
+ Thread.sleep(minimalms);
+ }
+
+ if (autoRead == null && minimalWaitBetween != null) {
+ assertTrue("Overall Traffic not ok since > " + maxfactor + ": " + average,
+ average <= maxfactor);
+ if (additionalExecutor) {
+ // Oio is not as good when using additionalExecutor
+ assertTrue("Overall Traffic not ok since < 0.25: " + average, average >= 0.25);
+ } else {
+ assertTrue("Overall Traffic not ok since < " + minfactor + ": " + average,
+ average >= minfactor);
+ }
+ }
+ if (handler != null && globalLimit) {
+ ((GlobalTrafficShapingHandler) handler).release();
+ }
+
+ if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
+ throw sh.exception.get();
+ }
+ if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
+ throw ch.exception.get();
+ }
+ if (sh.exception.get() != null) {
+ throw sh.exception.get();
+ }
+ if (ch.exception.get() != null) {
+ throw ch.exception.get();
+ }
+ }
+
+ private static class ClientTrafficHandler extends SimpleChannelInboundHandler {
+ volatile Channel channel;
+ final AtomicReference exception = new AtomicReference();
+ volatile int step;
+ // first message will always be validated
+ private long currentLastTime = System.currentTimeMillis();
+ private final long[] minimalWaitBetween;
+ private final int[] multipleMessage;
+ private final int[] autoRead;
+ final Promise promise;
+
+ ClientTrafficHandler(Promise promise, long[] minimalWaitBetween, int[] multipleMessage,
+ int[] autoRead) {
+ this.minimalWaitBetween = minimalWaitBetween;
+ this.multipleMessage = Arrays.copyOf(multipleMessage, multipleMessage.length);
+ this.promise = promise;
+ this.autoRead = autoRead;
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ channel = ctx.channel();
+ }
+
+ @Override
+ public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
+ long lastTimestamp = 0;
+ while (in.isReadable()) {
+ lastTimestamp = in.readLong();
+ multipleMessage[step]--;
+ }
+ if (multipleMessage[step] > 0) {
+ // still some message to get
+ return;
+ }
+ long minimalWait = (minimalWaitBetween != null) ? minimalWaitBetween[step] : 0;
+ int ar = 0;
+ if (autoRead != null) {
+ if (step > 0 && autoRead[step - 1] != 0) {
+ ar = autoRead[step - 1];
+ if (ar > 0) {
+ minimalWait = -1;
+ } else {
+ minimalWait = minimalms / 3;
+ }
+ } else {
+ minimalWait = 0;
+ }
+ }
+ logger.info("Step: " + step + " Interval: " + (lastTimestamp - currentLastTime) + " compareTo "
+ + minimalWait + " (" + ar + ")");
+ assertTrue("The interval of time is incorrect:" + (lastTimestamp - currentLastTime) + " not> "
+ + minimalWait, lastTimestamp - currentLastTime >= minimalWait);
+ currentLastTime = lastTimestamp;
+ step++;
+ if (multipleMessage.length > step) {
+ int nb = multipleMessage[step];
+ for (int i = 0; i < nb; i++) {
+ channel.write(channel.alloc().buffer().writeBytes(data));
+ }
+ channel.flush();
+ } else {
+ promise.setSuccess(true);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ if (exception.compareAndSet(null, cause)) {
+ cause.printStackTrace();
+ promise.setFailure(cause);
+ ctx.close();
+ }
+ }
+ }
+
+ private static class ValidTimestampedHandler extends SimpleChannelInboundHandler {
+ private final int[] autoRead;
+ private final int[] multipleMessage;
+ volatile Channel channel;
+ volatile int step;
+ final AtomicReference exception = new AtomicReference();
+
+ ValidTimestampedHandler(int[] autoRead, int[] multipleMessage) {
+ this.autoRead = autoRead;
+ this.multipleMessage = Arrays.copyOf(multipleMessage, multipleMessage.length);
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ channel = ctx.channel();
+ }
+
+ @Override
+ public void channelRead0(final ChannelHandlerContext ctx, ByteBuf in) throws Exception {
+ byte[] actual = new byte[in.readableBytes()];
+ in.readBytes(actual);
+ long timestamp = System.currentTimeMillis();
+ int nb = actual.length / messageSize;
+ int isAutoRead = 0;
+ int laststep = step;
+ for (int i = 0; i < nb; i++) {
+ multipleMessage[step]--;
+ if (multipleMessage[step] == 0) {
+ if (autoRead != null) {
+ isAutoRead = autoRead[step];
+ }
+ step++;
+ }
+ }
+ if (laststep != step && autoRead != null && isAutoRead != 2) {
+ if (isAutoRead != 0) {
+ logger.info("Set AutoRead: " + (isAutoRead > 0) + " Step: " + step);
+ channel.config().setAutoRead(isAutoRead > 0);
+ } else {
+ logger.info("AutoRead: NO Step:" + step);
+ }
+ }
+ logger.debug("Get: " + actual.length + " TS " + timestamp + " NB: " + nb);
+ for (int i = 0; i < nb; i++) {
+ channel.write(Unpooled.copyLong(timestamp));
+ }
+ channel.flush();
+ if (laststep != step && isAutoRead != 0) {
+ if (isAutoRead < 0) {
+ final int exactStep = step;
+ long wait = (isAutoRead == -1) ? minimalms : stepms + minimalms;
+ if (isAutoRead == -3) {
+ wait = stepms * 3;
+ }
+ executor.schedule(new Runnable() {
+ public void run() {
+ logger.info("Reset AutoRead: Step " + exactStep);
+ channel.config().setAutoRead(true);
+ }
+ }, wait, TimeUnit.MILLISECONDS);
+ } else {
+ if (isAutoRead > 1) {
+ logger.info("Will Set AutoRead: Rrue, Step: " + step);
+ executor.schedule(new Runnable() {
+ public void run() {
+ logger.info("Set AutoRead: Rrue, Step: " + step);
+ channel.config().setAutoRead(true);
+ }
+ }, stepms + minimalms, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ if (exception.compareAndSet(null, cause)) {
+ cause.printStackTrace();
+ ctx.close();
+ }
+ }
+ }
+}