This commit is contained in:
Frédéric Brégier 2009-03-21 19:07:55 +00:00
parent b3f9f91f6a
commit eb80f86204
4 changed files with 176 additions and 195 deletions

View File

@ -20,7 +20,7 @@
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.handler.trafficshaping;
package org.jboss.netty.handler.traffic;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@ -36,20 +36,20 @@ import org.jboss.netty.logging.InternalLoggerFactory;
* @author Frederic Bregier (fredbregier@free.fr)
* @version $Rev$, $Date$
*
* PerformanceCounter is associated with {@link TrafficShapingHandler} and
* should be created through a {@link PerformanceCounterFactory}.<br>
* TrafficCounter is associated with {@link TrafficShapingHandler} and
* should be created through a {@link TrafficCounterFactory}.<br>
* <br>
* A PerformanceCounter can limit the traffic or not, globally or per channel,
* A TrafficCounter can limit the traffic or not, globally or per channel,
* and always compute statistics on read and written bytes at the specified
* interval.
*
*/
public class PerformanceCounter implements Runnable {
public class TrafficCounter implements Runnable {
/**
* Internal logger
*/
private static InternalLogger logger = InternalLoggerFactory
.getInstance(PerformanceCounter.class);
.getInstance(TrafficCounter.class);
/**
* Current writing bytes
@ -89,17 +89,17 @@ public class PerformanceCounter implements Runnable {
/**
* Current Limit in B/s to apply to write
*/
private long limitWrite = PerformanceCounterFactory.NO_LIMIT;
private long limitWrite = 0;
/**
* Current Limit in B/s to apply to read
*/
private long limitRead = PerformanceCounterFactory.NO_LIMIT;
private long limitRead = 0;
/**
* Delay between two capture
*/
private long delay = PerformanceCounterFactory.DEFAULT_DELAY;
private long delay = TrafficCounterFactory.DEFAULT_DELAY;
// default 1 s
@ -119,9 +119,9 @@ public class PerformanceCounter implements Runnable {
protected Channel monitoredChannel = null;
/**
* The associated PerformanceCounterFactory
* The associated TrafficCounterFactory
*/
private PerformanceCounterFactory factory = null;
private TrafficCounterFactory factory = null;
/**
* Default ExecutorService
@ -177,7 +177,7 @@ public class PerformanceCounter implements Runnable {
if (this.delay > 0) {
Thread.sleep(this.delay);
} else {
// Delay goes to PerformanceCounterFactory.NO_STAT, so exit
// Delay goes to TrafficCounterFactory.NO_STAT, so exit
return;
}
long endTime = System.currentTimeMillis();
@ -218,7 +218,7 @@ public class PerformanceCounter implements Runnable {
* computations in ms
*
* @param factory
* the associated PerformanceCounterFactory
* the associated TrafficCounterFactory
* @param executorService
* Should be a CachedThreadPool for efficiency
* @param channel
@ -235,7 +235,7 @@ public class PerformanceCounter implements Runnable {
* @param delay
* the delay in ms between two computations
*/
public PerformanceCounter(PerformanceCounterFactory factory,
public TrafficCounter(TrafficCounterFactory factory,
ExecutorService executorService, Channel channel, String name,
long writeLimit, long readLimit, long delay) {
this.factory = factory;
@ -362,7 +362,7 @@ public class PerformanceCounter implements Runnable {
/**
* Monitor
*/
private PerformanceCounter monitor = null;
private TrafficCounter monitor = null;
/**
* Time to wait before clearing the channel
@ -376,7 +376,7 @@ public class PerformanceCounter implements Runnable {
* @param timeToWait
*/
public ReopenRead(ChannelHandlerContext ctx,
PerformanceCounter monitor, long timeToWait) {
TrafficCounter monitor, long timeToWait) {
this.ctx = ctx;
this.monitor = monitor;
this.timeToWait = timeToWait;
@ -419,7 +419,7 @@ public class PerformanceCounter implements Runnable {
public void setReceivedBytes(ChannelHandlerContext ctx, long recv)
throws InterruptedException {
this.currentReadingBytes.addAndGet(recv);
if (this.limitRead == PerformanceCounterFactory.NO_LIMIT) {
if (this.limitRead == 0) {
// no action
return;
}
@ -473,7 +473,7 @@ public class PerformanceCounter implements Runnable {
*/
public void setToWriteBytes(long write) throws InterruptedException {
this.currentWritingBytes.addAndGet(write);
if (this.limitWrite == PerformanceCounterFactory.NO_LIMIT) {
if (this.limitWrite == 0) {
return;
}
// compute the number of ms to wait before continue with the channel

View File

@ -20,7 +20,7 @@
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.handler.trafficshaping;
package org.jboss.netty.handler.traffic;
import java.util.concurrent.ExecutorService;
@ -31,45 +31,33 @@ import org.jboss.netty.channel.Channel;
* @author Frederic Bregier (fredbregier@free.fr)
* @version $Rev$, $Date$
*
* The {@link PerformanceCounterFactory} is a Factory for
* {@link PerformanceCounter}. It stores the necessary information to enable
* dynamic creation of {@link PerformanceCounter} inside the
* The {@link TrafficCounterFactory} is a Factory for
* {@link TrafficCounter}. It stores the necessary information to enable
* dynamic creation of {@link TrafficCounter} inside the
* {@link TrafficShapingHandler}.
*
*
*/
public abstract class PerformanceCounterFactory {
/**
* No limit
*/
public static long NO_LIMIT = -1;
/**
* No statistics (if channel or global PerformanceCounter is a very long
* time living, it can produce a excess of capacity, i.e. 2^64 bytes so 17
* billions of billions of bytes).
*/
public static long NO_STAT = -1;
public abstract class TrafficCounterFactory {
/**
* Default delay between two checks: 1s
*/
public static long DEFAULT_DELAY = 1000;
/**
* ExecutorService to associated to any PerformanceCounter
* ExecutorService to associated to any TrafficCounter
*/
private ExecutorService executorService = null;
/**
* Limit in B/s to apply to write for all channel PerformanceCounter
* Limit in B/s to apply to write for all channel TrafficCounter
*/
private long channelLimitWrite = NO_LIMIT;
private long channelLimitWrite = 0;
/**
* Limit in B/s to apply to read for all channel PerformanceCounter
* Limit in B/s to apply to read for all channel TrafficCounter
*/
private long channelLimitRead = NO_LIMIT;
private long channelLimitRead = 0;
/**
* Delay between two performance snapshots for channel
@ -77,19 +65,19 @@ public abstract class PerformanceCounterFactory {
private long channelDelay = DEFAULT_DELAY; // default 1 s
/**
* Will the PerformanceCounter for Channel be active
* Will the TrafficCounter for Channel be active
*/
private boolean channelActive = true;
/**
* Limit in B/s to apply to write for the global PerformanceCounter
* Limit in B/s to apply to write for the global TrafficCounter
*/
private long globalLimitWrite = NO_LIMIT;
private long globalLimitWrite = 0;
/**
* Limit in B/s to apply to read for the global PerformanceCounter
* Limit in B/s to apply to read for the global TrafficCounter
*/
private long globalLimitRead = NO_LIMIT;
private long globalLimitRead = 0;
/**
* Delay between two performance snapshots for global
@ -97,23 +85,23 @@ public abstract class PerformanceCounterFactory {
private long globalDelay = DEFAULT_DELAY; // default 1 s
/**
* Will the PerformanceCounter for Global be active
* Will the TrafficCounter for Global be active
*/
private boolean globalActive = true;
/**
* Global Monitor
*/
private PerformanceCounter globalPerformanceMonitor = null;
private TrafficCounter globalTrafficMonitor = null;
/**
* Called each time the accounting is computed for the PerformanceCounters.
* Called each time the accounting is computed for the TrafficCounters.
* This method could be used for instance to implement real time accounting.
*
* @param counter
* the PerformanceCounter that computes its performance
* the TrafficCounter that computes its performance
*/
protected abstract void accounting(PerformanceCounter counter);
protected abstract void accounting(TrafficCounter counter);
/**
*
@ -149,7 +137,7 @@ public abstract class PerformanceCounterFactory {
* @param executorService
* created for instance like Executors.newCachedThreadPool
* @param channelActive
* True if each channel will have a PerformanceCounter
* True if each channel will have a TrafficCounter
* @param channelLimitWrite
* NO_LIMIT or a limit in bytes/s
* @param channelLimitRead
@ -158,7 +146,7 @@ public abstract class PerformanceCounterFactory {
* The delay between two computations of performances for
* channels or NO_STAT if no stats are to be computed
* @param globalActive
* True if global context will have one unique PerformanceCounter
* True if global context will have one unique TrafficCounter
* @param globalLimitWrite
* NO_LIMIT or a limit in bytes/s
* @param globalLimitRead
@ -167,7 +155,7 @@ public abstract class PerformanceCounterFactory {
* The delay between two computations of performances for global
* context or NO_STAT if no stats are to be computed
*/
public PerformanceCounterFactory(ExecutorService executorService,
public TrafficCounterFactory(ExecutorService executorService,
boolean channelActive, long channelLimitWrite,
long channelLimitRead, long channelDelay, boolean globalActive,
long globalLimitWrite, long globalLimitRead, long globalDelay) {
@ -182,19 +170,19 @@ public abstract class PerformanceCounterFactory {
* @param executorService
* created for instance like Executors.newCachedThreadPool
* @param channelActive
* True if each channel will have a PerformanceCounter
* True if each channel will have a TrafficCounter
* @param channelLimitWrite
* NO_LIMIT or a limit in bytes/s
* @param channelLimitRead
* NO_LIMIT or a limit in bytes/s
* @param globalActive
* True if global context will have one unique PerformanceCounter
* True if global context will have one unique TrafficCounter
* @param globalLimitWrite
* NO_LIMIT or a limit in bytes/s
* @param globalLimitRead
* NO_LIMIT or a limit in bytes/s
*/
public PerformanceCounterFactory(ExecutorService executorService,
public TrafficCounterFactory(ExecutorService executorService,
boolean channelActive, long channelLimitWrite,
long channelLimitRead, boolean globalActive, long globalLimitWrite,
long globalLimitRead) {
@ -209,9 +197,9 @@ public abstract class PerformanceCounterFactory {
* @param executorService
* created for instance like Executors.newCachedThreadPool
* @param channelActive
* True if each channel will have a PerformanceCounter
* True if each channel will have a TrafficCounter
* @param globalActive
* True if global context will have one unique PerformanceCounter
* True if global context will have one unique TrafficCounter
* @param globalLimitWrite
* NO_LIMIT or a limit in bytes/s
* @param globalLimitRead
@ -220,10 +208,10 @@ public abstract class PerformanceCounterFactory {
* The delay between two computations of performances for global
* context or NO_STAT if no stats are to be computed
*/
public PerformanceCounterFactory(ExecutorService executorService,
public TrafficCounterFactory(ExecutorService executorService,
boolean channelActive, boolean globalActive, long globalLimitWrite,
long globalLimitRead, long globalDelay) {
init(executorService, channelActive, NO_LIMIT, NO_LIMIT,
init(executorService, channelActive, 0, 0,
DEFAULT_DELAY, globalActive, globalLimitWrite, globalLimitRead,
globalDelay);
}
@ -234,18 +222,18 @@ public abstract class PerformanceCounterFactory {
* @param executorService
* created for instance like Executors.newCachedThreadPool
* @param channelActive
* True if each channel will have a PerformanceCounter
* True if each channel will have a TrafficCounter
* @param globalActive
* True if global context will have one unique PerformanceCounter
* True if global context will have one unique TrafficCounter
* @param globalLimitWrite
* NO_LIMIT or a limit in bytes/s
* @param globalLimitRead
* NO_LIMIT or a limit in bytes/s
*/
public PerformanceCounterFactory(ExecutorService executorService,
public TrafficCounterFactory(ExecutorService executorService,
boolean channelActive, boolean globalActive, long globalLimitWrite,
long globalLimitRead) {
init(executorService, channelActive, NO_LIMIT, NO_LIMIT,
init(executorService, channelActive, 0, 0,
DEFAULT_DELAY, globalActive, globalLimitWrite, globalLimitRead,
DEFAULT_DELAY);
}
@ -256,18 +244,18 @@ public abstract class PerformanceCounterFactory {
* @param executorService
* created for instance like Executors.newCachedThreadPool
* @param channelActive
* True if each channel will have a PerformanceCounter
* True if each channel will have a TrafficCounter
* @param globalActive
* True if global context will have one unique PerformanceCounter
* True if global context will have one unique TrafficCounter
*/
public PerformanceCounterFactory(ExecutorService executorService,
public TrafficCounterFactory(ExecutorService executorService,
boolean channelActive, boolean globalActive) {
init(executorService, channelActive, NO_LIMIT, NO_LIMIT,
DEFAULT_DELAY, globalActive, NO_LIMIT, NO_LIMIT, DEFAULT_DELAY);
init(executorService, channelActive, 0, 0,
DEFAULT_DELAY, globalActive, 0, 0, DEFAULT_DELAY);
}
/**
* Enable to change the active status of PerformanceCounter on Channels (for
* Enable to change the active status of TrafficCounter on Channels (for
* new one only)
*
* @param active
@ -277,7 +265,7 @@ public abstract class PerformanceCounterFactory {
}
/**
* Enable to change the active status of PerformanceCounter on Global (stop
* Enable to change the active status of TrafficCounter on Global (stop
* or start if necessary)
*
* @param active
@ -285,16 +273,16 @@ public abstract class PerformanceCounterFactory {
public void setGlobalActive(boolean active) {
if (this.globalActive) {
if (!active) {
stopGlobalPerformanceCounter();
stopGlobalTrafficCounter();
}
}
this.globalActive = active;
getGlobalPerformanceCounter();
getGlobalTrafficCounter();
}
/**
* Change the underlying limitations. Only Global PerformanceCounter (if
* any) is dynamically changed, but Channels PerformanceCounters are not
* Change the underlying limitations. Only Global TrafficCounter (if
* any) is dynamically changed, but Channels TrafficCounters are not
* changed, only new created ones.
*
* @param newchannelLimitWrite
@ -314,37 +302,37 @@ public abstract class PerformanceCounterFactory {
this.globalLimitWrite = newglobalLimitWrite;
this.globalLimitRead = newglobalLimitRead;
this.globalDelay = newglobaldelay;
if (this.globalPerformanceMonitor != null) {
this.globalPerformanceMonitor.changeConfiguration(null,
if (this.globalTrafficMonitor != null) {
this.globalTrafficMonitor.changeConfiguration(null,
newglobalLimitWrite, newglobalLimitRead, newglobaldelay);
}
}
/**
* @return the Global PerformanceCounter or null if this support is disabled
* @return the Global TrafficCounter or null if this support is disabled
*/
public PerformanceCounter getGlobalPerformanceCounter() {
public TrafficCounter getGlobalTrafficCounter() {
if (this.globalActive) {
if (this.globalPerformanceMonitor == null) {
this.globalPerformanceMonitor = new PerformanceCounter(this,
if (this.globalTrafficMonitor == null) {
this.globalTrafficMonitor = new TrafficCounter(this,
this.executorService, null, "GlobalPC",
this.globalLimitWrite, this.globalLimitRead,
this.globalDelay);
this.globalPerformanceMonitor.startMonitoring();
this.globalTrafficMonitor.startMonitoring();
}
}
return this.globalPerformanceMonitor;
return this.globalTrafficMonitor;
}
/**
* @param channel
* @return the channel PerformanceCounter or null if this support is
* @return the channel TrafficCounter or null if this support is
* disabled
*/
public PerformanceCounter createChannelPerformanceCounter(Channel channel) {
if (this.channelActive && ((this.channelLimitRead > NO_LIMIT) || (this.channelLimitWrite > NO_LIMIT)
|| (this.channelDelay > NO_STAT))) {
return new PerformanceCounter(this, this.executorService, channel,
public TrafficCounter createChannelTrafficCounter(Channel channel) {
if (this.channelActive && ((this.channelLimitRead > 0) || (this.channelLimitWrite > 0)
|| (this.channelDelay > 0))) {
return new TrafficCounter(this, this.executorService, channel,
"ChannelPC" + channel.getId(), this.channelLimitWrite,
this.channelLimitRead, this.channelDelay);
}
@ -352,14 +340,14 @@ public abstract class PerformanceCounterFactory {
}
/**
* Stop the global performance counter if any (Even it is stopped, the
* Stop the global TrafficCounter if any (Even it is stopped, the
* factory can however be reused)
*
*/
public void stopGlobalPerformanceCounter() {
if (this.globalPerformanceMonitor != null) {
this.globalPerformanceMonitor.stopMonitoring();
this.globalPerformanceMonitor = null;
public void stopGlobalTrafficCounter() {
if (this.globalTrafficMonitor != null) {
this.globalTrafficMonitor.stopMonitoring();
this.globalTrafficMonitor = null;
}
}
@ -421,8 +409,8 @@ public abstract class PerformanceCounterFactory {
*/
public void setGlobalDelay(long globalDelay) {
this.globalDelay = globalDelay;
if (this.globalPerformanceMonitor != null) {
this.globalPerformanceMonitor.changeConfiguration(null,
if (this.globalTrafficMonitor != null) {
this.globalTrafficMonitor.changeConfiguration(null,
this.globalLimitWrite, this.globalLimitRead,
this.globalDelay);
}
@ -441,8 +429,8 @@ public abstract class PerformanceCounterFactory {
*/
public void setGlobalLimitRead(long globalLimitRead) {
this.globalLimitRead = globalLimitRead;
if (this.globalPerformanceMonitor != null) {
this.globalPerformanceMonitor.changeConfiguration(null,
if (this.globalTrafficMonitor != null) {
this.globalTrafficMonitor.changeConfiguration(null,
this.globalLimitWrite, this.globalLimitRead,
this.globalDelay);
}
@ -461,8 +449,8 @@ public abstract class PerformanceCounterFactory {
*/
public void setGlobalLimitWrite(long globalLimitWrite) {
this.globalLimitWrite = globalLimitWrite;
if (this.globalPerformanceMonitor != null) {
this.globalPerformanceMonitor.changeConfiguration(null,
if (this.globalTrafficMonitor != null) {
this.globalTrafficMonitor.changeConfiguration(null,
this.globalLimitWrite, this.globalLimitRead,
this.globalDelay);
}

View File

@ -20,11 +20,8 @@
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.handler.trafficshaping;
package org.jboss.netty.handler.traffic;
import java.io.InvalidClassException;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
@ -33,6 +30,8 @@ import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.handler.execution.DefaultObjectSizeEstimator;
import org.jboss.netty.handler.execution.ObjectSizeEstimator;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
@ -43,16 +42,15 @@ import org.jboss.netty.channel.SimpleChannelHandler;
* bandwidth, as traffic shaping.<br>
* <br>
*
* The method getMessageSize(MessageEvent) has to be implemented to specify what
* An {@link ObjectSizeEstimator} can be passed at construction to specify what
* is the size of the object to be read or write accordingly to the type of
* object. In simple case, it can be as simple as a call to
* getChannelBufferMessageSize(MessageEvent).<br>
* object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.<br>
* <br>
*
* TrafficShapingHandler depends on {@link PerformanceCounterFactory} to create
* or use the necessary {@link PerformanceCounter} with the necessary options.
* TrafficShapingHandler depends on {@link TrafficCounterFactory} to create
* or use the necessary {@link TrafficCounter} with the necessary options.
* However, you can change the behavior of both global and channel
* PerformanceCounter if you like by getting them from this handler and changing
* TrafficCounter if you like by getting them from this handler and changing
* their status.
*
*/
@ -61,67 +59,56 @@ public abstract class TrafficShapingHandler extends SimpleChannelHandler {
/**
* Channel Monitor
*/
private PerformanceCounter channelPerformanceCounter = null;
private TrafficCounter channelTrafficCounter = null;
/**
* Global Monitor
*/
private PerformanceCounter globalPerformanceCounter = null;
private TrafficCounter globalTrafficCounter = null;
/**
* Factory if used
*/
private PerformanceCounterFactory factory = null;
private TrafficCounterFactory factory = null;
/**
* Constructor
* ObjectSizeEstimator
*/
private ObjectSizeEstimator objectSizeEstimator = null;
/**
* Constructor using default {@link ObjectSizeEstimator}
*
* @param factory
* the PerformanceCounterFactory from which all Monitors will be
* the TrafficCounterFactory from which all Monitors will be
* created
*/
public TrafficShapingHandler(PerformanceCounterFactory factory) {
public TrafficShapingHandler(TrafficCounterFactory factory) {
super();
this.factory = factory;
this.globalPerformanceCounter = this.factory
.getGlobalPerformanceCounter();
this.channelPerformanceCounter = null;
this.globalTrafficCounter = this.factory
.getGlobalTrafficCounter();
this.channelTrafficCounter = null;
this.objectSizeEstimator = new DefaultObjectSizeEstimator();
// will be set when connected is called
}
/**
* This method has to be implemented. It returns the size in bytes of the
* message to be read or written.
* Constructor using the specified ObjectSizeEstimator
*
* @param arg1
* the MessageEvent to be read or written
* @return the size in bytes of the given MessageEvent
* @exception Exception
* An exception can be thrown if the object is not of the
* expected type
* @param factory
* the TrafficCounterFactory from which all Monitors will be
* created
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
*/
protected abstract long getMessageSize(MessageEvent arg1) throws Exception;
/**
* Example of function (which can be used) for the ChannelBuffer
*
* @param arg1
* @return the size in bytes of the given MessageEvent
* @throws Exception
*/
protected long getChannelBufferMessageSize(MessageEvent arg1)
throws Exception {
Object o = arg1.getMessage();
if (!(o instanceof ChannelBuffer)) {
// Type unimplemented
throw new InvalidClassException("Wrong object received in " +
this.getClass().getName() + " codec " +
o.getClass().getName());
}
ChannelBuffer dataBlock = (ChannelBuffer) o;
return dataBlock.readableBytes();
public TrafficShapingHandler(TrafficCounterFactory factory, ObjectSizeEstimator objectSizeEstimator) {
super();
this.factory = factory;
this.globalTrafficCounter = this.factory
.getGlobalTrafficCounter();
this.channelTrafficCounter = null;
this.objectSizeEstimator = objectSizeEstimator;
// will be set when connected is called
}
/*
* (non-Javadoc)
*
@ -131,12 +118,12 @@ public abstract class TrafficShapingHandler extends SimpleChannelHandler {
@Override
public void messageReceived(ChannelHandlerContext arg0, MessageEvent arg1)
throws Exception {
long size = getMessageSize(arg1);
if (this.channelPerformanceCounter != null) {
this.channelPerformanceCounter.setReceivedBytes(arg0, size);
long size = this.objectSizeEstimator.estimateSize(arg1.getMessage());
if (this.channelTrafficCounter != null) {
this.channelTrafficCounter.setReceivedBytes(arg0, size);
}
if (this.globalPerformanceCounter != null) {
this.globalPerformanceCounter.setReceivedBytes(arg0, size);
if (this.globalTrafficCounter != null) {
this.globalTrafficCounter.setReceivedBytes(arg0, size);
}
// The message is then just passed to the next Codec
super.messageReceived(arg0, arg1);
@ -151,12 +138,12 @@ public abstract class TrafficShapingHandler extends SimpleChannelHandler {
@Override
public void writeRequested(ChannelHandlerContext arg0, MessageEvent arg1)
throws Exception {
long size = getMessageSize(arg1);
if (this.channelPerformanceCounter != null) {
this.channelPerformanceCounter.setToWriteBytes(size);
long size = this.objectSizeEstimator.estimateSize(arg1.getMessage());
if (this.channelTrafficCounter != null) {
this.channelTrafficCounter.setToWriteBytes(size);
}
if (this.globalPerformanceCounter != null) {
this.globalPerformanceCounter.setToWriteBytes(size);
if (this.globalTrafficCounter != null) {
this.globalTrafficCounter.setToWriteBytes(size);
}
// The message is then just passed to the next Codec
super.writeRequested(arg0, arg1);
@ -171,9 +158,9 @@ public abstract class TrafficShapingHandler extends SimpleChannelHandler {
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
if (this.channelPerformanceCounter != null) {
this.channelPerformanceCounter.stopMonitoring();
this.channelPerformanceCounter = null;
if (this.channelTrafficCounter != null) {
this.channelTrafficCounter.stopMonitoring();
this.channelTrafficCounter = null;
}
super.channelClosed(ctx, e);
}
@ -190,15 +177,15 @@ public abstract class TrafficShapingHandler extends SimpleChannelHandler {
// readSuspended = true;
ctx.setAttachment(Boolean.TRUE);
ctx.getChannel().setReadable(false);
if ((this.channelPerformanceCounter == null) && (this.factory != null)) {
if ((this.channelTrafficCounter == null) && (this.factory != null)) {
// A factory was used
this.channelPerformanceCounter = this.factory
.createChannelPerformanceCounter(ctx.getChannel());
this.channelTrafficCounter = this.factory
.createChannelTrafficCounter(ctx.getChannel());
}
if (this.channelPerformanceCounter != null) {
this.channelPerformanceCounter
if (this.channelTrafficCounter != null) {
this.channelTrafficCounter
.setMonitoredChannel(ctx.getChannel());
this.channelPerformanceCounter.startMonitoring();
this.channelTrafficCounter.startMonitoring();
}
super.channelConnected(ctx, e);
// readSuspended = false;
@ -229,21 +216,21 @@ public abstract class TrafficShapingHandler extends SimpleChannelHandler {
/**
*
* @return the current ChannelPerformanceCounter set from the factory (if
* @return the current Channel TrafficCounter set from the factory (if
* channel is still connected) or null if this function was disabled
* in the Factory
*/
public PerformanceCounter getChannelPerformanceCounter() {
return this.channelPerformanceCounter;
public TrafficCounter getChannelTrafficCounter() {
return this.channelTrafficCounter;
}
/**
*
* @return the GlobalPerformanceCounter from the factory or null if this
* @return the Global TrafficCounter from the factory or null if this
* function was disabled in the Factory
*/
public PerformanceCounter getGlobalPerformanceCounter() {
return this.globalPerformanceCounter;
public TrafficCounter getGlobalTrafficCounter() {
return this.globalTrafficCounter;
}
}

View File

@ -32,17 +32,18 @@
*
* <P>Three classes implement this behavior:<br>
* <ul>
* <li> <tt>PerformanceCounter</tt>: this class is the kernel of the package. It can be accessed to get some extra information
* <li> <tt>{@link TrafficCounter}</tt>: this class is the kernel of the package. It can be accessed to get some extra information
* like the read or write bytes since last check, the read and write bandwidth from last check...</li><br><br>
*
* <li> <tt>PerformanceCounterFactory</tt>: this class has to be implemented in your code in order to implement (eventually empty)
* the accounting method. This class is a Factory for PerformanceCounter which is used in the third class to create the
* necessary PerformanceCounter according to your specifications.</li><br><br>
* <li> <tt>{@link TrafficCounterFactory}</tt>: this class has to be implemented in your code in order to implement (eventually empty)
* the accounting method. This class is a Factory for TrafficCounter which is used in the third class to create the
* necessary TrafficCounter according to your specifications.</li><br><br>
*
* <li> <tt>TrafficShapingHandler</tt>: this class is the handler to be inserted in your pipeline. The insertion can be wherever
* you want, but <b>it must be placed before any <tt>MemoryAwareThreadPoolExecutor</tt> in your pipeline</b>.</li><br>
* <li> <tt>{@link TrafficShapingHandler}</tt>: this class is the handler to be inserted in your pipeline. The insertion can be wherever
* you want, but <b>it must be placed before any <tt>{@link MemoryAwareThreadPoolExecutor}</tt> in your pipeline</b>.</li><br>
* <b><i>It is really recommended
* to have such a</i> <tt>MemoryAwareThreadPoolExecutor</tt> <i>(either non ordered or </i><tt>OrderedMemoryAwareThreadPoolExecutor</tt>
* to have such a</i> <tt>{@link MemoryAwareThreadPoolExecutor}</tt> <i>(either non ordered or </i>
* <tt>{@link OrderedMemoryAwareThreadPoolExecutor}</tt>
* <i>) in your pipeline</i></b>
* when you want to use this feature with some real traffic shaping, since it will allow to relax the constraint on
* NioWorker to do other jobs if necessary.<br>
@ -53,8 +54,9 @@
* of 100KB/s for each channel (client), you could have a final limitation of about 60KB/s for each channel since NioWorkers are
* stopping by this handler.<br>
* When it is used as a read traffic shaper, the handler will set the channel as not readable, so as to relax the NioWorkers.<br><br>
* The method <tt>getMessageSize(MessageEvent)</tt> has to be implemented to specify what is the size of the object to be read or write
* accordingly to the type of object. In simple case, it can be as simple as a call to <tt>getChannelBufferMessageSize(MessageEvent)</tt>.<br>
* An {@link ObjectSizeEstimator} can be passed at construction to specify what
* is the size of the object to be read or write accordingly to the type of
* object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.
* </ul></P>
*
* <P>Standard use could be as follow:</P>
@ -62,26 +64,26 @@
* <P><ul>
* <li>To activate or deactivate the traffic shaping, change the value corresponding to your desire as
* [Global or per Channel] [Write or Read] Limitation in byte/s.</li><br>
* <tt>PerformanceCounterFactory.NO_LIMIT</tt> (-1)
* A value of <tt>0</tt>
* stands for no limitation, so the traffic shaping is deactivate (on what you specified).<br>
* You can either change those values with the method <tt>changeConfiguration</tt> in PerformanceCounterFactory or
* directly from the PerformanceCounter method <tt>changeConfiguration</tt>.<br>
* You can either change those values with the method <tt>changeConfiguration</tt> in TrafficCounterFactory or
* directly from the TrafficCounter method <tt>changeConfiguration</tt>.<br>
* <br>
*
* <li>To activate or deactivate the statistics, you can adjust the delay to a low (not less than 200ms
* for efficiency reasons) or a high value (let say 24H in ms is huge enough to not get the problem)
* or even using <tt>PerformanceCounterFactory.NO_STAT</tt> (-1)</li>.<br>
* or even using <tt>0</tt> which means no computation will be done.</li><br>
* And if you don't want to do anything with this statistics, just implement an empty method for
* <tt>PerformanceCounterFactory.accounting(PerformanceCounter)</tt>.<br>
* Again this can be changed either from PerformanceCounterFactory or directly in PerformanceCounter.<br><br>
* <tt>TrafficCounterFactory.accounting(TrafficCounter)</tt>.<br>
* Again this can be changed either from TrafficCounterFactory or directly in TrafficCounter.<br><br>
*
* <li>You can also completely deactivate channel or global PerformanceCounter by setting the boolean to false
* accordingly to your needs in the PerformanceCounterFactory. It will deactivate the global Monitor. For channels monitor,
* <li>You can also completely deactivate channel or global TrafficCounter by setting the boolean to false
* accordingly to your needs in the TrafficCounterFactory. It will deactivate the global Monitor. For channels monitor,
* it will prevent new monitors to be created (or reversely they will be created for newly connected channels).</li>
* </ul></P><br><br>
*
* <P>So in your application you will create your own PerformanceCounterFactory and setting the values to fit your needs.</P>
* <tt>MyPerformanceCounterFactory myFactory = new MyPerformanceCounter(...);</tt><br><br><br>
* <P>So in your application you will create your own TrafficCounterFactory and setting the values to fit your needs.</P>
* <tt>MyTrafficCounterFactory myFactory = new MyTrafficCounter(...);</tt><br><br><br>
* <P>Then you can create your pipeline (using a PipelineFactory since each TrafficShapingHandler must be unique by channel) and adding this handler before
* your MemoryAwareThreadPoolExecutor:</P>
* <tt>pipeline.addLast("MyTrafficShaping",new MyTrafficShapingHandler(myFactory));</tt><br>
@ -89,11 +91,15 @@
* <tt>pipeline.addLast("MemoryExecutor",new ExecutionHandler(memoryAwareThreadPoolExecutor));</tt><br><br>
*
* <P>TrafficShapingHandler must be unique by channel but however it is still global due to
* the PerformanceCounterFactcory that is shared between all handlers across the channels.</P>
* the TrafficCounterFactcory that is shared between all handlers across the channels.</P>
*
*
*
* @apiviz.exclude ^java\.lang\.
*/
package org.jboss.netty.handler.trafficshaping;
package org.jboss.netty.handler.traffic;
import org.jboss.netty.handler.execution.DefaultObjectSizeEstimator;
import org.jboss.netty.handler.execution.ObjectSizeEstimator;
import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;