Change Executor to Timer from Netty, in reference to Issue #345

This commit is contained in:
Frédéric Brégier 2012-05-20 11:26:26 +03:00
parent 9ba9107267
commit fdd2a9ccfd

View File

@ -15,11 +15,14 @@
*/ */
package org.jboss.netty.handler.traffic; package org.jboss.netty.handler.traffic;
import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.Timer;
import org.jboss.netty.util.TimerTask;
/** /**
* TrafficCounter is associated with {@link AbstractTrafficShapingHandler}.<br> * TrafficCounter is associated with {@link AbstractTrafficShapingHandler}.<br>
@ -97,27 +100,31 @@ public class TrafficCounter {
/** /**
* The associated TrafficShapingHandler * The associated TrafficShapingHandler
*/ */
private AbstractTrafficShapingHandler trafficShapingHandler; private final AbstractTrafficShapingHandler trafficShapingHandler;
/** /**
* Default Executor * One Timer for all Counter
*/ */
private Executor executor; private final Timer timer; // replace executor
/**
* Monitor created once in start()
*/
private TimerTask timerTask;
/**
* used in stop() to cancel the timer
*/
volatile private Timeout timeout = null;
/** /**
* Is Monitor active * Is Monitor active
*/ */
AtomicBoolean monitorActive = new AtomicBoolean(); AtomicBoolean monitorActive = new AtomicBoolean();
/**
* Monitor
*/
private TrafficMonitoring trafficMonitoring;
/** /**
* Class to implement monitoring at fix delay * Class to implement monitoring at fix delay
*/ *
private static class TrafficMonitoring implements Runnable { */
private static class TrafficMonitoringTask implements TimerTask {
/** /**
* The associated TrafficShapingHandler * The associated TrafficShapingHandler
*/ */
@ -132,42 +139,30 @@ public class TrafficCounter {
* @param trafficShapingHandler * @param trafficShapingHandler
* @param counter * @param counter
*/ */
protected TrafficMonitoring( protected TrafficMonitoringTask(
AbstractTrafficShapingHandler trafficShapingHandler, AbstractTrafficShapingHandler trafficShapingHandler,
TrafficCounter counter) { TrafficCounter counter) {
trafficShapingHandler1 = trafficShapingHandler; trafficShapingHandler1 = trafficShapingHandler;
this.counter = counter; this.counter = counter;
} }
/** public void run(Timeout timeout) throws Exception {
* Default run if (!counter.monitorActive.get()) {
*/ return;
public void run() {
try {
Thread.currentThread().setName(counter.name);
for (; counter.monitorActive.get();) {
long check = counter.checkInterval.get();
if (check > 0) {
Thread.sleep(check);
} else {
// Delay goes to 0, so exit
return;
}
long endTime = System.currentTimeMillis();
counter.resetAccounting(endTime);
if (trafficShapingHandler1 != null) {
trafficShapingHandler1.doAccounting(counter);
}
}
} catch (InterruptedException e) {
// End of computations
} }
long endTime = System.currentTimeMillis();
counter.resetAccounting(endTime);
if (trafficShapingHandler1 != null) {
trafficShapingHandler1.doAccounting(counter);
}
timeout =
counter.timer.newTimeout(this, counter.checkInterval.get(), TimeUnit.MILLISECONDS);
} }
} }
/** /**
* Start the monitoring process * Start the monitoring process
*/ */
public void start() { public void start() {
synchronized (lastTime) { synchronized (lastTime) {
if (monitorActive.get()) { if (monitorActive.get()) {
@ -176,16 +171,16 @@ public class TrafficCounter {
lastTime.set(System.currentTimeMillis()); lastTime.set(System.currentTimeMillis());
if (checkInterval.get() > 0) { if (checkInterval.get() > 0) {
monitorActive.set(true); monitorActive.set(true);
trafficMonitoring = new TrafficMonitoring( timerTask = new TrafficMonitoringTask(trafficShapingHandler, this);
trafficShapingHandler, this); timeout =
executor.execute(trafficMonitoring); timer.newTimeout(timerTask, checkInterval.get(), TimeUnit.MILLISECONDS);
} }
} }
} }
/** /**
* Stop the monitoring process * Stop the monitoring process
*/ */
public void stop() { public void stop() {
synchronized (lastTime) { synchronized (lastTime) {
if (!monitorActive.get()) { if (!monitorActive.get()) {
@ -196,6 +191,9 @@ public class TrafficCounter {
if (trafficShapingHandler != null) { if (trafficShapingHandler != null) {
trafficShapingHandler.doAccounting(this); trafficShapingHandler.doAccounting(this);
} }
if (timeout != null) {
timeout.cancel();
}
} }
} }
@ -221,20 +219,20 @@ public class TrafficCounter {
} }
/** /**
* Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the executorService to use, its * Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the Timer to use, its
* name, the checkInterval between two computations in millisecond * name, the checkInterval between two computations in millisecond
* @param trafficShapingHandler the associated AbstractTrafficShapingHandler * @param trafficShapingHandler the associated AbstractTrafficShapingHandler
* @param executor * @param timer
* Should be a CachedThreadPool for efficiency * Could be a HashedWheelTimer
* @param name * @param name
* the name given to this monitor * the name given to this monitor
* @param checkInterval * @param checkInterval
* the checkInterval in millisecond between two computations * the checkInterval in millisecond between two computations
*/ */
public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler, public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler,
Executor executor, String name, long checkInterval) { Timer timer, String name, long checkInterval) {
this.trafficShapingHandler = trafficShapingHandler; this.trafficShapingHandler = trafficShapingHandler;
this.executor = executor; this.timer = timer;
this.name = name; this.name = name;
lastCumulativeTime = System.currentTimeMillis(); lastCumulativeTime = System.currentTimeMillis();
configure(checkInterval); configure(checkInterval);