Replace synchronization with an lock free approach in OMATPE. See #80

This commit is contained in:
norman 2011-12-01 12:13:18 +01:00
parent 49f2616972
commit 77cbd3de34

View File

@ -16,13 +16,14 @@
package org.jboss.netty.handler.execution; package org.jboss.netty.handler.execution;
import java.util.IdentityHashMap; import java.util.IdentityHashMap;
import java.util.LinkedList; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.WeakHashMap; import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelEvent;
@ -30,6 +31,7 @@ import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.util.ObjectSizeEstimator; import org.jboss.netty.util.ObjectSizeEstimator;
import org.jboss.netty.util.internal.ConcurrentIdentityWeakKeyHashMap; import org.jboss.netty.util.internal.ConcurrentIdentityWeakKeyHashMap;
import org.jboss.netty.util.internal.LinkedTransferQueue;
/** /**
* A {@link MemoryAwareThreadPoolExecutor} which makes sure the events from the * A {@link MemoryAwareThreadPoolExecutor} which makes sure the events from the
@ -284,30 +286,35 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
} }
private final class ChildExecutor implements Executor, Runnable { private final class ChildExecutor implements Executor, Runnable {
private final LinkedList<Runnable> tasks = new LinkedList<Runnable>(); private final Queue<Runnable> tasks = new LinkedTransferQueue<Runnable>();
private final AtomicBoolean isRunning = new AtomicBoolean(false);
ChildExecutor() { ChildExecutor() {
super();
} }
@Override
public void execute(Runnable command) { public void execute(Runnable command) {
boolean needsExecution; // TODO: What todo if the add return false ?
synchronized (tasks) {
needsExecution = tasks.isEmpty();
tasks.add(command); tasks.add(command);
}
if (needsExecution) {
if (isRunning.get() == false) {
doUnorderedExecute(this); doUnorderedExecute(this);
} }
} }
@Override
public void run() { public void run() {
// check if its already running by using CAS. If so just return here. So in the worst case the thread
// is executed and do nothing
if (isRunning.compareAndSet(false, true)) {
try {
Thread thread = Thread.currentThread(); Thread thread = Thread.currentThread();
for (;;) { for (;;) {
final Runnable task; final Runnable task = tasks.poll();
synchronized (tasks) { // if the task is null we should exit the loop
task = tasks.getFirst(); if (task == null) {
break;
} }
boolean ran = false; boolean ran = false;
@ -321,13 +328,11 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
onAfterExecute(task, e); onAfterExecute(task, e);
} }
throw e; throw e;
}
}
} finally { } finally {
synchronized (tasks) { // set it back to not running
tasks.removeFirst(); isRunning.set(false);
if (tasks.isEmpty()) {
break;
}
}
} }
} }
} }