From 5adb37de3d0e3625941ef2b42cdd2cc3fb4bc1c1 Mon Sep 17 00:00:00 2001 From: Luke Wood Date: Sun, 23 Dec 2012 19:29:45 +0000 Subject: [PATCH] Port traffic handler to netty 4 --- .../AbstractTrafficShapingHandler.java | 348 ++++++++++++++++ .../traffic/ChannelTrafficShapingHandler.java | 102 +++++ .../traffic/GlobalTrafficShapingHandler.java | 135 ++++++ .../netty/handler/traffic/TrafficCounter.java | 393 ++++++++++++++++++ .../netty/handler/traffic/package-info.java | 59 +++ 5 files changed, 1037 insertions(+) create mode 100644 handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java create mode 100644 handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java create mode 100644 handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java create mode 100644 handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java create mode 100644 handler/src/main/java/io/netty/handler/traffic/package-info.java diff --git a/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java new file mode 100644 index 0000000000..13d479bcb9 --- /dev/null +++ b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java @@ -0,0 +1,348 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.traffic; + +import io.netty.buffer.Buf; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundByteHandler; +import io.netty.channel.ChannelOutboundByteHandler; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; + +import java.util.concurrent.TimeUnit; + +/** + * AbstractTrafficShapingHandler allows to limit the global bandwidth + * (see {@link GlobalTrafficShapingHandler}) or per session + * bandwidth (see {@link ChannelTrafficShapingHandler}), as traffic shaping. + * It allows you to implement an almost real time monitoring of the bandwidth using + * the monitors from {@link TrafficCounter} that will call back every checkInterval + * the method doAccounting of this handler.
+ *
+ * + * If you want for any particular reasons to stop the monitoring (accounting) or to change + * the read/write limit or the check interval, several methods allow that for you:
+ * + */ +public abstract class AbstractTrafficShapingHandler extends ChannelHandlerAdapter + implements ChannelInboundByteHandler, ChannelOutboundByteHandler { + + /** + * Default delay between two checks: 1s + */ + public static final long DEFAULT_CHECK_INTERVAL = 1000; + + /** + * Default minimal time to wait + */ + private static final long MINIMAL_WAIT = 10; + + /** + * Traffic Counter + */ + protected TrafficCounter trafficCounter; + + /** + * Limit in B/s to apply to write + */ + private long writeLimit; + + /** + * Limit in B/s to apply to read + */ + private long readLimit; + + /** + * Delay between two performance snapshots + */ + protected long checkInterval = DEFAULT_CHECK_INTERVAL; // default 1 s + + private static final AttributeKey REOPEN_TASK = new AttributeKey("reopenTask"); + private static final AttributeKey BUFFER_UPDATE_TASK = new AttributeKey("bufferUpdateTask"); + + /** + * + * @param newTrafficCounter the TrafficCounter to set + */ + void setTrafficCounter(TrafficCounter newTrafficCounter) { + trafficCounter = newTrafficCounter; + } + + /** + * @param writeLimit + * 0 or a limit in bytes/s + * @param readLimit + * 0 or a limit in bytes/s + * @param checkInterval + * The delay between two computations of performances for + * channels or 0 if no stats are to be computed + */ + protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, + long checkInterval) { + this.writeLimit = writeLimit; + this.readLimit = readLimit; + this.checkInterval = checkInterval; + } + + /** + * Constructor using default Check Interval + * + * @param writeLimit + * 0 or a limit in bytes/s + * @param readLimit + * 0 or a limit in bytes/s + */ + protected AbstractTrafficShapingHandler(long writeLimit, long readLimit) { + this(writeLimit, readLimit, DEFAULT_CHECK_INTERVAL); + } + + /** + * Constructor using NO LIMIT and default Check Interval + */ + protected AbstractTrafficShapingHandler() { + this(0, 0, DEFAULT_CHECK_INTERVAL); + } + + /** + * Constructor using NO LIMIT + * + * @param checkInterval + * The delay between two computations of performances for + * channels or 0 if no stats are to be computed + */ + protected AbstractTrafficShapingHandler(long checkInterval) { + this(0, 0, checkInterval); + } + + /** + * Change the underlying limitations and check interval. + * + * @param newWriteLimit The new write limit (in bytes) + * @param newReadLimit The new read limit (in bytes) + * @param newCheckInterval The new check interval (in milliseconds) + */ + public void configure(long newWriteLimit, long newReadLimit, + long newCheckInterval) { + configure(newWriteLimit, newReadLimit); + configure(newCheckInterval); + } + + /** + * Change the underlying limitations. + * + * @param newWriteLimit The new write limit (in bytes) + * @param newReadLimit The new read limit (in bytes) + */ + public void configure(long newWriteLimit, long newReadLimit) { + writeLimit = newWriteLimit; + readLimit = newReadLimit; + if (trafficCounter != null) { + trafficCounter.resetAccounting(System.currentTimeMillis() + 1); + } + } + + /** + * Change the check interval. + * + * @param newCheckInterval The new check interval (in milliseconds) + */ + public void configure(long newCheckInterval) { + checkInterval = newCheckInterval; + if (trafficCounter != null) { + trafficCounter.configure(checkInterval); + } + } + + /** + * Called each time the accounting is computed from the TrafficCounters. + * This method could be used for instance to implement almost real time accounting. + * + * @param counter + * the TrafficCounter that computes its performance + */ + @SuppressWarnings("unused") + protected void doAccounting(TrafficCounter counter) { + // NOOP by default + } + + /** + * Class to implement setReadable at fix time + */ + private static final class ReopenReadTimerTask implements Runnable { + final ChannelHandlerContext ctx; + ReopenReadTimerTask(ChannelHandlerContext ctx) { + this.ctx = ctx; + } + + @Override + public void run() { + if (ctx.channel().isActive()) { + ctx.readable(true); + } + } + } + + /** + * + * @return the time that should be necessary to wait to respect limit. Can + * be negative time + */ + private static long getTimeToWait(long limit, long bytes, long lastTime, + long curtime) { + long interval = curtime - lastTime; + if (interval == 0) { + // Time is too short, so just lets continue + return 0; + } + return (bytes * 1000 / limit - interval / 10) * 10; + } + + @Override + public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { + return ctx.nextInboundByteBuffer(); + } + + @Override + public void freeInboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception { + // do nothing + } + + @Override + public ByteBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { + return ctx.nextOutboundByteBuffer(); + } + + @Override + public void freeOutboundBuffer(ChannelHandlerContext ctx, Buf buf) throws Exception { + // do nothing + } + + @Override + public void inboundBufferUpdated(final ChannelHandlerContext ctx) throws Exception { + ByteBuf buf = ctx.inboundByteBuffer(); + long curtime = System.currentTimeMillis(); + long size = buf.readableBytes(); + + if (trafficCounter != null) { + trafficCounter.bytesRecvFlowControl(size); + if (readLimit == 0) { + // no action + ctx.fireInboundBufferUpdated(); + + return; + } + + // compute the number of ms to wait before reopening the channel + long wait = getTimeToWait(readLimit, + trafficCounter.getCurrentReadBytes(), + trafficCounter.getLastTime(), curtime); + if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal + // time in order to + // try to limit the traffic + if (ctx.isReadable()) { + ctx.readable(false); + + // Create a Runnable to reactive the read if needed. If one was create before it will just be + // reused to limit object creation + Attribute attr = ctx.attr(REOPEN_TASK); + Runnable reopenTask = attr.get(); + if (reopenTask == null) { + reopenTask = new ReopenReadTimerTask(ctx); + attr.set(reopenTask); + } + ctx.executor().schedule(reopenTask, wait, + TimeUnit.MILLISECONDS); + } else { + // Create a Runnable to update the next handler in the chain. If one was create before it will + // just be reused to limit object creation + Attribute attr = ctx.attr(BUFFER_UPDATE_TASK); + Runnable bufferUpdateTask = attr.get(); + if (bufferUpdateTask == null) { + bufferUpdateTask = new Runnable() { + @Override + public void run() { + ctx.fireInboundBufferUpdated(); + } + }; + attr.set(bufferUpdateTask); + } + ctx.executor().schedule(bufferUpdateTask, wait, TimeUnit.MILLISECONDS); + return; + } + } + } + ctx.fireInboundBufferUpdated(); + } + + @Override + public void flush(final ChannelHandlerContext ctx, final ChannelFuture future) throws Exception { + long curtime = System.currentTimeMillis(); + long size = ctx.outboundByteBuffer().readableBytes(); + + if (trafficCounter != null) { + trafficCounter.bytesWriteFlowControl(size); + if (writeLimit == 0) { + ctx.flush(future); + return; + } + // compute the number of ms to wait before continue with the + // channel + long wait = getTimeToWait(writeLimit, + trafficCounter.getCurrentWrittenBytes(), + trafficCounter.getLastTime(), curtime); + if (wait >= MINIMAL_WAIT) { + ctx.executor().schedule(new Runnable() { + @Override + public void run() { + ctx.flush(future); + } + }, wait, TimeUnit.MILLISECONDS); + return; + } + } + ctx.flush(future); + } + + /** + * + * @return the current TrafficCounter (if + * channel is still connected) + */ + public TrafficCounter getTrafficCounter() { + return trafficCounter; + } + + @Override + public void beforeRemove(ChannelHandlerContext ctx) { + if (trafficCounter != null) { + trafficCounter.stop(); + } + } + + @Override + public String toString() { + return "TrafficShaping with Write Limit: " + writeLimit + + " Read Limit: " + readLimit + " and Counter: " + + (trafficCounter != null? trafficCounter.toString() : "none"); + } +} diff --git a/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java new file mode 100644 index 0000000000..0c26b6ba95 --- /dev/null +++ b/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java @@ -0,0 +1,102 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.traffic; + +import io.netty.channel.ChannelHandlerContext; + +/** + * This implementation of the {@link AbstractTrafficShapingHandler} is for channel + * traffic shaping, that is to say a per channel limitation of the bandwidth.

+ * + * The general use should be as follow:
+ *
    + *
  • Add in your pipeline a new ChannelTrafficShapingHandler.
    + * ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler();
    + * pipeline.addLast(myHandler);

    + * + * Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created + * for each new channel as the counter cannot be shared among all channels..

    + * + * Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation) + * or the check interval (in millisecond) that represents the delay between two computations of the + * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).

    + * + * A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting, + * it is recommended to set a positive value, even if it is high since the precision of the + * Traffic Shaping depends on the period where the traffic is computed. The highest the interval, + * the less precise the traffic shaping will be. It is suggested as higher value something close + * to 5 or 10 minutes.
    + *
  • + *

+ */ +public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler { + + /** + * Create a new instance + * + * @param writeLimit + * 0 or a limit in bytes/s + * @param readLimit + * 0 or a limit in bytes/s + * @param checkInterval + * The delay between two computations of performances for + * channels or 0 if no stats are to be computed + */ + public ChannelTrafficShapingHandler(long writeLimit, + long readLimit, long checkInterval) { + super(writeLimit, readLimit, checkInterval); + } + + /** + * Create a new instance + * + * @param writeLimit + * 0 or a limit in bytes/s + * @param readLimit + * 0 or a limit in bytes/s + */ + public ChannelTrafficShapingHandler(long writeLimit, + long readLimit) { + super(writeLimit, readLimit); + } + + /** + * Create a new instance + * + * @param checkInterval + * The delay between two computations of performances for + * channels or 0 if no stats are to be computed + */ + public ChannelTrafficShapingHandler(long checkInterval) { + super(checkInterval); + } + + @Override + public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + TrafficCounter trafficCounter = new TrafficCounter(this, ctx.executor(), "ChannelTC" + + ctx.channel().id(), checkInterval); + setTrafficCounter(trafficCounter); + trafficCounter.start(); + } + + @Override + public void afterRemove(ChannelHandlerContext ctx) throws Exception { + super.afterRemove(ctx); + if (trafficCounter != null) { + trafficCounter.stop(); + } + } +} diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java new file mode 100644 index 0000000000..3b3676138d --- /dev/null +++ b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java @@ -0,0 +1,135 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.traffic; + +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.EventExecutor; + +import java.util.concurrent.ScheduledExecutorService; + +/** + * This implementation of the {@link AbstractTrafficShapingHandler} is for global + * traffic shaping, that is to say a global limitation of the bandwidth, whatever + * the number of opened channels.

+ * + * The general use should be as follow:
+ *
    + *
  • Create your unique GlobalTrafficShapingHandler like:

    + * GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(executor);

    + * The executor could be the underlying IO worker pool
    + * pipeline.addLast(myHandler);

    + * + * Note that this handler has a Pipeline Coverage of "all" which means only one such handler must be created + * and shared among all channels as the counter must be shared among all channels.

    + * + * Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation) + * or the check interval (in millisecond) that represents the delay between two computations of the + * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).

    + * + * A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting, + * it is recommended to set a positive value, even if it is high since the precision of the + * Traffic Shaping depends on the period where the traffic is computed. The highest the interval, + * the less precise the traffic shaping will be. It is suggested as higher value something close + * to 5 or 10 minutes.
    + *
  • + *

+ * + * Be sure to call {@link #release()} once this handler is not needed anymore to release all internal resources. + * This will not shutdown the {@link EventExecutor} as it may be shared, so you need to do this by your own. + */ +@Sharable +public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { + /** + * Create the global TrafficCounter + */ + void createGlobalTrafficCounter(ScheduledExecutorService executor) { + if (executor == null) { + throw new NullPointerException("executor"); + } + TrafficCounter tc = new TrafficCounter(this, executor, "GlobalTC", + checkInterval); + setTrafficCounter(tc); + tc.start(); + } + + /** + * Create a new instance + * + * @param executor + * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter} + * @param writeLimit + * 0 or a limit in bytes/s + * @param readLimit + * 0 or a limit in bytes/s + * @param checkInterval + * The delay between two computations of performances for + * channels or 0 if no stats are to be computed + */ + public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, + long readLimit, long checkInterval) { + super(writeLimit, readLimit, checkInterval); + createGlobalTrafficCounter(executor); + } + + /** + * Create a new instance + * + * @param executor + * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter} + * @param writeLimit + * 0 or a limit in bytes/s + * @param readLimit + * 0 or a limit in bytes/s + */ + public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, + long readLimit) { + super(writeLimit, readLimit); + createGlobalTrafficCounter(executor); + } + + /** + * Create a new instance + * + * @param executor + * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter} + * @param checkInterval + * The delay between two computations of performances for + * channels or 0 if no stats are to be computed + */ + public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long checkInterval) { + super(checkInterval); + createGlobalTrafficCounter(executor); + } + + /** + * Create a new instance + * + * @param executor + * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter} + */ + public GlobalTrafficShapingHandler(EventExecutor executor) { + createGlobalTrafficCounter(executor); + } + + /** + * Release all internal resources of this instance + */ + public final void release() { + if (trafficCounter != null) { + trafficCounter.stop(); + } + } +} diff --git a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java new file mode 100644 index 0000000000..c067bc2ed1 --- /dev/null +++ b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java @@ -0,0 +1,393 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.traffic; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * TrafficCounter is associated with {@link AbstractTrafficShapingHandler}. + * + *

A TrafficCounter counts the read and written bytes such that the + * {@link AbstractTrafficShapingHandler} can limit the traffic, globally or per channel.

+ * + *

It computes the statistics for both read and written every {@link #checkInterval}, and calls + * back to its parent {@link AbstractTrafficShapingHandler#doAccounting} method. If the checkInterval + * is set to 0, no accounting will be done and statistics will only be computed at each receive or + * write operation.

+ */ +public class TrafficCounter { + /** + * Current written bytes + */ + private final AtomicLong currentWrittenBytes = new AtomicLong(); + + /** + * Current read bytes + */ + private final AtomicLong currentReadBytes = new AtomicLong(); + + /** + * Long life written bytes + */ + private final AtomicLong cumulativeWrittenBytes = new AtomicLong(); + + /** + * Long life read bytes + */ + private final AtomicLong cumulativeReadBytes = new AtomicLong(); + + /** + * Last Time where cumulative bytes where reset to zero + */ + private long lastCumulativeTime; + + /** + * Last writing bandwidth + */ + private long lastWriteThroughput; + + /** + * Last reading bandwidth + */ + private long lastReadThroughput; + + /** + * Last Time Check taken + */ + private final AtomicLong lastTime = new AtomicLong(); + + /** + * Last written bytes number during last check interval + */ + private long lastWrittenBytes; + + /** + * Last read bytes number during last check interval + */ + private long lastReadBytes; + + /** + * Delay between two captures + */ + AtomicLong checkInterval = new AtomicLong( + AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL); + + // default 1 s + + /** + * Name of this Monitor + */ + final String name; + + /** + * The associated TrafficShapingHandler + */ + private final AbstractTrafficShapingHandler trafficShapingHandler; + + /** + * Executor that will run the monitor + */ + private final ScheduledExecutorService executor; + /** + * Monitor created once in start() + */ + private Runnable monitor; + /** + * used in stop() to cancel the timer + */ + private volatile ScheduledFuture scheduledFuture; + + /** + * Is Monitor active + */ + AtomicBoolean monitorActive = new AtomicBoolean(); + + /** + * Class to implement monitoring at fix delay + * + */ + private static class TrafficMonitoringTask implements Runnable { + /** + * The associated TrafficShapingHandler + */ + private final AbstractTrafficShapingHandler trafficShapingHandler1; + + /** + * The associated TrafficCounter + */ + private final TrafficCounter counter; + + /** + * @param trafficShapingHandler The parent handler to which this task needs to callback to for accounting + * @param counter The parent TrafficCounter that we need to reset the statistics for + */ + protected TrafficMonitoringTask( + AbstractTrafficShapingHandler trafficShapingHandler, + TrafficCounter counter) { + trafficShapingHandler1 = trafficShapingHandler; + this.counter = counter; + } + + @Override + public void run() { + if (!counter.monitorActive.get()) { + return; + } + long endTime = System.currentTimeMillis(); + counter.resetAccounting(endTime); + if (trafficShapingHandler1 != null) { + trafficShapingHandler1.doAccounting(counter); + } + counter.scheduledFuture = counter.executor.schedule(this, counter.checkInterval.get(), + TimeUnit.MILLISECONDS); + } + } + + /** + * Start the monitoring process + */ + public void start() { + synchronized (lastTime) { + if (monitorActive.get()) { + return; + } + lastTime.set(System.currentTimeMillis()); + if (checkInterval.get() > 0) { + monitorActive.set(true); + monitor = new TrafficMonitoringTask(trafficShapingHandler, this); + scheduledFuture = + executor.schedule(monitor, checkInterval.get(), TimeUnit.MILLISECONDS); + } + } + } + + /** + * Stop the monitoring process + */ + public void stop() { + synchronized (lastTime) { + if (!monitorActive.get()) { + return; + } + monitorActive.set(false); + resetAccounting(System.currentTimeMillis()); + if (trafficShapingHandler != null) { + trafficShapingHandler.doAccounting(this); + } + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + } + } + } + + /** + * Reset the accounting on Read and Write + * + * @param newLastTime the millisecond unix timestamp that we should be considered up-to-date for + */ + void resetAccounting(long newLastTime) { + synchronized (lastTime) { + long interval = newLastTime - lastTime.getAndSet(newLastTime); + if (interval == 0) { + // nothing to do + return; + } + lastReadBytes = currentReadBytes.getAndSet(0); + lastWrittenBytes = currentWrittenBytes.getAndSet(0); + lastReadThroughput = lastReadBytes / interval * 1000; + // nb byte / checkInterval in ms * 1000 (1s) + lastWriteThroughput = lastWrittenBytes / interval * 1000; + // nb byte / checkInterval in ms * 1000 (1s) + } + } + + /** + * Constructor with the {@link AbstractTrafficShapingHandler} that hosts it, the Timer to use, its + * name, the checkInterval between two computations in millisecond + * @param trafficShapingHandler the associated AbstractTrafficShapingHandler + * @param executor the underlying executor service for scheduling checks + * @param name the name given to this monitor + * @param checkInterval the checkInterval in millisecond between two computations + */ + public TrafficCounter(AbstractTrafficShapingHandler trafficShapingHandler, + ScheduledExecutorService executor, String name, long checkInterval) { + this.trafficShapingHandler = trafficShapingHandler; + this.executor = executor; + this.name = name; + lastCumulativeTime = System.currentTimeMillis(); + configure(checkInterval); + } + + /** + * Change checkInterval between two computations in millisecond + * + * @param newcheckInterval The new check interval (in milliseconds) + */ + public void configure(long newcheckInterval) { + long newInterval = newcheckInterval / 10 * 10; + if (checkInterval.get() != newInterval) { + checkInterval.set(newInterval); + if (newInterval <= 0) { + stop(); + // No more active monitoring + lastTime.set(System.currentTimeMillis()); + } else { + // Start if necessary + start(); + } + } + } + + /** + * Computes counters for Read. + * + * @param recv + * the size in bytes to read + */ + void bytesRecvFlowControl(long recv) { + currentReadBytes.addAndGet(recv); + cumulativeReadBytes.addAndGet(recv); + } + + /** + * Computes counters for Write. + * + * @param write + * the size in bytes to write + */ + void bytesWriteFlowControl(long write) { + currentWrittenBytes.addAndGet(write); + cumulativeWrittenBytes.addAndGet(write); + } + + /** + * + * @return the current checkInterval between two computations of traffic counter + * in millisecond + */ + public long getCheckInterval() { + return checkInterval.get(); + } + + /** + * + * @return the Read Throughput in bytes/s computes in the last check interval + */ + public long getLastReadThroughput() { + return lastReadThroughput; + } + + /** + * + * @return the Write Throughput in bytes/s computes in the last check interval + */ + public long getLastWriteThroughput() { + return lastWriteThroughput; + } + + /** + * + * @return the number of bytes read during the last check Interval + */ + public long getLastReadBytes() { + return lastReadBytes; + } + + /** + * + * @return the number of bytes written during the last check Interval + */ + public long getLastWrittenBytes() { + return lastWrittenBytes; + } + + /** + * + * @return the current number of bytes read since the last checkInterval + */ + public long getCurrentReadBytes() { + return currentReadBytes.get(); + } + + /** + * + * @return the current number of bytes written since the last check Interval + */ + public long getCurrentWrittenBytes() { + return currentWrittenBytes.get(); + } + + /** + * @return the Time in millisecond of the last check as of System.currentTimeMillis() + */ + public long getLastTime() { + return lastTime.get(); + } + + /** + * @return the cumulativeWrittenBytes + */ + public long getCumulativeWrittenBytes() { + return cumulativeWrittenBytes.get(); + } + + /** + * @return the cumulativeReadBytes + */ + public long getCumulativeReadBytes() { + return cumulativeReadBytes.get(); + } + + /** + * @return the lastCumulativeTime in millisecond as of System.currentTimeMillis() + * when the cumulative counters were reset to 0. + */ + public long getLastCumulativeTime() { + return lastCumulativeTime; + } + + /** + * Reset both read and written cumulative bytes counters and the associated time. + */ + public void resetCumulativeTime() { + lastCumulativeTime = System.currentTimeMillis(); + cumulativeReadBytes.set(0); + cumulativeWrittenBytes.set(0); + } + + /** + * @return the name + */ + public String getName() { + return name; + } + + /** + * String information + */ + @Override + public String toString() { + return "Monitor " + name + " Current Speed Read: " + + (lastReadThroughput >> 10) + " KB/s, Write: " + + (lastWriteThroughput >> 10) + " KB/s Current Read: " + + (currentReadBytes.get() >> 10) + " KB Current Write: " + + (currentWrittenBytes.get() >> 10) + " KB"; + } +} diff --git a/handler/src/main/java/io/netty/handler/traffic/package-info.java b/handler/src/main/java/io/netty/handler/traffic/package-info.java new file mode 100644 index 0000000000..44123fbb1d --- /dev/null +++ b/handler/src/main/java/io/netty/handler/traffic/package-info.java @@ -0,0 +1,59 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * Implementation of a Traffic Shaping Handler and Dynamic Statistics. + * + *

The main goal of this package is to allow you to shape the traffic (bandwidth limitation), + * but also to get statistics on how many bytes are read or written. Both functions can + * be active or inactive (traffic or statistics).

+ * + *

Two classes implement this behavior: + *

    + *
  • {@link TrafficCounter}: this class implements the counters needed by the handlers. + * It can be accessed to get some extra information like the read or write bytes since last check, the read and write + * bandwidth from last check...
  • + * + *
  • {@link AbstractTrafficShapingHandler}: this abstract class implements the kernel + * of traffic shaping. It could be extended to fit your needs. Two classes are proposed as default + * implementations: see {@link ChannelTrafficShapingHandler} and see {@link GlobalTrafficShapingHandler} + * respectively for Channel traffic shaping and Global traffic shaping.
  • + *

+ * + *

Both inbound and outbound traffic can be shaped independently. This is done by either passing in + * the desired limiting values to the constructors of both the Channel and Global traffic shaping handlers, + * or by calling the configure method on the {@link AbstractTrafficShapingHandler}. A value of + * 0 for either parameter indicates that there should be no limitation. This allows you to monitor the + * incoming and outgoing traffic without shaping.

+ * + *

To activate or deactivate the statistics, you can adjust the delay to a low (suggested not less than 200ms + * for efficiency reasons) or a high value (let say 24H in millisecond is huge enough to not get the problem) + * or even using 0 which means no computation will be done.

+ * + *

If you want to do anything with these statistics, just override the doAccounting method.
+ * This interval can be changed either from the method configure in {@link AbstractTrafficShapingHandler} + * or directly using the method configure of {@link TrafficCounter}.

+ * + *

Note that a new {@link ChannelTrafficShapingHandler} must be created for each new channel, + * but only one {@link GlobalTrafficShapingHandler} must be created for all channels.

+ * + *

Note also that you can create different GlobalTrafficShapingHandler if you want to separate classes of + * channels (for instance either from business point of view or from bind address point of view).

+ * + * @apiviz.exclude ^java\.lang\. + */ +package io.netty.handler.traffic; +