This commit is contained in:
Trustin Lee 2014-12-29 15:54:56 +09:00
parent 2681112080
commit a77070fe9f
6 changed files with 54 additions and 59 deletions

View File

@ -167,12 +167,12 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
* The maximum delay to wait in case of traffic excess.
* Must be positive.
*/
protected AbstractTrafficShapingHandler(long writeLimit, long readLimit,
long checkInterval, long maxTime) {
protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval, long maxTime) {
if (maxTime <= 0) {
throw new IllegalArgumentException("maxTime must be positive");
}
this.userDefinedWritabilityIndex = userDefinedWritabilityIndex();
userDefinedWritabilityIndex = userDefinedWritabilityIndex();
this.writeLimit = writeLimit;
this.readLimit = readLimit;
this.checkInterval = checkInterval;
@ -398,9 +398,9 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
* <p>Note that this limit is a best effort on memory limitation to prevent Out Of
* Memory Exception. To ensure it works, the handler generating the write should
* use one of the way provided by Netty to handle the capacity:</p>
* <p>- the <code>Channel.isWritable()</code> property and the corresponding
* <code>channelWritabilityChanged()</code></p>
* <p>- the <code>ChannelFuture.addListener(new GenericFutureListener())</code></p>
* <p>- the {@code Channel.isWritable()} property and the corresponding
* {@code channelWritabilityChanged()}</p>
* <p>- the {@code ChannelFuture.addListener(new GenericFutureListener())}</p>
*
* @param maxWriteSize the maximum Write Size allowed in the buffer
* per channel before write suspended is set,
@ -430,13 +430,14 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
this.ctx = ctx;
}
@Override
public void run() {
ChannelConfig config = ctx.channel().config();
if (!config.isAutoRead() && isHandlerActive(ctx)) {
// If AutoRead is False and Active is True, user make a direct setAutoRead(false)
// Then Just reset the status
if (logger.isDebugEnabled()) {
logger.debug("Not unsuspend: " + config.isAutoRead() + ":" +
logger.debug("Not unsuspend: " + config.isAutoRead() + ':' +
isHandlerActive(ctx));
}
ctx.attr(READ_SUSPENDED).set(false);
@ -444,10 +445,10 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
// Anything else allows the handler to reset the AutoRead
if (logger.isDebugEnabled()) {
if (config.isAutoRead() && !isHandlerActive(ctx)) {
logger.debug("Unsuspend: " + config.isAutoRead() + ":" +
logger.debug("Unsuspend: " + config.isAutoRead() + ':' +
isHandlerActive(ctx));
} else {
logger.debug("Normal unsuspend: " + config.isAutoRead() + ":"
logger.debug("Normal unsuspend: " + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
}
@ -456,7 +457,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
ctx.channel().read();
}
if (logger.isDebugEnabled()) {
logger.debug("Unsupsend final status => " + config.isAutoRead() + ":"
logger.debug("Unsupsend final status => " + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
}
@ -483,7 +484,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
// Only AutoRead AND HandlerActive True means Context Active
ChannelConfig config = ctx.channel().config();
if (logger.isDebugEnabled()) {
logger.debug("Read suspend: " + wait + ":" + config.isAutoRead() + ":"
logger.debug("Read suspend: " + wait + ':' + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
if (config.isAutoRead() && isHandlerActive(ctx)) {
@ -499,7 +500,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
}
ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
if (logger.isDebugEnabled()) {
logger.debug("Suspend final status => " + config.isAutoRead() + ":"
logger.debug("Suspend final status => " + config.isAutoRead() + ':'
+ isHandlerActive(ctx) + " will reopened at: " + wait);
}
}
@ -551,7 +552,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
if (wait >= MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Write suspend: " + wait + ":" + ctx.channel().config().isAutoRead() + ":"
logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':'
+ isHandlerActive(ctx));
}
submitWrite(ctx, msg, size, wait, now, promise);
@ -569,8 +570,8 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
delay, TrafficCounter.milliSecondFromNano(), promise);
}
abstract void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long size,
final long delay, final long now, final ChannelPromise promise);
abstract void submitWrite(
ChannelHandlerContext ctx, Object msg, long size, long delay, long now, ChannelPromise promise);
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
@ -621,7 +622,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
.append(" maxSize: ").append(maxWriteSize)
.append(" and Counter: ");
if (trafficCounter != null) {
builder.append(trafficCounter.toString());
builder.append(trafficCounter);
} else {
builder.append("none");
}

View File

@ -15,17 +15,17 @@
*/
package io.netty.handler.traffic;
import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;
/**
* <p>This implementation of the {@link AbstractTrafficShapingHandler} is for channel
* traffic shaping, that is to say a per channel limitation of the bandwidth.</p>
* <p>Note the index used in <code>OutboundBuffer.setUserDefinedWritability(index, boolean)</code> is <b>1</b>.</p>
* <p>Note the index used in {@code OutboundBuffer.setUserDefinedWritability(index, boolean)} is <b>1</b>.</p>
*
* <p>The general use should be as follow:</p>
* <ul>
@ -48,10 +48,10 @@ import io.netty.channel.ChannelPromise;
*
* <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p>
* </li>
* <li>In your handler, you should consider to use the <code>channel.isWritable()</code> and
* <code>channelWritabilityChanged(ctx)</code> to handle writability, or through
* <code>future.addListener(new GenericFutureListener())</code> on the future returned by
* <code>ctx.write()</code>.</li>
* <li>In your handler, you should consider to use the {@code channel.isWritable()} and
* {@code channelWritabilityChanged(ctx)} to handle writability, or through
* {@code future.addListener(new GenericFutureListener())} on the future returned by
* {@code ctx.write()}.</li>
* <li><p>You shall also consider to have object size in read or write operations relatively adapted to
* the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
* while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.</p></li>
@ -168,7 +168,7 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
final ChannelPromise promise;
private ToSend(final long delay, final Object toSend, final ChannelPromise promise) {
this.relativeTimeAction = delay;
relativeTimeAction = delay;
this.toSend = toSend;
this.promise = promise;
}

View File

@ -86,6 +86,7 @@ public class GlobalChannelTrafficCounter extends TrafficCounter {
/**
* Start the monitoring process.
*/
@Override
public synchronized void start() {
if (monitorActive) {
return;
@ -103,6 +104,7 @@ public class GlobalChannelTrafficCounter extends TrafficCounter {
/**
* Stop the monitoring process.
*/
@Override
public synchronized void stop() {
if (!monitorActive) {
return;

View File

@ -66,10 +66,10 @@ import java.util.concurrent.atomic.AtomicLong;
*
* maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.<br><br>
* </li>
* <li>In your handler, you should consider to use the <code>channel.isWritable()</code> and
* <code>channelWritabilityChanged(ctx)</code> to handle writability, or through
* <code>future.addListener(new GenericFutureListener())</code> on the future returned by
* <code>ctx.write()</code>.</li>
* <li>In your handler, you should consider to use the {@code channel.isWritable()} and
* {@code channelWritabilityChanged(ctx)} to handle writability, or through
* {@code future.addListener(new GenericFutureListener())} on the future returned by
* {@code ctx.write()}.</li>
* <li>You shall also consider to have object size in read or write operations relatively adapted to
* the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
* while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.<br><br></li>
@ -528,7 +528,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
wait = perChannel.channelTrafficCounter.readTimeToWait(size, readChannelLimit, maxTime, now);
if (readDeviationActive) {
// now try to balance between the channels
long maxLocalRead = 0;
long maxLocalRead;
maxLocalRead = perChannel.channelTrafficCounter.cumulativeReadBytes();
long maxGlobalRead = cumulativeReadBytes.get();
if (maxLocalRead <= 0) {
@ -549,7 +549,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
// Only AutoRead AND HandlerActive True means Context Active
ChannelConfig config = ctx.channel().config();
if (logger.isDebugEnabled()) {
logger.debug("Read Suspend: " + wait + ":" + config.isAutoRead() + ":"
logger.debug("Read Suspend: " + wait + ':' + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
if (config.isAutoRead() && isHandlerActive(ctx)) {
@ -565,7 +565,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
}
ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
if (logger.isDebugEnabled()) {
logger.debug("Suspend final status => " + config.isAutoRead() + ":"
logger.debug("Suspend final status => " + config.isAutoRead() + ':'
+ isHandlerActive(ctx) + " will reopened at: " + wait);
}
}
@ -603,7 +603,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
final long size;
private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) {
this.relativeTimeAction = delay;
relativeTimeAction = delay;
this.toSend = toSend;
this.size = size;
this.promise = promise;
@ -623,17 +623,20 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
* @return the list of TrafficCounters that exists at the time of the call.
*/
public Collection<TrafficCounter> channelTrafficCounters() {
Collection<TrafficCounter> valueCollection = new AbstractCollection<TrafficCounter>() {
return new AbstractCollection<TrafficCounter>() {
@Override
public Iterator<TrafficCounter> iterator() {
return new Iterator<TrafficCounter>() {
final Iterator<PerChannel> iter = channelQueues.values().iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public TrafficCounter next() {
return iter.next().channelTrafficCounter;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
@ -644,7 +647,6 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
return channelQueues.size();
}
};
return valueCollection;
}
@Override
@ -662,7 +664,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
wait = perChannel.channelTrafficCounter.writeTimeToWait(size, writeChannelLimit, maxTime, now);
if (writeDeviationActive) {
// now try to balance between the channels
long maxLocalWrite = 0;
long maxLocalWrite;
maxLocalWrite = perChannel.channelTrafficCounter.cumulativeWrittenBytes();
long maxGlobalWrite = cumulativeWrittenBytes.get();
if (maxLocalWrite <= 0) {
@ -679,7 +681,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
}
if (wait >= MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Write suspend: " + wait + ":" + ctx.channel().config().isAutoRead() + ":"
logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':'
+ isHandlerActive(ctx));
}
submitWrite(ctx, msg, size, wait, now, promise);

View File

@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
* <p>This implementation of the {@link AbstractTrafficShapingHandler} is for global
* traffic shaping, that is to say a global limitation of the bandwidth, whatever
* the number of opened channels.</p>
* <p>Note the index used in <code>OutboundBuffer.setUserDefinedWritability(index, boolean)</code> is <b>2</b>.</p>
* <p>Note the index used in {@code OutboundBuffer.setUserDefinedWritability(index, boolean)} is <b>2</b>.</p>
*
* <p>The general use should be as follow:</p>
* <ul>
@ -57,10 +57,10 @@ import java.util.concurrent.atomic.AtomicLong;
*
* <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p>
* </li>
* <li>In your handler, you should consider to use the <code>channel.isWritable()</code> and
* <code>channelWritabilityChanged(ctx)</code> to handle writability, or through
* <code>future.addListener(new GenericFutureListener())</code> on the future returned by
* <code>ctx.write()</code>.</li>
* <li>In your handler, you should consider to use the {@code channel.isWritable()} and
* {@code channelWritabilityChanged(ctx)} to handle writability, or through
* {@code future.addListener(new GenericFutureListener())} on the future returned by
* {@code ctx.write()}.</li>
* <li><p>You shall also consider to have object size in read or write operations relatively adapted to
* the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
* while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.</p></li>
@ -192,7 +192,6 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
* the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
*/
public GlobalTrafficShapingHandler(EventExecutor executor) {
super();
createGlobalTrafficCounter(executor);
}
@ -314,7 +313,7 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
final ChannelPromise promise;
private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) {
this.relativeTimeAction = delay;
relativeTimeAction = delay;
this.toSend = toSend;
this.size = size;
this.promise = promise;

View File

@ -42,7 +42,7 @@ public class TrafficCounter {
/**
* @return the time in ms using nanoTime, so not real EPOCH time but elapsed time in ms.
*/
public static final long milliSecondFromNano() {
public static long milliSecondFromNano() {
return System.nanoTime() / 1000000;
}
@ -178,15 +178,10 @@ public class TrafficCounter {
private final TrafficCounter counter;
/**
<<<<<<< HEAD
* @param trafficShapingHandler The parent handler to which this task needs to callback to for accounting
* @param counter The parent TrafficCounter that we need to reset the statistics for
=======
* @param trafficShapingHandler
* The parent handler to which this task needs to callback to for accounting.
* @param counter
* The parent TrafficCounter that we need to reset the statistics for.
>>>>>>> b886c05... Fix big transfer and Write traffic shaping issues
*/
protected TrafficMonitoringTask(
AbstractTrafficShapingHandler trafficShapingHandler,
@ -247,11 +242,7 @@ public class TrafficCounter {
/**
* Reset the accounting on Read and Write.
*
<<<<<<< HEAD
* @param newLastTime the millisecond unix timestamp that we should be considered up-to-date for
=======
* @param newLastTime the milliseconds unix timestamp that we should be considered up-to-date for.
>>>>>>> b886c05... Fix big transfer and Write traffic shaping issues
*/
synchronized void resetAccounting(long newLastTime) {
long interval = newLastTime - lastTime.getAndSet(newLastTime);
@ -259,7 +250,7 @@ public class TrafficCounter {
// nothing to do
return;
}
if (logger.isDebugEnabled() && (interval > checkInterval() << 1)) {
if (logger.isDebugEnabled() && interval > checkInterval() << 1) {
logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name);
}
lastReadBytes = currentReadBytes.getAndSet(0);
@ -511,7 +502,7 @@ public class TrafficCounter {
long time = sum * 1000 / limitTraffic - interval + pastDelay;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + sum + ":" + interval + ":" + pastDelay);
logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
}
if (time > maxTime && now + time - localReadingTime > maxTime) {
time = maxTime;
@ -528,7 +519,7 @@ public class TrafficCounter {
long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval + ":" + pastDelay);
logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
}
if (time > maxTime && now + time - localReadingTime > maxTime) {
time = maxTime;
@ -586,7 +577,7 @@ public class TrafficCounter {
long time = sum * 1000 / limitTraffic - interval + pastDelay;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + sum + ":" + interval + ":" + pastDelay);
logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
}
if (time > maxTime && now + time - localWritingTime > maxTime) {
time = maxTime;
@ -603,7 +594,7 @@ public class TrafficCounter {
long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval + ":" + pastDelay);
logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
}
if (time > maxTime && now + time - localWritingTime > maxTime) {
time = maxTime;