From bc1379d19dcf59626ef8d5b431830a189957e3bd Mon Sep 17 00:00:00 2001 From: fbregier Date: Fri, 1 Aug 2014 10:13:00 +0200 Subject: [PATCH] [#2721] Improve Traffic Shaping handler Motivation: Currently Traffic Shaping is using 1 timer only and could lead to "partial" wrong bandwidth computation when "short" time occurs between adding used bytes and when the TrafficCounter updates itself and finally when the traffic is computed. Indeed, the TrafficCounter is updated every x delay and it is at the same time saved into "lastXxxxBytes" and set to 0. Therefore, when one request the counter, it first updates the TrafficCounter with the added used bytes. If this value is set just before the TrafficCounter is updated, then the bandwidth computation will use the TrafficCounter with a "0" value (this value being reset once the delay occurs). Therefore, the traffic shaping computation is wrong in rare cases. Secondly the traffic shapping should avoid if possible the "Timeout" effect by not stopping reading or writing more than a maxTime, this maxTime being less than the TimeOut limit. Thirdly the traffic shapping in read had an issue since the readOp was not set but should, turning in no read blocking from socket point of view. Modifications: The TrafficCounter has 2 new methods that compute the time to wait according to read or write) using in priority the currentXxxxBytes (as before), but could used (if current is at 0) the lastXxxxxBytes, and therefore having more chance to take into account the real traffic. Moreover the Handler could change the default "max time to wait", which is by default set to half of "standard" Time Out (30s:2 = 15s). Finally we add the setAutoRead(boolean) accordingly to the situation, as proposed in #2696 (this pull request is in error for unknown reason). Result: The Traffic Shaping is better take into account (no 0 value when it shouldn't) and it tries to not block traffic more than Time Out event. Moreover the read is really stopped from socket point of view. This version is similar to #2388 and #2450. This version is for V4.1, and includes the #2696 pull request to ease the merge process. It is compatible with master too. Including also #2748 The test minimizes time check by reducing to 66ms steps (55s). --- .../AbstractTrafficShapingHandler.java | 243 ++++++-- .../traffic/ChannelTrafficShapingHandler.java | 79 ++- .../traffic/GlobalTrafficShapingHandler.java | 106 +++- .../netty/handler/traffic/TrafficCounter.java | 149 +++++ .../transport/socket/TrafficShapingTest.java | 580 ++++++++++++++++++ 5 files changed, 1086 insertions(+), 71 deletions(-) create mode 100644 testsuite/src/test/java/io/netty/testsuite/transport/socket/TrafficShapingTest.java diff --git a/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java index c1e9e75451..5eb44650de 100644 --- a/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java @@ -22,6 +22,8 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.util.Attribute; import io.netty.util.AttributeKey; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.concurrent.TimeUnit; @@ -43,15 +45,24 @@ import java.util.concurrent.TimeUnit; * */ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler { + private static final InternalLogger logger = + InternalLoggerFactory.getInstance(AbstractTrafficShapingHandler.class); /** * Default delay between two checks: 1s */ public static final long DEFAULT_CHECK_INTERVAL = 1000; + /** + * Default max delay in case of traffic shaping + * (during which no communication will occur). + * Shall be less than TIMEOUT. Here half of "standard" 30s + */ + public static final long DEFAULT_MAX_TIME = 15000; + /** * Default minimal time to wait */ - private static final long MINIMAL_WAIT = 10; + static final long MINIMAL_WAIT = 10; /** * Traffic Counter @@ -68,6 +79,11 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler */ private long readLimit; + /** + * Max delay in wait + */ + protected long maxTime = DEFAULT_MAX_TIME; // default 15 s + /** * Delay between two performance snapshots */ @@ -94,12 +110,29 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler * @param checkInterval * The delay between two computations of performances for * channels or 0 if no stats are to be computed + * @param maxTime + * The maximum delay to wait in case of traffic excess */ protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, - long checkInterval) { + long checkInterval, long maxTime) { this.writeLimit = writeLimit; this.readLimit = readLimit; this.checkInterval = checkInterval; + this.maxTime = maxTime; + } + + /** + * @param writeLimit + * 0 or a limit in bytes/s + * @param readLimit + * 0 or a limit in bytes/s + * @param checkInterval + * The delay between two computations of performances for + * channels or 0 if no stats are to be computed + */ + protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, + long checkInterval) { + this(writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME); } /** @@ -111,14 +144,14 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler * 0 or a limit in bytes/s */ protected AbstractTrafficShapingHandler(long writeLimit, long readLimit) { - this(writeLimit, readLimit, DEFAULT_CHECK_INTERVAL); + this(writeLimit, readLimit, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME); } /** * Constructor using NO LIMIT and default Check Interval */ protected AbstractTrafficShapingHandler() { - this(0, 0, DEFAULT_CHECK_INTERVAL); + this(0, 0, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME); } /** @@ -129,7 +162,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler * channels or 0 if no stats are to be computed */ protected AbstractTrafficShapingHandler(long checkInterval) { - this(0, 0, checkInterval); + this(0, 0, checkInterval, DEFAULT_MAX_TIME); } /** @@ -171,6 +204,73 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler } } + /** + * @return the writeLimit + */ + public long getWriteLimit() { + return writeLimit; + } + + /** + * @param writeLimit the writeLimit to set + */ + public void setWriteLimit(long writeLimit) { + this.writeLimit = writeLimit; + if (trafficCounter != null) { + trafficCounter.resetAccounting(System.currentTimeMillis() + 1); + } + } + + /** + * @return the readLimit + */ + public long getReadLimit() { + return readLimit; + } + + /** + * @param readLimit the readLimit to set + */ + public void setReadLimit(long readLimit) { + this.readLimit = readLimit; + if (trafficCounter != null) { + trafficCounter.resetAccounting(System.currentTimeMillis() + 1); + } + } + + /** + * @return the checkInterval + */ + public long getCheckInterval() { + return checkInterval; + } + + /** + * @param checkInterval the checkInterval to set + */ + public void setCheckInterval(long checkInterval) { + this.checkInterval = checkInterval; + if (trafficCounter != null) { + trafficCounter.configure(checkInterval); + } + } + + /** + * + * @param maxTime + * Max delay in wait, shall be less than TIME OUT in related protocol + */ + public void setMaxTimeWait(long maxTime) { + this.maxTime = maxTime; + } + + /** + * @return the max delay in wait + */ + public long getMaxTimeWait() { + return maxTime; + } + /** * Called each time the accounting is computed from the TrafficCounters. * This method could be used for instance to implement almost real time accounting. @@ -178,7 +278,6 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler * @param counter * the TrafficCounter that computes its performance */ - @SuppressWarnings("unused") protected void doAccounting(TrafficCounter counter) { // NOOP by default } @@ -192,107 +291,115 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler this.ctx = ctx; } - @Override public void run() { - ctx.attr(READ_SUSPENDED).set(false); - ctx.read(); + if (!ctx.channel().config().isAutoRead() && isHandlerActive(ctx)) { + // If AutoRead is False and Active is True, user make a direct setAutoRead(false) + // Then Just reset the status + if (logger.isDebugEnabled()) { + logger.debug("Not Unsuspend: " + ctx.channel().config().isAutoRead() + ":" + isHandlerActive(ctx)); + } + ctx.attr(READ_SUSPENDED).set(false); + } else { + // Anything else allows the handler to reset the AutoRead + if (logger.isDebugEnabled()) { + if (ctx.channel().config().isAutoRead() && !isHandlerActive(ctx)) { + logger.debug("Unsuspend: " + ctx.channel().config().isAutoRead() + ":" + isHandlerActive(ctx)); + } else { + logger.debug("Normal Unsuspend: " + ctx.channel().config().isAutoRead() + ":" + + isHandlerActive(ctx)); + } + } + ctx.attr(READ_SUSPENDED).set(false); + ctx.channel().config().setAutoRead(true); + ctx.channel().read(); + } + if (logger.isDebugEnabled()) { + logger.debug("Unsupsend final status => " + ctx.channel().config().isAutoRead() + ":" + + isHandlerActive(ctx)); + } } } - /** - * @return the time that should be necessary to wait to respect limit. Can be negative time - */ - private static long getTimeToWait(long limit, long bytes, long lastTime, long curtime) { - long interval = curtime - lastTime; - if (interval <= 0) { - // Time is too short, so just lets continue - return 0; - } - return (bytes * 1000 / limit - interval) / 10 * 10; - } - @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { long size = calculateSize(msg); - long curtime = System.currentTimeMillis(); - - if (trafficCounter != null) { - trafficCounter.bytesRecvFlowControl(size); - if (readLimit == 0) { - // no action - ctx.fireChannelRead(msg); - - return; - } + if (size > 0 && trafficCounter != null) { // compute the number of ms to wait before reopening the channel - long wait = getTimeToWait(readLimit, - trafficCounter.currentReadBytes(), - trafficCounter.lastTime(), curtime); + long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime); if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal - // time in order to - // try to limit the traffic - if (!isSuspended(ctx)) { + // time in order to try to limit the traffic + // Only AutoRead AND HandlerActive True means Context Active + if (logger.isDebugEnabled()) { + logger.debug("Read Suspend: " + wait + ":" + ctx.channel().config().isAutoRead() + ":" + + isHandlerActive(ctx)); + } + if (ctx.channel().config().isAutoRead() && isHandlerActive(ctx)) { + ctx.channel().config().setAutoRead(false); ctx.attr(READ_SUSPENDED).set(true); - // Create a Runnable to reactive the read if needed. If one was create before it will just be // reused to limit object creation - Attribute attr = ctx.attr(REOPEN_TASK); + Attribute attr = ctx.attr(REOPEN_TASK); Runnable reopenTask = attr.get(); if (reopenTask == null) { reopenTask = new ReopenReadTimerTask(ctx); attr.set(reopenTask); } - ctx.executor().schedule(reopenTask, wait, - TimeUnit.MILLISECONDS); + ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS); + if (logger.isDebugEnabled()) { + logger.debug("Suspend final status => " + ctx.channel().config().isAutoRead() + ":" + + isHandlerActive(ctx) + " will reopened at: " + wait); + } } } } ctx.fireChannelRead(msg); } - @Override - public void read(ChannelHandlerContext ctx) { - if (!isSuspended(ctx)) { - ctx.read(); - } + protected static boolean isHandlerActive(ChannelHandlerContext ctx) { + Boolean suspended = ctx.attr(READ_SUSPENDED).get(); + return suspended == null || Boolean.FALSE.equals(suspended); } - private static boolean isSuspended(ChannelHandlerContext ctx) { - Boolean suspended = ctx.attr(READ_SUSPENDED).get(); - return !(suspended == null || Boolean.FALSE.equals(suspended)); + @Override + public void read(ChannelHandlerContext ctx) { + if (isHandlerActive(ctx)) { + // For Global Traffic (and Read when using EventLoop in pipeline) : check if READ_SUSPENDED is False + ctx.read(); + } } @Override public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception { - long curtime = System.currentTimeMillis(); long size = calculateSize(msg); - if (size > -1 && trafficCounter != null) { - trafficCounter.bytesWriteFlowControl(size); - if (writeLimit == 0) { - ctx.write(msg, promise); - return; - } - // compute the number of ms to wait before continue with the - // channel - long wait = getTimeToWait(writeLimit, - trafficCounter.currentWrittenBytes(), - trafficCounter.lastTime(), curtime); + if (size > 0 && trafficCounter != null) { + // compute the number of ms to wait before continue with the channel + long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime); if (wait >= MINIMAL_WAIT) { - ctx.executor().schedule(new Runnable() { - @Override - public void run() { - ctx.write(msg, promise); - } - }, wait, TimeUnit.MILLISECONDS); + if (logger.isDebugEnabled()) { + logger.debug("Write suspend: " + wait + ":" + ctx.channel().config().isAutoRead() + ":" + + isHandlerActive(ctx)); + } + /* + * Option 2: but issue with ctx.executor().schedule() + * Thread.sleep(wait); + * System.out.println("Write unsuspended"); + * Option 1: use an ordered list of messages to send + * Warning of memory pressure! + */ + submitWrite(ctx, msg, wait, promise); return; } } - ctx.write(msg, promise); + // to maintain order of write (if not using option 2) + submitWrite(ctx, msg, 0, promise); } + protected abstract void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long delay, + final ChannelPromise promise); + /** * * @return the current TrafficCounter (if diff --git a/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java index f296f6f691..0e4d73893c 100644 --- a/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java @@ -15,7 +15,13 @@ */ package io.netty.handler.traffic; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; /** * This implementation of the {@link AbstractTrafficShapingHandler} is for channel @@ -38,11 +44,32 @@ import io.netty.channel.ChannelHandlerContext; * it is recommended to set a positive value, even if it is high since the precision of the * Traffic Shaping depends on the period where the traffic is computed. The highest the interval, * the less precise the traffic shaping will be. It is suggested as higher value something close - * to 5 or 10 minutes.
+ * to 5 or 10 minutes.

+ * + * maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.
* *
*/ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler { + private List messagesQueue = new LinkedList(); + + /** + * Create a new instance + * + * @param writeLimit + * 0 or a limit in bytes/s + * @param readLimit + * 0 or a limit in bytes/s + * @param checkInterval + * The delay between two computations of performances for + * channels or 0 if no stats are to be computed + * @param maxTime + * The maximum delay to wait in case of traffic excess + */ + public ChannelTrafficShapingHandler(long writeLimit, long readLimit, + long checkInterval, long maxTime) { + super(writeLimit, readLimit, checkInterval, maxTime); + } /** * Create a new instance @@ -93,9 +120,57 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler } @Override - public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + public synchronized void handlerRemoved(ChannelHandlerContext ctx) throws Exception { if (trafficCounter != null) { trafficCounter.stop(); } + for (ToSend toSend : messagesQueue) { + if (toSend.toSend instanceof ByteBuf) { + ((ByteBuf) toSend.toSend).release(); + } + } + messagesQueue.clear(); + } + + private static final class ToSend { + final long date; + final Object toSend; + final ChannelPromise promise; + + private ToSend(final long delay, final Object toSend, final ChannelPromise promise) { + this.date = System.currentTimeMillis() + delay; + this.toSend = toSend; + this.promise = promise; + } + } + + @Override + protected synchronized void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long delay, + final ChannelPromise promise) { + if (delay == 0 && messagesQueue.isEmpty()) { + ctx.write(msg, promise); + return; + } + final ToSend newToSend = new ToSend(delay, msg, promise); + messagesQueue.add(newToSend); + ctx.executor().schedule(new Runnable() { + @Override + public void run() { + sendAllValid(ctx); + } + }, delay, TimeUnit.MILLISECONDS); + } + + private synchronized void sendAllValid(ChannelHandlerContext ctx) { + while (!messagesQueue.isEmpty()) { + ToSend newToSend = messagesQueue.remove(0); + if (newToSend.date <= System.currentTimeMillis()) { + ctx.write(newToSend.toSend, newToSend.promise); + } else { + messagesQueue.add(0, newToSend); + break; + } + } + ctx.flush(); } } diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java index d5fc40dce4..a73c6d3978 100644 --- a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java @@ -15,10 +15,19 @@ */ package io.netty.handler.traffic; +import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; import io.netty.util.concurrent.EventExecutor; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + /** * This implementation of the {@link AbstractTrafficShapingHandler} is for global @@ -43,7 +52,9 @@ import java.util.concurrent.ScheduledExecutorService; * it is recommended to set a positive value, even if it is high since the precision of the * Traffic Shaping depends on the period where the traffic is computed. The highest the interval, * the less precise the traffic shaping will be. It is suggested as higher value something close - * to 5 or 10 minutes.
+ * to 5 or 10 minutes.

+ * + * maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.
* *
* @@ -52,6 +63,8 @@ import java.util.concurrent.ScheduledExecutorService; */ @Sharable public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { + private Map> messagesQueues = new HashMap>(); + /** * Create the global TrafficCounter */ @@ -65,6 +78,27 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { tc.start(); } + /** + * Create a new instance + * + * @param executor + * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter} + * @param writeLimit + * 0 or a limit in bytes/s + * @param readLimit + * 0 or a limit in bytes/s + * @param checkInterval + * The delay between two computations of performances for + * channels or 0 if no stats are to be computed + * @param maxTime + * The maximum delay to wait in case of traffic excess + */ + public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit, + long checkInterval, long maxTime) { + super(writeLimit, readLimit, checkInterval, maxTime); + createGlobalTrafficCounter(executor); + } + /** * Create a new instance * @@ -132,4 +166,74 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { trafficCounter.stop(); } } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + Integer key = ctx.channel().hashCode(); + List mq = new LinkedList(); + messagesQueues.put(key, mq); + } + + @Override + public synchronized void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + Integer key = ctx.channel().hashCode(); + List mq = messagesQueues.remove(key); + if (mq != null) { + for (ToSend toSend : mq) { + if (toSend.toSend instanceof ByteBuf) { + ((ByteBuf) toSend.toSend).release(); + } + } + mq.clear(); + } + } + + private static final class ToSend { + final long date; + final Object toSend; + final ChannelPromise promise; + + private ToSend(final long delay, final Object toSend, final ChannelPromise promise) { + this.date = System.currentTimeMillis() + delay; + this.toSend = toSend; + this.promise = promise; + } + } + + @Override + protected synchronized void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long delay, + final ChannelPromise promise) { + Integer key = ctx.channel().hashCode(); + List messagesQueue = messagesQueues.get(key); + if (delay == 0 && (messagesQueue == null || messagesQueue.isEmpty())) { + ctx.write(msg, promise); + return; + } + final ToSend newToSend = new ToSend(delay, msg, promise); + if (messagesQueue == null) { + messagesQueue = new LinkedList(); + messagesQueues.put(key, messagesQueue); + } + messagesQueue.add(newToSend); + final List mqfinal = messagesQueue; + ctx.executor().schedule(new Runnable() { + @Override + public void run() { + sendAllValid(ctx, mqfinal); + } + }, delay, TimeUnit.MILLISECONDS); + } + + private synchronized void sendAllValid(final ChannelHandlerContext ctx, final List messagesQueue) { + while (!messagesQueue.isEmpty()) { + ToSend newToSend = messagesQueue.remove(0); + if (newToSend.date <= System.currentTimeMillis()) { + ctx.write(newToSend.toSend, newToSend.promise); + } else { + messagesQueue.add(0, newToSend); + break; + } + } + ctx.flush(); + } } diff --git a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java index 5d6d873d25..cf0ac5f691 100644 --- a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java +++ b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java @@ -15,12 +15,16 @@ */ package io.netty.handler.traffic; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; + /** * TrafficCounter is associated with {@link AbstractTrafficShapingHandler}. * @@ -33,6 +37,9 @@ import java.util.concurrent.atomic.AtomicLong; * write operation.

*/ public class TrafficCounter { + private static final InternalLogger logger = + InternalLoggerFactory.getInstance(TrafficCounter.class); + /** * Current written bytes */ @@ -83,6 +90,26 @@ public class TrafficCounter { */ private long lastReadBytes; + /** + * Last non 0 written bytes number during last check interval + */ + private long lastNonNullWrittenBytes; + + /** + * Last time written bytes with non 0 written bytes + */ + private long lastNonNullWrittenTime; + + /** + * Last time read bytes with non 0 written bytes + */ + private long lastNonNullReadTime; + + /** + * Last non 0 read bytes number during last check interval + */ + private long lastNonNullReadBytes; + /** * Delay between two captures */ @@ -204,12 +231,23 @@ public class TrafficCounter { // nothing to do return; } + if (logger.isDebugEnabled() && (interval > 2 * checkInterval())) { + logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name); + } lastReadBytes = currentReadBytes.getAndSet(0); lastWrittenBytes = currentWrittenBytes.getAndSet(0); lastReadThroughput = lastReadBytes / interval * 1000; // nb byte / checkInterval in ms * 1000 (1s) lastWriteThroughput = lastWrittenBytes / interval * 1000; // nb byte / checkInterval in ms * 1000 (1s) + if (lastWrittenBytes > 0) { + lastNonNullWrittenBytes = lastWrittenBytes; + lastNonNullWrittenTime = newLastTime; + } + if (lastReadBytes > 0) { + lastNonNullReadBytes = lastReadBytes; + lastNonNullReadTime = newLastTime; + } } /** @@ -373,6 +411,117 @@ public class TrafficCounter { return name; } + /** + * Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait + * time + * + * @param size + * the recv size + * @param limitTraffic + * the traffic limit in bytes per second + * @param maxTime + * the max time in ms to wait in case of excess of traffic + * @return the current time to wait (in ms) if needed for Read operation + */ + public synchronized long readTimeToWait(final long size, final long limitTraffic, final long maxTime) { + final long now = System.currentTimeMillis(); + bytesRecvFlowControl(size); + if (limitTraffic == 0) { + return 0; + } + long sum = currentReadBytes.get(); + long interval = now - lastTime.get(); + // Short time checking + if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT && sum > 0) { + long time = (sum * 1000 / limitTraffic - interval) / 10 * 10; + if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Time: " + time + ":" + sum + ":" + interval); + } + return time > maxTime ? maxTime : time; + } + return 0; + } + // long time checking + if (lastNonNullReadBytes > 0 && lastNonNullReadTime + AbstractTrafficShapingHandler.MINIMAL_WAIT < now) { + long lastsum = sum + lastNonNullReadBytes; + long lastinterval = now - lastNonNullReadTime; + long time = (lastsum * 1000 / limitTraffic - lastinterval) / 10 * 10; + if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval); + } + return time > maxTime ? maxTime : time; + } + } else { + // final "middle" time checking in case resetAccounting called very recently + sum += lastReadBytes; + long lastinterval = AbstractTrafficShapingHandler.MINIMAL_WAIT; + long time = (sum * 1000 / limitTraffic - lastinterval) / 10 * 10; + if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Time: " + time + ":" + sum + ":" + lastinterval); + } + return time > maxTime ? maxTime : time; + } + } + return 0; + } + + /** + * Returns the time to wait (if any) for the given length message, using the given limitTraffic and + * the max wait time + * + * @param size + * the write size + * @param limitTraffic + * the traffic limit in bytes per second + * @param maxTime + * the max time in ms to wait in case of excess of traffic + * @return the current time to wait (in ms) if needed for Write operation + */ + public synchronized long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) { + bytesWriteFlowControl(size); + if (limitTraffic == 0) { + return 0; + } + long sum = currentWrittenBytes.get(); + final long now = System.currentTimeMillis(); + long interval = now - lastTime.get(); + if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT && sum > 0) { + long time = (sum * 1000 / limitTraffic - interval) / 10 * 10; + if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Time: " + time + ":" + sum + ":" + interval); + } + return time > maxTime ? maxTime : time; + } + return 0; + } + if (lastNonNullWrittenBytes > 0 && lastNonNullWrittenTime + AbstractTrafficShapingHandler.MINIMAL_WAIT < now) { + long lastsum = sum + lastNonNullWrittenBytes; + long lastinterval = now - lastNonNullWrittenTime; + long time = (lastsum * 1000 / limitTraffic - lastinterval) / 10 * 10; + if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval); + } + return time > maxTime ? maxTime : time; + } + } else { + sum += lastWrittenBytes; + long lastinterval = AbstractTrafficShapingHandler.MINIMAL_WAIT + Math.abs(interval); + long time = (sum * 1000 / limitTraffic - lastinterval) / 10 * 10; + if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Time: " + time + ":" + sum + ":" + lastinterval); + } + return time > maxTime ? maxTime : time; + } + } + return 0; + } + /** * String information */ diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/TrafficShapingTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/TrafficShapingTest.java new file mode 100644 index 0000000000..6ad0499901 --- /dev/null +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/TrafficShapingTest.java @@ -0,0 +1,580 @@ +/* + * Copyright 2012 The Netty Project + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.traffic.AbstractTrafficShapingHandler; +import io.netty.handler.traffic.ChannelTrafficShapingHandler; +import io.netty.handler.traffic.GlobalTrafficShapingHandler; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.Promise; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; +import io.netty.util.internal.logging.Slf4JLoggerFactory; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.*; + +public class TrafficShapingTest extends AbstractSocketTest { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficShapingTest.class); + + static final int messageSize = 1024; + static final int bandwidthFactor = 15; + static final int minfactor = bandwidthFactor - (bandwidthFactor / 2) - 1; + static final int maxfactor = bandwidthFactor + (bandwidthFactor / 2); + static final long stepms = 1000 / bandwidthFactor; + static final long minimalms = Math.max(stepms / 2, 20) / 10 * 10; + static final long check = Math.max(Math.min(100, minimalms / 2) / 10 * 10, 20); + private static final Random random = new Random(); + static final byte[] data = new byte[messageSize]; + + private static EventExecutorGroup group; + private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(10); + static { + random.nextBytes(data); + } + + @BeforeClass + public static void createGroup() { + InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory()); + Logger logger = (Logger) LoggerFactory.getLogger("ROOT"); + logger.setLevel(Level.INFO); + logger.info("Bandwidth: " + minfactor + " <= " + bandwidthFactor + " <= " + maxfactor + + " StepMs: " + stepms + " MinMs: " + minimalms + " CheckMs: " + check); + group = new DefaultEventExecutorGroup(8); + } + + @AfterClass + public static void destroyGroup() throws Exception { + group.shutdownGracefully().sync(); + } + + private static long[] computeWaitRead(int[] multipleMessage) { + long[] minimalWaitBetween = new long[multipleMessage.length + 1]; + minimalWaitBetween[0] = 0; + for (int i = 0; i < multipleMessage.length; i++) { + minimalWaitBetween[i + 1] = (multipleMessage[i] - 1) * stepms + minimalms; + } + return minimalWaitBetween; + } + + private static long[] computeWaitWrite(int[] multipleMessage) { + long[] minimalWaitBetween = new long[multipleMessage.length + 1]; + for (int i = 0; i < multipleMessage.length; i++) { + minimalWaitBetween[i] = (multipleMessage[i] - 1) * stepms + minimalms; + } + return minimalWaitBetween; + } + + @Test(timeout = 10000) + public void testNoTrafficShapping() throws Throwable { + logger.info("TEST NO TRAFFIC"); + run(); + } + + public void testNoTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1 }; + long[] minimalWaitBetween = null; + testTrafficShapping0(sb, cb, false, false, false, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 15000) + public void testExecNoTrafficShapping() throws Throwable { + logger.info("TEST EXEC NO TRAFFIC"); + run(); + } + + public void testExecNoTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1 }; + long[] minimalWaitBetween = null; + testTrafficShapping0(sb, cb, true, false, false, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testWriteTrafficShapping() throws Throwable { + logger.info("TEST WRITE"); + run(); + } + + public void testWriteTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1 }; + long[] minimalWaitBetween = computeWaitWrite(multipleMessage); + testTrafficShapping0(sb, cb, false, false, true, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testReadTrafficShapping() throws Throwable { + logger.info("TEST READ"); + run(); + } + + public void testReadTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testWrite1TrafficShapping() throws Throwable { + logger.info("TEST WRITE"); + run(); + } + + public void testWrite1TrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 1, 1 }; + long[] minimalWaitBetween = computeWaitWrite(multipleMessage); + testTrafficShapping0(sb, cb, false, false, true, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testRead1TrafficShapping() throws Throwable { + logger.info("TEST READ"); + run(); + } + + public void testRead1TrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 15000) + public void testExecWriteTrafficShapping() throws Throwable { + logger.info("TEST EXEC WRITE"); + run(); + } + + public void testExecWriteTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1 }; + long[] minimalWaitBetween = computeWaitWrite(multipleMessage); + testTrafficShapping0(sb, cb, true, false, true, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testExecReadTrafficShapping() throws Throwable { + logger.info("TEST EXEC READ"); + run(); + } + + public void testExecReadTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, true, true, false, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testWriteGlobalTrafficShapping() throws Throwable { + logger.info("TEST GLOBAL WRITE"); + run(); + } + + public void testWriteGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1 }; + long[] minimalWaitBetween = computeWaitWrite(multipleMessage); + testTrafficShapping0(sb, cb, false, false, true, true, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testReadGlobalTrafficShapping() throws Throwable { + logger.info("TEST GLOBAL READ"); + run(); + } + + public void testReadGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, false, true, false, true, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testAutoReadTrafficShapping() throws Throwable { + logger.info("TEST AUTO READ"); + run(); + } + + public void testAutoReadTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 }; + int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage); + } + @Test(timeout = 10000) + public void testAutoReadGlobalTrafficShapping() throws Throwable { + logger.info("TEST AUTO READ GLOBAL"); + run(); + } + + public void testAutoReadGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 }; + int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, false, true, false, true, autoRead, minimalWaitBetween, multipleMessage); + } + @Test(timeout = 10000) + public void testAutoReadExecTrafficShapping() throws Throwable { + logger.info("TEST AUTO READ EXEC"); + run(); + } + + public void testAutoReadExecTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 }; + int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, true, true, false, false, autoRead, minimalWaitBetween, multipleMessage); + } + @Test(timeout = 10000) + public void testAutoReadExecGlobalTrafficShapping() throws Throwable { + logger.info("TEST AUTO READ EXEC GLOBAL"); + run(); + } + + public void testAutoReadExecGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 }; + int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, true, true, false, true, autoRead, minimalWaitBetween, multipleMessage); + } + + /** + * + * @param sb + * @param cb + * @param additionalExecutor + * shall the pipeline add the handler using an additionnal executor + * @param limitRead + * True to set Read Limit on Server side + * @param limitWrite + * True to set Write Limit on Client side + * @param globalLimit + * True to change Channel to Global TrafficShapping + * @param autoRead + * @param minimalWaitBetween + * time in ms that should be waited before getting the final result (note: for READ the values are + * right shifted once, the first value being 0) + * @param multipleMessage + * how many message to send at each step (for READ: the first should be 1, as the two last steps to + * ensure correct testing) + * @throws Throwable + */ + private static void testTrafficShapping0(ServerBootstrap sb, Bootstrap cb, final boolean additionalExecutor, + final boolean limitRead, final boolean limitWrite, final boolean globalLimit, int[] autoRead, + long[] minimalWaitBetween, int[] multipleMessage) throws Throwable { + logger.info("Exec: " + additionalExecutor + " Read: " + limitRead + " Write: " + limitWrite + " Global: " + + globalLimit); + final ValidTimestampedHandler sh = new ValidTimestampedHandler(autoRead, multipleMessage); + Promise promise = group.next().newPromise(); + final ClientTrafficHandler ch = new ClientTrafficHandler(promise, minimalWaitBetween, multipleMessage, + autoRead); + + final AbstractTrafficShapingHandler handler; + if (limitRead) { + if (globalLimit) { + handler = new GlobalTrafficShapingHandler(group, 0, bandwidthFactor * messageSize, check); + } else { + handler = new ChannelTrafficShapingHandler(0, bandwidthFactor * messageSize, check); + } + } else if (limitWrite) { + if (globalLimit) { + handler = new GlobalTrafficShapingHandler(group, bandwidthFactor * messageSize, 0, check); + } else { + handler = new ChannelTrafficShapingHandler(bandwidthFactor * messageSize, 0, check); + } + } else { + handler = null; + } + + sb.childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel c) throws Exception { + if (limitRead) { + if (additionalExecutor) { + c.pipeline().addLast(group, handler); + } else { + c.pipeline().addLast(handler); + } + } + if (additionalExecutor) { + c.pipeline().addLast(group, sh); + } else { + c.pipeline().addLast(sh); + } + } + }); + cb.handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel c) throws Exception { + if (limitWrite) { + if (additionalExecutor) { + c.pipeline().addLast(group, handler); + } else { + c.pipeline().addLast(handler); + } + } + if (additionalExecutor) { + c.pipeline().addLast(group, ch); + } else { + c.pipeline().addLast(ch); + } + } + }); + + Channel sc = sb.bind().sync().channel(); + Channel cc = cb.connect().sync().channel(); + + int totalNb = 0; + for (int i = 1; i < multipleMessage.length; i++) { + totalNb += multipleMessage[i]; + } + Long start = System.currentTimeMillis(); + int nb = multipleMessage[0]; + for (int i = 0; i < nb; i++) { + cc.write(cc.alloc().buffer().writeBytes(data)); + } + cc.flush(); + + promise.await(); + Long stop = System.currentTimeMillis(); + assertTrue("Error during exceution of TrafficShapping: " + promise.cause(), promise.isSuccess()); + + float average = (totalNb * messageSize) / (float) (stop - start); + logger.info("Average of traffic: " + average + " compare to " + bandwidthFactor); + sh.channel.close().sync(); + ch.channel.close().sync(); + sc.close().sync(); + if (autoRead != null) { + // for extra release call in AutoRead + Thread.sleep(minimalms); + } + + if (autoRead == null && minimalWaitBetween != null) { + assertTrue("Overall Traffic not ok since > " + maxfactor + ": " + average, + average <= maxfactor); + if (additionalExecutor) { + // Oio is not as good when using additionalExecutor + assertTrue("Overall Traffic not ok since < 0.25: " + average, average >= 0.25); + } else { + assertTrue("Overall Traffic not ok since < " + minfactor + ": " + average, + average >= minfactor); + } + } + if (handler != null && globalLimit) { + ((GlobalTrafficShapingHandler) handler).release(); + } + + if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { + throw sh.exception.get(); + } + if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) { + throw ch.exception.get(); + } + if (sh.exception.get() != null) { + throw sh.exception.get(); + } + if (ch.exception.get() != null) { + throw ch.exception.get(); + } + } + + private static class ClientTrafficHandler extends SimpleChannelInboundHandler { + volatile Channel channel; + final AtomicReference exception = new AtomicReference(); + volatile int step; + // first message will always be validated + private long currentLastTime = System.currentTimeMillis(); + private final long[] minimalWaitBetween; + private final int[] multipleMessage; + private final int[] autoRead; + final Promise promise; + + ClientTrafficHandler(Promise promise, long[] minimalWaitBetween, int[] multipleMessage, + int[] autoRead) { + this.minimalWaitBetween = minimalWaitBetween; + this.multipleMessage = Arrays.copyOf(multipleMessage, multipleMessage.length); + this.promise = promise; + this.autoRead = autoRead; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + channel = ctx.channel(); + } + + @Override + public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + long lastTimestamp = 0; + while (in.isReadable()) { + lastTimestamp = in.readLong(); + multipleMessage[step]--; + } + if (multipleMessage[step] > 0) { + // still some message to get + return; + } + long minimalWait = (minimalWaitBetween != null) ? minimalWaitBetween[step] : 0; + int ar = 0; + if (autoRead != null) { + if (step > 0 && autoRead[step - 1] != 0) { + ar = autoRead[step - 1]; + if (ar > 0) { + minimalWait = -1; + } else { + minimalWait = minimalms / 3; + } + } else { + minimalWait = 0; + } + } + logger.info("Step: " + step + " Interval: " + (lastTimestamp - currentLastTime) + " compareTo " + + minimalWait + " (" + ar + ")"); + assertTrue("The interval of time is incorrect:" + (lastTimestamp - currentLastTime) + " not> " + + minimalWait, lastTimestamp - currentLastTime >= minimalWait); + currentLastTime = lastTimestamp; + step++; + if (multipleMessage.length > step) { + int nb = multipleMessage[step]; + for (int i = 0; i < nb; i++) { + channel.write(channel.alloc().buffer().writeBytes(data)); + } + channel.flush(); + } else { + promise.setSuccess(true); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + if (exception.compareAndSet(null, cause)) { + cause.printStackTrace(); + promise.setFailure(cause); + ctx.close(); + } + } + } + + private static class ValidTimestampedHandler extends SimpleChannelInboundHandler { + private final int[] autoRead; + private final int[] multipleMessage; + volatile Channel channel; + volatile int step; + final AtomicReference exception = new AtomicReference(); + + ValidTimestampedHandler(int[] autoRead, int[] multipleMessage) { + this.autoRead = autoRead; + this.multipleMessage = Arrays.copyOf(multipleMessage, multipleMessage.length); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + channel = ctx.channel(); + } + + @Override + public void channelRead0(final ChannelHandlerContext ctx, ByteBuf in) throws Exception { + byte[] actual = new byte[in.readableBytes()]; + in.readBytes(actual); + long timestamp = System.currentTimeMillis(); + int nb = actual.length / messageSize; + int isAutoRead = 0; + int laststep = step; + for (int i = 0; i < nb; i++) { + multipleMessage[step]--; + if (multipleMessage[step] == 0) { + if (autoRead != null) { + isAutoRead = autoRead[step]; + } + step++; + } + } + if (laststep != step && autoRead != null && isAutoRead != 2) { + if (isAutoRead != 0) { + logger.info("Set AutoRead: " + (isAutoRead > 0) + " Step: " + step); + channel.config().setAutoRead(isAutoRead > 0); + } else { + logger.info("AutoRead: NO Step:" + step); + } + } + logger.debug("Get: " + actual.length + " TS " + timestamp + " NB: " + nb); + for (int i = 0; i < nb; i++) { + channel.write(Unpooled.copyLong(timestamp)); + } + channel.flush(); + if (laststep != step && isAutoRead != 0) { + if (isAutoRead < 0) { + final int exactStep = step; + long wait = (isAutoRead == -1) ? minimalms : stepms + minimalms; + if (isAutoRead == -3) { + wait = stepms * 3; + } + executor.schedule(new Runnable() { + public void run() { + logger.info("Reset AutoRead: Step " + exactStep); + channel.config().setAutoRead(true); + } + }, wait, TimeUnit.MILLISECONDS); + } else { + if (isAutoRead > 1) { + logger.info("Will Set AutoRead: Rrue, Step: " + step); + executor.schedule(new Runnable() { + public void run() { + logger.info("Set AutoRead: Rrue, Step: " + step); + channel.config().setAutoRead(true); + } + }, stepms + minimalms, TimeUnit.MILLISECONDS); + } + } + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + if (exception.compareAndSet(null, cause)) { + cause.printStackTrace(); + ctx.close(); + } + } + } +}