Fix an unexpected RejectedExecutionException

- Ensure to run all remaining tasks before marking the executor as
  'shut down'.
This commit is contained in:
Trustin Lee 2012-06-12 17:41:06 +09:00
parent 3393629eed
commit 660e4548a6

View File

@ -91,16 +91,35 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
try { try {
SingleThreadEventExecutor.this.run(); SingleThreadEventExecutor.this.run();
} finally { } finally {
synchronized (stateLock) {
state = 3;
}
try { try {
cancelScheduledTasks(); // Run all remaining tasks and shutdown hooks.
runShutdownHooks(); try {
cleanup(); cleanupTasks();
} finally {
synchronized (stateLock) {
state = 3;
}
}
cleanupTasks();
} finally { } finally {
threadLock.release(); try {
assert taskQueue.isEmpty(); cleanup();
} finally {
threadLock.release();
assert taskQueue.isEmpty();
}
}
}
}
private void cleanupTasks() {
for (;;) {
boolean ran = false;
cancelScheduledTasks();
ran |= runAllTasks();
ran |= runShutdownHooks();
if (!ran && !hasTasks()) {
break;
} }
} }
} }
@ -196,15 +215,22 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
return taskQueue.remove(task); return taskQueue.remove(task);
} }
protected void runAllTasks() { protected boolean runAllTasks() {
boolean ran = false;
for (;;) { for (;;) {
final Runnable task = pollTask(); final Runnable task = pollTask();
if (task == null) { if (task == null) {
break; break;
} }
task.run(); try {
task.run();
ran = true;
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}
} }
return ran;
} }
protected abstract void run(); protected abstract void run();
@ -251,7 +277,8 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
} }
} }
private void runShutdownHooks() { private boolean runShutdownHooks() {
boolean ran = false;
// Note shutdown hooks can add / remove shutdown hooks. // Note shutdown hooks can add / remove shutdown hooks.
while (!shutdownHooks.isEmpty()) { while (!shutdownHooks.isEmpty()) {
List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks); List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
@ -259,11 +286,13 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
for (Runnable task: copy) { for (Runnable task: copy) {
try { try {
task.run(); task.run();
ran = true;
} catch (Throwable t) { } catch (Throwable t) {
logger.warn("Shutdown hook raised an exception.", t); logger.warn("Shutdown hook raised an exception.", t);
} }
} }
} }
return ran;
} }
@Override @Override
@ -358,7 +387,7 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
} }
private static void reject() { private static void reject() {
throw new RejectedExecutionException("event loop shut down"); throw new RejectedExecutionException("event executor shut down");
} }
@Override @Override