Replace synchronization with an lock free approach

This commit is contained in:
norman 2011-11-24 11:07:16 +01:00
parent bbd251baed
commit 9f712e3291

View File

@ -16,13 +16,14 @@
package org.jboss.netty.handler.execution; package org.jboss.netty.handler.execution;
import java.util.IdentityHashMap; import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.Set; import java.util.Set;
import java.util.WeakHashMap; import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelEvent;
@ -284,51 +285,57 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
} }
private final class ChildExecutor implements Executor, Runnable { private final class ChildExecutor implements Executor, Runnable {
private final LinkedList<Runnable> tasks = new LinkedList<Runnable>(); private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<Runnable>();
private final AtomicBoolean isRunning = new AtomicBoolean(false);
ChildExecutor() { ChildExecutor() {
} }
@Override @Override
public void execute(Runnable command) { public void execute(Runnable command) {
boolean needsExecution; tasks.add(command);
synchronized (tasks) {
needsExecution = tasks.isEmpty();
tasks.add(command);
}
if (needsExecution) { if (isRunning.get() == false) {
doUnorderedExecute(this); doUnorderedExecute(this);
} }
} }
@Override @Override
public void run() { public void run() {
Thread thread = Thread.currentThread(); // check if its already running by using CAS. If so just return here. So in the worst case the thread
for (;;) { // is executed and do nothing
final Runnable task; if (isRunning.compareAndSet(false, true)) {
synchronized (tasks) {
task = tasks.getFirst();
}
boolean ran = false;
beforeExecute(thread, task);
try { try {
task.run(); Thread thread = Thread.currentThread();
ran = true; for (;;) {
onAfterExecute(task, null); final Runnable task = tasks.poll();
} catch (RuntimeException e) { // this should never happen but just in case check if
if (!ran) { // the queue was empty
onAfterExecute(task, e); if (task == null) {
}
throw e;
} finally {
synchronized (tasks) {
tasks.removeFirst();
if (tasks.isEmpty()) {
break; break;
} }
boolean ran = false;
beforeExecute(thread, task);
try {
task.run();
ran = true;
onAfterExecute(task, null);
} catch (RuntimeException e) {
if (!ran) {
onAfterExecute(task, e);
}
throw e;
} finally {
if (tasks.isEmpty()) {
break;
}
}
} }
} finally {
// set it back to not running
isRunning.set(false);
} }
} }
} }