Merge pull request #80 from netty/OMATPE-refactoring
Replace synchronization with an lock free approach
This commit is contained in:
commit
73b89a2b1f
@ -16,13 +16,14 @@
|
||||
package org.jboss.netty.handler.execution;
|
||||
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.WeakHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelEvent;
|
||||
@ -30,6 +31,7 @@ import org.jboss.netty.channel.ChannelState;
|
||||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.util.ObjectSizeEstimator;
|
||||
import org.jboss.netty.util.internal.ConcurrentIdentityWeakKeyHashMap;
|
||||
import org.jboss.netty.util.internal.LinkedTransferQueue;
|
||||
|
||||
/**
|
||||
* A {@link MemoryAwareThreadPoolExecutor} which makes sure the events from the
|
||||
@ -284,51 +286,53 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
|
||||
}
|
||||
|
||||
private final class ChildExecutor implements Executor, Runnable {
|
||||
private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
|
||||
|
||||
private final Queue<Runnable> tasks = new LinkedTransferQueue<Runnable>();
|
||||
private final AtomicBoolean isRunning = new AtomicBoolean(false);
|
||||
|
||||
ChildExecutor() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Runnable command) {
|
||||
boolean needsExecution;
|
||||
synchronized (tasks) {
|
||||
needsExecution = tasks.isEmpty();
|
||||
tasks.add(command);
|
||||
}
|
||||
// TODO: What todo if the add return false ?
|
||||
tasks.add(command);
|
||||
|
||||
|
||||
if (needsExecution) {
|
||||
if (isRunning.get() == false) {
|
||||
doUnorderedExecute(this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
Thread thread = Thread.currentThread();
|
||||
for (;;) {
|
||||
final Runnable task;
|
||||
synchronized (tasks) {
|
||||
task = tasks.getFirst();
|
||||
}
|
||||
|
||||
boolean ran = false;
|
||||
beforeExecute(thread, task);
|
||||
// check if its already running by using CAS. If so just return here. So in the worst case the thread
|
||||
// is executed and do nothing
|
||||
if (isRunning.compareAndSet(false, true)) {
|
||||
try {
|
||||
task.run();
|
||||
ran = true;
|
||||
onAfterExecute(task, null);
|
||||
} catch (RuntimeException e) {
|
||||
if (!ran) {
|
||||
onAfterExecute(task, e);
|
||||
}
|
||||
throw e;
|
||||
} finally {
|
||||
synchronized (tasks) {
|
||||
tasks.removeFirst();
|
||||
if (tasks.isEmpty()) {
|
||||
Thread thread = Thread.currentThread();
|
||||
for (;;) {
|
||||
final Runnable task = tasks.poll();
|
||||
// if the task is null we should exit the loop
|
||||
if (task == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
boolean ran = false;
|
||||
beforeExecute(thread, task);
|
||||
try {
|
||||
task.run();
|
||||
ran = true;
|
||||
onAfterExecute(task, null);
|
||||
} catch (RuntimeException e) {
|
||||
if (!ran) {
|
||||
onAfterExecute(task, e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
// set it back to not running
|
||||
isRunning.set(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user