[#2307] Remove synchronized bottleneck in SingleThreadEventExecutor.execute(...)

Motivation:
Remove the synchronization bottleneck in startThread() which is called by each execute(..) call from outside the EventLoop.

Modifications:
Replace the synchronized block with the use of AtomicInteger and compareAndSet loops.

Result:
Less conditions during SingleThreadEventExecutor.execute(...)
This commit is contained in:
Norman Maurer 2014-03-13 08:01:42 +01:00
parent 195e4b1fa6
commit 16a85e6cca

View File

@ -15,6 +15,7 @@
*/ */
package io.netty.util.concurrent; package io.netty.util.concurrent;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -34,6 +35,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/** /**
* Abstract base class for {@link EventExecutor}'s that execute all its submitted tasks in a single thread. * Abstract base class for {@link EventExecutor}'s that execute all its submitted tasks in a single thread.
@ -57,6 +59,17 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
} }
}; };
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER;
static {
AtomicIntegerFieldUpdater<SingleThreadEventExecutor> updater =
PlatformDependent.newAtomicIntegerFieldUpdater(SingleThreadEventExecutor.class, "state");
if (updater == null) {
updater = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
}
STATE_UPDATER = updater;
}
private final EventExecutorGroup parent; private final EventExecutorGroup parent;
private final Queue<Runnable> taskQueue; private final Queue<Runnable> taskQueue;
final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>(); final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
@ -64,13 +77,15 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
private volatile Thread thread; private volatile Thread thread;
private final Executor executor; private final Executor executor;
private volatile boolean interrupted; private volatile boolean interrupted;
private final Object stateLock = new Object();
private final Semaphore threadLock = new Semaphore(0); private final Semaphore threadLock = new Semaphore(0);
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>(); private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
private final boolean addTaskWakesUp; private final boolean addTaskWakesUp;
private long lastExecutionTime; private long lastExecutionTime;
@SuppressWarnings({ "FieldMayBeFinal", "unused" })
private volatile int state = ST_NOT_STARTED; private volatile int state = ST_NOT_STARTED;
private volatile long gracefulShutdownQuietPeriod; private volatile long gracefulShutdownQuietPeriod;
private volatile long gracefulShutdownTimeout; private volatile long gracefulShutdownTimeout;
private long gracefulShutdownStartTime; private long gracefulShutdownStartTime;
@ -108,7 +123,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
this.parent = parent; this.parent = parent;
this.addTaskWakesUp = addTaskWakesUp; this.addTaskWakesUp = addTaskWakesUp;
this.executor = executor; this.executor = executor;
taskQueue = newTaskQueue(); taskQueue = newTaskQueue();
} }
@ -385,7 +399,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
} }
protected void wakeup(boolean inEventLoop) { protected void wakeup(boolean inEventLoop) {
if (!inEventLoop || state == ST_SHUTTING_DOWN) { if (!inEventLoop || STATE_UPDATER.get(this) == ST_SHUTTING_DOWN) {
taskQueue.add(WAKEUP_TASK); taskQueue.add(WAKEUP_TASK);
} }
} }
@ -469,32 +483,37 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
} }
boolean inEventLoop = inEventLoop(); boolean inEventLoop = inEventLoop();
boolean wakeup = true; boolean wakeup;
int oldState;
synchronized (stateLock) { for (;;) {
if (isShuttingDown()) { if (isShuttingDown()) {
return terminationFuture(); return terminationFuture();
} }
int newState;
gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod); wakeup = true;
gracefulShutdownTimeout = unit.toNanos(timeout); oldState = STATE_UPDATER.get(this);
if (inEventLoop) { if (inEventLoop) {
assert state == ST_STARTED; newState = ST_SHUTTING_DOWN;
state = ST_SHUTTING_DOWN;
} else { } else {
switch (state) { switch (oldState) {
case ST_NOT_STARTED: case ST_NOT_STARTED:
state = ST_SHUTTING_DOWN;
doStartThread();
break;
case ST_STARTED: case ST_STARTED:
state = ST_SHUTTING_DOWN; newState = ST_SHUTTING_DOWN;
break; break;
default: default:
newState = oldState;
wakeup = false; wakeup = false;
} }
} }
if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
break;
}
}
gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
gracefulShutdownTimeout = unit.toNanos(timeout);
if (oldState == ST_NOT_STARTED) {
doStartThread();
} }
if (wakeup) { if (wakeup) {
@ -517,30 +536,36 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
} }
boolean inEventLoop = inEventLoop(); boolean inEventLoop = inEventLoop();
boolean wakeup = true; boolean wakeup;
int oldState;
synchronized (stateLock) { for (;;) {
if (isShutdown()) { if (isShuttingDown()) {
return; return;
} }
int newState;
wakeup = true;
oldState = STATE_UPDATER.get(this);
if (inEventLoop) { if (inEventLoop) {
assert state == ST_STARTED || state == ST_SHUTTING_DOWN; newState = ST_SHUTDOWN;
state = ST_SHUTDOWN;
} else { } else {
switch (state) { switch (oldState) {
case ST_NOT_STARTED: case ST_NOT_STARTED:
state = ST_SHUTDOWN;
doStartThread();
break;
case ST_STARTED: case ST_STARTED:
case ST_SHUTTING_DOWN: case ST_SHUTTING_DOWN:
state = ST_SHUTDOWN; newState = ST_SHUTDOWN;
break; break;
default: default:
newState = oldState;
wakeup = false; wakeup = false;
} }
} }
if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
break;
}
}
if (oldState == ST_NOT_STARTED) {
doStartThread();
} }
if (wakeup) { if (wakeup) {
@ -550,17 +575,17 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
@Override @Override
public boolean isShuttingDown() { public boolean isShuttingDown() {
return state >= ST_SHUTTING_DOWN; return STATE_UPDATER.get(this) >= ST_SHUTTING_DOWN;
} }
@Override @Override
public boolean isShutdown() { public boolean isShutdown() {
return state >= ST_SHUTDOWN; return STATE_UPDATER.get(this) >= ST_SHUTDOWN;
} }
@Override @Override
public boolean isTerminated() { public boolean isTerminated() {
return state == ST_TERMINATED; return STATE_UPDATER.get(this) == ST_TERMINATED;
} }
/** /**
@ -779,9 +804,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
} }
private void startThread() { private void startThread() {
synchronized (stateLock) { if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
state = ST_STARTED;
delayedTaskQueue.add(new ScheduledFutureTask<Void>( delayedTaskQueue.add(new ScheduledFutureTask<Void>(
this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null), this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL)); ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
@ -808,8 +832,12 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
} catch (Throwable t) { } catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t); logger.warn("Unexpected exception from an event executor: ", t);
} finally { } finally {
if (state < ST_SHUTTING_DOWN) { for (;;) {
state = ST_SHUTTING_DOWN; int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
} }
// Check if confirmShutdown() was called at the end of the loop. // Check if confirmShutdown() was called at the end of the loop.
@ -830,9 +858,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
try { try {
cleanup(); cleanup();
} finally { } finally {
synchronized (stateLock) { STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
state = ST_TERMINATED;
}
threadLock.release(); threadLock.release();
if (!taskQueue.isEmpty()) { if (!taskQueue.isEmpty()) {
logger.warn( logger.warn(