* Renamed following the general naming convention used in Netty

* Renamed 'delay' to 'checkInterval'
* Added some design ideas, TODOs, and FIXMEs
This commit is contained in:
Trustin Lee 2009-04-12 07:16:01 +00:00
parent 7376367973
commit 48e258c810
3 changed files with 125 additions and 137 deletions

View File

@ -45,6 +45,11 @@ import org.jboss.netty.logging.InternalLoggerFactory;
*
*/
public class TrafficCounter {
// XXX: Should the constructor package private?
// We already have TrafficCounterFactory.newChannelTrafficCounter.
// XXX: Should TrafficCounter be able to be instantiated without TrafficCounterFactory?
// TODO: Implement ExternalResourceReleasable
/**
* Internal logger
*/
@ -71,18 +76,18 @@ public class TrafficCounter {
*/
private final AtomicLong cumulativeReadBytes = new AtomicLong(0);
/**
* Last Time where cumulative bytes where reset to zero
* Last Time where cumulative bytes where reset to zero
*/
private long lastCumulativeTime;
/**
* Last writing bandwidth
*/
private long lastWritingThroughput = 0;
private long lastWriteThroughput = 0;
/**
* Last reading bandwidth
*/
private long lastReadingThroughput = 0;
private long lastReadThroughput = 0;
/**
* Last Time Check taken
@ -102,17 +107,17 @@ public class TrafficCounter {
/**
* Current Limit in B/s to apply to write
*/
private long limitWrite = 0;
private long writeLimit = 0;
/**
* Current Limit in B/s to apply to read
*/
private long limitRead = 0;
private long readLimit = 0;
/**
* Delay between two capture
*/
private long checkInterval = TrafficCounterFactory.DEFAULT_DELAY;
private long checkInterval = TrafficCounterFactory.DEFAULT_CHECK_INTERVAL;
// default 1 s
@ -191,7 +196,7 @@ public class TrafficCounter {
long endTime = System.currentTimeMillis();
counter.resetAccounting(endTime);
if (factory1 != null) {
factory1.accounting(counter);
factory1.doAccounting(counter);
}
}
} catch (InterruptedException e) {
@ -230,7 +235,7 @@ public class TrafficCounter {
monitorFuture = null;
resetAccounting(System.currentTimeMillis());
if (factory != null) {
factory.accounting(this);
factory.doAccounting(this);
}
setMonitoredChannel(null);
}
@ -250,9 +255,9 @@ public class TrafficCounter {
}
lastReadBytes = currentReadingBytes.getAndSet(0);
lastWrittenBytes = currentWritingBytes.getAndSet(0);
lastReadingThroughput = lastReadBytes / interval * 1000;
lastReadThroughput = lastReadBytes / interval * 1000;
// nb byte / checkInterval in ms * 1000 (1s)
lastWritingThroughput = lastWrittenBytes / interval * 1000;
lastWriteThroughput = lastWrittenBytes / interval * 1000;
// nb byte / checkInterval in ms * 1000 (1s)
}
}
@ -286,7 +291,7 @@ public class TrafficCounter {
this.factory = factory;
this.executorService = executorService;
this.name = name;
this.lastCumulativeTime = System.currentTimeMillis();
lastCumulativeTime = System.currentTimeMillis();
this.configure(channel, writeLimit, readLimit, checkInterval);
}
@ -322,8 +327,8 @@ public class TrafficCounter {
*/
public void configure(Channel channel, long writeLimit,
long readLimit) {
limitWrite = writeLimit;
limitRead = readLimit;
this.writeLimit = writeLimit;
this.readLimit = readLimit;
setMonitoredChannel(channel);
}
@ -338,12 +343,12 @@ public class TrafficCounter {
* channel
* @param writeLimit
* @param readLimit
* @param delayToSet
* @param checkInterval
*/
public void configure(Channel channel, long writeLimit,
long readLimit, long delayToSet) {
if (checkInterval != delayToSet) {
checkInterval = delayToSet;
long readLimit, long checkInterval) {
if (this.checkInterval != checkInterval) {
this.checkInterval = checkInterval;
if (monitorFuture == null) {
this.configure(channel, writeLimit, readLimit);
return;
@ -371,7 +376,7 @@ public class TrafficCounter {
// Time is too short, so just lets continue
return 0;
}
long wait = currentReadingBytes.get() * 1000 / limitRead -
long wait = currentReadingBytes.get() * 1000 / readLimit -
interval;
return wait;
}
@ -390,7 +395,7 @@ public class TrafficCounter {
return 0;
}
long wait = currentWritingBytes.get() * 1000 /
limitWrite - interval;
writeLimit - interval;
return wait;
}
}
@ -466,7 +471,7 @@ public class TrafficCounter {
throws InterruptedException {
currentReadingBytes.addAndGet(recv);
cumulativeReadBytes.addAndGet(recv);
if (limitRead == 0) {
if (readLimit == 0) {
// no action
return;
}
@ -521,7 +526,7 @@ public class TrafficCounter {
void bytesWriteFlowControl(long write) throws InterruptedException {
currentWritingBytes.addAndGet(write);
cumulativeWrittenBytes.addAndGet(write);
if (limitWrite == 0) {
if (writeLimit == 0) {
return;
}
// compute the number of ms to wait before continue with the channel
@ -546,7 +551,7 @@ public class TrafficCounter {
* @return the current Read Throughput in byte/s
*/
public long getLastReadThroughput() {
return lastReadingThroughput;
return lastReadThroughput;
}
/**
@ -554,7 +559,7 @@ public class TrafficCounter {
* @return the current Write Throughput in byte/s
*/
public long getLastWriteThroughput() {
return lastWritingThroughput;
return lastWriteThroughput;
}
/**
@ -588,18 +593,18 @@ public class TrafficCounter {
}
/**
* @return the lastCumulativeTime in millisecond as of System.currentTimeMillis()
* @return the lastCumulativeTime in millisecond as of System.currentTimeMillis()
* when the cumulative counters were reset to 0.
*/
public long getLastCumulativeTime() {
return this.lastCumulativeTime;
return lastCumulativeTime;
}
/**
* Reset both read and written cumulative bytes counters and the associated time.
*/
public void resetCumulativeTime() {
this.lastCumulativeTime = System.currentTimeMillis();
lastCumulativeTime = System.currentTimeMillis();
cumulativeReadBytes.set(0);
cumulativeWrittenBytes.set(0);
}
@ -610,8 +615,8 @@ public class TrafficCounter {
@Override
public String toString() {
return "Monitor " + name + " Current Speed Read: " +
(lastReadingThroughput >> 10) + " KB/s, Write: " +
(lastWritingThroughput >> 10) + " KB/s Current Read: " +
(lastReadThroughput >> 10) + " KB/s, Write: " +
(lastWriteThroughput >> 10) + " KB/s Current Read: " +
(currentReadingBytes.get() >> 10) + " KB Current Write: " +
(currentWritingBytes.get() >> 10) + " KB";
}

View File

@ -38,11 +38,15 @@ import org.jboss.netty.channel.Channel;
*
*
*/
public abstract class TrafficCounterFactory {
public class TrafficCounterFactory {
// FIXME: Use Executor instead of ExecutorService
// TODO: Read/write limit needs to be configurable on a per-channel basis.
// TODO: Implement ExternalResourceReleasable
/**
* Default delay between two checks: 1s
*/
public static long DEFAULT_DELAY = 1000;
public static long DEFAULT_CHECK_INTERVAL = 1000;
/**
* ExecutorService to associated to any TrafficCounter
@ -62,7 +66,7 @@ public abstract class TrafficCounterFactory {
/**
* Delay between two performance snapshots for channel
*/
private long channelDelay = DEFAULT_DELAY; // default 1 s
private long channelCheckInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
/**
* Will the TrafficCounter for Channel be active
@ -82,7 +86,7 @@ public abstract class TrafficCounterFactory {
/**
* Delay between two performance snapshots for global
*/
private long globalDelay = DEFAULT_DELAY; // default 1 s
private long globalCheckInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
/**
* Will the TrafficCounter for Global be active
@ -101,7 +105,10 @@ public abstract class TrafficCounterFactory {
* @param counter
* the TrafficCounter that computes its performance
*/
protected abstract void accounting(TrafficCounter counter);
@SuppressWarnings("unused")
protected void doAccounting(TrafficCounter counter) {
// NOOP by default
}
/**
*
@ -109,26 +116,26 @@ public abstract class TrafficCounterFactory {
* @param newChannelActive
* @param newChannelWriteLimit
* @param newChannelReadLimit
* @param newChannelDelay
* @param newChannelCheckInterval
* @param newGlobalActive
* @param newGlobalWriteLimit
* @param newGlobalReadLimit
* @param newGlobalDelay
* @param newGlobalCheckInterval
*/
private void init(ExecutorService newexecutorService,
boolean newChannelActive, long newChannelWriteLimit,
long newChannelReadLimit, long newChannelDelay,
long newChannelReadLimit, long newChannelCheckInterval,
boolean newGlobalActive, long newGlobalWriteLimit,
long newGlobalReadLimit, long newGlobalDelay) {
long newGlobalReadLimit, long newGlobalCheckInterval) {
executorService = newexecutorService;
channelActive = newChannelActive;
channelWriteLimit = newChannelWriteLimit;
channelReadLimit = newChannelReadLimit;
channelDelay = newChannelDelay;
channelCheckInterval = newChannelCheckInterval;
globalActive = newGlobalActive;
globalWriteLimit = newGlobalWriteLimit;
globalReadLimit = newGlobalReadLimit;
globalDelay = newGlobalDelay;
globalCheckInterval = newGlobalCheckInterval;
}
/**
@ -142,7 +149,7 @@ public abstract class TrafficCounterFactory {
* NO_LIMIT or a limit in bytes/s
* @param channelReadLimit
* NO_LIMIT or a limit in bytes/s
* @param channelDelay
* @param channelCheckInterval
* The delay between two computations of performances for
* channels or NO_STAT if no stats are to be computed
* @param globalActive
@ -151,17 +158,17 @@ public abstract class TrafficCounterFactory {
* NO_LIMIT or a limit in bytes/s
* @param globalReadLimit
* NO_LIMIT or a limit in bytes/s
* @param globalDelay
* @param globalCheckInterval
* The delay between two computations of performances for global
* context or NO_STAT if no stats are to be computed
*/
public TrafficCounterFactory(ExecutorService executorService,
boolean channelActive, long channelWriteLimit,
long channelReadLimit, long channelDelay, boolean globalActive,
long globalWriteLimit, long globalReadLimit, long globalDelay) {
long channelReadLimit, long channelCheckInterval, boolean globalActive,
long globalWriteLimit, long globalReadLimit, long globalCheckInterval) {
init(executorService, channelActive, channelWriteLimit,
channelReadLimit, channelDelay, globalActive, globalWriteLimit,
globalReadLimit, globalDelay);
channelReadLimit, channelCheckInterval, globalActive, globalWriteLimit,
globalReadLimit, globalCheckInterval);
}
/**
@ -187,8 +194,8 @@ public abstract class TrafficCounterFactory {
long channelReadLimit, boolean globalActive, long globalWriteLimit,
long globalReadLimit) {
init(executorService, channelActive, channelWriteLimit,
channelReadLimit, DEFAULT_DELAY, globalActive,
globalWriteLimit, globalReadLimit, DEFAULT_DELAY);
channelReadLimit, DEFAULT_CHECK_INTERVAL, globalActive,
globalWriteLimit, globalReadLimit, DEFAULT_CHECK_INTERVAL);
}
/**
@ -204,16 +211,16 @@ public abstract class TrafficCounterFactory {
* NO_LIMIT or a limit in bytes/s
* @param globalReadLimit
* NO_LIMIT or a limit in bytes/s
* @param globalDelay
* @param globalCheckInterval
* The delay between two computations of performances for global
* context or NO_STAT if no stats are to be computed
*/
public TrafficCounterFactory(ExecutorService executorService,
boolean channelActive, boolean globalActive, long globalWriteLimit,
long globalReadLimit, long globalDelay) {
long globalReadLimit, long globalCheckInterval) {
init(executorService, channelActive, 0, 0,
DEFAULT_DELAY, globalActive, globalWriteLimit, globalReadLimit,
globalDelay);
DEFAULT_CHECK_INTERVAL, globalActive, globalWriteLimit, globalReadLimit,
globalCheckInterval);
}
/**
@ -234,8 +241,8 @@ public abstract class TrafficCounterFactory {
boolean channelActive, boolean globalActive, long globalWriteLimit,
long globalReadLimit) {
init(executorService, channelActive, 0, 0,
DEFAULT_DELAY, globalActive, globalWriteLimit, globalReadLimit,
DEFAULT_DELAY);
DEFAULT_CHECK_INTERVAL, globalActive, globalWriteLimit, globalReadLimit,
DEFAULT_CHECK_INTERVAL);
}
/**
@ -251,7 +258,7 @@ public abstract class TrafficCounterFactory {
public TrafficCounterFactory(ExecutorService executorService,
boolean channelActive, boolean globalActive) {
init(executorService, channelActive, 0, 0,
DEFAULT_DELAY, globalActive, 0, 0, DEFAULT_DELAY);
DEFAULT_CHECK_INTERVAL, globalActive, 0, 0, DEFAULT_CHECK_INTERVAL);
}
/**
@ -287,24 +294,24 @@ public abstract class TrafficCounterFactory {
*
* @param newchannelWriteLimit
* @param newchannelReadLimit
* @param newchanneldelay
* @param newchannelCheckInterval
* @param newglobalWriteLimit
* @param newglobalReadLimit
* @param newglobaldelay
* @param newGlobalCheckInterval
*/
public void configure(long newchannelWriteLimit,
long newchannelReadLimit, long newchanneldelay,
long newchannelReadLimit, long newchannelCheckInterval,
long newglobalWriteLimit, long newglobalReadLimit,
long newglobaldelay) {
long newGlobalCheckInterval) {
channelWriteLimit = newchannelWriteLimit;
channelReadLimit = newchannelReadLimit;
channelDelay = newchanneldelay;
channelCheckInterval = newchannelCheckInterval;
globalWriteLimit = newglobalWriteLimit;
globalReadLimit = newglobalReadLimit;
globalDelay = newglobaldelay;
globalCheckInterval = newGlobalCheckInterval;
if (globalTrafficMonitor != null) {
globalTrafficMonitor.configure(null,
newglobalWriteLimit, newglobalReadLimit, newglobaldelay);
newglobalWriteLimit, newglobalReadLimit, newGlobalCheckInterval);
}
}
@ -317,7 +324,7 @@ public abstract class TrafficCounterFactory {
globalTrafficMonitor = new TrafficCounter(this,
executorService, null, "GlobalPC",
globalWriteLimit, globalReadLimit,
globalDelay);
globalCheckInterval);
globalTrafficMonitor.start();
}
}
@ -326,17 +333,17 @@ public abstract class TrafficCounterFactory {
/**
* @param channel
* @return the channel TrafficCounter or null if this support is
* disabled
*
* @throws UnsupportedOperationException if per-channel counter is disabled
*/
public TrafficCounter createChannelTrafficCounter(Channel channel) {
public TrafficCounter newChannelTrafficCounter(Channel channel) {
if (channelActive && (channelReadLimit > 0 || channelWriteLimit > 0
|| channelDelay > 0)) {
|| channelCheckInterval > 0)) {
return new TrafficCounter(this, executorService, channel,
"ChannelPC" + channel.getId(), channelWriteLimit,
channelReadLimit, channelDelay);
channelReadLimit, channelCheckInterval);
}
return null;
throw new UnsupportedOperationException("per-channel counter disabled");
}
/**
@ -352,18 +359,18 @@ public abstract class TrafficCounterFactory {
}
/**
* @return the channelDelay
* @return the channelCheckInterval
*/
public long getChannelDelay() {
return channelDelay;
public long getChannelCheckInterval() {
return channelCheckInterval;
}
/**
* @param channelDelay
* the channelDelay to set
* @param channelCheckInterval
* the channelCheckInterval to set
*/
public void setChannelDelay(long channelDelay) {
this.channelDelay = channelDelay;
public void setChannelCheckInterval(long channelCheckInterval) {
this.channelCheckInterval = channelCheckInterval;
}
/**
@ -397,22 +404,22 @@ public abstract class TrafficCounterFactory {
}
/**
* @return the globalDelay
* @return the globalCheckInterval
*/
public long getGlobalDelay() {
return globalDelay;
public long getGlobalCheckInterval() {
return globalCheckInterval;
}
/**
* @param globalDelay
* the globalDelay to set
* @param globalCheckInterval
* the globalCheckInterval to set
*/
public void setGlobalDelay(long globalDelay) {
this.globalDelay = globalDelay;
public void setGlobalCheckInterval(long globalCheckInterval) {
this.globalCheckInterval = globalCheckInterval;
if (globalTrafficMonitor != null) {
globalTrafficMonitor.configure(null,
globalWriteLimit, globalReadLimit,
this.globalDelay);
globalCheckInterval);
}
}
@ -432,7 +439,7 @@ public abstract class TrafficCounterFactory {
if (globalTrafficMonitor != null) {
globalTrafficMonitor.configure(null,
globalWriteLimit, this.globalReadLimit,
globalDelay);
globalCheckInterval);
}
}
@ -452,7 +459,7 @@ public abstract class TrafficCounterFactory {
if (globalTrafficMonitor != null) {
globalTrafficMonitor.configure(null,
this.globalWriteLimit, globalReadLimit,
globalDelay);
globalCheckInterval);
}
}
@ -469,5 +476,4 @@ public abstract class TrafficCounterFactory {
public boolean isGlobalActive() {
return globalActive;
}
}

View File

@ -84,10 +84,10 @@ public class TrafficShapingHandler extends SimpleChannelHandler {
public TrafficShapingHandler(TrafficCounterFactory factory) {
super();
this.factory = factory;
this.globalTrafficCounter = this.factory
globalTrafficCounter = this.factory
.getGlobalTrafficCounter();
this.channelTrafficCounter = null;
this.objectSizeEstimator = new DefaultObjectSizeEstimator();
channelTrafficCounter = null;
objectSizeEstimator = new DefaultObjectSizeEstimator();
// will be set when connected is called
}
/**
@ -97,95 +97,72 @@ public class TrafficShapingHandler extends SimpleChannelHandler {
* the TrafficCounterFactory from which all Monitors will be
* created
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
*/
public TrafficShapingHandler(TrafficCounterFactory factory, ObjectSizeEstimator objectSizeEstimator) {
super();
this.factory = factory;
this.globalTrafficCounter = this.factory
globalTrafficCounter = this.factory
.getGlobalTrafficCounter();
this.channelTrafficCounter = null;
channelTrafficCounter = null;
this.objectSizeEstimator = objectSizeEstimator;
// will be set when connected is called
}
/*
* (non-Javadoc)
*
* @see org.jboss.netty.channel.SimpleChannelHandler#messageReceived(org.jboss.netty.channel.ChannelHandlerContext,
* org.jboss.netty.channel.MessageEvent)
*/
@Override
public void messageReceived(ChannelHandlerContext arg0, MessageEvent arg1)
throws Exception {
long size = this.objectSizeEstimator.estimateSize(arg1.getMessage());
if (this.channelTrafficCounter != null) {
this.channelTrafficCounter.bytesRecvFlowControl(arg0, size);
long size = objectSizeEstimator.estimateSize(arg1.getMessage());
if (channelTrafficCounter != null) {
channelTrafficCounter.bytesRecvFlowControl(arg0, size);
}
if (this.globalTrafficCounter != null) {
this.globalTrafficCounter.bytesRecvFlowControl(arg0, size);
if (globalTrafficCounter != null) {
globalTrafficCounter.bytesRecvFlowControl(arg0, size);
}
// The message is then just passed to the next Codec
super.messageReceived(arg0, arg1);
}
/*
* (non-Javadoc)
*
* @see org.jboss.netty.channel.SimpleChannelHandler#writeRequested(org.jboss.netty.channel.ChannelHandlerContext,
* org.jboss.netty.channel.MessageEvent)
*/
@Override
public void writeRequested(ChannelHandlerContext arg0, MessageEvent arg1)
throws Exception {
long size = this.objectSizeEstimator.estimateSize(arg1.getMessage());
if (this.channelTrafficCounter != null) {
this.channelTrafficCounter.bytesWriteFlowControl(size);
long size = objectSizeEstimator.estimateSize(arg1.getMessage());
if (channelTrafficCounter != null) {
channelTrafficCounter.bytesWriteFlowControl(size);
}
if (this.globalTrafficCounter != null) {
this.globalTrafficCounter.bytesWriteFlowControl(size);
if (globalTrafficCounter != null) {
globalTrafficCounter.bytesWriteFlowControl(size);
}
// The message is then just passed to the next Codec
super.writeRequested(arg0, arg1);
}
/*
* (non-Javadoc)
*
* @see org.jboss.netty.channel.SimpleChannelHandler#channelClosed(org.jboss.netty.channel.ChannelHandlerContext,
* org.jboss.netty.channel.ChannelStateEvent)
*/
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
if (this.channelTrafficCounter != null) {
this.channelTrafficCounter.stop();
this.channelTrafficCounter = null;
if (channelTrafficCounter != null) {
channelTrafficCounter.stop();
channelTrafficCounter = null;
}
super.channelClosed(ctx, e);
}
/*
* (non-Javadoc)
*
* @see org.jboss.netty.channel.SimpleChannelHandler#channelConnected(org.jboss.netty.channel.ChannelHandlerContext,
* org.jboss.netty.channel.ChannelStateEvent)
*/
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
// readSuspended = true;
ctx.setAttachment(Boolean.TRUE);
ctx.getChannel().setReadable(false);
if ((this.channelTrafficCounter == null) && (this.factory != null)) {
if (channelTrafficCounter == null && factory != null) {
// A factory was used
this.channelTrafficCounter = this.factory
.createChannelTrafficCounter(ctx.getChannel());
channelTrafficCounter =
factory.newChannelTrafficCounter(ctx.getChannel());
}
if (this.channelTrafficCounter != null) {
this.channelTrafficCounter
if (channelTrafficCounter != null) {
channelTrafficCounter
.setMonitoredChannel(ctx.getChannel());
this.channelTrafficCounter.start();
channelTrafficCounter.start();
}
super.channelConnected(ctx, e);
// readSuspended = false;
@ -221,7 +198,7 @@ public class TrafficShapingHandler extends SimpleChannelHandler {
* in the Factory
*/
public TrafficCounter getChannelTrafficCounter() {
return this.channelTrafficCounter;
return channelTrafficCounter;
}
/**
@ -230,7 +207,7 @@ public class TrafficShapingHandler extends SimpleChannelHandler {
* function was disabled in the Factory
*/
public TrafficCounter getGlobalTrafficCounter() {
return this.globalTrafficCounter;
return globalTrafficCounter;
}
}