diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java index c3bbae21ae..55ea87aedf 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java @@ -15,7 +15,9 @@ */ package io.netty.util.concurrent; +import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.Callable; @@ -37,6 +39,28 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl return new EventExecutorIterator(); } + @Override + public void shutdownGracefully() { + shutdownGracefully(2, 15, TimeUnit.SECONDS); + } + + /** + * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead. + */ + @Override + @Deprecated + public abstract void shutdown(); + + /** + * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead. + */ + @Override + @Deprecated + public List shutdownNow() { + shutdown(); + return Collections.emptyList(); + } + @Override public Promise newPromise() { return new DefaultPromise(this); diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutorGroup.java index 6f6db8f8ba..f8011102c1 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutorGroup.java @@ -65,6 +65,22 @@ public abstract class AbstractEventExecutorGroup implements EventExecutorGroup { } @Override + public void shutdownGracefully() { + shutdownGracefully(2, 15, TimeUnit.SECONDS); + } + + /** + * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead. + */ + @Override + @Deprecated + public abstract void shutdown(); + + /** + * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead. + */ + @Override + @Deprecated public List shutdownNow() { shutdown(); return Collections.emptyList(); diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutor.java index bcd8886162..f2f704a3a8 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutor.java @@ -25,21 +25,19 @@ import java.util.concurrent.ThreadFactory; final class DefaultEventExecutor extends SingleThreadEventExecutor { DefaultEventExecutor(DefaultEventExecutorGroup parent, ThreadFactory threadFactory) { - super(parent, threadFactory); + super(parent, threadFactory, true); } @Override protected void run() { for (;;) { - Runnable task; - try { - task = takeTask(); + Runnable task = takeTask(); + if (task != null) { task.run(); - } catch (InterruptedException e) { - // Waken up by interruptThread() + updateLastExecutionTime(); } - if (isShutdown() && confirmShutdown()) { + if (confirmShutdown()) { break; } } diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java index 036fb6c74a..2d7b15b1a7 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java @@ -507,7 +507,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { }); } } catch (Throwable t) { - logger.error("Failed to notify listener(s). Event loop terminated?", t); + logger.error("Failed to notify listener(s). Event loop shut down?", t); } } } @@ -544,7 +544,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { } }); } catch (Throwable t) { - logger.error("Failed to notify a listener. Event loop terminated?", t); + logger.error("Failed to notify a listener. Event loop shut down?", t); } } @@ -644,7 +644,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { }); } } catch (Throwable t) { - logger.error("Failed to notify listener(s). Event loop terminated?", t); + logger.error("Failed to notify listener(s). Event loop shut down?", t); } } } diff --git a/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java index 74cc7f5f20..5f447314f1 100644 --- a/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java @@ -16,6 +16,7 @@ package io.netty.util.concurrent; import java.util.Iterator; +import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -28,6 +29,45 @@ import java.util.concurrent.TimeUnit; */ public interface EventExecutorGroup extends ScheduledExecutorService, Iterable { + /** + * Returns {@code true} if and only if this executor was started to be + * {@linkplain #shutdownGracefully() shut down gracefuclly} or was {@linkplain #isShutdown() shut down}. + */ + boolean isShuttingDown(); + + /** + * Shortcut method for {@link #shutdownGracefully(long, long, TimeUnit)} with sensible default values. + */ + void shutdownGracefully(); + + /** + * Signals this executor that the caller wants the executor to be shut down. Once this method is called, + * {@link #isShuttingDown()} starts to return {@code true}, and the executor prepares to shut itself down. + * Unlike {@link #shutdown()}, graceful shutdown ensures that no tasks are submitted for 'the quiet period' + * (usually a couple seconds) before it shuts itself down. If a task is submitted during the quiet period, + * it is guaranteed to be accepted and the quiet period will start over. + * + * @param quietPeriod the quiet period as described in the documentation + * @param timeout the maximum amount of time to wait until the executor is {@linkplain #shutdown()} + * regardless if a task was submitted during the quiet period + * @param unit the unit of {@code quietPeriod} and {@code timeout} + */ + void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit); + + /** + * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead. + */ + @Override + @Deprecated + void shutdown(); + + /** + * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead. + */ + @Override + @Deprecated + List shutdownNow(); + /** * Returns one of the {@link EventExecutor}s that belong to this group. */ diff --git a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java index 5e802683df..3a7b49f1ef 100644 --- a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java @@ -60,7 +60,19 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto } finally { if (!success) { for (int j = 0; j < i; j ++) { - children[j].shutdown(); + children[j].shutdownGracefully(); + } + + for (int j = 0; j < i; j ++) { + EventExecutor e = children[j]; + try { + while (!e.isTerminated()) { + e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); + } + } catch (InterruptedException interrupted) { + Thread.currentThread().interrupt(); + break; + } } } } @@ -107,16 +119,30 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto ThreadFactory threadFactory, Object... args) throws Exception; @Override - public void shutdown() { - if (isShutdown()) { - return; + public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + for (EventExecutor l: children) { + l.shutdownGracefully(quietPeriod, timeout, unit); } + } + @Override + @Deprecated + public void shutdown() { for (EventExecutor l: children) { l.shutdown(); } } + @Override + public boolean isShuttingDown() { + for (EventExecutor l: children) { + if (!l.isShuttingDown()) { + return false; + } + } + return true; + } + @Override public boolean isShutdown() { for (EventExecutor l: children) { diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 2859088a12..65a29365d5 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -19,7 +19,6 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; @@ -48,19 +47,14 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { private static final InternalLogger logger = InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class); - /** - * Wait at least 2 seconds after shutdown() until there are no pending tasks anymore. - * @see #confirmShutdown() - */ - private static final long SHUTDOWN_DELAY_NANOS = TimeUnit.SECONDS.toNanos(2); - static final ThreadLocal CURRENT_EVENT_LOOP = new ThreadLocal(); private static final int ST_NOT_STARTED = 1; private static final int ST_STARTED = 2; - private static final int ST_SHUTDOWN = 3; - private static final int ST_TERMINATED = 4; + private static final int ST_SHUTTING_DOWN = 3; + private static final int ST_SHUTDOWN = 4; + private static final int ST_TERMINATED = 5; private static final Runnable WAKEUP_TASK = new Runnable() { @Override @@ -84,36 +78,50 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { private final Object stateLock = new Object(); private final Semaphore threadLock = new Semaphore(0); private final Set shutdownHooks = new LinkedHashSet(); + private final boolean addTaskWakesUp; + + private long lastExecutionTime; private volatile int state = ST_NOT_STARTED; - private long lastAccessTimeNanos; + private volatile long gracefulShutdownQuietPeriod; + private volatile long gracefulShutdownTimeout; + private long gracefulShutdownStartTime; /** * Create a new instance * * @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it * @param threadFactory the {@link ThreadFactory} which will be used for the used {@link Thread} + * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the + * executor thread */ - protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory) { + protected SingleThreadEventExecutor( + EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { + if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.parent = parent; + this.addTaskWakesUp = addTaskWakesUp; thread = threadFactory.newThread(new Runnable() { @Override public void run() { CURRENT_EVENT_LOOP.set(SingleThreadEventExecutor.this); boolean success = false; + updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); - shutdown(); } finally { + if (state < ST_SHUTTING_DOWN) { + state = ST_SHUTTING_DOWN; + } + // Check if confirmShutdown() was called at the end of the loop. - if (success && lastAccessTimeNanos == 0) { + if (success && gracefulShutdownStartTime == 0) { logger.error( "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " + @@ -187,11 +195,14 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { /** * Take the next {@link Runnable} from the task queue and so will block if no task is currently present. - * + *

* Be aware that this method will throw an {@link UnsupportedOperationException} if the task queue, which was * created via {@link #newTaskQueue()}, does not implement {@link BlockingQueue}. + *

+ * + * @return {@code null} if the executor thread has been interrupted or waken up. */ - protected Runnable takeTask() throws InterruptedException { + protected Runnable takeTask() { assert inEventLoop(); if (!(taskQueue instanceof BlockingQueue)) { throw new UnsupportedOperationException(); @@ -201,12 +212,25 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { for (;;) { ScheduledFutureTask delayedTask = delayedTaskQueue.peek(); if (delayedTask == null) { - return taskQueue.take(); + Runnable task = null; + try { + task = taskQueue.take(); + if (task == WAKEUP_TASK) { + task = null; + } + } catch (InterruptedException e) { + // Ignore + } + return task; } else { long delayNanos = delayedTask.delayNanos(); Runnable task; if (delayNanos > 0) { - task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS); + try { + task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + return null; + } } else { task = taskQueue.poll(); } @@ -278,7 +302,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { if (task == null) { throw new NullPointerException("task"); } - if (isTerminated()) { + if (isShutdown()) { reject(); } taskQueue.add(task); @@ -315,6 +339,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { task = pollTask(); if (task == null) { + lastExecutionTime = nanoTime(); return true; } } @@ -331,8 +356,9 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { return false; } - final long deadline = System.nanoTime() + timeoutNanos; + final long deadline = nanoTime() + timeoutNanos; long runTasks = 0; + long lastExecutionTime; for (;;) { try { task.run(); @@ -342,20 +368,23 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { runTasks ++; - // Check timeout every 64 tasks because System.nanoTime() is relatively expensive. + // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { - if (System.nanoTime() >= deadline) { + lastExecutionTime = nanoTime(); + if (lastExecutionTime >= deadline) { break; } } task = pollTask(); if (task == null) { + lastExecutionTime = nanoTime(); break; } } + this.lastExecutionTime = lastExecutionTime; return true; } @@ -371,6 +400,17 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { return delayedTask.delayNanos(currentTimeNanos); } + /** + * Updates the internal timestamp that tells when a submitted task was executed most recently. + * {@link #runAllTasks()} and {@link #runAllTasks(long)} updates this timestamp automatically, and thus there's + * usually no need to call this method. However, if you take the tasks manually using {@link #takeTask()} or + * {@link #pollTask()}, you have to call this method at the end of task execution loop for accurate quiet period + * checks. + */ + protected void updateLastExecutionTime() { + lastExecutionTime = nanoTime(); + } + /** * */ @@ -384,8 +424,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } protected void wakeup(boolean inEventLoop) { - if (!inEventLoop || state == ST_SHUTDOWN) { - addTask(WAKEUP_TASK); + if (!inEventLoop || state == ST_SHUTTING_DOWN) { + taskQueue.add(WAKEUP_TASK); } } @@ -440,16 +480,74 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { for (Runnable task: copy) { try { task.run(); - ran = true; } catch (Throwable t) { logger.warn("Shutdown hook raised an exception.", t); + } finally { + ran = true; } } } + + if (ran) { + lastExecutionTime = nanoTime(); + } + return ran; } @Override + public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + if (quietPeriod < 0) { + throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)"); + } + if (timeout < quietPeriod) { + throw new IllegalArgumentException( + "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + + if (isShuttingDown()) { + return; + } + + boolean inEventLoop = inEventLoop(); + boolean wakeup = true; + + synchronized (stateLock) { + if (isShuttingDown()) { + return; + } + + gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod); + gracefulShutdownTimeout = unit.toNanos(timeout); + + if (inEventLoop) { + assert state == ST_STARTED; + state = ST_SHUTTING_DOWN; + } else { + switch (state) { + case ST_NOT_STARTED: + state = ST_SHUTTING_DOWN; + thread.start(); + break; + case ST_STARTED: + state = ST_SHUTTING_DOWN; + break; + default: + wakeup = false; + } + } + } + + if (wakeup) { + wakeup(inEventLoop); + } + } + + @Override + @Deprecated public void shutdown() { if (isShutdown()) { return; @@ -458,19 +556,22 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { boolean inEventLoop = inEventLoop(); boolean wakeup = true; - if (inEventLoop) { - synchronized (stateLock) { - assert state == ST_STARTED; - state = ST_SHUTDOWN; + synchronized (stateLock) { + if (isShutdown()) { + return; } - } else { - synchronized (stateLock) { + + if (inEventLoop) { + assert state == ST_STARTED || state == ST_SHUTTING_DOWN; + state = ST_SHUTDOWN; + } else { switch (state) { case ST_NOT_STARTED: state = ST_SHUTDOWN; thread.start(); break; case ST_STARTED: + case ST_SHUTTING_DOWN: state = ST_SHUTDOWN; break; default: @@ -485,9 +586,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } @Override - public List shutdownNow() { - shutdown(); - return Collections.emptyList(); + public boolean isShuttingDown() { + return state >= ST_SHUTTING_DOWN; } @Override @@ -504,27 +604,38 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { * Confirm that the shutdown if the instance should be done now! */ protected boolean confirmShutdown() { - if (!isShutdown()) { - throw new IllegalStateException("must be invoked after shutdown()"); + if (!isShuttingDown()) { + return false; } + if (!inEventLoop()) { throw new IllegalStateException("must be invoked from an event loop"); } cancelDelayedTasks(); + if (gracefulShutdownStartTime == 0) { + gracefulShutdownStartTime = nanoTime(); + } + if (runAllTasks() || runShutdownHooks()) { - // There were tasks in the queue. Wait a little bit more until no tasks are queued for SHUTDOWN_DELAY_NANOS. - lastAccessTimeNanos = 0; + if (isShutdown()) { + // Executor shut down - no new tasks anymore. + return true; + } + + // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period. wakeup(true); return false; } - if (lastAccessTimeNanos == 0 || System.nanoTime() - lastAccessTimeNanos < SHUTDOWN_DELAY_NANOS) { - if (lastAccessTimeNanos == 0) { - lastAccessTimeNanos = System.nanoTime(); - } + final long nanoTime = nanoTime(); + if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) { + return true; + } + + if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) { // Check if any tasks were added to the queue every 100ms. // TODO: Change the behavior of takeTask() so that it returns on timeout. wakeup(true); @@ -537,7 +648,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { return false; } - // No tasks were added for last SHUTDOWN_DELAY_NANOS - hopefully safe to shut down. + // No tasks were added for last quiet period - hopefully safe to shut down. // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.) return true; } @@ -580,16 +691,19 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { throw new NullPointerException("task"); } - if (inEventLoop()) { + boolean inEventLoop = inEventLoop(); + if (inEventLoop) { addTask(task); - wakeup(true); } else { startThread(); addTask(task); - if (isTerminated() && removeTask(task)) { + if (isShutdown() && removeTask(task)) { reject(); } - wakeup(false); + } + + if (!addTaskWakesUp) { + wakeup(inEventLoop); } } diff --git a/example/src/main/java/io/netty/example/applet/AppletDiscardServer.java b/example/src/main/java/io/netty/example/applet/AppletDiscardServer.java index b546dc20d6..e86e7adf53 100644 --- a/example/src/main/java/io/netty/example/applet/AppletDiscardServer.java +++ b/example/src/main/java/io/netty/example/applet/AppletDiscardServer.java @@ -65,10 +65,10 @@ public class AppletDiscardServer extends JApplet { public void destroy() { super.destroy(); if (bossGroup != null) { - bossGroup.shutdown(); + bossGroup.shutdownGracefully(); } if (workerGroup != null) { - workerGroup.shutdown(); + workerGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/discard/DiscardClient.java b/example/src/main/java/io/netty/example/discard/DiscardClient.java index 9c262d6f79..657f28e437 100644 --- a/example/src/main/java/io/netty/example/discard/DiscardClient.java +++ b/example/src/main/java/io/netty/example/discard/DiscardClient.java @@ -50,7 +50,7 @@ public class DiscardClient { // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { - group.shutdown(); + group.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/discard/DiscardServer.java b/example/src/main/java/io/netty/example/discard/DiscardServer.java index 8f993ff7b4..992a3dbc1a 100644 --- a/example/src/main/java/io/netty/example/discard/DiscardServer.java +++ b/example/src/main/java/io/netty/example/discard/DiscardServer.java @@ -56,8 +56,8 @@ public class DiscardServer { // shut down your server. f.channel().closeFuture().sync(); } finally { - workerGroup.shutdown(); - bossGroup.shutdown(); + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/echo/EchoClient.java b/example/src/main/java/io/netty/example/echo/EchoClient.java index a42167f00e..d124ac2899 100644 --- a/example/src/main/java/io/netty/example/echo/EchoClient.java +++ b/example/src/main/java/io/netty/example/echo/EchoClient.java @@ -68,7 +68,7 @@ public class EchoClient { f.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads. - group.shutdown(); + group.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/echo/EchoServer.java b/example/src/main/java/io/netty/example/echo/EchoServer.java index 6376f4d897..ac6f14398b 100644 --- a/example/src/main/java/io/netty/example/echo/EchoServer.java +++ b/example/src/main/java/io/netty/example/echo/EchoServer.java @@ -63,8 +63,8 @@ public class EchoServer { f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. - bossGroup.shutdown(); - workerGroup.shutdown(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/factorial/FactorialClient.java b/example/src/main/java/io/netty/example/factorial/FactorialClient.java index 8ffdbe34ab..a7e77cbf2a 100644 --- a/example/src/main/java/io/netty/example/factorial/FactorialClient.java +++ b/example/src/main/java/io/netty/example/factorial/FactorialClient.java @@ -56,7 +56,7 @@ public class FactorialClient { System.err.format( "Factorial of %,d is: %,d", count, handler.getFactorial()); } finally { - group.shutdown(); + group.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/factorial/FactorialServer.java b/example/src/main/java/io/netty/example/factorial/FactorialServer.java index c91840472a..68126785c5 100644 --- a/example/src/main/java/io/netty/example/factorial/FactorialServer.java +++ b/example/src/main/java/io/netty/example/factorial/FactorialServer.java @@ -43,8 +43,8 @@ public class FactorialServer { b.bind(port).sync().channel().closeFuture().sync(); } finally { - bossGroup.shutdown(); - workerGroup.shutdown(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/filetransfer/FileServer.java b/example/src/main/java/io/netty/example/filetransfer/FileServer.java index 800bd25190..c71ca587ae 100644 --- a/example/src/main/java/io/netty/example/filetransfer/FileServer.java +++ b/example/src/main/java/io/netty/example/filetransfer/FileServer.java @@ -80,8 +80,8 @@ public class FileServer { f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. - bossGroup.shutdown(); - workerGroup.shutdown(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/http/file/HttpStaticFileServer.java b/example/src/main/java/io/netty/example/http/file/HttpStaticFileServer.java index 4d4b8e5b8d..65a25a7c48 100644 --- a/example/src/main/java/io/netty/example/http/file/HttpStaticFileServer.java +++ b/example/src/main/java/io/netty/example/http/file/HttpStaticFileServer.java @@ -39,8 +39,8 @@ public class HttpStaticFileServer { b.bind(port).sync().channel().closeFuture().sync(); } finally { - bossGroup.shutdown(); - workerGroup.shutdown(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClient.java b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClient.java index bff5c95ed7..f8496c86e7 100644 --- a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClient.java +++ b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClient.java @@ -93,7 +93,7 @@ public class HttpSnoopClient { ch.closeFuture().sync(); } finally { // Shut down executor threads to exit. - group.shutdown(); + group.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServer.java b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServer.java index a684d91d8f..8c605e257e 100644 --- a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServer.java +++ b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServer.java @@ -46,8 +46,8 @@ public class HttpSnoopServer { Channel ch = b.bind(port).sync().channel(); ch.closeFuture().sync(); } finally { - bossGroup.shutdown(); - workerGroup.shutdown(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/http/upload/HttpUploadClient.java b/example/src/main/java/io/netty/example/http/upload/HttpUploadClient.java index 21f459b886..c9e21aa618 100644 --- a/example/src/main/java/io/netty/example/http/upload/HttpUploadClient.java +++ b/example/src/main/java/io/netty/example/http/upload/HttpUploadClient.java @@ -143,7 +143,7 @@ public class HttpUploadClient { formPostMultipart(b, host, port, uriFile, factory, headers, bodylist); } finally { // Shut down executor threads to exit. - group.shutdown(); + group.shutdownGracefully(); // Really clean all temporary files if they still exist factory.cleanAllHttpDatas(); diff --git a/example/src/main/java/io/netty/example/http/upload/HttpUploadServer.java b/example/src/main/java/io/netty/example/http/upload/HttpUploadServer.java index c21006b427..510c756216 100644 --- a/example/src/main/java/io/netty/example/http/upload/HttpUploadServer.java +++ b/example/src/main/java/io/netty/example/http/upload/HttpUploadServer.java @@ -47,8 +47,8 @@ public class HttpUploadServer { ch.closeFuture().sync(); } finally { - bossGroup.shutdown(); - workerGroup.shutdown(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/http/websocketx/autobahn/AutobahnServer.java b/example/src/main/java/io/netty/example/http/websocketx/autobahn/AutobahnServer.java index eeae92cd68..457434f359 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/autobahn/AutobahnServer.java +++ b/example/src/main/java/io/netty/example/http/websocketx/autobahn/AutobahnServer.java @@ -46,8 +46,8 @@ public class AutobahnServer { System.out.println("Web Socket Server started at port " + port); f.channel().closeFuture().sync(); } finally { - bossGroup.shutdown(); - workerGroup.shutdown(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/http/websocketx/client/WebSocketClient.java b/example/src/main/java/io/netty/example/http/websocketx/client/WebSocketClient.java index 3fd9064d31..3e6dd3c047 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/client/WebSocketClient.java +++ b/example/src/main/java/io/netty/example/http/websocketx/client/WebSocketClient.java @@ -119,7 +119,7 @@ public class WebSocketClient { // responds to the CloseWebSocketFrame. ch.closeFuture().sync(); } finally { - group.shutdown(); + group.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/http/websocketx/html5/WebSocketServer.java b/example/src/main/java/io/netty/example/http/websocketx/html5/WebSocketServer.java index 95df93df5e..d0697e24bf 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/html5/WebSocketServer.java +++ b/example/src/main/java/io/netty/example/http/websocketx/html5/WebSocketServer.java @@ -81,8 +81,8 @@ public class WebSocketServer { ch.closeFuture().sync(); } finally { - bossGroup.shutdown(); - workerGroup.shutdown(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServer.java b/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServer.java index 85767c30cd..08bfe22544 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServer.java +++ b/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServer.java @@ -63,8 +63,8 @@ public class WebSocketServer { ch.closeFuture().sync(); } finally { - bossGroup.shutdown(); - workerGroup.shutdown(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServer.java b/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServer.java index fd113fdc62..75a4205703 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServer.java +++ b/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServer.java @@ -61,8 +61,8 @@ public class WebSocketSslServer { System.out.println("Open your browser and navigate to https://localhost:" + port + '/'); ch.closeFuture().sync(); } finally { - bossGroup.shutdown(); - workerGroup.shutdown(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/localecho/LocalEcho.java b/example/src/main/java/io/netty/example/localecho/LocalEcho.java index cf1fc73e22..a4f340de9a 100644 --- a/example/src/main/java/io/netty/example/localecho/LocalEcho.java +++ b/example/src/main/java/io/netty/example/localecho/LocalEcho.java @@ -105,8 +105,8 @@ public class LocalEcho { lastWriteFuture.awaitUninterruptibly(); } } finally { - serverGroup.shutdown(); - clientGroup.shutdown(); + serverGroup.shutdownGracefully(); + clientGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/objectecho/ObjectEchoClient.java b/example/src/main/java/io/netty/example/objectecho/ObjectEchoClient.java index 34618262ca..49bd948537 100644 --- a/example/src/main/java/io/netty/example/objectecho/ObjectEchoClient.java +++ b/example/src/main/java/io/netty/example/objectecho/ObjectEchoClient.java @@ -60,7 +60,7 @@ public class ObjectEchoClient { // Start the connection attempt. b.connect(host, port).sync().channel().closeFuture().sync(); } finally { - group.shutdown(); + group.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/objectecho/ObjectEchoServer.java b/example/src/main/java/io/netty/example/objectecho/ObjectEchoServer.java index 677169e36c..2e6b852208 100644 --- a/example/src/main/java/io/netty/example/objectecho/ObjectEchoServer.java +++ b/example/src/main/java/io/netty/example/objectecho/ObjectEchoServer.java @@ -57,8 +57,8 @@ public class ObjectEchoServer { // Bind and start to accept incoming connections. b.bind(port).sync().channel().closeFuture().sync(); } finally { - bossGroup.shutdown(); - workerGroup.shutdown(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/portunification/PortUnificationServer.java b/example/src/main/java/io/netty/example/portunification/PortUnificationServer.java index a061a7c132..7bddf7a805 100644 --- a/example/src/main/java/io/netty/example/portunification/PortUnificationServer.java +++ b/example/src/main/java/io/netty/example/portunification/PortUnificationServer.java @@ -54,8 +54,8 @@ public class PortUnificationServer { // Bind and start to accept incoming connections. b.bind(port).sync().channel().closeFuture().sync(); } finally { - bossGroup.shutdown(); - workerGroup.shutdown(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/proxy/HexDumpProxy.java b/example/src/main/java/io/netty/example/proxy/HexDumpProxy.java index 78702a1300..05322b3764 100644 --- a/example/src/main/java/io/netty/example/proxy/HexDumpProxy.java +++ b/example/src/main/java/io/netty/example/proxy/HexDumpProxy.java @@ -49,8 +49,8 @@ public class HexDumpProxy { .childOption(ChannelOption.AUTO_READ, false) .bind(localPort).sync().channel().closeFuture().sync(); } finally { - bossGroup.shutdown(); - workerGroup.shutdown(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentClient.java b/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentClient.java index 79dcb36687..d4ca08c7a8 100644 --- a/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentClient.java +++ b/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentClient.java @@ -64,7 +64,7 @@ public class QuoteOfTheMomentClient { System.err.println("QOTM request timed out."); } } finally { - group.shutdown(); + group.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentServer.java b/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentServer.java index 595ff85b3b..72f67604a7 100644 --- a/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentServer.java +++ b/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentServer.java @@ -46,7 +46,7 @@ public class QuoteOfTheMomentServer { b.bind(port).sync().channel().closeFuture().await(); } finally { - group.shutdown(); + group.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/rxtx/RxtxClient.java b/example/src/main/java/io/netty/example/rxtx/RxtxClient.java index c70f7f34cb..834d723f1a 100644 --- a/example/src/main/java/io/netty/example/rxtx/RxtxClient.java +++ b/example/src/main/java/io/netty/example/rxtx/RxtxClient.java @@ -54,7 +54,7 @@ public final class RxtxClient { f.channel().closeFuture().sync(); } finally { - group.shutdown(); + group.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/sctp/NioSctpEchoClient.java b/example/src/main/java/io/netty/example/sctp/NioSctpEchoClient.java index cde8a77752..c7eecbda2d 100644 --- a/example/src/main/java/io/netty/example/sctp/NioSctpEchoClient.java +++ b/example/src/main/java/io/netty/example/sctp/NioSctpEchoClient.java @@ -70,7 +70,7 @@ public class NioSctpEchoClient { f.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads. - group.shutdown(); + group.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/sctp/NioSctpEchoServer.java b/example/src/main/java/io/netty/example/sctp/NioSctpEchoServer.java index 5de6cfc761..638a202ed9 100644 --- a/example/src/main/java/io/netty/example/sctp/NioSctpEchoServer.java +++ b/example/src/main/java/io/netty/example/sctp/NioSctpEchoServer.java @@ -63,8 +63,8 @@ public class NioSctpEchoServer { f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. - bossGroup.shutdown(); - workerGroup.shutdown(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/sctp/OioSctpEchoClient.java b/example/src/main/java/io/netty/example/sctp/OioSctpEchoClient.java index 2c777664c7..26f94ff303 100644 --- a/example/src/main/java/io/netty/example/sctp/OioSctpEchoClient.java +++ b/example/src/main/java/io/netty/example/sctp/OioSctpEchoClient.java @@ -70,7 +70,7 @@ public class OioSctpEchoClient { f.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads. - group.shutdown(); + group.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/sctp/OioSctpEchoServer.java b/example/src/main/java/io/netty/example/sctp/OioSctpEchoServer.java index 714129a893..27d50b2b7e 100644 --- a/example/src/main/java/io/netty/example/sctp/OioSctpEchoServer.java +++ b/example/src/main/java/io/netty/example/sctp/OioSctpEchoServer.java @@ -63,8 +63,8 @@ public class OioSctpEchoServer { f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. - bossGroup.shutdown(); - workerGroup.shutdown(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/securechat/SecureChatClient.java b/example/src/main/java/io/netty/example/securechat/SecureChatClient.java index 961564e6eb..a4dc9045dd 100644 --- a/example/src/main/java/io/netty/example/securechat/SecureChatClient.java +++ b/example/src/main/java/io/netty/example/securechat/SecureChatClient.java @@ -76,7 +76,7 @@ public class SecureChatClient { } } finally { // The connection is closed automatically on shutdown. - group.shutdown(); + group.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/securechat/SecureChatServer.java b/example/src/main/java/io/netty/example/securechat/SecureChatServer.java index 858d8280f1..b157096757 100644 --- a/example/src/main/java/io/netty/example/securechat/SecureChatServer.java +++ b/example/src/main/java/io/netty/example/securechat/SecureChatServer.java @@ -43,8 +43,8 @@ public class SecureChatServer { b.bind(port).sync().channel().closeFuture().sync(); } finally { - bossGroup.shutdown(); - workerGroup.shutdown(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/socksproxy/SocksServer.java b/example/src/main/java/io/netty/example/socksproxy/SocksServer.java index f8757f8626..ca11e38d62 100644 --- a/example/src/main/java/io/netty/example/socksproxy/SocksServer.java +++ b/example/src/main/java/io/netty/example/socksproxy/SocksServer.java @@ -39,8 +39,8 @@ public final class SocksServer { .childHandler(new SocksServerInitializer()); b.bind(localPort).sync().channel().closeFuture().sync(); } finally { - bossGroup.shutdown(); - workerGroup.shutdown(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/telnet/TelnetClient.java b/example/src/main/java/io/netty/example/telnet/TelnetClient.java index acf3760214..193ad21f4e 100644 --- a/example/src/main/java/io/netty/example/telnet/TelnetClient.java +++ b/example/src/main/java/io/netty/example/telnet/TelnetClient.java @@ -74,7 +74,7 @@ public class TelnetClient { lastWriteFuture.sync(); } } finally { - group.shutdown(); + group.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/telnet/TelnetServer.java b/example/src/main/java/io/netty/example/telnet/TelnetServer.java index d49651c737..6d319366ad 100644 --- a/example/src/main/java/io/netty/example/telnet/TelnetServer.java +++ b/example/src/main/java/io/netty/example/telnet/TelnetServer.java @@ -42,8 +42,8 @@ public class TelnetServer { b.bind(port).sync().channel().closeFuture().sync(); } finally { - bossGroup.shutdown(); - workerGroup.shutdown(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/udt/echo/bytes/ByteEchoClient.java b/example/src/main/java/io/netty/example/udt/echo/bytes/ByteEchoClient.java index 5d3322cd44..f2ac3a0164 100644 --- a/example/src/main/java/io/netty/example/udt/echo/bytes/ByteEchoClient.java +++ b/example/src/main/java/io/netty/example/udt/echo/bytes/ByteEchoClient.java @@ -77,7 +77,7 @@ public class ByteEchoClient { f.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads. - connectGroup.shutdown(); + connectGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/udt/echo/bytes/ByteEchoServer.java b/example/src/main/java/io/netty/example/udt/echo/bytes/ByteEchoServer.java index c5ac3e59b7..6261e1e122 100644 --- a/example/src/main/java/io/netty/example/udt/echo/bytes/ByteEchoServer.java +++ b/example/src/main/java/io/netty/example/udt/echo/bytes/ByteEchoServer.java @@ -73,8 +73,8 @@ public class ByteEchoServer { future.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. - acceptGroup.shutdown(); - connectGroup.shutdown(); + acceptGroup.shutdownGracefully(); + connectGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/udt/echo/message/MsgEchoClient.java b/example/src/main/java/io/netty/example/udt/echo/message/MsgEchoClient.java index 3861740e24..0f5fb9c290 100644 --- a/example/src/main/java/io/netty/example/udt/echo/message/MsgEchoClient.java +++ b/example/src/main/java/io/netty/example/udt/echo/message/MsgEchoClient.java @@ -77,7 +77,7 @@ public class MsgEchoClient { f.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads. - connectGroup.shutdown(); + connectGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/udt/echo/message/MsgEchoServer.java b/example/src/main/java/io/netty/example/udt/echo/message/MsgEchoServer.java index 9439876bcd..950f02a88d 100644 --- a/example/src/main/java/io/netty/example/udt/echo/message/MsgEchoServer.java +++ b/example/src/main/java/io/netty/example/udt/echo/message/MsgEchoServer.java @@ -73,8 +73,8 @@ public class MsgEchoServer { future.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. - acceptGroup.shutdown(); - connectGroup.shutdown(); + acceptGroup.shutdownGracefully(); + connectGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/udt/echo/rendezvous/MsgEchoPeerBase.java b/example/src/main/java/io/netty/example/udt/echo/rendezvous/MsgEchoPeerBase.java index 32119f1221..cb2984b01a 100644 --- a/example/src/main/java/io/netty/example/udt/echo/rendezvous/MsgEchoPeerBase.java +++ b/example/src/main/java/io/netty/example/udt/echo/rendezvous/MsgEchoPeerBase.java @@ -70,7 +70,7 @@ public abstract class MsgEchoPeerBase { f.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads. - connectGroup.shutdown(); + connectGroup.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/udt/echo/rendezvousBytes/ByteEchoPeerBase.java b/example/src/main/java/io/netty/example/udt/echo/rendezvousBytes/ByteEchoPeerBase.java index 867186f854..d1d973f243 100644 --- a/example/src/main/java/io/netty/example/udt/echo/rendezvousBytes/ByteEchoPeerBase.java +++ b/example/src/main/java/io/netty/example/udt/echo/rendezvousBytes/ByteEchoPeerBase.java @@ -67,7 +67,7 @@ public class ByteEchoPeerBase { final ChannelFuture future = bootstrap.connect(peerAddress, myAddress).sync(); future.channel().closeFuture().sync(); } finally { - connectGroup.shutdown(); + connectGroup.shutdownGracefully(); } } } diff --git a/example/src/main/java/io/netty/example/worldclock/WorldClockClient.java b/example/src/main/java/io/netty/example/worldclock/WorldClockClient.java index 1517ab9e53..48d1d7116d 100644 --- a/example/src/main/java/io/netty/example/worldclock/WorldClockClient.java +++ b/example/src/main/java/io/netty/example/worldclock/WorldClockClient.java @@ -72,7 +72,7 @@ public class WorldClockClient { System.out.format("%28s: %s%n", i1.next(), i2.next()); } } finally { - group.shutdown(); + group.shutdownGracefully(); } } diff --git a/example/src/main/java/io/netty/example/worldclock/WorldClockServer.java b/example/src/main/java/io/netty/example/worldclock/WorldClockServer.java index f1a4640533..61dc3be478 100644 --- a/example/src/main/java/io/netty/example/worldclock/WorldClockServer.java +++ b/example/src/main/java/io/netty/example/worldclock/WorldClockServer.java @@ -43,8 +43,8 @@ public class WorldClockServer { b.bind(port).sync().channel().closeFuture().sync(); } finally { - bossGroup.shutdown(); - workerGroup.shutdown(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); } } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java index 4876cd5209..7a6ad9fa7c 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java @@ -55,7 +55,7 @@ public class SocketEchoTest extends AbstractSocketTest { @AfterClass public static void destroyGroup() { - group.shutdown(); + group.shutdownGracefully(); } @Test(timeout = 30000) diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java index 1d06f2ff93..5a1d6b833d 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java @@ -23,8 +23,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundMessageHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; -import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.EventExecutorGroup; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; @@ -33,6 +31,8 @@ import io.netty.handler.logging.ByteLoggingHandler; import io.netty.handler.logging.LogLevel; import io.netty.handler.ssl.SslHandler; import io.netty.testsuite.util.BogusSslContextFactory; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -56,7 +56,7 @@ public class SocketStartTlsTest extends AbstractSocketTest { @AfterClass public static void shutdownExecutor() { - executor.shutdown(); + executor.shutdownGracefully(); } @Test(timeout = 30000) diff --git a/transport-udt/src/test/java/io/netty/test/udt/bench/xfer/UdtNetty.java b/transport-udt/src/test/java/io/netty/test/udt/bench/xfer/UdtNetty.java index d5bc37a32c..c138497642 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/bench/xfer/UdtNetty.java +++ b/transport-udt/src/test/java/io/netty/test/udt/bench/xfer/UdtNetty.java @@ -123,8 +123,8 @@ public final class UdtNetty { Thread.sleep(1000); - group1.shutdown(); - group2.shutdown(); + group1.shutdownGracefully(); + group2.shutdownGracefully(); Metrics.defaultRegistry().shutdown(); diff --git a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteRendezvousChannelTest.java b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteRendezvousChannelTest.java index d9ca9ffe54..dd9fe080c5 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteRendezvousChannelTest.java +++ b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteRendezvousChannelTest.java @@ -113,7 +113,7 @@ public class NioUdtByteRendezvousChannelTest extends AbstractUdtTest { assertEquals(handler1.meter().count(), handler2.meter().count()); - group1.shutdown(); - group2.shutdown(); + group1.shutdownGracefully(); + group2.shutdownGracefully(); } } diff --git a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageRendezvousChannelTest.java b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageRendezvousChannelTest.java index b5500cfb06..073943eb0d 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageRendezvousChannelTest.java +++ b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageRendezvousChannelTest.java @@ -107,7 +107,7 @@ public class NioUdtMessageRendezvousChannelTest extends AbstractUdtTest { assertEquals(handler1.meter().count(), handler2.meter().count()); - group1.shutdown(); - group2.shutdown(); + group1.shutdownGracefully(); + group2.shutdownGracefully(); } } diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index 3abfd69ce1..58a6ac1936 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -25,9 +25,8 @@ import java.util.concurrent.ThreadFactory; */ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { - protected SingleThreadEventLoop( - EventLoopGroup parent, ThreadFactory threadFactory) { - super(parent, threadFactory); + protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { + super(parent, threadFactory, addTaskWakesUp); } @Override diff --git a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java index 7328e4707e..b2aec5fb66 100644 --- a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java +++ b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java @@ -26,7 +26,7 @@ public class ThreadPerChannelEventLoop extends SingleThreadEventLoop { private Channel ch; public ThreadPerChannelEventLoop(ThreadPerChannelEventLoopGroup parent) { - super(parent, parent.threadFactory); + super(parent, parent.threadFactory, true); this.parent = parent; } @@ -48,16 +48,14 @@ public class ThreadPerChannelEventLoop extends SingleThreadEventLoop { @Override protected void run() { for (;;) { - Runnable task; - try { - task = takeTask(); + Runnable task = takeTask(); + if (task != null) { task.run(); - } catch (InterruptedException e) { - // Waken up by interruptThread() + updateLastExecutionTime(); } Channel ch = this.ch; - if (isShutdown()) { + if (isShuttingDown()) { if (ch != null) { ch.unsafe().close(ch.unsafe().voidFuture()); } diff --git a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java index f28a040d17..120f53d2e6 100644 --- a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java @@ -117,6 +117,17 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i } @Override + public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + for (EventLoop l: activeChildren) { + l.shutdownGracefully(quietPeriod, timeout, unit); + } + for (EventLoop l: idleChildren) { + l.shutdownGracefully(quietPeriod, timeout, unit); + } + } + + @Override + @Deprecated public void shutdown() { for (EventLoop l: activeChildren) { l.shutdown(); @@ -126,6 +137,21 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i } } + @Override + public boolean isShuttingDown() { + for (EventLoop l: activeChildren) { + if (!l.isShuttingDown()) { + return false; + } + } + for (EventLoop l: idleChildren) { + if (!l.isShuttingDown()) { + return false; + } + } + return true; + } + @Override public boolean isShutdown() { for (EventLoop l: activeChildren) { diff --git a/transport/src/main/java/io/netty/channel/aio/AioEventLoop.java b/transport/src/main/java/io/netty/channel/aio/AioEventLoop.java index 16ca929fae..a371953be7 100644 --- a/transport/src/main/java/io/netty/channel/aio/AioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/aio/AioEventLoop.java @@ -59,7 +59,7 @@ final class AioEventLoop extends SingleThreadEventLoop { }; AioEventLoop(AioEventLoopGroup parent, ThreadFactory threadFactory) { - super(parent, threadFactory); + super(parent, threadFactory, true); } @Override @@ -75,15 +75,13 @@ final class AioEventLoop extends SingleThreadEventLoop { @Override protected void run() { for (;;) { - Runnable task; - try { - task = takeTask(); + Runnable task = takeTask(); + if (task != null) { task.run(); - } catch (InterruptedException e) { - // Waken up by interruptThread() + updateLastExecutionTime(); } - if (isShutdown()) { + if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; diff --git a/transport/src/main/java/io/netty/channel/aio/AioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/aio/AioEventLoopGroup.java index 4124a84bd2..a08a791863 100644 --- a/transport/src/main/java/io/netty/channel/aio/AioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/aio/AioEventLoopGroup.java @@ -73,6 +73,8 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup { } @Override + @Deprecated + @SuppressWarnings("deprecation") public void shutdown() { boolean interrupted = false; diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java index 2996660e53..4f2ea60ee1 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java @@ -23,8 +23,6 @@ import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.AbstractEventExecutor; import java.util.ArrayDeque; -import java.util.Collections; -import java.util.List; import java.util.Queue; import java.util.concurrent.TimeUnit; @@ -52,13 +50,15 @@ final class EmbeddedEventLoop extends AbstractEventExecutor implements EventLoop } @Override - public void shutdown() { - // NOOP - } + public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { } @Override - public List shutdownNow() { - return Collections.emptyList(); + @Deprecated + public void shutdown() { } + + @Override + public boolean isShuttingDown() { + return false; } @Override diff --git a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java index bb5b5096c6..4d0dbe5923 100644 --- a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java +++ b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java @@ -21,21 +21,16 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.FileRegion; import io.netty.channel.ServerChannel; -import io.netty.util.concurrent.AbstractEventExecutor; import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.internal.PlatformDependent; import java.util.AbstractSet; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** @@ -312,54 +307,4 @@ public class DefaultChannelGroup extends AbstractSet implements Channel return getClass().getSimpleName() + "(name: " + name() + ", size: " + size() + ')'; } - - static final class ImmediateEventExecutor extends AbstractEventExecutor { - - @Override - public EventExecutorGroup parent() { - return null; - } - - @Override - public boolean inEventLoop() { - return true; - } - - @Override - public boolean inEventLoop(Thread thread) { - return true; - } - - @Override - public void shutdown() { - } - - @Override - public boolean isShutdown() { - return false; - } - - @Override - public boolean isTerminated() { - return false; - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) { - return false; - } - - @Override - public List shutdownNow() { - return Collections.emptyList(); - } - - @Override - public void execute(Runnable command) { - if (command == null) { - throw new NullPointerException("command"); - } - command.run(); - } - } } diff --git a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroupFuture.java b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroupFuture.java index b7726f9eed..d30823bc56 100644 --- a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroupFuture.java +++ b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroupFuture.java @@ -235,7 +235,7 @@ final class DefaultChannelGroupFuture extends DefaultPromise implements Ch @Override protected void checkDeadLock() { EventExecutor e = executor(); - if (e != null && !(e instanceof DefaultChannelGroup.ImmediateEventExecutor) && e.inEventLoop()) { + if (e != null && !(e instanceof ImmediateEventExecutor) && e.inEventLoop()) { throw new BlockingOperationException(); } } diff --git a/transport/src/main/java/io/netty/channel/group/ImmediateEventExecutor.java b/transport/src/main/java/io/netty/channel/group/ImmediateEventExecutor.java index 84df873aba..31c37459ba 100644 --- a/transport/src/main/java/io/netty/channel/group/ImmediateEventExecutor.java +++ b/transport/src/main/java/io/netty/channel/group/ImmediateEventExecutor.java @@ -21,8 +21,6 @@ import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Promise; -import java.util.Collections; -import java.util.List; import java.util.concurrent.TimeUnit; final class ImmediateEventExecutor extends AbstractEventExecutor { @@ -43,7 +41,15 @@ final class ImmediateEventExecutor extends AbstractEventExecutor { } @Override - public void shutdown() { + public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { } + + @Override + @Deprecated + public void shutdown() { } + + @Override + public boolean isShuttingDown() { + return false; } @Override @@ -61,11 +67,6 @@ final class ImmediateEventExecutor extends AbstractEventExecutor { return false; } - @Override - public List shutdownNow() { - return Collections.emptyList(); - } - @Override public void execute(Runnable command) { if (command == null) { diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index 8819a586e7..bd9980050f 100755 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -185,10 +185,12 @@ public class LocalChannel extends AbstractChannel { } // Update all internal state before the closeFuture is notified. - if (parent() == null) { - LocalChannelRegistry.unregister(localAddress); + if (localAddress != null) { + if (parent() == null) { + LocalChannelRegistry.unregister(localAddress); + } + localAddress = null; } - localAddress = null; state = 3; } diff --git a/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java b/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java index 46c645a038..418d5b2f96 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java +++ b/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java @@ -22,21 +22,19 @@ import java.util.concurrent.ThreadFactory; final class LocalEventLoop extends SingleThreadEventLoop { LocalEventLoop(LocalEventLoopGroup parent, ThreadFactory threadFactory) { - super(parent, threadFactory); + super(parent, threadFactory, true); } @Override protected void run() { for (;;) { - Runnable task; - try { - task = takeTask(); + Runnable task = takeTask(); + if (task != null) { task.run(); - } catch (InterruptedException e) { - // Waken up by interruptThread() + updateLastExecutionTime(); } - if (isShutdown() && confirmShutdown()) { + if (confirmShutdown()) { break; } } diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java index edbe583f37..66551d556a 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java @@ -48,11 +48,7 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public final class NioEventLoop extends SingleThreadEventLoop { - /** - * Internal Netty logger. - */ - private static final InternalLogger logger = - InternalLoggerFactory.getInstance(NioEventLoop.class); + private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class); private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. @@ -109,9 +105,8 @@ public final class NioEventLoop extends SingleThreadEventLoop { private int cancelledKeys; private boolean needsToSelectAgain; - NioEventLoop( - NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { - super(parent, threadFactory); + NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { + super(parent, threadFactory, false); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } @@ -330,7 +325,7 @@ public final class NioEventLoop extends SingleThreadEventLoop { final int ioRatio = this.ioRatio; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); - if (isShutdown()) { + if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; @@ -438,7 +433,7 @@ public final class NioEventLoop extends SingleThreadEventLoop { } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { - processWritable(k, ch); + processWritable(ch); } if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking @@ -457,7 +452,7 @@ public final class NioEventLoop extends SingleThreadEventLoop { } } - private static void processWritable(SelectionKey k, AbstractNioChannel ch) { + private static void processWritable(AbstractNioChannel ch) { NioTask task; for (;;) { task = ch.writableTasks.poll(); diff --git a/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java b/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java index 25b252fd97..ad630b5d37 100644 --- a/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java +++ b/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java @@ -73,8 +73,8 @@ public class BootstrapTest { f.sync(); } } finally { - groupA.shutdown(); - groupB.shutdown(); + groupA.shutdownGracefully(); + groupB.shutdownGracefully(); } } @@ -119,8 +119,8 @@ public class BootstrapTest { f.sync(); } } finally { - groupA.shutdown(); - groupB.shutdown(); + groupA.shutdownGracefully(); + groupB.shutdownGracefully(); } } diff --git a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java index 1cd61fe90d..27bf741351 100644 --- a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java @@ -16,6 +16,7 @@ package io.netty.channel; import io.netty.channel.local.LocalChannel; +import io.netty.util.concurrent.EventExecutor; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -36,6 +37,11 @@ import static org.junit.Assert.*; public class SingleThreadEventLoopTest { + private static final Runnable NOOP = new Runnable() { + @Override + public void run() { } + }; + private SingleThreadEventLoopA loopA; private SingleThreadEventLoopB loopB; @@ -47,11 +53,11 @@ public class SingleThreadEventLoopTest { @After public void stopEventLoop() { - if (!loopA.isShutdown()) { - loopA.shutdown(); + if (!loopA.isShuttingDown()) { + loopA.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); } - if (!loopB.isShutdown()) { - loopB.shutdown(); + if (!loopB.isShuttingDown()) { + loopB.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); } while (!loopA.isTerminated()) { @@ -73,11 +79,14 @@ public class SingleThreadEventLoopTest { } @Test + @SuppressWarnings("deprecation") public void shutdownBeforeStart() throws Exception { loopA.shutdown(); + assertRejection(loopA); } @Test + @SuppressWarnings("deprecation") public void shutdownAfterStart() throws Exception { final CountDownLatch latch = new CountDownLatch(1); loopA.execute(new Runnable() { @@ -92,6 +101,7 @@ public class SingleThreadEventLoopTest { // Request the event loop thread to stop. loopA.shutdown(); + assertRejection(loopA); assertTrue(loopA.isShutdown()); @@ -101,6 +111,15 @@ public class SingleThreadEventLoopTest { } } + private static void assertRejection(EventExecutor loop) { + try { + loop.execute(NOOP); + fail("A task must be rejected after shutdown() is called."); + } catch (RejectedExecutionException e) { + // Expected + } + } + @Test public void scheduleTaskA() throws Exception { testScheduleTask(loopA); @@ -254,6 +273,7 @@ public class SingleThreadEventLoopTest { } @Test + @SuppressWarnings("deprecation") public void shutdownWithPendingTasks() throws Exception { final int NUM_TASKS = 3; final AtomicInteger ranTasks = new AtomicInteger(); @@ -298,12 +318,9 @@ public class SingleThreadEventLoopTest { } @Test(timeout = 10000) - public void testRegistrationAfterTermination() throws Exception { + @SuppressWarnings("deprecation") + public void testRegistrationAfterShutdown() throws Exception { loopA.shutdown(); - while (!loopA.isTerminated()) { - loopA.awaitTermination(1, TimeUnit.DAYS); - } - ChannelFuture f = loopA.register(new LocalChannel()); f.awaitUninterruptibly(); assertFalse(f.isSuccess()); @@ -311,12 +328,9 @@ public class SingleThreadEventLoopTest { } @Test(timeout = 10000) - public void testRegistrationAfterTermination2() throws Exception { + @SuppressWarnings("deprecation") + public void testRegistrationAfterShutdown2() throws Exception { loopA.shutdown(); - while (!loopA.isTerminated()) { - loopA.awaitTermination(1, TimeUnit.DAYS); - } - final CountDownLatch latch = new CountDownLatch(1); Channel ch = new LocalChannel(); ChannelPromise promise = ch.newPromise(); @@ -336,26 +350,69 @@ public class SingleThreadEventLoopTest { assertFalse(latch.await(1, TimeUnit.SECONDS)); } + @Test(timeout = 5000) + public void testGracefulShutdownQuietPeriod() throws Exception { + loopA.shutdownGracefully(1, Integer.MAX_VALUE, TimeUnit.SECONDS); + // Keep Scheduling tasks for another 2 seconds. + for (int i = 0; i < 20; i ++) { + Thread.sleep(100); + loopA.execute(NOOP); + } + + long startTime = System.nanoTime(); + + assertThat(loopA.isShuttingDown(), is(true)); + assertThat(loopA.isShutdown(), is(false)); + + while (!loopA.isTerminated()) { + loopA.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); + } + + assertTrue(System.nanoTime() - startTime >= TimeUnit.SECONDS.toNanos(1)); + } + + @Test(timeout = 5000) + public void testGracefulShutdownTimeout() throws Exception { + loopA.shutdownGracefully(2, 2, TimeUnit.SECONDS); + // Keep Scheduling tasks for another 3 seconds. + // Submitted tasks must be rejected after 2 second timeout. + for (int i = 0; i < 10; i ++) { + Thread.sleep(100); + loopA.execute(NOOP); + } + + try { + for (int i = 0; i < 20; i ++) { + Thread.sleep(100); + loopA.execute(NOOP); + } + fail("shutdownGracefully() must reject a task after timeout."); + } catch (RejectedExecutionException e) { + // Expected + } + + assertThat(loopA.isShuttingDown(), is(true)); + assertThat(loopA.isShutdown(), is(true)); + } + private static class SingleThreadEventLoopA extends SingleThreadEventLoop { final AtomicInteger cleanedUp = new AtomicInteger(); SingleThreadEventLoopA() { - super(null, Executors.defaultThreadFactory()); + super(null, Executors.defaultThreadFactory(), true); } @Override protected void run() { for (;;) { - Runnable task; - try { - task = takeTask(); + Runnable task = takeTask(); + if (task != null) { task.run(); - } catch (InterruptedException e) { - // Waken up by interruptThread() + updateLastExecutionTime(); } - if (isShutdown() && confirmShutdown()) { + if (confirmShutdown()) { break; } } @@ -370,7 +427,7 @@ public class SingleThreadEventLoopTest { private static class SingleThreadEventLoopB extends SingleThreadEventLoop { SingleThreadEventLoopB() { - super(null, Executors.defaultThreadFactory()); + super(null, Executors.defaultThreadFactory(), false); } @Override @@ -384,7 +441,7 @@ public class SingleThreadEventLoopTest { runAllTasks(); - if (isShutdown() && confirmShutdown()) { + if (confirmShutdown()) { break; } } diff --git a/transport/src/test/java/io/netty/channel/group/DefaultChannnelGroupTest.java b/transport/src/test/java/io/netty/channel/group/DefaultChannnelGroupTest.java index aea879edcb..99876ae266 100644 --- a/transport/src/test/java/io/netty/channel/group/DefaultChannnelGroupTest.java +++ b/transport/src/test/java/io/netty/channel/group/DefaultChannnelGroupTest.java @@ -55,7 +55,7 @@ public class DefaultChannnelGroupTest { allChannels.close().awaitUninterruptibly(); } - bossGroup.shutdown(); - workerGroup.shutdown(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); } } diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java index f5682cdb66..17a1bc9393 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java @@ -80,8 +80,8 @@ public class LocalChannelRegistryTest { // Close the channel cc.close().sync(); - serverGroup.shutdown(); - clientGroup.shutdown(); + serverGroup.shutdownGracefully(); + clientGroup.shutdownGracefully(); sc.closeFuture().sync(); diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java index db1677f09c..43fa14d6e8 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java @@ -76,7 +76,7 @@ public class LocalTransportThreadModelTest { @AfterClass public static void destroy() { - group.shutdown(); + group.shutdownGracefully(); } @Test(timeout = 30000) @@ -195,9 +195,9 @@ public class LocalTransportThreadModelTest { System.out.println("H3O: " + h3.outboundThreadNames); throw e; } finally { - l.shutdown(); - e1.shutdown(); - e2.shutdown(); + l.shutdownGracefully(); + e1.shutdownGracefully(); + e2.shutdownGracefully(); l.awaitTermination(5, TimeUnit.SECONDS); e1.awaitTermination(5, TimeUnit.SECONDS); e2.awaitTermination(5, TimeUnit.SECONDS); @@ -320,12 +320,12 @@ public class LocalTransportThreadModelTest { ch.close().sync(); } finally { - l.shutdown(); - e1.shutdown(); - e2.shutdown(); - e3.shutdown(); - e4.shutdown(); - e5.shutdown(); + l.shutdownGracefully(); + e1.shutdownGracefully(); + e2.shutdownGracefully(); + e3.shutdownGracefully(); + e4.shutdownGracefully(); + e5.shutdownGracefully(); } } diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java index cf8851870e..ccc01297b2 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java @@ -82,7 +82,7 @@ public class LocalTransportThreadModelTest3 { @AfterClass public static void destroy() { - group.shutdown(); + group.shutdownGracefully(); } @Test(timeout = 60000) @@ -214,12 +214,12 @@ public class LocalTransportThreadModelTest3 { Assert.assertEquals(event, expectedEvents.poll()); } } finally { - l.shutdown(); - e1.shutdown(); - e2.shutdown(); - e3.shutdown(); - e4.shutdown(); - e5.shutdown(); + l.shutdownGracefully(); + e1.shutdownGracefully(); + e2.shutdownGracefully(); + e3.shutdownGracefully(); + e4.shutdownGracefully(); + e5.shutdownGracefully(); } }