Introduce user-defined writability to fix traffic shaping accuracy and efficiency

Motivation:

Several issues were shown by various ticket (#2900 #2956).
Also propose a similar improvement on writability user management than from #3036.
And finally add a mixte handler, both for Global and Channels, with
the advantages of being uniquely created and using less memory and
less shaping.

Issue #2900:

When a huge amount of data are written, the current behavior of the
TrafficShaping handler is to limit the delay to 15s, whatever the delay
the previous write has. This is wrong, and when a huge amount of writes
are done in a short time, the traffic is not correctly shapened.

Moreover, there is a high risk of OOM if one is not using in his/her own
handler for instance ChannelFuture.addListener() to handle the write
bufferisation in the TrafficShapingHandler.

This version includes "user-defined writability" capability added to the channel,
where writability could be managed, using isWritable() only from user handler side.

ChannelInterestChanged is also managed in order to provide compatibility
with other handlers as for instance ChunkedInput.

The "bandwidth" compute on write is only on "acquired" write orders, not
on "real" write orders, which is wrong from statistic point of view.

Issue #2956:

When using GlobalTrafficShaping, every write (and read) were
synchronized, thus leading to a drop of performance.
ChannelTrafficShaping is not touched by this issue since synchronzed is
then correct (handler is per channel, so the synchronized).

Modifications:

The current write delay computation takes into account the previous
write delay and time to check is the 15s delay (maxTime) is really
exceeded or not (using last scheduled write time). The algorithm is
simplified and in the same time more accurate.

A port of the "user-defined writability" as proposed in 4.X is included.

When the real write occurs, the statistics are update accordingly on a
new attribute (getRealWriteThroughput()).

To limit the synchronisations, all synchronized on
GlobalTrafficShapingHandler on submitWrite were removed. They are
replaced with a lock per channel (since synchronization is still needed
to prevent unordered write per channel), as in the sendAllValid method
for the very same reason.
Also all synchronized on TrafficCounter on read/writeTimeToWait() are
removed as they are unnecessary since already locked before by the
caller.
Still the creation and remove operations on lock per channel (PerChannel
object) are synchronized to prevent concurrency issue on this critical
part, but then limited using a ConcurrentHashMap.

Add a Mixte GlobalChannelTrafficShapingHandler which allows to use only one
handler for mixing Global and Channel TSH. I enables to save more memory and
tries to optimize the traffic among various channels.

Add 2 test class: AbstractChannelTest and NioChannelTest
Add test in reverse order in mixed mode

Result:

The traffic shaping is more stable, even with a huge number of writes in
short time by taking into consideration last scheduled write time.

The traffic shaping is more compatible with code as ChunkedWriteHandler,
adding a "user-defined writability".

The statistics are more valuable (asked write vs real write).

The Global TrafficShapingHandler should now have less "global"
synchronization, hoping to the minimum, but still per Channel as needed.

The GlobalChannelTrafficShapingHandler allows to have only one handler for
all channels while still offering per channel in addition to global traffic
shaping.
This commit is contained in:
Frederic Bregier 2014-10-04 22:47:29 +02:00 committed by Trustin Lee
parent 0a217c328b
commit 2127c8eafe
18 changed files with 2596 additions and 464 deletions

View File

@ -20,6 +20,7 @@ import org.jboss.netty.util.internal.ConcurrentHashMap;
import java.net.SocketAddress;
import java.util.Random;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/**
* A skeletal {@link Channel} implementation.
@ -58,6 +59,14 @@ public abstract class AbstractChannel implements Channel {
private String strVal;
private volatile Object attachment;
private static final AtomicIntegerFieldUpdater<AbstractChannel> UNWRITABLE_UPDATER;
@SuppressWarnings("UnusedDeclaration")
private volatile int unwritable;
static {
UNWRITABLE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractChannel.class, "unwritable");
}
/**
* Creates a new instance.
*
@ -212,6 +221,14 @@ public abstract class AbstractChannel implements Channel {
}
public int getInterestOps() {
if (!isOpen()) {
return Channel.OP_WRITE;
}
int interestOps = getInternalInterestOps() & ~OP_WRITE;
if (!isWritable()) {
interestOps |= OP_WRITE;
}
return interestOps;
}
@ -219,21 +236,104 @@ public abstract class AbstractChannel implements Channel {
return Channels.setInterestOps(this, interestOps);
}
protected int getInternalInterestOps() {
return interestOps;
}
/**
* Sets the {@link #getInterestOps() interestOps} property of this channel
* immediately. This method is intended to be called by an internal
* component - please do not call it unless you know what you are doing.
*/
protected void setInterestOpsNow(int interestOps) {
protected void setInternalInterestOps(int interestOps) {
this.interestOps = interestOps;
}
public boolean isReadable() {
return (getInterestOps() & OP_READ) != 0;
return (getInternalInterestOps() & OP_READ) != 0;
}
public boolean isWritable() {
return (getInterestOps() & OP_WRITE) == 0;
return unwritable == 0;
}
public final boolean getUserDefinedWritability(int index) {
return (unwritable & writabilityMask(index)) == 0;
}
public final void setUserDefinedWritability(int index, boolean writable) {
if (writable) {
setUserDefinedWritability(index);
} else {
clearUserDefinedWritability(index);
}
}
private void setUserDefinedWritability(int index) {
final int mask = ~writabilityMask(index);
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue & mask;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue != 0 && newValue == 0) {
getPipeline().sendUpstream(
new UpstreamChannelStateEvent(
this, ChannelState.INTEREST_OPS, getInterestOps()));
}
break;
}
}
}
private void clearUserDefinedWritability(int index) {
final int mask = writabilityMask(index);
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | mask;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0 && newValue != 0) {
getPipeline().sendUpstream(
new UpstreamChannelStateEvent(
this, ChannelState.INTEREST_OPS, getInterestOps()));
}
break;
}
}
}
private static int writabilityMask(int index) {
if (index < 1 || index > 31) {
throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
}
return 1 << index;
}
protected boolean setWritable() {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue & ~1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue != 0 && newValue == 0) {
return true;
}
break;
}
}
return false;
}
protected boolean setUnwritable() {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0 && newValue != 0) {
return true;
}
break;
}
}
return false;
}
public ChannelFuture setReadable(boolean readable) {
@ -335,7 +435,7 @@ public abstract class AbstractChannel implements Channel {
private final class ChannelCloseFuture extends DefaultChannelFuture {
public ChannelCloseFuture() {
ChannelCloseFuture() {
super(AbstractChannel.this, false);
}

View File

@ -71,7 +71,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
}
@Override
protected void setInterestOpsNow(int interestOps) {
protected void setInternalInterestOps(int interestOps) {
// Ignore.
}

View File

@ -360,6 +360,17 @@ public interface Channel extends Comparable<Channel> {
*/
ChannelFuture setReadable(boolean readable);
/**
* Returns {@code true} if and only if the user-defined writability flag at the specified index is set to
* {@code true}.
*/
boolean getUserDefinedWritability(int index);
/**
* Sets a user-defined writability flag at the specified index.
*/
void setUserDefinedWritability(int index, boolean isWritable);
/**
* Retrieves an object which is {@link #setAttachment(Object) attached} to
* this {@link Channel}.

View File

@ -153,43 +153,14 @@ abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChan
public abstract NioChannelConfig getConfig();
int getRawInterestOps() {
return super.getInterestOps();
}
void setRawInterestOpsNow(int interestOps) {
setInterestOpsNow(interestOps);
@Override
protected int getInternalInterestOps() {
return super.getInternalInterestOps();
}
@Override
public int getInterestOps() {
if (!isOpen()) {
return Channel.OP_WRITE;
}
int interestOps = getRawInterestOps();
int writeBufferSize = this.writeBufferSize.get();
if (writeBufferSize != 0) {
if (highWaterMarkCounter.get() > 0) {
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (writeBufferSize >= lowWaterMark) {
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;
}
} else {
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (writeBufferSize >= highWaterMark) {
interestOps |= Channel.OP_WRITE;
} else {
interestOps &= ~Channel.OP_WRITE;
}
}
} else {
interestOps &= ~Channel.OP_WRITE;
}
return interestOps;
protected void setInternalInterestOps(int interestOps) {
super.setInternalInterestOps(interestOps);
}
@Override
@ -285,7 +256,7 @@ abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChan
if (newWriteBufferSize >= highWaterMark) {
if (newWriteBufferSize - messageSize < highWaterMark) {
highWaterMarkCounter.incrementAndGet();
if (!notifying.get()) {
if (!notifying.get() && setUnwritable()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(AbstractNioChannel.this);
notifying.set(Boolean.FALSE);
@ -305,7 +276,7 @@ abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChan
if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
if (newWriteBufferSize + messageSize >= lowWaterMark) {
highWaterMarkCounter.decrementAndGet();
if (isConnected() && !notifying.get()) {
if (isConnected() && !notifying.get() && setWritable()) {
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(AbstractNioChannel.this);
notifying.set(Boolean.FALSE);

View File

@ -317,11 +317,11 @@ abstract class AbstractNioWorker extends AbstractNioSelector implements Worker {
return;
}
int interestOps = channel.getRawInterestOps();
int interestOps = channel.getInternalInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps);
channel.setRawInterestOpsNow(interestOps);
channel.setInternalInterestOps(interestOps);
}
}
@ -336,11 +336,11 @@ abstract class AbstractNioWorker extends AbstractNioSelector implements Worker {
return;
}
int interestOps = channel.getRawInterestOps();
int interestOps = channel.getInternalInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps);
channel.setRawInterestOpsNow(interestOps);
channel.setInternalInterestOps(interestOps);
}
}
@ -464,16 +464,16 @@ abstract class AbstractNioWorker extends AbstractNioSelector implements Worker {
SelectionKey key = channel.channel.keyFor(selector);
// Override OP_WRITE flag - a user cannot change this flag.
int newInterestOps = interestOps & ~Channel.OP_WRITE | channel.getRawInterestOps() & Channel.OP_WRITE;
int newInterestOps = interestOps & ~Channel.OP_WRITE | channel.getInternalInterestOps() & Channel.OP_WRITE;
if (key == null || selector == null) {
if (channel.getRawInterestOps() != newInterestOps) {
if (channel.getInternalInterestOps() != newInterestOps) {
changed = true;
}
// Not registered to the worker yet.
// Set the rawInterestOps immediately; RegisterTask will pick it up.
channel.setRawInterestOpsNow(newInterestOps);
channel.setInternalInterestOps(newInterestOps);
future.setSuccess();
if (changed) {
@ -487,14 +487,14 @@ abstract class AbstractNioWorker extends AbstractNioSelector implements Worker {
return;
}
if (channel.getRawInterestOps() != newInterestOps) {
if (channel.getInternalInterestOps() != newInterestOps) {
changed = true;
key.interestOps(newInterestOps);
if (Thread.currentThread() != thread &&
wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
channel.setRawInterestOpsNow(newInterestOps);
channel.setInternalInterestOps(newInterestOps);
}
future.setSuccess();

View File

@ -182,7 +182,7 @@ public class NioDatagramWorker extends AbstractNioWorker {
try {
channel.getDatagramChannel().register(
selector, channel.getRawInterestOps(), channel);
selector, channel.getInternalInterestOps(), channel);
if (future != null) {
future.setSuccess();

View File

@ -149,7 +149,7 @@ public class NioWorker extends AbstractNioWorker {
}
channel.channel.register(
selector, channel.getRawInterestOps(), channel);
selector, channel.getInternalInterestOps(), channel);
if (future != null) {
channel.setConnected();

View File

@ -15,10 +15,6 @@
*/
package org.jboss.netty.channel.socket.oio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
@ -27,6 +23,10 @@ import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.socket.Worker;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
abstract class AbstractOioChannel extends AbstractChannel {
private volatile InetSocketAddress localAddress;
volatile InetSocketAddress remoteAddress;
@ -49,8 +49,13 @@ abstract class AbstractOioChannel extends AbstractChannel {
}
@Override
protected void setInterestOpsNow(int interestOps) {
super.setInterestOpsNow(interestOps);
protected int getInternalInterestOps() {
return super.getInternalInterestOps();
}
@Override
protected void setInternalInterestOps(int interestOps) {
super.setInternalInterestOps(interestOps);
}
@Override

View File

@ -158,15 +158,15 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
// Override OP_WRITE flag - a user cannot change this flag.
interestOps &= ~Channel.OP_WRITE;
interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
interestOps |= channel.getInternalInterestOps() & Channel.OP_WRITE;
boolean changed = false;
try {
if (channel.getInterestOps() != interestOps) {
if (channel.getInternalInterestOps() != interestOps) {
if ((interestOps & Channel.OP_READ) != 0) {
channel.setInterestOpsNow(Channel.OP_READ);
channel.setInternalInterestOps(Channel.OP_READ);
} else {
channel.setInterestOpsNow(Channel.OP_NONE);
channel.setInternalInterestOps(Channel.OP_NONE);
}
changed = true;
}
@ -174,7 +174,7 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
future.setSuccess();
if (changed) {
synchronized (channel.interestOpsLock) {
channel.setInterestOpsNow(interestOps);
channel.setInternalInterestOps(interestOps);
// Notify the worker so it stops or continues reading.
Thread currentThread = Thread.currentThread();

View File

@ -15,10 +15,9 @@
*/
package org.jboss.netty.handler.traffic;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
@ -35,25 +34,23 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* AbstractTrafficShapingHandler allows to limit the global bandwidth
* <p>AbstractTrafficShapingHandler allows to limit the global bandwidth
* (see {@link GlobalTrafficShapingHandler}) or per session
* bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
* It allows too to implement an almost real time monitoring of the bandwidth using
* the monitors from {@link TrafficCounter} that will call back every checkInterval
* the method doAccounting of this handler.<br>
* <br>
* the method doAccounting of this handler.</p>
*
* An {@link ObjectSizeEstimator} can be passed at construction to specify what
* <p>An {@link ObjectSizeEstimator} can be passed at construction to specify what
* is the size of the object to be read or write accordingly to the type of
* object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.<br><br>
* object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.</p>
*
* If you want for any particular reasons to stop the monitoring (accounting) or to change
* the read/write limit or the check interval, several methods allow that for you:<br>
* <p>If you want for any particular reasons to stop the monitoring (accounting) or to change
* the read/write limit or the check interval, several methods allow that for you:</p>
* <ul>
* <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
* <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop
* or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
* <li></li>
* </ul>
*/
public abstract class AbstractTrafficShapingHandler extends
@ -68,6 +65,7 @@ public abstract class AbstractTrafficShapingHandler extends
* Default delay between two checks: 1s
*/
public static final long DEFAULT_CHECK_INTERVAL = 1000;
/**
* Default max delay in case of traffic shaping
* (during which no communication will occur).
@ -75,11 +73,20 @@ public abstract class AbstractTrafficShapingHandler extends
*/
public static final long DEFAULT_MAX_TIME = 15000;
/**
* Default max size to not exceed in buffer (write only).
*/
static final long DEFAULT_MAX_SIZE = 4 * 1024 * 1024L;
/**
* Default minimal time to wait
*/
static final long MINIMAL_WAIT = 10;
static final int CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 1;
static final int GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 2;
static final int GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 3;
/**
* Traffic Counter
*/
@ -98,26 +105,37 @@ public abstract class AbstractTrafficShapingHandler extends
/**
* used in releaseExternalResources() to cancel the timer
*/
private volatile Timeout timeout;
volatile Timeout timeout;
/**
* Limit in B/s to apply to write
*/
private long writeLimit;
private volatile long writeLimit;
/**
* Limit in B/s to apply to read
*/
private long readLimit;
private volatile long readLimit;
/**
* Delay between two performance snapshots
*/
protected long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
protected volatile long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
/**
* Max delay in wait
*/
protected long maxTime = DEFAULT_MAX_TIME; // default 15 s
protected volatile long maxTime = DEFAULT_MAX_TIME; // default 15 s
/**
* Max time to delay before proposing to stop writing new objects from next handlers
*/
volatile long maxWriteDelay = 4 * DEFAULT_CHECK_INTERVAL; // default 4 s
/**
* Max size in the list before proposing to stop writing new objects from next handlers
*/
volatile long maxWriteSize = DEFAULT_MAX_SIZE; // default 4MB
/**
* Boolean associated with the release of this TrafficShapingHandler.
@ -125,10 +143,58 @@ public abstract class AbstractTrafficShapingHandler extends
* to prevent waiting when shutdown.
*/
final AtomicBoolean release = new AtomicBoolean(false);
final int index;
private void init(ObjectSizeEstimator newObjectSizeEstimator,
/**
* Attachment of ChannelHandlerContext
*
*/
static final class ReadWriteStatus {
volatile boolean readSuspend;
volatile TimerTask reopenReadTimerTask;
}
/**
* For simple ChannelBuffer, returns the readableBytes, else
* use standard DefaultObjectSizeEstimator.
*/
public static class SimpleObjectSizeEstimator extends DefaultObjectSizeEstimator {
@Override
public int estimateSize(Object o) {
int size;
if (o instanceof ChannelBuffer) {
size = ((ChannelBuffer) o).readableBytes();
} else {
size = super.estimateSize(o);
}
return size;
}
}
/**
* @return the index to be used by the TrafficShapingHandler to manage the user defined
* writability. For Channel TSH it is defined as
* {@value #CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX}, for Global TSH it is
* defined as {@value #GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX},
* for GlobalChannel TSH it is defined as
* {@value #GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX}.
*/
int userDefinedWritabilityIndex() {
if (this instanceof GlobalChannelTrafficShapingHandler) {
return GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
} else if (this instanceof GlobalTrafficShapingHandler) {
return GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
} else {
return CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
}
}
private void init(ObjectSizeEstimator newObjectSizeEstimator,
Timer newTimer, long newWriteLimit, long newReadLimit,
long newCheckInterval, long newMaxTime) {
if (newMaxTime <= 0) {
throw new IllegalArgumentException("maxTime must be positive");
}
objectSizeEstimator = newObjectSizeEstimator;
timer = newTimer;
writeLimit = newWriteLimit;
@ -139,7 +205,6 @@ public abstract class AbstractTrafficShapingHandler extends
}
/**
*
* @param newTrafficCounter the TrafficCounter to set
*/
void setTrafficCounter(TrafficCounter newTrafficCounter) {
@ -147,51 +212,57 @@ public abstract class AbstractTrafficShapingHandler extends
}
/**
* Constructor using default {@link ObjectSizeEstimator}
* Constructor using default {@link ObjectSizeEstimator} and
* default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
*
* @param timer
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
* channels or 0 if no stats are to be computed.
*/
protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
long readLimit, long checkInterval) {
init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval,
this.index = userDefinedWritabilityIndex();
init(new SimpleObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval,
DEFAULT_MAX_TIME);
}
/**
* Constructor using the specified ObjectSizeEstimator
* Constructor using the specified ObjectSizeEstimator and
* default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
*
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
* the size of the message.
* @param timer
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
* channels or 0 if no stats are to be computed.
*/
protected AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Timer timer,
long writeLimit, long readLimit, long checkInterval) {
this.index = userDefinedWritabilityIndex();
init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME);
}
/**
* Constructor using default {@link ObjectSizeEstimator} and using default Check Interval
* Constructor using default {@link ObjectSizeEstimator} and using
* default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
* default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
*
* @param timer
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
@ -199,18 +270,21 @@ public abstract class AbstractTrafficShapingHandler extends
*/
protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
long readLimit) {
init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit,
this.index = userDefinedWritabilityIndex();
init(new SimpleObjectSizeEstimator(), timer, writeLimit, readLimit,
DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
}
/**
* Constructor using the specified ObjectSizeEstimator and using default Check Interval
* Constructor using the specified ObjectSizeEstimator and using
* default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
* default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
*
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
* the size of the message.
* @param timer
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
@ -219,114 +293,141 @@ public abstract class AbstractTrafficShapingHandler extends
protected AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Timer timer,
long writeLimit, long readLimit) {
this.index = userDefinedWritabilityIndex();
init(objectSizeEstimator, timer, writeLimit, readLimit,
DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
}
/**
* Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and default Check Interval
* Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and
* default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
* default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
*
* @param timer
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
*/
protected AbstractTrafficShapingHandler(Timer timer) {
init(new DefaultObjectSizeEstimator(), timer, 0, 0,
this.index = userDefinedWritabilityIndex();
init(new SimpleObjectSizeEstimator(), timer, 0, 0,
DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
}
/**
* Constructor using the specified ObjectSizeEstimator and using NO LIMIT and default Check Interval
* Constructor using the specified ObjectSizeEstimator and using NO LIMIT and
* default Check Interval value of {@value #DEFAULT_CHECK_INTERVAL} ms and
* default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
*
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
* the size of the message.
* @param timer
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
*/
protected AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Timer timer) {
this.index = userDefinedWritabilityIndex();
init(objectSizeEstimator, timer, 0, 0,
DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
}
/**
* Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT
* Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and
* default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
*
* @param timer
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
* channels or 0 if no stats are to be computed.
*/
protected AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
init(new DefaultObjectSizeEstimator(), timer, 0, 0, checkInterval, DEFAULT_MAX_TIME);
this.index = userDefinedWritabilityIndex();
init(new SimpleObjectSizeEstimator(), timer, 0, 0, checkInterval, DEFAULT_MAX_TIME);
}
/**
* Constructor using the specified ObjectSizeEstimator and using NO LIMIT
* Constructor using the specified ObjectSizeEstimator and using NO LIMIT and
* default max time as delay allowed value of {@value #DEFAULT_MAX_TIME} ms.
*
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
* the size of the message.
* @param timer
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
* channels or 0 if no stats are to be computed.
*/
protected AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Timer timer,
long checkInterval) {
this.index = userDefinedWritabilityIndex();
init(objectSizeEstimator, timer, 0, 0, checkInterval, DEFAULT_MAX_TIME);
}
/**
* Constructor using default {@link ObjectSizeEstimator}
* Constructor using default {@link ObjectSizeEstimator}.
*
* @param timer
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
* channels or 0 if no stats are to be computed.
* @param maxTime
* The max time to wait in case of excess of traffic (to prevent Time Out event)
* The max time to wait in case of excess of traffic (to prevent Time Out event).
* Must be positive.
*/
protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
long readLimit, long checkInterval, long maxTime) {
init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval,
this.index = userDefinedWritabilityIndex();
init(new SimpleObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval,
maxTime);
}
/**
* Constructor using the specified ObjectSizeEstimator
* Constructor using the specified ObjectSizeEstimator.
*
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
* the size of the message.
* @param timer
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
* channels or 0 if no stats are to be computed.
* @param maxTime
* The max time to wait in case of excess of traffic (to prevent Time Out event)
* The max time to wait in case of excess of traffic (to prevent Time Out event).
* Must be positive.
*/
protected AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Timer timer,
long writeLimit, long readLimit, long checkInterval, long maxTime) {
this.index = userDefinedWritabilityIndex();
init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval, maxTime);
}
/**
* Change the underlying limitations and check interval.
* <p>Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.
*
* @param newWriteLimit
* The new write limit (in bytes)
* @param newReadLimit
* The new read limit (in bytes)
* @param newCheckInterval
* The new check interval (in milliseconds)
*/
public void configure(long newWriteLimit, long newReadLimit,
long newCheckInterval) {
@ -336,12 +437,22 @@ public abstract class AbstractTrafficShapingHandler extends
/**
* Change the underlying limitations.
* <p>Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.
*
* @param newWriteLimit
* The new write limit (in bytes)
* @param newReadLimit
* The new read limit (in bytes)
*/
public void configure(long newWriteLimit, long newReadLimit) {
writeLimit = newWriteLimit;
readLimit = newReadLimit;
if (trafficCounter != null) {
trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
}
}
@ -360,12 +471,18 @@ public abstract class AbstractTrafficShapingHandler extends
}
/**
* <p>Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.
*
* @param writeLimit the writeLimit to set
*/
public void setWriteLimit(long writeLimit) {
this.writeLimit = writeLimit;
if (trafficCounter != null) {
trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
}
}
@ -377,12 +494,18 @@ public abstract class AbstractTrafficShapingHandler extends
}
/**
* <p>Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.
*
* @param readLimit the readLimit to set
*/
public void setReadLimit(long readLimit) {
this.readLimit = readLimit;
if (trafficCounter != null) {
trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
}
}
@ -410,15 +533,70 @@ public abstract class AbstractTrafficShapingHandler extends
return maxTime;
}
/**
/**
* <p>Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.
*
* @param maxTime
* Max delay in wait, shall be less than TIME OUT in related protocol
* Max delay in wait, shall be less than TIME OUT in related protocol.
* Must be positive.
*/
public void setMaxTimeWait(long maxTime) {
if (maxTime <= 0) {
throw new IllegalArgumentException("maxTime must be positive");
}
this.maxTime = maxTime;
}
/**
* @return the maxWriteDelay
*/
public long getMaxWriteDelay() {
return maxWriteDelay;
}
/**
* <p>Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.
*
* @param maxWriteDelay the maximum Write Delay in ms in the buffer allowed before write suspended is set.
* Must be positive.
*/
public void setMaxWriteDelay(long maxWriteDelay) {
if (maxWriteDelay <= 0) {
throw new IllegalArgumentException("maxWriteDelay must be positive");
}
this.maxWriteDelay = maxWriteDelay;
}
/**
* @return the maxWriteSize default being {@value #DEFAULT_MAX_SIZE} bytes
*/
public long getMaxWriteSize() {
return maxWriteSize;
}
/**
* <p>Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.
*
* @param maxWriteSize the maximum Write Size allowed in the buffer
* per channel before write suspended is set,
* default being {@value #DEFAULT_MAX_SIZE} bytes
*/
public void setMaxWriteSize(long maxWriteSize) {
this.maxWriteSize = maxWriteSize;
}
/**
* Called each time the accounting is computed from the TrafficCounters.
* This method could be used for instance to implement almost real time accounting.
@ -431,9 +609,9 @@ public abstract class AbstractTrafficShapingHandler extends
}
/**
* Class to implement setReadable at fix time
* Class to implement setReadable at fix time.
*/
private class ReopenReadTimerTask implements TimerTask {
class ReopenReadTimerTask implements TimerTask {
final ChannelHandlerContext ctx;
ReopenReadTimerTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
@ -443,43 +621,61 @@ public abstract class AbstractTrafficShapingHandler extends
if (release.get()) {
return;
}
if (!ctx.getChannel().isReadable() && ctx.getAttachment() == null) {
ReadWriteStatus rws = checkAttachment(ctx);
Channel channel = ctx.getChannel();
if (! channel.isConnected()) {
// ignore
return;
}
if (!channel.isReadable() && ! rws.readSuspend) {
// If isReadable is False and Active is True, user make a direct setReadable(false)
// Then Just reset the status
if (logger.isDebugEnabled()) {
logger.debug("Not Unsuspend: " + ctx.getChannel().isReadable() + ":" +
(ctx.getAttachment() == null));
logger.debug("Not unsuspend: " + channel.isReadable() + ":" +
rws.readSuspend);
}
ctx.setAttachment(null);
rws.readSuspend = false;
} else {
// Anything else allows the handler to reset the AutoRead
if (logger.isDebugEnabled()) {
if (ctx.getChannel().isReadable() && ctx.getAttachment() != null) {
logger.debug("Unsuspend: " + ctx.getChannel().isReadable() + ":" +
(ctx.getAttachment() == null));
if (channel.isReadable() && rws.readSuspend) {
logger.debug("Unsuspend: " + channel.isReadable() + ":" +
rws.readSuspend);
} else {
logger.debug("Normal Unsuspend: " + ctx.getChannel().isReadable() + ":" +
(ctx.getAttachment() == null));
logger.debug("Normal unsuspend: " + channel.isReadable() + ":" +
rws.readSuspend);
}
}
ctx.setAttachment(null);
ctx.getChannel().setReadable(true);
rws.readSuspend = false;
channel.setReadable(true);
}
if (logger.isDebugEnabled()) {
logger.debug("Unsupsend final status => " + ctx.getChannel().isReadable() + ":" +
(ctx.getAttachment() == null));
logger.debug("Unsupsend final status => " + channel.isReadable() + ":" +
rws.readSuspend);
}
}
}
/**
* Release the Read suspension.
*/
void releaseReadSuspended(ChannelHandlerContext ctx) {
ReadWriteStatus rws = checkAttachment(ctx);
rws.readSuspend = false;
ctx.getChannel().setReadable(true);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
throws Exception {
long now = TrafficCounter.milliSecondFromNano();
try {
long size = objectSizeEstimator.estimateSize(evt.getMessage());
ReadWriteStatus rws = checkAttachment(ctx);
long size = calculateSize(evt.getMessage());
if (size > 0 && trafficCounter != null) {
// compute the number of ms to wait before reopening the channel
long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime);
long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
wait = checkWaitReadTime(ctx, wait, now);
if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
// time in order to try to limit the traffic
if (release.get()) {
@ -489,8 +685,8 @@ public abstract class AbstractTrafficShapingHandler extends
if (channel != null && channel.isConnected()) {
// Only AutoRead AND HandlerActive True means Context Active
if (logger.isDebugEnabled()) {
logger.debug("Read Suspend: " + wait + ":" + channel.isReadable() + ":" +
(ctx.getAttachment() == null));
logger.debug("Read suspend: " + wait + ":" + channel.isReadable() + ":" +
rws.readSuspend);
}
if (timer == null) {
// Sleep since no executor
@ -498,97 +694,118 @@ public abstract class AbstractTrafficShapingHandler extends
Thread.sleep(wait);
return;
}
if (channel.isReadable() && ctx.getAttachment() == null) {
ctx.setAttachment(Boolean.TRUE);
if (channel.isReadable() && ! rws.readSuspend) {
rws.readSuspend = true;
channel.setReadable(false);
if (logger.isDebugEnabled()) {
logger.debug("Suspend final status => " + channel.isReadable() + ":" +
(ctx.getAttachment() == null));
rws.readSuspend);
}
// Create a Runnable to reactive the read if needed. If one was create before
// it will just be reused to limit object creation
TimerTask timerTask = new ReopenReadTimerTask(ctx);
timeout = timer.newTimeout(timerTask, wait,
if (rws.reopenReadTimerTask == null) {
rws.reopenReadTimerTask = new ReopenReadTimerTask(ctx);
}
timeout = timer.newTimeout(rws.reopenReadTimerTask, wait,
TimeUnit.MILLISECONDS);
}
}
}
}
} finally {
informReadOperation(ctx, now);
// The message is then just passed to the next handler
super.messageReceived(ctx, evt);
ctx.sendUpstream(evt);
}
}
/**
* Method overridden in GTSH to take into account specific timer for the channel.
* @param wait the wait delay computed in ms
* @param now the relative now time in ms
* @return the wait to use according to the context.
*/
long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
// no change by default
return wait;
}
/**
* Method overridden in GTSH to take into account specific timer for the channel.
* @param now the relative now time in ms
*/
void informReadOperation(final ChannelHandlerContext ctx, final long now) {
// default noop
}
@Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent evt)
throws Exception {
long wait = 0;
long size = calculateSize(evt.getMessage());
long now = TrafficCounter.milliSecondFromNano();
Channel channel = ctx.getChannel();
try {
long size = objectSizeEstimator.estimateSize(evt.getMessage());
if (size > 0 && trafficCounter != null) {
// compute the number of ms to wait before continue with the channel
wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime);
wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
if (logger.isDebugEnabled()) {
logger.debug("Write Suspend: " + wait + ":" + ctx.getChannel().isReadable() + ":" +
(ctx.getAttachment() == null));
logger.debug("Write suspend: " + wait + ":" + channel.isWritable() + ":" +
channel.getUserDefinedWritability(index));
}
if (wait >= MINIMAL_WAIT) {
if (release.get()) {
return;
}
/*
* Option 2:
* Thread.sleep(wait);
* System.out.println("Write unsuspended");
* Option 1: use an ordered list of messages to send
* Warning of memory pressure!
*/
} else {
if (wait < MINIMAL_WAIT || release.get()) {
wait = 0;
}
}
} finally {
if (release.get()) {
return;
}
// The message is then just passed to the next handler
submitWrite(ctx, evt, wait);
// The message is scheduled
submitWrite(ctx, evt, size, wait, now);
}
}
@Deprecated
protected void internalSubmitWrite(ChannelHandlerContext ctx, MessageEvent evt) throws Exception {
super.writeRequested(ctx, evt);
ctx.sendDownstream(evt);
}
protected abstract void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt, final long delay)
throws Exception;
@Deprecated
protected void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt,
final long delay) throws Exception {
submitWrite(ctx, evt, calculateSize(evt.getMessage()), delay, TrafficCounter.milliSecondFromNano());
}
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent cse = (ChannelStateEvent) e;
if (cse.getState() == ChannelState.INTEREST_OPS &&
(((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
abstract void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt, final long size,
final long delay, final long now) throws Exception;
// setReadable(true) requested
boolean readSuspended = ctx.getAttachment() != null;
if (readSuspended) {
// Drop the request silently if this handler has
// set the flag.
e.getFuture().setSuccess();
return;
}
}
void setWritable(ChannelHandlerContext ctx, boolean writable) {
Channel channel = ctx.getChannel();
if (channel.isConnected()) {
channel.setUserDefinedWritability(index, writable);
}
super.handleDownstream(ctx, e);
}
/**
*
* Check the writability according to delay and size for the channel.
* Set if necessary the write suspended status.
* @param delay the computed delai
* @param queueSize the current queueSize
*/
void checkWriteSuspend(ChannelHandlerContext ctx, long delay, long queueSize) {
if (queueSize > maxWriteSize || delay > maxWriteDelay) {
setWritable(ctx, false);
}
}
/**
* Explicitly release the Write suspended status.
*/
void releaseWriteSuspended(ChannelHandlerContext ctx) {
setWritable(ctx, true);
}
/**
* @return the current TrafficCounter (if
* channel is still connected)
* channel is still connected).
*/
public TrafficCounter getTrafficCounter() {
return trafficCounter;
@ -605,10 +822,42 @@ public abstract class AbstractTrafficShapingHandler extends
//shall be done outside (since it can be shared): timer.stop();
}
static ReadWriteStatus checkAttachment(ChannelHandlerContext ctx) {
ReadWriteStatus rws = (ReadWriteStatus) ctx.getAttachment();
if (rws == null) {
rws = new ReadWriteStatus();
ctx.setAttachment(rws);
}
return rws;
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
checkAttachment(ctx);
setWritable(ctx, true);
super.channelConnected(ctx, e);
}
protected long calculateSize(Object obj) {
long size = objectSizeEstimator.estimateSize(obj);
//logger.debug("Size: "+size);
return size;
}
@Override
public String toString() {
return "TrafficShaping with Write Limit: " + writeLimit +
" Read Limit: " + readLimit + " every: " + checkInterval + " and Counter: " +
(trafficCounter != null? trafficCounter.toString() : "none");
StringBuilder builder = new StringBuilder(290)
.append("TrafficShaping with Write Limit: ").append(writeLimit)
.append(" Read Limit: ").append(readLimit)
.append(" CheckInterval: ").append(checkInterval)
.append(" maxDelay: ").append(maxWriteDelay)
.append(" maxSize: ").append(maxWriteSize)
.append(" and Counter: ");
if (trafficCounter != null) {
builder.append(trafficCounter.toString());
} else {
builder.append("none");
}
return builder.toString();
}
}

View File

@ -19,6 +19,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
@ -32,43 +33,57 @@ import org.jboss.netty.util.Timer;
import org.jboss.netty.util.TimerTask;
/**
* This implementation of the {@link AbstractTrafficShapingHandler} is for channel
* traffic shaping, that is to say a per channel limitation of the bandwidth.<br><br>
* <p>This implementation of the {@link AbstractTrafficShapingHandler} is for channel
* traffic shaping, that is to say a per channel limitation of the bandwidth.</p>
*
* The general use should be as follow:<br>
* <ul>
* <li>Add in your pipeline a new ChannelTrafficShapingHandler, before a recommended {@link ExecutionHandler} (like
* {@link OrderedMemoryAwareThreadPoolExecutor} or {@link MemoryAwareThreadPoolExecutor}).<br>
* <tt>ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler(timer);</tt><br>
* timer could be created using <tt>HashedWheelTimer</tt><br>
* <tt>pipeline.addLast("CHANNEL_TRAFFIC_SHAPING", myHandler);</tt><br><br>
* <li><p>Add in your pipeline a new ChannelTrafficShapingHandler, before a recommended {@link ExecutionHandler} (like
* {@link OrderedMemoryAwareThreadPoolExecutor} or {@link MemoryAwareThreadPoolExecutor}).</p>
* <p><tt>ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler(timer);</tt></p>
* <p>timer could be created using <tt>HashedWheelTimer</tt></p>
* <p><tt>pipeline.addLast("CHANNEL_TRAFFIC_SHAPING", myHandler);</tt></p>
*
* <b>Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created
* <p><b>Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created
* for each new channel as the counter cannot be shared among all channels.</b> For instance, if you have a
* {@link ChannelPipelineFactory}, you should create a new ChannelTrafficShapingHandler in this
* {@link ChannelPipelineFactory} each time getPipeline() method is called.<br><br>
* {@link ChannelPipelineFactory} each time getPipeline() method is called.</p>
*
* Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
* <p>Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
* or the check interval (in millisecond) that represents the delay between two computations of the
* bandwidth and so the call back of the doAccounting method (0 means no accounting at all).<br><br>
* bandwidth and so the call back of the doAccounting method (0 means no accounting at all).</p>
*
* A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
* <p>A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
* it is recommended to set a positive value, even if it is high since the precision of the
* Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
* the less precise the traffic shaping will be. It is suggested as higher value something close
* to 5 or 10 minutes.<br><br>
* to 5 or 10 minutes.</p>
*
* maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.<br><br>
* <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p>
* </li>
* <li>When you shutdown your application, release all the external resources (except the timer internal itself)
* by calling:<br>
* <tt>myHandler.releaseExternalResources();</tt><br>
* </li>
* <li>In your handler, you should consider to use the <code>channel.isWritable()</code> and
* <code>channelInterestChanged(ctx, event)</code> to handle writability, or through
* <code>future.addListener(new ChannelFutureListener())</code> on the future returned by
* <code>channel.write()</code>.</li>
* <li><p>You shall also consider to have object size in read or write operations relatively adapted to
* the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
* while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.</p></li>
* <li><p>Some configuration methods will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.</p>
* So the expected usage of those methods are to be used not too often,
* accordingly to the traffic shaping configuration.</li>
* </ul><br>
*/
public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
private List<ToSend> messagesQueue = new LinkedList<ToSend>();
private final List<ToSend> messagesQueue = new LinkedList<ToSend>();
private long queueSize;
private volatile Timeout writeTimeout;
private volatile ChannelHandlerContext ctx;
public ChannelTrafficShapingHandler(Timer timer, long writeLimit,
long readLimit, long checkInterval) {
@ -125,56 +140,112 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
}
private static final class ToSend {
final long date;
final long relativeTimeAction;
final MessageEvent toSend;
private ToSend(final long delay, final MessageEvent toSend) {
this.date = System.currentTimeMillis() + delay;
this.relativeTimeAction = delay;
this.toSend = toSend;
}
}
@Override
protected synchronized void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt, final long delay)
throws Exception {
if (delay == 0 && messagesQueue.isEmpty()) {
internalSubmitWrite(ctx, evt);
return;
void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt, final long size,
final long delay, final long now) throws Exception {
if (ctx == null) {
this.ctx = ctx;
}
if (timer == null) {
// Sleep since no executor
Thread.sleep(delay);
internalSubmitWrite(ctx, evt);
return;
final ToSend newToSend;
Channel channel = ctx.getChannel();
synchronized (this) {
if (delay == 0 && messagesQueue.isEmpty()) {
if (! channel.isConnected()) {
// ignore
return;
}
if (trafficCounter != null) {
trafficCounter.bytesRealWriteFlowControl(size);
}
ctx.sendDownstream(evt);
return;
}
if (timer == null) {
// Sleep since no executor
Thread.sleep(delay);
if (! channel.isConnected()) {
// ignore
return;
}
if (trafficCounter != null) {
trafficCounter.bytesRealWriteFlowControl(size);
}
ctx.sendDownstream(evt);
return;
}
if (! channel.isConnected()) {
// ignore
return;
}
newToSend = new ToSend(delay + now, evt);
messagesQueue.add(newToSend);
queueSize += size;
checkWriteSuspend(ctx, delay, queueSize);
}
final ToSend newToSend = new ToSend(delay, evt);
messagesQueue.add(newToSend);
final long futureNow = newToSend.relativeTimeAction;
writeTimeout = timer.newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
sendAllValid(ctx);
sendAllValid(ctx, futureNow);
}
}, delay + 1, TimeUnit.MILLISECONDS);
}
private synchronized void sendAllValid(ChannelHandlerContext ctx) throws Exception {
while (!messagesQueue.isEmpty()) {
ToSend newToSend = messagesQueue.remove(0);
if (newToSend.date <= System.currentTimeMillis()) {
internalSubmitWrite(ctx, newToSend.toSend);
} else {
messagesQueue.add(0, newToSend);
break;
private void sendAllValid(ChannelHandlerContext ctx, final long now) throws Exception {
Channel channel = ctx.getChannel();
if (! channel.isConnected()) {
// ignore
return;
}
synchronized (this) {
while (!messagesQueue.isEmpty()) {
ToSend newToSend = messagesQueue.remove(0);
if (newToSend.relativeTimeAction <= now) {
long size = calculateSize(newToSend.toSend.getMessage());
if (trafficCounter != null) {
trafficCounter.bytesRealWriteFlowControl(size);
}
queueSize -= size;
if (! channel.isConnected()) {
// ignore
break;
}
ctx.sendDownstream(newToSend.toSend);
} else {
messagesQueue.add(0, newToSend);
break;
}
}
if (messagesQueue.isEmpty()) {
releaseWriteSuspended(ctx);
}
}
}
@Override
/**
* @return current size in bytes of the write buffer.
*/
public long queueSize() {
return queueSize;
}
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
if (trafficCounter != null) {
trafficCounter.stop();
}
messagesQueue.clear();
synchronized (this) {
messagesQueue.clear();
}
if (writeTimeout != null) {
writeTimeout.cancel();
}
@ -184,8 +255,10 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
this.ctx = ctx;
// readSuspended = true;
ctx.setAttachment(Boolean.TRUE);
ReadWriteStatus rws = checkAttachment(ctx);
rws.readSuspend = true;
ctx.getChannel().setReadable(false);
if (trafficCounter == null) {
// create a new counter now
@ -197,10 +270,30 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
if (trafficCounter != null) {
trafficCounter.start();
}
// readSuspended = false;
ctx.setAttachment(null);
rws.readSuspend = false;
ctx.getChannel().setReadable(true);
super.channelConnected(ctx, e);
}
@Override
public void releaseExternalResources() {
Channel channel = ctx.getChannel();
synchronized (this) {
if (ctx != null && ctx.getChannel().isConnected()) {
for (ToSend toSend : messagesQueue) {
if (! channel.isConnected()) {
// ignore
break;
}
ctx.sendDownstream(toSend.toSend);
}
}
messagesQueue.clear();
}
if (writeTimeout != null) {
writeTimeout.cancel();
}
super.releaseExternalResources();
}
}

View File

@ -0,0 +1,129 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.traffic;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.handler.traffic.GlobalChannelTrafficShapingHandler.PerChannel;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.Timer;
import org.jboss.netty.util.TimerTask;
/**
* Version for {@link GlobalChannelTrafficShapingHandler}.
* This TrafficCounter is the Global one, and its special property is to directly handle
* other channel's TrafficCounters. In particular, there are no scheduler for those
* channel's TrafficCounters because it is managed by this one.
*/
public class GlobalChannelTrafficCounter extends TrafficCounter {
/**
* @param trafficShapingHandler the associated {@link GlobalChannelTrafficShapingHandler}.
* @param executor the underlying executor service for scheduling checks (both Global and per Channel).
* @param name the name given to this monitor
* @param checkInterval the checkInterval in millisecond between two computations.
*/
public GlobalChannelTrafficCounter(GlobalChannelTrafficShapingHandler trafficShapingHandler,
Timer timer, String name, long checkInterval) {
super(trafficShapingHandler, timer, name, checkInterval);
if (timer == null) {
throw new IllegalArgumentException("Timer must not be null");
}
}
/**
* Class to implement monitoring at fix delay.
* This version is Mixed in the way it mixes Global and Channel counters.
*/
private static final class MixedTrafficMonitoringTask implements TimerTask {
/**
* The associated TrafficShapingHandler
*/
private final GlobalChannelTrafficShapingHandler trafficShapingHandler1;
/**
* The associated TrafficCounter
*/
private final TrafficCounter counter;
/**
* @param trafficShapingHandler The parent handler to which this task needs to callback to for accounting.
* @param counter The parent TrafficCounter that we need to reset the statistics for.
*/
MixedTrafficMonitoringTask(
GlobalChannelTrafficShapingHandler trafficShapingHandler,
TrafficCounter counter) {
trafficShapingHandler1 = trafficShapingHandler;
this.counter = counter;
}
@Override
public void run(Timeout timeout) throws Exception {
if (!counter.monitorActive) {
return;
}
long newLastTime = milliSecondFromNano();
counter.resetAccounting(newLastTime);
for (PerChannel perChannel : trafficShapingHandler1.channelQueues.values()) {
perChannel.channelTrafficCounter.resetAccounting(newLastTime);
}
trafficShapingHandler1.doAccounting(counter);
counter.timer.newTimeout(this, counter.checkInterval.get(), TimeUnit.MILLISECONDS);
}
}
/**
* Start the monitoring process.
*/
public synchronized void start() {
if (monitorActive) {
return;
}
lastTime.set(milliSecondFromNano());
long localCheckInterval = checkInterval.get();
if (localCheckInterval > 0) {
monitorActive = true;
timerTask =
new MixedTrafficMonitoringTask(
(GlobalChannelTrafficShapingHandler) trafficShapingHandler, this);
timeout = timer.newTimeout(timerTask, checkInterval.get(), TimeUnit.MILLISECONDS);
}
}
/**
* Stop the monitoring process.
*/
public synchronized void stop() {
if (!monitorActive) {
return;
}
monitorActive = false;
resetAccounting(milliSecondFromNano());
trafficShapingHandler.doAccounting(this);
if (timeout != null) {
timeout.cancel();
}
}
@Override
public void resetCumulativeTime() {
for (PerChannel perChannel :
((GlobalChannelTrafficShapingHandler) trafficShapingHandler).channelQueues.values()) {
perChannel.channelTrafficCounter.resetCumulativeTime();
}
super.resetCumulativeTime();
}
}

View File

@ -0,0 +1,884 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.traffic;
import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ObjectSizeEstimator;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.Timer;
import org.jboss.netty.util.TimerTask;
import org.jboss.netty.util.internal.ConcurrentHashMap;
/**
* This implementation of the {@link AbstractTrafficShapingHandler} is for global
* and per channel traffic shaping, that is to say a global limitation of the bandwidth, whatever
* the number of opened channels and a per channel limitation of the bandwidth.<br><br>
* This version shall not be in the same pipeline than other TrafficShapingHandler.<br><br>
*
* The general use should be as follow:<br>
* <ul>
* <li>Create your unique GlobalChannelTrafficShapingHandler like:<br><br>
* <tt>GlobalChannelTrafficShapingHandler myHandler = new GlobalChannelTrafficShapingHandler(executor);</tt><br><br>
* The executor could be the underlying IO worker pool<br>
* <tt>pipeline.addLast(myHandler);</tt><br><br>
*
* <b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
* and shared among all channels as the counter must be shared among all channels.</b><br><br>
*
* Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
* or the check interval (in millisecond) that represents the delay between two computations of the
* bandwidth and so the call back of the doAccounting method (0 means no accounting at all).<br>
* Note that as this is a fusion of both Global and Channel Traffic Shaping, limits are in 2 sets,
* respectively Global and Channel.<br><br>
*
* A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
* it is recommended to set a positive value, even if it is high since the precision of the
* Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
* the less precise the traffic shaping will be. It is suggested as higher value something close
* to 5 or 10 minutes.<br><br>
*
* maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.<br><br>
* </li>
* <li>In your handler, you should consider to use the <code>channel.isWritable()</code> and
* <code>channelWritabilityChanged(ctx)</code> to handle writability, or through
* <code>future.addListener(new GenericFutureListener())</code> on the future returned by
* <code>ctx.write()</code>.</li>
* <li>You shall also consider to have object size in read or write operations relatively adapted to
* the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
* while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.<br><br></li>
* <li>Some configuration methods will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.<br>
* So the expected usage of those methods are to be used not too often,
* accordingly to the traffic shaping configuration.</li>
* </ul><br>
*
* Be sure to call {@link #release()} once this handler is not needed anymore to release all internal resources.
* This will not shutdown the {@link EventExecutor} as it may be shared, so you need to do this by your own.
*/
@Sharable
public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(GlobalChannelTrafficShapingHandler.class);
/**
* All queues per channel
*/
final ConcurrentMap<Integer, PerChannel> channelQueues = new ConcurrentHashMap<Integer, PerChannel>();
/**
* Global queues size
*/
private final AtomicLong queuesSize = new AtomicLong();
/**
* Maximum cumulative writing bytes for one channel among all (as long as channels stay the same)
*/
private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
/**
* Maximum cumulative read bytes for one channel among all (as long as channels stay the same)
*/
private final AtomicLong cumulativeReadBytes = new AtomicLong();
/**
* Max size in the list before proposing to stop writing new objects from next handlers
* for all channel (global)
*/
long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100; // default 400MB
/**
* Limit in B/s to apply to write
*/
private volatile long writeChannelLimit;
/**
* Limit in B/s to apply to read
*/
private volatile long readChannelLimit;
private static final float DEFAULT_DEVIATION = 0.1F;
private static final float MAX_DEVIATION = 0.4F;
private static final float DEFAULT_SLOWDOWN = 0.4F;
private static final float DEFAULT_ACCELERATION = -0.1F;
private volatile float maxDeviation;
private volatile float accelerationFactor;
private volatile float slowDownFactor;
private volatile boolean readDeviationActive;
private volatile boolean writeDeviationActive;
static final class PerChannel {
List<ToSend> messagesQueue;
TrafficCounter channelTrafficCounter;
long queueSize;
long lastWriteTimestamp;
long lastReadTimestamp;
}
/**
* Create the global TrafficCounter.
*/
void createGlobalTrafficCounter(Timer timer) {
// Default
setMaxDeviation(DEFAULT_DEVIATION, DEFAULT_SLOWDOWN, DEFAULT_ACCELERATION);
if (timer == null) {
throw new IllegalArgumentException("Timer must not be null");
}
TrafficCounter tc = new GlobalChannelTrafficCounter(this, timer, "GlobalChannelTC", checkInterval);
setTrafficCounter(tc);
tc.start();
}
@Override
int userDefinedWritabilityIndex() {
return AbstractTrafficShapingHandler.GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
}
/**
* Create a new instance.
*
* @param timer
* the {@link Timer} to use for the {@link TrafficCounter}.
* @param writeGlobalLimit
* 0 or a limit in bytes/s
* @param readGlobalLimit
* 0 or a limit in bytes/s
* @param writeChannelLimit
* 0 or a limit in bytes/s
* @param readChannelLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed.
* @param maxTime
* The maximum delay to wait in case of traffic excess.
*/
public GlobalChannelTrafficShapingHandler(Timer timer,
long writeGlobalLimit, long readGlobalLimit,
long writeChannelLimit, long readChannelLimit,
long checkInterval, long maxTime) {
super(timer, writeGlobalLimit, readGlobalLimit, checkInterval, maxTime);
createGlobalTrafficCounter(timer);
this.writeChannelLimit = writeChannelLimit;
this.readChannelLimit = readChannelLimit;
}
/**
* Create a new instance.
*
* @param timer
* the {@link Timer} to use for the {@link TrafficCounter}.
* @param writeGlobalLimit
* 0 or a limit in bytes/s
* @param readGlobalLimit
* 0 or a limit in bytes/s
* @param writeChannelLimit
* 0 or a limit in bytes/s
* @param readChannelLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed.
*/
public GlobalChannelTrafficShapingHandler(Timer timer,
long writeGlobalLimit, long readGlobalLimit,
long writeChannelLimit, long readChannelLimit,
long checkInterval) {
super(timer, writeGlobalLimit, readGlobalLimit, checkInterval);
this.writeChannelLimit = writeChannelLimit;
this.readChannelLimit = readChannelLimit;
createGlobalTrafficCounter(timer);
}
/**
* Create a new instance.
*
* @param timer
* the {@link Timer} to use for the {@link TrafficCounter}.
* @param writeGlobalLimit
* 0 or a limit in bytes/s
* @param readGlobalLimit
* 0 or a limit in bytes/s
* @param writeChannelLimit
* 0 or a limit in bytes/s
* @param readChannelLimit
* 0 or a limit in bytes/s
*/
public GlobalChannelTrafficShapingHandler(Timer timer,
long writeGlobalLimit, long readGlobalLimit,
long writeChannelLimit, long readChannelLimit) {
super(timer, writeGlobalLimit, readGlobalLimit);
this.writeChannelLimit = writeChannelLimit;
this.readChannelLimit = readChannelLimit;
createGlobalTrafficCounter(timer);
}
/**
* Create a new instance.
*
* @param timer
* the {@link Timer} to use for the {@link TrafficCounter}.
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed.
*/
public GlobalChannelTrafficShapingHandler(Timer timer, long checkInterval) {
super(timer, checkInterval);
createGlobalTrafficCounter(timer);
}
/**
* Create a new instance.
*
* @param timer
* the {@link Timer} to use for the {@link TrafficCounter}.
*/
public GlobalChannelTrafficShapingHandler(Timer timer) {
super(timer);
createGlobalTrafficCounter(timer);
}
/**
* @param objectSizeEstimator ObjectSizeEstimator to use
* @param timer
* the {@link Timer} to use for the {@link TrafficCounter}.
* @param writeLimit write Global Limit
* 0 or a limit in bytes/s
* @param readLimit read Global Limit
* 0 or a limit in bytes/s
* @param writeChannelLimit
* 0 or a limit in bytes/s
* @param readChannelLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed.
* @param maxTime
* The maximum delay to wait in case of traffic excess.
*/
public GlobalChannelTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator, Timer timer, long writeLimit,
long readLimit, long writeChannelLimit, long readChannelLimit, long checkInterval, long maxTime) {
super(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval, maxTime);
this.writeChannelLimit = writeChannelLimit;
this.readChannelLimit = readChannelLimit;
createGlobalTrafficCounter(timer);
}
/**
* @param objectSizeEstimator ObjectSizeEstimator to use
* @param timer
* the {@link Timer} to use for the {@link TrafficCounter}.
* @param writeLimit write Global Limit
* 0 or a limit in bytes/s
* @param readLimit read Global Limit
* 0 or a limit in bytes/s
* @param writeChannelLimit
* 0 or a limit in bytes/s
* @param readChannelLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed.
*/
public GlobalChannelTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator, Timer timer, long writeLimit,
long readLimit, long writeChannelLimit, long readChannelLimit, long checkInterval) {
super(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval);
this.writeChannelLimit = writeChannelLimit;
this.readChannelLimit = readChannelLimit;
createGlobalTrafficCounter(timer);
}
/**
* @param objectSizeEstimator ObjectSizeEstimator to use
* @param timer
* the {@link Timer} to use for the {@link TrafficCounter}.
* @param writeLimit write Global Limit
* 0 or a limit in bytes/s
* @param readLimit read Global Limit
* 0 or a limit in bytes/s
* @param writeChannelLimit
* 0 or a limit in bytes/s
* @param readChannelLimit
* 0 or a limit in bytes/s
*/
public GlobalChannelTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator, Timer timer, long writeLimit,
long readLimit, long writeChannelLimit, long readChannelLimit) {
super(objectSizeEstimator, timer, writeLimit, readLimit);
this.writeChannelLimit = writeChannelLimit;
this.readChannelLimit = readChannelLimit;
createGlobalTrafficCounter(timer);
}
/**
* @param objectSizeEstimator ObjectSizeEstimator to use
* @param timer
* the {@link Timer} to use for the {@link TrafficCounter}.
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed.
*/
public GlobalChannelTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator, Timer timer,
long checkInterval) {
super(objectSizeEstimator, timer, checkInterval);
createGlobalTrafficCounter(timer);
}
/**
* @param objectSizeEstimator ObjectSizeEstimator to use
* @param timer
* the {@link Timer} to use for the {@link TrafficCounter}.
*/
public GlobalChannelTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator, Timer timer) {
super(objectSizeEstimator, timer);
createGlobalTrafficCounter(timer);
}
/**
* @return the current max deviation.
*/
public float maxDeviation() {
return maxDeviation;
}
/**
* @return the current acceleration factor.
*/
public float accelerationFactor() {
return accelerationFactor;
}
/**
* @return the current slow down factor.
*/
public float slowDownFactor() {
return slowDownFactor;
}
/**
* @param maxDeviation
* the maximum deviation to allow during computation of average, default deviation
* being 0.1, so +/-10% of the desired bandwidth. Maximum being 0.4.
* @param slowDownFactor
* the factor set as +x% to the too fast client (minimal value being 0, meaning no
* slow down factor), default being 40% (0.4).
* @param accelerationFactor
* the factor set as -x% to the too slow client (maximal value being 0, meaning no
* acceleration factor), default being -10% (-0.1).
*/
public void setMaxDeviation(float maxDeviation, float slowDownFactor, float accelerationFactor) {
if (maxDeviation > MAX_DEVIATION) {
throw new IllegalArgumentException("maxDeviation must be <= " + MAX_DEVIATION);
}
if (slowDownFactor < 0) {
throw new IllegalArgumentException("slowDownFactor must be >= 0");
}
if (accelerationFactor > 0) {
throw new IllegalArgumentException("accelerationFactor must be <= 0");
}
this.maxDeviation = maxDeviation;
this.accelerationFactor = 1 + accelerationFactor;
this.slowDownFactor = 1 + slowDownFactor;
}
private void computeDeviationCumulativeBytes() {
// compute the maximum cumulativeXxxxBytes among still connected Channels
long maxWrittenBytes = 0;
long maxReadBytes = 0;
long minWrittenBytes = Long.MAX_VALUE;
long minReadBytes = Long.MAX_VALUE;
for (PerChannel perChannel : channelQueues.values()) {
long value = perChannel.channelTrafficCounter.getCumulativeWrittenBytes();
if (maxWrittenBytes < value) {
maxWrittenBytes = value;
}
if (minWrittenBytes > value) {
minWrittenBytes = value;
}
value = perChannel.channelTrafficCounter.getCumulativeReadBytes();
if (maxReadBytes < value) {
maxReadBytes = value;
}
if (minReadBytes > value) {
minReadBytes = value;
}
}
boolean multiple = channelQueues.size() > 1;
readDeviationActive = multiple && minReadBytes < maxReadBytes / 2;
writeDeviationActive = multiple && minWrittenBytes < maxWrittenBytes / 2;
cumulativeWrittenBytes.set(maxWrittenBytes);
cumulativeReadBytes.set(maxReadBytes);
}
@Override
protected void doAccounting(TrafficCounter counter) {
computeDeviationCumulativeBytes();
super.doAccounting(counter);
}
private long computeBalancedWait(float maxLocal, float maxGlobal, long wait) {
if (maxGlobal == 0) {
// no change
return wait;
}
float ratio = maxLocal / maxGlobal;
// if in the boundaries, same value
if (ratio > maxDeviation) {
if (ratio < 1 - maxDeviation) {
return wait;
} else {
ratio = slowDownFactor;
if (wait < MINIMAL_WAIT) {
wait = MINIMAL_WAIT;
}
}
} else {
ratio = accelerationFactor;
}
return (long) (wait * ratio);
}
/**
* @return the maxGlobalWriteSize
*/
public long getMaxGlobalWriteSize() {
return maxGlobalWriteSize;
}
/**
* Note the change will be taken as best effort, meaning
* that all already scheduled traffics will not be
* changed, but only applied to new traffics.<br>
* So the expected usage of this method is to be used not too often,
* accordingly to the traffic shaping configuration.
*
* @param maxGlobalWriteSize the maximum Global Write Size allowed in the buffer
* globally for all channels before write suspended is set.
*/
public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
this.maxGlobalWriteSize = maxGlobalWriteSize;
}
/**
* @return the global size of the buffers for all queues.
*/
public long queuesSize() {
return queuesSize.get();
}
/**
* @param newWriteLimit Channel write limit
* @param newReadLimit Channel read limit
*/
public void configureChannel(long newWriteLimit, long newReadLimit) {
writeChannelLimit = newWriteLimit;
readChannelLimit = newReadLimit;
long now = TrafficCounter.milliSecondFromNano();
for (PerChannel perChannel : channelQueues.values()) {
perChannel.channelTrafficCounter.resetAccounting(now);
}
}
/**
* @return Channel write limit.
*/
public long getWriteChannelLimit() {
return writeChannelLimit;
}
/**
* @param writeLimit Channel write limit
*/
public void setWriteChannelLimit(long writeLimit) {
writeChannelLimit = writeLimit;
long now = TrafficCounter.milliSecondFromNano();
for (PerChannel perChannel : channelQueues.values()) {
perChannel.channelTrafficCounter.resetAccounting(now);
}
}
/**
* @return Channel read limit.
*/
public long getReadChannelLimit() {
return readChannelLimit;
}
/**
* @param readLimit Channel read limit
*/
public void setReadChannelLimit(long readLimit) {
readChannelLimit = readLimit;
long now = TrafficCounter.milliSecondFromNano();
for (PerChannel perChannel : channelQueues.values()) {
perChannel.channelTrafficCounter.resetAccounting(now);
}
}
/**
* Release all internal resources of this instance.
*/
public final void release() {
trafficCounter.stop();
}
private PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) {
// ensure creation is limited to one thread per channel
Channel channel = ctx.getChannel();
Integer key = channel.hashCode();
PerChannel perChannel = channelQueues.get(key);
if (perChannel == null) {
perChannel = new PerChannel();
perChannel.messagesQueue = new LinkedList<ToSend>();
// Don't start it since managed through the Global one
perChannel.channelTrafficCounter = new TrafficCounter(this, null, "ChannelTC" +
ctx.getChannel().hashCode(), checkInterval);
perChannel.queueSize = 0L;
perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano();
perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp;
channelQueues.put(key, perChannel);
}
return perChannel;
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
getOrSetPerChannel(ctx);
trafficCounter.resetCumulativeTime();
super.channelConnected(ctx, e);
}
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
trafficCounter.resetCumulativeTime();
Channel channel = ctx.getChannel();
Integer key = channel.hashCode();
PerChannel perChannel = channelQueues.remove(key);
if (perChannel != null) {
// write operations need synchronization
synchronized (perChannel) {
queuesSize.addAndGet(-perChannel.queueSize);
perChannel.messagesQueue.clear();
}
}
releaseWriteSuspended(ctx);
releaseReadSuspended(ctx);
super.channelClosed(ctx, e);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
throws Exception {
long now = TrafficCounter.milliSecondFromNano();
try {
ReadWriteStatus rws = checkAttachment(ctx);
long size = calculateSize(evt.getMessage());
if (size > 0) {
// compute the number of ms to wait before reopening the channel
// compute the number of ms to wait before reopening the channel
long waitGlobal = trafficCounter.readTimeToWait(size, getReadLimit(), maxTime, now);
Integer key = ctx.getChannel().hashCode();
PerChannel perChannel = channelQueues.get(key);
long wait = 0;
if (perChannel != null) {
wait = perChannel.channelTrafficCounter.readTimeToWait(size, readChannelLimit, maxTime, now);
if (readDeviationActive) {
// now try to balance between the channels
long maxLocalRead = 0;
maxLocalRead = perChannel.channelTrafficCounter.getCumulativeReadBytes();
long maxGlobalRead = cumulativeReadBytes.get();
if (maxLocalRead <= 0) {
maxLocalRead = 0;
}
if (maxGlobalRead < maxLocalRead) {
maxGlobalRead = maxLocalRead;
}
wait = computeBalancedWait(maxLocalRead, maxGlobalRead, wait);
}
}
if (wait < waitGlobal) {
wait = waitGlobal;
}
wait = checkWaitReadTime(ctx, wait, now);
if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
// time in order to try to limit the traffic
if (release.get()) {
return;
}
Channel channel = ctx.getChannel();
if (channel != null && channel.isConnected()) {
// Only AutoRead AND HandlerActive True means Context Active
if (logger.isDebugEnabled()) {
logger.debug("Read suspend: " + wait + ":" + channel.isReadable() + ":" +
rws.readSuspend);
}
if (timer == null) {
// Sleep since no executor
// logger.warn("Read sleep since no timer for "+wait+" ms for "+this);
Thread.sleep(wait);
return;
}
if (channel.isReadable() && ! rws.readSuspend) {
rws.readSuspend = true;
channel.setReadable(false);
if (logger.isDebugEnabled()) {
logger.debug("Suspend final status => " + channel.isReadable() + ":" +
rws.readSuspend);
}
// Create a Runnable to reactive the read if needed. If one was create before
// it will just be reused to limit object creation
if (rws.reopenReadTimerTask == null) {
rws.reopenReadTimerTask = new ReopenReadTimerTask(ctx);
}
timeout = timer.newTimeout(rws.reopenReadTimerTask, wait,
TimeUnit.MILLISECONDS);
}
}
}
}
} finally {
informReadOperation(ctx, now);
// The message is then forcedly passed to the next handler (not to super)
ctx.sendUpstream(evt);
}
}
@Override
protected long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
Integer key = ctx.getChannel().hashCode();
PerChannel perChannel = channelQueues.get(key);
if (perChannel != null) {
if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
wait = maxTime;
}
}
return wait;
}
@Override
protected void informReadOperation(final ChannelHandlerContext ctx, final long now) {
Integer key = ctx.getChannel().hashCode();
PerChannel perChannel = channelQueues.get(key);
if (perChannel != null) {
perChannel.lastReadTimestamp = now;
}
}
private static final class ToSend {
final long relativeTimeAction;
final MessageEvent toSend;
final long size;
private ToSend(final long delay, final MessageEvent toSend, final long size) {
this.relativeTimeAction = delay;
this.toSend = toSend;
this.size = size;
}
}
protected long maximumCumulativeWrittenBytes() {
return cumulativeWrittenBytes.get();
}
protected long maximumCumulativeReadBytes() {
return cumulativeReadBytes.get();
}
/**
* To allow for instance doAccounting to use the TrafficCounter per channel.
* @return the list of TrafficCounters that exists at the time of the call.
*/
public Collection<TrafficCounter> channelTrafficCounters() {
Collection<TrafficCounter> valueCollection = new AbstractCollection<TrafficCounter>() {
public Iterator<TrafficCounter> iterator() {
return new Iterator<TrafficCounter>() {
final Iterator<PerChannel> iter = channelQueues.values().iterator();
public boolean hasNext() {
return iter.hasNext();
}
public TrafficCounter next() {
return iter.next().channelTrafficCounter;
}
public void remove() {
throw new UnsupportedOperationException();
}
};
}
public int size() {
return channelQueues.size();
}
};
return valueCollection;
}
@Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent evt)
throws Exception {
long wait = 0;
long size = calculateSize(evt.getMessage());
long now = TrafficCounter.milliSecondFromNano();
try {
if (size > 0) {
// compute the number of ms to wait before continue with the channel
long waitGlobal = trafficCounter.writeTimeToWait(size, getWriteLimit(), maxTime, now);
Integer key = ctx.getChannel().hashCode();
PerChannel perChannel = channelQueues.get(key);
if (perChannel != null) {
wait = perChannel.channelTrafficCounter.writeTimeToWait(size, writeChannelLimit, maxTime, now);
if (writeDeviationActive) {
// now try to balance between the channels
long maxLocalWrite = 0;
maxLocalWrite = perChannel.channelTrafficCounter.getCumulativeWrittenBytes();
long maxGlobalWrite = cumulativeWrittenBytes.get();
if (maxLocalWrite <= 0) {
maxLocalWrite = 0;
}
if (maxGlobalWrite < maxLocalWrite) {
maxGlobalWrite = maxLocalWrite;
}
wait = computeBalancedWait(maxLocalWrite, maxGlobalWrite, wait);
}
}
if (wait < waitGlobal) {
wait = waitGlobal;
}
if (wait < MINIMAL_WAIT || release.get()) {
wait = 0;
}
}
} finally {
// The message is scheduled
submitWrite(ctx, evt, size, wait, now);
}
}
@Override
protected void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt,
final long size, final long writedelay, final long now) throws Exception {
Channel channel = ctx.getChannel();
Integer key = channel.hashCode();
PerChannel perChannel = channelQueues.get(key);
if (perChannel == null) {
// in case write occurs before handlerAdded is raized for this handler
// imply a synchronized only if needed
perChannel = getOrSetPerChannel(ctx);
}
final ToSend newToSend;
long delay = writedelay;
boolean globalSizeExceeded = false;
// write operations need synchronization
synchronized (perChannel) {
if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
if (!channel.isConnected()) {
// ignore
return;
}
trafficCounter.bytesRealWriteFlowControl(size);
perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
ctx.sendDownstream(evt);
perChannel.lastWriteTimestamp = now;
return;
}
if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
delay = maxTime;
}
if (timer == null) {
// Sleep since no executor
Thread.sleep(delay);
if (!ctx.getChannel().isConnected()) {
// ignore
return;
}
trafficCounter.bytesRealWriteFlowControl(size);
perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
ctx.sendDownstream(evt);
perChannel.lastWriteTimestamp = now;
return;
}
if (!ctx.getChannel().isConnected()) {
// ignore
return;
}
newToSend = new ToSend(delay + now, evt, size);
perChannel.messagesQueue.add(newToSend);
perChannel.queueSize += size;
queuesSize.addAndGet(size);
checkWriteSuspend(ctx, delay, perChannel.queueSize);
if (queuesSize.get() > maxGlobalWriteSize) {
globalSizeExceeded = true;
}
}
if (globalSizeExceeded) {
setWritable(ctx, false);
}
final long futureNow = newToSend.relativeTimeAction;
final PerChannel forSchedule = perChannel;
timer.newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
sendAllValid(ctx, forSchedule, futureNow);
}
}, delay, TimeUnit.MILLISECONDS);
}
private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now)
throws Exception {
// write operations need synchronization
synchronized (perChannel) {
while (!perChannel.messagesQueue.isEmpty()) {
ToSend newToSend = perChannel.messagesQueue.remove(0);
if (newToSend.relativeTimeAction <= now) {
if (! ctx.getChannel().isConnected()) {
// ignore
break;
}
long size = newToSend.size;
trafficCounter.bytesRealWriteFlowControl(size);
perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
perChannel.queueSize -= size;
queuesSize.addAndGet(-size);
ctx.sendDownstream(newToSend.toSend);
perChannel.lastWriteTimestamp = now;
} else {
perChannel.messagesQueue.add(0, newToSend);
break;
}
}
if (perChannel.messagesQueue.isEmpty()) {
releaseWriteSuspended(ctx);
}
}
}
@Override
public String toString() {
return new StringBuilder(340).append(super.toString())
.append(" Write Channel Limit: ").append(writeChannelLimit)
.append(" Read Channel Limit: ").append(readChannelLimit).toString();
}
}

View File

@ -15,12 +15,13 @@
*/
package org.jboss.netty.handler.traffic;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
@ -32,51 +33,70 @@ import org.jboss.netty.util.ObjectSizeEstimator;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.Timer;
import org.jboss.netty.util.TimerTask;
import org.jboss.netty.util.internal.ConcurrentHashMap;
/**
* This implementation of the {@link AbstractTrafficShapingHandler} is for global
* <p>This implementation of the {@link AbstractTrafficShapingHandler} is for global
* traffic shaping, that is to say a global limitation of the bandwidth, whatever
* the number of opened channels.<br><br>
* the number of opened channels.</p>
*
* The general use should be as follow:<br>
* The general use should be as follow:
* <ul>
* <li>Create your unique GlobalTrafficShapingHandler like:<br><br>
* <tt>GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(timer);</tt><br><br>
* timer could be created using <tt>HashedWheelTimer</tt><br>
* <tt>pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", myHandler);</tt><br><br>
* <li><p>Create your unique GlobalTrafficShapingHandler like:</p>
* <p><tt>GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(timer);</tt></p>
* <p>timer could be created using <tt>HashedWheelTimer</tt></p>
* <p><tt>pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", myHandler);</tt></p>
*
* <b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
* and shared among all channels as the counter must be shared among all channels.</b><br><br>
* <p><b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
* and shared among all channels as the counter must be shared among all channels.</b></p>
*
* Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
* <p>Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
* or the check interval (in millisecond) that represents the delay between two computations of the
* bandwidth and so the call back of the doAccounting method (0 means no accounting at all).<br><br>
* bandwidth and so the call back of the doAccounting method (0 means no accounting at all).</p>
*
* A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
* <p>A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
* it is recommended to set a positive value, even if it is high since the precision of the
* Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
* the less precise the traffic shaping will be. It is suggested as higher value something close
* to 5 or 10 minutes.<br><br>
* to 5 or 10 minutes.</p>
*
* maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.<br><br>
* <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p>
* </li>
* <li>Add it in your pipeline, before a recommended {@link ExecutionHandler} (like
* {@link OrderedMemoryAwareThreadPoolExecutor} or {@link MemoryAwareThreadPoolExecutor}).<br>
* <tt>pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", myHandler);</tt><br><br>
* <li><p>Add it in your pipeline, before a recommended {@link ExecutionHandler} (like
* {@link OrderedMemoryAwareThreadPoolExecutor} or {@link MemoryAwareThreadPoolExecutor}).</p>
* <p><tt>pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", myHandler);</tt></p>
* </li>
* <li>When you shutdown your application, release all the external resources
* by calling:<br>
* <tt>myHandler.releaseExternalResources();</tt><br>
* <li><p>When you shutdown your application, release all the external resources
* by calling:</p>
* <tt>myHandler.releaseExternalResources();</tt>
* </li>
* </ul><br>
* </ul>
*/
@Sharable
public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
private Map<Integer, List<ToSend>> messagesQueues = new HashMap<Integer, List<ToSend>>();
private final ConcurrentMap<Integer, PerChannel> channelQueues = new ConcurrentHashMap<Integer, PerChannel>();
/**
* Create the global TrafficCounter
* Global queues size
*/
private AtomicLong queuesSize = new AtomicLong();
/**
* Max size in the list before proposing to stop writing new objects from next handlers
* for all channel (global)
*/
long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100; // default 400MB
private static final class PerChannel {
List<ToSend> messagesQueue;
ChannelHandlerContext ctx;
long queueSize;
long lastWriteTimestamp;
long lastReadTimestamp;
}
/**
* Create the global TrafficCounter.
*/
void createGlobalTrafficCounter() {
TrafficCounter tc;
@ -150,54 +170,171 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
createGlobalTrafficCounter();
}
private static final class ToSend {
final long date;
final MessageEvent toSend;
/**
* @return the maxGlobalWriteSize default value being 400 MB.
*/
public long getMaxGlobalWriteSize() {
return maxGlobalWriteSize;
}
private ToSend(final long delay, final MessageEvent toSend) {
this.date = System.currentTimeMillis() + delay;
/**
* @param maxGlobalWriteSize the maximum Global Write Size allowed in the buffer
* globally for all channels before write suspended is set,
* default value being 400 MB.
*/
public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
this.maxGlobalWriteSize = maxGlobalWriteSize;
}
/**
* @return the global size of the buffers for all queues.
*/
public long queuesSize() {
return queuesSize.get();
}
private synchronized PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) {
Integer key = ctx.getChannel().hashCode();
PerChannel perChannel = channelQueues.get(key);
if (perChannel == null) {
perChannel = new PerChannel();
perChannel.messagesQueue = new LinkedList<ToSend>();
perChannel.ctx = ctx;
perChannel.queueSize = 0L;
perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano();
perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp;
channelQueues.put(key, perChannel);
}
return perChannel;
}
private static final class ToSend {
final long relativeTimeAction;
final MessageEvent toSend;
final long size;
private ToSend(final long delay, final MessageEvent toSend, final long size) {
this.relativeTimeAction = delay;
this.toSend = toSend;
this.size = size;
}
}
@Override
protected synchronized void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt, final long delay)
throws Exception {
Integer key = ctx.getChannel().getId();
List<ToSend> messagesQueue = messagesQueues.get(key);
if (delay == 0 && (messagesQueue == null || messagesQueue.isEmpty())) {
internalSubmitWrite(ctx, evt);
return;
}
if (timer == null) {
// Sleep since no executor
Thread.sleep(delay);
internalSubmitWrite(ctx, evt);
return;
}
if (messagesQueue == null) {
messagesQueue = new LinkedList<ToSend>();
messagesQueues.put(key, messagesQueue);
}
final ToSend newToSend = new ToSend(delay, evt);
messagesQueue.add(newToSend);
final List<ToSend> mqfinal = messagesQueue;
timer.newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
sendAllValid(ctx, mqfinal);
long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
Integer key = ctx.getChannel().hashCode();
PerChannel perChannel = channelQueues.get(key);
if (perChannel != null) {
if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
wait = maxTime;
}
}, delay + 1, TimeUnit.MILLISECONDS);
}
return wait;
}
@Override
void informReadOperation(final ChannelHandlerContext ctx, final long now) {
Integer key = ctx.getChannel().hashCode();
PerChannel perChannel = channelQueues.get(key);
if (perChannel != null) {
perChannel.lastReadTimestamp = now;
}
}
private synchronized void sendAllValid(ChannelHandlerContext ctx, final List<ToSend> messagesQueue)
@Override
void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt,
final long size, final long writedelay, final long now)
throws Exception {
while (!messagesQueue.isEmpty()) {
ToSend newToSend = messagesQueue.remove(0);
if (newToSend.date <= System.currentTimeMillis()) {
internalSubmitWrite(ctx, newToSend.toSend);
} else {
messagesQueue.add(0, newToSend);
break;
PerChannel perChannel = getOrSetPerChannel(ctx);
long delay;
final ToSend newToSend;
boolean globalSizeExceeded = false;
Channel channel = ctx.getChannel();
synchronized (perChannel) {
if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
if (! channel.isConnected()) {
// ignore
return;
}
if (trafficCounter != null) {
trafficCounter.bytesRealWriteFlowControl(size);
}
ctx.sendDownstream(evt);
perChannel.lastWriteTimestamp = now;
return;
}
delay = writedelay;
if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
delay = maxTime;
}
if (timer == null) {
// Sleep since no executor
Thread.sleep(delay);
if (! ctx.getChannel().isConnected()) {
// ignore
return;
}
if (trafficCounter != null) {
trafficCounter.bytesRealWriteFlowControl(size);
}
ctx.sendDownstream(evt);
perChannel.lastWriteTimestamp = now;
return;
}
if (! ctx.getChannel().isConnected()) {
// ignore
return;
}
newToSend = new ToSend(delay + now, evt, size);
perChannel.messagesQueue.add(newToSend);
perChannel.queueSize += size;
queuesSize.addAndGet(size);
checkWriteSuspend(ctx, delay, perChannel.queueSize);
if (queuesSize.get() > maxGlobalWriteSize) {
globalSizeExceeded = true;
}
}
if (globalSizeExceeded) {
setWritable(ctx, false);
}
final long futureNow = newToSend.relativeTimeAction;
final PerChannel forSchedule = perChannel;
timer.newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
sendAllValid(ctx, forSchedule, futureNow);
}
}, delay, TimeUnit.MILLISECONDS);
}
private void sendAllValid(ChannelHandlerContext ctx, final PerChannel perChannel, final long now)
throws Exception {
Channel channel = ctx.getChannel();
if (! channel.isConnected()) {
// ignore
return;
}
synchronized (perChannel) {
while (!perChannel.messagesQueue.isEmpty()) {
ToSend newToSend = perChannel.messagesQueue.remove(0);
if (newToSend.relativeTimeAction <= now) {
if (! channel.isConnected()) {
// ignore
break;
}
long size = newToSend.size;
if (trafficCounter != null) {
trafficCounter.bytesRealWriteFlowControl(size);
}
perChannel.queueSize -= size;
queuesSize.addAndGet(-size);
ctx.sendDownstream(newToSend.toSend);
perChannel.lastWriteTimestamp = now;
} else {
perChannel.messagesQueue.add(0, newToSend);
break;
}
}
if (perChannel.messagesQueue.isEmpty()) {
releaseWriteSuspended(ctx);
}
}
}
@ -205,9 +342,7 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
Integer key = ctx.getChannel().getId();
List<ToSend> messagesQueue = new LinkedList<ToSend>();
messagesQueues.put(key, messagesQueue);
getOrSetPerChannel(ctx);
super.channelConnected(ctx, e);
}
@ -215,10 +350,35 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
Integer key = ctx.getChannel().hashCode();
List<ToSend> mq = messagesQueues.remove(key);
if (mq != null) {
mq.clear();
PerChannel perChannel = channelQueues.remove(key);
if (perChannel != null) {
synchronized (perChannel) {
queuesSize.addAndGet(-perChannel.queueSize);
perChannel.messagesQueue.clear();
}
}
super.channelClosed(ctx, e);
}
@Override
public void releaseExternalResources() {
for (PerChannel perChannel : channelQueues.values()) {
if (perChannel != null && perChannel.ctx != null && perChannel.ctx.getChannel().isConnected()) {
Channel channel = perChannel.ctx.getChannel();
synchronized (perChannel) {
for (ToSend toSend : perChannel.messagesQueue) {
if (! channel.isConnected()) {
// ignore
break;
}
perChannel.ctx.sendDownstream(toSend.toSend);
}
perChannel.messagesQueue.clear();
}
}
}
channelQueues.clear();
queuesSize.set(0);
super.releaseExternalResources();
}
}

View File

@ -22,12 +22,11 @@ import org.jboss.netty.util.Timer;
import org.jboss.netty.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* TrafficCounter is associated with {@link AbstractTrafficShapingHandler}.<br>
* <br>
* <p>TrafficCounter is associated with {@link AbstractTrafficShapingHandler}.</p>
*
* A TrafficCounter has for goal to count the traffic in order to enable to limit the traffic or not,
* globally or per channel. It compute statistics on read and written bytes at the specified
* interval and call back the {@link AbstractTrafficShapingHandler} doAccounting method at every
@ -37,6 +36,13 @@ import java.util.concurrent.atomic.AtomicLong;
public class TrafficCounter {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficCounter.class);
/**
* @return the time in ms using nanoTime, so not real EPOCH time but elapsed time in ms.
*/
public static final long milliSecondFromNano() {
return System.nanoTime() / 1000000;
}
/**
* Current written bytes
*/
@ -47,6 +53,16 @@ public class TrafficCounter {
*/
private final AtomicLong currentReadBytes = new AtomicLong();
/**
* Last writing time during current check interval
*/
private long writingTime;
/**
* Last reading delay during current check interval
*/
private long readingTime;
/**
* Long life written bytes
*/
@ -58,7 +74,7 @@ public class TrafficCounter {
private final AtomicLong cumulativeReadBytes = new AtomicLong();
/**
* Last Time where cumulative bytes where reset to zero
* Last Time where cumulative bytes where reset to zero: this time is a real EPOC time (informative only)
*/
private long lastCumulativeTime;
@ -75,37 +91,37 @@ public class TrafficCounter {
/**
* Last Time Check taken
*/
private final AtomicLong lastTime = new AtomicLong();
final AtomicLong lastTime = new AtomicLong();
/**
* Last written bytes number during last check interval
*/
private long lastWrittenBytes;
private volatile long lastWrittenBytes;
/**
* Last read bytes number during last check interval
*/
private long lastReadBytes;
private volatile long lastReadBytes;
/**
* Last non 0 written bytes number during last check interval
* Last future writing time during last check interval
*/
private long lastNonNullWrittenBytes;
private volatile long lastWritingTime;
/**
* Last time written bytes with non 0 written bytes
* Last reading time during last check interval
*/
private long lastNonNullWrittenTime;
private volatile long lastReadingTime;
/**
* Last time read bytes with non 0 written bytes
* Real written bytes
*/
private long lastNonNullReadTime;
private final AtomicLong realWrittenBytes = new AtomicLong();
/**
* Last non 0 read bytes number during last check interval
* Real writing bandwidth
*/
private long lastNonNullReadBytes;
private long realWriteThroughput;
/**
* Delay between two captures
@ -123,31 +139,33 @@ public class TrafficCounter {
/**
* The associated TrafficShapingHandler
*/
private final AbstractTrafficShapingHandler trafficShapingHandler;
final AbstractTrafficShapingHandler trafficShapingHandler;
/**
* One Timer for all Counter
*/
private final Timer timer; // replace executor
final Timer timer; // replace executor
/**
* Monitor created once in start()
*/
private TimerTask timerTask;
TimerTask timerTask;
/**
* used in stop() to cancel the timer
*/
private volatile Timeout timeout;
volatile Timeout timeout;
/**
* Is Monitor active
*/
final AtomicBoolean monitorActive = new AtomicBoolean();
volatile boolean monitorActive;
/**
* Class to implement monitoring at fix delay
*
*/
private static class TrafficMonitoringTask implements TimerTask {
private static final class TrafficMonitoringTask implements TimerTask {
/**
* The associated TrafficShapingHandler
*/
@ -166,11 +184,10 @@ public class TrafficCounter {
}
public void run(Timeout timeout) throws Exception {
if (!counter.monitorActive.get()) {
if (!counter.monitorActive) {
return;
}
long endTime = System.currentTimeMillis();
counter.resetAccounting(endTime);
counter.resetAccounting(milliSecondFromNano());
if (trafficShapingHandler1 != null) {
trafficShapingHandler1.doAccounting(counter);
}
@ -180,101 +197,99 @@ public class TrafficCounter {
}
/**
* Start the monitoring process
* Start the monitoring process.
*/
public void start() {
synchronized (lastTime) {
if (monitorActive.get()) {
return;
}
lastTime.set(System.currentTimeMillis());
if (checkInterval.get() > 0) {
monitorActive.set(true);
timerTask = new TrafficMonitoringTask(trafficShapingHandler, this);
timeout =
timer.newTimeout(timerTask, checkInterval.get(), TimeUnit.MILLISECONDS);
}
if (monitorActive) {
return;
}
lastTime.set(milliSecondFromNano());
// if executor is null, it means it is piloted by a GlobalChannelTrafficCounter, so no executor
if (checkInterval.get() > 0 && timer != null) {
monitorActive = true;
timerTask = new TrafficMonitoringTask(trafficShapingHandler, this);
timeout =
timer.newTimeout(timerTask, checkInterval.get(), TimeUnit.MILLISECONDS);
}
}
/**
* Stop the monitoring process
* Stop the monitoring process.
*/
public void stop() {
synchronized (lastTime) {
if (!monitorActive.get()) {
return;
}
monitorActive.set(false);
resetAccounting(System.currentTimeMillis());
if (trafficShapingHandler != null) {
trafficShapingHandler.doAccounting(this);
}
if (timeout != null) {
timeout.cancel();
}
if (!monitorActive) {
return;
}
monitorActive = false;
resetAccounting(milliSecondFromNano());
if (trafficShapingHandler != null) {
trafficShapingHandler.doAccounting(this);
}
if (timeout != null) {
timeout.cancel();
}
}
/**
* Reset the accounting on Read and Write
* Reset the accounting on Read and Write.
*/
void resetAccounting(long newLastTime) {
synchronized (lastTime) {
long interval = newLastTime - lastTime.getAndSet(newLastTime);
if (interval == 0) {
// nothing to do
return;
}
lastReadBytes = currentReadBytes.getAndSet(0);
lastWrittenBytes = currentWrittenBytes.getAndSet(0);
lastReadThroughput = lastReadBytes * 1000 / interval;
// nb byte / checkInterval in ms * 1000 (1s)
lastWriteThroughput = lastWrittenBytes * 1000 / interval;
// nb byte / checkInterval in ms * 1000 (1s)
}
if (lastWrittenBytes > 0) {
lastNonNullWrittenBytes = lastWrittenBytes;
lastNonNullWrittenTime = newLastTime;
}
if (lastReadBytes > 0) {
lastNonNullReadBytes = lastReadBytes;
lastNonNullReadTime = newLastTime;
long interval = newLastTime - lastTime.getAndSet(newLastTime);
if (interval == 0) {
// nothing to do
return;
}
lastReadBytes = currentReadBytes.getAndSet(0);
lastWrittenBytes = currentWrittenBytes.getAndSet(0);
lastReadThroughput = lastReadBytes * 1000 / interval;
// nb byte / checkInterval in ms * 1000 (1s)
lastWriteThroughput = lastWrittenBytes * 1000 / interval;
// nb byte / checkInterval in ms * 1000 (1s)
realWriteThroughput = realWrittenBytes.getAndSet(0) * 1000 / interval;
lastWritingTime = Math.max(lastWritingTime, writingTime);
lastReadingTime = Math.max(lastReadingTime, readingTime);
}
/**
* Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the Timer to use, its
* name, the checkInterval between two computations in millisecond
* name, the checkInterval between two computations in millisecond.
* @param trafficShapingHandler the associated AbstractTrafficShapingHandler
* @param timer
* Could be a HashedWheelTimer
* Could be a HashedWheelTimer, might be null when used
* from {@link GlobalChannelTrafficCounter}.
* @param name
* the name given to this monitor
* @param checkInterval
* the checkInterval in millisecond between two computations
* the checkInterval in millisecond between two computations.
*/
public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler,
Timer timer, String name, long checkInterval) {
if (trafficShapingHandler == null) {
throw new IllegalArgumentException("TrafficShapingHandler must not be null");
}
this.trafficShapingHandler = trafficShapingHandler;
this.timer = timer;
this.name = name;
// absolute time: informative only
lastCumulativeTime = System.currentTimeMillis();
writingTime = milliSecondFromNano();
readingTime = writingTime;
lastWritingTime = writingTime;
lastReadingTime = writingTime;
configure(checkInterval);
}
/**
* Change checkInterval between
* two computations in millisecond
* two computations in millisecond.
*/
public void configure(long newcheckInterval) {
long newInterval = newcheckInterval / 10 * 10;
if (checkInterval.get() != newInterval) {
checkInterval.set(newInterval);
if (checkInterval.getAndSet(newInterval) != newInterval) {
if (newInterval <= 0) {
stop();
// No more active monitoring
lastTime.set(System.currentTimeMillis());
lastTime.set(milliSecondFromNano());
} else {
// Start if necessary
start();
@ -305,64 +320,69 @@ public class TrafficCounter {
}
/**
* Computes counters for Real Write.
*
* @param write
* the size in bytes to write
* @param schedule
* the time when this write was scheduled
*/
void bytesRealWriteFlowControl(long write) {
realWrittenBytes.addAndGet(write);
}
/**
* @return the current checkInterval between two computations of traffic counter
* in millisecond
* in millisecond.
*/
public long getCheckInterval() {
return checkInterval.get();
}
/**
*
* @return the Read Throughput in bytes/s computes in the last check interval
* @return the Read Throughput in bytes/s computes in the last check interval.
*/
public long getLastReadThroughput() {
return lastReadThroughput;
}
/**
*
* @return the Write Throughput in bytes/s computes in the last check interval
* @return the Write Throughput in bytes/s computes in the last check interval.
*/
public long getLastWriteThroughput() {
return lastWriteThroughput;
}
/**
*
* @return the number of bytes read during the last check Interval
* @return the number of bytes read during the last check Interval.
*/
public long getLastReadBytes() {
return lastReadBytes;
}
/**
*
* @return the number of bytes written during the last check Interval
* @return the number of bytes written during the last check Interval.
*/
public long getLastWrittenBytes() {
return lastWrittenBytes;
}
/**
*
* @return the current number of bytes read since the last checkInterval
* @return the current number of bytes read since the last checkInterval.
*/
public long getCurrentReadBytes() {
return currentReadBytes.get();
}
/**
*
* @return the current number of bytes written since the last check Interval
* @return the current number of bytes written since the last check Interval.
*/
public long getCurrentWrittenBytes() {
return currentWrittenBytes.get();
}
/**
* @return the Time in millisecond of the last check as of System.currentTimeMillis()
* @return the Time in millisecond of the last check as of System.currentTimeMillis().
*/
public long getLastTime() {
return lastTime.get();
@ -391,7 +411,22 @@ public class TrafficCounter {
}
/**
* Reset both read and written cumulative bytes counters and the associated time.
* @return the realWrittenBytes
*/
public AtomicLong getRealWrittenBytes() {
return realWrittenBytes;
}
/**
* @return the realWriteThroughput
*/
public long getRealWriteThroughput() {
return realWriteThroughput;
}
/**
* Reset both read and written cumulative bytes counters and the associated absolute time
* from System.currentTimeMillis().
*/
public void resetCumulativeTime() {
lastCumulativeTime = System.currentTimeMillis();
@ -401,112 +436,151 @@ public class TrafficCounter {
/**
* Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait
* time
* time.
*
* @param size
* the recv size
* @param limitTraffic
* the traffic limit in bytes per second
* @param maxTime
* the max time in ms to wait in case of excess of traffic
* @return the current time to wait (in ms) if needed for Read operation
* the max time in ms to wait in case of excess of traffic.
* @return the current time to wait (in ms) if needed for Read operation.
*/
public synchronized long readTimeToWait(final long size, final long limitTraffic, final long maxTime) {
final long now = System.currentTimeMillis();
@Deprecated
public long readTimeToWait(final long size, final long limitTraffic, final long maxTime) {
return readTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
}
/**
* Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait
* time.
*
* @param size
* the recv size
* @param limitTraffic
* the traffic limit in bytes per second
* @param maxTime
* the max time in ms to wait in case of excess of traffic.
* @param now the current time
* @return the current time to wait (in ms) if needed for Read operation.
*/
public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
bytesRecvFlowControl(size);
if (limitTraffic == 0) {
if (size == 0 || limitTraffic == 0) {
return 0;
}
final long lastTimeCheck = lastTime.get();
long sum = currentReadBytes.get();
long interval = now - lastTime.get();
// Short time checking
if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT && sum > 0) {
long time = (sum * 1000 / limitTraffic - interval) / 10 * 10;
long localReadingTime = readingTime;
long lastRB = lastReadBytes;
final long interval = now - lastTimeCheck;
long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);
if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
// Enough interval time to compute shaping
long time = (sum * 1000 / limitTraffic - interval + pastDelay) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + sum + ":" + interval);
logger.debug("Time: " + time + ":" + sum + ":" + interval + ":" + pastDelay);
}
return time > maxTime ? maxTime : time;
if (time > maxTime && now + time - localReadingTime > maxTime) {
time = maxTime;
}
readingTime = Math.max(localReadingTime, now + time);
return time;
}
readingTime = Math.max(localReadingTime, now);
return 0;
}
// long time checking
if (lastNonNullReadBytes > 0 && lastNonNullReadTime + AbstractTrafficShapingHandler.MINIMAL_WAIT < now) {
long lastsum = sum + lastNonNullReadBytes;
long lastinterval = now - lastNonNullReadTime;
long time = (lastsum * 1000 / limitTraffic - lastinterval) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval);
}
return time > maxTime ? maxTime : time;
// take the last read interval check to get enough interval time
long lastsum = sum + lastRB;
long lastinterval = interval + checkInterval.get();
long time = (lastsum * 1000 / limitTraffic - lastinterval + pastDelay) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval + ":" + pastDelay);
}
} else {
// final "middle" time checking in case resetAccounting called very recently
sum += lastReadBytes;
long lastinterval = AbstractTrafficShapingHandler.MINIMAL_WAIT;
long time = (sum * 1000 / limitTraffic - lastinterval) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + sum + ":" + lastinterval);
}
return time > maxTime ? maxTime : time;
if (time > maxTime && now + time - localReadingTime > maxTime) {
time = maxTime;
}
readingTime = Math.max(localReadingTime, now + time);
return time;
}
readingTime = Math.max(localReadingTime, now);
return 0;
}
/**
* Returns the time to wait (if any) for the given length message, using the given limitTraffic and
* the max wait time
* the max wait time.
*
* @param size
* the write size
* @param limitTraffic
* the traffic limit in bytes per second
* @param maxTime
* the max time in ms to wait in case of excess of traffic
* @return the current time to wait (in ms) if needed for Write operation
* the max time in ms to wait in case of excess of traffic.
* @return the current time to wait (in ms) if needed for Write operation.
*/
public synchronized long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) {
@Deprecated
public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) {
return writeTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
}
/**
* Returns the time to wait (if any) for the given length message, using the given limitTraffic and
* the max wait time.
*
* @param size
* the write size
* @param limitTraffic
* the traffic limit in bytes per second
* @param maxTime
* the max time in ms to wait in case of excess of traffic.
* @param now the current time
* @return the current time to wait (in ms) if needed for Write operation.
*/
public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
bytesWriteFlowControl(size);
if (limitTraffic == 0) {
if (size == 0 || limitTraffic == 0) {
return 0;
}
final long lastTimeCheck = lastTime.get();
long sum = currentWrittenBytes.get();
final long now = System.currentTimeMillis();
long interval = now - lastTime.get();
if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT && sum > 0) {
long time = (sum * 1000 / limitTraffic - interval) / 10 * 10;
long lastWB = lastWrittenBytes;
long localWritingTime = writingTime;
long pastDelay = Math.max(lastWritingTime - lastTimeCheck, 0);
final long interval = now - lastTimeCheck;
if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
// Enough interval time to compute shaping
long time = (sum * 1000 / limitTraffic - interval + pastDelay) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + sum + ":" + interval);
logger.debug("Time: " + time + ":" + sum + ":" + interval + ":" + pastDelay);
}
return time > maxTime ? maxTime : time;
if (time > maxTime && now + time - localWritingTime > maxTime) {
time = maxTime;
}
writingTime = Math.max(localWritingTime, now + time);
return time;
}
writingTime = Math.max(localWritingTime, now);
return 0;
}
if (lastNonNullWrittenBytes > 0 && lastNonNullWrittenTime + AbstractTrafficShapingHandler.MINIMAL_WAIT < now) {
long lastsum = sum + lastNonNullWrittenBytes;
long lastinterval = now - lastNonNullWrittenTime;
long time = (lastsum * 1000 / limitTraffic - lastinterval) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval);
}
return time > maxTime ? maxTime : time;
// take the last write interval check to get enough interval time
long lastsum = sum + lastWB;
long lastinterval = interval + checkInterval.get();
long time = (lastsum * 1000 / limitTraffic - lastinterval + pastDelay) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval + ":" + pastDelay);
}
} else {
sum += lastWrittenBytes;
long lastinterval = AbstractTrafficShapingHandler.MINIMAL_WAIT + Math.abs(interval);
long time = (sum * 1000 / limitTraffic - lastinterval) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + sum + ":" + lastinterval);
}
return time > maxTime ? maxTime : time;
if (time > maxTime && now + time - localWritingTime > maxTime) {
time = maxTime;
}
writingTime = Math.max(localWritingTime, now + time);
return time;
}
writingTime = Math.max(localWritingTime, now);
return 0;
}
@ -522,10 +596,12 @@ public class TrafficCounter {
*/
@Override
public String toString() {
return "Monitor " + name + " Current Speed Read: " +
(lastReadThroughput >> 10) + " KB/s, Write: " +
(lastWriteThroughput >> 10) + " KB/s Current Read: " +
(currentReadBytes.get() >> 10) + " KB Current Write: " +
(currentWrittenBytes.get() >> 10) + " KB";
return new StringBuilder(165).append("Monitor ").append(name)
.append(" Current Speed Read: ").append(lastReadThroughput >> 10).append(" KB/s, ")
.append("Asked Write: ").append(lastWriteThroughput >> 10).append(" KB/s, ")
.append("Real Write: ").append(realWriteThroughput >> 10).append(" KB/s, ")
.append("Current Read: ").append(currentReadBytes.get() >> 10).append(" KB, ")
.append("Current asked Write: ").append(currentWrittenBytes.get() >> 10).append(" KB, ")
.append("Current real Write: ").append(realWrittenBytes.get() >> 10).append(" KB").toString();
}
}

View File

@ -0,0 +1,120 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import static org.junit.Assert.*;
public class AbstractChannelTest {
private static class TestChannel extends AbstractChannel {
private static final Integer DUMMY_ID = 0;
private final ChannelConfig config;
private final SocketAddress localAddress = new InetSocketAddress(1);
private final SocketAddress remoteAddress = new InetSocketAddress(2);
TestChannel(ChannelPipeline pipeline, ChannelSink sink) {
super(DUMMY_ID, null, null, pipeline, sink);
config = new DefaultChannelConfig();
}
public ChannelConfig getConfig() {
return config;
}
public SocketAddress getLocalAddress() {
return localAddress;
}
public SocketAddress getRemoteAddress() {
return remoteAddress;
}
public boolean isBound() {
return true;
}
public boolean isConnected() {
return true;
}
}
private static class TestChannelSink extends AbstractChannelSink {
public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
// Do nothing
}
}
private static class TestChannelHandler extends SimpleChannelHandler {
private final StringBuilder buf = new StringBuilder();
@Override
public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
buf.append(ctx.getChannel().isWritable());
buf.append(' ');
super.channelInterestChanged(ctx, e);
}
}
@Test
public void testUserDefinedWritability() {
DefaultChannelPipeline pipeline = new DefaultChannelPipeline();
TestChannelHandler handler = new TestChannelHandler();
StringBuilder buf = handler.buf;
pipeline.addLast("TRACE", handler);
TestChannel channel = new TestChannel(pipeline, new TestChannelSink());
// No configuration by default on Low and High WaterMark for no Nio
// Ensure that the default value of a user-defined writability flag is true.
for (int i = 1; i <= 30; i ++) {
assertTrue(channel.getUserDefinedWritability(i));
}
assertEquals(0, channel.getInterestOps() & Channel.OP_WRITE);
// Ensure that setting a user-defined writability flag to false affects channel.isWritable();
channel.setUserDefinedWritability(1, false);
assertEquals("false ", buf.toString());
assertTrue((channel.getInterestOps() & Channel.OP_WRITE) != 0);
// Ensure that setting a user-defined writability flag to true affects channel.isWritable();
channel.setUserDefinedWritability(1, true);
assertEquals("false true ", buf.toString());
assertEquals(0, channel.getInterestOps() & Channel.OP_WRITE);
}
@Test
public void testUserDefinedWritability2() {
DefaultChannelPipeline pipeline = new DefaultChannelPipeline();
TestChannelHandler handler = new TestChannelHandler();
StringBuilder buf = handler.buf;
pipeline.addLast("TRACE", handler);
TestChannel channel = new TestChannel(pipeline, new TestChannelSink());
// No configuration by default on Low and High WaterMark for no Nio
// Ensure that the default value of a user-defined writability flag is true.
for (int i = 1; i <= 30; i ++) {
assertTrue(channel.getUserDefinedWritability(i));
}
assertEquals(0, channel.getInterestOps() & Channel.OP_WRITE);
// Ensure that setting a user-defined writability flag to false affects channel.isWritable();
channel.setUserDefinedWritability(1, false);
assertEquals("false ", buf.toString());
assertTrue((channel.getInterestOps() & Channel.OP_WRITE) != 0);
// Ensure that setting another user-defined writability flag to false does not trigger
// channelWritabilityChanged.
channel.setUserDefinedWritability(2, false);
assertEquals("false ", buf.toString());
assertTrue((channel.getInterestOps() & Channel.OP_WRITE) != 0);
// Ensure that setting only one user-defined writability flag to true does not affect channel.isWritable()
channel.setUserDefinedWritability(1, true);
assertEquals("false ", buf.toString());
assertTrue((channel.getInterestOps() & Channel.OP_WRITE) != 0);
// Ensure that setting all user-defined writability flags to true affects channel.isWritable()
channel.setUserDefinedWritability(2, true);
assertEquals("false true ", buf.toString());
assertEquals(0, channel.getInterestOps() & Channel.OP_WRITE);
}
}

View File

@ -0,0 +1,324 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.DefaultChannelPipeline;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.junit.Test;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.junit.Assert.*;
public class NioChannelTest {
private static class TestChannelHandler extends SimpleChannelHandler {
private final StringBuilder buf = new StringBuilder();
@Override
public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
buf.append(ctx.getChannel().isWritable());
buf.append(' ');
super.channelInterestChanged(ctx, e);
}
}
private static class TestChannelSink extends AbstractNioChannelSink {
public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
NioSocketChannel channel = (NioSocketChannel) event.getChannel();
channel.writeBufferQueue.offer(event);
}
e.getFuture().setSuccess();
// Do nothing
}
}
private static class TestNioChannel extends NioSocketChannel {
TestNioChannel(ChannelPipeline pipeline, ChannelSink sink,
SocketChannel socket, NioWorker worker) {
super(null, null, pipeline, sink, socket, worker);
}
@Override
public boolean isConnected() {
return true;
}
@Override
public boolean isOpen() {
return true;
}
@Override
public boolean isBound() {
return true;
}
}
private static ChannelBuffer writeZero(int size) {
ChannelBuffer cb = ChannelBuffers.buffer(size);
cb.writeZero(size);
return cb;
}
@Test
public void testWritability() throws Exception {
DefaultChannelPipeline pipeline = new DefaultChannelPipeline();
TestChannelHandler handler = new TestChannelHandler();
StringBuilder buf = handler.buf;
pipeline.addLast("TRACE", handler);
SocketChannel socketChannel = SocketChannel.open();
ExecutorService executor = Executors.newCachedThreadPool();
NioWorker worker = new NioWorker(executor);
TestNioChannel channel = new TestNioChannel(pipeline, new TestChannelSink(),
socketChannel, worker);
channel.getConfig().setWriteBufferLowWaterMark(128);
channel.getConfig().setWriteBufferHighWaterMark(256);
// Startup check
assertEquals(0, channel.getInterestOps() & Channel.OP_WRITE);
// Ensure exceeding the low watermark does not make channel unwritable.
channel.write(writeZero(128)).await();
assertEquals("", buf.toString());
assertEquals(0, channel.getInterestOps() & Channel.OP_WRITE);
// Ensure exceeding the high watermark makes channel unwritable.
channel.write(writeZero(64)).await();
channel.write(writeZero(64)).await();
assertEquals("false ", buf.toString());
assertTrue((channel.getInterestOps() & Channel.OP_WRITE) != 0);
// Ensure going down to the low watermark makes channel writable again by flushing the first write.
assertNotNull(channel.writeBufferQueue.poll());
assertEquals(128, channel.writeBufferSize.get());
// once more since in Netty 3.9, the check is < and not <=
assertNotNull(channel.writeBufferQueue.poll());
assertEquals(64, channel.writeBufferSize.get());
assertEquals("false true ", buf.toString());
assertEquals(0, channel.getInterestOps() & Channel.OP_WRITE);
while (! channel.writeBufferQueue.isEmpty()) {
channel.writeBufferQueue.poll();
}
worker.shutdown();
executor.shutdown();
}
@Test
public void testUserDefinedWritability() throws Exception {
DefaultChannelPipeline pipeline = new DefaultChannelPipeline();
TestChannelHandler handler = new TestChannelHandler();
StringBuilder buf = handler.buf;
pipeline.addLast("TRACE", handler);
SocketChannel socketChannel = SocketChannel.open();
ExecutorService executor = Executors.newCachedThreadPool();
NioWorker worker = new NioWorker(executor);
TestNioChannel channel = new TestNioChannel(pipeline, new TestChannelSink(),
socketChannel, worker);
channel.getConfig().setWriteBufferLowWaterMark(128);
channel.getConfig().setWriteBufferHighWaterMark(256);
// Startup check
assertEquals(0, channel.getInterestOps() & Channel.OP_WRITE);
// Ensure that the default value of a user-defined writability flag is true.
for (int i = 1; i <= 30; i ++) {
assertTrue(channel.getUserDefinedWritability(i));
}
// Ensure that setting a user-defined writability flag to false affects channel.isWritable();
channel.setUserDefinedWritability(1, false);
assertEquals("false ", buf.toString());
assertTrue((channel.getInterestOps() & Channel.OP_WRITE) != 0);
// Ensure that setting a user-defined writability flag to true affects channel.isWritable();
channel.setUserDefinedWritability(1, true);
assertEquals("false true ", buf.toString());
assertEquals(0, channel.getInterestOps() & Channel.OP_WRITE);
while (! channel.writeBufferQueue.isEmpty()) {
channel.writeBufferQueue.poll();
}
worker.shutdown();
executor.shutdown();
}
@Test
public void testUserDefinedWritability2() throws Exception {
DefaultChannelPipeline pipeline = new DefaultChannelPipeline();
TestChannelHandler handler = new TestChannelHandler();
StringBuilder buf = handler.buf;
pipeline.addLast("TRACE", handler);
SocketChannel socketChannel = SocketChannel.open();
ExecutorService executor = Executors.newCachedThreadPool();
NioWorker worker = new NioWorker(executor);
TestNioChannel channel = new TestNioChannel(pipeline, new TestChannelSink(),
socketChannel, worker);
channel.getConfig().setWriteBufferLowWaterMark(128);
channel.getConfig().setWriteBufferHighWaterMark(256);
// Startup check
assertEquals(0, channel.getInterestOps() & Channel.OP_WRITE);
// Ensure that the default value of a user-defined writability flag is true.
for (int i = 1; i <= 30; i ++) {
assertTrue(channel.getUserDefinedWritability(i));
}
// Ensure that setting a user-defined writability flag to false affects channel.isWritable();
channel.setUserDefinedWritability(1, false);
assertEquals("false ", buf.toString());
assertTrue((channel.getInterestOps() & Channel.OP_WRITE) != 0);
// Ensure that setting another user-defined writability flag to false does not trigger
// channelWritabilityChanged.
channel.setUserDefinedWritability(2, false);
assertEquals("false ", buf.toString());
assertTrue((channel.getInterestOps() & Channel.OP_WRITE) != 0);
// Ensure that setting only one user-defined writability flag to true does not affect channel.isWritable()
channel.setUserDefinedWritability(1, true);
assertEquals("false ", buf.toString());
assertTrue((channel.getInterestOps() & Channel.OP_WRITE) != 0);
// Ensure that setting all user-defined writability flags to true affects channel.isWritable()
channel.setUserDefinedWritability(2, true);
assertEquals("false true ", buf.toString());
assertEquals(0, channel.getInterestOps() & Channel.OP_WRITE);
while (! channel.writeBufferQueue.isEmpty()) {
channel.writeBufferQueue.poll();
}
worker.shutdown();
executor.shutdown();
}
@Test
public void testMixedWritability() throws Exception {
DefaultChannelPipeline pipeline = new DefaultChannelPipeline();
TestChannelHandler handler = new TestChannelHandler();
StringBuilder buf = handler.buf;
pipeline.addLast("TRACE", handler);
SocketChannel socketChannel = SocketChannel.open();
ExecutorService executor = Executors.newCachedThreadPool();
NioWorker worker = new NioWorker(executor);
TestNioChannel channel = new TestNioChannel(pipeline, new TestChannelSink(),
socketChannel, worker);
channel.getConfig().setWriteBufferLowWaterMark(128);
channel.getConfig().setWriteBufferHighWaterMark(256);
// Startup check
assertEquals(0, channel.getInterestOps() & Channel.OP_WRITE);
// Ensure that the default value of a user-defined writability flag is true.
for (int i = 1; i <= 30; i ++) {
assertTrue(channel.getUserDefinedWritability(i));
}
// First case: write, userDefinedWritability off, poll, userDefinedWritability on
// Trigger channelWritabilityChanged() by writing a lot.
channel.write(writeZero(256));
assertEquals("false ", buf.toString());
assertTrue((channel.getInterestOps() & Channel.OP_WRITE) != 0);
// Ensure that setting a user-defined writability flag to false does not trigger channelWritabilityChanged()
channel.setUserDefinedWritability(1, false);
assertEquals("false ", buf.toString());
assertTrue((channel.getInterestOps() & Channel.OP_WRITE) != 0);
// Ensure reducing the totalPendingWriteBytes down to zero does not trigger channelWritabilityChannged()
// because of the user-defined writability flag.
assertNotNull(channel.writeBufferQueue.poll());
assertEquals(0, channel.writeBufferSize.get());
assertEquals("false ", buf.toString());
assertTrue((channel.getInterestOps() & Channel.OP_WRITE) != 0);
// Ensure that setting the user-defined writability flag to true triggers channelWritabilityChanged()
channel.setUserDefinedWritability(1, true);
assertEquals("false true ", buf.toString());
assertEquals(0, channel.getInterestOps() & Channel.OP_WRITE);
// second case: userDefinedWritability off, write, userDefinedWritability on, poll
// Ensure that setting a user-defined writability flag to false does trigger channelWritabilityChanged()
channel.setUserDefinedWritability(1, false);
assertEquals("false true false ", buf.toString());
assertTrue((channel.getInterestOps() & Channel.OP_WRITE) != 0);
// Since already triggered, writing a lot should not trigger it
channel.write(writeZero(256));
assertEquals("false true false ", buf.toString());
assertTrue((channel.getInterestOps() & Channel.OP_WRITE) != 0);
// Ensure that setting the user-defined writability flag to true does not yet
// trigger channelWritabilityChanged()
channel.setUserDefinedWritability(1, true);
assertEquals("false true false ", buf.toString());
assertTrue((channel.getInterestOps() & Channel.OP_WRITE) != 0);
// Ensure reducing the totalPendingWriteBytes down to zero does trigger channelWritabilityChannged()
assertNotNull(channel.writeBufferQueue.poll());
assertEquals(0, channel.writeBufferSize.get());
assertEquals("false true false true ", buf.toString());
assertEquals(0, channel.getInterestOps() & Channel.OP_WRITE);
// third case: write, userDefinedWritability off, userDefinedWritability on, poll
// Trigger channelWritabilityChanged() by writing a lot.
channel.write(writeZero(512));
assertEquals("false true false true false ", buf.toString());
assertTrue((channel.getInterestOps() & Channel.OP_WRITE) != 0);
// Ensure that setting a user-defined writability flag to false does not trigger channelWritabilityChanged()
channel.setUserDefinedWritability(1, false);
assertEquals("false true false true false ", buf.toString());
assertTrue((channel.getInterestOps() & Channel.OP_WRITE) != 0);
// Ensure that setting the user-defined writability flag to true does not triggers channelWritabilityChanged()
channel.setUserDefinedWritability(1, true);
assertEquals("false true false true false ", buf.toString());
assertTrue((channel.getInterestOps() & Channel.OP_WRITE) != 0);
// Ensure reducing the totalPendingWriteBytes down to zero does not trigger channelWritabilityChannged()
// because of the user-defined writability flag.
assertNotNull(channel.writeBufferQueue.poll());
assertEquals(0, channel.writeBufferSize.get());
assertEquals("false true false true false true ", buf.toString());
assertEquals(0, channel.getInterestOps() & Channel.OP_WRITE);
// fourth case: userDefinedWritability off, write, poll, userDefinedWritability on
// Ensure that setting a user-defined writability flag to false triggers channelWritabilityChanged()
channel.setUserDefinedWritability(1, false);
assertEquals("false true false true false true false ", buf.toString());
assertTrue((channel.getInterestOps() & Channel.OP_WRITE) != 0);
// Since already triggered, writing a lot should not trigger it
channel.write(writeZero(512));
assertEquals("false true false true false true false ", buf.toString());
assertTrue((channel.getInterestOps() & Channel.OP_WRITE) != 0);
// Ensure reducing the totalPendingWriteBytes down to zero does not trigger channelWritabilityChannged()
// because of the user-defined writability flag.
assertNotNull(channel.writeBufferQueue.poll());
assertEquals(0, channel.writeBufferSize.get());
assertEquals("false true false true false true false ", buf.toString());
assertTrue((channel.getInterestOps() & Channel.OP_WRITE) != 0);
// Ensure that setting the user-defined writability flag to true does triggers channelWritabilityChanged()
channel.setUserDefinedWritability(1, true);
assertEquals("false true false true false true false true ", buf.toString());
assertEquals(0, channel.getInterestOps() & Channel.OP_WRITE);
while (! channel.writeBufferQueue.isEmpty()) {
channel.writeBufferQueue.poll();
}
worker.shutdown();
executor.shutdown();
}
}

View File

@ -212,6 +212,16 @@ public class IpFilterRuleTest {
// NOOP
}
@Override
public boolean getUserDefinedWritability(int index) {
return false;
}
@Override
public void setUserDefinedWritability(int index, boolean isWritable) {
// NOOP
}
}, h, addr), addr);
System.err.println(result);
return result;