Make Nio/EpollEventLoop run on a ForkJoinPool

Related issue: #2250

Motivation:

Prior to this commit, Netty's non blocking EventLoops
were each assigned a fixed thread by which all of the
EventLoop's I/O and handler logic would be performed.

While this is a fine approach for most users of Netty,
some advanced users require more flexibility in
scheduling the EventLoops.

Modifications:

Remove all direct usages of threads in MultithreadEventExecutorGroup,
SingleThreadEventExecutor et al., and introduce an Executor
abstraction instead.

The way to think about this change is, that each
iteration of an eventloop is now a task that gets scheduled
in a ForkJoinPool.

While the ForkJoinPool is the default, one also has the
ability to plug in his/her own Executor (aka thread pool)
into a EventLoop(Group).

Result:

Netty hands off thread management to a ForkJoinPool by default.
Users can also provide their own thread pool implementation and
get some control over scheduling Netty's EventLoops
This commit is contained in:
Jakob Buchgraber 2014-07-21 14:56:38 +02:00 committed by Trustin Lee
parent 59cf8ffcb4
commit f8bee2e94c
41 changed files with 929 additions and 629 deletions

View File

@ -16,7 +16,6 @@
package io.netty.util.concurrent;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
/**
* Default {@link SingleThreadEventExecutor} implementation which just execute all submitted task in a
@ -28,20 +27,12 @@ public final class DefaultEventExecutor extends SingleThreadEventExecutor {
this((EventExecutorGroup) null);
}
public DefaultEventExecutor(ThreadFactory threadFactory) {
this(null, threadFactory);
}
public DefaultEventExecutor(Executor executor) {
this(null, executor);
}
public DefaultEventExecutor(EventExecutorGroup parent) {
this(parent, new DefaultThreadFactory(DefaultEventExecutor.class));
}
public DefaultEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory, true);
this(parent, new DefaultExecutorFactory(DefaultEventExecutor.class).newExecutor(1));
}
public DefaultEventExecutor(EventExecutorGroup parent, Executor executor) {
@ -50,16 +41,16 @@ public final class DefaultEventExecutor extends SingleThreadEventExecutor {
@Override
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) {
break;
}
if (confirmShutdown()) {
cleanupAndTerminate(true);
} else {
scheduleExecution();
}
}
}

View File

@ -16,29 +16,42 @@
package io.netty.util.concurrent;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
/**
* Default implementation of {@link MultithreadEventExecutorGroup} which will use {@link DefaultEventExecutor} instances
* to handle the tasks.
* Default implementation of {@link MultithreadEventExecutorGroup} which will use {@link DefaultEventExecutor}
* instances to handle the tasks.
*/
public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup {
/**
* @see {@link #DefaultEventExecutorGroup(int, ThreadFactory)}
* Create a new instance.
*
* @param nEventExecutors the number of {@link DefaultEventExecutor}s that this group will use.
*/
public DefaultEventExecutorGroup(int nThreads) {
this(nThreads, null);
public DefaultEventExecutorGroup(int nEventExecutors) {
this(nEventExecutors, (Executor) null);
}
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
* @param nEventExecutors the number of {@link DefaultEventExecutor}s that this group will use.
* @param executor the {@link Executor} responsible for executing the work handled by
* this {@link EventExecutorGroup}.
*/
public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
public DefaultEventExecutorGroup(int nEventExecutors, Executor executor) {
super(nEventExecutors, executor);
}
/**
* Create a new instance.
*
* @param nEventExecutors the number of {@link DefaultEventExecutor}s that this group will use.
* @param executorFactory the {@link ExecutorFactory} which produces the {@link Executor} responsible for
* executing the work handled by this {@link EventExecutorGroup}.
*/
public DefaultEventExecutorGroup(int nEventExecutors, ExecutorFactory executorFactory) {
super(nEventExecutors, executorFactory);
}
@Override

View File

@ -0,0 +1,147 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import io.netty.util.internal.InternalThreadLocalMap;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.chmv8.ForkJoinPool;
import io.netty.util.internal.chmv8.ForkJoinPool.ForkJoinWorkerThreadFactory;
import io.netty.util.internal.chmv8.ForkJoinWorkerThread;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Locale;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
/**
* An implementation of an {@link ExecutorFactory} that creates a new {@link ForkJoinPool} on each
* call to {@link #newExecutor(int)}.
* <p>
* This {@link ExecutorFactory} powers Netty's nio and epoll eventloops by default. Netty moved from managing its
* own threads and pinning a thread to each eventloop to an {@link Executor}-based approach. That way advanced
* users of Netty can plug in their own threadpools and gain more control of scheduling the eventloops.
* <p>
* The main reason behind choosing a {@link ForkJoinPool} as the default {@link Executor} is that it uses
* thread-local task queues, providing a high level of thread affinity to Netty's eventloops.
* <p>
* The whole discussion can be found on GitHub
* <a href="https://github.com/netty/netty/issues/2250">https://github.com/netty/netty/issues/2250</a>.
*/
public final class DefaultExecutorFactory implements ExecutorFactory {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(DefaultExecutorFactory.class);
private static final AtomicInteger executorId = new AtomicInteger();
private String namePrefix;
/**
* @param clazzNamePrefix the name of the class will be used to prefix the name of each
* {@link ForkJoinWorkerThread} with.
*/
public DefaultExecutorFactory(Class<?> clazzNamePrefix) {
this(toName(clazzNamePrefix));
}
/**
* @param namePrefix the string to prefix the name of each {@link ForkJoinWorkerThread} with.
*/
public DefaultExecutorFactory(String namePrefix) {
this.namePrefix = namePrefix;
}
@Override
public Executor newExecutor(int parallelism) {
ForkJoinWorkerThreadFactory threadFactory =
new DefaultForkJoinWorkerThreadFactory(namePrefix + "-" + executorId.getAndIncrement());
return new ForkJoinPool(parallelism, threadFactory, DefaultUncaughtExceptionHandler.INSTANCE, true);
}
private static String toName(Class<?> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
String clazzName = StringUtil.simpleClassName(clazz);
switch (clazzName.length()) {
case 0:
return "unknown";
case 1:
return clazzName.toLowerCase(Locale.US);
default:
if (Character.isUpperCase(clazzName.charAt(0)) && Character.isLowerCase(clazzName.charAt(1))) {
return Character.toLowerCase(clazzName.charAt(0)) + clazzName.substring(1);
} else {
return clazzName;
}
}
}
private static final class DefaultUncaughtExceptionHandler implements UncaughtExceptionHandler {
private static final DefaultUncaughtExceptionHandler INSTANCE = new DefaultUncaughtExceptionHandler();
@Override
public void uncaughtException(Thread t, Throwable e) {
if (logger.isErrorEnabled()) {
logger.error("Uncaught exception in thread: {}", t.getName(), e);
}
}
}
private static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory {
private final AtomicInteger idx = new AtomicInteger();
private final String namePrefix;
DefaultForkJoinWorkerThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
}
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
// Note: The ForkJoinPool will create these threads as daemon threads.
ForkJoinWorkerThread thread = new DefaultForkJoinWorkerThread(pool);
thread.setName(namePrefix + "-" + idx.getAndIncrement());
thread.setPriority(Thread.MAX_PRIORITY);
return thread;
}
}
private static final class DefaultForkJoinWorkerThread
extends ForkJoinWorkerThread implements FastThreadLocalAccess {
private InternalThreadLocalMap threadLocalMap;
DefaultForkJoinWorkerThread(ForkJoinPool pool) {
super(pool);
}
@Override
public InternalThreadLocalMap threadLocalMap() {
return threadLocalMap;
}
@Override
public void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {
this.threadLocalMap = threadLocalMap;
}
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import java.util.concurrent.Executor;
/**
* An object that creates new {@link Executor}s on demand. Using executor factories mainly
* simplifies providing custom executor implementations to Netty's event loops.
*/
public interface ExecutorFactory {
Executor newExecutor(int parallelism);
}

View File

@ -23,18 +23,19 @@ import java.util.IdentityHashMap;
import java.util.Set;
/**
* A special variant of {@link ThreadLocal} that yields higher access performan when accessed from a
* {@link FastThreadLocalThread}.
* A special variant of {@link ThreadLocal} that yields to higher access performance when accessed from a
* {@link FastThreadLocalAccess} {@link Thread}.
* <p>
* Internally, a {@link FastThreadLocal} uses a constant index in an array, instead of using hash code and hash table,
* to look for a variable. Although seemingly very subtle, it yields slight performance advantage over using a hash
* table, and it is useful when accessed frequently.
* </p><p>
* To take advantage of this thread-local variable, your thread must be a {@link FastThreadLocalThread} or its subtype.
* By default, all threads created by {@link DefaultThreadFactory} are {@link FastThreadLocalThread} due to this reason.
* To take advantage of this thread-local variable, your thread must implement {@link FastThreadLocalAccess}.
* By default, all threads created by {@link DefaultThreadFactory} and {@link DefaultExecutorFactory} implement
* {@link FastThreadLocalAccess}.
* </p><p>
* Note that the fast path is only possible on threads that extend {@link FastThreadLocalThread}, because it requires
* a special field to store the necessary state. An access by any other kind of thread falls back to a regular
* Note that the fast path is only possible on threads that implement {@link FastThreadLocalAccess}, because it
* requires a special field to store the necessary state. An access by any other kind of thread falls back to a regular
* {@link ThreadLocal}.
* </p>
*

View File

@ -0,0 +1,39 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import io.netty.util.internal.InternalThreadLocalMap;
/**
* Netty's {@link Thread} implementations implement this interface to provide fast access to {@link FastThreadLocal}
* variables.
*
* @see FastThreadLocalThread
*/
public interface FastThreadLocalAccess {
/**
* Returns the internal data structure that keeps the thread-local variables bound to this thread.
* Note that this method is for internal use only, and thus is subject to change at any time.
*/
InternalThreadLocalMap threadLocalMap();
/**
* Sets the internal data structure that keeps the thread-local variables bound to this thread.
* Note that this method is for internal use only, and thus is subject to change at any time.
*/
void setThreadLocalMap(InternalThreadLocalMap threadLocalMap);
}

View File

@ -20,7 +20,7 @@ import io.netty.util.internal.InternalThreadLocalMap;
/**
* A special {@link Thread} that provides fast access to {@link FastThreadLocal} variables.
*/
public class FastThreadLocalThread extends Thread {
public class FastThreadLocalThread extends Thread implements FastThreadLocalAccess {
private InternalThreadLocalMap threadLocalMap;
@ -58,6 +58,7 @@ public class FastThreadLocalThread extends Thread {
* Returns the internal data structure that keeps the thread-local variables bound to this thread.
* Note that this method is for internal use only, and thus is subject to change at any time.
*/
@Override
public final InternalThreadLocalMap threadLocalMap() {
return threadLocalMap;
}
@ -66,6 +67,7 @@ public class FastThreadLocalThread extends Thread {
* Sets the internal data structure that keeps the thread-local variables bound to this thread.
* Note that this method is for internal use only, and thus is subject to change at any time.
*/
@Override
public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {
this.threadLocalMap = threadLocalMap;
}

View File

@ -19,7 +19,6 @@ import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -37,40 +36,45 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
private final EventExecutorChooser chooser;
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
* @param nEventExecutors the number of {@link EventExecutor}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventExecutor}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* @param executorFactory the {@link ExecutorFactory} to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
protected MultithreadEventExecutorGroup(int nEventExecutors, ExecutorFactory executorFactory, Object... args) {
this(nEventExecutors, executorFactory == null ? null : executorFactory.newExecutor(nEventExecutors), args);
}
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
* @param nEventExecutors the number of {@link EventExecutor}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventExecutor}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* @param executor the {@link Executor} to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
protected MultithreadEventExecutorGroup(int nEventExecutors, Executor executor, Object... args) {
if (nEventExecutors <= 0) {
throw new IllegalArgumentException(
String.format("nEventExecutors: %d (expected: > 0)", nEventExecutors));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
executor = newDefaultExecutor(nEventExecutors);
}
children = new EventExecutor[nThreads];
children = new EventExecutor[nEventExecutors];
if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}
for (int i = 0; i < nThreads; i ++) {
for (int i = 0; i < nEventExecutors; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
@ -118,8 +122,8 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
protected Executor newDefaultExecutor(int nEventExecutors) {
return new DefaultExecutorFactory(getClass()).newExecutor(nEventExecutors);
}
@Override

View File

@ -33,7 +33,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@ -61,6 +60,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER;
private static final long threadOffset;
static {
AtomicIntegerFieldUpdater<SingleThreadEventExecutor> updater =
PlatformDependent.newAtomicIntegerFieldUpdater(SingleThreadEventExecutor.class, "state");
@ -68,14 +69,21 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
updater = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
}
STATE_UPDATER = updater;
try {
threadOffset =
PlatformDependent.objectFieldOffset(SingleThreadEventExecutor.class.getDeclaredField("thread"));
} catch (NoSuchFieldException e) {
throw new RuntimeException();
}
}
private final Queue<Runnable> taskQueue;
final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
@SuppressWarnings({ "FieldMayBeFinal", "unused" })
private volatile Thread thread;
private final Executor executor;
private volatile boolean interrupted;
private final Semaphore threadLock = new Semaphore(0);
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
private final boolean addTaskWakesUp;
@ -91,26 +99,36 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
/**
* Create a new instance
*
* @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
* @param threadFactory the {@link ThreadFactory} which will be used for the used {@link Thread}
* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
* executor thread
*/
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
}
private boolean firstRun = true;
private final Runnable AS_RUNNABLE = new Runnable() {
@Override
public void run() {
updateThread(Thread.currentThread());
// lastExecutionTime must be set on the first run
// in order for shutdown to work correctly for the
// rare case that the eventloop did not execute
// a single task during its lifetime.
if (firstRun) {
firstRun = false;
updateLastExecutionTime();
}
try {
SingleThreadEventExecutor.this.run();
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
cleanupAndTerminate(false);
}
}
};
/**
* Create a new instance
*
* @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
* @param executor the {@link Executor} which will be used for executing
* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
* executor thread
* @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it.
* @param executor the {@link Executor} which will be used for executing.
* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up
* the executor thread.
*/
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
super(parent);
@ -134,18 +152,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
return new LinkedBlockingQueue<Runnable>();
}
/**
* Interrupt the current running {@link Thread}.
*/
protected void interruptThread() {
Thread currentThread = thread;
if (currentThread == null) {
interrupted = true;
} else {
currentThread.interrupt();
}
}
/**
* @see {@link Queue#poll()}
*/
@ -517,7 +523,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
gracefulShutdownTimeout = unit.toNanos(timeout);
if (oldState == ST_NOT_STARTED) {
doStartThread();
scheduleExecution();
}
if (wakeup) {
@ -569,7 +575,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
}
if (oldState == ST_NOT_STARTED) {
doStartThread();
scheduleExecution();
}
if (wakeup) {
@ -687,7 +693,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
if (inEventLoop) {
addTask(task);
} else {
startThread();
startExecution();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
@ -807,75 +813,65 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
return task;
}
private void startThread() {
protected void cleanupAndTerminate(boolean success) {
for (;;) {
int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
firstRun = true;
terminationFuture.setSuccess(null);
}
}
}
private void startExecution() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
delayedTaskQueue.add(new ScheduledFutureTask<Void>(
this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
doStartThread();
scheduleExecution();
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
protected void scheduleExecution() {
updateThread(null);
executor.execute(AS_RUNNABLE);
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
private void updateThread(Thread t) {
PlatformDependent.putOrderedObject(this, threadOffset, t);
}
private final class PurgeTask implements Runnable {

View File

@ -17,7 +17,7 @@
package io.netty.util.internal;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.FastThreadLocalThread;
import io.netty.util.concurrent.FastThreadLocalAccess;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
@ -39,8 +39,8 @@ public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap
public static InternalThreadLocalMap getIfSet() {
Thread thread = Thread.currentThread();
InternalThreadLocalMap threadLocalMap;
if (thread instanceof FastThreadLocalThread) {
threadLocalMap = ((FastThreadLocalThread) thread).threadLocalMap();
if (thread instanceof FastThreadLocalAccess) {
threadLocalMap = ((FastThreadLocalAccess) thread).threadLocalMap();
} else {
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
if (slowThreadLocalMap == null) {
@ -54,14 +54,14 @@ public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) thread);
if (thread instanceof FastThreadLocalAccess) {
return fastGet((FastThreadLocalAccess) thread);
} else {
return slowGet();
}
}
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
private static InternalThreadLocalMap fastGet(FastThreadLocalAccess thread) {
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
@ -86,8 +86,8 @@ public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap
public static void remove() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
((FastThreadLocalThread) thread).setThreadLocalMap(null);
if (thread instanceof FastThreadLocalAccess) {
((FastThreadLocalAccess) thread).setThreadLocalMap(null);
} else {
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
if (slowThreadLocalMap != null) {

View File

@ -20,7 +20,6 @@ import org.junit.Test;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@ -178,21 +177,21 @@ public class DefaultPromiseTest {
private static final class TestEventExecutor extends SingleThreadEventExecutor {
TestEventExecutor() {
super(null, Executors.defaultThreadFactory(), true);
super(null, new DefaultExecutorFactory(TestEventExecutor.class).newExecutor(1), true);
}
@Override
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) {
break;
}
if (confirmShutdown()) {
cleanupAndTerminate(true);
} else {
scheduleExecution();
}
}
}

View File

@ -19,6 +19,8 @@ package io.netty.util.concurrent;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@ -75,4 +77,34 @@ public class FastThreadLocalTest {
throw t;
}
}
/**
* Make sure threads created by the {@link DefaultExecutorFactory} and {@link DefaultThreadFactory}
* implement the {@link FastThreadLocalAccess} interface.
*/
@Test
public void testIsFastThreadLocalThread() {
ExecutorFactory executorFactory = new DefaultExecutorFactory(FastThreadLocalTest.class);
int parallelism = Runtime.getRuntime().availableProcessors() * 2;
Executor executor = executorFactory.newExecutor(parallelism);
// submit a "high" number of tasks, to get a good chance to touch every thread.
for (int i = 0; i < parallelism * 100; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
assertTrue(Thread.currentThread() instanceof FastThreadLocalAccess);
}
});
}
ThreadFactory threadFactory = new DefaultThreadFactory(FastThreadLocalTest.class);
Thread t = threadFactory.newThread(new Runnable() {
@Override
public void run() {
}
});
assertTrue(t instanceof FastThreadLocalAccess);
}
}

View File

@ -23,9 +23,8 @@ import io.netty.channel.udt.UdtChannel;
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.ThreadFactory;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.ExecutorFactory;
/**
* UDT Byte Stream Client
@ -43,9 +42,10 @@ public final class ByteEchoClient {
public static void main(String[] args) throws Exception {
// Configure the client.
final ThreadFactory connectFactory = new DefaultThreadFactory("connect");
final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1,
connectFactory, NioUdtProvider.BYTE_PROVIDER);
final ExecutorFactory connectFactory = new DefaultExecutorFactory("connect");
final NioEventLoopGroup connectGroup =
new NioEventLoopGroup(1, connectFactory, NioUdtProvider.BYTE_PROVIDER);
try {
final Bootstrap boot = new Bootstrap();
boot.group(connectGroup)

View File

@ -24,9 +24,8 @@ import io.netty.channel.udt.UdtChannel;
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.ThreadFactory;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.ExecutorFactory;
/**
* UDT Byte Stream Server
@ -38,10 +37,10 @@ public final class ByteEchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
final ThreadFactory acceptFactory = new DefaultThreadFactory("accept");
final ThreadFactory connectFactory = new DefaultThreadFactory("connect");
final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.BYTE_PROVIDER);
final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1, connectFactory, NioUdtProvider.BYTE_PROVIDER);
ExecutorFactory acceptFactory = new DefaultExecutorFactory("accept");
ExecutorFactory connectFactory = new DefaultExecutorFactory("connect");
NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.BYTE_PROVIDER);
NioEventLoopGroup connectGroup = new NioEventLoopGroup(1, connectFactory, NioUdtProvider.BYTE_PROVIDER);
// Configure the server.
try {

View File

@ -23,9 +23,9 @@ import io.netty.channel.udt.UdtChannel;
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.ExecutorFactory;
import java.util.concurrent.ThreadFactory;
import java.util.logging.Logger;
/**
@ -45,11 +45,11 @@ public final class MsgEchoClient {
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public static void main(String[] args) throws Exception {
// Configure the client.
final ThreadFactory connectFactory = new DefaultThreadFactory("connect");
final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1,
connectFactory, NioUdtProvider.MESSAGE_PROVIDER);
final ExecutorFactory connectFactory = new DefaultExecutorFactory("connect");
final NioEventLoopGroup connectGroup =
new NioEventLoopGroup(1, connectFactory, NioUdtProvider.MESSAGE_PROVIDER);
try {
final Bootstrap boot = new Bootstrap();
boot.group(connectGroup)

View File

@ -24,9 +24,8 @@ import io.netty.channel.udt.UdtChannel;
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.ThreadFactory;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.ExecutorFactory;
/**
* UDT Message Flow Server
@ -38,8 +37,8 @@ public final class MsgEchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
final ThreadFactory acceptFactory = new DefaultThreadFactory("accept");
final ThreadFactory connectFactory = new DefaultThreadFactory("connect");
final ExecutorFactory acceptFactory = new DefaultExecutorFactory("accept");
final ExecutorFactory connectFactory = new DefaultExecutorFactory("connect");
final NioEventLoopGroup acceptGroup =
new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.MESSAGE_PROVIDER);
final NioEventLoopGroup connectGroup =

View File

@ -23,10 +23,10 @@ import io.netty.channel.udt.UdtChannel;
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.ExecutorFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadFactory;
/**
* UDT Message Flow Peer
@ -48,9 +48,10 @@ public abstract class MsgEchoPeerBase {
public void run() throws Exception {
// Configure the peer.
final ThreadFactory connectFactory = new DefaultThreadFactory("rendezvous");
final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1,
connectFactory, NioUdtProvider.MESSAGE_PROVIDER);
final ExecutorFactory connectFactory = new DefaultExecutorFactory("rendezvous");
final NioEventLoopGroup connectGroup =
new NioEventLoopGroup(1, connectFactory, NioUdtProvider.MESSAGE_PROVIDER);
try {
final Bootstrap boot = new Bootstrap();
boot.group(connectGroup)

View File

@ -23,10 +23,10 @@ import io.netty.channel.udt.UdtChannel;
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.ExecutorFactory;
import java.net.SocketAddress;
import java.util.concurrent.ThreadFactory;
/**
* UDT Byte Stream Peer
@ -50,9 +50,10 @@ public class ByteEchoPeerBase {
}
public void run() throws Exception {
final ThreadFactory connectFactory = new DefaultThreadFactory("rendezvous");
final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1,
connectFactory, NioUdtProvider.BYTE_PROVIDER);
final ExecutorFactory connectFactory = new DefaultExecutorFactory("rendezvous");
final NioEventLoopGroup connectGroup =
new NioEventLoopGroup(1, connectFactory, NioUdtProvider.BYTE_PROVIDER);
try {
final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(connectGroup)

View File

@ -27,6 +27,7 @@ import io.netty.channel.sctp.oio.OioSctpServerChannel;
import io.netty.testsuite.util.TestUtils;
import io.netty.testsuite.transport.TestsuitePermutation.BootstrapComboFactory;
import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
@ -38,9 +39,9 @@ public final class SctpTestPermutation {
private static final int BOSSES = 2;
private static final int WORKERS = 3;
private static final EventLoopGroup nioBossGroup =
new NioEventLoopGroup(BOSSES, new DefaultThreadFactory("testsuite-sctp-nio-boss", true));
new NioEventLoopGroup(BOSSES, new DefaultExecutorFactory("testsuite-sctp-nio-boss"));
private static final EventLoopGroup nioWorkerGroup =
new NioEventLoopGroup(WORKERS, new DefaultThreadFactory("testsuite-sctp-nio-worker", true));
new NioEventLoopGroup(WORKERS, new DefaultExecutorFactory("testsuite-sctp-nio-worker"));
private static final EventLoopGroup oioBossGroup =
new OioEventLoopGroup(Integer.MAX_VALUE, new DefaultThreadFactory("testsuite-sctp-oio-boss", true));
private static final EventLoopGroup oioWorkerGroup =

View File

@ -25,7 +25,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Promise;
import org.junit.Test;
@ -37,8 +36,7 @@ import static org.junit.Assert.*;
public class SocketBufReleaseTest extends AbstractSocketTest {
private static final EventExecutor executor =
new DefaultEventExecutorGroup(1, new DefaultThreadFactory(SocketBufReleaseTest.class, true)).next();
private static final EventExecutor executor = new DefaultEventExecutorGroup(1).next();
@Test
public void testBufRelease() throws Throwable {

View File

@ -33,6 +33,7 @@ import io.netty.channel.socket.oio.OioServerSocketChannel;
import io.netty.channel.socket.oio.OioSocketChannel;
import io.netty.testsuite.transport.TestsuitePermutation.BootstrapComboFactory;
import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
@ -49,9 +50,9 @@ public class SocketTestPermutation {
protected static final int OIO_SO_TIMEOUT = 10; // Use short timeout for faster runs.
protected final EventLoopGroup nioBossGroup =
new NioEventLoopGroup(BOSSES, new DefaultThreadFactory("testsuite-nio-boss", true));
new NioEventLoopGroup(BOSSES, new DefaultExecutorFactory("testsuite-nio-boss"));
protected final EventLoopGroup nioWorkerGroup =
new NioEventLoopGroup(WORKERS, new DefaultThreadFactory("testsuite-nio-worker", true));
new NioEventLoopGroup(WORKERS, new DefaultExecutorFactory("testsuite-nio-worker"));
protected final EventLoopGroup oioBossGroup =
new OioEventLoopGroup(Integer.MAX_VALUE, new DefaultThreadFactory("testsuite-oio-boss", true));
protected final EventLoopGroup oioWorkerGroup =

View File

@ -32,14 +32,13 @@ import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.ExecutorFactory;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ThreadFactory;
import static org.junit.Assert.*;
/**
@ -66,9 +65,10 @@ public class UDTClientServerConnectionTest {
@Override
public void run() {
final Bootstrap boot = new Bootstrap();
final ThreadFactory clientFactory = new DefaultThreadFactory("client");
final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1,
clientFactory, NioUdtProvider.BYTE_PROVIDER);
final ExecutorFactory clientFactory = new DefaultExecutorFactory("client");
final NioEventLoopGroup connectGroup =
new NioEventLoopGroup(1, clientFactory, NioUdtProvider.BYTE_PROVIDER);
try {
boot.group(connectGroup)
.channelFactory(NioUdtProvider.BYTE_CONNECTOR)
@ -193,12 +193,13 @@ public class UDTClientServerConnectionTest {
@Override
public void run() {
final ServerBootstrap boot = new ServerBootstrap();
final ThreadFactory acceptFactory = new DefaultThreadFactory("accept");
final ThreadFactory serverFactory = new DefaultThreadFactory("server");
final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1,
acceptFactory, NioUdtProvider.BYTE_PROVIDER);
final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1,
serverFactory, NioUdtProvider.BYTE_PROVIDER);
final ExecutorFactory acceptFactory = new DefaultExecutorFactory("accept");
final ExecutorFactory serverFactory = new DefaultExecutorFactory("server");
final NioEventLoopGroup acceptGroup =
new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.BYTE_PROVIDER);
final NioEventLoopGroup connectGroup =
new NioEventLoopGroup(1, serverFactory, NioUdtProvider.BYTE_PROVIDER);
try {
boot.group(acceptGroup, connectGroup)
.channelFactory(NioUdtProvider.BYTE_ACCEPTOR)

View File

@ -33,7 +33,8 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/**
* {@link EventLoop} which uses epoll under the covers. Only works on Linux!
* A {@link SingleThreadEventLoop} implementation which uses <a href="http://en.wikipedia.org/wiki/Epoll">epoll</a>
* under the covers. This {@link EventLoop} works only on Linux systems!
*/
final class EpollEventLoop extends SingleThreadEventLoop {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class);
@ -208,84 +209,85 @@ final class EpollEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
for (;;) {
boolean oldWakenUp = WAKEN_UP_UPDATER.getAndSet(this, 0) == 1;
try {
int ready;
if (hasTasks()) {
// Non blocking just return what is ready directly without block
ready = Native.epollWait(epollFd, events, 0);
} else {
ready = epollWait(oldWakenUp);
boolean oldWakenUp = WAKEN_UP_UPDATER.getAndSet(this, 0) == 1;
try {
int ready;
if (hasTasks()) {
// Non blocking just return what is ready directly without block
ready = Native.epollWait(epollFd, events, 0);
} else {
ready = epollWait(oldWakenUp);
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp == 1) {
Native.eventFdWrite(eventFd, 1L);
}
}
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
if (ready > 0) {
processReady(events, ready);
}
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
if (ready > 0) {
processReady(events, ready);
}
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
if (wakenUp == 1) {
Native.eventFdWrite(eventFd, 1L);
}
}
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
if (ready > 0) {
processReady(events, ready);
}
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
if (ready > 0) {
processReady(events, ready);
}
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
cleanupAndTerminate(true);
return;
}
}
} catch (Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
scheduleExecution();
}
private void closeAll() {

View File

@ -19,43 +19,90 @@ import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.ExecutorFactory;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import io.netty.util.concurrent.DefaultExecutorFactory;
/**
* {@link EventLoopGroup} which uses epoll under the covers. Because of this
* it only works on linux.
* A {@link MultithreadEventLoopGroup} which uses <a href="http://en.wikipedia.org/wiki/Epoll">epoll</a> under the
* covers. This {@link EventLoopGroup} works only on Linux systems!
*/
public final class EpollEventLoopGroup extends MultithreadEventLoopGroup {
/**
* Create a new instance using the default number of threads and the default {@link ThreadFactory}.
* Create a new instance that uses twice as many {@link EventLoop}s as there processors/cores
* available, as well as the default {@link Executor}.
*
* @see DefaultExecutorFactory
*/
public EpollEventLoopGroup() {
this(0);
}
/**
* Create a new instance using the specified number of threads and the default {@link ThreadFactory}.
* Create a new instance that uses the default {@link Executor}.
*
* @see DefaultExecutorFactory
*
* @param nEventLoops the number of {@link EventLoop}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventLoop}s and the number of {@link Thread}s used by the
*/
public EpollEventLoopGroup(int nThreads) {
this(nThreads, null);
public EpollEventLoopGroup(int nEventLoops) {
this(nEventLoops, (Executor) null);
}
/**
* Create a new instance using the specified number of threads and the given {@link ThreadFactory}.
* @param nEventLoops the number of {@link EventLoop}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventLoop}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* @param executor the {@link Executor} to use, or {@code null} if the default should be used.
*/
public EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, 128);
public EpollEventLoopGroup(int nEventLoops, Executor executor) {
this(nEventLoops, executor, 128);
}
/**
* Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given
* maximal amount of epoll events to handle per epollWait(...).
* @param nEventLoops the number of {@link EventLoop}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventLoop}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* @param executorFactory the {@link ExecutorFactory} to use, or {@code null} if the default should be used.
*/
public EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce) {
super(nThreads, threadFactory, maxEventsAtOnce);
public EpollEventLoopGroup(int nEventLoops, ExecutorFactory executorFactory) {
this(nEventLoops, executorFactory, 128);
}
/**
* @param nEventLoops the number of {@link EventLoop}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventLoop}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* @param executor the {@link Executor} to use, or {@code null} if the default should be used.
* @param maxEventsAtOnce the maximum number of epoll events to handle per epollWait(...).
*/
public EpollEventLoopGroup(int nEventLoops, Executor executor, int maxEventsAtOnce) {
super(nEventLoops, executor, maxEventsAtOnce);
}
/**
* @param nEventLoops the number of {@link EventLoop}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventLoop}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* @param executorFactory the {@link ExecutorFactory} to use, or {@code null} if the default should be used.
* @param maxEventsAtOnce the maximum number of epoll events to handle per epollWait(...).
*/
public EpollEventLoopGroup(int nEventLoops, ExecutorFactory executorFactory, int maxEventsAtOnce) {
super(nEventLoops, executorFactory, maxEventsAtOnce);
}
/**

View File

@ -27,7 +27,7 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory;
import io.netty.testsuite.transport.socket.SocketTestPermutation;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.DefaultExecutorFactory;
import java.util.Arrays;
import java.util.List;
@ -37,9 +37,9 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
static final SocketTestPermutation INSTANCE = new EpollSocketTestPermutation();
static final EventLoopGroup EPOLL_BOSS_GROUP =
new EpollEventLoopGroup(BOSSES, new DefaultThreadFactory("testsuite-epoll-boss", true));
new EpollEventLoopGroup(BOSSES, new DefaultExecutorFactory("testsuite-epoll-boss"));
static final EventLoopGroup EPOLL_WORKER_GROUP =
new EpollEventLoopGroup(WORKERS, new DefaultThreadFactory("testsuite-epoll-worker", true));
new EpollEventLoopGroup(WORKERS, new DefaultExecutorFactory("testsuite-epoll-worker"));
@Override
public List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> socket() {

View File

@ -28,11 +28,11 @@ import io.netty.test.udt.util.CustomReporter;
import io.netty.test.udt.util.EchoMessageHandler;
import io.netty.test.udt.util.TrafficControl;
import io.netty.test.udt.util.UnitHelp;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -92,10 +92,10 @@ public final class UdtNetty {
final ChannelHandler handler1 = new EchoMessageHandler(rate, size);
final ChannelHandler handler2 = new EchoMessageHandler(null, size);
final NioEventLoopGroup group1 = new NioEventLoopGroup(
1, Executors.defaultThreadFactory(), NioUdtProvider.MESSAGE_PROVIDER);
final NioEventLoopGroup group2 = new NioEventLoopGroup(
1, Executors.defaultThreadFactory(), NioUdtProvider.MESSAGE_PROVIDER);
final NioEventLoopGroup group1 =
new NioEventLoopGroup(1, new DefaultExecutorFactory("group1"), NioUdtProvider.MESSAGE_PROVIDER);
final NioEventLoopGroup group2 =
new NioEventLoopGroup(1, new DefaultExecutorFactory("group2"), NioUdtProvider.MESSAGE_PROVIDER);
final Bootstrap peerBoot1 = new Bootstrap();
peerBoot1.group(group1)

View File

@ -25,12 +25,12 @@ import io.netty.channel.udt.nio.NioUdtByteRendezvousChannel;
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.test.udt.util.EchoByteHandler;
import io.netty.test.udt.util.UnitHelp;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
@ -70,10 +70,10 @@ public class NioUdtByteRendezvousChannelTest extends AbstractUdtTest {
final EchoByteHandler handler1 = new EchoByteHandler(rate1, messageSize);
final EchoByteHandler handler2 = new EchoByteHandler(rate2, messageSize);
final NioEventLoopGroup group1 = new NioEventLoopGroup(
1, Executors.defaultThreadFactory(), NioUdtProvider.BYTE_PROVIDER);
final NioEventLoopGroup group2 = new NioEventLoopGroup(
1, Executors.defaultThreadFactory(), NioUdtProvider.BYTE_PROVIDER);
final NioEventLoopGroup group1 =
new NioEventLoopGroup(1, new DefaultExecutorFactory("group1"), NioUdtProvider.BYTE_PROVIDER);
final NioEventLoopGroup group2 =
new NioEventLoopGroup(1, new DefaultExecutorFactory("group2"), NioUdtProvider.BYTE_PROVIDER);
final Bootstrap boot1 = new Bootstrap();
boot1.group(group1)

View File

@ -25,12 +25,12 @@ import io.netty.channel.udt.nio.NioUdtMessageRendezvousChannel;
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.test.udt.util.EchoMessageHandler;
import io.netty.test.udt.util.UnitHelp;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
@ -68,10 +68,10 @@ public class NioUdtMessageRendezvousChannelTest extends AbstractUdtTest {
final EchoMessageHandler handler1 = new EchoMessageHandler(rate1, messageSize);
final EchoMessageHandler handler2 = new EchoMessageHandler(rate2, messageSize);
final NioEventLoopGroup group1 = new NioEventLoopGroup(
1, Executors.defaultThreadFactory(), NioUdtProvider.MESSAGE_PROVIDER);
final NioEventLoopGroup group2 = new NioEventLoopGroup(
1, Executors.defaultThreadFactory(), NioUdtProvider.MESSAGE_PROVIDER);
final NioEventLoopGroup group1 =
new NioEventLoopGroup(1, new DefaultExecutorFactory("group1"), NioUdtProvider.MESSAGE_PROVIDER);
final NioEventLoopGroup group2 =
new NioEventLoopGroup(1, new DefaultExecutorFactory("group2"), NioUdtProvider.MESSAGE_PROVIDER);
final Bootstrap boot1 = new Bootstrap();
boot1.group(group1)

View File

@ -15,31 +15,21 @@
*/
package io.netty.channel;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.DefaultExecutorFactory;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
public class DefaultEventLoop extends SingleThreadEventLoop {
public DefaultEventLoop() {
this((EventLoopGroup) null);
}
public DefaultEventLoop(ThreadFactory threadFactory) {
this(null, threadFactory);
}
public DefaultEventLoop(Executor executor) {
this(null, executor);
}
public DefaultEventLoop(EventLoopGroup parent) {
this(parent, new DefaultThreadFactory(DefaultEventLoop.class));
}
public DefaultEventLoop(EventLoopGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory, true);
this(parent, new DefaultExecutorFactory(DefaultEventLoop.class).newExecutor(1));
}
public DefaultEventLoop(EventLoopGroup parent, Executor executor) {
@ -48,16 +38,16 @@ public class DefaultEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) {
break;
}
if (confirmShutdown()) {
cleanupAndTerminate(true);
} else {
scheduleExecution();
}
}
}

View File

@ -15,8 +15,9 @@
*/
package io.netty.channel;
import io.netty.util.concurrent.ExecutorFactory;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
/**
* {@link MultithreadEventLoopGroup} which must be used for the local transport.
@ -24,29 +25,48 @@ import java.util.concurrent.ThreadFactory;
public class DefaultEventLoopGroup extends MultithreadEventLoopGroup {
/**
* Create a new instance with the default number of threads.
* Create a new instance that uses twice as many {@link EventLoop}s as there processors/cores
* available, as well as the default {@link Executor}.
*
* @see io.netty.util.concurrent.DefaultExecutorFactory
*/
public DefaultEventLoopGroup() {
this(0);
}
/**
* Create a new instance
*
* @param nThreads the number of threads to use
* @param nEventLoops the number of {@link EventLoop}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventLoop}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
*/
public DefaultEventLoopGroup(int nThreads) {
this(nThreads, null);
public DefaultEventLoopGroup(int nEventLoops) {
this(nEventLoops, (Executor) null);
}
/**
* Create a new instance
*
* @param nThreads the number of threads to use
* @param threadFactory the {@link ThreadFactory} or {@code null} to use the default
* @param nEventLoops the number of {@link EventLoop}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventLoop}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* @param executor the {@link Executor} to use, or {@code null} if the default should be used.
*/
public DefaultEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
public DefaultEventLoopGroup(int nEventLoops, Executor executor) {
super(nEventLoops, executor);
}
/**
* @param nEventLoops the number of {@link EventLoop}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventLoop}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* @param executorFactory the {@link ExecutorFactory} to use, or {@code null} if the default should be used.
*/
public DefaultEventLoopGroup(int nEventLoops, ExecutorFactory executorFactory) {
super(nEventLoops, executorFactory);
}
@Override

View File

@ -15,17 +15,16 @@
*/
package io.netty.channel;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.ExecutorFactory;
import io.netty.util.concurrent.MultithreadEventExecutorGroup;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
/**
* Abstract base class for {@link EventLoopGroup} implementations that handles their tasks with multiple threads at
* Abstract base class for {@link EventLoopGroup} implementations that handle their tasks with multiple threads at
* the same time.
*/
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
@ -46,20 +45,15 @@ public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutor
/**
* @see {@link MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)}
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
protected MultithreadEventLoopGroup(int nEventLoops, Executor executor, Object... args) {
super(nEventLoops == 0 ? DEFAULT_EVENT_LOOP_THREADS : nEventLoops, executor, args);
}
/**
* @see {@link MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)}
* @see {@link MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ExecutorFactory, Object...)}
*/
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
@Override
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
protected MultithreadEventLoopGroup(int nEventLoops, ExecutorFactory executorFactory, Object... args) {
super(nEventLoops == 0 ? DEFAULT_EVENT_LOOP_THREADS : nEventLoops, executorFactory, args);
}
@Override

View File

@ -28,10 +28,6 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
private final ChannelHandlerInvoker invoker = new DefaultChannelHandlerInvoker(this);
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
super(parent, threadFactory, addTaskWakesUp);
}
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {
super(parent, executor, addTaskWakesUp);
}

View File

@ -59,7 +59,8 @@ public class ThreadPerChannelEventLoop extends SingleThreadEventLoop {
ch.unsafe().close(ch.unsafe().voidPromise());
}
if (confirmShutdown()) {
break;
cleanupAndTerminate(true);
return;
}
} else {
if (ch != null) {

View File

@ -1,51 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.local;
import io.netty.channel.DefaultEventLoopGroup;
import java.util.concurrent.ThreadFactory;
/**
* @deprecated Use {@link DefaultEventLoopGroup} instead.
*/
@Deprecated
public class LocalEventLoopGroup extends DefaultEventLoopGroup {
/**
* Create a new instance with the default number of threads.
*/
public LocalEventLoopGroup() { }
/**
* Create a new instance
*
* @param nThreads the number of threads to use
*/
public LocalEventLoopGroup(int nThreads) {
super(nThreads);
}
/**
* Create a new instance
*
* @param nThreads the number of threads to use
* @param threadFactory the {@link ThreadFactory} or {@code null} to use the default
*/
public LocalEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
}
}

View File

@ -43,9 +43,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* {@link SingleThreadEventLoop} implementation which register the {@link Channel}'s to a
* {@link Selector} and so does the multi-plexing of these in the event loop.
*
* A {@link SingleThreadEventLoop} implementation which registers each {@link Channel} with a
* NIO {@link Selector} and performs the multiplexing of these in the event loop.
*/
public final class NioEventLoop extends SingleThreadEventLoop {
@ -300,80 +299,85 @@ public final class NioEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
for (;;) {
boolean oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();
} else {
select(oldWakenUp);
boolean oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();
} else {
select(oldWakenUp);
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
if (wakenUp.get()) {
selector.wakeup();
}
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
cleanupAndTerminate(true);
return;
}
}
} catch (Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t);
// TODO: After using a ForkJoinPool that is potentially shared with other software
// than Netty. The Thread.sleep might be problematic. Even though this is unlikely to ever
// happen anyways.
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
scheduleExecution();
}
private void processSelectedKeys() {

View File

@ -23,53 +23,95 @@ import io.netty.util.concurrent.EventExecutor;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.ExecutorFactory;
/**
* {@link MultithreadEventLoopGroup} implementations which is used for NIO {@link Selector} based {@link Channel}s.
* A {@link MultithreadEventLoopGroup} implementation which is used for NIO {@link Selector} based {@link Channel}s.
*/
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
/**
* Create a new instance using the default number of threads, the default {@link ThreadFactory} and
* the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
* Create a new instance that uses twice as many {@link EventLoop}s as there processors/cores
* available, as well as the default {@link Executor} and the {@link SelectorProvider} which
* is returned by {@link SelectorProvider#provider()}.
*
* @see DefaultExecutorFactory
*/
public NioEventLoopGroup() {
this(0);
}
/**
* Create a new instance using the specified number of threads, {@link ThreadFactory} and the
* {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
* Create a new instance that uses the default {@link Executor} and the {@link SelectorProvider} which
* is returned by {@link SelectorProvider#provider()}.
*
* @see DefaultExecutorFactory
*
* @param nEventLoops the number of {@link EventLoop}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventLoop}s and the number of {@link Thread}s used by the
*/
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
public NioEventLoopGroup(int nEventLoops) {
this(nEventLoops, (Executor) null);
}
/**
* Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the
* {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
* Create a new instance that uses the the {@link SelectorProvider} which is returned by
* {@link SelectorProvider#provider()}.
*
* @param nEventLoops the number of {@link EventLoop}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventLoop}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* @param executor the {@link Executor} to use, or {@code null} if the default should be used.
*/
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
public NioEventLoopGroup(int nEventLoops, Executor executor) {
this(nEventLoops, executor, SelectorProvider.provider());
}
/**
* Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given
* {@link SelectorProvider}.
* Create a new instance that uses the the {@link SelectorProvider} which is returned by
* {@link SelectorProvider#provider()}.
*
* @param nEventLoops the number of {@link EventLoop}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventLoop}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* @param executorFactory the {@link ExecutorFactory} to use, or {@code null} if the default should be used.
*/
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
super(nThreads, threadFactory, selectorProvider);
public NioEventLoopGroup(int nEventLoops, ExecutorFactory executorFactory) {
this(nEventLoops, executorFactory, SelectorProvider.provider());
}
/**
* @param nEventLoops the number of {@link EventLoop}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventLoop}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* @param executor the {@link Executor} to use, or {@code null} if the default should be used.
* @param selectorProvider the {@link SelectorProvider} to use. This value must not be {@code null}.
*/
public NioEventLoopGroup(int nEventLoops, Executor executor, final SelectorProvider selectorProvider) {
super(nEventLoops, executor, selectorProvider);
}
/**
* @param nEventLoops the number of {@link EventLoop}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventLoop}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* @param executorFactory the {@link ExecutorFactory} to use, or {@code null} if the default should be used.
* @param selectorProvider the {@link SelectorProvider} to use. This value must not be {@code null}.
*/
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
super(nThreads, executor, selectorProvider);
int nEventLoops, ExecutorFactory executorFactory, final SelectorProvider selectorProvider) {
super(nEventLoops, executorFactory, selectorProvider);
}
/**

View File

@ -19,6 +19,7 @@ import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import io.netty.channel.local.LocalChannel;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.EventExecutor;
import org.junit.After;
import org.junit.Before;
@ -31,7 +32,6 @@ import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
@ -440,21 +440,21 @@ public class SingleThreadEventLoopTest {
final AtomicInteger cleanedUp = new AtomicInteger();
SingleThreadEventLoopA() {
super(null, Executors.defaultThreadFactory(), true);
super(null, new DefaultExecutorFactory("A").newExecutor(1), true);
}
@Override
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) {
break;
}
if (confirmShutdown()) {
cleanupAndTerminate(true);
} else {
scheduleExecution();
}
}
@ -466,30 +466,43 @@ public class SingleThreadEventLoopTest {
private static class SingleThreadEventLoopB extends SingleThreadEventLoop {
private volatile Thread thread;
private volatile boolean interrupted;
SingleThreadEventLoopB() {
super(null, Executors.defaultThreadFactory(), false);
super(null, new DefaultExecutorFactory("B").newExecutor(1), false);
}
@Override
protected void run() {
for (;;) {
try {
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(delayNanos(System.nanoTime())));
} catch (InterruptedException e) {
// Waken up by interruptThread()
}
thread = Thread.currentThread();
runAllTasks();
if (interrupted) {
thread.interrupt();
}
if (confirmShutdown()) {
break;
}
try {
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(delayNanos(System.nanoTime())));
} catch (InterruptedException e) {
// Waken up by interruptThread()
}
runAllTasks();
if (confirmShutdown()) {
cleanupAndTerminate(true);
} else {
scheduleExecution();
}
}
@Override
protected void wakeup(boolean inEventLoop) {
interruptThread();
if (thread == null) {
interrupted = true;
} else {
thread.interrupt();
}
}
}
}

View File

@ -19,8 +19,8 @@ package io.netty.channel;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
@ -90,24 +90,23 @@ public class ThreadPerChannelEventLoopGroupTest {
assertTrue(loopGroup.isTerminated());
}
private static class TestEventExecutor extends SingleThreadEventExecutor {
private static final class TestEventExecutor extends SingleThreadEventExecutor {
TestEventExecutor() {
super(null, new DefaultThreadFactory("test"), false);
super(null, new DefaultExecutorFactory(TestEventExecutor.class).newExecutor(1), false);
}
@Override
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) {
break;
}
if (confirmShutdown()) {
cleanupAndTerminate(true);
} else {
scheduleExecution();
}
}
}

View File

@ -190,29 +190,29 @@ public class LocalChannelTest {
final EventLoopGroup serverGroup = new DefaultEventLoopGroup(1);
final EventLoopGroup clientGroup = new DefaultEventLoopGroup(1) {
@Override
protected EventLoop newChild(Executor threadFactory, Object... args)
protected EventLoop newChild(Executor executor, Object... args)
throws Exception {
return new SingleThreadEventLoop(this, threadFactory, true) {
return new SingleThreadEventLoop(this, executor, true) {
@Override
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
/* Only slow down the anonymous class in LocalChannel#doRegister() */
if (task.getClass().getEnclosingClass() == LocalChannel.class) {
try {
closeLatch.await();
} catch (InterruptedException e) {
throw new Error(e);
}
Runnable task = takeTask();
if (task != null) {
/* Only slow down the anonymous class in LocalChannel#doRegister() */
if (task.getClass().getEnclosingClass() == LocalChannel.class) {
try {
closeLatch.await();
} catch (InterruptedException e) {
throw new Error(e);
}
task.run();
updateLastExecutionTime();
}
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) {
break;
}
if (confirmShutdown()) {
cleanupAndTerminate(true);
} else {
scheduleExecution();
}
}
};

View File

@ -27,18 +27,24 @@ import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.ExecutorFactory;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class LocalTransportThreadModelTest {
@ -84,9 +90,9 @@ public class LocalTransportThreadModelTest {
@Test(timeout = 5000)
public void testStagedExecution() throws Throwable {
EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l"));
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1"));
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2"));
EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultExecutorFactory("l"));
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e1"));
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e2"));
ThreadNameAuditor h1 = new ThreadNameAuditor();
ThreadNameAuditor h2 = new ThreadNameAuditor();
ThreadNameAuditor h3 = new ThreadNameAuditor(true);
@ -173,25 +179,6 @@ public class LocalTransportThreadModelTest {
Assert.assertTrue(name.startsWith("e2-"));
}
// Assert that the events for the same handler were handled by the same thread.
Set<String> names = new HashSet<String>();
names.addAll(h1.inboundThreadNames);
names.addAll(h1.outboundThreadNames);
names.addAll(h1.removalThreadNames);
Assert.assertEquals(1, names.size());
names.clear();
names.addAll(h2.inboundThreadNames);
names.addAll(h2.outboundThreadNames);
names.addAll(h2.removalThreadNames);
Assert.assertEquals(1, names.size());
names.clear();
names.addAll(h3.inboundThreadNames);
names.addAll(h3.outboundThreadNames);
names.addAll(h3.removalThreadNames);
Assert.assertEquals(1, names.size());
// Count the number of events
Assert.assertEquals(1, h1.inboundThreadNames.size());
Assert.assertEquals(2, h2.inboundThreadNames.size());
@ -227,12 +214,12 @@ public class LocalTransportThreadModelTest {
@Test(timeout = 30000)
@Ignore
public void testConcurrentMessageBufferAccess() throws Throwable {
EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l"));
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1"));
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2"));
EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3"));
EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e4"));
EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e5"));
EventLoopGroup l0 = new DefaultEventLoopGroup(4, new DefaultExecutorFactory("l0"));
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e1"));
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e2"));
EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e3"));
EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e4"));
EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e5"));
try {
final MessageForwarder1 h1 = new MessageForwarder1();
@ -253,7 +240,7 @@ public class LocalTransportThreadModelTest {
.addLast(e4, h5)
.addLast(e5, h6);
l.register(ch).sync().channel().connect(localAddr).sync();
l0.register(ch).sync().channel().connect(localAddr).sync();
final int ROUNDS = 1024;
final int ELEMS_PER_ROUNDS = 8192;
@ -337,14 +324,14 @@ public class LocalTransportThreadModelTest {
ch.close().sync();
} finally {
l.shutdownGracefully();
l0.shutdownGracefully();
e1.shutdownGracefully();
e2.shutdownGracefully();
e3.shutdownGracefully();
e4.shutdownGracefully();
e5.shutdownGracefully();
l.terminationFuture().sync();
l0.terminationFuture().sync();
e1.terminationFuture().sync();
e2.terminationFuture().sync();
e3.terminationFuture().sync();
@ -414,7 +401,7 @@ public class LocalTransportThreadModelTest {
if (t == null) {
this.t = Thread.currentThread();
} else {
Assert.assertSame(t, Thread.currentThread());
assertSameExecutor(t, Thread.currentThread());
}
ByteBuf out = ctx.alloc().buffer(4);
@ -428,7 +415,7 @@ public class LocalTransportThreadModelTest {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
Assert.assertSame(t, Thread.currentThread());
assertSameExecutor(t, Thread.currentThread());
// Don't let the write request go to the server-side channel - just swallow.
boolean swallow = this == ctx.pipeline().first();
@ -472,7 +459,7 @@ public class LocalTransportThreadModelTest {
if (t == null) {
this.t = Thread.currentThread();
} else {
Assert.assertSame(t, Thread.currentThread());
assertSameExecutor(t, Thread.currentThread());
}
ByteBuf m = (ByteBuf) msg;
@ -488,7 +475,7 @@ public class LocalTransportThreadModelTest {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
Assert.assertSame(t, Thread.currentThread());
assertSameExecutor(t, Thread.currentThread());
ByteBuf out = ctx.alloc().buffer(4);
int m = (Integer) msg;
@ -524,7 +511,7 @@ public class LocalTransportThreadModelTest {
if (t == null) {
this.t = Thread.currentThread();
} else {
Assert.assertSame(t, Thread.currentThread());
assertSameExecutor(t, Thread.currentThread());
}
int actual = (Integer) msg;
@ -536,7 +523,7 @@ public class LocalTransportThreadModelTest {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
Assert.assertSame(t, Thread.currentThread());
assertSameExecutor(t, Thread.currentThread());
int actual = (Integer) msg;
int expected = outCnt ++;
@ -570,7 +557,7 @@ public class LocalTransportThreadModelTest {
if (t == null) {
this.t = Thread.currentThread();
} else {
Assert.assertSame(t, Thread.currentThread());
assertSameExecutor(t, Thread.currentThread());
}
int actual = (Integer) msg;
@ -581,7 +568,7 @@ public class LocalTransportThreadModelTest {
@Override
public void write(
ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
Assert.assertSame(t, Thread.currentThread());
assertSameExecutor(t, Thread.currentThread());
int actual = (Integer) msg;
int expected = outCnt ++;
@ -597,4 +584,8 @@ public class LocalTransportThreadModelTest {
super.exceptionCaught(ctx, cause);
}
}
private static void assertSameExecutor(Thread expected, Thread actual) {
Assert.assertEquals(expected.getName().substring(0, 2), actual.getName().substring(0, 2));
}
}

View File

@ -26,7 +26,7 @@ import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.EventExecutorGroup;
import org.junit.AfterClass;
import org.junit.Assert;
@ -116,12 +116,12 @@ public class LocalTransportThreadModelTest3 {
}
private static void testConcurrentAddRemove(boolean inbound) throws Exception {
EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l"));
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1"));
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2"));
EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3"));
EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e4"));
EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e5"));
EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultExecutorFactory("l"));
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e1"));
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e2"));
EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e3"));
EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e4"));
EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e5"));
final EventExecutorGroup[] groups = { e1, e2, e3, e4, e5 };
try {