diff --git a/src/main/java/org/jboss/netty/handler/execution/OrderedDownstreamThreadPoolExecutor.java b/src/main/java/org/jboss/netty/handler/execution/OrderedDownstreamThreadPoolExecutor.java index 1a8db5b9ab..9b462f67e0 100644 --- a/src/main/java/org/jboss/netty/handler/execution/OrderedDownstreamThreadPoolExecutor.java +++ b/src/main/java/org/jboss/netty/handler/execution/OrderedDownstreamThreadPoolExecutor.java @@ -17,6 +17,8 @@ package org.jboss.netty.handler.execution; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelEvent; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.util.ObjectSizeEstimator; import java.util.concurrent.Executor; @@ -138,4 +140,29 @@ public final class OrderedDownstreamThreadPoolExecutor extends OrderedMemoryAwar doExecute(command); } + @Override + protected Executor getChildExecutor(ChannelEvent e) { + final Object key = getChildExecutorKey(e); + Executor executor = childExecutors.get(key); + if (executor == null) { + executor = new ChildExecutor(); + Executor oldExecutor = childExecutors.putIfAbsent(key, executor); + if (oldExecutor != null) { + executor = oldExecutor; + } else { + + // register a listener so that the ChildExecutor will get removed once the channel was closed + e.getChannel().getCloseFuture().addListener(new ChannelFutureListener() { + + public void operationComplete(ChannelFuture future) throws Exception { + removeChildExecutor(key); + } + }); + } + } + + return executor; + } + + } diff --git a/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java b/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java index 6995b3bfb3..16a7c930e6 100644 --- a/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java +++ b/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java @@ -137,7 +137,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends // TODO Make OMATPE focus on the case where Channel is the key. // Add a new less-efficient TPE that allows custom key. - private final ConcurrentMap childExecutors = newChildExecutorMap(); + protected final ConcurrentMap childExecutors = newChildExecutorMap(); /** * Creates a new instance. @@ -243,7 +243,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends } } - private Executor getChildExecutor(ChannelEvent e) { + protected Executor getChildExecutor(ChannelEvent e) { Object key = getChildExecutorKey(e); Executor executor = childExecutors.get(key); if (executor == null) { @@ -279,7 +279,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends afterExecute(r, t); } - private final class ChildExecutor implements Executor, Runnable { + protected final class ChildExecutor implements Executor, Runnable { private final Queue tasks = QueueFactory.createQueue(Runnable.class); private final AtomicBoolean isRunning = new AtomicBoolean();