From ef3c03001320f2c2f2deb0fbf22aa7c15d41f9f8 Mon Sep 17 00:00:00 2001 From: fredericBregier Date: Fri, 1 Aug 2014 11:26:33 +0200 Subject: [PATCH] [#2722] Improve Traffic Shaping Handling Motivation: Currently Traffic Shaping is using 1 timer only and could lead to "partial" wrong bandwidth computation when "short" time occurs between adding used bytes and when the TrafficCounter updates itself and finally when the traffic is computed. Indeed, the TrafficCounter is updated every x delay and it is at the same time saved into "lastXxxxBytes" and set to 0. Therefore, when one request the counter, it first updates the TrafficCounter with the added used bytes. If this value is set just before the TrafficCounter is updated, then the bandwidth computation will use the TrafficCounter with a "0" value (this value being reset once the delay occurs). Therefore, the traffic shaping computation is wrong in rare cases. Secondly the traffic shapping should avoid if possible the "Timeout" effect by not stopping reading or writing more than a maxTime, this maxTime being less than the TimeOut limit. Thirdly the traffic shapping in read had an issue since the readOp was not set but should, turning in no read blocking from socket point of view. (see #2696) Take into account setAutoRead(boolean) setting directly by the user in the program external to this handler. Modifications: The TrafficCounter has 2 new methods that compute the time to wait according to read or write) using in priority the currentXxxxBytes (as before), but could used (if current is at 0) the lastXxxxxBytes, and therefore having more chance to take into account the real traffic. Moreover the Handler could change the default "max time to wait", which is by default set to half of "standard" Time Out (30s:2 = 15s). Finally we add the setAutoRead(boolean) accordingly to the situation, as proposed in #2696 (the original pull request is in error for unknown reason so this merge). Result: The Traffic Shaping is better take into account (no 0 value when it shouldn't) and it tries to not block traffic more than Time Out event. Moreover the read is really stopped from socket point of view. This version is similar to #2388 and #2450. This version is for V4.0, and includes the #2696 pull request to ease the merge process. The test minimizes time check by reducing to 66ms steps (50s total). --- .../AbstractTrafficShapingHandler.java | 314 +++++++--- .../traffic/ChannelTrafficShapingHandler.java | 79 ++- .../traffic/GlobalTrafficShapingHandler.java | 106 +++- .../netty/handler/traffic/TrafficCounter.java | 222 +++++-- .../transport/socket/TrafficShapingTest.java | 588 ++++++++++++++++++ 5 files changed, 1172 insertions(+), 137 deletions(-) create mode 100644 testsuite/src/test/java/io/netty/testsuite/transport/socket/TrafficShapingTest.java diff --git a/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java index 3d96d43a75..4b8f03d53e 100644 --- a/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java @@ -1,12 +1,9 @@ /* * Copyright 2011 The Netty Project - * * The Netty Project licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at: - * * http://www.apache.org/licenses/LICENSE-2.0 - * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -22,6 +19,8 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.util.Attribute; import io.netty.util.AttributeKey; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.concurrent.TimeUnit; @@ -38,20 +37,29 @@ import java.util.concurrent.TimeUnit; * the read/write limit or the check interval, several methods allow that for you:
* */ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractTrafficShapingHandler.class); + /** * Default delay between two checks: 1s */ public static final long DEFAULT_CHECK_INTERVAL = 1000; + /** + * Default max delay in case of traffic shaping + * (during which no communication will occur). + * Shall be less than TIMEOUT. Here half of "standard" 30s + */ + public static final long DEFAULT_MAX_TIME = 15000; + /** * Default minimal time to wait */ - private static final long MINIMAL_WAIT = 10; + static final long MINIMAL_WAIT = 10; /** * Traffic Counter @@ -68,19 +76,25 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler */ private long readLimit; + /** + * Max delay in wait + */ + protected long maxTime = DEFAULT_MAX_TIME; // default 15 s + /** * Delay between two performance snapshots */ protected long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s - private static final AttributeKey READ_SUSPENDED = AttributeKey.valueOf( - AbstractTrafficShapingHandler.class.getName() + ".READ_SUSPENDED"); - private static final AttributeKey REOPEN_TASK = AttributeKey.valueOf( - AbstractTrafficShapingHandler.class.getName() + ".REOPEN_TASK"); + private static final AttributeKey READ_SUSPENDED = AttributeKey + .valueOf(AbstractTrafficShapingHandler.class.getName() + ".READ_SUSPENDED"); + private static final AttributeKey REOPEN_TASK = AttributeKey.valueOf(AbstractTrafficShapingHandler.class + .getName() + ".REOPEN_TASK"); /** * - * @param newTrafficCounter the TrafficCounter to set + * @param newTrafficCounter + * the TrafficCounter to set */ void setTrafficCounter(TrafficCounter newTrafficCounter) { trafficCounter = newTrafficCounter; @@ -88,59 +102,76 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler /** * @param writeLimit - * 0 or a limit in bytes/s + * 0 or a limit in bytes/s * @param readLimit - * 0 or a limit in bytes/s + * 0 or a limit in bytes/s * @param checkInterval - * The delay between two computations of performances for + * The delay between two computations of performances for * channels or 0 if no stats are to be computed + * @param maxTime + * The maximum delay to wait in case of traffic excess */ - protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, - long checkInterval) { + protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval, long maxTime) { this.writeLimit = writeLimit; this.readLimit = readLimit; this.checkInterval = checkInterval; + this.maxTime = maxTime; + } + + /** + * @param writeLimit + * 0 or a limit in bytes/s + * @param readLimit + * 0 or a limit in bytes/s + * @param checkInterval + * The delay between two computations of performances for + * channels or 0 if no stats are to be computed + */ + protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval) { + this(writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME); } /** * Constructor using default Check Interval * * @param writeLimit - * 0 or a limit in bytes/s + * 0 or a limit in bytes/s * @param readLimit - * 0 or a limit in bytes/s + * 0 or a limit in bytes/s */ protected AbstractTrafficShapingHandler(long writeLimit, long readLimit) { - this(writeLimit, readLimit, DEFAULT_CHECK_INTERVAL); + this(writeLimit, readLimit, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME); } /** * Constructor using NO LIMIT and default Check Interval */ protected AbstractTrafficShapingHandler() { - this(0, 0, DEFAULT_CHECK_INTERVAL); + this(0, 0, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME); } /** * Constructor using NO LIMIT * * @param checkInterval - * The delay between two computations of performances for + * The delay between two computations of performances for * channels or 0 if no stats are to be computed */ protected AbstractTrafficShapingHandler(long checkInterval) { - this(0, 0, checkInterval); + this(0, 0, checkInterval, DEFAULT_MAX_TIME); } /** * Change the underlying limitations and check interval. * - * @param newWriteLimit The new write limit (in bytes) - * @param newReadLimit The new read limit (in bytes) - * @param newCheckInterval The new check interval (in milliseconds) + * @param newWriteLimit + * The new write limit (in bytes) + * @param newReadLimit + * The new read limit (in bytes) + * @param newCheckInterval + * The new check interval (in milliseconds) */ - public void configure(long newWriteLimit, long newReadLimit, - long newCheckInterval) { + public void configure(long newWriteLimit, long newReadLimit, long newCheckInterval) { configure(newWriteLimit, newReadLimit); configure(newCheckInterval); } @@ -148,8 +179,10 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler /** * Change the underlying limitations. * - * @param newWriteLimit The new write limit (in bytes) - * @param newReadLimit The new read limit (in bytes) + * @param newWriteLimit + * The new write limit (in bytes) + * @param newReadLimit + * The new read limit (in bytes) */ public void configure(long newWriteLimit, long newReadLimit) { writeLimit = newWriteLimit; @@ -162,7 +195,8 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler /** * Change the check interval. * - * @param newCheckInterval The new check interval (in milliseconds) + * @param newCheckInterval + * The new check interval (in milliseconds) */ public void configure(long newCheckInterval) { checkInterval = newCheckInterval; @@ -171,6 +205,73 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler } } + /** + * @return the writeLimit + */ + public long getWriteLimit() { + return writeLimit; + } + + /** + * @param writeLimit the writeLimit to set + */ + public void setWriteLimit(long writeLimit) { + this.writeLimit = writeLimit; + if (trafficCounter != null) { + trafficCounter.resetAccounting(System.currentTimeMillis() + 1); + } + } + + /** + * @return the readLimit + */ + public long getReadLimit() { + return readLimit; + } + + /** + * @param readLimit the readLimit to set + */ + public void setReadLimit(long readLimit) { + this.readLimit = readLimit; + if (trafficCounter != null) { + trafficCounter.resetAccounting(System.currentTimeMillis() + 1); + } + } + + /** + * @return the checkInterval + */ + public long getCheckInterval() { + return checkInterval; + } + + /** + * @param checkInterval the checkInterval to set + */ + public void setCheckInterval(long checkInterval) { + this.checkInterval = checkInterval; + if (trafficCounter != null) { + trafficCounter.configure(checkInterval); + } + } + + /** + * + * @param maxTime + * Max delay in wait, shall be less than TIME OUT in related protocol + */ + public void setMaxTimeWait(long maxTime) { + this.maxTime = maxTime; + } + + /** + * @return the max delay in wait + */ + public long getMaxTimeWait() { + return maxTime; + } + /** * Called each time the accounting is computed from the TrafficCounters. * This method could be used for instance to implement almost real time accounting. @@ -178,7 +279,6 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler * @param counter * the TrafficCounter that computes its performance */ - @SuppressWarnings("unused") protected void doAccounting(TrafficCounter counter) { // NOOP by default } @@ -188,111 +288,128 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler */ private static final class ReopenReadTimerTask implements Runnable { final ChannelHandlerContext ctx; + ReopenReadTimerTask(ChannelHandlerContext ctx) { this.ctx = ctx; } - @Override public void run() { - ctx.attr(READ_SUSPENDED).set(false); - ctx.read(); + if (!ctx.channel().config().isAutoRead() && isHandlerActive(ctx)) { + // If AutoRead is False and Active is True, user make a direct setAutoRead(false) + // Then Just reset the status + if (logger.isDebugEnabled()) { + logger.debug("Channel:" + ctx.channel().hashCode() + + " Not Unsuspend: " + ctx.channel().config().isAutoRead() + ":" + isHandlerActive(ctx)); + } + ctx.attr(READ_SUSPENDED).set(false); + } else { + // Anything else allows the handler to reset the AutoRead + if (logger.isDebugEnabled()) { + if (ctx.channel().config().isAutoRead() && !isHandlerActive(ctx)) { + logger.debug("Channel:" + ctx.channel().hashCode() + + " Unsuspend: " + ctx.channel().config().isAutoRead() + ":" + isHandlerActive(ctx)); + } else { + logger.debug("Channel:" + ctx.channel().hashCode() + + " Normal Unsuspend: " + ctx.channel().config().isAutoRead() + ":" + + isHandlerActive(ctx)); + } + } + ctx.attr(READ_SUSPENDED).set(false); + ctx.channel().config().setAutoRead(true); + ctx.channel().read(); + } + if (logger.isDebugEnabled()) { + logger.debug("Channel:" + ctx.channel().hashCode() + + " Unsupsend final status => " + ctx.channel().config().isAutoRead() + ":" + + isHandlerActive(ctx)); + } } } - /** - * @return the time that should be necessary to wait to respect limit. Can be negative time - */ - private static long getTimeToWait(long limit, long bytes, long lastTime, long curtime) { - long interval = curtime - lastTime; - if (interval <= 0) { - // Time is too short, so just lets continue - return 0; - } - return (bytes * 1000 / limit - interval) / 10 * 10; - } - @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { long size = calculateSize(msg); - long curtime = System.currentTimeMillis(); - - if (trafficCounter != null) { - trafficCounter.bytesRecvFlowControl(size); - if (readLimit == 0) { - // no action - ctx.fireChannelRead(msg); - - return; - } + if (size > 0 && trafficCounter != null) { // compute the number of ms to wait before reopening the channel - long wait = getTimeToWait(readLimit, - trafficCounter.currentReadBytes(), - trafficCounter.lastTime(), curtime); + long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime); if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal - // time in order to - // try to limit the traffic - if (!isSuspended(ctx)) { + // time in order to try to limit the traffic + // Only AutoRead AND HandlerActive True means Context Active + if (logger.isDebugEnabled()) { + logger.debug("Channel:" + ctx.channel().hashCode() + + " Read Suspend: " + wait + ":" + ctx.channel().config().isAutoRead() + ":" + + isHandlerActive(ctx)); + } + if (ctx.channel().config().isAutoRead() && isHandlerActive(ctx)) { + ctx.channel().config().setAutoRead(false); ctx.attr(READ_SUSPENDED).set(true); - // Create a Runnable to reactive the read if needed. If one was create before it will just be // reused to limit object creation - Attribute attr = ctx.attr(REOPEN_TASK); + Attribute attr = ctx.attr(REOPEN_TASK); Runnable reopenTask = attr.get(); if (reopenTask == null) { reopenTask = new ReopenReadTimerTask(ctx); attr.set(reopenTask); } - ctx.executor().schedule(reopenTask, wait, - TimeUnit.MILLISECONDS); + ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS); + if (logger.isDebugEnabled()) { + logger.debug("Channel:" + ctx.channel().hashCode() + + " Suspend final status => " + ctx.channel().config().isAutoRead() + ":" + + isHandlerActive(ctx) + + " will reopened at: " + wait); + } } } } ctx.fireChannelRead(msg); } - @Override - public void read(ChannelHandlerContext ctx) { - if (!isSuspended(ctx)) { - ctx.read(); - } + protected static boolean isHandlerActive(ChannelHandlerContext ctx) { + Boolean suspended = ctx.attr(READ_SUSPENDED).get(); + return suspended == null || Boolean.FALSE.equals(suspended); } - private static boolean isSuspended(ChannelHandlerContext ctx) { - Boolean suspended = ctx.attr(READ_SUSPENDED).get(); - return !(suspended == null || Boolean.FALSE.equals(suspended)); + @Override + public void read(ChannelHandlerContext ctx) { + if (isHandlerActive(ctx)) { + // For Global Traffic (and Read when using EventLoop in pipeline) : check if READ_SUSPENDED is False + ctx.read(); + } } @Override public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception { - long curtime = System.currentTimeMillis(); long size = calculateSize(msg); - if (size > -1 && trafficCounter != null) { - trafficCounter.bytesWriteFlowControl(size); - if (writeLimit == 0) { - ctx.write(msg, promise); - return; - } - // compute the number of ms to wait before continue with the - // channel - long wait = getTimeToWait(writeLimit, - trafficCounter.currentWrittenBytes(), - trafficCounter.lastTime(), curtime); + if (size > 0 && trafficCounter != null) { + // compute the number of ms to wait before continue with the channel + long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime); if (wait >= MINIMAL_WAIT) { - ctx.executor().schedule(new Runnable() { - @Override - public void run() { - ctx.write(msg, promise); - } - }, wait, TimeUnit.MILLISECONDS); + /* + * Option 2: but issue with ctx.executor().schedule() + * Thread.sleep(wait); + * System.out.println("Write unsuspended"); + * Option 1: use an ordered list of messages to send + * Warning of memory pressure! + */ + if (logger.isDebugEnabled()) { + logger.debug("Channel:" + ctx.channel().hashCode() + + " Write suspend: " + wait + ":" + ctx.channel().config().isAutoRead() + ":" + + isHandlerActive(ctx)); + } + submitWrite(ctx, msg, wait, promise); return; } } - ctx.write(msg, promise); + // to keep message order if not using option 2 + submitWrite(ctx, msg, 0, promise); } + protected abstract void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long delay, + final ChannelPromise promise); + /** * * @return the current TrafficCounter (if @@ -304,17 +421,18 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler @Override public String toString() { - return "TrafficShaping with Write Limit: " + writeLimit + - " Read Limit: " + readLimit + " and Counter: " + - (trafficCounter != null? trafficCounter.toString() : "none"); + return "TrafficShaping with Write Limit: " + writeLimit + " Read Limit: " + readLimit + " and Counter: " + + (trafficCounter != null ? trafficCounter.toString() : "none"); } /** * Calculate the size of the given {@link Object}. * * This implementation supports {@link ByteBuf} and {@link ByteBufHolder}. Sub-classes may override this. - * @param msg the msg for which the size should be calculated - * @return size the size of the msg or {@code -1} if unknown. + * + * @param msg + * the msg for which the size should be calculated + * @return size the size of the msg or {@code -1} if unknown. */ protected long calculateSize(Object msg) { if (msg instanceof ByteBuf) { diff --git a/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java index f296f6f691..0e4d73893c 100644 --- a/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java @@ -15,7 +15,13 @@ */ package io.netty.handler.traffic; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; /** * This implementation of the {@link AbstractTrafficShapingHandler} is for channel @@ -38,11 +44,32 @@ import io.netty.channel.ChannelHandlerContext; * it is recommended to set a positive value, even if it is high since the precision of the * Traffic Shaping depends on the period where the traffic is computed. The highest the interval, * the less precise the traffic shaping will be. It is suggested as higher value something close - * to 5 or 10 minutes.
+ * to 5 or 10 minutes.

+ * + * maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.
* *
*/ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler { + private List messagesQueue = new LinkedList(); + + /** + * Create a new instance + * + * @param writeLimit + * 0 or a limit in bytes/s + * @param readLimit + * 0 or a limit in bytes/s + * @param checkInterval + * The delay between two computations of performances for + * channels or 0 if no stats are to be computed + * @param maxTime + * The maximum delay to wait in case of traffic excess + */ + public ChannelTrafficShapingHandler(long writeLimit, long readLimit, + long checkInterval, long maxTime) { + super(writeLimit, readLimit, checkInterval, maxTime); + } /** * Create a new instance @@ -93,9 +120,57 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler } @Override - public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + public synchronized void handlerRemoved(ChannelHandlerContext ctx) throws Exception { if (trafficCounter != null) { trafficCounter.stop(); } + for (ToSend toSend : messagesQueue) { + if (toSend.toSend instanceof ByteBuf) { + ((ByteBuf) toSend.toSend).release(); + } + } + messagesQueue.clear(); + } + + private static final class ToSend { + final long date; + final Object toSend; + final ChannelPromise promise; + + private ToSend(final long delay, final Object toSend, final ChannelPromise promise) { + this.date = System.currentTimeMillis() + delay; + this.toSend = toSend; + this.promise = promise; + } + } + + @Override + protected synchronized void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long delay, + final ChannelPromise promise) { + if (delay == 0 && messagesQueue.isEmpty()) { + ctx.write(msg, promise); + return; + } + final ToSend newToSend = new ToSend(delay, msg, promise); + messagesQueue.add(newToSend); + ctx.executor().schedule(new Runnable() { + @Override + public void run() { + sendAllValid(ctx); + } + }, delay, TimeUnit.MILLISECONDS); + } + + private synchronized void sendAllValid(ChannelHandlerContext ctx) { + while (!messagesQueue.isEmpty()) { + ToSend newToSend = messagesQueue.remove(0); + if (newToSend.date <= System.currentTimeMillis()) { + ctx.write(newToSend.toSend, newToSend.promise); + } else { + messagesQueue.add(0, newToSend); + break; + } + } + ctx.flush(); } } diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java index d5fc40dce4..f6a7691636 100644 --- a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java @@ -15,10 +15,19 @@ */ package io.netty.handler.traffic; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelHandler.Sharable; import io.netty.util.concurrent.EventExecutor; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + /** * This implementation of the {@link AbstractTrafficShapingHandler} is for global @@ -43,7 +52,9 @@ import java.util.concurrent.ScheduledExecutorService; * it is recommended to set a positive value, even if it is high since the precision of the * Traffic Shaping depends on the period where the traffic is computed. The highest the interval, * the less precise the traffic shaping will be. It is suggested as higher value something close - * to 5 or 10 minutes.
+ * to 5 or 10 minutes.

+ * + * maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.
* *
* @@ -52,6 +63,8 @@ import java.util.concurrent.ScheduledExecutorService; */ @Sharable public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { + private Map> messagesQueues = new HashMap>(); + /** * Create the global TrafficCounter */ @@ -65,6 +78,27 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { tc.start(); } + /** + * Create a new instance + * + * @param executor + * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter} + * @param writeLimit + * 0 or a limit in bytes/s + * @param readLimit + * 0 or a limit in bytes/s + * @param checkInterval + * The delay between two computations of performances for + * channels or 0 if no stats are to be computed + * @param maxTime + * The maximum delay to wait in case of traffic excess + */ + public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit, + long checkInterval, long maxTime) { + super(writeLimit, readLimit, checkInterval, maxTime); + createGlobalTrafficCounter(executor); + } + /** * Create a new instance * @@ -132,4 +166,74 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { trafficCounter.stop(); } } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + Integer key = ctx.channel().hashCode(); + List mq = new LinkedList(); + messagesQueues.put(key, mq); + } + + @Override + public synchronized void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + Integer key = ctx.channel().hashCode(); + List mq = messagesQueues.remove(key); + if (mq != null) { + for (ToSend toSend : mq) { + if (toSend.toSend instanceof ByteBuf) { + ((ByteBuf) toSend.toSend).release(); + } + } + mq.clear(); + } + } + + private static final class ToSend { + final long date; + final Object toSend; + final ChannelPromise promise; + + private ToSend(final long delay, final Object toSend, final ChannelPromise promise) { + this.date = System.currentTimeMillis() + delay; + this.toSend = toSend; + this.promise = promise; + } + } + + @Override + protected synchronized void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long delay, + final ChannelPromise promise) { + Integer key = ctx.channel().hashCode(); + List messagesQueue = messagesQueues.get(key); + if (delay == 0 && (messagesQueue == null || messagesQueue.isEmpty())) { + ctx.write(msg, promise); + return; + } + final ToSend newToSend = new ToSend(delay, msg, promise); + if (messagesQueue == null) { + messagesQueue = new LinkedList(); + messagesQueues.put(key, messagesQueue); + } + messagesQueue.add(newToSend); + final List mqfinal = messagesQueue; + ctx.executor().schedule(new Runnable() { + @Override + public void run() { + sendAllValid(ctx, mqfinal); + } + }, delay, TimeUnit.MILLISECONDS); + } + + private synchronized void sendAllValid(final ChannelHandlerContext ctx, final List messagesQueue) { + while (!messagesQueue.isEmpty()) { + ToSend newToSend = messagesQueue.remove(0); + if (newToSend.date <= System.currentTimeMillis()) { + ctx.write(newToSend.toSend, newToSend.promise); + } else { + messagesQueue.add(0, newToSend); + break; + } + } + ctx.flush(); + } } diff --git a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java index 5d6d873d25..dedd07e020 100644 --- a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java +++ b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java @@ -1,12 +1,9 @@ /* * Copyright 2012 The Netty Project - * * The Netty Project licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at: - * * http://www.apache.org/licenses/LICENSE-2.0 - * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -15,6 +12,9 @@ */ package io.netty.handler.traffic; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -24,15 +24,20 @@ import java.util.concurrent.atomic.AtomicLong; /** * TrafficCounter is associated with {@link AbstractTrafficShapingHandler}. * - *

A TrafficCounter counts the read and written bytes such that the - * {@link AbstractTrafficShapingHandler} can limit the traffic, globally or per channel.

+ *

+ * A TrafficCounter counts the read and written bytes such that the {@link AbstractTrafficShapingHandler} + * can limit the traffic, globally or per channel. + *

* - *

It computes the statistics for both read and written every {@link #checkInterval}, and calls - * back to its parent {@link AbstractTrafficShapingHandler#doAccounting} method. If the checkInterval - * is set to 0, no accounting will be done and statistics will only be computed at each receive or - * write operation.

+ *

+ * It computes the statistics for both read and written every {@link #checkInterval}, and calls back to its parent + * {@link AbstractTrafficShapingHandler#doAccounting} method. If the checkInterval is set to 0, no accounting will be + * done and statistics will only be computed at each receive or write operation. + *

*/ public class TrafficCounter { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficCounter.class); + /** * Current written bytes */ @@ -83,11 +88,30 @@ public class TrafficCounter { */ private long lastReadBytes; + /** + * Last non 0 written bytes number during last check interval + */ + private long lastNonNullWrittenBytes; + + /** + * Last time written bytes with non 0 written bytes + */ + private long lastNonNullWrittenTime; + + /** + * Last time read bytes with non 0 written bytes + */ + private long lastNonNullReadTime; + + /** + * Last non 0 read bytes number during last check interval + */ + private long lastNonNullReadBytes; + /** * Delay between two captures */ - final AtomicLong checkInterval = new AtomicLong( - AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL); + final AtomicLong checkInterval = new AtomicLong(AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL); // default 1 s @@ -135,12 +159,12 @@ public class TrafficCounter { private final TrafficCounter counter; /** - * @param trafficShapingHandler The parent handler to which this task needs to callback to for accounting - * @param counter The parent TrafficCounter that we need to reset the statistics for + * @param trafficShapingHandler + * The parent handler to which this task needs to callback to for accounting + * @param counter + * The parent TrafficCounter that we need to reset the statistics for */ - protected TrafficMonitoringTask( - AbstractTrafficShapingHandler trafficShapingHandler, - TrafficCounter counter) { + protected TrafficMonitoringTask(AbstractTrafficShapingHandler trafficShapingHandler, TrafficCounter counter) { trafficShapingHandler1 = trafficShapingHandler; this.counter = counter; } @@ -156,7 +180,7 @@ public class TrafficCounter { trafficShapingHandler1.doAccounting(counter); } counter.scheduledFuture = counter.executor.schedule(this, counter.checkInterval.get(), - TimeUnit.MILLISECONDS); + TimeUnit.MILLISECONDS); } } @@ -171,8 +195,7 @@ public class TrafficCounter { if (checkInterval.get() > 0) { monitorActive.set(true); monitor = new TrafficMonitoringTask(trafficShapingHandler, this); - scheduledFuture = - executor.schedule(monitor, checkInterval.get(), TimeUnit.MILLISECONDS); + scheduledFuture = executor.schedule(monitor, checkInterval.get(), TimeUnit.MILLISECONDS); } } @@ -196,7 +219,8 @@ public class TrafficCounter { /** * Reset the accounting on Read and Write * - * @param newLastTime the millisecond unix timestamp that we should be considered up-to-date for + * @param newLastTime + * the millisecond unix timestamp that we should be considered up-to-date for */ synchronized void resetAccounting(long newLastTime) { long interval = newLastTime - lastTime.getAndSet(newLastTime); @@ -204,24 +228,40 @@ public class TrafficCounter { // nothing to do return; } + if (logger.isDebugEnabled() && interval > 2 * checkInterval()) { + logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name); + } lastReadBytes = currentReadBytes.getAndSet(0); lastWrittenBytes = currentWrittenBytes.getAndSet(0); lastReadThroughput = lastReadBytes / interval * 1000; // nb byte / checkInterval in ms * 1000 (1s) lastWriteThroughput = lastWrittenBytes / interval * 1000; // nb byte / checkInterval in ms * 1000 (1s) + if (lastWrittenBytes > 0) { + lastNonNullWrittenBytes = lastWrittenBytes; + lastNonNullWrittenTime = newLastTime; + } + if (lastReadBytes > 0) { + lastNonNullReadBytes = lastReadBytes; + lastNonNullReadTime = newLastTime; + } } /** * Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the Timer to use, its * name, the checkInterval between two computations in millisecond - * @param trafficShapingHandler the associated AbstractTrafficShapingHandler - * @param executor the underlying executor service for scheduling checks - * @param name the name given to this monitor - * @param checkInterval the checkInterval in millisecond between two computations + * + * @param trafficShapingHandler + * the associated AbstractTrafficShapingHandler + * @param executor + * the underlying executor service for scheduling checks + * @param name + * the name given to this monitor + * @param checkInterval + * the checkInterval in millisecond between two computations */ - public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler, - ScheduledExecutorService executor, String name, long checkInterval) { + public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler, ScheduledExecutorService executor, + String name, long checkInterval) { this.trafficShapingHandler = trafficShapingHandler; this.executor = executor; this.name = name; @@ -232,7 +272,8 @@ public class TrafficCounter { /** * Change checkInterval between two computations in millisecond * - * @param newcheckInterval The new check interval (in milliseconds) + * @param newcheckInterval + * The new check interval (in milliseconds) */ public void configure(long newcheckInterval) { long newInterval = newcheckInterval / 10 * 10; @@ -313,9 +354,9 @@ public class TrafficCounter { } /** - * - * @return the current number of bytes read since the last checkInterval - */ + * + * @return the current number of bytes read since the last checkInterval + */ public long currentReadBytes() { return currentReadBytes.get(); } @@ -351,7 +392,7 @@ public class TrafficCounter { /** * @return the lastCumulativeTime in millisecond as of System.currentTimeMillis() - * when the cumulative counters were reset to 0. + * when the cumulative counters were reset to 0. */ public long lastCumulativeTime() { return lastCumulativeTime; @@ -373,15 +414,124 @@ public class TrafficCounter { return name; } + /** + * Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait + * time + * + * @param size + * the recv size + * @param limitTraffic + * the traffic limit in bytes per second + * @param maxTime + * the max time in ms to wait in case of excess of traffic + * @return the current time to wait (in ms) if needed for Read operation + */ + public synchronized long readTimeToWait(final long size, final long limitTraffic, final long maxTime) { + final long now = System.currentTimeMillis(); + bytesRecvFlowControl(size); + if (limitTraffic == 0) { + return 0; + } + long sum = currentReadBytes.get(); + long interval = now - lastTime.get(); + // Short time checking + if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT && sum > 0) { + long time = (sum * 1000 / limitTraffic - interval) / 10 * 10; + if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Time: " + time + ":" + sum + ":" + interval); + } + return time > maxTime ? maxTime : time; + } + return 0; + } + // long time checking + if (lastNonNullReadBytes > 0 && lastNonNullReadTime + AbstractTrafficShapingHandler.MINIMAL_WAIT < now) { + long lastsum = sum + lastNonNullReadBytes; + long lastinterval = now - lastNonNullReadTime; + long time = (lastsum * 1000 / limitTraffic - lastinterval) / 10 * 10; + if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval); + } + return time > maxTime ? maxTime : time; + } + } else { + // final "middle" time checking in case resetAccounting called very recently + sum += lastReadBytes; + long lastinterval = AbstractTrafficShapingHandler.MINIMAL_WAIT; + long time = (sum * 1000 / limitTraffic - lastinterval) / 10 * 10; + if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Time: " + time + ":" + sum + ":" + lastinterval); + } + return time > maxTime ? maxTime : time; + } + } + return 0; + } + + /** + * Returns the time to wait (if any) for the given length message, using the given limitTraffic and + * the max wait time + * + * @param size + * the write size + * @param limitTraffic + * the traffic limit in bytes per second + * @param maxTime + * the max time in ms to wait in case of excess of traffic + * @return the current time to wait (in ms) if needed for Write operation + */ + public synchronized long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) { + bytesWriteFlowControl(size); + if (limitTraffic == 0) { + return 0; + } + long sum = currentWrittenBytes.get(); + final long now = System.currentTimeMillis(); + long interval = now - lastTime.get(); + if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT && sum > 0) { + long time = (sum * 1000 / limitTraffic - interval) / 10 * 10; + if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Time: " + time + ":" + sum + ":" + interval); + } + return time > maxTime ? maxTime : time; + } + return 0; + } + if (lastNonNullWrittenBytes > 0 && lastNonNullWrittenTime + AbstractTrafficShapingHandler.MINIMAL_WAIT < now) { + long lastsum = sum + lastNonNullWrittenBytes; + long lastinterval = now - lastNonNullWrittenTime; + long time = (lastsum * 1000 / limitTraffic - lastinterval) / 10 * 10; + if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval); + } + return time > maxTime ? maxTime : time; + } + } else { + sum += lastWrittenBytes; + long lastinterval = AbstractTrafficShapingHandler.MINIMAL_WAIT + Math.abs(interval); + long time = (sum * 1000 / limitTraffic - lastinterval) / 10 * 10; + if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Time: " + time + ":" + sum + ":" + lastinterval); + } + return time > maxTime ? maxTime : time; + } + } + return 0; + } + /** * String information */ @Override public String toString() { - return "Monitor " + name + " Current Speed Read: " + - (lastReadThroughput >> 10) + " KB/s, Write: " + - (lastWriteThroughput >> 10) + " KB/s Current Read: " + - (currentReadBytes.get() >> 10) + " KB Current Write: " + - (currentWrittenBytes.get() >> 10) + " KB"; + return "Monitor " + name + " Current Speed Read: " + (lastReadThroughput >> 10) + " KB/s, Write: " + + (lastWriteThroughput >> 10) + " KB/s Current Read: " + (currentReadBytes.get() >> 10) + + " KB Current Write: " + (currentWrittenBytes.get() >> 10) + " KB"; } } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/TrafficShapingTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/TrafficShapingTest.java new file mode 100644 index 0000000000..01c4d7178c --- /dev/null +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/TrafficShapingTest.java @@ -0,0 +1,588 @@ +/* + * Copyright 2012 The Netty Project + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.traffic.AbstractTrafficShapingHandler; +import io.netty.handler.traffic.ChannelTrafficShapingHandler; +import io.netty.handler.traffic.GlobalTrafficShapingHandler; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.Promise; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; +import io.netty.util.internal.logging.Slf4JLoggerFactory; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.*; + +public class TrafficShapingTest extends AbstractSocketTest { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficShapingTest.class); + private static final InternalLogger loggerServer = InternalLoggerFactory.getInstance(ValidTimestampedHandler.class); + private static final InternalLogger loggerClient = InternalLoggerFactory.getInstance(ClientTrafficHandler.class); + + static final int messageSize = 1024; + static final int bandwidthFactor = 15; + static final int minfactor = bandwidthFactor - (bandwidthFactor / 2); + static final int maxfactor = bandwidthFactor + (bandwidthFactor / 2); + static final long stepms = 1000 / bandwidthFactor; + static final long minimalms = Math.max(stepms / 2, 20) / 10 * 10; + static final long check = Math.max(Math.min(100, minimalms / 2) / 10 * 10, 20); + private static final Random random = new Random(); + static final byte[] data = new byte[messageSize]; + + private static final String TRAFFIC = "traffic"; + + private static EventExecutorGroup group; + private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(10); + static { + random.nextBytes(data); + } + + @BeforeClass + public static void createGroup() { + logger.info("Bandwidth: " + minfactor + " <= " + bandwidthFactor + " <= " + maxfactor + + " StepMs: " + stepms + " MinMs: " + minimalms + " CheckMs: " + check); + InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory()); + Logger logger = (Logger) LoggerFactory.getLogger("ROOT"); + logger.setLevel(Level.INFO); + group = new DefaultEventExecutorGroup(8); + } + + @AfterClass + public static void destroyGroup() throws Exception { + group.shutdownGracefully().sync(); + } + + private static long[] computeWaitRead(int[] multipleMessage) { + long[] minimalWaitBetween = new long[multipleMessage.length + 1]; + minimalWaitBetween[0] = 0; + for (int i = 0; i < multipleMessage.length; i++) { + minimalWaitBetween[i + 1] = (multipleMessage[i] - 1) * stepms + minimalms; + } + return minimalWaitBetween; + } + + private static long[] computeWaitWrite(int[] multipleMessage) { + long[] minimalWaitBetween = new long[multipleMessage.length + 1]; + for (int i = 0; i < multipleMessage.length; i++) { + minimalWaitBetween[i] = (multipleMessage[i] - 1) * stepms + minimalms; + } + return minimalWaitBetween; + } + + @Test(timeout = 10000) + public void testNoTrafficShapping() throws Throwable { + logger.info("TEST NO TRAFFIC"); + run(); + } + + public void testNoTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1 }; + long[] minimalWaitBetween = null; + testTrafficShapping0(sb, cb, false, false, false, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 20000) + public void testExecNoTrafficShapping() throws Throwable { + logger.info("TEST EXEC NO TRAFFIC"); + run(); + } + + public void testExecNoTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1 }; + long[] minimalWaitBetween = null; + testTrafficShapping0(sb, cb, true, false, false, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testWriteTrafficShapping() throws Throwable { + logger.info("TEST WRITE"); + run(); + } + + public void testWriteTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1 }; + long[] minimalWaitBetween = computeWaitWrite(multipleMessage); + testTrafficShapping0(sb, cb, false, false, true, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testReadTrafficShapping() throws Throwable { + logger.info("TEST READ"); + run(); + } + + public void testReadTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testWrite1TrafficShapping() throws Throwable { + logger.info("TEST WRITE"); + run(); + } + + public void testWrite1TrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 1, 1 }; + long[] minimalWaitBetween = computeWaitWrite(multipleMessage); + testTrafficShapping0(sb, cb, false, false, true, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testRead1TrafficShapping() throws Throwable { + logger.info("TEST READ"); + run(); + } + + public void testRead1TrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 20000) + public void testExecWriteTrafficShapping() throws Throwable { + logger.info("TEST EXEC WRITE"); + run(); + } + + public void testExecWriteTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1 }; + long[] minimalWaitBetween = computeWaitWrite(multipleMessage); + testTrafficShapping0(sb, cb, true, false, true, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testExecReadTrafficShapping() throws Throwable { + logger.info("TEST EXEC READ"); + run(); + } + + public void testExecReadTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, true, true, false, false, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testWriteGlobalTrafficShapping() throws Throwable { + logger.info("TEST GLOBAL WRITE"); + run(); + } + + public void testWriteGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1 }; + long[] minimalWaitBetween = computeWaitWrite(multipleMessage); + testTrafficShapping0(sb, cb, false, false, true, true, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testReadGlobalTrafficShapping() throws Throwable { + logger.info("TEST GLOBAL READ"); + run(); + } + + public void testReadGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = null; + int[] multipleMessage = { 1, 2, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, false, true, false, true, autoRead, minimalWaitBetween, multipleMessage); + } + + @Test(timeout = 10000) + public void testAutoReadTrafficShapping() throws Throwable { + logger.info("TEST AUTO READ"); + run(); + } + + public void testAutoReadTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 }; + int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage); + } + @Test(timeout = 10000) + public void testAutoReadGlobalTrafficShapping() throws Throwable { + logger.info("TEST AUTO READ GLOBAL"); + run(); + } + + public void testAutoReadGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 }; + int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, false, true, false, true, autoRead, minimalWaitBetween, multipleMessage); + } + @Test(timeout = 10000) + public void testAutoReadExecTrafficShapping() throws Throwable { + logger.info("TEST AUTO READ EXEC"); + run(); + } + + public void testAutoReadExecTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 }; + int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, true, true, false, false, autoRead, minimalWaitBetween, multipleMessage); + } + @Test(timeout = 10000) + public void testAutoReadExecGlobalTrafficShapping() throws Throwable { + logger.info("TEST AUTO READ EXEC GLOBAL"); + run(); + } + + public void testAutoReadExecGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable { + int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 }; + int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; + long[] minimalWaitBetween = computeWaitRead(multipleMessage); + testTrafficShapping0(sb, cb, true, true, false, true, autoRead, minimalWaitBetween, multipleMessage); + } + + /** + * + * @param sb + * @param cb + * @param additionalExecutor + * shall the pipeline add the handler using an additionnal executor + * @param limitRead + * True to set Read Limit on Server side + * @param limitWrite + * True to set Write Limit on Client side + * @param globalLimit + * True to change Channel to Global TrafficShapping + * @param autoRead + * @param minimalWaitBetween + * time in ms that should be waited before getting the final result (note: for READ the values are + * right shifted once, the first value being 0) + * @param multipleMessage + * how many message to send at each step (for READ: the first should be 1, as the two last steps to + * ensure correct testing) + * @throws Throwable + */ + private static void testTrafficShapping0(ServerBootstrap sb, Bootstrap cb, final boolean additionalExecutor, + final boolean limitRead, final boolean limitWrite, final boolean globalLimit, int[] autoRead, + long[] minimalWaitBetween, int[] multipleMessage) throws Throwable { + logger.info("Exec: " + additionalExecutor + " Read: " + limitRead + " Write: " + limitWrite + " Global: " + + globalLimit); + final ValidTimestampedHandler sh = new ValidTimestampedHandler(autoRead, multipleMessage); + Promise promise = group.next().newPromise(); + final ClientTrafficHandler ch = new ClientTrafficHandler(promise, minimalWaitBetween, multipleMessage, + autoRead); + + final AbstractTrafficShapingHandler handler; + if (limitRead) { + if (globalLimit) { + handler = new GlobalTrafficShapingHandler(group, 0, bandwidthFactor * messageSize, check); + } else { + handler = new ChannelTrafficShapingHandler(0, bandwidthFactor * messageSize, check); + } + } else if (limitWrite) { + if (globalLimit) { + handler = new GlobalTrafficShapingHandler(group, bandwidthFactor * messageSize, 0, check); + } else { + handler = new ChannelTrafficShapingHandler(bandwidthFactor * messageSize, 0, check); + } + } else { + handler = null; + } + + sb.childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel c) throws Exception { + if (limitRead) { + if (additionalExecutor) { + c.pipeline().addLast(group, TRAFFIC, handler); + } else { + c.pipeline().addLast(TRAFFIC, handler); + } + } + if (additionalExecutor) { + c.pipeline().addLast(group, sh); + } else { + c.pipeline().addLast(sh); + } + } + }); + cb.handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel c) throws Exception { + if (limitWrite) { + if (additionalExecutor) { + c.pipeline().addLast(group, TRAFFIC, handler); + } else { + c.pipeline().addLast(TRAFFIC, handler); + } + } + if (additionalExecutor) { + c.pipeline().addLast(group, ch); + } else { + c.pipeline().addLast(ch); + } + } + }); + + Channel sc = sb.bind().sync().channel(); + Channel cc = cb.connect().sync().channel(); + + int totalNb = 0; + for (int i = 1; i < multipleMessage.length; i++) { + totalNb += multipleMessage[i]; + } + Long start = System.currentTimeMillis(); + int nb = multipleMessage[0]; + for (int i = 0; i < nb; i++) { + cc.write(cc.alloc().buffer().writeBytes(data)); + } + cc.flush(); + + promise.await(); + Long stop = System.currentTimeMillis(); + assertTrue("Error during exceution of TrafficShapping: " + promise.cause(), promise.isSuccess()); + + float average = (totalNb * messageSize) / (float) (stop - start); + logger.info("Average of traffic: " + average + " compare to " + bandwidthFactor); + sh.channel.close().sync(); + ch.channel.close().sync(); + sc.close().sync(); + if (autoRead != null) { + // for extra release call in AutoRead + Thread.sleep(minimalms); + } + + if (autoRead == null && minimalWaitBetween != null) { + assertTrue("Overall Traffic not ok since > " + maxfactor + ": " + average, + average <= maxfactor); + if (additionalExecutor) { + // Oio is not as good when using additionalExecutor + assertTrue("Overall Traffic not ok since < 0.25: " + average, average >= 0.25); + } else { + assertTrue("Overall Traffic not ok since < " + minfactor + ": " + average, + average >= minfactor); + } + } + if (handler != null && globalLimit) { + ((GlobalTrafficShapingHandler) handler).release(); + } + + if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { + throw sh.exception.get(); + } + if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) { + throw ch.exception.get(); + } + if (sh.exception.get() != null) { + throw sh.exception.get(); + } + if (ch.exception.get() != null) { + throw ch.exception.get(); + } + } + + private static class ClientTrafficHandler extends SimpleChannelInboundHandler { + volatile Channel channel; + final AtomicReference exception = new AtomicReference(); + volatile int step; + // first message will always be validated + private long currentLastTime = System.currentTimeMillis(); + private final long[] minimalWaitBetween; + private final int[] multipleMessage; + private final int[] autoRead; + final Promise promise; + + ClientTrafficHandler(Promise promise, long[] minimalWaitBetween, int[] multipleMessage, + int[] autoRead) { + this.minimalWaitBetween = minimalWaitBetween; + this.multipleMessage = Arrays.copyOf(multipleMessage, multipleMessage.length); + this.promise = promise; + this.autoRead = autoRead; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + channel = ctx.channel(); + } + + @Override + public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + long lastTimestamp = 0; + while (in.isReadable()) { + lastTimestamp = in.readLong(); + multipleMessage[step]--; + } + if (multipleMessage[step] > 0) { + // still some message to get + return; + } + long minimalWait = (minimalWaitBetween != null) ? minimalWaitBetween[step] : 0; + int ar = 0; + if (autoRead != null) { + if (step > 0 && autoRead[step - 1] != 0) { + ar = autoRead[step - 1]; + if (ar > 0) { + minimalWait = -1; + } else { + minimalWait = minimalms / 3; + } + } else { + minimalWait = 0; + } + } + loggerClient.info("Step: " + step + " Interval: " + (lastTimestamp - currentLastTime) + " compareTo " + + minimalWait + " (" + ar + ")"); + assertTrue("The interval of time is incorrect:" + (lastTimestamp - currentLastTime) + " not> " + + minimalWait, lastTimestamp - currentLastTime >= minimalWait); + currentLastTime = lastTimestamp; + step++; + if (multipleMessage.length > step) { + int nb = multipleMessage[step]; + for (int i = 0; i < nb; i++) { + channel.write(channel.alloc().buffer().writeBytes(data)); + } + channel.flush(); + } else { + promise.setSuccess(true); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + if (exception.compareAndSet(null, cause)) { + cause.printStackTrace(); + promise.setFailure(cause); + ctx.close(); + } + } + } + + private static class ValidTimestampedHandler extends SimpleChannelInboundHandler { + private final int[] autoRead; + private final int[] multipleMessage; + volatile Channel channel; + volatile int step; + final AtomicReference exception = new AtomicReference(); + + ValidTimestampedHandler(int[] autoRead, int[] multipleMessage) { + this.autoRead = autoRead; + this.multipleMessage = Arrays.copyOf(multipleMessage, multipleMessage.length); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + channel = ctx.channel(); + } + + @Override + public void channelRead0(final ChannelHandlerContext ctx, ByteBuf in) throws Exception { + byte[] actual = new byte[in.readableBytes()]; + in.readBytes(actual); + long timestamp = System.currentTimeMillis(); + int nb = actual.length / messageSize; + int isAutoRead = 0; + int laststep = step; + for (int i = 0; i < nb; i++) { + multipleMessage[step]--; + if (multipleMessage[step] == 0) { + if (autoRead != null) { + isAutoRead = autoRead[step]; + } + step++; + } + } + if (laststep != step) { + if (autoRead != null && isAutoRead != 2) { + if (isAutoRead != 0) { + loggerServer.info("Set AutoRead: " + (isAutoRead > 0) + " Step: " + step); + channel.config().setAutoRead(isAutoRead > 0); + } else { + loggerServer.info("AutoRead: NO Step:" + step); + } + } + } + loggerServer.debug("Step: " + step + " Get: " + actual.length + " TS " + timestamp + " NB: " + nb); + for (int i = 0; i < nb; i++) { + channel.write(Unpooled.copyLong(timestamp)); + } + channel.flush(); + if (laststep != step) { + if (isAutoRead != 0) { + if (isAutoRead < 0) { + final int exactStep = step; + long wait = (isAutoRead == -1) ? minimalms : stepms + minimalms; + if (isAutoRead == -3) { + wait = stepms * 3; + } + executor.schedule(new Runnable() { + public void run() { + loggerServer.info("Reset AutoRead: Step " + exactStep); + channel.config().setAutoRead(true); + } + }, wait, TimeUnit.MILLISECONDS); + } else { + if (isAutoRead > 1) { + loggerServer.info("Will Set AutoRead: Rrue, Step: " + step); + executor.schedule(new Runnable() { + public void run() { + loggerServer.info("Set AutoRead: Rrue, Step: " + step); + channel.config().setAutoRead(true); + } + }, stepms + minimalms, TimeUnit.MILLISECONDS); + } + } + } + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + if (exception.compareAndSet(null, cause)) { + cause.printStackTrace(); + ctx.close(); + } + } + } +}