[#2307] Remove synchronized bottleneck in SingleThreadEventExecutor.execute(...)
Motivation: Remove the synchronization bottleneck in startThread() which is called by each execute(..) call from outside the EventLoop. Modifications: Replace the synchronized block with the use of AtomicInteger and compareAndSet loops. Result: Less conditions during SingleThreadEventExecutor.execute(...)
This commit is contained in:
parent
b10b79b455
commit
4c64bc0414
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
@ -34,6 +35,7 @@ import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||
|
||||
/**
|
||||
* Abstract base class for {@link EventExecutor}'s that execute all its submitted tasks in a single thread.
|
||||
@ -57,19 +59,32 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
||||
}
|
||||
};
|
||||
|
||||
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER;
|
||||
|
||||
static {
|
||||
AtomicIntegerFieldUpdater<SingleThreadEventExecutor> updater =
|
||||
PlatformDependent.newAtomicIntegerFieldUpdater(SingleThreadEventExecutor.class, "state");
|
||||
if (updater == null) {
|
||||
updater = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
|
||||
}
|
||||
STATE_UPDATER = updater;
|
||||
}
|
||||
|
||||
private final Queue<Runnable> taskQueue;
|
||||
final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
|
||||
|
||||
private volatile Thread thread;
|
||||
private final Executor executor;
|
||||
private volatile boolean interrupted;
|
||||
private final Object stateLock = new Object();
|
||||
private final Semaphore threadLock = new Semaphore(0);
|
||||
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
|
||||
private final boolean addTaskWakesUp;
|
||||
|
||||
private long lastExecutionTime;
|
||||
|
||||
@SuppressWarnings({ "FieldMayBeFinal", "unused" })
|
||||
private volatile int state = ST_NOT_STARTED;
|
||||
|
||||
private volatile long gracefulShutdownQuietPeriod;
|
||||
private volatile long gracefulShutdownTimeout;
|
||||
private long gracefulShutdownStartTime;
|
||||
@ -106,7 +121,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
||||
|
||||
this.addTaskWakesUp = addTaskWakesUp;
|
||||
this.executor = executor;
|
||||
|
||||
taskQueue = newTaskQueue();
|
||||
}
|
||||
|
||||
@ -378,7 +392,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
||||
}
|
||||
|
||||
protected void wakeup(boolean inEventLoop) {
|
||||
if (!inEventLoop || state == ST_SHUTTING_DOWN) {
|
||||
if (!inEventLoop || STATE_UPDATER.get(this) == ST_SHUTTING_DOWN) {
|
||||
taskQueue.add(WAKEUP_TASK);
|
||||
}
|
||||
}
|
||||
@ -462,32 +476,37 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
||||
}
|
||||
|
||||
boolean inEventLoop = inEventLoop();
|
||||
boolean wakeup = true;
|
||||
|
||||
synchronized (stateLock) {
|
||||
boolean wakeup;
|
||||
int oldState;
|
||||
for (;;) {
|
||||
if (isShuttingDown()) {
|
||||
return terminationFuture();
|
||||
}
|
||||
|
||||
gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
|
||||
gracefulShutdownTimeout = unit.toNanos(timeout);
|
||||
|
||||
int newState;
|
||||
wakeup = true;
|
||||
oldState = STATE_UPDATER.get(this);
|
||||
if (inEventLoop) {
|
||||
assert state == ST_STARTED;
|
||||
state = ST_SHUTTING_DOWN;
|
||||
newState = ST_SHUTTING_DOWN;
|
||||
} else {
|
||||
switch (state) {
|
||||
switch (oldState) {
|
||||
case ST_NOT_STARTED:
|
||||
state = ST_SHUTTING_DOWN;
|
||||
doStartThread();
|
||||
break;
|
||||
case ST_STARTED:
|
||||
state = ST_SHUTTING_DOWN;
|
||||
newState = ST_SHUTTING_DOWN;
|
||||
break;
|
||||
default:
|
||||
newState = oldState;
|
||||
wakeup = false;
|
||||
}
|
||||
}
|
||||
if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
|
||||
gracefulShutdownTimeout = unit.toNanos(timeout);
|
||||
|
||||
if (oldState == ST_NOT_STARTED) {
|
||||
doStartThread();
|
||||
}
|
||||
|
||||
if (wakeup) {
|
||||
@ -510,30 +529,36 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
||||
}
|
||||
|
||||
boolean inEventLoop = inEventLoop();
|
||||
boolean wakeup = true;
|
||||
|
||||
synchronized (stateLock) {
|
||||
if (isShutdown()) {
|
||||
boolean wakeup;
|
||||
int oldState;
|
||||
for (;;) {
|
||||
if (isShuttingDown()) {
|
||||
return;
|
||||
}
|
||||
|
||||
int newState;
|
||||
wakeup = true;
|
||||
oldState = STATE_UPDATER.get(this);
|
||||
if (inEventLoop) {
|
||||
assert state == ST_STARTED || state == ST_SHUTTING_DOWN;
|
||||
state = ST_SHUTDOWN;
|
||||
newState = ST_SHUTDOWN;
|
||||
} else {
|
||||
switch (state) {
|
||||
switch (oldState) {
|
||||
case ST_NOT_STARTED:
|
||||
state = ST_SHUTDOWN;
|
||||
doStartThread();
|
||||
break;
|
||||
case ST_STARTED:
|
||||
case ST_SHUTTING_DOWN:
|
||||
state = ST_SHUTDOWN;
|
||||
newState = ST_SHUTDOWN;
|
||||
break;
|
||||
default:
|
||||
newState = oldState;
|
||||
wakeup = false;
|
||||
}
|
||||
}
|
||||
if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (oldState == ST_NOT_STARTED) {
|
||||
doStartThread();
|
||||
}
|
||||
|
||||
if (wakeup) {
|
||||
@ -543,17 +568,17 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
||||
|
||||
@Override
|
||||
public boolean isShuttingDown() {
|
||||
return state >= ST_SHUTTING_DOWN;
|
||||
return STATE_UPDATER.get(this) >= ST_SHUTTING_DOWN;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isShutdown() {
|
||||
return state >= ST_SHUTDOWN;
|
||||
return STATE_UPDATER.get(this) >= ST_SHUTDOWN;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTerminated() {
|
||||
return state == ST_TERMINATED;
|
||||
return STATE_UPDATER.get(this) == ST_TERMINATED;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -772,9 +797,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
||||
}
|
||||
|
||||
private void startThread() {
|
||||
synchronized (stateLock) {
|
||||
if (state == ST_NOT_STARTED) {
|
||||
state = ST_STARTED;
|
||||
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
|
||||
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
|
||||
delayedTaskQueue.add(new ScheduledFutureTask<Void>(
|
||||
this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
|
||||
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
|
||||
@ -801,8 +825,12 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
||||
} catch (Throwable t) {
|
||||
logger.warn("Unexpected exception from an event executor: ", t);
|
||||
} finally {
|
||||
if (state < ST_SHUTTING_DOWN) {
|
||||
state = ST_SHUTTING_DOWN;
|
||||
for (;;) {
|
||||
int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
|
||||
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
|
||||
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Check if confirmShutdown() was called at the end of the loop.
|
||||
@ -823,9 +851,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
||||
try {
|
||||
cleanup();
|
||||
} finally {
|
||||
synchronized (stateLock) {
|
||||
state = ST_TERMINATED;
|
||||
}
|
||||
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
|
||||
threadLock.release();
|
||||
if (!taskQueue.isEmpty()) {
|
||||
logger.warn(
|
||||
|
Loading…
Reference in New Issue
Block a user