Resolved issue: NETTY-68 (Make MemoryAwareThreadPoolExecutor.objectSizeEstimator property mutable)
This commit is contained in:
parent
678137b638
commit
70151828dc
@ -88,10 +88,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
private static final InternalLogger logger =
|
||||
InternalLoggerFactory.getInstance(MemoryAwareThreadPoolExecutor.class);
|
||||
|
||||
private volatile Settings settings = new Settings(0, 0);
|
||||
|
||||
// XXX Can be changed in runtime now. Make it mutable in 3.1.
|
||||
private final ObjectSizeEstimator objectSizeEstimator;
|
||||
private volatile Settings settings;
|
||||
|
||||
private final ConcurrentMap<Channel, AtomicLong> channelCounters =
|
||||
new ConcurrentHashMap<Channel, AtomicLong>();
|
||||
@ -175,7 +172,14 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
if (objectSizeEstimator == null) {
|
||||
throw new NullPointerException("objectSizeEstimator");
|
||||
}
|
||||
this.objectSizeEstimator = objectSizeEstimator;
|
||||
if (maxChannelMemorySize < 0) {
|
||||
throw new IllegalArgumentException(
|
||||
"maxChannelMemorySize: " + maxChannelMemorySize);
|
||||
}
|
||||
if (maxTotalMemorySize < 0) {
|
||||
throw new IllegalArgumentException(
|
||||
"maxTotalMemorySize: " + maxTotalMemorySize);
|
||||
}
|
||||
|
||||
// Call allowCoreThreadTimeOut(true) using reflection
|
||||
// because it is not supported in Java 5.
|
||||
@ -189,15 +193,28 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
"supported in this platform.");
|
||||
}
|
||||
|
||||
setMaxChannelMemorySize(maxChannelMemorySize);
|
||||
setMaxTotalMemorySize(maxTotalMemorySize);
|
||||
settings = new Settings(
|
||||
objectSizeEstimator, maxChannelMemorySize, maxTotalMemorySize);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link ObjectSizeEstimator} of this pool.
|
||||
*/
|
||||
public ObjectSizeEstimator getObjectSizeEstimator() {
|
||||
return objectSizeEstimator;
|
||||
return settings.objectSizeEstimator;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the {@link ObjectSizeEstimator} of this pool.
|
||||
*/
|
||||
public void setObjectSizeEstimator(ObjectSizeEstimator objectSizeEstimator) {
|
||||
if (objectSizeEstimator == null) {
|
||||
throw new NullPointerException("objectSizeEstimator");
|
||||
}
|
||||
|
||||
settings = new Settings(
|
||||
objectSizeEstimator,
|
||||
settings.maxChannelMemorySize, settings.maxTotalMemorySize);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -222,7 +239,9 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
"can't be changed after a task is executed");
|
||||
}
|
||||
|
||||
settings = new Settings(maxChannelMemorySize, settings.maxTotalMemorySize);
|
||||
settings = new Settings(
|
||||
settings.objectSizeEstimator,
|
||||
maxChannelMemorySize, settings.maxTotalMemorySize);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -247,11 +266,17 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
"can't be changed after a task is executed");
|
||||
}
|
||||
|
||||
settings = new Settings(settings.maxChannelMemorySize, maxTotalMemorySize);
|
||||
settings = new Settings(
|
||||
settings.objectSizeEstimator,
|
||||
settings.maxChannelMemorySize, maxTotalMemorySize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Runnable command) {
|
||||
if (!(command instanceof ChannelEventRunnable)) {
|
||||
command = new MemoryAwareRunnable(command);
|
||||
}
|
||||
|
||||
boolean pause = increaseCounter(command);
|
||||
doExecute(command);
|
||||
if (pause) {
|
||||
@ -294,6 +319,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
protected void afterExecute(Runnable r, Throwable e) {
|
||||
super.afterExecute(r, e);
|
||||
}
|
||||
|
||||
protected boolean increaseCounter(Runnable task) {
|
||||
if (!shouldCount(task)) {
|
||||
return false;
|
||||
@ -303,7 +329,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
long maxTotalMemorySize = settings.maxTotalMemorySize;
|
||||
long maxChannelMemorySize = settings.maxChannelMemorySize;
|
||||
|
||||
int increment = getObjectSizeEstimator().estimateSize(task);
|
||||
int increment = settings.objectSizeEstimator.estimateSize(task);
|
||||
long totalCounter = this.totalCounter.addAndGet(increment);
|
||||
|
||||
if (task instanceof ChannelEventRunnable) {
|
||||
@ -318,6 +344,8 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
channel.setReadable(false);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
((MemoryAwareRunnable) task).estimatedSize = increment;
|
||||
}
|
||||
|
||||
//System.out.println("I: " + totalCounter + ", " + increment);
|
||||
@ -337,7 +365,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
if (task instanceof ChannelEventRunnable) {
|
||||
increment = ((ChannelEventRunnable) task).estimatedSize;
|
||||
} else {
|
||||
increment = getObjectSizeEstimator().estimateSize(task);
|
||||
increment = ((MemoryAwareRunnable) task).estimatedSize;
|
||||
}
|
||||
|
||||
long totalCounter = this.totalCounter.addAndGet(-increment);
|
||||
@ -398,10 +426,13 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
}
|
||||
|
||||
private static class Settings {
|
||||
final ObjectSizeEstimator objectSizeEstimator;
|
||||
final long maxChannelMemorySize;
|
||||
final long maxTotalMemorySize;
|
||||
|
||||
Settings(long maxChannelMemorySize, long maxTotalMemorySize) {
|
||||
Settings(ObjectSizeEstimator objectSizeEstimator,
|
||||
long maxChannelMemorySize, long maxTotalMemorySize) {
|
||||
this.objectSizeEstimator = objectSizeEstimator;
|
||||
this.maxChannelMemorySize = maxChannelMemorySize;
|
||||
this.maxTotalMemorySize = maxTotalMemorySize;
|
||||
}
|
||||
@ -422,4 +453,17 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class MemoryAwareRunnable implements Runnable {
|
||||
final Runnable task;
|
||||
volatile int estimatedSize;
|
||||
|
||||
MemoryAwareRunnable(Runnable task) {
|
||||
this.task = task;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
task.run();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user