Optimization.. (reduced volatiles)
This commit is contained in:
parent
22d79a501d
commit
0241120ace
|
@ -42,6 +42,7 @@ public class ChannelEventRunnable implements Runnable {
|
||||||
|
|
||||||
private final ChannelHandlerContext ctx;
|
private final ChannelHandlerContext ctx;
|
||||||
private final ChannelEvent e;
|
private final ChannelEvent e;
|
||||||
|
volatile int estimatedSize;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a {@link Runnable} which sends the specified {@link ChannelEvent}
|
* Creates a {@link Runnable} which sends the specified {@link ChannelEvent}
|
||||||
|
|
|
@ -81,8 +81,9 @@ import org.jboss.netty.channel.ChannelStateEvent;
|
||||||
*/
|
*/
|
||||||
public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||||
|
|
||||||
private volatile int maxChannelMemorySize;
|
private volatile Settings settings = new Settings(0, 0);
|
||||||
private volatile int maxTotalMemorySize;
|
|
||||||
|
// XXX Can be changed in runtime now. Make it mutable in 3.1.
|
||||||
private final ObjectSizeEstimator objectSizeEstimator;
|
private final ObjectSizeEstimator objectSizeEstimator;
|
||||||
|
|
||||||
private final ConcurrentMap<Channel, AtomicInteger> channelCounters =
|
private final ConcurrentMap<Channel, AtomicInteger> channelCounters =
|
||||||
|
@ -184,7 +185,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||||
* Returns the maximum total size of the queued events per channel.
|
* Returns the maximum total size of the queued events per channel.
|
||||||
*/
|
*/
|
||||||
public int getMaxChannelMemorySize() {
|
public int getMaxChannelMemorySize() {
|
||||||
return maxChannelMemorySize;
|
return settings.maxChannelMemorySize;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -196,14 +197,14 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"maxChannelMemorySize: " + maxChannelMemorySize);
|
"maxChannelMemorySize: " + maxChannelMemorySize);
|
||||||
}
|
}
|
||||||
this.maxChannelMemorySize = maxChannelMemorySize;
|
settings = new Settings(maxChannelMemorySize, settings.maxTotalMemorySize);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the maximum total size of the queued events for this pool.
|
* Returns the maximum total size of the queued events for this pool.
|
||||||
*/
|
*/
|
||||||
public int getMaxTotalMemorySize() {
|
public int getMaxTotalMemorySize() {
|
||||||
return maxTotalMemorySize;
|
return settings.maxTotalMemorySize;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -215,7 +216,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"maxTotalMemorySize: " + maxTotalMemorySize);
|
"maxTotalMemorySize: " + maxTotalMemorySize);
|
||||||
}
|
}
|
||||||
this.maxTotalMemorySize = maxTotalMemorySize;
|
settings = new Settings(settings.maxChannelMemorySize, maxTotalMemorySize);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -269,13 +270,17 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Settings settings = this.settings;
|
||||||
|
int maxTotalMemorySize = settings.maxTotalMemorySize;
|
||||||
|
int maxChannelMemorySize = settings.maxChannelMemorySize;
|
||||||
|
|
||||||
int increment = getObjectSizeEstimator().estimateSize(task);
|
int increment = getObjectSizeEstimator().estimateSize(task);
|
||||||
int maxTotalMemorySize = getMaxTotalMemorySize();
|
|
||||||
int totalCounter = this.totalCounter.addAndGet(increment);
|
int totalCounter = this.totalCounter.addAndGet(increment);
|
||||||
|
|
||||||
if (task instanceof ChannelEventRunnable) {
|
if (task instanceof ChannelEventRunnable) {
|
||||||
Channel channel = ((ChannelEventRunnable) task).getEvent().getChannel();
|
ChannelEventRunnable eventTask = (ChannelEventRunnable) task;
|
||||||
int maxChannelMemorySize = getMaxChannelMemorySize();
|
eventTask.estimatedSize = increment;
|
||||||
|
Channel channel = eventTask.getEvent().getChannel();
|
||||||
int channelCounter = getChannelCounter(channel).addAndGet(increment);
|
int channelCounter = getChannelCounter(channel).addAndGet(increment);
|
||||||
if (maxChannelMemorySize != 0 && channelCounter >= maxChannelMemorySize && channel.isOpen()) {
|
if (maxChannelMemorySize != 0 && channelCounter >= maxChannelMemorySize && channel.isOpen()) {
|
||||||
if (channel.isReadable()) {
|
if (channel.isReadable()) {
|
||||||
|
@ -292,8 +297,17 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int increment = getObjectSizeEstimator().estimateSize(task);
|
Settings settings = this.settings;
|
||||||
int maxTotalMemorySize = getMaxTotalMemorySize();
|
int maxTotalMemorySize = settings.maxTotalMemorySize;
|
||||||
|
int maxChannelMemorySize = settings.maxChannelMemorySize;
|
||||||
|
|
||||||
|
int increment;
|
||||||
|
if (task instanceof ChannelEventRunnable) {
|
||||||
|
increment = ((ChannelEventRunnable) task).estimatedSize;
|
||||||
|
} else {
|
||||||
|
increment = getObjectSizeEstimator().estimateSize(task);
|
||||||
|
}
|
||||||
|
|
||||||
int totalCounter = this.totalCounter.addAndGet(-increment);
|
int totalCounter = this.totalCounter.addAndGet(-increment);
|
||||||
|
|
||||||
if (maxTotalMemorySize == 0 || totalCounter < maxTotalMemorySize) {
|
if (maxTotalMemorySize == 0 || totalCounter < maxTotalMemorySize) {
|
||||||
|
@ -302,7 +316,6 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||||
|
|
||||||
if (task instanceof ChannelEventRunnable) {
|
if (task instanceof ChannelEventRunnable) {
|
||||||
Channel channel = ((ChannelEventRunnable) task).getEvent().getChannel();
|
Channel channel = ((ChannelEventRunnable) task).getEvent().getChannel();
|
||||||
int maxChannelMemorySize = getMaxChannelMemorySize();
|
|
||||||
int channelCounter = getChannelCounter(channel).addAndGet(-increment);
|
int channelCounter = getChannelCounter(channel).addAndGet(-increment);
|
||||||
if ((maxChannelMemorySize == 0 || channelCounter < maxChannelMemorySize) && channel.isOpen()) {
|
if ((maxChannelMemorySize == 0 || channelCounter < maxChannelMemorySize) && channel.isOpen()) {
|
||||||
if (!channel.isReadable()) {
|
if (!channel.isReadable()) {
|
||||||
|
@ -341,4 +354,14 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class Settings {
|
||||||
|
final int maxChannelMemorySize;
|
||||||
|
final int maxTotalMemorySize;
|
||||||
|
|
||||||
|
Settings(int maxChannelMemorySize, int maxTotalMemorySize) {
|
||||||
|
this.maxChannelMemorySize = maxChannelMemorySize;
|
||||||
|
this.maxTotalMemorySize = maxTotalMemorySize;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user