From 54c97d07202d98eac734beacf885579973b515b8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Br=C3=A9gier?= The main goal of this package is to allow to shape the traffic (bandwidth limitation),
* but also to get statistics on how many bytes are read or written. Both functions can
* be active or inactive (traffic or statistics). Two classes implement this behavior:
*
*
+ * An {@link ObjectSizeEstimator} can be passed at construction to specify what
+ * is the size of the object to be read or write accordingly to the type of
+ * object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.
+ *
* If you want for any particular reasons to stop the monitoring (accounting) or to change
* the read/write limit or the check interval, several methods allow that for you:
*
@@ -79,10 +85,14 @@ public abstract class AbstractTrafficShapingHandler extends
private ObjectSizeEstimator objectSizeEstimator;
/**
- * Executor to associated to any TrafficCounter
+ * Timer to associated to any TrafficCounter
*/
- protected Executor executor;
-
+ protected Timer timer;
+ /**
+ * used in releaseExternalResources() to cancel the timer
+ */
+ volatile private Timeout timeout = null;
+
/**
* Limit in B/s to apply to write
*/
@@ -105,15 +115,16 @@ public abstract class AbstractTrafficShapingHandler extends
*/
final AtomicBoolean release = new AtomicBoolean(false);
- private void init(ObjectSizeEstimator newObjectSizeEstimator,
- Executor newExecutor, long newWriteLimit, long newReadLimit, long newCheckInterval) {
- objectSizeEstimator = newObjectSizeEstimator;
- executor = newExecutor;
- writeLimit = newWriteLimit;
- readLimit = newReadLimit;
- checkInterval = newCheckInterval;
- //logger.info("TSH: "+writeLimit+":"+readLimit+":"+checkInterval+":"+isPerChannel());
- }
+ private void init(ObjectSizeEstimator newObjectSizeEstimator,
+ Timer newTimer, long newWriteLimit, long newReadLimit,
+ long newCheckInterval) {
+ objectSizeEstimator = newObjectSizeEstimator;
+ timer = newTimer;
+ writeLimit = newWriteLimit;
+ readLimit = newReadLimit;
+ checkInterval = newCheckInterval;
+ //logger.warn("TSH: "+writeLimit+":"+readLimit+":"+checkInterval);
+ }
/**
*
@@ -126,8 +137,8 @@ public abstract class AbstractTrafficShapingHandler extends
/**
* Constructor using default {@link ObjectSizeEstimator}
*
- * @param executor
- * created for instance like Executors.newCachedThreadPool
+ * @param timer
+ * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
@@ -136,10 +147,9 @@ public abstract class AbstractTrafficShapingHandler extends
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
*/
- public AbstractTrafficShapingHandler(Executor executor, long writeLimit,
+ public AbstractTrafficShapingHandler(Timer timer, long writeLimit,
long readLimit, long checkInterval) {
- init(new DefaultObjectSizeEstimator(), executor, writeLimit, readLimit,
- checkInterval);
+ init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval);
}
/**
@@ -148,8 +158,8 @@ public abstract class AbstractTrafficShapingHandler extends
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
- * @param executor
- * created for instance like Executors.newCachedThreadPool
+ * @param timer
+ * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
@@ -159,26 +169,24 @@ public abstract class AbstractTrafficShapingHandler extends
* channels or 0 if no stats are to be computed
*/
public AbstractTrafficShapingHandler(
- ObjectSizeEstimator objectSizeEstimator, Executor executor,
+ ObjectSizeEstimator objectSizeEstimator, Timer timer,
long writeLimit, long readLimit, long checkInterval) {
- init(objectSizeEstimator, executor, writeLimit, readLimit,
- checkInterval);
+ init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval);
}
/**
* Constructor using default {@link ObjectSizeEstimator} and using default Check Interval
*
- * @param executor
- * created for instance like Executors.newCachedThreadPool
+ * @param timer
+ * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
*/
- public AbstractTrafficShapingHandler(Executor executor, long writeLimit,
+ public AbstractTrafficShapingHandler(Timer timer, long writeLimit,
long readLimit) {
- init(new DefaultObjectSizeEstimator(), executor, writeLimit, readLimit,
- DEFAULT_CHECK_INTERVAL);
+ init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
}
/**
@@ -187,29 +195,27 @@ public abstract class AbstractTrafficShapingHandler extends
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
- * @param executor
- * created for instance like Executors.newCachedThreadPool
+ * @param timer
+ * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
*/
public AbstractTrafficShapingHandler(
- ObjectSizeEstimator objectSizeEstimator, Executor executor,
+ ObjectSizeEstimator objectSizeEstimator, Timer timer,
long writeLimit, long readLimit) {
- init(objectSizeEstimator, executor, writeLimit, readLimit,
- DEFAULT_CHECK_INTERVAL);
+ init(objectSizeEstimator, timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
}
/**
* Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and default Check Interval
*
- * @param executor
- * created for instance like Executors.newCachedThreadPool
+ * @param timer
+ * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
*/
- public AbstractTrafficShapingHandler(Executor executor) {
- init(new DefaultObjectSizeEstimator(), executor, 0, 0,
- DEFAULT_CHECK_INTERVAL);
+ public AbstractTrafficShapingHandler(Timer timer) {
+ init(new DefaultObjectSizeEstimator(), timer, 0, 0, DEFAULT_CHECK_INTERVAL);
}
/**
@@ -218,25 +224,25 @@ public abstract class AbstractTrafficShapingHandler extends
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
- * @param executor
- * created for instance like Executors.newCachedThreadPool
+ * @param timer
+ * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
*/
public AbstractTrafficShapingHandler(
- ObjectSizeEstimator objectSizeEstimator, Executor executor) {
- init(objectSizeEstimator, executor, 0, 0, DEFAULT_CHECK_INTERVAL);
+ ObjectSizeEstimator objectSizeEstimator, Timer timer) {
+ init(objectSizeEstimator, timer, 0, 0, DEFAULT_CHECK_INTERVAL);
}
/**
* Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT
*
- * @param executor
- * created for instance like Executors.newCachedThreadPool
+ * @param timer
+ * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
*/
- public AbstractTrafficShapingHandler(Executor executor, long checkInterval) {
- init(new DefaultObjectSizeEstimator(), executor, 0, 0, checkInterval);
+ public AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
+ init(new DefaultObjectSizeEstimator(), timer, 0, 0, checkInterval);
}
/**
@@ -245,20 +251,24 @@ public abstract class AbstractTrafficShapingHandler extends
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
- * @param executor
- * created for instance like Executors.newCachedThreadPool
+ * @param timer
+ * created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
*/
public AbstractTrafficShapingHandler(
- ObjectSizeEstimator objectSizeEstimator, Executor executor,
+ ObjectSizeEstimator objectSizeEstimator, Timer timer,
long checkInterval) {
- init(objectSizeEstimator, executor, 0, 0, checkInterval);
+ init(objectSizeEstimator, timer, 0, 0, checkInterval);
}
/**
* Change the underlying limitations and check interval.
+ *
+ * @param newWriteLimit
+ * @param newReadLimit
+ * @param newCheckInterval
*/
public void configure(long newWriteLimit, long newReadLimit,
long newCheckInterval) {
@@ -268,17 +278,22 @@ public abstract class AbstractTrafficShapingHandler extends
/**
* Change the underlying limitations.
+ *
+ * @param newWriteLimit
+ * @param newReadLimit
*/
public void configure(long newWriteLimit, long newReadLimit) {
writeLimit = newWriteLimit;
readLimit = newReadLimit;
if (trafficCounter != null) {
- trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
+ trafficCounter.resetAccounting(System.currentTimeMillis()+1);
}
}
/**
* Change the check interval.
+ *
+ * @param newCheckInterval
*/
public void configure(long newCheckInterval) {
checkInterval = newCheckInterval;
@@ -300,46 +315,25 @@ public abstract class AbstractTrafficShapingHandler extends
/**
* Class to implement setReadable at fix time
- */
- private class ReopenRead implements Runnable {
- /**
- * Associated ChannelHandlerContext
- */
- private ChannelHandlerContext ctx;
-
- /**
- * Time to wait before clearing the channel
- */
- private long timeToWait;
-
- /**
- * @param ctx
- * the associated channelHandlerContext
- * @param timeToWait
- */
- protected ReopenRead(ChannelHandlerContext ctx, long timeToWait) {
+ */
+ private class ReopenReadTimerTask implements TimerTask {
+ ChannelHandlerContext ctx;
+ ReopenReadTimerTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
- this.timeToWait = timeToWait;
}
-
- /**
- * Truly run the waken up of the channel
- */
- @Override
- public void run() {
- try {
- if (release.get()) {
- return;
- }
- Thread.sleep(timeToWait);
- } catch (InterruptedException e) {
- // interruption so exit
+ public void run(Timeout timeoutArg) throws Exception {
+ //logger.warn("Start RRTT: "+release.get());
+ if (release.get()) {
return;
}
- // logger.info("WAKEUP!");
+ /*
+ logger.warn("WAKEUP! "+
+ (ctx != null && ctx.getChannel() != null &&
+ ctx.getChannel().isConnected()));
+ */
if (ctx != null && ctx.getChannel() != null &&
ctx.getChannel().isConnected()) {
- //logger.info(" setReadable TRUE: "+timeToWait);
+ //logger.warn(" setReadable TRUE: ");
// readSuspended = false;
ctx.setAttachment(null);
ctx.getChannel().setReadable(true);
@@ -375,17 +369,18 @@ public abstract class AbstractTrafficShapingHandler extends
return;
}
// compute the number of ms to wait before reopening the channel
- long wait = getTimeToWait(readLimit, trafficCounter
- .getCurrentReadBytes(), trafficCounter.getLastTime(),
- curtime);
- if (wait > MINIMAL_WAIT) { // At least 10ms seems a minimal time in order to
+ long wait = getTimeToWait(readLimit,
+ trafficCounter.getCurrentReadBytes(),
+ trafficCounter.getLastTime(), curtime);
+ if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
+ // time in order to
Channel channel = arg0.getChannel();
// try to limit the traffic
if (channel != null && channel.isConnected()) {
// Channel version
- if (executor == null) {
+ if (timer == null) {
// Sleep since no executor
- //logger.info("Read sleep since no executor for "+wait+" ms for "+this);
+ // logger.warn("Read sleep since no timer for "+wait+" ms for "+this);
if (release.get()) {
return;
}
@@ -396,11 +391,14 @@ public abstract class AbstractTrafficShapingHandler extends
// readSuspended = true;
arg0.setAttachment(Boolean.TRUE);
channel.setReadable(false);
- //logger.info("Read will wakeup after "+wait+" ms "+this);
- executor.execute(new ReopenRead(arg0, wait));
+ // logger.warn("Read will wakeup after "+wait+" ms "+this);
+ TimerTask timerTask = new ReopenReadTimerTask(arg0);
+ timeout = timer.newTimeout(timerTask, wait,
+ TimeUnit.MILLISECONDS);
} else {
- // should be waiting: but can occurs sometime so as a FIX
- //logger.info("Read sleep ok but should not be here: "+wait+" "+this);
+ // should be waiting: but can occurs sometime so as
+ // a FIX
+ // logger.warn("Read sleep ok but should not be here: "+wait+" "+this);
if (release.get()) {
return;
}
@@ -408,7 +406,7 @@ public abstract class AbstractTrafficShapingHandler extends
}
} else {
// Not connected or no channel
- //logger.info("Read sleep "+wait+" ms for "+this);
+ // logger.warn("Read sleep "+wait+" ms for "+this);
if (release.get()) {
return;
}
@@ -433,11 +431,12 @@ public abstract class AbstractTrafficShapingHandler extends
if (writeLimit == 0) {
return;
}
- // compute the number of ms to wait before continue with the channel
- long wait = getTimeToWait(writeLimit, trafficCounter
- .getCurrentWrittenBytes(), trafficCounter.getLastTime(),
- curtime);
- if (wait > MINIMAL_WAIT) {
+ // compute the number of ms to wait before continue with the
+ // channel
+ long wait = getTimeToWait(writeLimit,
+ trafficCounter.getCurrentWrittenBytes(),
+ trafficCounter.getLastTime(), curtime);
+ if (wait >= MINIMAL_WAIT) {
// Global or Channel
if (release.get()) {
return;
@@ -450,7 +449,6 @@ public abstract class AbstractTrafficShapingHandler extends
super.writeRequested(arg0, arg1);
}
}
-
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
@@ -481,13 +479,14 @@ public abstract class AbstractTrafficShapingHandler extends
return trafficCounter;
}
- @Override
public void releaseExternalResources() {
if (trafficCounter != null) {
trafficCounter.stop();
}
release.set(true);
- ExecutorUtil.terminate(executor);
+ if (timeout != null) {
+ timeout.cancel();
+ }
}
@Override
From 714e3d682eadfa2ad8ca2edc70e0076000ef038c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Br=C3=A9gier?=
*
- * ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler(executor);
- * executor could be created using Executors.newCachedThreadPool();
+ * ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler(timer);
+ * timer could be created using HashedWheelTimer
* pipeline.addLast("CHANNEL_TRAFFIC_SHAPING", myHandler);
*
* Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created
@@ -52,7 +51,7 @@ import io.netty.handler.execution.ObjectSizeEstimator;
* the less precise the traffic shaping will be. It is suggested as higher value something close
* to 5 or 10 minutes.
*
* myHandler.releaseExternalResources();
*
*
*
- * GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(executor);
- * executor could be created using Executors.newCachedThreadPool();
+ * GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(timer);
+ * timer could be created using HashedWheelTimer
* pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", myHandler);
*
* Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
@@ -52,7 +51,7 @@ import io.netty.handler.execution.ObjectSizeEstimator;
* {@link OrderedMemoryAwareThreadPoolExecutor} or {@link MemoryAwareThreadPoolExecutor}).
* pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", myHandler);
*
* myHandler.releaseExternalResources();
*
@@ -97,27 +100,31 @@ public class TrafficCounter {
/**
* The associated TrafficShapingHandler
*/
- private AbstractTrafficShapingHandler trafficShapingHandler;
+ private final AbstractTrafficShapingHandler trafficShapingHandler;
/**
- * Default Executor
+ * One Timer for all Counter
*/
- private Executor executor;
+ private final Timer timer; // replace executor
+ /**
+ * Monitor created once in start()
+ */
+ private TimerTask timerTask;
+ /**
+ * used in stop() to cancel the timer
+ */
+ volatile private Timeout timeout = null;
/**
* Is Monitor active
*/
AtomicBoolean monitorActive = new AtomicBoolean();
- /**
- * Monitor
- */
- private TrafficMonitoring trafficMonitoring;
-
/**
* Class to implement monitoring at fix delay
- */
- private static class TrafficMonitoring implements Runnable {
+ *
+ */
+ private static class TrafficMonitoringTask implements TimerTask {
/**
* The associated TrafficShapingHandler
*/
@@ -132,43 +139,30 @@ public class TrafficCounter {
* @param trafficShapingHandler
* @param counter
*/
- protected TrafficMonitoring(
+ protected TrafficMonitoringTask(
AbstractTrafficShapingHandler trafficShapingHandler,
TrafficCounter counter) {
trafficShapingHandler1 = trafficShapingHandler;
this.counter = counter;
}
- /**
- * Default run
- */
- @Override
- public void run() {
- try {
- Thread.currentThread().setName(counter.name);
- for (; counter.monitorActive.get();) {
- long check = counter.checkInterval.get();
- if (check > 0) {
- Thread.sleep(check);
- } else {
- // Delay goes to 0, so exit
- return;
- }
- long endTime = System.currentTimeMillis();
- counter.resetAccounting(endTime);
- if (trafficShapingHandler1 != null) {
- trafficShapingHandler1.doAccounting(counter);
- }
- }
- } catch (InterruptedException e) {
- // End of computations
+ public void run(Timeout timeout) throws Exception {
+ if (!counter.monitorActive.get()) {
+ return;
}
+ long endTime = System.currentTimeMillis();
+ counter.resetAccounting(endTime);
+ if (trafficShapingHandler1 != null) {
+ trafficShapingHandler1.doAccounting(counter);
+ }
+ timeout =
+ counter.timer.newTimeout(this, counter.checkInterval.get(), TimeUnit.MILLISECONDS);
}
}
/**
* Start the monitoring process
- */
+ */
public void start() {
synchronized (lastTime) {
if (monitorActive.get()) {
@@ -177,16 +171,16 @@ public class TrafficCounter {
lastTime.set(System.currentTimeMillis());
if (checkInterval.get() > 0) {
monitorActive.set(true);
- trafficMonitoring = new TrafficMonitoring(
- trafficShapingHandler, this);
- executor.execute(trafficMonitoring);
+ timerTask = new TrafficMonitoringTask(trafficShapingHandler, this);
+ timeout =
+ timer.newTimeout(timerTask, checkInterval.get(), TimeUnit.MILLISECONDS);
}
}
}
/**
* Stop the monitoring process
- */
+ */
public void stop() {
synchronized (lastTime) {
if (!monitorActive.get()) {
@@ -197,6 +191,9 @@ public class TrafficCounter {
if (trafficShapingHandler != null) {
trafficShapingHandler.doAccounting(this);
}
+ if (timeout != null) {
+ timeout.cancel();
+ }
}
}
@@ -222,20 +219,20 @@ public class TrafficCounter {
}
/**
- * Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the executorService to use, its
+ * Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the Timer to use, its
* name, the checkInterval between two computations in millisecond
* @param trafficShapingHandler the associated AbstractTrafficShapingHandler
- * @param executor
- * Should be a CachedThreadPool for efficiency
+ * @param timer
+ * Could be a HashedWheelTimer
* @param name
* the name given to this monitor
* @param checkInterval
* the checkInterval in millisecond between two computations
*/
public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler,
- Executor executor, String name, long checkInterval) {
+ Timer timer, String name, long checkInterval) {
this.trafficShapingHandler = trafficShapingHandler;
- this.executor = executor;
+ this.timer = timer;
this.name = name;
lastCumulativeTime = System.currentTimeMillis();
configure(checkInterval);
From 8846947081b37a53f884961c33b6f126a2cbaaa3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Br=C3=A9gier?=
*
+ *
+ *
*
*
- *
*
- *
*
* The insertion in the pipeline of one of those handlers can be wherever you want, but
- * it must be placed before any {@link io.netty.handler.execution.MemoryAwareThreadPoolExecutor}
- * in your pipeline.
- * It is really recommended to have such a {@link io.netty.handler.execution.MemoryAwareThreadPoolExecutor}
- * (either non ordered or {@link io.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor}
+ * it must be placed before any {@link MemoryAwareThreadPoolExecutor}
+ * in your pipeline.
+ * It is really recommended to have such a {@link MemoryAwareThreadPoolExecutor}
+ * (either non ordered or {@link OrderedMemoryAwareThreadPoolExecutor}
* ) in your pipeline
* when you want to use this feature with some real traffic shaping, since it will allow to relax the constraint on
* NioWorker to do other jobs if necessary.
@@ -48,9 +50,9 @@
* 60KB/s for each channel since NioWorkers are stopping by this handler.
* When it is used as a read traffic shaper, the handler will set the channel as not readable, so as to relax the
* NioWorkers.
- * An {@link io.netty.util.ObjectSizeEstimator} can be passed at construction to specify what
+ * An {@link ObjectSizeEstimator} can be passed at construction to specify what
* is the size of the object to be read or write accordingly to the type of
- * object. If not specified, it will used the {@link io.netty.util.DefaultObjectSizeEstimator} implementation.
+ * object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.
*
Standard use could be as follow:
@@ -60,27 +62,27 @@ * [Global or per Channel] [Write or Read] Limitation in byte/s.So in your application you will create your own TrafficShapingHandler and set the values to fit your needs.
- * XXXXXTrafficShapingHandler myHandler = new XXXXXTrafficShapingHandler(executor);Note that a new {@link io.netty.handler.traffic.ChannelTrafficShapingHandler} must be created for each new channel, - * but only one {@link io.netty.handler.traffic.GlobalTrafficShapingHandler} must be created for all channels.
+ *Note that a new {@link ChannelTrafficShapingHandler} must be created for each new channel, + * but only one {@link GlobalTrafficShapingHandler} must be created for all channels.
* *Note also that you can create different GlobalTrafficShapingHandler if you want to separate classes of * channels (for instance either from business point of view or from bind address point of view).
From c634539faa81f9f551ca5beaf15243b6c34c3e0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Br=C3=A9gier?=