Revert "Make OrderedMemoryAwareThreadPoolExecutor lock free"
This reverts commit caa925198e8eca352d5b679e38311d6c3ba33aef.
This commit is contained in:
parent
24f5379ee4
commit
6b7b822f72
@ -16,14 +16,13 @@
|
|||||||
package org.jboss.netty.handler.execution;
|
package org.jboss.netty.handler.execution;
|
||||||
|
|
||||||
import java.util.IdentityHashMap;
|
import java.util.IdentityHashMap;
|
||||||
|
import java.util.LinkedList;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.WeakHashMap;
|
import java.util.WeakHashMap;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
import org.jboss.netty.channel.Channel;
|
import org.jboss.netty.channel.Channel;
|
||||||
import org.jboss.netty.channel.ChannelEvent;
|
import org.jboss.netty.channel.ChannelEvent;
|
||||||
@ -285,57 +284,51 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
|
|||||||
}
|
}
|
||||||
|
|
||||||
private final class ChildExecutor implements Executor, Runnable {
|
private final class ChildExecutor implements Executor, Runnable {
|
||||||
private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<Runnable>();
|
private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
|
||||||
private final AtomicBoolean isRunning = new AtomicBoolean(false);
|
|
||||||
|
|
||||||
ChildExecutor() {
|
ChildExecutor() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(Runnable command) {
|
public void execute(Runnable command) {
|
||||||
tasks.add(command);
|
boolean needsExecution;
|
||||||
|
synchronized (tasks) {
|
||||||
|
needsExecution = tasks.isEmpty();
|
||||||
|
tasks.add(command);
|
||||||
|
}
|
||||||
|
|
||||||
if (isRunning.get() == false) {
|
if (needsExecution) {
|
||||||
doUnorderedExecute(this);
|
doUnorderedExecute(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
// check if its already running by using CAS. If so just return here. So in the worst case the thread
|
Thread thread = Thread.currentThread();
|
||||||
// is executed and do nothing
|
for (;;) {
|
||||||
if (isRunning.compareAndSet(false, true)) {
|
final Runnable task;
|
||||||
|
synchronized (tasks) {
|
||||||
|
task = tasks.getFirst();
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean ran = false;
|
||||||
|
beforeExecute(thread, task);
|
||||||
try {
|
try {
|
||||||
Thread thread = Thread.currentThread();
|
task.run();
|
||||||
for (;;) {
|
ran = true;
|
||||||
final Runnable task = tasks.poll();
|
onAfterExecute(task, null);
|
||||||
// this should never happen but just in case check if
|
} catch (RuntimeException e) {
|
||||||
// the queue was empty
|
if (!ran) {
|
||||||
if (task == null) {
|
onAfterExecute(task, e);
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
} finally {
|
||||||
|
synchronized (tasks) {
|
||||||
|
tasks.removeFirst();
|
||||||
|
if (tasks.isEmpty()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean ran = false;
|
|
||||||
beforeExecute(thread, task);
|
|
||||||
try {
|
|
||||||
task.run();
|
|
||||||
ran = true;
|
|
||||||
onAfterExecute(task, null);
|
|
||||||
} catch (RuntimeException e) {
|
|
||||||
if (!ran) {
|
|
||||||
onAfterExecute(task, e);
|
|
||||||
}
|
|
||||||
throw e;
|
|
||||||
} finally {
|
|
||||||
if (tasks.isEmpty()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} finally {
|
|
||||||
// set it back to not running
|
|
||||||
isRunning.set(false);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user