Initial incomplete checkin of the event loop API

This commit is contained in:
Trustin Lee 2012-04-03 22:03:04 +09:00
parent d66cf2cbfa
commit 116054a364
7 changed files with 462 additions and 275 deletions

View File

@ -15,16 +15,17 @@
*/
package io.netty.channel;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SelectionKey;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannelConfig;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SelectionKey;
/**
* A nexus to a network socket or a component which is capable of I/O
@ -136,6 +137,8 @@ public interface Channel extends Comparable<Channel> {
*/
Integer getId();
EventLoop eventLoop();
/**
* Returns the {@link ChannelFactory} which created this channel.
*/
@ -372,4 +375,21 @@ public interface Channel extends Comparable<Channel> {
* Attaches an object to this {@link Channel} to store a stateful information
*/
void setAttachment(Object attachment);
Unsafe unsafe();
public interface Unsafe {
void setEventLoop(EventLoop eventLoop);
void clearEventLoop();
java.nio.channels.Channel ch();
void bind(SocketAddress local) throws IOException;
void connect(SocketAddress remote) throws IOException;
boolean finishConnect() throws IOException;
boolean read() throws IOException;
boolean write() throws IOException;
void unbind() throws IOException;
void disconnect() throws IOException;
void close() throws IOException;
}
}

View File

@ -0,0 +1,9 @@
package io.netty.channel;
import java.util.concurrent.ExecutorService;
public interface EventLoop extends ExecutorService {
ChannelFuture attach(Channel channel);
void attach(Channel channel, ChannelFuture future);
boolean inEventLoop();
}

View File

@ -0,0 +1,206 @@
package io.netty.channel;
import io.netty.util.internal.QueueFactory;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
public abstract class SingleThreadEventLoop extends AbstractExecutorService implements EventLoop {
private final BlockingQueue<Runnable> taskQueue = QueueFactory.createQueue(Runnable.class);
private final Thread thread;
private final Object stateLock = new Object();
private final Semaphore threadLock = new Semaphore(0);
/** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */
private volatile int state;
protected SingleThreadEventLoop() {
this(Executors.defaultThreadFactory());
}
protected SingleThreadEventLoop(ThreadFactory threadFactory) {
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
try {
SingleThreadEventLoop.this.run();
} finally {
synchronized (stateLock) {
state = 3;
}
try {
cleanup();
} finally {
threadLock.release();
assert taskQueue.isEmpty();
}
}
}
});
}
public ChannelFuture attach(Channel channel) {
ChannelFuture future = new DefaultChannelFuture(channel, false);
attach(channel, future);
return future;
}
protected void interruptThread() {
thread.interrupt();
}
protected Runnable pollTask() {
assert inEventLoop();
return taskQueue.poll();
}
protected Runnable takeTask() throws InterruptedException {
assert inEventLoop();
return taskQueue.take();
}
protected Runnable peekTask() {
assert inEventLoop();
return taskQueue.peek();
}
protected boolean hasTasks() {
assert inEventLoop();
return !taskQueue.isEmpty();
}
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (isShutdown()) {
reject();
}
taskQueue.add(task);
}
protected boolean removeTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
return taskQueue.remove(task);
}
protected abstract void run();
protected void cleanup() {
// Do nothing. Subclasses will override.
}
protected abstract void wakeup(boolean inEventLoop);
@Override
public boolean inEventLoop() {
return Thread.currentThread() == thread;
}
@Override
public void shutdown() {
boolean inEventLoop = inEventLoop();
boolean wakeup = false;
if (inEventLoop) {
synchronized (stateLock) {
assert state == 1;
state = 2;
wakeup = true;
}
} else {
synchronized (stateLock) {
switch (state) {
case 0:
state = 3;
try {
cleanup();
} finally {
threadLock.release();
}
break;
case 1:
state = 2;
wakeup = true;
break;
}
}
}
if (wakeup) {
wakeup(inEventLoop);
}
}
@Override
public List<Runnable> shutdownNow() {
shutdown();
return Collections.emptyList();
}
@Override
public boolean isShutdown() {
return state >= 2;
}
@Override
public boolean isTerminated() {
return state == 3;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
if (unit == null) {
throw new NullPointerException("unit");
}
if (inEventLoop()) {
throw new IllegalStateException("cannot await termination of the current thread");
}
if (threadLock.tryAcquire(timeout, unit)) {
threadLock.release();
}
return isTerminated();
}
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (inEventLoop()) {
if (isShutdown()) {
reject();
}
addTask(task);
wakeup(true);
} else {
synchronized (stateLock) {
if (state == 0) {
state = 1;
thread.start();
}
}
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
wakeup(false);
}
}
private static void reject() {
throw new RejectedExecutionException("event loop shut down");
}
}

View File

@ -1,31 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket;
/**
* A {@link Worker} is responsible to dispatch IO operations
*
*/
public interface Worker extends Runnable {
/**
* Execute the given {@link Runnable} in the IO-Thread. This may be now or later once the IO-Thread do some other work.
*
* @param task the {@link Runnable} to execute
*/
void executeInIoThread(Runnable task);
}

View File

@ -15,19 +15,15 @@
*/
package io.netty.channel.socket.nio;
import static io.netty.channel.Channels.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageEvent;
import io.netty.channel.socket.Worker;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.socket.nio.SendBufferPool.SendBuffer;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.DeadLockProofWorker;
import io.netty.util.internal.QueueFactory;
import java.io.IOException;
import java.net.ConnectException;
@ -44,13 +40,14 @@ import java.nio.channels.WritableByteChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
abstract class AbstractNioWorker implements Worker {
import static io.netty.channel.Channels.*;
abstract class AbstractNioWorker extends SingleThreadEventLoop {
/**
* Internal Netty logger.
*/
@ -62,26 +59,10 @@ abstract class AbstractNioWorker implements Worker {
static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
/**
* Executor used to execute {@link Runnable}s such as registration task.
*/
private final Executor executor;
/**
* Boolean to indicate if this worker has been started.
*/
private boolean started;
/**
* If this worker has been started thread will be a reference to the thread
* used when starting. i.e. the current thread when the run method is executed.
*/
protected volatile Thread thread;
/**
* The NIO {@link Selector}.
*/
protected volatile Selector selector;
protected final Selector selector;
/**
* Boolean that controls determines if a blocked Selector.select should
@ -101,43 +82,33 @@ abstract class AbstractNioWorker implements Worker {
*/
private final Object startStopLock = new Object();
/**
* Queue of channel registration tasks.
*/
protected final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class);
/**
* Queue of WriteTasks
*/
protected final Queue<Runnable> writeTaskQueue = QueueFactory.createQueue(Runnable.class);
private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class);
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
protected final SendBufferPool sendBufferPool = new SendBufferPool();
private final boolean allowShutdownOnIdle;
AbstractNioWorker(Executor executor) {
this(executor, true);
protected AbstractNioWorker() {
selector = openSelector();
}
public AbstractNioWorker(Executor executor, boolean allowShutdownOnIdle) {
this.executor = executor;
this.allowShutdownOnIdle = allowShutdownOnIdle;
protected AbstractNioWorker(ThreadFactory threadFactory) {
super(threadFactory);
selector = openSelector();
}
public void registerWithWorker(final Channel channel, final ChannelFuture future) {
final Selector selector = start();
private static Selector openSelector() {
try {
return Selector.open();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
}
@Override
public void attach(final Channel channel, final ChannelFuture future) {
try {
if (channel instanceof NioServerSocketChannel) {
final NioServerSocketChannel ch = (NioServerSocketChannel) channel;
registerTaskQueue.add(new Runnable() {
execute(new Runnable() {
@Override
public void run() {
try {
@ -151,14 +122,13 @@ abstract class AbstractNioWorker implements Worker {
} else if (channel instanceof NioClientSocketChannel) {
final NioClientSocketChannel clientChannel = (NioClientSocketChannel) channel;
registerTaskQueue.add(new Runnable() {
execute(new Runnable() {
@Override
public void run() {
try {
try {
clientChannel.getJdkChannel().register(selector, clientChannel.getRawInterestOps() | SelectionKey.OP_CONNECT, clientChannel);
} catch (ClosedChannelException e) {
} catch (ClosedChannelException ignored) {
clientChannel.getWorker().close(clientChannel, succeededFuture(channel));
}
int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
@ -172,8 +142,7 @@ abstract class AbstractNioWorker implements Worker {
}
});
} else if (channel instanceof AbstractNioChannel) {
registerTaskQueue.add(new Runnable() {
execute(new Runnable() {
@Override
public void run() {
try {
@ -195,56 +164,12 @@ abstract class AbstractNioWorker implements Worker {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
/**
* Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for the {@link AbstractNioChannel}'s when they get registered
*
* @return selector
*/
protected final Selector start() {
synchronized (startStopLock) {
if (!started && selector == null) {
// Open a selector if this worker didn't start yet.
try {
this.selector = Selector.open();
} catch (Throwable t) {
throw new ChannelException("Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
boolean success = false;
try {
DeadLockProofWorker.start(executor, this);
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
logger.warn("Failed to close a selector.", t);
}
this.selector = null;
// The method will return to the caller at this point.
}
}
}
assert selector != null && selector.isOpen();
started = true;
}
return selector;
}
@Override
public void run() {
thread = Thread.currentThread();
protected void run() {
long lastConnectTimeoutCheckTimeNanos = System.nanoTime();
boolean shutdown = false;
Selector selector = this.selector;
for (;;) {
@ -293,9 +218,7 @@ abstract class AbstractNioWorker implements Worker {
}
cancelledKeys = 0;
processRegisterTaskQueue();
processEventQueue();
processWriteTaskQueue();
processTaskQueue();
processSelectedKeys(selector.selectedKeys());
// Handle connection timeout every 10 milliseconds approximately.
@ -311,33 +234,13 @@ abstract class AbstractNioWorker implements Worker {
// connections are registered in a one-by-one manner instead of
// concurrent manner.
if (selector.keys().isEmpty()) {
if (shutdown ||
executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {
if (isShutdown()) {
synchronized (startStopLock) {
if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
started = false;
try {
selector.close();
} catch (IOException e) {
logger.warn(
"Failed to close a selector.", e);
} finally {
this.selector = null;
}
if (!hasTasks() && selector.keys().isEmpty()) {
break;
} else {
shutdown = false;
}
}
} else {
if (allowShutdownOnIdle) {
// Give one more second.
shutdown = true;
}
}
} else {
shutdown = false;
}
} catch (Throwable t) {
logger.warn(
@ -353,41 +256,20 @@ abstract class AbstractNioWorker implements Worker {
}
}
}
@Override
public void executeInIoThread(Runnable task) {
executeInIoThread(task, false);
}
/**
* Execute the {@link Runnable} in a IO-Thread
*
* @param task the {@link Runnable} to execute
* @param alwaysAsync <code>true</code> if the {@link Runnable} should be executed in an async
* fashion even if the current Thread == IO Thread
*/
public void executeInIoThread(Runnable task, boolean alwaysAsync) {
if (!alwaysAsync && isIoThread()) {
task.run();
} else {
start();
boolean added = eventQueue.offer(task);
assert added;
if (added) {
// wake up the selector to speed things
Selector selector = this.selector;
if (selector != null) {
selector.wakeup();
}
}
protected void cleanup() {
try {
selector.close();
} catch (IOException e) {
logger.warn(
"Failed to close a selector.", e);
}
}
private void processRegisterTaskQueue() throws IOException {
private void processTaskQueue() throws IOException {
for (;;) {
final Runnable task = registerTaskQueue.poll();
final Runnable task = pollTask();
if (task == null) {
break;
}
@ -397,30 +279,6 @@ abstract class AbstractNioWorker implements Worker {
}
}
private void processWriteTaskQueue() throws IOException {
for (;;) {
final Runnable task = writeTaskQueue.poll();
if (task == null) {
break;
}
task.run();
cleanUpCancelledKeys();
}
}
private void processEventQueue() throws IOException {
for (;;) {
final Runnable task = eventQueue.poll();
if (task == null) {
break;
}
task.run();
cleanUpCancelledKeys();
}
}
private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
@ -445,7 +303,7 @@ abstract class AbstractNioWorker implements Worker {
connect(k);
}
} catch (CancelledKeyException e) {
} catch (CancelledKeyException ignored) {
close(k);
} finally {
if (removeKey) {
@ -609,10 +467,9 @@ abstract class AbstractNioWorker implements Worker {
protected boolean scheduleWriteIfNecessary(final AbstractNioChannel channel) {
if (!isIoThread()) {
if (!inEventLoop()) {
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
boolean offered = writeTaskQueue.offer(channel.writeTask);
assert offered;
execute(channel.writeTask);
}
final Selector workerSelector = selector;
@ -632,7 +489,7 @@ abstract class AbstractNioWorker implements Worker {
boolean open = true;
boolean addOpWrite = false;
boolean removeOpWrite = false;
boolean iothread = isIoThread();
boolean inEventLoop = inEventLoop();
long writtenBytes = 0;
@ -704,7 +561,7 @@ abstract class AbstractNioWorker implements Worker {
buf = null;
evt = null;
future.setFailure(t);
if (iothread) {
if (inEventLoop) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
@ -731,21 +588,13 @@ abstract class AbstractNioWorker implements Worker {
}
}
}
if (iothread) {
if (inEventLoop) {
fireWriteComplete(channel, writtenBytes);
} else {
fireWriteCompleteLater(channel, writtenBytes);
}
}
/**
* Return <code>true</code> if the current executing thread is the same as the one that runs the {@link #run()} method
*
*/
boolean isIoThread() {
return Thread.currentThread() == thread;
}
protected void setOpWrite(AbstractNioChannel channel) {
Selector selector = this.selector;
SelectionKey key = channel.getJdkChannel().keyFor(selector);
@ -794,7 +643,7 @@ abstract class AbstractNioWorker implements Worker {
public void close(NioServerSocketChannel channel, ChannelFuture future) {
boolean isIoThread = isIoThread();
boolean inEventLoop = inEventLoop();
boolean bound = channel.isBound();
try {
@ -813,13 +662,13 @@ abstract class AbstractNioWorker implements Worker {
if (channel.setClosed()) {
future.setSuccess();
if (bound) {
if (isIoThread) {
if (inEventLoop) {
fireChannelUnbound(channel);
} else {
fireChannelUnboundLater(channel);
}
}
if (isIoThread) {
if (inEventLoop) {
fireChannelClosed(channel);
} else {
fireChannelClosedLater(channel);
@ -832,7 +681,7 @@ abstract class AbstractNioWorker implements Worker {
}
} catch (Throwable t) {
future.setFailure(t);
if (isIoThread) {
if (inEventLoop) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
@ -844,7 +693,7 @@ abstract class AbstractNioWorker implements Worker {
public void close(AbstractNioChannel channel, ChannelFuture future) {
boolean connected = channel.isConnected();
boolean bound = channel.isBound();
boolean iothread = isIoThread();
boolean inEventLoop = inEventLoop();
try {
channel.getJdkChannel().close();
@ -853,14 +702,14 @@ abstract class AbstractNioWorker implements Worker {
if (channel.setClosed()) {
future.setSuccess();
if (connected) {
if (iothread) {
if (inEventLoop) {
fireChannelDisconnected(channel);
} else {
fireChannelDisconnectedLater(channel);
}
}
if (bound) {
if (iothread) {
if (inEventLoop) {
fireChannelUnbound(channel);
} else {
fireChannelUnboundLater(channel);
@ -868,7 +717,7 @@ abstract class AbstractNioWorker implements Worker {
}
cleanUpWriteBuffer(channel);
if (iothread) {
if (inEventLoop) {
fireChannelClosed(channel);
} else {
fireChannelClosedLater(channel);
@ -878,10 +727,9 @@ abstract class AbstractNioWorker implements Worker {
}
} catch (Throwable t) {
future.setFailure(t);
if (iothread) {
if (inEventLoop) {
fireExceptionCaught(channel, t);
} else {
System.out.println(thread + "==" + channel.getWorker().thread);
fireExceptionCaughtLater(channel, t);
}
}
@ -936,7 +784,7 @@ abstract class AbstractNioWorker implements Worker {
}
if (fireExceptionCaught) {
if (isIoThread()) {
if (inEventLoop()) {
fireExceptionCaught(channel, cause);
} else {
fireExceptionCaughtLater(channel, cause);
@ -946,7 +794,7 @@ abstract class AbstractNioWorker implements Worker {
public void setInterestOps(AbstractNioChannel channel, ChannelFuture future, int interestOps) {
boolean changed = false;
boolean iothread = isIoThread();
boolean inEventLoop = inEventLoop();
try {
// interestOps can change at any time and at any thread.
// Acquire a lock to avoid possible race condition.
@ -969,7 +817,7 @@ abstract class AbstractNioWorker implements Worker {
future.setSuccess();
if (changed) {
if (iothread) {
if (inEventLoop) {
fireChannelInterestChanged(channel);
} else {
fireChannelInterestChangedLater(channel);
@ -983,7 +831,7 @@ abstract class AbstractNioWorker implements Worker {
case 0:
if (channel.getRawInterestOps() != interestOps) {
key.interestOps(interestOps);
if (!iothread &&
if (!inEventLoop &&
wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
@ -993,7 +841,7 @@ abstract class AbstractNioWorker implements Worker {
case 1:
case 2:
if (channel.getRawInterestOps() != interestOps) {
if (iothread) {
if (inEventLoop) {
key.interestOps(interestOps);
changed = true;
} else {
@ -1021,7 +869,7 @@ abstract class AbstractNioWorker implements Worker {
future.setSuccess();
if (changed) {
if (iothread) {
if (inEventLoop) {
fireChannelInterestChanged(channel);
} else {
fireChannelInterestChangedLater(channel);
@ -1031,14 +879,14 @@ abstract class AbstractNioWorker implements Worker {
// setInterestOps() was called on a closed channel.
ClosedChannelException cce = new ClosedChannelException();
future.setFailure(cce);
if (iothread) {
if (inEventLoop) {
fireExceptionCaught(channel, cce);
} else {
fireExceptionCaughtLater(channel, cce);
}
} catch (Throwable t) {
future.setFailure(t);
if (iothread) {
if (inEventLoop) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);

View File

@ -1,27 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import io.netty.channel.Channel;
public interface NioChannel extends Channel {
/**
* Returns the {@link AbstractNioWorker} which handles the IO of the {@link Channel}
*
*/
AbstractNioWorker getWorker();
}

View File

@ -0,0 +1,162 @@
package io.netty.channel;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*;
public class SingleThreadEventLoopTest {
private SingleThreadEventLoopImpl loop;
@Before
public void newEventLoop() {
loop = new SingleThreadEventLoopImpl();
}
@After
public void stopEventLoop() {
if (!loop.isShutdown()) {
loop.shutdown();
}
while (!loop.isTerminated()) {
try {
loop.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
// Ignore
}
}
assertEquals(1, loop.cleanedUp.get());
}
@Test
public void shutdownBeforeStart() throws Exception {
loop.shutdown();
}
@Test
public void shutdownAfterStart() throws Exception {
final AtomicBoolean interrupted = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(2);
loop.execute(new Runnable() {
@Override
public void run() {
latch.countDown();
while (latch.getCount() > 0) {
try {
latch.await();
} catch (InterruptedException ignored) {
interrupted.set(true);
}
}
}
});
// Wait for the event loop thread to start.
while (latch.getCount() >= 2) {
Thread.yield();
}
// Request the event loop thread to stop - it will call wakeup(false) to interrupt the thread.
loop.shutdown();
// Make the task terminate by itself.
latch.countDown();
// Wait until the event loop is terminated.
while (!loop.isTerminated()) {
loop.awaitTermination(1, TimeUnit.DAYS);
}
// Make sure loop.shutdown() above triggered wakeup().
assertTrue(interrupted.get());
}
@Test
public void shutdownWithPendingTasks() throws Exception {
final int NUM_TASKS = 3;
final AtomicInteger ranTasks = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(1);
final Runnable task = new Runnable() {
@Override
public void run() {
ranTasks.incrementAndGet();
while (latch.getCount() > 0) {
try {
latch.await();
} catch (InterruptedException e) {
// Ignored
}
}
}
};
for (int i = 0; i < NUM_TASKS; i ++) {
loop.execute(task);
}
// At this point, the first task should be running and stuck at latch.await().
while (ranTasks.get() == 0) {
Thread.yield();
}
assertEquals(1, ranTasks.get());
// Shut down the event loop to test if the other tasks are run before termination.
loop.shutdown();
// Let the other tasks run.
latch.countDown();
// Wait until the event loop is terminated.
while (!loop.isTerminated()) {
loop.awaitTermination(1, TimeUnit.DAYS);
}
// Make sure loop.shutdown() above triggered wakeup().
assertEquals(NUM_TASKS, ranTasks.get());
}
private static class SingleThreadEventLoopImpl extends SingleThreadEventLoop {
final AtomicInteger cleanedUp = new AtomicInteger();
@Override
protected void run() {
for (;;) {
Runnable task;
try {
task = takeTask();
task.run();
} catch (InterruptedException e) {
// Waken up by interruptThread()
}
if (isShutdown() && peekTask() == null) {
break;
}
}
}
protected void cleanup() {
cleanedUp.incrementAndGet();
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop) {
interruptThread();
}
}
@Override
public void attach(Channel channel, ChannelFuture future) {
// Untested
}
}
}