diff --git a/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java b/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java index bec1c71475..bfdd9fe28a 100644 --- a/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java +++ b/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java @@ -79,8 +79,7 @@ import org.jboss.netty.util.internal.ConcurrentIdentityWeakKeyHashMap; public class OrderedMemoryAwareThreadPoolExecutor extends MemoryAwareThreadPoolExecutor { - private final ConcurrentMap childExecutors = - new ConcurrentIdentityWeakKeyHashMap(); + private final ConcurrentMap childExecutors = newChildExecutorMap(); /** * Creates a new instance. @@ -154,6 +153,18 @@ public class OrderedMemoryAwareThreadPoolExecutor extends keepAliveTime, unit, objectSizeEstimator, threadFactory); } + protected ConcurrentMap newChildExecutorMap() { + return new ConcurrentIdentityWeakKeyHashMap(); + } + + protected Object getChildExecutorKey(ChannelEvent e) { + return e.getChannel(); + } + + protected boolean removeChildExecutor(Object key) { + return childExecutors.remove(key) != null; + } + /** * Executes the specified task concurrently while maintaining the event * order. @@ -164,16 +175,16 @@ public class OrderedMemoryAwareThreadPoolExecutor extends doUnorderedExecute(task); } else { ChannelEventRunnable r = (ChannelEventRunnable) task; - getOrderedExecutor(r.getEvent()).execute(task); + getChildExecutor(r.getEvent()).execute(task); } } - private Executor getOrderedExecutor(ChannelEvent e) { - Channel channel = e.getChannel(); - Executor executor = childExecutors.get(channel); + private Executor getChildExecutor(ChannelEvent e) { + Object key = getChildExecutorKey(e); + Executor executor = childExecutors.get(key); if (executor == null) { executor = new ChildExecutor(); - Executor oldExecutor = childExecutors.putIfAbsent(channel, executor); + Executor oldExecutor = childExecutors.putIfAbsent(key, executor); if (oldExecutor != null) { executor = oldExecutor; } @@ -181,6 +192,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends // Remove the entry when the channel closes. if (e instanceof ChannelStateEvent) { + Channel channel = e.getChannel(); ChannelStateEvent se = (ChannelStateEvent) e; if (se.getState() == ChannelState.OPEN && !channel.isOpen()) {