From 79e236dfc2eebbe025b80bc7b410609c548c8bad Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Wed, 12 Jun 2013 08:00:54 +0900 Subject: [PATCH] Make EventExecutor.shutdownGracefully() return Future - Also added EventExecutor.terminationFuture() - Also fixed type signature problem with Future.add/removeListener() - Related issue: #1389 --- .../concurrent/AbstractEventExecutor.java | 4 +- .../AbstractEventExecutorGroup.java | 4 +- .../netty/util/concurrent/CompleteFuture.java | 10 ++-- .../util/concurrent/CompletePromise.java | 8 ++-- .../concurrent/DefaultProgressivePromise.java | 8 ++-- .../netty/util/concurrent/DefaultPromise.java | 12 ++--- .../util/concurrent/EventExecutorGroup.java | 13 +++++- .../java/io/netty/util/concurrent/Future.java | 8 ++-- .../util/concurrent/GlobalEventExecutor.java | 11 ++++- .../concurrent/ImmediateEventExecutor.java | 12 ++++- .../MultithreadEventExecutorGroup.java | 23 +++++++++- .../util/concurrent/ProgressiveFuture.java | 8 ++-- .../util/concurrent/ProgressivePromise.java | 8 ++-- .../io/netty/util/concurrent/Promise.java | 8 ++-- .../concurrent/SingleThreadEventExecutor.java | 17 +++++-- .../transport/socket/SocketEchoTest.java | 4 +- .../transport/socket/SocketStartTlsTest.java | 4 +- .../udt/UDTClientServerConnectionTest.java | 5 +- .../nio/NioUdtByteRendezvousChannelTest.java | 5 +- .../NioUdtMessageRendezvousChannelTest.java | 5 +- .../java/io/netty/channel/ChannelFuture.java | 8 ++-- .../channel/ChannelProgressiveFuture.java | 8 ++-- .../channel/ChannelProgressivePromise.java | 8 ++-- .../java/io/netty/channel/ChannelPromise.java | 8 ++-- .../netty/channel/CompleteChannelFuture.java | 8 ++-- .../netty/channel/CompleteChannelPromise.java | 8 ++-- .../DefaultChannelProgressivePromise.java | 9 ++-- .../netty/channel/DefaultChannelPromise.java | 8 ++-- .../ThreadPerChannelEventLoopGroup.java | 46 ++++++++++++++++++- .../io/netty/channel/VoidChannelPromise.java | 8 ++-- .../channel/embedded/EmbeddedEventLoop.java | 14 +++++- .../channel/group/ChannelGroupFuture.java | 8 ++-- .../group/DefaultChannelGroupFuture.java | 9 ++-- .../io/netty/bootstrap/BootstrapTest.java | 4 ++ .../channel/DefaultChannelPipelineTest.java | 4 +- .../group/DefaultChannnelGroupTest.java | 4 +- .../local/LocalChannelRegistryTest.java | 3 ++ .../local/LocalTransportThreadModelTest.java | 19 +++++--- .../local/LocalTransportThreadModelTest3.java | 11 ++++- .../channel/nio/NioDatagramChannelTest.java | 6 +-- 40 files changed, 262 insertions(+), 116 deletions(-) diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java index 990e001b32..142220579e 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java @@ -45,8 +45,8 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl } @Override - public void shutdownGracefully() { - shutdownGracefully(2, 15, TimeUnit.SECONDS); + public Future shutdownGracefully() { + return shutdownGracefully(2, 15, TimeUnit.SECONDS); } /** diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutorGroup.java index f8011102c1..61789d6c93 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutorGroup.java @@ -65,8 +65,8 @@ public abstract class AbstractEventExecutorGroup implements EventExecutorGroup { } @Override - public void shutdownGracefully() { - shutdownGracefully(2, 15, TimeUnit.SECONDS); + public Future shutdownGracefully() { + return shutdownGracefully(2, 15, TimeUnit.SECONDS); } /** diff --git a/common/src/main/java/io/netty/util/concurrent/CompleteFuture.java b/common/src/main/java/io/netty/util/concurrent/CompleteFuture.java index a00f15abf7..d9c6a091f9 100644 --- a/common/src/main/java/io/netty/util/concurrent/CompleteFuture.java +++ b/common/src/main/java/io/netty/util/concurrent/CompleteFuture.java @@ -39,7 +39,7 @@ public abstract class CompleteFuture extends AbstractFuture { } @Override - public Future addListener(GenericFutureListener> listener) { + public Future addListener(GenericFutureListener> listener) { if (listener == null) { throw new NullPointerException("listener"); } @@ -48,11 +48,11 @@ public abstract class CompleteFuture extends AbstractFuture { } @Override - public Future addListeners(GenericFutureListener>... listeners) { + public Future addListeners(GenericFutureListener>... listeners) { if (listeners == null) { throw new NullPointerException("listeners"); } - for (GenericFutureListener> l: listeners) { + for (GenericFutureListener> l: listeners) { if (l == null) { break; } @@ -62,13 +62,13 @@ public abstract class CompleteFuture extends AbstractFuture { } @Override - public Future removeListener(GenericFutureListener> listener) { + public Future removeListener(GenericFutureListener> listener) { // NOOP return this; } @Override - public Future removeListeners(GenericFutureListener>... listeners) { + public Future removeListeners(GenericFutureListener>... listeners) { // NOOP return this; } diff --git a/common/src/main/java/io/netty/util/concurrent/CompletePromise.java b/common/src/main/java/io/netty/util/concurrent/CompletePromise.java index f8b94b18a3..c5eb8db2a9 100644 --- a/common/src/main/java/io/netty/util/concurrent/CompletePromise.java +++ b/common/src/main/java/io/netty/util/concurrent/CompletePromise.java @@ -63,22 +63,22 @@ public abstract class CompletePromise extends CompleteFuture implements Pr } @Override - public Promise addListener(GenericFutureListener> listener) { + public Promise addListener(GenericFutureListener> listener) { return (Promise) super.addListener(listener); } @Override - public Promise addListeners(GenericFutureListener>... listeners) { + public Promise addListeners(GenericFutureListener>... listeners) { return (Promise) super.addListeners(listeners); } @Override - public Promise removeListener(GenericFutureListener> listener) { + public Promise removeListener(GenericFutureListener> listener) { return (Promise) super.removeListener(listener); } @Override - public Promise removeListeners(GenericFutureListener>... listeners) { + public Promise removeListeners(GenericFutureListener>... listeners) { return (Promise) super.removeListeners(listeners); } } diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultProgressivePromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultProgressivePromise.java index d19f890928..177e08edb4 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultProgressivePromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultProgressivePromise.java @@ -58,25 +58,25 @@ public class DefaultProgressivePromise extends DefaultPromise implements P } @Override - public ProgressivePromise addListener(GenericFutureListener> listener) { + public ProgressivePromise addListener(GenericFutureListener> listener) { super.addListener(listener); return this; } @Override - public ProgressivePromise addListeners(GenericFutureListener>... listeners) { + public ProgressivePromise addListeners(GenericFutureListener>... listeners) { super.addListeners(listeners); return this; } @Override - public ProgressivePromise removeListener(GenericFutureListener> listener) { + public ProgressivePromise removeListener(GenericFutureListener> listener) { super.removeListener(listener); return this; } @Override - public ProgressivePromise removeListeners(GenericFutureListener>... listeners) { + public ProgressivePromise removeListeners(GenericFutureListener>... listeners) { super.removeListeners(listeners); return this; } diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java index 9bf86d41c0..61a77c87ea 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java @@ -113,7 +113,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { } @Override - public Promise addListener(GenericFutureListener> listener) { + public Promise addListener(GenericFutureListener> listener) { if (listener == null) { throw new NullPointerException("listener"); } @@ -146,12 +146,12 @@ public class DefaultPromise extends AbstractFuture implements Promise { } @Override - public Promise addListeners(GenericFutureListener>... listeners) { + public Promise addListeners(GenericFutureListener>... listeners) { if (listeners == null) { throw new NullPointerException("listeners"); } - for (GenericFutureListener> l: listeners) { + for (GenericFutureListener> l: listeners) { if (l == null) { break; } @@ -161,7 +161,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { } @Override - public Promise removeListener(GenericFutureListener> listener) { + public Promise removeListener(GenericFutureListener> listener) { if (listener == null) { throw new NullPointerException("listener"); } @@ -184,12 +184,12 @@ public class DefaultPromise extends AbstractFuture implements Promise { } @Override - public Promise removeListeners(GenericFutureListener>... listeners) { + public Promise removeListeners(GenericFutureListener>... listeners) { if (listeners == null) { throw new NullPointerException("listeners"); } - for (GenericFutureListener> l: listeners) { + for (GenericFutureListener> l: listeners) { if (l == null) { break; } diff --git a/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java index 5f447314f1..30bd90d883 100644 --- a/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java @@ -37,8 +37,10 @@ public interface EventExecutorGroup extends ScheduledExecutorService, Iterable shutdownGracefully(); /** * Signals this executor that the caller wants the executor to be shut down. Once this method is called, @@ -51,8 +53,15 @@ public interface EventExecutorGroup extends ScheduledExecutorService, Iterable shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit); + + /** + * Returns the {@link Future} which is notified when this executor has been terminated. + */ + Future terminationFuture(); /** * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead. diff --git a/common/src/main/java/io/netty/util/concurrent/Future.java b/common/src/main/java/io/netty/util/concurrent/Future.java index 1ee845ff31..f3cf6878f3 100644 --- a/common/src/main/java/io/netty/util/concurrent/Future.java +++ b/common/src/main/java/io/netty/util/concurrent/Future.java @@ -52,7 +52,7 @@ public interface Future extends java.util.concurrent.Future { * {@linkplain #isDone() done}. If this future is already * completed, the specified listener is notified immediately. */ - Future addListener(GenericFutureListener> listener); + Future addListener(GenericFutureListener> listener); /** * Adds the specified listeners to this future. The @@ -60,7 +60,7 @@ public interface Future extends java.util.concurrent.Future { * {@linkplain #isDone() done}. If this future is already * completed, the specified listeners are notified immediately. */ - Future addListeners(GenericFutureListener>... listeners); + Future addListeners(GenericFutureListener>... listeners); /** * Removes the specified listener from this future. @@ -69,7 +69,7 @@ public interface Future extends java.util.concurrent.Future { * listener is not associated with this future, this method * does nothing and returns silently. */ - Future removeListener(GenericFutureListener> listener); + Future removeListener(GenericFutureListener> listener); /** * Removes the specified listeners from this future. @@ -78,7 +78,7 @@ public interface Future extends java.util.concurrent.Future { * listeners are not associated with this future, this method * does nothing and returns silently. */ - Future removeListeners(GenericFutureListener>... listeners); + Future removeListeners(GenericFutureListener>... listeners); /** * Waits for this future until it is done, and rethrows the cause of the failure if this future diff --git a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java index 58abc7a8eb..55ac6c4dbd 100644 --- a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java @@ -58,6 +58,8 @@ public final class GlobalEventExecutor extends AbstractEventExecutor { volatile Thread thread; private volatile int state = ST_NOT_STARTED; + private final Future terminationFuture = new FailedFuture(this, new UnsupportedOperationException()); + private GlobalEventExecutor() { delayedTaskQueue.add(purgeTask); } @@ -157,8 +159,13 @@ public final class GlobalEventExecutor extends AbstractEventExecutor { } @Override - public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { - throw new UnsupportedOperationException(); + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + return terminationFuture(); + } + + @Override + public Future terminationFuture() { + return terminationFuture; } @Override diff --git a/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java index 48b399ca12..0fae584403 100644 --- a/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java @@ -23,6 +23,9 @@ import java.util.concurrent.TimeUnit; public final class ImmediateEventExecutor extends AbstractEventExecutor { public static final ImmediateEventExecutor INSTANCE = new ImmediateEventExecutor(); + private final Future terminationFuture = new FailedFuture( + GlobalEventExecutor.INSTANCE, new UnsupportedOperationException()); + private ImmediateEventExecutor() { // use static instance } @@ -43,7 +46,14 @@ public final class ImmediateEventExecutor extends AbstractEventExecutor { } @Override - public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { } + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + return terminationFuture(); + } + + @Override + public Future terminationFuture() { + return terminationFuture; + } @Override @Deprecated diff --git a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java index 3a7b49f1ef..2542e65340 100644 --- a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java @@ -31,6 +31,8 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto private final EventExecutor[] children; private final AtomicInteger childIndex = new AtomicInteger(); + private final AtomicInteger terminatedChildren = new AtomicInteger(); + private final Promise terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE); /** * Create a new instance. @@ -77,6 +79,19 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto } } } + + final FutureListener terminationListener = new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (terminatedChildren.incrementAndGet() == children.length) { + terminationFuture.setSuccess(null); + } + } + }; + + for (EventExecutor e: children) { + e.terminationFuture().addListener(terminationListener); + } } protected ThreadFactory newDefaultThreadFactory() { @@ -119,10 +134,16 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto ThreadFactory threadFactory, Object... args) throws Exception; @Override - public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { for (EventExecutor l: children) { l.shutdownGracefully(quietPeriod, timeout, unit); } + return terminationFuture(); + } + + @Override + public Future terminationFuture() { + return terminationFuture; } @Override diff --git a/common/src/main/java/io/netty/util/concurrent/ProgressiveFuture.java b/common/src/main/java/io/netty/util/concurrent/ProgressiveFuture.java index 1c903d52de..75f5eb1830 100644 --- a/common/src/main/java/io/netty/util/concurrent/ProgressiveFuture.java +++ b/common/src/main/java/io/netty/util/concurrent/ProgressiveFuture.java @@ -22,16 +22,16 @@ package io.netty.util.concurrent; public interface ProgressiveFuture extends Future { @Override - ProgressiveFuture addListener(GenericFutureListener> listener); + ProgressiveFuture addListener(GenericFutureListener> listener); @Override - ProgressiveFuture addListeners(GenericFutureListener>... listeners); + ProgressiveFuture addListeners(GenericFutureListener>... listeners); @Override - ProgressiveFuture removeListener(GenericFutureListener> listener); + ProgressiveFuture removeListener(GenericFutureListener> listener); @Override - ProgressiveFuture removeListeners(GenericFutureListener>... listeners); + ProgressiveFuture removeListeners(GenericFutureListener>... listeners); @Override ProgressiveFuture sync() throws InterruptedException; diff --git a/common/src/main/java/io/netty/util/concurrent/ProgressivePromise.java b/common/src/main/java/io/netty/util/concurrent/ProgressivePromise.java index 1574404d2c..34097ae85e 100644 --- a/common/src/main/java/io/netty/util/concurrent/ProgressivePromise.java +++ b/common/src/main/java/io/netty/util/concurrent/ProgressivePromise.java @@ -40,16 +40,16 @@ public interface ProgressivePromise extends Promise, ProgressiveFuture ProgressivePromise setFailure(Throwable cause); @Override - ProgressivePromise addListener(GenericFutureListener> listener); + ProgressivePromise addListener(GenericFutureListener> listener); @Override - ProgressivePromise addListeners(GenericFutureListener>... listeners); + ProgressivePromise addListeners(GenericFutureListener>... listeners); @Override - ProgressivePromise removeListener(GenericFutureListener> listener); + ProgressivePromise removeListener(GenericFutureListener> listener); @Override - ProgressivePromise removeListeners(GenericFutureListener>... listeners); + ProgressivePromise removeListeners(GenericFutureListener>... listeners); @Override ProgressivePromise await() throws InterruptedException; diff --git a/common/src/main/java/io/netty/util/concurrent/Promise.java b/common/src/main/java/io/netty/util/concurrent/Promise.java index e8360e17be..effb54db33 100644 --- a/common/src/main/java/io/netty/util/concurrent/Promise.java +++ b/common/src/main/java/io/netty/util/concurrent/Promise.java @@ -65,16 +65,16 @@ public interface Promise extends Future { boolean setUncancellable(); @Override - Promise addListener(GenericFutureListener> listener); + Promise addListener(GenericFutureListener> listener); @Override - Promise addListeners(GenericFutureListener>... listeners); + Promise addListeners(GenericFutureListener>... listeners); @Override - Promise removeListener(GenericFutureListener> listener); + Promise removeListener(GenericFutureListener> listener); @Override - Promise removeListeners(GenericFutureListener>... listeners); + Promise removeListeners(GenericFutureListener>... listeners); @Override Promise await() throws InterruptedException; diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index e8f1b504f8..c3e6e1963e 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -72,6 +72,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { private volatile long gracefulShutdownTimeout; private long gracefulShutdownStartTime; + private final Promise terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE); + /** * Create a new instance * @@ -133,6 +135,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { "An event executor terminated with " + "non-empty task queue (" + taskQueue.size() + ')'); } + + terminationFuture.setSuccess(null); } } } @@ -476,7 +480,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } @Override - public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { if (quietPeriod < 0) { throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)"); } @@ -489,7 +493,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } if (isShuttingDown()) { - return; + return terminationFuture(); } boolean inEventLoop = inEventLoop(); @@ -497,7 +501,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { synchronized (stateLock) { if (isShuttingDown()) { - return; + return terminationFuture(); } gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod); @@ -524,6 +528,13 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { if (wakeup) { wakeup(inEventLoop); } + + return terminationFuture(); + } + + @Override + public Future terminationFuture() { + return terminationFuture; } @Override diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java index 2d3e2f9e94..226a48a904 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java @@ -54,8 +54,8 @@ public class SocketEchoTest extends AbstractSocketTest { } @AfterClass - public static void destroyGroup() { - group.shutdownGracefully(); + public static void destroyGroup() throws Exception { + group.shutdownGracefully().sync(); } @Test(timeout = 30000) diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java index d50fbaad20..6d147afb1a 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java @@ -55,8 +55,8 @@ public class SocketStartTlsTest extends AbstractSocketTest { } @AfterClass - public static void shutdownExecutor() { - executor.shutdownGracefully(); + public static void shutdownExecutor() throws Exception { + executor.shutdownGracefully().sync(); } @Test(timeout = 30000) diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/udt/UDTClientServerConnectionTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/udt/UDTClientServerConnectionTest.java index 51e23dd29a..2384e428d8 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/udt/UDTClientServerConnectionTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/udt/UDTClientServerConnectionTest.java @@ -100,7 +100,7 @@ public class UDTClientServerConnectionTest { } catch (final Throwable e) { log.error("Client failed.", e); } finally { - connectGroup.shutdownGracefully(); + connectGroup.shutdownGracefully().syncUninterruptibly(); } } @@ -238,6 +238,9 @@ public class UDTClientServerConnectionTest { } finally { acceptGroup.shutdownGracefully(); connectGroup.shutdownGracefully(); + + acceptGroup.terminationFuture().syncUninterruptibly(); + connectGroup.terminationFuture().syncUninterruptibly(); } } diff --git a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteRendezvousChannelTest.java b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteRendezvousChannelTest.java index cb41cbfeab..8275ef4629 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteRendezvousChannelTest.java +++ b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteRendezvousChannelTest.java @@ -44,7 +44,7 @@ public class NioUdtByteRendezvousChannelTest extends AbstractUdtTest { */ @Test public void metadata() throws Exception { - assertEquals(false, new NioUdtByteRendezvousChannel().metadata().hasDisconnect()); + assertFalse(new NioUdtByteRendezvousChannel().metadata().hasDisconnect()); } /** @@ -114,5 +114,8 @@ public class NioUdtByteRendezvousChannelTest extends AbstractUdtTest { group1.shutdownGracefully(); group2.shutdownGracefully(); + + group1.terminationFuture().sync(); + group2.terminationFuture().sync(); } } diff --git a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageRendezvousChannelTest.java b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageRendezvousChannelTest.java index ec99de1e6a..082517b7ab 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageRendezvousChannelTest.java +++ b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageRendezvousChannelTest.java @@ -44,7 +44,7 @@ public class NioUdtMessageRendezvousChannelTest extends AbstractUdtTest { */ @Test public void metadata() throws Exception { - assertEquals(false, new NioUdtMessageRendezvousChannel().metadata().hasDisconnect()); + assertFalse(new NioUdtMessageRendezvousChannel().metadata().hasDisconnect()); } /** @@ -108,5 +108,8 @@ public class NioUdtMessageRendezvousChannelTest extends AbstractUdtTest { group1.shutdownGracefully(); group2.shutdownGracefully(); + + group1.terminationFuture().sync(); + group2.terminationFuture().sync(); } } diff --git a/transport/src/main/java/io/netty/channel/ChannelFuture.java b/transport/src/main/java/io/netty/channel/ChannelFuture.java index 13308400fc..bb29e297fe 100644 --- a/transport/src/main/java/io/netty/channel/ChannelFuture.java +++ b/transport/src/main/java/io/netty/channel/ChannelFuture.java @@ -171,16 +171,16 @@ public interface ChannelFuture extends Future { Channel channel(); @Override - ChannelFuture addListener(GenericFutureListener> listener); + ChannelFuture addListener(GenericFutureListener> listener); @Override - ChannelFuture addListeners(GenericFutureListener>... listeners); + ChannelFuture addListeners(GenericFutureListener>... listeners); @Override - ChannelFuture removeListener(GenericFutureListener> listener); + ChannelFuture removeListener(GenericFutureListener> listener); @Override - ChannelFuture removeListeners(GenericFutureListener>... listeners); + ChannelFuture removeListeners(GenericFutureListener>... listeners); @Override ChannelFuture sync() throws InterruptedException; diff --git a/transport/src/main/java/io/netty/channel/ChannelProgressiveFuture.java b/transport/src/main/java/io/netty/channel/ChannelProgressiveFuture.java index abd261bf88..c33ab9a4d2 100644 --- a/transport/src/main/java/io/netty/channel/ChannelProgressiveFuture.java +++ b/transport/src/main/java/io/netty/channel/ChannelProgressiveFuture.java @@ -24,16 +24,16 @@ import io.netty.util.concurrent.ProgressiveFuture; */ public interface ChannelProgressiveFuture extends ChannelFuture, ProgressiveFuture { @Override - ChannelProgressiveFuture addListener(GenericFutureListener> listener); + ChannelProgressiveFuture addListener(GenericFutureListener> listener); @Override - ChannelProgressiveFuture addListeners(GenericFutureListener>... listeners); + ChannelProgressiveFuture addListeners(GenericFutureListener>... listeners); @Override - ChannelProgressiveFuture removeListener(GenericFutureListener> listener); + ChannelProgressiveFuture removeListener(GenericFutureListener> listener); @Override - ChannelProgressiveFuture removeListeners(GenericFutureListener>... listeners); + ChannelProgressiveFuture removeListeners(GenericFutureListener>... listeners); @Override ChannelProgressiveFuture sync() throws InterruptedException; diff --git a/transport/src/main/java/io/netty/channel/ChannelProgressivePromise.java b/transport/src/main/java/io/netty/channel/ChannelProgressivePromise.java index 4b3744bd3f..b665b7d4e3 100644 --- a/transport/src/main/java/io/netty/channel/ChannelProgressivePromise.java +++ b/transport/src/main/java/io/netty/channel/ChannelProgressivePromise.java @@ -25,16 +25,16 @@ import io.netty.util.concurrent.ProgressivePromise; public interface ChannelProgressivePromise extends ProgressivePromise, ChannelProgressiveFuture, ChannelPromise { @Override - ChannelProgressivePromise addListener(GenericFutureListener> listener); + ChannelProgressivePromise addListener(GenericFutureListener> listener); @Override - ChannelProgressivePromise addListeners(GenericFutureListener>... listeners); + ChannelProgressivePromise addListeners(GenericFutureListener>... listeners); @Override - ChannelProgressivePromise removeListener(GenericFutureListener> listener); + ChannelProgressivePromise removeListener(GenericFutureListener> listener); @Override - ChannelProgressivePromise removeListeners(GenericFutureListener>... listeners); + ChannelProgressivePromise removeListeners(GenericFutureListener>... listeners); @Override ChannelProgressivePromise sync() throws InterruptedException; diff --git a/transport/src/main/java/io/netty/channel/ChannelPromise.java b/transport/src/main/java/io/netty/channel/ChannelPromise.java index 31d284654c..b808c6395a 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPromise.java +++ b/transport/src/main/java/io/netty/channel/ChannelPromise.java @@ -38,16 +38,16 @@ public interface ChannelPromise extends ChannelFuture, Promise { ChannelPromise setFailure(Throwable cause); @Override - ChannelPromise addListener(GenericFutureListener> listener); + ChannelPromise addListener(GenericFutureListener> listener); @Override - ChannelPromise addListeners(GenericFutureListener>... listeners); + ChannelPromise addListeners(GenericFutureListener>... listeners); @Override - ChannelPromise removeListener(GenericFutureListener> listener); + ChannelPromise removeListener(GenericFutureListener> listener); @Override - ChannelPromise removeListeners(GenericFutureListener>... listeners); + ChannelPromise removeListeners(GenericFutureListener>... listeners); @Override ChannelPromise sync() throws InterruptedException; diff --git a/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java b/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java index 92a1c4774f..4557aa3938 100644 --- a/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java +++ b/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java @@ -52,25 +52,25 @@ abstract class CompleteChannelFuture extends CompleteFuture implements Cha } @Override - public ChannelFuture addListener(GenericFutureListener> listener) { + public ChannelFuture addListener(GenericFutureListener> listener) { super.addListener(listener); return this; } @Override - public ChannelFuture addListeners(GenericFutureListener>... listeners) { + public ChannelFuture addListeners(GenericFutureListener>... listeners) { super.addListeners(listeners); return this; } @Override - public ChannelFuture removeListener(GenericFutureListener> listener) { + public ChannelFuture removeListener(GenericFutureListener> listener) { super.removeListener(listener); return this; } @Override - public ChannelFuture removeListeners(GenericFutureListener>... listeners) { + public ChannelFuture removeListeners(GenericFutureListener>... listeners) { super.removeListeners(listeners); return this; } diff --git a/transport/src/main/java/io/netty/channel/CompleteChannelPromise.java b/transport/src/main/java/io/netty/channel/CompleteChannelPromise.java index c46bdc2877..5af2a0103e 100644 --- a/transport/src/main/java/io/netty/channel/CompleteChannelPromise.java +++ b/transport/src/main/java/io/netty/channel/CompleteChannelPromise.java @@ -67,22 +67,22 @@ abstract class CompleteChannelPromise extends CompleteChannelFuture implements C } @Override - public ChannelPromise addListener(GenericFutureListener> listener) { + public ChannelPromise addListener(GenericFutureListener> listener) { return (ChannelPromise) super.addListener(listener); } @Override - public ChannelPromise addListeners(GenericFutureListener>... listeners) { + public ChannelPromise addListeners(GenericFutureListener>... listeners) { return (ChannelPromise) super.addListeners(listeners); } @Override - public ChannelPromise removeListener(GenericFutureListener> listener) { + public ChannelPromise removeListener(GenericFutureListener> listener) { return (ChannelPromise) super.removeListener(listener); } @Override - public ChannelPromise removeListeners(GenericFutureListener>... listeners) { + public ChannelPromise removeListeners(GenericFutureListener>... listeners) { return (ChannelPromise) super.removeListeners(listeners); } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelProgressivePromise.java b/transport/src/main/java/io/netty/channel/DefaultChannelProgressivePromise.java index 00dcb4b5f2..4b101e0cc0 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelProgressivePromise.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelProgressivePromise.java @@ -97,25 +97,26 @@ public class DefaultChannelProgressivePromise } @Override - public ChannelProgressivePromise addListener(GenericFutureListener> listener) { + public ChannelProgressivePromise addListener(GenericFutureListener> listener) { super.addListener(listener); return this; } @Override - public ChannelProgressivePromise addListeners(GenericFutureListener>... listeners) { + public ChannelProgressivePromise addListeners(GenericFutureListener>... listeners) { super.addListeners(listeners); return this; } @Override - public ChannelProgressivePromise removeListener(GenericFutureListener> listener) { + public ChannelProgressivePromise removeListener(GenericFutureListener> listener) { super.removeListener(listener); return this; } @Override - public ChannelProgressivePromise removeListeners(GenericFutureListener>... listeners) { + public ChannelProgressivePromise removeListeners( + GenericFutureListener>... listeners) { super.removeListeners(listeners); return this; } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPromise.java b/transport/src/main/java/io/netty/channel/DefaultChannelPromise.java index d6f4d2f196..3d76347281 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPromise.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPromise.java @@ -89,25 +89,25 @@ public class DefaultChannelPromise extends DefaultPromise implements Chann } @Override - public ChannelPromise addListener(GenericFutureListener> listener) { + public ChannelPromise addListener(GenericFutureListener> listener) { super.addListener(listener); return this; } @Override - public ChannelPromise addListeners(GenericFutureListener>... listeners) { + public ChannelPromise addListeners(GenericFutureListener>... listeners) { super.addListeners(listeners); return this; } @Override - public ChannelPromise removeListener(GenericFutureListener> listener) { + public ChannelPromise removeListener(GenericFutureListener> listener) { super.removeListener(listener); return this; } @Override - public ChannelPromise removeListeners(GenericFutureListener>... listeners) { + public ChannelPromise removeListeners(GenericFutureListener>... listeners) { super.removeListeners(listeners); return this; } diff --git a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java index 120f53d2e6..b0a32f42ba 100644 --- a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java @@ -17,7 +17,12 @@ package io.netty.channel; import io.netty.util.concurrent.AbstractEventExecutorGroup; +import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.concurrent.Promise; import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.ReadOnlyIterator; @@ -28,6 +33,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -44,6 +50,18 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i final Queue idleChildren = new ConcurrentLinkedQueue(); private final ChannelException tooManyChannels; + private volatile boolean shuttingDown; + private final Promise terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE); + private final FutureListener childTerminationListener = new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + // Inefficient, but works. + if (isTerminated()) { + terminationFuture.setSuccess(null); + } + } + }; + /** * Create a new {@link ThreadPerChannelEventLoopGroup} with no limit in place. */ @@ -117,24 +135,45 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i } @Override - public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + shuttingDown = true; + for (EventLoop l: activeChildren) { l.shutdownGracefully(quietPeriod, timeout, unit); } for (EventLoop l: idleChildren) { l.shutdownGracefully(quietPeriod, timeout, unit); } + + // Notify the future if there was no children. + if (isTerminated()) { + terminationFuture.trySuccess(null); + } + + return terminationFuture(); + } + + @Override + public Future terminationFuture() { + return terminationFuture; } @Override @Deprecated public void shutdown() { + shuttingDown = true; + for (EventLoop l: activeChildren) { l.shutdown(); } for (EventLoop l: idleChildren) { l.shutdown(); } + + // Notify the future if there was no children. + if (isTerminated()) { + terminationFuture.trySuccess(null); + } } @Override @@ -237,12 +276,17 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i } private EventLoop nextChild() throws Exception { + if (shuttingDown) { + throw new RejectedExecutionException("shutting down"); + } + ThreadPerChannelEventLoop loop = idleChildren.poll(); if (loop == null) { if (maxChannels > 0 && activeChildren.size() >= maxChannels) { throw tooManyChannels; } loop = newChild(childArgs); + loop.terminationFuture().addListener(childTerminationListener); } activeChildren.add(loop); return loop; diff --git a/transport/src/main/java/io/netty/channel/VoidChannelPromise.java b/transport/src/main/java/io/netty/channel/VoidChannelPromise.java index 0250f2e729..dd836ca916 100644 --- a/transport/src/main/java/io/netty/channel/VoidChannelPromise.java +++ b/transport/src/main/java/io/netty/channel/VoidChannelPromise.java @@ -41,25 +41,25 @@ final class VoidChannelPromise extends AbstractFuture implements ChannelPr } @Override - public VoidChannelPromise addListener(GenericFutureListener> listener) { + public VoidChannelPromise addListener(GenericFutureListener> listener) { fail(); return this; } @Override - public VoidChannelPromise addListeners(GenericFutureListener>... listeners) { + public VoidChannelPromise addListeners(GenericFutureListener>... listeners) { fail(); return this; } @Override - public VoidChannelPromise removeListener(GenericFutureListener> listener) { + public VoidChannelPromise removeListener(GenericFutureListener> listener) { // NOOP return this; } @Override - public VoidChannelPromise removeListeners(GenericFutureListener>... listeners) { + public VoidChannelPromise removeListeners(GenericFutureListener>... listeners) { // NOOP return this; } diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java index 4f2ea60ee1..dc7892501c 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java @@ -21,6 +21,7 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.AbstractEventExecutor; +import io.netty.util.concurrent.Future; import java.util.ArrayDeque; import java.util.Queue; @@ -50,11 +51,20 @@ final class EmbeddedEventLoop extends AbstractEventExecutor implements EventLoop } @Override - public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { } + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public Future terminationFuture() { + throw new UnsupportedOperationException(); + } @Override @Deprecated - public void shutdown() { } + public void shutdown() { + throw new UnsupportedOperationException(); + } @Override public boolean isShuttingDown() { diff --git a/transport/src/main/java/io/netty/channel/group/ChannelGroupFuture.java b/transport/src/main/java/io/netty/channel/group/ChannelGroupFuture.java index db2f6a0c8c..ac83f7e846 100644 --- a/transport/src/main/java/io/netty/channel/group/ChannelGroupFuture.java +++ b/transport/src/main/java/io/netty/channel/group/ChannelGroupFuture.java @@ -151,16 +151,16 @@ public interface ChannelGroupFuture extends Future, Iterable> listener); + ChannelGroupFuture addListener(GenericFutureListener> listener); @Override - ChannelGroupFuture addListeners(GenericFutureListener>... listeners); + ChannelGroupFuture addListeners(GenericFutureListener>... listeners); @Override - ChannelGroupFuture removeListener(GenericFutureListener> listener); + ChannelGroupFuture removeListener(GenericFutureListener> listener); @Override - ChannelGroupFuture removeListeners(GenericFutureListener>... listeners); + ChannelGroupFuture removeListeners(GenericFutureListener>... listeners); @Override ChannelGroupFuture await() throws InterruptedException; diff --git a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroupFuture.java b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroupFuture.java index 06bf3246b7..9400a19c1e 100644 --- a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroupFuture.java +++ b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroupFuture.java @@ -153,25 +153,26 @@ final class DefaultChannelGroupFuture extends DefaultPromise implements Ch } @Override - public DefaultChannelGroupFuture addListener(GenericFutureListener> listener) { + public DefaultChannelGroupFuture addListener(GenericFutureListener> listener) { super.addListener(listener); return this; } @Override - public DefaultChannelGroupFuture addListeners(GenericFutureListener>... listeners) { + public DefaultChannelGroupFuture addListeners(GenericFutureListener>... listeners) { super.addListeners(listeners); return this; } @Override - public DefaultChannelGroupFuture removeListener(GenericFutureListener> listener) { + public DefaultChannelGroupFuture removeListener(GenericFutureListener> listener) { super.removeListener(listener); return this; } @Override - public DefaultChannelGroupFuture removeListeners(GenericFutureListener>... listeners) { + public DefaultChannelGroupFuture removeListeners( + GenericFutureListener>... listeners) { super.removeListeners(listeners); return this; } diff --git a/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java b/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java index e99b4fdc54..efcadfb6a1 100644 --- a/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java +++ b/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java @@ -74,6 +74,8 @@ public class BootstrapTest { } finally { groupA.shutdownGracefully(); groupB.shutdownGracefully(); + groupA.terminationFuture().sync(); + groupB.terminationFuture().sync(); } } @@ -120,6 +122,8 @@ public class BootstrapTest { } finally { groupA.shutdownGracefully(); groupB.shutdownGracefully(); + groupA.terminationFuture().sync(); + groupB.terminationFuture().sync(); } } diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index 00c85a86e0..cd0e3d52c3 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -45,8 +45,8 @@ public class DefaultChannelPipelineTest { private Channel peer; @AfterClass - public static void afterClass() { - group.shutdownGracefully(); + public static void afterClass() throws Exception { + group.shutdownGracefully().sync(); } private void setUp(final ChannelHandler... handlers) throws Exception { diff --git a/transport/src/test/java/io/netty/channel/group/DefaultChannnelGroupTest.java b/transport/src/test/java/io/netty/channel/group/DefaultChannnelGroupTest.java index 13a60fbc3b..2681e6bc81 100644 --- a/transport/src/test/java/io/netty/channel/group/DefaultChannnelGroupTest.java +++ b/transport/src/test/java/io/netty/channel/group/DefaultChannnelGroupTest.java @@ -29,7 +29,7 @@ public class DefaultChannnelGroupTest { // Test for #1183 @Test - public void testNotThrowBlockingOperationException() { + public void testNotThrowBlockingOperationException() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); @@ -54,5 +54,7 @@ public class DefaultChannnelGroupTest { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); + bossGroup.terminationFuture().sync(); + workerGroup.terminationFuture().sync(); } } diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java index 830c77be58..13f43c7089 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java @@ -88,6 +88,9 @@ public class LocalChannelRegistryTest { assertNull(String.format( "Expected null, got channel '%s' for local address '%s'", LocalChannelRegistry.get(addr), addr), LocalChannelRegistry.get(addr)); + + serverGroup.terminationFuture().sync(); + clientGroup.terminationFuture().sync(); } } diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java index 59bca7c134..29381476df 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java @@ -38,7 +38,6 @@ import java.util.HashSet; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; public class LocalTransportThreadModelTest { @@ -69,8 +68,8 @@ public class LocalTransportThreadModelTest { } @AfterClass - public static void destroy() { - group.shutdownGracefully(); + public static void destroy() throws Exception { + group.shutdownGracefully().sync(); } @Test(timeout = 30000) @@ -216,9 +215,10 @@ public class LocalTransportThreadModelTest { l.shutdownGracefully(); e1.shutdownGracefully(); e2.shutdownGracefully(); - l.awaitTermination(5, TimeUnit.SECONDS); - e1.awaitTermination(5, TimeUnit.SECONDS); - e2.awaitTermination(5, TimeUnit.SECONDS); + + l.terminationFuture().sync(); + e1.terminationFuture().sync(); + e2.terminationFuture().sync(); } } @@ -344,6 +344,13 @@ public class LocalTransportThreadModelTest { e3.shutdownGracefully(); e4.shutdownGracefully(); e5.shutdownGracefully(); + + l.terminationFuture().sync(); + e1.terminationFuture().sync(); + e2.terminationFuture().sync(); + e3.terminationFuture().sync(); + e4.terminationFuture().sync(); + e5.terminationFuture().sync(); } } diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java index dad52b2226..8c8d39b06c 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java @@ -83,8 +83,8 @@ public class LocalTransportThreadModelTest3 { } @AfterClass - public static void destroy() { - group.shutdownGracefully(); + public static void destroy() throws Exception { + group.shutdownGracefully().sync(); } @Test(timeout = 60000) @@ -222,6 +222,13 @@ public class LocalTransportThreadModelTest3 { e3.shutdownGracefully(); e4.shutdownGracefully(); e5.shutdownGracefully(); + + l.terminationFuture().sync(); + e1.terminationFuture().sync(); + e2.terminationFuture().sync(); + e3.terminationFuture().sync(); + e4.terminationFuture().sync(); + e5.terminationFuture().sync(); } } diff --git a/transport/src/test/java/io/netty/channel/nio/NioDatagramChannelTest.java b/transport/src/test/java/io/netty/channel/nio/NioDatagramChannelTest.java index 831556242a..7f20b876ae 100644 --- a/transport/src/test/java/io/netty/channel/nio/NioDatagramChannelTest.java +++ b/transport/src/test/java/io/netty/channel/nio/NioDatagramChannelTest.java @@ -36,7 +36,7 @@ public class NioDatagramChannelTest { * Test try to reproduce issue #1335 */ @Test - public void testBindMultiple() { + public void testBindMultiple() throws Exception { DefaultChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); NioEventLoopGroup group = new NioEventLoopGroup(); try { @@ -57,8 +57,8 @@ public class NioDatagramChannelTest { } Assert.assertEquals(100, channelGroup.size()); } finally { - channelGroup.close().syncUninterruptibly(); - group.shutdownGracefully(); + channelGroup.close().sync(); + group.shutdownGracefully().sync(); } } }