diff --git a/src/main/java/org/jboss/netty/handler/trafficshaping/PerformanceCounter.java b/src/main/java/org/jboss/netty/handler/trafficshaping/PerformanceCounter.java
index 6e9ab40996..656478b681 100644
--- a/src/main/java/org/jboss/netty/handler/trafficshaping/PerformanceCounter.java
+++ b/src/main/java/org/jboss/netty/handler/trafficshaping/PerformanceCounter.java
@@ -1,540 +1,539 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Copyright 2009, Red Hat Middleware LLC, and individual contributors
- * by the @author tags. See the COPYRIGHT.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.netty.handler.trafficshaping;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.logging.InternalLogger;
-import org.jboss.netty.logging.InternalLoggerFactory;
-
-/**
- * @author The Netty Project (netty-dev@lists.jboss.org)
- * @author Trustin Lee (tlee@redhat.com)
- * @author Frederic Bregier (fredbregier@free.fr)
- * @version $Rev$, $Date$
- *
- * PerformanceCounter is associated with {@link TrafficShapingHandler} and
- * should be created through a {@link PerformanceCounterFactory}.
- *
- * A PerformanceCounter can limit the traffic or not, globally or per channel,
- * and always compute statistics on read and written bytes at the specified
- * interval.
- *
- */
-public class PerformanceCounter implements Runnable {
- /**
- * Internal logger
- */
- private static InternalLogger logger = InternalLoggerFactory
- .getInstance(PerformanceCounter.class);
-
- /**
- * Current writing bytes
- */
- private final AtomicLong currentWritingBytes = new AtomicLong(0);
-
- /**
- * Current reading bytes
- */
- private final AtomicLong currentReadingBytes = new AtomicLong(0);
-
- /**
- * Last writing bandwidth
- */
- private long lastWritingBandwidth = 0;
-
- /**
- * Last reading bandwidth
- */
- private long lastReadingBandwidth = 0;
-
- /**
- * Last Time Check taken
- */
- private final AtomicLong lastTime = new AtomicLong(0);
-
- /**
- * Last written bytes number
- */
- private long lastWritingBytes = 0;
-
- /**
- * Last read bytes number
- */
- private long lastReadingBytes = 0;
-
- /**
- * Current Limit in B/s to apply to write
- */
- private long limitWrite = PerformanceCounterFactory.NO_LIMIT;
-
- /**
- * Current Limit in B/s to apply to read
- */
- private long limitRead = PerformanceCounterFactory.NO_LIMIT;
-
- /**
- * Delay between two capture
- */
- private long delay = PerformanceCounterFactory.DEFAULT_DELAY;
-
- // default 1 s
-
- /**
- * Name of this Monitor
- */
- private final String name;
-
- /**
- * Is this monitor for a channel monitoring or for global monitoring
- */
- private boolean isPerChannel = false;
-
- /**
- * Associated monitoredChannel if any (global MUST NOT have any)
- */
- protected Channel monitoredChannel = null;
-
- /**
- * The associated PerformanceCounterFactory
- */
- private PerformanceCounterFactory factory = null;
-
- /**
- * Default ExecutorService
- */
- private ExecutorService executorService = null;
-
- /**
- * Thread that will host this monitor
- */
- private Future> monitorFuture = null;
-
- /**
- * Start the monitoring process
- *
- */
- public void startMonitoring() {
- synchronized (this.lastTime) {
- if (this.monitorFuture != null) {
- return;
- }
- this.lastTime.set(System.currentTimeMillis());
- if (this.delay > 0) {
- this.monitorFuture = this.executorService.submit(this);
- }
- }
- }
-
- /**
- * Stop the monitoring process
- *
- */
- public void stopMonitoring() {
- synchronized (this.lastTime) {
- if (this.monitorFuture == null) {
- return;
- }
- this.monitorFuture.cancel(true);
- this.monitorFuture = null;
- resetAccounting(System.currentTimeMillis());
- if (this.factory != null) {
- this.factory.accounting(this);
- }
- setMonitoredChannel(null);
- }
- }
-
- /**
- * Default run
- */
- public void run() {
- try {
- for (;;) {
- if (this.delay > 0) {
- Thread.sleep(this.delay);
- } else {
- // Delay goes to PerformanceCounterFactory.NO_STAT, so exit
- return;
- }
- long endTime = System.currentTimeMillis();
- resetAccounting(endTime);
- if (this.factory != null) {
- this.factory.accounting(this);
- }
- }
- } catch (InterruptedException e) {
- // End of computations
- }
- }
-
- /**
- * Set the accounting on Read and Write
- *
- * @param newLastTime
- */
- private void resetAccounting(long newLastTime) {
- synchronized (this.lastTime) {
- long interval = newLastTime - this.lastTime.getAndSet(newLastTime);
- if (interval == 0) {
- // nothing to do
- return;
- }
- this.lastReadingBytes = this.currentReadingBytes.getAndSet(0);
- this.lastWritingBytes = this.currentWritingBytes.getAndSet(0);
- this.lastReadingBandwidth = this.lastReadingBytes / interval * 1000;
- // nb byte / delay in ms * 1000 (1s)
- this.lastWritingBandwidth = this.lastWritingBytes / interval * 1000;
- // nb byte / delay in ms * 1000 (1s)
- }
- }
-
- /**
- * Constructor with the executorService to use, the channel if any, its
- * name, the limits in Byte/s (not Bit/s) and the delay between two
- * computations in ms
- *
- * @param factory
- * the associated PerformanceCounterFactory
- * @param executorService
- * Should be a CachedThreadPool for efficiency
- * @param channel
- * Not null means this monitors will be for this channel only,
- * else it will be for global monitoring. Channel can be set
- * later on therefore changing its behavior from global to per
- * channel
- * @param name
- * the name given to this monitor
- * @param writeLimit
- * the write limit in Byte/s
- * @param readLimit
- * the read limit in Byte/s
- * @param delay
- * the delay in ms between two computations
- */
- public PerformanceCounter(PerformanceCounterFactory factory,
- ExecutorService executorService, Channel channel, String name,
- long writeLimit, long readLimit, long delay) {
- this.factory = factory;
- this.executorService = executorService;
- this.name = name;
- this.changeConfiguration(channel, writeLimit, readLimit, delay);
- }
-
- /**
- * Set the Session monitoredChannel (not for Global Monitor)
- *
- * @param channel
- * Not null means this monitors will be for this channel only,
- * else it will be for global monitoring. Channel can be set
- * later on therefore changing its behavior from global to per
- * channel
- */
- public void setMonitoredChannel(Channel channel) {
- if (channel != null) {
- this.monitoredChannel = channel;
- this.isPerChannel = true;
- } else {
- this.isPerChannel = false;
- this.monitoredChannel = null;
- }
- }
-
- /**
- * Specifies limits in Byte/s (not Bit/s) but do not changed the delay
- *
- * @param channel
- * Not null means this monitors will be for this channel only,
- * else it will be for global monitoring. Channel can be set
- * later on therefore changing its behavior from global to per
- * channel
- * @param writeLimit
- * @param readLimit
- */
- public void changeConfiguration(Channel channel, long writeLimit,
- long readLimit) {
- this.limitWrite = writeLimit;
- this.limitRead = readLimit;
- setMonitoredChannel(channel);
- }
-
- /**
- * Specifies limits in Byte/s (not Bit/s) and the specified delay between
- * two computations in ms
- *
- * @param channel
- * Not null means this monitors will be for this channel only,
- * else it will be for global monitoring. Channel can be set
- * later on therefore changing its behavior from global to per
- * channel
- * @param writeLimit
- * @param readLimit
- * @param delayToSet
- */
- public void changeConfiguration(Channel channel, long writeLimit,
- long readLimit, long delayToSet) {
- if (this.delay != delayToSet) {
- this.delay = delayToSet;
- if (this.monitorFuture == null) {
- this.changeConfiguration(channel, writeLimit, readLimit);
- return;
- }
- stopMonitoring();
- if (this.delay > 0) {
- startMonitoring();
- } else {
- // No more active monitoring
- this.lastTime.set(System.currentTimeMillis());
- }
- }
- this.changeConfiguration(channel, writeLimit, readLimit);
- }
-
- /**
- *
- * @return the time that should be necessary to wait to respect limit. Can
- * be negative time
- */
- private long getReadTimeToWait() {
- synchronized (this.lastTime) {
- long interval = System.currentTimeMillis() - this.lastTime.get();
- if (interval == 0) {
- // Time is too short, so just lets continue
- return 0;
- }
- long wait = this.currentReadingBytes.get() * 1000 / this.limitRead -
- interval;
- return wait;
- }
- }
-
- /**
- *
- * @return the time that should be necessary to wait to respect limit. Can
- * be negative time
- */
- private long getWriteTimeToWait() {
- synchronized (this.lastTime) {
- long interval = System.currentTimeMillis() - this.lastTime.get();
- if (interval == 0) {
- // Time is too short, so just lets continue
- return 0;
- }
- long wait = this.currentWritingBytes.get() * 1000 /
- this.limitWrite - interval;
- return wait;
- }
- }
-
- /**
- * Class to implement setReadable at fix time
- *
- */
- private class ReopenRead implements Runnable {
- /**
- * Associated ChannelHandlerContext
- */
- private ChannelHandlerContext ctx = null;
-
- /**
- * Monitor
- */
- private PerformanceCounter monitor = null;
-
- /**
- * Time to wait before clearing the channel
- */
- private long timeToWait = 0;
-
- /**
- * @param monitor
- * @param ctx
- * the associated channelHandlerContext
- * @param timeToWait
- */
- public ReopenRead(ChannelHandlerContext ctx,
- PerformanceCounter monitor, long timeToWait) {
- this.ctx = ctx;
- this.monitor = monitor;
- this.timeToWait = timeToWait;
- }
-
- /**
- * Truly run the waken up of the channel
- */
- public void run() {
- try {
- Thread.sleep(this.timeToWait);
- } catch (InterruptedException e) {
- // interruption so exit
- return;
- }
- // logger.info("WAKEUP!");
- if ((this.monitor != null) &&
- (this.monitor.monitoredChannel != null) &&
- this.monitor.monitoredChannel.isConnected()) {
- // logger.warn(" setReadable TRUE: "+timeToWait);
- if (this.ctx.getHandler() instanceof TrafficShapingHandler) {
- // readSuspended = false;
- this.ctx.setAttachment(null);
- }
- this.monitor.monitoredChannel.setReadable(true);
- }
- }
- }
-
- /**
- * If Read is in excess, it will block the read on channel or block until it
- * will be ready again.
- *
- * @param ctx
- * the associated channelHandlerContext
- * @param recv
- * the size in bytes to read
- * @throws InterruptedException
- */
- public void setReceivedBytes(ChannelHandlerContext ctx, long recv)
- throws InterruptedException {
- this.currentReadingBytes.addAndGet(recv);
- if (this.limitRead == PerformanceCounterFactory.NO_LIMIT) {
- // no action
- return;
- }
- if (this.isPerChannel && (this.monitoredChannel != null) &&
- (!this.monitoredChannel.isConnected())) {
- // no action can be taken since setReadable will throw a
- // NotYetConnected
- return;
- }
- // compute the number of ms to wait before reopening the channel
- long wait = getReadTimeToWait();
- if (wait > 20) { // At least 20ms seems a minimal time in order to
- // try to limit the traffic
- if (this.isPerChannel && (this.monitoredChannel != null) &&
- this.monitoredChannel.isConnected()) {
- // Channel version
- if (this.executorService == null) {
- // Sleep since no executor
- Thread.sleep(wait);
- return;
- }
- if (ctx.getAttachment() == null) {
- if (ctx.getHandler() instanceof TrafficShapingHandler) {
- // readSuspended = true;
- ctx.setAttachment(Boolean.TRUE);
- }
- this.monitoredChannel.setReadable(false);
- // logger.info("Read will wakeup after "+wait+" ms "+this);
- this.executorService
- .submit(new ReopenRead(ctx, this, wait));
- } else {
- // should be waiting: but can occurs sometime so as a FIX
- logger.info("Read sleep ok but should not be here");
- Thread.sleep(wait);
- }
- } else {
- // Global version
- // logger.info("Read sleep "+wait+" ms for "+this);
- Thread.sleep(wait);
- }
- }
- }
-
- /**
- * If Write is in excess, it will block the write operation until it will be
- * ready again.
- *
- * @param write
- * the size in bytes to write
- * @throws InterruptedException
- */
- public void setToWriteBytes(long write) throws InterruptedException {
- this.currentWritingBytes.addAndGet(write);
- if (this.limitWrite == PerformanceCounterFactory.NO_LIMIT) {
- return;
- }
- // compute the number of ms to wait before continue with the channel
- long wait = getWriteTimeToWait();
- if (wait > 20) {
- // Global or Session
- Thread.sleep(wait);
- }
- }
-
- /**
- *
- * @return the current delay between two computations of performance counter
- * in ms
- */
- public long getDelay() {
- return this.delay;
- }
-
- /**
- *
- * @return the current Read bandwidth in byte/s
- */
- public long getLastReadBandwidth() {
- return this.lastReadingBandwidth;
- }
-
- /**
- *
- * @return the current Write bandwidth in byte/s
- */
- public long getLastWriteBandwidth() {
- return this.lastWritingBandwidth;
- }
-
- /**
- *
- * @return the current number of byte read since last delay
- */
- public long getLastBytesRead() {
- return this.lastReadingBytes;
- }
-
- /**
- *
- * @return the current number of byte written since last delay
- */
- public long getLastBytesWrite() {
- return this.lastWritingBytes;
- }
-
- /**
- * String information
- */
- @Override
- public String toString() {
- return "Monitor " + this.name + " Current Speed Read: " +
- (this.lastReadingBandwidth >> 10) + " KB/s, Write: " +
- (this.lastWritingBandwidth >> 10) + " KB/s Current Read: " +
- (this.currentReadingBytes.get() >> 10) + " KB Current Write: " +
- (this.currentWritingBytes.get() >> 10) + " KB";
- }
-}
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * by the @author tags. See the COPYRIGHT.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.netty.handler.trafficshaping;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.logging.InternalLogger;
+import org.jboss.netty.logging.InternalLoggerFactory;
+
+/**
+ * @author The Netty Project (netty-dev@lists.jboss.org)
+ * @author Frederic Bregier (fredbregier@free.fr)
+ * @version $Rev$, $Date$
+ *
+ * PerformanceCounter is associated with {@link TrafficShapingHandler} and
+ * should be created through a {@link PerformanceCounterFactory}.
+ *
+ * A PerformanceCounter can limit the traffic or not, globally or per channel,
+ * and always compute statistics on read and written bytes at the specified
+ * interval.
+ *
+ */
+public class PerformanceCounter implements Runnable {
+ /**
+ * Internal logger
+ */
+ private static InternalLogger logger = InternalLoggerFactory
+ .getInstance(PerformanceCounter.class);
+
+ /**
+ * Current writing bytes
+ */
+ private final AtomicLong currentWritingBytes = new AtomicLong(0);
+
+ /**
+ * Current reading bytes
+ */
+ private final AtomicLong currentReadingBytes = new AtomicLong(0);
+
+ /**
+ * Last writing bandwidth
+ */
+ private long lastWritingBandwidth = 0;
+
+ /**
+ * Last reading bandwidth
+ */
+ private long lastReadingBandwidth = 0;
+
+ /**
+ * Last Time Check taken
+ */
+ private final AtomicLong lastTime = new AtomicLong(0);
+
+ /**
+ * Last written bytes number
+ */
+ private long lastWritingBytes = 0;
+
+ /**
+ * Last read bytes number
+ */
+ private long lastReadingBytes = 0;
+
+ /**
+ * Current Limit in B/s to apply to write
+ */
+ private long limitWrite = PerformanceCounterFactory.NO_LIMIT;
+
+ /**
+ * Current Limit in B/s to apply to read
+ */
+ private long limitRead = PerformanceCounterFactory.NO_LIMIT;
+
+ /**
+ * Delay between two capture
+ */
+ private long delay = PerformanceCounterFactory.DEFAULT_DELAY;
+
+ // default 1 s
+
+ /**
+ * Name of this Monitor
+ */
+ private final String name;
+
+ /**
+ * Is this monitor for a channel monitoring or for global monitoring
+ */
+ private boolean isPerChannel = false;
+
+ /**
+ * Associated monitoredChannel if any (global MUST NOT have any)
+ */
+ protected Channel monitoredChannel = null;
+
+ /**
+ * The associated PerformanceCounterFactory
+ */
+ private PerformanceCounterFactory factory = null;
+
+ /**
+ * Default ExecutorService
+ */
+ private ExecutorService executorService = null;
+
+ /**
+ * Thread that will host this monitor
+ */
+ private Future> monitorFuture = null;
+
+ /**
+ * Start the monitoring process
+ *
+ */
+ public void startMonitoring() {
+ synchronized (this.lastTime) {
+ if (this.monitorFuture != null) {
+ return;
+ }
+ this.lastTime.set(System.currentTimeMillis());
+ if (this.delay > 0) {
+ this.monitorFuture = this.executorService.submit(this);
+ }
+ }
+ }
+
+ /**
+ * Stop the monitoring process
+ *
+ */
+ public void stopMonitoring() {
+ synchronized (this.lastTime) {
+ if (this.monitorFuture == null) {
+ return;
+ }
+ this.monitorFuture.cancel(true);
+ this.monitorFuture = null;
+ resetAccounting(System.currentTimeMillis());
+ if (this.factory != null) {
+ this.factory.accounting(this);
+ }
+ setMonitoredChannel(null);
+ }
+ }
+
+ /**
+ * Default run
+ */
+ public void run() {
+ try {
+ for (;;) {
+ if (this.delay > 0) {
+ Thread.sleep(this.delay);
+ } else {
+ // Delay goes to PerformanceCounterFactory.NO_STAT, so exit
+ return;
+ }
+ long endTime = System.currentTimeMillis();
+ resetAccounting(endTime);
+ if (this.factory != null) {
+ this.factory.accounting(this);
+ }
+ }
+ } catch (InterruptedException e) {
+ // End of computations
+ }
+ }
+
+ /**
+ * Set the accounting on Read and Write
+ *
+ * @param newLastTime
+ */
+ private void resetAccounting(long newLastTime) {
+ synchronized (this.lastTime) {
+ long interval = newLastTime - this.lastTime.getAndSet(newLastTime);
+ if (interval == 0) {
+ // nothing to do
+ return;
+ }
+ this.lastReadingBytes = this.currentReadingBytes.getAndSet(0);
+ this.lastWritingBytes = this.currentWritingBytes.getAndSet(0);
+ this.lastReadingBandwidth = this.lastReadingBytes / interval * 1000;
+ // nb byte / delay in ms * 1000 (1s)
+ this.lastWritingBandwidth = this.lastWritingBytes / interval * 1000;
+ // nb byte / delay in ms * 1000 (1s)
+ }
+ }
+
+ /**
+ * Constructor with the executorService to use, the channel if any, its
+ * name, the limits in Byte/s (not Bit/s) and the delay between two
+ * computations in ms
+ *
+ * @param factory
+ * the associated PerformanceCounterFactory
+ * @param executorService
+ * Should be a CachedThreadPool for efficiency
+ * @param channel
+ * Not null means this monitors will be for this channel only,
+ * else it will be for global monitoring. Channel can be set
+ * later on therefore changing its behavior from global to per
+ * channel
+ * @param name
+ * the name given to this monitor
+ * @param writeLimit
+ * the write limit in Byte/s
+ * @param readLimit
+ * the read limit in Byte/s
+ * @param delay
+ * the delay in ms between two computations
+ */
+ public PerformanceCounter(PerformanceCounterFactory factory,
+ ExecutorService executorService, Channel channel, String name,
+ long writeLimit, long readLimit, long delay) {
+ this.factory = factory;
+ this.executorService = executorService;
+ this.name = name;
+ this.changeConfiguration(channel, writeLimit, readLimit, delay);
+ }
+
+ /**
+ * Set the Session monitoredChannel (not for Global Monitor)
+ *
+ * @param channel
+ * Not null means this monitors will be for this channel only,
+ * else it will be for global monitoring. Channel can be set
+ * later on therefore changing its behavior from global to per
+ * channel
+ */
+ public void setMonitoredChannel(Channel channel) {
+ if (channel != null) {
+ this.monitoredChannel = channel;
+ this.isPerChannel = true;
+ } else {
+ this.isPerChannel = false;
+ this.monitoredChannel = null;
+ }
+ }
+
+ /**
+ * Specifies limits in Byte/s (not Bit/s) but do not changed the delay
+ *
+ * @param channel
+ * Not null means this monitors will be for this channel only,
+ * else it will be for global monitoring. Channel can be set
+ * later on therefore changing its behavior from global to per
+ * channel
+ * @param writeLimit
+ * @param readLimit
+ */
+ public void changeConfiguration(Channel channel, long writeLimit,
+ long readLimit) {
+ this.limitWrite = writeLimit;
+ this.limitRead = readLimit;
+ setMonitoredChannel(channel);
+ }
+
+ /**
+ * Specifies limits in Byte/s (not Bit/s) and the specified delay between
+ * two computations in ms
+ *
+ * @param channel
+ * Not null means this monitors will be for this channel only,
+ * else it will be for global monitoring. Channel can be set
+ * later on therefore changing its behavior from global to per
+ * channel
+ * @param writeLimit
+ * @param readLimit
+ * @param delayToSet
+ */
+ public void changeConfiguration(Channel channel, long writeLimit,
+ long readLimit, long delayToSet) {
+ if (this.delay != delayToSet) {
+ this.delay = delayToSet;
+ if (this.monitorFuture == null) {
+ this.changeConfiguration(channel, writeLimit, readLimit);
+ return;
+ }
+ stopMonitoring();
+ if (this.delay > 0) {
+ startMonitoring();
+ } else {
+ // No more active monitoring
+ this.lastTime.set(System.currentTimeMillis());
+ }
+ }
+ this.changeConfiguration(channel, writeLimit, readLimit);
+ }
+
+ /**
+ *
+ * @return the time that should be necessary to wait to respect limit. Can
+ * be negative time
+ */
+ private long getReadTimeToWait() {
+ synchronized (this.lastTime) {
+ long interval = System.currentTimeMillis() - this.lastTime.get();
+ if (interval == 0) {
+ // Time is too short, so just lets continue
+ return 0;
+ }
+ long wait = this.currentReadingBytes.get() * 1000 / this.limitRead -
+ interval;
+ return wait;
+ }
+ }
+
+ /**
+ *
+ * @return the time that should be necessary to wait to respect limit. Can
+ * be negative time
+ */
+ private long getWriteTimeToWait() {
+ synchronized (this.lastTime) {
+ long interval = System.currentTimeMillis() - this.lastTime.get();
+ if (interval == 0) {
+ // Time is too short, so just lets continue
+ return 0;
+ }
+ long wait = this.currentWritingBytes.get() * 1000 /
+ this.limitWrite - interval;
+ return wait;
+ }
+ }
+
+ /**
+ * Class to implement setReadable at fix time
+ *
+ */
+ private class ReopenRead implements Runnable {
+ /**
+ * Associated ChannelHandlerContext
+ */
+ private ChannelHandlerContext ctx = null;
+
+ /**
+ * Monitor
+ */
+ private PerformanceCounter monitor = null;
+
+ /**
+ * Time to wait before clearing the channel
+ */
+ private long timeToWait = 0;
+
+ /**
+ * @param monitor
+ * @param ctx
+ * the associated channelHandlerContext
+ * @param timeToWait
+ */
+ public ReopenRead(ChannelHandlerContext ctx,
+ PerformanceCounter monitor, long timeToWait) {
+ this.ctx = ctx;
+ this.monitor = monitor;
+ this.timeToWait = timeToWait;
+ }
+
+ /**
+ * Truly run the waken up of the channel
+ */
+ public void run() {
+ try {
+ Thread.sleep(this.timeToWait);
+ } catch (InterruptedException e) {
+ // interruption so exit
+ return;
+ }
+ // logger.info("WAKEUP!");
+ if ((this.monitor != null) &&
+ (this.monitor.monitoredChannel != null) &&
+ this.monitor.monitoredChannel.isConnected()) {
+ // logger.warn(" setReadable TRUE: "+timeToWait);
+ if (this.ctx.getHandler() instanceof TrafficShapingHandler) {
+ // readSuspended = false;
+ this.ctx.setAttachment(null);
+ }
+ this.monitor.monitoredChannel.setReadable(true);
+ }
+ }
+ }
+
+ /**
+ * If Read is in excess, it will block the read on channel or block until it
+ * will be ready again.
+ *
+ * @param ctx
+ * the associated channelHandlerContext
+ * @param recv
+ * the size in bytes to read
+ * @throws InterruptedException
+ */
+ public void setReceivedBytes(ChannelHandlerContext ctx, long recv)
+ throws InterruptedException {
+ this.currentReadingBytes.addAndGet(recv);
+ if (this.limitRead == PerformanceCounterFactory.NO_LIMIT) {
+ // no action
+ return;
+ }
+ if (this.isPerChannel && (this.monitoredChannel != null) &&
+ (!this.monitoredChannel.isConnected())) {
+ // no action can be taken since setReadable will throw a
+ // NotYetConnected
+ return;
+ }
+ // compute the number of ms to wait before reopening the channel
+ long wait = getReadTimeToWait();
+ if (wait > 20) { // At least 20ms seems a minimal time in order to
+ // try to limit the traffic
+ if (this.isPerChannel && (this.monitoredChannel != null) &&
+ this.monitoredChannel.isConnected()) {
+ // Channel version
+ if (this.executorService == null) {
+ // Sleep since no executor
+ Thread.sleep(wait);
+ return;
+ }
+ if (ctx.getAttachment() == null) {
+ if (ctx.getHandler() instanceof TrafficShapingHandler) {
+ // readSuspended = true;
+ ctx.setAttachment(Boolean.TRUE);
+ }
+ this.monitoredChannel.setReadable(false);
+ // logger.info("Read will wakeup after "+wait+" ms "+this);
+ this.executorService
+ .submit(new ReopenRead(ctx, this, wait));
+ } else {
+ // should be waiting: but can occurs sometime so as a FIX
+ logger.info("Read sleep ok but should not be here");
+ Thread.sleep(wait);
+ }
+ } else {
+ // Global version
+ // logger.info("Read sleep "+wait+" ms for "+this);
+ Thread.sleep(wait);
+ }
+ }
+ }
+
+ /**
+ * If Write is in excess, it will block the write operation until it will be
+ * ready again.
+ *
+ * @param write
+ * the size in bytes to write
+ * @throws InterruptedException
+ */
+ public void setToWriteBytes(long write) throws InterruptedException {
+ this.currentWritingBytes.addAndGet(write);
+ if (this.limitWrite == PerformanceCounterFactory.NO_LIMIT) {
+ return;
+ }
+ // compute the number of ms to wait before continue with the channel
+ long wait = getWriteTimeToWait();
+ if (wait > 20) {
+ // Global or Session
+ Thread.sleep(wait);
+ }
+ }
+
+ /**
+ *
+ * @return the current delay between two computations of performance counter
+ * in ms
+ */
+ public long getDelay() {
+ return this.delay;
+ }
+
+ /**
+ *
+ * @return the current Read bandwidth in byte/s
+ */
+ public long getLastReadBandwidth() {
+ return this.lastReadingBandwidth;
+ }
+
+ /**
+ *
+ * @return the current Write bandwidth in byte/s
+ */
+ public long getLastWriteBandwidth() {
+ return this.lastWritingBandwidth;
+ }
+
+ /**
+ *
+ * @return the current number of byte read since last delay
+ */
+ public long getLastBytesRead() {
+ return this.lastReadingBytes;
+ }
+
+ /**
+ *
+ * @return the current number of byte written since last delay
+ */
+ public long getLastBytesWrite() {
+ return this.lastWritingBytes;
+ }
+
+ /**
+ * String information
+ */
+ @Override
+ public String toString() {
+ return "Monitor " + this.name + " Current Speed Read: " +
+ (this.lastReadingBandwidth >> 10) + " KB/s, Write: " +
+ (this.lastWritingBandwidth >> 10) + " KB/s Current Read: " +
+ (this.currentReadingBytes.get() >> 10) + " KB Current Write: " +
+ (this.currentWritingBytes.get() >> 10) + " KB";
+ }
+}
diff --git a/src/main/java/org/jboss/netty/handler/trafficshaping/PerformanceCounterFactory.java b/src/main/java/org/jboss/netty/handler/trafficshaping/PerformanceCounterFactory.java
index f3139cf367..a34c6cd121 100644
--- a/src/main/java/org/jboss/netty/handler/trafficshaping/PerformanceCounterFactory.java
+++ b/src/main/java/org/jboss/netty/handler/trafficshaping/PerformanceCounterFactory.java
@@ -1,486 +1,485 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Copyright 2009, Red Hat Middleware LLC, and individual contributors
- * by the @author tags. See the COPYRIGHT.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.netty.handler.trafficshaping;
-
-import java.util.concurrent.ExecutorService;
-
-import org.jboss.netty.channel.Channel;
-
-/**
- * @author The Netty Project (netty-dev@lists.jboss.org)
- * @author Trustin Lee (tlee@redhat.com)
- * @author Frederic Bregier (fredbregier@free.fr)
- * @version $Rev$, $Date$
- *
- * The {@link PerformanceCounterFactory} is a Factory for
- * {@link PerformanceCounter}. It stores the necessary information to enable
- * dynamic creation of {@link PerformanceCounter} inside the
- * {@link TrafficShapingHandler}.
- *
- *
- */
-public abstract class PerformanceCounterFactory {
- /**
- * No limit
- */
- public static long NO_LIMIT = -1;
-
- /**
- * No statistics (if channel or global PerformanceCounter is a very long
- * time living, it can produce a excess of capacity, i.e. 2^64 bytes so 17
- * billions of billions of bytes).
- */
- public static long NO_STAT = -1;
-
- /**
- * Default delay between two checks: 1s
- */
- public static long DEFAULT_DELAY = 1000;
-
- /**
- * ExecutorService to associated to any PerformanceCounter
- */
- private ExecutorService executorService = null;
-
- /**
- * Limit in B/s to apply to write for all channel PerformanceCounter
- */
- private long channelLimitWrite = NO_LIMIT;
-
- /**
- * Limit in B/s to apply to read for all channel PerformanceCounter
- */
- private long channelLimitRead = NO_LIMIT;
-
- /**
- * Delay between two performance snapshots for channel
- */
- private long channelDelay = DEFAULT_DELAY; // default 1 s
-
- /**
- * Will the PerformanceCounter for Channel be active
- */
- private boolean channelActive = true;
-
- /**
- * Limit in B/s to apply to write for the global PerformanceCounter
- */
- private long globalLimitWrite = NO_LIMIT;
-
- /**
- * Limit in B/s to apply to read for the global PerformanceCounter
- */
- private long globalLimitRead = NO_LIMIT;
-
- /**
- * Delay between two performance snapshots for global
- */
- private long globalDelay = DEFAULT_DELAY; // default 1 s
-
- /**
- * Will the PerformanceCounter for Global be active
- */
- private boolean globalActive = true;
-
- /**
- * Global Monitor
- */
- private PerformanceCounter globalPerformanceMonitor = null;
-
- /**
- * Called each time the accounting is computed for the PerformanceCounters.
- * This method could be used for instance to implement real time accounting.
- *
- * @param counter
- * the PerformanceCounter that computes its performance
- */
- protected abstract void accounting(PerformanceCounter counter);
-
- /**
- *
- * @param newexecutorService
- * @param newChannelActive
- * @param newChannelLimitWrite
- * @param newChannelLimitRead
- * @param newChannelDelay
- * @param newGlobalActive
- * @param newGlobalLimitWrite
- * @param newGlobalLimitRead
- * @param newGlobalDelay
- */
- private void init(ExecutorService newexecutorService,
- boolean newChannelActive, long newChannelLimitWrite,
- long newChannelLimitRead, long newChannelDelay,
- boolean newGlobalActive, long newGlobalLimitWrite,
- long newGlobalLimitRead, long newGlobalDelay) {
- this.executorService = newexecutorService;
- this.channelActive = newChannelActive;
- this.channelLimitWrite = newChannelLimitWrite;
- this.channelLimitRead = newChannelLimitRead;
- this.channelDelay = newChannelDelay;
- this.globalActive = newGlobalActive;
- this.globalLimitWrite = newGlobalLimitWrite;
- this.globalLimitRead = newGlobalLimitRead;
- this.globalDelay = newGlobalDelay;
- }
-
- /**
- * Full constructor
- *
- * @param executorService
- * created for instance like Executors.newCachedThreadPool
- * @param channelActive
- * True if each channel will have a PerformanceCounter
- * @param channelLimitWrite
- * NO_LIMIT or a limit in bytes/s
- * @param channelLimitRead
- * NO_LIMIT or a limit in bytes/s
- * @param channelDelay
- * The delay between two computations of performances for
- * channels or NO_STAT if no stats are to be computed
- * @param globalActive
- * True if global context will have one unique PerformanceCounter
- * @param globalLimitWrite
- * NO_LIMIT or a limit in bytes/s
- * @param globalLimitRead
- * NO_LIMIT or a limit in bytes/s
- * @param globalDelay
- * The delay between two computations of performances for global
- * context or NO_STAT if no stats are to be computed
- */
- public PerformanceCounterFactory(ExecutorService executorService,
- boolean channelActive, long channelLimitWrite,
- long channelLimitRead, long channelDelay, boolean globalActive,
- long globalLimitWrite, long globalLimitRead, long globalDelay) {
- init(executorService, channelActive, channelLimitWrite,
- channelLimitRead, channelDelay, globalActive, globalLimitWrite,
- globalLimitRead, globalDelay);
- }
-
- /**
- * Constructor using default Delay
- *
- * @param executorService
- * created for instance like Executors.newCachedThreadPool
- * @param channelActive
- * True if each channel will have a PerformanceCounter
- * @param channelLimitWrite
- * NO_LIMIT or a limit in bytes/s
- * @param channelLimitRead
- * NO_LIMIT or a limit in bytes/s
- * @param globalActive
- * True if global context will have one unique PerformanceCounter
- * @param globalLimitWrite
- * NO_LIMIT or a limit in bytes/s
- * @param globalLimitRead
- * NO_LIMIT or a limit in bytes/s
- */
- public PerformanceCounterFactory(ExecutorService executorService,
- boolean channelActive, long channelLimitWrite,
- long channelLimitRead, boolean globalActive, long globalLimitWrite,
- long globalLimitRead) {
- init(executorService, channelActive, channelLimitWrite,
- channelLimitRead, DEFAULT_DELAY, globalActive,
- globalLimitWrite, globalLimitRead, DEFAULT_DELAY);
- }
-
- /**
- * Constructor using NO_LIMIT and default delay for channels
- *
- * @param executorService
- * created for instance like Executors.newCachedThreadPool
- * @param channelActive
- * True if each channel will have a PerformanceCounter
- * @param globalActive
- * True if global context will have one unique PerformanceCounter
- * @param globalLimitWrite
- * NO_LIMIT or a limit in bytes/s
- * @param globalLimitRead
- * NO_LIMIT or a limit in bytes/s
- * @param globalDelay
- * The delay between two computations of performances for global
- * context or NO_STAT if no stats are to be computed
- */
- public PerformanceCounterFactory(ExecutorService executorService,
- boolean channelActive, boolean globalActive, long globalLimitWrite,
- long globalLimitRead, long globalDelay) {
- init(executorService, channelActive, NO_LIMIT, NO_LIMIT,
- DEFAULT_DELAY, globalActive, globalLimitWrite, globalLimitRead,
- globalDelay);
- }
-
- /**
- * Constructor using NO_LIMIT for channels and default delay for all
- *
- * @param executorService
- * created for instance like Executors.newCachedThreadPool
- * @param channelActive
- * True if each channel will have a PerformanceCounter
- * @param globalActive
- * True if global context will have one unique PerformanceCounter
- * @param globalLimitWrite
- * NO_LIMIT or a limit in bytes/s
- * @param globalLimitRead
- * NO_LIMIT or a limit in bytes/s
- */
- public PerformanceCounterFactory(ExecutorService executorService,
- boolean channelActive, boolean globalActive, long globalLimitWrite,
- long globalLimitRead) {
- init(executorService, channelActive, NO_LIMIT, NO_LIMIT,
- DEFAULT_DELAY, globalActive, globalLimitWrite, globalLimitRead,
- DEFAULT_DELAY);
- }
-
- /**
- * Constructor using NO_LIMIT and default delay for all
- *
- * @param executorService
- * created for instance like Executors.newCachedThreadPool
- * @param channelActive
- * True if each channel will have a PerformanceCounter
- * @param globalActive
- * True if global context will have one unique PerformanceCounter
- */
- public PerformanceCounterFactory(ExecutorService executorService,
- boolean channelActive, boolean globalActive) {
- init(executorService, channelActive, NO_LIMIT, NO_LIMIT,
- DEFAULT_DELAY, globalActive, NO_LIMIT, NO_LIMIT, DEFAULT_DELAY);
- }
-
- /**
- * Enable to change the active status of PerformanceCounter on Channels (for
- * new one only)
- *
- * @param active
- */
- public void setChannelActive(boolean active) {
- this.channelActive = active;
- }
-
- /**
- * Enable to change the active status of PerformanceCounter on Global (stop
- * or start if necessary)
- *
- * @param active
- */
- public void setGlobalActive(boolean active) {
- if (this.globalActive) {
- if (!active) {
- stopGlobalPerformanceCounter();
- }
- }
- this.globalActive = active;
- getGlobalPerformanceCounter();
- }
-
- /**
- * Change the underlying limitations. Only Global PerformanceCounter (if
- * any) is dynamically changed, but Channels PerformanceCounters are not
- * changed, only new created ones.
- *
- * @param newchannelLimitWrite
- * @param newchannelLimitRead
- * @param newchanneldelay
- * @param newglobalLimitWrite
- * @param newglobalLimitRead
- * @param newglobaldelay
- */
- public void changeConfiguration(long newchannelLimitWrite,
- long newchannelLimitRead, long newchanneldelay,
- long newglobalLimitWrite, long newglobalLimitRead,
- long newglobaldelay) {
- this.channelLimitWrite = newchannelLimitWrite;
- this.channelLimitRead = newchannelLimitRead;
- this.channelDelay = newchanneldelay;
- this.globalLimitWrite = newglobalLimitWrite;
- this.globalLimitRead = newglobalLimitRead;
- this.globalDelay = newglobaldelay;
- if (this.globalPerformanceMonitor != null) {
- this.globalPerformanceMonitor.changeConfiguration(null,
- newglobalLimitWrite, newglobalLimitRead, newglobaldelay);
- }
- }
-
- /**
- * @return the Global PerformanceCounter or null if this support is disabled
- */
- public PerformanceCounter getGlobalPerformanceCounter() {
- if (this.globalActive) {
- if (this.globalPerformanceMonitor == null) {
- this.globalPerformanceMonitor = new PerformanceCounter(this,
- this.executorService, null, "GlobalPC",
- this.globalLimitWrite, this.globalLimitRead,
- this.globalDelay);
- this.globalPerformanceMonitor.startMonitoring();
- }
- }
- return this.globalPerformanceMonitor;
- }
-
- /**
- * @param channel
- * @return the channel PerformanceCounter or null if this support is
- * disabled
- */
- public PerformanceCounter createChannelPerformanceCounter(Channel channel) {
- if (this.channelActive && ((this.channelLimitRead > NO_LIMIT) || (this.channelLimitWrite > NO_LIMIT)
- || (this.channelDelay > NO_STAT))) {
- return new PerformanceCounter(this, this.executorService, channel,
- "ChannelPC" + channel.getId(), this.channelLimitWrite,
- this.channelLimitRead, this.channelDelay);
- }
- return null;
- }
-
- /**
- * Stop the global performance counter if any (Even it is stopped, the
- * factory can however be reused)
- *
- */
- public void stopGlobalPerformanceCounter() {
- if (this.globalPerformanceMonitor != null) {
- this.globalPerformanceMonitor.stopMonitoring();
- this.globalPerformanceMonitor = null;
- }
- }
-
- /**
- * @return the channelDelay
- */
- public long getChannelDelay() {
- return this.channelDelay;
- }
-
- /**
- * @param channelDelay
- * the channelDelay to set
- */
- public void setChannelDelay(long channelDelay) {
- this.channelDelay = channelDelay;
- }
-
- /**
- * @return the channelLimitRead
- */
- public long getChannelLimitRead() {
- return this.channelLimitRead;
- }
-
- /**
- * @param channelLimitRead
- * the channelLimitRead to set
- */
- public void setChannelLimitRead(long channelLimitRead) {
- this.channelLimitRead = channelLimitRead;
- }
-
- /**
- * @return the channelLimitWrite
- */
- public long getChannelLimitWrite() {
- return this.channelLimitWrite;
- }
-
- /**
- * @param channelLimitWrite
- * the channelLimitWrite to set
- */
- public void setChannelLimitWrite(long channelLimitWrite) {
- this.channelLimitWrite = channelLimitWrite;
- }
-
- /**
- * @return the globalDelay
- */
- public long getGlobalDelay() {
- return this.globalDelay;
- }
-
- /**
- * @param globalDelay
- * the globalDelay to set
- */
- public void setGlobalDelay(long globalDelay) {
- this.globalDelay = globalDelay;
- if (this.globalPerformanceMonitor != null) {
- this.globalPerformanceMonitor.changeConfiguration(null,
- this.globalLimitWrite, this.globalLimitRead,
- this.globalDelay);
- }
- }
-
- /**
- * @return the globalLimitRead
- */
- public long getGlobalLimitRead() {
- return this.globalLimitRead;
- }
-
- /**
- * @param globalLimitRead
- * the globalLimitRead to set
- */
- public void setGlobalLimitRead(long globalLimitRead) {
- this.globalLimitRead = globalLimitRead;
- if (this.globalPerformanceMonitor != null) {
- this.globalPerformanceMonitor.changeConfiguration(null,
- this.globalLimitWrite, this.globalLimitRead,
- this.globalDelay);
- }
- }
-
- /**
- * @return the globalLimitWrite
- */
- public long getGlobalLimitWrite() {
- return this.globalLimitWrite;
- }
-
- /**
- * @param globalLimitWrite
- * the globalLimitWrite to set
- */
- public void setGlobalLimitWrite(long globalLimitWrite) {
- this.globalLimitWrite = globalLimitWrite;
- if (this.globalPerformanceMonitor != null) {
- this.globalPerformanceMonitor.changeConfiguration(null,
- this.globalLimitWrite, this.globalLimitRead,
- this.globalDelay);
- }
- }
-
- /**
- * @return the channelActive
- */
- public boolean isChannelActive() {
- return this.channelActive;
- }
-
- /**
- * @return the globalActive
- */
- public boolean isGlobalActive() {
- return this.globalActive;
- }
-
-}
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * by the @author tags. See the COPYRIGHT.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.netty.handler.trafficshaping;
+
+import java.util.concurrent.ExecutorService;
+
+import org.jboss.netty.channel.Channel;
+
+/**
+ * @author The Netty Project (netty-dev@lists.jboss.org)
+ * @author Frederic Bregier (fredbregier@free.fr)
+ * @version $Rev$, $Date$
+ *
+ * The {@link PerformanceCounterFactory} is a Factory for
+ * {@link PerformanceCounter}. It stores the necessary information to enable
+ * dynamic creation of {@link PerformanceCounter} inside the
+ * {@link TrafficShapingHandler}.
+ *
+ *
+ */
+public abstract class PerformanceCounterFactory {
+ /**
+ * No limit
+ */
+ public static long NO_LIMIT = -1;
+
+ /**
+ * No statistics (if channel or global PerformanceCounter is a very long
+ * time living, it can produce a excess of capacity, i.e. 2^64 bytes so 17
+ * billions of billions of bytes).
+ */
+ public static long NO_STAT = -1;
+
+ /**
+ * Default delay between two checks: 1s
+ */
+ public static long DEFAULT_DELAY = 1000;
+
+ /**
+ * ExecutorService to associated to any PerformanceCounter
+ */
+ private ExecutorService executorService = null;
+
+ /**
+ * Limit in B/s to apply to write for all channel PerformanceCounter
+ */
+ private long channelLimitWrite = NO_LIMIT;
+
+ /**
+ * Limit in B/s to apply to read for all channel PerformanceCounter
+ */
+ private long channelLimitRead = NO_LIMIT;
+
+ /**
+ * Delay between two performance snapshots for channel
+ */
+ private long channelDelay = DEFAULT_DELAY; // default 1 s
+
+ /**
+ * Will the PerformanceCounter for Channel be active
+ */
+ private boolean channelActive = true;
+
+ /**
+ * Limit in B/s to apply to write for the global PerformanceCounter
+ */
+ private long globalLimitWrite = NO_LIMIT;
+
+ /**
+ * Limit in B/s to apply to read for the global PerformanceCounter
+ */
+ private long globalLimitRead = NO_LIMIT;
+
+ /**
+ * Delay between two performance snapshots for global
+ */
+ private long globalDelay = DEFAULT_DELAY; // default 1 s
+
+ /**
+ * Will the PerformanceCounter for Global be active
+ */
+ private boolean globalActive = true;
+
+ /**
+ * Global Monitor
+ */
+ private PerformanceCounter globalPerformanceMonitor = null;
+
+ /**
+ * Called each time the accounting is computed for the PerformanceCounters.
+ * This method could be used for instance to implement real time accounting.
+ *
+ * @param counter
+ * the PerformanceCounter that computes its performance
+ */
+ protected abstract void accounting(PerformanceCounter counter);
+
+ /**
+ *
+ * @param newexecutorService
+ * @param newChannelActive
+ * @param newChannelLimitWrite
+ * @param newChannelLimitRead
+ * @param newChannelDelay
+ * @param newGlobalActive
+ * @param newGlobalLimitWrite
+ * @param newGlobalLimitRead
+ * @param newGlobalDelay
+ */
+ private void init(ExecutorService newexecutorService,
+ boolean newChannelActive, long newChannelLimitWrite,
+ long newChannelLimitRead, long newChannelDelay,
+ boolean newGlobalActive, long newGlobalLimitWrite,
+ long newGlobalLimitRead, long newGlobalDelay) {
+ this.executorService = newexecutorService;
+ this.channelActive = newChannelActive;
+ this.channelLimitWrite = newChannelLimitWrite;
+ this.channelLimitRead = newChannelLimitRead;
+ this.channelDelay = newChannelDelay;
+ this.globalActive = newGlobalActive;
+ this.globalLimitWrite = newGlobalLimitWrite;
+ this.globalLimitRead = newGlobalLimitRead;
+ this.globalDelay = newGlobalDelay;
+ }
+
+ /**
+ * Full constructor
+ *
+ * @param executorService
+ * created for instance like Executors.newCachedThreadPool
+ * @param channelActive
+ * True if each channel will have a PerformanceCounter
+ * @param channelLimitWrite
+ * NO_LIMIT or a limit in bytes/s
+ * @param channelLimitRead
+ * NO_LIMIT or a limit in bytes/s
+ * @param channelDelay
+ * The delay between two computations of performances for
+ * channels or NO_STAT if no stats are to be computed
+ * @param globalActive
+ * True if global context will have one unique PerformanceCounter
+ * @param globalLimitWrite
+ * NO_LIMIT or a limit in bytes/s
+ * @param globalLimitRead
+ * NO_LIMIT or a limit in bytes/s
+ * @param globalDelay
+ * The delay between two computations of performances for global
+ * context or NO_STAT if no stats are to be computed
+ */
+ public PerformanceCounterFactory(ExecutorService executorService,
+ boolean channelActive, long channelLimitWrite,
+ long channelLimitRead, long channelDelay, boolean globalActive,
+ long globalLimitWrite, long globalLimitRead, long globalDelay) {
+ init(executorService, channelActive, channelLimitWrite,
+ channelLimitRead, channelDelay, globalActive, globalLimitWrite,
+ globalLimitRead, globalDelay);
+ }
+
+ /**
+ * Constructor using default Delay
+ *
+ * @param executorService
+ * created for instance like Executors.newCachedThreadPool
+ * @param channelActive
+ * True if each channel will have a PerformanceCounter
+ * @param channelLimitWrite
+ * NO_LIMIT or a limit in bytes/s
+ * @param channelLimitRead
+ * NO_LIMIT or a limit in bytes/s
+ * @param globalActive
+ * True if global context will have one unique PerformanceCounter
+ * @param globalLimitWrite
+ * NO_LIMIT or a limit in bytes/s
+ * @param globalLimitRead
+ * NO_LIMIT or a limit in bytes/s
+ */
+ public PerformanceCounterFactory(ExecutorService executorService,
+ boolean channelActive, long channelLimitWrite,
+ long channelLimitRead, boolean globalActive, long globalLimitWrite,
+ long globalLimitRead) {
+ init(executorService, channelActive, channelLimitWrite,
+ channelLimitRead, DEFAULT_DELAY, globalActive,
+ globalLimitWrite, globalLimitRead, DEFAULT_DELAY);
+ }
+
+ /**
+ * Constructor using NO_LIMIT and default delay for channels
+ *
+ * @param executorService
+ * created for instance like Executors.newCachedThreadPool
+ * @param channelActive
+ * True if each channel will have a PerformanceCounter
+ * @param globalActive
+ * True if global context will have one unique PerformanceCounter
+ * @param globalLimitWrite
+ * NO_LIMIT or a limit in bytes/s
+ * @param globalLimitRead
+ * NO_LIMIT or a limit in bytes/s
+ * @param globalDelay
+ * The delay between two computations of performances for global
+ * context or NO_STAT if no stats are to be computed
+ */
+ public PerformanceCounterFactory(ExecutorService executorService,
+ boolean channelActive, boolean globalActive, long globalLimitWrite,
+ long globalLimitRead, long globalDelay) {
+ init(executorService, channelActive, NO_LIMIT, NO_LIMIT,
+ DEFAULT_DELAY, globalActive, globalLimitWrite, globalLimitRead,
+ globalDelay);
+ }
+
+ /**
+ * Constructor using NO_LIMIT for channels and default delay for all
+ *
+ * @param executorService
+ * created for instance like Executors.newCachedThreadPool
+ * @param channelActive
+ * True if each channel will have a PerformanceCounter
+ * @param globalActive
+ * True if global context will have one unique PerformanceCounter
+ * @param globalLimitWrite
+ * NO_LIMIT or a limit in bytes/s
+ * @param globalLimitRead
+ * NO_LIMIT or a limit in bytes/s
+ */
+ public PerformanceCounterFactory(ExecutorService executorService,
+ boolean channelActive, boolean globalActive, long globalLimitWrite,
+ long globalLimitRead) {
+ init(executorService, channelActive, NO_LIMIT, NO_LIMIT,
+ DEFAULT_DELAY, globalActive, globalLimitWrite, globalLimitRead,
+ DEFAULT_DELAY);
+ }
+
+ /**
+ * Constructor using NO_LIMIT and default delay for all
+ *
+ * @param executorService
+ * created for instance like Executors.newCachedThreadPool
+ * @param channelActive
+ * True if each channel will have a PerformanceCounter
+ * @param globalActive
+ * True if global context will have one unique PerformanceCounter
+ */
+ public PerformanceCounterFactory(ExecutorService executorService,
+ boolean channelActive, boolean globalActive) {
+ init(executorService, channelActive, NO_LIMIT, NO_LIMIT,
+ DEFAULT_DELAY, globalActive, NO_LIMIT, NO_LIMIT, DEFAULT_DELAY);
+ }
+
+ /**
+ * Enable to change the active status of PerformanceCounter on Channels (for
+ * new one only)
+ *
+ * @param active
+ */
+ public void setChannelActive(boolean active) {
+ this.channelActive = active;
+ }
+
+ /**
+ * Enable to change the active status of PerformanceCounter on Global (stop
+ * or start if necessary)
+ *
+ * @param active
+ */
+ public void setGlobalActive(boolean active) {
+ if (this.globalActive) {
+ if (!active) {
+ stopGlobalPerformanceCounter();
+ }
+ }
+ this.globalActive = active;
+ getGlobalPerformanceCounter();
+ }
+
+ /**
+ * Change the underlying limitations. Only Global PerformanceCounter (if
+ * any) is dynamically changed, but Channels PerformanceCounters are not
+ * changed, only new created ones.
+ *
+ * @param newchannelLimitWrite
+ * @param newchannelLimitRead
+ * @param newchanneldelay
+ * @param newglobalLimitWrite
+ * @param newglobalLimitRead
+ * @param newglobaldelay
+ */
+ public void changeConfiguration(long newchannelLimitWrite,
+ long newchannelLimitRead, long newchanneldelay,
+ long newglobalLimitWrite, long newglobalLimitRead,
+ long newglobaldelay) {
+ this.channelLimitWrite = newchannelLimitWrite;
+ this.channelLimitRead = newchannelLimitRead;
+ this.channelDelay = newchanneldelay;
+ this.globalLimitWrite = newglobalLimitWrite;
+ this.globalLimitRead = newglobalLimitRead;
+ this.globalDelay = newglobaldelay;
+ if (this.globalPerformanceMonitor != null) {
+ this.globalPerformanceMonitor.changeConfiguration(null,
+ newglobalLimitWrite, newglobalLimitRead, newglobaldelay);
+ }
+ }
+
+ /**
+ * @return the Global PerformanceCounter or null if this support is disabled
+ */
+ public PerformanceCounter getGlobalPerformanceCounter() {
+ if (this.globalActive) {
+ if (this.globalPerformanceMonitor == null) {
+ this.globalPerformanceMonitor = new PerformanceCounter(this,
+ this.executorService, null, "GlobalPC",
+ this.globalLimitWrite, this.globalLimitRead,
+ this.globalDelay);
+ this.globalPerformanceMonitor.startMonitoring();
+ }
+ }
+ return this.globalPerformanceMonitor;
+ }
+
+ /**
+ * @param channel
+ * @return the channel PerformanceCounter or null if this support is
+ * disabled
+ */
+ public PerformanceCounter createChannelPerformanceCounter(Channel channel) {
+ if (this.channelActive && ((this.channelLimitRead > NO_LIMIT) || (this.channelLimitWrite > NO_LIMIT)
+ || (this.channelDelay > NO_STAT))) {
+ return new PerformanceCounter(this, this.executorService, channel,
+ "ChannelPC" + channel.getId(), this.channelLimitWrite,
+ this.channelLimitRead, this.channelDelay);
+ }
+ return null;
+ }
+
+ /**
+ * Stop the global performance counter if any (Even it is stopped, the
+ * factory can however be reused)
+ *
+ */
+ public void stopGlobalPerformanceCounter() {
+ if (this.globalPerformanceMonitor != null) {
+ this.globalPerformanceMonitor.stopMonitoring();
+ this.globalPerformanceMonitor = null;
+ }
+ }
+
+ /**
+ * @return the channelDelay
+ */
+ public long getChannelDelay() {
+ return this.channelDelay;
+ }
+
+ /**
+ * @param channelDelay
+ * the channelDelay to set
+ */
+ public void setChannelDelay(long channelDelay) {
+ this.channelDelay = channelDelay;
+ }
+
+ /**
+ * @return the channelLimitRead
+ */
+ public long getChannelLimitRead() {
+ return this.channelLimitRead;
+ }
+
+ /**
+ * @param channelLimitRead
+ * the channelLimitRead to set
+ */
+ public void setChannelLimitRead(long channelLimitRead) {
+ this.channelLimitRead = channelLimitRead;
+ }
+
+ /**
+ * @return the channelLimitWrite
+ */
+ public long getChannelLimitWrite() {
+ return this.channelLimitWrite;
+ }
+
+ /**
+ * @param channelLimitWrite
+ * the channelLimitWrite to set
+ */
+ public void setChannelLimitWrite(long channelLimitWrite) {
+ this.channelLimitWrite = channelLimitWrite;
+ }
+
+ /**
+ * @return the globalDelay
+ */
+ public long getGlobalDelay() {
+ return this.globalDelay;
+ }
+
+ /**
+ * @param globalDelay
+ * the globalDelay to set
+ */
+ public void setGlobalDelay(long globalDelay) {
+ this.globalDelay = globalDelay;
+ if (this.globalPerformanceMonitor != null) {
+ this.globalPerformanceMonitor.changeConfiguration(null,
+ this.globalLimitWrite, this.globalLimitRead,
+ this.globalDelay);
+ }
+ }
+
+ /**
+ * @return the globalLimitRead
+ */
+ public long getGlobalLimitRead() {
+ return this.globalLimitRead;
+ }
+
+ /**
+ * @param globalLimitRead
+ * the globalLimitRead to set
+ */
+ public void setGlobalLimitRead(long globalLimitRead) {
+ this.globalLimitRead = globalLimitRead;
+ if (this.globalPerformanceMonitor != null) {
+ this.globalPerformanceMonitor.changeConfiguration(null,
+ this.globalLimitWrite, this.globalLimitRead,
+ this.globalDelay);
+ }
+ }
+
+ /**
+ * @return the globalLimitWrite
+ */
+ public long getGlobalLimitWrite() {
+ return this.globalLimitWrite;
+ }
+
+ /**
+ * @param globalLimitWrite
+ * the globalLimitWrite to set
+ */
+ public void setGlobalLimitWrite(long globalLimitWrite) {
+ this.globalLimitWrite = globalLimitWrite;
+ if (this.globalPerformanceMonitor != null) {
+ this.globalPerformanceMonitor.changeConfiguration(null,
+ this.globalLimitWrite, this.globalLimitRead,
+ this.globalDelay);
+ }
+ }
+
+ /**
+ * @return the channelActive
+ */
+ public boolean isChannelActive() {
+ return this.channelActive;
+ }
+
+ /**
+ * @return the globalActive
+ */
+ public boolean isGlobalActive() {
+ return this.globalActive;
+ }
+
+}
diff --git a/src/main/java/org/jboss/netty/handler/trafficshaping/TrafficShapingHandler.java b/src/main/java/org/jboss/netty/handler/trafficshaping/TrafficShapingHandler.java
index dde6a6c211..6b27ff1bc9 100644
--- a/src/main/java/org/jboss/netty/handler/trafficshaping/TrafficShapingHandler.java
+++ b/src/main/java/org/jboss/netty/handler/trafficshaping/TrafficShapingHandler.java
@@ -1,250 +1,249 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Copyright 2009, Red Hat Middleware LLC, and individual contributors
- * by the @author tags. See the COPYRIGHT.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.netty.handler.trafficshaping;
-
-import java.io.InvalidClassException;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelEvent;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
-import org.jboss.netty.channel.ChannelState;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-
-/**
- * @author The Netty Project (netty-dev@lists.jboss.org)
- * @author Trustin Lee (tlee@redhat.com)
- * @author Frederic Bregier (fredbregier@free.fr)
- * @version $Rev$, $Date$
- *
- * TrafficShapingHandler allows to limit the global bandwidth or per session
- * bandwidth, as traffic shaping.
- *
- *
- * The method getMessageSize(MessageEvent) has to be implemented to specify what
- * is the size of the object to be read or write accordingly to the type of
- * object. In simple case, it can be as simple as a call to
- * getChannelBufferMessageSize(MessageEvent).
- *
- *
- * TrafficShapingHandler depends on {@link PerformanceCounterFactory} to create
- * or use the necessary {@link PerformanceCounter} with the necessary options.
- * However, you can change the behavior of both global and channel
- * PerformanceCounter if you like by getting them from this handler and changing
- * their status.
- *
- */
-@ChannelPipelineCoverage("one")
-public abstract class TrafficShapingHandler extends SimpleChannelHandler {
- /**
- * Channel Monitor
- */
- private PerformanceCounter channelPerformanceCounter = null;
-
- /**
- * Global Monitor
- */
- private PerformanceCounter globalPerformanceCounter = null;
-
- /**
- * Factory if used
- */
- private PerformanceCounterFactory factory = null;
-
- /**
- * Constructor
- *
- * @param factory
- * the PerformanceCounterFactory from which all Monitors will be
- * created
- */
- public TrafficShapingHandler(PerformanceCounterFactory factory) {
- super();
- this.factory = factory;
- this.globalPerformanceCounter = this.factory
- .getGlobalPerformanceCounter();
- this.channelPerformanceCounter = null;
- // will be set when connected is called
- }
-
- /**
- * This method has to be implemented. It returns the size in bytes of the
- * message to be read or written.
- *
- * @param arg1
- * the MessageEvent to be read or written
- * @return the size in bytes of the given MessageEvent
- * @exception Exception
- * An exception can be thrown if the object is not of the
- * expected type
- */
- protected abstract long getMessageSize(MessageEvent arg1) throws Exception;
-
- /**
- * Example of function (which can be used) for the ChannelBuffer
- *
- * @param arg1
- * @return the size in bytes of the given MessageEvent
- * @throws Exception
- */
- protected long getChannelBufferMessageSize(MessageEvent arg1)
- throws Exception {
- Object o = arg1.getMessage();
- if (!(o instanceof ChannelBuffer)) {
- // Type unimplemented
- throw new InvalidClassException("Wrong object received in " +
- this.getClass().getName() + " codec " +
- o.getClass().getName());
- }
- ChannelBuffer dataBlock = (ChannelBuffer) o;
- return dataBlock.readableBytes();
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.jboss.netty.channel.SimpleChannelHandler#messageReceived(org.jboss.netty.channel.ChannelHandlerContext,
- * org.jboss.netty.channel.MessageEvent)
- */
- @Override
- public void messageReceived(ChannelHandlerContext arg0, MessageEvent arg1)
- throws Exception {
- long size = getMessageSize(arg1);
- if (this.channelPerformanceCounter != null) {
- this.channelPerformanceCounter.setReceivedBytes(arg0, size);
- }
- if (this.globalPerformanceCounter != null) {
- this.globalPerformanceCounter.setReceivedBytes(arg0, size);
- }
- // The message is then just passed to the next Codec
- super.messageReceived(arg0, arg1);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.jboss.netty.channel.SimpleChannelHandler#writeRequested(org.jboss.netty.channel.ChannelHandlerContext,
- * org.jboss.netty.channel.MessageEvent)
- */
- @Override
- public void writeRequested(ChannelHandlerContext arg0, MessageEvent arg1)
- throws Exception {
- long size = getMessageSize(arg1);
- if (this.channelPerformanceCounter != null) {
- this.channelPerformanceCounter.setToWriteBytes(size);
- }
- if (this.globalPerformanceCounter != null) {
- this.globalPerformanceCounter.setToWriteBytes(size);
- }
- // The message is then just passed to the next Codec
- super.writeRequested(arg0, arg1);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.jboss.netty.channel.SimpleChannelHandler#channelClosed(org.jboss.netty.channel.ChannelHandlerContext,
- * org.jboss.netty.channel.ChannelStateEvent)
- */
- @Override
- public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
- throws Exception {
- if (this.channelPerformanceCounter != null) {
- this.channelPerformanceCounter.stopMonitoring();
- this.channelPerformanceCounter = null;
- }
- super.channelClosed(ctx, e);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.jboss.netty.channel.SimpleChannelHandler#channelConnected(org.jboss.netty.channel.ChannelHandlerContext,
- * org.jboss.netty.channel.ChannelStateEvent)
- */
- @Override
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
- throws Exception {
- // readSuspended = true;
- ctx.setAttachment(Boolean.TRUE);
- ctx.getChannel().setReadable(false);
- if ((this.channelPerformanceCounter == null) && (this.factory != null)) {
- // A factory was used
- this.channelPerformanceCounter = this.factory
- .createChannelPerformanceCounter(ctx.getChannel());
- }
- if (this.channelPerformanceCounter != null) {
- this.channelPerformanceCounter
- .setMonitoredChannel(ctx.getChannel());
- this.channelPerformanceCounter.startMonitoring();
- }
- super.channelConnected(ctx, e);
- // readSuspended = false;
- ctx.setAttachment(null);
- ctx.getChannel().setReadable(true);
- }
-
- @Override
- public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
- throws Exception {
- if (e instanceof ChannelStateEvent) {
- ChannelStateEvent cse = (ChannelStateEvent) e;
- if (cse.getState() == ChannelState.INTEREST_OPS &&
- (((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
-
- // setReadable(true) requested
- boolean readSuspended = ctx.getAttachment() != null;
- if (readSuspended) {
- // Drop the request silently if PerformanceCounter has
- // set the flag.
- e.getFuture().setSuccess();
- return;
- }
- }
- }
- super.handleDownstream(ctx, e);
- }
-
- /**
- *
- * @return the current ChannelPerformanceCounter set from the factory (if
- * channel is still connected) or null if this function was disabled
- * in the Factory
- */
- public PerformanceCounter getChannelPerformanceCounter() {
- return this.channelPerformanceCounter;
- }
-
- /**
- *
- * @return the GlobalPerformanceCounter from the factory or null if this
- * function was disabled in the Factory
- */
- public PerformanceCounter getGlobalPerformanceCounter() {
- return this.globalPerformanceCounter;
- }
-
-}
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * by the @author tags. See the COPYRIGHT.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.netty.handler.trafficshaping;
+
+import java.io.InvalidClassException;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelState;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * @author The Netty Project (netty-dev@lists.jboss.org)
+ * @author Frederic Bregier (fredbregier@free.fr)
+ * @version $Rev$, $Date$
+ *
+ * TrafficShapingHandler allows to limit the global bandwidth or per session
+ * bandwidth, as traffic shaping.
+ *
+ *
+ * The method getMessageSize(MessageEvent) has to be implemented to specify what
+ * is the size of the object to be read or write accordingly to the type of
+ * object. In simple case, it can be as simple as a call to
+ * getChannelBufferMessageSize(MessageEvent).
+ *
+ *
+ * TrafficShapingHandler depends on {@link PerformanceCounterFactory} to create
+ * or use the necessary {@link PerformanceCounter} with the necessary options.
+ * However, you can change the behavior of both global and channel
+ * PerformanceCounter if you like by getting them from this handler and changing
+ * their status.
+ *
+ */
+@ChannelPipelineCoverage("one")
+public abstract class TrafficShapingHandler extends SimpleChannelHandler {
+ /**
+ * Channel Monitor
+ */
+ private PerformanceCounter channelPerformanceCounter = null;
+
+ /**
+ * Global Monitor
+ */
+ private PerformanceCounter globalPerformanceCounter = null;
+
+ /**
+ * Factory if used
+ */
+ private PerformanceCounterFactory factory = null;
+
+ /**
+ * Constructor
+ *
+ * @param factory
+ * the PerformanceCounterFactory from which all Monitors will be
+ * created
+ */
+ public TrafficShapingHandler(PerformanceCounterFactory factory) {
+ super();
+ this.factory = factory;
+ this.globalPerformanceCounter = this.factory
+ .getGlobalPerformanceCounter();
+ this.channelPerformanceCounter = null;
+ // will be set when connected is called
+ }
+
+ /**
+ * This method has to be implemented. It returns the size in bytes of the
+ * message to be read or written.
+ *
+ * @param arg1
+ * the MessageEvent to be read or written
+ * @return the size in bytes of the given MessageEvent
+ * @exception Exception
+ * An exception can be thrown if the object is not of the
+ * expected type
+ */
+ protected abstract long getMessageSize(MessageEvent arg1) throws Exception;
+
+ /**
+ * Example of function (which can be used) for the ChannelBuffer
+ *
+ * @param arg1
+ * @return the size in bytes of the given MessageEvent
+ * @throws Exception
+ */
+ protected long getChannelBufferMessageSize(MessageEvent arg1)
+ throws Exception {
+ Object o = arg1.getMessage();
+ if (!(o instanceof ChannelBuffer)) {
+ // Type unimplemented
+ throw new InvalidClassException("Wrong object received in " +
+ this.getClass().getName() + " codec " +
+ o.getClass().getName());
+ }
+ ChannelBuffer dataBlock = (ChannelBuffer) o;
+ return dataBlock.readableBytes();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.jboss.netty.channel.SimpleChannelHandler#messageReceived(org.jboss.netty.channel.ChannelHandlerContext,
+ * org.jboss.netty.channel.MessageEvent)
+ */
+ @Override
+ public void messageReceived(ChannelHandlerContext arg0, MessageEvent arg1)
+ throws Exception {
+ long size = getMessageSize(arg1);
+ if (this.channelPerformanceCounter != null) {
+ this.channelPerformanceCounter.setReceivedBytes(arg0, size);
+ }
+ if (this.globalPerformanceCounter != null) {
+ this.globalPerformanceCounter.setReceivedBytes(arg0, size);
+ }
+ // The message is then just passed to the next Codec
+ super.messageReceived(arg0, arg1);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.jboss.netty.channel.SimpleChannelHandler#writeRequested(org.jboss.netty.channel.ChannelHandlerContext,
+ * org.jboss.netty.channel.MessageEvent)
+ */
+ @Override
+ public void writeRequested(ChannelHandlerContext arg0, MessageEvent arg1)
+ throws Exception {
+ long size = getMessageSize(arg1);
+ if (this.channelPerformanceCounter != null) {
+ this.channelPerformanceCounter.setToWriteBytes(size);
+ }
+ if (this.globalPerformanceCounter != null) {
+ this.globalPerformanceCounter.setToWriteBytes(size);
+ }
+ // The message is then just passed to the next Codec
+ super.writeRequested(arg0, arg1);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.jboss.netty.channel.SimpleChannelHandler#channelClosed(org.jboss.netty.channel.ChannelHandlerContext,
+ * org.jboss.netty.channel.ChannelStateEvent)
+ */
+ @Override
+ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
+ throws Exception {
+ if (this.channelPerformanceCounter != null) {
+ this.channelPerformanceCounter.stopMonitoring();
+ this.channelPerformanceCounter = null;
+ }
+ super.channelClosed(ctx, e);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.jboss.netty.channel.SimpleChannelHandler#channelConnected(org.jboss.netty.channel.ChannelHandlerContext,
+ * org.jboss.netty.channel.ChannelStateEvent)
+ */
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
+ throws Exception {
+ // readSuspended = true;
+ ctx.setAttachment(Boolean.TRUE);
+ ctx.getChannel().setReadable(false);
+ if ((this.channelPerformanceCounter == null) && (this.factory != null)) {
+ // A factory was used
+ this.channelPerformanceCounter = this.factory
+ .createChannelPerformanceCounter(ctx.getChannel());
+ }
+ if (this.channelPerformanceCounter != null) {
+ this.channelPerformanceCounter
+ .setMonitoredChannel(ctx.getChannel());
+ this.channelPerformanceCounter.startMonitoring();
+ }
+ super.channelConnected(ctx, e);
+ // readSuspended = false;
+ ctx.setAttachment(null);
+ ctx.getChannel().setReadable(true);
+ }
+
+ @Override
+ public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
+ throws Exception {
+ if (e instanceof ChannelStateEvent) {
+ ChannelStateEvent cse = (ChannelStateEvent) e;
+ if (cse.getState() == ChannelState.INTEREST_OPS &&
+ (((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
+
+ // setReadable(true) requested
+ boolean readSuspended = ctx.getAttachment() != null;
+ if (readSuspended) {
+ // Drop the request silently if PerformanceCounter has
+ // set the flag.
+ e.getFuture().setSuccess();
+ return;
+ }
+ }
+ }
+ super.handleDownstream(ctx, e);
+ }
+
+ /**
+ *
+ * @return the current ChannelPerformanceCounter set from the factory (if
+ * channel is still connected) or null if this function was disabled
+ * in the Factory
+ */
+ public PerformanceCounter getChannelPerformanceCounter() {
+ return this.channelPerformanceCounter;
+ }
+
+ /**
+ *
+ * @return the GlobalPerformanceCounter from the factory or null if this
+ * function was disabled in the Factory
+ */
+ public PerformanceCounter getGlobalPerformanceCounter() {
+ return this.globalPerformanceCounter;
+ }
+
+}
diff --git a/src/main/java/org/jboss/netty/handler/trafficshaping/package-info.java b/src/main/java/org/jboss/netty/handler/trafficshaping/package-info.java
index a49a44d358..8691271775 100644
--- a/src/main/java/org/jboss/netty/handler/trafficshaping/package-info.java
+++ b/src/main/java/org/jboss/netty/handler/trafficshaping/package-info.java
@@ -1,95 +1,95 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Copyright 2008, Red Hat Middleware LLC, and individual contributors
- * by the @author tags. See the COPYRIGHT.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-/**
- * Implementation of a Traffic Shaping Handler and Dynamic Statistics.
- *
- *
- *
- *
The main goal of this package is to allow to shape the traffic (bandwidth limitation), - * but also to get statistics on how many bytes are read or written. Both functions can - * be active or inactive (traffic or statistics).
Three classes implement this behavior:
- *
Standard use could be as follow:
To activate or deactivate the traffic shaping, change the value corresponding to your desire as - * [Global or per Channel] [Write or Read] Limitation in byte/s.
PerformanceCounterFactory.NO_LIMIT (-1) - * stands for no limitation, so the traffic shaping is deactivate (on what you specified).
You can either change those values with the method changeConfiguration in PerformanceCounterFactory or - * directly from the PerformanceCounter method changeConfiguration.
To activate or deactivate the statistics, you can adjust the delay to a low (not less than 200ms - * for efficiency reasons) or a high value (let say 24H in ms is huge enough to not get the problem) - * or even using PerformanceCounterFactory.NO_STAT (-1).
And if you don't want to do anything with this statistics, just implement an empty method for - * PerformanceCounterFactory.accounting(PerformanceCounter).
Again this can be changed either from PerformanceCounterFactory or directly in PerformanceCounter.
You can also completely deactivate channel or global PerformanceCounter by setting the boolean to false - * accordingly to your needs in the PerformanceCounterFactory. It will deactivate the global Monitor. For channels monitor, - * it will prevent new monitors to be created (or reversely they will be created for newly connected channels).
So in your application you will create your own PerformanceCounterFactory and setting the values to fit your needs.
Then you can create your pipeline (using a PipelineFactory since each TrafficShapingHandler must be unique by channel) and adding this handler before - * your MemoryAwareThreadPoolExecutor:
TrafficShapingHandler must be unique by channel but however it is still global due to - * the PerformanceCounterFactcory that is shared between all handlers across the channels.
The main goal of this package is to allow to shape the traffic (bandwidth limitation), + * but also to get statistics on how many bytes are read or written. Both functions can + * be active or inactive (traffic or statistics).
Three classes implement this behavior:
+ *
Standard use could be as follow:
To activate or deactivate the traffic shaping, change the value corresponding to your desire as + * [Global or per Channel] [Write or Read] Limitation in byte/s.
PerformanceCounterFactory.NO_LIMIT (-1) + * stands for no limitation, so the traffic shaping is deactivate (on what you specified).
You can either change those values with the method changeConfiguration in PerformanceCounterFactory or + * directly from the PerformanceCounter method changeConfiguration.
To activate or deactivate the statistics, you can adjust the delay to a low (not less than 200ms + * for efficiency reasons) or a high value (let say 24H in ms is huge enough to not get the problem) + * or even using PerformanceCounterFactory.NO_STAT (-1).
And if you don't want to do anything with this statistics, just implement an empty method for + * PerformanceCounterFactory.accounting(PerformanceCounter).
Again this can be changed either from PerformanceCounterFactory or directly in PerformanceCounter.
You can also completely deactivate channel or global PerformanceCounter by setting the boolean to false + * accordingly to your needs in the PerformanceCounterFactory. It will deactivate the global Monitor. For channels monitor, + * it will prevent new monitors to be created (or reversely they will be created for newly connected channels).
So in your application you will create your own PerformanceCounterFactory and setting the values to fit your needs.
Then you can create your pipeline (using a PipelineFactory since each TrafficShapingHandler must be unique by channel) and adding this handler before + * your MemoryAwareThreadPoolExecutor:
TrafficShapingHandler must be unique by channel but however it is still global due to + * the PerformanceCounterFactcory that is shared between all handlers across the channels.