Add shutdown hooks to SingleThreadEventLoop

- LocalChannel and LocalServerChannel uses it to close themselves on
  shutdown
- LocalEcho example does not call close() anymore because the channels
  are closed automatically on shutdown
This commit is contained in:
Trustin Lee 2012-05-30 04:23:15 -07:00
parent 243f6581c6
commit 078a502c5f
4 changed files with 63 additions and 7 deletions

View File

@ -78,7 +78,10 @@ public class LocalEcho {
}); });
Channel sch = sb.bind().sync().channel(); // Start the server.
sb.bind().sync();
// Start the client.
Channel ch = cb.connect().sync().channel(); Channel ch = cb.connect().sync().channel();
// Read commands from the stdin. // Read commands from the stdin.
@ -99,9 +102,6 @@ public class LocalEcho {
if (lastWriteFuture != null) { if (lastWriteFuture != null) {
lastWriteFuture.awaitUninterruptibly(); lastWriteFuture.awaitUninterruptibly();
} }
ch.close().sync();
sch.close().sync();
} finally { } finally {
sb.shutdown(); sb.shutdown();
cb.shutdown(); cb.shutdown();

View File

@ -1,11 +1,16 @@
package io.netty.channel; package io.netty.channel;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.QueueFactory; import io.netty.util.internal.QueueFactory;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.Set;
import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -22,6 +27,9 @@ import java.util.concurrent.atomic.AtomicLong;
public abstract class SingleThreadEventLoop extends AbstractExecutorService implements EventLoop { public abstract class SingleThreadEventLoop extends AbstractExecutorService implements EventLoop {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SingleThreadEventLoop.class);
private static final long SCHEDULE_CHECK_INTERVAL = TimeUnit.MILLISECONDS.toNanos(10); private static final long SCHEDULE_CHECK_INTERVAL = TimeUnit.MILLISECONDS.toNanos(10);
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
private static final long START_TIME = System.nanoTime(); private static final long START_TIME = System.nanoTime();
@ -48,6 +56,7 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl
private final Semaphore threadLock = new Semaphore(0); private final Semaphore threadLock = new Semaphore(0);
// TODO: Use PriorityQueue to reduce the locking overhead of DelayQueue. // TODO: Use PriorityQueue to reduce the locking overhead of DelayQueue.
private final Queue<ScheduledFutureTask<?>> scheduledTasks = new DelayQueue<ScheduledFutureTask<?>>(); private final Queue<ScheduledFutureTask<?>> scheduledTasks = new DelayQueue<ScheduledFutureTask<?>>();
private final Set<Runnable> shutdownHooks = new HashSet<Runnable>();
/** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */ /** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */
private volatile int state; private volatile int state;
private long lastCheckTimeNanos; private long lastCheckTimeNanos;
@ -70,6 +79,7 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl
} }
try { try {
cancelScheduledTasks(); cancelScheduledTasks();
runShutdownHooks();
cleanup(); cleanup();
} finally { } finally {
threadLock.release(); threadLock.release();
@ -201,7 +211,7 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl
protected abstract void run(); protected abstract void run();
protected void cleanup() { protected void cleanup() {
// Do nothing. Subclasses will override. // Do nothing. Subclases will override.
} }
protected abstract void wakeup(boolean inEventLoop); protected abstract void wakeup(boolean inEventLoop);
@ -211,6 +221,37 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl
return Thread.currentThread() == thread; return Thread.currentThread() == thread;
} }
public void addShutdownHook(Runnable task) {
ensureShutdownHookAccess();
shutdownHooks.add(task);
}
public void removeShutdownHook(Runnable task) {
ensureShutdownHookAccess();
shutdownHooks.remove(task);
}
private void ensureShutdownHookAccess() {
if (!inEventLoop()) {
throw new IllegalStateException("must be called from the event loop");
}
}
private void runShutdownHooks() {
// Note shutdown hooks can add / remove shutdown hooks.
while (!shutdownHooks.isEmpty()) {
List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
shutdownHooks.clear();
for (Runnable task: copy) {
try {
task.run();
} catch (Throwable t) {
logger.warn("Shutdown hook raised an exception.", t);
}
}
}
}
@Override @Override
public void shutdown() { public void shutdown() {
boolean inEventLoop = inEventLoop(); boolean inEventLoop = inEventLoop();

View File

@ -39,6 +39,12 @@ import java.util.Queue;
public class LocalChannel extends AbstractChannel { public class LocalChannel extends AbstractChannel {
private final ChannelConfig config = new DefaultChannelConfig(); private final ChannelConfig config = new DefaultChannelConfig();
private final Runnable shutdownHook = new Runnable() {
@Override
public void run() {
unsafe().close(unsafe().voidFuture());
}
};
private volatile int state; // 0 - open, 1 - bound, 2 - connected, 3 - closed private volatile int state; // 0 - open, 1 - bound, 2 - connected, 3 - closed
private volatile LocalChannel peer; private volatile LocalChannel peer;
@ -126,6 +132,8 @@ public class LocalChannel extends AbstractChannel {
} }
}); });
} }
((SingleThreadEventLoop) eventLoop()).addShutdownHook(shutdownHook);
} }
@Override @Override
@ -162,6 +170,7 @@ public class LocalChannel extends AbstractChannel {
if (isOpen()) { if (isOpen()) {
unsafe().close(unsafe().voidFuture()); unsafe().close(unsafe().voidFuture());
} }
((SingleThreadEventLoop) eventLoop()).removeShutdownHook(shutdownHook);
} }
@Override @Override

View File

@ -30,6 +30,12 @@ import java.net.SocketAddress;
public class LocalServerChannel extends AbstractServerChannel { public class LocalServerChannel extends AbstractServerChannel {
private final ChannelConfig config = new DefaultChannelConfig(); private final ChannelConfig config = new DefaultChannelConfig();
private final Runnable shutdownHook = new Runnable() {
@Override
public void run() {
unsafe().close(unsafe().voidFuture());
}
};
private volatile int state; // 0 - open, 1 - active, 2 - closed private volatile int state; // 0 - open, 1 - active, 2 - closed
private volatile LocalAddress localAddress; private volatile LocalAddress localAddress;
@ -79,7 +85,7 @@ public class LocalServerChannel extends AbstractServerChannel {
@Override @Override
protected void doRegister() throws Exception { protected void doRegister() throws Exception {
// NOOP ((SingleThreadEventLoop) eventLoop()).addShutdownHook(shutdownHook);
} }
@Override @Override
@ -102,7 +108,7 @@ public class LocalServerChannel extends AbstractServerChannel {
@Override @Override
protected void doDeregister() throws Exception { protected void doDeregister() throws Exception {
// NOOP ((SingleThreadEventLoop) eventLoop()).removeShutdownHook(shutdownHook);
} }
LocalChannel serve(final LocalChannel peer) { LocalChannel serve(final LocalChannel peer) {