Fixed a bug where per-channel memory limitation didn't work correctly

This commit is contained in:
Trustin Lee 2008-09-30 01:49:22 +00:00
parent 8c15102bc3
commit 85dc8f93f5
2 changed files with 30 additions and 8 deletions

View File

@ -279,8 +279,12 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
decreaseCounter(r); decreaseCounter(r);
} }
private boolean increaseCounter(Runnable task) { @Override
if (isInterestOpsEvent(task)) { protected void afterExecute(Runnable r, Throwable e) {
super.afterExecute(r, e);
}
protected boolean increaseCounter(Runnable task) {
if (!shouldCount(task)) {
return false; return false;
} }
@ -296,8 +300,10 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
eventTask.estimatedSize = increment; eventTask.estimatedSize = increment;
Channel channel = eventTask.getEvent().getChannel(); Channel channel = eventTask.getEvent().getChannel();
long channelCounter = getChannelCounter(channel).addAndGet(increment); long channelCounter = getChannelCounter(channel).addAndGet(increment);
//System.out.println("IC: " + channelCounter + ", " + increment);
if (maxChannelMemorySize != 0 && channelCounter >= maxChannelMemorySize && channel.isOpen()) { if (maxChannelMemorySize != 0 && channelCounter >= maxChannelMemorySize && channel.isOpen()) {
if (channel.isReadable()) { if (channel.isReadable()) {
//System.out.println("UNREADABLE");
channel.setReadable(false); channel.setReadable(false);
} }
} }
@ -307,8 +313,8 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
return maxTotalMemorySize != 0 && totalCounter >= maxTotalMemorySize; return maxTotalMemorySize != 0 && totalCounter >= maxTotalMemorySize;
} }
private void decreaseCounter(Runnable task) { protected void decreaseCounter(Runnable task) {
if (isInterestOpsEvent(task)) { if (!shouldCount(task)) {
return; return;
} }
@ -334,8 +340,10 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
if (task instanceof ChannelEventRunnable) { if (task instanceof ChannelEventRunnable) {
Channel channel = ((ChannelEventRunnable) task).getEvent().getChannel(); Channel channel = ((ChannelEventRunnable) task).getEvent().getChannel();
long channelCounter = getChannelCounter(channel).addAndGet(-increment); long channelCounter = getChannelCounter(channel).addAndGet(-increment);
if (maxChannelMemorySize != 0 && channelCounter + increment >= maxChannelMemorySize && channel.isOpen()) { //System.out.println("DC: " + channelCounter + ", " + increment);
if (maxChannelMemorySize != 0 && channelCounter < maxChannelMemorySize && channel.isOpen()) {
if (!channel.isReadable()) { if (!channel.isReadable()) {
//System.out.println("READABLE");
channel.setReadable(true); channel.setReadable(true);
} }
} }
@ -359,17 +367,21 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
return counter; return counter;
} }
private static boolean isInterestOpsEvent(Runnable task) { private static boolean shouldCount(Runnable task) {
if (task instanceof Executor) {
return false;
}
if (task instanceof ChannelEventRunnable) { if (task instanceof ChannelEventRunnable) {
ChannelEventRunnable r = (ChannelEventRunnable) task; ChannelEventRunnable r = (ChannelEventRunnable) task;
if (r.getEvent() instanceof ChannelStateEvent) { if (r.getEvent() instanceof ChannelStateEvent) {
ChannelStateEvent e = (ChannelStateEvent) r.getEvent(); ChannelStateEvent e = (ChannelStateEvent) r.getEvent();
if (e.getState() == ChannelState.INTEREST_OPS) { if (e.getState() == ChannelState.INTEREST_OPS) {
return true; return false;
} }
} }
} }
return false; return true;
} }
private static class Settings { private static class Settings {

View File

@ -188,14 +188,24 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
} }
public void run() { public void run() {
Thread thread = Thread.currentThread();
for (;;) { for (;;) {
final Runnable task; final Runnable task;
synchronized (tasks) { synchronized (tasks) {
task = tasks.getFirst(); task = tasks.getFirst();
} }
boolean ran = false;
OrderedMemoryAwareThreadPoolExecutor.this.beforeExecute(thread, task);
try { try {
task.run(); task.run();
ran = true;
OrderedMemoryAwareThreadPoolExecutor.this.afterExecute(task, null);
} catch (RuntimeException e) {
if (!ran) {
OrderedMemoryAwareThreadPoolExecutor.this.afterExecute(task, e);
}
throw e;
} finally { } finally {
synchronized (tasks) { synchronized (tasks) {
tasks.removeFirst(); tasks.removeFirst();