Same fix than in version 3.5 for Master branch (refer to issue #345)

Will be proposed once the one in 3.5 will be validated
This commit is contained in:
Frédéric Brégier 2012-05-20 16:36:45 +03:00
parent 7018b8453f
commit 54c97d0720

View File

@ -15,7 +15,7 @@
*/
package io.netty.handler.traffic;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.channel.Channel;
@ -30,7 +30,9 @@ import io.netty.handler.execution.DefaultObjectSizeEstimator;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.ExternalResourceReleasable;
import io.netty.util.internal.ExecutorUtil;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
/**
* AbstractTrafficShapingHandler allows to limit the global bandwidth
@ -41,6 +43,10 @@ import io.netty.util.internal.ExecutorUtil;
* the method doAccounting of this handler.<br>
* <br>
*
* An {@link ObjectSizeEstimator} can be passed at construction to specify what
* is the size of the object to be read or write accordingly to the type of
* object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.<br><br>
*
* If you want for any particular reasons to stop the monitoring (accounting) or to change
* the read/write limit or the check interval, several methods allow that for you:<br>
* <ul>
@ -79,10 +85,14 @@ public abstract class AbstractTrafficShapingHandler extends
private ObjectSizeEstimator objectSizeEstimator;
/**
* Executor to associated to any TrafficCounter
* Timer to associated to any TrafficCounter
*/
protected Executor executor;
protected Timer timer;
/**
* used in releaseExternalResources() to cancel the timer
*/
volatile private Timeout timeout = null;
/**
* Limit in B/s to apply to write
*/
@ -105,15 +115,16 @@ public abstract class AbstractTrafficShapingHandler extends
*/
final AtomicBoolean release = new AtomicBoolean(false);
private void init(ObjectSizeEstimator newObjectSizeEstimator,
Executor newExecutor, long newWriteLimit, long newReadLimit, long newCheckInterval) {
objectSizeEstimator = newObjectSizeEstimator;
executor = newExecutor;
writeLimit = newWriteLimit;
readLimit = newReadLimit;
checkInterval = newCheckInterval;
//logger.info("TSH: "+writeLimit+":"+readLimit+":"+checkInterval+":"+isPerChannel());
}
private void init(ObjectSizeEstimator newObjectSizeEstimator,
Timer newTimer, long newWriteLimit, long newReadLimit,
long newCheckInterval) {
objectSizeEstimator = newObjectSizeEstimator;
timer = newTimer;
writeLimit = newWriteLimit;
readLimit = newReadLimit;
checkInterval = newCheckInterval;
//logger.warn("TSH: "+writeLimit+":"+readLimit+":"+checkInterval);
}
/**
*
@ -126,8 +137,8 @@ public abstract class AbstractTrafficShapingHandler extends
/**
* Constructor using default {@link ObjectSizeEstimator}
*
* @param executor
* created for instance like Executors.newCachedThreadPool
* @param timer
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
@ -136,10 +147,9 @@ public abstract class AbstractTrafficShapingHandler extends
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
*/
public AbstractTrafficShapingHandler(Executor executor, long writeLimit,
public AbstractTrafficShapingHandler(Timer timer, long writeLimit,
long readLimit, long checkInterval) {
init(new DefaultObjectSizeEstimator(), executor, writeLimit, readLimit,
checkInterval);
init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval);
}
/**
@ -148,8 +158,8 @@ public abstract class AbstractTrafficShapingHandler extends
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
* @param executor
* created for instance like Executors.newCachedThreadPool
* @param timer
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
@ -159,26 +169,24 @@ public abstract class AbstractTrafficShapingHandler extends
* channels or 0 if no stats are to be computed
*/
public AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor,
ObjectSizeEstimator objectSizeEstimator, Timer timer,
long writeLimit, long readLimit, long checkInterval) {
init(objectSizeEstimator, executor, writeLimit, readLimit,
checkInterval);
init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval);
}
/**
* Constructor using default {@link ObjectSizeEstimator} and using default Check Interval
*
* @param executor
* created for instance like Executors.newCachedThreadPool
* @param timer
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
*/
public AbstractTrafficShapingHandler(Executor executor, long writeLimit,
public AbstractTrafficShapingHandler(Timer timer, long writeLimit,
long readLimit) {
init(new DefaultObjectSizeEstimator(), executor, writeLimit, readLimit,
DEFAULT_CHECK_INTERVAL);
init(new DefaultObjectSizeEstimator(), timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
}
/**
@ -187,29 +195,27 @@ public abstract class AbstractTrafficShapingHandler extends
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
* @param executor
* created for instance like Executors.newCachedThreadPool
* @param timer
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* @param writeLimit
* 0 or a limit in bytes/s
* @param readLimit
* 0 or a limit in bytes/s
*/
public AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor,
ObjectSizeEstimator objectSizeEstimator, Timer timer,
long writeLimit, long readLimit) {
init(objectSizeEstimator, executor, writeLimit, readLimit,
DEFAULT_CHECK_INTERVAL);
init(objectSizeEstimator, timer, writeLimit, readLimit, DEFAULT_CHECK_INTERVAL);
}
/**
* Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and default Check Interval
*
* @param executor
* created for instance like Executors.newCachedThreadPool
* @param timer
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
*/
public AbstractTrafficShapingHandler(Executor executor) {
init(new DefaultObjectSizeEstimator(), executor, 0, 0,
DEFAULT_CHECK_INTERVAL);
public AbstractTrafficShapingHandler(Timer timer) {
init(new DefaultObjectSizeEstimator(), timer, 0, 0, DEFAULT_CHECK_INTERVAL);
}
/**
@ -218,25 +224,25 @@ public abstract class AbstractTrafficShapingHandler extends
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
* @param executor
* created for instance like Executors.newCachedThreadPool
* @param timer
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
*/
public AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor) {
init(objectSizeEstimator, executor, 0, 0, DEFAULT_CHECK_INTERVAL);
ObjectSizeEstimator objectSizeEstimator, Timer timer) {
init(objectSizeEstimator, timer, 0, 0, DEFAULT_CHECK_INTERVAL);
}
/**
* Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT
*
* @param executor
* created for instance like Executors.newCachedThreadPool
* @param timer
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
*/
public AbstractTrafficShapingHandler(Executor executor, long checkInterval) {
init(new DefaultObjectSizeEstimator(), executor, 0, 0, checkInterval);
public AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
init(new DefaultObjectSizeEstimator(), timer, 0, 0, checkInterval);
}
/**
@ -245,20 +251,24 @@ public abstract class AbstractTrafficShapingHandler extends
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
* @param executor
* created for instance like Executors.newCachedThreadPool
* @param timer
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024)
* @param checkInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
*/
public AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Executor executor,
ObjectSizeEstimator objectSizeEstimator, Timer timer,
long checkInterval) {
init(objectSizeEstimator, executor, 0, 0, checkInterval);
init(objectSizeEstimator, timer, 0, 0, checkInterval);
}
/**
* Change the underlying limitations and check interval.
*
* @param newWriteLimit
* @param newReadLimit
* @param newCheckInterval
*/
public void configure(long newWriteLimit, long newReadLimit,
long newCheckInterval) {
@ -268,17 +278,22 @@ public abstract class AbstractTrafficShapingHandler extends
/**
* Change the underlying limitations.
*
* @param newWriteLimit
* @param newReadLimit
*/
public void configure(long newWriteLimit, long newReadLimit) {
writeLimit = newWriteLimit;
readLimit = newReadLimit;
if (trafficCounter != null) {
trafficCounter.resetAccounting(System.currentTimeMillis() + 1);
trafficCounter.resetAccounting(System.currentTimeMillis()+1);
}
}
/**
* Change the check interval.
*
* @param newCheckInterval
*/
public void configure(long newCheckInterval) {
checkInterval = newCheckInterval;
@ -300,46 +315,25 @@ public abstract class AbstractTrafficShapingHandler extends
/**
* Class to implement setReadable at fix time
*/
private class ReopenRead implements Runnable {
/**
* Associated ChannelHandlerContext
*/
private ChannelHandlerContext ctx;
/**
* Time to wait before clearing the channel
*/
private long timeToWait;
/**
* @param ctx
* the associated channelHandlerContext
* @param timeToWait
*/
protected ReopenRead(ChannelHandlerContext ctx, long timeToWait) {
*/
private class ReopenReadTimerTask implements TimerTask {
ChannelHandlerContext ctx;
ReopenReadTimerTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
this.timeToWait = timeToWait;
}
/**
* Truly run the waken up of the channel
*/
@Override
public void run() {
try {
if (release.get()) {
return;
}
Thread.sleep(timeToWait);
} catch (InterruptedException e) {
// interruption so exit
public void run(Timeout timeoutArg) throws Exception {
//logger.warn("Start RRTT: "+release.get());
if (release.get()) {
return;
}
// logger.info("WAKEUP!");
/*
logger.warn("WAKEUP! "+
(ctx != null && ctx.getChannel() != null &&
ctx.getChannel().isConnected()));
*/
if (ctx != null && ctx.getChannel() != null &&
ctx.getChannel().isConnected()) {
//logger.info(" setReadable TRUE: "+timeToWait);
//logger.warn(" setReadable TRUE: ");
// readSuspended = false;
ctx.setAttachment(null);
ctx.getChannel().setReadable(true);
@ -375,17 +369,18 @@ public abstract class AbstractTrafficShapingHandler extends
return;
}
// compute the number of ms to wait before reopening the channel
long wait = getTimeToWait(readLimit, trafficCounter
.getCurrentReadBytes(), trafficCounter.getLastTime(),
curtime);
if (wait > MINIMAL_WAIT) { // At least 10ms seems a minimal time in order to
long wait = getTimeToWait(readLimit,
trafficCounter.getCurrentReadBytes(),
trafficCounter.getLastTime(), curtime);
if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
// time in order to
Channel channel = arg0.getChannel();
// try to limit the traffic
if (channel != null && channel.isConnected()) {
// Channel version
if (executor == null) {
if (timer == null) {
// Sleep since no executor
//logger.info("Read sleep since no executor for "+wait+" ms for "+this);
// logger.warn("Read sleep since no timer for "+wait+" ms for "+this);
if (release.get()) {
return;
}
@ -396,11 +391,14 @@ public abstract class AbstractTrafficShapingHandler extends
// readSuspended = true;
arg0.setAttachment(Boolean.TRUE);
channel.setReadable(false);
//logger.info("Read will wakeup after "+wait+" ms "+this);
executor.execute(new ReopenRead(arg0, wait));
// logger.warn("Read will wakeup after "+wait+" ms "+this);
TimerTask timerTask = new ReopenReadTimerTask(arg0);
timeout = timer.newTimeout(timerTask, wait,
TimeUnit.MILLISECONDS);
} else {
// should be waiting: but can occurs sometime so as a FIX
//logger.info("Read sleep ok but should not be here: "+wait+" "+this);
// should be waiting: but can occurs sometime so as
// a FIX
// logger.warn("Read sleep ok but should not be here: "+wait+" "+this);
if (release.get()) {
return;
}
@ -408,7 +406,7 @@ public abstract class AbstractTrafficShapingHandler extends
}
} else {
// Not connected or no channel
//logger.info("Read sleep "+wait+" ms for "+this);
// logger.warn("Read sleep "+wait+" ms for "+this);
if (release.get()) {
return;
}
@ -433,11 +431,12 @@ public abstract class AbstractTrafficShapingHandler extends
if (writeLimit == 0) {
return;
}
// compute the number of ms to wait before continue with the channel
long wait = getTimeToWait(writeLimit, trafficCounter
.getCurrentWrittenBytes(), trafficCounter.getLastTime(),
curtime);
if (wait > MINIMAL_WAIT) {
// compute the number of ms to wait before continue with the
// channel
long wait = getTimeToWait(writeLimit,
trafficCounter.getCurrentWrittenBytes(),
trafficCounter.getLastTime(), curtime);
if (wait >= MINIMAL_WAIT) {
// Global or Channel
if (release.get()) {
return;
@ -450,7 +449,6 @@ public abstract class AbstractTrafficShapingHandler extends
super.writeRequested(arg0, arg1);
}
}
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
@ -481,13 +479,14 @@ public abstract class AbstractTrafficShapingHandler extends
return trafficCounter;
}
@Override
public void releaseExternalResources() {
if (trafficCounter != null) {
trafficCounter.stop();
}
release.set(true);
ExecutorUtil.terminate(executor);
if (timeout != null) {
timeout.cancel();
}
}
@Override