Introduce EventExecutor.shutdownGracefully() that deprecates shutdown()

shutdownGracefully() provides two optional parameters that give more
control over when an executor has to be shut down.

- Related issue: #1307
- Add shutdownGracefully(..) and isShuttingDown()
- Deprecate shutdown() / shutdownNow()
- Replace lastAccessTime with lastExecutionTime and update it after task
  execution for accurate quiet period check
  - runAllTasks() and runShutdownTasks() update it automatically.
  - Add updateLastExecutionTime() so that subclasses can update it
- Add a constructor parameter that tells not to add an unncessary wakeup
  task in execute() if addTask() wakes up the executor thread
  automatically.  Previously, execute() always called wakeup() after
  addTask(), which often caused an extra dummy task in the task queue.
- Use shutdownGracefully() wherever possible / Deprecation javadoc
- Reduce the running time of SingleThreadEventLoopTest from 40s to 15s
  using custom graceful shutdown parameters

- Other changes made along with this commit:
  - takeTask() does not throw InterruptedException anymore.
    - Returns null on interruption or wakeup
  - Make sure runShutdownTasks() return true even if an exception was
    raised while running the shutdown tasks
  - Remove unnecessary isShutdown() checks
  - Consistent use of SingleThreadEventExecutor.nanoTime()

Replace isWakeupOverridden with a constructor parameter
This commit is contained in:
Trustin Lee 2013-04-27 16:39:19 +09:00
parent 9128d4f16a
commit 23d0178494
73 changed files with 533 additions and 294 deletions

View File

@ -15,7 +15,9 @@
*/
package io.netty.util.concurrent;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
@ -37,6 +39,28 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl
return new EventExecutorIterator();
}
@Override
public void shutdownGracefully() {
shutdownGracefully(2, 15, TimeUnit.SECONDS);
}
/**
* @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
*/
@Override
@Deprecated
public abstract void shutdown();
/**
* @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
*/
@Override
@Deprecated
public List<Runnable> shutdownNow() {
shutdown();
return Collections.emptyList();
}
@Override
public <V> Promise<V> newPromise() {
return new DefaultPromise<V>(this);

View File

@ -65,6 +65,22 @@ public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
}
@Override
public void shutdownGracefully() {
shutdownGracefully(2, 15, TimeUnit.SECONDS);
}
/**
* @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
*/
@Override
@Deprecated
public abstract void shutdown();
/**
* @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
*/
@Override
@Deprecated
public List<Runnable> shutdownNow() {
shutdown();
return Collections.emptyList();

View File

@ -25,21 +25,19 @@ import java.util.concurrent.ThreadFactory;
final class DefaultEventExecutor extends SingleThreadEventExecutor {
DefaultEventExecutor(DefaultEventExecutorGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory);
super(parent, threadFactory, true);
}
@Override
protected void run() {
for (;;) {
Runnable task;
try {
task = takeTask();
Runnable task = takeTask();
if (task != null) {
task.run();
} catch (InterruptedException e) {
// Waken up by interruptThread()
updateLastExecutionTime();
}
if (isShutdown() && confirmShutdown()) {
if (confirmShutdown()) {
break;
}
}

View File

@ -507,7 +507,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
});
}
} catch (Throwable t) {
logger.error("Failed to notify listener(s). Event loop terminated?", t);
logger.error("Failed to notify listener(s). Event loop shut down?", t);
}
}
}
@ -544,7 +544,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
}
});
} catch (Throwable t) {
logger.error("Failed to notify a listener. Event loop terminated?", t);
logger.error("Failed to notify a listener. Event loop shut down?", t);
}
}
@ -644,7 +644,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
});
}
} catch (Throwable t) {
logger.error("Failed to notify listener(s). Event loop terminated?", t);
logger.error("Failed to notify listener(s). Event loop shut down?", t);
}
}
}

View File

@ -16,6 +16,7 @@
package io.netty.util.concurrent;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -28,6 +29,45 @@ import java.util.concurrent.TimeUnit;
*/
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
/**
* Returns {@code true} if and only if this executor was started to be
* {@linkplain #shutdownGracefully() shut down gracefuclly} or was {@linkplain #isShutdown() shut down}.
*/
boolean isShuttingDown();
/**
* Shortcut method for {@link #shutdownGracefully(long, long, TimeUnit)} with sensible default values.
*/
void shutdownGracefully();
/**
* Signals this executor that the caller wants the executor to be shut down. Once this method is called,
* {@link #isShuttingDown()} starts to return {@code true}, and the executor prepares to shut itself down.
* Unlike {@link #shutdown()}, graceful shutdown ensures that no tasks are submitted for <i>'the quiet period'</i>
* (usually a couple seconds) before it shuts itself down. If a task is submitted during the quiet period,
* it is guaranteed to be accepted and the quiet period will start over.
*
* @param quietPeriod the quiet period as described in the documentation
* @param timeout the maximum amount of time to wait until the executor is {@linkplain #shutdown()}
* regardless if a task was submitted during the quiet period
* @param unit the unit of {@code quietPeriod} and {@code timeout}
*/
void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
/**
* @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
*/
@Override
@Deprecated
void shutdown();
/**
* @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
*/
@Override
@Deprecated
List<Runnable> shutdownNow();
/**
* Returns one of the {@link EventExecutor}s that belong to this group.
*/

View File

@ -60,7 +60,19 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdown();
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
@ -107,16 +119,30 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
ThreadFactory threadFactory, Object... args) throws Exception;
@Override
public void shutdown() {
if (isShutdown()) {
return;
public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
for (EventExecutor l: children) {
l.shutdownGracefully(quietPeriod, timeout, unit);
}
}
@Override
@Deprecated
public void shutdown() {
for (EventExecutor l: children) {
l.shutdown();
}
}
@Override
public boolean isShuttingDown() {
for (EventExecutor l: children) {
if (!l.isShuttingDown()) {
return false;
}
}
return true;
}
@Override
public boolean isShutdown() {
for (EventExecutor l: children) {

View File

@ -19,7 +19,6 @@ import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
@ -48,19 +47,14 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
/**
* Wait at least 2 seconds after shutdown() until there are no pending tasks anymore.
* @see #confirmShutdown()
*/
private static final long SHUTDOWN_DELAY_NANOS = TimeUnit.SECONDS.toNanos(2);
static final ThreadLocal<SingleThreadEventExecutor> CURRENT_EVENT_LOOP =
new ThreadLocal<SingleThreadEventExecutor>();
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTDOWN = 3;
private static final int ST_TERMINATED = 4;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;
private static final Runnable WAKEUP_TASK = new Runnable() {
@Override
@ -84,36 +78,50 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
private final Object stateLock = new Object();
private final Semaphore threadLock = new Semaphore(0);
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
private final boolean addTaskWakesUp;
private long lastExecutionTime;
private volatile int state = ST_NOT_STARTED;
private long lastAccessTimeNanos;
private volatile long gracefulShutdownQuietPeriod;
private volatile long gracefulShutdownTimeout;
private long gracefulShutdownStartTime;
/**
* Create a new instance
*
* @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
* @param threadFactory the {@link ThreadFactory} which will be used for the used {@link Thread}
* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
* executor thread
*/
protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory) {
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.parent = parent;
this.addTaskWakesUp = addTaskWakesUp;
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
CURRENT_EVENT_LOOP.set(SingleThreadEventExecutor.this);
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
shutdown();
} finally {
if (state < ST_SHUTTING_DOWN) {
state = ST_SHUTTING_DOWN;
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && lastAccessTimeNanos == 0) {
if (success && gracefulShutdownStartTime == 0) {
logger.error(
"Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
@ -187,11 +195,14 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
/**
* Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
*
* <p>
* Be aware that this method will throw an {@link UnsupportedOperationException} if the task queue, which was
* created via {@link #newTaskQueue()}, does not implement {@link BlockingQueue}.
* </p>
*
* @return {@code null} if the executor thread has been interrupted or waken up.
*/
protected Runnable takeTask() throws InterruptedException {
protected Runnable takeTask() {
assert inEventLoop();
if (!(taskQueue instanceof BlockingQueue)) {
throw new UnsupportedOperationException();
@ -201,12 +212,25 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
for (;;) {
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
if (delayedTask == null) {
return taskQueue.take();
Runnable task = null;
try {
task = taskQueue.take();
if (task == WAKEUP_TASK) {
task = null;
}
} catch (InterruptedException e) {
// Ignore
}
return task;
} else {
long delayNanos = delayedTask.delayNanos();
Runnable task;
if (delayNanos > 0) {
try {
task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
return null;
}
} else {
task = taskQueue.poll();
}
@ -278,7 +302,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
if (task == null) {
throw new NullPointerException("task");
}
if (isTerminated()) {
if (isShutdown()) {
reject();
}
taskQueue.add(task);
@ -315,6 +339,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
task = pollTask();
if (task == null) {
lastExecutionTime = nanoTime();
return true;
}
}
@ -331,8 +356,9 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
return false;
}
final long deadline = System.nanoTime() + timeoutNanos;
final long deadline = nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
try {
task.run();
@ -342,20 +368,23 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
runTasks ++;
// Check timeout every 64 tasks because System.nanoTime() is relatively expensive.
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
if (System.nanoTime() >= deadline) {
lastExecutionTime = nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = nanoTime();
break;
}
}
this.lastExecutionTime = lastExecutionTime;
return true;
}
@ -371,6 +400,17 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
return delayedTask.delayNanos(currentTimeNanos);
}
/**
* Updates the internal timestamp that tells when a submitted task was executed most recently.
* {@link #runAllTasks()} and {@link #runAllTasks(long)} updates this timestamp automatically, and thus there's
* usually no need to call this method. However, if you take the tasks manually using {@link #takeTask()} or
* {@link #pollTask()}, you have to call this method at the end of task execution loop for accurate quiet period
* checks.
*/
protected void updateLastExecutionTime() {
lastExecutionTime = nanoTime();
}
/**
*
*/
@ -384,8 +424,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
}
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop || state == ST_SHUTDOWN) {
addTask(WAKEUP_TASK);
if (!inEventLoop || state == ST_SHUTTING_DOWN) {
taskQueue.add(WAKEUP_TASK);
}
}
@ -440,16 +480,74 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
for (Runnable task: copy) {
try {
task.run();
ran = true;
} catch (Throwable t) {
logger.warn("Shutdown hook raised an exception.", t);
} finally {
ran = true;
}
}
}
if (ran) {
lastExecutionTime = nanoTime();
}
return ran;
}
@Override
public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
if (quietPeriod < 0) {
throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
}
if (timeout < quietPeriod) {
throw new IllegalArgumentException(
"timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (isShuttingDown()) {
return;
}
boolean inEventLoop = inEventLoop();
boolean wakeup = true;
synchronized (stateLock) {
if (isShuttingDown()) {
return;
}
gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
gracefulShutdownTimeout = unit.toNanos(timeout);
if (inEventLoop) {
assert state == ST_STARTED;
state = ST_SHUTTING_DOWN;
} else {
switch (state) {
case ST_NOT_STARTED:
state = ST_SHUTTING_DOWN;
thread.start();
break;
case ST_STARTED:
state = ST_SHUTTING_DOWN;
break;
default:
wakeup = false;
}
}
}
if (wakeup) {
wakeup(inEventLoop);
}
}
@Override
@Deprecated
public void shutdown() {
if (isShutdown()) {
return;
@ -458,19 +556,22 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
boolean inEventLoop = inEventLoop();
boolean wakeup = true;
if (inEventLoop) {
synchronized (stateLock) {
assert state == ST_STARTED;
state = ST_SHUTDOWN;
if (isShutdown()) {
return;
}
if (inEventLoop) {
assert state == ST_STARTED || state == ST_SHUTTING_DOWN;
state = ST_SHUTDOWN;
} else {
synchronized (stateLock) {
switch (state) {
case ST_NOT_STARTED:
state = ST_SHUTDOWN;
thread.start();
break;
case ST_STARTED:
case ST_SHUTTING_DOWN:
state = ST_SHUTDOWN;
break;
default:
@ -485,9 +586,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
}
@Override
public List<Runnable> shutdownNow() {
shutdown();
return Collections.emptyList();
public boolean isShuttingDown() {
return state >= ST_SHUTTING_DOWN;
}
@Override
@ -504,27 +604,38 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
* Confirm that the shutdown if the instance should be done now!
*/
protected boolean confirmShutdown() {
if (!isShutdown()) {
throw new IllegalStateException("must be invoked after shutdown()");
if (!isShuttingDown()) {
return false;
}
if (!inEventLoop()) {
throw new IllegalStateException("must be invoked from an event loop");
}
cancelDelayedTasks();
if (gracefulShutdownStartTime == 0) {
gracefulShutdownStartTime = nanoTime();
}
if (runAllTasks() || runShutdownHooks()) {
// There were tasks in the queue. Wait a little bit more until no tasks are queued for SHUTDOWN_DELAY_NANOS.
lastAccessTimeNanos = 0;
if (isShutdown()) {
// Executor shut down - no new tasks anymore.
return true;
}
// There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period.
wakeup(true);
return false;
}
if (lastAccessTimeNanos == 0 || System.nanoTime() - lastAccessTimeNanos < SHUTDOWN_DELAY_NANOS) {
if (lastAccessTimeNanos == 0) {
lastAccessTimeNanos = System.nanoTime();
final long nanoTime = nanoTime();
if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
return true;
}
if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
// Check if any tasks were added to the queue every 100ms.
// TODO: Change the behavior of takeTask() so that it returns on timeout.
wakeup(true);
@ -537,7 +648,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
return false;
}
// No tasks were added for last SHUTDOWN_DELAY_NANOS - hopefully safe to shut down.
// No tasks were added for last quiet period - hopefully safe to shut down.
// (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
return true;
}
@ -580,16 +691,19 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
throw new NullPointerException("task");
}
if (inEventLoop()) {
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
wakeup(true);
} else {
startThread();
addTask(task);
if (isTerminated() && removeTask(task)) {
if (isShutdown() && removeTask(task)) {
reject();
}
wakeup(false);
}
if (!addTaskWakesUp) {
wakeup(inEventLoop);
}
}

View File

@ -65,10 +65,10 @@ public class AppletDiscardServer extends JApplet {
public void destroy() {
super.destroy();
if (bossGroup != null) {
bossGroup.shutdown();
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdown();
workerGroup.shutdownGracefully();
}
}

View File

@ -50,7 +50,7 @@ public class DiscardClient {
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
group.shutdown();
group.shutdownGracefully();
}
}

View File

@ -56,8 +56,8 @@ public class DiscardServer {
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdown();
bossGroup.shutdown();
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}

View File

@ -68,7 +68,7 @@ public class EchoClient {
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdown();
group.shutdownGracefully();
}
}

View File

@ -63,8 +63,8 @@ public class EchoServer {
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdown();
workerGroup.shutdown();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -56,7 +56,7 @@ public class FactorialClient {
System.err.format(
"Factorial of %,d is: %,d", count, handler.getFactorial());
} finally {
group.shutdown();
group.shutdownGracefully();
}
}

View File

@ -43,8 +43,8 @@ public class FactorialServer {
b.bind(port).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdown();
workerGroup.shutdown();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -80,8 +80,8 @@ public class FileServer {
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdown();
workerGroup.shutdown();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -39,8 +39,8 @@ public class HttpStaticFileServer {
b.bind(port).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdown();
workerGroup.shutdown();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -93,7 +93,7 @@ public class HttpSnoopClient {
ch.closeFuture().sync();
} finally {
// Shut down executor threads to exit.
group.shutdown();
group.shutdownGracefully();
}
}

View File

@ -46,8 +46,8 @@ public class HttpSnoopServer {
Channel ch = b.bind(port).sync().channel();
ch.closeFuture().sync();
} finally {
bossGroup.shutdown();
workerGroup.shutdown();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -143,7 +143,7 @@ public class HttpUploadClient {
formPostMultipart(b, host, port, uriFile, factory, headers, bodylist);
} finally {
// Shut down executor threads to exit.
group.shutdown();
group.shutdownGracefully();
// Really clean all temporary files if they still exist
factory.cleanAllHttpDatas();

View File

@ -47,8 +47,8 @@ public class HttpUploadServer {
ch.closeFuture().sync();
} finally {
bossGroup.shutdown();
workerGroup.shutdown();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -46,8 +46,8 @@ public class AutobahnServer {
System.out.println("Web Socket Server started at port " + port);
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdown();
workerGroup.shutdown();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -119,7 +119,7 @@ public class WebSocketClient {
// responds to the CloseWebSocketFrame.
ch.closeFuture().sync();
} finally {
group.shutdown();
group.shutdownGracefully();
}
}

View File

@ -81,8 +81,8 @@ public class WebSocketServer {
ch.closeFuture().sync();
} finally {
bossGroup.shutdown();
workerGroup.shutdown();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -63,8 +63,8 @@ public class WebSocketServer {
ch.closeFuture().sync();
} finally {
bossGroup.shutdown();
workerGroup.shutdown();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -61,8 +61,8 @@ public class WebSocketSslServer {
System.out.println("Open your browser and navigate to https://localhost:" + port + '/');
ch.closeFuture().sync();
} finally {
bossGroup.shutdown();
workerGroup.shutdown();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -105,8 +105,8 @@ public class LocalEcho {
lastWriteFuture.awaitUninterruptibly();
}
} finally {
serverGroup.shutdown();
clientGroup.shutdown();
serverGroup.shutdownGracefully();
clientGroup.shutdownGracefully();
}
}

View File

@ -60,7 +60,7 @@ public class ObjectEchoClient {
// Start the connection attempt.
b.connect(host, port).sync().channel().closeFuture().sync();
} finally {
group.shutdown();
group.shutdownGracefully();
}
}

View File

@ -57,8 +57,8 @@ public class ObjectEchoServer {
// Bind and start to accept incoming connections.
b.bind(port).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdown();
workerGroup.shutdown();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -54,8 +54,8 @@ public class PortUnificationServer {
// Bind and start to accept incoming connections.
b.bind(port).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdown();
workerGroup.shutdown();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -49,8 +49,8 @@ public class HexDumpProxy {
.childOption(ChannelOption.AUTO_READ, false)
.bind(localPort).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdown();
workerGroup.shutdown();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -64,7 +64,7 @@ public class QuoteOfTheMomentClient {
System.err.println("QOTM request timed out.");
}
} finally {
group.shutdown();
group.shutdownGracefully();
}
}

View File

@ -46,7 +46,7 @@ public class QuoteOfTheMomentServer {
b.bind(port).sync().channel().closeFuture().await();
} finally {
group.shutdown();
group.shutdownGracefully();
}
}

View File

@ -54,7 +54,7 @@ public final class RxtxClient {
f.channel().closeFuture().sync();
} finally {
group.shutdown();
group.shutdownGracefully();
}
}

View File

@ -70,7 +70,7 @@ public class NioSctpEchoClient {
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdown();
group.shutdownGracefully();
}
}

View File

@ -63,8 +63,8 @@ public class NioSctpEchoServer {
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdown();
workerGroup.shutdown();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -70,7 +70,7 @@ public class OioSctpEchoClient {
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdown();
group.shutdownGracefully();
}
}

View File

@ -63,8 +63,8 @@ public class OioSctpEchoServer {
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdown();
workerGroup.shutdown();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -76,7 +76,7 @@ public class SecureChatClient {
}
} finally {
// The connection is closed automatically on shutdown.
group.shutdown();
group.shutdownGracefully();
}
}

View File

@ -43,8 +43,8 @@ public class SecureChatServer {
b.bind(port).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdown();
workerGroup.shutdown();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -39,8 +39,8 @@ public final class SocksServer {
.childHandler(new SocksServerInitializer());
b.bind(localPort).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdown();
workerGroup.shutdown();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -74,7 +74,7 @@ public class TelnetClient {
lastWriteFuture.sync();
}
} finally {
group.shutdown();
group.shutdownGracefully();
}
}

View File

@ -42,8 +42,8 @@ public class TelnetServer {
b.bind(port).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdown();
workerGroup.shutdown();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -77,7 +77,7 @@ public class ByteEchoClient {
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
connectGroup.shutdown();
connectGroup.shutdownGracefully();
}
}

View File

@ -73,8 +73,8 @@ public class ByteEchoServer {
future.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
acceptGroup.shutdown();
connectGroup.shutdown();
acceptGroup.shutdownGracefully();
connectGroup.shutdownGracefully();
}
}

View File

@ -77,7 +77,7 @@ public class MsgEchoClient {
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
connectGroup.shutdown();
connectGroup.shutdownGracefully();
}
}

View File

@ -73,8 +73,8 @@ public class MsgEchoServer {
future.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
acceptGroup.shutdown();
connectGroup.shutdown();
acceptGroup.shutdownGracefully();
connectGroup.shutdownGracefully();
}
}

View File

@ -70,7 +70,7 @@ public abstract class MsgEchoPeerBase {
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
connectGroup.shutdown();
connectGroup.shutdownGracefully();
}
}

View File

@ -67,7 +67,7 @@ public class ByteEchoPeerBase {
final ChannelFuture future = bootstrap.connect(peerAddress, myAddress).sync();
future.channel().closeFuture().sync();
} finally {
connectGroup.shutdown();
connectGroup.shutdownGracefully();
}
}
}

View File

@ -72,7 +72,7 @@ public class WorldClockClient {
System.out.format("%28s: %s%n", i1.next(), i2.next());
}
} finally {
group.shutdown();
group.shutdownGracefully();
}
}

View File

@ -43,8 +43,8 @@ public class WorldClockServer {
b.bind(port).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdown();
workerGroup.shutdown();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -55,7 +55,7 @@ public class SocketEchoTest extends AbstractSocketTest {
@AfterClass
public static void destroyGroup() {
group.shutdown();
group.shutdownGracefully();
}
@Test(timeout = 30000)

View File

@ -23,8 +23,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
@ -33,6 +31,8 @@ import io.netty.handler.logging.ByteLoggingHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SslHandler;
import io.netty.testsuite.util.BogusSslContextFactory;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -56,7 +56,7 @@ public class SocketStartTlsTest extends AbstractSocketTest {
@AfterClass
public static void shutdownExecutor() {
executor.shutdown();
executor.shutdownGracefully();
}
@Test(timeout = 30000)

View File

@ -123,8 +123,8 @@ public final class UdtNetty {
Thread.sleep(1000);
group1.shutdown();
group2.shutdown();
group1.shutdownGracefully();
group2.shutdownGracefully();
Metrics.defaultRegistry().shutdown();

View File

@ -113,7 +113,7 @@ public class NioUdtByteRendezvousChannelTest extends AbstractUdtTest {
assertEquals(handler1.meter().count(), handler2.meter().count());
group1.shutdown();
group2.shutdown();
group1.shutdownGracefully();
group2.shutdownGracefully();
}
}

View File

@ -107,7 +107,7 @@ public class NioUdtMessageRendezvousChannelTest extends AbstractUdtTest {
assertEquals(handler1.meter().count(), handler2.meter().count());
group1.shutdown();
group2.shutdown();
group1.shutdownGracefully();
group2.shutdownGracefully();
}
}

View File

@ -25,9 +25,8 @@ import java.util.concurrent.ThreadFactory;
*/
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
protected SingleThreadEventLoop(
EventLoopGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory);
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
super(parent, threadFactory, addTaskWakesUp);
}
@Override

View File

@ -26,7 +26,7 @@ public class ThreadPerChannelEventLoop extends SingleThreadEventLoop {
private Channel ch;
public ThreadPerChannelEventLoop(ThreadPerChannelEventLoopGroup parent) {
super(parent, parent.threadFactory);
super(parent, parent.threadFactory, true);
this.parent = parent;
}
@ -48,16 +48,14 @@ public class ThreadPerChannelEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
for (;;) {
Runnable task;
try {
task = takeTask();
Runnable task = takeTask();
if (task != null) {
task.run();
} catch (InterruptedException e) {
// Waken up by interruptThread()
updateLastExecutionTime();
}
Channel ch = this.ch;
if (isShutdown()) {
if (isShuttingDown()) {
if (ch != null) {
ch.unsafe().close(ch.unsafe().voidFuture());
}

View File

@ -117,6 +117,17 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
}
@Override
public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
for (EventLoop l: activeChildren) {
l.shutdownGracefully(quietPeriod, timeout, unit);
}
for (EventLoop l: idleChildren) {
l.shutdownGracefully(quietPeriod, timeout, unit);
}
}
@Override
@Deprecated
public void shutdown() {
for (EventLoop l: activeChildren) {
l.shutdown();
@ -126,6 +137,21 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
}
}
@Override
public boolean isShuttingDown() {
for (EventLoop l: activeChildren) {
if (!l.isShuttingDown()) {
return false;
}
}
for (EventLoop l: idleChildren) {
if (!l.isShuttingDown()) {
return false;
}
}
return true;
}
@Override
public boolean isShutdown() {
for (EventLoop l: activeChildren) {

View File

@ -59,7 +59,7 @@ final class AioEventLoop extends SingleThreadEventLoop {
};
AioEventLoop(AioEventLoopGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory);
super(parent, threadFactory, true);
}
@Override
@ -75,15 +75,13 @@ final class AioEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
for (;;) {
Runnable task;
try {
task = takeTask();
Runnable task = takeTask();
if (task != null) {
task.run();
} catch (InterruptedException e) {
// Waken up by interruptThread()
updateLastExecutionTime();
}
if (isShutdown()) {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;

View File

@ -73,6 +73,8 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup {
}
@Override
@Deprecated
@SuppressWarnings("deprecation")
public void shutdown() {
boolean interrupted = false;

View File

@ -23,8 +23,6 @@ import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.AbstractEventExecutor;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
@ -52,13 +50,15 @@ final class EmbeddedEventLoop extends AbstractEventExecutor implements EventLoop
}
@Override
public void shutdown() {
// NOOP
}
public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { }
@Override
public List<Runnable> shutdownNow() {
return Collections.emptyList();
@Deprecated
public void shutdown() { }
@Override
public boolean isShuttingDown() {
return false;
}
@Override

View File

@ -21,21 +21,16 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.FileRegion;
import io.netty.channel.ServerChannel;
import io.netty.util.concurrent.AbstractEventExecutor;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.PlatformDependent;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -312,54 +307,4 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
return getClass().getSimpleName() +
"(name: " + name() + ", size: " + size() + ')';
}
static final class ImmediateEventExecutor extends AbstractEventExecutor {
@Override
public EventExecutorGroup parent() {
return null;
}
@Override
public boolean inEventLoop() {
return true;
}
@Override
public boolean inEventLoop(Thread thread) {
return true;
}
@Override
public void shutdown() {
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) {
return false;
}
@Override
public List<Runnable> shutdownNow() {
return Collections.emptyList();
}
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException("command");
}
command.run();
}
}
}

View File

@ -235,7 +235,7 @@ final class DefaultChannelGroupFuture extends DefaultPromise<Void> implements Ch
@Override
protected void checkDeadLock() {
EventExecutor e = executor();
if (e != null && !(e instanceof DefaultChannelGroup.ImmediateEventExecutor) && e.inEventLoop()) {
if (e != null && !(e instanceof ImmediateEventExecutor) && e.inEventLoop()) {
throw new BlockingOperationException();
}
}

View File

@ -21,8 +21,6 @@ import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Promise;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
final class ImmediateEventExecutor extends AbstractEventExecutor {
@ -43,7 +41,15 @@ final class ImmediateEventExecutor extends AbstractEventExecutor {
}
@Override
public void shutdown() {
public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { }
@Override
@Deprecated
public void shutdown() { }
@Override
public boolean isShuttingDown() {
return false;
}
@Override
@ -61,11 +67,6 @@ final class ImmediateEventExecutor extends AbstractEventExecutor {
return false;
}
@Override
public List<Runnable> shutdownNow() {
return Collections.emptyList();
}
@Override
public void execute(Runnable command) {
if (command == null) {

View File

@ -185,10 +185,12 @@ public class LocalChannel extends AbstractChannel {
}
// Update all internal state before the closeFuture is notified.
if (localAddress != null) {
if (parent() == null) {
LocalChannelRegistry.unregister(localAddress);
}
localAddress = null;
}
state = 3;
}

View File

@ -22,21 +22,19 @@ import java.util.concurrent.ThreadFactory;
final class LocalEventLoop extends SingleThreadEventLoop {
LocalEventLoop(LocalEventLoopGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory);
super(parent, threadFactory, true);
}
@Override
protected void run() {
for (;;) {
Runnable task;
try {
task = takeTask();
Runnable task = takeTask();
if (task != null) {
task.run();
} catch (InterruptedException e) {
// Waken up by interruptThread()
updateLastExecutionTime();
}
if (isShutdown() && confirmShutdown()) {
if (confirmShutdown()) {
break;
}
}

View File

@ -48,11 +48,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public final class NioEventLoop extends SingleThreadEventLoop {
/**
* Internal Netty logger.
*/
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioEventLoop.class);
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
@ -109,9 +105,8 @@ public final class NioEventLoop extends SingleThreadEventLoop {
private int cancelledKeys;
private boolean needsToSelectAgain;
NioEventLoop(
NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory);
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
@ -330,7 +325,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
final int ioRatio = this.ioRatio;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
if (isShutdown()) {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
@ -438,7 +433,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
processWritable(k, ch);
processWritable(ch);
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
@ -457,7 +452,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
}
}
private static void processWritable(SelectionKey k, AbstractNioChannel ch) {
private static void processWritable(AbstractNioChannel ch) {
NioTask<SelectableChannel> task;
for (;;) {
task = ch.writableTasks.poll();

View File

@ -73,8 +73,8 @@ public class BootstrapTest {
f.sync();
}
} finally {
groupA.shutdown();
groupB.shutdown();
groupA.shutdownGracefully();
groupB.shutdownGracefully();
}
}
@ -119,8 +119,8 @@ public class BootstrapTest {
f.sync();
}
} finally {
groupA.shutdown();
groupB.shutdown();
groupA.shutdownGracefully();
groupB.shutdownGracefully();
}
}

View File

@ -16,6 +16,7 @@
package io.netty.channel;
import io.netty.channel.local.LocalChannel;
import io.netty.util.concurrent.EventExecutor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -36,6 +37,11 @@ import static org.junit.Assert.*;
public class SingleThreadEventLoopTest {
private static final Runnable NOOP = new Runnable() {
@Override
public void run() { }
};
private SingleThreadEventLoopA loopA;
private SingleThreadEventLoopB loopB;
@ -47,11 +53,11 @@ public class SingleThreadEventLoopTest {
@After
public void stopEventLoop() {
if (!loopA.isShutdown()) {
loopA.shutdown();
if (!loopA.isShuttingDown()) {
loopA.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
}
if (!loopB.isShutdown()) {
loopB.shutdown();
if (!loopB.isShuttingDown()) {
loopB.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
}
while (!loopA.isTerminated()) {
@ -73,11 +79,14 @@ public class SingleThreadEventLoopTest {
}
@Test
@SuppressWarnings("deprecation")
public void shutdownBeforeStart() throws Exception {
loopA.shutdown();
assertRejection(loopA);
}
@Test
@SuppressWarnings("deprecation")
public void shutdownAfterStart() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
loopA.execute(new Runnable() {
@ -92,6 +101,7 @@ public class SingleThreadEventLoopTest {
// Request the event loop thread to stop.
loopA.shutdown();
assertRejection(loopA);
assertTrue(loopA.isShutdown());
@ -101,6 +111,15 @@ public class SingleThreadEventLoopTest {
}
}
private static void assertRejection(EventExecutor loop) {
try {
loop.execute(NOOP);
fail("A task must be rejected after shutdown() is called.");
} catch (RejectedExecutionException e) {
// Expected
}
}
@Test
public void scheduleTaskA() throws Exception {
testScheduleTask(loopA);
@ -254,6 +273,7 @@ public class SingleThreadEventLoopTest {
}
@Test
@SuppressWarnings("deprecation")
public void shutdownWithPendingTasks() throws Exception {
final int NUM_TASKS = 3;
final AtomicInteger ranTasks = new AtomicInteger();
@ -298,12 +318,9 @@ public class SingleThreadEventLoopTest {
}
@Test(timeout = 10000)
public void testRegistrationAfterTermination() throws Exception {
@SuppressWarnings("deprecation")
public void testRegistrationAfterShutdown() throws Exception {
loopA.shutdown();
while (!loopA.isTerminated()) {
loopA.awaitTermination(1, TimeUnit.DAYS);
}
ChannelFuture f = loopA.register(new LocalChannel());
f.awaitUninterruptibly();
assertFalse(f.isSuccess());
@ -311,12 +328,9 @@ public class SingleThreadEventLoopTest {
}
@Test(timeout = 10000)
public void testRegistrationAfterTermination2() throws Exception {
@SuppressWarnings("deprecation")
public void testRegistrationAfterShutdown2() throws Exception {
loopA.shutdown();
while (!loopA.isTerminated()) {
loopA.awaitTermination(1, TimeUnit.DAYS);
}
final CountDownLatch latch = new CountDownLatch(1);
Channel ch = new LocalChannel();
ChannelPromise promise = ch.newPromise();
@ -336,26 +350,69 @@ public class SingleThreadEventLoopTest {
assertFalse(latch.await(1, TimeUnit.SECONDS));
}
@Test(timeout = 5000)
public void testGracefulShutdownQuietPeriod() throws Exception {
loopA.shutdownGracefully(1, Integer.MAX_VALUE, TimeUnit.SECONDS);
// Keep Scheduling tasks for another 2 seconds.
for (int i = 0; i < 20; i ++) {
Thread.sleep(100);
loopA.execute(NOOP);
}
long startTime = System.nanoTime();
assertThat(loopA.isShuttingDown(), is(true));
assertThat(loopA.isShutdown(), is(false));
while (!loopA.isTerminated()) {
loopA.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
assertTrue(System.nanoTime() - startTime >= TimeUnit.SECONDS.toNanos(1));
}
@Test(timeout = 5000)
public void testGracefulShutdownTimeout() throws Exception {
loopA.shutdownGracefully(2, 2, TimeUnit.SECONDS);
// Keep Scheduling tasks for another 3 seconds.
// Submitted tasks must be rejected after 2 second timeout.
for (int i = 0; i < 10; i ++) {
Thread.sleep(100);
loopA.execute(NOOP);
}
try {
for (int i = 0; i < 20; i ++) {
Thread.sleep(100);
loopA.execute(NOOP);
}
fail("shutdownGracefully() must reject a task after timeout.");
} catch (RejectedExecutionException e) {
// Expected
}
assertThat(loopA.isShuttingDown(), is(true));
assertThat(loopA.isShutdown(), is(true));
}
private static class SingleThreadEventLoopA extends SingleThreadEventLoop {
final AtomicInteger cleanedUp = new AtomicInteger();
SingleThreadEventLoopA() {
super(null, Executors.defaultThreadFactory());
super(null, Executors.defaultThreadFactory(), true);
}
@Override
protected void run() {
for (;;) {
Runnable task;
try {
task = takeTask();
Runnable task = takeTask();
if (task != null) {
task.run();
} catch (InterruptedException e) {
// Waken up by interruptThread()
updateLastExecutionTime();
}
if (isShutdown() && confirmShutdown()) {
if (confirmShutdown()) {
break;
}
}
@ -370,7 +427,7 @@ public class SingleThreadEventLoopTest {
private static class SingleThreadEventLoopB extends SingleThreadEventLoop {
SingleThreadEventLoopB() {
super(null, Executors.defaultThreadFactory());
super(null, Executors.defaultThreadFactory(), false);
}
@Override
@ -384,7 +441,7 @@ public class SingleThreadEventLoopTest {
runAllTasks();
if (isShutdown() && confirmShutdown()) {
if (confirmShutdown()) {
break;
}
}

View File

@ -55,7 +55,7 @@ public class DefaultChannnelGroupTest {
allChannels.close().awaitUninterruptibly();
}
bossGroup.shutdown();
workerGroup.shutdown();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

View File

@ -80,8 +80,8 @@ public class LocalChannelRegistryTest {
// Close the channel
cc.close().sync();
serverGroup.shutdown();
clientGroup.shutdown();
serverGroup.shutdownGracefully();
clientGroup.shutdownGracefully();
sc.closeFuture().sync();

View File

@ -76,7 +76,7 @@ public class LocalTransportThreadModelTest {
@AfterClass
public static void destroy() {
group.shutdown();
group.shutdownGracefully();
}
@Test(timeout = 30000)
@ -195,9 +195,9 @@ public class LocalTransportThreadModelTest {
System.out.println("H3O: " + h3.outboundThreadNames);
throw e;
} finally {
l.shutdown();
e1.shutdown();
e2.shutdown();
l.shutdownGracefully();
e1.shutdownGracefully();
e2.shutdownGracefully();
l.awaitTermination(5, TimeUnit.SECONDS);
e1.awaitTermination(5, TimeUnit.SECONDS);
e2.awaitTermination(5, TimeUnit.SECONDS);
@ -320,12 +320,12 @@ public class LocalTransportThreadModelTest {
ch.close().sync();
} finally {
l.shutdown();
e1.shutdown();
e2.shutdown();
e3.shutdown();
e4.shutdown();
e5.shutdown();
l.shutdownGracefully();
e1.shutdownGracefully();
e2.shutdownGracefully();
e3.shutdownGracefully();
e4.shutdownGracefully();
e5.shutdownGracefully();
}
}

View File

@ -82,7 +82,7 @@ public class LocalTransportThreadModelTest3 {
@AfterClass
public static void destroy() {
group.shutdown();
group.shutdownGracefully();
}
@Test(timeout = 60000)
@ -214,12 +214,12 @@ public class LocalTransportThreadModelTest3 {
Assert.assertEquals(event, expectedEvents.poll());
}
} finally {
l.shutdown();
e1.shutdown();
e2.shutdown();
e3.shutdown();
e4.shutdown();
e5.shutdown();
l.shutdownGracefully();
e1.shutdownGracefully();
e2.shutdownGracefully();
e3.shutdownGracefully();
e4.shutdownGracefully();
e5.shutdownGracefully();
}
}