Update from comments on ML

This commit is contained in:
Frédéric Brégier 2009-04-04 08:12:08 +00:00
parent 5a96ed2195
commit c86bf34b30
4 changed files with 144 additions and 85 deletions

View File

@ -44,7 +44,7 @@ import org.jboss.netty.logging.InternalLoggerFactory;
* interval. * interval.
* *
*/ */
public class TrafficCounter implements Runnable { public class TrafficCounter {
/** /**
* Internal logger * Internal logger
*/ */
@ -61,15 +61,25 @@ public class TrafficCounter implements Runnable {
*/ */
private final AtomicLong currentReadingBytes = new AtomicLong(0); private final AtomicLong currentReadingBytes = new AtomicLong(0);
/**
* Long life writing bytes
*/
private final AtomicLong cumulativeWritingBytes = new AtomicLong(0);
/**
* Long life reading bytes
*/
private final AtomicLong cumulativeReadingBytes = new AtomicLong(0);
/** /**
* Last writing bandwidth * Last writing bandwidth
*/ */
private long lastWritingBandwidth = 0; private long lastWritingThroughput = 0;
/** /**
* Last reading bandwidth * Last reading bandwidth
*/ */
private long lastReadingBandwidth = 0; private long lastReadingThroughput = 0;
/** /**
* Last Time Check taken * Last Time Check taken
@ -99,7 +109,7 @@ public class TrafficCounter implements Runnable {
/** /**
* Delay between two capture * Delay between two capture
*/ */
private long delay = TrafficCounterFactory.DEFAULT_DELAY; private long checkInterval = TrafficCounterFactory.DEFAULT_DELAY;
// default 1 s // default 1 s
@ -133,18 +143,73 @@ public class TrafficCounter implements Runnable {
*/ */
private Future<?> monitorFuture = null; private Future<?> monitorFuture = null;
/**
* Class to implement monitoring at fix delay
*
*/
private class TrafficMonitoring implements Runnable {
/**
* Delay between two capture
*/
private final long checkInterval1;
/**
* The associated TrafficCounterFactory
*/
private final TrafficCounterFactory factory1;
/**
* The associated TrafficCounter
*/
private final TrafficCounter counter;
/**
* @param checkInterval
* @param factory
* @param counter
*/
protected TrafficMonitoring(long checkInterval,
TrafficCounterFactory factory, TrafficCounter counter) {
this.checkInterval1 = checkInterval;
this.factory1 = factory;
this.counter = counter;
}
/**
* Default run
*/
public void run() {
try {
for (;;) {
if (this.checkInterval1 > 0) {
Thread.sleep(this.checkInterval1);
} else {
// Delay goes to TrafficCounterFactory.NO_STAT, so exit
return;
}
long endTime = System.currentTimeMillis();
this.counter.resetAccounting(endTime);
if (this.factory1 != null) {
this.factory1.accounting(this.counter);
}
}
} catch (InterruptedException e) {
// End of computations
}
}
}
/** /**
* Start the monitoring process * Start the monitoring process
* *
*/ */
public void startMonitoring() { public void start() {
synchronized (this.lastTime) { synchronized (this.lastTime) {
if (this.monitorFuture != null) { if (this.monitorFuture != null) {
return; return;
} }
this.lastTime.set(System.currentTimeMillis()); this.lastTime.set(System.currentTimeMillis());
if (this.delay > 0) { if (this.checkInterval > 0) {
this.monitorFuture = this.executorService.submit(this); this.monitorFuture =
this.executorService.submit(new TrafficMonitoring(this.checkInterval,
this.factory, this));
} }
} }
} }
@ -153,7 +218,7 @@ public class TrafficCounter implements Runnable {
* Stop the monitoring process * Stop the monitoring process
* *
*/ */
public void stopMonitoring() { public void stop() {
synchronized (this.lastTime) { synchronized (this.lastTime) {
if (this.monitorFuture == null) { if (this.monitorFuture == null) {
return; return;
@ -168,35 +233,12 @@ public class TrafficCounter implements Runnable {
} }
} }
/**
* Default run
*/
public void run() {
try {
for (;;) {
if (this.delay > 0) {
Thread.sleep(this.delay);
} else {
// Delay goes to TrafficCounterFactory.NO_STAT, so exit
return;
}
long endTime = System.currentTimeMillis();
resetAccounting(endTime);
if (this.factory != null) {
this.factory.accounting(this);
}
}
} catch (InterruptedException e) {
// End of computations
}
}
/** /**
* Set the accounting on Read and Write * Set the accounting on Read and Write
* *
* @param newLastTime * @param newLastTime
*/ */
private void resetAccounting(long newLastTime) { protected void resetAccounting(long newLastTime) {
synchronized (this.lastTime) { synchronized (this.lastTime) {
long interval = newLastTime - this.lastTime.getAndSet(newLastTime); long interval = newLastTime - this.lastTime.getAndSet(newLastTime);
if (interval == 0) { if (interval == 0) {
@ -205,16 +247,16 @@ public class TrafficCounter implements Runnable {
} }
this.lastReadingBytes = this.currentReadingBytes.getAndSet(0); this.lastReadingBytes = this.currentReadingBytes.getAndSet(0);
this.lastWritingBytes = this.currentWritingBytes.getAndSet(0); this.lastWritingBytes = this.currentWritingBytes.getAndSet(0);
this.lastReadingBandwidth = this.lastReadingBytes / interval * 1000; this.lastReadingThroughput = this.lastReadingBytes / interval * 1000;
// nb byte / delay in ms * 1000 (1s) // nb byte / checkInterval in ms * 1000 (1s)
this.lastWritingBandwidth = this.lastWritingBytes / interval * 1000; this.lastWritingThroughput = this.lastWritingBytes / interval * 1000;
// nb byte / delay in ms * 1000 (1s) // nb byte / checkInterval in ms * 1000 (1s)
} }
} }
/** /**
* Constructor with the executorService to use, the channel if any, its * Constructor with the executorService to use, the channel if any, its
* name, the limits in Byte/s (not Bit/s) and the delay between two * name, the limits in Byte/s (not Bit/s) and the checkInterval between two
* computations in ms * computations in ms
* *
* @param factory * @param factory
@ -232,16 +274,16 @@ public class TrafficCounter implements Runnable {
* the write limit in Byte/s * the write limit in Byte/s
* @param readLimit * @param readLimit
* the read limit in Byte/s * the read limit in Byte/s
* @param delay * @param checkInterval
* the delay in ms between two computations * the checkInterval in ms between two computations
*/ */
public TrafficCounter(TrafficCounterFactory factory, public TrafficCounter(TrafficCounterFactory factory,
ExecutorService executorService, Channel channel, String name, ExecutorService executorService, Channel channel, String name,
long writeLimit, long readLimit, long delay) { long writeLimit, long readLimit, long checkInterval) {
this.factory = factory; this.factory = factory;
this.executorService = executorService; this.executorService = executorService;
this.name = name; this.name = name;
this.changeConfiguration(channel, writeLimit, readLimit, delay); this.configure(channel, writeLimit, readLimit, checkInterval);
} }
/** /**
@ -253,7 +295,7 @@ public class TrafficCounter implements Runnable {
* later on therefore changing its behavior from global to per * later on therefore changing its behavior from global to per
* channel * channel
*/ */
public void setMonitoredChannel(Channel channel) { protected void setMonitoredChannel(Channel channel) {
if (channel != null) { if (channel != null) {
this.monitoredChannel = channel; this.monitoredChannel = channel;
this.isPerChannel = true; this.isPerChannel = true;
@ -264,7 +306,7 @@ public class TrafficCounter implements Runnable {
} }
/** /**
* Specifies limits in Byte/s (not Bit/s) but do not changed the delay * Specifies limits in Byte/s (not Bit/s) but do not changed the checkInterval
* *
* @param channel * @param channel
* Not null means this monitors will be for this channel only, * Not null means this monitors will be for this channel only,
@ -274,7 +316,7 @@ public class TrafficCounter implements Runnable {
* @param writeLimit * @param writeLimit
* @param readLimit * @param readLimit
*/ */
public void changeConfiguration(Channel channel, long writeLimit, public void configure(Channel channel, long writeLimit,
long readLimit) { long readLimit) {
this.limitWrite = writeLimit; this.limitWrite = writeLimit;
this.limitRead = readLimit; this.limitRead = readLimit;
@ -282,7 +324,7 @@ public class TrafficCounter implements Runnable {
} }
/** /**
* Specifies limits in Byte/s (not Bit/s) and the specified delay between * Specifies limits in Byte/s (not Bit/s) and the specified checkInterval between
* two computations in ms * two computations in ms
* *
* @param channel * @param channel
@ -294,23 +336,23 @@ public class TrafficCounter implements Runnable {
* @param readLimit * @param readLimit
* @param delayToSet * @param delayToSet
*/ */
public void changeConfiguration(Channel channel, long writeLimit, public void configure(Channel channel, long writeLimit,
long readLimit, long delayToSet) { long readLimit, long delayToSet) {
if (this.delay != delayToSet) { if (this.checkInterval != delayToSet) {
this.delay = delayToSet; this.checkInterval = delayToSet;
if (this.monitorFuture == null) { if (this.monitorFuture == null) {
this.changeConfiguration(channel, writeLimit, readLimit); this.configure(channel, writeLimit, readLimit);
return; return;
} }
stopMonitoring(); stop();
if (this.delay > 0) { if (this.checkInterval > 0) {
startMonitoring(); start();
} else { } else {
// No more active monitoring // No more active monitoring
this.lastTime.set(System.currentTimeMillis()); this.lastTime.set(System.currentTimeMillis());
} }
} }
this.changeConfiguration(channel, writeLimit, readLimit); this.configure(channel, writeLimit, readLimit);
} }
/** /**
@ -375,7 +417,7 @@ public class TrafficCounter implements Runnable {
* the associated channelHandlerContext * the associated channelHandlerContext
* @param timeToWait * @param timeToWait
*/ */
public ReopenRead(ChannelHandlerContext ctx, protected ReopenRead(ChannelHandlerContext ctx,
TrafficCounter monitor, long timeToWait) { TrafficCounter monitor, long timeToWait) {
this.ctx = ctx; this.ctx = ctx;
this.monitor = monitor; this.monitor = monitor;
@ -416,9 +458,10 @@ public class TrafficCounter implements Runnable {
* the size in bytes to read * the size in bytes to read
* @throws InterruptedException * @throws InterruptedException
*/ */
public void setReceivedBytes(ChannelHandlerContext ctx, long recv) protected void bytesRecvFlowControl(ChannelHandlerContext ctx, long recv)
throws InterruptedException { throws InterruptedException {
this.currentReadingBytes.addAndGet(recv); this.currentReadingBytes.addAndGet(recv);
this.cumulativeReadingBytes.addAndGet(recv);
if (this.limitRead == 0) { if (this.limitRead == 0) {
// no action // no action
return; return;
@ -471,8 +514,9 @@ public class TrafficCounter implements Runnable {
* the size in bytes to write * the size in bytes to write
* @throws InterruptedException * @throws InterruptedException
*/ */
public void setToWriteBytes(long write) throws InterruptedException { protected void bytesWriteFlowControl(long write) throws InterruptedException {
this.currentWritingBytes.addAndGet(write); this.currentWritingBytes.addAndGet(write);
this.cumulativeWritingBytes.addAndGet(write);
if (this.limitWrite == 0) { if (this.limitWrite == 0) {
return; return;
} }
@ -486,32 +530,32 @@ public class TrafficCounter implements Runnable {
/** /**
* *
* @return the current delay between two computations of performance counter * @return the current checkInterval between two computations of performance counter
* in ms * in ms
*/ */
public long getDelay() { public long getCheckInterval() {
return this.delay; return this.checkInterval;
} }
/** /**
* *
* @return the current Read bandwidth in byte/s * @return the current Read Throughput in byte/s
*/ */
public long getLastReadBandwidth() { public long getLastReadThroughput() {
return this.lastReadingBandwidth; return this.lastReadingThroughput;
} }
/** /**
* *
* @return the current Write bandwidth in byte/s * @return the current Write Throughput in byte/s
*/ */
public long getLastWriteBandwidth() { public long getLastWriteThroughput() {
return this.lastWritingBandwidth; return this.lastWritingThroughput;
} }
/** /**
* *
* @return the current number of byte read since last delay * @return the current number of byte read since last checkInterval
*/ */
public long getLastBytesRead() { public long getLastBytesRead() {
return this.lastReadingBytes; return this.lastReadingBytes;
@ -519,21 +563,36 @@ public class TrafficCounter implements Runnable {
/** /**
* *
* @return the current number of byte written since last delay * @return the current number of byte written since last checkInterval
*/ */
public long getLastBytesWrite() { public long getLastBytesWritten() {
return this.lastWritingBytes; return this.lastWritingBytes;
} }
/**
* @return the cumulativeWritingBytes
*/
public long getCumulativeWritingBytes() {
return this.cumulativeWritingBytes.get();
}
/**
* @return the cumulativeReadingBytes
*/
public long getCumulativeReadingBytes() {
return this.cumulativeReadingBytes.get();
}
/** /**
* String information * String information
*/ */
@Override @Override
public String toString() { public String toString() {
return "Monitor " + this.name + " Current Speed Read: " + return "Monitor " + this.name + " Current Speed Read: " +
(this.lastReadingBandwidth >> 10) + " KB/s, Write: " + (this.lastReadingThroughput >> 10) + " KB/s, Write: " +
(this.lastWritingBandwidth >> 10) + " KB/s Current Read: " + (this.lastWritingThroughput >> 10) + " KB/s Current Read: " +
(this.currentReadingBytes.get() >> 10) + " KB Current Write: " + (this.currentReadingBytes.get() >> 10) + " KB Current Write: " +
(this.currentWritingBytes.get() >> 10) + " KB"; (this.currentWritingBytes.get() >> 10) + " KB";
} }
} }

View File

@ -292,7 +292,7 @@ public abstract class TrafficCounterFactory {
* @param newglobalLimitRead * @param newglobalLimitRead
* @param newglobaldelay * @param newglobaldelay
*/ */
public void changeConfiguration(long newchannelLimitWrite, public void configure(long newchannelLimitWrite,
long newchannelLimitRead, long newchanneldelay, long newchannelLimitRead, long newchanneldelay,
long newglobalLimitWrite, long newglobalLimitRead, long newglobalLimitWrite, long newglobalLimitRead,
long newglobaldelay) { long newglobaldelay) {
@ -303,7 +303,7 @@ public abstract class TrafficCounterFactory {
this.globalLimitRead = newglobalLimitRead; this.globalLimitRead = newglobalLimitRead;
this.globalDelay = newglobaldelay; this.globalDelay = newglobaldelay;
if (this.globalTrafficMonitor != null) { if (this.globalTrafficMonitor != null) {
this.globalTrafficMonitor.changeConfiguration(null, this.globalTrafficMonitor.configure(null,
newglobalLimitWrite, newglobalLimitRead, newglobaldelay); newglobalLimitWrite, newglobalLimitRead, newglobaldelay);
} }
} }
@ -318,7 +318,7 @@ public abstract class TrafficCounterFactory {
this.executorService, null, "GlobalPC", this.executorService, null, "GlobalPC",
this.globalLimitWrite, this.globalLimitRead, this.globalLimitWrite, this.globalLimitRead,
this.globalDelay); this.globalDelay);
this.globalTrafficMonitor.startMonitoring(); this.globalTrafficMonitor.start();
} }
} }
return this.globalTrafficMonitor; return this.globalTrafficMonitor;
@ -346,7 +346,7 @@ public abstract class TrafficCounterFactory {
*/ */
public void stopGlobalTrafficCounter() { public void stopGlobalTrafficCounter() {
if (this.globalTrafficMonitor != null) { if (this.globalTrafficMonitor != null) {
this.globalTrafficMonitor.stopMonitoring(); this.globalTrafficMonitor.stop();
this.globalTrafficMonitor = null; this.globalTrafficMonitor = null;
} }
} }
@ -410,7 +410,7 @@ public abstract class TrafficCounterFactory {
public void setGlobalDelay(long globalDelay) { public void setGlobalDelay(long globalDelay) {
this.globalDelay = globalDelay; this.globalDelay = globalDelay;
if (this.globalTrafficMonitor != null) { if (this.globalTrafficMonitor != null) {
this.globalTrafficMonitor.changeConfiguration(null, this.globalTrafficMonitor.configure(null,
this.globalLimitWrite, this.globalLimitRead, this.globalLimitWrite, this.globalLimitRead,
this.globalDelay); this.globalDelay);
} }
@ -430,7 +430,7 @@ public abstract class TrafficCounterFactory {
public void setGlobalLimitRead(long globalLimitRead) { public void setGlobalLimitRead(long globalLimitRead) {
this.globalLimitRead = globalLimitRead; this.globalLimitRead = globalLimitRead;
if (this.globalTrafficMonitor != null) { if (this.globalTrafficMonitor != null) {
this.globalTrafficMonitor.changeConfiguration(null, this.globalTrafficMonitor.configure(null,
this.globalLimitWrite, this.globalLimitRead, this.globalLimitWrite, this.globalLimitRead,
this.globalDelay); this.globalDelay);
} }
@ -450,7 +450,7 @@ public abstract class TrafficCounterFactory {
public void setGlobalLimitWrite(long globalLimitWrite) { public void setGlobalLimitWrite(long globalLimitWrite) {
this.globalLimitWrite = globalLimitWrite; this.globalLimitWrite = globalLimitWrite;
if (this.globalTrafficMonitor != null) { if (this.globalTrafficMonitor != null) {
this.globalTrafficMonitor.changeConfiguration(null, this.globalTrafficMonitor.configure(null,
this.globalLimitWrite, this.globalLimitRead, this.globalLimitWrite, this.globalLimitRead,
this.globalDelay); this.globalDelay);
} }

View File

@ -120,10 +120,10 @@ public abstract class TrafficShapingHandler extends SimpleChannelHandler {
throws Exception { throws Exception {
long size = this.objectSizeEstimator.estimateSize(arg1.getMessage()); long size = this.objectSizeEstimator.estimateSize(arg1.getMessage());
if (this.channelTrafficCounter != null) { if (this.channelTrafficCounter != null) {
this.channelTrafficCounter.setReceivedBytes(arg0, size); this.channelTrafficCounter.bytesRecvFlowControl(arg0, size);
} }
if (this.globalTrafficCounter != null) { if (this.globalTrafficCounter != null) {
this.globalTrafficCounter.setReceivedBytes(arg0, size); this.globalTrafficCounter.bytesRecvFlowControl(arg0, size);
} }
// The message is then just passed to the next Codec // The message is then just passed to the next Codec
super.messageReceived(arg0, arg1); super.messageReceived(arg0, arg1);
@ -140,10 +140,10 @@ public abstract class TrafficShapingHandler extends SimpleChannelHandler {
throws Exception { throws Exception {
long size = this.objectSizeEstimator.estimateSize(arg1.getMessage()); long size = this.objectSizeEstimator.estimateSize(arg1.getMessage());
if (this.channelTrafficCounter != null) { if (this.channelTrafficCounter != null) {
this.channelTrafficCounter.setToWriteBytes(size); this.channelTrafficCounter.bytesWriteFlowControl(size);
} }
if (this.globalTrafficCounter != null) { if (this.globalTrafficCounter != null) {
this.globalTrafficCounter.setToWriteBytes(size); this.globalTrafficCounter.bytesWriteFlowControl(size);
} }
// The message is then just passed to the next Codec // The message is then just passed to the next Codec
super.writeRequested(arg0, arg1); super.writeRequested(arg0, arg1);
@ -159,7 +159,7 @@ public abstract class TrafficShapingHandler extends SimpleChannelHandler {
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception { throws Exception {
if (this.channelTrafficCounter != null) { if (this.channelTrafficCounter != null) {
this.channelTrafficCounter.stopMonitoring(); this.channelTrafficCounter.stop();
this.channelTrafficCounter = null; this.channelTrafficCounter = null;
} }
super.channelClosed(ctx, e); super.channelClosed(ctx, e);
@ -185,7 +185,7 @@ public abstract class TrafficShapingHandler extends SimpleChannelHandler {
if (this.channelTrafficCounter != null) { if (this.channelTrafficCounter != null) {
this.channelTrafficCounter this.channelTrafficCounter
.setMonitoredChannel(ctx.getChannel()); .setMonitoredChannel(ctx.getChannel());
this.channelTrafficCounter.startMonitoring(); this.channelTrafficCounter.start();
} }
super.channelConnected(ctx, e); super.channelConnected(ctx, e);
// readSuspended = false; // readSuspended = false;

View File

@ -66,8 +66,8 @@
* [Global or per Channel] [Write or Read] Limitation in byte/s.</li><br> * [Global or per Channel] [Write or Read] Limitation in byte/s.</li><br>
* A value of <tt>0</tt> * A value of <tt>0</tt>
* stands for no limitation, so the traffic shaping is deactivate (on what you specified).<br> * stands for no limitation, so the traffic shaping is deactivate (on what you specified).<br>
* You can either change those values with the method <tt>changeConfiguration</tt> in TrafficCounterFactory or * You can either change those values with the method <tt>configure</tt> in TrafficCounterFactory or
* directly from the TrafficCounter method <tt>changeConfiguration</tt>.<br> * directly from the TrafficCounter method <tt>configure</tt>.<br>
* <br> * <br>
* *
* <li>To activate or deactivate the statistics, you can adjust the delay to a low (not less than 200ms * <li>To activate or deactivate the statistics, you can adjust the delay to a low (not less than 200ms