Add a new constructor without handler parameter to TrafficCounter
Related: #3476 Motivation: Some users use TrafficCounter for other uses than we originally intended, such as implementing their own traffic shaper. In such a case, a user does not want to specify an AbstractTrafficShapingHandler. Modifications: - Add a new constructor that does not require an AbstractTrafficShapingHandler, so that a user can use it without it. - Simplify TrafficMonitoringTask - Javadoc cleanup Result: We open the possibility of using TrafficCounter for other purposes than just using it with AbstractTrafficShapingHandler. Eventually, we could generalize it a little bit more, so that we can potentially use it for other uses.
This commit is contained in:
parent
96cb879054
commit
54293003ef
@ -21,20 +21,16 @@ import java.util.concurrent.TimeUnit;
|
|||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* TrafficCounter is associated with {@link AbstractTrafficShapingHandler}.
|
* Counts the number of read and written bytes for rate-limiting traffic.
|
||||||
*
|
|
||||||
* <p>
|
* <p>
|
||||||
* A <tt>TrafficCounter</tt> counts the read and written bytes such that the {@link AbstractTrafficShapingHandler}
|
* It computes the statistics for both inbound and outbound traffic periodically at the given
|
||||||
* can limit the traffic, globally or per channel.
|
* {@code checkInterval}, and calls the {@link AbstractTrafficShapingHandler#doAccounting(TrafficCounter)} method back.
|
||||||
* </p>
|
* If the {@code checkInterval} is {@code 0}, no accounting will be done and statistics will only be computed at each
|
||||||
*
|
* receive or write operation.
|
||||||
* <p>
|
|
||||||
* It computes the statistics for both read and written every {@link #checkInterval}, and calls back to its parent
|
|
||||||
* {@link AbstractTrafficShapingHandler#doAccounting} method. If the checkInterval is set to 0, no accounting will be
|
|
||||||
* done and statistics will only be computed at each receive or write operation.
|
|
||||||
* </p>
|
* </p>
|
||||||
*/
|
*/
|
||||||
public class TrafficCounter {
|
public class TrafficCounter {
|
||||||
|
|
||||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficCounter.class);
|
private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficCounter.class);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -163,39 +159,17 @@ public class TrafficCounter {
|
|||||||
* Class to implement monitoring at fix delay
|
* Class to implement monitoring at fix delay
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
private static class TrafficMonitoringTask implements Runnable {
|
private final class TrafficMonitoringTask implements Runnable {
|
||||||
/**
|
|
||||||
* The associated TrafficShapingHandler
|
|
||||||
*/
|
|
||||||
private final AbstractTrafficShapingHandler trafficShapingHandler1;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The associated TrafficCounter
|
|
||||||
*/
|
|
||||||
private final TrafficCounter counter;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param trafficShapingHandler
|
|
||||||
* The parent handler to which this task needs to callback to for accounting.
|
|
||||||
* @param counter
|
|
||||||
* The parent TrafficCounter that we need to reset the statistics for.
|
|
||||||
*/
|
|
||||||
protected TrafficMonitoringTask(AbstractTrafficShapingHandler trafficShapingHandler, TrafficCounter counter) {
|
|
||||||
trafficShapingHandler1 = trafficShapingHandler;
|
|
||||||
this.counter = counter;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
if (!counter.monitorActive) {
|
if (!monitorActive) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
counter.resetAccounting(milliSecondFromNano());
|
resetAccounting(milliSecondFromNano());
|
||||||
if (trafficShapingHandler1 != null) {
|
if (trafficShapingHandler != null) {
|
||||||
trafficShapingHandler1.doAccounting(counter);
|
trafficShapingHandler.doAccounting(TrafficCounter.this);
|
||||||
}
|
}
|
||||||
counter.scheduledFuture = counter.executor.schedule(this, counter.checkInterval.get(),
|
scheduledFuture = executor.schedule(this, checkInterval.get(), TimeUnit.MILLISECONDS);
|
||||||
TimeUnit.MILLISECONDS);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -211,7 +185,7 @@ public class TrafficCounter {
|
|||||||
// if executor is null, it means it is piloted by a GlobalChannelTrafficCounter, so no executor
|
// if executor is null, it means it is piloted by a GlobalChannelTrafficCounter, so no executor
|
||||||
if (localCheckInterval > 0 && executor != null) {
|
if (localCheckInterval > 0 && executor != null) {
|
||||||
monitorActive = true;
|
monitorActive = true;
|
||||||
monitor = new TrafficMonitoringTask(trafficShapingHandler, this);
|
monitor = new TrafficMonitoringTask();
|
||||||
scheduledFuture =
|
scheduledFuture =
|
||||||
executor.schedule(monitor, localCheckInterval, TimeUnit.MILLISECONDS);
|
executor.schedule(monitor, localCheckInterval, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
@ -259,6 +233,33 @@ public class TrafficCounter {
|
|||||||
lastReadingTime = Math.max(lastReadingTime, readingTime);
|
lastReadingTime = Math.max(lastReadingTime, readingTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the {@link ScheduledExecutorService}
|
||||||
|
* to use, its name, the checkInterval between two computations in milliseconds.
|
||||||
|
*
|
||||||
|
* @param executor
|
||||||
|
* the underlying executor service for scheduling checks, might be null when used
|
||||||
|
* from {@link GlobalChannelTrafficCounter}.
|
||||||
|
* @param name
|
||||||
|
* the name given to this monitor.
|
||||||
|
* @param checkInterval
|
||||||
|
* the checkInterval in millisecond between two computations.
|
||||||
|
*/
|
||||||
|
public TrafficCounter(ScheduledExecutorService executor, String name, long checkInterval) {
|
||||||
|
if (executor == null) {
|
||||||
|
throw new NullPointerException("executor");
|
||||||
|
}
|
||||||
|
if (name == null) {
|
||||||
|
throw new NullPointerException("name");
|
||||||
|
}
|
||||||
|
|
||||||
|
trafficShapingHandler = null;
|
||||||
|
this.executor = executor;
|
||||||
|
this.name = name;
|
||||||
|
|
||||||
|
init(checkInterval);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the Timer to use, its
|
* Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the Timer to use, its
|
||||||
* name, the checkInterval between two computations in millisecond.
|
* name, the checkInterval between two computations in millisecond.
|
||||||
@ -273,14 +274,28 @@ public class TrafficCounter {
|
|||||||
* @param checkInterval
|
* @param checkInterval
|
||||||
* the checkInterval in millisecond between two computations.
|
* the checkInterval in millisecond between two computations.
|
||||||
*/
|
*/
|
||||||
public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler, ScheduledExecutorService executor,
|
public TrafficCounter(
|
||||||
|
AbstractTrafficShapingHandler trafficShapingHandler, ScheduledExecutorService executor,
|
||||||
String name, long checkInterval) {
|
String name, long checkInterval) {
|
||||||
|
|
||||||
if (trafficShapingHandler == null) {
|
if (trafficShapingHandler == null) {
|
||||||
throw new IllegalArgumentException("TrafficShapingHandler must not be null");
|
throw new IllegalArgumentException("trafficShapingHandler");
|
||||||
}
|
}
|
||||||
|
if (executor == null) {
|
||||||
|
throw new NullPointerException("executor");
|
||||||
|
}
|
||||||
|
if (name == null) {
|
||||||
|
throw new NullPointerException("name");
|
||||||
|
}
|
||||||
|
|
||||||
this.trafficShapingHandler = trafficShapingHandler;
|
this.trafficShapingHandler = trafficShapingHandler;
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
this.name = name;
|
this.name = name;
|
||||||
|
|
||||||
|
init(checkInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void init(long checkInterval) {
|
||||||
// absolute time: informative only
|
// absolute time: informative only
|
||||||
lastCumulativeTime = System.currentTimeMillis();
|
lastCumulativeTime = System.currentTimeMillis();
|
||||||
writingTime = milliSecondFromNano();
|
writingTime = milliSecondFromNano();
|
||||||
@ -337,8 +352,6 @@ public class TrafficCounter {
|
|||||||
*
|
*
|
||||||
* @param write
|
* @param write
|
||||||
* the size in bytes to write
|
* the size in bytes to write
|
||||||
* @param schedule
|
|
||||||
* the time when this write was scheduled
|
|
||||||
*/
|
*/
|
||||||
void bytesRealWriteFlowControl(long write) {
|
void bytesRealWriteFlowControl(long write) {
|
||||||
realWrittenBytes.addAndGet(write);
|
realWrittenBytes.addAndGet(write);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user