Removed the traffic shaper from 3.1 - rescheduled to 3.2.

This commit is contained in:
Trustin Lee 2009-04-15 08:06:42 +00:00
parent bb6db4baf8
commit 5966c93cfe
4 changed files with 0 additions and 1480 deletions

View File

@ -1,668 +0,0 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.handler.traffic;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ExternalResourceReleasable;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Frederic Bregier (fredbregier@free.fr)
* @version $Rev$, $Date$
*
* TrafficCounter is associated with {@link TrafficShapingHandler} and
* should be created through a {@link TrafficCounterFactory}.<br>
* <br>
* A TrafficCounter can limit the traffic or not, globally or per channel,
* and always compute statistics on read and written bytes at the specified
* interval.
*
*/
public class TrafficCounter implements ExternalResourceReleasable {
// XXX: Should the constructor be package private?
// We already have TrafficCounterFactory.newChannelTrafficCounter.
// XXX: Should TrafficCounter be able to be instantiated without TrafficCounterFactory?
/**
* Internal logger
*/
private static InternalLogger logger = InternalLoggerFactory
.getInstance(TrafficCounter.class);
/**
* Current writing bytes
*/
private final AtomicLong currentWritingBytes = new AtomicLong(0);
/**
* Current reading bytes
*/
private final AtomicLong currentReadingBytes = new AtomicLong(0);
/**
* Long life writing bytes
*/
private final AtomicLong cumulativeWrittenBytes = new AtomicLong(0);
/**
* Long life reading bytes
*/
private final AtomicLong cumulativeReadBytes = new AtomicLong(0);
/**
* Last Time where cumulative bytes where reset to zero
*/
private long lastCumulativeTime;
/**
* Last writing bandwidth
*/
private long lastWriteThroughput = 0;
/**
* Last reading bandwidth
*/
private long lastReadThroughput = 0;
/**
* Last Time Check taken
*/
private final AtomicLong lastTime = new AtomicLong(0);
/**
* Last written bytes number
*/
private long lastWrittenBytes = 0;
/**
* Last read bytes number
*/
private long lastReadBytes = 0;
/**
* Current Limit in B/s to apply to write
*/
private long writeLimit = 0;
/**
* Current Limit in B/s to apply to read
*/
private long readLimit = 0;
/**
* Delay between two capture
*/
private long checkInterval = TrafficCounterFactory.DEFAULT_CHECK_INTERVAL;
// default 1 s
/**
* Name of this Monitor
*/
private final String name;
/**
* Is this monitor for a channel monitoring or for global monitoring
*/
private boolean isPerChannel = false;
/**
* Associated monitoredChannel if any (global MUST NOT have any)
*/
protected Channel monitoredChannel = null;
/**
* The associated TrafficCounterFactory
*/
private TrafficCounterFactory factory = null;
/**
* Default ExecutorService
*/
private ExecutorService executorService = null;
/**
* Thread that will host this monitor
*/
private Future<?> monitorFuture = null;
/**
* Class to implement monitoring at fix delay
*
*/
private class TrafficMonitoring implements Runnable {
/**
* Delay between two capture
*/
private final long checkInterval1;
/**
* The associated TrafficCounterFactory
*/
private final TrafficCounterFactory factory1;
/**
* The associated TrafficCounter
*/
private final TrafficCounter counter;
/**
* @param checkInterval
* @param factory
* @param counter
*/
protected TrafficMonitoring(long checkInterval,
TrafficCounterFactory factory, TrafficCounter counter) {
checkInterval1 = checkInterval;
factory1 = factory;
this.counter = counter;
}
/**
* Default run
*/
public void run() {
try {
for (;;) {
if (checkInterval1 > 0) {
Thread.sleep(checkInterval1);
} else {
// Delay goes to TrafficCounterFactory.NO_STAT, so exit
return;
}
long endTime = System.currentTimeMillis();
counter.resetAccounting(endTime);
if (factory1 != null) {
factory1.doAccounting(counter);
}
}
} catch (InterruptedException e) {
// End of computations
}
}
}
/**
* Start the monitoring process
*
*/
public void start() {
synchronized (lastTime) {
if (monitorFuture != null) {
return;
}
lastTime.set(System.currentTimeMillis());
if (checkInterval > 0) {
monitorFuture =
executorService.submit(new TrafficMonitoring(checkInterval,
factory, this));
}
}
}
/**
* Stop the monitoring process
*
*/
public void stop() {
synchronized (lastTime) {
if (monitorFuture == null) {
return;
}
monitorFuture.cancel(true);
monitorFuture = null;
resetAccounting(System.currentTimeMillis());
if (factory != null) {
factory.doAccounting(this);
}
setMonitoredChannel(null);
}
}
/**
* Set the accounting on Read and Write
*
* @param newLastTime
*/
void resetAccounting(long newLastTime) {
synchronized (lastTime) {
long interval = newLastTime - lastTime.getAndSet(newLastTime);
if (interval == 0) {
// nothing to do
return;
}
lastReadBytes = currentReadingBytes.getAndSet(0);
lastWrittenBytes = currentWritingBytes.getAndSet(0);
lastReadThroughput = lastReadBytes / interval * 1000;
// nb byte / checkInterval in ms * 1000 (1s)
lastWriteThroughput = lastWrittenBytes / interval * 1000;
// nb byte / checkInterval in ms * 1000 (1s)
}
}
/**
* Constructor with the executorService to use, the channel if any, its
* name, the limits in Byte/s (not Bit/s) and the checkInterval between two
* computations in millisecond
*
* @param factory
* the associated TrafficCounterFactory
* @param executorService
* Should be a CachedThreadPool for efficiency
* @param channel
* Not null means this monitors will be for this channel only,
* else it will be for global monitoring. Channel can be set
* later on therefore changing its behavior from global to per
* channel
* @param name
* the name given to this monitor
* @param writeLimit
* the write limit in Byte/s
* @param readLimit
* the read limit in Byte/s
* @param checkInterval
* the checkInterval in millisecond between two computations
*/
public TrafficCounter(TrafficCounterFactory factory,
ExecutorService executorService, Channel channel, String name,
long writeLimit, long readLimit, long checkInterval) {
this.factory = factory;
this.executorService = executorService;
this.name = name;
lastCumulativeTime = System.currentTimeMillis();
this.configure(channel, writeLimit, readLimit, checkInterval);
}
/**
* Set the Session monitoredChannel (not for Global Monitor)
*
* @param channel
* Not null means this monitors will be for this channel only,
* else it will be for global monitoring. Channel can be set
* later on therefore changing its behavior from global to per
* channel
*/
void setMonitoredChannel(Channel channel) {
if (channel != null) {
monitoredChannel = channel;
isPerChannel = true;
} else {
isPerChannel = false;
monitoredChannel = null;
}
}
/**
* Specifies limits in Byte/s (not Bit/s) but do not changed the checkInterval
*
* @param channel
* Not null means this monitors will be for this channel only,
* else it will be for global monitoring. Channel can be set
* later on therefore changing its behavior from global to per
* channel
* @param newwriteLimit
* @param newreadLimit
*/
public void configure(Channel channel, long newwriteLimit,
long newreadLimit) {
this.writeLimit = newwriteLimit;
this.readLimit = newreadLimit;
setMonitoredChannel(channel);
}
/**
* Specifies limits in Byte/s (not Bit/s) and the specified checkInterval between
* two computations in millisecond
*
* @param channel
* Not null means this monitors will be for this channel only,
* else it will be for global monitoring. Channel can be set
* later on therefore changing its behavior from global to per
* channel
* @param newwriteLimit
* @param newreadLimit
* @param newcheckInterval
*/
public void configure(Channel channel, long newwriteLimit,
long newreadLimit, long newcheckInterval) {
if (this.checkInterval != newcheckInterval) {
this.checkInterval = newcheckInterval;
if (monitorFuture == null) {
this.configure(channel, newwriteLimit, newreadLimit);
return;
}
stop();
if (newcheckInterval > 0) {
start();
} else {
// No more active monitoring
lastTime.set(System.currentTimeMillis());
}
}
this.configure(channel, newwriteLimit, newreadLimit);
}
/**
* Specifies limits in Byte/s (not Bit/s) but do not changed the checkInterval
*
* @param newwriteLimit
* @param newreadLimit
*/
public void configure(long newwriteLimit,
long newreadLimit) {
this.writeLimit = newwriteLimit;
this.readLimit = newreadLimit;
}
/**
* Specifies limits in Byte/s (not Bit/s) and the specified checkInterval between
* two computations in millisecond
*
* @param newwriteLimit
* @param newreadLimit
* @param newcheckInterval
*/
public void configure(long newwriteLimit,
long newreadLimit, long newcheckInterval) {
if (this.checkInterval != newcheckInterval) {
this.checkInterval = newcheckInterval;
if (monitorFuture == null) {
this.configure(newwriteLimit, newreadLimit);
return;
}
stop();
if (newcheckInterval > 0) {
start();
} else {
// No more active monitoring
lastTime.set(System.currentTimeMillis());
}
}
this.configure(newwriteLimit, newreadLimit);
}
/**
*
* @return the time that should be necessary to wait to respect limit. Can
* be negative time
*/
private long getReadTimeToWait() {
synchronized (lastTime) {
long interval = System.currentTimeMillis() - lastTime.get();
if (interval == 0) {
// Time is too short, so just lets continue
return 0;
}
long wait = currentReadingBytes.get() * 1000 / readLimit -
interval;
return wait;
}
}
/**
*
* @return the time that should be necessary to wait to respect limit. Can
* be negative time
*/
private long getWriteTimeToWait() {
synchronized (lastTime) {
long interval = System.currentTimeMillis() - lastTime.get();
if (interval == 0) {
// Time is too short, so just lets continue
return 0;
}
long wait = currentWritingBytes.get() * 1000 /
writeLimit - interval;
return wait;
}
}
/**
* Class to implement setReadable at fix time
*
*/
private class ReopenRead implements Runnable {
/**
* Associated ChannelHandlerContext
*/
private ChannelHandlerContext ctx = null;
/**
* Monitor
*/
private TrafficCounter monitor = null;
/**
* Time to wait before clearing the channel
*/
private long timeToWait = 0;
/**
* @param monitor
* @param ctx
* the associated channelHandlerContext
* @param timeToWait
*/
protected ReopenRead(ChannelHandlerContext ctx,
TrafficCounter monitor, long timeToWait) {
this.ctx = ctx;
this.monitor = monitor;
this.timeToWait = timeToWait;
}
/**
* Truly run the waken up of the channel
*/
public void run() {
try {
Thread.sleep(timeToWait);
} catch (InterruptedException e) {
// interruption so exit
return;
}
// logger.info("WAKEUP!");
if (monitor != null &&
monitor.monitoredChannel != null &&
monitor.monitoredChannel.isConnected()) {
// logger.warn(" setReadable TRUE: "+timeToWait);
if (ctx.getHandler() instanceof TrafficShapingHandler) {
// readSuspended = false;
ctx.setAttachment(null);
}
monitor.monitoredChannel.setReadable(true);
}
}
}
/**
* If Read is in excess, it will block the read on channel or block until it
* will be ready again.
*
* @param ctx
* the associated channelHandlerContext
* @param recv
* the size in bytes to read
* @throws InterruptedException
*/
void bytesRecvFlowControl(ChannelHandlerContext ctx, long recv)
throws InterruptedException {
currentReadingBytes.addAndGet(recv);
cumulativeReadBytes.addAndGet(recv);
if (readLimit == 0) {
// no action
return;
}
if (isPerChannel && monitoredChannel != null &&
!monitoredChannel.isConnected()) {
// no action can be taken since setReadable will throw a
// NotYetConnected
return;
}
// compute the number of ms to wait before reopening the channel
long wait = getReadTimeToWait();
if (wait > 20) { // At least 20ms seems a minimal time in order to
// try to limit the traffic
if (isPerChannel && monitoredChannel != null &&
monitoredChannel.isConnected()) {
// Channel version
if (executorService == null) {
// Sleep since no executor
Thread.sleep(wait);
return;
}
if (ctx.getAttachment() == null) {
if (ctx.getHandler() instanceof TrafficShapingHandler) {
// readSuspended = true;
ctx.setAttachment(Boolean.TRUE);
}
monitoredChannel.setReadable(false);
// logger.info("Read will wakeup after "+wait+" ms "+this);
executorService
.submit(new ReopenRead(ctx, this, wait));
} else {
// should be waiting: but can occurs sometime so as a FIX
logger.info("Read sleep ok but should not be here");
Thread.sleep(wait);
}
} else {
// Global version
// logger.info("Read sleep "+wait+" ms for "+this);
Thread.sleep(wait);
}
}
}
/**
* If Write is in excess, it will block the write operation until it will be
* ready again.
*
* @param write
* the size in bytes to write
* @throws InterruptedException
*/
void bytesWriteFlowControl(long write) throws InterruptedException {
currentWritingBytes.addAndGet(write);
cumulativeWrittenBytes.addAndGet(write);
if (writeLimit == 0) {
return;
}
// compute the number of ms to wait before continue with the channel
long wait = getWriteTimeToWait();
if (wait > 20) {
// Global or Session
Thread.sleep(wait);
}
}
/**
*
* @return the current checkInterval between two computations of traffic counter
* in millisecond
*/
public long getCheckInterval() {
return checkInterval;
}
/**
*
* @return the current Read Throughput in byte/s
*/
public long getLastReadThroughput() {
return lastReadThroughput;
}
/**
*
* @return the current Write Throughput in byte/s
*/
public long getLastWriteThroughput() {
return lastWriteThroughput;
}
/**
*
* @return the current number of byte read since last checkInterval
*/
public long getLastReadBytes() {
return lastReadBytes;
}
/**
*
* @return the current number of byte written since last checkInterval
*/
public long getLastWrittenBytes() {
return lastWrittenBytes;
}
/**
* @return the cumulativeWrittenBytes
*/
public long getCumulativeWrittenBytes() {
return cumulativeWrittenBytes.get();
}
/**
* @return the cumulativeReadBytes
*/
public long getCumulativeReadBytes() {
return cumulativeReadBytes.get();
}
/**
* @return the lastCumulativeTime in millisecond as of System.currentTimeMillis()
* when the cumulative counters were reset to 0.
*/
public long getLastCumulativeTime() {
return lastCumulativeTime;
}
/**
* Reset both read and written cumulative bytes counters and the associated time.
*/
public void resetCumulativeTime() {
lastCumulativeTime = System.currentTimeMillis();
cumulativeReadBytes.set(0);
cumulativeWrittenBytes.set(0);
}
/**
* String information
*/
@Override
public String toString() {
return "Monitor " + name + " Current Speed Read: " +
(lastReadThroughput >> 10) + " KB/s, Write: " +
(lastWriteThroughput >> 10) + " KB/s Current Read: " +
(currentReadingBytes.get() >> 10) + " KB Current Write: " +
(currentWritingBytes.get() >> 10) + " KB";
}
/* (non-Javadoc)
* @see org.jboss.netty.util.ExternalResourceReleasable#releaseExternalResources()
*/
public void releaseExternalResources() {
// Nothing to do: done in TrafficCounterFactory
}
}

View File

@ -1,492 +0,0 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.handler.traffic;
import java.util.concurrent.ExecutorService;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.internal.ExecutorUtil;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Frederic Bregier (fredbregier@free.fr)
* @version $Rev$, $Date$
*
* The {@link TrafficCounterFactory} is a Factory for
* {@link TrafficCounter}. It stores the necessary information to enable
* dynamic creation of {@link TrafficCounter} inside the
* {@link TrafficShapingHandler}.
*
*
*/
public class TrafficCounterFactory implements ExternalResourceReleasable {
// FIXME: Use Executor instead of ExecutorService
// TODO: Read/write limit needs to be configurable on a per-channel basis.
/**
* Default delay between two checks: 1s
*/
public static long DEFAULT_CHECK_INTERVAL = 1000;
/**
* ExecutorService to associated to any TrafficCounter
*/
private ExecutorService executorService = null;
/**
* Limit in B/s to apply to write for all channel TrafficCounter
*/
private long channelWriteLimit = 0;
/**
* Limit in B/s to apply to read for all channel TrafficCounter
*/
private long channelReadLimit = 0;
/**
* Delay between two performance snapshots for channel
*/
private long channelCheckInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
/**
* Will the TrafficCounter for Channel be active
*/
private boolean channelActive = true;
/**
* Limit in B/s to apply to write for the global TrafficCounter
*/
private long globalWriteLimit = 0;
/**
* Limit in B/s to apply to read for the global TrafficCounter
*/
private long globalReadLimit = 0;
/**
* Delay between two performance snapshots for global
*/
private long globalCheckInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
/**
* Will the TrafficCounter for Global be active
*/
private boolean globalActive = true;
/**
* Global Monitor
*/
private TrafficCounter globalTrafficMonitor = null;
/**
* Called each time the accounting is computed for the TrafficCounters.
* This method could be used for instance to implement real time accounting.
*
* @param counter
* the TrafficCounter that computes its performance
*/
protected void doAccounting(TrafficCounter counter) {
// NOOP by default
}
/**
*
* @param newexecutorService
* @param newChannelActive
* @param newChannelWriteLimit
* @param newChannelReadLimit
* @param newChannelCheckInterval
* @param newGlobalActive
* @param newGlobalWriteLimit
* @param newGlobalReadLimit
* @param newGlobalCheckInterval
*/
private void init(ExecutorService newexecutorService,
boolean newChannelActive, long newChannelWriteLimit,
long newChannelReadLimit, long newChannelCheckInterval,
boolean newGlobalActive, long newGlobalWriteLimit,
long newGlobalReadLimit, long newGlobalCheckInterval) {
executorService = newexecutorService;
channelActive = newChannelActive;
channelWriteLimit = newChannelWriteLimit;
channelReadLimit = newChannelReadLimit;
channelCheckInterval = newChannelCheckInterval;
globalActive = newGlobalActive;
globalWriteLimit = newGlobalWriteLimit;
globalReadLimit = newGlobalReadLimit;
globalCheckInterval = newGlobalCheckInterval;
}
/**
* Full constructor
*
* @param executorService
* created for instance like Executors.newCachedThreadPool
* @param channelActive
* True if each channel will have a TrafficCounter
* @param channelWriteLimit
* 0 or a limit in bytes/s
* @param channelReadLimit
* 0 or a limit in bytes/s
* @param channelCheckInterval
* The delay between two computations of performances for
* channels or 0 if no stats are to be computed
* @param globalActive
* True if global context will have one unique TrafficCounter
* @param globalWriteLimit
* 0 or a limit in bytes/s
* @param globalReadLimit
* 0 or a limit in bytes/s
* @param globalCheckInterval
* The delay between two computations of performances for global
* context or 0 if no stats are to be computed
*/
public TrafficCounterFactory(ExecutorService executorService,
boolean channelActive, long channelWriteLimit,
long channelReadLimit, long channelCheckInterval, boolean globalActive,
long globalWriteLimit, long globalReadLimit, long globalCheckInterval) {
init(executorService, channelActive, channelWriteLimit,
channelReadLimit, channelCheckInterval, globalActive, globalWriteLimit,
globalReadLimit, globalCheckInterval);
}
/**
* Constructor using default Delay
*
* @param executorService
* created for instance like Executors.newCachedThreadPool
* @param channelActive
* True if each channel will have a TrafficCounter
* @param channelWriteLimit
* 0 or a limit in bytes/s
* @param channelReadLimit
* 0 or a limit in bytes/s
* @param globalActive
* True if global context will have one unique TrafficCounter
* @param globalWriteLimit
* 0 or a limit in bytes/s
* @param globalReadLimit
* 0 or a limit in bytes/s
*/
public TrafficCounterFactory(ExecutorService executorService,
boolean channelActive, long channelWriteLimit,
long channelReadLimit, boolean globalActive, long globalWriteLimit,
long globalReadLimit) {
init(executorService, channelActive, channelWriteLimit,
channelReadLimit, DEFAULT_CHECK_INTERVAL, globalActive,
globalWriteLimit, globalReadLimit, DEFAULT_CHECK_INTERVAL);
}
/**
* Constructor using NO LIMIT and default delay for channels
*
* @param executorService
* created for instance like Executors.newCachedThreadPool
* @param channelActive
* True if each channel will have a TrafficCounter
* @param globalActive
* True if global context will have one unique TrafficCounter
* @param globalWriteLimit
* 0 or a limit in bytes/s
* @param globalReadLimit
* 0 or a limit in bytes/s
* @param globalCheckInterval
* The delay between two computations of performances for global
* context or NO_STAT if no stats are to be computed
*/
public TrafficCounterFactory(ExecutorService executorService,
boolean channelActive, boolean globalActive, long globalWriteLimit,
long globalReadLimit, long globalCheckInterval) {
init(executorService, channelActive, 0, 0,
DEFAULT_CHECK_INTERVAL, globalActive, globalWriteLimit, globalReadLimit,
globalCheckInterval);
}
/**
* Constructor using NO LIMIT for channels and default delay for all
*
* @param executorService
* created for instance like Executors.newCachedThreadPool
* @param channelActive
* True if each channel will have a TrafficCounter
* @param globalActive
* True if global context will have one unique TrafficCounter
* @param globalWriteLimit
* 0 or a limit in bytes/s
* @param globalReadLimit
* 0 or a limit in bytes/s
*/
public TrafficCounterFactory(ExecutorService executorService,
boolean channelActive, boolean globalActive, long globalWriteLimit,
long globalReadLimit) {
init(executorService, channelActive, 0, 0,
DEFAULT_CHECK_INTERVAL, globalActive, globalWriteLimit, globalReadLimit,
DEFAULT_CHECK_INTERVAL);
}
/**
* Constructor using NO LIMIT and default delay for all
*
* @param executorService
* created for instance like Executors.newCachedThreadPool
* @param channelActive
* True if each channel will have a TrafficCounter
* @param globalActive
* True if global context will have one unique TrafficCounter
*/
public TrafficCounterFactory(ExecutorService executorService,
boolean channelActive, boolean globalActive) {
init(executorService, channelActive, 0, 0,
DEFAULT_CHECK_INTERVAL, globalActive, 0, 0, DEFAULT_CHECK_INTERVAL);
}
/**
* Enable to change the active status of TrafficCounter on Channels (for
* new one only)
*
* @param active
*/
public void setChannelActive(boolean active) {
channelActive = active;
}
/**
* Enable to change the active status of TrafficCounter on Global (stop
* or start if necessary)
*
* @param active
*/
public void setGlobalActive(boolean active) {
if (globalActive) {
if (!active) {
stopGlobalTrafficCounter();
}
}
globalActive = active;
getGlobalTrafficCounter();
}
/**
* Change the underlying limitations. Only Global TrafficCounter (if
* any) is dynamically changed, but Channels TrafficCounters are not
* changed, only new created ones.
*
* @param newchannelWriteLimit
* @param newchannelReadLimit
* @param newchannelCheckInterval
* @param newglobalWriteLimit
* @param newglobalReadLimit
* @param newGlobalCheckInterval
*/
public void configure(long newchannelWriteLimit,
long newchannelReadLimit, long newchannelCheckInterval,
long newglobalWriteLimit, long newglobalReadLimit,
long newGlobalCheckInterval) {
channelWriteLimit = newchannelWriteLimit;
channelReadLimit = newchannelReadLimit;
channelCheckInterval = newchannelCheckInterval;
globalWriteLimit = newglobalWriteLimit;
globalReadLimit = newglobalReadLimit;
globalCheckInterval = newGlobalCheckInterval;
if (globalTrafficMonitor != null) {
globalTrafficMonitor.configure(null,
newglobalWriteLimit, newglobalReadLimit, newGlobalCheckInterval);
}
}
/**
* @return the Global TrafficCounter or null if this support is disabled
*/
public TrafficCounter getGlobalTrafficCounter() {
if (globalActive) {
if (globalTrafficMonitor == null) {
globalTrafficMonitor = new TrafficCounter(this,
executorService, null, "GlobalPC",
globalWriteLimit, globalReadLimit,
globalCheckInterval);
globalTrafficMonitor.start();
}
}
return globalTrafficMonitor;
}
/**
* @param ctx
* @param e
* @return a new TrafficCount for the given channel or null if none are required
*
* @throws UnsupportedOperationException if per-channel counter is disabled
*/
public TrafficCounter newChannelTrafficCounter(ChannelHandlerContext ctx, ChannelStateEvent e)
throws UnsupportedOperationException {
Channel channel = ctx.getChannel();
if (channelActive && (channelReadLimit > 0 || channelWriteLimit > 0
|| channelCheckInterval > 0)) {
return new TrafficCounter(this, executorService, channel,
"ChannelTC" + channel.getId(), channelWriteLimit,
channelReadLimit, channelCheckInterval);
}
throw new UnsupportedOperationException("per-channel counter disabled");
}
/**
* Stop the global TrafficCounter if any (Even it is stopped, the
* factory can however be reused)
*
*/
public void stopGlobalTrafficCounter() {
if (globalTrafficMonitor != null) {
globalTrafficMonitor.stop();
globalTrafficMonitor = null;
}
}
/**
* @return the channelCheckInterval
*/
public long getChannelCheckInterval() {
return channelCheckInterval;
}
/**
* @param channelCheckInterval
* the channelCheckInterval to set
*/
public void setChannelCheckInterval(long channelCheckInterval) {
this.channelCheckInterval = channelCheckInterval;
}
/**
* @return the channelReadLimit
*/
public long getChannelReadLimit() {
return channelReadLimit;
}
/**
* @param channelReadLimit
* the channelReadLimit to set
*/
public void setChannelReadLimit(long channelReadLimit) {
this.channelReadLimit = channelReadLimit;
}
/**
* @return the channelWriteLimit
*/
public long getChannelWriteLimit() {
return channelWriteLimit;
}
/**
* @param channelWriteLimit
* the channelWriteLimit to set
*/
public void setChannelWriteLimit(long channelWriteLimit) {
this.channelWriteLimit = channelWriteLimit;
}
/**
* @return the globalCheckInterval
*/
public long getGlobalCheckInterval() {
return globalCheckInterval;
}
/**
* @param globalCheckInterval
* the globalCheckInterval to set
*/
public void setGlobalCheckInterval(long globalCheckInterval) {
this.globalCheckInterval = globalCheckInterval;
if (globalTrafficMonitor != null) {
globalTrafficMonitor.configure(null,
globalWriteLimit, globalReadLimit,
globalCheckInterval);
}
}
/**
* @return the globalReadLimit
*/
public long getGlobalReadLimit() {
return globalReadLimit;
}
/**
* @param globalReadLimit
* the globalReadLimit to set
*/
public void setGlobalReadLimit(long globalReadLimit) {
this.globalReadLimit = globalReadLimit;
if (globalTrafficMonitor != null) {
globalTrafficMonitor.configure(null,
globalWriteLimit, this.globalReadLimit,
globalCheckInterval);
}
}
/**
* @return the globalWriteLimit
*/
public long getGlobalWriteLimit() {
return globalWriteLimit;
}
/**
* @param globalWriteLimit
* the globalWriteLimit to set
*/
public void setGlobalWriteLimit(long globalWriteLimit) {
this.globalWriteLimit = globalWriteLimit;
if (globalTrafficMonitor != null) {
globalTrafficMonitor.configure(null,
this.globalWriteLimit, globalReadLimit,
globalCheckInterval);
}
}
/**
* @return the channelActive
*/
public boolean isChannelActive() {
return channelActive;
}
/**
* @return the globalActive
*/
public boolean isGlobalActive() {
return globalActive;
}
/* (non-Javadoc)
* @see org.jboss.netty.util.ExternalResourceReleasable#releaseExternalResources()
*/
public void releaseExternalResources() {
ExecutorUtil.terminate(this.executorService);
}
}

View File

@ -1,218 +0,0 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.handler.traffic;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.util.DefaultObjectSizeEstimator;
import org.jboss.netty.util.ObjectSizeEstimator;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Frederic Bregier (fredbregier@free.fr)
* @version $Rev$, $Date$
*
* TrafficShapingHandler allows to limit the global bandwidth or per session
* bandwidth, as traffic shaping.<br>
* <br>
*
* An {@link ObjectSizeEstimator} can be passed at construction to specify what
* is the size of the object to be read or write accordingly to the type of
* object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.<br>
* <br>
*
* TrafficShapingHandler depends on {@link TrafficCounterFactory} to create
* or use the necessary {@link TrafficCounter} with the necessary options.
* However, you can change the behavior of both global and channel
* TrafficCounter if you like by getting them from this handler and changing
* their status.
*
*/
@ChannelPipelineCoverage("one")
public class TrafficShapingHandler extends SimpleChannelHandler {
/**
* Channel Monitor
*/
private TrafficCounter channelTrafficCounter = null;
/**
* Global Monitor
*/
private TrafficCounter globalTrafficCounter = null;
/**
* Factory if used
*/
private TrafficCounterFactory factory = null;
/**
* ObjectSizeEstimator
*/
private ObjectSizeEstimator objectSizeEstimator = null;
/**
* Constructor using default {@link ObjectSizeEstimator}
*
* @param factory
* the TrafficCounterFactory from which all Monitors will be
* created
*/
public TrafficShapingHandler(TrafficCounterFactory factory) {
super();
this.factory = factory;
globalTrafficCounter = this.factory
.getGlobalTrafficCounter();
// will be set when connected is called
channelTrafficCounter = null;
objectSizeEstimator = new DefaultObjectSizeEstimator();
}
/**
* Constructor using the specified ObjectSizeEstimator
*
* @param factory
* the TrafficCounterFactory from which all Monitors will be
* created
* @param objectSizeEstimator
* the {@link ObjectSizeEstimator} that will be used to compute
* the size of the message
*/
public TrafficShapingHandler(TrafficCounterFactory factory, ObjectSizeEstimator objectSizeEstimator) {
super();
this.factory = factory;
globalTrafficCounter = this.factory
.getGlobalTrafficCounter();
// will be set when connected is called
channelTrafficCounter = null;
this.objectSizeEstimator = objectSizeEstimator;
}
@Override
public void messageReceived(ChannelHandlerContext arg0, MessageEvent arg1)
throws Exception {
long size = objectSizeEstimator.estimateSize(arg1.getMessage());
if (channelTrafficCounter != null) {
channelTrafficCounter.bytesRecvFlowControl(arg0, size);
}
if (globalTrafficCounter != null) {
globalTrafficCounter.bytesRecvFlowControl(arg0, size);
}
// The message is then just passed to the next Codec
super.messageReceived(arg0, arg1);
}
@Override
public void writeRequested(ChannelHandlerContext arg0, MessageEvent arg1)
throws Exception {
long size = objectSizeEstimator.estimateSize(arg1.getMessage());
if (channelTrafficCounter != null) {
channelTrafficCounter.bytesWriteFlowControl(size);
}
if (globalTrafficCounter != null) {
globalTrafficCounter.bytesWriteFlowControl(size);
}
// The message is then just passed to the next Codec
super.writeRequested(arg0, arg1);
}
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
if (channelTrafficCounter != null) {
channelTrafficCounter.stop();
channelTrafficCounter = null;
}
super.channelClosed(ctx, e);
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
// readSuspended = true;
ctx.setAttachment(Boolean.TRUE);
ctx.getChannel().setReadable(false);
if (channelTrafficCounter == null && factory != null) {
// A factory was used
try {
channelTrafficCounter =
factory.newChannelTrafficCounter(ctx,e);
} catch (UnsupportedOperationException e1) {
// No Traffic Counter for this Channel but continue however!
channelTrafficCounter = null;
}
}
if (channelTrafficCounter != null) {
channelTrafficCounter
.setMonitoredChannel(ctx.getChannel());
channelTrafficCounter.start();
}
super.channelConnected(ctx, e);
// readSuspended = false;
ctx.setAttachment(null);
ctx.getChannel().setReadable(true);
}
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent cse = (ChannelStateEvent) e;
if (cse.getState() == ChannelState.INTEREST_OPS &&
(((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
// setReadable(true) requested
boolean readSuspended = ctx.getAttachment() != null;
if (readSuspended) {
// Drop the request silently if PerformanceCounter has
// set the flag.
e.getFuture().setSuccess();
return;
}
}
}
super.handleDownstream(ctx, e);
}
/**
*
* @return the current Channel TrafficCounter set from the factory (if
* channel is still connected) or null if this function was disabled
* in the Factory
*/
public TrafficCounter getChannelTrafficCounter() {
return channelTrafficCounter;
}
/**
*
* @return the Global TrafficCounter from the factory or null if this
* function was disabled in the Factory
*/
public TrafficCounter getGlobalTrafficCounter() {
return globalTrafficCounter;
}
}

View File

@ -1,102 +0,0 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
/**
* Implementation of a Traffic Shaping Handler and Dynamic Statistics.<br>
* <br><br>
*
*
* <P>The main goal of this package is to allow to shape the traffic (bandwidth limitation),
* but also to get statistics on how many bytes are read or written. Both functions can
* be active or inactive (traffic or statistics).</P>
*
* <P>Three classes implement this behavior:<br>
* <ul>
* <li> <tt>{@link TrafficCounter}</tt>: this class is the kernel of the package. It can be accessed to get some extra information
* like the read or write bytes since last check, the read and write bandwidth from last check...</li><br><br>
*
* <li> <tt>{@link TrafficCounterFactory}</tt>: this class has to be implemented in your code in order to implement (eventually empty)
* the accounting method. This class is a Factory for TrafficCounter which is used in the third class to create the
* necessary TrafficCounter according to your specifications.</li><br><br>
*
* <li> <tt>{@link TrafficShapingHandler}</tt>: this class is the handler to be inserted in your pipeline. The insertion can be wherever
* you want, but <b>it must be placed before any <tt>{@link MemoryAwareThreadPoolExecutor}</tt> in your pipeline</b>.</li><br>
* <b><i>It is really recommended
* to have such a</i> <tt>{@link MemoryAwareThreadPoolExecutor}</tt> <i>(either non ordered or </i>
* <tt>{@link OrderedMemoryAwareThreadPoolExecutor}</tt>
* <i>) in your pipeline</i></b>
* when you want to use this feature with some real traffic shaping, since it will allow to relax the constraint on
* NioWorker to do other jobs if necessary.<br>
* Instead, if you don't, you can have the following situation: if there are more clients
* connected and doing data transfer (either in read or write) than NioWorker, your global performance can be under your specifications or even
* sometimes it will block for a while which can turn to "timeout" operations.
* For instance, let says that you've got 2 NioWorkers, and 10 clients wants to send data to your server. If you set a bandwidth limitation
* of 100KB/s for each channel (client), you could have a final limitation of about 60KB/s for each channel since NioWorkers are
* stopping by this handler.<br>
* When it is used as a read traffic shaper, the handler will set the channel as not readable, so as to relax the NioWorkers.<br><br>
* An {@link ObjectSizeEstimator} can be passed at construction to specify what
* is the size of the object to be read or write accordingly to the type of
* object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.
* </ul></P>
*
* <P>Standard use could be as follow:</P>
*
* <P><ul>
* <li>To activate or deactivate the traffic shaping, change the value corresponding to your desire as
* [Global or per Channel] [Write or Read] Limitation in byte/s.</li><br>
* A value of <tt>0</tt>
* stands for no limitation, so the traffic shaping is deactivate (on what you specified).<br>
* You can either change those values with the method <tt>configure</tt> in TrafficCounterFactory or
* directly from the TrafficCounter method <tt>configure</tt>.<br>
* <br>
*
* <li>To activate or deactivate the statistics, you can adjust the delay to a low (not less than 200ms
* for efficiency reasons) or a high value (let say 24H in ms is huge enough to not get the problem)
* or even using <tt>0</tt> which means no computation will be done.</li><br>
* And if you don't want to do anything with this statistics, just implement an empty method for
* <tt>TrafficCounterFactory.accounting(TrafficCounter)</tt>.<br>
* Again this can be changed either from TrafficCounterFactory or directly in TrafficCounter.<br><br>
*
* <li>You can also completely deactivate channel or global TrafficCounter by setting the boolean to false
* accordingly to your needs in the TrafficCounterFactory. It will deactivate the global Monitor. For channels monitor,
* it will prevent new monitors to be created (or reversely they will be created for newly connected channels).</li>
* </ul></P><br><br>
*
* <P>So in your application you will create your own TrafficCounterFactory and setting the values to fit your needs.</P>
* <tt>MyTrafficCounterFactory myFactory = new MyTrafficCounter(...);</tt><br><br><br>
* <P>Then you can create your pipeline (using a PipelineFactory since each TrafficShapingHandler must be unique by channel) and adding this handler before
* your MemoryAwareThreadPoolExecutor:</P>
* <tt>pipeline.addLast("MyTrafficShaping",new MyTrafficShapingHandler(myFactory));</tt><br>
* <tt>...</tt><br>
* <tt>pipeline.addLast("MemoryExecutor",new ExecutionHandler(memoryAwareThreadPoolExecutor));</tt><br><br>
*
* <P>TrafficShapingHandler must be unique by channel but however it is still global due to
* the TrafficCounterFactcory that is shared between all handlers across the channels.</P>
*
*
*
* @apiviz.exclude ^java\.lang\.
*/
package org.jboss.netty.handler.traffic;