diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java b/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java index 4615b1fc90..20e23cad42 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java @@ -91,16 +91,35 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService try { SingleThreadEventExecutor.this.run(); } finally { - synchronized (stateLock) { - state = 3; - } try { - cancelScheduledTasks(); - runShutdownHooks(); - cleanup(); + // Run all remaining tasks and shutdown hooks. + try { + cleanupTasks(); + } finally { + synchronized (stateLock) { + state = 3; + } + } + cleanupTasks(); } finally { - threadLock.release(); - assert taskQueue.isEmpty(); + try { + cleanup(); + } finally { + threadLock.release(); + assert taskQueue.isEmpty(); + } + } + } + } + + private void cleanupTasks() { + for (;;) { + boolean ran = false; + cancelScheduledTasks(); + ran |= runAllTasks(); + ran |= runShutdownHooks(); + if (!ran && !hasTasks()) { + break; } } } @@ -196,15 +215,22 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService return taskQueue.remove(task); } - protected void runAllTasks() { + protected boolean runAllTasks() { + boolean ran = false; for (;;) { final Runnable task = pollTask(); if (task == null) { break; } - task.run(); + try { + task.run(); + ran = true; + } catch (Throwable t) { + logger.warn("A task raised an exception.", t); + } } + return ran; } protected abstract void run(); @@ -251,7 +277,8 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService } } - private void runShutdownHooks() { + private boolean runShutdownHooks() { + boolean ran = false; // Note shutdown hooks can add / remove shutdown hooks. while (!shutdownHooks.isEmpty()) { List copy = new ArrayList(shutdownHooks); @@ -259,11 +286,13 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService for (Runnable task: copy) { try { task.run(); + ran = true; } catch (Throwable t) { logger.warn("Shutdown hook raised an exception.", t); } } } + return ran; } @Override @@ -358,7 +387,7 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService } private static void reject() { - throw new RejectedExecutionException("event loop shut down"); + throw new RejectedExecutionException("event executor shut down"); } @Override