diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 37bf980815..1445b212da 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -15,6 +15,7 @@ */ package io.netty.util.concurrent; +import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -33,6 +34,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; /** * Abstract base class for {@link EventExecutor}'s that execute all its submitted tasks in a single thread. @@ -56,18 +58,31 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } }; + private static final AtomicIntegerFieldUpdater STATE_UPDATER; + + static { + AtomicIntegerFieldUpdater updater = + PlatformDependent.newAtomicIntegerFieldUpdater(SingleThreadEventExecutor.class, "state"); + if (updater == null) { + updater = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state"); + } + STATE_UPDATER = updater; + } + private final EventExecutorGroup parent; private final Queue taskQueue; final Queue> delayedTaskQueue = new PriorityQueue>(); private final Thread thread; - private final Object stateLock = new Object(); private final Semaphore threadLock = new Semaphore(0); private final Set shutdownHooks = new LinkedHashSet(); private final boolean addTaskWakesUp; private long lastExecutionTime; + + @SuppressWarnings({ "FieldMayBeFinal", "unused" }) private volatile int state = ST_NOT_STARTED; + private volatile long gracefulShutdownQuietPeriod; private volatile long gracefulShutdownTimeout; private long gracefulShutdownStartTime; @@ -103,10 +118,13 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { - if (state < ST_SHUTTING_DOWN) { - state = ST_SHUTTING_DOWN; + for (;;) { + int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this); + if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( + SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { + break; + } } - // Check if confirmShutdown() was called at the end of the loop. if (success && gracefulShutdownStartTime == 0) { logger.error( @@ -126,9 +144,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { try { cleanup(); } finally { - synchronized (stateLock) { - state = ST_TERMINATED; - } + STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.release(); if (!taskQueue.isEmpty()) { logger.warn( @@ -414,7 +430,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } protected void wakeup(boolean inEventLoop) { - if (!inEventLoop || state == ST_SHUTTING_DOWN) { + if (!inEventLoop || STATE_UPDATER.get(this) == ST_SHUTTING_DOWN) { taskQueue.add(WAKEUP_TASK); } } @@ -498,32 +514,37 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } boolean inEventLoop = inEventLoop(); - boolean wakeup = true; - - synchronized (stateLock) { + boolean wakeup; + int oldState; + for (;;) { if (isShuttingDown()) { return terminationFuture(); } - - gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod); - gracefulShutdownTimeout = unit.toNanos(timeout); - + int newState; + wakeup = true; + oldState = STATE_UPDATER.get(this); if (inEventLoop) { - assert state == ST_STARTED; - state = ST_SHUTTING_DOWN; + newState = ST_SHUTTING_DOWN; } else { - switch (state) { + switch (oldState) { case ST_NOT_STARTED: - state = ST_SHUTTING_DOWN; - thread.start(); - break; case ST_STARTED: - state = ST_SHUTTING_DOWN; + newState = ST_SHUTTING_DOWN; break; default: + newState = oldState; wakeup = false; } } + if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { + break; + } + } + gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod); + gracefulShutdownTimeout = unit.toNanos(timeout); + + if (oldState == ST_NOT_STARTED) { + thread.start(); } if (wakeup) { @@ -546,30 +567,36 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } boolean inEventLoop = inEventLoop(); - boolean wakeup = true; - - synchronized (stateLock) { - if (isShutdown()) { + boolean wakeup; + int oldState; + for (;;) { + if (isShuttingDown()) { return; } - + int newState; + wakeup = true; + oldState = STATE_UPDATER.get(this); if (inEventLoop) { - assert state == ST_STARTED || state == ST_SHUTTING_DOWN; - state = ST_SHUTDOWN; + newState = ST_SHUTDOWN; } else { - switch (state) { - case ST_NOT_STARTED: - state = ST_SHUTDOWN; - thread.start(); - break; - case ST_STARTED: - case ST_SHUTTING_DOWN: - state = ST_SHUTDOWN; - break; - default: - wakeup = false; + switch (oldState) { + case ST_NOT_STARTED: + case ST_STARTED: + case ST_SHUTTING_DOWN: + newState = ST_SHUTDOWN; + break; + default: + newState = oldState; + wakeup = false; } } + if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { + break; + } + } + + if (oldState == ST_NOT_STARTED) { + thread.start(); } if (wakeup) { @@ -579,17 +606,17 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { @Override public boolean isShuttingDown() { - return state >= ST_SHUTTING_DOWN; + return STATE_UPDATER.get(this) >= ST_SHUTTING_DOWN; } @Override public boolean isShutdown() { - return state >= ST_SHUTDOWN; + return STATE_UPDATER.get(this) >= ST_SHUTDOWN; } @Override public boolean isTerminated() { - return state == ST_TERMINATED; + return STATE_UPDATER.get(this) == ST_TERMINATED; } /** @@ -808,9 +835,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } private void startThread() { - synchronized (stateLock) { - if (state == ST_NOT_STARTED) { - state = ST_STARTED; + if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { + if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { delayedTaskQueue.add(new ScheduledFutureTask( this, delayedTaskQueue, Executors.callable(new PurgeTask(), null), ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));