Clean-up
This commit is contained in:
parent
b886c056bc
commit
6201bbd2e1
@ -164,12 +164,12 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
|
|||||||
* The maximum delay to wait in case of traffic excess.
|
* The maximum delay to wait in case of traffic excess.
|
||||||
* Must be positive.
|
* Must be positive.
|
||||||
*/
|
*/
|
||||||
protected AbstractTrafficShapingHandler(long writeLimit, long readLimit,
|
protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval, long maxTime) {
|
||||||
long checkInterval, long maxTime) {
|
|
||||||
if (maxTime <= 0) {
|
if (maxTime <= 0) {
|
||||||
throw new IllegalArgumentException("maxTime must be positive");
|
throw new IllegalArgumentException("maxTime must be positive");
|
||||||
}
|
}
|
||||||
this.userDefinedWritabilityIndex = userDefinedWritabilityIndex();
|
|
||||||
|
userDefinedWritabilityIndex = userDefinedWritabilityIndex();
|
||||||
this.writeLimit = writeLimit;
|
this.writeLimit = writeLimit;
|
||||||
this.readLimit = readLimit;
|
this.readLimit = readLimit;
|
||||||
this.checkInterval = checkInterval;
|
this.checkInterval = checkInterval;
|
||||||
@ -400,9 +400,9 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
|
|||||||
* <p>Note that this limit is a best effort on memory limitation to prevent Out Of
|
* <p>Note that this limit is a best effort on memory limitation to prevent Out Of
|
||||||
* Memory Exception. To ensure it works, the handler generating the write should
|
* Memory Exception. To ensure it works, the handler generating the write should
|
||||||
* use one of the way provided by Netty to handle the capacity:</p>
|
* use one of the way provided by Netty to handle the capacity:</p>
|
||||||
* <p>- the <code>Channel.isWritable()</code> property and the corresponding
|
* <p>- the {@code Channel.isWritable()} property and the corresponding
|
||||||
* <code>channelWritabilityChanged()</code></p>
|
* {@code channelWritabilityChanged()}</p>
|
||||||
* <p>- the <code>ChannelFuture.addListener(new GenericFutureListener())</code></p>
|
* <p>- the {@code ChannelFuture.addListener(new GenericFutureListener())}</p>
|
||||||
*
|
*
|
||||||
* @param maxWriteSize the maximum Write Size allowed in the buffer
|
* @param maxWriteSize the maximum Write Size allowed in the buffer
|
||||||
* per channel before write suspended is set,
|
* per channel before write suspended is set,
|
||||||
@ -433,13 +433,14 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
|
|||||||
this.ctx = ctx;
|
this.ctx = ctx;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
ChannelConfig config = ctx.channel().config();
|
ChannelConfig config = ctx.channel().config();
|
||||||
if (!config.isAutoRead() && isHandlerActive(ctx)) {
|
if (!config.isAutoRead() && isHandlerActive(ctx)) {
|
||||||
// If AutoRead is False and Active is True, user make a direct setAutoRead(false)
|
// If AutoRead is False and Active is True, user make a direct setAutoRead(false)
|
||||||
// Then Just reset the status
|
// Then Just reset the status
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Not unsuspend: " + config.isAutoRead() + ":" +
|
logger.debug("Not unsuspend: " + config.isAutoRead() + ':' +
|
||||||
isHandlerActive(ctx));
|
isHandlerActive(ctx));
|
||||||
}
|
}
|
||||||
ctx.attr(READ_SUSPENDED).set(false);
|
ctx.attr(READ_SUSPENDED).set(false);
|
||||||
@ -447,10 +448,10 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
|
|||||||
// Anything else allows the handler to reset the AutoRead
|
// Anything else allows the handler to reset the AutoRead
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
if (config.isAutoRead() && !isHandlerActive(ctx)) {
|
if (config.isAutoRead() && !isHandlerActive(ctx)) {
|
||||||
logger.debug("Unsuspend: " + config.isAutoRead() + ":" +
|
logger.debug("Unsuspend: " + config.isAutoRead() + ':' +
|
||||||
isHandlerActive(ctx));
|
isHandlerActive(ctx));
|
||||||
} else {
|
} else {
|
||||||
logger.debug("Normal unsuspend: " + config.isAutoRead() + ":"
|
logger.debug("Normal unsuspend: " + config.isAutoRead() + ':'
|
||||||
+ isHandlerActive(ctx));
|
+ isHandlerActive(ctx));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -459,7 +460,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
|
|||||||
ctx.channel().read();
|
ctx.channel().read();
|
||||||
}
|
}
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Unsupsend final status => " + config.isAutoRead() + ":"
|
logger.debug("Unsupsend final status => " + config.isAutoRead() + ':'
|
||||||
+ isHandlerActive(ctx));
|
+ isHandlerActive(ctx));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -486,7 +487,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
|
|||||||
// Only AutoRead AND HandlerActive True means Context Active
|
// Only AutoRead AND HandlerActive True means Context Active
|
||||||
ChannelConfig config = ctx.channel().config();
|
ChannelConfig config = ctx.channel().config();
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Read suspend: " + wait + ":" + config.isAutoRead() + ":"
|
logger.debug("Read suspend: " + wait + ':' + config.isAutoRead() + ':'
|
||||||
+ isHandlerActive(ctx));
|
+ isHandlerActive(ctx));
|
||||||
}
|
}
|
||||||
if (config.isAutoRead() && isHandlerActive(ctx)) {
|
if (config.isAutoRead() && isHandlerActive(ctx)) {
|
||||||
@ -502,7 +503,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
|
|||||||
}
|
}
|
||||||
ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
|
ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Suspend final status => " + config.isAutoRead() + ":"
|
logger.debug("Suspend final status => " + config.isAutoRead() + ':'
|
||||||
+ isHandlerActive(ctx) + " will reopened at: " + wait);
|
+ isHandlerActive(ctx) + " will reopened at: " + wait);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -554,7 +555,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
|
|||||||
long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
|
long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
|
||||||
if (wait >= MINIMAL_WAIT) {
|
if (wait >= MINIMAL_WAIT) {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Write suspend: " + wait + ":" + ctx.channel().config().isAutoRead() + ":"
|
logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':'
|
||||||
+ isHandlerActive(ctx));
|
+ isHandlerActive(ctx));
|
||||||
}
|
}
|
||||||
submitWrite(ctx, msg, size, wait, now, promise);
|
submitWrite(ctx, msg, size, wait, now, promise);
|
||||||
@ -572,8 +573,8 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
|
|||||||
delay, TrafficCounter.milliSecondFromNano(), promise);
|
delay, TrafficCounter.milliSecondFromNano(), promise);
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long size,
|
abstract void submitWrite(
|
||||||
final long delay, final long now, final ChannelPromise promise);
|
ChannelHandlerContext ctx, Object msg, long size, long delay, long now, ChannelPromise promise);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
|
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
|
||||||
@ -624,7 +625,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
|
|||||||
.append(" maxSize: ").append(maxWriteSize)
|
.append(" maxSize: ").append(maxWriteSize)
|
||||||
.append(" and Counter: ");
|
.append(" and Counter: ");
|
||||||
if (trafficCounter != null) {
|
if (trafficCounter != null) {
|
||||||
builder.append(trafficCounter.toString());
|
builder.append(trafficCounter);
|
||||||
} else {
|
} else {
|
||||||
builder.append("none");
|
builder.append("none");
|
||||||
}
|
}
|
||||||
|
@ -15,17 +15,17 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.handler.traffic;
|
package io.netty.handler.traffic;
|
||||||
|
|
||||||
import java.util.ArrayDeque;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
|
|
||||||
|
import java.util.ArrayDeque;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>This implementation of the {@link AbstractTrafficShapingHandler} is for channel
|
* <p>This implementation of the {@link AbstractTrafficShapingHandler} is for channel
|
||||||
* traffic shaping, that is to say a per channel limitation of the bandwidth.</p>
|
* traffic shaping, that is to say a per channel limitation of the bandwidth.</p>
|
||||||
* <p>Note the index used in <code>OutboundBuffer.setUserDefinedWritability(index, boolean)</code> is <b>1</b>.</p>
|
* <p>Note the index used in {@code OutboundBuffer.setUserDefinedWritability(index, boolean)} is <b>1</b>.</p>
|
||||||
*
|
*
|
||||||
* <p>The general use should be as follow:</p>
|
* <p>The general use should be as follow:</p>
|
||||||
* <ul>
|
* <ul>
|
||||||
@ -48,10 +48,10 @@ import io.netty.channel.ChannelPromise;
|
|||||||
*
|
*
|
||||||
* <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p>
|
* <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p>
|
||||||
* </li>
|
* </li>
|
||||||
* <li>In your handler, you should consider to use the <code>channel.isWritable()</code> and
|
* <li>In your handler, you should consider to use the {@code channel.isWritable()} and
|
||||||
* <code>channelWritabilityChanged(ctx)</code> to handle writability, or through
|
* {@code channelWritabilityChanged(ctx)} to handle writability, or through
|
||||||
* <code>future.addListener(new GenericFutureListener())</code> on the future returned by
|
* {@code future.addListener(new GenericFutureListener())} on the future returned by
|
||||||
* <code>ctx.write()</code>.</li>
|
* {@code ctx.write()}.</li>
|
||||||
* <li><p>You shall also consider to have object size in read or write operations relatively adapted to
|
* <li><p>You shall also consider to have object size in read or write operations relatively adapted to
|
||||||
* the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
|
* the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
|
||||||
* while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.</p></li>
|
* while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.</p></li>
|
||||||
@ -168,7 +168,7 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
|
|||||||
final ChannelPromise promise;
|
final ChannelPromise promise;
|
||||||
|
|
||||||
private ToSend(final long delay, final Object toSend, final ChannelPromise promise) {
|
private ToSend(final long delay, final Object toSend, final ChannelPromise promise) {
|
||||||
this.relativeTimeAction = delay;
|
relativeTimeAction = delay;
|
||||||
this.toSend = toSend;
|
this.toSend = toSend;
|
||||||
this.promise = promise;
|
this.promise = promise;
|
||||||
}
|
}
|
||||||
|
@ -86,6 +86,7 @@ public class GlobalChannelTrafficCounter extends TrafficCounter {
|
|||||||
/**
|
/**
|
||||||
* Start the monitoring process.
|
* Start the monitoring process.
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public synchronized void start() {
|
public synchronized void start() {
|
||||||
if (monitorActive) {
|
if (monitorActive) {
|
||||||
return;
|
return;
|
||||||
@ -103,6 +104,7 @@ public class GlobalChannelTrafficCounter extends TrafficCounter {
|
|||||||
/**
|
/**
|
||||||
* Stop the monitoring process.
|
* Stop the monitoring process.
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public synchronized void stop() {
|
public synchronized void stop() {
|
||||||
if (!monitorActive) {
|
if (!monitorActive) {
|
||||||
return;
|
return;
|
||||||
|
@ -66,10 +66,10 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||||||
*
|
*
|
||||||
* maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.<br><br>
|
* maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.<br><br>
|
||||||
* </li>
|
* </li>
|
||||||
* <li>In your handler, you should consider to use the <code>channel.isWritable()</code> and
|
* <li>In your handler, you should consider to use the {@code channel.isWritable()} and
|
||||||
* <code>channelWritabilityChanged(ctx)</code> to handle writability, or through
|
* {@code channelWritabilityChanged(ctx)} to handle writability, or through
|
||||||
* <code>future.addListener(new GenericFutureListener())</code> on the future returned by
|
* {@code future.addListener(new GenericFutureListener())} on the future returned by
|
||||||
* <code>ctx.write()</code>.</li>
|
* {@code ctx.write()}.</li>
|
||||||
* <li>You shall also consider to have object size in read or write operations relatively adapted to
|
* <li>You shall also consider to have object size in read or write operations relatively adapted to
|
||||||
* the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
|
* the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
|
||||||
* while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.<br><br></li>
|
* while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.<br><br></li>
|
||||||
@ -528,7 +528,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
|
|||||||
wait = perChannel.channelTrafficCounter.readTimeToWait(size, readChannelLimit, maxTime, now);
|
wait = perChannel.channelTrafficCounter.readTimeToWait(size, readChannelLimit, maxTime, now);
|
||||||
if (readDeviationActive) {
|
if (readDeviationActive) {
|
||||||
// now try to balance between the channels
|
// now try to balance between the channels
|
||||||
long maxLocalRead = 0;
|
long maxLocalRead;
|
||||||
maxLocalRead = perChannel.channelTrafficCounter.cumulativeReadBytes();
|
maxLocalRead = perChannel.channelTrafficCounter.cumulativeReadBytes();
|
||||||
long maxGlobalRead = cumulativeReadBytes.get();
|
long maxGlobalRead = cumulativeReadBytes.get();
|
||||||
if (maxLocalRead <= 0) {
|
if (maxLocalRead <= 0) {
|
||||||
@ -549,7 +549,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
|
|||||||
// Only AutoRead AND HandlerActive True means Context Active
|
// Only AutoRead AND HandlerActive True means Context Active
|
||||||
ChannelConfig config = ctx.channel().config();
|
ChannelConfig config = ctx.channel().config();
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Read Suspend: " + wait + ":" + config.isAutoRead() + ":"
|
logger.debug("Read Suspend: " + wait + ':' + config.isAutoRead() + ':'
|
||||||
+ isHandlerActive(ctx));
|
+ isHandlerActive(ctx));
|
||||||
}
|
}
|
||||||
if (config.isAutoRead() && isHandlerActive(ctx)) {
|
if (config.isAutoRead() && isHandlerActive(ctx)) {
|
||||||
@ -565,7 +565,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
|
|||||||
}
|
}
|
||||||
ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
|
ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Suspend final status => " + config.isAutoRead() + ":"
|
logger.debug("Suspend final status => " + config.isAutoRead() + ':'
|
||||||
+ isHandlerActive(ctx) + " will reopened at: " + wait);
|
+ isHandlerActive(ctx) + " will reopened at: " + wait);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -603,7 +603,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
|
|||||||
final long size;
|
final long size;
|
||||||
|
|
||||||
private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) {
|
private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) {
|
||||||
this.relativeTimeAction = delay;
|
relativeTimeAction = delay;
|
||||||
this.toSend = toSend;
|
this.toSend = toSend;
|
||||||
this.size = size;
|
this.size = size;
|
||||||
this.promise = promise;
|
this.promise = promise;
|
||||||
@ -623,17 +623,20 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
|
|||||||
* @return the list of TrafficCounters that exists at the time of the call.
|
* @return the list of TrafficCounters that exists at the time of the call.
|
||||||
*/
|
*/
|
||||||
public Collection<TrafficCounter> channelTrafficCounters() {
|
public Collection<TrafficCounter> channelTrafficCounters() {
|
||||||
Collection<TrafficCounter> valueCollection = new AbstractCollection<TrafficCounter>() {
|
return new AbstractCollection<TrafficCounter>() {
|
||||||
@Override
|
@Override
|
||||||
public Iterator<TrafficCounter> iterator() {
|
public Iterator<TrafficCounter> iterator() {
|
||||||
return new Iterator<TrafficCounter>() {
|
return new Iterator<TrafficCounter>() {
|
||||||
final Iterator<PerChannel> iter = channelQueues.values().iterator();
|
final Iterator<PerChannel> iter = channelQueues.values().iterator();
|
||||||
|
@Override
|
||||||
public boolean hasNext() {
|
public boolean hasNext() {
|
||||||
return iter.hasNext();
|
return iter.hasNext();
|
||||||
}
|
}
|
||||||
|
@Override
|
||||||
public TrafficCounter next() {
|
public TrafficCounter next() {
|
||||||
return iter.next().channelTrafficCounter;
|
return iter.next().channelTrafficCounter;
|
||||||
}
|
}
|
||||||
|
@Override
|
||||||
public void remove() {
|
public void remove() {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
@ -644,7 +647,6 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
|
|||||||
return channelQueues.size();
|
return channelQueues.size();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return valueCollection;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -662,7 +664,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
|
|||||||
wait = perChannel.channelTrafficCounter.writeTimeToWait(size, writeChannelLimit, maxTime, now);
|
wait = perChannel.channelTrafficCounter.writeTimeToWait(size, writeChannelLimit, maxTime, now);
|
||||||
if (writeDeviationActive) {
|
if (writeDeviationActive) {
|
||||||
// now try to balance between the channels
|
// now try to balance between the channels
|
||||||
long maxLocalWrite = 0;
|
long maxLocalWrite;
|
||||||
maxLocalWrite = perChannel.channelTrafficCounter.cumulativeWrittenBytes();
|
maxLocalWrite = perChannel.channelTrafficCounter.cumulativeWrittenBytes();
|
||||||
long maxGlobalWrite = cumulativeWrittenBytes.get();
|
long maxGlobalWrite = cumulativeWrittenBytes.get();
|
||||||
if (maxLocalWrite <= 0) {
|
if (maxLocalWrite <= 0) {
|
||||||
@ -679,7 +681,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
|
|||||||
}
|
}
|
||||||
if (wait >= MINIMAL_WAIT) {
|
if (wait >= MINIMAL_WAIT) {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Write suspend: " + wait + ":" + ctx.channel().config().isAutoRead() + ":"
|
logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':'
|
||||||
+ isHandlerActive(ctx));
|
+ isHandlerActive(ctx));
|
||||||
}
|
}
|
||||||
submitWrite(ctx, msg, size, wait, now, promise);
|
submitWrite(ctx, msg, size, wait, now, promise);
|
||||||
|
@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||||||
* <p>This implementation of the {@link AbstractTrafficShapingHandler} is for global
|
* <p>This implementation of the {@link AbstractTrafficShapingHandler} is for global
|
||||||
* traffic shaping, that is to say a global limitation of the bandwidth, whatever
|
* traffic shaping, that is to say a global limitation of the bandwidth, whatever
|
||||||
* the number of opened channels.</p>
|
* the number of opened channels.</p>
|
||||||
* <p>Note the index used in <code>OutboundBuffer.setUserDefinedWritability(index, boolean)</code> is <b>2</b>.</p>
|
* <p>Note the index used in {@code OutboundBuffer.setUserDefinedWritability(index, boolean)} is <b>2</b>.</p>
|
||||||
*
|
*
|
||||||
* <p>The general use should be as follow:</p>
|
* <p>The general use should be as follow:</p>
|
||||||
* <ul>
|
* <ul>
|
||||||
@ -57,10 +57,10 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||||||
*
|
*
|
||||||
* <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p>
|
* <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p>
|
||||||
* </li>
|
* </li>
|
||||||
* <li>In your handler, you should consider to use the <code>channel.isWritable()</code> and
|
* <li>In your handler, you should consider to use the {@code channel.isWritable()} and
|
||||||
* <code>channelWritabilityChanged(ctx)</code> to handle writability, or through
|
* {@code channelWritabilityChanged(ctx)} to handle writability, or through
|
||||||
* <code>future.addListener(new GenericFutureListener())</code> on the future returned by
|
* {@code future.addListener(new GenericFutureListener())} on the future returned by
|
||||||
* <code>ctx.write()</code>.</li>
|
* {@code ctx.write()}.</li>
|
||||||
* <li><p>You shall also consider to have object size in read or write operations relatively adapted to
|
* <li><p>You shall also consider to have object size in read or write operations relatively adapted to
|
||||||
* the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
|
* the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
|
||||||
* while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.</p></li>
|
* while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.</p></li>
|
||||||
@ -192,7 +192,6 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
|
|||||||
* the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
|
* the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
|
||||||
*/
|
*/
|
||||||
public GlobalTrafficShapingHandler(EventExecutor executor) {
|
public GlobalTrafficShapingHandler(EventExecutor executor) {
|
||||||
super();
|
|
||||||
createGlobalTrafficCounter(executor);
|
createGlobalTrafficCounter(executor);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -314,7 +313,7 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
|
|||||||
final ChannelPromise promise;
|
final ChannelPromise promise;
|
||||||
|
|
||||||
private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) {
|
private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) {
|
||||||
this.relativeTimeAction = delay;
|
relativeTimeAction = delay;
|
||||||
this.toSend = toSend;
|
this.toSend = toSend;
|
||||||
this.size = size;
|
this.size = size;
|
||||||
this.promise = promise;
|
this.promise = promise;
|
||||||
|
@ -40,7 +40,7 @@ public class TrafficCounter {
|
|||||||
/**
|
/**
|
||||||
* @return the time in ms using nanoTime, so not real EPOCH time but elapsed time in ms.
|
* @return the time in ms using nanoTime, so not real EPOCH time but elapsed time in ms.
|
||||||
*/
|
*/
|
||||||
public static final long milliSecondFromNano() {
|
public static long milliSecondFromNano() {
|
||||||
return System.nanoTime() / 1000000;
|
return System.nanoTime() / 1000000;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -245,7 +245,7 @@ public class TrafficCounter {
|
|||||||
// nothing to do
|
// nothing to do
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (logger.isDebugEnabled() && (interval > checkInterval() << 1)) {
|
if (logger.isDebugEnabled() && interval > checkInterval() << 1) {
|
||||||
logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name);
|
logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name);
|
||||||
}
|
}
|
||||||
lastReadBytes = currentReadBytes.getAndSet(0);
|
lastReadBytes = currentReadBytes.getAndSet(0);
|
||||||
@ -500,7 +500,7 @@ public class TrafficCounter {
|
|||||||
long time = sum * 1000 / limitTraffic - interval + pastDelay;
|
long time = sum * 1000 / limitTraffic - interval + pastDelay;
|
||||||
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
|
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Time: " + time + ":" + sum + ":" + interval + ":" + pastDelay);
|
logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
|
||||||
}
|
}
|
||||||
if (time > maxTime && now + time - localReadingTime > maxTime) {
|
if (time > maxTime && now + time - localReadingTime > maxTime) {
|
||||||
time = maxTime;
|
time = maxTime;
|
||||||
@ -517,7 +517,7 @@ public class TrafficCounter {
|
|||||||
long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
|
long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
|
||||||
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
|
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval + ":" + pastDelay);
|
logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
|
||||||
}
|
}
|
||||||
if (time > maxTime && now + time - localReadingTime > maxTime) {
|
if (time > maxTime && now + time - localReadingTime > maxTime) {
|
||||||
time = maxTime;
|
time = maxTime;
|
||||||
@ -575,7 +575,7 @@ public class TrafficCounter {
|
|||||||
long time = sum * 1000 / limitTraffic - interval + pastDelay;
|
long time = sum * 1000 / limitTraffic - interval + pastDelay;
|
||||||
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
|
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Time: " + time + ":" + sum + ":" + interval + ":" + pastDelay);
|
logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
|
||||||
}
|
}
|
||||||
if (time > maxTime && now + time - localWritingTime > maxTime) {
|
if (time > maxTime && now + time - localWritingTime > maxTime) {
|
||||||
time = maxTime;
|
time = maxTime;
|
||||||
@ -592,7 +592,7 @@ public class TrafficCounter {
|
|||||||
long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
|
long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
|
||||||
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
|
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval + ":" + pastDelay);
|
logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
|
||||||
}
|
}
|
||||||
if (time > maxTime && now + time - localWritingTime > maxTime) {
|
if (time > maxTime && now + time - localWritingTime > maxTime) {
|
||||||
time = maxTime;
|
time = maxTime;
|
||||||
|
Loading…
Reference in New Issue
Block a user