* - Add in your pipeline a new ChannelTrafficShapingHandler, before a recommended {@link ExecutionHandler} (like
* {@link OrderedMemoryAwareThreadPoolExecutor} or {@link MemoryAwareThreadPoolExecutor}).
- * ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler(executor);
- * executor could be created using Executors.newCachedThreadPool();
+ * ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler(timer);
+ * timer could be created using HashedWheelTimer
* pipeline.addLast("CHANNEL_TRAFFIC_SHAPING", myHandler);
*
* Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created
@@ -52,7 +51,7 @@ import io.netty.handler.execution.ObjectSizeEstimator;
* the less precise the traffic shaping will be. It is suggested as higher value something close
* to 5 or 10 minutes.
*
- * - When you shutdown your application, release all the external resources like the executor
+ *
- When you shutdown your application, release all the external resources (except the timer internal itself)
* by calling:
* myHandler.releaseExternalResources();
*
@@ -60,96 +59,53 @@ import io.netty.handler.execution.ObjectSizeEstimator;
*/
public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
- /**
- * @param executor
- * @param writeLimit
- * @param readLimit
- * @param checkInterval
- */
- public ChannelTrafficShapingHandler(Executor executor, long writeLimit,
+ public ChannelTrafficShapingHandler(Timer timer, long writeLimit,
long readLimit, long checkInterval) {
- super(executor, writeLimit, readLimit, checkInterval);
+ super(timer, writeLimit, readLimit, checkInterval);
}
- /**
- * @param executor
- * @param writeLimit
- * @param readLimit
- */
- public ChannelTrafficShapingHandler(Executor executor, long writeLimit,
+ public ChannelTrafficShapingHandler(Timer timer, long writeLimit,
long readLimit) {
- super(executor, writeLimit, readLimit);
- }
-
-
- /**
- * @param executor
- * @param checkInterval
- */
- public ChannelTrafficShapingHandler(Executor executor, long checkInterval) {
- super(executor, checkInterval);
+ super(timer, writeLimit, readLimit);
}
- /**
- * @param executor
- */
- public ChannelTrafficShapingHandler(Executor executor) {
- super(executor);
+ public ChannelTrafficShapingHandler(Timer timer, long checkInterval) {
+ super(timer, checkInterval);
+ }
+
+ public ChannelTrafficShapingHandler(Timer timer) {
+ super(timer);
}
- /**
- * @param objectSizeEstimator
- * @param executor
- * @param writeLimit
- * @param readLimit
- * @param checkInterval
- */
public ChannelTrafficShapingHandler(
- ObjectSizeEstimator objectSizeEstimator, Executor executor,
+ ObjectSizeEstimator objectSizeEstimator, Timer timer,
long writeLimit, long readLimit, long checkInterval) {
- super(objectSizeEstimator, executor, writeLimit, readLimit,
+ super(objectSizeEstimator, timer, writeLimit, readLimit,
checkInterval);
}
- /**
- * @param objectSizeEstimator
- * @param executor
- * @param writeLimit
- * @param readLimit
- */
public ChannelTrafficShapingHandler(
- ObjectSizeEstimator objectSizeEstimator, Executor executor,
+ ObjectSizeEstimator objectSizeEstimator, Timer timer,
long writeLimit, long readLimit) {
- super(objectSizeEstimator, executor, writeLimit, readLimit);
+ super(objectSizeEstimator, timer, writeLimit, readLimit);
}
- /**
- * @param objectSizeEstimator
- * @param executor
- * @param checkInterval
- */
public ChannelTrafficShapingHandler(
- ObjectSizeEstimator objectSizeEstimator, Executor executor,
+ ObjectSizeEstimator objectSizeEstimator, Timer timer,
long checkInterval) {
- super(objectSizeEstimator, executor, checkInterval);
+ super(objectSizeEstimator, timer, checkInterval);
}
- /**
- * @param objectSizeEstimator
- * @param executor
- */
public ChannelTrafficShapingHandler(
- ObjectSizeEstimator objectSizeEstimator, Executor executor) {
- super(objectSizeEstimator, executor);
+ ObjectSizeEstimator objectSizeEstimator, Timer timer) {
+ super(objectSizeEstimator, timer);
}
-
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
if (trafficCounter != null) {
trafficCounter.stop();
- trafficCounter = null;
}
super.channelClosed(ctx, e);
}
@@ -162,8 +118,10 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
ctx.getChannel().setReadable(false);
if (trafficCounter == null) {
// create a new counter now
- trafficCounter = new TrafficCounter(this, executor, "ChannelTC" +
- ctx.getChannel().getId(), checkInterval);
+ if (timer != null) {
+ trafficCounter = new TrafficCounter(this, timer, "ChannelTC" +
+ ctx.getChannel().getId(), checkInterval);
+ }
}
if (trafficCounter != null) {
trafficCounter.start();
diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java
index a9a96a436c..b6617d8847 100644
--- a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java
+++ b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java
@@ -15,13 +15,12 @@
*/
package io.netty.handler.traffic;
-import java.util.concurrent.Executor;
-
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.execution.ExecutionHandler;
import io.netty.handler.execution.MemoryAwareThreadPoolExecutor;
import io.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import io.netty.handler.execution.ObjectSizeEstimator;
+import io.netty.util.Timer;
/**
* This implementation of the {@link AbstractTrafficShapingHandler} is for global
@@ -31,8 +30,8 @@ import io.netty.handler.execution.ObjectSizeEstimator;
* The general use should be as follow:
*
* - Create your unique GlobalTrafficShapingHandler like:
- * GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(executor);
- * executor could be created using Executors.newCachedThreadPool();
+ * GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(timer);
+ * timer could be created using HashedWheelTimer
* pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", myHandler);
*
* Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
@@ -52,7 +51,7 @@ import io.netty.handler.execution.ObjectSizeEstimator;
* {@link OrderedMemoryAwareThreadPoolExecutor} or {@link MemoryAwareThreadPoolExecutor}).
* pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", myHandler);
*
- * - When you shutdown your application, release all the external resources like the executor
+ *
- When you shutdown your application, release all the external resources (except the timer internal itself)
* by calling:
* myHandler.releaseExternalResources();
*
@@ -64,96 +63,61 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
* Create the global TrafficCounter
*/
void createGlobalTrafficCounter() {
- TrafficCounter tc = new TrafficCounter(this, executor, "GlobalTC",
- checkInterval);
- setTrafficCounter(tc);
- tc.start();
+ TrafficCounter tc;
+ if (timer != null) {
+ tc = new TrafficCounter(this, timer, "GlobalTC",
+ checkInterval);
+ setTrafficCounter(tc);
+ tc.start();
+ }
}
- /**
- * @param executor
- * @param writeLimit
- * @param readLimit
- * @param checkInterval
- */
- public GlobalTrafficShapingHandler(Executor executor, long writeLimit,
+ public GlobalTrafficShapingHandler(Timer timer, long writeLimit,
long readLimit, long checkInterval) {
- super(executor, writeLimit, readLimit, checkInterval);
+ super(timer, writeLimit, readLimit, checkInterval);
createGlobalTrafficCounter();
}
- /**
- * @param executor
- * @param writeLimit
- * @param readLimit
- */
- public GlobalTrafficShapingHandler(Executor executor, long writeLimit,
+ public GlobalTrafficShapingHandler(Timer timer, long writeLimit,
long readLimit) {
- super(executor, writeLimit, readLimit);
- createGlobalTrafficCounter();
- }
- /**
- * @param executor
- * @param checkInterval
- */
- public GlobalTrafficShapingHandler(Executor executor, long checkInterval) {
- super(executor, checkInterval);
+ super(timer, writeLimit, readLimit);
createGlobalTrafficCounter();
}
- /**
- * @param executor
- */
- public GlobalTrafficShapingHandler(Executor executor) {
- super(executor);
+ public GlobalTrafficShapingHandler(Timer timer, long checkInterval) {
+ super(timer, checkInterval);
+ createGlobalTrafficCounter();
+ }
+
+ public GlobalTrafficShapingHandler(Timer timer) {
+ super(timer);
createGlobalTrafficCounter();
}
- /**
- * @param objectSizeEstimator
- * @param executor
- * @param writeLimit
- * @param readLimit
- * @param checkInterval
- */
public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
- Executor executor, long writeLimit, long readLimit,
+ Timer timer, long writeLimit, long readLimit,
long checkInterval) {
- super(objectSizeEstimator, executor, writeLimit, readLimit,
+ super(objectSizeEstimator, timer, writeLimit, readLimit,
checkInterval);
createGlobalTrafficCounter();
}
- /**
- * @param objectSizeEstimator
- * @param executor
- * @param writeLimit
- * @param readLimit
- */
public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
- Executor executor, long writeLimit, long readLimit) {
- super(objectSizeEstimator, executor, writeLimit, readLimit);
+ Timer timer, long writeLimit, long readLimit) {
+ super(objectSizeEstimator, timer, writeLimit, readLimit);
createGlobalTrafficCounter();
}
- /**
- * @param objectSizeEstimator
- * @param executor
- * @param checkInterval
- */
public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
- Executor executor, long checkInterval) {
- super(objectSizeEstimator, executor, checkInterval);
+ Timer timer, long checkInterval) {
+ super(objectSizeEstimator, timer, checkInterval);
createGlobalTrafficCounter();
}
- /**
- * @param objectSizeEstimator
- * @param executor
- */
public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
- Executor executor) {
- super(objectSizeEstimator, executor);
+ Timer timer) {
+ super(objectSizeEstimator, timer);
createGlobalTrafficCounter();
}
+
}
diff --git a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java
index d8cfdb3fb6..4590fb578e 100644
--- a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java
+++ b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java
@@ -15,11 +15,14 @@
*/
package io.netty.handler.traffic;
-import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
/**
* TrafficCounter is associated with {@link AbstractTrafficShapingHandler}.
@@ -97,27 +100,31 @@ public class TrafficCounter {
/**
* The associated TrafficShapingHandler
*/
- private AbstractTrafficShapingHandler trafficShapingHandler;
+ private final AbstractTrafficShapingHandler trafficShapingHandler;
/**
- * Default Executor
+ * One Timer for all Counter
*/
- private Executor executor;
+ private final Timer timer; // replace executor
+ /**
+ * Monitor created once in start()
+ */
+ private TimerTask timerTask;
+ /**
+ * used in stop() to cancel the timer
+ */
+ volatile private Timeout timeout = null;
/**
* Is Monitor active
*/
AtomicBoolean monitorActive = new AtomicBoolean();
- /**
- * Monitor
- */
- private TrafficMonitoring trafficMonitoring;
-
/**
* Class to implement monitoring at fix delay
- */
- private static class TrafficMonitoring implements Runnable {
+ *
+ */
+ private static class TrafficMonitoringTask implements TimerTask {
/**
* The associated TrafficShapingHandler
*/
@@ -132,43 +139,30 @@ public class TrafficCounter {
* @param trafficShapingHandler
* @param counter
*/
- protected TrafficMonitoring(
+ protected TrafficMonitoringTask(
AbstractTrafficShapingHandler trafficShapingHandler,
TrafficCounter counter) {
trafficShapingHandler1 = trafficShapingHandler;
this.counter = counter;
}
- /**
- * Default run
- */
- @Override
- public void run() {
- try {
- Thread.currentThread().setName(counter.name);
- for (; counter.monitorActive.get();) {
- long check = counter.checkInterval.get();
- if (check > 0) {
- Thread.sleep(check);
- } else {
- // Delay goes to 0, so exit
- return;
- }
- long endTime = System.currentTimeMillis();
- counter.resetAccounting(endTime);
- if (trafficShapingHandler1 != null) {
- trafficShapingHandler1.doAccounting(counter);
- }
- }
- } catch (InterruptedException e) {
- // End of computations
+ public void run(Timeout timeout) throws Exception {
+ if (!counter.monitorActive.get()) {
+ return;
}
+ long endTime = System.currentTimeMillis();
+ counter.resetAccounting(endTime);
+ if (trafficShapingHandler1 != null) {
+ trafficShapingHandler1.doAccounting(counter);
+ }
+ timeout =
+ counter.timer.newTimeout(this, counter.checkInterval.get(), TimeUnit.MILLISECONDS);
}
}
/**
* Start the monitoring process
- */
+ */
public void start() {
synchronized (lastTime) {
if (monitorActive.get()) {
@@ -177,16 +171,16 @@ public class TrafficCounter {
lastTime.set(System.currentTimeMillis());
if (checkInterval.get() > 0) {
monitorActive.set(true);
- trafficMonitoring = new TrafficMonitoring(
- trafficShapingHandler, this);
- executor.execute(trafficMonitoring);
+ timerTask = new TrafficMonitoringTask(trafficShapingHandler, this);
+ timeout =
+ timer.newTimeout(timerTask, checkInterval.get(), TimeUnit.MILLISECONDS);
}
}
}
/**
* Stop the monitoring process
- */
+ */
public void stop() {
synchronized (lastTime) {
if (!monitorActive.get()) {
@@ -197,6 +191,9 @@ public class TrafficCounter {
if (trafficShapingHandler != null) {
trafficShapingHandler.doAccounting(this);
}
+ if (timeout != null) {
+ timeout.cancel();
+ }
}
}
@@ -222,20 +219,20 @@ public class TrafficCounter {
}
/**
- * Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the executorService to use, its
+ * Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the Timer to use, its
* name, the checkInterval between two computations in millisecond
* @param trafficShapingHandler the associated AbstractTrafficShapingHandler
- * @param executor
- * Should be a CachedThreadPool for efficiency
+ * @param timer
+ * Could be a HashedWheelTimer
* @param name
* the name given to this monitor
* @param checkInterval
* the checkInterval in millisecond between two computations
*/
public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler,
- Executor executor, String name, long checkInterval) {
+ Timer timer, String name, long checkInterval) {
this.trafficShapingHandler = trafficShapingHandler;
- this.executor = executor;
+ this.timer = timer;
this.name = name;
lastCumulativeTime = System.currentTimeMillis();
configure(checkInterval);
@@ -248,7 +245,7 @@ public class TrafficCounter {
* @param newcheckInterval
*/
public void configure(long newcheckInterval) {
- long newInterval = (newcheckInterval/10)*10;
+ long newInterval = (newcheckInterval / 10) * 10;
if (checkInterval.get() != newInterval) {
checkInterval.set(newInterval);
if (newInterval <= 0) {
diff --git a/handler/src/main/java/io/netty/handler/traffic/package-info.java b/handler/src/main/java/io/netty/handler/traffic/package-info.java
index 2d2c2c2025..f39d8f335c 100644
--- a/handler/src/main/java/io/netty/handler/traffic/package-info.java
+++ b/handler/src/main/java/io/netty/handler/traffic/package-info.java
@@ -17,26 +17,28 @@
/**
* Implementation of a Traffic Shaping Handler and Dynamic Statistics.
*
+ *
+ *
* The main goal of this package is to allow to shape the traffic (bandwidth limitation),
* but also to get statistics on how many bytes are read or written. Both functions can
* be active or inactive (traffic or statistics).
*
* Two classes implement this behavior:
*
- * - {@link io.netty.handler.traffic.TrafficCounter}: this class implements the counters needed by the handlers.
+ *
- {@link TrafficCounter}: this class implements the counters needed by the handlers.
* It can be accessed to get some extra information like the read or write bytes since last check, the read and write
* bandwidth from last check...
*
- * - {@link io.netty.handler.traffic.AbstractTrafficShapingHandler}: this abstract class implements the kernel
+ *
- {@link AbstractTrafficShapingHandler}: this abstract class implements the kernel
* of the traffic shaping. It could be extended to fit your needs. Two classes are proposed as default
- * implementations: see {@link io.netty.handler.traffic.ChannelTrafficShapingHandler} and see {@link io.netty.handler.traffic.GlobalTrafficShapingHandler}
+ * implementations: see {@link ChannelTrafficShapingHandler} and see {@link GlobalTrafficShapingHandler}
* respectively for Channel traffic shaping and Global traffic shaping.
*
* The insertion in the pipeline of one of those handlers can be wherever you want, but
- * it must be placed before any {@link io.netty.handler.execution.MemoryAwareThreadPoolExecutor}
- * in your pipeline.
- * It is really recommended to have such a {@link io.netty.handler.execution.MemoryAwareThreadPoolExecutor}
- * (either non ordered or {@link io.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor}
+ * it must be placed before any {@link MemoryAwareThreadPoolExecutor}
+ * in your pipeline.
+ * It is really recommended to have such a {@link MemoryAwareThreadPoolExecutor}
+ * (either non ordered or {@link OrderedMemoryAwareThreadPoolExecutor}
* ) in your pipeline
* when you want to use this feature with some real traffic shaping, since it will allow to relax the constraint on
* NioWorker to do other jobs if necessary.
@@ -48,9 +50,9 @@
* 60KB/s for each channel since NioWorkers are stopping by this handler.
* When it is used as a read traffic shaper, the handler will set the channel as not readable, so as to relax the
* NioWorkers.
- * An {@link io.netty.util.ObjectSizeEstimator} can be passed at construction to specify what
+ * An {@link ObjectSizeEstimator} can be passed at construction to specify what
* is the size of the object to be read or write accordingly to the type of
- * object. If not specified, it will used the {@link io.netty.util.DefaultObjectSizeEstimator} implementation.
+ * object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.
*
*
* Standard use could be as follow:
@@ -60,27 +62,27 @@
* [Global or per Channel] [Write or Read] Limitation in byte/s.
* A value of 0
* stands for no limitation, so the traffic shaping is deactivate (on what you specified).
- * You can either change those values with the method configure in {@link io.netty.handler.traffic.AbstractTrafficShapingHandler}.
+ * You can either change those values with the method configure in {@link AbstractTrafficShapingHandler}.
*
*
* - To activate or deactivate the statistics, you can adjust the delay to a low (suggested not less than 200ms
* for efficiency reasons) or a high value (let say 24H in millisecond is huge enough to not get the problem)
* or even using 0 which means no computation will be done.
* If you want to do anything with this statistics, just override the doAccounting method.
- * This interval can be changed either from the method configure in {@link io.netty.handler.traffic.AbstractTrafficShapingHandler}
- * or directly using the method configure of {@link io.netty.handler.traffic.TrafficCounter}.
+ * This interval can be changed either from the method configure in {@link AbstractTrafficShapingHandler}
+ * or directly using the method configure of {@link TrafficCounter}.
*
*
*
* So in your application you will create your own TrafficShapingHandler and set the values to fit your needs.
- * XXXXXTrafficShapingHandler myHandler = new XXXXXTrafficShapingHandler(executor);
- * where executor could be created using Executors.newCachedThreadPool(); and XXXXX could be either
+ * XXXXXTrafficShapingHandler myHandler = new XXXXXTrafficShapingHandler(timer);
+ * timer could be created using HashedWheelTimer and XXXXX could be either
* Global or Channel
* pipeline.addLast("XXXXX_TRAFFIC_SHAPING", myHandler);
* ...
* pipeline.addLast("MemoryExecutor",new ExecutionHandler(memoryAwareThreadPoolExecutor));
- * Note that a new {@link io.netty.handler.traffic.ChannelTrafficShapingHandler} must be created for each new channel,
- * but only one {@link io.netty.handler.traffic.GlobalTrafficShapingHandler} must be created for all channels.
+ * Note that a new {@link ChannelTrafficShapingHandler} must be created for each new channel,
+ * but only one {@link GlobalTrafficShapingHandler} must be created for all channels.
*
* Note also that you can create different GlobalTrafficShapingHandler if you want to separate classes of
* channels (for instance either from business point of view or from bind address point of view).