[#2307] Remove synchronized bottleneck in SingleThreadEventExecutor.execute(...)

Motivation:
Remove the synchronization bottleneck in startThread() which is called by each execute(..) call from outside the EventLoop.

Modifications:
Replace the synchronized block with the use of AtomicInteger and compareAndSet loops.

Result:
Less conditions during SingleThreadEventExecutor.execute(...)
This commit is contained in:
Norman Maurer 2014-03-13 08:01:42 +01:00
parent d4d2085377
commit 92037e8bea

View File

@ -15,6 +15,7 @@
*/
package io.netty.util.concurrent;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -33,6 +34,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/**
* Abstract base class for {@link EventExecutor}'s that execute all its submitted tasks in a single thread.
@ -56,18 +58,31 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
}
};
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER;
static {
AtomicIntegerFieldUpdater<SingleThreadEventExecutor> updater =
PlatformDependent.newAtomicIntegerFieldUpdater(SingleThreadEventExecutor.class, "state");
if (updater == null) {
updater = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
}
STATE_UPDATER = updater;
}
private final EventExecutorGroup parent;
private final Queue<Runnable> taskQueue;
final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
private final Thread thread;
private final Object stateLock = new Object();
private final Semaphore threadLock = new Semaphore(0);
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
private final boolean addTaskWakesUp;
private long lastExecutionTime;
@SuppressWarnings({ "FieldMayBeFinal", "unused" })
private volatile int state = ST_NOT_STARTED;
private volatile long gracefulShutdownQuietPeriod;
private volatile long gracefulShutdownTimeout;
private long gracefulShutdownStartTime;
@ -103,10 +118,13 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
if (state < ST_SHUTTING_DOWN) {
state = ST_SHUTTING_DOWN;
for (;;) {
int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error(
@ -126,9 +144,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
try {
cleanup();
} finally {
synchronized (stateLock) {
state = ST_TERMINATED;
}
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
@ -414,7 +430,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
}
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop || state == ST_SHUTTING_DOWN) {
if (!inEventLoop || STATE_UPDATER.get(this) == ST_SHUTTING_DOWN) {
taskQueue.add(WAKEUP_TASK);
}
}
@ -498,32 +514,37 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
}
boolean inEventLoop = inEventLoop();
boolean wakeup = true;
synchronized (stateLock) {
boolean wakeup;
int oldState;
for (;;) {
if (isShuttingDown()) {
return terminationFuture();
}
gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
gracefulShutdownTimeout = unit.toNanos(timeout);
int newState;
wakeup = true;
oldState = STATE_UPDATER.get(this);
if (inEventLoop) {
assert state == ST_STARTED;
state = ST_SHUTTING_DOWN;
newState = ST_SHUTTING_DOWN;
} else {
switch (state) {
switch (oldState) {
case ST_NOT_STARTED:
state = ST_SHUTTING_DOWN;
thread.start();
break;
case ST_STARTED:
state = ST_SHUTTING_DOWN;
newState = ST_SHUTTING_DOWN;
break;
default:
newState = oldState;
wakeup = false;
}
}
if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
break;
}
}
gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
gracefulShutdownTimeout = unit.toNanos(timeout);
if (oldState == ST_NOT_STARTED) {
thread.start();
}
if (wakeup) {
@ -546,30 +567,36 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
}
boolean inEventLoop = inEventLoop();
boolean wakeup = true;
synchronized (stateLock) {
if (isShutdown()) {
boolean wakeup;
int oldState;
for (;;) {
if (isShuttingDown()) {
return;
}
int newState;
wakeup = true;
oldState = STATE_UPDATER.get(this);
if (inEventLoop) {
assert state == ST_STARTED || state == ST_SHUTTING_DOWN;
state = ST_SHUTDOWN;
newState = ST_SHUTDOWN;
} else {
switch (state) {
switch (oldState) {
case ST_NOT_STARTED:
state = ST_SHUTDOWN;
thread.start();
break;
case ST_STARTED:
case ST_SHUTTING_DOWN:
state = ST_SHUTDOWN;
newState = ST_SHUTDOWN;
break;
default:
newState = oldState;
wakeup = false;
}
}
if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
break;
}
}
if (oldState == ST_NOT_STARTED) {
thread.start();
}
if (wakeup) {
@ -579,17 +606,17 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
@Override
public boolean isShuttingDown() {
return state >= ST_SHUTTING_DOWN;
return STATE_UPDATER.get(this) >= ST_SHUTTING_DOWN;
}
@Override
public boolean isShutdown() {
return state >= ST_SHUTDOWN;
return STATE_UPDATER.get(this) >= ST_SHUTDOWN;
}
@Override
public boolean isTerminated() {
return state == ST_TERMINATED;
return STATE_UPDATER.get(this) == ST_TERMINATED;
}
/**
@ -808,9 +835,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
}
private void startThread() {
synchronized (stateLock) {
if (state == ST_NOT_STARTED) {
state = ST_STARTED;
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
delayedTaskQueue.add(new ScheduledFutureTask<Void>(
this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));