From eb80f86204b4c4723c78fe1fb9455de98fb6ca54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Br=C3=A9gier?= Date: Sat, 21 Mar 2009 19:07:55 +0000 Subject: [PATCH] --- .../TrafficCounter.java} | 36 ++--- .../TrafficCounterFactory.java} | 150 ++++++++---------- .../TrafficShapingHandler.java | 137 ++++++++-------- .../package-info.java | 48 +++--- 4 files changed, 176 insertions(+), 195 deletions(-) rename src/main/java/org/jboss/netty/handler/{trafficshaping/PerformanceCounter.java => traffic/TrafficCounter.java} (93%) rename src/main/java/org/jboss/netty/handler/{trafficshaping/PerformanceCounterFactory.java => traffic/TrafficCounterFactory.java} (70%) rename src/main/java/org/jboss/netty/handler/{trafficshaping => traffic}/TrafficShapingHandler.java (59%) rename src/main/java/org/jboss/netty/handler/{trafficshaping => traffic}/package-info.java (62%) diff --git a/src/main/java/org/jboss/netty/handler/trafficshaping/PerformanceCounter.java b/src/main/java/org/jboss/netty/handler/traffic/TrafficCounter.java similarity index 93% rename from src/main/java/org/jboss/netty/handler/trafficshaping/PerformanceCounter.java rename to src/main/java/org/jboss/netty/handler/traffic/TrafficCounter.java index 656478b681..92c0abbf0f 100644 --- a/src/main/java/org/jboss/netty/handler/trafficshaping/PerformanceCounter.java +++ b/src/main/java/org/jboss/netty/handler/traffic/TrafficCounter.java @@ -20,7 +20,7 @@ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA, or see the FSF site: http://www.fsf.org. */ -package org.jboss.netty.handler.trafficshaping; +package org.jboss.netty.handler.traffic; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -36,20 +36,20 @@ import org.jboss.netty.logging.InternalLoggerFactory; * @author Frederic Bregier (fredbregier@free.fr) * @version $Rev$, $Date$ * - * PerformanceCounter is associated with {@link TrafficShapingHandler} and - * should be created through a {@link PerformanceCounterFactory}.
+ * TrafficCounter is associated with {@link TrafficShapingHandler} and + * should be created through a {@link TrafficCounterFactory}.
*
- * A PerformanceCounter can limit the traffic or not, globally or per channel, + * A TrafficCounter can limit the traffic or not, globally or per channel, * and always compute statistics on read and written bytes at the specified * interval. * */ -public class PerformanceCounter implements Runnable { +public class TrafficCounter implements Runnable { /** * Internal logger */ private static InternalLogger logger = InternalLoggerFactory - .getInstance(PerformanceCounter.class); + .getInstance(TrafficCounter.class); /** * Current writing bytes @@ -89,17 +89,17 @@ public class PerformanceCounter implements Runnable { /** * Current Limit in B/s to apply to write */ - private long limitWrite = PerformanceCounterFactory.NO_LIMIT; + private long limitWrite = 0; /** * Current Limit in B/s to apply to read */ - private long limitRead = PerformanceCounterFactory.NO_LIMIT; + private long limitRead = 0; /** * Delay between two capture */ - private long delay = PerformanceCounterFactory.DEFAULT_DELAY; + private long delay = TrafficCounterFactory.DEFAULT_DELAY; // default 1 s @@ -119,9 +119,9 @@ public class PerformanceCounter implements Runnable { protected Channel monitoredChannel = null; /** - * The associated PerformanceCounterFactory + * The associated TrafficCounterFactory */ - private PerformanceCounterFactory factory = null; + private TrafficCounterFactory factory = null; /** * Default ExecutorService @@ -177,7 +177,7 @@ public class PerformanceCounter implements Runnable { if (this.delay > 0) { Thread.sleep(this.delay); } else { - // Delay goes to PerformanceCounterFactory.NO_STAT, so exit + // Delay goes to TrafficCounterFactory.NO_STAT, so exit return; } long endTime = System.currentTimeMillis(); @@ -218,7 +218,7 @@ public class PerformanceCounter implements Runnable { * computations in ms * * @param factory - * the associated PerformanceCounterFactory + * the associated TrafficCounterFactory * @param executorService * Should be a CachedThreadPool for efficiency * @param channel @@ -235,7 +235,7 @@ public class PerformanceCounter implements Runnable { * @param delay * the delay in ms between two computations */ - public PerformanceCounter(PerformanceCounterFactory factory, + public TrafficCounter(TrafficCounterFactory factory, ExecutorService executorService, Channel channel, String name, long writeLimit, long readLimit, long delay) { this.factory = factory; @@ -362,7 +362,7 @@ public class PerformanceCounter implements Runnable { /** * Monitor */ - private PerformanceCounter monitor = null; + private TrafficCounter monitor = null; /** * Time to wait before clearing the channel @@ -376,7 +376,7 @@ public class PerformanceCounter implements Runnable { * @param timeToWait */ public ReopenRead(ChannelHandlerContext ctx, - PerformanceCounter monitor, long timeToWait) { + TrafficCounter monitor, long timeToWait) { this.ctx = ctx; this.monitor = monitor; this.timeToWait = timeToWait; @@ -419,7 +419,7 @@ public class PerformanceCounter implements Runnable { public void setReceivedBytes(ChannelHandlerContext ctx, long recv) throws InterruptedException { this.currentReadingBytes.addAndGet(recv); - if (this.limitRead == PerformanceCounterFactory.NO_LIMIT) { + if (this.limitRead == 0) { // no action return; } @@ -473,7 +473,7 @@ public class PerformanceCounter implements Runnable { */ public void setToWriteBytes(long write) throws InterruptedException { this.currentWritingBytes.addAndGet(write); - if (this.limitWrite == PerformanceCounterFactory.NO_LIMIT) { + if (this.limitWrite == 0) { return; } // compute the number of ms to wait before continue with the channel diff --git a/src/main/java/org/jboss/netty/handler/trafficshaping/PerformanceCounterFactory.java b/src/main/java/org/jboss/netty/handler/traffic/TrafficCounterFactory.java similarity index 70% rename from src/main/java/org/jboss/netty/handler/trafficshaping/PerformanceCounterFactory.java rename to src/main/java/org/jboss/netty/handler/traffic/TrafficCounterFactory.java index a34c6cd121..277f5a4a62 100644 --- a/src/main/java/org/jboss/netty/handler/trafficshaping/PerformanceCounterFactory.java +++ b/src/main/java/org/jboss/netty/handler/traffic/TrafficCounterFactory.java @@ -20,7 +20,7 @@ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA, or see the FSF site: http://www.fsf.org. */ -package org.jboss.netty.handler.trafficshaping; +package org.jboss.netty.handler.traffic; import java.util.concurrent.ExecutorService; @@ -31,45 +31,33 @@ import org.jboss.netty.channel.Channel; * @author Frederic Bregier (fredbregier@free.fr) * @version $Rev$, $Date$ * - * The {@link PerformanceCounterFactory} is a Factory for - * {@link PerformanceCounter}. It stores the necessary information to enable - * dynamic creation of {@link PerformanceCounter} inside the + * The {@link TrafficCounterFactory} is a Factory for + * {@link TrafficCounter}. It stores the necessary information to enable + * dynamic creation of {@link TrafficCounter} inside the * {@link TrafficShapingHandler}. * * */ -public abstract class PerformanceCounterFactory { - /** - * No limit - */ - public static long NO_LIMIT = -1; - - /** - * No statistics (if channel or global PerformanceCounter is a very long - * time living, it can produce a excess of capacity, i.e. 2^64 bytes so 17 - * billions of billions of bytes). - */ - public static long NO_STAT = -1; - +public abstract class TrafficCounterFactory { /** * Default delay between two checks: 1s */ public static long DEFAULT_DELAY = 1000; /** - * ExecutorService to associated to any PerformanceCounter + * ExecutorService to associated to any TrafficCounter */ private ExecutorService executorService = null; /** - * Limit in B/s to apply to write for all channel PerformanceCounter + * Limit in B/s to apply to write for all channel TrafficCounter */ - private long channelLimitWrite = NO_LIMIT; + private long channelLimitWrite = 0; /** - * Limit in B/s to apply to read for all channel PerformanceCounter + * Limit in B/s to apply to read for all channel TrafficCounter */ - private long channelLimitRead = NO_LIMIT; + private long channelLimitRead = 0; /** * Delay between two performance snapshots for channel @@ -77,19 +65,19 @@ public abstract class PerformanceCounterFactory { private long channelDelay = DEFAULT_DELAY; // default 1 s /** - * Will the PerformanceCounter for Channel be active + * Will the TrafficCounter for Channel be active */ private boolean channelActive = true; /** - * Limit in B/s to apply to write for the global PerformanceCounter + * Limit in B/s to apply to write for the global TrafficCounter */ - private long globalLimitWrite = NO_LIMIT; + private long globalLimitWrite = 0; /** - * Limit in B/s to apply to read for the global PerformanceCounter + * Limit in B/s to apply to read for the global TrafficCounter */ - private long globalLimitRead = NO_LIMIT; + private long globalLimitRead = 0; /** * Delay between two performance snapshots for global @@ -97,23 +85,23 @@ public abstract class PerformanceCounterFactory { private long globalDelay = DEFAULT_DELAY; // default 1 s /** - * Will the PerformanceCounter for Global be active + * Will the TrafficCounter for Global be active */ private boolean globalActive = true; /** * Global Monitor */ - private PerformanceCounter globalPerformanceMonitor = null; + private TrafficCounter globalTrafficMonitor = null; /** - * Called each time the accounting is computed for the PerformanceCounters. + * Called each time the accounting is computed for the TrafficCounters. * This method could be used for instance to implement real time accounting. * * @param counter - * the PerformanceCounter that computes its performance + * the TrafficCounter that computes its performance */ - protected abstract void accounting(PerformanceCounter counter); + protected abstract void accounting(TrafficCounter counter); /** * @@ -149,7 +137,7 @@ public abstract class PerformanceCounterFactory { * @param executorService * created for instance like Executors.newCachedThreadPool * @param channelActive - * True if each channel will have a PerformanceCounter + * True if each channel will have a TrafficCounter * @param channelLimitWrite * NO_LIMIT or a limit in bytes/s * @param channelLimitRead @@ -158,7 +146,7 @@ public abstract class PerformanceCounterFactory { * The delay between two computations of performances for * channels or NO_STAT if no stats are to be computed * @param globalActive - * True if global context will have one unique PerformanceCounter + * True if global context will have one unique TrafficCounter * @param globalLimitWrite * NO_LIMIT or a limit in bytes/s * @param globalLimitRead @@ -167,7 +155,7 @@ public abstract class PerformanceCounterFactory { * The delay between two computations of performances for global * context or NO_STAT if no stats are to be computed */ - public PerformanceCounterFactory(ExecutorService executorService, + public TrafficCounterFactory(ExecutorService executorService, boolean channelActive, long channelLimitWrite, long channelLimitRead, long channelDelay, boolean globalActive, long globalLimitWrite, long globalLimitRead, long globalDelay) { @@ -182,19 +170,19 @@ public abstract class PerformanceCounterFactory { * @param executorService * created for instance like Executors.newCachedThreadPool * @param channelActive - * True if each channel will have a PerformanceCounter + * True if each channel will have a TrafficCounter * @param channelLimitWrite * NO_LIMIT or a limit in bytes/s * @param channelLimitRead * NO_LIMIT or a limit in bytes/s * @param globalActive - * True if global context will have one unique PerformanceCounter + * True if global context will have one unique TrafficCounter * @param globalLimitWrite * NO_LIMIT or a limit in bytes/s * @param globalLimitRead * NO_LIMIT or a limit in bytes/s */ - public PerformanceCounterFactory(ExecutorService executorService, + public TrafficCounterFactory(ExecutorService executorService, boolean channelActive, long channelLimitWrite, long channelLimitRead, boolean globalActive, long globalLimitWrite, long globalLimitRead) { @@ -209,9 +197,9 @@ public abstract class PerformanceCounterFactory { * @param executorService * created for instance like Executors.newCachedThreadPool * @param channelActive - * True if each channel will have a PerformanceCounter + * True if each channel will have a TrafficCounter * @param globalActive - * True if global context will have one unique PerformanceCounter + * True if global context will have one unique TrafficCounter * @param globalLimitWrite * NO_LIMIT or a limit in bytes/s * @param globalLimitRead @@ -220,10 +208,10 @@ public abstract class PerformanceCounterFactory { * The delay between two computations of performances for global * context or NO_STAT if no stats are to be computed */ - public PerformanceCounterFactory(ExecutorService executorService, + public TrafficCounterFactory(ExecutorService executorService, boolean channelActive, boolean globalActive, long globalLimitWrite, long globalLimitRead, long globalDelay) { - init(executorService, channelActive, NO_LIMIT, NO_LIMIT, + init(executorService, channelActive, 0, 0, DEFAULT_DELAY, globalActive, globalLimitWrite, globalLimitRead, globalDelay); } @@ -234,18 +222,18 @@ public abstract class PerformanceCounterFactory { * @param executorService * created for instance like Executors.newCachedThreadPool * @param channelActive - * True if each channel will have a PerformanceCounter + * True if each channel will have a TrafficCounter * @param globalActive - * True if global context will have one unique PerformanceCounter + * True if global context will have one unique TrafficCounter * @param globalLimitWrite * NO_LIMIT or a limit in bytes/s * @param globalLimitRead * NO_LIMIT or a limit in bytes/s */ - public PerformanceCounterFactory(ExecutorService executorService, + public TrafficCounterFactory(ExecutorService executorService, boolean channelActive, boolean globalActive, long globalLimitWrite, long globalLimitRead) { - init(executorService, channelActive, NO_LIMIT, NO_LIMIT, + init(executorService, channelActive, 0, 0, DEFAULT_DELAY, globalActive, globalLimitWrite, globalLimitRead, DEFAULT_DELAY); } @@ -256,18 +244,18 @@ public abstract class PerformanceCounterFactory { * @param executorService * created for instance like Executors.newCachedThreadPool * @param channelActive - * True if each channel will have a PerformanceCounter + * True if each channel will have a TrafficCounter * @param globalActive - * True if global context will have one unique PerformanceCounter + * True if global context will have one unique TrafficCounter */ - public PerformanceCounterFactory(ExecutorService executorService, + public TrafficCounterFactory(ExecutorService executorService, boolean channelActive, boolean globalActive) { - init(executorService, channelActive, NO_LIMIT, NO_LIMIT, - DEFAULT_DELAY, globalActive, NO_LIMIT, NO_LIMIT, DEFAULT_DELAY); + init(executorService, channelActive, 0, 0, + DEFAULT_DELAY, globalActive, 0, 0, DEFAULT_DELAY); } /** - * Enable to change the active status of PerformanceCounter on Channels (for + * Enable to change the active status of TrafficCounter on Channels (for * new one only) * * @param active @@ -277,7 +265,7 @@ public abstract class PerformanceCounterFactory { } /** - * Enable to change the active status of PerformanceCounter on Global (stop + * Enable to change the active status of TrafficCounter on Global (stop * or start if necessary) * * @param active @@ -285,16 +273,16 @@ public abstract class PerformanceCounterFactory { public void setGlobalActive(boolean active) { if (this.globalActive) { if (!active) { - stopGlobalPerformanceCounter(); + stopGlobalTrafficCounter(); } } this.globalActive = active; - getGlobalPerformanceCounter(); + getGlobalTrafficCounter(); } /** - * Change the underlying limitations. Only Global PerformanceCounter (if - * any) is dynamically changed, but Channels PerformanceCounters are not + * Change the underlying limitations. Only Global TrafficCounter (if + * any) is dynamically changed, but Channels TrafficCounters are not * changed, only new created ones. * * @param newchannelLimitWrite @@ -314,37 +302,37 @@ public abstract class PerformanceCounterFactory { this.globalLimitWrite = newglobalLimitWrite; this.globalLimitRead = newglobalLimitRead; this.globalDelay = newglobaldelay; - if (this.globalPerformanceMonitor != null) { - this.globalPerformanceMonitor.changeConfiguration(null, + if (this.globalTrafficMonitor != null) { + this.globalTrafficMonitor.changeConfiguration(null, newglobalLimitWrite, newglobalLimitRead, newglobaldelay); } } /** - * @return the Global PerformanceCounter or null if this support is disabled + * @return the Global TrafficCounter or null if this support is disabled */ - public PerformanceCounter getGlobalPerformanceCounter() { + public TrafficCounter getGlobalTrafficCounter() { if (this.globalActive) { - if (this.globalPerformanceMonitor == null) { - this.globalPerformanceMonitor = new PerformanceCounter(this, + if (this.globalTrafficMonitor == null) { + this.globalTrafficMonitor = new TrafficCounter(this, this.executorService, null, "GlobalPC", this.globalLimitWrite, this.globalLimitRead, this.globalDelay); - this.globalPerformanceMonitor.startMonitoring(); + this.globalTrafficMonitor.startMonitoring(); } } - return this.globalPerformanceMonitor; + return this.globalTrafficMonitor; } /** * @param channel - * @return the channel PerformanceCounter or null if this support is + * @return the channel TrafficCounter or null if this support is * disabled */ - public PerformanceCounter createChannelPerformanceCounter(Channel channel) { - if (this.channelActive && ((this.channelLimitRead > NO_LIMIT) || (this.channelLimitWrite > NO_LIMIT) - || (this.channelDelay > NO_STAT))) { - return new PerformanceCounter(this, this.executorService, channel, + public TrafficCounter createChannelTrafficCounter(Channel channel) { + if (this.channelActive && ((this.channelLimitRead > 0) || (this.channelLimitWrite > 0) + || (this.channelDelay > 0))) { + return new TrafficCounter(this, this.executorService, channel, "ChannelPC" + channel.getId(), this.channelLimitWrite, this.channelLimitRead, this.channelDelay); } @@ -352,14 +340,14 @@ public abstract class PerformanceCounterFactory { } /** - * Stop the global performance counter if any (Even it is stopped, the + * Stop the global TrafficCounter if any (Even it is stopped, the * factory can however be reused) * */ - public void stopGlobalPerformanceCounter() { - if (this.globalPerformanceMonitor != null) { - this.globalPerformanceMonitor.stopMonitoring(); - this.globalPerformanceMonitor = null; + public void stopGlobalTrafficCounter() { + if (this.globalTrafficMonitor != null) { + this.globalTrafficMonitor.stopMonitoring(); + this.globalTrafficMonitor = null; } } @@ -421,8 +409,8 @@ public abstract class PerformanceCounterFactory { */ public void setGlobalDelay(long globalDelay) { this.globalDelay = globalDelay; - if (this.globalPerformanceMonitor != null) { - this.globalPerformanceMonitor.changeConfiguration(null, + if (this.globalTrafficMonitor != null) { + this.globalTrafficMonitor.changeConfiguration(null, this.globalLimitWrite, this.globalLimitRead, this.globalDelay); } @@ -441,8 +429,8 @@ public abstract class PerformanceCounterFactory { */ public void setGlobalLimitRead(long globalLimitRead) { this.globalLimitRead = globalLimitRead; - if (this.globalPerformanceMonitor != null) { - this.globalPerformanceMonitor.changeConfiguration(null, + if (this.globalTrafficMonitor != null) { + this.globalTrafficMonitor.changeConfiguration(null, this.globalLimitWrite, this.globalLimitRead, this.globalDelay); } @@ -461,8 +449,8 @@ public abstract class PerformanceCounterFactory { */ public void setGlobalLimitWrite(long globalLimitWrite) { this.globalLimitWrite = globalLimitWrite; - if (this.globalPerformanceMonitor != null) { - this.globalPerformanceMonitor.changeConfiguration(null, + if (this.globalTrafficMonitor != null) { + this.globalTrafficMonitor.changeConfiguration(null, this.globalLimitWrite, this.globalLimitRead, this.globalDelay); } diff --git a/src/main/java/org/jboss/netty/handler/trafficshaping/TrafficShapingHandler.java b/src/main/java/org/jboss/netty/handler/traffic/TrafficShapingHandler.java similarity index 59% rename from src/main/java/org/jboss/netty/handler/trafficshaping/TrafficShapingHandler.java rename to src/main/java/org/jboss/netty/handler/traffic/TrafficShapingHandler.java index 6b27ff1bc9..bd05297a6c 100644 --- a/src/main/java/org/jboss/netty/handler/trafficshaping/TrafficShapingHandler.java +++ b/src/main/java/org/jboss/netty/handler/traffic/TrafficShapingHandler.java @@ -20,11 +20,8 @@ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA, or see the FSF site: http://www.fsf.org. */ -package org.jboss.netty.handler.trafficshaping; +package org.jboss.netty.handler.traffic; -import java.io.InvalidClassException; - -import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelHandlerContext; @@ -33,6 +30,8 @@ import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; +import org.jboss.netty.handler.execution.DefaultObjectSizeEstimator; +import org.jboss.netty.handler.execution.ObjectSizeEstimator; /** * @author The Netty Project (netty-dev@lists.jboss.org) @@ -43,16 +42,15 @@ import org.jboss.netty.channel.SimpleChannelHandler; * bandwidth, as traffic shaping.
*
* - * The method getMessageSize(MessageEvent) has to be implemented to specify what + * An {@link ObjectSizeEstimator} can be passed at construction to specify what * is the size of the object to be read or write accordingly to the type of - * object. In simple case, it can be as simple as a call to - * getChannelBufferMessageSize(MessageEvent).
+ * object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.
*
* - * TrafficShapingHandler depends on {@link PerformanceCounterFactory} to create - * or use the necessary {@link PerformanceCounter} with the necessary options. + * TrafficShapingHandler depends on {@link TrafficCounterFactory} to create + * or use the necessary {@link TrafficCounter} with the necessary options. * However, you can change the behavior of both global and channel - * PerformanceCounter if you like by getting them from this handler and changing + * TrafficCounter if you like by getting them from this handler and changing * their status. * */ @@ -61,67 +59,56 @@ public abstract class TrafficShapingHandler extends SimpleChannelHandler { /** * Channel Monitor */ - private PerformanceCounter channelPerformanceCounter = null; + private TrafficCounter channelTrafficCounter = null; /** * Global Monitor */ - private PerformanceCounter globalPerformanceCounter = null; + private TrafficCounter globalTrafficCounter = null; /** * Factory if used */ - private PerformanceCounterFactory factory = null; - + private TrafficCounterFactory factory = null; /** - * Constructor + * ObjectSizeEstimator + */ + private ObjectSizeEstimator objectSizeEstimator = null; + /** + * Constructor using default {@link ObjectSizeEstimator} * * @param factory - * the PerformanceCounterFactory from which all Monitors will be + * the TrafficCounterFactory from which all Monitors will be * created */ - public TrafficShapingHandler(PerformanceCounterFactory factory) { + public TrafficShapingHandler(TrafficCounterFactory factory) { super(); this.factory = factory; - this.globalPerformanceCounter = this.factory - .getGlobalPerformanceCounter(); - this.channelPerformanceCounter = null; + this.globalTrafficCounter = this.factory + .getGlobalTrafficCounter(); + this.channelTrafficCounter = null; + this.objectSizeEstimator = new DefaultObjectSizeEstimator(); // will be set when connected is called } - /** - * This method has to be implemented. It returns the size in bytes of the - * message to be read or written. + * Constructor using the specified ObjectSizeEstimator * - * @param arg1 - * the MessageEvent to be read or written - * @return the size in bytes of the given MessageEvent - * @exception Exception - * An exception can be thrown if the object is not of the - * expected type + * @param factory + * the TrafficCounterFactory from which all Monitors will be + * created + * @param objectSizeEstimator + * the {@link ObjectSizeEstimator} that will be used to compute + * the size of the message */ - protected abstract long getMessageSize(MessageEvent arg1) throws Exception; - - /** - * Example of function (which can be used) for the ChannelBuffer - * - * @param arg1 - * @return the size in bytes of the given MessageEvent - * @throws Exception - */ - protected long getChannelBufferMessageSize(MessageEvent arg1) - throws Exception { - Object o = arg1.getMessage(); - if (!(o instanceof ChannelBuffer)) { - // Type unimplemented - throw new InvalidClassException("Wrong object received in " + - this.getClass().getName() + " codec " + - o.getClass().getName()); - } - ChannelBuffer dataBlock = (ChannelBuffer) o; - return dataBlock.readableBytes(); + public TrafficShapingHandler(TrafficCounterFactory factory, ObjectSizeEstimator objectSizeEstimator) { + super(); + this.factory = factory; + this.globalTrafficCounter = this.factory + .getGlobalTrafficCounter(); + this.channelTrafficCounter = null; + this.objectSizeEstimator = objectSizeEstimator; + // will be set when connected is called } - /* * (non-Javadoc) * @@ -131,12 +118,12 @@ public abstract class TrafficShapingHandler extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext arg0, MessageEvent arg1) throws Exception { - long size = getMessageSize(arg1); - if (this.channelPerformanceCounter != null) { - this.channelPerformanceCounter.setReceivedBytes(arg0, size); + long size = this.objectSizeEstimator.estimateSize(arg1.getMessage()); + if (this.channelTrafficCounter != null) { + this.channelTrafficCounter.setReceivedBytes(arg0, size); } - if (this.globalPerformanceCounter != null) { - this.globalPerformanceCounter.setReceivedBytes(arg0, size); + if (this.globalTrafficCounter != null) { + this.globalTrafficCounter.setReceivedBytes(arg0, size); } // The message is then just passed to the next Codec super.messageReceived(arg0, arg1); @@ -151,12 +138,12 @@ public abstract class TrafficShapingHandler extends SimpleChannelHandler { @Override public void writeRequested(ChannelHandlerContext arg0, MessageEvent arg1) throws Exception { - long size = getMessageSize(arg1); - if (this.channelPerformanceCounter != null) { - this.channelPerformanceCounter.setToWriteBytes(size); + long size = this.objectSizeEstimator.estimateSize(arg1.getMessage()); + if (this.channelTrafficCounter != null) { + this.channelTrafficCounter.setToWriteBytes(size); } - if (this.globalPerformanceCounter != null) { - this.globalPerformanceCounter.setToWriteBytes(size); + if (this.globalTrafficCounter != null) { + this.globalTrafficCounter.setToWriteBytes(size); } // The message is then just passed to the next Codec super.writeRequested(arg0, arg1); @@ -171,9 +158,9 @@ public abstract class TrafficShapingHandler extends SimpleChannelHandler { @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - if (this.channelPerformanceCounter != null) { - this.channelPerformanceCounter.stopMonitoring(); - this.channelPerformanceCounter = null; + if (this.channelTrafficCounter != null) { + this.channelTrafficCounter.stopMonitoring(); + this.channelTrafficCounter = null; } super.channelClosed(ctx, e); } @@ -190,15 +177,15 @@ public abstract class TrafficShapingHandler extends SimpleChannelHandler { // readSuspended = true; ctx.setAttachment(Boolean.TRUE); ctx.getChannel().setReadable(false); - if ((this.channelPerformanceCounter == null) && (this.factory != null)) { + if ((this.channelTrafficCounter == null) && (this.factory != null)) { // A factory was used - this.channelPerformanceCounter = this.factory - .createChannelPerformanceCounter(ctx.getChannel()); + this.channelTrafficCounter = this.factory + .createChannelTrafficCounter(ctx.getChannel()); } - if (this.channelPerformanceCounter != null) { - this.channelPerformanceCounter + if (this.channelTrafficCounter != null) { + this.channelTrafficCounter .setMonitoredChannel(ctx.getChannel()); - this.channelPerformanceCounter.startMonitoring(); + this.channelTrafficCounter.startMonitoring(); } super.channelConnected(ctx, e); // readSuspended = false; @@ -229,21 +216,21 @@ public abstract class TrafficShapingHandler extends SimpleChannelHandler { /** * - * @return the current ChannelPerformanceCounter set from the factory (if + * @return the current Channel TrafficCounter set from the factory (if * channel is still connected) or null if this function was disabled * in the Factory */ - public PerformanceCounter getChannelPerformanceCounter() { - return this.channelPerformanceCounter; + public TrafficCounter getChannelTrafficCounter() { + return this.channelTrafficCounter; } /** * - * @return the GlobalPerformanceCounter from the factory or null if this + * @return the Global TrafficCounter from the factory or null if this * function was disabled in the Factory */ - public PerformanceCounter getGlobalPerformanceCounter() { - return this.globalPerformanceCounter; + public TrafficCounter getGlobalTrafficCounter() { + return this.globalTrafficCounter; } } diff --git a/src/main/java/org/jboss/netty/handler/trafficshaping/package-info.java b/src/main/java/org/jboss/netty/handler/traffic/package-info.java similarity index 62% rename from src/main/java/org/jboss/netty/handler/trafficshaping/package-info.java rename to src/main/java/org/jboss/netty/handler/traffic/package-info.java index 5cc5619df5..ac0f06cdcf 100644 --- a/src/main/java/org/jboss/netty/handler/trafficshaping/package-info.java +++ b/src/main/java/org/jboss/netty/handler/traffic/package-info.java @@ -32,17 +32,18 @@ * *

Three classes implement this behavior:
*

* *

Standard use could be as follow:

@@ -62,26 +64,26 @@ *



* - *

So in your application you will create your own PerformanceCounterFactory and setting the values to fit your needs.

- * MyPerformanceCounterFactory myFactory = new MyPerformanceCounter(...);


+ *

So in your application you will create your own TrafficCounterFactory and setting the values to fit your needs.

+ * MyTrafficCounterFactory myFactory = new MyTrafficCounter(...);


*

Then you can create your pipeline (using a PipelineFactory since each TrafficShapingHandler must be unique by channel) and adding this handler before * your MemoryAwareThreadPoolExecutor:

* pipeline.addLast("MyTrafficShaping",new MyTrafficShapingHandler(myFactory));
@@ -89,11 +91,15 @@ * pipeline.addLast("MemoryExecutor",new ExecutionHandler(memoryAwareThreadPoolExecutor));

* *

TrafficShapingHandler must be unique by channel but however it is still global due to - * the PerformanceCounterFactcory that is shared between all handlers across the channels.

+ * the TrafficCounterFactcory that is shared between all handlers across the channels.

* * * * @apiviz.exclude ^java\.lang\. */ -package org.jboss.netty.handler.trafficshaping; +package org.jboss.netty.handler.traffic; +import org.jboss.netty.handler.execution.DefaultObjectSizeEstimator; +import org.jboss.netty.handler.execution.ObjectSizeEstimator; +import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor; +import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;