Add MultithreadEventLoop

- Add EventLoopException to wrap the exceptions while an event loop does
something
- Make EventLoop.register() return EventLoop so that the caller knows
the actual EventLoop that will handle the Channel even if the caller
called register() from MultithreadEventLoop
This commit is contained in:
Trustin Lee 2012-04-29 18:40:55 +09:00
parent e76e2aeac8
commit 470e7da5d7
4 changed files with 209 additions and 1 deletions

View File

@ -4,6 +4,6 @@ import java.util.concurrent.ExecutorService;
public interface EventLoop extends ExecutorService {
ChannelFuture register(Channel channel);
void register(Channel channel, ChannelFuture future);
EventLoop register(Channel channel, ChannelFuture future);
boolean inEventLoop();
}

View File

@ -0,0 +1,22 @@
package io.netty.channel;
public class EventLoopException extends ChannelException {
private static final long serialVersionUID = -8969100344583703616L;
public EventLoopException() {
}
public EventLoopException(String message, Throwable cause) {
super(message, cause);
}
public EventLoopException(String message) {
super(message);
}
public EventLoopException(Throwable cause) {
super(cause);
}
}

View File

@ -0,0 +1,175 @@
package io.netty.channel;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
public class MultithreadEventLoop implements EventLoop {
private final EventLoop[] children;
private final AtomicInteger childIndex = new AtomicInteger();
public MultithreadEventLoop(Class<? extends SingleThreadEventLoop> loopType, int nThreads) {
this(loopType, nThreads, Executors.defaultThreadFactory());
}
public MultithreadEventLoop(Class<? extends SingleThreadEventLoop> loopType, int nThreads, ThreadFactory threadFactory) {
if (loopType == null) {
throw new NullPointerException("loopType");
}
if (nThreads <= 0) {
throw new IllegalArgumentException("nThreads: " + nThreads + " (expected: > 0)");
}
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
children = new EventLoop[nThreads];
for (int i = 0; i < nThreads; i ++) {
try {
children[i] = loopType.getConstructor(ThreadFactory.class).newInstance(threadFactory);
} catch (Exception e) {
throw new EventLoopException("failed to create a child event loop: " + loopType.getName(), e);
} finally {
for (int j = 0; j < i; j ++) {
children[j].shutdown();
}
}
}
}
@Override
public void shutdown() {
for (EventLoop l: children) {
l.shutdown();
}
}
@Override
public List<Runnable> shutdownNow() {
for (EventLoop l: children) {
l.shutdownNow();
}
return Collections.emptyList();
}
@Override
public boolean isShutdown() {
for (EventLoop l: children) {
if (!l.isShutdown()) {
return false;
}
}
return true;
}
@Override
public boolean isTerminated() {
for (EventLoop l: children) {
if (!l.isTerminated()) {
return false;
}
}
return true;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long deadline = System.nanoTime() + unit.toNanos(timeout);
loop: for (EventLoop l: children) {
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
break loop;
}
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
}
return isTerminated();
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return currentEventLoop().submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return currentEventLoop().submit(task, result);
}
@Override
public Future<?> submit(Runnable task) {
return currentEventLoop().submit(task);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return currentEventLoop().invokeAll(tasks);
}
@Override
public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return currentEventLoop().invokeAll(tasks, timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return currentEventLoop().invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
return currentEventLoop().invokeAny(tasks, timeout, unit);
}
@Override
public void execute(Runnable command) {
currentEventLoop().execute(command);
}
@Override
public ChannelFuture register(Channel channel) {
return nextEventLoop().register(channel);
}
@Override
public EventLoop register(Channel channel, ChannelFuture future) {
return nextEventLoop().register(channel, future);
}
@Override
public boolean inEventLoop() {
return SingleThreadEventLoop.CURRENT_EVENT_LOOP.get() != null;
}
private EventLoop nextEventLoop() {
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
private static SingleThreadEventLoop currentEventLoop() {
SingleThreadEventLoop loop = SingleThreadEventLoop.CURRENT_EVENT_LOOP.get();
if (loop == null) {
throw new IllegalStateException("not called from an event loop thread");
}
return loop;
}
}

View File

@ -14,6 +14,8 @@ import java.util.concurrent.TimeUnit;
public abstract class SingleThreadEventLoop extends AbstractExecutorService implements EventLoop {
static final ThreadLocal<SingleThreadEventLoop> CURRENT_EVENT_LOOP = new ThreadLocal<SingleThreadEventLoop>();
private final BlockingQueue<Runnable> taskQueue = QueueFactory.createQueue(Runnable.class);
private final Thread thread;
private final Object stateLock = new Object();
@ -29,6 +31,7 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
CURRENT_EVENT_LOOP.set(SingleThreadEventLoop.this);
try {
SingleThreadEventLoop.this.run();
} finally {
@ -53,6 +56,12 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl
return future;
}
@Override
public EventLoop register(Channel channel, ChannelFuture future) {
execute(newRegistrationTask(channel, future));
return this;
}
protected void interruptThread() {
thread.interrupt();
}
@ -102,6 +111,8 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl
protected abstract void wakeup(boolean inEventLoop);
protected abstract Runnable newRegistrationTask(Channel channel, ChannelFuture future);
@Override
public boolean inEventLoop() {
return Thread.currentThread() == thread;