diff --git a/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java b/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java index 6d03fb029c..f2729d542f 100644 --- a/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java +++ b/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java @@ -16,13 +16,14 @@ package org.jboss.netty.handler.execution; import java.util.IdentityHashMap; -import java.util.LinkedList; import java.util.Set; import java.util.WeakHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelEvent; @@ -284,51 +285,57 @@ public class OrderedMemoryAwareThreadPoolExecutor extends } private final class ChildExecutor implements Executor, Runnable { - private final LinkedList tasks = new LinkedList(); - + private final ConcurrentLinkedQueue tasks = new ConcurrentLinkedQueue(); + private final AtomicBoolean isRunning = new AtomicBoolean(false); + ChildExecutor() { } @Override public void execute(Runnable command) { - boolean needsExecution; - synchronized (tasks) { - needsExecution = tasks.isEmpty(); - tasks.add(command); - } + tasks.add(command); + - if (needsExecution) { + if (isRunning.get() == false) { doUnorderedExecute(this); } } @Override public void run() { - Thread thread = Thread.currentThread(); - for (;;) { - final Runnable task; - synchronized (tasks) { - task = tasks.getFirst(); - } - - boolean ran = false; - beforeExecute(thread, task); + // check if its already running by using CAS. If so just return here. So in the worst case the thread + // is executed and do nothing + if (isRunning.compareAndSet(false, true)) { try { - task.run(); - ran = true; - onAfterExecute(task, null); - } catch (RuntimeException e) { - if (!ran) { - onAfterExecute(task, e); - } - throw e; - } finally { - synchronized (tasks) { - tasks.removeFirst(); - if (tasks.isEmpty()) { + Thread thread = Thread.currentThread(); + for (;;) { + final Runnable task = tasks.poll(); + // this should never happen but just in case check if + // the queue was empty + if (task == null) { break; } + + boolean ran = false; + beforeExecute(thread, task); + try { + task.run(); + ran = true; + onAfterExecute(task, null); + } catch (RuntimeException e) { + if (!ran) { + onAfterExecute(task, e); + } + throw e; + } finally { + if (tasks.isEmpty()) { + break; + } + } } + } finally { + // set it back to not running + isRunning.set(false); } } }