Various renames on TrafficCounter methods

This commit is contained in:
Trustin Lee 2009-04-08 07:30:40 +00:00
parent e3fc229b03
commit cc181204c0

View File

@ -64,12 +64,12 @@ public class TrafficCounter {
/**
* Long life writing bytes
*/
private final AtomicLong cumulativeWritingBytes = new AtomicLong(0);
private final AtomicLong cumulativeWrittenBytes = new AtomicLong(0);
/**
* Long life reading bytes
*/
private final AtomicLong cumulativeReadingBytes = new AtomicLong(0);
private final AtomicLong cumulativeReadBytes = new AtomicLong(0);
/**
* Last writing bandwidth
@ -89,12 +89,12 @@ public class TrafficCounter {
/**
* Last written bytes number
*/
private long lastWritingBytes = 0;
private long lastWrittenBytes = 0;
/**
* Last read bytes number
*/
private long lastReadingBytes = 0;
private long lastReadBytes = 0;
/**
* Current Limit in B/s to apply to write
@ -160,7 +160,7 @@ public class TrafficCounter {
* The associated TrafficCounter
*/
private final TrafficCounter counter;
/**
* @param checkInterval
* @param factory
@ -168,8 +168,8 @@ public class TrafficCounter {
*/
protected TrafficMonitoring(long checkInterval,
TrafficCounterFactory factory, TrafficCounter counter) {
this.checkInterval1 = checkInterval;
this.factory1 = factory;
checkInterval1 = checkInterval;
factory1 = factory;
this.counter = counter;
}
@ -179,37 +179,37 @@ public class TrafficCounter {
public void run() {
try {
for (;;) {
if (this.checkInterval1 > 0) {
Thread.sleep(this.checkInterval1);
if (checkInterval1 > 0) {
Thread.sleep(checkInterval1);
} else {
// Delay goes to TrafficCounterFactory.NO_STAT, so exit
return;
}
long endTime = System.currentTimeMillis();
this.counter.resetAccounting(endTime);
if (this.factory1 != null) {
this.factory1.accounting(this.counter);
counter.resetAccounting(endTime);
if (factory1 != null) {
factory1.accounting(counter);
}
}
} catch (InterruptedException e) {
// End of computations
}
}
}
}
/**
* Start the monitoring process
*
*/
public void start() {
synchronized (this.lastTime) {
if (this.monitorFuture != null) {
synchronized (lastTime) {
if (monitorFuture != null) {
return;
}
this.lastTime.set(System.currentTimeMillis());
if (this.checkInterval > 0) {
this.monitorFuture =
this.executorService.submit(new TrafficMonitoring(this.checkInterval,
this.factory, this));
lastTime.set(System.currentTimeMillis());
if (checkInterval > 0) {
monitorFuture =
executorService.submit(new TrafficMonitoring(checkInterval,
factory, this));
}
}
}
@ -219,15 +219,15 @@ public class TrafficCounter {
*
*/
public void stop() {
synchronized (this.lastTime) {
if (this.monitorFuture == null) {
synchronized (lastTime) {
if (monitorFuture == null) {
return;
}
this.monitorFuture.cancel(true);
this.monitorFuture = null;
monitorFuture.cancel(true);
monitorFuture = null;
resetAccounting(System.currentTimeMillis());
if (this.factory != null) {
this.factory.accounting(this);
if (factory != null) {
factory.accounting(this);
}
setMonitoredChannel(null);
}
@ -239,17 +239,17 @@ public class TrafficCounter {
* @param newLastTime
*/
protected void resetAccounting(long newLastTime) {
synchronized (this.lastTime) {
long interval = newLastTime - this.lastTime.getAndSet(newLastTime);
synchronized (lastTime) {
long interval = newLastTime - lastTime.getAndSet(newLastTime);
if (interval == 0) {
// nothing to do
return;
}
this.lastReadingBytes = this.currentReadingBytes.getAndSet(0);
this.lastWritingBytes = this.currentWritingBytes.getAndSet(0);
this.lastReadingThroughput = this.lastReadingBytes / interval * 1000;
lastReadBytes = currentReadingBytes.getAndSet(0);
lastWrittenBytes = currentWritingBytes.getAndSet(0);
lastReadingThroughput = lastReadBytes / interval * 1000;
// nb byte / checkInterval in ms * 1000 (1s)
this.lastWritingThroughput = this.lastWritingBytes / interval * 1000;
lastWritingThroughput = lastWrittenBytes / interval * 1000;
// nb byte / checkInterval in ms * 1000 (1s)
}
}
@ -297,11 +297,11 @@ public class TrafficCounter {
*/
protected void setMonitoredChannel(Channel channel) {
if (channel != null) {
this.monitoredChannel = channel;
this.isPerChannel = true;
monitoredChannel = channel;
isPerChannel = true;
} else {
this.isPerChannel = false;
this.monitoredChannel = null;
isPerChannel = false;
monitoredChannel = null;
}
}
@ -318,8 +318,8 @@ public class TrafficCounter {
*/
public void configure(Channel channel, long writeLimit,
long readLimit) {
this.limitWrite = writeLimit;
this.limitRead = readLimit;
limitWrite = writeLimit;
limitRead = readLimit;
setMonitoredChannel(channel);
}
@ -338,18 +338,18 @@ public class TrafficCounter {
*/
public void configure(Channel channel, long writeLimit,
long readLimit, long delayToSet) {
if (this.checkInterval != delayToSet) {
this.checkInterval = delayToSet;
if (this.monitorFuture == null) {
if (checkInterval != delayToSet) {
checkInterval = delayToSet;
if (monitorFuture == null) {
this.configure(channel, writeLimit, readLimit);
return;
}
stop();
if (this.checkInterval > 0) {
if (checkInterval > 0) {
start();
} else {
// No more active monitoring
this.lastTime.set(System.currentTimeMillis());
lastTime.set(System.currentTimeMillis());
}
}
this.configure(channel, writeLimit, readLimit);
@ -361,13 +361,13 @@ public class TrafficCounter {
* be negative time
*/
private long getReadTimeToWait() {
synchronized (this.lastTime) {
long interval = System.currentTimeMillis() - this.lastTime.get();
synchronized (lastTime) {
long interval = System.currentTimeMillis() - lastTime.get();
if (interval == 0) {
// Time is too short, so just lets continue
return 0;
}
long wait = this.currentReadingBytes.get() * 1000 / this.limitRead -
long wait = currentReadingBytes.get() * 1000 / limitRead -
interval;
return wait;
}
@ -379,14 +379,14 @@ public class TrafficCounter {
* be negative time
*/
private long getWriteTimeToWait() {
synchronized (this.lastTime) {
long interval = System.currentTimeMillis() - this.lastTime.get();
synchronized (lastTime) {
long interval = System.currentTimeMillis() - lastTime.get();
if (interval == 0) {
// Time is too short, so just lets continue
return 0;
}
long wait = this.currentWritingBytes.get() * 1000 /
this.limitWrite - interval;
long wait = currentWritingBytes.get() * 1000 /
limitWrite - interval;
return wait;
}
}
@ -429,21 +429,21 @@ public class TrafficCounter {
*/
public void run() {
try {
Thread.sleep(this.timeToWait);
Thread.sleep(timeToWait);
} catch (InterruptedException e) {
// interruption so exit
return;
}
// logger.info("WAKEUP!");
if ((this.monitor != null) &&
(this.monitor.monitoredChannel != null) &&
this.monitor.monitoredChannel.isConnected()) {
if (monitor != null &&
monitor.monitoredChannel != null &&
monitor.monitoredChannel.isConnected()) {
// logger.warn(" setReadable TRUE: "+timeToWait);
if (this.ctx.getHandler() instanceof TrafficShapingHandler) {
if (ctx.getHandler() instanceof TrafficShapingHandler) {
// readSuspended = false;
this.ctx.setAttachment(null);
ctx.setAttachment(null);
}
this.monitor.monitoredChannel.setReadable(true);
monitor.monitoredChannel.setReadable(true);
}
}
}
@ -460,14 +460,14 @@ public class TrafficCounter {
*/
protected void bytesRecvFlowControl(ChannelHandlerContext ctx, long recv)
throws InterruptedException {
this.currentReadingBytes.addAndGet(recv);
this.cumulativeReadingBytes.addAndGet(recv);
if (this.limitRead == 0) {
currentReadingBytes.addAndGet(recv);
cumulativeReadBytes.addAndGet(recv);
if (limitRead == 0) {
// no action
return;
}
if (this.isPerChannel && (this.monitoredChannel != null) &&
(!this.monitoredChannel.isConnected())) {
if (isPerChannel && monitoredChannel != null &&
!monitoredChannel.isConnected()) {
// no action can be taken since setReadable will throw a
// NotYetConnected
return;
@ -476,10 +476,10 @@ public class TrafficCounter {
long wait = getReadTimeToWait();
if (wait > 20) { // At least 20ms seems a minimal time in order to
// try to limit the traffic
if (this.isPerChannel && (this.monitoredChannel != null) &&
this.monitoredChannel.isConnected()) {
if (isPerChannel && monitoredChannel != null &&
monitoredChannel.isConnected()) {
// Channel version
if (this.executorService == null) {
if (executorService == null) {
// Sleep since no executor
Thread.sleep(wait);
return;
@ -489,9 +489,9 @@ public class TrafficCounter {
// readSuspended = true;
ctx.setAttachment(Boolean.TRUE);
}
this.monitoredChannel.setReadable(false);
monitoredChannel.setReadable(false);
// logger.info("Read will wakeup after "+wait+" ms "+this);
this.executorService
executorService
.submit(new ReopenRead(ctx, this, wait));
} else {
// should be waiting: but can occurs sometime so as a FIX
@ -515,9 +515,9 @@ public class TrafficCounter {
* @throws InterruptedException
*/
protected void bytesWriteFlowControl(long write) throws InterruptedException {
this.currentWritingBytes.addAndGet(write);
this.cumulativeWritingBytes.addAndGet(write);
if (this.limitWrite == 0) {
currentWritingBytes.addAndGet(write);
cumulativeWrittenBytes.addAndGet(write);
if (limitWrite == 0) {
return;
}
// compute the number of ms to wait before continue with the channel
@ -534,7 +534,7 @@ public class TrafficCounter {
* in ms
*/
public long getCheckInterval() {
return this.checkInterval;
return checkInterval;
}
/**
@ -542,7 +542,7 @@ public class TrafficCounter {
* @return the current Read Throughput in byte/s
*/
public long getLastReadThroughput() {
return this.lastReadingThroughput;
return lastReadingThroughput;
}
/**
@ -550,37 +550,37 @@ public class TrafficCounter {
* @return the current Write Throughput in byte/s
*/
public long getLastWriteThroughput() {
return this.lastWritingThroughput;
return lastWritingThroughput;
}
/**
*
* @return the current number of byte read since last checkInterval
*/
public long getLastBytesRead() {
return this.lastReadingBytes;
public long getLastReadBytes() {
return lastReadBytes;
}
/**
*
* @return the current number of byte written since last checkInterval
*/
public long getLastBytesWritten() {
return this.lastWritingBytes;
public long getLastWrittenBytes() {
return lastWrittenBytes;
}
/**
* @return the cumulativeWritingBytes
*/
public long getCumulativeWritingBytes() {
return this.cumulativeWritingBytes.get();
public long getCumulativeWrittenBytes() {
return cumulativeWrittenBytes.get();
}
/**
* @return the cumulativeReadingBytes
*/
public long getCumulativeReadingBytes() {
return this.cumulativeReadingBytes.get();
public long getCumulativeReadBytes() {
return cumulativeReadBytes.get();
}
/**
@ -588,11 +588,11 @@ public class TrafficCounter {
*/
@Override
public String toString() {
return "Monitor " + this.name + " Current Speed Read: " +
(this.lastReadingThroughput >> 10) + " KB/s, Write: " +
(this.lastWritingThroughput >> 10) + " KB/s Current Read: " +
(this.currentReadingBytes.get() >> 10) + " KB Current Write: " +
(this.currentWritingBytes.get() >> 10) + " KB";
return "Monitor " + name + " Current Speed Read: " +
(lastReadingThroughput >> 10) + " KB/s, Write: " +
(lastWritingThroughput >> 10) + " KB/s Current Read: " +
(currentReadingBytes.get() >> 10) + " KB Current Write: " +
(currentWritingBytes.get() >> 10) + " KB";
}
}