diff --git a/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java index 4457a52761..51784beaf7 100644 --- a/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java @@ -18,7 +18,9 @@ package io.netty.handler.traffic; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufHolder; import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; import io.netty.util.Attribute; import io.netty.util.AttributeKey; @@ -28,16 +30,15 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.concurrent.TimeUnit; /** - * AbstractTrafficShapingHandler allows to limit the global bandwidth + *

AbstractTrafficShapingHandler allows to limit the global bandwidth * (see {@link GlobalTrafficShapingHandler}) or per session * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping. * It allows you to implement an almost real time monitoring of the bandwidth using * the monitors from {@link TrafficCounter} that will call back every checkInterval - * the method doAccounting of this handler.
- *
+ * the method doAccounting of this handler.

* - * If you want for any particular reasons to stop the monitoring (accounting) or to change - * the read/write limit or the check interval, several methods allow that for you:
+ *

If you want for any particular reasons to stop the monitoring (accounting) or to change + * the read/write limit or the check interval, several methods allow that for you:

* */ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler { - private List messagesQueue = new LinkedList(); + private final ArrayDeque messagesQueue = new ArrayDeque(); + private long queueSize; /** - * Create a new instance + * Create a new instance. * * @param writeLimit * 0 or a limit in bytes/s @@ -62,9 +75,9 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler * 0 or a limit in bytes/s * @param checkInterval * The delay between two computations of performances for - * channels or 0 if no stats are to be computed + * channels or 0 if no stats are to be computed. * @param maxTime - * The maximum delay to wait in case of traffic excess + * The maximum delay to wait in case of traffic excess. */ public ChannelTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval, long maxTime) { @@ -72,7 +85,8 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler } /** - * Create a new instance + * Create a new instance using default + * max time as delay allowed value of 15000 ms. * * @param writeLimit * 0 or a limit in bytes/s @@ -80,7 +94,7 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler * 0 or a limit in bytes/s * @param checkInterval * The delay between two computations of performances for - * channels or 0 if no stats are to be computed + * channels or 0 if no stats are to be computed. */ public ChannelTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval) { @@ -88,7 +102,8 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler } /** - * Create a new instance + * Create a new instance using default Check Interval value of 1000 ms and + * max time as delay allowed value of 15000 ms. * * @param writeLimit * 0 or a limit in bytes/s @@ -101,11 +116,12 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler } /** - * Create a new instance + * Create a new instance using + * default max time as delay allowed value of 15000 ms and no limit. * * @param checkInterval * The delay between two computations of performances for - * channels or 0 if no stats are to be computed + * channels or 0 if no stats are to be computed. */ public ChannelTrafficShapingHandler(long checkInterval) { super(checkInterval); @@ -121,58 +137,95 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler } @Override - public synchronized void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - if (trafficCounter != null) { - trafficCounter.stop(); - } - for (ToSend toSend : messagesQueue) { - if (toSend.toSend instanceof ByteBuf) { - ((ByteBuf) toSend.toSend).release(); + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + trafficCounter.stop(); + // write order control + synchronized (this) { + if (ctx.channel().isActive()) { + for (ToSend toSend : messagesQueue) { + long size = calculateSize(toSend.toSend); + trafficCounter.bytesRealWriteFlowControl(size); + queueSize -= size; + ctx.write(toSend.toSend, toSend.promise); + } + } else { + for (ToSend toSend : messagesQueue) { + if (toSend.toSend instanceof ByteBuf) { + ((ByteBuf) toSend.toSend).release(); + } + } } + messagesQueue.clear(); } - messagesQueue.clear(); + releaseWriteSuspended(ctx); + releaseReadSuspended(ctx); super.handlerRemoved(ctx); } private static final class ToSend { - final long date; + final long relativeTimeAction; final Object toSend; final ChannelPromise promise; private ToSend(final long delay, final Object toSend, final ChannelPromise promise) { - this.date = System.currentTimeMillis() + delay; + this.relativeTimeAction = delay; this.toSend = toSend; this.promise = promise; } } @Override - protected synchronized void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long delay, + void submitWrite(final ChannelHandlerContext ctx, final Object msg, + final long size, final long delay, final long now, final ChannelPromise promise) { - if (delay == 0 && messagesQueue.isEmpty()) { - ctx.write(msg, promise); - return; + final ToSend newToSend; + // write order control + synchronized (this) { + if (delay == 0 && messagesQueue.isEmpty()) { + trafficCounter.bytesRealWriteFlowControl(size); + ctx.write(msg, promise); + return; + } + newToSend = new ToSend(delay + now, msg, promise); + messagesQueue.addLast(newToSend); + queueSize += size; + checkWriteSuspend(ctx, delay, queueSize); } - final ToSend newToSend = new ToSend(delay, msg, promise); - messagesQueue.add(newToSend); + final long futureNow = newToSend.relativeTimeAction; ctx.executor().schedule(new Runnable() { @Override public void run() { - sendAllValid(ctx); + sendAllValid(ctx, futureNow); } }, delay, TimeUnit.MILLISECONDS); } - private synchronized void sendAllValid(ChannelHandlerContext ctx) { - while (!messagesQueue.isEmpty()) { - ToSend newToSend = messagesQueue.remove(0); - if (newToSend.date <= System.currentTimeMillis()) { - ctx.write(newToSend.toSend, newToSend.promise); - } else { - messagesQueue.add(0, newToSend); - break; + private void sendAllValid(final ChannelHandlerContext ctx, final long now) { + // write order control + synchronized (this) { + ToSend newToSend = messagesQueue.pollFirst(); + for (; newToSend != null; newToSend = messagesQueue.pollFirst()) { + if (newToSend.relativeTimeAction <= now) { + long size = calculateSize(newToSend.toSend); + trafficCounter.bytesRealWriteFlowControl(size); + queueSize -= size; + ctx.write(newToSend.toSend, newToSend.promise); + } else { + messagesQueue.addFirst(newToSend); + break; + } + } + if (messagesQueue.isEmpty()) { + releaseWriteSuspended(ctx); } } ctx.flush(); } + + /** + * @return current size in bytes of the write buffer. + */ + public long queueSize() { + return queueSize; + } } diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficCounter.java b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficCounter.java new file mode 100644 index 0000000000..6be10863dc --- /dev/null +++ b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficCounter.java @@ -0,0 +1,127 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.traffic; + +import io.netty.handler.traffic.GlobalChannelTrafficShapingHandler.PerChannel; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Version for {@link GlobalChannelTrafficShapingHandler}. + * This TrafficCounter is the Global one, and its special property is to directly handle + * other channel's TrafficCounters. In particular, there are no scheduler for those + * channel's TrafficCounters because it is managed by this one. + */ +public class GlobalChannelTrafficCounter extends TrafficCounter { + /** + * @param trafficShapingHandler the associated {@link GlobalChannelTrafficShapingHandler}. + * @param executor the underlying executor service for scheduling checks (both Global and per Channel). + * @param name the name given to this monitor. + * @param checkInterval the checkInterval in millisecond between two computations. + */ + public GlobalChannelTrafficCounter(GlobalChannelTrafficShapingHandler trafficShapingHandler, + ScheduledExecutorService executor, String name, long checkInterval) { + super(trafficShapingHandler, executor, name, checkInterval); + if (executor == null) { + throw new IllegalArgumentException("Executor must not be null"); + } + } + + /** + * Class to implement monitoring at fix delay. + * This version is Mixed in the way it mixes Global and Channel counters. + */ + private static class MixedTrafficMonitoringTask implements Runnable { + /** + * The associated TrafficShapingHandler + */ + private final GlobalChannelTrafficShapingHandler trafficShapingHandler1; + + /** + * The associated TrafficCounter + */ + private final TrafficCounter counter; + + /** + * @param trafficShapingHandler The parent handler to which this task needs to callback to for accounting. + * @param counter The parent TrafficCounter that we need to reset the statistics for. + */ + MixedTrafficMonitoringTask( + GlobalChannelTrafficShapingHandler trafficShapingHandler, + TrafficCounter counter) { + trafficShapingHandler1 = trafficShapingHandler; + this.counter = counter; + } + + @Override + public void run() { + if (!counter.monitorActive) { + return; + } + long newLastTime = milliSecondFromNano(); + counter.resetAccounting(newLastTime); + for (PerChannel perChannel : trafficShapingHandler1.channelQueues.values()) { + perChannel.channelTrafficCounter.resetAccounting(newLastTime); + } + trafficShapingHandler1.doAccounting(counter); + counter.scheduledFuture = counter.executor.schedule(this, counter.checkInterval.get(), + TimeUnit.MILLISECONDS); + } + } + + /** + * Start the monitoring process. + */ + public synchronized void start() { + if (monitorActive) { + return; + } + lastTime.set(milliSecondFromNano()); + long localCheckInterval = checkInterval.get(); + if (localCheckInterval > 0) { + monitorActive = true; + monitor = new MixedTrafficMonitoringTask((GlobalChannelTrafficShapingHandler) trafficShapingHandler, this); + scheduledFuture = + executor.schedule(monitor, localCheckInterval, TimeUnit.MILLISECONDS); + } + } + + /** + * Stop the monitoring process. + */ + public synchronized void stop() { + if (!monitorActive) { + return; + } + monitorActive = false; + resetAccounting(milliSecondFromNano()); + trafficShapingHandler.doAccounting(this); + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + } + } + + @Override + public void resetCumulativeTime() { + for (PerChannel perChannel : + ((GlobalChannelTrafficShapingHandler) trafficShapingHandler).channelQueues.values()) { + perChannel.channelTrafficCounter.resetCumulativeTime(); + } + super.resetCumulativeTime(); + } + +} diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java new file mode 100644 index 0000000000..2a7edd2cbf --- /dev/null +++ b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java @@ -0,0 +1,773 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.traffic; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.util.Attribute; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.util.AbstractCollection; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * This implementation of the {@link AbstractTrafficShapingHandler} is for global + * and per channel traffic shaping, that is to say a global limitation of the bandwidth, whatever + * the number of opened channels and a per channel limitation of the bandwidth.

+ * This version shall not be in the same pipeline than other TrafficShapingHandler.

+ * + * The general use should be as follow:
+ *
    + *
  • Create your unique GlobalChannelTrafficShapingHandler like:

    + * GlobalChannelTrafficShapingHandler myHandler = new GlobalChannelTrafficShapingHandler(executor);

    + * The executor could be the underlying IO worker pool
    + * pipeline.addLast(myHandler);

    + * + * Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created + * and shared among all channels as the counter must be shared among all channels.

    + * + * Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation) + * or the check interval (in millisecond) that represents the delay between two computations of the + * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).
    + * Note that as this is a fusion of both Global and Channel Traffic Shaping, limits are in 2 sets, + * respectively Global and Channel.

    + * + * A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting, + * it is recommended to set a positive value, even if it is high since the precision of the + * Traffic Shaping depends on the period where the traffic is computed. The highest the interval, + * the less precise the traffic shaping will be. It is suggested as higher value something close + * to 5 or 10 minutes.

    + * + * maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.

    + *
  • + *
  • In your handler, you should consider to use the channel.isWritable() and + * channelWritabilityChanged(ctx) to handle writability, or through + * future.addListener(new GenericFutureListener()) on the future returned by + * ctx.write().
  • + *
  • You shall also consider to have object size in read or write operations relatively adapted to + * the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect, + * while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.

  • + *
  • Some configuration methods will be taken as best effort, meaning + * that all already scheduled traffics will not be + * changed, but only applied to new traffics.
    + * So the expected usage of those methods are to be used not too often, + * accordingly to the traffic shaping configuration.
  • + *

+ * + * Be sure to call {@link #release()} once this handler is not needed anymore to release all internal resources. + * This will not shutdown the {@link EventExecutor} as it may be shared, so you need to do this by your own. + */ +@Sharable +public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHandler { + private static final InternalLogger logger = + InternalLoggerFactory.getInstance(GlobalChannelTrafficShapingHandler.class); + /** + * All queues per channel + */ + final ConcurrentMap channelQueues = PlatformDependent.newConcurrentHashMap(); + + /** + * Global queues size + */ + private final AtomicLong queuesSize = new AtomicLong(); + + /** + * Maximum cumulative writing bytes for one channel among all (as long as channels stay the same) + */ + private final AtomicLong cumulativeWrittenBytes = new AtomicLong(); + + /** + * Maximum cumulative read bytes for one channel among all (as long as channels stay the same) + */ + private final AtomicLong cumulativeReadBytes = new AtomicLong(); + + /** + * Max size in the list before proposing to stop writing new objects from next handlers + * for all channel (global) + */ + volatile long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100; // default 400MB + + /** + * Limit in B/s to apply to write + */ + private volatile long writeChannelLimit; + + /** + * Limit in B/s to apply to read + */ + private volatile long readChannelLimit; + + private static final float DEFAULT_DEVIATION = 0.1F; + private static final float MAX_DEVIATION = 0.4F; + private static final float DEFAULT_SLOWDOWN = 0.4F; + private static final float DEFAULT_ACCELERATION = -0.1F; + private volatile float maxDeviation; + private volatile float accelerationFactor; + private volatile float slowDownFactor; + private volatile boolean readDeviationActive; + private volatile boolean writeDeviationActive; + + static final class PerChannel { + ArrayDeque messagesQueue; + TrafficCounter channelTrafficCounter; + long queueSize; + long lastWriteTimestamp; + long lastReadTimestamp; + } + + /** + * Create the global TrafficCounter + */ + void createGlobalTrafficCounter(ScheduledExecutorService executor) { + // Default + setMaxDeviation(DEFAULT_DEVIATION, DEFAULT_SLOWDOWN, DEFAULT_ACCELERATION); + if (executor == null) { + throw new IllegalArgumentException("Executor must not be null"); + } + TrafficCounter tc = new GlobalChannelTrafficCounter(this, executor, "GlobalChannelTC", checkInterval); + setTrafficCounter(tc); + tc.start(); + } + + @Override + int userDefinedWritabilityIndex() { + return AbstractTrafficShapingHandler.GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX; + } + + /** + * Create a new instance. + * + * @param executor + * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * @param writeGlobalLimit + * 0 or a limit in bytes/s + * @param readGlobalLimit + * 0 or a limit in bytes/s + * @param writeChannelLimit + * 0 or a limit in bytes/s + * @param readChannelLimit + * 0 or a limit in bytes/s + * @param checkInterval + * The delay between two computations of performances for + * channels or 0 if no stats are to be computed. + * @param maxTime + * The maximum delay to wait in case of traffic excess. + */ + public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, + long writeGlobalLimit, long readGlobalLimit, + long writeChannelLimit, long readChannelLimit, + long checkInterval, long maxTime) { + super(writeGlobalLimit, readGlobalLimit, checkInterval, maxTime); + createGlobalTrafficCounter(executor); + this.writeChannelLimit = writeChannelLimit; + this.readChannelLimit = readChannelLimit; + } + + /** + * Create a new instance. + * + * @param executor + * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * @param writeGlobalLimit + * 0 or a limit in bytes/s + * @param readGlobalLimit + * 0 or a limit in bytes/s + * @param writeChannelLimit + * 0 or a limit in bytes/s + * @param readChannelLimit + * 0 or a limit in bytes/s + * @param checkInterval + * The delay between two computations of performances for + * channels or 0 if no stats are to be computed. + */ + public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, + long writeGlobalLimit, long readGlobalLimit, + long writeChannelLimit, long readChannelLimit, + long checkInterval) { + super(writeGlobalLimit, readGlobalLimit, checkInterval); + this.writeChannelLimit = writeChannelLimit; + this.readChannelLimit = readChannelLimit; + createGlobalTrafficCounter(executor); + } + + /** + * Create a new instance. + * + * @param executor + * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * @param writeGlobalLimit + * 0 or a limit in bytes/s + * @param readGlobalLimit + * 0 or a limit in bytes/s + * @param writeChannelLimit + * 0 or a limit in bytes/s + * @param readChannelLimit + * 0 or a limit in bytes/s + */ + public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, + long writeGlobalLimit, long readGlobalLimit, + long writeChannelLimit, long readChannelLimit) { + super(writeGlobalLimit, readGlobalLimit); + this.writeChannelLimit = writeChannelLimit; + this.readChannelLimit = readChannelLimit; + createGlobalTrafficCounter(executor); + } + + /** + * Create a new instance. + * + * @param executor + * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * @param checkInterval + * The delay between two computations of performances for + * channels or 0 if no stats are to be computed. + */ + public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, long checkInterval) { + super(checkInterval); + createGlobalTrafficCounter(executor); + } + + /** + * Create a new instance. + * + * @param executor + * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + */ + public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor) { + createGlobalTrafficCounter(executor); + } + + /** + * @return the current max deviation + */ + public float maxDeviation() { + return maxDeviation; + } + + /** + * @return the current acceleration factor + */ + public float accelerationFactor() { + return accelerationFactor; + } + + /** + * @return the current slow down factor + */ + public float slowDownFactor() { + return slowDownFactor; + } + + /** + * @param maxDeviation + * the maximum deviation to allow during computation of average, default deviation + * being 0.1, so +/-10% of the desired bandwidth. Maximum being 0.4. + * @param slowDownFactor + * the factor set as +x% to the too fast client (minimal value being 0, meaning no + * slow down factor), default being 40% (0.4). + * @param accelerationFactor + * the factor set as -x% to the too slow client (maximal value being 0, meaning no + * acceleration factor), default being -10% (-0.1). + */ + public void setMaxDeviation(float maxDeviation, float slowDownFactor, float accelerationFactor) { + if (maxDeviation > MAX_DEVIATION) { + throw new IllegalArgumentException("maxDeviation must be <= " + MAX_DEVIATION); + } + if (slowDownFactor < 0) { + throw new IllegalArgumentException("slowDownFactor must be >= 0"); + } + if (accelerationFactor > 0) { + throw new IllegalArgumentException("accelerationFactor must be <= 0"); + } + this.maxDeviation = maxDeviation; + this.accelerationFactor = 1 + accelerationFactor; + this.slowDownFactor = 1 + slowDownFactor; + } + + private void computeDeviationCumulativeBytes() { + // compute the maximum cumulativeXxxxBytes among still connected Channels + long maxWrittenBytes = 0; + long maxReadBytes = 0; + long minWrittenBytes = Long.MAX_VALUE; + long minReadBytes = Long.MAX_VALUE; + for (PerChannel perChannel : channelQueues.values()) { + long value = perChannel.channelTrafficCounter.cumulativeWrittenBytes(); + if (maxWrittenBytes < value) { + maxWrittenBytes = value; + } + if (minWrittenBytes > value) { + minWrittenBytes = value; + } + value = perChannel.channelTrafficCounter.cumulativeReadBytes(); + if (maxReadBytes < value) { + maxReadBytes = value; + } + if (minReadBytes > value) { + minReadBytes = value; + } + } + boolean multiple = channelQueues.size() > 1; + readDeviationActive = multiple && minReadBytes < maxReadBytes / 2; + writeDeviationActive = multiple && minWrittenBytes < maxWrittenBytes / 2; + cumulativeWrittenBytes.set(maxWrittenBytes); + cumulativeReadBytes.set(maxReadBytes); + } + + @Override + protected void doAccounting(TrafficCounter counter) { + computeDeviationCumulativeBytes(); + super.doAccounting(counter); + } + + private long computeBalancedWait(float maxLocal, float maxGlobal, long wait) { + if (maxGlobal == 0) { + // no change + return wait; + } + float ratio = maxLocal / maxGlobal; + // if in the boundaries, same value + if (ratio > maxDeviation) { + if (ratio < 1 - maxDeviation) { + return wait; + } else { + ratio = slowDownFactor; + if (wait < MINIMAL_WAIT) { + wait = MINIMAL_WAIT; + } + } + } else { + ratio = accelerationFactor; + } + return (long) (wait * ratio); + } + + /** + * @return the maxGlobalWriteSize + */ + public long getMaxGlobalWriteSize() { + return maxGlobalWriteSize; + } + + /** + * Note the change will be taken as best effort, meaning + * that all already scheduled traffics will not be + * changed, but only applied to new traffics.
+ * So the expected usage of this method is to be used not too often, + * accordingly to the traffic shaping configuration. + * + * @param maxGlobalWriteSize the maximum Global Write Size allowed in the buffer + * globally for all channels before write suspended is set. + */ + public void setMaxGlobalWriteSize(long maxGlobalWriteSize) { + if (maxGlobalWriteSize <= 0) { + throw new IllegalArgumentException("maxGlobalWriteSize must be positive"); + } + this.maxGlobalWriteSize = maxGlobalWriteSize; + } + + /** + * @return the global size of the buffers for all queues. + */ + public long queuesSize() { + return queuesSize.get(); + } + + /** + * @param newWriteLimit Channel write limit + * @param newReadLimit Channel read limit + */ + public void configureChannel(long newWriteLimit, long newReadLimit) { + writeChannelLimit = newWriteLimit; + readChannelLimit = newReadLimit; + long now = TrafficCounter.milliSecondFromNano(); + for (PerChannel perChannel : channelQueues.values()) { + perChannel.channelTrafficCounter.resetAccounting(now); + } + } + + /** + * @return Channel write limit + */ + public long getWriteChannelLimit() { + return writeChannelLimit; + } + + /** + * @param writeLimit Channel write limit + */ + public void setWriteChannelLimit(long writeLimit) { + writeChannelLimit = writeLimit; + long now = TrafficCounter.milliSecondFromNano(); + for (PerChannel perChannel : channelQueues.values()) { + perChannel.channelTrafficCounter.resetAccounting(now); + } + } + + /** + * @return Channel read limit + */ + public long getReadChannelLimit() { + return readChannelLimit; + } + + /** + * @param readLimit Channel read limit + */ + public void setReadChannelLimit(long readLimit) { + readChannelLimit = readLimit; + long now = TrafficCounter.milliSecondFromNano(); + for (PerChannel perChannel : channelQueues.values()) { + perChannel.channelTrafficCounter.resetAccounting(now); + } + } + + /** + * Release all internal resources of this instance. + */ + public final void release() { + trafficCounter.stop(); + } + + private PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) { + // ensure creation is limited to one thread per channel + Channel channel = ctx.channel(); + Integer key = channel.hashCode(); + PerChannel perChannel = channelQueues.get(key); + if (perChannel == null) { + perChannel = new PerChannel(); + perChannel.messagesQueue = new ArrayDeque(); + // Don't start it since managed through the Global one + perChannel.channelTrafficCounter = new TrafficCounter(this, null, "ChannelTC" + + ctx.channel().hashCode(), checkInterval); + perChannel.queueSize = 0L; + perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano(); + perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp; + channelQueues.put(key, perChannel); + } + return perChannel; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + getOrSetPerChannel(ctx); + trafficCounter.resetCumulativeTime(); + super.handlerAdded(ctx); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + trafficCounter.resetCumulativeTime(); + Channel channel = ctx.channel(); + Integer key = channel.hashCode(); + PerChannel perChannel = channelQueues.remove(key); + if (perChannel != null) { + // write operations need synchronization + synchronized (perChannel) { + if (channel.isActive()) { + for (ToSend toSend : perChannel.messagesQueue) { + long size = calculateSize(toSend.toSend); + trafficCounter.bytesRealWriteFlowControl(size); + perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size); + perChannel.queueSize -= size; + queuesSize.addAndGet(-size); + ctx.write(toSend.toSend, toSend.promise); + } + } else { + queuesSize.addAndGet(-perChannel.queueSize); + for (ToSend toSend : perChannel.messagesQueue) { + if (toSend.toSend instanceof ByteBuf) { + ((ByteBuf) toSend.toSend).release(); + } + } + } + perChannel.messagesQueue.clear(); + } + } + releaseWriteSuspended(ctx); + releaseReadSuspended(ctx); + super.handlerRemoved(ctx); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { + long size = calculateSize(msg); + long now = TrafficCounter.milliSecondFromNano(); + if (size > 0) { + // compute the number of ms to wait before reopening the channel + long waitGlobal = trafficCounter.readTimeToWait(size, getReadLimit(), maxTime, now); + Integer key = ctx.channel().hashCode(); + PerChannel perChannel = channelQueues.get(key); + long wait = 0; + if (perChannel != null) { + wait = perChannel.channelTrafficCounter.readTimeToWait(size, readChannelLimit, maxTime, now); + if (readDeviationActive) { + // now try to balance between the channels + long maxLocalRead = 0; + maxLocalRead = perChannel.channelTrafficCounter.cumulativeReadBytes(); + long maxGlobalRead = cumulativeReadBytes.get(); + if (maxLocalRead <= 0) { + maxLocalRead = 0; + } + if (maxGlobalRead < maxLocalRead) { + maxGlobalRead = maxLocalRead; + } + wait = computeBalancedWait(maxLocalRead, maxGlobalRead, wait); + } + } + if (wait < waitGlobal) { + wait = waitGlobal; + } + wait = checkWaitReadTime(ctx, wait, now); + if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal + // time in order to try to limit the traffic + // Only AutoRead AND HandlerActive True means Context Active + ChannelConfig config = ctx.channel().config(); + if (logger.isDebugEnabled()) { + logger.debug("Read Suspend: " + wait + ":" + config.isAutoRead() + ":" + + isHandlerActive(ctx)); + } + if (config.isAutoRead() && isHandlerActive(ctx)) { + config.setAutoRead(false); + ctx.attr(READ_SUSPENDED).set(true); + // Create a Runnable to reactive the read if needed. If one was create before it will just be + // reused to limit object creation + Attribute attr = ctx.attr(REOPEN_TASK); + Runnable reopenTask = attr.get(); + if (reopenTask == null) { + reopenTask = new ReopenReadTimerTask(ctx); + attr.set(reopenTask); + } + ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS); + if (logger.isDebugEnabled()) { + logger.debug("Suspend final status => " + config.isAutoRead() + ":" + + isHandlerActive(ctx) + " will reopened at: " + wait); + } + } + } + } + informReadOperation(ctx, now); + ctx.fireChannelRead(msg); + } + + @Override + protected long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) { + Integer key = ctx.channel().hashCode(); + PerChannel perChannel = channelQueues.get(key); + if (perChannel != null) { + if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) { + wait = maxTime; + } + } + return wait; + } + + @Override + protected void informReadOperation(final ChannelHandlerContext ctx, final long now) { + Integer key = ctx.channel().hashCode(); + PerChannel perChannel = channelQueues.get(key); + if (perChannel != null) { + perChannel.lastReadTimestamp = now; + } + } + + private static final class ToSend { + final long relativeTimeAction; + final Object toSend; + final ChannelPromise promise; + final long size; + + private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) { + this.relativeTimeAction = delay; + this.toSend = toSend; + this.size = size; + this.promise = promise; + } + } + + protected long maximumCumulativeWrittenBytes() { + return cumulativeWrittenBytes.get(); + } + + protected long maximumCumulativeReadBytes() { + return cumulativeReadBytes.get(); + } + + /** + * To allow for instance doAccounting to use the TrafficCounter per channel. + * @return the list of TrafficCounters that exists at the time of the call. + */ + public Collection channelTrafficCounters() { + Collection valueCollection = new AbstractCollection() { + @Override + public Iterator iterator() { + return new Iterator() { + final Iterator iter = channelQueues.values().iterator(); + public boolean hasNext() { + return iter.hasNext(); + } + public TrafficCounter next() { + return iter.next().channelTrafficCounter; + } + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + @Override + public int size() { + return channelQueues.size(); + } + }; + return valueCollection; + } + + @Override + public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) + throws Exception { + long size = calculateSize(msg); + long now = TrafficCounter.milliSecondFromNano(); + if (size > 0) { + // compute the number of ms to wait before continue with the channel + long waitGlobal = trafficCounter.writeTimeToWait(size, getWriteLimit(), maxTime, now); + Integer key = ctx.channel().hashCode(); + PerChannel perChannel = channelQueues.get(key); + long wait = 0; + if (perChannel != null) { + wait = perChannel.channelTrafficCounter.writeTimeToWait(size, writeChannelLimit, maxTime, now); + if (writeDeviationActive) { + // now try to balance between the channels + long maxLocalWrite = 0; + maxLocalWrite = perChannel.channelTrafficCounter.cumulativeWrittenBytes(); + long maxGlobalWrite = cumulativeWrittenBytes.get(); + if (maxLocalWrite <= 0) { + maxLocalWrite = 0; + } + if (maxGlobalWrite < maxLocalWrite) { + maxGlobalWrite = maxLocalWrite; + } + wait = computeBalancedWait(maxLocalWrite, maxGlobalWrite, wait); + } + } + if (wait < waitGlobal) { + wait = waitGlobal; + } + if (wait >= MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Write suspend: " + wait + ":" + ctx.channel().config().isAutoRead() + ":" + + isHandlerActive(ctx)); + } + submitWrite(ctx, msg, size, wait, now, promise); + return; + } + } + // to maintain order of write + submitWrite(ctx, msg, size, 0, now, promise); + } + + @Override + protected void submitWrite(final ChannelHandlerContext ctx, final Object msg, + final long size, final long writedelay, final long now, + final ChannelPromise promise) { + Channel channel = ctx.channel(); + Integer key = channel.hashCode(); + PerChannel perChannel = channelQueues.get(key); + if (perChannel == null) { + // in case write occurs before handlerAdded is raized for this handler + // imply a synchronized only if needed + perChannel = getOrSetPerChannel(ctx); + } + final ToSend newToSend; + long delay = writedelay; + boolean globalSizeExceeded = false; + // write operations need synchronization + synchronized (perChannel) { + if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) { + trafficCounter.bytesRealWriteFlowControl(size); + perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size); + ctx.write(msg, promise); + perChannel.lastWriteTimestamp = now; + return; + } + if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) { + delay = maxTime; + } + newToSend = new ToSend(delay + now, msg, size, promise); + perChannel.messagesQueue.addLast(newToSend); + perChannel.queueSize += size; + queuesSize.addAndGet(size); + checkWriteSuspend(ctx, delay, perChannel.queueSize); + if (queuesSize.get() > maxGlobalWriteSize) { + globalSizeExceeded = true; + } + } + if (globalSizeExceeded) { + setUserDefinedWritability(ctx, false); + } + final long futureNow = newToSend.relativeTimeAction; + final PerChannel forSchedule = perChannel; + ctx.executor().schedule(new Runnable() { + @Override + public void run() { + sendAllValid(ctx, forSchedule, futureNow); + } + }, delay, TimeUnit.MILLISECONDS); + } + + private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now) { + // write operations need synchronization + synchronized (perChannel) { + ToSend newToSend = perChannel.messagesQueue.pollFirst(); + for (; newToSend != null; newToSend = perChannel.messagesQueue.pollFirst()) { + if (newToSend.relativeTimeAction <= now) { + long size = newToSend.size; + trafficCounter.bytesRealWriteFlowControl(size); + perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size); + perChannel.queueSize -= size; + queuesSize.addAndGet(-size); + ctx.write(newToSend.toSend, newToSend.promise); + perChannel.lastWriteTimestamp = now; + } else { + perChannel.messagesQueue.addFirst(newToSend); + break; + } + } + if (perChannel.messagesQueue.isEmpty()) { + releaseWriteSuspended(ctx); + } + } + ctx.flush(); + } + + @Override + public String toString() { + return new StringBuilder(340).append(super.toString()) + .append(" Write Channel Limit: ").append(writeChannelLimit) + .append(" Read Channel Limit: ").append(readChannelLimit).toString(); + } +} diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java index d47c2b0123..21bbfc8300 100644 --- a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java @@ -17,81 +17,114 @@ package io.netty.handler.traffic; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.internal.PlatformDependent; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.util.ArrayDeque; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - +import java.util.concurrent.atomic.AtomicLong; /** - * This implementation of the {@link AbstractTrafficShapingHandler} is for global + *

This implementation of the {@link AbstractTrafficShapingHandler} is for global * traffic shaping, that is to say a global limitation of the bandwidth, whatever - * the number of opened channels.

+ * the number of opened channels.

+ *

Note the index used in OutboundBuffer.setUserDefinedWritability(index, boolean) is 2.

* - * The general use should be as follow:
+ *

The general use should be as follow:

*
    - *
  • Create your unique GlobalTrafficShapingHandler like:

    - * GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(executor);

    - * The executor could be the underlying IO worker pool
    - * pipeline.addLast(myHandler);

    + *
  • Create your unique GlobalTrafficShapingHandler like:

    + *

    GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(executor);

    + *

    The executor could be the underlying IO worker pool

    + *

    pipeline.addLast(myHandler);

    * - * Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created - * and shared among all channels as the counter must be shared among all channels.

    + *

    Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created + * and shared among all channels as the counter must be shared among all channels.

    * - * Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation) + *

    Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation) * or the check interval (in millisecond) that represents the delay between two computations of the - * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).

    + * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).

    * - * A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting, + *

    A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting, * it is recommended to set a positive value, even if it is high since the precision of the * Traffic Shaping depends on the period where the traffic is computed. The highest the interval, * the less precise the traffic shaping will be. It is suggested as higher value something close - * to 5 or 10 minutes.

    + * to 5 or 10 minutes.

    * - * maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.
    + *

    maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.

    *
  • - *

+ *
  • In your handler, you should consider to use the channel.isWritable() and + * channelWritabilityChanged(ctx) to handle writability, or through + * future.addListener(new GenericFutureListener()) on the future returned by + * ctx.write().
  • + *
  • You shall also consider to have object size in read or write operations relatively adapted to + * the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect, + * while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.

  • + *
  • Some configuration methods will be taken as best effort, meaning + * that all already scheduled traffics will not be + * changed, but only applied to new traffics.

    + * So the expected usage of those methods are to be used not too often, + * accordingly to the traffic shaping configuration.
  • + * * * Be sure to call {@link #release()} once this handler is not needed anymore to release all internal resources. * This will not shutdown the {@link EventExecutor} as it may be shared, so you need to do this by your own. */ @Sharable public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { - private Map> messagesQueues = new HashMap>(); + /** + * All queues per channel + */ + private final ConcurrentMap channelQueues = PlatformDependent.newConcurrentHashMap(); /** - * Create the global TrafficCounter + * Global queues size + */ + private final AtomicLong queuesSize = new AtomicLong(); + + /** + * Max size in the list before proposing to stop writing new objects from next handlers + * for all channel (global) + */ + long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100; // default 400MB + + private static final class PerChannel { + ArrayDeque messagesQueue; + long queueSize; + long lastWriteTimestamp; + long lastReadTimestamp; + } + + /** + * Create the global TrafficCounter. */ void createGlobalTrafficCounter(ScheduledExecutorService executor) { if (executor == null) { throw new NullPointerException("executor"); } - TrafficCounter tc = new TrafficCounter(this, executor, "GlobalTC", - checkInterval); + TrafficCounter tc = new TrafficCounter(this, executor, "GlobalTC", checkInterval); setTrafficCounter(tc); tc.start(); } /** - * Create a new instance + * Create a new instance. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter} + * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. * @param writeLimit * 0 or a limit in bytes/s * @param readLimit * 0 or a limit in bytes/s * @param checkInterval * The delay between two computations of performances for - * channels or 0 if no stats are to be computed + * channels or 0 if no stats are to be computed. * @param maxTime - * The maximum delay to wait in case of traffic excess + * The maximum delay to wait in case of traffic excess. */ public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit, long checkInterval, long maxTime) { @@ -100,17 +133,18 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { } /** - * Create a new instance + * Create a new instance using + * default max time as delay allowed value of 15000 ms. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter} + * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. * @param writeLimit * 0 or a limit in bytes/s * @param readLimit * 0 or a limit in bytes/s * @param checkInterval * The delay between two computations of performances for - * channels or 0 if no stats are to be computed + * channels or 0 if no stats are to be computed. */ public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit, long checkInterval) { @@ -119,10 +153,11 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { } /** - * Create a new instance + * Create a new instance using default Check Interval value of 1000 ms and + * default max time as delay allowed value of 15000 ms. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter} + * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. * @param writeLimit * 0 or a limit in bytes/s * @param readLimit @@ -135,13 +170,14 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { } /** - * Create a new instance + * Create a new instance using + * default max time as delay allowed value of 15000 ms and no limit. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter} + * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. * @param checkInterval * The delay between two computations of performances for - * channels or 0 if no stats are to be computed + * channels or 0 if no stats are to be computed. */ public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long checkInterval) { super(checkInterval); @@ -149,91 +185,209 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { } /** - * Create a new instance + * Create a new instance using default Check Interval value of 1000 ms and + * default max time as delay allowed value of 15000 ms and no limit. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter} + * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. */ public GlobalTrafficShapingHandler(EventExecutor executor) { + super(); createGlobalTrafficCounter(executor); } /** - * Release all internal resources of this instance + * @return the maxGlobalWriteSize default value being 400 MB. + */ + public long getMaxGlobalWriteSize() { + return maxGlobalWriteSize; + } + + /** + * Note the change will be taken as best effort, meaning + * that all already scheduled traffics will not be + * changed, but only applied to new traffics.
    + * So the expected usage of this method is to be used not too often, + * accordingly to the traffic shaping configuration. + * + * @param maxGlobalWriteSize the maximum Global Write Size allowed in the buffer + * globally for all channels before write suspended is set, + * default value being 400 MB. + */ + public void setMaxGlobalWriteSize(long maxGlobalWriteSize) { + this.maxGlobalWriteSize = maxGlobalWriteSize; + } + + /** + * @return the global size of the buffers for all queues. + */ + public long queuesSize() { + return queuesSize.get(); + } + + /** + * Release all internal resources of this instance. */ public final void release() { - if (trafficCounter != null) { - trafficCounter.stop(); + trafficCounter.stop(); + } + + private PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) { + // ensure creation is limited to one thread per channel + Channel channel = ctx.channel(); + Integer key = channel.hashCode(); + PerChannel perChannel = channelQueues.get(key); + if (perChannel == null) { + perChannel = new PerChannel(); + perChannel.messagesQueue = new ArrayDeque(); + perChannel.queueSize = 0L; + perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano(); + perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp; + channelQueues.put(key, perChannel); } + return perChannel; } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - Integer key = ctx.channel().hashCode(); - List mq = new LinkedList(); - messagesQueues.put(key, mq); + getOrSetPerChannel(ctx); super.handlerAdded(ctx); } @Override - public synchronized void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - Integer key = ctx.channel().hashCode(); - List mq = messagesQueues.remove(key); - if (mq != null) { - for (ToSend toSend : mq) { - if (toSend.toSend instanceof ByteBuf) { - ((ByteBuf) toSend.toSend).release(); + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + Integer key = channel.hashCode(); + PerChannel perChannel = channelQueues.remove(key); + if (perChannel != null) { + // write operations need synchronization + synchronized (perChannel) { + if (channel.isActive()) { + for (ToSend toSend : perChannel.messagesQueue) { + long size = calculateSize(toSend.toSend); + trafficCounter.bytesRealWriteFlowControl(size); + perChannel.queueSize -= size; + queuesSize.addAndGet(-size); + ctx.write(toSend.toSend, toSend.promise); + } + } else { + queuesSize.addAndGet(-perChannel.queueSize); + for (ToSend toSend : perChannel.messagesQueue) { + if (toSend.toSend instanceof ByteBuf) { + ((ByteBuf) toSend.toSend).release(); + } + } } + perChannel.messagesQueue.clear(); } - mq.clear(); } + releaseWriteSuspended(ctx); + releaseReadSuspended(ctx); super.handlerRemoved(ctx); } + @Override + long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) { + Integer key = ctx.channel().hashCode(); + PerChannel perChannel = channelQueues.get(key); + if (perChannel != null) { + if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) { + wait = maxTime; + } + } + return wait; + } + + @Override + void informReadOperation(final ChannelHandlerContext ctx, final long now) { + Integer key = ctx.channel().hashCode(); + PerChannel perChannel = channelQueues.get(key); + if (perChannel != null) { + perChannel.lastReadTimestamp = now; + } + } + private static final class ToSend { - final long date; + final long relativeTimeAction; final Object toSend; + final long size; final ChannelPromise promise; - private ToSend(final long delay, final Object toSend, final ChannelPromise promise) { - this.date = System.currentTimeMillis() + delay; + private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) { + this.relativeTimeAction = delay; this.toSend = toSend; + this.size = size; this.promise = promise; } } @Override - protected synchronized void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long delay, + void submitWrite(final ChannelHandlerContext ctx, final Object msg, + final long size, final long writedelay, final long now, final ChannelPromise promise) { - Integer key = ctx.channel().hashCode(); - List messagesQueue = messagesQueues.get(key); - if (delay == 0 && (messagesQueue == null || messagesQueue.isEmpty())) { - ctx.write(msg, promise); - return; + Channel channel = ctx.channel(); + Integer key = channel.hashCode(); + PerChannel perChannel = channelQueues.get(key); + if (perChannel == null) { + // in case write occurs before handlerAdded is raized for this handler + // imply a synchronized only if needed + perChannel = getOrSetPerChannel(ctx); } - final ToSend newToSend = new ToSend(delay, msg, promise); - if (messagesQueue == null) { - messagesQueue = new LinkedList(); - messagesQueues.put(key, messagesQueue); + final ToSend newToSend; + long delay = writedelay; + boolean globalSizeExceeded = false; + // write operations need synchronization + synchronized (perChannel) { + if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) { + trafficCounter.bytesRealWriteFlowControl(size); + ctx.write(msg, promise); + perChannel.lastWriteTimestamp = now; + return; + } + if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) { + delay = maxTime; + } + newToSend = new ToSend(delay + now, msg, size, promise); + perChannel.messagesQueue.addLast(newToSend); + perChannel.queueSize += size; + queuesSize.addAndGet(size); + checkWriteSuspend(ctx, delay, perChannel.queueSize); + if (queuesSize.get() > maxGlobalWriteSize) { + globalSizeExceeded = true; + } } - messagesQueue.add(newToSend); - final List mqfinal = messagesQueue; + if (globalSizeExceeded) { + setUserDefinedWritability(ctx, false); + } + final long futureNow = newToSend.relativeTimeAction; + final PerChannel forSchedule = perChannel; ctx.executor().schedule(new Runnable() { @Override public void run() { - sendAllValid(ctx, mqfinal); + sendAllValid(ctx, forSchedule, futureNow); } }, delay, TimeUnit.MILLISECONDS); } - private synchronized void sendAllValid(final ChannelHandlerContext ctx, final List messagesQueue) { - while (!messagesQueue.isEmpty()) { - ToSend newToSend = messagesQueue.remove(0); - if (newToSend.date <= System.currentTimeMillis()) { - ctx.write(newToSend.toSend, newToSend.promise); - } else { - messagesQueue.add(0, newToSend); - break; + private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now) { + // write operations need synchronization + synchronized (perChannel) { + ToSend newToSend = perChannel.messagesQueue.pollFirst(); + for (; newToSend != null; newToSend = perChannel.messagesQueue.pollFirst()) { + if (newToSend.relativeTimeAction <= now) { + long size = newToSend.size; + trafficCounter.bytesRealWriteFlowControl(size); + perChannel.queueSize -= size; + queuesSize.addAndGet(-size); + ctx.write(newToSend.toSend, newToSend.promise); + perChannel.lastWriteTimestamp = now; + } else { + perChannel.messagesQueue.addFirst(newToSend); + break; + } + } + if (perChannel.messagesQueue.isEmpty()) { + releaseWriteSuspended(ctx); } } ctx.flush(); diff --git a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java index 82a4795254..cdd963a12e 100644 --- a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java +++ b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java @@ -21,7 +21,6 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -40,6 +39,13 @@ public class TrafficCounter { private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficCounter.class); + /** + * @return the time in ms using nanoTime, so not real EPOCH time but elapsed time in ms. + */ + public static final long milliSecondFromNano() { + return System.nanoTime() / 1000000; + } + /** * Current written bytes */ @@ -50,6 +56,16 @@ public class TrafficCounter { */ private final AtomicLong currentReadBytes = new AtomicLong(); + /** + * Last writing time during current check interval + */ + private long writingTime; + + /** + * Last reading delay during current check interval + */ + private long readingTime; + /** * Long life written bytes */ @@ -61,7 +77,7 @@ public class TrafficCounter { private final AtomicLong cumulativeReadBytes = new AtomicLong(); /** - * Last Time where cumulative bytes where reset to zero + * Last Time where cumulative bytes where reset to zero: this time is a real EPOC time (informative only) */ private long lastCumulativeTime; @@ -78,37 +94,37 @@ public class TrafficCounter { /** * Last Time Check taken */ - private final AtomicLong lastTime = new AtomicLong(); + final AtomicLong lastTime = new AtomicLong(); /** * Last written bytes number during last check interval */ - private long lastWrittenBytes; + private volatile long lastWrittenBytes; /** * Last read bytes number during last check interval */ - private long lastReadBytes; + private volatile long lastReadBytes; /** - * Last non 0 written bytes number during last check interval + * Last future writing time during last check interval */ - private long lastNonNullWrittenBytes; + private volatile long lastWritingTime; /** - * Last time written bytes with non 0 written bytes + * Last reading time during last check interval */ - private long lastNonNullWrittenTime; + private volatile long lastReadingTime; /** - * Last time read bytes with non 0 written bytes + * Real written bytes */ - private long lastNonNullReadTime; + private final AtomicLong realWrittenBytes = new AtomicLong(); /** - * Last non 0 read bytes number during last check interval + * Real writing bandwidth */ - private long lastNonNullReadBytes; + private long realWriteThroughput; /** * Delay between two captures @@ -126,25 +142,25 @@ public class TrafficCounter { /** * The associated TrafficShapingHandler */ - private final AbstractTrafficShapingHandler trafficShapingHandler; + final AbstractTrafficShapingHandler trafficShapingHandler; /** * Executor that will run the monitor */ - private final ScheduledExecutorService executor; + final ScheduledExecutorService executor; /** * Monitor created once in start() */ - private Runnable monitor; + Runnable monitor; /** * used in stop() to cancel the timer */ - private volatile ScheduledFuture scheduledFuture; + volatile ScheduledFuture scheduledFuture; /** * Is Monitor active */ - final AtomicBoolean monitorActive = new AtomicBoolean(); + volatile boolean monitorActive; /** * Class to implement monitoring at fix delay @@ -162,8 +178,15 @@ public class TrafficCounter { private final TrafficCounter counter; /** +<<<<<<< HEAD * @param trafficShapingHandler The parent handler to which this task needs to callback to for accounting * @param counter The parent TrafficCounter that we need to reset the statistics for +======= + * @param trafficShapingHandler + * The parent handler to which this task needs to callback to for accounting. + * @param counter + * The parent TrafficCounter that we need to reset the statistics for. +>>>>>>> b886c05... Fix big transfer and Write traffic shaping issues */ protected TrafficMonitoringTask( AbstractTrafficShapingHandler trafficShapingHandler, @@ -174,11 +197,10 @@ public class TrafficCounter { @Override public void run() { - if (!counter.monitorActive.get()) { + if (!counter.monitorActive) { return; } - long endTime = System.currentTimeMillis(); - counter.resetAccounting(endTime); + counter.resetAccounting(milliSecondFromNano()); if (trafficShapingHandler1 != null) { trafficShapingHandler1.doAccounting(counter); } @@ -188,30 +210,32 @@ public class TrafficCounter { } /** - * Start the monitoring process + * Start the monitoring process. */ public synchronized void start() { - if (monitorActive.get()) { + if (monitorActive) { return; } - lastTime.set(System.currentTimeMillis()); - if (checkInterval.get() > 0) { - monitorActive.set(true); + lastTime.set(milliSecondFromNano()); + long localCheckInterval = checkInterval.get(); + // if executor is null, it means it is piloted by a GlobalChannelTrafficCounter, so no executor + if (localCheckInterval > 0 && executor != null) { + monitorActive = true; monitor = new TrafficMonitoringTask(trafficShapingHandler, this); scheduledFuture = - executor.schedule(monitor, checkInterval.get(), TimeUnit.MILLISECONDS); + executor.schedule(monitor, localCheckInterval, TimeUnit.MILLISECONDS); } } /** - * Stop the monitoring process + * Stop the monitoring process. */ public synchronized void stop() { - if (!monitorActive.get()) { + if (!monitorActive) { return; } - monitorActive.set(false); - resetAccounting(System.currentTimeMillis()); + monitorActive = false; + resetAccounting(milliSecondFromNano()); if (trafficShapingHandler != null) { trafficShapingHandler.doAccounting(this); } @@ -221,9 +245,13 @@ public class TrafficCounter { } /** - * Reset the accounting on Read and Write + * Reset the accounting on Read and Write. * +<<<<<<< HEAD * @param newLastTime the millisecond unix timestamp that we should be considered up-to-date for +======= + * @param newLastTime the milliseconds unix timestamp that we should be considered up-to-date for. +>>>>>>> b886c05... Fix big transfer and Write traffic shaping issues */ synchronized void resetAccounting(long newLastTime) { long interval = newLastTime - lastTime.getAndSet(newLastTime); @@ -231,7 +259,7 @@ public class TrafficCounter { // nothing to do return; } - if (logger.isDebugEnabled() && (interval > 2 * checkInterval())) { + if (logger.isDebugEnabled() && (interval > checkInterval() << 1)) { logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name); } lastReadBytes = currentReadBytes.getAndSet(0); @@ -240,46 +268,54 @@ public class TrafficCounter { // nb byte / checkInterval in ms * 1000 (1s) lastWriteThroughput = lastWrittenBytes * 1000 / interval; // nb byte / checkInterval in ms * 1000 (1s) - if (lastWrittenBytes > 0) { - lastNonNullWrittenBytes = lastWrittenBytes; - lastNonNullWrittenTime = newLastTime; - } - if (lastReadBytes > 0) { - lastNonNullReadBytes = lastReadBytes; - lastNonNullReadTime = newLastTime; - } + realWriteThroughput = realWrittenBytes.getAndSet(0) * 1000 / interval; + lastWritingTime = Math.max(lastWritingTime, writingTime); + lastReadingTime = Math.max(lastReadingTime, readingTime); } /** * Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the Timer to use, its - * name, the checkInterval between two computations in millisecond - * @param trafficShapingHandler the associated AbstractTrafficShapingHandler - * @param executor the underlying executor service for scheduling checks - * @param name the name given to this monitor - * @param checkInterval the checkInterval in millisecond between two computations + * name, the checkInterval between two computations in millisecond. + * + * @param trafficShapingHandler + * the associated AbstractTrafficShapingHandler. + * @param executor + * the underlying executor service for scheduling checks, might be null when used + * from {@link GlobalChannelTrafficCounter}. + * @param name + * the name given to this monitor. + * @param checkInterval + * the checkInterval in millisecond between two computations. */ - public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler, - ScheduledExecutorService executor, String name, long checkInterval) { + public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler, ScheduledExecutorService executor, + String name, long checkInterval) { + if (trafficShapingHandler == null) { + throw new IllegalArgumentException("TrafficShapingHandler must not be null"); + } this.trafficShapingHandler = trafficShapingHandler; this.executor = executor; this.name = name; + // absolute time: informative only lastCumulativeTime = System.currentTimeMillis(); + writingTime = milliSecondFromNano(); + readingTime = writingTime; + lastWritingTime = writingTime; + lastReadingTime = writingTime; configure(checkInterval); } /** - * Change checkInterval between two computations in millisecond + * Change checkInterval between two computations in millisecond. * * @param newcheckInterval The new check interval (in milliseconds) */ public void configure(long newcheckInterval) { long newInterval = newcheckInterval / 10 * 10; - if (checkInterval.get() != newInterval) { - checkInterval.set(newInterval); + if (checkInterval.getAndSet(newInterval) != newInterval) { if (newInterval <= 0) { stop(); // No more active monitoring - lastTime.set(System.currentTimeMillis()); + lastTime.set(milliSecondFromNano()); } else { // Start if necessary start(); @@ -310,64 +346,67 @@ public class TrafficCounter { } /** + * Computes counters for Real Write. * + * @param write + * the size in bytes to write + */ + void bytesRealWriteFlowControl(long write) { + realWrittenBytes.addAndGet(write); + } + + /** * @return the current checkInterval between two computations of traffic counter - * in millisecond + * in millisecond. */ public long checkInterval() { return checkInterval.get(); } /** - * - * @return the Read Throughput in bytes/s computes in the last check interval + * @return the Read Throughput in bytes/s computes in the last check interval. */ public long lastReadThroughput() { return lastReadThroughput; } /** - * - * @return the Write Throughput in bytes/s computes in the last check interval + * @return the Write Throughput in bytes/s computes in the last check interval. */ public long lastWriteThroughput() { return lastWriteThroughput; } /** - * - * @return the number of bytes read during the last check Interval + * @return the number of bytes read during the last check Interval. */ public long lastReadBytes() { return lastReadBytes; } /** - * - * @return the number of bytes written during the last check Interval + * @return the number of bytes written during the last check Interval. */ public long lastWrittenBytes() { return lastWrittenBytes; } /** - * - * @return the current number of bytes read since the last checkInterval - */ + * @return the current number of bytes read since the last checkInterval. + */ public long currentReadBytes() { return currentReadBytes.get(); } /** - * - * @return the current number of bytes written since the last check Interval + * @return the current number of bytes written since the last check Interval. */ public long currentWrittenBytes() { return currentWrittenBytes.get(); } /** - * @return the Time in millisecond of the last check as of System.currentTimeMillis() + * @return the Time in millisecond of the last check as of System.currentTimeMillis(). */ public long lastTime() { return lastTime.get(); @@ -396,7 +435,22 @@ public class TrafficCounter { } /** - * Reset both read and written cumulative bytes counters and the associated time. + * @return the realWrittenBytes + */ + public AtomicLong getRealWrittenBytes() { + return realWrittenBytes; + } + + /** + * @return the realWriteThroughput + */ + public long getRealWriteThroughput() { + return realWriteThroughput; + } + + /** + * Reset both read and written cumulative bytes counters and the associated absolute time + * from System.currentTimeMillis(). */ public void resetCumulativeTime() { lastCumulativeTime = System.currentTimeMillis(); @@ -405,7 +459,7 @@ public class TrafficCounter { } /** - * @return the name + * @return the name of this TrafficCounter. */ public String name() { return name; @@ -413,124 +467,162 @@ public class TrafficCounter { /** * Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait - * time + * time. + * + * @param size + * the recv size + * @param limitTraffic + * the traffic limit in bytes per second. + * @param maxTime + * the max time in ms to wait in case of excess of traffic. + * @return the current time to wait (in ms) if needed for Read operation. + */ + @Deprecated + public long readTimeToWait(final long size, final long limitTraffic, final long maxTime) { + return readTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano()); + } + + /** + * Returns the time to wait (if any) for the given length message, using the given limitTraffic and the max wait + * time. * * @param size * the recv size * @param limitTraffic * the traffic limit in bytes per second * @param maxTime - * the max time in ms to wait in case of excess of traffic - * @return the current time to wait (in ms) if needed for Read operation + * the max time in ms to wait in case of excess of traffic. + * @param now the current time + * @return the current time to wait (in ms) if needed for Read operation. */ - public synchronized long readTimeToWait(final long size, final long limitTraffic, final long maxTime) { - final long now = System.currentTimeMillis(); + public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) { bytesRecvFlowControl(size); - if (limitTraffic == 0) { + if (size == 0 || limitTraffic == 0) { return 0; } + final long lastTimeCheck = lastTime.get(); long sum = currentReadBytes.get(); - long interval = now - lastTime.get(); - // Short time checking - if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT && sum > 0) { - long time = (sum * 1000 / limitTraffic - interval) / 10 * 10; + long localReadingTime = readingTime; + long lastRB = lastReadBytes; + final long interval = now - lastTimeCheck; + long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0); + if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + // Enough interval time to compute shaping + long time = sum * 1000 / limitTraffic - interval + pastDelay; if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { if (logger.isDebugEnabled()) { - logger.debug("Time: " + time + ":" + sum + ":" + interval); + logger.debug("Time: " + time + ":" + sum + ":" + interval + ":" + pastDelay); } - return time > maxTime ? maxTime : time; + if (time > maxTime && now + time - localReadingTime > maxTime) { + time = maxTime; + } + readingTime = Math.max(localReadingTime, now + time); + return time; } + readingTime = Math.max(localReadingTime, now); return 0; } - // long time checking - if (lastNonNullReadBytes > 0 && lastNonNullReadTime + AbstractTrafficShapingHandler.MINIMAL_WAIT < now) { - long lastsum = sum + lastNonNullReadBytes; - long lastinterval = now - lastNonNullReadTime; - long time = (lastsum * 1000 / limitTraffic - lastinterval) / 10 * 10; - if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { - if (logger.isDebugEnabled()) { - logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval); - } - return time > maxTime ? maxTime : time; + // take the last read interval check to get enough interval time + long lastsum = sum + lastRB; + long lastinterval = interval + checkInterval.get(); + long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay; + if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval + ":" + pastDelay); } - } else { - // final "middle" time checking in case resetAccounting called very recently - sum += lastReadBytes; - long lastinterval = AbstractTrafficShapingHandler.MINIMAL_WAIT; - long time = (sum * 1000 / limitTraffic - lastinterval) / 10 * 10; - if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { - if (logger.isDebugEnabled()) { - logger.debug("Time: " + time + ":" + sum + ":" + lastinterval); - } - return time > maxTime ? maxTime : time; + if (time > maxTime && now + time - localReadingTime > maxTime) { + time = maxTime; } + readingTime = Math.max(localReadingTime, now + time); + return time; } + readingTime = Math.max(localReadingTime, now); return 0; } /** * Returns the time to wait (if any) for the given length message, using the given limitTraffic and - * the max wait time + * the max wait time. * * @param size * the write size * @param limitTraffic - * the traffic limit in bytes per second + * the traffic limit in bytes per second. * @param maxTime - * the max time in ms to wait in case of excess of traffic - * @return the current time to wait (in ms) if needed for Write operation + * the max time in ms to wait in case of excess of traffic. + * @return the current time to wait (in ms) if needed for Write operation. */ - public synchronized long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) { - bytesWriteFlowControl(size); - if (limitTraffic == 0) { - return 0; - } - long sum = currentWrittenBytes.get(); - final long now = System.currentTimeMillis(); - long interval = now - lastTime.get(); - if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT && sum > 0) { - long time = (sum * 1000 / limitTraffic - interval) / 10 * 10; - if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { - if (logger.isDebugEnabled()) { - logger.debug("Time: " + time + ":" + sum + ":" + interval); - } - return time > maxTime ? maxTime : time; - } - return 0; - } - if (lastNonNullWrittenBytes > 0 && lastNonNullWrittenTime + AbstractTrafficShapingHandler.MINIMAL_WAIT < now) { - long lastsum = sum + lastNonNullWrittenBytes; - long lastinterval = now - lastNonNullWrittenTime; - long time = (lastsum * 1000 / limitTraffic - lastinterval) / 10 * 10; - if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { - if (logger.isDebugEnabled()) { - logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval); - } - return time > maxTime ? maxTime : time; - } - } else { - sum += lastWrittenBytes; - long lastinterval = AbstractTrafficShapingHandler.MINIMAL_WAIT + Math.abs(interval); - long time = (sum * 1000 / limitTraffic - lastinterval) / 10 * 10; - if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { - if (logger.isDebugEnabled()) { - logger.debug("Time: " + time + ":" + sum + ":" + lastinterval); - } - return time > maxTime ? maxTime : time; - } - } - return 0; + @Deprecated + public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) { + return writeTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano()); } /** - * String information + * Returns the time to wait (if any) for the given length message, using the given limitTraffic and + * the max wait time. + * + * @param size + * the write size + * @param limitTraffic + * the traffic limit in bytes per second. + * @param maxTime + * the max time in ms to wait in case of excess of traffic. + * @param now the current time + * @return the current time to wait (in ms) if needed for Write operation. */ + public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) { + bytesWriteFlowControl(size); + if (size == 0 || limitTraffic == 0) { + return 0; + } + final long lastTimeCheck = lastTime.get(); + long sum = currentWrittenBytes.get(); + long lastWB = lastWrittenBytes; + long localWritingTime = writingTime; + long pastDelay = Math.max(lastWritingTime - lastTimeCheck, 0); + final long interval = now - lastTimeCheck; + if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + // Enough interval time to compute shaping + long time = sum * 1000 / limitTraffic - interval + pastDelay; + if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Time: " + time + ":" + sum + ":" + interval + ":" + pastDelay); + } + if (time > maxTime && now + time - localWritingTime > maxTime) { + time = maxTime; + } + writingTime = Math.max(localWritingTime, now + time); + return time; + } + writingTime = Math.max(localWritingTime, now); + return 0; + } + // take the last write interval check to get enough interval time + long lastsum = sum + lastWB; + long lastinterval = interval + checkInterval.get(); + long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay; + if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) { + if (logger.isDebugEnabled()) { + logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval + ":" + pastDelay); + } + if (time > maxTime && now + time - localWritingTime > maxTime) { + time = maxTime; + } + writingTime = Math.max(localWritingTime, now + time); + return time; + } + writingTime = Math.max(localWritingTime, now); + return 0; + } + @Override public String toString() { - return "Monitor " + name + " Current Speed Read: " + - (lastReadThroughput >> 10) + " KB/s, Write: " + - (lastWriteThroughput >> 10) + " KB/s Current Read: " + - (currentReadBytes.get() >> 10) + " KB Current Write: " + - (currentWrittenBytes.get() >> 10) + " KB"; + return new StringBuilder(165).append("Monitor ").append(name) + .append(" Current Speed Read: ").append(lastReadThroughput >> 10).append(" KB/s, ") + .append("Asked Write: ").append(lastWriteThroughput >> 10).append(" KB/s, ") + .append("Real Write: ").append(realWriteThroughput >> 10).append(" KB/s, ") + .append("Current Read: ").append(currentReadBytes.get() >> 10).append(" KB, ") + .append("Current asked Write: ").append(currentWrittenBytes.get() >> 10).append(" KB, ") + .append("Current real Write: ").append(realWrittenBytes.get() >> 10).append(" KB").toString(); } } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/TrafficShapingHandlerTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/TrafficShapingHandlerTest.java index 19964a6107..949d1b808f 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/TrafficShapingHandlerTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/TrafficShapingHandlerTest.java @@ -24,6 +24,7 @@ import io.netty.channel.socket.SocketChannel; import io.netty.handler.traffic.AbstractTrafficShapingHandler; import io.netty.handler.traffic.ChannelTrafficShapingHandler; import io.netty.handler.traffic.GlobalTrafficShapingHandler; +import io.netty.handler.traffic.TrafficCounter; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Promise; @@ -329,7 +330,7 @@ public class TrafficShapingHandlerTest extends AbstractSocketTest { for (int i = 1; i < multipleMessage.length; i++) { totalNb += multipleMessage[i]; } - Long start = System.currentTimeMillis(); + Long start = TrafficCounter.milliSecondFromNano(); int nb = multipleMessage[0]; for (int i = 0; i < nb; i++) { cc.write(cc.alloc().buffer().writeBytes(data)); @@ -337,7 +338,7 @@ public class TrafficShapingHandlerTest extends AbstractSocketTest { cc.flush(); promise.await(); - Long stop = System.currentTimeMillis(); + Long stop = TrafficCounter.milliSecondFromNano(); assertTrue("Error during exceution of TrafficShapping: " + promise.cause(), promise.isSuccess()); float average = (totalNb * messageSize) / (float) (stop - start); @@ -385,7 +386,7 @@ public class TrafficShapingHandlerTest extends AbstractSocketTest { final AtomicReference exception = new AtomicReference(); volatile int step; // first message will always be validated - private long currentLastTime = System.currentTimeMillis(); + private long currentLastTime = TrafficCounter.milliSecondFromNano(); private final long[] minimalWaitBetween; private final int[] multipleMessage; private final int[] autoRead; @@ -473,7 +474,7 @@ public class TrafficShapingHandlerTest extends AbstractSocketTest { int nb = actual.length / messageSize; loggerServer.info("Step: " + step + " Read: " + nb + " blocks"); in.readBytes(actual); - long timestamp = System.currentTimeMillis(); + long timestamp = TrafficCounter.milliSecondFromNano(); int isAutoRead = 0; int laststep = step; for (int i = 0; i < nb; i++) {