diff --git a/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java
index 53625eaf4e..8a06f54f56 100644
--- a/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java
+++ b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java
@@ -167,12 +167,12 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
* The maximum delay to wait in case of traffic excess.
* Must be positive.
*/
- protected AbstractTrafficShapingHandler(long writeLimit, long readLimit,
- long checkInterval, long maxTime) {
+ protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval, long maxTime) {
if (maxTime <= 0) {
throw new IllegalArgumentException("maxTime must be positive");
}
- this.userDefinedWritabilityIndex = userDefinedWritabilityIndex();
+
+ userDefinedWritabilityIndex = userDefinedWritabilityIndex();
this.writeLimit = writeLimit;
this.readLimit = readLimit;
this.checkInterval = checkInterval;
@@ -398,9 +398,9 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
*
Note that this limit is a best effort on memory limitation to prevent Out Of
* Memory Exception. To ensure it works, the handler generating the write should
* use one of the way provided by Netty to handle the capacity:
- * - the Channel.isWritable()
property and the corresponding
- * channelWritabilityChanged()
- * - the ChannelFuture.addListener(new GenericFutureListener())
+ * - the {@code Channel.isWritable()} property and the corresponding
+ * {@code channelWritabilityChanged()}
+ * - the {@code ChannelFuture.addListener(new GenericFutureListener())}
*
* @param maxWriteSize the maximum Write Size allowed in the buffer
* per channel before write suspended is set,
@@ -430,13 +430,14 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
this.ctx = ctx;
}
+ @Override
public void run() {
ChannelConfig config = ctx.channel().config();
if (!config.isAutoRead() && isHandlerActive(ctx)) {
// If AutoRead is False and Active is True, user make a direct setAutoRead(false)
// Then Just reset the status
if (logger.isDebugEnabled()) {
- logger.debug("Not unsuspend: " + config.isAutoRead() + ":" +
+ logger.debug("Not unsuspend: " + config.isAutoRead() + ':' +
isHandlerActive(ctx));
}
ctx.attr(READ_SUSPENDED).set(false);
@@ -444,10 +445,10 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
// Anything else allows the handler to reset the AutoRead
if (logger.isDebugEnabled()) {
if (config.isAutoRead() && !isHandlerActive(ctx)) {
- logger.debug("Unsuspend: " + config.isAutoRead() + ":" +
+ logger.debug("Unsuspend: " + config.isAutoRead() + ':' +
isHandlerActive(ctx));
} else {
- logger.debug("Normal unsuspend: " + config.isAutoRead() + ":"
+ logger.debug("Normal unsuspend: " + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
}
@@ -456,7 +457,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
ctx.channel().read();
}
if (logger.isDebugEnabled()) {
- logger.debug("Unsupsend final status => " + config.isAutoRead() + ":"
+ logger.debug("Unsupsend final status => " + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
}
@@ -483,7 +484,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
// Only AutoRead AND HandlerActive True means Context Active
ChannelConfig config = ctx.channel().config();
if (logger.isDebugEnabled()) {
- logger.debug("Read suspend: " + wait + ":" + config.isAutoRead() + ":"
+ logger.debug("Read suspend: " + wait + ':' + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
if (config.isAutoRead() && isHandlerActive(ctx)) {
@@ -499,7 +500,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
}
ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
if (logger.isDebugEnabled()) {
- logger.debug("Suspend final status => " + config.isAutoRead() + ":"
+ logger.debug("Suspend final status => " + config.isAutoRead() + ':'
+ isHandlerActive(ctx) + " will reopened at: " + wait);
}
}
@@ -551,7 +552,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
if (wait >= MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
- logger.debug("Write suspend: " + wait + ":" + ctx.channel().config().isAutoRead() + ":"
+ logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':'
+ isHandlerActive(ctx));
}
submitWrite(ctx, msg, size, wait, now, promise);
@@ -569,8 +570,8 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
delay, TrafficCounter.milliSecondFromNano(), promise);
}
- abstract void submitWrite(final ChannelHandlerContext ctx, final Object msg, final long size,
- final long delay, final long now, final ChannelPromise promise);
+ abstract void submitWrite(
+ ChannelHandlerContext ctx, Object msg, long size, long delay, long now, ChannelPromise promise);
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
@@ -621,7 +622,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
.append(" maxSize: ").append(maxWriteSize)
.append(" and Counter: ");
if (trafficCounter != null) {
- builder.append(trafficCounter.toString());
+ builder.append(trafficCounter);
} else {
builder.append("none");
}
diff --git a/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java
index 518ffde960..8074a6a344 100644
--- a/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java
+++ b/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java
@@ -15,17 +15,17 @@
*/
package io.netty.handler.traffic;
-import java.util.ArrayDeque;
-import java.util.concurrent.TimeUnit;
-
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
+import java.util.ArrayDeque;
+import java.util.concurrent.TimeUnit;
+
/**
* This implementation of the {@link AbstractTrafficShapingHandler} is for channel
* traffic shaping, that is to say a per channel limitation of the bandwidth.
- * Note the index used in OutboundBuffer.setUserDefinedWritability(index, boolean)
is 1.
+ * Note the index used in {@code OutboundBuffer.setUserDefinedWritability(index, boolean)} is 1.
*
* The general use should be as follow:
*
@@ -48,10 +48,10 @@ import io.netty.channel.ChannelPromise;
*
* maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.
*
- * - In your handler, you should consider to use the
channel.isWritable()
and
- * channelWritabilityChanged(ctx)
to handle writability, or through
- * future.addListener(new GenericFutureListener())
on the future returned by
- * ctx.write()
.
+ * - In your handler, you should consider to use the {@code channel.isWritable()} and
+ * {@code channelWritabilityChanged(ctx)} to handle writability, or through
+ * {@code future.addListener(new GenericFutureListener())} on the future returned by
+ * {@code ctx.write()}.
* You shall also consider to have object size in read or write operations relatively adapted to
* the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
* while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.
@@ -168,7 +168,7 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
final ChannelPromise promise;
private ToSend(final long delay, final Object toSend, final ChannelPromise promise) {
- this.relativeTimeAction = delay;
+ relativeTimeAction = delay;
this.toSend = toSend;
this.promise = promise;
}
diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficCounter.java b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficCounter.java
index 6be10863dc..6af7de8d66 100644
--- a/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficCounter.java
+++ b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficCounter.java
@@ -86,6 +86,7 @@ public class GlobalChannelTrafficCounter extends TrafficCounter {
/**
* Start the monitoring process.
*/
+ @Override
public synchronized void start() {
if (monitorActive) {
return;
@@ -103,6 +104,7 @@ public class GlobalChannelTrafficCounter extends TrafficCounter {
/**
* Stop the monitoring process.
*/
+ @Override
public synchronized void stop() {
if (!monitorActive) {
return;
diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java
index 2a7edd2cbf..91a68f0a0e 100644
--- a/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java
+++ b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java
@@ -66,10 +66,10 @@ import java.util.concurrent.atomic.AtomicLong;
*
* maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.
*
- * - In your handler, you should consider to use the
channel.isWritable()
and
- * channelWritabilityChanged(ctx)
to handle writability, or through
- * future.addListener(new GenericFutureListener())
on the future returned by
- * ctx.write()
.
+ * - In your handler, you should consider to use the {@code channel.isWritable()} and
+ * {@code channelWritabilityChanged(ctx)} to handle writability, or through
+ * {@code future.addListener(new GenericFutureListener())} on the future returned by
+ * {@code ctx.write()}.
* - You shall also consider to have object size in read or write operations relatively adapted to
* the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
* while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.
@@ -528,7 +528,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
wait = perChannel.channelTrafficCounter.readTimeToWait(size, readChannelLimit, maxTime, now);
if (readDeviationActive) {
// now try to balance between the channels
- long maxLocalRead = 0;
+ long maxLocalRead;
maxLocalRead = perChannel.channelTrafficCounter.cumulativeReadBytes();
long maxGlobalRead = cumulativeReadBytes.get();
if (maxLocalRead <= 0) {
@@ -549,7 +549,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
// Only AutoRead AND HandlerActive True means Context Active
ChannelConfig config = ctx.channel().config();
if (logger.isDebugEnabled()) {
- logger.debug("Read Suspend: " + wait + ":" + config.isAutoRead() + ":"
+ logger.debug("Read Suspend: " + wait + ':' + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
if (config.isAutoRead() && isHandlerActive(ctx)) {
@@ -565,7 +565,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
}
ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
if (logger.isDebugEnabled()) {
- logger.debug("Suspend final status => " + config.isAutoRead() + ":"
+ logger.debug("Suspend final status => " + config.isAutoRead() + ':'
+ isHandlerActive(ctx) + " will reopened at: " + wait);
}
}
@@ -603,7 +603,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
final long size;
private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) {
- this.relativeTimeAction = delay;
+ relativeTimeAction = delay;
this.toSend = toSend;
this.size = size;
this.promise = promise;
@@ -623,17 +623,20 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
* @return the list of TrafficCounters that exists at the time of the call.
*/
public Collection channelTrafficCounters() {
- Collection valueCollection = new AbstractCollection() {
+ return new AbstractCollection() {
@Override
public Iterator iterator() {
return new Iterator() {
final Iterator iter = channelQueues.values().iterator();
+ @Override
public boolean hasNext() {
return iter.hasNext();
}
+ @Override
public TrafficCounter next() {
return iter.next().channelTrafficCounter;
}
+ @Override
public void remove() {
throw new UnsupportedOperationException();
}
@@ -644,7 +647,6 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
return channelQueues.size();
}
};
- return valueCollection;
}
@Override
@@ -662,7 +664,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
wait = perChannel.channelTrafficCounter.writeTimeToWait(size, writeChannelLimit, maxTime, now);
if (writeDeviationActive) {
// now try to balance between the channels
- long maxLocalWrite = 0;
+ long maxLocalWrite;
maxLocalWrite = perChannel.channelTrafficCounter.cumulativeWrittenBytes();
long maxGlobalWrite = cumulativeWrittenBytes.get();
if (maxLocalWrite <= 0) {
@@ -679,7 +681,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
}
if (wait >= MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
- logger.debug("Write suspend: " + wait + ":" + ctx.channel().config().isAutoRead() + ":"
+ logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':'
+ isHandlerActive(ctx));
}
submitWrite(ctx, msg, size, wait, now, promise);
diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java
index 21bbfc8300..b7667b2d48 100644
--- a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java
+++ b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java
@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
* This implementation of the {@link AbstractTrafficShapingHandler} is for global
* traffic shaping, that is to say a global limitation of the bandwidth, whatever
* the number of opened channels.
- * Note the index used in OutboundBuffer.setUserDefinedWritability(index, boolean)
is 2.
+ * Note the index used in {@code OutboundBuffer.setUserDefinedWritability(index, boolean)} is 2.
*
* The general use should be as follow:
*
@@ -57,10 +57,10 @@ import java.util.concurrent.atomic.AtomicLong;
*
* maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.
*
- * - In your handler, you should consider to use the
channel.isWritable()
and
- * channelWritabilityChanged(ctx)
to handle writability, or through
- * future.addListener(new GenericFutureListener())
on the future returned by
- * ctx.write()
.
+ * - In your handler, you should consider to use the {@code channel.isWritable()} and
+ * {@code channelWritabilityChanged(ctx)} to handle writability, or through
+ * {@code future.addListener(new GenericFutureListener())} on the future returned by
+ * {@code ctx.write()}.
* You shall also consider to have object size in read or write operations relatively adapted to
* the bandwidth you required: for instance having 10 MB objects for 10KB/s will lead to burst effect,
* while having 100 KB objects for 1 MB/s should be smoothly handle by this TrafficShaping handler.
@@ -192,7 +192,6 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
* the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}.
*/
public GlobalTrafficShapingHandler(EventExecutor executor) {
- super();
createGlobalTrafficCounter(executor);
}
@@ -314,7 +313,7 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
final ChannelPromise promise;
private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) {
- this.relativeTimeAction = delay;
+ relativeTimeAction = delay;
this.toSend = toSend;
this.size = size;
this.promise = promise;
diff --git a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java
index cdd963a12e..134d4b305c 100644
--- a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java
+++ b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java
@@ -42,7 +42,7 @@ public class TrafficCounter {
/**
* @return the time in ms using nanoTime, so not real EPOCH time but elapsed time in ms.
*/
- public static final long milliSecondFromNano() {
+ public static long milliSecondFromNano() {
return System.nanoTime() / 1000000;
}
@@ -178,15 +178,10 @@ public class TrafficCounter {
private final TrafficCounter counter;
/**
-<<<<<<< HEAD
- * @param trafficShapingHandler The parent handler to which this task needs to callback to for accounting
- * @param counter The parent TrafficCounter that we need to reset the statistics for
-=======
* @param trafficShapingHandler
* The parent handler to which this task needs to callback to for accounting.
* @param counter
* The parent TrafficCounter that we need to reset the statistics for.
->>>>>>> b886c05... Fix big transfer and Write traffic shaping issues
*/
protected TrafficMonitoringTask(
AbstractTrafficShapingHandler trafficShapingHandler,
@@ -247,11 +242,7 @@ public class TrafficCounter {
/**
* Reset the accounting on Read and Write.
*
-<<<<<<< HEAD
- * @param newLastTime the millisecond unix timestamp that we should be considered up-to-date for
-=======
* @param newLastTime the milliseconds unix timestamp that we should be considered up-to-date for.
->>>>>>> b886c05... Fix big transfer and Write traffic shaping issues
*/
synchronized void resetAccounting(long newLastTime) {
long interval = newLastTime - lastTime.getAndSet(newLastTime);
@@ -259,7 +250,7 @@ public class TrafficCounter {
// nothing to do
return;
}
- if (logger.isDebugEnabled() && (interval > checkInterval() << 1)) {
+ if (logger.isDebugEnabled() && interval > checkInterval() << 1) {
logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name);
}
lastReadBytes = currentReadBytes.getAndSet(0);
@@ -511,7 +502,7 @@ public class TrafficCounter {
long time = sum * 1000 / limitTraffic - interval + pastDelay;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
- logger.debug("Time: " + time + ":" + sum + ":" + interval + ":" + pastDelay);
+ logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
}
if (time > maxTime && now + time - localReadingTime > maxTime) {
time = maxTime;
@@ -528,7 +519,7 @@ public class TrafficCounter {
long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
- logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval + ":" + pastDelay);
+ logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
}
if (time > maxTime && now + time - localReadingTime > maxTime) {
time = maxTime;
@@ -586,7 +577,7 @@ public class TrafficCounter {
long time = sum * 1000 / limitTraffic - interval + pastDelay;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
- logger.debug("Time: " + time + ":" + sum + ":" + interval + ":" + pastDelay);
+ logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
}
if (time > maxTime && now + time - localWritingTime > maxTime) {
time = maxTime;
@@ -603,7 +594,7 @@ public class TrafficCounter {
long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
- logger.debug("Time: " + time + ":" + lastsum + ":" + lastinterval + ":" + pastDelay);
+ logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
}
if (time > maxTime && now + time - localWritingTime > maxTime) {
time = maxTime;