Merge pull request #350 from fredericBregier/master

Change Executor to Timer from Netty, in reference to Issue #345 in master branch
This commit is contained in:
Norman Maurer 2012-05-20 08:36:28 -07:00
commit ba4736f33b
5 changed files with 222 additions and 302 deletions

View File

@ -15,7 +15,7 @@
*/ */
package io.netty.handler.traffic; package io.netty.handler.traffic;
import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.channel.Channel; import io.netty.channel.Channel;
@ -30,7 +30,9 @@ import io.netty.handler.execution.DefaultObjectSizeEstimator;
import io.netty.logging.InternalLogger; import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
import io.netty.util.ExternalResourceReleasable; import io.netty.util.ExternalResourceReleasable;
import io.netty.util.internal.ExecutorUtil; import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
/** /**
* AbstractTrafficShapingHandler allows to limit the global bandwidth * AbstractTrafficShapingHandler allows to limit the global bandwidth
@ -41,6 +43,10 @@ import io.netty.util.internal.ExecutorUtil;
* the method doAccounting of this handler.<br> * the method doAccounting of this handler.<br>
* <br> * <br>
* *
* An {@link ObjectSizeEstimator} can be passed at construction to specify what
* is the size of the object to be read or write accordingly to the type of
* object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.<br><br>
*
* If you want for any particular reasons to stop the monitoring (accounting) or to change * If you want for any particular reasons to stop the monitoring (accounting) or to change
* the read/write limit or the check interval, several methods allow that for you:<br> * the read/write limit or the check interval, several methods allow that for you:<br>
* <ul> * <ul>
@ -79,10 +85,14 @@ public abstract class AbstractTrafficShapingHandler extends
private ObjectSizeEstimator objectSizeEstimator; private ObjectSizeEstimator objectSizeEstimator;
/** /**
* Executor to associated to any TrafficCounter * Timer to associated to any TrafficCounter
*/ */
protected Executor executor; protected Timer timer;
/**
* used in releaseExternalResources() to cancel the timer
*/
volatile private Timeout timeout = null;
/** /**
* Limit in B/s to apply to write * Limit in B/s to apply to write
*/ */
@ -105,15 +115,16 @@ public abstract class AbstractTrafficShapingHandler extends
*/ */
final AtomicBoolean release = new AtomicBoolean(false); final AtomicBoolean release = new AtomicBoolean(false);
private void init(ObjectSizeEstimator newObjectSizeEstimator, private void init(ObjectSizeEstimator newObjectSizeEstimator,
Executor newExecutor, long newWriteLimit, long newReadLimit, long newCheckInterval) { Timer newTimer, long newWriteLimit, long newReadLimit,
objectSizeEstimator = newObjectSizeEstimator; long newCheckInterval) {
executor = newExecutor; objectSizeEstimator = newObjectSizeEstimator;
writeLimit = newWriteLimit; timer = newTimer;
readLimit = newReadLimit; writeLimit = newWriteLimit;
checkInterval = newCheckInterval; readLimit = newReadLimit;
//logger.info("TSH: "+writeLimit+":"+readLimit+":"+checkInterval+":"+isPerChannel()); checkInterval = newCheckInterval;
} //logger.warn("TSH: "+writeLimit+":"+readLimit+":"+checkInterval);
}
/** /**
* *
@ -126,8 +137,8 @@ public abstract class AbstractTrafficShapingHandler extends
/** /**
* Constructor using default {@link ObjectSizeEstimator} * Constructor using default {@link ObjectSizeEstimator}
* *
* @param executor * @param timer
* created for instance like Executors.newCachedThreadPool * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* @param writeLimit * @param writeLimit
* 0 or a limit in bytes/s * 0 or a limit in bytes/s
* @param readLimit * @param readLimit
@ -136,10 +147,9 @@ public abstract class AbstractTrafficShapingHandler extends
* The delay between two computations of performances for * The delay between two computations of performances for
* channels or 0 if no stats are to be computed * channels or 0 if no stats are to be computed
*/ */
public AbstractTrafficShapingHandler(Executor executor, long writeLimit, public AbstractTrafficShapingHandler(Timer timer, long writeLimit,
long readLimit, long checkInterval) { long readLimit, long checkInterval) {
init(new DefaultObjectSizeEstimator(), executor, writeLimit, readLimit, init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval);
checkInterval);
} }
/** /**
@ -148,8 +158,8 @@ public abstract class AbstractTrafficShapingHandler extends
* @param objectSizeEstimator * @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute * the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message * the size of the message
* @param executor * @param timer
* created for instance like Executors.newCachedThreadPool * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* @param writeLimit * @param writeLimit
* 0 or a limit in bytes/s * 0 or a limit in bytes/s
* @param readLimit * @param readLimit
@ -159,26 +169,24 @@ public abstract class AbstractTrafficShapingHandler extends
* channels or 0 if no stats are to be computed * channels or 0 if no stats are to be computed
*/ */
public AbstractTrafficShapingHandler( public AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor, ObjectSizeEstimator objectSizeEstimator, Timer timer,
long writeLimit, long readLimit, long checkInterval) { long writeLimit, long readLimit, long checkInterval) {
init(objectSizeEstimator, executor, writeLimit, readLimit, init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval);
checkInterval);
} }
/** /**
* Constructor using default {@link ObjectSizeEstimator} and using default Check Interval * Constructor using default {@link ObjectSizeEstimator} and using default Check Interval
* *
* @param executor * @param timer
* created for instance like Executors.newCachedThreadPool * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* @param writeLimit * @param writeLimit
* 0 or a limit in bytes/s * 0 or a limit in bytes/s
* @param readLimit * @param readLimit
* 0 or a limit in bytes/s * 0 or a limit in bytes/s
*/ */
public AbstractTrafficShapingHandler(Executor executor, long writeLimit, public AbstractTrafficShapingHandler(Timer timer, long writeLimit,
long readLimit) { long readLimit) {
init(new DefaultObjectSizeEstimator(), executor, writeLimit, readLimit, init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
DEFAULT_CHECK_INTERVAL);
} }
/** /**
@ -187,29 +195,27 @@ public abstract class AbstractTrafficShapingHandler extends
* @param objectSizeEstimator * @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute * the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message * the size of the message
* @param executor * @param timer
* created for instance like Executors.newCachedThreadPool * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* @param writeLimit * @param writeLimit
* 0 or a limit in bytes/s * 0 or a limit in bytes/s
* @param readLimit * @param readLimit
* 0 or a limit in bytes/s * 0 or a limit in bytes/s
*/ */
public AbstractTrafficShapingHandler( public AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor, ObjectSizeEstimator objectSizeEstimator, Timer timer,
long writeLimit, long readLimit) { long writeLimit, long readLimit) {
init(objectSizeEstimator, executor, writeLimit, readLimit, init(objectSizeEstimator, timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
DEFAULT_CHECK_INTERVAL);
} }
/** /**
* Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and default Check Interval * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and default Check Interval
* *
* @param executor * @param timer
* created for instance like Executors.newCachedThreadPool * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
*/ */
public AbstractTrafficShapingHandler(Executor executor) { public AbstractTrafficShapingHandler(Timer timer) {
init(new DefaultObjectSizeEstimator(), executor, 0, 0, init(new DefaultObjectSizeEstimator(), timer, 0, 0, DEFAULT_CHECK_INTERVAL);
DEFAULT_CHECK_INTERVAL);
} }
/** /**
@ -218,25 +224,25 @@ public abstract class AbstractTrafficShapingHandler extends
* @param objectSizeEstimator * @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute * the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message * the size of the message
* @param executor * @param timer
* created for instance like Executors.newCachedThreadPool * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
*/ */
public AbstractTrafficShapingHandler( public AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor) { ObjectSizeEstimator objectSizeEstimator, Timer timer) {
init(objectSizeEstimator, executor, 0, 0, DEFAULT_CHECK_INTERVAL); init(objectSizeEstimator, timer, 0, 0, DEFAULT_CHECK_INTERVAL);
} }
/** /**
* Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT * Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT
* *
* @param executor * @param timer
* created for instance like Executors.newCachedThreadPool * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* @param checkInterval * @param checkInterval
* The delay between two computations of performances for * The delay between two computations of performances for
* channels or 0 if no stats are to be computed * channels or 0 if no stats are to be computed
*/ */
public AbstractTrafficShapingHandler(Executor executor, long checkInterval) { public AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
init(new DefaultObjectSizeEstimator(), executor, 0, 0, checkInterval); init(new DefaultObjectSizeEstimator(), timer, 0, 0, checkInterval);
} }
/** /**
@ -245,20 +251,24 @@ public abstract class AbstractTrafficShapingHandler extends
* @param objectSizeEstimator * @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute * the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message * the size of the message
* @param executor * @param timer
* created for instance like Executors.newCachedThreadPool * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* @param checkInterval * @param checkInterval
* The delay between two computations of performances for * The delay between two computations of performances for
* channels or 0 if no stats are to be computed * channels or 0 if no stats are to be computed
*/ */
public AbstractTrafficShapingHandler( public AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor, ObjectSizeEstimator objectSizeEstimator, Timer timer,
long checkInterval) { long checkInterval) {
init(objectSizeEstimator, executor, 0, 0, checkInterval); init(objectSizeEstimator, timer, 0, 0, checkInterval);
} }
/** /**
* Change the underlying limitations and check interval. * Change the underlying limitations and check interval.
*
* @param newWriteLimit
* @param newReadLimit
* @param newCheckInterval
*/ */
public void configure(long newWriteLimit, long newReadLimit, public void configure(long newWriteLimit, long newReadLimit,
long newCheckInterval) { long newCheckInterval) {
@ -268,6 +278,9 @@ public abstract class AbstractTrafficShapingHandler extends
/** /**
* Change the underlying limitations. * Change the underlying limitations.
*
* @param newWriteLimit
* @param newReadLimit
*/ */
public void configure(long newWriteLimit, long newReadLimit) { public void configure(long newWriteLimit, long newReadLimit) {
writeLimit = newWriteLimit; writeLimit = newWriteLimit;
@ -279,6 +292,8 @@ public abstract class AbstractTrafficShapingHandler extends
/** /**
* Change the check interval. * Change the check interval.
*
* @param newCheckInterval
*/ */
public void configure(long newCheckInterval) { public void configure(long newCheckInterval) {
checkInterval = newCheckInterval; checkInterval = newCheckInterval;
@ -300,46 +315,25 @@ public abstract class AbstractTrafficShapingHandler extends
/** /**
* Class to implement setReadable at fix time * Class to implement setReadable at fix time
*/ */
private class ReopenRead implements Runnable { private class ReopenReadTimerTask implements TimerTask {
/** ChannelHandlerContext ctx;
* Associated ChannelHandlerContext ReopenReadTimerTask(ChannelHandlerContext ctx) {
*/
private ChannelHandlerContext ctx;
/**
* Time to wait before clearing the channel
*/
private long timeToWait;
/**
* @param ctx
* the associated channelHandlerContext
* @param timeToWait
*/
protected ReopenRead(ChannelHandlerContext ctx, long timeToWait) {
this.ctx = ctx; this.ctx = ctx;
this.timeToWait = timeToWait;
} }
public void run(Timeout timeoutArg) throws Exception {
/** //logger.warn("Start RRTT: "+release.get());
* Truly run the waken up of the channel if (release.get()) {
*/
@Override
public void run() {
try {
if (release.get()) {
return;
}
Thread.sleep(timeToWait);
} catch (InterruptedException e) {
// interruption so exit
return; return;
} }
// logger.info("WAKEUP!"); /*
logger.warn("WAKEUP! "+
(ctx != null && ctx.getChannel() != null &&
ctx.getChannel().isConnected()));
*/
if (ctx != null && ctx.getChannel() != null && if (ctx != null && ctx.getChannel() != null &&
ctx.getChannel().isConnected()) { ctx.getChannel().isConnected()) {
//logger.info(" setReadable TRUE: "+timeToWait); //logger.warn(" setReadable TRUE: ");
// readSuspended = false; // readSuspended = false;
ctx.setAttachment(null); ctx.setAttachment(null);
ctx.getChannel().setReadable(true); ctx.getChannel().setReadable(true);
@ -359,7 +353,7 @@ public abstract class AbstractTrafficShapingHandler extends
// Time is too short, so just lets continue // Time is too short, so just lets continue
return 0; return 0;
} }
return ((bytes * 1000 / limit - interval)/10)*10; return ((bytes * 1000 / limit - interval) / 10) * 10;
} }
@Override @Override
@ -375,17 +369,18 @@ public abstract class AbstractTrafficShapingHandler extends
return; return;
} }
// compute the number of ms to wait before reopening the channel // compute the number of ms to wait before reopening the channel
long wait = getTimeToWait(readLimit, trafficCounter long wait = getTimeToWait(readLimit,
.getCurrentReadBytes(), trafficCounter.getLastTime(), trafficCounter.getCurrentReadBytes(),
curtime); trafficCounter.getLastTime(), curtime);
if (wait > MINIMAL_WAIT) { // At least 10ms seems a minimal time in order to if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
// time in order to
Channel channel = arg0.getChannel(); Channel channel = arg0.getChannel();
// try to limit the traffic // try to limit the traffic
if (channel != null && channel.isConnected()) { if (channel != null && channel.isConnected()) {
// Channel version // Channel version
if (executor == null) { if (timer == null) {
// Sleep since no executor // Sleep since no executor
//logger.info("Read sleep since no executor for "+wait+" ms for "+this); // logger.warn("Read sleep since no timer for "+wait+" ms for "+this);
if (release.get()) { if (release.get()) {
return; return;
} }
@ -396,11 +391,14 @@ public abstract class AbstractTrafficShapingHandler extends
// readSuspended = true; // readSuspended = true;
arg0.setAttachment(Boolean.TRUE); arg0.setAttachment(Boolean.TRUE);
channel.setReadable(false); channel.setReadable(false);
//logger.info("Read will wakeup after "+wait+" ms "+this); // logger.warn("Read will wakeup after "+wait+" ms "+this);
executor.execute(new ReopenRead(arg0, wait)); TimerTask timerTask = new ReopenReadTimerTask(arg0);
timeout = timer.newTimeout(timerTask, wait,
TimeUnit.MILLISECONDS);
} else { } else {
// should be waiting: but can occurs sometime so as a FIX // should be waiting: but can occurs sometime so as
//logger.info("Read sleep ok but should not be here: "+wait+" "+this); // a FIX
// logger.warn("Read sleep ok but should not be here: "+wait+" "+this);
if (release.get()) { if (release.get()) {
return; return;
} }
@ -408,7 +406,7 @@ public abstract class AbstractTrafficShapingHandler extends
} }
} else { } else {
// Not connected or no channel // Not connected or no channel
//logger.info("Read sleep "+wait+" ms for "+this); // logger.warn("Read sleep "+wait+" ms for "+this);
if (release.get()) { if (release.get()) {
return; return;
} }
@ -433,11 +431,12 @@ public abstract class AbstractTrafficShapingHandler extends
if (writeLimit == 0) { if (writeLimit == 0) {
return; return;
} }
// compute the number of ms to wait before continue with the channel // compute the number of ms to wait before continue with the
long wait = getTimeToWait(writeLimit, trafficCounter // channel
.getCurrentWrittenBytes(), trafficCounter.getLastTime(), long wait = getTimeToWait(writeLimit,
curtime); trafficCounter.getCurrentWrittenBytes(),
if (wait > MINIMAL_WAIT) { trafficCounter.getLastTime(), curtime);
if (wait >= MINIMAL_WAIT) {
// Global or Channel // Global or Channel
if (release.get()) { if (release.get()) {
return; return;
@ -450,7 +449,6 @@ public abstract class AbstractTrafficShapingHandler extends
super.writeRequested(arg0, arg1); super.writeRequested(arg0, arg1);
} }
} }
@Override @Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception { throws Exception {
@ -481,13 +479,14 @@ public abstract class AbstractTrafficShapingHandler extends
return trafficCounter; return trafficCounter;
} }
@Override
public void releaseExternalResources() { public void releaseExternalResources() {
if (trafficCounter != null) { if (trafficCounter != null) {
trafficCounter.stop(); trafficCounter.stop();
} }
release.set(true); release.set(true);
ExecutorUtil.terminate(executor); if (timeout != null) {
timeout.cancel();
}
} }
@Override @Override

View File

@ -15,8 +15,6 @@
*/ */
package io.netty.handler.traffic; package io.netty.handler.traffic;
import java.util.concurrent.Executor;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipelineFactory; import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.ChannelStateEvent; import io.netty.channel.ChannelStateEvent;
@ -24,6 +22,7 @@ import io.netty.handler.execution.ExecutionHandler;
import io.netty.handler.execution.MemoryAwareThreadPoolExecutor; import io.netty.handler.execution.MemoryAwareThreadPoolExecutor;
import io.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor; import io.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import io.netty.handler.execution.ObjectSizeEstimator; import io.netty.handler.execution.ObjectSizeEstimator;
import io.netty.util.Timer;
/** /**
* This implementation of the {@link AbstractTrafficShapingHandler} is for channel * This implementation of the {@link AbstractTrafficShapingHandler} is for channel
@ -33,8 +32,8 @@ import io.netty.handler.execution.ObjectSizeEstimator;
* <ul> * <ul>
* <li>Add in your pipeline a new ChannelTrafficShapingHandler, before a recommended {@link ExecutionHandler} (like * <li>Add in your pipeline a new ChannelTrafficShapingHandler, before a recommended {@link ExecutionHandler} (like
* {@link OrderedMemoryAwareThreadPoolExecutor} or {@link MemoryAwareThreadPoolExecutor}).<br> * {@link OrderedMemoryAwareThreadPoolExecutor} or {@link MemoryAwareThreadPoolExecutor}).<br>
* <tt>ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler(executor);</tt><br> * <tt>ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler(timer);</tt><br>
* executor could be created using <tt>Executors.newCachedThreadPool();</tt><br> * timer could be created using <tt>HashedWheelTimer<tt><br>
* <tt>pipeline.addLast("CHANNEL_TRAFFIC_SHAPING", myHandler);</tt><br><br> * <tt>pipeline.addLast("CHANNEL_TRAFFIC_SHAPING", myHandler);</tt><br><br>
* *
* <b>Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created * <b>Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created
@ -52,7 +51,7 @@ import io.netty.handler.execution.ObjectSizeEstimator;
* the less precise the traffic shaping will be. It is suggested as higher value something close * the less precise the traffic shaping will be. It is suggested as higher value something close
* to 5 or 10 minutes.<br> * to 5 or 10 minutes.<br>
* </li> * </li>
* <li>When you shutdown your application, release all the external resources like the executor * <li>When you shutdown your application, release all the external resources (except the timer internal itself)
* by calling:<br> * by calling:<br>
* <tt>myHandler.releaseExternalResources();</tt><br> * <tt>myHandler.releaseExternalResources();</tt><br>
* </li> * </li>
@ -60,96 +59,53 @@ import io.netty.handler.execution.ObjectSizeEstimator;
*/ */
public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler { public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
/** public ChannelTrafficShapingHandler(Timer timer, long writeLimit,
* @param executor
* @param writeLimit
* @param readLimit
* @param checkInterval
*/
public ChannelTrafficShapingHandler(Executor executor, long writeLimit,
long readLimit, long checkInterval) { long readLimit, long checkInterval) {
super(executor, writeLimit, readLimit, checkInterval); super(timer, writeLimit, readLimit, checkInterval);
} }
/** public ChannelTrafficShapingHandler(Timer timer, long writeLimit,
* @param executor
* @param writeLimit
* @param readLimit
*/
public ChannelTrafficShapingHandler(Executor executor, long writeLimit,
long readLimit) { long readLimit) {
super(executor, writeLimit, readLimit); super(timer, writeLimit, readLimit);
}
/**
* @param executor
* @param checkInterval
*/
public ChannelTrafficShapingHandler(Executor executor, long checkInterval) {
super(executor, checkInterval);
} }
/** public ChannelTrafficShapingHandler(Timer timer, long checkInterval) {
* @param executor super(timer, checkInterval);
*/ }
public ChannelTrafficShapingHandler(Executor executor) {
super(executor); public ChannelTrafficShapingHandler(Timer timer) {
super(timer);
} }
/**
* @param objectSizeEstimator
* @param executor
* @param writeLimit
* @param readLimit
* @param checkInterval
*/
public ChannelTrafficShapingHandler( public ChannelTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor, ObjectSizeEstimator objectSizeEstimator, Timer timer,
long writeLimit, long readLimit, long checkInterval) { long writeLimit, long readLimit, long checkInterval) {
super(objectSizeEstimator, executor, writeLimit, readLimit, super(objectSizeEstimator, timer, writeLimit, readLimit,
checkInterval); checkInterval);
} }
/**
* @param objectSizeEstimator
* @param executor
* @param writeLimit
* @param readLimit
*/
public ChannelTrafficShapingHandler( public ChannelTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor, ObjectSizeEstimator objectSizeEstimator, Timer timer,
long writeLimit, long readLimit) { long writeLimit, long readLimit) {
super(objectSizeEstimator, executor, writeLimit, readLimit); super(objectSizeEstimator, timer, writeLimit, readLimit);
} }
/**
* @param objectSizeEstimator
* @param executor
* @param checkInterval
*/
public ChannelTrafficShapingHandler( public ChannelTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor, ObjectSizeEstimator objectSizeEstimator, Timer timer,
long checkInterval) { long checkInterval) {
super(objectSizeEstimator, executor, checkInterval); super(objectSizeEstimator, timer, checkInterval);
} }
/**
* @param objectSizeEstimator
* @param executor
*/
public ChannelTrafficShapingHandler( public ChannelTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor) { ObjectSizeEstimator objectSizeEstimator, Timer timer) {
super(objectSizeEstimator, executor); super(objectSizeEstimator, timer);
} }
@Override @Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception { throws Exception {
if (trafficCounter != null) { if (trafficCounter != null) {
trafficCounter.stop(); trafficCounter.stop();
trafficCounter = null;
} }
super.channelClosed(ctx, e); super.channelClosed(ctx, e);
} }
@ -162,8 +118,10 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
ctx.getChannel().setReadable(false); ctx.getChannel().setReadable(false);
if (trafficCounter == null) { if (trafficCounter == null) {
// create a new counter now // create a new counter now
trafficCounter = new TrafficCounter(this, executor, "ChannelTC" + if (timer != null) {
ctx.getChannel().getId(), checkInterval); trafficCounter = new TrafficCounter(this, timer, "ChannelTC" +
ctx.getChannel().getId(), checkInterval);
}
} }
if (trafficCounter != null) { if (trafficCounter != null) {
trafficCounter.start(); trafficCounter.start();

View File

@ -15,13 +15,12 @@
*/ */
package io.netty.handler.traffic; package io.netty.handler.traffic;
import java.util.concurrent.Executor;
import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.execution.ExecutionHandler; import io.netty.handler.execution.ExecutionHandler;
import io.netty.handler.execution.MemoryAwareThreadPoolExecutor; import io.netty.handler.execution.MemoryAwareThreadPoolExecutor;
import io.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor; import io.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import io.netty.handler.execution.ObjectSizeEstimator; import io.netty.handler.execution.ObjectSizeEstimator;
import io.netty.util.Timer;
/** /**
* This implementation of the {@link AbstractTrafficShapingHandler} is for global * This implementation of the {@link AbstractTrafficShapingHandler} is for global
@ -31,8 +30,8 @@ import io.netty.handler.execution.ObjectSizeEstimator;
* The general use should be as follow:<br> * The general use should be as follow:<br>
* <ul> * <ul>
* <li>Create your unique GlobalTrafficShapingHandler like:<br><br> * <li>Create your unique GlobalTrafficShapingHandler like:<br><br>
* <tt>GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(executor);</tt><br><br> * <tt>GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(timer);</tt><br><br>
* executor could be created using <tt>Executors.newCachedThreadPool();</tt><br> * timer could be created using <tt>HashedWheelTimer<tt><br>
* <tt>pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", myHandler);</tt><br><br> * <tt>pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", myHandler);</tt><br><br>
* *
* <b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created * <b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
@ -52,7 +51,7 @@ import io.netty.handler.execution.ObjectSizeEstimator;
* {@link OrderedMemoryAwareThreadPoolExecutor} or {@link MemoryAwareThreadPoolExecutor}).<br> * {@link OrderedMemoryAwareThreadPoolExecutor} or {@link MemoryAwareThreadPoolExecutor}).<br>
* <tt>pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", myHandler);</tt><br><br> * <tt>pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", myHandler);</tt><br><br>
* </li> * </li>
* <li>When you shutdown your application, release all the external resources like the executor * <li>When you shutdown your application, release all the external resources (except the timer internal itself)
* by calling:<br> * by calling:<br>
* <tt>myHandler.releaseExternalResources();</tt><br> * <tt>myHandler.releaseExternalResources();</tt><br>
* </li> * </li>
@ -64,96 +63,61 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
* Create the global TrafficCounter * Create the global TrafficCounter
*/ */
void createGlobalTrafficCounter() { void createGlobalTrafficCounter() {
TrafficCounter tc = new TrafficCounter(this, executor, "GlobalTC", TrafficCounter tc;
checkInterval); if (timer != null) {
setTrafficCounter(tc); tc = new TrafficCounter(this, timer, "GlobalTC",
tc.start(); checkInterval);
setTrafficCounter(tc);
tc.start();
}
} }
/** public GlobalTrafficShapingHandler(Timer timer, long writeLimit,
* @param executor
* @param writeLimit
* @param readLimit
* @param checkInterval
*/
public GlobalTrafficShapingHandler(Executor executor, long writeLimit,
long readLimit, long checkInterval) { long readLimit, long checkInterval) {
super(executor, writeLimit, readLimit, checkInterval); super(timer, writeLimit, readLimit, checkInterval);
createGlobalTrafficCounter(); createGlobalTrafficCounter();
} }
/** public GlobalTrafficShapingHandler(Timer timer, long writeLimit,
* @param executor
* @param writeLimit
* @param readLimit
*/
public GlobalTrafficShapingHandler(Executor executor, long writeLimit,
long readLimit) { long readLimit) {
super(executor, writeLimit, readLimit); super(timer, writeLimit, readLimit);
createGlobalTrafficCounter();
}
/**
* @param executor
* @param checkInterval
*/
public GlobalTrafficShapingHandler(Executor executor, long checkInterval) {
super(executor, checkInterval);
createGlobalTrafficCounter(); createGlobalTrafficCounter();
} }
/** public GlobalTrafficShapingHandler(Timer timer, long checkInterval) {
* @param executor super(timer, checkInterval);
*/ createGlobalTrafficCounter();
public GlobalTrafficShapingHandler(Executor executor) { }
super(executor);
public GlobalTrafficShapingHandler(Timer timer) {
super(timer);
createGlobalTrafficCounter(); createGlobalTrafficCounter();
} }
/**
* @param objectSizeEstimator
* @param executor
* @param writeLimit
* @param readLimit
* @param checkInterval
*/
public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator, public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
Executor executor, long writeLimit, long readLimit, Timer timer, long writeLimit, long readLimit,
long checkInterval) { long checkInterval) {
super(objectSizeEstimator, executor, writeLimit, readLimit, super(objectSizeEstimator, timer, writeLimit, readLimit,
checkInterval); checkInterval);
createGlobalTrafficCounter(); createGlobalTrafficCounter();
} }
/**
* @param objectSizeEstimator
* @param executor
* @param writeLimit
* @param readLimit
*/
public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator, public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
Executor executor, long writeLimit, long readLimit) { Timer timer, long writeLimit, long readLimit) {
super(objectSizeEstimator, executor, writeLimit, readLimit); super(objectSizeEstimator, timer, writeLimit, readLimit);
createGlobalTrafficCounter(); createGlobalTrafficCounter();
} }
/**
* @param objectSizeEstimator
* @param executor
* @param checkInterval
*/
public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator, public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
Executor executor, long checkInterval) { Timer timer, long checkInterval) {
super(objectSizeEstimator, executor, checkInterval); super(objectSizeEstimator, timer, checkInterval);
createGlobalTrafficCounter(); createGlobalTrafficCounter();
} }
/**
* @param objectSizeEstimator
* @param executor
*/
public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator, public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
Executor executor) { Timer timer) {
super(objectSizeEstimator, executor); super(objectSizeEstimator, timer);
createGlobalTrafficCounter(); createGlobalTrafficCounter();
} }
} }

View File

@ -15,11 +15,14 @@
*/ */
package io.netty.handler.traffic; package io.netty.handler.traffic;
import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
/** /**
* TrafficCounter is associated with {@link AbstractTrafficShapingHandler}.<br> * TrafficCounter is associated with {@link AbstractTrafficShapingHandler}.<br>
@ -97,27 +100,31 @@ public class TrafficCounter {
/** /**
* The associated TrafficShapingHandler * The associated TrafficShapingHandler
*/ */
private AbstractTrafficShapingHandler trafficShapingHandler; private final AbstractTrafficShapingHandler trafficShapingHandler;
/** /**
* Default Executor * One Timer for all Counter
*/ */
private Executor executor; private final Timer timer; // replace executor
/**
* Monitor created once in start()
*/
private TimerTask timerTask;
/**
* used in stop() to cancel the timer
*/
volatile private Timeout timeout = null;
/** /**
* Is Monitor active * Is Monitor active
*/ */
AtomicBoolean monitorActive = new AtomicBoolean(); AtomicBoolean monitorActive = new AtomicBoolean();
/**
* Monitor
*/
private TrafficMonitoring trafficMonitoring;
/** /**
* Class to implement monitoring at fix delay * Class to implement monitoring at fix delay
*/ *
private static class TrafficMonitoring implements Runnable { */
private static class TrafficMonitoringTask implements TimerTask {
/** /**
* The associated TrafficShapingHandler * The associated TrafficShapingHandler
*/ */
@ -132,43 +139,30 @@ public class TrafficCounter {
* @param trafficShapingHandler * @param trafficShapingHandler
* @param counter * @param counter
*/ */
protected TrafficMonitoring( protected TrafficMonitoringTask(
AbstractTrafficShapingHandler trafficShapingHandler, AbstractTrafficShapingHandler trafficShapingHandler,
TrafficCounter counter) { TrafficCounter counter) {
trafficShapingHandler1 = trafficShapingHandler; trafficShapingHandler1 = trafficShapingHandler;
this.counter = counter; this.counter = counter;
} }
/** public void run(Timeout timeout) throws Exception {
* Default run if (!counter.monitorActive.get()) {
*/ return;
@Override
public void run() {
try {
Thread.currentThread().setName(counter.name);
for (; counter.monitorActive.get();) {
long check = counter.checkInterval.get();
if (check > 0) {
Thread.sleep(check);
} else {
// Delay goes to 0, so exit
return;
}
long endTime = System.currentTimeMillis();
counter.resetAccounting(endTime);
if (trafficShapingHandler1 != null) {
trafficShapingHandler1.doAccounting(counter);
}
}
} catch (InterruptedException e) {
// End of computations
} }
long endTime = System.currentTimeMillis();
counter.resetAccounting(endTime);
if (trafficShapingHandler1 != null) {
trafficShapingHandler1.doAccounting(counter);
}
timeout =
counter.timer.newTimeout(this, counter.checkInterval.get(), TimeUnit.MILLISECONDS);
} }
} }
/** /**
* Start the monitoring process * Start the monitoring process
*/ */
public void start() { public void start() {
synchronized (lastTime) { synchronized (lastTime) {
if (monitorActive.get()) { if (monitorActive.get()) {
@ -177,16 +171,16 @@ public class TrafficCounter {
lastTime.set(System.currentTimeMillis()); lastTime.set(System.currentTimeMillis());
if (checkInterval.get() > 0) { if (checkInterval.get() > 0) {
monitorActive.set(true); monitorActive.set(true);
trafficMonitoring = new TrafficMonitoring( timerTask = new TrafficMonitoringTask(trafficShapingHandler, this);
trafficShapingHandler, this); timeout =
executor.execute(trafficMonitoring); timer.newTimeout(timerTask, checkInterval.get(), TimeUnit.MILLISECONDS);
} }
} }
} }
/** /**
* Stop the monitoring process * Stop the monitoring process
*/ */
public void stop() { public void stop() {
synchronized (lastTime) { synchronized (lastTime) {
if (!monitorActive.get()) { if (!monitorActive.get()) {
@ -197,6 +191,9 @@ public class TrafficCounter {
if (trafficShapingHandler != null) { if (trafficShapingHandler != null) {
trafficShapingHandler.doAccounting(this); trafficShapingHandler.doAccounting(this);
} }
if (timeout != null) {
timeout.cancel();
}
} }
} }
@ -222,20 +219,20 @@ public class TrafficCounter {
} }
/** /**
* Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the executorService to use, its * Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the Timer to use, its
* name, the checkInterval between two computations in millisecond * name, the checkInterval between two computations in millisecond
* @param trafficShapingHandler the associated AbstractTrafficShapingHandler * @param trafficShapingHandler the associated AbstractTrafficShapingHandler
* @param executor * @param timer
* Should be a CachedThreadPool for efficiency * Could be a HashedWheelTimer
* @param name * @param name
* the name given to this monitor * the name given to this monitor
* @param checkInterval * @param checkInterval
* the checkInterval in millisecond between two computations * the checkInterval in millisecond between two computations
*/ */
public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler, public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler,
Executor executor, String name, long checkInterval) { Timer timer, String name, long checkInterval) {
this.trafficShapingHandler = trafficShapingHandler; this.trafficShapingHandler = trafficShapingHandler;
this.executor = executor; this.timer = timer;
this.name = name; this.name = name;
lastCumulativeTime = System.currentTimeMillis(); lastCumulativeTime = System.currentTimeMillis();
configure(checkInterval); configure(checkInterval);
@ -248,7 +245,7 @@ public class TrafficCounter {
* @param newcheckInterval * @param newcheckInterval
*/ */
public void configure(long newcheckInterval) { public void configure(long newcheckInterval) {
long newInterval = (newcheckInterval/10)*10; long newInterval = (newcheckInterval / 10) * 10;
if (checkInterval.get() != newInterval) { if (checkInterval.get() != newInterval) {
checkInterval.set(newInterval); checkInterval.set(newInterval);
if (newInterval <= 0) { if (newInterval <= 0) {

View File

@ -17,26 +17,28 @@
/** /**
* Implementation of a Traffic Shaping Handler and Dynamic Statistics.<br> * Implementation of a Traffic Shaping Handler and Dynamic Statistics.<br>
* <br><br> * <br><br>
*
*
* <P>The main goal of this package is to allow to shape the traffic (bandwidth limitation), * <P>The main goal of this package is to allow to shape the traffic (bandwidth limitation),
* but also to get statistics on how many bytes are read or written. Both functions can * but also to get statistics on how many bytes are read or written. Both functions can
* be active or inactive (traffic or statistics).</P> * be active or inactive (traffic or statistics).</P>
* *
* <P>Two classes implement this behavior:<br> * <P>Two classes implement this behavior:<br>
* <ul> * <ul>
* <li> <tt>{@link io.netty.handler.traffic.TrafficCounter}</tt>: this class implements the counters needed by the handlers. * <li> <tt>{@link TrafficCounter}</tt>: this class implements the counters needed by the handlers.
* It can be accessed to get some extra information like the read or write bytes since last check, the read and write * It can be accessed to get some extra information like the read or write bytes since last check, the read and write
* bandwidth from last check...</li><br><br> * bandwidth from last check...</li><br><br>
* *
* <li> <tt>{@link io.netty.handler.traffic.AbstractTrafficShapingHandler}</tt>: this abstract class implements the kernel * <li> <tt>{@link AbstractTrafficShapingHandler}</tt>: this abstract class implements the kernel
* of the traffic shaping. It could be extended to fit your needs. Two classes are proposed as default * of the traffic shaping. It could be extended to fit your needs. Two classes are proposed as default
* implementations: see {@link io.netty.handler.traffic.ChannelTrafficShapingHandler} and see {@link io.netty.handler.traffic.GlobalTrafficShapingHandler} * implementations: see {@link ChannelTrafficShapingHandler} and see {@link GlobalTrafficShapingHandler}
* respectively for Channel traffic shaping and Global traffic shaping.</li><br><br> * respectively for Channel traffic shaping and Global traffic shaping.</li><br><br>
* *
* The insertion in the pipeline of one of those handlers can be wherever you want, but * The insertion in the pipeline of one of those handlers can be wherever you want, but
* <b>it must be placed before any <tt>{@link io.netty.handler.execution.MemoryAwareThreadPoolExecutor}</tt> * <b>it must be placed before any <tt>{@link MemoryAwareThreadPoolExecutor}</tt>
* in your pipeline</b>.<br> * in your pipeline</b>.</li><br>
* <b><i>It is really recommended to have such a</i> <tt>{@link io.netty.handler.execution.MemoryAwareThreadPoolExecutor}</tt> * <b><i>It is really recommended to have such a</i> <tt>{@link MemoryAwareThreadPoolExecutor}</tt>
* <i>(either non ordered or </i> <tt>{@link io.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor}</tt> * <i>(either non ordered or </i> <tt>{@link OrderedMemoryAwareThreadPoolExecutor}</tt>
* <i>) in your pipeline</i></b> * <i>) in your pipeline</i></b>
* when you want to use this feature with some real traffic shaping, since it will allow to relax the constraint on * when you want to use this feature with some real traffic shaping, since it will allow to relax the constraint on
* NioWorker to do other jobs if necessary.<br> * NioWorker to do other jobs if necessary.<br>
@ -48,9 +50,9 @@
* 60KB/s for each channel since NioWorkers are stopping by this handler.<br> * 60KB/s for each channel since NioWorkers are stopping by this handler.<br>
* When it is used as a read traffic shaper, the handler will set the channel as not readable, so as to relax the * When it is used as a read traffic shaper, the handler will set the channel as not readable, so as to relax the
* NioWorkers.<br><br> * NioWorkers.<br><br>
* An {@link io.netty.util.ObjectSizeEstimator} can be passed at construction to specify what * An {@link ObjectSizeEstimator} can be passed at construction to specify what
* is the size of the object to be read or write accordingly to the type of * is the size of the object to be read or write accordingly to the type of
* object. If not specified, it will used the {@link io.netty.util.DefaultObjectSizeEstimator} implementation.<br><br> * object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.<br><br>
* </ul></P> * </ul></P>
* *
* <P>Standard use could be as follow:</P> * <P>Standard use could be as follow:</P>
@ -60,27 +62,27 @@
* [Global or per Channel] [Write or Read] Limitation in byte/s.</li><br> * [Global or per Channel] [Write or Read] Limitation in byte/s.</li><br>
* A value of <tt>0</tt> * A value of <tt>0</tt>
* stands for no limitation, so the traffic shaping is deactivate (on what you specified).<br> * stands for no limitation, so the traffic shaping is deactivate (on what you specified).<br>
* You can either change those values with the method <tt>configure</tt> in {@link io.netty.handler.traffic.AbstractTrafficShapingHandler}.<br> * You can either change those values with the method <tt>configure</tt> in {@link AbstractTrafficShapingHandler}.<br>
* <br> * <br>
* *
* <li>To activate or deactivate the statistics, you can adjust the delay to a low (suggested not less than 200ms * <li>To activate or deactivate the statistics, you can adjust the delay to a low (suggested not less than 200ms
* for efficiency reasons) or a high value (let say 24H in millisecond is huge enough to not get the problem) * for efficiency reasons) or a high value (let say 24H in millisecond is huge enough to not get the problem)
* or even using <tt>0</tt> which means no computation will be done.</li><br> * or even using <tt>0</tt> which means no computation will be done.</li><br>
* If you want to do anything with this statistics, just override the <tt>doAccounting</tt> method.<br> * If you want to do anything with this statistics, just override the <tt>doAccounting</tt> method.<br>
* This interval can be changed either from the method <tt>configure</tt> in {@link io.netty.handler.traffic.AbstractTrafficShapingHandler} * This interval can be changed either from the method <tt>configure</tt> in {@link AbstractTrafficShapingHandler}
* or directly using the method <tt>configure</tt> of {@link io.netty.handler.traffic.TrafficCounter}.<br><br> * or directly using the method <tt>configure</tt> of {@link TrafficCounter}.<br><br>
* *
* </ul></P><br><br> * </ul></P><br><br>
* *
* <P>So in your application you will create your own TrafficShapingHandler and set the values to fit your needs.</P> * <P>So in your application you will create your own TrafficShapingHandler and set the values to fit your needs.</P>
* <tt>XXXXXTrafficShapingHandler myHandler = new XXXXXTrafficShapingHandler(executor);</tt><br><br> * <tt>XXXXXTrafficShapingHandler myHandler = new XXXXXTrafficShapingHandler(timer);</tt><br><br>
* where executor could be created using <tt>Executors.newCachedThreadPool();</tt> and XXXXX could be either * timer could be created using <tt>HashedWheelTimer<tt> and XXXXX could be either
* Global or Channel<br> * Global or Channel<br>
* <tt>pipeline.addLast("XXXXX_TRAFFIC_SHAPING", myHandler);</tt><br> * <tt>pipeline.addLast("XXXXX_TRAFFIC_SHAPING", myHandler);</tt><br>
* <tt>...</tt><br> * <tt>...</tt><br>
* <tt>pipeline.addLast("MemoryExecutor",new ExecutionHandler(memoryAwareThreadPoolExecutor));</tt><br><br> * <tt>pipeline.addLast("MemoryExecutor",new ExecutionHandler(memoryAwareThreadPoolExecutor));</tt><br><br>
* <P>Note that a new {@link io.netty.handler.traffic.ChannelTrafficShapingHandler} must be created for each new channel, * <P>Note that a new {@link ChannelTrafficShapingHandler} must be created for each new channel,
* but only one {@link io.netty.handler.traffic.GlobalTrafficShapingHandler} must be created for all channels.</P> * but only one {@link GlobalTrafficShapingHandler} must be created for all channels.</P>
* *
* <P>Note also that you can create different GlobalTrafficShapingHandler if you want to separate classes of * <P>Note also that you can create different GlobalTrafficShapingHandler if you want to separate classes of
* channels (for instance either from business point of view or from bind address point of view).</P> * channels (for instance either from business point of view or from bind address point of view).</P>