This commit is contained in:
Trustin Lee 2014-12-29 16:02:04 +09:00
parent 5f5b63528a
commit 24ab3b2fd3
6 changed files with 72 additions and 74 deletions

View File

@ -227,7 +227,7 @@ public abstract class AbstractTrafficShapingHandler extends
*/
protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
long readLimit, long checkInterval) {
this.index = userDefinedWritabilityIndex();
index = userDefinedWritabilityIndex();
init(new SimpleObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval,
DEFAULT_MAX_TIME);
}
@ -252,7 +252,7 @@ public abstract class AbstractTrafficShapingHandler extends
protected AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Timer timer,
long writeLimit, long readLimit, long checkInterval) {
this.index = userDefinedWritabilityIndex();
index = userDefinedWritabilityIndex();
init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME);
}
@ -270,7 +270,7 @@ public abstract class AbstractTrafficShapingHandler extends
*/
protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
long readLimit) {
this.index = userDefinedWritabilityIndex();
index = userDefinedWritabilityIndex();
init(new SimpleObjectSizeEstimator(), timer, writeLimit, readLimit,
DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
}
@ -293,7 +293,7 @@ public abstract class AbstractTrafficShapingHandler extends
protected AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Timer timer,
long writeLimit, long readLimit) {
this.index = userDefinedWritabilityIndex();
index = userDefinedWritabilityIndex();
init(objectSizeEstimator, timer, writeLimit, readLimit,
DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
}
@ -307,7 +307,7 @@ public abstract class AbstractTrafficShapingHandler extends
* created once for instance like HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024).
*/
protected AbstractTrafficShapingHandler(Timer timer) {
this.index = userDefinedWritabilityIndex();
index = userDefinedWritabilityIndex();
init(new SimpleObjectSizeEstimator(), timer, 0, 0,
DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
}
@ -325,7 +325,7 @@ public abstract class AbstractTrafficShapingHandler extends
*/
protected AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Timer timer) {
this.index = userDefinedWritabilityIndex();
index = userDefinedWritabilityIndex();
init(objectSizeEstimator, timer, 0, 0,
DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
}
@ -341,7 +341,7 @@ public abstract class AbstractTrafficShapingHandler extends
* channels or 0 if no stats are to be computed.
*/
protected AbstractTrafficShapingHandler(Timer timer, long checkInterval) {
this.index = userDefinedWritabilityIndex();
index = userDefinedWritabilityIndex();
init(new SimpleObjectSizeEstimator(), timer, 0, 0, checkInterval, DEFAULT_MAX_TIME);
}
@ -361,7 +361,7 @@ public abstract class AbstractTrafficShapingHandler extends
protected AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Timer timer,
long checkInterval) {
this.index = userDefinedWritabilityIndex();
index = userDefinedWritabilityIndex();
init(objectSizeEstimator, timer, 0, 0, checkInterval, DEFAULT_MAX_TIME);
}
@ -383,7 +383,7 @@ public abstract class AbstractTrafficShapingHandler extends
*/
protected AbstractTrafficShapingHandler(Timer timer, long writeLimit,
long readLimit, long checkInterval, long maxTime) {
this.index = userDefinedWritabilityIndex();
index = userDefinedWritabilityIndex();
init(new SimpleObjectSizeEstimator(), timer, writeLimit, readLimit, checkInterval,
maxTime);
}
@ -410,7 +410,7 @@ public abstract class AbstractTrafficShapingHandler extends
protected AbstractTrafficShapingHandler(
ObjectSizeEstimator objectSizeEstimator, Timer timer,
long writeLimit, long readLimit, long checkInterval, long maxTime) {
this.index = userDefinedWritabilityIndex();
index = userDefinedWritabilityIndex();
init(objectSizeEstimator, timer, writeLimit, readLimit, checkInterval, maxTime);
}
@ -520,7 +520,7 @@ public abstract class AbstractTrafficShapingHandler extends
* @param newCheckInterval the checkInterval to set
*/
public void setCheckInterval(long newCheckInterval) {
this.checkInterval = newCheckInterval;
checkInterval = newCheckInterval;
if (trafficCounter != null) {
trafficCounter.configure(checkInterval);
}
@ -631,7 +631,7 @@ public abstract class AbstractTrafficShapingHandler extends
// If isReadable is False and Active is True, user make a direct setReadable(false)
// Then Just reset the status
if (logger.isDebugEnabled()) {
logger.debug("Not unsuspend: " + channel.isReadable() + ":" +
logger.debug("Not unsuspend: " + channel.isReadable() + ':' +
rws.readSuspend);
}
rws.readSuspend = false;
@ -639,10 +639,10 @@ public abstract class AbstractTrafficShapingHandler extends
// Anything else allows the handler to reset the AutoRead
if (logger.isDebugEnabled()) {
if (channel.isReadable() && rws.readSuspend) {
logger.debug("Unsuspend: " + channel.isReadable() + ":" +
logger.debug("Unsuspend: " + channel.isReadable() + ':' +
rws.readSuspend);
} else {
logger.debug("Normal unsuspend: " + channel.isReadable() + ":" +
logger.debug("Normal unsuspend: " + channel.isReadable() + ':' +
rws.readSuspend);
}
}
@ -650,7 +650,7 @@ public abstract class AbstractTrafficShapingHandler extends
channel.setReadable(true);
}
if (logger.isDebugEnabled()) {
logger.debug("Unsupsend final status => " + channel.isReadable() + ":" +
logger.debug("Unsupsend final status => " + channel.isReadable() + ':' +
rws.readSuspend);
}
}
@ -685,7 +685,7 @@ public abstract class AbstractTrafficShapingHandler extends
if (channel != null && channel.isConnected()) {
// Only AutoRead AND HandlerActive True means Context Active
if (logger.isDebugEnabled()) {
logger.debug("Read suspend: " + wait + ":" + channel.isReadable() + ":" +
logger.debug("Read suspend: " + wait + ':' + channel.isReadable() + ':' +
rws.readSuspend);
}
if (timer == null) {
@ -698,7 +698,7 @@ public abstract class AbstractTrafficShapingHandler extends
rws.readSuspend = true;
channel.setReadable(false);
if (logger.isDebugEnabled()) {
logger.debug("Suspend final status => " + channel.isReadable() + ":" +
logger.debug("Suspend final status => " + channel.isReadable() + ':' +
rws.readSuspend);
}
// Create a Runnable to reactive the read if needed. If one was create before
@ -750,7 +750,7 @@ public abstract class AbstractTrafficShapingHandler extends
// compute the number of ms to wait before continue with the channel
wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
if (logger.isDebugEnabled()) {
logger.debug("Write suspend: " + wait + ":" + channel.isWritable() + ":" +
logger.debug("Write suspend: " + wait + ':' + channel.isWritable() + ':' +
channel.getUserDefinedWritability(index));
}
if (wait < MINIMAL_WAIT || release.get()) {
@ -774,8 +774,8 @@ public abstract class AbstractTrafficShapingHandler extends
submitWrite(ctx, evt, calculateSize(evt.getMessage()), delay, TrafficCounter.milliSecondFromNano());
}
abstract void submitWrite(final ChannelHandlerContext ctx, final MessageEvent evt, final long size,
final long delay, final long now) throws Exception;
abstract void submitWrite(ChannelHandlerContext ctx, MessageEvent evt, long size,
long delay, long now) throws Exception;
void setWritable(ChannelHandlerContext ctx, boolean writable) {
Channel channel = ctx.getChannel();
@ -839,9 +839,8 @@ public abstract class AbstractTrafficShapingHandler extends
}
protected long calculateSize(Object obj) {
long size = objectSizeEstimator.estimateSize(obj);
//logger.debug("Size: "+size);
return size;
return objectSizeEstimator.estimateSize(obj);
}
@Override
@ -854,7 +853,7 @@ public abstract class AbstractTrafficShapingHandler extends
.append(" maxSize: ").append(maxWriteSize)
.append(" and Counter: ");
if (trafficCounter != null) {
builder.append(trafficCounter.toString());
builder.append(trafficCounter);
} else {
builder.append("none");
}

View File

@ -15,10 +15,6 @@
*/
package org.jboss.netty.handler.traffic;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineFactory;
@ -32,6 +28,10 @@ import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.Timer;
import org.jboss.netty.util.TimerTask;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* <p>This implementation of the {@link AbstractTrafficShapingHandler} is for channel
* traffic shaping, that is to say a per channel limitation of the bandwidth.</p>
@ -65,10 +65,10 @@ import org.jboss.netty.util.TimerTask;
* by calling:<br>
* <tt>myHandler.releaseExternalResources();</tt><br>
* </li>
* <li>In your handler, you should consider to use the <code>channel.isWritable()</code> and
* <code>channelInterestChanged(ctx, event)</code> to handle writability, or through
* <code>future.addListener(new ChannelFutureListener())</code> on the future returned by
* <code>channel.write()</code>.</li>
* <li>In your handler, you should consider to use the {@code channel.isWritable()} and
* {@code channelInterestChanged(ctx, event)} to handle writability, or through
* {@code future.addListener(new ChannelFutureListener())} on the future returned by
* {@code channel.write()}.</li>
* <li><p>You shall also consider to have object size in read or write operations relatively adapted to
* the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
* while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.</p></li>
@ -144,7 +144,7 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
final MessageEvent toSend;
private ToSend(final long delay, final MessageEvent toSend) {
this.relativeTimeAction = delay;
relativeTimeAction = delay;
this.toSend = toSend;
}
}

View File

@ -15,13 +15,13 @@
*/
package org.jboss.netty.handler.traffic;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.handler.traffic.GlobalChannelTrafficShapingHandler.PerChannel;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.Timer;
import org.jboss.netty.util.TimerTask;
import java.util.concurrent.TimeUnit;
/**
* Version for {@link GlobalChannelTrafficShapingHandler}.
* This TrafficCounter is the Global one, and its special property is to directly handle
@ -31,7 +31,6 @@ import org.jboss.netty.util.TimerTask;
public class GlobalChannelTrafficCounter extends TrafficCounter {
/**
* @param trafficShapingHandler the associated {@link GlobalChannelTrafficShapingHandler}.
* @param executor the underlying executor service for scheduling checks (both Global and per Channel).
* @param name the name given to this monitor
* @param checkInterval the checkInterval in millisecond between two computations.
*/
@ -69,7 +68,6 @@ public class GlobalChannelTrafficCounter extends TrafficCounter {
this.counter = counter;
}
@Override
public void run(Timeout timeout) throws Exception {
if (!counter.monitorActive) {
return;
@ -87,6 +85,7 @@ public class GlobalChannelTrafficCounter extends TrafficCounter {
/**
* Start the monitoring process.
*/
@Override
public synchronized void start() {
if (monitorActive) {
return;
@ -105,6 +104,7 @@ public class GlobalChannelTrafficCounter extends TrafficCounter {
/**
* Stop the monitoring process.
*/
@Override
public synchronized void stop() {
if (!monitorActive) {
return;

View File

@ -15,19 +15,10 @@
*/
package org.jboss.netty.handler.traffic;
import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
@ -37,6 +28,15 @@ import org.jboss.netty.util.Timer;
import org.jboss.netty.util.TimerTask;
import org.jboss.netty.util.internal.ConcurrentHashMap;
import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* This implementation of the {@link AbstractTrafficShapingHandler} is for global
* and per channel traffic shaping, that is to say a global limitation of the bandwidth, whatever
@ -67,10 +67,10 @@ import org.jboss.netty.util.internal.ConcurrentHashMap;
*
* maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.<br><br>
* </li>
* <li>In your handler, you should consider to use the <code>channel.isWritable()</code> and
* <code>channelWritabilityChanged(ctx)</code> to handle writability, or through
* <code>future.addListener(new GenericFutureListener())</code> on the future returned by
* <code>ctx.write()</code>.</li>
* <li>In your handler, you should consider to use the {@code channel.isWritable()} and
* {@code channelWritabilityChanged(ctx)} to handle writability, or through
* {@code future.addListener(new GenericFutureListener())} on the future returned by
* {@code ctx.write()}.</li>
* <li>You shall also consider to have object size in read or write operations relatively adapted to
* the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
* while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.<br><br></li>
@ -82,7 +82,7 @@ import org.jboss.netty.util.internal.ConcurrentHashMap;
* </ul><br>
*
* Be sure to call {@link #release()} once this handler is not needed anymore to release all internal resources.
* This will not shutdown the {@link EventExecutor} as it may be shared, so you need to do this by your own.
* This will not shutdown the {@link Timer} as it may be shared, so you need to do this by your own.
*/
@Sharable
public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
@ -610,7 +610,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
wait = perChannel.channelTrafficCounter.readTimeToWait(size, readChannelLimit, maxTime, now);
if (readDeviationActive) {
// now try to balance between the channels
long maxLocalRead = 0;
long maxLocalRead;
maxLocalRead = perChannel.channelTrafficCounter.getCumulativeReadBytes();
long maxGlobalRead = cumulativeReadBytes.get();
if (maxLocalRead <= 0) {
@ -635,7 +635,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
if (channel != null && channel.isConnected()) {
// Only AutoRead AND HandlerActive True means Context Active
if (logger.isDebugEnabled()) {
logger.debug("Read suspend: " + wait + ":" + channel.isReadable() + ":" +
logger.debug("Read suspend: " + wait + ':' + channel.isReadable() + ':' +
rws.readSuspend);
}
if (timer == null) {
@ -648,7 +648,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
rws.readSuspend = true;
channel.setReadable(false);
if (logger.isDebugEnabled()) {
logger.debug("Suspend final status => " + channel.isReadable() + ":" +
logger.debug("Suspend final status => " + channel.isReadable() + ':' +
rws.readSuspend);
}
// Create a Runnable to reactive the read if needed. If one was create before
@ -696,7 +696,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
final long size;
private ToSend(final long delay, final MessageEvent toSend, final long size) {
this.relativeTimeAction = delay;
relativeTimeAction = delay;
this.toSend = toSend;
this.size = size;
}
@ -715,7 +715,8 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
* @return the list of TrafficCounters that exists at the time of the call.
*/
public Collection<TrafficCounter> channelTrafficCounters() {
Collection<TrafficCounter> valueCollection = new AbstractCollection<TrafficCounter>() {
return new AbstractCollection<TrafficCounter>() {
@Override
public Iterator<TrafficCounter> iterator() {
return new Iterator<TrafficCounter>() {
final Iterator<PerChannel> iter = channelQueues.values().iterator();
@ -730,11 +731,11 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
}
};
}
@Override
public int size() {
return channelQueues.size();
}
};
return valueCollection;
}
@Override
@ -753,7 +754,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
wait = perChannel.channelTrafficCounter.writeTimeToWait(size, writeChannelLimit, maxTime, now);
if (writeDeviationActive) {
// now try to balance between the channels
long maxLocalWrite = 0;
long maxLocalWrite;
maxLocalWrite = perChannel.channelTrafficCounter.getCumulativeWrittenBytes();
long maxGlobalWrite = cumulativeWrittenBytes.get();
if (maxLocalWrite <= 0) {

View File

@ -15,12 +15,6 @@
*/
package org.jboss.netty.handler.traffic;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext;
@ -35,6 +29,12 @@ import org.jboss.netty.util.Timer;
import org.jboss.netty.util.TimerTask;
import org.jboss.netty.util.internal.ConcurrentHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* <p>This implementation of the {@link AbstractTrafficShapingHandler} is for global
@ -80,7 +80,7 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
/**
* Global queues size
*/
private AtomicLong queuesSize = new AtomicLong();
private final AtomicLong queuesSize = new AtomicLong();
/**
* Max size in the list before proposing to stop writing new objects from next handlers
@ -214,7 +214,7 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
final long size;
private ToSend(final long delay, final MessageEvent toSend, final long size) {
this.relativeTimeAction = delay;
relativeTimeAction = delay;
this.toSend = toSend;
this.size = size;
}

View File

@ -39,7 +39,7 @@ public class TrafficCounter {
/**
* @return the time in ms using nanoTime, so not real EPOCH time but elapsed time in ms.
*/
public static final long milliSecondFromNano() {
public static long milliSecondFromNano() {
return System.nanoTime() / 1000000;
}
@ -176,7 +176,7 @@ public class TrafficCounter {
*/
private final TrafficCounter counter;
protected TrafficMonitoringTask(
TrafficMonitoringTask(
AbstractTrafficShapingHandler trafficShapingHandler,
TrafficCounter counter) {
trafficShapingHandler1 = trafficShapingHandler;
@ -324,8 +324,6 @@ public class TrafficCounter {
*
* @param write
* the size in bytes to write
* @param schedule
* the time when this write was scheduled
*/
void bytesRealWriteFlowControl(long write) {
realWrittenBytes.addAndGet(write);
@ -480,7 +478,7 @@ public class TrafficCounter {
long time = (sum * 1000 / limitTraffic - interval + pastDelay) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + sum + ":" + interval + ":" + pastDelay);
logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
}
if (time > maxTime && now + time - localReadingTime > maxTime) {
time = maxTime;
@ -497,7 +495,7 @@ public class TrafficCounter {
long time = (lastsum * 1000 / limitTraffic - lastinterval + pastDelay) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval + ":" + pastDelay);
logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
}
if (time > maxTime && now + time - localReadingTime > maxTime) {
time = maxTime;
@ -555,7 +553,7 @@ public class TrafficCounter {
long time = (sum * 1000 / limitTraffic - interval + pastDelay) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + sum + ":" + interval + ":" + pastDelay);
logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
}
if (time > maxTime && now + time - localWritingTime > maxTime) {
time = maxTime;
@ -572,7 +570,7 @@ public class TrafficCounter {
long time = (lastsum * 1000 / limitTraffic - lastinterval + pastDelay) / 10 * 10;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval + ":" + pastDelay);
logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
}
if (time > maxTime && now + time - localWritingTime > maxTime) {
time = maxTime;