Make EventExecutor.shutdownGracefully() return Future

- Also added EventExecutor.terminationFuture()
- Also fixed type signature problem with Future.add/removeListener()
- Related issue: #1389
This commit is contained in:
Trustin Lee 2013-06-12 08:00:54 +09:00
parent fd0084ecfa
commit 79e236dfc2
40 changed files with 262 additions and 116 deletions

View File

@ -45,8 +45,8 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl
}
@Override
public void shutdownGracefully() {
shutdownGracefully(2, 15, TimeUnit.SECONDS);
public Future<?> shutdownGracefully() {
return shutdownGracefully(2, 15, TimeUnit.SECONDS);
}
/**

View File

@ -65,8 +65,8 @@ public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
}
@Override
public void shutdownGracefully() {
shutdownGracefully(2, 15, TimeUnit.SECONDS);
public Future<?> shutdownGracefully() {
return shutdownGracefully(2, 15, TimeUnit.SECONDS);
}
/**

View File

@ -39,7 +39,7 @@ public abstract class CompleteFuture<V> extends AbstractFuture<V> {
}
@Override
public Future<V> addListener(GenericFutureListener<? extends Future<V>> listener) {
public Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
@ -48,11 +48,11 @@ public abstract class CompleteFuture<V> extends AbstractFuture<V> {
}
@Override
public Future<V> addListeners(GenericFutureListener<? extends Future<V>>... listeners) {
public Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
if (listeners == null) {
throw new NullPointerException("listeners");
}
for (GenericFutureListener<? extends Future<V>> l: listeners) {
for (GenericFutureListener<? extends Future<? super V>> l: listeners) {
if (l == null) {
break;
}
@ -62,13 +62,13 @@ public abstract class CompleteFuture<V> extends AbstractFuture<V> {
}
@Override
public Future<V> removeListener(GenericFutureListener<? extends Future<V>> listener) {
public Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {
// NOOP
return this;
}
@Override
public Future<V> removeListeners(GenericFutureListener<? extends Future<V>>... listeners) {
public Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
// NOOP
return this;
}

View File

@ -63,22 +63,22 @@ public abstract class CompletePromise<V> extends CompleteFuture<V> implements Pr
}
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<V>> listener) {
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
return (Promise<V>) super.addListener(listener);
}
@Override
public Promise<V> addListeners(GenericFutureListener<? extends Future<V>>... listeners) {
public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
return (Promise<V>) super.addListeners(listeners);
}
@Override
public Promise<V> removeListener(GenericFutureListener<? extends Future<V>> listener) {
public Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {
return (Promise<V>) super.removeListener(listener);
}
@Override
public Promise<V> removeListeners(GenericFutureListener<? extends Future<V>>... listeners) {
public Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
return (Promise<V>) super.removeListeners(listeners);
}
}

View File

@ -58,25 +58,25 @@ public class DefaultProgressivePromise<V> extends DefaultPromise<V> implements P
}
@Override
public ProgressivePromise<V> addListener(GenericFutureListener<? extends Future<V>> listener) {
public ProgressivePromise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
super.addListener(listener);
return this;
}
@Override
public ProgressivePromise<V> addListeners(GenericFutureListener<? extends Future<V>>... listeners) {
public ProgressivePromise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
super.addListeners(listeners);
return this;
}
@Override
public ProgressivePromise<V> removeListener(GenericFutureListener<? extends Future<V>> listener) {
public ProgressivePromise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {
super.removeListener(listener);
return this;
}
@Override
public ProgressivePromise<V> removeListeners(GenericFutureListener<? extends Future<V>>... listeners) {
public ProgressivePromise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
super.removeListeners(listeners);
return this;
}

View File

@ -113,7 +113,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
}
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<V>> listener) {
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
@ -146,12 +146,12 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
}
@Override
public Promise<V> addListeners(GenericFutureListener<? extends Future<V>>... listeners) {
public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
if (listeners == null) {
throw new NullPointerException("listeners");
}
for (GenericFutureListener<? extends Future<V>> l: listeners) {
for (GenericFutureListener<? extends Future<? super V>> l: listeners) {
if (l == null) {
break;
}
@ -161,7 +161,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
}
@Override
public Promise<V> removeListener(GenericFutureListener<? extends Future<V>> listener) {
public Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
@ -184,12 +184,12 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
}
@Override
public Promise<V> removeListeners(GenericFutureListener<? extends Future<V>>... listeners) {
public Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
if (listeners == null) {
throw new NullPointerException("listeners");
}
for (GenericFutureListener<? extends Future<V>> l: listeners) {
for (GenericFutureListener<? extends Future<? super V>> l: listeners) {
if (l == null) {
break;
}

View File

@ -37,8 +37,10 @@ public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<E
/**
* Shortcut method for {@link #shutdownGracefully(long, long, TimeUnit)} with sensible default values.
*
* @return the {@link #terminationFuture()}
*/
void shutdownGracefully();
Future<?> shutdownGracefully();
/**
* Signals this executor that the caller wants the executor to be shut down. Once this method is called,
@ -51,8 +53,15 @@ public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<E
* @param timeout the maximum amount of time to wait until the executor is {@linkplain #shutdown()}
* regardless if a task was submitted during the quiet period
* @param unit the unit of {@code quietPeriod} and {@code timeout}
*
* @return the {@link #terminationFuture()}
*/
void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
/**
* Returns the {@link Future} which is notified when this executor has been terminated.
*/
Future<?> terminationFuture();
/**
* @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.

View File

@ -52,7 +52,7 @@ public interface Future<V> extends java.util.concurrent.Future<V> {
* {@linkplain #isDone() done}. If this future is already
* completed, the specified listener is notified immediately.
*/
Future<V> addListener(GenericFutureListener<? extends Future<V>> listener);
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
/**
* Adds the specified listeners to this future. The
@ -60,7 +60,7 @@ public interface Future<V> extends java.util.concurrent.Future<V> {
* {@linkplain #isDone() done}. If this future is already
* completed, the specified listeners are notified immediately.
*/
Future<V> addListeners(GenericFutureListener<? extends Future<V>>... listeners);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
/**
* Removes the specified listener from this future.
@ -69,7 +69,7 @@ public interface Future<V> extends java.util.concurrent.Future<V> {
* listener is not associated with this future, this method
* does nothing and returns silently.
*/
Future<V> removeListener(GenericFutureListener<? extends Future<V>> listener);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
/**
* Removes the specified listeners from this future.
@ -78,7 +78,7 @@ public interface Future<V> extends java.util.concurrent.Future<V> {
* listeners are not associated with this future, this method
* does nothing and returns silently.
*/
Future<V> removeListeners(GenericFutureListener<? extends Future<V>>... listeners);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
/**
* Waits for this future until it is done, and rethrows the cause of the failure if this future

View File

@ -58,6 +58,8 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
volatile Thread thread;
private volatile int state = ST_NOT_STARTED;
private final Future<?> terminationFuture = new FailedFuture<Object>(this, new UnsupportedOperationException());
private GlobalEventExecutor() {
delayedTaskQueue.add(purgeTask);
}
@ -157,8 +159,13 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
}
@Override
public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
return terminationFuture();
}
@Override
public Future<?> terminationFuture() {
return terminationFuture;
}
@Override

View File

@ -23,6 +23,9 @@ import java.util.concurrent.TimeUnit;
public final class ImmediateEventExecutor extends AbstractEventExecutor {
public static final ImmediateEventExecutor INSTANCE = new ImmediateEventExecutor();
private final Future<?> terminationFuture = new FailedFuture<Object>(
GlobalEventExecutor.INSTANCE, new UnsupportedOperationException());
private ImmediateEventExecutor() {
// use static instance
}
@ -43,7 +46,14 @@ public final class ImmediateEventExecutor extends AbstractEventExecutor {
}
@Override
public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { }
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
return terminationFuture();
}
@Override
public Future<?> terminationFuture() {
return terminationFuture;
}
@Override
@Deprecated

View File

@ -31,6 +31,8 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
private final EventExecutor[] children;
private final AtomicInteger childIndex = new AtomicInteger();
private final AtomicInteger terminatedChildren = new AtomicInteger();
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
/**
* Create a new instance.
@ -77,6 +79,19 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
}
}
}
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
}
protected ThreadFactory newDefaultThreadFactory() {
@ -119,10 +134,16 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
ThreadFactory threadFactory, Object... args) throws Exception;
@Override
public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
for (EventExecutor l: children) {
l.shutdownGracefully(quietPeriod, timeout, unit);
}
return terminationFuture();
}
@Override
public Future<?> terminationFuture() {
return terminationFuture;
}
@Override

View File

@ -22,16 +22,16 @@ package io.netty.util.concurrent;
public interface ProgressiveFuture<V> extends Future<V> {
@Override
ProgressiveFuture<V> addListener(GenericFutureListener<? extends Future<V>> listener);
ProgressiveFuture<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
ProgressiveFuture<V> addListeners(GenericFutureListener<? extends Future<V>>... listeners);
ProgressiveFuture<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
ProgressiveFuture<V> removeListener(GenericFutureListener<? extends Future<V>> listener);
ProgressiveFuture<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
ProgressiveFuture<V> removeListeners(GenericFutureListener<? extends Future<V>>... listeners);
ProgressiveFuture<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
ProgressiveFuture<V> sync() throws InterruptedException;

View File

@ -40,16 +40,16 @@ public interface ProgressivePromise<V> extends Promise<V>, ProgressiveFuture<V>
ProgressivePromise<V> setFailure(Throwable cause);
@Override
ProgressivePromise<V> addListener(GenericFutureListener<? extends Future<V>> listener);
ProgressivePromise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
ProgressivePromise<V> addListeners(GenericFutureListener<? extends Future<V>>... listeners);
ProgressivePromise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
ProgressivePromise<V> removeListener(GenericFutureListener<? extends Future<V>> listener);
ProgressivePromise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
ProgressivePromise<V> removeListeners(GenericFutureListener<? extends Future<V>>... listeners);
ProgressivePromise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
ProgressivePromise<V> await() throws InterruptedException;

View File

@ -65,16 +65,16 @@ public interface Promise<V> extends Future<V> {
boolean setUncancellable();
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<V>> listener);
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<V>>... listeners);
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<V>> listener);
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<V>>... listeners);
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> await() throws InterruptedException;

View File

@ -72,6 +72,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
private volatile long gracefulShutdownTimeout;
private long gracefulShutdownStartTime;
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
/**
* Create a new instance
*
@ -133,6 +135,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null);
}
}
}
@ -476,7 +480,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
}
@Override
public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
if (quietPeriod < 0) {
throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
}
@ -489,7 +493,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
}
if (isShuttingDown()) {
return;
return terminationFuture();
}
boolean inEventLoop = inEventLoop();
@ -497,7 +501,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
synchronized (stateLock) {
if (isShuttingDown()) {
return;
return terminationFuture();
}
gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
@ -524,6 +528,13 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
if (wakeup) {
wakeup(inEventLoop);
}
return terminationFuture();
}
@Override
public Future<?> terminationFuture() {
return terminationFuture;
}
@Override

View File

@ -54,8 +54,8 @@ public class SocketEchoTest extends AbstractSocketTest {
}
@AfterClass
public static void destroyGroup() {
group.shutdownGracefully();
public static void destroyGroup() throws Exception {
group.shutdownGracefully().sync();
}
@Test(timeout = 30000)

View File

@ -55,8 +55,8 @@ public class SocketStartTlsTest extends AbstractSocketTest {
}
@AfterClass
public static void shutdownExecutor() {
executor.shutdownGracefully();
public static void shutdownExecutor() throws Exception {
executor.shutdownGracefully().sync();
}
@Test(timeout = 30000)

View File

@ -100,7 +100,7 @@ public class UDTClientServerConnectionTest {
} catch (final Throwable e) {
log.error("Client failed.", e);
} finally {
connectGroup.shutdownGracefully();
connectGroup.shutdownGracefully().syncUninterruptibly();
}
}
@ -238,6 +238,9 @@ public class UDTClientServerConnectionTest {
} finally {
acceptGroup.shutdownGracefully();
connectGroup.shutdownGracefully();
acceptGroup.terminationFuture().syncUninterruptibly();
connectGroup.terminationFuture().syncUninterruptibly();
}
}

View File

@ -44,7 +44,7 @@ public class NioUdtByteRendezvousChannelTest extends AbstractUdtTest {
*/
@Test
public void metadata() throws Exception {
assertEquals(false, new NioUdtByteRendezvousChannel().metadata().hasDisconnect());
assertFalse(new NioUdtByteRendezvousChannel().metadata().hasDisconnect());
}
/**
@ -114,5 +114,8 @@ public class NioUdtByteRendezvousChannelTest extends AbstractUdtTest {
group1.shutdownGracefully();
group2.shutdownGracefully();
group1.terminationFuture().sync();
group2.terminationFuture().sync();
}
}

View File

@ -44,7 +44,7 @@ public class NioUdtMessageRendezvousChannelTest extends AbstractUdtTest {
*/
@Test
public void metadata() throws Exception {
assertEquals(false, new NioUdtMessageRendezvousChannel().metadata().hasDisconnect());
assertFalse(new NioUdtMessageRendezvousChannel().metadata().hasDisconnect());
}
/**
@ -108,5 +108,8 @@ public class NioUdtMessageRendezvousChannelTest extends AbstractUdtTest {
group1.shutdownGracefully();
group2.shutdownGracefully();
group1.terminationFuture().sync();
group2.terminationFuture().sync();
}
}

View File

@ -171,16 +171,16 @@ public interface ChannelFuture extends Future<Void> {
Channel channel();
@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<Void>> listener);
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<Void>>... listeners);
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<Void>> listener);
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future<Void>>... listeners);
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture sync() throws InterruptedException;

View File

@ -24,16 +24,16 @@ import io.netty.util.concurrent.ProgressiveFuture;
*/
public interface ChannelProgressiveFuture extends ChannelFuture, ProgressiveFuture<Void> {
@Override
ChannelProgressiveFuture addListener(GenericFutureListener<? extends Future<Void>> listener);
ChannelProgressiveFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelProgressiveFuture addListeners(GenericFutureListener<? extends Future<Void>>... listeners);
ChannelProgressiveFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelProgressiveFuture removeListener(GenericFutureListener<? extends Future<Void>> listener);
ChannelProgressiveFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelProgressiveFuture removeListeners(GenericFutureListener<? extends Future<Void>>... listeners);
ChannelProgressiveFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelProgressiveFuture sync() throws InterruptedException;

View File

@ -25,16 +25,16 @@ import io.netty.util.concurrent.ProgressivePromise;
public interface ChannelProgressivePromise extends ProgressivePromise<Void>, ChannelProgressiveFuture, ChannelPromise {
@Override
ChannelProgressivePromise addListener(GenericFutureListener<? extends Future<Void>> listener);
ChannelProgressivePromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelProgressivePromise addListeners(GenericFutureListener<? extends Future<Void>>... listeners);
ChannelProgressivePromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelProgressivePromise removeListener(GenericFutureListener<? extends Future<Void>> listener);
ChannelProgressivePromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelProgressivePromise removeListeners(GenericFutureListener<? extends Future<Void>>... listeners);
ChannelProgressivePromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelProgressivePromise sync() throws InterruptedException;

View File

@ -38,16 +38,16 @@ public interface ChannelPromise extends ChannelFuture, Promise<Void> {
ChannelPromise setFailure(Throwable cause);
@Override
ChannelPromise addListener(GenericFutureListener<? extends Future<Void>> listener);
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise addListeners(GenericFutureListener<? extends Future<Void>>... listeners);
ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise removeListener(GenericFutureListener<? extends Future<Void>> listener);
ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise removeListeners(GenericFutureListener<? extends Future<Void>>... listeners);
ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise sync() throws InterruptedException;

View File

@ -52,25 +52,25 @@ abstract class CompleteChannelFuture extends CompleteFuture<Void> implements Cha
}
@Override
public ChannelFuture addListener(GenericFutureListener<? extends Future<Void>> listener) {
public ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
super.addListener(listener);
return this;
}
@Override
public ChannelFuture addListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
public ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
super.addListeners(listeners);
return this;
}
@Override
public ChannelFuture removeListener(GenericFutureListener<? extends Future<Void>> listener) {
public ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
super.removeListener(listener);
return this;
}
@Override
public ChannelFuture removeListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
public ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
super.removeListeners(listeners);
return this;
}

View File

@ -67,22 +67,22 @@ abstract class CompleteChannelPromise extends CompleteChannelFuture implements C
}
@Override
public ChannelPromise addListener(GenericFutureListener<? extends Future<Void>> listener) {
public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
return (ChannelPromise) super.addListener(listener);
}
@Override
public ChannelPromise addListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
public ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
return (ChannelPromise) super.addListeners(listeners);
}
@Override
public ChannelPromise removeListener(GenericFutureListener<? extends Future<Void>> listener) {
public ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
return (ChannelPromise) super.removeListener(listener);
}
@Override
public ChannelPromise removeListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
public ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
return (ChannelPromise) super.removeListeners(listeners);
}

View File

@ -97,25 +97,26 @@ public class DefaultChannelProgressivePromise
}
@Override
public ChannelProgressivePromise addListener(GenericFutureListener<? extends Future<Void>> listener) {
public ChannelProgressivePromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
super.addListener(listener);
return this;
}
@Override
public ChannelProgressivePromise addListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
public ChannelProgressivePromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
super.addListeners(listeners);
return this;
}
@Override
public ChannelProgressivePromise removeListener(GenericFutureListener<? extends Future<Void>> listener) {
public ChannelProgressivePromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
super.removeListener(listener);
return this;
}
@Override
public ChannelProgressivePromise removeListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
public ChannelProgressivePromise removeListeners(
GenericFutureListener<? extends Future<? super Void>>... listeners) {
super.removeListeners(listeners);
return this;
}

View File

@ -89,25 +89,25 @@ public class DefaultChannelPromise extends DefaultPromise<Void> implements Chann
}
@Override
public ChannelPromise addListener(GenericFutureListener<? extends Future<Void>> listener) {
public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
super.addListener(listener);
return this;
}
@Override
public ChannelPromise addListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
public ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
super.addListeners(listeners);
return this;
}
@Override
public ChannelPromise removeListener(GenericFutureListener<? extends Future<Void>> listener) {
public ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
super.removeListener(listener);
return this;
}
@Override
public ChannelPromise removeListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
public ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
super.removeListeners(listeners);
return this;
}

View File

@ -17,7 +17,12 @@ package io.netty.channel;
import io.netty.util.concurrent.AbstractEventExecutorGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ReadOnlyIterator;
@ -28,6 +33,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@ -44,6 +50,18 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
final Queue<ThreadPerChannelEventLoop> idleChildren = new ConcurrentLinkedQueue<ThreadPerChannelEventLoop>();
private final ChannelException tooManyChannels;
private volatile boolean shuttingDown;
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
private final FutureListener<Object> childTerminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
// Inefficient, but works.
if (isTerminated()) {
terminationFuture.setSuccess(null);
}
}
};
/**
* Create a new {@link ThreadPerChannelEventLoopGroup} with no limit in place.
*/
@ -117,24 +135,45 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
}
@Override
public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
shuttingDown = true;
for (EventLoop l: activeChildren) {
l.shutdownGracefully(quietPeriod, timeout, unit);
}
for (EventLoop l: idleChildren) {
l.shutdownGracefully(quietPeriod, timeout, unit);
}
// Notify the future if there was no children.
if (isTerminated()) {
terminationFuture.trySuccess(null);
}
return terminationFuture();
}
@Override
public Future<?> terminationFuture() {
return terminationFuture;
}
@Override
@Deprecated
public void shutdown() {
shuttingDown = true;
for (EventLoop l: activeChildren) {
l.shutdown();
}
for (EventLoop l: idleChildren) {
l.shutdown();
}
// Notify the future if there was no children.
if (isTerminated()) {
terminationFuture.trySuccess(null);
}
}
@Override
@ -237,12 +276,17 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
}
private EventLoop nextChild() throws Exception {
if (shuttingDown) {
throw new RejectedExecutionException("shutting down");
}
ThreadPerChannelEventLoop loop = idleChildren.poll();
if (loop == null) {
if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
throw tooManyChannels;
}
loop = newChild(childArgs);
loop.terminationFuture().addListener(childTerminationListener);
}
activeChildren.add(loop);
return loop;

View File

@ -41,25 +41,25 @@ final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelPr
}
@Override
public VoidChannelPromise addListener(GenericFutureListener<? extends Future<Void>> listener) {
public VoidChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
fail();
return this;
}
@Override
public VoidChannelPromise addListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
public VoidChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
fail();
return this;
}
@Override
public VoidChannelPromise removeListener(GenericFutureListener<? extends Future<Void>> listener) {
public VoidChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
// NOOP
return this;
}
@Override
public VoidChannelPromise removeListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
public VoidChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
// NOOP
return this;
}

View File

@ -21,6 +21,7 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.AbstractEventExecutor;
import io.netty.util.concurrent.Future;
import java.util.ArrayDeque;
import java.util.Queue;
@ -50,11 +51,20 @@ final class EmbeddedEventLoop extends AbstractEventExecutor implements EventLoop
}
@Override
public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { }
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public Future<?> terminationFuture() {
throw new UnsupportedOperationException();
}
@Override
@Deprecated
public void shutdown() { }
public void shutdown() {
throw new UnsupportedOperationException();
}
@Override
public boolean isShuttingDown() {

View File

@ -151,16 +151,16 @@ public interface ChannelGroupFuture extends Future<Void>, Iterable<ChannelFuture
boolean isPartialFailure();
@Override
ChannelGroupFuture addListener(GenericFutureListener<? extends Future<Void>> listener);
ChannelGroupFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelGroupFuture addListeners(GenericFutureListener<? extends Future<Void>>... listeners);
ChannelGroupFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelGroupFuture removeListener(GenericFutureListener<? extends Future<Void>> listener);
ChannelGroupFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelGroupFuture removeListeners(GenericFutureListener<? extends Future<Void>>... listeners);
ChannelGroupFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelGroupFuture await() throws InterruptedException;

View File

@ -153,25 +153,26 @@ final class DefaultChannelGroupFuture extends DefaultPromise<Void> implements Ch
}
@Override
public DefaultChannelGroupFuture addListener(GenericFutureListener<? extends Future<Void>> listener) {
public DefaultChannelGroupFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
super.addListener(listener);
return this;
}
@Override
public DefaultChannelGroupFuture addListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
public DefaultChannelGroupFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
super.addListeners(listeners);
return this;
}
@Override
public DefaultChannelGroupFuture removeListener(GenericFutureListener<? extends Future<Void>> listener) {
public DefaultChannelGroupFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
super.removeListener(listener);
return this;
}
@Override
public DefaultChannelGroupFuture removeListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
public DefaultChannelGroupFuture removeListeners(
GenericFutureListener<? extends Future<? super Void>>... listeners) {
super.removeListeners(listeners);
return this;
}

View File

@ -74,6 +74,8 @@ public class BootstrapTest {
} finally {
groupA.shutdownGracefully();
groupB.shutdownGracefully();
groupA.terminationFuture().sync();
groupB.terminationFuture().sync();
}
}
@ -120,6 +122,8 @@ public class BootstrapTest {
} finally {
groupA.shutdownGracefully();
groupB.shutdownGracefully();
groupA.terminationFuture().sync();
groupB.terminationFuture().sync();
}
}

View File

@ -45,8 +45,8 @@ public class DefaultChannelPipelineTest {
private Channel peer;
@AfterClass
public static void afterClass() {
group.shutdownGracefully();
public static void afterClass() throws Exception {
group.shutdownGracefully().sync();
}
private void setUp(final ChannelHandler... handlers) throws Exception {

View File

@ -29,7 +29,7 @@ public class DefaultChannnelGroupTest {
// Test for #1183
@Test
public void testNotThrowBlockingOperationException() {
public void testNotThrowBlockingOperationException() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
@ -54,5 +54,7 @@ public class DefaultChannnelGroupTest {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
bossGroup.terminationFuture().sync();
workerGroup.terminationFuture().sync();
}
}

View File

@ -88,6 +88,9 @@ public class LocalChannelRegistryTest {
assertNull(String.format(
"Expected null, got channel '%s' for local address '%s'",
LocalChannelRegistry.get(addr), addr), LocalChannelRegistry.get(addr));
serverGroup.terminationFuture().sync();
clientGroup.terminationFuture().sync();
}
}

View File

@ -38,7 +38,6 @@ import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class LocalTransportThreadModelTest {
@ -69,8 +68,8 @@ public class LocalTransportThreadModelTest {
}
@AfterClass
public static void destroy() {
group.shutdownGracefully();
public static void destroy() throws Exception {
group.shutdownGracefully().sync();
}
@Test(timeout = 30000)
@ -216,9 +215,10 @@ public class LocalTransportThreadModelTest {
l.shutdownGracefully();
e1.shutdownGracefully();
e2.shutdownGracefully();
l.awaitTermination(5, TimeUnit.SECONDS);
e1.awaitTermination(5, TimeUnit.SECONDS);
e2.awaitTermination(5, TimeUnit.SECONDS);
l.terminationFuture().sync();
e1.terminationFuture().sync();
e2.terminationFuture().sync();
}
}
@ -344,6 +344,13 @@ public class LocalTransportThreadModelTest {
e3.shutdownGracefully();
e4.shutdownGracefully();
e5.shutdownGracefully();
l.terminationFuture().sync();
e1.terminationFuture().sync();
e2.terminationFuture().sync();
e3.terminationFuture().sync();
e4.terminationFuture().sync();
e5.terminationFuture().sync();
}
}

View File

@ -83,8 +83,8 @@ public class LocalTransportThreadModelTest3 {
}
@AfterClass
public static void destroy() {
group.shutdownGracefully();
public static void destroy() throws Exception {
group.shutdownGracefully().sync();
}
@Test(timeout = 60000)
@ -222,6 +222,13 @@ public class LocalTransportThreadModelTest3 {
e3.shutdownGracefully();
e4.shutdownGracefully();
e5.shutdownGracefully();
l.terminationFuture().sync();
e1.terminationFuture().sync();
e2.terminationFuture().sync();
e3.terminationFuture().sync();
e4.terminationFuture().sync();
e5.terminationFuture().sync();
}
}

View File

@ -36,7 +36,7 @@ public class NioDatagramChannelTest {
* Test try to reproduce issue #1335
*/
@Test
public void testBindMultiple() {
public void testBindMultiple() throws Exception {
DefaultChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
NioEventLoopGroup group = new NioEventLoopGroup();
try {
@ -57,8 +57,8 @@ public class NioDatagramChannelTest {
}
Assert.assertEquals(100, channelGroup.size());
} finally {
channelGroup.close().syncUninterruptibly();
group.shutdownGracefully();
channelGroup.close().sync();
group.shutdownGracefully().sync();
}
}
}