diff --git a/src/main/java/org/jboss/netty/handler/traffic/TrafficCounter.java b/src/main/java/org/jboss/netty/handler/traffic/TrafficCounter.java index 0d1f8a79c5..6fb8e6fdcb 100644 --- a/src/main/java/org/jboss/netty/handler/traffic/TrafficCounter.java +++ b/src/main/java/org/jboss/netty/handler/traffic/TrafficCounter.java @@ -45,6 +45,11 @@ import org.jboss.netty.logging.InternalLoggerFactory; * */ public class TrafficCounter { + // XXX: Should the constructor package private? + // We already have TrafficCounterFactory.newChannelTrafficCounter. + // XXX: Should TrafficCounter be able to be instantiated without TrafficCounterFactory? + // TODO: Implement ExternalResourceReleasable + /** * Internal logger */ @@ -71,18 +76,18 @@ public class TrafficCounter { */ private final AtomicLong cumulativeReadBytes = new AtomicLong(0); /** - * Last Time where cumulative bytes where reset to zero + * Last Time where cumulative bytes where reset to zero */ private long lastCumulativeTime; /** * Last writing bandwidth */ - private long lastWritingThroughput = 0; + private long lastWriteThroughput = 0; /** * Last reading bandwidth */ - private long lastReadingThroughput = 0; + private long lastReadThroughput = 0; /** * Last Time Check taken @@ -102,17 +107,17 @@ public class TrafficCounter { /** * Current Limit in B/s to apply to write */ - private long limitWrite = 0; + private long writeLimit = 0; /** * Current Limit in B/s to apply to read */ - private long limitRead = 0; + private long readLimit = 0; /** * Delay between two capture */ - private long checkInterval = TrafficCounterFactory.DEFAULT_DELAY; + private long checkInterval = TrafficCounterFactory.DEFAULT_CHECK_INTERVAL; // default 1 s @@ -191,7 +196,7 @@ public class TrafficCounter { long endTime = System.currentTimeMillis(); counter.resetAccounting(endTime); if (factory1 != null) { - factory1.accounting(counter); + factory1.doAccounting(counter); } } } catch (InterruptedException e) { @@ -230,7 +235,7 @@ public class TrafficCounter { monitorFuture = null; resetAccounting(System.currentTimeMillis()); if (factory != null) { - factory.accounting(this); + factory.doAccounting(this); } setMonitoredChannel(null); } @@ -250,9 +255,9 @@ public class TrafficCounter { } lastReadBytes = currentReadingBytes.getAndSet(0); lastWrittenBytes = currentWritingBytes.getAndSet(0); - lastReadingThroughput = lastReadBytes / interval * 1000; + lastReadThroughput = lastReadBytes / interval * 1000; // nb byte / checkInterval in ms * 1000 (1s) - lastWritingThroughput = lastWrittenBytes / interval * 1000; + lastWriteThroughput = lastWrittenBytes / interval * 1000; // nb byte / checkInterval in ms * 1000 (1s) } } @@ -286,7 +291,7 @@ public class TrafficCounter { this.factory = factory; this.executorService = executorService; this.name = name; - this.lastCumulativeTime = System.currentTimeMillis(); + lastCumulativeTime = System.currentTimeMillis(); this.configure(channel, writeLimit, readLimit, checkInterval); } @@ -322,8 +327,8 @@ public class TrafficCounter { */ public void configure(Channel channel, long writeLimit, long readLimit) { - limitWrite = writeLimit; - limitRead = readLimit; + this.writeLimit = writeLimit; + this.readLimit = readLimit; setMonitoredChannel(channel); } @@ -338,12 +343,12 @@ public class TrafficCounter { * channel * @param writeLimit * @param readLimit - * @param delayToSet + * @param checkInterval */ public void configure(Channel channel, long writeLimit, - long readLimit, long delayToSet) { - if (checkInterval != delayToSet) { - checkInterval = delayToSet; + long readLimit, long checkInterval) { + if (this.checkInterval != checkInterval) { + this.checkInterval = checkInterval; if (monitorFuture == null) { this.configure(channel, writeLimit, readLimit); return; @@ -371,7 +376,7 @@ public class TrafficCounter { // Time is too short, so just lets continue return 0; } - long wait = currentReadingBytes.get() * 1000 / limitRead - + long wait = currentReadingBytes.get() * 1000 / readLimit - interval; return wait; } @@ -390,7 +395,7 @@ public class TrafficCounter { return 0; } long wait = currentWritingBytes.get() * 1000 / - limitWrite - interval; + writeLimit - interval; return wait; } } @@ -466,7 +471,7 @@ public class TrafficCounter { throws InterruptedException { currentReadingBytes.addAndGet(recv); cumulativeReadBytes.addAndGet(recv); - if (limitRead == 0) { + if (readLimit == 0) { // no action return; } @@ -521,7 +526,7 @@ public class TrafficCounter { void bytesWriteFlowControl(long write) throws InterruptedException { currentWritingBytes.addAndGet(write); cumulativeWrittenBytes.addAndGet(write); - if (limitWrite == 0) { + if (writeLimit == 0) { return; } // compute the number of ms to wait before continue with the channel @@ -546,7 +551,7 @@ public class TrafficCounter { * @return the current Read Throughput in byte/s */ public long getLastReadThroughput() { - return lastReadingThroughput; + return lastReadThroughput; } /** @@ -554,7 +559,7 @@ public class TrafficCounter { * @return the current Write Throughput in byte/s */ public long getLastWriteThroughput() { - return lastWritingThroughput; + return lastWriteThroughput; } /** @@ -588,18 +593,18 @@ public class TrafficCounter { } /** - * @return the lastCumulativeTime in millisecond as of System.currentTimeMillis() + * @return the lastCumulativeTime in millisecond as of System.currentTimeMillis() * when the cumulative counters were reset to 0. */ public long getLastCumulativeTime() { - return this.lastCumulativeTime; + return lastCumulativeTime; } /** * Reset both read and written cumulative bytes counters and the associated time. */ public void resetCumulativeTime() { - this.lastCumulativeTime = System.currentTimeMillis(); + lastCumulativeTime = System.currentTimeMillis(); cumulativeReadBytes.set(0); cumulativeWrittenBytes.set(0); } @@ -610,8 +615,8 @@ public class TrafficCounter { @Override public String toString() { return "Monitor " + name + " Current Speed Read: " + - (lastReadingThroughput >> 10) + " KB/s, Write: " + - (lastWritingThroughput >> 10) + " KB/s Current Read: " + + (lastReadThroughput >> 10) + " KB/s, Write: " + + (lastWriteThroughput >> 10) + " KB/s Current Read: " + (currentReadingBytes.get() >> 10) + " KB Current Write: " + (currentWritingBytes.get() >> 10) + " KB"; } diff --git a/src/main/java/org/jboss/netty/handler/traffic/TrafficCounterFactory.java b/src/main/java/org/jboss/netty/handler/traffic/TrafficCounterFactory.java index e764efaeb3..0818049f1f 100644 --- a/src/main/java/org/jboss/netty/handler/traffic/TrafficCounterFactory.java +++ b/src/main/java/org/jboss/netty/handler/traffic/TrafficCounterFactory.java @@ -38,11 +38,15 @@ import org.jboss.netty.channel.Channel; * * */ -public abstract class TrafficCounterFactory { +public class TrafficCounterFactory { + // FIXME: Use Executor instead of ExecutorService + // TODO: Read/write limit needs to be configurable on a per-channel basis. + // TODO: Implement ExternalResourceReleasable + /** * Default delay between two checks: 1s */ - public static long DEFAULT_DELAY = 1000; + public static long DEFAULT_CHECK_INTERVAL = 1000; /** * ExecutorService to associated to any TrafficCounter @@ -62,7 +66,7 @@ public abstract class TrafficCounterFactory { /** * Delay between two performance snapshots for channel */ - private long channelDelay = DEFAULT_DELAY; // default 1 s + private long channelCheckInterval = DEFAULT_CHECK_INTERVAL; // default 1 s /** * Will the TrafficCounter for Channel be active @@ -82,7 +86,7 @@ public abstract class TrafficCounterFactory { /** * Delay between two performance snapshots for global */ - private long globalDelay = DEFAULT_DELAY; // default 1 s + private long globalCheckInterval = DEFAULT_CHECK_INTERVAL; // default 1 s /** * Will the TrafficCounter for Global be active @@ -101,7 +105,10 @@ public abstract class TrafficCounterFactory { * @param counter * the TrafficCounter that computes its performance */ - protected abstract void accounting(TrafficCounter counter); + @SuppressWarnings("unused") + protected void doAccounting(TrafficCounter counter) { + // NOOP by default + } /** * @@ -109,26 +116,26 @@ public abstract class TrafficCounterFactory { * @param newChannelActive * @param newChannelWriteLimit * @param newChannelReadLimit - * @param newChannelDelay + * @param newChannelCheckInterval * @param newGlobalActive * @param newGlobalWriteLimit * @param newGlobalReadLimit - * @param newGlobalDelay + * @param newGlobalCheckInterval */ private void init(ExecutorService newexecutorService, boolean newChannelActive, long newChannelWriteLimit, - long newChannelReadLimit, long newChannelDelay, + long newChannelReadLimit, long newChannelCheckInterval, boolean newGlobalActive, long newGlobalWriteLimit, - long newGlobalReadLimit, long newGlobalDelay) { + long newGlobalReadLimit, long newGlobalCheckInterval) { executorService = newexecutorService; channelActive = newChannelActive; channelWriteLimit = newChannelWriteLimit; channelReadLimit = newChannelReadLimit; - channelDelay = newChannelDelay; + channelCheckInterval = newChannelCheckInterval; globalActive = newGlobalActive; globalWriteLimit = newGlobalWriteLimit; globalReadLimit = newGlobalReadLimit; - globalDelay = newGlobalDelay; + globalCheckInterval = newGlobalCheckInterval; } /** @@ -142,7 +149,7 @@ public abstract class TrafficCounterFactory { * NO_LIMIT or a limit in bytes/s * @param channelReadLimit * NO_LIMIT or a limit in bytes/s - * @param channelDelay + * @param channelCheckInterval * The delay between two computations of performances for * channels or NO_STAT if no stats are to be computed * @param globalActive @@ -151,17 +158,17 @@ public abstract class TrafficCounterFactory { * NO_LIMIT or a limit in bytes/s * @param globalReadLimit * NO_LIMIT or a limit in bytes/s - * @param globalDelay + * @param globalCheckInterval * The delay between two computations of performances for global * context or NO_STAT if no stats are to be computed */ public TrafficCounterFactory(ExecutorService executorService, boolean channelActive, long channelWriteLimit, - long channelReadLimit, long channelDelay, boolean globalActive, - long globalWriteLimit, long globalReadLimit, long globalDelay) { + long channelReadLimit, long channelCheckInterval, boolean globalActive, + long globalWriteLimit, long globalReadLimit, long globalCheckInterval) { init(executorService, channelActive, channelWriteLimit, - channelReadLimit, channelDelay, globalActive, globalWriteLimit, - globalReadLimit, globalDelay); + channelReadLimit, channelCheckInterval, globalActive, globalWriteLimit, + globalReadLimit, globalCheckInterval); } /** @@ -187,8 +194,8 @@ public abstract class TrafficCounterFactory { long channelReadLimit, boolean globalActive, long globalWriteLimit, long globalReadLimit) { init(executorService, channelActive, channelWriteLimit, - channelReadLimit, DEFAULT_DELAY, globalActive, - globalWriteLimit, globalReadLimit, DEFAULT_DELAY); + channelReadLimit, DEFAULT_CHECK_INTERVAL, globalActive, + globalWriteLimit, globalReadLimit, DEFAULT_CHECK_INTERVAL); } /** @@ -204,16 +211,16 @@ public abstract class TrafficCounterFactory { * NO_LIMIT or a limit in bytes/s * @param globalReadLimit * NO_LIMIT or a limit in bytes/s - * @param globalDelay + * @param globalCheckInterval * The delay between two computations of performances for global * context or NO_STAT if no stats are to be computed */ public TrafficCounterFactory(ExecutorService executorService, boolean channelActive, boolean globalActive, long globalWriteLimit, - long globalReadLimit, long globalDelay) { + long globalReadLimit, long globalCheckInterval) { init(executorService, channelActive, 0, 0, - DEFAULT_DELAY, globalActive, globalWriteLimit, globalReadLimit, - globalDelay); + DEFAULT_CHECK_INTERVAL, globalActive, globalWriteLimit, globalReadLimit, + globalCheckInterval); } /** @@ -234,8 +241,8 @@ public abstract class TrafficCounterFactory { boolean channelActive, boolean globalActive, long globalWriteLimit, long globalReadLimit) { init(executorService, channelActive, 0, 0, - DEFAULT_DELAY, globalActive, globalWriteLimit, globalReadLimit, - DEFAULT_DELAY); + DEFAULT_CHECK_INTERVAL, globalActive, globalWriteLimit, globalReadLimit, + DEFAULT_CHECK_INTERVAL); } /** @@ -251,7 +258,7 @@ public abstract class TrafficCounterFactory { public TrafficCounterFactory(ExecutorService executorService, boolean channelActive, boolean globalActive) { init(executorService, channelActive, 0, 0, - DEFAULT_DELAY, globalActive, 0, 0, DEFAULT_DELAY); + DEFAULT_CHECK_INTERVAL, globalActive, 0, 0, DEFAULT_CHECK_INTERVAL); } /** @@ -287,24 +294,24 @@ public abstract class TrafficCounterFactory { * * @param newchannelWriteLimit * @param newchannelReadLimit - * @param newchanneldelay + * @param newchannelCheckInterval * @param newglobalWriteLimit * @param newglobalReadLimit - * @param newglobaldelay + * @param newGlobalCheckInterval */ public void configure(long newchannelWriteLimit, - long newchannelReadLimit, long newchanneldelay, + long newchannelReadLimit, long newchannelCheckInterval, long newglobalWriteLimit, long newglobalReadLimit, - long newglobaldelay) { + long newGlobalCheckInterval) { channelWriteLimit = newchannelWriteLimit; channelReadLimit = newchannelReadLimit; - channelDelay = newchanneldelay; + channelCheckInterval = newchannelCheckInterval; globalWriteLimit = newglobalWriteLimit; globalReadLimit = newglobalReadLimit; - globalDelay = newglobaldelay; + globalCheckInterval = newGlobalCheckInterval; if (globalTrafficMonitor != null) { globalTrafficMonitor.configure(null, - newglobalWriteLimit, newglobalReadLimit, newglobaldelay); + newglobalWriteLimit, newglobalReadLimit, newGlobalCheckInterval); } } @@ -317,7 +324,7 @@ public abstract class TrafficCounterFactory { globalTrafficMonitor = new TrafficCounter(this, executorService, null, "GlobalPC", globalWriteLimit, globalReadLimit, - globalDelay); + globalCheckInterval); globalTrafficMonitor.start(); } } @@ -326,17 +333,17 @@ public abstract class TrafficCounterFactory { /** * @param channel - * @return the channel TrafficCounter or null if this support is - * disabled + * + * @throws UnsupportedOperationException if per-channel counter is disabled */ - public TrafficCounter createChannelTrafficCounter(Channel channel) { + public TrafficCounter newChannelTrafficCounter(Channel channel) { if (channelActive && (channelReadLimit > 0 || channelWriteLimit > 0 - || channelDelay > 0)) { + || channelCheckInterval > 0)) { return new TrafficCounter(this, executorService, channel, "ChannelPC" + channel.getId(), channelWriteLimit, - channelReadLimit, channelDelay); + channelReadLimit, channelCheckInterval); } - return null; + throw new UnsupportedOperationException("per-channel counter disabled"); } /** @@ -352,18 +359,18 @@ public abstract class TrafficCounterFactory { } /** - * @return the channelDelay + * @return the channelCheckInterval */ - public long getChannelDelay() { - return channelDelay; + public long getChannelCheckInterval() { + return channelCheckInterval; } /** - * @param channelDelay - * the channelDelay to set + * @param channelCheckInterval + * the channelCheckInterval to set */ - public void setChannelDelay(long channelDelay) { - this.channelDelay = channelDelay; + public void setChannelCheckInterval(long channelCheckInterval) { + this.channelCheckInterval = channelCheckInterval; } /** @@ -397,22 +404,22 @@ public abstract class TrafficCounterFactory { } /** - * @return the globalDelay + * @return the globalCheckInterval */ - public long getGlobalDelay() { - return globalDelay; + public long getGlobalCheckInterval() { + return globalCheckInterval; } /** - * @param globalDelay - * the globalDelay to set + * @param globalCheckInterval + * the globalCheckInterval to set */ - public void setGlobalDelay(long globalDelay) { - this.globalDelay = globalDelay; + public void setGlobalCheckInterval(long globalCheckInterval) { + this.globalCheckInterval = globalCheckInterval; if (globalTrafficMonitor != null) { globalTrafficMonitor.configure(null, globalWriteLimit, globalReadLimit, - this.globalDelay); + globalCheckInterval); } } @@ -432,7 +439,7 @@ public abstract class TrafficCounterFactory { if (globalTrafficMonitor != null) { globalTrafficMonitor.configure(null, globalWriteLimit, this.globalReadLimit, - globalDelay); + globalCheckInterval); } } @@ -452,7 +459,7 @@ public abstract class TrafficCounterFactory { if (globalTrafficMonitor != null) { globalTrafficMonitor.configure(null, this.globalWriteLimit, globalReadLimit, - globalDelay); + globalCheckInterval); } } @@ -469,5 +476,4 @@ public abstract class TrafficCounterFactory { public boolean isGlobalActive() { return globalActive; } - } diff --git a/src/main/java/org/jboss/netty/handler/traffic/TrafficShapingHandler.java b/src/main/java/org/jboss/netty/handler/traffic/TrafficShapingHandler.java index a668b84f43..32ed64a630 100644 --- a/src/main/java/org/jboss/netty/handler/traffic/TrafficShapingHandler.java +++ b/src/main/java/org/jboss/netty/handler/traffic/TrafficShapingHandler.java @@ -84,10 +84,10 @@ public class TrafficShapingHandler extends SimpleChannelHandler { public TrafficShapingHandler(TrafficCounterFactory factory) { super(); this.factory = factory; - this.globalTrafficCounter = this.factory + globalTrafficCounter = this.factory .getGlobalTrafficCounter(); - this.channelTrafficCounter = null; - this.objectSizeEstimator = new DefaultObjectSizeEstimator(); + channelTrafficCounter = null; + objectSizeEstimator = new DefaultObjectSizeEstimator(); // will be set when connected is called } /** @@ -97,95 +97,72 @@ public class TrafficShapingHandler extends SimpleChannelHandler { * the TrafficCounterFactory from which all Monitors will be * created * @param objectSizeEstimator - * the {@link ObjectSizeEstimator} that will be used to compute + * the {@link ObjectSizeEstimator} that will be used to compute * the size of the message */ public TrafficShapingHandler(TrafficCounterFactory factory, ObjectSizeEstimator objectSizeEstimator) { super(); this.factory = factory; - this.globalTrafficCounter = this.factory + globalTrafficCounter = this.factory .getGlobalTrafficCounter(); - this.channelTrafficCounter = null; + channelTrafficCounter = null; this.objectSizeEstimator = objectSizeEstimator; // will be set when connected is called } - /* - * (non-Javadoc) - * - * @see org.jboss.netty.channel.SimpleChannelHandler#messageReceived(org.jboss.netty.channel.ChannelHandlerContext, - * org.jboss.netty.channel.MessageEvent) - */ + @Override public void messageReceived(ChannelHandlerContext arg0, MessageEvent arg1) throws Exception { - long size = this.objectSizeEstimator.estimateSize(arg1.getMessage()); - if (this.channelTrafficCounter != null) { - this.channelTrafficCounter.bytesRecvFlowControl(arg0, size); + long size = objectSizeEstimator.estimateSize(arg1.getMessage()); + if (channelTrafficCounter != null) { + channelTrafficCounter.bytesRecvFlowControl(arg0, size); } - if (this.globalTrafficCounter != null) { - this.globalTrafficCounter.bytesRecvFlowControl(arg0, size); + if (globalTrafficCounter != null) { + globalTrafficCounter.bytesRecvFlowControl(arg0, size); } // The message is then just passed to the next Codec super.messageReceived(arg0, arg1); } - /* - * (non-Javadoc) - * - * @see org.jboss.netty.channel.SimpleChannelHandler#writeRequested(org.jboss.netty.channel.ChannelHandlerContext, - * org.jboss.netty.channel.MessageEvent) - */ @Override public void writeRequested(ChannelHandlerContext arg0, MessageEvent arg1) throws Exception { - long size = this.objectSizeEstimator.estimateSize(arg1.getMessage()); - if (this.channelTrafficCounter != null) { - this.channelTrafficCounter.bytesWriteFlowControl(size); + long size = objectSizeEstimator.estimateSize(arg1.getMessage()); + if (channelTrafficCounter != null) { + channelTrafficCounter.bytesWriteFlowControl(size); } - if (this.globalTrafficCounter != null) { - this.globalTrafficCounter.bytesWriteFlowControl(size); + if (globalTrafficCounter != null) { + globalTrafficCounter.bytesWriteFlowControl(size); } // The message is then just passed to the next Codec super.writeRequested(arg0, arg1); } - /* - * (non-Javadoc) - * - * @see org.jboss.netty.channel.SimpleChannelHandler#channelClosed(org.jboss.netty.channel.ChannelHandlerContext, - * org.jboss.netty.channel.ChannelStateEvent) - */ @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - if (this.channelTrafficCounter != null) { - this.channelTrafficCounter.stop(); - this.channelTrafficCounter = null; + if (channelTrafficCounter != null) { + channelTrafficCounter.stop(); + channelTrafficCounter = null; } super.channelClosed(ctx, e); } - /* - * (non-Javadoc) - * - * @see org.jboss.netty.channel.SimpleChannelHandler#channelConnected(org.jboss.netty.channel.ChannelHandlerContext, - * org.jboss.netty.channel.ChannelStateEvent) - */ @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { // readSuspended = true; ctx.setAttachment(Boolean.TRUE); ctx.getChannel().setReadable(false); - if ((this.channelTrafficCounter == null) && (this.factory != null)) { + if (channelTrafficCounter == null && factory != null) { // A factory was used - this.channelTrafficCounter = this.factory - .createChannelTrafficCounter(ctx.getChannel()); + channelTrafficCounter = + factory.newChannelTrafficCounter(ctx.getChannel()); } - if (this.channelTrafficCounter != null) { - this.channelTrafficCounter + if (channelTrafficCounter != null) { + channelTrafficCounter .setMonitoredChannel(ctx.getChannel()); - this.channelTrafficCounter.start(); + channelTrafficCounter.start(); } super.channelConnected(ctx, e); // readSuspended = false; @@ -221,7 +198,7 @@ public class TrafficShapingHandler extends SimpleChannelHandler { * in the Factory */ public TrafficCounter getChannelTrafficCounter() { - return this.channelTrafficCounter; + return channelTrafficCounter; } /** @@ -230,7 +207,7 @@ public class TrafficShapingHandler extends SimpleChannelHandler { * function was disabled in the Factory */ public TrafficCounter getGlobalTrafficCounter() { - return this.globalTrafficCounter; + return globalTrafficCounter; } }