NETTY-363 Traffic shaper
* Merged Frederic's traffic shaper patch (needs some review and documentation)
This commit is contained in:
parent
96ba5819cc
commit
34b181236a
@ -0,0 +1,537 @@
|
||||
/*
|
||||
* Copyright 2009 Red Hat, Inc.
|
||||
*
|
||||
* Red Hat licenses this file to you under the Apache License, version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with the
|
||||
* License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.jboss.netty.handler.traffic;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelEvent;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelState;
|
||||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelHandler;
|
||||
import org.jboss.netty.logging.InternalLogger;
|
||||
import org.jboss.netty.logging.InternalLoggerFactory;
|
||||
import org.jboss.netty.util.DefaultObjectSizeEstimator;
|
||||
import org.jboss.netty.util.ExternalResourceReleasable;
|
||||
import org.jboss.netty.util.ObjectSizeEstimator;
|
||||
import org.jboss.netty.util.internal.ExecutorUtil;
|
||||
|
||||
/**
|
||||
* AbstractTrafficShapingHandler allows to limit the global bandwidth
|
||||
* (see {@link GlobalTrafficShapingHandler}) or per session
|
||||
* bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping.
|
||||
* It allows too to implement an almost real time monitoring of the bandwidth using
|
||||
* the monitors from {@link TrafficCounter} that will call back every checkInterval
|
||||
* the method doAccounting of this handler.<br>
|
||||
* <br>
|
||||
*
|
||||
* An {@link ObjectSizeEstimator} can be passed at construction to specify what
|
||||
* is the size of the object to be read or write accordingly to the type of
|
||||
* object. If not specified, it will used the {@link DefaultObjectSizeEstimator} implementation.<br><br>
|
||||
*
|
||||
* If you want for any particular reasons to stop the monitoring (accounting) or to change
|
||||
* the read/write limit or the check interval, several methods allow that for you:<br>
|
||||
* <ul>
|
||||
* <li><tt>configure</tt> allows you to change read or write limits, or the checkInterval</li>
|
||||
* <li><tt>getTrafficCounter</tt> allows you to have access to the TrafficCounter and so to stop
|
||||
* or start the monitoring, to change the checkInterval directly, or to have access to its values.</li>
|
||||
* <li></li>
|
||||
* </ul>
|
||||
*
|
||||
* @author The Netty Project (netty-dev@lists.jboss.org)
|
||||
* @author Frederic Bregier
|
||||
* @version $Rev: 1832 $, $Date: 2009-10-25 19:53:54 +0900 (Sun, 25 Oct 2009) $
|
||||
*/
|
||||
public abstract class AbstractTrafficShapingHandler extends
|
||||
SimpleChannelHandler implements ExternalResourceReleasable {
|
||||
/**
|
||||
* Internal logger
|
||||
*/
|
||||
static InternalLogger logger = InternalLoggerFactory
|
||||
.getInstance(AbstractTrafficShapingHandler.class);
|
||||
|
||||
/**
|
||||
* Default delay between two checks: 1s
|
||||
*/
|
||||
public static final long DEFAULT_CHECK_INTERVAL = 1000;
|
||||
|
||||
/**
|
||||
* Default minimal time to wait
|
||||
*/
|
||||
private static final long MINIMAL_WAIT = 10;
|
||||
|
||||
/**
|
||||
* Traffic Counter
|
||||
*/
|
||||
protected TrafficCounter trafficCounter = null;
|
||||
|
||||
/**
|
||||
* ObjectSizeEstimator
|
||||
*/
|
||||
private ObjectSizeEstimator objectSizeEstimator = null;
|
||||
|
||||
/**
|
||||
* Executor to associated to any TrafficCounter
|
||||
*/
|
||||
protected Executor executor = null;
|
||||
|
||||
/**
|
||||
* Limit in B/s to apply to write
|
||||
*/
|
||||
private long writeLimit = 0;
|
||||
|
||||
/**
|
||||
* Limit in B/s to apply to read
|
||||
*/
|
||||
private long readLimit = 0;
|
||||
|
||||
/**
|
||||
* Delay between two performance snapshots
|
||||
*/
|
||||
protected long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s
|
||||
|
||||
/**
|
||||
* Boolean associated with the release of this TrafficShapingHandler.
|
||||
* It will be true only once when the releaseExternalRessources is called
|
||||
* to prevent waiting when shutdown.
|
||||
*/
|
||||
final AtomicBoolean release = new AtomicBoolean(false);
|
||||
|
||||
/**
|
||||
* @param newObjectSizeEstimator
|
||||
* @param newExecutor
|
||||
* @param newWriteLimit
|
||||
* @param newReadLimit
|
||||
* @param newCheckInterval
|
||||
*/
|
||||
private void init(ObjectSizeEstimator newObjectSizeEstimator,
|
||||
Executor newExecutor, long newWriteLimit, long newReadLimit,
|
||||
long newCheckInterval) {
|
||||
objectSizeEstimator = newObjectSizeEstimator;
|
||||
executor = newExecutor;
|
||||
writeLimit = newWriteLimit;
|
||||
readLimit = newReadLimit;
|
||||
checkInterval = newCheckInterval;
|
||||
//logger.info("TSH: "+writeLimit+":"+readLimit+":"+checkInterval+":"+isPerChannel());
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param newTrafficCounter the TrafficCounter to set
|
||||
*/
|
||||
void setTrafficCounter(TrafficCounter newTrafficCounter) {
|
||||
trafficCounter = newTrafficCounter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor using default {@link ObjectSizeEstimator}
|
||||
*
|
||||
* @param executor
|
||||
* created for instance like Executors.newCachedThreadPool
|
||||
* @param writeLimit
|
||||
* 0 or a limit in bytes/s
|
||||
* @param readLimit
|
||||
* 0 or a limit in bytes/s
|
||||
* @param checkInterval
|
||||
* The delay between two computations of performances for
|
||||
* channels or 0 if no stats are to be computed
|
||||
*/
|
||||
public AbstractTrafficShapingHandler(Executor executor, long writeLimit,
|
||||
long readLimit, long checkInterval) {
|
||||
super();
|
||||
init(new DefaultObjectSizeEstimator(), executor, writeLimit, readLimit,
|
||||
checkInterval);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor using the specified ObjectSizeEstimator
|
||||
*
|
||||
* @param objectSizeEstimator
|
||||
* the {@link ObjectSizeEstimator} that will be used to compute
|
||||
* the size of the message
|
||||
* @param executor
|
||||
* created for instance like Executors.newCachedThreadPool
|
||||
* @param writeLimit
|
||||
* 0 or a limit in bytes/s
|
||||
* @param readLimit
|
||||
* 0 or a limit in bytes/s
|
||||
* @param checkInterval
|
||||
* The delay between two computations of performances for
|
||||
* channels or 0 if no stats are to be computed
|
||||
*/
|
||||
public AbstractTrafficShapingHandler(
|
||||
ObjectSizeEstimator objectSizeEstimator, Executor executor,
|
||||
long writeLimit, long readLimit, long checkInterval) {
|
||||
super();
|
||||
init(objectSizeEstimator, executor, writeLimit, readLimit,
|
||||
checkInterval);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor using default {@link ObjectSizeEstimator} and using default Check Interval
|
||||
*
|
||||
* @param executor
|
||||
* created for instance like Executors.newCachedThreadPool
|
||||
* @param writeLimit
|
||||
* 0 or a limit in bytes/s
|
||||
* @param readLimit
|
||||
* 0 or a limit in bytes/s
|
||||
*/
|
||||
public AbstractTrafficShapingHandler(Executor executor, long writeLimit,
|
||||
long readLimit) {
|
||||
super();
|
||||
init(new DefaultObjectSizeEstimator(), executor, writeLimit, readLimit,
|
||||
DEFAULT_CHECK_INTERVAL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor using the specified ObjectSizeEstimator and using default Check Interval
|
||||
*
|
||||
* @param objectSizeEstimator
|
||||
* the {@link ObjectSizeEstimator} that will be used to compute
|
||||
* the size of the message
|
||||
* @param executor
|
||||
* created for instance like Executors.newCachedThreadPool
|
||||
* @param writeLimit
|
||||
* 0 or a limit in bytes/s
|
||||
* @param readLimit
|
||||
* 0 or a limit in bytes/s
|
||||
*/
|
||||
public AbstractTrafficShapingHandler(
|
||||
ObjectSizeEstimator objectSizeEstimator, Executor executor,
|
||||
long writeLimit, long readLimit) {
|
||||
super();
|
||||
init(objectSizeEstimator, executor, writeLimit, readLimit,
|
||||
DEFAULT_CHECK_INTERVAL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT and default Check Interval
|
||||
*
|
||||
* @param executor
|
||||
* created for instance like Executors.newCachedThreadPool
|
||||
*/
|
||||
public AbstractTrafficShapingHandler(Executor executor) {
|
||||
super();
|
||||
init(new DefaultObjectSizeEstimator(), executor, 0, 0,
|
||||
DEFAULT_CHECK_INTERVAL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor using the specified ObjectSizeEstimator and using NO LIMIT and default Check Interval
|
||||
*
|
||||
* @param objectSizeEstimator
|
||||
* the {@link ObjectSizeEstimator} that will be used to compute
|
||||
* the size of the message
|
||||
* @param executor
|
||||
* created for instance like Executors.newCachedThreadPool
|
||||
*/
|
||||
public AbstractTrafficShapingHandler(
|
||||
ObjectSizeEstimator objectSizeEstimator, Executor executor) {
|
||||
super();
|
||||
init(objectSizeEstimator, executor, 0, 0, DEFAULT_CHECK_INTERVAL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor using default {@link ObjectSizeEstimator} and using NO LIMIT
|
||||
*
|
||||
* @param executor
|
||||
* created for instance like Executors.newCachedThreadPool
|
||||
* @param checkInterval
|
||||
* The delay between two computations of performances for
|
||||
* channels or 0 if no stats are to be computed
|
||||
*/
|
||||
public AbstractTrafficShapingHandler(Executor executor, long checkInterval) {
|
||||
super();
|
||||
init(new DefaultObjectSizeEstimator(), executor, 0, 0, checkInterval);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor using the specified ObjectSizeEstimator and using NO LIMIT
|
||||
*
|
||||
* @param objectSizeEstimator
|
||||
* the {@link ObjectSizeEstimator} that will be used to compute
|
||||
* the size of the message
|
||||
* @param executor
|
||||
* created for instance like Executors.newCachedThreadPool
|
||||
* @param checkInterval
|
||||
* The delay between two computations of performances for
|
||||
* channels or 0 if no stats are to be computed
|
||||
*/
|
||||
public AbstractTrafficShapingHandler(
|
||||
ObjectSizeEstimator objectSizeEstimator, Executor executor,
|
||||
long checkInterval) {
|
||||
super();
|
||||
init(objectSizeEstimator, executor, 0, 0, checkInterval);
|
||||
}
|
||||
|
||||
/**
|
||||
* Change the underlying limitations and check interval.
|
||||
*
|
||||
* @param newWriteLimit
|
||||
* @param newReadLimit
|
||||
* @param newCheckInterval
|
||||
*/
|
||||
public void configure(long newWriteLimit, long newReadLimit,
|
||||
long newCheckInterval) {
|
||||
this.configure(newWriteLimit, newReadLimit);
|
||||
this.configure(newCheckInterval);
|
||||
}
|
||||
|
||||
/**
|
||||
* Change the underlying limitations.
|
||||
*
|
||||
* @param newWriteLimit
|
||||
* @param newReadLimit
|
||||
*/
|
||||
public void configure(long newWriteLimit, long newReadLimit) {
|
||||
writeLimit = newWriteLimit;
|
||||
readLimit = newReadLimit;
|
||||
if (trafficCounter != null) {
|
||||
trafficCounter.resetAccounting(System.currentTimeMillis()+1);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Change the check interval.
|
||||
*
|
||||
* @param newCheckInterval
|
||||
*/
|
||||
public void configure(long newCheckInterval) {
|
||||
checkInterval = newCheckInterval;
|
||||
if (trafficCounter != null) {
|
||||
trafficCounter.configure(checkInterval);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called each time the accounting is computed from the TrafficCounters.
|
||||
* This method could be used for instance to implement almost real time accounting.
|
||||
*
|
||||
* @param counter
|
||||
* the TrafficCounter that computes its performance
|
||||
*/
|
||||
protected void doAccounting(TrafficCounter counter) {
|
||||
// NOOP by default
|
||||
}
|
||||
|
||||
/**
|
||||
* Class to implement setReadable at fix time
|
||||
*
|
||||
*/
|
||||
private class ReopenRead implements Runnable {
|
||||
/**
|
||||
* Associated ChannelHandlerContext
|
||||
*/
|
||||
private ChannelHandlerContext ctx = null;
|
||||
|
||||
/**
|
||||
* Time to wait before clearing the channel
|
||||
*/
|
||||
private long timeToWait = 0;
|
||||
|
||||
/**
|
||||
* @param ctx
|
||||
* the associated channelHandlerContext
|
||||
* @param timeToWait
|
||||
*/
|
||||
protected ReopenRead(ChannelHandlerContext ctx, long timeToWait) {
|
||||
this.ctx = ctx;
|
||||
this.timeToWait = timeToWait;
|
||||
}
|
||||
|
||||
/**
|
||||
* Truly run the waken up of the channel
|
||||
*/
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
if (release.get()) {
|
||||
return;
|
||||
}
|
||||
Thread.sleep(timeToWait);
|
||||
} catch (InterruptedException e) {
|
||||
// interruption so exit
|
||||
return;
|
||||
}
|
||||
// logger.info("WAKEUP!");
|
||||
if (ctx != null && ctx.getChannel() != null &&
|
||||
ctx.getChannel().isConnected()) {
|
||||
//logger.info(" setReadable TRUE: "+timeToWait);
|
||||
// readSuspended = false;
|
||||
ctx.setAttachment(null);
|
||||
ctx.getChannel().setReadable(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return the time that should be necessary to wait to respect limit. Can
|
||||
* be negative time
|
||||
*/
|
||||
private long getTimeToWait(long limit, long bytes, long lastTime,
|
||||
long curtime) {
|
||||
long interval = curtime - lastTime;
|
||||
if (interval == 0) {
|
||||
// Time is too short, so just lets continue
|
||||
return 0;
|
||||
}
|
||||
long wait = bytes * 1000 / limit - interval;
|
||||
return wait;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext arg0, MessageEvent arg1)
|
||||
throws Exception {
|
||||
try {
|
||||
long curtime = System.currentTimeMillis();
|
||||
long size = objectSizeEstimator.estimateSize(arg1.getMessage());
|
||||
if (trafficCounter != null) {
|
||||
trafficCounter.bytesRecvFlowControl(arg0, size);
|
||||
if (readLimit == 0) {
|
||||
// no action
|
||||
return;
|
||||
}
|
||||
// compute the number of ms to wait before reopening the channel
|
||||
long wait = getTimeToWait(readLimit, trafficCounter
|
||||
.getCurrentReadBytes(), trafficCounter.getLastTime(),
|
||||
curtime);
|
||||
if (wait > MINIMAL_WAIT) { // At least 10ms seems a minimal time in order to
|
||||
Channel channel = arg0.getChannel();
|
||||
// try to limit the traffic
|
||||
if (channel != null && channel.isConnected()) {
|
||||
// Channel version
|
||||
if (executor == null) {
|
||||
// Sleep since no executor
|
||||
//logger.info("Read sleep since no executor for "+wait+" ms for "+this);
|
||||
if (release.get()) {
|
||||
return;
|
||||
}
|
||||
Thread.sleep(wait);
|
||||
return;
|
||||
}
|
||||
if (arg0.getAttachment() == null) {
|
||||
// readSuspended = true;
|
||||
arg0.setAttachment(Boolean.TRUE);
|
||||
channel.setReadable(false);
|
||||
//logger.info("Read will wakeup after "+wait+" ms "+this);
|
||||
executor.execute(new ReopenRead(arg0, wait));
|
||||
} else {
|
||||
// should be waiting: but can occurs sometime so as a FIX
|
||||
//logger.info("Read sleep ok but should not be here: "+wait+" "+this);
|
||||
if (release.get()) {
|
||||
return;
|
||||
}
|
||||
Thread.sleep(wait);
|
||||
}
|
||||
} else {
|
||||
// Not connected or no channel
|
||||
//logger.info("Read sleep "+wait+" ms for "+this);
|
||||
if (release.get()) {
|
||||
return;
|
||||
}
|
||||
Thread.sleep(wait);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
// The message is then just passed to the next handler
|
||||
super.messageReceived(arg0, arg1);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeRequested(ChannelHandlerContext arg0, MessageEvent arg1)
|
||||
throws Exception {
|
||||
try {
|
||||
long curtime = System.currentTimeMillis();
|
||||
long size = objectSizeEstimator.estimateSize(arg1.getMessage());
|
||||
if (trafficCounter != null) {
|
||||
trafficCounter.bytesWriteFlowControl(size);
|
||||
if (writeLimit == 0) {
|
||||
return;
|
||||
}
|
||||
// compute the number of ms to wait before continue with the channel
|
||||
long wait = getTimeToWait(writeLimit, trafficCounter
|
||||
.getCurrentWrittenBytes(), trafficCounter.getLastTime(),
|
||||
curtime);
|
||||
if (wait > MINIMAL_WAIT) {
|
||||
// Global or Channel
|
||||
if (release.get()) {
|
||||
return;
|
||||
}
|
||||
Thread.sleep(wait);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
// The message is then just passed to the next handler
|
||||
super.writeRequested(arg0, arg1);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
|
||||
throws Exception {
|
||||
if (e instanceof ChannelStateEvent) {
|
||||
ChannelStateEvent cse = (ChannelStateEvent) e;
|
||||
if (cse.getState() == ChannelState.INTEREST_OPS &&
|
||||
(((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
|
||||
|
||||
// setReadable(true) requested
|
||||
boolean readSuspended = ctx.getAttachment() != null;
|
||||
if (readSuspended) {
|
||||
// Drop the request silently if this handler has
|
||||
// set the flag.
|
||||
e.getFuture().setSuccess();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
super.handleDownstream(ctx, e);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return the current TrafficCounter (if
|
||||
* channel is still connected)
|
||||
*/
|
||||
public TrafficCounter getTrafficCounter() {
|
||||
return trafficCounter;
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.jboss.netty.util.ExternalResourceReleasable#releaseExternalResources()
|
||||
*/
|
||||
@Override
|
||||
public void releaseExternalResources() {
|
||||
if (trafficCounter != null) {
|
||||
trafficCounter.stop();
|
||||
}
|
||||
release.set(true);
|
||||
ExecutorUtil.terminate(executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TrafficShaping with Write Limit: " + writeLimit +
|
||||
" Read Limit: " + readLimit + " and Counter: " +
|
||||
(trafficCounter != null? trafficCounter.toString() : "none");
|
||||
}
|
||||
}
|
@ -0,0 +1,178 @@
|
||||
/*
|
||||
* Copyright 2009 Red Hat, Inc.
|
||||
*
|
||||
* Red Hat licenses this file to you under the Apache License, version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with the
|
||||
* License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.jboss.netty.handler.traffic;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelPipelineFactory;
|
||||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.handler.execution.ExecutionHandler;
|
||||
import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
|
||||
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
|
||||
import org.jboss.netty.util.ObjectSizeEstimator;
|
||||
|
||||
/**
|
||||
* This implementation of the {@link AbstractTrafficShapingHandler} is for channel
|
||||
* traffic shaping, that is to say a per channel limitation of the bandwidth.<br><br>
|
||||
*
|
||||
* The general use should be as follow:<br>
|
||||
* <ul>
|
||||
* <li>Add in your pipeline a new ChannelTrafficShapingHandler, before a recommended {@link ExecutionHandler} (like
|
||||
* {@link OrderedMemoryAwareThreadPoolExecutor} or {@link MemoryAwareThreadPoolExecutor}).<br>
|
||||
* <tt>ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler(executor);</tt><br>
|
||||
* executor could be created using <tt>Executors.newCachedThreadPool();<tt><br>
|
||||
* <tt>pipeline.addLast("CHANNEL_TRAFFIC_SHAPING", myHandler);</tt><br><br>
|
||||
*
|
||||
* <b>Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created
|
||||
* for each new channel as the counter cannot be shared among all channels.</b> For instance, if you have a
|
||||
* {@link ChannelPipelineFactory}, you should create a new ChannelTrafficShapingHandler in this
|
||||
* {@link ChannelPipelineFactory} each time getPipeline() method is called.<br><br>
|
||||
*
|
||||
* Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
|
||||
* or the check interval (in millisecond) that represents the delay between two computations of the
|
||||
* bandwidth and so the call back of the doAccounting method (0 means no accounting at all).<br><br>
|
||||
*
|
||||
* A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
|
||||
* it is recommended to set a positive value, even if it is high since the precision of the
|
||||
* Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
|
||||
* the less precise the traffic shaping will be. It is suggested as higher value something close
|
||||
* to 5 or 10 minutes.<br>
|
||||
* </li>
|
||||
* <li>When you shutdown your application, release all the external resources like the executor
|
||||
* by calling:<br>
|
||||
* <tt>myHandler.releaseExternalResources();</tt><br>
|
||||
* </li>
|
||||
* </ul><br>
|
||||
*
|
||||
* @author The Netty Project (netty-dev@lists.jboss.org)
|
||||
* @author Frederic Bregier
|
||||
*/
|
||||
public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
|
||||
|
||||
/**
|
||||
* @param executor
|
||||
* @param writeLimit
|
||||
* @param readLimit
|
||||
* @param checkInterval
|
||||
*/
|
||||
public ChannelTrafficShapingHandler(Executor executor, long writeLimit,
|
||||
long readLimit, long checkInterval) {
|
||||
super(executor, writeLimit, readLimit, checkInterval);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param executor
|
||||
* @param writeLimit
|
||||
* @param readLimit
|
||||
*/
|
||||
public ChannelTrafficShapingHandler(Executor executor, long writeLimit,
|
||||
long readLimit) {
|
||||
super(executor, writeLimit, readLimit);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param executor
|
||||
* @param checkInterval
|
||||
*/
|
||||
public ChannelTrafficShapingHandler(Executor executor, long checkInterval) {
|
||||
super(executor, checkInterval);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param executor
|
||||
*/
|
||||
public ChannelTrafficShapingHandler(Executor executor) {
|
||||
super(executor);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param objectSizeEstimator
|
||||
* @param executor
|
||||
* @param writeLimit
|
||||
* @param readLimit
|
||||
* @param checkInterval
|
||||
*/
|
||||
public ChannelTrafficShapingHandler(
|
||||
ObjectSizeEstimator objectSizeEstimator, Executor executor,
|
||||
long writeLimit, long readLimit, long checkInterval) {
|
||||
super(objectSizeEstimator, executor, writeLimit, readLimit,
|
||||
checkInterval);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param objectSizeEstimator
|
||||
* @param executor
|
||||
* @param writeLimit
|
||||
* @param readLimit
|
||||
*/
|
||||
public ChannelTrafficShapingHandler(
|
||||
ObjectSizeEstimator objectSizeEstimator, Executor executor,
|
||||
long writeLimit, long readLimit) {
|
||||
super(objectSizeEstimator, executor, writeLimit, readLimit);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param objectSizeEstimator
|
||||
* @param executor
|
||||
* @param checkInterval
|
||||
*/
|
||||
public ChannelTrafficShapingHandler(
|
||||
ObjectSizeEstimator objectSizeEstimator, Executor executor,
|
||||
long checkInterval) {
|
||||
super(objectSizeEstimator, executor, checkInterval);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param objectSizeEstimator
|
||||
* @param executor
|
||||
*/
|
||||
public ChannelTrafficShapingHandler(
|
||||
ObjectSizeEstimator objectSizeEstimator, Executor executor) {
|
||||
super(objectSizeEstimator, executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
|
||||
throws Exception {
|
||||
if (trafficCounter != null) {
|
||||
trafficCounter.stop();
|
||||
trafficCounter = null;
|
||||
}
|
||||
super.channelClosed(ctx, e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
|
||||
throws Exception {
|
||||
// readSuspended = true;
|
||||
ctx.setAttachment(Boolean.TRUE);
|
||||
ctx.getChannel().setReadable(false);
|
||||
if (trafficCounter == null) {
|
||||
// create a new counter now
|
||||
trafficCounter = new TrafficCounter(this, executor, "ChannelTC" +
|
||||
ctx.getChannel().getId(), checkInterval);
|
||||
}
|
||||
if (trafficCounter != null) {
|
||||
trafficCounter.start();
|
||||
}
|
||||
super.channelConnected(ctx, e);
|
||||
// readSuspended = false;
|
||||
ctx.setAttachment(null);
|
||||
ctx.getChannel().setReadable(true);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,165 @@
|
||||
/*
|
||||
* Copyright 2009 Red Hat, Inc.
|
||||
*
|
||||
* Red Hat licenses this file to you under the Apache License, version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with the
|
||||
* License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.jboss.netty.handler.traffic;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.jboss.netty.channel.ChannelHandler.Sharable;
|
||||
import org.jboss.netty.handler.execution.ExecutionHandler;
|
||||
import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
|
||||
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
|
||||
import org.jboss.netty.util.ObjectSizeEstimator;
|
||||
|
||||
/**
|
||||
* This implementation of the {@link AbstractTrafficShapingHandler} is for global
|
||||
* traffic shaping, that is to say a global limitation of the bandwidth, whatever
|
||||
* the number of opened channels.<br><br>
|
||||
*
|
||||
* The general use should be as follow:<br>
|
||||
* <ul>
|
||||
* <li>Create your unique GlobalTrafficShapingHandler like:<br><br>
|
||||
* <tt>GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(executor);</tt><br><br>
|
||||
* executor could be created using <tt>Executors.newCachedThreadPool();<tt><br>
|
||||
* <tt>pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", myHandler);</tt><br><br>
|
||||
*
|
||||
* <b>Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created
|
||||
* and shared among all channels as the counter must be shared among all channels.</b><br><br>
|
||||
*
|
||||
* Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
|
||||
* or the check interval (in millisecond) that represents the delay between two computations of the
|
||||
* bandwidth and so the call back of the doAccounting method (0 means no accounting at all).<br><br>
|
||||
*
|
||||
* A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
|
||||
* it is recommended to set a positive value, even if it is high since the precision of the
|
||||
* Traffic Shaping depends on the period where the traffic is computed. The highest the interval,
|
||||
* the less precise the traffic shaping will be. It is suggested as higher value something close
|
||||
* to 5 or 10 minutes.<br>
|
||||
* </li>
|
||||
* <li>Add it in your pipeline, before a recommended {@link ExecutionHandler} (like
|
||||
* {@link OrderedMemoryAwareThreadPoolExecutor} or {@link MemoryAwareThreadPoolExecutor}).<br>
|
||||
* <tt>pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", myHandler);</tt><br><br>
|
||||
* </li>
|
||||
* <li>When you shutdown your application, release all the external resources like the executor
|
||||
* by calling:<br>
|
||||
* <tt>myHandler.releaseExternalResources();</tt><br>
|
||||
* </li>
|
||||
* </ul><br>
|
||||
*
|
||||
* @author The Netty Project (netty-dev@lists.jboss.org)
|
||||
* @author Frederic Bregier
|
||||
* @version $Rev: 1794 $, $Date: 2009-10-20 14:20:28 +0900 (Tue, 20 Oct 2009) $
|
||||
*/
|
||||
@Sharable
|
||||
public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
|
||||
/**
|
||||
* Create the global TrafficCounter
|
||||
*/
|
||||
void createGlobalTrafficCounter() {
|
||||
TrafficCounter tc = new TrafficCounter(this, executor, "GlobalTC",
|
||||
checkInterval);
|
||||
setTrafficCounter(tc);
|
||||
tc.start();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param executor
|
||||
* @param writeLimit
|
||||
* @param readLimit
|
||||
* @param checkInterval
|
||||
*/
|
||||
public GlobalTrafficShapingHandler(Executor executor, long writeLimit,
|
||||
long readLimit, long checkInterval) {
|
||||
super(executor, writeLimit, readLimit, checkInterval);
|
||||
createGlobalTrafficCounter();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param executor
|
||||
* @param writeLimit
|
||||
* @param readLimit
|
||||
*/
|
||||
public GlobalTrafficShapingHandler(Executor executor, long writeLimit,
|
||||
long readLimit) {
|
||||
super(executor, writeLimit, readLimit);
|
||||
createGlobalTrafficCounter();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param executor
|
||||
* @param checkInterval
|
||||
*/
|
||||
public GlobalTrafficShapingHandler(Executor executor, long checkInterval) {
|
||||
super(executor, checkInterval);
|
||||
createGlobalTrafficCounter();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param executor
|
||||
*/
|
||||
public GlobalTrafficShapingHandler(Executor executor) {
|
||||
super(executor);
|
||||
createGlobalTrafficCounter();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param objectSizeEstimator
|
||||
* @param executor
|
||||
* @param writeLimit
|
||||
* @param readLimit
|
||||
* @param checkInterval
|
||||
*/
|
||||
public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
|
||||
Executor executor, long writeLimit, long readLimit,
|
||||
long checkInterval) {
|
||||
super(objectSizeEstimator, executor, writeLimit, readLimit,
|
||||
checkInterval);
|
||||
createGlobalTrafficCounter();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param objectSizeEstimator
|
||||
* @param executor
|
||||
* @param writeLimit
|
||||
* @param readLimit
|
||||
*/
|
||||
public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
|
||||
Executor executor, long writeLimit, long readLimit) {
|
||||
super(objectSizeEstimator, executor, writeLimit, readLimit);
|
||||
createGlobalTrafficCounter();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param objectSizeEstimator
|
||||
* @param executor
|
||||
* @param checkInterval
|
||||
*/
|
||||
public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
|
||||
Executor executor, long checkInterval) {
|
||||
super(objectSizeEstimator, executor, checkInterval);
|
||||
createGlobalTrafficCounter();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param objectSizeEstimator
|
||||
* @param executor
|
||||
*/
|
||||
public GlobalTrafficShapingHandler(ObjectSizeEstimator objectSizeEstimator,
|
||||
Executor executor) {
|
||||
super(objectSizeEstimator, executor);
|
||||
createGlobalTrafficCounter();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,408 @@
|
||||
/*
|
||||
* Copyright 2009 Red Hat, Inc.
|
||||
*
|
||||
* Red Hat licenses this file to you under the Apache License, version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with the
|
||||
* License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.jboss.netty.handler.traffic;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
|
||||
/**
|
||||
* TrafficCounter is associated with {@link AbstractTrafficShapingHandler}.<br>
|
||||
* <br>
|
||||
* A TrafficCounter has for goal to count the traffic in order to enable to limit the traffic or not,
|
||||
* globally or per channel. It compute statistics on read and written bytes at the specified
|
||||
* interval and call back the {@link AbstractTrafficShapingHandler} doAccounting method at every
|
||||
* specified interval. If this interval is set to 0, therefore no accounting will be done and only
|
||||
* statistics will be computed at each receive or write operations.
|
||||
*
|
||||
* @author The Netty Project (netty-dev@lists.jboss.org)
|
||||
* @author Frederic Bregier
|
||||
* @version $Rev: 1794 $, $Date: 2009-10-20 14:20:28 +0900 (Tue, 20 Oct 2009) $
|
||||
*/
|
||||
public class TrafficCounter {
|
||||
/**
|
||||
* Current written bytes
|
||||
*/
|
||||
private final AtomicLong currentWrittenBytes = new AtomicLong(0);
|
||||
|
||||
/**
|
||||
* Current read bytes
|
||||
*/
|
||||
private final AtomicLong currentReadBytes = new AtomicLong(0);
|
||||
|
||||
/**
|
||||
* Long life written bytes
|
||||
*/
|
||||
private final AtomicLong cumulativeWrittenBytes = new AtomicLong(0);
|
||||
|
||||
/**
|
||||
* Long life read bytes
|
||||
*/
|
||||
private final AtomicLong cumulativeReadBytes = new AtomicLong(0);
|
||||
|
||||
/**
|
||||
* Last Time where cumulative bytes where reset to zero
|
||||
*/
|
||||
private long lastCumulativeTime;
|
||||
|
||||
/**
|
||||
* Last writing bandwidth
|
||||
*/
|
||||
private long lastWriteThroughput = 0;
|
||||
|
||||
/**
|
||||
* Last reading bandwidth
|
||||
*/
|
||||
private long lastReadThroughput = 0;
|
||||
|
||||
/**
|
||||
* Last Time Check taken
|
||||
*/
|
||||
private final AtomicLong lastTime = new AtomicLong(0);
|
||||
|
||||
/**
|
||||
* Last written bytes number during last check interval
|
||||
*/
|
||||
private long lastWrittenBytes = 0;
|
||||
|
||||
/**
|
||||
* Last read bytes number during last check interval
|
||||
*/
|
||||
private long lastReadBytes = 0;
|
||||
|
||||
/**
|
||||
* Delay between two captures
|
||||
*/
|
||||
AtomicLong checkInterval = new AtomicLong(
|
||||
AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL);
|
||||
|
||||
// default 1 s
|
||||
|
||||
/**
|
||||
* Name of this Monitor
|
||||
*/
|
||||
final String name;
|
||||
|
||||
/**
|
||||
* The associated TrafficShapingHandler
|
||||
*/
|
||||
private AbstractTrafficShapingHandler trafficShapingHandler = null;
|
||||
|
||||
/**
|
||||
* Default Executor
|
||||
*/
|
||||
private Executor executor = null;
|
||||
|
||||
/**
|
||||
* Is Monitor active
|
||||
*/
|
||||
AtomicBoolean monitorActive = new AtomicBoolean(false);
|
||||
|
||||
/**
|
||||
* Monitor
|
||||
*/
|
||||
private TrafficMonitoring trafficMonitoring = null;
|
||||
|
||||
/**
|
||||
* Class to implement monitoring at fix delay
|
||||
*
|
||||
*/
|
||||
private class TrafficMonitoring implements Runnable {
|
||||
/**
|
||||
* The associated TrafficShapingHandler
|
||||
*/
|
||||
private final AbstractTrafficShapingHandler trafficShapingHandler1;
|
||||
|
||||
/**
|
||||
* The associated TrafficCounter
|
||||
*/
|
||||
private final TrafficCounter counter;
|
||||
|
||||
/**
|
||||
* @param trafficShapingHandler
|
||||
* @param counter
|
||||
*/
|
||||
protected TrafficMonitoring(
|
||||
AbstractTrafficShapingHandler trafficShapingHandler,
|
||||
TrafficCounter counter) {
|
||||
trafficShapingHandler1 = trafficShapingHandler;
|
||||
this.counter = counter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Default run
|
||||
*/
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
Thread.currentThread().setName(name);
|
||||
for (; monitorActive.get();) {
|
||||
long check = counter.checkInterval.get();
|
||||
if (check > 0) {
|
||||
Thread.sleep(check);
|
||||
} else {
|
||||
// Delay goes to 0, so exit
|
||||
return;
|
||||
}
|
||||
long endTime = System.currentTimeMillis();
|
||||
counter.resetAccounting(endTime);
|
||||
if (trafficShapingHandler1 != null) {
|
||||
trafficShapingHandler1.doAccounting(counter);
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
// End of computations
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the monitoring process
|
||||
*
|
||||
*/
|
||||
public void start() {
|
||||
synchronized (lastTime) {
|
||||
if (monitorActive.get()) {
|
||||
return;
|
||||
}
|
||||
lastTime.set(System.currentTimeMillis());
|
||||
if (checkInterval.get() > 0) {
|
||||
monitorActive.set(true);
|
||||
trafficMonitoring = new TrafficMonitoring(
|
||||
trafficShapingHandler, this);
|
||||
executor.execute(trafficMonitoring);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the monitoring process
|
||||
*
|
||||
*/
|
||||
public void stop() {
|
||||
synchronized (lastTime) {
|
||||
if (!monitorActive.get()) {
|
||||
return;
|
||||
}
|
||||
monitorActive.set(false);
|
||||
resetAccounting(System.currentTimeMillis());
|
||||
if (trafficShapingHandler != null) {
|
||||
trafficShapingHandler.doAccounting(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the accounting on Read and Write
|
||||
*
|
||||
* @param newLastTime
|
||||
*/
|
||||
void resetAccounting(long newLastTime) {
|
||||
synchronized (lastTime) {
|
||||
long interval = newLastTime - lastTime.getAndSet(newLastTime);
|
||||
if (interval == 0) {
|
||||
// nothing to do
|
||||
return;
|
||||
}
|
||||
lastReadBytes = currentReadBytes.getAndSet(0);
|
||||
lastWrittenBytes = currentWrittenBytes.getAndSet(0);
|
||||
lastReadThroughput = lastReadBytes / interval * 1000;
|
||||
// nb byte / checkInterval in ms * 1000 (1s)
|
||||
lastWriteThroughput = lastWrittenBytes / interval * 1000;
|
||||
// nb byte / checkInterval in ms * 1000 (1s)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the executorService to use, its
|
||||
* name, the checkInterval between two computations in millisecond
|
||||
* @param trafficShapingHandler the associated AbstractTrafficShapingHandler
|
||||
* @param executor
|
||||
* Should be a CachedThreadPool for efficiency
|
||||
* @param name
|
||||
* the name given to this monitor
|
||||
* @param checkInterval
|
||||
* the checkInterval in millisecond between two computations
|
||||
*/
|
||||
public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler,
|
||||
Executor executor, String name, long checkInterval) {
|
||||
this.trafficShapingHandler = trafficShapingHandler;
|
||||
this.executor = executor;
|
||||
this.name = name;
|
||||
lastCumulativeTime = System.currentTimeMillis();
|
||||
configure(checkInterval);
|
||||
}
|
||||
|
||||
/**
|
||||
* Change checkInterval between
|
||||
* two computations in millisecond
|
||||
*
|
||||
* @param newcheckInterval
|
||||
*/
|
||||
public void configure(long newcheckInterval) {
|
||||
if (checkInterval.get() != newcheckInterval) {
|
||||
checkInterval.set(newcheckInterval);
|
||||
if (newcheckInterval <= 0) {
|
||||
stop();
|
||||
// No more active monitoring
|
||||
lastTime.set(System.currentTimeMillis());
|
||||
} else {
|
||||
// Start if necessary
|
||||
start();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes counters for Read.
|
||||
*
|
||||
* @param ctx
|
||||
* the associated channelHandlerContext
|
||||
* @param recv
|
||||
* the size in bytes to read
|
||||
*/
|
||||
void bytesRecvFlowControl(ChannelHandlerContext ctx, long recv) {
|
||||
currentReadBytes.addAndGet(recv);
|
||||
cumulativeReadBytes.addAndGet(recv);
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes counters for Write.
|
||||
*
|
||||
* @param write
|
||||
* the size in bytes to write
|
||||
*/
|
||||
void bytesWriteFlowControl(long write) {
|
||||
currentWrittenBytes.addAndGet(write);
|
||||
cumulativeWrittenBytes.addAndGet(write);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return the current checkInterval between two computations of traffic counter
|
||||
* in millisecond
|
||||
*/
|
||||
public long getCheckInterval() {
|
||||
return checkInterval.get();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return the Read Throughput in bytes/s computes in the last check interval
|
||||
*/
|
||||
public long getLastReadThroughput() {
|
||||
return lastReadThroughput;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return the Write Throughput in bytes/s computes in the last check interval
|
||||
*/
|
||||
public long getLastWriteThroughput() {
|
||||
return lastWriteThroughput;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return the number of bytes read during the last check Interval
|
||||
*/
|
||||
public long getLastReadBytes() {
|
||||
return lastReadBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return the number of bytes written during the last check Interval
|
||||
*/
|
||||
public long getLastWrittenBytes() {
|
||||
return lastWrittenBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return the current number of bytes read since the last checkInterval
|
||||
*/
|
||||
public long getCurrentReadBytes() {
|
||||
return currentReadBytes.get();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return the current number of bytes written since the last check Interval
|
||||
*/
|
||||
public long getCurrentWrittenBytes() {
|
||||
return currentWrittenBytes.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the Time in millisecond of the last check as of System.currentTimeMillis()
|
||||
*/
|
||||
public long getLastTime() {
|
||||
return lastTime.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the cumulativeWrittenBytes
|
||||
*/
|
||||
public long getCumulativeWrittenBytes() {
|
||||
return cumulativeWrittenBytes.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the cumulativeReadBytes
|
||||
*/
|
||||
public long getCumulativeReadBytes() {
|
||||
return cumulativeReadBytes.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the lastCumulativeTime in millisecond as of System.currentTimeMillis()
|
||||
* when the cumulative counters were reset to 0.
|
||||
*/
|
||||
public long getLastCumulativeTime() {
|
||||
return lastCumulativeTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset both read and written cumulative bytes counters and the associated time.
|
||||
*/
|
||||
public void resetCumulativeTime() {
|
||||
lastCumulativeTime = System.currentTimeMillis();
|
||||
cumulativeReadBytes.set(0);
|
||||
cumulativeWrittenBytes.set(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the name
|
||||
*/
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
/**
|
||||
* String information
|
||||
*/
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Monitor " + name + " Current Speed Read: " +
|
||||
(lastReadThroughput >> 10) + " KB/s, Write: " +
|
||||
(lastWriteThroughput >> 10) + " KB/s Current Read: " +
|
||||
(currentReadBytes.get() >> 10) + " KB Current Write: " +
|
||||
(currentWrittenBytes.get() >> 10) + " KB";
|
||||
}
|
||||
}
|
@ -0,0 +1,97 @@
|
||||
/*
|
||||
* Copyright 2009 Red Hat, Inc.
|
||||
*
|
||||
* Red Hat licenses this file to you under the Apache License, version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with the
|
||||
* License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Implementation of a Traffic Shaping Handler and Dynamic Statistics.<br>
|
||||
* <br><br>
|
||||
*
|
||||
*
|
||||
* <P>The main goal of this package is to allow to shape the traffic (bandwidth limitation),
|
||||
* but also to get statistics on how many bytes are read or written. Both functions can
|
||||
* be active or inactive (traffic or statistics).</P>
|
||||
*
|
||||
* <P>Two classes implement this behavior:<br>
|
||||
* <ul>
|
||||
* <li> <tt>{@link org.jboss.netty.handler.traffic.TrafficCounter}</tt>: this class implements the counters needed by the handlers.
|
||||
* It can be accessed to get some extra information like the read or write bytes since last check, the read and write
|
||||
* bandwidth from last check...</li><br><br>
|
||||
*
|
||||
* <li> <tt>{@link org.jboss.netty.handler.traffic.AbstractTrafficShapingHandler}</tt>: this abstract class implements the kernel
|
||||
* of the traffic shaping. It could be extended to fit your needs. Two classes are proposed as default
|
||||
* implementations: see {@link org.jboss.netty.handler.traffic.ChannelTrafficShapingHandler} and see {@link org.jboss.netty.handler.traffic.GlobalTrafficShapingHandler}
|
||||
* respectively for Channel traffic shaping and Global traffic shaping.</li><br><br>
|
||||
*
|
||||
* The insertion in the pipeline of one of those handlers can be wherever you want, but
|
||||
* <b>it must be placed before any <tt>{@link org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor}</tt>
|
||||
* in your pipeline</b>.</li><br>
|
||||
* <b><i>It is really recommended to have such a</i> <tt>{@link org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor}</tt>
|
||||
* <i>(either non ordered or </i> <tt>{@link org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor}</tt>
|
||||
* <i>) in your pipeline</i></b>
|
||||
* when you want to use this feature with some real traffic shaping, since it will allow to relax the constraint on
|
||||
* NioWorker to do other jobs if necessary.<br>
|
||||
* Instead, if you don't, you can have the following situation: if there are more clients
|
||||
* connected and doing data transfer (either in read or write) than NioWorker, your global performance can be under
|
||||
* your specifications or even sometimes it will block for a while which can turn to "timeout" operations.
|
||||
* For instance, let says that you've got 2 NioWorkers, and 10 clients wants to send data to your server.
|
||||
* If you set a bandwidth limitation of 100KB/s for each channel (client), you could have a final limitation of about
|
||||
* 60KB/s for each channel since NioWorkers are stopping by this handler.<br>
|
||||
* When it is used as a read traffic shaper, the handler will set the channel as not readable, so as to relax the
|
||||
* NioWorkers.<br><br>
|
||||
* An {@link org.jboss.netty.util.ObjectSizeEstimator} can be passed at construction to specify what
|
||||
* is the size of the object to be read or write accordingly to the type of
|
||||
* object. If not specified, it will used the {@link org.jboss.netty.util.DefaultObjectSizeEstimator} implementation.<br><br>
|
||||
* </ul></P>
|
||||
*
|
||||
* <P>Standard use could be as follow:</P>
|
||||
*
|
||||
* <P><ul>
|
||||
* <li>To activate or deactivate the traffic shaping, change the value corresponding to your desire as
|
||||
* [Global or per Channel] [Write or Read] Limitation in byte/s.</li><br>
|
||||
* A value of <tt>0</tt>
|
||||
* stands for no limitation, so the traffic shaping is deactivate (on what you specified).<br>
|
||||
* You can either change those values with the method <tt>configure</tt> in {@link org.jboss.netty.handler.traffic.AbstractTrafficShapingHandler}.<br>
|
||||
* <br>
|
||||
*
|
||||
* <li>To activate or deactivate the statistics, you can adjust the delay to a low (suggested not less than 200ms
|
||||
* for efficiency reasons) or a high value (let say 24H in millisecond is huge enough to not get the problem)
|
||||
* or even using <tt>0</tt> which means no computation will be done.</li><br>
|
||||
* If you want to do anything with this statistics, just override the <tt>doAccounting</tt> method.<br>
|
||||
* This interval can be changed either from the method <tt>configure</tt> in {@link org.jboss.netty.handler.traffic.AbstractTrafficShapingHandler}
|
||||
* or directly using the method <tt>configure</tt> of {@link org.jboss.netty.handler.traffic.TrafficCounter}.<br><br>
|
||||
*
|
||||
* </ul></P><br><br>
|
||||
*
|
||||
* <P>So in your application you will create your own TrafficShapingHandler and set the values to fit your needs.</P>
|
||||
* <tt>XXXXXTrafficShapingHandler myHandler = new XXXXXTrafficShapingHandler(executor);</tt><br><br>
|
||||
* where executor could be created using <tt>Executors.newCachedThreadPool();<tt> and XXXXX could be either
|
||||
* Global or Channel<br>
|
||||
* <tt>pipeline.addLast("XXXXX_TRAFFIC_SHAPING", myHandler);</tt><br>
|
||||
* <tt>...</tt><br>
|
||||
* <tt>pipeline.addLast("MemoryExecutor",new ExecutionHandler(memoryAwareThreadPoolExecutor));</tt><br><br>
|
||||
* <P>Note that a new {@link org.jboss.netty.handler.traffic.ChannelTrafficShapingHandler} must be created for each new channel,
|
||||
* but only one {@link org.jboss.netty.handler.traffic.GlobalTrafficShapingHandler} must be created for all channels.</P>
|
||||
*
|
||||
* <P>Note also that you can create different GlobalTrafficShapingHandler if you want to separate classes of
|
||||
* channels (for instance either from business point of view or from bind address point of view).</P>
|
||||
*
|
||||
* @author The Netty Project (netty-dev@lists.jboss.org)
|
||||
* @author Frederic Bregier
|
||||
*
|
||||
*
|
||||
* @apiviz.exclude ^java\.lang\.
|
||||
*/
|
||||
package org.jboss.netty.handler.traffic;
|
||||
|
@ -0,0 +1,322 @@
|
||||
package org.jboss.netty.handler.ipfilter;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelConfig;
|
||||
import org.jboss.netty.channel.ChannelEvent;
|
||||
import org.jboss.netty.channel.ChannelFactory;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelHandler;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelPipeline;
|
||||
import org.jboss.netty.channel.UpstreamMessageEvent;
|
||||
import org.junit.Test;
|
||||
|
||||
public class IpFilterRuleTest extends TestCase
|
||||
{
|
||||
public static boolean accept(IpFilterRuleHandler h, InetSocketAddress addr) throws Exception
|
||||
{
|
||||
return h.accept(new ChannelHandlerContext()
|
||||
{
|
||||
|
||||
@Override
|
||||
public boolean canHandleDownstream()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canHandleUpstream()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAttachment()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Channel getChannel()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandler getHandler()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPipeline getPipeline()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendDownstream(ChannelEvent e)
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendUpstream(ChannelEvent e)
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAttachment(Object attachment)
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
},
|
||||
new UpstreamMessageEvent(new Channel()
|
||||
{
|
||||
|
||||
@Override
|
||||
public ChannelFuture bind(SocketAddress localAddress)
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture close()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture connect(SocketAddress remoteAddress)
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture disconnect()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture getCloseFuture()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelConfig getConfig()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFactory getFactory()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer getId()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getInterestOps()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SocketAddress getLocalAddress()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Channel getParent()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPipeline getPipeline()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SocketAddress getRemoteAddress()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isBound()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isConnected()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOpen()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReadable()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isWritable()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture setInterestOps(int interestOps)
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture setReadable(boolean readable)
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture unbind()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture write(Object message)
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture write(Object message, SocketAddress remoteAddress)
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Channel o)
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return 0;
|
||||
}
|
||||
|
||||
}, h, addr),
|
||||
addr);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIpFilterRule() throws Exception
|
||||
{
|
||||
IpFilterRuleHandler h = new IpFilterRuleHandler();
|
||||
h.addAll(new IpFilterRuleList("+n:localhost, -n:*"));
|
||||
InetSocketAddress addr = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
|
||||
assertTrue(accept(h, addr));
|
||||
addr = new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 8080);
|
||||
assertFalse(accept(h, addr));
|
||||
addr = new InetSocketAddress(InetAddress.getByName(InetAddress.getLocalHost().getHostName()), 8080);
|
||||
assertTrue(accept(h, addr));
|
||||
|
||||
h.clear();
|
||||
h.addAll(new IpFilterRuleList("+n:*"+InetAddress.getLocalHost().getHostName().substring(1)+", -n:*"));
|
||||
addr = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
|
||||
assertTrue(accept(h, addr));
|
||||
addr = new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 8080);
|
||||
assertFalse(accept(h, addr));
|
||||
addr = new InetSocketAddress(InetAddress.getByName(InetAddress.getLocalHost().getHostName()), 8080);
|
||||
assertTrue(accept(h, addr));
|
||||
|
||||
h.clear();
|
||||
h.addAll(new IpFilterRuleList("+c:"+InetAddress.getLocalHost().getHostAddress()+"/32, -n:*"));
|
||||
addr = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
|
||||
assertTrue(accept(h, addr));
|
||||
addr = new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 8080);
|
||||
assertFalse(accept(h, addr));
|
||||
addr = new InetSocketAddress(InetAddress.getByName(InetAddress.getLocalHost().getHostName()), 8080);
|
||||
assertTrue(accept(h, addr));
|
||||
|
||||
h.clear();
|
||||
h.addAll(new IpFilterRuleList(""));
|
||||
addr = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
|
||||
assertTrue(accept(h, addr));
|
||||
addr = new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 8080);
|
||||
assertTrue(accept(h, addr));
|
||||
addr = new InetSocketAddress(InetAddress.getByName(InetAddress.getLocalHost().getHostName()), 8080);
|
||||
assertTrue(accept(h, addr));
|
||||
|
||||
h.clear();
|
||||
addr = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
|
||||
assertTrue(accept(h, addr));
|
||||
addr = new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 8080);
|
||||
assertTrue(accept(h, addr));
|
||||
addr = new InetSocketAddress(InetAddress.getByName(InetAddress.getLocalHost().getHostName()), 8080);
|
||||
assertTrue(accept(h, addr));
|
||||
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user