Fixed issue NETTY-51 (MemoryAwareThreadPoolExecutor and its subtypes should use long instead of int to store the counters)

This commit is contained in:
Trustin Lee 2008-09-30 00:42:04 +00:00
parent 6f18c940dd
commit 8c15102bc3
2 changed files with 39 additions and 31 deletions

View File

@ -32,7 +32,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelEvent;
@ -86,9 +86,9 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
// XXX Can be changed in runtime now. Make it mutable in 3.1. // XXX Can be changed in runtime now. Make it mutable in 3.1.
private final ObjectSizeEstimator objectSizeEstimator; private final ObjectSizeEstimator objectSizeEstimator;
private final ConcurrentMap<Channel, AtomicInteger> channelCounters = private final ConcurrentMap<Channel, AtomicLong> channelCounters =
new ConcurrentHashMap<Channel, AtomicInteger>(); new ConcurrentHashMap<Channel, AtomicLong>();
private final AtomicInteger totalCounter = new AtomicInteger(); private final AtomicLong totalCounter = new AtomicLong();
private final Semaphore semaphore = new Semaphore(0); private final Semaphore semaphore = new Semaphore(0);
@ -102,7 +102,8 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
* Specify {@code 0} to disable. * Specify {@code 0} to disable.
*/ */
public MemoryAwareThreadPoolExecutor( public MemoryAwareThreadPoolExecutor(
int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize) { int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize) {
this(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, 30, TimeUnit.SECONDS); this(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, 30, TimeUnit.SECONDS);
} }
@ -118,7 +119,9 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
* @param unit the {@link TimeUnit} of {@code keepAliveTime} * @param unit the {@link TimeUnit} of {@code keepAliveTime}
*/ */
public MemoryAwareThreadPoolExecutor( public MemoryAwareThreadPoolExecutor(
int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize, long keepAliveTime, TimeUnit unit) { int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
long keepAliveTime, TimeUnit unit) {
this(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, Executors.defaultThreadFactory()); this(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, Executors.defaultThreadFactory());
} }
@ -135,7 +138,9 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
* @param threadFactory the {@link ThreadFactory} of this pool * @param threadFactory the {@link ThreadFactory} of this pool
*/ */
public MemoryAwareThreadPoolExecutor( public MemoryAwareThreadPoolExecutor(
int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, new DefaultObjectSizeEstimator(), threadFactory); this(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, new DefaultObjectSizeEstimator(), threadFactory);
} }
@ -153,7 +158,10 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
* @param objectSizeEstimator the {@link ObjectSizeEstimator} of this pool * @param objectSizeEstimator the {@link ObjectSizeEstimator} of this pool
*/ */
public MemoryAwareThreadPoolExecutor( public MemoryAwareThreadPoolExecutor(
int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) { int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator,
ThreadFactory threadFactory) {
super(corePoolSize, corePoolSize, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), threadFactory); super(corePoolSize, corePoolSize, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), threadFactory);
if (objectSizeEstimator == null) { if (objectSizeEstimator == null) {
@ -184,7 +192,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
/** /**
* Returns the maximum total size of the queued events per channel. * Returns the maximum total size of the queued events per channel.
*/ */
public int getMaxChannelMemorySize() { public long getMaxChannelMemorySize() {
return settings.maxChannelMemorySize; return settings.maxChannelMemorySize;
} }
@ -192,7 +200,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
* Sets the maximum total size of the queued events per channel. * Sets the maximum total size of the queued events per channel.
* Specify {@code 0} to disable. * Specify {@code 0} to disable.
*/ */
public void setMaxChannelMemorySize(int maxChannelMemorySize) { public void setMaxChannelMemorySize(long maxChannelMemorySize) {
if (maxChannelMemorySize < 0) { if (maxChannelMemorySize < 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"maxChannelMemorySize: " + maxChannelMemorySize); "maxChannelMemorySize: " + maxChannelMemorySize);
@ -209,7 +217,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
/** /**
* Returns the maximum total size of the queued events for this pool. * Returns the maximum total size of the queued events for this pool.
*/ */
public int getMaxTotalMemorySize() { public long getMaxTotalMemorySize() {
return settings.maxTotalMemorySize; return settings.maxTotalMemorySize;
} }
@ -217,7 +225,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
* Sets the maximum total size of the queued events for this pool. * Sets the maximum total size of the queued events for this pool.
* Specify {@code 0} to disable. * Specify {@code 0} to disable.
*/ */
public void setMaxTotalMemorySize(int maxTotalMemorySize) { public void setMaxTotalMemorySize(long maxTotalMemorySize) {
if (maxTotalMemorySize < 0) { if (maxTotalMemorySize < 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"maxTotalMemorySize: " + maxTotalMemorySize); "maxTotalMemorySize: " + maxTotalMemorySize);
@ -277,17 +285,17 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
} }
Settings settings = this.settings; Settings settings = this.settings;
int maxTotalMemorySize = settings.maxTotalMemorySize; long maxTotalMemorySize = settings.maxTotalMemorySize;
int maxChannelMemorySize = settings.maxChannelMemorySize; long maxChannelMemorySize = settings.maxChannelMemorySize;
int increment = getObjectSizeEstimator().estimateSize(task); int increment = getObjectSizeEstimator().estimateSize(task);
int totalCounter = this.totalCounter.addAndGet(increment); long totalCounter = this.totalCounter.addAndGet(increment);
if (task instanceof ChannelEventRunnable) { if (task instanceof ChannelEventRunnable) {
ChannelEventRunnable eventTask = (ChannelEventRunnable) task; ChannelEventRunnable eventTask = (ChannelEventRunnable) task;
eventTask.estimatedSize = increment; eventTask.estimatedSize = increment;
Channel channel = eventTask.getEvent().getChannel(); Channel channel = eventTask.getEvent().getChannel();
int channelCounter = getChannelCounter(channel).addAndGet(increment); long channelCounter = getChannelCounter(channel).addAndGet(increment);
if (maxChannelMemorySize != 0 && channelCounter >= maxChannelMemorySize && channel.isOpen()) { if (maxChannelMemorySize != 0 && channelCounter >= maxChannelMemorySize && channel.isOpen()) {
if (channel.isReadable()) { if (channel.isReadable()) {
channel.setReadable(false); channel.setReadable(false);
@ -305,8 +313,8 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
} }
Settings settings = this.settings; Settings settings = this.settings;
int maxTotalMemorySize = settings.maxTotalMemorySize; long maxTotalMemorySize = settings.maxTotalMemorySize;
int maxChannelMemorySize = settings.maxChannelMemorySize; long maxChannelMemorySize = settings.maxChannelMemorySize;
int increment; int increment;
if (task instanceof ChannelEventRunnable) { if (task instanceof ChannelEventRunnable) {
@ -315,7 +323,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
increment = getObjectSizeEstimator().estimateSize(task); increment = getObjectSizeEstimator().estimateSize(task);
} }
int totalCounter = this.totalCounter.addAndGet(-increment); long totalCounter = this.totalCounter.addAndGet(-increment);
//System.out.println("D: " + totalCounter + ", " + increment); //System.out.println("D: " + totalCounter + ", " + increment);
if (maxTotalMemorySize != 0 && totalCounter + increment >= maxTotalMemorySize) { if (maxTotalMemorySize != 0 && totalCounter + increment >= maxTotalMemorySize) {
@ -325,7 +333,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
if (task instanceof ChannelEventRunnable) { if (task instanceof ChannelEventRunnable) {
Channel channel = ((ChannelEventRunnable) task).getEvent().getChannel(); Channel channel = ((ChannelEventRunnable) task).getEvent().getChannel();
int channelCounter = getChannelCounter(channel).addAndGet(-increment); long channelCounter = getChannelCounter(channel).addAndGet(-increment);
if (maxChannelMemorySize != 0 && channelCounter + increment >= maxChannelMemorySize && channel.isOpen()) { if (maxChannelMemorySize != 0 && channelCounter + increment >= maxChannelMemorySize && channel.isOpen()) {
if (!channel.isReadable()) { if (!channel.isReadable()) {
channel.setReadable(true); channel.setReadable(true);
@ -334,11 +342,11 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
} }
} }
private AtomicInteger getChannelCounter(Channel channel) { private AtomicLong getChannelCounter(Channel channel) {
AtomicInteger counter = channelCounters.get(channel); AtomicLong counter = channelCounters.get(channel);
if (counter == null) { if (counter == null) {
counter = new AtomicInteger(); counter = new AtomicLong();
AtomicInteger oldCounter = channelCounters.putIfAbsent(channel, counter); AtomicLong oldCounter = channelCounters.putIfAbsent(channel, counter);
if (oldCounter != null) { if (oldCounter != null) {
counter = oldCounter; counter = oldCounter;
} }
@ -365,10 +373,10 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
} }
private static class Settings { private static class Settings {
final int maxChannelMemorySize; final long maxChannelMemorySize;
final int maxTotalMemorySize; final long maxTotalMemorySize;
Settings(int maxChannelMemorySize, int maxTotalMemorySize) { Settings(long maxChannelMemorySize, long maxTotalMemorySize) {
this.maxChannelMemorySize = maxChannelMemorySize; this.maxChannelMemorySize = maxChannelMemorySize;
this.maxTotalMemorySize = maxTotalMemorySize; this.maxTotalMemorySize = maxTotalMemorySize;
} }

View File

@ -75,7 +75,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
* Specify {@code 0} to disable. * Specify {@code 0} to disable.
*/ */
public OrderedMemoryAwareThreadPoolExecutor( public OrderedMemoryAwareThreadPoolExecutor(
int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize) { int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize); super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize);
} }
@ -91,7 +91,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
* @param unit the {@link TimeUnit} of {@code keepAliveTime} * @param unit the {@link TimeUnit} of {@code keepAliveTime}
*/ */
public OrderedMemoryAwareThreadPoolExecutor( public OrderedMemoryAwareThreadPoolExecutor(
int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize, int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
long keepAliveTime, TimeUnit unit) { long keepAliveTime, TimeUnit unit) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize,
keepAliveTime, unit); keepAliveTime, unit);
@ -110,7 +110,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
* @param threadFactory the {@link ThreadFactory} of this pool * @param threadFactory the {@link ThreadFactory} of this pool
*/ */
public OrderedMemoryAwareThreadPoolExecutor( public OrderedMemoryAwareThreadPoolExecutor(
int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize, int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize,
keepAliveTime, unit, threadFactory); keepAliveTime, unit, threadFactory);
@ -130,7 +130,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
* @param objectSizeEstimator the {@link ObjectSizeEstimator} of this pool * @param objectSizeEstimator the {@link ObjectSizeEstimator} of this pool
*/ */
public OrderedMemoryAwareThreadPoolExecutor( public OrderedMemoryAwareThreadPoolExecutor(
int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize, int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
long keepAliveTime, TimeUnit unit, long keepAliveTime, TimeUnit unit,
ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) { ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize,