ability to use Executor instead of ThreadFactory

This commit is contained in:
Vladimir Krivosheev 2013-08-20 18:42:56 +02:00 committed by Trustin Lee
parent 780516471f
commit eb308cfff6
15 changed files with 225 additions and 88 deletions

View File

@ -15,7 +15,7 @@
*/
package io.netty.util.concurrent;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.Executor;
/**
* Default {@link SingleThreadEventExecutor} implementation which just execute all submitted task in a
@ -24,8 +24,8 @@ import java.util.concurrent.ThreadFactory;
*/
final class DefaultEventExecutor extends SingleThreadEventExecutor {
DefaultEventExecutor(DefaultEventExecutorGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory, true);
DefaultEventExecutor(DefaultEventExecutorGroup parent, Executor executor) {
super(parent, executor, true);
}
@Override

View File

@ -15,6 +15,7 @@
*/
package io.netty.util.concurrent;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
/**
@ -41,8 +42,7 @@ public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup {
}
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception {
return new DefaultEventExecutor(this, threadFactory);
protected EventExecutor newChild(Executor executor, Object... args) throws Exception {
return new DefaultEventExecutor(this, executor);
}
}

View File

@ -19,6 +19,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -39,22 +40,33 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
*
* @param nThreads the number of threads that will be used by this instance.
* @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(ThreadFactory, Object...)} call
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (threadFactory == null) {
threadFactory = newDefaultThreadFactory();
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new SingleThreadEventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(threadFactory, args);
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
@ -130,8 +142,7 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
* called for each thread that will serve this {@link MultithreadEventExecutorGroup}.
*
*/
protected abstract EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception;
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {

View File

@ -27,6 +27,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
@ -60,7 +61,9 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
private final Queue<Runnable> taskQueue;
final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
private final Thread thread;
private volatile Thread thread;
private final Executor executor;
private volatile boolean interrupted;
private final Object stateLock = new Object();
private final Semaphore threadLock = new Semaphore(0);
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
@ -84,64 +87,27 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
*/
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
}
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
/**
* Create a new instance
*
* @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
* @param executor the {@link Executor} which will be used for executing
* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
* executor thread
*/
protected SingleThreadEventExecutor(
EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
if (executor == null) {
throw new NullPointerException("executor");
}
this.parent = parent;
this.addTaskWakesUp = addTaskWakesUp;
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
if (state < ST_SHUTTING_DOWN) {
state = ST_SHUTTING_DOWN;
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error(
"Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
synchronized (stateLock) {
state = ST_TERMINATED;
}
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
this.executor = executor;
taskQueue = newTaskQueue();
}
@ -165,7 +131,12 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
* Interrupt the current running {@link Thread}.
*/
protected void interruptThread() {
thread.interrupt();
Thread currentThread = thread;
if (currentThread == null) {
interrupted = true;
} else {
currentThread.interrupt();
}
}
/**
@ -515,7 +486,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
switch (state) {
case ST_NOT_STARTED:
state = ST_SHUTTING_DOWN;
thread.start();
doStartThread();
break;
case ST_STARTED:
state = ST_SHUTTING_DOWN;
@ -560,7 +531,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
switch (state) {
case ST_NOT_STARTED:
state = ST_SHUTDOWN;
thread.start();
doStartThread();
break;
case ST_STARTED:
case ST_SHUTTING_DOWN:
@ -814,11 +785,69 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
delayedTaskQueue.add(new ScheduledFutureTask<Void>(
this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
thread.start();
doStartThread();
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
if (state < ST_SHUTTING_DOWN) {
state = ST_SHUTTING_DOWN;
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
synchronized (stateLock) {
state = ST_TERMINATED;
}
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}
private final class PurgeTask implements Runnable {
@Override
public void run() {

View File

@ -0,0 +1,35 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}

View File

@ -21,6 +21,7 @@ import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
/**
@ -42,6 +43,13 @@ public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutor
}
}
/**
* @see {@link MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)}
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
/**
* @see {@link MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)}
*/

View File

@ -18,6 +18,7 @@ package io.netty.channel;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
/**
@ -33,6 +34,13 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
super(parent, threadFactory, addTaskWakesUp);
}
/**
* @see {@link SingleThreadEventExecutor#SingleThreadEventExecutor(EventExecutorGroup, Executor, boolean)}
*/
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {
super(parent, executor, addTaskWakesUp);
}
@Override
public EventLoopGroup parent() {
return (EventLoopGroup) super.parent();

View File

@ -26,7 +26,7 @@ public class ThreadPerChannelEventLoop extends SingleThreadEventLoop {
private Channel ch;
public ThreadPerChannelEventLoop(ThreadPerChannelEventLoopGroup parent) {
super(parent, parent.threadFactory, true);
super(parent, parent.executor, true);
this.parent = parent;
}

View File

@ -23,6 +23,7 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ThreadPerTaskExecutor;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ReadOnlyIterator;
@ -32,6 +33,7 @@ import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
@ -44,7 +46,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
private final Object[] childArgs;
private final int maxChannels;
final ThreadFactory threadFactory;
final Executor executor;
final Set<ThreadPerChannelEventLoop> activeChildren =
Collections.newSetFromMap(PlatformDependent.<ThreadPerChannelEventLoop, Boolean>newConcurrentHashMap());
final Queue<ThreadPerChannelEventLoop> idleChildren = new ConcurrentLinkedQueue<ThreadPerChannelEventLoop>();
@ -95,12 +97,28 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
* @param args arguments which will passed to each {@link #newChild(Object...)} call.
*/
protected ThreadPerChannelEventLoopGroup(int maxChannels, ThreadFactory threadFactory, Object... args) {
this(maxChannels, new ThreadPerTaskExecutor(threadFactory), args);
}
/**
* Create a new {@link ThreadPerChannelEventLoopGroup}.
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(Channel, ChannelPromise)} method.
* Use {@code 0} to use no limit
* @param executor the {@link Executor} used to create new {@link Thread} instances that handle the
* registered {@link Channel}s
* @param args arguments which will passed to each {@link #newChild(Object...)} call.
*/
protected ThreadPerChannelEventLoopGroup(int maxChannels, Executor executor, Object... args) {
if (maxChannels < 0) {
throw new IllegalArgumentException(String.format(
"maxChannels: %d (expected: >= 0)", maxChannels));
}
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
if (executor == null) {
throw new NullPointerException("executor");
}
if (args == null) {
@ -110,7 +128,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
}
this.maxChannels = maxChannels;
this.threadFactory = threadFactory;
this.executor = executor;
tooManyChannels = new ChannelException("too many channels (max: " + maxChannels + ')');
tooManyChannels.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);

View File

@ -17,12 +17,12 @@ package io.netty.channel.local;
import io.netty.channel.SingleThreadEventLoop;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.Executor;
final class LocalEventLoop extends SingleThreadEventLoop {
LocalEventLoop(LocalEventLoopGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory, true);
LocalEventLoop(LocalEventLoopGroup parent, Executor executor) {
super(parent, executor, true);
}
@Override

View File

@ -18,6 +18,7 @@ package io.netty.channel.local;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
/**
@ -53,7 +54,7 @@ public class LocalEventLoopGroup extends MultithreadEventLoopGroup {
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception {
return new LocalEventLoop(this, threadFactory);
Executor executor, Object... args) throws Exception {
return new LocalEventLoop(this, executor);
}
}

View File

@ -39,7 +39,7 @@ import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@ -111,8 +111,8 @@ public final class NioEventLoop extends SingleThreadEventLoop {
private int cancelledKeys;
private boolean needsToSelectAgain;
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory, false);
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider) {
super(parent, executor, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}

View File

@ -21,6 +21,7 @@ import io.netty.util.concurrent.EventExecutor;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
/**
@ -41,7 +42,7 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
* {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup(int nThreads) {
this(nThreads, null);
this(nThreads, (Executor) null);
}
/**
@ -52,6 +53,10 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
/**
* Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given
* {@link SelectorProvider}.
@ -61,6 +66,11 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
super(nThreads, threadFactory, selectorProvider);
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
super(nThreads, executor, selectorProvider);
}
/**
* Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is
* {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
@ -83,7 +93,7 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception {
return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0]);
}
}

View File

@ -23,6 +23,7 @@ import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ThreadPerChannelEventLoopGroup;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@ -52,6 +53,21 @@ public class OioEventLoopGroup extends ThreadPerChannelEventLoopGroup {
this(maxChannels, Executors.defaultThreadFactory());
}
/**
* Create a new {@link OioEventLoopGroup}.
*
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and
* {@link #register(Channel, ChannelPromise)} method.
* Use {@code 0} to use no limit
* @param executor the {@link Executor} used to create new {@link Thread} instances that handle the
* registered {@link Channel}s
*/
public OioEventLoopGroup(int maxChannels, Executor executor) {
super(maxChannels, executor);
}
/**
* Create a new {@link OioEventLoopGroup}.
*

View File

@ -39,7 +39,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.*;
public class SingleThreadEventLoopTest {