Made OrderedMemoryAwareThreadPoolExecutor extensible so that a user can use a different key to maintain the event order

This commit is contained in:
Trustin Lee 2009-07-21 09:56:27 +00:00
parent ca19f4cdde
commit 23bece222a

View File

@ -79,8 +79,7 @@ import org.jboss.netty.util.internal.ConcurrentIdentityWeakKeyHashMap;
public class OrderedMemoryAwareThreadPoolExecutor extends public class OrderedMemoryAwareThreadPoolExecutor extends
MemoryAwareThreadPoolExecutor { MemoryAwareThreadPoolExecutor {
private final ConcurrentMap<Channel, Executor> childExecutors = private final ConcurrentMap<Object, Executor> childExecutors = newChildExecutorMap();
new ConcurrentIdentityWeakKeyHashMap<Channel, Executor>();
/** /**
* Creates a new instance. * Creates a new instance.
@ -154,6 +153,18 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
keepAliveTime, unit, objectSizeEstimator, threadFactory); keepAliveTime, unit, objectSizeEstimator, threadFactory);
} }
protected ConcurrentMap<Object, Executor> newChildExecutorMap() {
return new ConcurrentIdentityWeakKeyHashMap<Object, Executor>();
}
protected Object getChildExecutorKey(ChannelEvent e) {
return e.getChannel();
}
protected boolean removeChildExecutor(Object key) {
return childExecutors.remove(key) != null;
}
/** /**
* Executes the specified task concurrently while maintaining the event * Executes the specified task concurrently while maintaining the event
* order. * order.
@ -164,16 +175,16 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
doUnorderedExecute(task); doUnorderedExecute(task);
} else { } else {
ChannelEventRunnable r = (ChannelEventRunnable) task; ChannelEventRunnable r = (ChannelEventRunnable) task;
getOrderedExecutor(r.getEvent()).execute(task); getChildExecutor(r.getEvent()).execute(task);
} }
} }
private Executor getOrderedExecutor(ChannelEvent e) { private Executor getChildExecutor(ChannelEvent e) {
Channel channel = e.getChannel(); Object key = getChildExecutorKey(e);
Executor executor = childExecutors.get(channel); Executor executor = childExecutors.get(key);
if (executor == null) { if (executor == null) {
executor = new ChildExecutor(); executor = new ChildExecutor();
Executor oldExecutor = childExecutors.putIfAbsent(channel, executor); Executor oldExecutor = childExecutors.putIfAbsent(key, executor);
if (oldExecutor != null) { if (oldExecutor != null) {
executor = oldExecutor; executor = oldExecutor;
} }
@ -181,6 +192,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
// Remove the entry when the channel closes. // Remove the entry when the channel closes.
if (e instanceof ChannelStateEvent) { if (e instanceof ChannelStateEvent) {
Channel channel = e.getChannel();
ChannelStateEvent se = (ChannelStateEvent) e; ChannelStateEvent se = (ChannelStateEvent) e;
if (se.getState() == ChannelState.OPEN && if (se.getState() == ChannelState.OPEN &&
!channel.isOpen()) { !channel.isOpen()) {