TrafficShaping v1

This commit is contained in:
Frédéric Brégier 2009-03-18 18:48:37 +00:00
parent f777482a97
commit 4d61f6bda6
4 changed files with 1118 additions and 0 deletions

View File

@ -0,0 +1,448 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.handler.trafficshaping;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @author Frederic Bregier (fredbregier@free.fr)
* @version $Rev$, $Date$
*
* PerformanceCounter is associated with {@link TrafficShapingHandler} and should be created through
* a {@link PerformanceCounterFactory}.<br>
* <br>
* A PerformanceCounter can limit the traffic or not, globaly or per channel, and always
* compute statistics on read and written bytes at the specified interval.
*
*/
public class PerformanceCounter implements Runnable {
/**
* Internal logger
*/
private static InternalLogger logger =
InternalLoggerFactory.getInstance(PerformanceCounter.class);
/**
* Current writing bytes
*/
private final AtomicLong currentWritingBytes = new AtomicLong(0);
/**
* Current reading bytes
*/
private final AtomicLong currentReadingBytes = new AtomicLong(0);
/**
* Last writing bandwidth
*/
private long lastWritingBandwidth = 0;
/**
* Last reading bandwidth
*/
private long lastReadingBandwidth = 0;
/**
* Last Time Check taken
*/
private final AtomicLong lastTime = new AtomicLong(0);
/**
* Last written bytes number
*/
private long lastWritingBytes = 0;
/**
* Last read bytes number
*/
private long lastReadingBytes = 0;
/**
* Current Limit in B/s to apply to write
*/
private long limitWrite = PerformanceCounterFactory.NO_LIMIT;
/**
* Current Limit in B/s to apply to read
*/
private long limitRead = PerformanceCounterFactory.NO_LIMIT;
/**
* Delay between two capture
*/
private long delay = PerformanceCounterFactory.DEFAULT_DELAY; // default 1 s
/**
* Name of this Monitor
*/
private final String name;
/**
* Is this monitor for a channel monitoring or for global monitoring
*/
private boolean isPerChannel = false;
/**
* Associated monitoredChannel if any (global MUST NOT have any)
*/
private Channel monitoredChannel = null;
/**
* The associated PerformanceCounterFactory
*/
private PerformanceCounterFactory factory = null;
/**
* Default ExecutorService
*/
private ExecutorService executorService = null;
/**
* Thread that will host this monitor
*/
private Future<?> monitorFuture = null;
/**
* Start the monitoring process
*
*/
public void startMonitoring() {
synchronized (this.lastTime) {
if (this.monitorFuture != null) {
return;
}
lastTime.set(System.currentTimeMillis());
if (this.delay > 0) {
this.monitorFuture = executorService.submit(this);
}
}
}
/**
* Stop the monitoring process
*
*/
public void stopMonitoring() {
synchronized (this.lastTime) {
if (this.monitorFuture == null) {
return;
}
this.monitorFuture.cancel(true);
this.monitorFuture = null;
this.resetAccounting(System.currentTimeMillis());
if (this.factory != null) {
this.factory.accounting(this);
}
this.setMonitoredChannel(null);
}
}
/**
* Default run
*/
public void run() {
try {
for (;;) {
if (this.delay > 0) {
Thread.sleep(this.delay);
} else {
// Delay goes to PerformanceCounterFactory.NO_STAT, so exit
return;
}
long endTime = System.currentTimeMillis();
this.resetAccounting(endTime);
if (this.factory != null) {
this.factory.accounting(this);
}
}
} catch (InterruptedException e) {
// End of computations
}
}
/**
* Set the accounting on Read and Write
* @param newLastTime
*/
private void resetAccounting(long newLastTime) {
synchronized (this.lastTime) {
long interval = newLastTime - this.lastTime.getAndSet(newLastTime);
if (interval == 0) {
// nothing to do
return;
}
this.lastReadingBytes = this.currentReadingBytes.getAndSet(0);
this.lastWritingBytes = this.currentWritingBytes.getAndSet(0);
this.lastReadingBandwidth = ((this.lastReadingBytes /
interval) * 1000); // nb byte / delay in ms * 1000 (1s)
this.lastWritingBandwidth = ((this.lastWritingBytes /
interval) * 1000); // nb byte / delay in ms * 1000 (1s)
}
}
/**
* Constructor with the executorService to use, the channel if any, its name, the limits in Byte/s (not Bit/s) and the delay between two computations in ms
* @param factory the associated PerformanceCounterFactory
* @param executorService Should be a CachedThreadPool for efficiency
* @param channel Not null means this monitors will be for this channel only, else it will be for global monitoring.
* Channel can be set later on therefore changing its behaviour from global to per channel
* @param name the name given to this monitor
* @param writeLimit the write limit in Byte/s
* @param readLimit the read limit in Byte/s
* @param delay the delay in ms between two computations
*/
public PerformanceCounter(PerformanceCounterFactory factory, ExecutorService executorService, Channel channel, String name,
long writeLimit, long readLimit, long delay) {
this.factory = factory;
this.executorService = executorService;
this.name = name;
this.changeConfiguration(channel, writeLimit, readLimit, delay);
}
/**
* Set the Session monitoredChannel (not for Global Monitor)
* @param channel Not null means this monitors will be for this channel only, else it will be for global monitoring.
* Channel can be set later on therefore changing its behaviour from global to per channel
*/
public void setMonitoredChannel(Channel channel) {
if (channel != null) {
this.monitoredChannel = channel;
this.isPerChannel = true;
} else {
this.isPerChannel = false;
this.monitoredChannel = null;
}
}
/**
* Specifies limits in Byte/s (not Bit/s) but do not changed the delay
* @param channel Not null means this monitors will be for this channel only, else it will be for global monitoring.
* Channel can be set later on therefore changing its behaviour from global to per channel
* @param writeLimit
* @param readLimit
*/
public void changeConfiguration(Channel channel, long writeLimit, long readLimit) {
this.limitWrite = writeLimit;
this.limitRead = readLimit;
this.setMonitoredChannel(channel);
}
/**
* Specifies limits in Byte/s (not Bit/s) and the specified delay between two computations in ms
* @param channel Not null means this monitors will be for this channel only, else it will be for global monitoring.
* Channel can be set later on therefore changing its behaviour from global to per channel
* @param writeLimit
* @param readLimit
* @param delayToSet
*/
public void changeConfiguration(Channel channel, long writeLimit, long readLimit, long delayToSet) {
if (this.delay != delayToSet) {
this.delay = delayToSet;
if (this.monitorFuture == null) {
this.changeConfiguration(channel, writeLimit, readLimit);
return;
}
this.stopMonitoring();
if (this.delay > 0) {
this.startMonitoring();
} else {
// No more active monitoring
lastTime.set(System.currentTimeMillis());
}
}
this.changeConfiguration(channel, writeLimit, readLimit);
}
/**
*
* @return the time that should be necessary to wait to respect limit. Can be negative time
*/
private long getReadTimeToWait() {
synchronized (this.lastTime) {
long interval = System.currentTimeMillis() - this.lastTime.get();
if (interval == 0) {
// Time is too short, so just lets continue
return 0;
}
long wait = this.currentReadingBytes.get() * 1000 / this.limitRead - interval;
return wait;
}
}
/**
*
* @return the time that should be necessary to wait to respect limit. Can be negative time
*/
private long getWriteTimeToWait() {
synchronized (this.lastTime) {
long interval = System.currentTimeMillis() - this.lastTime.get();
if (interval == 0) {
// Time is too short, so just lets continue
return 0;
}
long wait = this.currentWritingBytes.get() * 1000 / this.limitWrite - interval;
return wait;
}
}
/**
* Class to implement setReadable at fix time
*
*/
private class ReopenRead implements Runnable {
/**
* Associated ChannelHandlerContext
*/
private ChannelHandlerContext ctx = null;
/**
* Monitor
*/
private PerformanceCounter monitor = null;
/**
* Time to wait before clearing the channel
*/
private long timeToWait = 0;
/**
* @param monitor
* @param ctx the associated channelHandlerContext
* @param timeToWait
*/
public ReopenRead(ChannelHandlerContext ctx, PerformanceCounter monitor, long timeToWait) {
this.ctx = ctx;
this.monitor = monitor;
this.timeToWait = timeToWait;
}
/**
* Truely run the waken up of the channel
*/
public void run() {
try {
Thread.sleep(timeToWait);
} catch (InterruptedException e) {
// interruption so exit
return;
}
//logger.info("WAKEUP!");
if ((this.monitor != null) && (this.monitor.monitoredChannel != null) && (this.monitor.monitoredChannel.isConnected())) {
//logger.warn(" setReadable TRUE: "+timeToWait);
if (ctx.getHandler() instanceof TrafficShapingHandler) {
// readSuspended = false;
ctx.setAttachment(null);
}
this.monitor.monitoredChannel.setReadable(true);
}
}
}
/**
* If Read is in excess, it will block the read on channel or block until it will be ready again.
* @param ctx the associated channelHandlerContext
* @param recv the size in bytes to read
* @throws InterruptedException
*/
public void setReceivedBytes(ChannelHandlerContext ctx, long recv) throws InterruptedException {
this.currentReadingBytes.addAndGet(recv);
if (this.limitRead == PerformanceCounterFactory.NO_LIMIT) {
// no action
return;
}
if ((this.isPerChannel) && (this.monitoredChannel != null) && (! this.monitoredChannel.isConnected())) {
// no action can be taken since setReadable will throw a NotYetConnected
return;
}
// compute the number of ms to wait before reopening the channel
long wait = this.getReadTimeToWait();
if (wait > 20) { // At least 20ms seems a minimal time in order to try to limit the traffic
if ((this.isPerChannel) && (this.monitoredChannel != null) && (this.monitoredChannel.isConnected())) {
// Channel version
if (this.executorService == null) {
//Sleep since no executor
Thread.sleep(wait);
return;
}
if (ctx.getAttachment() == null) {
if (ctx.getHandler() instanceof TrafficShapingHandler) {
// readSuspended = true;
ctx.setAttachment(Boolean.TRUE);
}
this.monitoredChannel.setReadable(false);
//logger.info("Read will wakeup after "+wait+" ms "+this);
this.executorService.submit(new ReopenRead(ctx, this,wait));
} else {
// should be waiting: but can occurs sometime so as a FIX
logger.info("Read sleep ok but should not be here");
Thread.sleep(wait);
}
} else {
// Global version
//logger.info("Read sleep "+wait+" ms for "+this);
Thread.sleep(wait);
}
}
}
/**
* If Write is in excess, it will block the write operation until it will be ready again.
* @param write the size in bytes to write
* @throws InterruptedException
*/
public void setToWriteBytes(long write) throws InterruptedException {
this.currentWritingBytes.addAndGet(write);
if (this.limitWrite == PerformanceCounterFactory.NO_LIMIT) {
return;
}
// compute the number of ms to wait before continue with the channel
long wait = this.getWriteTimeToWait();
if (wait > 20) {
// Global or Session
Thread.sleep(wait);
}
}
/**
*
* @return the current delay between two computations of performance counter in ms
*/
public long getDelay() {
return this.delay;
}
/**
*
* @return the current Read bandwidth in byte/s
*/
public long getLastReadBandwidth() {
return this.lastReadingBandwidth;
}
/**
*
* @return the current Write bandwidth in byte/s
*/
public long getLastWriteBandwidth() {
return this.lastWritingBandwidth;
}
/**
*
* @return the current number of byte read since last delay
*/
public long getLastBytesRead() {
return this.lastReadingBytes;
}
/**
*
* @return the current number of byte written since last delay
*/
public long getLastBytesWrite() {
return this.lastWritingBytes;
}
/**
* String information
*/
public String toString() {
return "Monitor "+this.name+" Current Speed Read: "+(this.lastReadingBandwidth>>10)+" KB/s, Write: "+(this.lastWritingBandwidth>>10)+
" KB/s Current Read: "+(this.currentReadingBytes.get()>>10)+" KB Current Write: "+(this.currentWritingBytes.get()>>10)+" KB";
}
}

View File

@ -0,0 +1,380 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.handler.trafficshaping;
import java.util.concurrent.ExecutorService;
import org.jboss.netty.channel.Channel;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @author Frederic Bregier (fredbregier@free.fr)
* @version $Rev$, $Date$
*
* The {@link PerformanceCounterFactory} is a Factory for {@link PerformanceCounter}. It stores
* the necessary information to enable dynamic creation of {@link PerformanceCounter} inside the {@link TrafficShapingHandler}.
*
*
*/
public abstract class PerformanceCounterFactory {
/**
* No limit
*/
public static long NO_LIMIT = -1;
/**
* No statistics (if channel or global PerformanceCounter is a very long time living, it can produce
* a excess of capacity, i.e. 2^64 bytes so 17 billions of billions of bytes).
*/
public static long NO_STAT = -1;
/**
* Default delay between two checks: 1s
*/
public static long DEFAULT_DELAY = 1000;
/**
* ExecutorService to associated to any PerformanceCounter
*/
private ExecutorService executorService = null;
/**
* Limit in B/s to apply to write for all channel PerformanceCounter
*/
private long channelLimitWrite = NO_LIMIT;
/**
* Limit in B/s to apply to read for all channel PerformanceCounter
*/
private long channelLimitRead = NO_LIMIT;
/**
* Delay between two performance snapshots for channel
*/
private long channelDelay = DEFAULT_DELAY; // default 1 s
/**
* Will the PerformanceCounter for Channel be active
*/
private boolean channelActive = true;
/**
* Limit in B/s to apply to write for the global PerformanceCounter
*/
private long globalLimitWrite = NO_LIMIT;
/**
* Limit in B/s to apply to read for the global PerformanceCounter
*/
private long globalLimitRead = NO_LIMIT;
/**
* Delay between two performance snapshots for global
*/
private long globalDelay = DEFAULT_DELAY; // default 1 s
/**
* Will the PerformanceCounter for Global be active
*/
private boolean globalActive = true;
/**
* Global Monitor
*/
private PerformanceCounter globalPerformanceMonitor = null;
/**
* Called each time the accounting is computed for the PerformanceCounters.
* This method could be used for instance to implement real time accounting.
* @param counter the PerformanceCounter that computes its performance
*/
protected abstract void accounting(PerformanceCounter counter);
/**
*
* @param newexecutorService
* @param newChannelActive
* @param newChannelLimitWrite
* @param newChannelLimitRead
* @param newChannelDelay
* @param newGlobalActive
* @param newGlobalLimitWrite
* @param newGlobalLimitRead
* @param newGlobalDelay
*/
private void init(ExecutorService newexecutorService,
boolean newChannelActive, long newChannelLimitWrite, long newChannelLimitRead, long newChannelDelay,
boolean newGlobalActive, long newGlobalLimitWrite, long newGlobalLimitRead, long newGlobalDelay) {
this.executorService = newexecutorService;
this.channelActive = newChannelActive;
this.channelLimitWrite = newChannelLimitWrite;
this.channelLimitRead = newChannelLimitRead;
this.channelDelay = newChannelDelay;
this.globalActive = newGlobalActive;
this.globalLimitWrite = newGlobalLimitWrite;
this.globalLimitRead = newGlobalLimitRead;
this.globalDelay = newGlobalDelay;
}
/**
* Full constructor
* @param executorService created for instance like Executors.newCachedThreadPool
* @param channelActive True if each channel will have a PerformanceCounter
* @param channelLimitWrite NO_LIMIT or a limit in bytes/s
* @param channelLimitRead NO_LIMIT or a limit in bytes/s
* @param channelDelay The delay between two computations of performances for channels or NO_STAT if no stats are to be computed
* @param globalActive True if global context will have one unique PerformanceCounter
* @param globalLimitWrite NO_LIMIT or a limit in bytes/s
* @param globalLimitRead NO_LIMIT or a limit in bytes/s
* @param globalDelay The delay between two computations of performances for global context or NO_STAT if no stats are to be computed
*/
public PerformanceCounterFactory(ExecutorService executorService,
boolean channelActive, long channelLimitWrite, long channelLimitRead, long channelDelay,
boolean globalActive, long globalLimitWrite, long globalLimitRead, long globalDelay) {
this.init(executorService,
channelActive, channelLimitWrite, channelLimitRead, channelDelay,
globalActive, globalLimitWrite, globalLimitRead, globalDelay);
}
/**
* Constructor using default Delay
* @param executorService created for instance like Executors.newCachedThreadPool
* @param channelActive True if each channel will have a PerformanceCounter
* @param channelLimitWrite NO_LIMIT or a limit in bytes/s
* @param channelLimitRead NO_LIMIT or a limit in bytes/s
* @param globalActive True if global context will have one unique PerformanceCounter
* @param globalLimitWrite NO_LIMIT or a limit in bytes/s
* @param globalLimitRead NO_LIMIT or a limit in bytes/s
*/
public PerformanceCounterFactory(ExecutorService executorService,
boolean channelActive, long channelLimitWrite, long channelLimitRead,
boolean globalActive, long globalLimitWrite, long globalLimitRead) {
this.init(executorService,
channelActive, channelLimitWrite, channelLimitRead, DEFAULT_DELAY,
globalActive, globalLimitWrite, globalLimitRead, DEFAULT_DELAY);
}
/**
* Constructor using NO_LIMIT and default delay for channels
* @param executorService created for instance like Executors.newCachedThreadPool
* @param channelActive True if each channel will have a PerformanceCounter
* @param globalActive True if global context will have one unique PerformanceCounter
* @param globalLimitWrite NO_LIMIT or a limit in bytes/s
* @param globalLimitRead NO_LIMIT or a limit in bytes/s
* @param globalDelay The delay between two computations of performances for global context or NO_STAT if no stats are to be computed
*/
public PerformanceCounterFactory(ExecutorService executorService,
boolean channelActive,
boolean globalActive, long globalLimitWrite, long globalLimitRead, long globalDelay) {
this.init(executorService,
channelActive, NO_LIMIT, NO_LIMIT, DEFAULT_DELAY,
globalActive, globalLimitWrite, globalLimitRead, globalDelay);
}
/**
* Constructor using NO_LIMIT for channels and default delay for all
* @param executorService created for instance like Executors.newCachedThreadPool
* @param channelActive True if each channel will have a PerformanceCounter
* @param globalActive True if global context will have one unique PerformanceCounter
* @param globalLimitWrite NO_LIMIT or a limit in bytes/s
* @param globalLimitRead NO_LIMIT or a limit in bytes/s
*/
public PerformanceCounterFactory(ExecutorService executorService,
boolean channelActive,
boolean globalActive, long globalLimitWrite, long globalLimitRead) {
this.init(executorService,
channelActive, NO_LIMIT, NO_LIMIT, DEFAULT_DELAY,
globalActive, globalLimitWrite, globalLimitRead, DEFAULT_DELAY);
}
/**
* Constructor using NO_LIMIT and default delay for all
* @param executorService created for instance like Executors.newCachedThreadPool
* @param channelActive True if each channel will have a PerformanceCounter
* @param globalActive True if global context will have one unique PerformanceCounter
*/
public PerformanceCounterFactory(ExecutorService executorService,
boolean channelActive,
boolean globalActive) {
this.init(executorService,
channelActive, NO_LIMIT, NO_LIMIT, DEFAULT_DELAY,
globalActive, NO_LIMIT, NO_LIMIT, DEFAULT_DELAY);
}
/**
* Enable to change the active status of PerformanceCounter on Channels (for new one only)
* @param active
*/
public void setChannelActive(boolean active) {
this.channelActive = active;
}
/**
* Enable to change the active status of PerformanceCounter on Global (stop or start if necessary)
* @param active
*/
public void setGlobalActive(boolean active) {
if (this.globalActive) {
if (!active) {
this.stopGlobalPerformanceCounter();
}
}
this.globalActive = active;
this.getGlobalPerformanceCounter();
}
/**
* Change the underlying limitations. Only Global PerformanceCounter (if any) is dynamically changed, but Channels PerformanceCounters
* are not changed, only new created ones.
* @param newchannelLimitWrite
* @param newchannelLimitRead
* @param newchanneldelay
* @param newglobalLimitWrite
* @param newglobalLimitRead
* @param newglobaldelay
*/
public void changeConfiguration(long newchannelLimitWrite, long newchannelLimitRead, long newchanneldelay,
long newglobalLimitWrite, long newglobalLimitRead, long newglobaldelay) {
this.channelLimitWrite = newchannelLimitWrite;
this.channelLimitRead = newchannelLimitRead;
this.channelDelay = newchanneldelay;
this.globalLimitWrite = newglobalLimitWrite;
this.globalLimitRead = newglobalLimitRead;
this.globalDelay = newglobaldelay;
if (this.globalPerformanceMonitor != null) {
this.globalPerformanceMonitor.changeConfiguration(null, newglobalLimitWrite, newglobalLimitRead, newglobaldelay);
}
}
/**
* @return the Global PerformanceCounter or null if this support is disabled
*/
public PerformanceCounter getGlobalPerformanceCounter() {
if (this.globalActive) {
if (this.globalPerformanceMonitor == null) {
this.globalPerformanceMonitor =
new PerformanceCounter(this, this.executorService, null, "GLOBAL", this.globalLimitWrite, this.globalLimitRead, this.globalDelay);
this.globalPerformanceMonitor.startMonitoring();
}
}
return this.globalPerformanceMonitor;
}
/**
* @param channel
* @return the channel PerformanceCounter or null if this support is disabled
*/
public PerformanceCounter createChannelPerformanceCounter(Channel channel) {
if (this.channelActive) {
return new PerformanceCounter(this, this.executorService, channel, "Channel"+channel.getId(),
this.channelLimitWrite, this.channelLimitRead, this.channelDelay);
}
return null;
}
/**
* Stop the global performance counter if any (Even it is stopped, the factory can however be reused)
*
*/
public void stopGlobalPerformanceCounter() {
if (this.globalPerformanceMonitor != null) {
this.globalPerformanceMonitor.stopMonitoring();
this.globalPerformanceMonitor = null;
}
}
/**
* @return the channelDelay
*/
public long getChannelDelay() {
return this.channelDelay;
}
/**
* @param channelDelay the channelDelay to set
*/
public void setChannelDelay(long channelDelay) {
this.channelDelay = channelDelay;
}
/**
* @return the channelLimitRead
*/
public long getChannelLimitRead() {
return this.channelLimitRead;
}
/**
* @param channelLimitRead the channelLimitRead to set
*/
public void setChannelLimitRead(long channelLimitRead) {
this.channelLimitRead = channelLimitRead;
}
/**
* @return the channelLimitWrite
*/
public long getChannelLimitWrite() {
return this.channelLimitWrite;
}
/**
* @param channelLimitWrite the channelLimitWrite to set
*/
public void setChannelLimitWrite(long channelLimitWrite) {
this.channelLimitWrite = channelLimitWrite;
}
/**
* @return the globalDelay
*/
public long getGlobalDelay() {
return this.globalDelay;
}
/**
* @param globalDelay the globalDelay to set
*/
public void setGlobalDelay(long globalDelay) {
this.globalDelay = globalDelay;
if (this.globalPerformanceMonitor != null) {
this.globalPerformanceMonitor.changeConfiguration(null, this.globalLimitWrite, this.globalLimitRead, this.globalDelay);
}
}
/**
* @return the globalLimitRead
*/
public long getGlobalLimitRead() {
return this.globalLimitRead;
}
/**
* @param globalLimitRead the globalLimitRead to set
*/
public void setGlobalLimitRead(long globalLimitRead) {
this.globalLimitRead = globalLimitRead;
if (this.globalPerformanceMonitor != null) {
this.globalPerformanceMonitor.changeConfiguration(null, this.globalLimitWrite, this.globalLimitRead, this.globalDelay);
}
}
/**
* @return the globalLimitWrite
*/
public long getGlobalLimitWrite() {
return this.globalLimitWrite;
}
/**
* @param globalLimitWrite the globalLimitWrite to set
*/
public void setGlobalLimitWrite(long globalLimitWrite) {
this.globalLimitWrite = globalLimitWrite;
if (this.globalPerformanceMonitor != null) {
this.globalPerformanceMonitor.changeConfiguration(null, this.globalLimitWrite, this.globalLimitRead, this.globalDelay);
}
}
/**
* @return the channelActive
*/
public boolean isChannelActive() {
return this.channelActive;
}
/**
* @return the globalActive
*/
public boolean isGlobalActive() {
return this.globalActive;
}
}

View File

@ -0,0 +1,197 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.handler.trafficshaping;
import java.io.InvalidClassException;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @author Frederic Bregier (fredbregier@free.fr)
* @version $Rev$, $Date$
*
* TrafficShapingHandler allows to limit the global bandwidth or per session bandwidth, as traffic shaping.<br><br>
*
* The method getMessageSize(MessageEvent) has to be implemented to specify what is the size of the object to be read or write
* accordingly to the type of object. In simple case, it can be as simple as a call to getChannelBufferMessageSize(MessageEvent).<br><br>
*
* TrafficShapingHandler depends on {@link PerformanceCounterFactory} to create or use the necessary {@link PerformanceCounter}
* with the necessary options. However, you can change the behaviour of both global and channel PerformanceCounter if you like
* by getting them from this handler and changing their status.
*
*/
@ChannelPipelineCoverage("one")
public abstract class TrafficShapingHandler extends SimpleChannelHandler {
/**
* Channel Monitor
*/
private PerformanceCounter channelPerformanceCounter = null;
/**
* Global Monitor
*/
private PerformanceCounter globalPerformanceCounter = null;
/**
* Factory if used
*/
private PerformanceCounterFactory factory = null;
/**
* Constructor
* @param factory the PerformanceCounterFactory from which all Monitors will be created
*/
public TrafficShapingHandler(PerformanceCounterFactory factory) {
super();
this.factory = factory;
this.globalPerformanceCounter = this.factory.getGlobalPerformanceCounter();
this.channelPerformanceCounter = null; // will be set when connected is called
}
/**
* This method has to be implemented. It returns the size in bytes of the message to be read or written.
* @param arg1 the MessageEvent to be read or written
* @return the size in bytes of the given MessageEvent
* @exception Exception An exception can be thrown if the object is not of the expected type
*/
protected abstract long getMessageSize(MessageEvent arg1) throws Exception ;
/**
* Example of function (which can be used) for the ChannelBuffer
* @param arg1
* @return the size in bytes of the given MessageEvent
* @throws Exception
*/
protected long getChannelBufferMessageSize(MessageEvent arg1) throws Exception {
Object o = arg1.getMessage();
if (! (o instanceof ChannelBuffer)) {
// Type unimplemented
throw new InvalidClassException("Wrong object received in "+this.getClass().getName()+" codec "+o.getClass().getName());
}
ChannelBuffer dataBlock = (ChannelBuffer) o;
return dataBlock.readableBytes();
}
/* (non-Javadoc)
* @see org.jboss.netty.channel.SimpleChannelHandler#messageReceived(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.MessageEvent)
*/
@Override
public void messageReceived(ChannelHandlerContext arg0, MessageEvent arg1) throws Exception {
long size = this.getMessageSize(arg1);
if (this.channelPerformanceCounter != null) {
this.channelPerformanceCounter.setReceivedBytes(arg0, size);
}
if (this.globalPerformanceCounter != null) {
this.globalPerformanceCounter.setReceivedBytes(arg0, size);
}
// The message is then just passed to the next Codec
super.messageReceived(arg0, arg1);
}
/* (non-Javadoc)
* @see org.jboss.netty.channel.SimpleChannelHandler#writeRequested(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.MessageEvent)
*/
@Override
public void writeRequested(ChannelHandlerContext arg0, MessageEvent arg1) throws Exception {
long size = this.getMessageSize(arg1);
if (this.channelPerformanceCounter != null) {
this.channelPerformanceCounter.setToWriteBytes(size);
}
if (this.globalPerformanceCounter != null) {
this.globalPerformanceCounter.setToWriteBytes(size);
}
// The message is then just passed to the next Codec
super.writeRequested(arg0, arg1);
}
/* (non-Javadoc)
* @see org.jboss.netty.channel.SimpleChannelHandler#channelClosed(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.ChannelStateEvent)
*/
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
if (this.channelPerformanceCounter != null) {
this.channelPerformanceCounter.stopMonitoring();
this.channelPerformanceCounter = null;
}
super.channelClosed(ctx, e);
}
/* (non-Javadoc)
* @see org.jboss.netty.channel.SimpleChannelHandler#channelConnected(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.ChannelStateEvent)
*/
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
// readSuspended = true;
ctx.setAttachment(Boolean.TRUE);
ctx.getChannel().setReadable(false);
if ((this.channelPerformanceCounter == null) && (this.factory != null)) {
// A factory was used
this.channelPerformanceCounter = this.factory.createChannelPerformanceCounter(ctx.getChannel());
}
if (this.channelPerformanceCounter != null) {
this.channelPerformanceCounter.setMonitoredChannel(ctx.getChannel());
this.channelPerformanceCounter.startMonitoring();
}
super.channelConnected(ctx, e);
// readSuspended = false;
ctx.setAttachment(null);
ctx.getChannel().setReadable(true);
}
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent cse = (ChannelStateEvent) e;
if (cse.getState() == ChannelState.INTEREST_OPS &&
(((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
// setReadable(true) requested
boolean readSuspended = ctx.getAttachment() != null;
if (readSuspended) {
// Drop the request silently if PerformanceCounter has
// set the flag.
e.getFuture().setSuccess();
return;
}
}
}
super.handleDownstream(ctx, e);
}
/**
*
* @return the current ChannelPerformanceCounter set from the factory (if channel is still connected) or null if this function was disabled in the Factory
*/
public PerformanceCounter getChannelPerformanceCounter() {
return this.channelPerformanceCounter;
}
/**
*
* @return the GlobalPerformanceCounter from the factory or null if this function was disabled in the Factory
*/
public PerformanceCounter getGlobalPerformanceCounter() {
return this.globalPerformanceCounter;
}
}

View File

@ -0,0 +1,93 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
/**
* Implementation of a Traffic Shaping Handler and Dynamic Statistics.<br>
* <br><br>
*
*
* The main goal of this package is to allow to shape the traffic (bandwidth limitation),
* but also to get statistics on how many bytes are read or written. Both functions can
* be active or inactive (traffic or statistics).<br><br><br>
*
* Three classes implement this behaviour:<br><br>
*
* 1) <tt>PerformanceCounter</tt>: this class is the kernel of the package. It can be accessed to get some extra information
* like the read or write bytes since last check, the read and write bandwidth from last check...<br><br>
*
* 2) <tt>PerformanceCounterFactory</tt>: this class has to be implemented in your code in order to implement (eventually empty)
* the accounting method. This class is a Factory for PerformanceCounter which is used in the third class to create the
* necessary PerformanceCounter according to your specifications.<br><br>
*
* 3) <tt>TrafficShapingHandler</tt>: this class is the handler to be inserted in your pipeline. The insertion can be wherever
* you want, but <b>it must be placed before any MemoryAwareThreadPoolExecutor</b> in your pipeline.<br><br>
* <b><i>It is really recommanded
* to have such a MemoryAwareThreadPoolExecutor (either non ordered or OrderedMemoryAwareThreadPoolExecutor) in your pipeline</i></b>
* when you want to use this feature with some real traffic shaping, since it will allow to relax the constraint on
* NioWorker to do other jobs if necessary.<br>
* Instead, if you don't, you can have the following situation: if there are more clients
* connected and doing data transfer (either in read or write) than NioWorker, your global performance can be under your specifications or even
* sometimes it will block for a while which can turn to "timeout" operations.
* For instance, let says that you've got 2 NioWorkers, and 10 clients wants to send data to your server. If you set a bandwidth limitation
* of 100KB/s for each channel (client), you could have a final limitation of about 60KB/s for each channel since NioWorkers are
* stopping by this handler.<br><br>
* The method getMessageSize(MessageEvent) has to be implemented to specify what is the size of the object to be read or write
* accordingly to the type of object. In simple case, it can be as simple as a call to getChannelBufferMessageSize(MessageEvent).<br><br><br>
*
* Standard use could be as follow:<br><br>
*
* To activate or deactivate the traffic shaping, change the value corresponding to your desire as
* [Global or per Channel] [Write or Read] Limitation in byte/s.<br>
* PerformanceCounterFactory.NO_LIMIT (-1)
* stands for no limitation, so the traffic shaping is deactivate (on what you specified).<br>
* You can either change those values with the method changeConfiguration in PerformanceCounterFactory or
* directly from the PerformanceCounter method changeConfiguration.<br>
* <br><br>
*
* To activate or deactivate the statistics, you can adjust the delay to a low (not less than 200ms
* for efficiency reasons) or a high value (let say 24H in ms is huge enough to not get the problem)
* or even using PerformanceCounterFactory.NO_STAT (-1).<br>
* And if you don't want to do anything with this statistics, just implement an empty method for
* PerformanceCounterFactory.accounting(PerformanceCounter).<br>
* Again this can be changed either from PerformanceCounterFactory or directly in PerformanceCounter.<br><br><br>
*
* You can also completely deactivate channel or global PerformanceCounter by setting the boolean to false
* accordingly to your needs in the PerformanceCounterFactory. It will deactivate the global Monitor. For channels monitor,
* it will prevent new monitors to be created (or reversely they will be created for newly connected channels).<br><br><br>
*
* So in your application you will create your own PerformanceCounterFactory and setting the values to fit your needs.<br><br>
* <tt>MyPerformanceCounterFactory myFactory = new MyPerformanceCounter(...);</tt><br><br><br>
* Then you can create your pipeline (using a PipelineFactory since each TrafficShapingHandler must be unique by channel) and adding this handler before
* your MemoryAwareThreadPoolExecutor:<br><br>
* <tt>pipeline.addLast("MyTrafficShaping",new MyTrafficShapingHandler(myFactory));</tt><br>
* <tt>...</tt><br>
* <tt>pipeline.addLast("MemoryExecutor",new ExecutionHandler(memoryAwareThreadPoolExecutor));</tt><br><br><br>
*
* TrafficShapingHandler must be unique by channel but however it is still global due to
* the PerformanceCounterFactcory that is shared between all handlers accross the channels.<br><br>
*
*
*
* @apiviz.exclude ^java\.lang\.
*/
package org.jboss.netty.handler.trafficshaping;