Release ChildExecutor after the channel was closed. See #173

This commit is contained in:
Norman Maurer 2012-03-01 21:36:34 +01:00
parent 68f9c7a5f3
commit d8021fc6a8
2 changed files with 29 additions and 3 deletions

View File

@ -17,6 +17,8 @@ package io.netty.handler.execution;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@ -136,4 +138,28 @@ public final class OrderedDownstreamThreadPoolExecutor extends OrderedMemoryAwar
doExecute(command);
}
@Override
protected Executor getChildExecutor(ChannelEvent e) {
final Object key = getChildExecutorKey(e);
Executor executor = childExecutors.get(key);
if (executor == null) {
executor = new ChildExecutor();
Executor oldExecutor = childExecutors.putIfAbsent(key, executor);
if (oldExecutor != null) {
executor = oldExecutor;
} else {
// register a listener so that the ChildExecutor will get removed once the channel was closed
e.getChannel().getCloseFuture().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
removeChildExecutor(key);
}
});
}
}
return executor;
}
}

View File

@ -136,7 +136,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
// TODO Make OMATPE focus on the case where Channel is the key.
// Add a new less-efficient TPE that allows custom key.
private final ConcurrentMap<Object, Executor> childExecutors = newChildExecutorMap();
protected final ConcurrentMap<Object, Executor> childExecutors = newChildExecutorMap();
/**
* Creates a new instance.
@ -242,7 +242,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
}
}
private Executor getChildExecutor(ChannelEvent e) {
protected Executor getChildExecutor(ChannelEvent e) {
Object key = getChildExecutorKey(e);
Executor executor = childExecutors.get(key);
if (executor == null) {
@ -278,7 +278,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
afterExecute(r, t);
}
private final class ChildExecutor implements Executor, Runnable {
protected final class ChildExecutor implements Executor, Runnable {
private final Queue<Runnable> tasks = QueueFactory.createQueue(Runnable.class);
private final AtomicBoolean isRunning = new AtomicBoolean();