From 660e4548a6f2bd7a913f624e65611dfe6a123e4b Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Tue, 12 Jun 2012 17:41:06 +0900 Subject: [PATCH] Fix an unexpected RejectedExecutionException - Ensure to run all remaining tasks before marking the executor as 'shut down'. --- .../channel/SingleThreadEventExecutor.java | 53 ++++++++++++++----- 1 file changed, 41 insertions(+), 12 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java b/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java index 4615b1fc90..20e23cad42 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java @@ -91,16 +91,35 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService try { SingleThreadEventExecutor.this.run(); } finally { - synchronized (stateLock) { - state = 3; - } try { - cancelScheduledTasks(); - runShutdownHooks(); - cleanup(); + // Run all remaining tasks and shutdown hooks. + try { + cleanupTasks(); + } finally { + synchronized (stateLock) { + state = 3; + } + } + cleanupTasks(); } finally { - threadLock.release(); - assert taskQueue.isEmpty(); + try { + cleanup(); + } finally { + threadLock.release(); + assert taskQueue.isEmpty(); + } + } + } + } + + private void cleanupTasks() { + for (;;) { + boolean ran = false; + cancelScheduledTasks(); + ran |= runAllTasks(); + ran |= runShutdownHooks(); + if (!ran && !hasTasks()) { + break; } } } @@ -196,15 +215,22 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService return taskQueue.remove(task); } - protected void runAllTasks() { + protected boolean runAllTasks() { + boolean ran = false; for (;;) { final Runnable task = pollTask(); if (task == null) { break; } - task.run(); + try { + task.run(); + ran = true; + } catch (Throwable t) { + logger.warn("A task raised an exception.", t); + } } + return ran; } protected abstract void run(); @@ -251,7 +277,8 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService } } - private void runShutdownHooks() { + private boolean runShutdownHooks() { + boolean ran = false; // Note shutdown hooks can add / remove shutdown hooks. while (!shutdownHooks.isEmpty()) { List copy = new ArrayList(shutdownHooks); @@ -259,11 +286,13 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService for (Runnable task: copy) { try { task.run(); + ran = true; } catch (Throwable t) { logger.warn("Shutdown hook raised an exception.", t); } } } + return ran; } @Override @@ -358,7 +387,7 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService } private static void reject() { - throw new RejectedExecutionException("event loop shut down"); + throw new RejectedExecutionException("event executor shut down"); } @Override