Fix big transfer and Write traffic shaping issues

Motivation:

Several issues were shown by various ticket (#2900 #2956).
Also use the improvement on writability user management from #3036.
And finally add a mixte handler, both for Global and Channels, with
the advantages of being uniquely created and using less memory and
less shaping.

Issue #2900

When a huge amount of data are written, the current behavior of the
TrafficShaping handler is to limit the delay to 15s, whatever the delay
the previous write has. This is wrong, and when a huge amount of writes
are done in a short time, the traffic is not correctly shapened.

Moreover, there is a high risk of OOM if one is not using in his/her own
handler for instance ChannelFuture.addListener() to handle the write
bufferisation in the TrafficShapingHandler.

This fix use the "user-defined writability flags" from #3036 to
allow the TrafficShapingHandlers to "user-defined" managed writability
directly, as for reading, thus using the default isWritable() and
channelWritabilityChanged().
This allows for instance HttpChunkedInput to be fully compatible.

The "bandwidth" compute on write is only on "acquired" write orders, not
on "real" write orders, which is wrong from statistic point of view.

Issue #2956

When using GlobalTrafficShaping, every write (and read) are
synchronized, thus leading to a drop of performance.
ChannelTrafficShaping is not touched by this issue since synchronized is
then correct (handler is per channel, so the synchronized).

Modifications:
The current write delay computation takes into account the previous
write delay and time to check is the 15s delay (maxTime) is really
exceeded or not (using last scheduled write time). The algorithm is
simplified and in the same time more accurate.

This proposal uses the #3036 improvement on user-defined writability
flags.

When the real write occurs, the statistics are update accordingly on a
new attribute (getRealWriteThroughput()).

To limit the synchronisations, all synchronized on
GlobalTrafficShapingHandler on submitWrite were removed. They are
replaced with a lock per channel (since synchronization is still needed
to prevent unordered write per channel), as in the sendAllValid method
for the very same reason.
Also all synchronized on TrafficCounter on read/writeTimeToWait() are
removed as they are unnecessary since already locked before by the
caller.
Still the creation and remove operations on lock per channel (PerChannel
object) are synchronized to prevent concurrency issue on this critical
part, but then limited.

Additionnal changes:
1) Use System.nanoTime() instead of System.currentTimeMillis() and
minimize calls
2) Remove / 10 ° 10 since no more sleep usage
3) Use nanoTime instead of currentTime such that time spend is computed,
not real time clock. Therefore the "now" relative time (nanoTime based)
is passed on all sub methods.
4) Take care of removal of the handler to force write all pending writes
and release read too
8) Review Javadoc to explicit:

- recommandations to take into account isWritable

- recommandations to provide reasonable message size according to
traffic shaping limit

- explicit "best effort" traffic shaping behavior when changing
configuration dynamically

Add a MixteGlobalChannelTrafficShapingHandler which allows to use only one
handler for mixing Global and Channel TSH. I enables to save more memory and
tries to optimize the traffic among various channels.

Result:
The traffic shaping is more stable, even with a huge number of writes in
short time by taking into consideration last scheduled write time.

The current implementation of TrafficShapingHandler using user-defined
writability flags and default isWritable() and
fireChannelWritabilityChanged works as expected.

The statistics are more valuable (asked write vs real write).

The Global TrafficShapingHandler should now have less "global"
synchronization, hoping to the minimum, but still per Channel as needed.

The GlobalChannel TrafficShapingHandler allows to have only one handler for all channels while still offering per channel in addition to global traffic shaping.

And finally maintain backward compatibility.
This commit is contained in:
Frederic Bregier 2014-10-25 13:16:56 +02:00 committed by Trustin Lee
parent e03bbc340e
commit b886c056bc
7 changed files with 1744 additions and 351 deletions

View File

@ -15,7 +15,9 @@ package io.netty.handler.traffic;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder; import io.netty.buffer.ByteBufHolder;
import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.util.Attribute; import io.netty.util.Attribute;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
@ -25,16 +27,15 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* AbstractTrafficShapingHandler allows to limit the global bandwidth * <p>AbstractTrafficShapingHandler allows to limit the global bandwidth
* (see {@link GlobalTrafficShapingHandler}) or per session * (see {@link GlobalTrafficShapingHandler}) or per session
* bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping. * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
* It allows you to implement an almost real time monitoring of the bandwidth using * It allows you to implement an almost real time monitoring of the bandwidth using
* the monitors from {@link TrafficCounter} that will call back every checkInterval * the monitors from {@link TrafficCounter} that will call back every checkInterval
* the method doAccounting of this handler.<br> * the method doAccounting of this handler.</p>
* <br>
* *
* If you want for any particular reasons to stop the monitoring (accounting) or to change * <p>If you want for any particular reasons to stop the monitoring (accounting) or to change
* the read/write limit or the check interval, several methods allow that for you:<br> * the read/write limit or the check interval, several methods allow that for you:</p>
* <ul> * <ul>
* <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li> * <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
* <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop or start the * <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop or start the
@ -56,6 +57,11 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
*/ */
public static final long DEFAULT_MAX_TIME = 15000; public static final long DEFAULT_MAX_TIME = 15000;
/**
* Default max size to not exceed in buffer (write only).
*/
static final long DEFAULT_MAX_SIZE = 4 * 1024 * 1024L;
/** /**
* Default minimal time to wait * Default minimal time to wait
*/ */
@ -69,30 +75,59 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
/** /**
* Limit in B/s to apply to write * Limit in B/s to apply to write
*/ */
private long writeLimit; private volatile long writeLimit;
/** /**
* Limit in B/s to apply to read * Limit in B/s to apply to read
*/ */
private long readLimit; private volatile long readLimit;
/** /**
* Max delay in wait * Max delay in wait
*/ */
protected long maxTime = DEFAULT_MAX_TIME; // default 15 s protected volatile long maxTime = DEFAULT_MAX_TIME; // default 15 s
/** /**
* Delay between two performance snapshots * Delay between two performance snapshots
*/ */
protected long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s protected volatile long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
private static final AttributeKey<Boolean> READ_SUSPENDED = AttributeKey static final AttributeKey<Boolean> READ_SUSPENDED = AttributeKey
.valueOf(AbstractTrafficShapingHandler.class.getName() + ".READ_SUSPENDED"); .valueOf(AbstractTrafficShapingHandler.class.getName() + ".READ_SUSPENDED");
private static final AttributeKey<Runnable> REOPEN_TASK = AttributeKey.valueOf(AbstractTrafficShapingHandler.class static final AttributeKey<Runnable> REOPEN_TASK = AttributeKey.valueOf(AbstractTrafficShapingHandler.class
.getName() + ".REOPEN_TASK"); .getName() + ".REOPEN_TASK");
/** /**
* * Max time to delay before proposing to stop writing new objects from next handlers
*/
volatile long maxWriteDelay = 4 * DEFAULT_CHECK_INTERVAL; // default 4 s
/**
* Max size in the list before proposing to stop writing new objects from next handlers
*/
volatile long maxWriteSize = DEFAULT_MAX_SIZE; // default 4MB
/**
* Rank in UserDefinedWritability (1 for Channel, 2 for Global TrafficShapingHandler).
* Set in final constructor. Must be between 1 and 31
*/
final int userDefinedWritabilityIndex;
/**
* Default value for Channel UserDefinedWritability index
*/
static final int CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 1;
/**
* Default value for Global UserDefinedWritability index
*/
static final int GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 2;
/**
* Default value for GlobalChannel UserDefinedWritability index
*/
static final int GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 3;
/**
* @param newTrafficCounter * @param newTrafficCounter
* the TrafficCounter to set * the TrafficCounter to set
*/ */
@ -100,6 +135,23 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
trafficCounter = newTrafficCounter; trafficCounter = newTrafficCounter;
} }
/**
* @return the index to be used by the TrafficShapingHandler to manage the user defined writability.
* For Channel TSH it is defined as {@value #CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
* for Global TSH it is defined as {@value #GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
* for GlobalChannel TSH it is defined as
* {@value #GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX}.
*/
int userDefinedWritabilityIndex() {
if (this instanceof GlobalChannelTrafficShapingHandler) {
return GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
} else if (this instanceof GlobalTrafficShapingHandler) {
return GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
} else {
return CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
}
}
/** /**
* @param writeLimit * @param writeLimit
* 0 or a limit in bytes/s * 0 or a limit in bytes/s
@ -107,11 +159,17 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
* 0 or a limit in bytes/s * 0 or a limit in bytes/s
* @param checkInterval * @param checkInterval
* The delay between two computations of performances for * The delay between two computations of performances for
* channels or 0 if no stats are to be computed * channels or 0 if no stats are to be computed.
* @param maxTime * @param maxTime
* The maximum delay to wait in case of traffic excess * The maximum delay to wait in case of traffic excess.
* Must be positive.
*/ */
protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval, long maxTime) { protected AbstractTrafficShapingHandler(long writeLimit, long readLimit,
long checkInterval, long maxTime) {
if (maxTime <= 0) {
throw new IllegalArgumentException("maxTime must be positive");
}
this.userDefinedWritabilityIndex = userDefinedWritabilityIndex();
this.writeLimit = writeLimit; this.writeLimit = writeLimit;
this.readLimit = readLimit; this.readLimit = readLimit;
this.checkInterval = checkInterval; this.checkInterval = checkInterval;
@ -119,20 +177,22 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
} }
/** /**
* Constructor using default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
* @param writeLimit * @param writeLimit
* 0 or a limit in bytes/s * 0 or a limit in bytes/s
* @param readLimit * @param readLimit
* 0 or a limit in bytes/s * 0 or a limit in bytes/s
* @param checkInterval * @param checkInterval
* The delay between two computations of performances for * The delay between two computations of performances for
* channels or 0 if no stats are to be computed * channels or 0 if no stats are to be computed.
*/ */
protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval) { protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval) {
this(writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME); this(writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME);
} }
/** /**
* Constructor using default Check Interval * Constructor using default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
* default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
* *
* @param writeLimit * @param writeLimit
* 0 or a limit in bytes/s * 0 or a limit in bytes/s
@ -144,18 +204,20 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
} }
/** /**
* Constructor using NO LIMIT and default Check Interval * Constructor using NO LIMIT, default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
* default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
*/ */
protected AbstractTrafficShapingHandler() { protected AbstractTrafficShapingHandler() {
this(0, 0, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME); this(0, 0, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
} }
/** /**
* Constructor using NO LIMIT * Constructor using NO LIMIT and
* default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
* *
* @param checkInterval * @param checkInterval
* The delay between two computations of performances for * The delay between two computations of performances for
* channels or 0 if no stats are to be computed * channels or 0 if no stats are to be computed.
*/ */
protected AbstractTrafficShapingHandler(long checkInterval) { protected AbstractTrafficShapingHandler(long checkInterval) {
this(0, 0, checkInterval, DEFAULT_MAX_TIME); this(0, 0, checkInterval, DEFAULT_MAX_TIME);
@ -163,6 +225,11 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
/** /**
* Change the underlying limitations and check interval. * Change the underlying limitations and check interval.
* <p>Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* <p>So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.</p>
* *
* @param newWriteLimit * @param newWriteLimit
* The new write limit (in bytes) * The new write limit (in bytes)
@ -178,6 +245,11 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
/** /**
* Change the underlying limitations. * Change the underlying limitations.
* <p>Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* <p>So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.</p>
* *
* @param newWriteLimit * @param newWriteLimit
* The new write limit (in bytes) * The new write limit (in bytes)
@ -188,7 +260,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
writeLimit = newWriteLimit; writeLimit = newWriteLimit;
readLimit = newReadLimit; readLimit = newReadLimit;
if (trafficCounter != null) { if (trafficCounter != null) {
trafficCounter.resetAccounting(System.currentTimeMillis() + 1); trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
} }
} }
@ -213,12 +285,18 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
} }
/** /**
* <p>Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* <p>So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.</p>
*
* @param writeLimit the writeLimit to set * @param writeLimit the writeLimit to set
*/ */
public void setWriteLimit(long writeLimit) { public void setWriteLimit(long writeLimit) {
this.writeLimit = writeLimit; this.writeLimit = writeLimit;
if (trafficCounter != null) { if (trafficCounter != null) {
trafficCounter.resetAccounting(System.currentTimeMillis() + 1); trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
} }
} }
@ -230,12 +308,18 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
} }
/** /**
* <p>Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* <p>So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.</p>
*
* @param readLimit the readLimit to set * @param readLimit the readLimit to set
*/ */
public void setReadLimit(long readLimit) { public void setReadLimit(long readLimit) {
this.readLimit = readLimit; this.readLimit = readLimit;
if (trafficCounter != null) { if (trafficCounter != null) {
trafficCounter.resetAccounting(System.currentTimeMillis() + 1); trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
} }
} }
@ -247,7 +331,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
} }
/** /**
* @param checkInterval the checkInterval to set * @param checkInterval the interval in ms between each step check to set, default value beeing 1000 ms.
*/ */
public void setCheckInterval(long checkInterval) { public void setCheckInterval(long checkInterval) {
this.checkInterval = checkInterval; this.checkInterval = checkInterval;
@ -257,21 +341,77 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
} }
/** /**
* <p>Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* <p>So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.</p>
* *
* @param maxTime * @param maxTime
* Max delay in wait, shall be less than TIME OUT in related protocol * Max delay in wait, shall be less than TIME OUT in related protocol.
* Must be positive.
*/ */
public void setMaxTimeWait(long maxTime) { public void setMaxTimeWait(long maxTime) {
if (maxTime <= 0) {
throw new IllegalArgumentException("maxTime must be positive");
}
this.maxTime = maxTime; this.maxTime = maxTime;
} }
/** /**
* @return the max delay in wait * @return the max delay in wait to prevent TIME OUT
*/ */
public long getMaxTimeWait() { public long getMaxTimeWait() {
return maxTime; return maxTime;
} }
/**
* @return the maxWriteDelay
*/
public long getMaxWriteDelay() {
return maxWriteDelay;
}
/**
* <p>Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* <p>So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.</p>
*
* @param maxWriteDelay the maximum Write Delay in ms in the buffer allowed before write suspension is set.
* Must be positive.
*/
public void setMaxWriteDelay(long maxWriteDelay) {
if (maxWriteDelay <= 0) {
throw new IllegalArgumentException("maxWriteDelay must be positive");
}
this.maxWriteDelay = maxWriteDelay;
}
/**
* @return the maxWriteSize default being {@value #DEFAULT_MAX_SIZE} bytes.
*/
public long getMaxWriteSize() {
return maxWriteSize;
}
/**
* <p>Note that this limit is a best effort on memory limitation to prevent Out Of
* Memory Exception. To ensure it works, the handler generating the write should
* use one of the way provided by Netty to handle the capacity:</p>
* <p>- the <code>Channel.isWritable()</code> property and the corresponding
* <code>channelWritabilityChanged()</code></p>
* <p>- the <code>ChannelFuture.addListener(new GenericFutureListener())</code></p>
*
* @param maxWriteSize the maximum Write Size allowed in the buffer
* per channel before write suspended is set,
* default being {@value #DEFAULT_MAX_SIZE} bytes.
*/
public void setMaxWriteSize(long maxWriteSize) {
this.maxWriteSize = maxWriteSize;
}
/** /**
* Called each time the accounting is computed from the TrafficCounters. * Called each time the accounting is computed from the TrafficCounters.
* This method could be used for instance to implement almost real time accounting. * This method could be used for instance to implement almost real time accounting.
@ -286,7 +426,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
/** /**
* Class to implement setReadable at fix time * Class to implement setReadable at fix time
*/ */
private static final class ReopenReadTimerTask implements Runnable { static final class ReopenReadTimerTask implements Runnable {
final ChannelHandlerContext ctx; final ChannelHandlerContext ctx;
ReopenReadTimerTask(ChannelHandlerContext ctx) { ReopenReadTimerTask(ChannelHandlerContext ctx) {
@ -294,55 +434,63 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
} }
public void run() { public void run() {
if (!ctx.channel().config().isAutoRead() && isHandlerActive(ctx)) { ChannelConfig config = ctx.channel().config();
if (!config.isAutoRead() && isHandlerActive(ctx)) {
// If AutoRead is False and Active is True, user make a direct setAutoRead(false) // If AutoRead is False and Active is True, user make a direct setAutoRead(false)
// Then Just reset the status // Then Just reset the status
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Channel:" + ctx.channel().hashCode() + logger.debug("Not unsuspend: " + config.isAutoRead() + ":" +
" Not Unsuspend: " + ctx.channel().config().isAutoRead() + ":" + isHandlerActive(ctx)); isHandlerActive(ctx));
} }
ctx.attr(READ_SUSPENDED).set(false); ctx.attr(READ_SUSPENDED).set(false);
} else { } else {
// Anything else allows the handler to reset the AutoRead // Anything else allows the handler to reset the AutoRead
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
if (ctx.channel().config().isAutoRead() && !isHandlerActive(ctx)) { if (config.isAutoRead() && !isHandlerActive(ctx)) {
logger.debug("Channel:" + ctx.channel().hashCode() + logger.debug("Unsuspend: " + config.isAutoRead() + ":" +
" Unsuspend: " + ctx.channel().config().isAutoRead() + ":" + isHandlerActive(ctx)); isHandlerActive(ctx));
} else { } else {
logger.debug("Channel:" + ctx.channel().hashCode() + logger.debug("Normal unsuspend: " + config.isAutoRead() + ":"
" Normal Unsuspend: " + ctx.channel().config().isAutoRead() + ":"
+ isHandlerActive(ctx)); + isHandlerActive(ctx));
} }
} }
ctx.attr(READ_SUSPENDED).set(false); ctx.attr(READ_SUSPENDED).set(false);
ctx.channel().config().setAutoRead(true); config.setAutoRead(true);
ctx.channel().read(); ctx.channel().read();
} }
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Channel:" + ctx.channel().hashCode() + logger.debug("Unsupsend final status => " + config.isAutoRead() + ":"
" Unsupsend final status => " + ctx.channel().config().isAutoRead() + ":"
+ isHandlerActive(ctx)); + isHandlerActive(ctx));
} }
} }
} }
/**
* Release the Read suspension
*/
void releaseReadSuspended(ChannelHandlerContext ctx) {
ctx.attr(READ_SUSPENDED).set(false);
ctx.channel().config().setAutoRead(true);
}
@Override @Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
long size = calculateSize(msg); long size = calculateSize(msg);
long now = TrafficCounter.milliSecondFromNano();
if (size > 0 && trafficCounter != null) { if (size > 0) {
// compute the number of ms to wait before reopening the channel // compute the number of ms to wait before reopening the channel
long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime); long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
wait = checkWaitReadTime(ctx, wait, now);
if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
// time in order to try to limit the traffic // time in order to try to limit the traffic
// Only AutoRead AND HandlerActive True means Context Active // Only AutoRead AND HandlerActive True means Context Active
ChannelConfig config = ctx.channel().config();
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Channel:" + ctx.channel().hashCode() + logger.debug("Read suspend: " + wait + ":" + config.isAutoRead() + ":"
" Read Suspend: " + wait + ":" + ctx.channel().config().isAutoRead() + ":"
+ isHandlerActive(ctx)); + isHandlerActive(ctx));
} }
if (ctx.channel().config().isAutoRead() && isHandlerActive(ctx)) { if (config.isAutoRead() && isHandlerActive(ctx)) {
ctx.channel().config().setAutoRead(false); config.setAutoRead(false);
ctx.attr(READ_SUSPENDED).set(true); ctx.attr(READ_SUSPENDED).set(true);
// Create a Runnable to reactive the read if needed. If one was create before it will just be // Create a Runnable to reactive the read if needed. If one was create before it will just be
// reused to limit object creation // reused to limit object creation
@ -354,17 +502,35 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
} }
ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS); ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Channel:" + ctx.channel().hashCode() + logger.debug("Suspend final status => " + config.isAutoRead() + ":"
" Suspend final status => " + ctx.channel().config().isAutoRead() + ":" + isHandlerActive(ctx) + " will reopened at: " + wait);
+ isHandlerActive(ctx) +
" will reopened at: " + wait);
} }
} }
} }
} }
informReadOperation(ctx, now);
ctx.fireChannelRead(msg); ctx.fireChannelRead(msg);
} }
/**
* Method overridden in GTSH to take into account specific timer for the channel.
* @param wait the wait delay computed in ms
* @param now the relative now time in ms
* @return the wait to use according to the context
*/
long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
// no change by default
return wait;
}
/**
* Method overridden in GTSH to take into account specific timer for the channel.
* @param now the relative now time in ms
*/
void informReadOperation(final ChannelHandlerContext ctx, final long now) {
// default noop
}
protected static boolean isHandlerActive(ChannelHandlerContext ctx) { protected static boolean isHandlerActive(ChannelHandlerContext ctx) {
Boolean suspended = ctx.attr(READ_SUSPENDED).get(); Boolean suspended = ctx.attr(READ_SUSPENDED).get();
return suspended == null || Boolean.FALSE.equals(suspended); return suspended == null || Boolean.FALSE.equals(suspended);
@ -382,36 +548,65 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
throws Exception { throws Exception {
long size = calculateSize(msg); long size = calculateSize(msg);
long now = TrafficCounter.milliSecondFromNano();
if (size > 0 && trafficCounter != null) { if (size > 0) {
// compute the number of ms to wait before continue with the channel // compute the number of ms to wait before continue with the channel
long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime); long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
if (wait >= MINIMAL_WAIT) { if (wait >= MINIMAL_WAIT) {
/*
* Option 2: but issue with ctx.executor().schedule()
* Thread.sleep(wait);
* System.out.println("Write unsuspended");
* Option 1: use an ordered list of messages to send
* Warning of memory pressure!
*/
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Channel:" + ctx.channel().hashCode() + logger.debug("Write suspend: " + wait + ":" + ctx.channel().config().isAutoRead() + ":"
" Write suspend: " + wait + ":" + ctx.channel().config().isAutoRead() + ":"
+ isHandlerActive(ctx)); + isHandlerActive(ctx));
} }
submitWrite(ctx, msg, wait, promise); submitWrite(ctx, msg, size, wait, now, promise);
return; return;
} }
} }
// to keep message order if not using option 2 // to maintain order of write
submitWrite(ctx, msg, 0, promise); submitWrite(ctx, msg, size, 0, now, promise);
} }
protected abstract void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long delay, @Deprecated
final ChannelPromise promise); protected void submitWrite(final ChannelHandlerContext ctx, final Object msg,
final long delay, final ChannelPromise promise) {
submitWrite(ctx, msg, calculateSize(msg),
delay, TrafficCounter.milliSecondFromNano(), promise);
}
abstract void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long size,
final long delay, final long now, final ChannelPromise promise);
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
setUserDefinedWritability(ctx, true);
super.channelRegistered(ctx);
}
void setUserDefinedWritability(ChannelHandlerContext ctx, boolean writable) {
ChannelOutboundBuffer cob = ctx.channel().unsafe().outboundBuffer();
if (cob != null) {
cob.setUserDefinedWritability(userDefinedWritabilityIndex, writable);
}
}
/**
* Check the writability according to delay and size for the channel.
* Set if necessary setUserDefinedWritability status.
* @param delay the computed delay
* @param queueSize the current queueSize
*/
void checkWriteSuspend(ChannelHandlerContext ctx, long delay, long queueSize) {
if (queueSize > maxWriteSize || delay > maxWriteDelay) {
setUserDefinedWritability(ctx, false);
}
}
/**
* Explicitly release the Write suspended status.
*/
void releaseWriteSuspended(ChannelHandlerContext ctx) {
setUserDefinedWritability(ctx, true);
}
/** /**
*
* @return the current TrafficCounter (if * @return the current TrafficCounter (if
* channel is still connected) * channel is still connected)
*/ */
@ -421,9 +616,19 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
@Override @Override
public String toString() { public String toString() {
return "TrafficShaping with Write Limit: " + writeLimit + " Read Limit: " + readLimit + StringBuilder builder = new StringBuilder(290)
" CheckInterval: " + checkInterval + " and Counter: " .append("TrafficShaping with Write Limit: ").append(writeLimit)
+ (trafficCounter != null ? trafficCounter.toString() : "none"); .append(" Read Limit: ").append(readLimit)
.append(" CheckInterval: ").append(checkInterval)
.append(" maxDelay: ").append(maxWriteDelay)
.append(" maxSize: ").append(maxWriteSize)
.append(" and Counter: ");
if (trafficCounter != null) {
builder.append(trafficCounter.toString());
} else {
builder.append("none");
}
return builder.toString();
} }
/** /**
@ -432,7 +637,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
* This implementation supports {@link ByteBuf} and {@link ByteBufHolder}. Sub-classes may override this. * This implementation supports {@link ByteBuf} and {@link ByteBufHolder}. Sub-classes may override this.
* *
* @param msg * @param msg
* the msg for which the size should be calculated * the msg for which the size should be calculated.
* @return size the size of the msg or {@code -1} if unknown. * @return size the size of the msg or {@code -1} if unknown.
*/ */
protected long calculateSize(Object msg) { protected long calculateSize(Object msg) {

View File

@ -15,8 +15,7 @@
*/ */
package io.netty.handler.traffic; package io.netty.handler.traffic;
import java.util.LinkedList; import java.util.ArrayDeque;
import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
@ -24,37 +23,51 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
/** /**
* This implementation of the {@link AbstractTrafficShapingHandler} is for channel * <p>This implementation of the {@link AbstractTrafficShapingHandler} is for channel
* traffic shaping, that is to say a per channel limitation of the bandwidth.<br><br> * traffic shaping, that is to say a per channel limitation of the bandwidth.</p>
* <p>Note the index used in <code>OutboundBuffer.setUserDefinedWritability(index, boolean)</code> is <b>1</b>.</p>
* *
* The general use should be as follow:<br> * <p>The general use should be as follow:</p>
* <ul> * <ul>
* <li>Add in your pipeline a new ChannelTrafficShapingHandler.<br> * <li><p>Add in your pipeline a new ChannelTrafficShapingHandler.</p>
* <tt>ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler();</tt><br> * <p><tt>ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler();</tt></p>
* <tt>pipeline.addLast(myHandler);</tt><br><br> * <p><tt>pipeline.addLast(myHandler);</tt></p>
* *
* <b>Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created * <p><b>Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created
* for each new channel as the counter cannot be shared among all channels.</b>.<br><br> * for each new channel as the counter cannot be shared among all channels.</b>.</p>
* *
* Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation) * <p>Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
* or the check interval (in millisecond) that represents the delay between two computations of the * or the check interval (in millisecond) that represents the delay between two computations of the
* bandwidth and so the call back of the doAccounting method (0 means no accounting at all).<br><br> * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).</p>
* *
* A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting, * <p>A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
* it is recommended to set a positive value, even if it is high since the precision of the * it is recommended to set a positive value, even if it is high since the precision of the
* Traffic Shaping depends on the period where the traffic is computed. The highest the interval, * Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
* the less precise the traffic shaping will be. It is suggested as higher value something close * the less precise the traffic shaping will be. It is suggested as higher value something close
* to 5 or 10 minutes.<br><br> * to 5 or 10 minutes.</p>
* *
* maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.<br> * <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p>
* </li> * </li>
* </ul><br> * <li>In your handler, you should consider to use the <code>channel.isWritable()</code> and
* <code>channelWritabilityChanged(ctx)</code> to handle writability, or through
* <code>future.addListener(new GenericFutureListener())</code> on the future returned by
* <code>ctx.write()</code>.</li>
* <li><p>You shall also consider to have object size in read or write operations relatively adapted to
* the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
* while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.</p></li>
* <li><p>Some configuration methods will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* <p>So the expected usage of those methods are to be used not too often,
* accordingly to the traffic shaping configuration.</p></li>
* </ul>
*/ */
public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler { public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
private List<ToSend> messagesQueue = new LinkedList<ToSend>(); private final ArrayDeque<ToSend> messagesQueue = new ArrayDeque<ToSend>();
private long queueSize;
/** /**
* Create a new instance * Create a new instance.
* *
* @param writeLimit * @param writeLimit
* 0 or a limit in bytes/s * 0 or a limit in bytes/s
@ -62,9 +75,9 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
* 0 or a limit in bytes/s * 0 or a limit in bytes/s
* @param checkInterval * @param checkInterval
* The delay between two computations of performances for * The delay between two computations of performances for
* channels or 0 if no stats are to be computed * channels or 0 if no stats are to be computed.
* @param maxTime * @param maxTime
* The maximum delay to wait in case of traffic excess * The maximum delay to wait in case of traffic excess.
*/ */
public ChannelTrafficShapingHandler(long writeLimit, long readLimit, public ChannelTrafficShapingHandler(long writeLimit, long readLimit,
long checkInterval, long maxTime) { long checkInterval, long maxTime) {
@ -72,7 +85,8 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
} }
/** /**
* Create a new instance * Create a new instance using default
* max time as delay allowed value of 15000 ms.
* *
* @param writeLimit * @param writeLimit
* 0 or a limit in bytes/s * 0 or a limit in bytes/s
@ -80,7 +94,7 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
* 0 or a limit in bytes/s * 0 or a limit in bytes/s
* @param checkInterval * @param checkInterval
* The delay between two computations of performances for * The delay between two computations of performances for
* channels or 0 if no stats are to be computed * channels or 0 if no stats are to be computed.
*/ */
public ChannelTrafficShapingHandler(long writeLimit, public ChannelTrafficShapingHandler(long writeLimit,
long readLimit, long checkInterval) { long readLimit, long checkInterval) {
@ -88,7 +102,8 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
} }
/** /**
* Create a new instance * Create a new instance using default Check Interval value of 1000 ms and
* max time as delay allowed value of 15000 ms.
* *
* @param writeLimit * @param writeLimit
* 0 or a limit in bytes/s * 0 or a limit in bytes/s
@ -101,11 +116,12 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
} }
/** /**
* Create a new instance * Create a new instance using
* default max time as delay allowed value of 15000 ms and no limit.
* *
* @param checkInterval * @param checkInterval
* The delay between two computations of performances for * The delay between two computations of performances for
* channels or 0 if no stats are to be computed * channels or 0 if no stats are to be computed.
*/ */
public ChannelTrafficShapingHandler(long checkInterval) { public ChannelTrafficShapingHandler(long checkInterval) {
super(checkInterval); super(checkInterval);
@ -121,58 +137,95 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
} }
@Override @Override
public synchronized void handlerRemoved(ChannelHandlerContext ctx) throws Exception { public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
if (trafficCounter != null) { trafficCounter.stop();
trafficCounter.stop(); // write order control
} synchronized (this) {
for (ToSend toSend : messagesQueue) { if (ctx.channel().isActive()) {
if (toSend.toSend instanceof ByteBuf) { for (ToSend toSend : messagesQueue) {
((ByteBuf) toSend.toSend).release(); long size = calculateSize(toSend.toSend);
trafficCounter.bytesRealWriteFlowControl(size);
queueSize -= size;
ctx.write(toSend.toSend, toSend.promise);
}
} else {
for (ToSend toSend : messagesQueue) {
if (toSend.toSend instanceof ByteBuf) {
((ByteBuf) toSend.toSend).release();
}
}
} }
messagesQueue.clear();
} }
messagesQueue.clear(); releaseWriteSuspended(ctx);
releaseReadSuspended(ctx);
super.handlerRemoved(ctx); super.handlerRemoved(ctx);
} }
private static final class ToSend { private static final class ToSend {
final long date; final long relativeTimeAction;
final Object toSend; final Object toSend;
final ChannelPromise promise; final ChannelPromise promise;
private ToSend(final long delay, final Object toSend, final ChannelPromise promise) { private ToSend(final long delay, final Object toSend, final ChannelPromise promise) {
this.date = System.currentTimeMillis() + delay; this.relativeTimeAction = delay;
this.toSend = toSend; this.toSend = toSend;
this.promise = promise; this.promise = promise;
} }
} }
@Override @Override
protected synchronized void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long delay, void submitWrite(final ChannelHandlerContext ctx, final Object msg,
final long size, final long delay, final long now,
final ChannelPromise promise) { final ChannelPromise promise) {
if (delay == 0 && messagesQueue.isEmpty()) { final ToSend newToSend;
ctx.write(msg, promise); // write order control
return; synchronized (this) {
if (delay == 0 && messagesQueue.isEmpty()) {
trafficCounter.bytesRealWriteFlowControl(size);
ctx.write(msg, promise);
return;
}
newToSend = new ToSend(delay + now, msg, promise);
messagesQueue.addLast(newToSend);
queueSize += size;
checkWriteSuspend(ctx, delay, queueSize);
} }
final ToSend newToSend = new ToSend(delay, msg, promise); final long futureNow = newToSend.relativeTimeAction;
messagesQueue.add(newToSend);
ctx.executor().schedule(new Runnable() { ctx.executor().schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
sendAllValid(ctx); sendAllValid(ctx, futureNow);
} }
}, delay, TimeUnit.MILLISECONDS); }, delay, TimeUnit.MILLISECONDS);
} }
private synchronized void sendAllValid(ChannelHandlerContext ctx) { private void sendAllValid(final ChannelHandlerContext ctx, final long now) {
while (!messagesQueue.isEmpty()) { // write order control
ToSend newToSend = messagesQueue.remove(0); synchronized (this) {
if (newToSend.date <= System.currentTimeMillis()) { ToSend newToSend = messagesQueue.pollFirst();
ctx.write(newToSend.toSend, newToSend.promise); for (; newToSend != null; newToSend = messagesQueue.pollFirst()) {
} else { if (newToSend.relativeTimeAction <= now) {
messagesQueue.add(0, newToSend); long size = calculateSize(newToSend.toSend);
break; trafficCounter.bytesRealWriteFlowControl(size);
queueSize -= size;
ctx.write(newToSend.toSend, newToSend.promise);
} else {
messagesQueue.addFirst(newToSend);
break;
}
}
if (messagesQueue.isEmpty()) {
releaseWriteSuspended(ctx);
} }
} }
ctx.flush(); ctx.flush();
} }
/**
* @return current size in bytes of the write buffer.
*/
public long queueSize() {
return queueSize;
}
} }

View File

@ -0,0 +1,127 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.traffic;
import io.netty.handler.traffic.GlobalChannelTrafficShapingHandler.PerChannel;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Version for {@link GlobalChannelTrafficShapingHandler}.
* This TrafficCounter is the Global one, and its special property is to directly handle
* other channel's TrafficCounters. In particular, there are no scheduler for those
* channel's TrafficCounters because it is managed by this one.
*/
public class GlobalChannelTrafficCounter extends TrafficCounter {
/**
* @param trafficShapingHandler the associated {@link GlobalChannelTrafficShapingHandler}.
* @param executor the underlying executor service for scheduling checks (both Global and per Channel).
* @param name the name given to this monitor.
* @param checkInterval the checkInterval in millisecond between two computations.
*/
public GlobalChannelTrafficCounter(GlobalChannelTrafficShapingHandler trafficShapingHandler,
ScheduledExecutorService executor, String name, long checkInterval) {
super(trafficShapingHandler, executor, name, checkInterval);
if (executor == null) {
throw new IllegalArgumentException("Executor must not be null");
}
}
/**
* Class to implement monitoring at fix delay.
* This version is Mixed in the way it mixes Global and Channel counters.
*/
private static class MixedTrafficMonitoringTask implements Runnable {
/**
* The associated TrafficShapingHandler
*/
private final GlobalChannelTrafficShapingHandler trafficShapingHandler1;
/**
* The associated TrafficCounter
*/
private final TrafficCounter counter;
/**
* @param trafficShapingHandler The parent handler to which this task needs to callback to for accounting.
* @param counter The parent TrafficCounter that we need to reset the statistics for.
*/
MixedTrafficMonitoringTask(
GlobalChannelTrafficShapingHandler trafficShapingHandler,
TrafficCounter counter) {
trafficShapingHandler1 = trafficShapingHandler;
this.counter = counter;
}
@Override
public void run() {
if (!counter.monitorActive) {
return;
}
long newLastTime = milliSecondFromNano();
counter.resetAccounting(newLastTime);
for (PerChannel perChannel : trafficShapingHandler1.channelQueues.values()) {
perChannel.channelTrafficCounter.resetAccounting(newLastTime);
}
trafficShapingHandler1.doAccounting(counter);
counter.scheduledFuture = counter.executor.schedule(this, counter.checkInterval.get(),
TimeUnit.MILLISECONDS);
}
}
/**
* Start the monitoring process.
*/
public synchronized void start() {
if (monitorActive) {
return;
}
lastTime.set(milliSecondFromNano());
long localCheckInterval = checkInterval.get();
if (localCheckInterval > 0) {
monitorActive = true;
monitor = new MixedTrafficMonitoringTask((GlobalChannelTrafficShapingHandler) trafficShapingHandler, this);
scheduledFuture =
executor.schedule(monitor, localCheckInterval, TimeUnit.MILLISECONDS);
}
}
/**
* Stop the monitoring process.
*/
public synchronized void stop() {
if (!monitorActive) {
return;
}
monitorActive = false;
resetAccounting(milliSecondFromNano());
trafficShapingHandler.doAccounting(this);
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
}
@Override
public void resetCumulativeTime() {
for (PerChannel perChannel :
((GlobalChannelTrafficShapingHandler) trafficShapingHandler).channelQueues.values()) {
perChannel.channelTrafficCounter.resetCumulativeTime();
}
super.resetCumulativeTime();
}
}

View File

@ -0,0 +1,773 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.traffic;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.Attribute;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.AbstractCollection;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* This implementation of the {@link AbstractTrafficShapingHandler} is for global
* and per channel traffic shaping, that is to say a global limitation of the bandwidth, whatever
* the number of opened channels and a per channel limitation of the bandwidth.<br><br>
* This version shall not be in the same pipeline than other TrafficShapingHandler.<br><br>
*
* The general use should be as follow:<br>
* <ul>
* <li>Create your unique GlobalChannelTrafficShapingHandler like:<br><br>
* <tt>GlobalChannelTrafficShapingHandler myHandler = new GlobalChannelTrafficShapingHandler(executor);</tt><br><br>
* The executor could be the underlying IO worker pool<br>
* <tt>pipeline.addLast(myHandler);</tt><br><br>
*
* <b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
* and shared among all channels as the counter must be shared among all channels.</b><br><br>
*
* Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
* or the check interval (in millisecond) that represents the delay between two computations of the
* bandwidth and so the call back of the doAccounting method (0 means no accounting at all).<br>
* Note that as this is a fusion of both Global and Channel Traffic Shaping, limits are in 2 sets,
* respectively Global and Channel.<br><br>
*
* A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
* it is recommended to set a positive value, even if it is high since the precision of the
* Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
* the less precise the traffic shaping will be. It is suggested as higher value something close
* to 5 or 10 minutes.<br><br>
*
* maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.<br><br>
* </li>
* <li>In your handler, you should consider to use the <code>channel.isWritable()</code> and
* <code>channelWritabilityChanged(ctx)</code> to handle writability, or through
* <code>future.addListener(new GenericFutureListener())</code> on the future returned by
* <code>ctx.write()</code>.</li>
* <li>You shall also consider to have object size in read or write operations relatively adapted to
* the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
* while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.<br><br></li>
* <li>Some configuration methods will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.<br>
* So the expected usage of those methods are to be used not too often,
* accordingly to the traffic shaping configuration.</li>
* </ul><br>
*
* Be sure to call {@link #release()} once this handler is not needed anymore to release all internal resources.
* This will not shutdown the {@link EventExecutor} as it may be shared, so you need to do this by your own.
*/
@Sharable
public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(GlobalChannelTrafficShapingHandler.class);
/**
* All queues per channel
*/
final ConcurrentMap<Integer, PerChannel> channelQueues = PlatformDependent.newConcurrentHashMap();
/**
* Global queues size
*/
private final AtomicLong queuesSize = new AtomicLong();
/**
* Maximum cumulative writing bytes for one channel among all (as long as channels stay the same)
*/
private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
/**
* Maximum cumulative read bytes for one channel among all (as long as channels stay the same)
*/
private final AtomicLong cumulativeReadBytes = new AtomicLong();
/**
* Max size in the list before proposing to stop writing new objects from next handlers
* for all channel (global)
*/
volatile long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100; // default 400MB
/**
* Limit in B/s to apply to write
*/
private volatile long writeChannelLimit;
/**
* Limit in B/s to apply to read
*/
private volatile long readChannelLimit;
private static final float DEFAULT_DEVIATION = 0.1F;
private static final float MAX_DEVIATION = 0.4F;
private static final float DEFAULT_SLOWDOWN = 0.4F;
private static final float DEFAULT_ACCELERATION = -0.1F;
private volatile float maxDeviation;
private volatile float accelerationFactor;
private volatile float slowDownFactor;
private volatile boolean readDeviationActive;
private volatile boolean writeDeviationActive;
static final class PerChannel {
ArrayDeque<ToSend> messagesQueue;
TrafficCounter channelTrafficCounter;
long queueSize;
long lastWriteTimestamp;
long lastReadTimestamp;
}
/**
* Create the global TrafficCounter
*/
void createGlobalTrafficCounter(ScheduledExecutorService executor) {
// Default
setMaxDeviation(DEFAULT_DEVIATION, DEFAULT_SLOWDOWN, DEFAULT_ACCELERATION);
if (executor == null) {
throw new IllegalArgumentException("Executor must not be null");
}
TrafficCounter tc = new GlobalChannelTrafficCounter(this, executor, "GlobalChannelTC", checkInterval);
setTrafficCounter(tc);
tc.start();
}
@Override
int userDefinedWritabilityIndex() {
return AbstractTrafficShapingHandler.GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
}
/**
* Create a new instance.
*
* @param executor
* the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
* @param writeGlobalLimit
* 0 or a limit in bytes/s
* @param readGlobalLimit
* 0 or a limit in bytes/s
* @param writeChannelLimit
* 0 or a limit in bytes/s
* @param readChannelLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed.
* @param maxTime
* The maximum delay to wait in case of traffic excess.
*/
public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor,
long writeGlobalLimit, long readGlobalLimit,
long writeChannelLimit, long readChannelLimit,
long checkInterval, long maxTime) {
super(writeGlobalLimit, readGlobalLimit, checkInterval, maxTime);
createGlobalTrafficCounter(executor);
this.writeChannelLimit = writeChannelLimit;
this.readChannelLimit = readChannelLimit;
}
/**
* Create a new instance.
*
* @param executor
* the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
* @param writeGlobalLimit
* 0 or a limit in bytes/s
* @param readGlobalLimit
* 0 or a limit in bytes/s
* @param writeChannelLimit
* 0 or a limit in bytes/s
* @param readChannelLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed.
*/
public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor,
long writeGlobalLimit, long readGlobalLimit,
long writeChannelLimit, long readChannelLimit,
long checkInterval) {
super(writeGlobalLimit, readGlobalLimit, checkInterval);
this.writeChannelLimit = writeChannelLimit;
this.readChannelLimit = readChannelLimit;
createGlobalTrafficCounter(executor);
}
/**
* Create a new instance.
*
* @param executor
* the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
* @param writeGlobalLimit
* 0 or a limit in bytes/s
* @param readGlobalLimit
* 0 or a limit in bytes/s
* @param writeChannelLimit
* 0 or a limit in bytes/s
* @param readChannelLimit
* 0 or a limit in bytes/s
*/
public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor,
long writeGlobalLimit, long readGlobalLimit,
long writeChannelLimit, long readChannelLimit) {
super(writeGlobalLimit, readGlobalLimit);
this.writeChannelLimit = writeChannelLimit;
this.readChannelLimit = readChannelLimit;
createGlobalTrafficCounter(executor);
}
/**
* Create a new instance.
*
* @param executor
* the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed.
*/
public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, long checkInterval) {
super(checkInterval);
createGlobalTrafficCounter(executor);
}
/**
* Create a new instance.
*
* @param executor
* the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
*/
public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor) {
createGlobalTrafficCounter(executor);
}
/**
* @return the current max deviation
*/
public float maxDeviation() {
return maxDeviation;
}
/**
* @return the current acceleration factor
*/
public float accelerationFactor() {
return accelerationFactor;
}
/**
* @return the current slow down factor
*/
public float slowDownFactor() {
return slowDownFactor;
}
/**
* @param maxDeviation
* the maximum deviation to allow during computation of average, default deviation
* being 0.1, so +/-10% of the desired bandwidth. Maximum being 0.4.
* @param slowDownFactor
* the factor set as +x% to the too fast client (minimal value being 0, meaning no
* slow down factor), default being 40% (0.4).
* @param accelerationFactor
* the factor set as -x% to the too slow client (maximal value being 0, meaning no
* acceleration factor), default being -10% (-0.1).
*/
public void setMaxDeviation(float maxDeviation, float slowDownFactor, float accelerationFactor) {
if (maxDeviation > MAX_DEVIATION) {
throw new IllegalArgumentException("maxDeviation must be <= " + MAX_DEVIATION);
}
if (slowDownFactor < 0) {
throw new IllegalArgumentException("slowDownFactor must be >= 0");
}
if (accelerationFactor > 0) {
throw new IllegalArgumentException("accelerationFactor must be <= 0");
}
this.maxDeviation = maxDeviation;
this.accelerationFactor = 1 + accelerationFactor;
this.slowDownFactor = 1 + slowDownFactor;
}
private void computeDeviationCumulativeBytes() {
// compute the maximum cumulativeXxxxBytes among still connected Channels
long maxWrittenBytes = 0;
long maxReadBytes = 0;
long minWrittenBytes = Long.MAX_VALUE;
long minReadBytes = Long.MAX_VALUE;
for (PerChannel perChannel : channelQueues.values()) {
long value = perChannel.channelTrafficCounter.cumulativeWrittenBytes();
if (maxWrittenBytes < value) {
maxWrittenBytes = value;
}
if (minWrittenBytes > value) {
minWrittenBytes = value;
}
value = perChannel.channelTrafficCounter.cumulativeReadBytes();
if (maxReadBytes < value) {
maxReadBytes = value;
}
if (minReadBytes > value) {
minReadBytes = value;
}
}
boolean multiple = channelQueues.size() > 1;
readDeviationActive = multiple && minReadBytes < maxReadBytes / 2;
writeDeviationActive = multiple && minWrittenBytes < maxWrittenBytes / 2;
cumulativeWrittenBytes.set(maxWrittenBytes);
cumulativeReadBytes.set(maxReadBytes);
}
@Override
protected void doAccounting(TrafficCounter counter) {
computeDeviationCumulativeBytes();
super.doAccounting(counter);
}
private long computeBalancedWait(float maxLocal, float maxGlobal, long wait) {
if (maxGlobal == 0) {
// no change
return wait;
}
float ratio = maxLocal / maxGlobal;
// if in the boundaries, same value
if (ratio > maxDeviation) {
if (ratio < 1 - maxDeviation) {
return wait;
} else {
ratio = slowDownFactor;
if (wait < MINIMAL_WAIT) {
wait = MINIMAL_WAIT;
}
}
} else {
ratio = accelerationFactor;
}
return (long) (wait * ratio);
}
/**
* @return the maxGlobalWriteSize
*/
public long getMaxGlobalWriteSize() {
return maxGlobalWriteSize;
}
/**
* Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.<br>
* So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.
*
* @param maxGlobalWriteSize the maximum Global Write Size allowed in the buffer
* globally for all channels before write suspended is set.
*/
public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
if (maxGlobalWriteSize <= 0) {
throw new IllegalArgumentException("maxGlobalWriteSize must be positive");
}
this.maxGlobalWriteSize = maxGlobalWriteSize;
}
/**
* @return the global size of the buffers for all queues.
*/
public long queuesSize() {
return queuesSize.get();
}
/**
* @param newWriteLimit Channel write limit
* @param newReadLimit Channel read limit
*/
public void configureChannel(long newWriteLimit, long newReadLimit) {
writeChannelLimit = newWriteLimit;
readChannelLimit = newReadLimit;
long now = TrafficCounter.milliSecondFromNano();
for (PerChannel perChannel : channelQueues.values()) {
perChannel.channelTrafficCounter.resetAccounting(now);
}
}
/**
* @return Channel write limit
*/
public long getWriteChannelLimit() {
return writeChannelLimit;
}
/**
* @param writeLimit Channel write limit
*/
public void setWriteChannelLimit(long writeLimit) {
writeChannelLimit = writeLimit;
long now = TrafficCounter.milliSecondFromNano();
for (PerChannel perChannel : channelQueues.values()) {
perChannel.channelTrafficCounter.resetAccounting(now);
}
}
/**
* @return Channel read limit
*/
public long getReadChannelLimit() {
return readChannelLimit;
}
/**
* @param readLimit Channel read limit
*/
public void setReadChannelLimit(long readLimit) {
readChannelLimit = readLimit;
long now = TrafficCounter.milliSecondFromNano();
for (PerChannel perChannel : channelQueues.values()) {
perChannel.channelTrafficCounter.resetAccounting(now);
}
}
/**
* Release all internal resources of this instance.
*/
public final void release() {
trafficCounter.stop();
}
private PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) {
// ensure creation is limited to one thread per channel
Channel channel = ctx.channel();
Integer key = channel.hashCode();
PerChannel perChannel = channelQueues.get(key);
if (perChannel == null) {
perChannel = new PerChannel();
perChannel.messagesQueue = new ArrayDeque<ToSend>();
// Don't start it since managed through the Global one
perChannel.channelTrafficCounter = new TrafficCounter(this, null, "ChannelTC" +
ctx.channel().hashCode(), checkInterval);
perChannel.queueSize = 0L;
perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano();
perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp;
channelQueues.put(key, perChannel);
}
return perChannel;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
getOrSetPerChannel(ctx);
trafficCounter.resetCumulativeTime();
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
trafficCounter.resetCumulativeTime();
Channel channel = ctx.channel();
Integer key = channel.hashCode();
PerChannel perChannel = channelQueues.remove(key);
if (perChannel != null) {
// write operations need synchronization
synchronized (perChannel) {
if (channel.isActive()) {
for (ToSend toSend : perChannel.messagesQueue) {
long size = calculateSize(toSend.toSend);
trafficCounter.bytesRealWriteFlowControl(size);
perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
perChannel.queueSize -= size;
queuesSize.addAndGet(-size);
ctx.write(toSend.toSend, toSend.promise);
}
} else {
queuesSize.addAndGet(-perChannel.queueSize);
for (ToSend toSend : perChannel.messagesQueue) {
if (toSend.toSend instanceof ByteBuf) {
((ByteBuf) toSend.toSend).release();
}
}
}
perChannel.messagesQueue.clear();
}
}
releaseWriteSuspended(ctx);
releaseReadSuspended(ctx);
super.handlerRemoved(ctx);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
long size = calculateSize(msg);
long now = TrafficCounter.milliSecondFromNano();
if (size > 0) {
// compute the number of ms to wait before reopening the channel
long waitGlobal = trafficCounter.readTimeToWait(size, getReadLimit(), maxTime, now);
Integer key = ctx.channel().hashCode();
PerChannel perChannel = channelQueues.get(key);
long wait = 0;
if (perChannel != null) {
wait = perChannel.channelTrafficCounter.readTimeToWait(size, readChannelLimit, maxTime, now);
if (readDeviationActive) {
// now try to balance between the channels
long maxLocalRead = 0;
maxLocalRead = perChannel.channelTrafficCounter.cumulativeReadBytes();
long maxGlobalRead = cumulativeReadBytes.get();
if (maxLocalRead <= 0) {
maxLocalRead = 0;
}
if (maxGlobalRead < maxLocalRead) {
maxGlobalRead = maxLocalRead;
}
wait = computeBalancedWait(maxLocalRead, maxGlobalRead, wait);
}
}
if (wait < waitGlobal) {
wait = waitGlobal;
}
wait = checkWaitReadTime(ctx, wait, now);
if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
// time in order to try to limit the traffic
// Only AutoRead AND HandlerActive True means Context Active
ChannelConfig config = ctx.channel().config();
if (logger.isDebugEnabled()) {
logger.debug("Read Suspend: " + wait + ":" + config.isAutoRead() + ":"
+ isHandlerActive(ctx));
}
if (config.isAutoRead() && isHandlerActive(ctx)) {
config.setAutoRead(false);
ctx.attr(READ_SUSPENDED).set(true);
// Create a Runnable to reactive the read if needed. If one was create before it will just be
// reused to limit object creation
Attribute<Runnable> attr = ctx.attr(REOPEN_TASK);
Runnable reopenTask = attr.get();
if (reopenTask == null) {
reopenTask = new ReopenReadTimerTask(ctx);
attr.set(reopenTask);
}
ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
if (logger.isDebugEnabled()) {
logger.debug("Suspend final status => " + config.isAutoRead() + ":"
+ isHandlerActive(ctx) + " will reopened at: " + wait);
}
}
}
}
informReadOperation(ctx, now);
ctx.fireChannelRead(msg);
}
@Override
protected long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
Integer key = ctx.channel().hashCode();
PerChannel perChannel = channelQueues.get(key);
if (perChannel != null) {
if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
wait = maxTime;
}
}
return wait;
}
@Override
protected void informReadOperation(final ChannelHandlerContext ctx, final long now) {
Integer key = ctx.channel().hashCode();
PerChannel perChannel = channelQueues.get(key);
if (perChannel != null) {
perChannel.lastReadTimestamp = now;
}
}
private static final class ToSend {
final long relativeTimeAction;
final Object toSend;
final ChannelPromise promise;
final long size;
private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) {
this.relativeTimeAction = delay;
this.toSend = toSend;
this.size = size;
this.promise = promise;
}
}
protected long maximumCumulativeWrittenBytes() {
return cumulativeWrittenBytes.get();
}
protected long maximumCumulativeReadBytes() {
return cumulativeReadBytes.get();
}
/**
* To allow for instance doAccounting to use the TrafficCounter per channel.
* @return the list of TrafficCounters that exists at the time of the call.
*/
public Collection<TrafficCounter> channelTrafficCounters() {
Collection<TrafficCounter> valueCollection = new AbstractCollection<TrafficCounter>() {
@Override
public Iterator<TrafficCounter> iterator() {
return new Iterator<TrafficCounter>() {
final Iterator<PerChannel> iter = channelQueues.values().iterator();
public boolean hasNext() {
return iter.hasNext();
}
public TrafficCounter next() {
return iter.next().channelTrafficCounter;
}
public void remove() {
throw new UnsupportedOperationException();
}
};
}
@Override
public int size() {
return channelQueues.size();
}
};
return valueCollection;
}
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
throws Exception {
long size = calculateSize(msg);
long now = TrafficCounter.milliSecondFromNano();
if (size > 0) {
// compute the number of ms to wait before continue with the channel
long waitGlobal = trafficCounter.writeTimeToWait(size, getWriteLimit(), maxTime, now);
Integer key = ctx.channel().hashCode();
PerChannel perChannel = channelQueues.get(key);
long wait = 0;
if (perChannel != null) {
wait = perChannel.channelTrafficCounter.writeTimeToWait(size, writeChannelLimit, maxTime, now);
if (writeDeviationActive) {
// now try to balance between the channels
long maxLocalWrite = 0;
maxLocalWrite = perChannel.channelTrafficCounter.cumulativeWrittenBytes();
long maxGlobalWrite = cumulativeWrittenBytes.get();
if (maxLocalWrite <= 0) {
maxLocalWrite = 0;
}
if (maxGlobalWrite < maxLocalWrite) {
maxGlobalWrite = maxLocalWrite;
}
wait = computeBalancedWait(maxLocalWrite, maxGlobalWrite, wait);
}
}
if (wait < waitGlobal) {
wait = waitGlobal;
}
if (wait >= MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Write suspend: " + wait + ":" + ctx.channel().config().isAutoRead() + ":"
+ isHandlerActive(ctx));
}
submitWrite(ctx, msg, size, wait, now, promise);
return;
}
}
// to maintain order of write
submitWrite(ctx, msg, size, 0, now, promise);
}
@Override
protected void submitWrite(final ChannelHandlerContext ctx, final Object msg,
final long size, final long writedelay, final long now,
final ChannelPromise promise) {
Channel channel = ctx.channel();
Integer key = channel.hashCode();
PerChannel perChannel = channelQueues.get(key);
if (perChannel == null) {
// in case write occurs before handlerAdded is raized for this handler
// imply a synchronized only if needed
perChannel = getOrSetPerChannel(ctx);
}
final ToSend newToSend;
long delay = writedelay;
boolean globalSizeExceeded = false;
// write operations need synchronization
synchronized (perChannel) {
if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
trafficCounter.bytesRealWriteFlowControl(size);
perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
ctx.write(msg, promise);
perChannel.lastWriteTimestamp = now;
return;
}
if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
delay = maxTime;
}
newToSend = new ToSend(delay + now, msg, size, promise);
perChannel.messagesQueue.addLast(newToSend);
perChannel.queueSize += size;
queuesSize.addAndGet(size);
checkWriteSuspend(ctx, delay, perChannel.queueSize);
if (queuesSize.get() > maxGlobalWriteSize) {
globalSizeExceeded = true;
}
}
if (globalSizeExceeded) {
setUserDefinedWritability(ctx, false);
}
final long futureNow = newToSend.relativeTimeAction;
final PerChannel forSchedule = perChannel;
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
sendAllValid(ctx, forSchedule, futureNow);
}
}, delay, TimeUnit.MILLISECONDS);
}
private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now) {
// write operations need synchronization
synchronized (perChannel) {
ToSend newToSend = perChannel.messagesQueue.pollFirst();
for (; newToSend != null; newToSend = perChannel.messagesQueue.pollFirst()) {
if (newToSend.relativeTimeAction <= now) {
long size = newToSend.size;
trafficCounter.bytesRealWriteFlowControl(size);
perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
perChannel.queueSize -= size;
queuesSize.addAndGet(-size);
ctx.write(newToSend.toSend, newToSend.promise);
perChannel.lastWriteTimestamp = now;
} else {
perChannel.messagesQueue.addFirst(newToSend);
break;
}
}
if (perChannel.messagesQueue.isEmpty()) {
releaseWriteSuspended(ctx);
}
}
ctx.flush();
}
@Override
public String toString() {
return new StringBuilder(340).append(super.toString())
.append(" Write Channel Limit: ").append(writeChannelLimit)
.append(" Read Channel Limit: ").append(readChannelLimit).toString();
}
}

View File

@ -16,82 +16,115 @@
package io.netty.handler.traffic; package io.netty.handler.traffic;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandler.Sharable;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.PlatformDependent;
import java.util.HashMap; import java.util.ArrayDeque;
import java.util.LinkedList; import java.util.concurrent.ConcurrentMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/** /**
* This implementation of the {@link AbstractTrafficShapingHandler} is for global * <p>This implementation of the {@link AbstractTrafficShapingHandler} is for global
* traffic shaping, that is to say a global limitation of the bandwidth, whatever * traffic shaping, that is to say a global limitation of the bandwidth, whatever
* the number of opened channels.<br><br> * the number of opened channels.</p>
* <p>Note the index used in <code>OutboundBuffer.setUserDefinedWritability(index, boolean)</code> is <b>2</b>.</p>
* *
* The general use should be as follow:<br> * <p>The general use should be as follow:</p>
* <ul> * <ul>
* <li>Create your unique GlobalTrafficShapingHandler like:<br><br> * <li><p>Create your unique GlobalTrafficShapingHandler like:</p>
* <tt>GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(executor);</tt><br><br> * <p><tt>GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(executor);</tt></p>
* The executor could be the underlying IO worker pool<br> * <p>The executor could be the underlying IO worker pool</p>
* <tt>pipeline.addLast(myHandler);</tt><br><br> * <p><tt>pipeline.addLast(myHandler);</tt></p>
* *
* <b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created * <p><b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
* and shared among all channels as the counter must be shared among all channels.</b><br><br> * and shared among all channels as the counter must be shared among all channels.</b></p>
* *
* Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation) * <p>Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
* or the check interval (in millisecond) that represents the delay between two computations of the * or the check interval (in millisecond) that represents the delay between two computations of the
* bandwidth and so the call back of the doAccounting method (0 means no accounting at all).<br><br> * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).</p>
* *
* A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting, * <p>A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
* it is recommended to set a positive value, even if it is high since the precision of the * it is recommended to set a positive value, even if it is high since the precision of the
* Traffic Shaping depends on the period where the traffic is computed. The highest the interval, * Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
* the less precise the traffic shaping will be. It is suggested as higher value something close * the less precise the traffic shaping will be. It is suggested as higher value something close
* to 5 or 10 minutes.<br><br> * to 5 or 10 minutes.</p>
* *
* maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.<br> * <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p>
* </li> * </li>
* </ul><br> * <li>In your handler, you should consider to use the <code>channel.isWritable()</code> and
* <code>channelWritabilityChanged(ctx)</code> to handle writability, or through
* <code>future.addListener(new GenericFutureListener())</code> on the future returned by
* <code>ctx.write()</code>.</li>
* <li><p>You shall also consider to have object size in read or write operations relatively adapted to
* the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
* while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.</p></li>
* <li><p>Some configuration methods will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* So the expected usage of those methods are to be used not too often,
* accordingly to the traffic shaping configuration.</li>
* </ul>
* *
* Be sure to call {@link #release()} once this handler is not needed anymore to release all internal resources. * Be sure to call {@link #release()} once this handler is not needed anymore to release all internal resources.
* This will not shutdown the {@link EventExecutor} as it may be shared, so you need to do this by your own. * This will not shutdown the {@link EventExecutor} as it may be shared, so you need to do this by your own.
*/ */
@Sharable @Sharable
public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
private Map<Integer, List<ToSend>> messagesQueues = new HashMap<Integer, List<ToSend>>(); /**
* All queues per channel
*/
private final ConcurrentMap<Integer, PerChannel> channelQueues = PlatformDependent.newConcurrentHashMap();
/** /**
* Create the global TrafficCounter * Global queues size
*/
private final AtomicLong queuesSize = new AtomicLong();
/**
* Max size in the list before proposing to stop writing new objects from next handlers
* for all channel (global)
*/
long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100; // default 400MB
private static final class PerChannel {
ArrayDeque<ToSend> messagesQueue;
long queueSize;
long lastWriteTimestamp;
long lastReadTimestamp;
}
/**
* Create the global TrafficCounter.
*/ */
void createGlobalTrafficCounter(ScheduledExecutorService executor) { void createGlobalTrafficCounter(ScheduledExecutorService executor) {
if (executor == null) { if (executor == null) {
throw new NullPointerException("executor"); throw new NullPointerException("executor");
} }
TrafficCounter tc = new TrafficCounter(this, executor, "GlobalTC", TrafficCounter tc = new TrafficCounter(this, executor, "GlobalTC", checkInterval);
checkInterval);
setTrafficCounter(tc); setTrafficCounter(tc);
tc.start(); tc.start();
} }
/** /**
* Create a new instance * Create a new instance.
* *
* @param executor * @param executor
* the {@link ScheduledExecutorService} to use for the {@link TrafficCounter} * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
* @param writeLimit * @param writeLimit
* 0 or a limit in bytes/s * 0 or a limit in bytes/s
* @param readLimit * @param readLimit
* 0 or a limit in bytes/s * 0 or a limit in bytes/s
* @param checkInterval * @param checkInterval
* The delay between two computations of performances for * The delay between two computations of performances for
* channels or 0 if no stats are to be computed * channels or 0 if no stats are to be computed.
* @param maxTime * @param maxTime
* The maximum delay to wait in case of traffic excess * The maximum delay to wait in case of traffic excess.
*/ */
public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit, public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit,
long checkInterval, long maxTime) { long checkInterval, long maxTime) {
@ -100,17 +133,18 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
} }
/** /**
* Create a new instance * Create a new instance using
* default max time as delay allowed value of 15000 ms.
* *
* @param executor * @param executor
* the {@link ScheduledExecutorService} to use for the {@link TrafficCounter} * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
* @param writeLimit * @param writeLimit
* 0 or a limit in bytes/s * 0 or a limit in bytes/s
* @param readLimit * @param readLimit
* 0 or a limit in bytes/s * 0 or a limit in bytes/s
* @param checkInterval * @param checkInterval
* The delay between two computations of performances for * The delay between two computations of performances for
* channels or 0 if no stats are to be computed * channels or 0 if no stats are to be computed.
*/ */
public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit,
long readLimit, long checkInterval) { long readLimit, long checkInterval) {
@ -119,10 +153,11 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
} }
/** /**
* Create a new instance * Create a new instance using default Check Interval value of 1000 ms and
* default max time as delay allowed value of 15000 ms.
* *
* @param executor * @param executor
* the {@link ScheduledExecutorService} to use for the {@link TrafficCounter} * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
* @param writeLimit * @param writeLimit
* 0 or a limit in bytes/s * 0 or a limit in bytes/s
* @param readLimit * @param readLimit
@ -135,13 +170,14 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
} }
/** /**
* Create a new instance * Create a new instance using
* default max time as delay allowed value of 15000 ms and no limit.
* *
* @param executor * @param executor
* the {@link ScheduledExecutorService} to use for the {@link TrafficCounter} * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
* @param checkInterval * @param checkInterval
* The delay between two computations of performances for * The delay between two computations of performances for
* channels or 0 if no stats are to be computed * channels or 0 if no stats are to be computed.
*/ */
public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long checkInterval) { public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long checkInterval) {
super(checkInterval); super(checkInterval);
@ -149,91 +185,209 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
} }
/** /**
* Create a new instance * Create a new instance using default Check Interval value of 1000 ms and
* default max time as delay allowed value of 15000 ms and no limit.
* *
* @param executor * @param executor
* the {@link ScheduledExecutorService} to use for the {@link TrafficCounter} * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
*/ */
public GlobalTrafficShapingHandler(EventExecutor executor) { public GlobalTrafficShapingHandler(EventExecutor executor) {
super();
createGlobalTrafficCounter(executor); createGlobalTrafficCounter(executor);
} }
/** /**
* Release all internal resources of this instance * @return the maxGlobalWriteSize default value being 400 MB.
*/
public long getMaxGlobalWriteSize() {
return maxGlobalWriteSize;
}
/**
* Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.<br>
* So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.
*
* @param maxGlobalWriteSize the maximum Global Write Size allowed in the buffer
* globally for all channels before write suspended is set,
* default value being 400 MB.
*/
public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
this.maxGlobalWriteSize = maxGlobalWriteSize;
}
/**
* @return the global size of the buffers for all queues.
*/
public long queuesSize() {
return queuesSize.get();
}
/**
* Release all internal resources of this instance.
*/ */
public final void release() { public final void release() {
if (trafficCounter != null) { trafficCounter.stop();
trafficCounter.stop(); }
private PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) {
// ensure creation is limited to one thread per channel
Channel channel = ctx.channel();
Integer key = channel.hashCode();
PerChannel perChannel = channelQueues.get(key);
if (perChannel == null) {
perChannel = new PerChannel();
perChannel.messagesQueue = new ArrayDeque<ToSend>();
perChannel.queueSize = 0L;
perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano();
perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp;
channelQueues.put(key, perChannel);
} }
return perChannel;
} }
@Override @Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Integer key = ctx.channel().hashCode(); getOrSetPerChannel(ctx);
List<ToSend> mq = new LinkedList<ToSend>();
messagesQueues.put(key, mq);
super.handlerAdded(ctx); super.handlerAdded(ctx);
} }
@Override @Override
public synchronized void handlerRemoved(ChannelHandlerContext ctx) throws Exception { public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Integer key = ctx.channel().hashCode(); Channel channel = ctx.channel();
List<ToSend> mq = messagesQueues.remove(key); Integer key = channel.hashCode();
if (mq != null) { PerChannel perChannel = channelQueues.remove(key);
for (ToSend toSend : mq) { if (perChannel != null) {
if (toSend.toSend instanceof ByteBuf) { // write operations need synchronization
((ByteBuf) toSend.toSend).release(); synchronized (perChannel) {
if (channel.isActive()) {
for (ToSend toSend : perChannel.messagesQueue) {
long size = calculateSize(toSend.toSend);
trafficCounter.bytesRealWriteFlowControl(size);
perChannel.queueSize -= size;
queuesSize.addAndGet(-size);
ctx.write(toSend.toSend, toSend.promise);
}
} else {
queuesSize.addAndGet(-perChannel.queueSize);
for (ToSend toSend : perChannel.messagesQueue) {
if (toSend.toSend instanceof ByteBuf) {
((ByteBuf) toSend.toSend).release();
}
}
} }
perChannel.messagesQueue.clear();
} }
mq.clear();
} }
releaseWriteSuspended(ctx);
releaseReadSuspended(ctx);
super.handlerRemoved(ctx); super.handlerRemoved(ctx);
} }
@Override
long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
Integer key = ctx.channel().hashCode();
PerChannel perChannel = channelQueues.get(key);
if (perChannel != null) {
if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
wait = maxTime;
}
}
return wait;
}
@Override
void informReadOperation(final ChannelHandlerContext ctx, final long now) {
Integer key = ctx.channel().hashCode();
PerChannel perChannel = channelQueues.get(key);
if (perChannel != null) {
perChannel.lastReadTimestamp = now;
}
}
private static final class ToSend { private static final class ToSend {
final long date; final long relativeTimeAction;
final Object toSend; final Object toSend;
final long size;
final ChannelPromise promise; final ChannelPromise promise;
private ToSend(final long delay, final Object toSend, final ChannelPromise promise) { private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) {
this.date = System.currentTimeMillis() + delay; this.relativeTimeAction = delay;
this.toSend = toSend; this.toSend = toSend;
this.size = size;
this.promise = promise; this.promise = promise;
} }
} }
@Override @Override
protected synchronized void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long delay, void submitWrite(final ChannelHandlerContext ctx, final Object msg,
final long size, final long writedelay, final long now,
final ChannelPromise promise) { final ChannelPromise promise) {
Integer key = ctx.channel().hashCode(); Channel channel = ctx.channel();
List<ToSend> messagesQueue = messagesQueues.get(key); Integer key = channel.hashCode();
if (delay == 0 && (messagesQueue == null || messagesQueue.isEmpty())) { PerChannel perChannel = channelQueues.get(key);
ctx.write(msg, promise); if (perChannel == null) {
return; // in case write occurs before handlerAdded is raized for this handler
// imply a synchronized only if needed
perChannel = getOrSetPerChannel(ctx);
} }
final ToSend newToSend = new ToSend(delay, msg, promise); final ToSend newToSend;
if (messagesQueue == null) { long delay = writedelay;
messagesQueue = new LinkedList<ToSend>(); boolean globalSizeExceeded = false;
messagesQueues.put(key, messagesQueue); // write operations need synchronization
synchronized (perChannel) {
if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
trafficCounter.bytesRealWriteFlowControl(size);
ctx.write(msg, promise);
perChannel.lastWriteTimestamp = now;
return;
}
if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
delay = maxTime;
}
newToSend = new ToSend(delay + now, msg, size, promise);
perChannel.messagesQueue.addLast(newToSend);
perChannel.queueSize += size;
queuesSize.addAndGet(size);
checkWriteSuspend(ctx, delay, perChannel.queueSize);
if (queuesSize.get() > maxGlobalWriteSize) {
globalSizeExceeded = true;
}
} }
messagesQueue.add(newToSend); if (globalSizeExceeded) {
final List<ToSend> mqfinal = messagesQueue; setUserDefinedWritability(ctx, false);
}
final long futureNow = newToSend.relativeTimeAction;
final PerChannel forSchedule = perChannel;
ctx.executor().schedule(new Runnable() { ctx.executor().schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
sendAllValid(ctx, mqfinal); sendAllValid(ctx, forSchedule, futureNow);
} }
}, delay, TimeUnit.MILLISECONDS); }, delay, TimeUnit.MILLISECONDS);
} }
private synchronized void sendAllValid(final ChannelHandlerContext ctx, final List<ToSend> messagesQueue) { private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now) {
while (!messagesQueue.isEmpty()) { // write operations need synchronization
ToSend newToSend = messagesQueue.remove(0); synchronized (perChannel) {
if (newToSend.date <= System.currentTimeMillis()) { ToSend newToSend = perChannel.messagesQueue.pollFirst();
ctx.write(newToSend.toSend, newToSend.promise); for (; newToSend != null; newToSend = perChannel.messagesQueue.pollFirst()) {
} else { if (newToSend.relativeTimeAction <= now) {
messagesQueue.add(0, newToSend); long size = newToSend.size;
break; trafficCounter.bytesRealWriteFlowControl(size);
perChannel.queueSize -= size;
queuesSize.addAndGet(-size);
ctx.write(newToSend.toSend, newToSend.promise);
perChannel.lastWriteTimestamp = now;
} else {
perChannel.messagesQueue.addFirst(newToSend);
break;
}
}
if (perChannel.messagesQueue.isEmpty()) {
releaseWriteSuspended(ctx);
} }
} }
ctx.flush(); ctx.flush();

View File

@ -18,7 +18,6 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
/** /**
@ -38,6 +37,13 @@ import java.util.concurrent.atomic.AtomicLong;
public class TrafficCounter { public class TrafficCounter {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficCounter.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficCounter.class);
/**
* @return the time in ms using nanoTime, so not real EPOCH time but elapsed time in ms.
*/
public static final long milliSecondFromNano() {
return System.nanoTime() / 1000000;
}
/** /**
* Current written bytes * Current written bytes
*/ */
@ -48,6 +54,16 @@ public class TrafficCounter {
*/ */
private final AtomicLong currentReadBytes = new AtomicLong(); private final AtomicLong currentReadBytes = new AtomicLong();
/**
* Last writing time during current check interval
*/
private long writingTime;
/**
* Last reading delay during current check interval
*/
private long readingTime;
/** /**
* Long life written bytes * Long life written bytes
*/ */
@ -59,7 +75,7 @@ public class TrafficCounter {
private final AtomicLong cumulativeReadBytes = new AtomicLong(); private final AtomicLong cumulativeReadBytes = new AtomicLong();
/** /**
* Last Time where cumulative bytes where reset to zero * Last Time where cumulative bytes where reset to zero: this time is a real EPOC time (informative only)
*/ */
private long lastCumulativeTime; private long lastCumulativeTime;
@ -76,37 +92,37 @@ public class TrafficCounter {
/** /**
* Last Time Check taken * Last Time Check taken
*/ */
private final AtomicLong lastTime = new AtomicLong(); final AtomicLong lastTime = new AtomicLong();
/** /**
* Last written bytes number during last check interval * Last written bytes number during last check interval
*/ */
private long lastWrittenBytes; private volatile long lastWrittenBytes;
/** /**
* Last read bytes number during last check interval * Last read bytes number during last check interval
*/ */
private long lastReadBytes; private volatile long lastReadBytes;
/** /**
* Last non 0 written bytes number during last check interval * Last future writing time during last check interval
*/ */
private long lastNonNullWrittenBytes; private volatile long lastWritingTime;
/** /**
* Last time written bytes with non 0 written bytes * Last reading time during last check interval
*/ */
private long lastNonNullWrittenTime; private volatile long lastReadingTime;
/** /**
* Last time read bytes with non 0 written bytes * Real written bytes
*/ */
private long lastNonNullReadTime; private final AtomicLong realWrittenBytes = new AtomicLong();
/** /**
* Last non 0 read bytes number during last check interval * Real writing bandwidth
*/ */
private long lastNonNullReadBytes; private long realWriteThroughput;
/** /**
* Delay between two captures * Delay between two captures
@ -123,25 +139,25 @@ public class TrafficCounter {
/** /**
* The associated TrafficShapingHandler * The associated TrafficShapingHandler
*/ */
private final AbstractTrafficShapingHandler trafficShapingHandler; final AbstractTrafficShapingHandler trafficShapingHandler;
/** /**
* Executor that will run the monitor * Executor that will run the monitor
*/ */
private final ScheduledExecutorService executor; final ScheduledExecutorService executor;
/** /**
* Monitor created once in start() * Monitor created once in start()
*/ */
private Runnable monitor; Runnable monitor;
/** /**
* used in stop() to cancel the timer * used in stop() to cancel the timer
*/ */
private volatile ScheduledFuture<?> scheduledFuture; volatile ScheduledFuture<?> scheduledFuture;
/** /**
* Is Monitor active * Is Monitor active
*/ */
final AtomicBoolean monitorActive = new AtomicBoolean(); volatile boolean monitorActive;
/** /**
* Class to implement monitoring at fix delay * Class to implement monitoring at fix delay
@ -160,9 +176,9 @@ public class TrafficCounter {
/** /**
* @param trafficShapingHandler * @param trafficShapingHandler
* The parent handler to which this task needs to callback to for accounting * The parent handler to which this task needs to callback to for accounting.
* @param counter * @param counter
* The parent TrafficCounter that we need to reset the statistics for * The parent TrafficCounter that we need to reset the statistics for.
*/ */
protected TrafficMonitoringTask(AbstractTrafficShapingHandler trafficShapingHandler, TrafficCounter counter) { protected TrafficMonitoringTask(AbstractTrafficShapingHandler trafficShapingHandler, TrafficCounter counter) {
trafficShapingHandler1 = trafficShapingHandler; trafficShapingHandler1 = trafficShapingHandler;
@ -171,11 +187,10 @@ public class TrafficCounter {
@Override @Override
public void run() { public void run() {
if (!counter.monitorActive.get()) { if (!counter.monitorActive) {
return; return;
} }
long endTime = System.currentTimeMillis(); counter.resetAccounting(milliSecondFromNano());
counter.resetAccounting(endTime);
if (trafficShapingHandler1 != null) { if (trafficShapingHandler1 != null) {
trafficShapingHandler1.doAccounting(counter); trafficShapingHandler1.doAccounting(counter);
} }
@ -185,29 +200,32 @@ public class TrafficCounter {
} }
/** /**
* Start the monitoring process * Start the monitoring process.
*/ */
public synchronized void start() { public synchronized void start() {
if (monitorActive.get()) { if (monitorActive) {
return; return;
} }
lastTime.set(System.currentTimeMillis()); lastTime.set(milliSecondFromNano());
if (checkInterval.get() > 0) { long localCheckInterval = checkInterval.get();
monitorActive.set(true); // if executor is null, it means it is piloted by a GlobalChannelTrafficCounter, so no executor
if (localCheckInterval > 0 && executor != null) {
monitorActive = true;
monitor = new TrafficMonitoringTask(trafficShapingHandler, this); monitor = new TrafficMonitoringTask(trafficShapingHandler, this);
scheduledFuture = executor.schedule(monitor, checkInterval.get(), TimeUnit.MILLISECONDS); scheduledFuture =
executor.schedule(monitor, localCheckInterval, TimeUnit.MILLISECONDS);
} }
} }
/** /**
* Stop the monitoring process * Stop the monitoring process.
*/ */
public synchronized void stop() { public synchronized void stop() {
if (!monitorActive.get()) { if (!monitorActive) {
return; return;
} }
monitorActive.set(false); monitorActive = false;
resetAccounting(System.currentTimeMillis()); resetAccounting(milliSecondFromNano());
if (trafficShapingHandler != null) { if (trafficShapingHandler != null) {
trafficShapingHandler.doAccounting(this); trafficShapingHandler.doAccounting(this);
} }
@ -217,10 +235,9 @@ public class TrafficCounter {
} }
/** /**
* Reset the accounting on Read and Write * Reset the accounting on Read and Write.
* *
* @param newLastTime * @param newLastTime the milliseconds unix timestamp that we should be considered up-to-date for.
* the millisecond unix timestamp that we should be considered up-to-date for
*/ */
synchronized void resetAccounting(long newLastTime) { synchronized void resetAccounting(long newLastTime) {
long interval = newLastTime - lastTime.getAndSet(newLastTime); long interval = newLastTime - lastTime.getAndSet(newLastTime);
@ -228,7 +245,7 @@ public class TrafficCounter {
// nothing to do // nothing to do
return; return;
} }
if (logger.isDebugEnabled() && interval > 2 * checkInterval()) { if (logger.isDebugEnabled() && (interval > checkInterval() << 1)) {
logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name); logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name);
} }
lastReadBytes = currentReadBytes.getAndSet(0); lastReadBytes = currentReadBytes.getAndSet(0);
@ -237,52 +254,55 @@ public class TrafficCounter {
// nb byte / checkInterval in ms * 1000 (1s) // nb byte / checkInterval in ms * 1000 (1s)
lastWriteThroughput = lastWrittenBytes * 1000 / interval; lastWriteThroughput = lastWrittenBytes * 1000 / interval;
// nb byte / checkInterval in ms * 1000 (1s) // nb byte / checkInterval in ms * 1000 (1s)
if (lastWrittenBytes > 0) { realWriteThroughput = realWrittenBytes.getAndSet(0) * 1000 / interval;
lastNonNullWrittenBytes = lastWrittenBytes; lastWritingTime = Math.max(lastWritingTime, writingTime);
lastNonNullWrittenTime = newLastTime; lastReadingTime = Math.max(lastReadingTime, readingTime);
}
if (lastReadBytes > 0) {
lastNonNullReadBytes = lastReadBytes;
lastNonNullReadTime = newLastTime;
}
} }
/** /**
* Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the Timer to use, its * Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the Timer to use, its
* name, the checkInterval between two computations in millisecond * name, the checkInterval between two computations in millisecond.
* *
* @param trafficShapingHandler * @param trafficShapingHandler
* the associated AbstractTrafficShapingHandler * the associated AbstractTrafficShapingHandler.
* @param executor * @param executor
* the underlying executor service for scheduling checks * the underlying executor service for scheduling checks, might be null when used
* from {@link GlobalChannelTrafficCounter}.
* @param name * @param name
* the name given to this monitor * the name given to this monitor.
* @param checkInterval * @param checkInterval
* the checkInterval in millisecond between two computations * the checkInterval in millisecond between two computations.
*/ */
public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler, ScheduledExecutorService executor, public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler, ScheduledExecutorService executor,
String name, long checkInterval) { String name, long checkInterval) {
if (trafficShapingHandler == null) {
throw new IllegalArgumentException("TrafficShapingHandler must not be null");
}
this.trafficShapingHandler = trafficShapingHandler; this.trafficShapingHandler = trafficShapingHandler;
this.executor = executor; this.executor = executor;
this.name = name; this.name = name;
// absolute time: informative only
lastCumulativeTime = System.currentTimeMillis(); lastCumulativeTime = System.currentTimeMillis();
writingTime = milliSecondFromNano();
readingTime = writingTime;
lastWritingTime = writingTime;
lastReadingTime = writingTime;
configure(checkInterval); configure(checkInterval);
} }
/** /**
* Change checkInterval between two computations in millisecond * Change checkInterval between two computations in millisecond.
* *
* @param newcheckInterval * @param newcheckInterval
* The new check interval (in milliseconds) * The new check interval (in milliseconds)
*/ */
public void configure(long newcheckInterval) { public void configure(long newcheckInterval) {
long newInterval = newcheckInterval / 10 * 10; long newInterval = newcheckInterval / 10 * 10;
if (checkInterval.get() != newInterval) { if (checkInterval.getAndSet(newInterval) != newInterval) {
checkInterval.set(newInterval);
if (newInterval <= 0) { if (newInterval <= 0) {
stop(); stop();
// No more active monitoring // No more active monitoring
lastTime.set(System.currentTimeMillis()); lastTime.set(milliSecondFromNano());
} else { } else {
// Start if necessary // Start if necessary
start(); start();
@ -313,64 +333,69 @@ public class TrafficCounter {
} }
/** /**
* Computes counters for Real Write.
* *
* @param write
* the size in bytes to write
* @param schedule
* the time when this write was scheduled
*/
void bytesRealWriteFlowControl(long write) {
realWrittenBytes.addAndGet(write);
}
/**
* @return the current checkInterval between two computations of traffic counter * @return the current checkInterval between two computations of traffic counter
* in millisecond * in millisecond.
*/ */
public long checkInterval() { public long checkInterval() {
return checkInterval.get(); return checkInterval.get();
} }
/** /**
* * @return the Read Throughput in bytes/s computes in the last check interval.
* @return the Read Throughput in bytes/s computes in the last check interval
*/ */
public long lastReadThroughput() { public long lastReadThroughput() {
return lastReadThroughput; return lastReadThroughput;
} }
/** /**
* * @return the Write Throughput in bytes/s computes in the last check interval.
* @return the Write Throughput in bytes/s computes in the last check interval
*/ */
public long lastWriteThroughput() { public long lastWriteThroughput() {
return lastWriteThroughput; return lastWriteThroughput;
} }
/** /**
* * @return the number of bytes read during the last check Interval.
* @return the number of bytes read during the last check Interval
*/ */
public long lastReadBytes() { public long lastReadBytes() {
return lastReadBytes; return lastReadBytes;
} }
/** /**
* * @return the number of bytes written during the last check Interval.
* @return the number of bytes written during the last check Interval
*/ */
public long lastWrittenBytes() { public long lastWrittenBytes() {
return lastWrittenBytes; return lastWrittenBytes;
} }
/** /**
* * @return the current number of bytes read since the last checkInterval.
* @return the current number of bytes read since the last checkInterval
*/ */
public long currentReadBytes() { public long currentReadBytes() {
return currentReadBytes.get(); return currentReadBytes.get();
} }
/** /**
* * @return the current number of bytes written since the last check Interval.
* @return the current number of bytes written since the last check Interval
*/ */
public long currentWrittenBytes() { public long currentWrittenBytes() {
return currentWrittenBytes.get(); return currentWrittenBytes.get();
} }
/** /**
* @return the Time in millisecond of the last check as of System.currentTimeMillis() * @return the Time in millisecond of the last check as of System.currentTimeMillis().
*/ */
public long lastTime() { public long lastTime() {
return lastTime.get(); return lastTime.get();
@ -399,7 +424,22 @@ public class TrafficCounter {
} }
/** /**
* Reset both read and written cumulative bytes counters and the associated time. * @return the realWrittenBytes
*/
public AtomicLong getRealWrittenBytes() {
return realWrittenBytes;
}
/**
* @return the realWriteThroughput
*/
public long getRealWriteThroughput() {
return realWriteThroughput;
}
/**
* Reset both read and written cumulative bytes counters and the associated absolute time
* from System.currentTimeMillis().
*/ */
public void resetCumulativeTime() { public void resetCumulativeTime() {
lastCumulativeTime = System.currentTimeMillis(); lastCumulativeTime = System.currentTimeMillis();
@ -408,7 +448,7 @@ public class TrafficCounter {
} }
/** /**
* @return the name * @return the name of this TrafficCounter.
*/ */
public String name() { public String name() {
return name; return name;
@ -416,122 +456,162 @@ public class TrafficCounter {
/** /**
* Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait * Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait
* time * time.
*
* @param size
* the recv size
* @param limitTraffic
* the traffic limit in bytes per second.
* @param maxTime
* the max time in ms to wait in case of excess of traffic.
* @return the current time to wait (in ms) if needed for Read operation.
*/
@Deprecated
public long readTimeToWait(final long size, final long limitTraffic, final long maxTime) {
return readTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
}
/**
* Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait
* time.
* *
* @param size * @param size
* the recv size * the recv size
* @param limitTraffic * @param limitTraffic
* the traffic limit in bytes per second * the traffic limit in bytes per second
* @param maxTime * @param maxTime
* the max time in ms to wait in case of excess of traffic * the max time in ms to wait in case of excess of traffic.
* @return the current time to wait (in ms) if needed for Read operation * @param now the current time
* @return the current time to wait (in ms) if needed for Read operation.
*/ */
public synchronized long readTimeToWait(final long size, final long limitTraffic, final long maxTime) { public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
final long now = System.currentTimeMillis();
bytesRecvFlowControl(size); bytesRecvFlowControl(size);
if (limitTraffic == 0) { if (size == 0 || limitTraffic == 0) {
return 0; return 0;
} }
final long lastTimeCheck = lastTime.get();
long sum = currentReadBytes.get(); long sum = currentReadBytes.get();
long interval = now - lastTime.get(); long localReadingTime = readingTime;
// Short time checking long lastRB = lastReadBytes;
if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT && sum > 0) { final long interval = now - lastTimeCheck;
long time = (sum * 1000 / limitTraffic - interval) / 10 * 10; long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);
if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
// Enough interval time to compute shaping
long time = sum * 1000 / limitTraffic - interval + pastDelay;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + sum + ":" + interval); logger.debug("Time: " + time + ":" + sum + ":" + interval + ":" + pastDelay);
} }
return time > maxTime ? maxTime : time; if (time > maxTime && now + time - localReadingTime > maxTime) {
time = maxTime;
}
readingTime = Math.max(localReadingTime, now + time);
return time;
} }
readingTime = Math.max(localReadingTime, now);
return 0; return 0;
} }
// long time checking // take the last read interval check to get enough interval time
if (lastNonNullReadBytes > 0 && lastNonNullReadTime + AbstractTrafficShapingHandler.MINIMAL_WAIT < now) { long lastsum = sum + lastRB;
long lastsum = sum + lastNonNullReadBytes; long lastinterval = interval + checkInterval.get();
long lastinterval = now - lastNonNullReadTime; long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
long time = (lastsum * 1000 / limitTraffic - lastinterval) / 10 * 10; if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { if (logger.isDebugEnabled()) {
if (logger.isDebugEnabled()) { logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval + ":" + pastDelay);
logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval);
}
return time > maxTime ? maxTime : time;
} }
} else { if (time > maxTime && now + time - localReadingTime > maxTime) {
// final "middle" time checking in case resetAccounting called very recently time = maxTime;
sum += lastReadBytes;
long lastinterval = AbstractTrafficShapingHandler.MINIMAL_WAIT;
long time = (sum * 1000 / limitTraffic - lastinterval) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + sum + ":" + lastinterval);
}
return time > maxTime ? maxTime : time;
} }
readingTime = Math.max(localReadingTime, now + time);
return time;
} }
readingTime = Math.max(localReadingTime, now);
return 0; return 0;
} }
/** /**
* Returns the time to wait (if any) for the given length message, using the given limitTraffic and * Returns the time to wait (if any) for the given length message, using the given limitTraffic and
* the max wait time * the max wait time.
* *
* @param size * @param size
* the write size * the write size
* @param limitTraffic * @param limitTraffic
* the traffic limit in bytes per second * the traffic limit in bytes per second.
* @param maxTime * @param maxTime
* the max time in ms to wait in case of excess of traffic * the max time in ms to wait in case of excess of traffic.
* @return the current time to wait (in ms) if needed for Write operation * @return the current time to wait (in ms) if needed for Write operation.
*/ */
public synchronized long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) { @Deprecated
bytesWriteFlowControl(size); public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) {
if (limitTraffic == 0) { return writeTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
return 0;
}
long sum = currentWrittenBytes.get();
final long now = System.currentTimeMillis();
long interval = now - lastTime.get();
if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT && sum > 0) {
long time = (sum * 1000 / limitTraffic - interval) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + sum + ":" + interval);
}
return time > maxTime ? maxTime : time;
}
return 0;
}
if (lastNonNullWrittenBytes > 0 && lastNonNullWrittenTime + AbstractTrafficShapingHandler.MINIMAL_WAIT < now) {
long lastsum = sum + lastNonNullWrittenBytes;
long lastinterval = now - lastNonNullWrittenTime;
long time = (lastsum * 1000 / limitTraffic - lastinterval) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval);
}
return time > maxTime ? maxTime : time;
}
} else {
sum += lastWrittenBytes;
long lastinterval = AbstractTrafficShapingHandler.MINIMAL_WAIT + Math.abs(interval);
long time = (sum * 1000 / limitTraffic - lastinterval) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + sum + ":" + lastinterval);
}
return time > maxTime ? maxTime : time;
}
}
return 0;
} }
/** /**
* String information * Returns the time to wait (if any) for the given length message, using the given limitTraffic and
* the max wait time.
*
* @param size
* the write size
* @param limitTraffic
* the traffic limit in bytes per second.
* @param maxTime
* the max time in ms to wait in case of excess of traffic.
* @param now the current time
* @return the current time to wait (in ms) if needed for Write operation.
*/ */
public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
bytesWriteFlowControl(size);
if (size == 0 || limitTraffic == 0) {
return 0;
}
final long lastTimeCheck = lastTime.get();
long sum = currentWrittenBytes.get();
long lastWB = lastWrittenBytes;
long localWritingTime = writingTime;
long pastDelay = Math.max(lastWritingTime - lastTimeCheck, 0);
final long interval = now - lastTimeCheck;
if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
// Enough interval time to compute shaping
long time = sum * 1000 / limitTraffic - interval + pastDelay;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + sum + ":" + interval + ":" + pastDelay);
}
if (time > maxTime && now + time - localWritingTime > maxTime) {
time = maxTime;
}
writingTime = Math.max(localWritingTime, now + time);
return time;
}
writingTime = Math.max(localWritingTime, now);
return 0;
}
// take the last write interval check to get enough interval time
long lastsum = sum + lastWB;
long lastinterval = interval + checkInterval.get();
long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval + ":" + pastDelay);
}
if (time > maxTime && now + time - localWritingTime > maxTime) {
time = maxTime;
}
writingTime = Math.max(localWritingTime, now + time);
return time;
}
writingTime = Math.max(localWritingTime, now);
return 0;
}
@Override @Override
public String toString() { public String toString() {
return "Monitor " + name + " Current Speed Read: " + (lastReadThroughput >> 10) + " KB/s, Write: " return new StringBuilder(165).append("Monitor ").append(name)
+ (lastWriteThroughput >> 10) + " KB/s Current Read: " + (currentReadBytes.get() >> 10) .append(" Current Speed Read: ").append(lastReadThroughput >> 10).append(" KB/s, ")
+ " KB Current Write: " + (currentWrittenBytes.get() >> 10) + " KB"; .append("Asked Write: ").append(lastWriteThroughput >> 10).append(" KB/s, ")
.append("Real Write: ").append(realWriteThroughput >> 10).append(" KB/s, ")
.append("Current Read: ").append(currentReadBytes.get() >> 10).append(" KB, ")
.append("Current asked Write: ").append(currentWrittenBytes.get() >> 10).append(" KB, ")
.append("Current real Write: ").append(realWrittenBytes.get() >> 10).append(" KB").toString();
} }
} }

View File

@ -24,6 +24,7 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.handler.traffic.AbstractTrafficShapingHandler; import io.netty.handler.traffic.AbstractTrafficShapingHandler;
import io.netty.handler.traffic.ChannelTrafficShapingHandler; import io.netty.handler.traffic.ChannelTrafficShapingHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler; import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.handler.traffic.TrafficCounter;
import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
@ -328,7 +329,7 @@ public class TrafficShapingHandlerTest extends AbstractSocketTest {
for (int i = 1; i < multipleMessage.length; i++) { for (int i = 1; i < multipleMessage.length; i++) {
totalNb += multipleMessage[i]; totalNb += multipleMessage[i];
} }
Long start = System.currentTimeMillis(); Long start = TrafficCounter.milliSecondFromNano();
int nb = multipleMessage[0]; int nb = multipleMessage[0];
for (int i = 0; i < nb; i++) { for (int i = 0; i < nb; i++) {
cc.write(cc.alloc().buffer().writeBytes(data)); cc.write(cc.alloc().buffer().writeBytes(data));
@ -336,7 +337,7 @@ public class TrafficShapingHandlerTest extends AbstractSocketTest {
cc.flush(); cc.flush();
promise.await(); promise.await();
Long stop = System.currentTimeMillis(); Long stop = TrafficCounter.milliSecondFromNano();
assertTrue("Error during exceution of TrafficShapping: " + promise.cause(), promise.isSuccess()); assertTrue("Error during exceution of TrafficShapping: " + promise.cause(), promise.isSuccess());
float average = (totalNb * messageSize) / (float) (stop - start); float average = (totalNb * messageSize) / (float) (stop - start);
@ -384,7 +385,7 @@ public class TrafficShapingHandlerTest extends AbstractSocketTest {
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int step; volatile int step;
// first message will always be validated // first message will always be validated
private long currentLastTime = System.currentTimeMillis(); private long currentLastTime = TrafficCounter.milliSecondFromNano();
private final long[] minimalWaitBetween; private final long[] minimalWaitBetween;
private final int[] multipleMessage; private final int[] multipleMessage;
private final int[] autoRead; private final int[] autoRead;
@ -472,7 +473,7 @@ public class TrafficShapingHandlerTest extends AbstractSocketTest {
int nb = actual.length / messageSize; int nb = actual.length / messageSize;
loggerServer.info("Step: " + step + " Read: " + nb + " blocks"); loggerServer.info("Step: " + step + " Read: " + nb + " blocks");
in.readBytes(actual); in.readBytes(actual);
long timestamp = System.currentTimeMillis(); long timestamp = TrafficCounter.milliSecondFromNano();
int isAutoRead = 0; int isAutoRead = 0;
int laststep = step; int laststep = step;
for (int i = 0; i < nb; i++) { for (int i = 0; i < nb; i++) {