[#2722] Improve Traffic Shaping Handling

Motivation:
Currently Traffic Shaping is using 1 timer only and could lead to
"partial" wrong bandwidth computation when "short" time occurs between
adding used bytes and when the TrafficCounter updates itself and finally
when the traffic is computed.
Indeed, the TrafficCounter is updated every x delay and it is at the
same time saved into "lastXxxxBytes" and set to 0. Therefore, when one
request the counter, it first updates the TrafficCounter with the added
used bytes. If this value is set just before the TrafficCounter is
updated, then the bandwidth computation will use the TrafficCounter with
a "0" value (this value being reset once the delay occurs). Therefore,
the traffic shaping computation is wrong in rare cases.

Secondly the traffic shapping should avoid if possible the "Timeout"
effect by not stopping reading or writing more than a maxTime, this
maxTime being less than the TimeOut limit.

Thirdly the traffic shapping in read had an issue since the readOp was
not set but should, turning in no read blocking from socket point of
view. (see #2696)

Take into account setAutoRead(boolean) setting directly
by the user in the program external to this handler.

Modifications:
The TrafficCounter has 2 new methods that compute the time to wait
according to read or write) using in priority the currentXxxxBytes (as
before), but could used (if current is at 0) the lastXxxxxBytes, and
therefore having more chance to take into account the real traffic.

Moreover the Handler could change the default "max time to wait", which
is by default set to half of "standard" Time Out (30s:2 = 15s).

Finally we add the setAutoRead(boolean) accordingly to the situation, as
proposed in #2696 (the original pull request is in error for unknown
reason so this merge).

Result:
The Traffic Shaping is better take into account (no 0 value when it
shouldn't) and it tries to not block traffic more than Time Out event.

Moreover the read is really stopped from socket point of view.

This version is similar to #2388 and #2450.
This version is for V4.0, and includes the #2696 pull request to ease
the merge process.

The test minimizes time check by reducing to 66ms steps (50s total).
This commit is contained in:
fredericBregier 2014-08-01 11:26:33 +02:00 committed by fbregier
parent 8f019ae4fa
commit ef3c030013
5 changed files with 1172 additions and 137 deletions

View File

@ -1,12 +1,9 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@ -22,6 +19,8 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.concurrent.TimeUnit;
@ -38,20 +37,29 @@ import java.util.concurrent.TimeUnit;
* the read/write limit or the check interval, several methods allow that for you:<br>
* <ul>
* <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
* <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop
* or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
* <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop or start the
* monitoring, to change the checkInterval directly, or to have access to its values.</li>
* </ul>
*/
public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractTrafficShapingHandler.class);
/**
* Default delay between two checks: 1s
*/
public static final long DEFAULT_CHECK_INTERVAL = 1000;
/**
* Default max delay in case of traffic shaping
* (during which no communication will occur).
* Shall be less than TIMEOUT. Here half of "standard" 30s
*/
public static final long DEFAULT_MAX_TIME = 15000;
/**
* Default minimal time to wait
*/
private static final long MINIMAL_WAIT = 10;
static final long MINIMAL_WAIT = 10;
/**
* Traffic Counter
@ -68,19 +76,25 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
*/
private long readLimit;
/**
* Max delay in wait
*/
protected long maxTime = DEFAULT_MAX_TIME; // default 15 s
/**
* Delay between two performance snapshots
*/
protected long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
private static final AttributeKey<Boolean> READ_SUSPENDED = AttributeKey.valueOf(
AbstractTrafficShapingHandler.class.getName() + ".READ_SUSPENDED");
private static final AttributeKey<Runnable> REOPEN_TASK = AttributeKey.valueOf(
AbstractTrafficShapingHandler.class.getName() + ".REOPEN_TASK");
private static final AttributeKey<Boolean> READ_SUSPENDED = AttributeKey
.valueOf(AbstractTrafficShapingHandler.class.getName() + ".READ_SUSPENDED");
private static final AttributeKey<Runnable> REOPEN_TASK = AttributeKey.valueOf(AbstractTrafficShapingHandler.class
.getName() + ".REOPEN_TASK");
/**
*
* @param newTrafficCounter the TrafficCounter to set
* @param newTrafficCounter
* the TrafficCounter to set
*/
void setTrafficCounter(TrafficCounter newTrafficCounter) {
trafficCounter = newTrafficCounter;
@ -88,59 +102,76 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
/**
* @param writeLimit
* 0 or a limit in bytes/s
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
* @param maxTime
* The maximum delay to wait in case of traffic excess
*/
protected AbstractTrafficShapingHandler(long writeLimit, long readLimit,
long checkInterval) {
protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval, long maxTime) {
this.writeLimit = writeLimit;
this.readLimit = readLimit;
this.checkInterval = checkInterval;
this.maxTime = maxTime;
}
/**
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
*/
protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval) {
this(writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME);
}
/**
* Constructor using default Check Interval
*
* @param writeLimit
* 0 or a limit in bytes/s
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
* 0 or a limit in bytes/s
*/
protected AbstractTrafficShapingHandler(long writeLimit, long readLimit) {
this(writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
this(writeLimit, readLimit, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
}
/**
* Constructor using NO LIMIT and default Check Interval
*/
protected AbstractTrafficShapingHandler() {
this(0, 0, DEFAULT_CHECK_INTERVAL);
this(0, 0, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
}
/**
* Constructor using NO LIMIT
*
* @param checkInterval
* The delay between two computations of performances for
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
*/
protected AbstractTrafficShapingHandler(long checkInterval) {
this(0, 0, checkInterval);
this(0, 0, checkInterval, DEFAULT_MAX_TIME);
}
/**
* Change the underlying limitations and check interval.
*
* @param newWriteLimit The new write limit (in bytes)
* @param newReadLimit The new read limit (in bytes)
* @param newCheckInterval The new check interval (in milliseconds)
* @param newWriteLimit
* The new write limit (in bytes)
* @param newReadLimit
* The new read limit (in bytes)
* @param newCheckInterval
* The new check interval (in milliseconds)
*/
public void configure(long newWriteLimit, long newReadLimit,
long newCheckInterval) {
public void configure(long newWriteLimit, long newReadLimit, long newCheckInterval) {
configure(newWriteLimit, newReadLimit);
configure(newCheckInterval);
}
@ -148,8 +179,10 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
/**
* Change the underlying limitations.
*
* @param newWriteLimit The new write limit (in bytes)
* @param newReadLimit The new read limit (in bytes)
* @param newWriteLimit
* The new write limit (in bytes)
* @param newReadLimit
* The new read limit (in bytes)
*/
public void configure(long newWriteLimit, long newReadLimit) {
writeLimit = newWriteLimit;
@ -162,7 +195,8 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
/**
* Change the check interval.
*
* @param newCheckInterval The new check interval (in milliseconds)
* @param newCheckInterval
* The new check interval (in milliseconds)
*/
public void configure(long newCheckInterval) {
checkInterval = newCheckInterval;
@ -171,6 +205,73 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
}
}
/**
* @return the writeLimit
*/
public long getWriteLimit() {
return writeLimit;
}
/**
* @param writeLimit the writeLimit to set
*/
public void setWriteLimit(long writeLimit) {
this.writeLimit = writeLimit;
if (trafficCounter != null) {
trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
}
}
/**
* @return the readLimit
*/
public long getReadLimit() {
return readLimit;
}
/**
* @param readLimit the readLimit to set
*/
public void setReadLimit(long readLimit) {
this.readLimit = readLimit;
if (trafficCounter != null) {
trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
}
}
/**
* @return the checkInterval
*/
public long getCheckInterval() {
return checkInterval;
}
/**
* @param checkInterval the checkInterval to set
*/
public void setCheckInterval(long checkInterval) {
this.checkInterval = checkInterval;
if (trafficCounter != null) {
trafficCounter.configure(checkInterval);
}
}
/**
*
* @param maxTime
* Max delay in wait, shall be less than TIME OUT in related protocol
*/
public void setMaxTimeWait(long maxTime) {
this.maxTime = maxTime;
}
/**
* @return the max delay in wait
*/
public long getMaxTimeWait() {
return maxTime;
}
/**
* Called each time the accounting is computed from the TrafficCounters.
* This method could be used for instance to implement almost real time accounting.
@ -178,7 +279,6 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
* @param counter
* the TrafficCounter that computes its performance
*/
@SuppressWarnings("unused")
protected void doAccounting(TrafficCounter counter) {
// NOOP by default
}
@ -188,111 +288,128 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
*/
private static final class ReopenReadTimerTask implements Runnable {
final ChannelHandlerContext ctx;
ReopenReadTimerTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
ctx.attr(READ_SUSPENDED).set(false);
ctx.read();
if (!ctx.channel().config().isAutoRead() && isHandlerActive(ctx)) {
// If AutoRead is False and Active is True, user make a direct setAutoRead(false)
// Then Just reset the status
if (logger.isDebugEnabled()) {
logger.debug("Channel:" + ctx.channel().hashCode() +
" Not Unsuspend: " + ctx.channel().config().isAutoRead() + ":" + isHandlerActive(ctx));
}
ctx.attr(READ_SUSPENDED).set(false);
} else {
// Anything else allows the handler to reset the AutoRead
if (logger.isDebugEnabled()) {
if (ctx.channel().config().isAutoRead() && !isHandlerActive(ctx)) {
logger.debug("Channel:" + ctx.channel().hashCode() +
" Unsuspend: " + ctx.channel().config().isAutoRead() + ":" + isHandlerActive(ctx));
} else {
logger.debug("Channel:" + ctx.channel().hashCode() +
" Normal Unsuspend: " + ctx.channel().config().isAutoRead() + ":"
+ isHandlerActive(ctx));
}
}
ctx.attr(READ_SUSPENDED).set(false);
ctx.channel().config().setAutoRead(true);
ctx.channel().read();
}
if (logger.isDebugEnabled()) {
logger.debug("Channel:" + ctx.channel().hashCode() +
" Unsupsend final status => " + ctx.channel().config().isAutoRead() + ":"
+ isHandlerActive(ctx));
}
}
}
/**
* @return the time that should be necessary to wait to respect limit. Can be negative time
*/
private static long getTimeToWait(long limit, long bytes, long lastTime, long curtime) {
long interval = curtime - lastTime;
if (interval <= 0) {
// Time is too short, so just lets continue
return 0;
}
return (bytes * 1000 / limit - interval) / 10 * 10;
}
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
long size = calculateSize(msg);
long curtime = System.currentTimeMillis();
if (trafficCounter != null) {
trafficCounter.bytesRecvFlowControl(size);
if (readLimit == 0) {
// no action
ctx.fireChannelRead(msg);
return;
}
if (size > 0 && trafficCounter != null) {
// compute the number of ms to wait before reopening the channel
long wait = getTimeToWait(readLimit,
trafficCounter.currentReadBytes(),
trafficCounter.lastTime(), curtime);
long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime);
if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
// time in order to
// try to limit the traffic
if (!isSuspended(ctx)) {
// time in order to try to limit the traffic
// Only AutoRead AND HandlerActive True means Context Active
if (logger.isDebugEnabled()) {
logger.debug("Channel:" + ctx.channel().hashCode() +
" Read Suspend: " + wait + ":" + ctx.channel().config().isAutoRead() + ":"
+ isHandlerActive(ctx));
}
if (ctx.channel().config().isAutoRead() && isHandlerActive(ctx)) {
ctx.channel().config().setAutoRead(false);
ctx.attr(READ_SUSPENDED).set(true);
// Create a Runnable to reactive the read if needed. If one was create before it will just be
// reused to limit object creation
Attribute<Runnable> attr = ctx.attr(REOPEN_TASK);
Attribute<Runnable> attr = ctx.attr(REOPEN_TASK);
Runnable reopenTask = attr.get();
if (reopenTask == null) {
reopenTask = new ReopenReadTimerTask(ctx);
attr.set(reopenTask);
}
ctx.executor().schedule(reopenTask, wait,
TimeUnit.MILLISECONDS);
ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
if (logger.isDebugEnabled()) {
logger.debug("Channel:" + ctx.channel().hashCode() +
" Suspend final status => " + ctx.channel().config().isAutoRead() + ":"
+ isHandlerActive(ctx) +
" will reopened at: " + wait);
}
}
}
}
ctx.fireChannelRead(msg);
}
@Override
public void read(ChannelHandlerContext ctx) {
if (!isSuspended(ctx)) {
ctx.read();
}
protected static boolean isHandlerActive(ChannelHandlerContext ctx) {
Boolean suspended = ctx.attr(READ_SUSPENDED).get();
return suspended == null || Boolean.FALSE.equals(suspended);
}
private static boolean isSuspended(ChannelHandlerContext ctx) {
Boolean suspended = ctx.attr(READ_SUSPENDED).get();
return !(suspended == null || Boolean.FALSE.equals(suspended));
@Override
public void read(ChannelHandlerContext ctx) {
if (isHandlerActive(ctx)) {
// For Global Traffic (and Read when using EventLoop in pipeline) : check if READ_SUSPENDED is False
ctx.read();
}
}
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
throws Exception {
long curtime = System.currentTimeMillis();
long size = calculateSize(msg);
if (size > -1 && trafficCounter != null) {
trafficCounter.bytesWriteFlowControl(size);
if (writeLimit == 0) {
ctx.write(msg, promise);
return;
}
// compute the number of ms to wait before continue with the
// channel
long wait = getTimeToWait(writeLimit,
trafficCounter.currentWrittenBytes(),
trafficCounter.lastTime(), curtime);
if (size > 0 && trafficCounter != null) {
// compute the number of ms to wait before continue with the channel
long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime);
if (wait >= MINIMAL_WAIT) {
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
ctx.write(msg, promise);
}
}, wait, TimeUnit.MILLISECONDS);
/*
* Option 2: but issue with ctx.executor().schedule()
* Thread.sleep(wait);
* System.out.println("Write unsuspended");
* Option 1: use an ordered list of messages to send
* Warning of memory pressure!
*/
if (logger.isDebugEnabled()) {
logger.debug("Channel:" + ctx.channel().hashCode() +
" Write suspend: " + wait + ":" + ctx.channel().config().isAutoRead() + ":"
+ isHandlerActive(ctx));
}
submitWrite(ctx, msg, wait, promise);
return;
}
}
ctx.write(msg, promise);
// to keep message order if not using option 2
submitWrite(ctx, msg, 0, promise);
}
protected abstract void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long delay,
final ChannelPromise promise);
/**
*
* @return the current TrafficCounter (if
@ -304,17 +421,18 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
@Override
public String toString() {
return "TrafficShaping with Write Limit: " + writeLimit +
" Read Limit: " + readLimit + " and Counter: " +
(trafficCounter != null? trafficCounter.toString() : "none");
return "TrafficShaping with Write Limit: " + writeLimit + " Read Limit: " + readLimit + " and Counter: "
+ (trafficCounter != null ? trafficCounter.toString() : "none");
}
/**
* Calculate the size of the given {@link Object}.
*
* This implementation supports {@link ByteBuf} and {@link ByteBufHolder}. Sub-classes may override this.
* @param msg the msg for which the size should be calculated
* @return size the size of the msg or {@code -1} if unknown.
*
* @param msg
* the msg for which the size should be calculated
* @return size the size of the msg or {@code -1} if unknown.
*/
protected long calculateSize(Object msg) {
if (msg instanceof ByteBuf) {

View File

@ -15,7 +15,13 @@
*/
package io.netty.handler.traffic;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
/**
* This implementation of the {@link AbstractTrafficShapingHandler} is for channel
@ -38,11 +44,32 @@ import io.netty.channel.ChannelHandlerContext;
* it is recommended to set a positive value, even if it is high since the precision of the
* Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
* the less precise the traffic shaping will be. It is suggested as higher value something close
* to 5 or 10 minutes.<br>
* to 5 or 10 minutes.<br><br>
*
* maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.<br>
* </li>
* </ul><br>
*/
public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
private List<ToSend> messagesQueue = new LinkedList<ToSend>();
/**
* Create a new instance
*
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
* @param maxTime
* The maximum delay to wait in case of traffic excess
*/
public ChannelTrafficShapingHandler(long writeLimit, long readLimit,
long checkInterval, long maxTime) {
super(writeLimit, readLimit, checkInterval, maxTime);
}
/**
* Create a new instance
@ -93,9 +120,57 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
public synchronized void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
if (trafficCounter != null) {
trafficCounter.stop();
}
for (ToSend toSend : messagesQueue) {
if (toSend.toSend instanceof ByteBuf) {
((ByteBuf) toSend.toSend).release();
}
}
messagesQueue.clear();
}
private static final class ToSend {
final long date;
final Object toSend;
final ChannelPromise promise;
private ToSend(final long delay, final Object toSend, final ChannelPromise promise) {
this.date = System.currentTimeMillis() + delay;
this.toSend = toSend;
this.promise = promise;
}
}
@Override
protected synchronized void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long delay,
final ChannelPromise promise) {
if (delay == 0 && messagesQueue.isEmpty()) {
ctx.write(msg, promise);
return;
}
final ToSend newToSend = new ToSend(delay, msg, promise);
messagesQueue.add(newToSend);
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
sendAllValid(ctx);
}
}, delay, TimeUnit.MILLISECONDS);
}
private synchronized void sendAllValid(ChannelHandlerContext ctx) {
while (!messagesQueue.isEmpty()) {
ToSend newToSend = messagesQueue.remove(0);
if (newToSend.date <= System.currentTimeMillis()) {
ctx.write(newToSend.toSend, newToSend.promise);
} else {
messagesQueue.add(0, newToSend);
break;
}
}
ctx.flush();
}
}

View File

@ -15,10 +15,19 @@
*/
package io.netty.handler.traffic;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.util.concurrent.EventExecutor;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* This implementation of the {@link AbstractTrafficShapingHandler} is for global
@ -43,7 +52,9 @@ import java.util.concurrent.ScheduledExecutorService;
* it is recommended to set a positive value, even if it is high since the precision of the
* Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
* the less precise the traffic shaping will be. It is suggested as higher value something close
* to 5 or 10 minutes.<br>
* to 5 or 10 minutes.<br><br>
*
* maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.<br>
* </li>
* </ul><br>
*
@ -52,6 +63,8 @@ import java.util.concurrent.ScheduledExecutorService;
*/
@Sharable
public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
private Map<Integer, List<ToSend>> messagesQueues = new HashMap<Integer, List<ToSend>>();
/**
* Create the global TrafficCounter
*/
@ -65,6 +78,27 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
tc.start();
}
/**
* Create a new instance
*
* @param executor
* the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
* @param maxTime
* The maximum delay to wait in case of traffic excess
*/
public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit,
long checkInterval, long maxTime) {
super(writeLimit, readLimit, checkInterval, maxTime);
createGlobalTrafficCounter(executor);
}
/**
* Create a new instance
*
@ -132,4 +166,74 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
trafficCounter.stop();
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Integer key = ctx.channel().hashCode();
List<ToSend> mq = new LinkedList<ToSend>();
messagesQueues.put(key, mq);
}
@Override
public synchronized void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Integer key = ctx.channel().hashCode();
List<ToSend> mq = messagesQueues.remove(key);
if (mq != null) {
for (ToSend toSend : mq) {
if (toSend.toSend instanceof ByteBuf) {
((ByteBuf) toSend.toSend).release();
}
}
mq.clear();
}
}
private static final class ToSend {
final long date;
final Object toSend;
final ChannelPromise promise;
private ToSend(final long delay, final Object toSend, final ChannelPromise promise) {
this.date = System.currentTimeMillis() + delay;
this.toSend = toSend;
this.promise = promise;
}
}
@Override
protected synchronized void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long delay,
final ChannelPromise promise) {
Integer key = ctx.channel().hashCode();
List<ToSend> messagesQueue = messagesQueues.get(key);
if (delay == 0 && (messagesQueue == null || messagesQueue.isEmpty())) {
ctx.write(msg, promise);
return;
}
final ToSend newToSend = new ToSend(delay, msg, promise);
if (messagesQueue == null) {
messagesQueue = new LinkedList<ToSend>();
messagesQueues.put(key, messagesQueue);
}
messagesQueue.add(newToSend);
final List<ToSend> mqfinal = messagesQueue;
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
sendAllValid(ctx, mqfinal);
}
}, delay, TimeUnit.MILLISECONDS);
}
private synchronized void sendAllValid(final ChannelHandlerContext ctx, final List<ToSend> messagesQueue) {
while (!messagesQueue.isEmpty()) {
ToSend newToSend = messagesQueue.remove(0);
if (newToSend.date <= System.currentTimeMillis()) {
ctx.write(newToSend.toSend, newToSend.promise);
} else {
messagesQueue.add(0, newToSend);
break;
}
}
ctx.flush();
}
}

View File

@ -1,12 +1,9 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@ -15,6 +12,9 @@
*/
package io.netty.handler.traffic;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -24,15 +24,20 @@ import java.util.concurrent.atomic.AtomicLong;
/**
* TrafficCounter is associated with {@link AbstractTrafficShapingHandler}.
*
* <p>A <tt>TrafficCounter</tt> counts the read and written bytes such that the
* {@link AbstractTrafficShapingHandler} can limit the traffic, globally or per channel.</p>
* <p>
* A <tt>TrafficCounter</tt> counts the read and written bytes such that the {@link AbstractTrafficShapingHandler}
* can limit the traffic, globally or per channel.
* </p>
*
* <p>It computes the statistics for both read and written every {@link #checkInterval}, and calls
* back to its parent {@link AbstractTrafficShapingHandler#doAccounting} method. If the checkInterval
* is set to 0, no accounting will be done and statistics will only be computed at each receive or
* write operation.</p>
* <p>
* It computes the statistics for both read and written every {@link #checkInterval}, and calls back to its parent
* {@link AbstractTrafficShapingHandler#doAccounting} method. If the checkInterval is set to 0, no accounting will be
* done and statistics will only be computed at each receive or write operation.
* </p>
*/
public class TrafficCounter {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficCounter.class);
/**
* Current written bytes
*/
@ -83,11 +88,30 @@ public class TrafficCounter {
*/
private long lastReadBytes;
/**
* Last non 0 written bytes number during last check interval
*/
private long lastNonNullWrittenBytes;
/**
* Last time written bytes with non 0 written bytes
*/
private long lastNonNullWrittenTime;
/**
* Last time read bytes with non 0 written bytes
*/
private long lastNonNullReadTime;
/**
* Last non 0 read bytes number during last check interval
*/
private long lastNonNullReadBytes;
/**
* Delay between two captures
*/
final AtomicLong checkInterval = new AtomicLong(
AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL);
final AtomicLong checkInterval = new AtomicLong(AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL);
// default 1 s
@ -135,12 +159,12 @@ public class TrafficCounter {
private final TrafficCounter counter;
/**
* @param trafficShapingHandler The parent handler to which this task needs to callback to for accounting
* @param counter The parent TrafficCounter that we need to reset the statistics for
* @param trafficShapingHandler
* The parent handler to which this task needs to callback to for accounting
* @param counter
* The parent TrafficCounter that we need to reset the statistics for
*/
protected TrafficMonitoringTask(
AbstractTrafficShapingHandler trafficShapingHandler,
TrafficCounter counter) {
protected TrafficMonitoringTask(AbstractTrafficShapingHandler trafficShapingHandler, TrafficCounter counter) {
trafficShapingHandler1 = trafficShapingHandler;
this.counter = counter;
}
@ -156,7 +180,7 @@ public class TrafficCounter {
trafficShapingHandler1.doAccounting(counter);
}
counter.scheduledFuture = counter.executor.schedule(this, counter.checkInterval.get(),
TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS);
}
}
@ -171,8 +195,7 @@ public class TrafficCounter {
if (checkInterval.get() > 0) {
monitorActive.set(true);
monitor = new TrafficMonitoringTask(trafficShapingHandler, this);
scheduledFuture =
executor.schedule(monitor, checkInterval.get(), TimeUnit.MILLISECONDS);
scheduledFuture = executor.schedule(monitor, checkInterval.get(), TimeUnit.MILLISECONDS);
}
}
@ -196,7 +219,8 @@ public class TrafficCounter {
/**
* Reset the accounting on Read and Write
*
* @param newLastTime the millisecond unix timestamp that we should be considered up-to-date for
* @param newLastTime
* the millisecond unix timestamp that we should be considered up-to-date for
*/
synchronized void resetAccounting(long newLastTime) {
long interval = newLastTime - lastTime.getAndSet(newLastTime);
@ -204,24 +228,40 @@ public class TrafficCounter {
// nothing to do
return;
}
if (logger.isDebugEnabled() && interval > 2 * checkInterval()) {
logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name);
}
lastReadBytes = currentReadBytes.getAndSet(0);
lastWrittenBytes = currentWrittenBytes.getAndSet(0);
lastReadThroughput = lastReadBytes / interval * 1000;
// nb byte / checkInterval in ms * 1000 (1s)
lastWriteThroughput = lastWrittenBytes / interval * 1000;
// nb byte / checkInterval in ms * 1000 (1s)
if (lastWrittenBytes > 0) {
lastNonNullWrittenBytes = lastWrittenBytes;
lastNonNullWrittenTime = newLastTime;
}
if (lastReadBytes > 0) {
lastNonNullReadBytes = lastReadBytes;
lastNonNullReadTime = newLastTime;
}
}
/**
* Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the Timer to use, its
* name, the checkInterval between two computations in millisecond
* @param trafficShapingHandler the associated AbstractTrafficShapingHandler
* @param executor the underlying executor service for scheduling checks
* @param name the name given to this monitor
* @param checkInterval the checkInterval in millisecond between two computations
*
* @param trafficShapingHandler
* the associated AbstractTrafficShapingHandler
* @param executor
* the underlying executor service for scheduling checks
* @param name
* the name given to this monitor
* @param checkInterval
* the checkInterval in millisecond between two computations
*/
public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler,
ScheduledExecutorService executor, String name, long checkInterval) {
public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler, ScheduledExecutorService executor,
String name, long checkInterval) {
this.trafficShapingHandler = trafficShapingHandler;
this.executor = executor;
this.name = name;
@ -232,7 +272,8 @@ public class TrafficCounter {
/**
* Change checkInterval between two computations in millisecond
*
* @param newcheckInterval The new check interval (in milliseconds)
* @param newcheckInterval
* The new check interval (in milliseconds)
*/
public void configure(long newcheckInterval) {
long newInterval = newcheckInterval / 10 * 10;
@ -313,9 +354,9 @@ public class TrafficCounter {
}
/**
*
* @return the current number of bytes read since the last checkInterval
*/
*
* @return the current number of bytes read since the last checkInterval
*/
public long currentReadBytes() {
return currentReadBytes.get();
}
@ -351,7 +392,7 @@ public class TrafficCounter {
/**
* @return the lastCumulativeTime in millisecond as of System.currentTimeMillis()
* when the cumulative counters were reset to 0.
* when the cumulative counters were reset to 0.
*/
public long lastCumulativeTime() {
return lastCumulativeTime;
@ -373,15 +414,124 @@ public class TrafficCounter {
return name;
}
/**
* Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait
* time
*
* @param size
* the recv size
* @param limitTraffic
* the traffic limit in bytes per second
* @param maxTime
* the max time in ms to wait in case of excess of traffic
* @return the current time to wait (in ms) if needed for Read operation
*/
public synchronized long readTimeToWait(final long size, final long limitTraffic, final long maxTime) {
final long now = System.currentTimeMillis();
bytesRecvFlowControl(size);
if (limitTraffic == 0) {
return 0;
}
long sum = currentReadBytes.get();
long interval = now - lastTime.get();
// Short time checking
if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT && sum > 0) {
long time = (sum * 1000 / limitTraffic - interval) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + sum + ":" + interval);
}
return time > maxTime ? maxTime : time;
}
return 0;
}
// long time checking
if (lastNonNullReadBytes > 0 && lastNonNullReadTime + AbstractTrafficShapingHandler.MINIMAL_WAIT < now) {
long lastsum = sum + lastNonNullReadBytes;
long lastinterval = now - lastNonNullReadTime;
long time = (lastsum * 1000 / limitTraffic - lastinterval) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval);
}
return time > maxTime ? maxTime : time;
}
} else {
// final "middle" time checking in case resetAccounting called very recently
sum += lastReadBytes;
long lastinterval = AbstractTrafficShapingHandler.MINIMAL_WAIT;
long time = (sum * 1000 / limitTraffic - lastinterval) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + sum + ":" + lastinterval);
}
return time > maxTime ? maxTime : time;
}
}
return 0;
}
/**
* Returns the time to wait (if any) for the given length message, using the given limitTraffic and
* the max wait time
*
* @param size
* the write size
* @param limitTraffic
* the traffic limit in bytes per second
* @param maxTime
* the max time in ms to wait in case of excess of traffic
* @return the current time to wait (in ms) if needed for Write operation
*/
public synchronized long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) {
bytesWriteFlowControl(size);
if (limitTraffic == 0) {
return 0;
}
long sum = currentWrittenBytes.get();
final long now = System.currentTimeMillis();
long interval = now - lastTime.get();
if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT && sum > 0) {
long time = (sum * 1000 / limitTraffic - interval) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + sum + ":" + interval);
}
return time > maxTime ? maxTime : time;
}
return 0;
}
if (lastNonNullWrittenBytes > 0 && lastNonNullWrittenTime + AbstractTrafficShapingHandler.MINIMAL_WAIT < now) {
long lastsum = sum + lastNonNullWrittenBytes;
long lastinterval = now - lastNonNullWrittenTime;
long time = (lastsum * 1000 / limitTraffic - lastinterval) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval);
}
return time > maxTime ? maxTime : time;
}
} else {
sum += lastWrittenBytes;
long lastinterval = AbstractTrafficShapingHandler.MINIMAL_WAIT + Math.abs(interval);
long time = (sum * 1000 / limitTraffic - lastinterval) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + sum + ":" + lastinterval);
}
return time > maxTime ? maxTime : time;
}
}
return 0;
}
/**
* String information
*/
@Override
public String toString() {
return "Monitor " + name + " Current Speed Read: " +
(lastReadThroughput >> 10) + " KB/s, Write: " +
(lastWriteThroughput >> 10) + " KB/s Current Read: " +
(currentReadBytes.get() >> 10) + " KB Current Write: " +
(currentWrittenBytes.get() >> 10) + " KB";
return "Monitor " + name + " Current Speed Read: " + (lastReadThroughput >> 10) + " KB/s, Write: "
+ (lastWriteThroughput >> 10) + " KB/s Current Read: " + (currentReadBytes.get() >> 10)
+ " KB Current Write: " + (currentWrittenBytes.get() >> 10) + " KB";
}
}

View File

@ -0,0 +1,588 @@
/*
* Copyright 2012 The Netty Project
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.traffic.AbstractTrafficShapingHandler;
import io.netty.handler.traffic.ChannelTrafficShapingHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.*;
public class TrafficShapingTest extends AbstractSocketTest {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficShapingTest.class);
private static final InternalLogger loggerServer = InternalLoggerFactory.getInstance(ValidTimestampedHandler.class);
private static final InternalLogger loggerClient = InternalLoggerFactory.getInstance(ClientTrafficHandler.class);
static final int messageSize = 1024;
static final int bandwidthFactor = 15;
static final int minfactor = bandwidthFactor - (bandwidthFactor / 2);
static final int maxfactor = bandwidthFactor + (bandwidthFactor / 2);
static final long stepms = 1000 / bandwidthFactor;
static final long minimalms = Math.max(stepms / 2, 20) / 10 * 10;
static final long check = Math.max(Math.min(100, minimalms / 2) / 10 * 10, 20);
private static final Random random = new Random();
static final byte[] data = new byte[messageSize];
private static final String TRAFFIC = "traffic";
private static EventExecutorGroup group;
private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
static {
random.nextBytes(data);
}
@BeforeClass
public static void createGroup() {
logger.info("Bandwidth: " + minfactor + " <= " + bandwidthFactor + " <= " + maxfactor +
" StepMs: " + stepms + " MinMs: " + minimalms + " CheckMs: " + check);
InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
Logger logger = (Logger) LoggerFactory.getLogger("ROOT");
logger.setLevel(Level.INFO);
group = new DefaultEventExecutorGroup(8);
}
@AfterClass
public static void destroyGroup() throws Exception {
group.shutdownGracefully().sync();
}
private static long[] computeWaitRead(int[] multipleMessage) {
long[] minimalWaitBetween = new long[multipleMessage.length + 1];
minimalWaitBetween[0] = 0;
for (int i = 0; i < multipleMessage.length; i++) {
minimalWaitBetween[i + 1] = (multipleMessage[i] - 1) * stepms + minimalms;
}
return minimalWaitBetween;
}
private static long[] computeWaitWrite(int[] multipleMessage) {
long[] minimalWaitBetween = new long[multipleMessage.length + 1];
for (int i = 0; i < multipleMessage.length; i++) {
minimalWaitBetween[i] = (multipleMessage[i] - 1) * stepms + minimalms;
}
return minimalWaitBetween;
}
@Test(timeout = 10000)
public void testNoTrafficShapping() throws Throwable {
logger.info("TEST NO TRAFFIC");
run();
}
public void testNoTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = null;
int[] multipleMessage = { 1, 2, 1 };
long[] minimalWaitBetween = null;
testTrafficShapping0(sb, cb, false, false, false, false, autoRead, minimalWaitBetween, multipleMessage);
}
@Test(timeout = 20000)
public void testExecNoTrafficShapping() throws Throwable {
logger.info("TEST EXEC NO TRAFFIC");
run();
}
public void testExecNoTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = null;
int[] multipleMessage = { 1, 2, 1 };
long[] minimalWaitBetween = null;
testTrafficShapping0(sb, cb, true, false, false, false, autoRead, minimalWaitBetween, multipleMessage);
}
@Test(timeout = 10000)
public void testWriteTrafficShapping() throws Throwable {
logger.info("TEST WRITE");
run();
}
public void testWriteTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = null;
int[] multipleMessage = { 1, 2, 1 };
long[] minimalWaitBetween = computeWaitWrite(multipleMessage);
testTrafficShapping0(sb, cb, false, false, true, false, autoRead, minimalWaitBetween, multipleMessage);
}
@Test(timeout = 10000)
public void testReadTrafficShapping() throws Throwable {
logger.info("TEST READ");
run();
}
public void testReadTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = null;
int[] multipleMessage = { 1, 2, 1, 1 };
long[] minimalWaitBetween = computeWaitRead(multipleMessage);
testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
}
@Test(timeout = 10000)
public void testWrite1TrafficShapping() throws Throwable {
logger.info("TEST WRITE");
run();
}
public void testWrite1TrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = null;
int[] multipleMessage = { 1, 1, 1 };
long[] minimalWaitBetween = computeWaitWrite(multipleMessage);
testTrafficShapping0(sb, cb, false, false, true, false, autoRead, minimalWaitBetween, multipleMessage);
}
@Test(timeout = 10000)
public void testRead1TrafficShapping() throws Throwable {
logger.info("TEST READ");
run();
}
public void testRead1TrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = null;
int[] multipleMessage = { 1, 1, 1 };
long[] minimalWaitBetween = computeWaitRead(multipleMessage);
testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
}
@Test(timeout = 20000)
public void testExecWriteTrafficShapping() throws Throwable {
logger.info("TEST EXEC WRITE");
run();
}
public void testExecWriteTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = null;
int[] multipleMessage = { 1, 2, 1 };
long[] minimalWaitBetween = computeWaitWrite(multipleMessage);
testTrafficShapping0(sb, cb, true, false, true, false, autoRead, minimalWaitBetween, multipleMessage);
}
@Test(timeout = 10000)
public void testExecReadTrafficShapping() throws Throwable {
logger.info("TEST EXEC READ");
run();
}
public void testExecReadTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = null;
int[] multipleMessage = { 1, 2, 1, 1 };
long[] minimalWaitBetween = computeWaitRead(multipleMessage);
testTrafficShapping0(sb, cb, true, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
}
@Test(timeout = 10000)
public void testWriteGlobalTrafficShapping() throws Throwable {
logger.info("TEST GLOBAL WRITE");
run();
}
public void testWriteGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = null;
int[] multipleMessage = { 1, 2, 1 };
long[] minimalWaitBetween = computeWaitWrite(multipleMessage);
testTrafficShapping0(sb, cb, false, false, true, true, autoRead, minimalWaitBetween, multipleMessage);
}
@Test(timeout = 10000)
public void testReadGlobalTrafficShapping() throws Throwable {
logger.info("TEST GLOBAL READ");
run();
}
public void testReadGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = null;
int[] multipleMessage = { 1, 2, 1, 1 };
long[] minimalWaitBetween = computeWaitRead(multipleMessage);
testTrafficShapping0(sb, cb, false, true, false, true, autoRead, minimalWaitBetween, multipleMessage);
}
@Test(timeout = 10000)
public void testAutoReadTrafficShapping() throws Throwable {
logger.info("TEST AUTO READ");
run();
}
public void testAutoReadTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
long[] minimalWaitBetween = computeWaitRead(multipleMessage);
testTrafficShapping0(sb, cb, false, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
}
@Test(timeout = 10000)
public void testAutoReadGlobalTrafficShapping() throws Throwable {
logger.info("TEST AUTO READ GLOBAL");
run();
}
public void testAutoReadGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
long[] minimalWaitBetween = computeWaitRead(multipleMessage);
testTrafficShapping0(sb, cb, false, true, false, true, autoRead, minimalWaitBetween, multipleMessage);
}
@Test(timeout = 10000)
public void testAutoReadExecTrafficShapping() throws Throwable {
logger.info("TEST AUTO READ EXEC");
run();
}
public void testAutoReadExecTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
long[] minimalWaitBetween = computeWaitRead(multipleMessage);
testTrafficShapping0(sb, cb, true, true, false, false, autoRead, minimalWaitBetween, multipleMessage);
}
@Test(timeout = 10000)
public void testAutoReadExecGlobalTrafficShapping() throws Throwable {
logger.info("TEST AUTO READ EXEC GLOBAL");
run();
}
public void testAutoReadExecGlobalTrafficShapping(ServerBootstrap sb, Bootstrap cb) throws Throwable {
int[] autoRead = { 1, -1, -1, 1, -2, 0, 1, 0, -3, 0, 1, 2, 0 };
int[] multipleMessage = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
long[] minimalWaitBetween = computeWaitRead(multipleMessage);
testTrafficShapping0(sb, cb, true, true, false, true, autoRead, minimalWaitBetween, multipleMessage);
}
/**
*
* @param sb
* @param cb
* @param additionalExecutor
* shall the pipeline add the handler using an additionnal executor
* @param limitRead
* True to set Read Limit on Server side
* @param limitWrite
* True to set Write Limit on Client side
* @param globalLimit
* True to change Channel to Global TrafficShapping
* @param autoRead
* @param minimalWaitBetween
* time in ms that should be waited before getting the final result (note: for READ the values are
* right shifted once, the first value being 0)
* @param multipleMessage
* how many message to send at each step (for READ: the first should be 1, as the two last steps to
* ensure correct testing)
* @throws Throwable
*/
private static void testTrafficShapping0(ServerBootstrap sb, Bootstrap cb, final boolean additionalExecutor,
final boolean limitRead, final boolean limitWrite, final boolean globalLimit, int[] autoRead,
long[] minimalWaitBetween, int[] multipleMessage) throws Throwable {
logger.info("Exec: " + additionalExecutor + " Read: " + limitRead + " Write: " + limitWrite + " Global: "
+ globalLimit);
final ValidTimestampedHandler sh = new ValidTimestampedHandler(autoRead, multipleMessage);
Promise<Boolean> promise = group.next().newPromise();
final ClientTrafficHandler ch = new ClientTrafficHandler(promise, minimalWaitBetween, multipleMessage,
autoRead);
final AbstractTrafficShapingHandler handler;
if (limitRead) {
if (globalLimit) {
handler = new GlobalTrafficShapingHandler(group, 0, bandwidthFactor * messageSize, check);
} else {
handler = new ChannelTrafficShapingHandler(0, bandwidthFactor * messageSize, check);
}
} else if (limitWrite) {
if (globalLimit) {
handler = new GlobalTrafficShapingHandler(group, bandwidthFactor * messageSize, 0, check);
} else {
handler = new ChannelTrafficShapingHandler(bandwidthFactor * messageSize, 0, check);
}
} else {
handler = null;
}
sb.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel c) throws Exception {
if (limitRead) {
if (additionalExecutor) {
c.pipeline().addLast(group, TRAFFIC, handler);
} else {
c.pipeline().addLast(TRAFFIC, handler);
}
}
if (additionalExecutor) {
c.pipeline().addLast(group, sh);
} else {
c.pipeline().addLast(sh);
}
}
});
cb.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel c) throws Exception {
if (limitWrite) {
if (additionalExecutor) {
c.pipeline().addLast(group, TRAFFIC, handler);
} else {
c.pipeline().addLast(TRAFFIC, handler);
}
}
if (additionalExecutor) {
c.pipeline().addLast(group, ch);
} else {
c.pipeline().addLast(ch);
}
}
});
Channel sc = sb.bind().sync().channel();
Channel cc = cb.connect().sync().channel();
int totalNb = 0;
for (int i = 1; i < multipleMessage.length; i++) {
totalNb += multipleMessage[i];
}
Long start = System.currentTimeMillis();
int nb = multipleMessage[0];
for (int i = 0; i < nb; i++) {
cc.write(cc.alloc().buffer().writeBytes(data));
}
cc.flush();
promise.await();
Long stop = System.currentTimeMillis();
assertTrue("Error during exceution of TrafficShapping: " + promise.cause(), promise.isSuccess());
float average = (totalNb * messageSize) / (float) (stop - start);
logger.info("Average of traffic: " + average + " compare to " + bandwidthFactor);
sh.channel.close().sync();
ch.channel.close().sync();
sc.close().sync();
if (autoRead != null) {
// for extra release call in AutoRead
Thread.sleep(minimalms);
}
if (autoRead == null && minimalWaitBetween != null) {
assertTrue("Overall Traffic not ok since > " + maxfactor + ": " + average,
average <= maxfactor);
if (additionalExecutor) {
// Oio is not as good when using additionalExecutor
assertTrue("Overall Traffic not ok since < 0.25: " + average, average >= 0.25);
} else {
assertTrue("Overall Traffic not ok since < " + minfactor + ": " + average,
average >= minfactor);
}
}
if (handler != null && globalLimit) {
((GlobalTrafficShapingHandler) handler).release();
}
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
throw sh.exception.get();
}
if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
throw ch.exception.get();
}
if (sh.exception.get() != null) {
throw sh.exception.get();
}
if (ch.exception.get() != null) {
throw ch.exception.get();
}
}
private static class ClientTrafficHandler extends SimpleChannelInboundHandler<ByteBuf> {
volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int step;
// first message will always be validated
private long currentLastTime = System.currentTimeMillis();
private final long[] minimalWaitBetween;
private final int[] multipleMessage;
private final int[] autoRead;
final Promise<Boolean> promise;
ClientTrafficHandler(Promise<Boolean> promise, long[] minimalWaitBetween, int[] multipleMessage,
int[] autoRead) {
this.minimalWaitBetween = minimalWaitBetween;
this.multipleMessage = Arrays.copyOf(multipleMessage, multipleMessage.length);
this.promise = promise;
this.autoRead = autoRead;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channel = ctx.channel();
}
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
long lastTimestamp = 0;
while (in.isReadable()) {
lastTimestamp = in.readLong();
multipleMessage[step]--;
}
if (multipleMessage[step] > 0) {
// still some message to get
return;
}
long minimalWait = (minimalWaitBetween != null) ? minimalWaitBetween[step] : 0;
int ar = 0;
if (autoRead != null) {
if (step > 0 && autoRead[step - 1] != 0) {
ar = autoRead[step - 1];
if (ar > 0) {
minimalWait = -1;
} else {
minimalWait = minimalms / 3;
}
} else {
minimalWait = 0;
}
}
loggerClient.info("Step: " + step + " Interval: " + (lastTimestamp - currentLastTime) + " compareTo "
+ minimalWait + " (" + ar + ")");
assertTrue("The interval of time is incorrect:" + (lastTimestamp - currentLastTime) + " not> "
+ minimalWait, lastTimestamp - currentLastTime >= minimalWait);
currentLastTime = lastTimestamp;
step++;
if (multipleMessage.length > step) {
int nb = multipleMessage[step];
for (int i = 0; i < nb; i++) {
channel.write(channel.alloc().buffer().writeBytes(data));
}
channel.flush();
} else {
promise.setSuccess(true);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (exception.compareAndSet(null, cause)) {
cause.printStackTrace();
promise.setFailure(cause);
ctx.close();
}
}
}
private static class ValidTimestampedHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final int[] autoRead;
private final int[] multipleMessage;
volatile Channel channel;
volatile int step;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
ValidTimestampedHandler(int[] autoRead, int[] multipleMessage) {
this.autoRead = autoRead;
this.multipleMessage = Arrays.copyOf(multipleMessage, multipleMessage.length);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channel = ctx.channel();
}
@Override
public void channelRead0(final ChannelHandlerContext ctx, ByteBuf in) throws Exception {
byte[] actual = new byte[in.readableBytes()];
in.readBytes(actual);
long timestamp = System.currentTimeMillis();
int nb = actual.length / messageSize;
int isAutoRead = 0;
int laststep = step;
for (int i = 0; i < nb; i++) {
multipleMessage[step]--;
if (multipleMessage[step] == 0) {
if (autoRead != null) {
isAutoRead = autoRead[step];
}
step++;
}
}
if (laststep != step) {
if (autoRead != null && isAutoRead != 2) {
if (isAutoRead != 0) {
loggerServer.info("Set AutoRead: " + (isAutoRead > 0) + " Step: " + step);
channel.config().setAutoRead(isAutoRead > 0);
} else {
loggerServer.info("AutoRead: NO Step:" + step);
}
}
}
loggerServer.debug("Step: " + step + " Get: " + actual.length + " TS " + timestamp + " NB: " + nb);
for (int i = 0; i < nb; i++) {
channel.write(Unpooled.copyLong(timestamp));
}
channel.flush();
if (laststep != step) {
if (isAutoRead != 0) {
if (isAutoRead < 0) {
final int exactStep = step;
long wait = (isAutoRead == -1) ? minimalms : stepms + minimalms;
if (isAutoRead == -3) {
wait = stepms * 3;
}
executor.schedule(new Runnable() {
public void run() {
loggerServer.info("Reset AutoRead: Step " + exactStep);
channel.config().setAutoRead(true);
}
}, wait, TimeUnit.MILLISECONDS);
} else {
if (isAutoRead > 1) {
loggerServer.info("Will Set AutoRead: Rrue, Step: " + step);
executor.schedule(new Runnable() {
public void run() {
loggerServer.info("Set AutoRead: Rrue, Step: " + step);
channel.config().setAutoRead(true);
}
}, stepms + minimalms, TimeUnit.MILLISECONDS);
}
}
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (exception.compareAndSet(null, cause)) {
cause.printStackTrace();
ctx.close();
}
}
}
}