From 4c64bc04141540540b43c66b6c0c00dd7aec3476 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 13 Mar 2014 08:01:42 +0100 Subject: [PATCH] [#2307] Remove synchronized bottleneck in SingleThreadEventExecutor.execute(...) Motivation: Remove the synchronization bottleneck in startThread() which is called by each execute(..) call from outside the EventLoop. Modifications: Replace the synchronized block with the use of AtomicInteger and compareAndSet loops. Result: Less conditions during SingleThreadEventExecutor.execute(...) --- .../concurrent/SingleThreadEventExecutor.java | 118 +++++++++++------- 1 file changed, 72 insertions(+), 46 deletions(-) diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 289026a7ce..37c7d400f5 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -15,6 +15,7 @@ */ package io.netty.util.concurrent; +import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -34,6 +35,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; /** * Abstract base class for {@link EventExecutor}'s that execute all its submitted tasks in a single thread. @@ -57,19 +59,32 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } }; + private static final AtomicIntegerFieldUpdater STATE_UPDATER; + + static { + AtomicIntegerFieldUpdater updater = + PlatformDependent.newAtomicIntegerFieldUpdater(SingleThreadEventExecutor.class, "state"); + if (updater == null) { + updater = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state"); + } + STATE_UPDATER = updater; + } + private final Queue taskQueue; final Queue> delayedTaskQueue = new PriorityQueue>(); private volatile Thread thread; private final Executor executor; private volatile boolean interrupted; - private final Object stateLock = new Object(); private final Semaphore threadLock = new Semaphore(0); private final Set shutdownHooks = new LinkedHashSet(); private final boolean addTaskWakesUp; private long lastExecutionTime; + + @SuppressWarnings({ "FieldMayBeFinal", "unused" }) private volatile int state = ST_NOT_STARTED; + private volatile long gracefulShutdownQuietPeriod; private volatile long gracefulShutdownTimeout; private long gracefulShutdownStartTime; @@ -106,7 +121,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { this.addTaskWakesUp = addTaskWakesUp; this.executor = executor; - taskQueue = newTaskQueue(); } @@ -378,7 +392,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } protected void wakeup(boolean inEventLoop) { - if (!inEventLoop || state == ST_SHUTTING_DOWN) { + if (!inEventLoop || STATE_UPDATER.get(this) == ST_SHUTTING_DOWN) { taskQueue.add(WAKEUP_TASK); } } @@ -462,32 +476,37 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } boolean inEventLoop = inEventLoop(); - boolean wakeup = true; - - synchronized (stateLock) { + boolean wakeup; + int oldState; + for (;;) { if (isShuttingDown()) { return terminationFuture(); } - - gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod); - gracefulShutdownTimeout = unit.toNanos(timeout); - + int newState; + wakeup = true; + oldState = STATE_UPDATER.get(this); if (inEventLoop) { - assert state == ST_STARTED; - state = ST_SHUTTING_DOWN; + newState = ST_SHUTTING_DOWN; } else { - switch (state) { + switch (oldState) { case ST_NOT_STARTED: - state = ST_SHUTTING_DOWN; - doStartThread(); - break; case ST_STARTED: - state = ST_SHUTTING_DOWN; + newState = ST_SHUTTING_DOWN; break; default: + newState = oldState; wakeup = false; } } + if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { + break; + } + } + gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod); + gracefulShutdownTimeout = unit.toNanos(timeout); + + if (oldState == ST_NOT_STARTED) { + doStartThread(); } if (wakeup) { @@ -510,30 +529,36 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } boolean inEventLoop = inEventLoop(); - boolean wakeup = true; - - synchronized (stateLock) { - if (isShutdown()) { + boolean wakeup; + int oldState; + for (;;) { + if (isShuttingDown()) { return; } - + int newState; + wakeup = true; + oldState = STATE_UPDATER.get(this); if (inEventLoop) { - assert state == ST_STARTED || state == ST_SHUTTING_DOWN; - state = ST_SHUTDOWN; + newState = ST_SHUTDOWN; } else { - switch (state) { - case ST_NOT_STARTED: - state = ST_SHUTDOWN; - doStartThread(); - break; - case ST_STARTED: - case ST_SHUTTING_DOWN: - state = ST_SHUTDOWN; - break; - default: - wakeup = false; + switch (oldState) { + case ST_NOT_STARTED: + case ST_STARTED: + case ST_SHUTTING_DOWN: + newState = ST_SHUTDOWN; + break; + default: + newState = oldState; + wakeup = false; } } + if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { + break; + } + } + + if (oldState == ST_NOT_STARTED) { + doStartThread(); } if (wakeup) { @@ -543,17 +568,17 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { @Override public boolean isShuttingDown() { - return state >= ST_SHUTTING_DOWN; + return STATE_UPDATER.get(this) >= ST_SHUTTING_DOWN; } @Override public boolean isShutdown() { - return state >= ST_SHUTDOWN; + return STATE_UPDATER.get(this) >= ST_SHUTDOWN; } @Override public boolean isTerminated() { - return state == ST_TERMINATED; + return STATE_UPDATER.get(this) == ST_TERMINATED; } /** @@ -772,9 +797,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } private void startThread() { - synchronized (stateLock) { - if (state == ST_NOT_STARTED) { - state = ST_STARTED; + if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { + if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { delayedTaskQueue.add(new ScheduledFutureTask( this, delayedTaskQueue, Executors.callable(new PurgeTask(), null), ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL)); @@ -801,8 +825,12 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { - if (state < ST_SHUTTING_DOWN) { - state = ST_SHUTTING_DOWN; + for (;;) { + int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this); + if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( + SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { + break; + } } // Check if confirmShutdown() was called at the end of the loop. @@ -823,9 +851,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { try { cleanup(); } finally { - synchronized (stateLock) { - state = ST_TERMINATED; - } + STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.release(); if (!taskQueue.isEmpty()) { logger.warn(