Change Executor to Timer from Netty, in reference to Issue #345

This commit is contained in:
Frédéric Brégier 2012-05-20 11:24:12 +03:00
parent 16e271325d
commit 21284c430e

View File

@ -15,483 +15,385 @@
*/
package org.jboss.netty.handler.traffic;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.DefaultObjectSizeEstimator;
import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.ObjectSizeEstimator;
import org.jboss.netty.util.internal.ExecutorUtil;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.Timer;
import org.jboss.netty.util.TimerTask;
/**
* AbstractTrafficShapingHandler allows to limit the global bandwidth
* (see {@link GlobalTrafficShapingHandler}) or per session
* bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
* It allows too to implement an almost real time monitoring of the bandwidth using
* the monitors from {@link TrafficCounter} that will call back every checkInterval
* the method doAccounting of this handler.<br>
* TrafficCounter is associated with {@link AbstractTrafficShapingHandler}.<br>
* <br>
*
* If you want for any particular reasons to stop the monitoring (accounting) or to change
* the read/write limit or the check interval, several methods allow that for you:<br>
* <ul>
* <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
* <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop
* or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
* <li></li>
* </ul>
* A TrafficCounter has for goal to count the traffic in order to enable to limit the traffic or not,
* globally or per channel. It compute statistics on read and written bytes at the specified
* interval and call back the {@link AbstractTrafficShapingHandler} doAccounting method at every
* specified interval. If this interval is set to 0, therefore no accounting will be done and only
* statistics will be computed at each receive or write operations.
*/
public abstract class AbstractTrafficShapingHandler extends
SimpleChannelHandler implements ExternalResourceReleasable {
public class TrafficCounter {
/**
* Internal logger
* Current written bytes
*/
static InternalLogger logger = InternalLoggerFactory
.getInstance(AbstractTrafficShapingHandler.class);
private final AtomicLong currentWrittenBytes = new AtomicLong();
/**
* Default delay between two checks: 1s
* Current read bytes
*/
public static final long DEFAULT_CHECK_INTERVAL = 1000;
private final AtomicLong currentReadBytes = new AtomicLong();
/**
* Default minimal time to wait
* Long life written bytes
*/
private static final long MINIMAL_WAIT = 10;
private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
/**
* Traffic Counter
* Long life read bytes
*/
protected TrafficCounter trafficCounter;
private final AtomicLong cumulativeReadBytes = new AtomicLong();
/**
* ObjectSizeEstimator
* Last Time where cumulative bytes where reset to zero
*/
private ObjectSizeEstimator objectSizeEstimator;
private long lastCumulativeTime;
/**
* Executor to associated to any TrafficCounter
* Last writing bandwidth
*/
protected Executor executor;
private long lastWriteThroughput;
/**
* Limit in B/s to apply to write
* Last reading bandwidth
*/
private long writeLimit;
private long lastReadThroughput;
/**
* Limit in B/s to apply to read
* Last Time Check taken
*/
private long readLimit;
private final AtomicLong lastTime = new AtomicLong();
/**
* Delay between two performance snapshots
* Last written bytes number during last check interval
*/
protected long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
private long lastWrittenBytes;
/**
* Boolean associated with the release of this TrafficShapingHandler.
* It will be true only once when the releaseExternalRessources is called
* to prevent waiting when shutdown.
* Last read bytes number during last check interval
*/
final AtomicBoolean release = new AtomicBoolean(false);
private void init(ObjectSizeEstimator newObjectSizeEstimator,
Executor newExecutor, long newWriteLimit, long newReadLimit, long newCheckInterval) {
objectSizeEstimator = newObjectSizeEstimator;
executor = newExecutor;
writeLimit = newWriteLimit;
readLimit = newReadLimit;
checkInterval = newCheckInterval;
//logger.info("TSH: "+writeLimit+":"+readLimit+":"+checkInterval+":"+isPerChannel());
}
private long lastReadBytes;
/**
* Delay between two captures
*/
AtomicLong checkInterval = new AtomicLong(
AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL);
// default 1 s
/**
* Name of this Monitor
*/
final String name;
/**
* The associated TrafficShapingHandler
*/
private final AbstractTrafficShapingHandler trafficShapingHandler;
/**
* One Timer for all Counter
*/
private final Timer timer; // replace executor
/**
* Monitor created once in start()
*/
private TimerTask timerTask;
/**
* used in stop() to cancel the timer
*/
volatile private Timeout timeout = null;
/**
* Is Monitor active
*/
AtomicBoolean monitorActive = new AtomicBoolean();
/**
* Class to implement monitoring at fix delay
*
* @param newTrafficCounter the TrafficCounter to set
*/
void setTrafficCounter(TrafficCounter newTrafficCounter) {
trafficCounter = newTrafficCounter;
}
/**
* Constructor using default {@link ObjectSizeEstimator}
*
* @param executor
* created for instance like Executors.newCachedThreadPool
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
*/
public AbstractTrafficShapingHandler(Executor executor, long writeLimit,
long readLimit, long checkInterval) {
init(new DefaultObjectSizeEstimator(), executor, writeLimit, readLimit,
checkInterval);
}
/**
* Constructor using the specified ObjectSizeEstimator
*
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
* @param executor
* created for instance like Executors.newCachedThreadPool
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
*/
public AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor,
long writeLimit, long readLimit, long checkInterval) {
init(objectSizeEstimator, executor, writeLimit, readLimit,
checkInterval);
}
/**
* Constructor using default {@link ObjectSizeEstimator} and using default Check Interval
*
* @param executor
* created for instance like Executors.newCachedThreadPool
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
*/
public AbstractTrafficShapingHandler(Executor executor, long writeLimit,
long readLimit) {
init(new DefaultObjectSizeEstimator(), executor, writeLimit, readLimit,
DEFAULT_CHECK_INTERVAL);
}
/**
* Constructor using the specified ObjectSizeEstimator and using default Check Interval
*
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
* @param executor
* created for instance like Executors.newCachedThreadPool
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
*/
public AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor,
long writeLimit, long readLimit) {
init(objectSizeEstimator, executor, writeLimit, readLimit,
DEFAULT_CHECK_INTERVAL);
}
/**
* Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and default Check Interval
*
* @param executor
* created for instance like Executors.newCachedThreadPool
*/
public AbstractTrafficShapingHandler(Executor executor) {
init(new DefaultObjectSizeEstimator(), executor, 0, 0,
DEFAULT_CHECK_INTERVAL);
}
/**
* Constructor using the specified ObjectSizeEstimator and using NO LIMIT and default Check Interval
*
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
* @param executor
* created for instance like Executors.newCachedThreadPool
*/
public AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor) {
init(objectSizeEstimator, executor, 0, 0, DEFAULT_CHECK_INTERVAL);
}
/**
* Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT
*
* @param executor
* created for instance like Executors.newCachedThreadPool
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
*/
public AbstractTrafficShapingHandler(Executor executor, long checkInterval) {
init(new DefaultObjectSizeEstimator(), executor, 0, 0, checkInterval);
}
/**
* Constructor using the specified ObjectSizeEstimator and using NO LIMIT
*
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
* @param executor
* created for instance like Executors.newCachedThreadPool
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
*/
public AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor,
long checkInterval) {
init(objectSizeEstimator, executor, 0, 0, checkInterval);
}
/**
* Change the underlying limitations and check interval.
*/
public void configure(long newWriteLimit, long newReadLimit,
long newCheckInterval) {
this.configure(newWriteLimit, newReadLimit);
this.configure(newCheckInterval);
}
/**
* Change the underlying limitations.
*/
public void configure(long newWriteLimit, long newReadLimit) {
writeLimit = newWriteLimit;
readLimit = newReadLimit;
if (trafficCounter != null) {
trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
}
}
/**
* Change the check interval.
*/
public void configure(long newCheckInterval) {
checkInterval = newCheckInterval;
if (trafficCounter != null) {
trafficCounter.configure(checkInterval);
}
}
/**
* Called each time the accounting is computed from the TrafficCounters.
* This method could be used for instance to implement almost real time accounting.
*
* @param counter
* the TrafficCounter that computes its performance
*/
protected void doAccounting(TrafficCounter counter) {
// NOOP by default
}
/**
* Class to implement setReadable at fix time
*/
private class ReopenRead implements Runnable {
private static class TrafficMonitoringTask implements TimerTask {
/**
* Associated ChannelHandlerContext
* The associated TrafficShapingHandler
*/
private ChannelHandlerContext ctx;
private final AbstractTrafficShapingHandler trafficShapingHandler1;
/**
* Time to wait before clearing the channel
* The associated TrafficCounter
*/
private long timeToWait;
private final TrafficCounter counter;
/**
* @param ctx
* the associated channelHandlerContext
* @param timeToWait
* @param trafficShapingHandler
* @param counter
*/
protected ReopenRead(ChannelHandlerContext ctx, long timeToWait) {
this.ctx = ctx;
this.timeToWait = timeToWait;
protected TrafficMonitoringTask(
AbstractTrafficShapingHandler trafficShapingHandler,
TrafficCounter counter) {
trafficShapingHandler1 = trafficShapingHandler;
this.counter = counter;
}
/**
* Truly run the waken up of the channel
*/
public void run() {
try {
if (release.get()) {
return;
}
Thread.sleep(timeToWait);
} catch (InterruptedException e) {
// interruption so exit
public void run(Timeout timeout) throws Exception {
if (!counter.monitorActive.get()) {
return;
}
// logger.info("WAKEUP!");
if (ctx != null && ctx.getChannel() != null &&
ctx.getChannel().isConnected()) {
//logger.info(" setReadable TRUE: "+timeToWait);
// readSuspended = false;
ctx.setAttachment(null);
ctx.getChannel().setReadable(true);
long endTime = System.currentTimeMillis();
counter.resetAccounting(endTime);
if (trafficShapingHandler1 != null) {
trafficShapingHandler1.doAccounting(counter);
}
timeout =
counter.timer.newTimeout(this, counter.checkInterval.get(), TimeUnit.MILLISECONDS);
}
}
/**
* Start the monitoring process
*/
public void start() {
synchronized (lastTime) {
if (monitorActive.get()) {
return;
}
lastTime.set(System.currentTimeMillis());
if (checkInterval.get() > 0) {
monitorActive.set(true);
timerTask = new TrafficMonitoringTask(trafficShapingHandler, this);
timeout =
timer.newTimeout(timerTask, checkInterval.get(), TimeUnit.MILLISECONDS);
}
}
}
/**
* Stop the monitoring process
*/
public void stop() {
synchronized (lastTime) {
if (!monitorActive.get()) {
return;
}
monitorActive.set(false);
resetAccounting(System.currentTimeMillis());
if (trafficShapingHandler != null) {
trafficShapingHandler.doAccounting(this);
}
if (timeout != null) {
timeout.cancel();
}
}
}
/**
* Reset the accounting on Read and Write
*
* @param newLastTime
*/
void resetAccounting(long newLastTime) {
synchronized (lastTime) {
long interval = newLastTime - lastTime.getAndSet(newLastTime);
if (interval == 0) {
// nothing to do
return;
}
lastReadBytes = currentReadBytes.getAndSet(0);
lastWrittenBytes = currentWrittenBytes.getAndSet(0);
lastReadThroughput = lastReadBytes / interval * 1000;
// nb byte / checkInterval in ms * 1000 (1s)
lastWriteThroughput = lastWrittenBytes / interval * 1000;
// nb byte / checkInterval in ms * 1000 (1s)
}
}
/**
* Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the Timer to use, its
* name, the checkInterval between two computations in millisecond
* @param trafficShapingHandler the associated AbstractTrafficShapingHandler
* @param timer
* Could be a HashedWheelTimer
* @param name
* the name given to this monitor
* @param checkInterval
* the checkInterval in millisecond between two computations
*/
public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler,
Timer timer, String name, long checkInterval) {
this.trafficShapingHandler = trafficShapingHandler;
this.timer = timer;
this.name = name;
lastCumulativeTime = System.currentTimeMillis();
configure(checkInterval);
}
/**
* Change checkInterval between
* two computations in millisecond
*
* @param newcheckInterval
*/
public void configure(long newcheckInterval) {
long newInterval = (newcheckInterval/10)*10;
if (checkInterval.get() != newInterval) {
checkInterval.set(newInterval);
if (newInterval <= 0) {
stop();
// No more active monitoring
lastTime.set(System.currentTimeMillis());
} else {
// Start if necessary
start();
}
}
}
/**
* Computes counters for Read.
*
* @param ctx
* the associated channelHandlerContext
* @param recv
* the size in bytes to read
*/
void bytesRecvFlowControl(ChannelHandlerContext ctx, long recv) {
currentReadBytes.addAndGet(recv);
cumulativeReadBytes.addAndGet(recv);
}
/**
* Computes counters for Write.
*
* @param write
* the size in bytes to write
*/
void bytesWriteFlowControl(long write) {
currentWrittenBytes.addAndGet(write);
cumulativeWrittenBytes.addAndGet(write);
}
/**
*
* @return the current checkInterval between two computations of traffic counter
* in millisecond
*/
public long getCheckInterval() {
return checkInterval.get();
}
/**
*
* @return the Read Throughput in bytes/s computes in the last check interval
*/
public long getLastReadThroughput() {
return lastReadThroughput;
}
/**
*
* @return the Write Throughput in bytes/s computes in the last check interval
*/
public long getLastWriteThroughput() {
return lastWriteThroughput;
}
/**
*
* @return the number of bytes read during the last check Interval
*/
public long getLastReadBytes() {
return lastReadBytes;
}
/**
*
* @return the number of bytes written during the last check Interval
*/
public long getLastWrittenBytes() {
return lastWrittenBytes;
}
/**
*
* @return the time that should be necessary to wait to respect limit. Can
* be negative time
* @return the current number of bytes read since the last checkInterval
*/
private long getTimeToWait(long limit, long bytes, long lastTime,
long curtime) {
long interval = curtime - lastTime;
if (interval == 0) {
// Time is too short, so just lets continue
return 0;
}
return ((bytes * 1000 / limit - interval)/10)*10;
}
@Override
public void messageReceived(ChannelHandlerContext arg0, MessageEvent arg1)
throws Exception {
try {
long curtime = System.currentTimeMillis();
long size = objectSizeEstimator.estimateSize(arg1.getMessage());
if (trafficCounter != null) {
trafficCounter.bytesRecvFlowControl(arg0, size);
if (readLimit == 0) {
// no action
return;
}
// compute the number of ms to wait before reopening the channel
long wait = getTimeToWait(readLimit, trafficCounter
.getCurrentReadBytes(), trafficCounter.getLastTime(),
curtime);
if (wait > MINIMAL_WAIT) { // At least 10ms seems a minimal time in order to
Channel channel = arg0.getChannel();
// try to limit the traffic
if (channel != null && channel.isConnected()) {
// Channel version
if (executor == null) {
// Sleep since no executor
//logger.info("Read sleep since no executor for "+wait+" ms for "+this);
if (release.get()) {
return;
}
Thread.sleep(wait);
return;
}
if (arg0.getAttachment() == null) {
// readSuspended = true;
arg0.setAttachment(Boolean.TRUE);
channel.setReadable(false);
//logger.info("Read will wakeup after "+wait+" ms "+this);
executor.execute(new ReopenRead(arg0, wait));
} else {
// should be waiting: but can occurs sometime so as a FIX
//logger.info("Read sleep ok but should not be here: "+wait+" "+this);
if (release.get()) {
return;
}
Thread.sleep(wait);
}
} else {
// Not connected or no channel
//logger.info("Read sleep "+wait+" ms for "+this);
if (release.get()) {
return;
}
Thread.sleep(wait);
}
}
}
} finally {
// The message is then just passed to the next handler
super.messageReceived(arg0, arg1);
}
}
@Override
public void writeRequested(ChannelHandlerContext arg0, MessageEvent arg1)
throws Exception {
try {
long curtime = System.currentTimeMillis();
long size = objectSizeEstimator.estimateSize(arg1.getMessage());
if (trafficCounter != null) {
trafficCounter.bytesWriteFlowControl(size);
if (writeLimit == 0) {
return;
}
// compute the number of ms to wait before continue with the channel
long wait = getTimeToWait(writeLimit, trafficCounter
.getCurrentWrittenBytes(), trafficCounter.getLastTime(),
curtime);
if (wait > MINIMAL_WAIT) {
// Global or Channel
if (release.get()) {
return;
}
Thread.sleep(wait);
}
}
} finally {
// The message is then just passed to the next handler
super.writeRequested(arg0, arg1);
}
}
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent cse = (ChannelStateEvent) e;
if (cse.getState() == ChannelState.INTEREST_OPS &&
(((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
// setReadable(true) requested
boolean readSuspended = ctx.getAttachment() != null;
if (readSuspended) {
// Drop the request silently if this handler has
// set the flag.
e.getFuture().setSuccess();
return;
}
}
}
super.handleDownstream(ctx, e);
public long getCurrentReadBytes() {
return currentReadBytes.get();
}
/**
*
* @return the current TrafficCounter (if
* channel is still connected)
* @return the current number of bytes written since the last check Interval
*/
public TrafficCounter getTrafficCounter() {
return trafficCounter;
public long getCurrentWrittenBytes() {
return currentWrittenBytes.get();
}
public void releaseExternalResources() {
if (trafficCounter != null) {
trafficCounter.stop();
}
release.set(true);
ExecutorUtil.terminate(executor);
/**
* @return the Time in millisecond of the last check as of System.currentTimeMillis()
*/
public long getLastTime() {
return lastTime.get();
}
/**
* @return the cumulativeWrittenBytes
*/
public long getCumulativeWrittenBytes() {
return cumulativeWrittenBytes.get();
}
/**
* @return the cumulativeReadBytes
*/
public long getCumulativeReadBytes() {
return cumulativeReadBytes.get();
}
/**
* @return the lastCumulativeTime in millisecond as of System.currentTimeMillis()
* when the cumulative counters were reset to 0.
*/
public long getLastCumulativeTime() {
return lastCumulativeTime;
}
/**
* Reset both read and written cumulative bytes counters and the associated time.
*/
public void resetCumulativeTime() {
lastCumulativeTime = System.currentTimeMillis();
cumulativeReadBytes.set(0);
cumulativeWrittenBytes.set(0);
}
/**
* @return the name
*/
public String getName() {
return name;
}
/**
* String information
*/
@Override
public String toString() {
return "TrafficShaping with Write Limit: " + writeLimit +
" Read Limit: " + readLimit + " and Counter: " +
(trafficCounter != null? trafficCounter.toString() : "none");
return "Monitor " + name + " Current Speed Read: " +
(lastReadThroughput >> 10) + " KB/s, Write: " +
(lastWriteThroughput >> 10) + " KB/s Current Read: " +
(currentReadBytes.get() >> 10) + " KB Current Write: " +
(currentWrittenBytes.get() >> 10) + " KB";
}
}