Release ChildExecutor after the channel was closed. See #173
This commit is contained in:
parent
14d5133b22
commit
b7cbf3729b
@ -17,6 +17,8 @@ package org.jboss.netty.handler.execution;
|
||||
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelEvent;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelFutureListener;
|
||||
import org.jboss.netty.util.ObjectSizeEstimator;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
@ -138,4 +140,29 @@ public final class OrderedDownstreamThreadPoolExecutor extends OrderedMemoryAwar
|
||||
doExecute(command);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Executor getChildExecutor(ChannelEvent e) {
|
||||
final Object key = getChildExecutorKey(e);
|
||||
Executor executor = childExecutors.get(key);
|
||||
if (executor == null) {
|
||||
executor = new ChildExecutor();
|
||||
Executor oldExecutor = childExecutors.putIfAbsent(key, executor);
|
||||
if (oldExecutor != null) {
|
||||
executor = oldExecutor;
|
||||
} else {
|
||||
|
||||
// register a listener so that the ChildExecutor will get removed once the channel was closed
|
||||
e.getChannel().getCloseFuture().addListener(new ChannelFutureListener() {
|
||||
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
removeChildExecutor(key);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return executor;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -137,7 +137,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
|
||||
// TODO Make OMATPE focus on the case where Channel is the key.
|
||||
// Add a new less-efficient TPE that allows custom key.
|
||||
|
||||
private final ConcurrentMap<Object, Executor> childExecutors = newChildExecutorMap();
|
||||
protected final ConcurrentMap<Object, Executor> childExecutors = newChildExecutorMap();
|
||||
|
||||
/**
|
||||
* Creates a new instance.
|
||||
@ -243,7 +243,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
|
||||
}
|
||||
}
|
||||
|
||||
private Executor getChildExecutor(ChannelEvent e) {
|
||||
protected Executor getChildExecutor(ChannelEvent e) {
|
||||
Object key = getChildExecutorKey(e);
|
||||
Executor executor = childExecutors.get(key);
|
||||
if (executor == null) {
|
||||
@ -279,7 +279,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
|
||||
afterExecute(r, t);
|
||||
}
|
||||
|
||||
private final class ChildExecutor implements Executor, Runnable {
|
||||
protected final class ChildExecutor implements Executor, Runnable {
|
||||
private final Queue<Runnable> tasks = QueueFactory.createQueue(Runnable.class);
|
||||
private final AtomicBoolean isRunning = new AtomicBoolean();
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user