From 77cbd3de3408764b557af94d40bbc4cdea625426 Mon Sep 17 00:00:00 2001 From: norman Date: Thu, 1 Dec 2011 12:13:18 +0100 Subject: [PATCH] Replace synchronization with an lock free approach in OMATPE. See #80 --- .../OrderedMemoryAwareThreadPoolExecutor.java | 67 ++++++++++--------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java b/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java index e71d42e145..776460beb9 100644 --- a/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java +++ b/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java @@ -16,13 +16,14 @@ package org.jboss.netty.handler.execution; import java.util.IdentityHashMap; -import java.util.LinkedList; +import java.util.Queue; import java.util.Set; import java.util.WeakHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelEvent; @@ -30,6 +31,7 @@ import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.util.ObjectSizeEstimator; import org.jboss.netty.util.internal.ConcurrentIdentityWeakKeyHashMap; +import org.jboss.netty.util.internal.LinkedTransferQueue; /** * A {@link MemoryAwareThreadPoolExecutor} which makes sure the events from the @@ -284,50 +286,53 @@ public class OrderedMemoryAwareThreadPoolExecutor extends } private final class ChildExecutor implements Executor, Runnable { - private final LinkedList tasks = new LinkedList(); - + private final Queue tasks = new LinkedTransferQueue(); + private final AtomicBoolean isRunning = new AtomicBoolean(false); + ChildExecutor() { - super(); } + @Override public void execute(Runnable command) { - boolean needsExecution; - synchronized (tasks) { - needsExecution = tasks.isEmpty(); - tasks.add(command); - } + // TODO: What todo if the add return false ? + tasks.add(command); + - if (needsExecution) { + if (isRunning.get() == false) { doUnorderedExecute(this); } } + @Override public void run() { - Thread thread = Thread.currentThread(); - for (;;) { - final Runnable task; - synchronized (tasks) { - task = tasks.getFirst(); - } - - boolean ran = false; - beforeExecute(thread, task); + // check if its already running by using CAS. If so just return here. So in the worst case the thread + // is executed and do nothing + if (isRunning.compareAndSet(false, true)) { try { - task.run(); - ran = true; - onAfterExecute(task, null); - } catch (RuntimeException e) { - if (!ran) { - onAfterExecute(task, e); - } - throw e; - } finally { - synchronized (tasks) { - tasks.removeFirst(); - if (tasks.isEmpty()) { + Thread thread = Thread.currentThread(); + for (;;) { + final Runnable task = tasks.poll(); + // if the task is null we should exit the loop + if (task == null) { break; } + + boolean ran = false; + beforeExecute(thread, task); + try { + task.run(); + ran = true; + onAfterExecute(task, null); + } catch (RuntimeException e) { + if (!ran) { + onAfterExecute(task, e); + } + throw e; + } } + } finally { + // set it back to not running + isRunning.set(false); } } }