Reduce the chance of RejectedExecutionException

When a Netty application shuts down, a user often sees a REE
(RejectedExecutionException).

A REE is raised due to various reasons we don't have control over, such
as:

- A client connects to a server while the server is shutting down.

- An event is triggered for a closed Channel while its event loop is
  also shutting down.  Some of them are:
  - channelDeregistered (triggered after a channel is closed)
  - freeIn/OutboundBuffer (triggered after channelDeregistered)
  - userEventTriggered (triggered anytime)

To address this issue, a new method called confirmShutdown() has been
added to SingleThreadEventExecutor.  After a user calls shutdown(),
confirmShutdown() runs any remaining tasks in the task queue and ensures
no events are triggered for last 2 seconds.  If any task are added to
the task queue before 2 seconds passes, confirmShutdown() prevents the
event loop from terminating by returning false.

Now that SingleThreadEventExecutor needs to accept tasks even after
shutdown(), its execute() method only rejects the task after the event
loop is terminated (i.e. isTerminated() returns true.)  Except that,
there's no change in semantics.

SingleThreadEventExecutor also checks if its subclass called
confirmShutdown() in its run() implementation, so that Netty developers
can make sure they shut down their event loop impementation correctly.

It also fixes a bug in AioSocketChannel, revealed by delayed shutdown,
where an inboundBufferUpdated() event is triggered on a closed Channel
with deallocated buffers.

Caveats:

Because SingleThreadEventExecutor.takeTask() does not have a notion of
timeout, confirmShutdown() adds a dummy task (WAKEUP_TASK) to wake up
takeTask() immediately and instead sleeps hard-coded 100ms.  I'll
address this issue later by modifying takeTask() times out dynamically.

Miscellaneous changes:

SingleThreadEventExecutor.wakeup() now has the default implementation.
Instead of interrupting the current thread, it simply adds a dummy task
(WAKEUP_TASK) to the task queue, which is more elegant and efficient.
NioEventLoop is the only implementation that overrides it. All other
implementations' wakeup()s were removed thanks to this change.
This commit is contained in:
Trustin Lee 2012-11-22 20:45:49 +09:00 committed by Norman Maurer
parent 81e2db10fa
commit dbbc6ad73f
11 changed files with 140 additions and 114 deletions

View File

@ -40,16 +40,9 @@ class DefaultEventExecutor extends SingleThreadEventExecutor {
// Waken up by interruptThread()
}
if (isShutdown() && peekTask() == null) {
if (isShutdown() && confirmShutdown()) {
break;
}
}
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && isShutdown()) {
interruptThread();
}
}
}

View File

@ -71,6 +71,10 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
@Override
public void shutdown() {
if (isShutdown()) {
return;
}
scheduler.shutdown();
for (EventExecutor l: children) {
l.shutdown();

View File

@ -43,9 +43,27 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
/**
* Wait at least 2 seconds after shutdown() until there are no pending tasks anymore.
* @see #confirmShutdown()
*/
private static final long SHUTDOWN_DELAY_NANOS = TimeUnit.SECONDS.toNanos(2);
static final ThreadLocal<SingleThreadEventExecutor> CURRENT_EVENT_LOOP =
new ThreadLocal<SingleThreadEventExecutor>();
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTDOWN = 3;
private static final int ST_TERMINATED = 4;
private static final Runnable WAKEUP_TASK = new Runnable() {
@Override
public void run() {
// Do nothing.
}
};
public static SingleThreadEventExecutor currentEventLoop() {
return CURRENT_EVENT_LOOP.get();
}
@ -57,8 +75,8 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
private final Semaphore threadLock = new Semaphore(0);
private final ChannelTaskScheduler scheduler;
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
/** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */
private volatile int state;
private volatile int state = ST_NOT_STARTED;
private long lastAccessTimeNanos;
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, ChannelTaskScheduler scheduler) {
@ -76,19 +94,29 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
@Override
public void run() {
CURRENT_EVENT_LOOP.set(SingleThreadEventExecutor.this);
boolean success = false;
try {
SingleThreadEventExecutor.this.run();
success = true;
} finally {
// Check if confirmShutdown() was called at the end of the loop.
if (success && lastAccessTimeNanos == 0) {
logger.error(
"Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
// Run all remaining tasks and shutdown hooks.
try {
cleanupTasks();
} finally {
synchronized (stateLock) {
state = 3;
for (;;) {
if (confirmShutdown()) {
break;
}
}
cleanupTasks();
synchronized (stateLock) {
state = ST_TERMINATED;
}
} finally {
try {
cleanup();
@ -99,17 +127,6 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
}
}
}
private void cleanupTasks() {
for (;;) {
boolean ran = false;
ran |= runAllTasks();
ran |= runShutdownHooks();
if (!ran && !hasTasks()) {
break;
}
}
}
});
taskQueue = newTaskQueue();
@ -161,7 +178,7 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
if (task == null) {
throw new NullPointerException("task");
}
if (isShutdown()) {
if (isTerminated()) {
reject();
}
taskQueue.add(task);
@ -182,6 +199,10 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
break;
}
if (task == WAKEUP_TASK) {
continue;
}
try {
task.run();
ran = true;
@ -198,7 +219,11 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
// Do nothing. Subclasses will override.
}
protected abstract void wakeup(boolean inEventLoop);
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop || state == ST_SHUTDOWN) {
addTask(WAKEUP_TASK);
}
}
@Override
public boolean inEventLoop() {
@ -256,29 +281,30 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
@Override
public void shutdown() {
if (isShutdown()) {
return;
}
boolean inEventLoop = inEventLoop();
boolean wakeup = false;
boolean wakeup = true;
if (inEventLoop) {
synchronized (stateLock) {
assert state == 1;
state = 2;
wakeup = true;
assert state == ST_STARTED;
state = ST_SHUTDOWN;
}
} else {
synchronized (stateLock) {
switch (state) {
case 0:
state = 3;
try {
cleanup();
} finally {
threadLock.release();
}
case ST_NOT_STARTED:
state = ST_SHUTDOWN;
thread.start();
break;
case 1:
state = 2;
wakeup = true;
case ST_STARTED:
state = ST_SHUTDOWN;
break;
default:
wakeup = false;
}
}
}
@ -296,12 +322,49 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
@Override
public boolean isShutdown() {
return state >= 2;
return state >= ST_SHUTDOWN;
}
@Override
public boolean isTerminated() {
return state == 3;
return state == ST_TERMINATED;
}
protected boolean confirmShutdown() {
if (!isShutdown()) {
throw new IllegalStateException("must be invoked after shutdown()");
}
if (!inEventLoop()) {
throw new IllegalStateException("must be invoked from an event loop");
}
if (runAllTasks() || runShutdownHooks()) {
// There were tasks in the queue. Wait a little bit more until no tasks are queued for SHUTDOWN_DELAY_NANOS.
lastAccessTimeNanos = 0;
wakeup(true);
return false;
}
if (lastAccessTimeNanos == 0 || System.nanoTime() - lastAccessTimeNanos < SHUTDOWN_DELAY_NANOS) {
if (lastAccessTimeNanos == 0) {
lastAccessTimeNanos = System.nanoTime();
}
// Check if any tasks were added to the queue every 100ms.
// TODO: Change the behavior of takeTask() so that it returns on timeout.
wakeup(true);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore
}
return false;
}
// No tasks were added for last SHUTDOWN_DELAY_NANOS - hopefully safe to shut down.
// (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
return true;
}
@Override
@ -332,13 +395,13 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
wakeup(true);
} else {
synchronized (stateLock) {
if (state == 0) {
state = 1;
if (state == ST_NOT_STARTED) {
state = ST_STARTED;
thread.start();
}
}
addTask(task);
if (isShutdown() && removeTask(task)) {
if (isTerminated() && removeTask(task)) {
reject();
}
wakeup(false);
@ -346,7 +409,7 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
}
private static void reject() {
throw new RejectedExecutionException("event executor shut down");
throw new RejectedExecutionException("event executor terminated");
}
@Override

View File

@ -44,6 +44,12 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
@Override
public ChannelFuture register(final Channel channel, final ChannelFuture future) {
if (isShutdown()) {
channel.unsafe().closeForcibly();
future.setFailure(new EventLoopException("cannot register a channel to a shut down loop"));
return future;
}
if (inEventLoop()) {
channel.unsafe().register(this, future);
} else {

View File

@ -38,20 +38,9 @@ final class LocalEventLoop extends SingleThreadEventLoop {
// Waken up by interruptThread()
}
if (isShutdown()) {
task = pollTask();
if (task == null) {
break;
}
task.run();
if (isShutdown() && confirmShutdown()) {
break;
}
}
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && isShutdown()) {
interruptThread();
}
}
}

View File

@ -79,11 +79,9 @@ final class AioEventLoop extends SingleThreadEventLoop {
if (isShutdown()) {
closeAll();
task = pollTask();
if (task == null) {
if (confirmShutdown()) {
break;
}
task.run();
}
}
}
@ -98,11 +96,4 @@ final class AioEventLoop extends SingleThreadEventLoop {
ch.unsafe().close(ch.unsafe().voidFuture());
}
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && isShutdown()) {
interruptThread();
}
}
}

View File

@ -220,6 +220,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override
protected void doClose() throws Exception {
javaChannel().close();
inputShutdown = true;
outputShutdown = true;
}
@ -354,6 +355,12 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override
protected void completed0(T result, AioSocketChannel channel) {
if (channel.inputShutdown) {
// Channel has been closed during read. Because the inbound buffer has been deallocated already,
// there's no way to let a user handler access it unfortunately.
return;
}
final ChannelPipeline pipeline = channel.pipeline();
final ByteBuf byteBuf = pipeline.inboundByteBuffer();
@ -394,7 +401,8 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
pipeline.fireInboundBufferUpdated();
}
if (closed) {
// Double check because fireInboundBufferUpdated() might have triggered the closure by a user handler.
if (closed || !channel.isOpen()) {
channel.inputShutdown = true;
if (channel.isOpen()) {
if (channel.config().isAllowHalfClosure()) {

View File

@ -249,7 +249,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
if (isShutdown()) {
closeAll();
if (peekTask() == null) {
if (confirmShutdown()) {
break;
}
}

View File

@ -91,20 +91,13 @@ class OioEventLoop extends SingleThreadEventLoop {
if (ch != null) {
ch.unsafe().close(ch.unsafe().voidFuture());
}
if (peekTask() == null) {
if (confirmShutdown()) {
break;
}
}
}
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && isShutdown()) {
interruptThread();
}
}
private void deregister() {
ch = null;
parent.activeChildren.remove(this);

View File

@ -15,7 +15,9 @@
*/
package io.netty.channel;
import static org.junit.Assert.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
@ -27,9 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
public class SingleThreadEventLoopTest {
@ -68,27 +68,21 @@ public class SingleThreadEventLoopTest {
@Override
public void run() {
latch.countDown();
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException ignored) {
interrupted.set(true);
}
}
});
// Wait for the event loop thread to start.
latch.await();
// Request the event loop thread to stop - it will call wakeup(false) to interrupt the thread.
// Request the event loop thread to stop.
loop.shutdown();
assertTrue(loop.isShutdown());
// Wait until the event loop is terminated.
while (!loop.isTerminated()) {
loop.awaitTermination(1, TimeUnit.DAYS);
}
// Make sure loop.shutdown() above triggered wakeup().
assertTrue(interrupted.get());
}
@Test
@ -271,7 +265,7 @@ public class SingleThreadEventLoopTest {
// Waken up by interruptThread()
}
if (isShutdown() && peekTask() == null) {
if (isShutdown() && confirmShutdown()) {
break;
}
}
@ -282,13 +276,6 @@ public class SingleThreadEventLoopTest {
cleanedUp.incrementAndGet();
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && isShutdown()) {
interruptThread();
}
}
@Override
public ChannelFuture register(Channel channel, ChannelFuture future) {
// Untested

View File

@ -43,7 +43,6 @@ import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -81,7 +80,7 @@ public class LocalTransportThreadModelTest {
sb.shutdown();
}
@Test(timeout = 5000)
@Test(timeout = 30000)
public void testStagedExecutionMultiple() throws Throwable {
for (int i = 0; i < 10; i ++) {
testStagedExecution();
@ -197,10 +196,10 @@ public class LocalTransportThreadModelTest {
throw e;
} finally {
l.shutdown();
l.awaitTermination(5, TimeUnit.SECONDS);
e1.shutdown();
e1.awaitTermination(5, TimeUnit.SECONDS);
e2.shutdown();
l.awaitTermination(5, TimeUnit.SECONDS);
e1.awaitTermination(5, TimeUnit.SECONDS);
e2.awaitTermination(5, TimeUnit.SECONDS);
}
}
@ -319,7 +318,6 @@ public class LocalTransportThreadModelTest {
}
ch.close().sync();
h6.latch.await(); // Wait until channelInactive() is triggered.
} finally {
l.shutdown();
@ -662,7 +660,6 @@ public class LocalTransportThreadModelTest {
private volatile int inCnt;
private volatile int outCnt;
private volatile Thread t;
final CountDownLatch latch = new CountDownLatch(1);
@Override
public MessageBuf<Object> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
@ -724,11 +721,6 @@ public class LocalTransportThreadModelTest {
ctx.flush(future);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
latch.countDown();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
exception.compareAndSet(null, cause);