diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutor.java index 2470ec66bf..7200fa9098 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutor.java @@ -16,7 +16,6 @@ package io.netty.util.concurrent; import java.util.concurrent.Executor; -import java.util.concurrent.ThreadFactory; /** * Default {@link SingleThreadEventExecutor} implementation which just execute all submitted task in a @@ -28,20 +27,12 @@ public final class DefaultEventExecutor extends SingleThreadEventExecutor { this((EventExecutorGroup) null); } - public DefaultEventExecutor(ThreadFactory threadFactory) { - this(null, threadFactory); - } - public DefaultEventExecutor(Executor executor) { this(null, executor); } public DefaultEventExecutor(EventExecutorGroup parent) { - this(parent, new DefaultThreadFactory(DefaultEventExecutor.class)); - } - - public DefaultEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory) { - super(parent, threadFactory, true); + this(parent, new DefaultExecutorFactory(DefaultEventExecutor.class).newExecutor(1)); } public DefaultEventExecutor(EventExecutorGroup parent, Executor executor) { @@ -50,16 +41,16 @@ public final class DefaultEventExecutor extends SingleThreadEventExecutor { @Override protected void run() { - for (;;) { - Runnable task = takeTask(); - if (task != null) { - task.run(); - updateLastExecutionTime(); - } + Runnable task = takeTask(); + if (task != null) { + task.run(); + updateLastExecutionTime(); + } - if (confirmShutdown()) { - break; - } + if (confirmShutdown()) { + cleanupAndTerminate(true); + } else { + scheduleExecution(); } } } diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutorGroup.java index 5343be1652..09fead9298 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutorGroup.java @@ -16,29 +16,42 @@ package io.netty.util.concurrent; import java.util.concurrent.Executor; -import java.util.concurrent.ThreadFactory; /** - * Default implementation of {@link MultithreadEventExecutorGroup} which will use {@link DefaultEventExecutor} instances - * to handle the tasks. + * Default implementation of {@link MultithreadEventExecutorGroup} which will use {@link DefaultEventExecutor} + * instances to handle the tasks. */ public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup { /** - * @see {@link #DefaultEventExecutorGroup(int, ThreadFactory)} + * Create a new instance. + * + * @param nEventExecutors the number of {@link DefaultEventExecutor}s that this group will use. */ - public DefaultEventExecutorGroup(int nThreads) { - this(nThreads, null); + public DefaultEventExecutorGroup(int nEventExecutors) { + this(nEventExecutors, (Executor) null); } /** * Create a new instance. * - * @param nThreads the number of threads that will be used by this instance. - * @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used. + * @param nEventExecutors the number of {@link DefaultEventExecutor}s that this group will use. + * @param executor the {@link Executor} responsible for executing the work handled by + * this {@link EventExecutorGroup}. */ - public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory) { - super(nThreads, threadFactory); + public DefaultEventExecutorGroup(int nEventExecutors, Executor executor) { + super(nEventExecutors, executor); + } + + /** + * Create a new instance. + * + * @param nEventExecutors the number of {@link DefaultEventExecutor}s that this group will use. + * @param executorFactory the {@link ExecutorFactory} which produces the {@link Executor} responsible for + * executing the work handled by this {@link EventExecutorGroup}. + */ + public DefaultEventExecutorGroup(int nEventExecutors, ExecutorFactory executorFactory) { + super(nEventExecutors, executorFactory); } @Override diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultExecutorFactory.java b/common/src/main/java/io/netty/util/concurrent/DefaultExecutorFactory.java new file mode 100644 index 0000000000..947f0cba76 --- /dev/null +++ b/common/src/main/java/io/netty/util/concurrent/DefaultExecutorFactory.java @@ -0,0 +1,147 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util.concurrent; + +import io.netty.util.internal.InternalThreadLocalMap; +import io.netty.util.internal.StringUtil; +import io.netty.util.internal.chmv8.ForkJoinPool; +import io.netty.util.internal.chmv8.ForkJoinPool.ForkJoinWorkerThreadFactory; +import io.netty.util.internal.chmv8.ForkJoinWorkerThread; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.Locale; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * An implementation of an {@link ExecutorFactory} that creates a new {@link ForkJoinPool} on each + * call to {@link #newExecutor(int)}. + *

+ * This {@link ExecutorFactory} powers Netty's nio and epoll eventloops by default. Netty moved from managing its + * own threads and pinning a thread to each eventloop to an {@link Executor}-based approach. That way advanced + * users of Netty can plug in their own threadpools and gain more control of scheduling the eventloops. + *

+ * The main reason behind choosing a {@link ForkJoinPool} as the default {@link Executor} is that it uses + * thread-local task queues, providing a high level of thread affinity to Netty's eventloops. + *

+ * The whole discussion can be found on GitHub + * https://github.com/netty/netty/issues/2250. + */ +public final class DefaultExecutorFactory implements ExecutorFactory { + + private static final InternalLogger logger = + InternalLoggerFactory.getInstance(DefaultExecutorFactory.class); + + private static final AtomicInteger executorId = new AtomicInteger(); + private String namePrefix; + + /** + * @param clazzNamePrefix the name of the class will be used to prefix the name of each + * {@link ForkJoinWorkerThread} with. + */ + public DefaultExecutorFactory(Class clazzNamePrefix) { + this(toName(clazzNamePrefix)); + } + + /** + * @param namePrefix the string to prefix the name of each {@link ForkJoinWorkerThread} with. + */ + public DefaultExecutorFactory(String namePrefix) { + this.namePrefix = namePrefix; + } + + @Override + public Executor newExecutor(int parallelism) { + ForkJoinWorkerThreadFactory threadFactory = + new DefaultForkJoinWorkerThreadFactory(namePrefix + "-" + executorId.getAndIncrement()); + + return new ForkJoinPool(parallelism, threadFactory, DefaultUncaughtExceptionHandler.INSTANCE, true); + } + + private static String toName(Class clazz) { + if (clazz == null) { + throw new NullPointerException("clazz"); + } + + String clazzName = StringUtil.simpleClassName(clazz); + switch (clazzName.length()) { + case 0: + return "unknown"; + case 1: + return clazzName.toLowerCase(Locale.US); + default: + if (Character.isUpperCase(clazzName.charAt(0)) && Character.isLowerCase(clazzName.charAt(1))) { + return Character.toLowerCase(clazzName.charAt(0)) + clazzName.substring(1); + } else { + return clazzName; + } + } + } + + private static final class DefaultUncaughtExceptionHandler implements UncaughtExceptionHandler { + + private static final DefaultUncaughtExceptionHandler INSTANCE = new DefaultUncaughtExceptionHandler(); + + @Override + public void uncaughtException(Thread t, Throwable e) { + if (logger.isErrorEnabled()) { + logger.error("Uncaught exception in thread: {}", t.getName(), e); + } + } + } + + private static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { + + private final AtomicInteger idx = new AtomicInteger(); + private final String namePrefix; + + DefaultForkJoinWorkerThreadFactory(String namePrefix) { + this.namePrefix = namePrefix; + } + + @Override + public ForkJoinWorkerThread newThread(ForkJoinPool pool) { + // Note: The ForkJoinPool will create these threads as daemon threads. + ForkJoinWorkerThread thread = new DefaultForkJoinWorkerThread(pool); + thread.setName(namePrefix + "-" + idx.getAndIncrement()); + thread.setPriority(Thread.MAX_PRIORITY); + return thread; + } + } + + private static final class DefaultForkJoinWorkerThread + extends ForkJoinWorkerThread implements FastThreadLocalAccess { + + private InternalThreadLocalMap threadLocalMap; + + DefaultForkJoinWorkerThread(ForkJoinPool pool) { + super(pool); + } + + @Override + public InternalThreadLocalMap threadLocalMap() { + return threadLocalMap; + } + + @Override + public void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) { + this.threadLocalMap = threadLocalMap; + } + } +} diff --git a/common/src/main/java/io/netty/util/concurrent/ExecutorFactory.java b/common/src/main/java/io/netty/util/concurrent/ExecutorFactory.java new file mode 100644 index 0000000000..6112c6703f --- /dev/null +++ b/common/src/main/java/io/netty/util/concurrent/ExecutorFactory.java @@ -0,0 +1,27 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util.concurrent; + +import java.util.concurrent.Executor; + +/** + * An object that creates new {@link Executor}s on demand. Using executor factories mainly + * simplifies providing custom executor implementations to Netty's event loops. + */ +public interface ExecutorFactory { + Executor newExecutor(int parallelism); +} diff --git a/common/src/main/java/io/netty/util/concurrent/FastThreadLocal.java b/common/src/main/java/io/netty/util/concurrent/FastThreadLocal.java index df943b4997..0e9213c73f 100644 --- a/common/src/main/java/io/netty/util/concurrent/FastThreadLocal.java +++ b/common/src/main/java/io/netty/util/concurrent/FastThreadLocal.java @@ -23,18 +23,19 @@ import java.util.IdentityHashMap; import java.util.Set; /** - * A special variant of {@link ThreadLocal} that yields higher access performan when accessed from a - * {@link FastThreadLocalThread}. + * A special variant of {@link ThreadLocal} that yields to higher access performance when accessed from a + * {@link FastThreadLocalAccess} {@link Thread}. *

* Internally, a {@link FastThreadLocal} uses a constant index in an array, instead of using hash code and hash table, * to look for a variable. Although seemingly very subtle, it yields slight performance advantage over using a hash * table, and it is useful when accessed frequently. *

- * To take advantage of this thread-local variable, your thread must be a {@link FastThreadLocalThread} or its subtype. - * By default, all threads created by {@link DefaultThreadFactory} are {@link FastThreadLocalThread} due to this reason. + * To take advantage of this thread-local variable, your thread must implement {@link FastThreadLocalAccess}. + * By default, all threads created by {@link DefaultThreadFactory} and {@link DefaultExecutorFactory} implement + * {@link FastThreadLocalAccess}. *

- * Note that the fast path is only possible on threads that extend {@link FastThreadLocalThread}, because it requires - * a special field to store the necessary state. An access by any other kind of thread falls back to a regular + * Note that the fast path is only possible on threads that implement {@link FastThreadLocalAccess}, because it + * requires a special field to store the necessary state. An access by any other kind of thread falls back to a regular * {@link ThreadLocal}. *

* diff --git a/common/src/main/java/io/netty/util/concurrent/FastThreadLocalAccess.java b/common/src/main/java/io/netty/util/concurrent/FastThreadLocalAccess.java new file mode 100644 index 0000000000..04c2efb15f --- /dev/null +++ b/common/src/main/java/io/netty/util/concurrent/FastThreadLocalAccess.java @@ -0,0 +1,39 @@ +/* +* Copyright 2014 The Netty Project +* +* The Netty Project licenses this file to you under the Apache License, +* version 2.0 (the "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at: +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations +* under the License. +*/ +package io.netty.util.concurrent; + +import io.netty.util.internal.InternalThreadLocalMap; + +/** + * Netty's {@link Thread} implementations implement this interface to provide fast access to {@link FastThreadLocal} + * variables. + * + * @see FastThreadLocalThread + */ +public interface FastThreadLocalAccess { + + /** + * Returns the internal data structure that keeps the thread-local variables bound to this thread. + * Note that this method is for internal use only, and thus is subject to change at any time. + */ + InternalThreadLocalMap threadLocalMap(); + + /** + * Sets the internal data structure that keeps the thread-local variables bound to this thread. + * Note that this method is for internal use only, and thus is subject to change at any time. + */ + void setThreadLocalMap(InternalThreadLocalMap threadLocalMap); +} diff --git a/common/src/main/java/io/netty/util/concurrent/FastThreadLocalThread.java b/common/src/main/java/io/netty/util/concurrent/FastThreadLocalThread.java index bd3e1c4441..0866b311a1 100644 --- a/common/src/main/java/io/netty/util/concurrent/FastThreadLocalThread.java +++ b/common/src/main/java/io/netty/util/concurrent/FastThreadLocalThread.java @@ -20,7 +20,7 @@ import io.netty.util.internal.InternalThreadLocalMap; /** * A special {@link Thread} that provides fast access to {@link FastThreadLocal} variables. */ -public class FastThreadLocalThread extends Thread { +public class FastThreadLocalThread extends Thread implements FastThreadLocalAccess { private InternalThreadLocalMap threadLocalMap; @@ -58,6 +58,7 @@ public class FastThreadLocalThread extends Thread { * Returns the internal data structure that keeps the thread-local variables bound to this thread. * Note that this method is for internal use only, and thus is subject to change at any time. */ + @Override public final InternalThreadLocalMap threadLocalMap() { return threadLocalMap; } @@ -66,6 +67,7 @@ public class FastThreadLocalThread extends Thread { * Sets the internal data structure that keeps the thread-local variables bound to this thread. * Note that this method is for internal use only, and thus is subject to change at any time. */ + @Override public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) { this.threadLocalMap = threadLocalMap; } diff --git a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java index 1b7a0303bb..4c471650e4 100644 --- a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java @@ -19,7 +19,6 @@ import java.util.Collections; import java.util.LinkedHashSet; import java.util.Set; import java.util.concurrent.Executor; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -37,40 +36,45 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto private final EventExecutorChooser chooser; /** - * Create a new instance. - * - * @param nThreads the number of threads that will be used by this instance. - * @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used. - * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call + * @param nEventExecutors the number of {@link EventExecutor}s that will be used by this instance. + * If {@code executor} is {@code null} this number will also be the parallelism + * requested from the default executor. It is generally advised for the number + * of {@link EventExecutor}s and the number of {@link Thread}s used by the + * {@code executor} to lie very close together. + * @param executorFactory the {@link ExecutorFactory} to use, or {@code null} if the default should be used. + * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ - protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { - this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args); + protected MultithreadEventExecutorGroup(int nEventExecutors, ExecutorFactory executorFactory, Object... args) { + this(nEventExecutors, executorFactory == null ? null : executorFactory.newExecutor(nEventExecutors), args); } /** - * Create a new instance. - * - * @param nThreads the number of threads that will be used by this instance. - * @param executor the Executor to use, or {@code null} if the default should be used. - * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call + * @param nEventExecutors the number of {@link EventExecutor}s that will be used by this instance. + * If {@code executor} is {@code null} this number will also be the parallelism + * requested from the default executor. It is generally advised for the number + * of {@link EventExecutor}s and the number of {@link Thread}s used by the + * {@code executor} to lie very close together. + * @param executor the {@link Executor} to use, or {@code null} if the default should be used. + * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ - protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { - if (nThreads <= 0) { - throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); + protected MultithreadEventExecutorGroup(int nEventExecutors, Executor executor, Object... args) { + if (nEventExecutors <= 0) { + throw new IllegalArgumentException( + String.format("nEventExecutors: %d (expected: > 0)", nEventExecutors)); } if (executor == null) { - executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); + executor = newDefaultExecutor(nEventExecutors); } - children = new EventExecutor[nThreads]; + children = new EventExecutor[nEventExecutors]; if (isPowerOfTwo(children.length)) { chooser = new PowerOfTwoEventExecutorChooser(); } else { chooser = new GenericEventExecutorChooser(); } - for (int i = 0; i < nThreads; i ++) { + for (int i = 0; i < nEventExecutors; i ++) { boolean success = false; try { children[i] = newChild(executor, args); @@ -118,8 +122,8 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto readonlyChildren = Collections.unmodifiableSet(childrenSet); } - protected ThreadFactory newDefaultThreadFactory() { - return new DefaultThreadFactory(getClass()); + protected Executor newDefaultExecutor(int nEventExecutors) { + return new DefaultExecutorFactory(getClass()).newExecutor(nEventExecutors); } @Override diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index c5105786c8..c88a395c74 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -33,7 +33,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -61,6 +60,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { private static final AtomicIntegerFieldUpdater STATE_UPDATER; + private static final long threadOffset; + static { AtomicIntegerFieldUpdater updater = PlatformDependent.newAtomicIntegerFieldUpdater(SingleThreadEventExecutor.class, "state"); @@ -68,14 +69,21 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { updater = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state"); } STATE_UPDATER = updater; + + try { + threadOffset = + PlatformDependent.objectFieldOffset(SingleThreadEventExecutor.class.getDeclaredField("thread")); + } catch (NoSuchFieldException e) { + throw new RuntimeException(); + } } private final Queue taskQueue; final Queue> delayedTaskQueue = new PriorityQueue>(); + @SuppressWarnings({ "FieldMayBeFinal", "unused" }) private volatile Thread thread; private final Executor executor; - private volatile boolean interrupted; private final Semaphore threadLock = new Semaphore(0); private final Set shutdownHooks = new LinkedHashSet(); private final boolean addTaskWakesUp; @@ -91,26 +99,36 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { private final Promise terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE); - /** - * Create a new instance - * - * @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it - * @param threadFactory the {@link ThreadFactory} which will be used for the used {@link Thread} - * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the - * executor thread - */ - protected SingleThreadEventExecutor( - EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { - this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp); - } + private boolean firstRun = true; + + private final Runnable AS_RUNNABLE = new Runnable() { + @Override + public void run() { + updateThread(Thread.currentThread()); + + // lastExecutionTime must be set on the first run + // in order for shutdown to work correctly for the + // rare case that the eventloop did not execute + // a single task during its lifetime. + if (firstRun) { + firstRun = false; + updateLastExecutionTime(); + } + + try { + SingleThreadEventExecutor.this.run(); + } catch (Throwable t) { + logger.warn("Unexpected exception from an event executor: ", t); + cleanupAndTerminate(false); + } + } + }; /** - * Create a new instance - * - * @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it - * @param executor the {@link Executor} which will be used for executing - * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the - * executor thread + * @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it. + * @param executor the {@link Executor} which will be used for executing. + * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up + * the executor thread. */ protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) { super(parent); @@ -134,18 +152,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { return new LinkedBlockingQueue(); } - /** - * Interrupt the current running {@link Thread}. - */ - protected void interruptThread() { - Thread currentThread = thread; - if (currentThread == null) { - interrupted = true; - } else { - currentThread.interrupt(); - } - } - /** * @see {@link Queue#poll()} */ @@ -517,7 +523,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { gracefulShutdownTimeout = unit.toNanos(timeout); if (oldState == ST_NOT_STARTED) { - doStartThread(); + scheduleExecution(); } if (wakeup) { @@ -569,7 +575,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } if (oldState == ST_NOT_STARTED) { - doStartThread(); + scheduleExecution(); } if (wakeup) { @@ -687,7 +693,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { if (inEventLoop) { addTask(task); } else { - startThread(); + startExecution(); addTask(task); if (isShutdown() && removeTask(task)) { reject(); @@ -807,75 +813,65 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { return task; } - private void startThread() { + protected void cleanupAndTerminate(boolean success) { + for (;;) { + int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this); + if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( + SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { + break; + } + } + + // Check if confirmShutdown() was called at the end of the loop. + if (success && gracefulShutdownStartTime == 0) { + logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " + + "before run() implementation terminates."); + } + + try { + // Run all remaining tasks and shutdown hooks. + for (;;) { + if (confirmShutdown()) { + break; + } + } + } finally { + try { + cleanup(); + } finally { + STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); + threadLock.release(); + if (!taskQueue.isEmpty()) { + logger.warn( + "An event executor terminated with " + + "non-empty task queue (" + taskQueue.size() + ')'); + } + + firstRun = true; + terminationFuture.setSuccess(null); + } + } + } + + private void startExecution() { if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { delayedTaskQueue.add(new ScheduledFutureTask( this, delayedTaskQueue, Executors.callable(new PurgeTask(), null), ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL)); - doStartThread(); + scheduleExecution(); } } } - private void doStartThread() { - assert thread == null; - executor.execute(new Runnable() { - @Override - public void run() { - thread = Thread.currentThread(); - if (interrupted) { - thread.interrupt(); - } + protected void scheduleExecution() { + updateThread(null); + executor.execute(AS_RUNNABLE); + } - boolean success = false; - updateLastExecutionTime(); - try { - SingleThreadEventExecutor.this.run(); - success = true; - } catch (Throwable t) { - logger.warn("Unexpected exception from an event executor: ", t); - } finally { - for (;;) { - int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this); - if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( - SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { - break; - } - } - - // Check if confirmShutdown() was called at the end of the loop. - if (success && gracefulShutdownStartTime == 0) { - logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + - SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " + - "before run() implementation terminates."); - } - - try { - // Run all remaining tasks and shutdown hooks. - for (;;) { - if (confirmShutdown()) { - break; - } - } - } finally { - try { - cleanup(); - } finally { - STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); - threadLock.release(); - if (!taskQueue.isEmpty()) { - logger.warn( - "An event executor terminated with " + - "non-empty task queue (" + taskQueue.size() + ')'); - } - - terminationFuture.setSuccess(null); - } - } - } - } - }); + private void updateThread(Thread t) { + PlatformDependent.putOrderedObject(this, threadOffset, t); } private final class PurgeTask implements Runnable { diff --git a/common/src/main/java/io/netty/util/internal/InternalThreadLocalMap.java b/common/src/main/java/io/netty/util/internal/InternalThreadLocalMap.java index 64f3c77ff1..652f7fde5d 100644 --- a/common/src/main/java/io/netty/util/internal/InternalThreadLocalMap.java +++ b/common/src/main/java/io/netty/util/internal/InternalThreadLocalMap.java @@ -17,7 +17,7 @@ package io.netty.util.internal; import io.netty.util.concurrent.FastThreadLocal; -import io.netty.util.concurrent.FastThreadLocalThread; +import io.netty.util.concurrent.FastThreadLocalAccess; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; @@ -39,8 +39,8 @@ public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap public static InternalThreadLocalMap getIfSet() { Thread thread = Thread.currentThread(); InternalThreadLocalMap threadLocalMap; - if (thread instanceof FastThreadLocalThread) { - threadLocalMap = ((FastThreadLocalThread) thread).threadLocalMap(); + if (thread instanceof FastThreadLocalAccess) { + threadLocalMap = ((FastThreadLocalAccess) thread).threadLocalMap(); } else { ThreadLocal slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap; if (slowThreadLocalMap == null) { @@ -54,14 +54,14 @@ public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap public static InternalThreadLocalMap get() { Thread thread = Thread.currentThread(); - if (thread instanceof FastThreadLocalThread) { - return fastGet((FastThreadLocalThread) thread); + if (thread instanceof FastThreadLocalAccess) { + return fastGet((FastThreadLocalAccess) thread); } else { return slowGet(); } } - private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) { + private static InternalThreadLocalMap fastGet(FastThreadLocalAccess thread) { InternalThreadLocalMap threadLocalMap = thread.threadLocalMap(); if (threadLocalMap == null) { thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap()); @@ -86,8 +86,8 @@ public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap public static void remove() { Thread thread = Thread.currentThread(); - if (thread instanceof FastThreadLocalThread) { - ((FastThreadLocalThread) thread).setThreadLocalMap(null); + if (thread instanceof FastThreadLocalAccess) { + ((FastThreadLocalAccess) thread).setThreadLocalMap(null); } else { ThreadLocal slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap; if (slowThreadLocalMap != null) { diff --git a/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java b/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java index 26f304b09f..618103044a 100644 --- a/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java +++ b/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java @@ -20,7 +20,6 @@ import org.junit.Test; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -178,21 +177,21 @@ public class DefaultPromiseTest { private static final class TestEventExecutor extends SingleThreadEventExecutor { TestEventExecutor() { - super(null, Executors.defaultThreadFactory(), true); + super(null, new DefaultExecutorFactory(TestEventExecutor.class).newExecutor(1), true); } @Override protected void run() { - for (;;) { - Runnable task = takeTask(); - if (task != null) { - task.run(); - updateLastExecutionTime(); - } + Runnable task = takeTask(); + if (task != null) { + task.run(); + updateLastExecutionTime(); + } - if (confirmShutdown()) { - break; - } + if (confirmShutdown()) { + cleanupAndTerminate(true); + } else { + scheduleExecution(); } } } diff --git a/common/src/test/java/io/netty/util/concurrent/FastThreadLocalTest.java b/common/src/test/java/io/netty/util/concurrent/FastThreadLocalTest.java index 2a758781f0..126a45ce5f 100644 --- a/common/src/test/java/io/netty/util/concurrent/FastThreadLocalTest.java +++ b/common/src/test/java/io/netty/util/concurrent/FastThreadLocalTest.java @@ -19,6 +19,8 @@ package io.netty.util.concurrent; import org.junit.Before; import org.junit.Test; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -75,4 +77,34 @@ public class FastThreadLocalTest { throw t; } } + + /** + * Make sure threads created by the {@link DefaultExecutorFactory} and {@link DefaultThreadFactory} + * implement the {@link FastThreadLocalAccess} interface. + */ + @Test + public void testIsFastThreadLocalThread() { + ExecutorFactory executorFactory = new DefaultExecutorFactory(FastThreadLocalTest.class); + int parallelism = Runtime.getRuntime().availableProcessors() * 2; + + Executor executor = executorFactory.newExecutor(parallelism); + // submit a "high" number of tasks, to get a good chance to touch every thread. + for (int i = 0; i < parallelism * 100; i++) { + executor.execute(new Runnable() { + @Override + public void run() { + assertTrue(Thread.currentThread() instanceof FastThreadLocalAccess); + } + }); + } + + ThreadFactory threadFactory = new DefaultThreadFactory(FastThreadLocalTest.class); + Thread t = threadFactory.newThread(new Runnable() { + @Override + public void run() { + } + }); + + assertTrue(t instanceof FastThreadLocalAccess); + } } diff --git a/example/src/main/java/io/netty/example/udt/echo/bytes/ByteEchoClient.java b/example/src/main/java/io/netty/example/udt/echo/bytes/ByteEchoClient.java index aa95a3544e..5651806832 100644 --- a/example/src/main/java/io/netty/example/udt/echo/bytes/ByteEchoClient.java +++ b/example/src/main/java/io/netty/example/udt/echo/bytes/ByteEchoClient.java @@ -23,9 +23,8 @@ import io.netty.channel.udt.UdtChannel; import io.netty.channel.udt.nio.NioUdtProvider; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; -import io.netty.util.concurrent.DefaultThreadFactory; - -import java.util.concurrent.ThreadFactory; +import io.netty.util.concurrent.DefaultExecutorFactory; +import io.netty.util.concurrent.ExecutorFactory; /** * UDT Byte Stream Client @@ -43,9 +42,10 @@ public final class ByteEchoClient { public static void main(String[] args) throws Exception { // Configure the client. - final ThreadFactory connectFactory = new DefaultThreadFactory("connect"); - final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1, - connectFactory, NioUdtProvider.BYTE_PROVIDER); + final ExecutorFactory connectFactory = new DefaultExecutorFactory("connect"); + final NioEventLoopGroup connectGroup = + new NioEventLoopGroup(1, connectFactory, NioUdtProvider.BYTE_PROVIDER); + try { final Bootstrap boot = new Bootstrap(); boot.group(connectGroup) diff --git a/example/src/main/java/io/netty/example/udt/echo/bytes/ByteEchoServer.java b/example/src/main/java/io/netty/example/udt/echo/bytes/ByteEchoServer.java index e5c43224f4..3723a8b1d2 100644 --- a/example/src/main/java/io/netty/example/udt/echo/bytes/ByteEchoServer.java +++ b/example/src/main/java/io/netty/example/udt/echo/bytes/ByteEchoServer.java @@ -24,9 +24,8 @@ import io.netty.channel.udt.UdtChannel; import io.netty.channel.udt.nio.NioUdtProvider; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; -import io.netty.util.concurrent.DefaultThreadFactory; - -import java.util.concurrent.ThreadFactory; +import io.netty.util.concurrent.DefaultExecutorFactory; +import io.netty.util.concurrent.ExecutorFactory; /** * UDT Byte Stream Server @@ -38,10 +37,10 @@ public final class ByteEchoServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { - final ThreadFactory acceptFactory = new DefaultThreadFactory("accept"); - final ThreadFactory connectFactory = new DefaultThreadFactory("connect"); - final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.BYTE_PROVIDER); - final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1, connectFactory, NioUdtProvider.BYTE_PROVIDER); + ExecutorFactory acceptFactory = new DefaultExecutorFactory("accept"); + ExecutorFactory connectFactory = new DefaultExecutorFactory("connect"); + NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.BYTE_PROVIDER); + NioEventLoopGroup connectGroup = new NioEventLoopGroup(1, connectFactory, NioUdtProvider.BYTE_PROVIDER); // Configure the server. try { diff --git a/example/src/main/java/io/netty/example/udt/echo/message/MsgEchoClient.java b/example/src/main/java/io/netty/example/udt/echo/message/MsgEchoClient.java index 349f11ce86..0549f53292 100644 --- a/example/src/main/java/io/netty/example/udt/echo/message/MsgEchoClient.java +++ b/example/src/main/java/io/netty/example/udt/echo/message/MsgEchoClient.java @@ -23,9 +23,9 @@ import io.netty.channel.udt.UdtChannel; import io.netty.channel.udt.nio.NioUdtProvider; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; -import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.DefaultExecutorFactory; +import io.netty.util.concurrent.ExecutorFactory; -import java.util.concurrent.ThreadFactory; import java.util.logging.Logger; /** @@ -45,11 +45,11 @@ public final class MsgEchoClient { static final int SIZE = Integer.parseInt(System.getProperty("size", "256")); public static void main(String[] args) throws Exception { - // Configure the client. - final ThreadFactory connectFactory = new DefaultThreadFactory("connect"); - final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1, - connectFactory, NioUdtProvider.MESSAGE_PROVIDER); + final ExecutorFactory connectFactory = new DefaultExecutorFactory("connect"); + final NioEventLoopGroup connectGroup = + new NioEventLoopGroup(1, connectFactory, NioUdtProvider.MESSAGE_PROVIDER); + try { final Bootstrap boot = new Bootstrap(); boot.group(connectGroup) diff --git a/example/src/main/java/io/netty/example/udt/echo/message/MsgEchoServer.java b/example/src/main/java/io/netty/example/udt/echo/message/MsgEchoServer.java index 6a18a291f9..52caa91013 100644 --- a/example/src/main/java/io/netty/example/udt/echo/message/MsgEchoServer.java +++ b/example/src/main/java/io/netty/example/udt/echo/message/MsgEchoServer.java @@ -24,9 +24,8 @@ import io.netty.channel.udt.UdtChannel; import io.netty.channel.udt.nio.NioUdtProvider; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; -import io.netty.util.concurrent.DefaultThreadFactory; - -import java.util.concurrent.ThreadFactory; +import io.netty.util.concurrent.DefaultExecutorFactory; +import io.netty.util.concurrent.ExecutorFactory; /** * UDT Message Flow Server @@ -38,8 +37,8 @@ public final class MsgEchoServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { - final ThreadFactory acceptFactory = new DefaultThreadFactory("accept"); - final ThreadFactory connectFactory = new DefaultThreadFactory("connect"); + final ExecutorFactory acceptFactory = new DefaultExecutorFactory("accept"); + final ExecutorFactory connectFactory = new DefaultExecutorFactory("connect"); final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.MESSAGE_PROVIDER); final NioEventLoopGroup connectGroup = diff --git a/example/src/main/java/io/netty/example/udt/echo/rendezvous/MsgEchoPeerBase.java b/example/src/main/java/io/netty/example/udt/echo/rendezvous/MsgEchoPeerBase.java index 2efe568b2b..8f0c33743b 100644 --- a/example/src/main/java/io/netty/example/udt/echo/rendezvous/MsgEchoPeerBase.java +++ b/example/src/main/java/io/netty/example/udt/echo/rendezvous/MsgEchoPeerBase.java @@ -23,10 +23,10 @@ import io.netty.channel.udt.UdtChannel; import io.netty.channel.udt.nio.NioUdtProvider; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; -import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.DefaultExecutorFactory; +import io.netty.util.concurrent.ExecutorFactory; import java.net.InetSocketAddress; -import java.util.concurrent.ThreadFactory; /** * UDT Message Flow Peer @@ -48,9 +48,10 @@ public abstract class MsgEchoPeerBase { public void run() throws Exception { // Configure the peer. - final ThreadFactory connectFactory = new DefaultThreadFactory("rendezvous"); - final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1, - connectFactory, NioUdtProvider.MESSAGE_PROVIDER); + final ExecutorFactory connectFactory = new DefaultExecutorFactory("rendezvous"); + final NioEventLoopGroup connectGroup = + new NioEventLoopGroup(1, connectFactory, NioUdtProvider.MESSAGE_PROVIDER); + try { final Bootstrap boot = new Bootstrap(); boot.group(connectGroup) diff --git a/example/src/main/java/io/netty/example/udt/echo/rendezvousBytes/ByteEchoPeerBase.java b/example/src/main/java/io/netty/example/udt/echo/rendezvousBytes/ByteEchoPeerBase.java index 55d857d9b5..bcb8773872 100644 --- a/example/src/main/java/io/netty/example/udt/echo/rendezvousBytes/ByteEchoPeerBase.java +++ b/example/src/main/java/io/netty/example/udt/echo/rendezvousBytes/ByteEchoPeerBase.java @@ -23,10 +23,10 @@ import io.netty.channel.udt.UdtChannel; import io.netty.channel.udt.nio.NioUdtProvider; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; -import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.DefaultExecutorFactory; +import io.netty.util.concurrent.ExecutorFactory; import java.net.SocketAddress; -import java.util.concurrent.ThreadFactory; /** * UDT Byte Stream Peer @@ -50,9 +50,10 @@ public class ByteEchoPeerBase { } public void run() throws Exception { - final ThreadFactory connectFactory = new DefaultThreadFactory("rendezvous"); - final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1, - connectFactory, NioUdtProvider.BYTE_PROVIDER); + final ExecutorFactory connectFactory = new DefaultExecutorFactory("rendezvous"); + final NioEventLoopGroup connectGroup = + new NioEventLoopGroup(1, connectFactory, NioUdtProvider.BYTE_PROVIDER); + try { final Bootstrap bootstrap = new Bootstrap(); bootstrap.group(connectGroup) diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/sctp/SctpTestPermutation.java b/testsuite/src/test/java/io/netty/testsuite/transport/sctp/SctpTestPermutation.java index bc5ab90dca..c19f8027ea 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/sctp/SctpTestPermutation.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/sctp/SctpTestPermutation.java @@ -27,6 +27,7 @@ import io.netty.channel.sctp.oio.OioSctpServerChannel; import io.netty.testsuite.util.TestUtils; import io.netty.testsuite.transport.TestsuitePermutation.BootstrapComboFactory; import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory; +import io.netty.util.concurrent.DefaultExecutorFactory; import io.netty.util.concurrent.DefaultThreadFactory; import java.util.ArrayList; @@ -38,9 +39,9 @@ public final class SctpTestPermutation { private static final int BOSSES = 2; private static final int WORKERS = 3; private static final EventLoopGroup nioBossGroup = - new NioEventLoopGroup(BOSSES, new DefaultThreadFactory("testsuite-sctp-nio-boss", true)); + new NioEventLoopGroup(BOSSES, new DefaultExecutorFactory("testsuite-sctp-nio-boss")); private static final EventLoopGroup nioWorkerGroup = - new NioEventLoopGroup(WORKERS, new DefaultThreadFactory("testsuite-sctp-nio-worker", true)); + new NioEventLoopGroup(WORKERS, new DefaultExecutorFactory("testsuite-sctp-nio-worker")); private static final EventLoopGroup oioBossGroup = new OioEventLoopGroup(Integer.MAX_VALUE, new DefaultThreadFactory("testsuite-sctp-oio-boss", true)); private static final EventLoopGroup oioWorkerGroup = diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketBufReleaseTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketBufReleaseTest.java index 8e8461c17d..d25dcf52b2 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketBufReleaseTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketBufReleaseTest.java @@ -25,7 +25,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.DefaultPromise; -import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Promise; import org.junit.Test; @@ -37,8 +36,7 @@ import static org.junit.Assert.*; public class SocketBufReleaseTest extends AbstractSocketTest { - private static final EventExecutor executor = - new DefaultEventExecutorGroup(1, new DefaultThreadFactory(SocketBufReleaseTest.class, true)).next(); + private static final EventExecutor executor = new DefaultEventExecutorGroup(1).next(); @Test public void testBufRelease() throws Throwable { diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java index 740c69753d..e8c87350f7 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java @@ -33,6 +33,7 @@ import io.netty.channel.socket.oio.OioServerSocketChannel; import io.netty.channel.socket.oio.OioSocketChannel; import io.netty.testsuite.transport.TestsuitePermutation.BootstrapComboFactory; import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory; +import io.netty.util.concurrent.DefaultExecutorFactory; import io.netty.util.concurrent.DefaultThreadFactory; import java.util.ArrayList; @@ -49,9 +50,9 @@ public class SocketTestPermutation { protected static final int OIO_SO_TIMEOUT = 10; // Use short timeout for faster runs. protected final EventLoopGroup nioBossGroup = - new NioEventLoopGroup(BOSSES, new DefaultThreadFactory("testsuite-nio-boss", true)); + new NioEventLoopGroup(BOSSES, new DefaultExecutorFactory("testsuite-nio-boss")); protected final EventLoopGroup nioWorkerGroup = - new NioEventLoopGroup(WORKERS, new DefaultThreadFactory("testsuite-nio-worker", true)); + new NioEventLoopGroup(WORKERS, new DefaultExecutorFactory("testsuite-nio-worker")); protected final EventLoopGroup oioBossGroup = new OioEventLoopGroup(Integer.MAX_VALUE, new DefaultThreadFactory("testsuite-oio-boss", true)); protected final EventLoopGroup oioWorkerGroup = diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/udt/UDTClientServerConnectionTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/udt/UDTClientServerConnectionTest.java index 3dcfb26559..658a6ff9f7 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/udt/UDTClientServerConnectionTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/udt/UDTClientServerConnectionTest.java @@ -32,14 +32,13 @@ import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; -import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.DefaultExecutorFactory; +import io.netty.util.concurrent.ExecutorFactory; import io.netty.util.concurrent.GlobalEventExecutor; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ThreadFactory; - import static org.junit.Assert.*; /** @@ -66,9 +65,10 @@ public class UDTClientServerConnectionTest { @Override public void run() { final Bootstrap boot = new Bootstrap(); - final ThreadFactory clientFactory = new DefaultThreadFactory("client"); - final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1, - clientFactory, NioUdtProvider.BYTE_PROVIDER); + final ExecutorFactory clientFactory = new DefaultExecutorFactory("client"); + final NioEventLoopGroup connectGroup = + new NioEventLoopGroup(1, clientFactory, NioUdtProvider.BYTE_PROVIDER); + try { boot.group(connectGroup) .channelFactory(NioUdtProvider.BYTE_CONNECTOR) @@ -193,12 +193,13 @@ public class UDTClientServerConnectionTest { @Override public void run() { final ServerBootstrap boot = new ServerBootstrap(); - final ThreadFactory acceptFactory = new DefaultThreadFactory("accept"); - final ThreadFactory serverFactory = new DefaultThreadFactory("server"); - final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1, - acceptFactory, NioUdtProvider.BYTE_PROVIDER); - final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1, - serverFactory, NioUdtProvider.BYTE_PROVIDER); + final ExecutorFactory acceptFactory = new DefaultExecutorFactory("accept"); + final ExecutorFactory serverFactory = new DefaultExecutorFactory("server"); + final NioEventLoopGroup acceptGroup = + new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.BYTE_PROVIDER); + final NioEventLoopGroup connectGroup = + new NioEventLoopGroup(1, serverFactory, NioUdtProvider.BYTE_PROVIDER); + try { boot.group(acceptGroup, connectGroup) .channelFactory(NioUdtProvider.BYTE_ACCEPTOR) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 62a64cee44..6faf6cb23b 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -33,7 +33,8 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; /** - * {@link EventLoop} which uses epoll under the covers. Only works on Linux! + * A {@link SingleThreadEventLoop} implementation which uses epoll + * under the covers. This {@link EventLoop} works only on Linux systems! */ final class EpollEventLoop extends SingleThreadEventLoop { private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class); @@ -208,84 +209,85 @@ final class EpollEventLoop extends SingleThreadEventLoop { @Override protected void run() { - for (;;) { - boolean oldWakenUp = WAKEN_UP_UPDATER.getAndSet(this, 0) == 1; - try { - int ready; - if (hasTasks()) { - // Non blocking just return what is ready directly without block - ready = Native.epollWait(epollFd, events, 0); - } else { - ready = epollWait(oldWakenUp); + boolean oldWakenUp = WAKEN_UP_UPDATER.getAndSet(this, 0) == 1; + try { + int ready; + if (hasTasks()) { + // Non blocking just return what is ready directly without block + ready = Native.epollWait(epollFd, events, 0); + } else { + ready = epollWait(oldWakenUp); - // 'wakenUp.compareAndSet(false, true)' is always evaluated - // before calling 'selector.wakeup()' to reduce the wake-up - // overhead. (Selector.wakeup() is an expensive operation.) - // - // However, there is a race condition in this approach. - // The race condition is triggered when 'wakenUp' is set to - // true too early. - // - // 'wakenUp' is set to true too early if: - // 1) Selector is waken up between 'wakenUp.set(false)' and - // 'selector.select(...)'. (BAD) - // 2) Selector is waken up between 'selector.select(...)' and - // 'if (wakenUp.get()) { ... }'. (OK) - // - // In the first case, 'wakenUp' is set to true and the - // following 'selector.select(...)' will wake up immediately. - // Until 'wakenUp' is set to false again in the next round, - // 'wakenUp.compareAndSet(false, true)' will fail, and therefore - // any attempt to wake up the Selector will fail, too, causing - // the following 'selector.select(...)' call to block - // unnecessarily. - // - // To fix this problem, we wake up the selector again if wakenUp - // is true immediately after selector.select(...). - // It is inefficient in that it wakes up the selector for both - // the first case (BAD - wake-up required) and the second case - // (OK - no wake-up required). + // 'wakenUp.compareAndSet(false, true)' is always evaluated + // before calling 'selector.wakeup()' to reduce the wake-up + // overhead. (Selector.wakeup() is an expensive operation.) + // + // However, there is a race condition in this approach. + // The race condition is triggered when 'wakenUp' is set to + // true too early. + // + // 'wakenUp' is set to true too early if: + // 1) Selector is waken up between 'wakenUp.set(false)' and + // 'selector.select(...)'. (BAD) + // 2) Selector is waken up between 'selector.select(...)' and + // 'if (wakenUp.get()) { ... }'. (OK) + // + // In the first case, 'wakenUp' is set to true and the + // following 'selector.select(...)' will wake up immediately. + // Until 'wakenUp' is set to false again in the next round, + // 'wakenUp.compareAndSet(false, true)' will fail, and therefore + // any attempt to wake up the Selector will fail, too, causing + // the following 'selector.select(...)' call to block + // unnecessarily. + // + // To fix this problem, we wake up the selector again if wakenUp + // is true immediately after selector.select(...). + // It is inefficient in that it wakes up the selector for both + // the first case (BAD - wake-up required) and the second case + // (OK - no wake-up required). - if (wakenUp == 1) { - Native.eventFdWrite(eventFd, 1L); - } - } - - final int ioRatio = this.ioRatio; - if (ioRatio == 100) { - if (ready > 0) { - processReady(events, ready); - } - runAllTasks(); - } else { - final long ioStartTime = System.nanoTime(); - - if (ready > 0) { - processReady(events, ready); - } - - final long ioTime = System.nanoTime() - ioStartTime; - runAllTasks(ioTime * (100 - ioRatio) / ioRatio); - } - - if (isShuttingDown()) { - closeAll(); - if (confirmShutdown()) { - break; - } - } - } catch (Throwable t) { - logger.warn("Unexpected exception in the selector loop.", t); - - // Prevent possible consecutive immediate failures that lead to - // excessive CPU consumption. - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore. + if (wakenUp == 1) { + Native.eventFdWrite(eventFd, 1L); } } + + final int ioRatio = this.ioRatio; + if (ioRatio == 100) { + if (ready > 0) { + processReady(events, ready); + } + runAllTasks(); + } else { + final long ioStartTime = System.nanoTime(); + + if (ready > 0) { + processReady(events, ready); + } + + final long ioTime = System.nanoTime() - ioStartTime; + runAllTasks(ioTime * (100 - ioRatio) / ioRatio); + } + + if (isShuttingDown()) { + closeAll(); + if (confirmShutdown()) { + cleanupAndTerminate(true); + return; + } + } + } catch (Throwable t) { + logger.warn("Unexpected exception in the selector loop.", t); + + // Prevent possible consecutive immediate failures that lead to + // excessive CPU consumption. + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore. + } } + + scheduleExecution(); } private void closeAll() { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java index 101312c611..8704a64058 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java @@ -19,43 +19,90 @@ import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.MultithreadEventLoopGroup; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.ExecutorFactory; import java.util.concurrent.Executor; -import java.util.concurrent.ThreadFactory; +import io.netty.util.concurrent.DefaultExecutorFactory; + /** - * {@link EventLoopGroup} which uses epoll under the covers. Because of this - * it only works on linux. + * A {@link MultithreadEventLoopGroup} which uses epoll under the + * covers. This {@link EventLoopGroup} works only on Linux systems! */ public final class EpollEventLoopGroup extends MultithreadEventLoopGroup { /** - * Create a new instance using the default number of threads and the default {@link ThreadFactory}. + * Create a new instance that uses twice as many {@link EventLoop}s as there processors/cores + * available, as well as the default {@link Executor}. + * + * @see DefaultExecutorFactory */ public EpollEventLoopGroup() { this(0); } /** - * Create a new instance using the specified number of threads and the default {@link ThreadFactory}. + * Create a new instance that uses the default {@link Executor}. + * + * @see DefaultExecutorFactory + * + * @param nEventLoops the number of {@link EventLoop}s that will be used by this instance. + * If {@code executor} is {@code null} this number will also be the parallelism + * requested from the default executor. It is generally advised for the number + * of {@link EventLoop}s and the number of {@link Thread}s used by the */ - public EpollEventLoopGroup(int nThreads) { - this(nThreads, null); + public EpollEventLoopGroup(int nEventLoops) { + this(nEventLoops, (Executor) null); } /** - * Create a new instance using the specified number of threads and the given {@link ThreadFactory}. + * @param nEventLoops the number of {@link EventLoop}s that will be used by this instance. + * If {@code executor} is {@code null} this number will also be the parallelism + * requested from the default executor. It is generally advised for the number + * of {@link EventLoop}s and the number of {@link Thread}s used by the + * {@code executor} to lie very close together. + * @param executor the {@link Executor} to use, or {@code null} if the default should be used. */ - public EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory) { - this(nThreads, threadFactory, 128); + public EpollEventLoopGroup(int nEventLoops, Executor executor) { + this(nEventLoops, executor, 128); } /** - * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given - * maximal amount of epoll events to handle per epollWait(...). + * @param nEventLoops the number of {@link EventLoop}s that will be used by this instance. + * If {@code executor} is {@code null} this number will also be the parallelism + * requested from the default executor. It is generally advised for the number + * of {@link EventLoop}s and the number of {@link Thread}s used by the + * {@code executor} to lie very close together. + * @param executorFactory the {@link ExecutorFactory} to use, or {@code null} if the default should be used. */ - public EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce) { - super(nThreads, threadFactory, maxEventsAtOnce); + public EpollEventLoopGroup(int nEventLoops, ExecutorFactory executorFactory) { + this(nEventLoops, executorFactory, 128); + } + + /** + * @param nEventLoops the number of {@link EventLoop}s that will be used by this instance. + * If {@code executor} is {@code null} this number will also be the parallelism + * requested from the default executor. It is generally advised for the number + * of {@link EventLoop}s and the number of {@link Thread}s used by the + * {@code executor} to lie very close together. + * @param executor the {@link Executor} to use, or {@code null} if the default should be used. + * @param maxEventsAtOnce the maximum number of epoll events to handle per epollWait(...). + */ + public EpollEventLoopGroup(int nEventLoops, Executor executor, int maxEventsAtOnce) { + super(nEventLoops, executor, maxEventsAtOnce); + } + + /** + * @param nEventLoops the number of {@link EventLoop}s that will be used by this instance. + * If {@code executor} is {@code null} this number will also be the parallelism + * requested from the default executor. It is generally advised for the number + * of {@link EventLoop}s and the number of {@link Thread}s used by the + * {@code executor} to lie very close together. + * @param executorFactory the {@link ExecutorFactory} to use, or {@code null} if the default should be used. + * @param maxEventsAtOnce the maximum number of epoll events to handle per epollWait(...). + */ + public EpollEventLoopGroup(int nEventLoops, ExecutorFactory executorFactory, int maxEventsAtOnce) { + super(nEventLoops, executorFactory, maxEventsAtOnce); } /** diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java index 6d56c6b891..e35a651b42 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java @@ -27,7 +27,7 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.testsuite.transport.TestsuitePermutation; import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory; import io.netty.testsuite.transport.socket.SocketTestPermutation; -import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.DefaultExecutorFactory; import java.util.Arrays; import java.util.List; @@ -37,9 +37,9 @@ class EpollSocketTestPermutation extends SocketTestPermutation { static final SocketTestPermutation INSTANCE = new EpollSocketTestPermutation(); static final EventLoopGroup EPOLL_BOSS_GROUP = - new EpollEventLoopGroup(BOSSES, new DefaultThreadFactory("testsuite-epoll-boss", true)); + new EpollEventLoopGroup(BOSSES, new DefaultExecutorFactory("testsuite-epoll-boss")); static final EventLoopGroup EPOLL_WORKER_GROUP = - new EpollEventLoopGroup(WORKERS, new DefaultThreadFactory("testsuite-epoll-worker", true)); + new EpollEventLoopGroup(WORKERS, new DefaultExecutorFactory("testsuite-epoll-worker")); @Override public List> socket() { diff --git a/transport-udt/src/test/java/io/netty/test/udt/bench/xfer/UdtNetty.java b/transport-udt/src/test/java/io/netty/test/udt/bench/xfer/UdtNetty.java index c138497642..aa1a27ad90 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/bench/xfer/UdtNetty.java +++ b/transport-udt/src/test/java/io/netty/test/udt/bench/xfer/UdtNetty.java @@ -28,11 +28,11 @@ import io.netty.test.udt.util.CustomReporter; import io.netty.test.udt.util.EchoMessageHandler; import io.netty.test.udt.util.TrafficControl; import io.netty.test.udt.util.UnitHelp; +import io.netty.util.concurrent.DefaultExecutorFactory; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.net.InetSocketAddress; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -92,10 +92,10 @@ public final class UdtNetty { final ChannelHandler handler1 = new EchoMessageHandler(rate, size); final ChannelHandler handler2 = new EchoMessageHandler(null, size); - final NioEventLoopGroup group1 = new NioEventLoopGroup( - 1, Executors.defaultThreadFactory(), NioUdtProvider.MESSAGE_PROVIDER); - final NioEventLoopGroup group2 = new NioEventLoopGroup( - 1, Executors.defaultThreadFactory(), NioUdtProvider.MESSAGE_PROVIDER); + final NioEventLoopGroup group1 = + new NioEventLoopGroup(1, new DefaultExecutorFactory("group1"), NioUdtProvider.MESSAGE_PROVIDER); + final NioEventLoopGroup group2 = + new NioEventLoopGroup(1, new DefaultExecutorFactory("group2"), NioUdtProvider.MESSAGE_PROVIDER); final Bootstrap peerBoot1 = new Bootstrap(); peerBoot1.group(group1) diff --git a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteRendezvousChannelTest.java b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteRendezvousChannelTest.java index 8275ef4629..ce21277aaa 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteRendezvousChannelTest.java +++ b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtByteRendezvousChannelTest.java @@ -25,12 +25,12 @@ import io.netty.channel.udt.nio.NioUdtByteRendezvousChannel; import io.netty.channel.udt.nio.NioUdtProvider; import io.netty.test.udt.util.EchoByteHandler; import io.netty.test.udt.util.UnitHelp; +import io.netty.util.concurrent.DefaultExecutorFactory; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import org.junit.Test; import java.net.InetSocketAddress; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; @@ -70,10 +70,10 @@ public class NioUdtByteRendezvousChannelTest extends AbstractUdtTest { final EchoByteHandler handler1 = new EchoByteHandler(rate1, messageSize); final EchoByteHandler handler2 = new EchoByteHandler(rate2, messageSize); - final NioEventLoopGroup group1 = new NioEventLoopGroup( - 1, Executors.defaultThreadFactory(), NioUdtProvider.BYTE_PROVIDER); - final NioEventLoopGroup group2 = new NioEventLoopGroup( - 1, Executors.defaultThreadFactory(), NioUdtProvider.BYTE_PROVIDER); + final NioEventLoopGroup group1 = + new NioEventLoopGroup(1, new DefaultExecutorFactory("group1"), NioUdtProvider.BYTE_PROVIDER); + final NioEventLoopGroup group2 = + new NioEventLoopGroup(1, new DefaultExecutorFactory("group2"), NioUdtProvider.BYTE_PROVIDER); final Bootstrap boot1 = new Bootstrap(); boot1.group(group1) diff --git a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageRendezvousChannelTest.java b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageRendezvousChannelTest.java index 082517b7ab..0d511c31ed 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageRendezvousChannelTest.java +++ b/transport-udt/src/test/java/io/netty/test/udt/nio/NioUdtMessageRendezvousChannelTest.java @@ -25,12 +25,12 @@ import io.netty.channel.udt.nio.NioUdtMessageRendezvousChannel; import io.netty.channel.udt.nio.NioUdtProvider; import io.netty.test.udt.util.EchoMessageHandler; import io.netty.test.udt.util.UnitHelp; +import io.netty.util.concurrent.DefaultExecutorFactory; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import org.junit.Test; import java.net.InetSocketAddress; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; @@ -68,10 +68,10 @@ public class NioUdtMessageRendezvousChannelTest extends AbstractUdtTest { final EchoMessageHandler handler1 = new EchoMessageHandler(rate1, messageSize); final EchoMessageHandler handler2 = new EchoMessageHandler(rate2, messageSize); - final NioEventLoopGroup group1 = new NioEventLoopGroup( - 1, Executors.defaultThreadFactory(), NioUdtProvider.MESSAGE_PROVIDER); - final NioEventLoopGroup group2 = new NioEventLoopGroup( - 1, Executors.defaultThreadFactory(), NioUdtProvider.MESSAGE_PROVIDER); + final NioEventLoopGroup group1 = + new NioEventLoopGroup(1, new DefaultExecutorFactory("group1"), NioUdtProvider.MESSAGE_PROVIDER); + final NioEventLoopGroup group2 = + new NioEventLoopGroup(1, new DefaultExecutorFactory("group2"), NioUdtProvider.MESSAGE_PROVIDER); final Bootstrap boot1 = new Bootstrap(); boot1.group(group1) diff --git a/transport/src/main/java/io/netty/channel/DefaultEventLoop.java b/transport/src/main/java/io/netty/channel/DefaultEventLoop.java index 53d2e3b12b..289b035700 100644 --- a/transport/src/main/java/io/netty/channel/DefaultEventLoop.java +++ b/transport/src/main/java/io/netty/channel/DefaultEventLoop.java @@ -15,31 +15,21 @@ */ package io.netty.channel; -import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.DefaultExecutorFactory; import java.util.concurrent.Executor; -import java.util.concurrent.ThreadFactory; - public class DefaultEventLoop extends SingleThreadEventLoop { public DefaultEventLoop() { this((EventLoopGroup) null); } - public DefaultEventLoop(ThreadFactory threadFactory) { - this(null, threadFactory); - } - public DefaultEventLoop(Executor executor) { this(null, executor); } public DefaultEventLoop(EventLoopGroup parent) { - this(parent, new DefaultThreadFactory(DefaultEventLoop.class)); - } - - public DefaultEventLoop(EventLoopGroup parent, ThreadFactory threadFactory) { - super(parent, threadFactory, true); + this(parent, new DefaultExecutorFactory(DefaultEventLoop.class).newExecutor(1)); } public DefaultEventLoop(EventLoopGroup parent, Executor executor) { @@ -48,16 +38,16 @@ public class DefaultEventLoop extends SingleThreadEventLoop { @Override protected void run() { - for (;;) { - Runnable task = takeTask(); - if (task != null) { - task.run(); - updateLastExecutionTime(); - } + Runnable task = takeTask(); + if (task != null) { + task.run(); + updateLastExecutionTime(); + } - if (confirmShutdown()) { - break; - } + if (confirmShutdown()) { + cleanupAndTerminate(true); + } else { + scheduleExecution(); } } } diff --git a/transport/src/main/java/io/netty/channel/DefaultEventLoopGroup.java b/transport/src/main/java/io/netty/channel/DefaultEventLoopGroup.java index 6e8ba13452..cfb48de3bd 100644 --- a/transport/src/main/java/io/netty/channel/DefaultEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/DefaultEventLoopGroup.java @@ -15,8 +15,9 @@ */ package io.netty.channel; +import io.netty.util.concurrent.ExecutorFactory; + import java.util.concurrent.Executor; -import java.util.concurrent.ThreadFactory; /** * {@link MultithreadEventLoopGroup} which must be used for the local transport. @@ -24,29 +25,48 @@ import java.util.concurrent.ThreadFactory; public class DefaultEventLoopGroup extends MultithreadEventLoopGroup { /** - * Create a new instance with the default number of threads. + * Create a new instance that uses twice as many {@link EventLoop}s as there processors/cores + * available, as well as the default {@link Executor}. + * + * @see io.netty.util.concurrent.DefaultExecutorFactory */ public DefaultEventLoopGroup() { this(0); } /** - * Create a new instance - * - * @param nThreads the number of threads to use + * @param nEventLoops the number of {@link EventLoop}s that will be used by this instance. + * If {@code executor} is {@code null} this number will also be the parallelism + * requested from the default executor. It is generally advised for the number + * of {@link EventLoop}s and the number of {@link Thread}s used by the + * {@code executor} to lie very close together. */ - public DefaultEventLoopGroup(int nThreads) { - this(nThreads, null); + public DefaultEventLoopGroup(int nEventLoops) { + this(nEventLoops, (Executor) null); } /** - * Create a new instance - * - * @param nThreads the number of threads to use - * @param threadFactory the {@link ThreadFactory} or {@code null} to use the default + * @param nEventLoops the number of {@link EventLoop}s that will be used by this instance. + * If {@code executor} is {@code null} this number will also be the parallelism + * requested from the default executor. It is generally advised for the number + * of {@link EventLoop}s and the number of {@link Thread}s used by the + * {@code executor} to lie very close together. + * @param executor the {@link Executor} to use, or {@code null} if the default should be used. */ - public DefaultEventLoopGroup(int nThreads, ThreadFactory threadFactory) { - super(nThreads, threadFactory); + public DefaultEventLoopGroup(int nEventLoops, Executor executor) { + super(nEventLoops, executor); + } + + /** + * @param nEventLoops the number of {@link EventLoop}s that will be used by this instance. + * If {@code executor} is {@code null} this number will also be the parallelism + * requested from the default executor. It is generally advised for the number + * of {@link EventLoop}s and the number of {@link Thread}s used by the + * {@code executor} to lie very close together. + * @param executorFactory the {@link ExecutorFactory} to use, or {@code null} if the default should be used. + */ + public DefaultEventLoopGroup(int nEventLoops, ExecutorFactory executorFactory) { + super(nEventLoops, executorFactory); } @Override diff --git a/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java b/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java index aae17b4505..3b4fb9696e 100644 --- a/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java @@ -15,17 +15,16 @@ */ package io.netty.channel; -import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.ExecutorFactory; import io.netty.util.concurrent.MultithreadEventExecutorGroup; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.concurrent.Executor; -import java.util.concurrent.ThreadFactory; /** - * Abstract base class for {@link EventLoopGroup} implementations that handles their tasks with multiple threads at + * Abstract base class for {@link EventLoopGroup} implementations that handle their tasks with multiple threads at * the same time. */ public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup { @@ -46,20 +45,15 @@ public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutor /** * @see {@link MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)} */ - protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { - super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); + protected MultithreadEventLoopGroup(int nEventLoops, Executor executor, Object... args) { + super(nEventLoops == 0 ? DEFAULT_EVENT_LOOP_THREADS : nEventLoops, executor, args); } /** - * @see {@link MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)} + * @see {@link MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ExecutorFactory, Object...)} */ - protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) { - super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args); - } - - @Override - protected ThreadFactory newDefaultThreadFactory() { - return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY); + protected MultithreadEventLoopGroup(int nEventLoops, ExecutorFactory executorFactory, Object... args) { + super(nEventLoops == 0 ? DEFAULT_EVENT_LOOP_THREADS : nEventLoops, executorFactory, args); } @Override diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index 81cc3dbaef..14b80b67ba 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -28,10 +28,6 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im private final ChannelHandlerInvoker invoker = new DefaultChannelHandlerInvoker(this); - protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { - super(parent, threadFactory, addTaskWakesUp); - } - protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) { super(parent, executor, addTaskWakesUp); } diff --git a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java index c73f36a929..171e3ef2d2 100644 --- a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java +++ b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java @@ -59,7 +59,8 @@ public class ThreadPerChannelEventLoop extends SingleThreadEventLoop { ch.unsafe().close(ch.unsafe().voidPromise()); } if (confirmShutdown()) { - break; + cleanupAndTerminate(true); + return; } } else { if (ch != null) { diff --git a/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java b/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java deleted file mode 100644 index 2bd3ff611e..0000000000 --- a/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.local; - -import io.netty.channel.DefaultEventLoopGroup; - -import java.util.concurrent.ThreadFactory; - -/** - * @deprecated Use {@link DefaultEventLoopGroup} instead. - */ -@Deprecated -public class LocalEventLoopGroup extends DefaultEventLoopGroup { - - /** - * Create a new instance with the default number of threads. - */ - public LocalEventLoopGroup() { } - - /** - * Create a new instance - * - * @param nThreads the number of threads to use - */ - public LocalEventLoopGroup(int nThreads) { - super(nThreads); - } - - /** - * Create a new instance - * - * @param nThreads the number of threads to use - * @param threadFactory the {@link ThreadFactory} or {@code null} to use the default - */ - public LocalEventLoopGroup(int nThreads, ThreadFactory threadFactory) { - super(nThreads, threadFactory); - } -} diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java index ad64cd5f3a..1416541fe6 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java @@ -43,9 +43,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** - * {@link SingleThreadEventLoop} implementation which register the {@link Channel}'s to a - * {@link Selector} and so does the multi-plexing of these in the event loop. - * + * A {@link SingleThreadEventLoop} implementation which registers each {@link Channel} with a + * NIO {@link Selector} and performs the multiplexing of these in the event loop. */ public final class NioEventLoop extends SingleThreadEventLoop { @@ -300,80 +299,85 @@ public final class NioEventLoop extends SingleThreadEventLoop { @Override protected void run() { - for (;;) { - boolean oldWakenUp = wakenUp.getAndSet(false); - try { - if (hasTasks()) { - selectNow(); - } else { - select(oldWakenUp); + boolean oldWakenUp = wakenUp.getAndSet(false); + try { + if (hasTasks()) { + selectNow(); + } else { + select(oldWakenUp); - // 'wakenUp.compareAndSet(false, true)' is always evaluated - // before calling 'selector.wakeup()' to reduce the wake-up - // overhead. (Selector.wakeup() is an expensive operation.) - // - // However, there is a race condition in this approach. - // The race condition is triggered when 'wakenUp' is set to - // true too early. - // - // 'wakenUp' is set to true too early if: - // 1) Selector is waken up between 'wakenUp.set(false)' and - // 'selector.select(...)'. (BAD) - // 2) Selector is waken up between 'selector.select(...)' and - // 'if (wakenUp.get()) { ... }'. (OK) - // - // In the first case, 'wakenUp' is set to true and the - // following 'selector.select(...)' will wake up immediately. - // Until 'wakenUp' is set to false again in the next round, - // 'wakenUp.compareAndSet(false, true)' will fail, and therefore - // any attempt to wake up the Selector will fail, too, causing - // the following 'selector.select(...)' call to block - // unnecessarily. - // - // To fix this problem, we wake up the selector again if wakenUp - // is true immediately after selector.select(...). - // It is inefficient in that it wakes up the selector for both - // the first case (BAD - wake-up required) and the second case - // (OK - no wake-up required). + // 'wakenUp.compareAndSet(false, true)' is always evaluated + // before calling 'selector.wakeup()' to reduce the wake-up + // overhead. (Selector.wakeup() is an expensive operation.) + // + // However, there is a race condition in this approach. + // The race condition is triggered when 'wakenUp' is set to + // true too early. + // + // 'wakenUp' is set to true too early if: + // 1) Selector is waken up between 'wakenUp.set(false)' and + // 'selector.select(...)'. (BAD) + // 2) Selector is waken up between 'selector.select(...)' and + // 'if (wakenUp.get()) { ... }'. (OK) + // + // In the first case, 'wakenUp' is set to true and the + // following 'selector.select(...)' will wake up immediately. + // Until 'wakenUp' is set to false again in the next round, + // 'wakenUp.compareAndSet(false, true)' will fail, and therefore + // any attempt to wake up the Selector will fail, too, causing + // the following 'selector.select(...)' call to block + // unnecessarily. + // + // To fix this problem, we wake up the selector again if wakenUp + // is true immediately after selector.select(...). + // It is inefficient in that it wakes up the selector for both + // the first case (BAD - wake-up required) and the second case + // (OK - no wake-up required). - if (wakenUp.get()) { - selector.wakeup(); - } - } - - cancelledKeys = 0; - needsToSelectAgain = false; - final int ioRatio = this.ioRatio; - if (ioRatio == 100) { - processSelectedKeys(); - runAllTasks(); - } else { - final long ioStartTime = System.nanoTime(); - - processSelectedKeys(); - - final long ioTime = System.nanoTime() - ioStartTime; - runAllTasks(ioTime * (100 - ioRatio) / ioRatio); - } - - if (isShuttingDown()) { - closeAll(); - if (confirmShutdown()) { - break; - } - } - } catch (Throwable t) { - logger.warn("Unexpected exception in the selector loop.", t); - - // Prevent possible consecutive immediate failures that lead to - // excessive CPU consumption. - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore. + if (wakenUp.get()) { + selector.wakeup(); } } + + cancelledKeys = 0; + needsToSelectAgain = false; + final int ioRatio = this.ioRatio; + if (ioRatio == 100) { + processSelectedKeys(); + runAllTasks(); + } else { + final long ioStartTime = System.nanoTime(); + + processSelectedKeys(); + + final long ioTime = System.nanoTime() - ioStartTime; + runAllTasks(ioTime * (100 - ioRatio) / ioRatio); + } + + if (isShuttingDown()) { + closeAll(); + if (confirmShutdown()) { + cleanupAndTerminate(true); + return; + } + } + } catch (Throwable t) { + logger.warn("Unexpected exception in the selector loop.", t); + + // TODO: After using a ForkJoinPool that is potentially shared with other software + // than Netty. The Thread.sleep might be problematic. Even though this is unlikely to ever + // happen anyways. + + // Prevent possible consecutive immediate failures that lead to + // excessive CPU consumption. + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore. + } } + + scheduleExecution(); } private void processSelectedKeys() { diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java index 584307816f..c5b32a8e37 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java @@ -23,53 +23,95 @@ import io.netty.util.concurrent.EventExecutor; import java.nio.channels.Selector; import java.nio.channels.spi.SelectorProvider; import java.util.concurrent.Executor; -import java.util.concurrent.ThreadFactory; +import io.netty.util.concurrent.DefaultExecutorFactory; +import io.netty.util.concurrent.ExecutorFactory; /** - * {@link MultithreadEventLoopGroup} implementations which is used for NIO {@link Selector} based {@link Channel}s. + * A {@link MultithreadEventLoopGroup} implementation which is used for NIO {@link Selector} based {@link Channel}s. */ public class NioEventLoopGroup extends MultithreadEventLoopGroup { /** - * Create a new instance using the default number of threads, the default {@link ThreadFactory} and - * the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}. + * Create a new instance that uses twice as many {@link EventLoop}s as there processors/cores + * available, as well as the default {@link Executor} and the {@link SelectorProvider} which + * is returned by {@link SelectorProvider#provider()}. + * + * @see DefaultExecutorFactory */ public NioEventLoopGroup() { this(0); } /** - * Create a new instance using the specified number of threads, {@link ThreadFactory} and the - * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}. + * Create a new instance that uses the default {@link Executor} and the {@link SelectorProvider} which + * is returned by {@link SelectorProvider#provider()}. + * + * @see DefaultExecutorFactory + * + * @param nEventLoops the number of {@link EventLoop}s that will be used by this instance. + * If {@code executor} is {@code null} this number will also be the parallelism + * requested from the default executor. It is generally advised for the number + * of {@link EventLoop}s and the number of {@link Thread}s used by the */ - public NioEventLoopGroup(int nThreads) { - this(nThreads, (Executor) null); + public NioEventLoopGroup(int nEventLoops) { + this(nEventLoops, (Executor) null); } /** - * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the - * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}. + * Create a new instance that uses the the {@link SelectorProvider} which is returned by + * {@link SelectorProvider#provider()}. + * + * @param nEventLoops the number of {@link EventLoop}s that will be used by this instance. + * If {@code executor} is {@code null} this number will also be the parallelism + * requested from the default executor. It is generally advised for the number + * of {@link EventLoop}s and the number of {@link Thread}s used by the + * {@code executor} to lie very close together. + * @param executor the {@link Executor} to use, or {@code null} if the default should be used. */ - public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) { - this(nThreads, threadFactory, SelectorProvider.provider()); - } - - public NioEventLoopGroup(int nThreads, Executor executor) { - this(nThreads, executor, SelectorProvider.provider()); + public NioEventLoopGroup(int nEventLoops, Executor executor) { + this(nEventLoops, executor, SelectorProvider.provider()); } /** - * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given - * {@link SelectorProvider}. + * Create a new instance that uses the the {@link SelectorProvider} which is returned by + * {@link SelectorProvider#provider()}. + * + * @param nEventLoops the number of {@link EventLoop}s that will be used by this instance. + * If {@code executor} is {@code null} this number will also be the parallelism + * requested from the default executor. It is generally advised for the number + * of {@link EventLoop}s and the number of {@link Thread}s used by the + * {@code executor} to lie very close together. + * @param executorFactory the {@link ExecutorFactory} to use, or {@code null} if the default should be used. */ - public NioEventLoopGroup( - int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) { - super(nThreads, threadFactory, selectorProvider); + public NioEventLoopGroup(int nEventLoops, ExecutorFactory executorFactory) { + this(nEventLoops, executorFactory, SelectorProvider.provider()); } + /** + * @param nEventLoops the number of {@link EventLoop}s that will be used by this instance. + * If {@code executor} is {@code null} this number will also be the parallelism + * requested from the default executor. It is generally advised for the number + * of {@link EventLoop}s and the number of {@link Thread}s used by the + * {@code executor} to lie very close together. + * @param executor the {@link Executor} to use, or {@code null} if the default should be used. + * @param selectorProvider the {@link SelectorProvider} to use. This value must not be {@code null}. + */ + public NioEventLoopGroup(int nEventLoops, Executor executor, final SelectorProvider selectorProvider) { + super(nEventLoops, executor, selectorProvider); + } + + /** + * @param nEventLoops the number of {@link EventLoop}s that will be used by this instance. + * If {@code executor} is {@code null} this number will also be the parallelism + * requested from the default executor. It is generally advised for the number + * of {@link EventLoop}s and the number of {@link Thread}s used by the + * {@code executor} to lie very close together. + * @param executorFactory the {@link ExecutorFactory} to use, or {@code null} if the default should be used. + * @param selectorProvider the {@link SelectorProvider} to use. This value must not be {@code null}. + */ public NioEventLoopGroup( - int nThreads, Executor executor, final SelectorProvider selectorProvider) { - super(nThreads, executor, selectorProvider); + int nEventLoops, ExecutorFactory executorFactory, final SelectorProvider selectorProvider) { + super(nEventLoops, executorFactory, selectorProvider); } /** diff --git a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java index 281cedc154..5897a83d59 100644 --- a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java @@ -19,6 +19,7 @@ import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.Appender; import io.netty.channel.local.LocalChannel; +import io.netty.util.concurrent.DefaultExecutorFactory; import io.netty.util.concurrent.EventExecutor; import org.junit.After; import org.junit.Before; @@ -31,7 +32,6 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; @@ -440,21 +440,21 @@ public class SingleThreadEventLoopTest { final AtomicInteger cleanedUp = new AtomicInteger(); SingleThreadEventLoopA() { - super(null, Executors.defaultThreadFactory(), true); + super(null, new DefaultExecutorFactory("A").newExecutor(1), true); } @Override protected void run() { - for (;;) { - Runnable task = takeTask(); - if (task != null) { - task.run(); - updateLastExecutionTime(); - } + Runnable task = takeTask(); + if (task != null) { + task.run(); + updateLastExecutionTime(); + } - if (confirmShutdown()) { - break; - } + if (confirmShutdown()) { + cleanupAndTerminate(true); + } else { + scheduleExecution(); } } @@ -466,30 +466,43 @@ public class SingleThreadEventLoopTest { private static class SingleThreadEventLoopB extends SingleThreadEventLoop { + private volatile Thread thread; + private volatile boolean interrupted; + SingleThreadEventLoopB() { - super(null, Executors.defaultThreadFactory(), false); + super(null, new DefaultExecutorFactory("B").newExecutor(1), false); } @Override protected void run() { - for (;;) { - try { - Thread.sleep(TimeUnit.NANOSECONDS.toMillis(delayNanos(System.nanoTime()))); - } catch (InterruptedException e) { - // Waken up by interruptThread() - } + thread = Thread.currentThread(); - runAllTasks(); + if (interrupted) { + thread.interrupt(); + } - if (confirmShutdown()) { - break; - } + try { + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(delayNanos(System.nanoTime()))); + } catch (InterruptedException e) { + // Waken up by interruptThread() + } + + runAllTasks(); + + if (confirmShutdown()) { + cleanupAndTerminate(true); + } else { + scheduleExecution(); } } @Override protected void wakeup(boolean inEventLoop) { - interruptThread(); + if (thread == null) { + interrupted = true; + } else { + thread.interrupt(); + } } } } diff --git a/transport/src/test/java/io/netty/channel/ThreadPerChannelEventLoopGroupTest.java b/transport/src/test/java/io/netty/channel/ThreadPerChannelEventLoopGroupTest.java index 2c936bd233..8ebd39fba7 100644 --- a/transport/src/test/java/io/netty/channel/ThreadPerChannelEventLoopGroupTest.java +++ b/transport/src/test/java/io/netty/channel/ThreadPerChannelEventLoopGroupTest.java @@ -19,8 +19,8 @@ package io.netty.channel; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; +import io.netty.util.concurrent.DefaultExecutorFactory; import io.netty.util.concurrent.DefaultPromise; -import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.Promise; @@ -90,24 +90,23 @@ public class ThreadPerChannelEventLoopGroupTest { assertTrue(loopGroup.isTerminated()); } - private static class TestEventExecutor extends SingleThreadEventExecutor { - + private static final class TestEventExecutor extends SingleThreadEventExecutor { TestEventExecutor() { - super(null, new DefaultThreadFactory("test"), false); + super(null, new DefaultExecutorFactory(TestEventExecutor.class).newExecutor(1), false); } @Override protected void run() { - for (;;) { - Runnable task = takeTask(); - if (task != null) { - task.run(); - updateLastExecutionTime(); - } + Runnable task = takeTask(); + if (task != null) { + task.run(); + updateLastExecutionTime(); + } - if (confirmShutdown()) { - break; - } + if (confirmShutdown()) { + cleanupAndTerminate(true); + } else { + scheduleExecution(); } } } diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java index ade9e904bf..aabceeaca7 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java @@ -190,29 +190,29 @@ public class LocalChannelTest { final EventLoopGroup serverGroup = new DefaultEventLoopGroup(1); final EventLoopGroup clientGroup = new DefaultEventLoopGroup(1) { @Override - protected EventLoop newChild(Executor threadFactory, Object... args) + protected EventLoop newChild(Executor executor, Object... args) throws Exception { - return new SingleThreadEventLoop(this, threadFactory, true) { + return new SingleThreadEventLoop(this, executor, true) { @Override protected void run() { - for (;;) { - Runnable task = takeTask(); - if (task != null) { - /* Only slow down the anonymous class in LocalChannel#doRegister() */ - if (task.getClass().getEnclosingClass() == LocalChannel.class) { - try { - closeLatch.await(); - } catch (InterruptedException e) { - throw new Error(e); - } + Runnable task = takeTask(); + if (task != null) { + /* Only slow down the anonymous class in LocalChannel#doRegister() */ + if (task.getClass().getEnclosingClass() == LocalChannel.class) { + try { + closeLatch.await(); + } catch (InterruptedException e) { + throw new Error(e); } - task.run(); - updateLastExecutionTime(); } + task.run(); + updateLastExecutionTime(); + } - if (confirmShutdown()) { - break; - } + if (confirmShutdown()) { + cleanupAndTerminate(true); + } else { + scheduleExecution(); } } }; diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java index 988ede3824..d50721e7b3 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java @@ -27,18 +27,24 @@ import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoopGroup; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.DefaultExecutorFactory; import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.ExecutorFactory; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -import java.util.HashSet; +import java.util.ArrayList; +import java.util.List; import java.util.Queue; -import java.util.Set; +import java.util.Random; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; public class LocalTransportThreadModelTest { @@ -84,9 +90,9 @@ public class LocalTransportThreadModelTest { @Test(timeout = 5000) public void testStagedExecution() throws Throwable { - EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l")); - EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1")); - EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2")); + EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultExecutorFactory("l")); + EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e1")); + EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e2")); ThreadNameAuditor h1 = new ThreadNameAuditor(); ThreadNameAuditor h2 = new ThreadNameAuditor(); ThreadNameAuditor h3 = new ThreadNameAuditor(true); @@ -173,25 +179,6 @@ public class LocalTransportThreadModelTest { Assert.assertTrue(name.startsWith("e2-")); } - // Assert that the events for the same handler were handled by the same thread. - Set names = new HashSet(); - names.addAll(h1.inboundThreadNames); - names.addAll(h1.outboundThreadNames); - names.addAll(h1.removalThreadNames); - Assert.assertEquals(1, names.size()); - - names.clear(); - names.addAll(h2.inboundThreadNames); - names.addAll(h2.outboundThreadNames); - names.addAll(h2.removalThreadNames); - Assert.assertEquals(1, names.size()); - - names.clear(); - names.addAll(h3.inboundThreadNames); - names.addAll(h3.outboundThreadNames); - names.addAll(h3.removalThreadNames); - Assert.assertEquals(1, names.size()); - // Count the number of events Assert.assertEquals(1, h1.inboundThreadNames.size()); Assert.assertEquals(2, h2.inboundThreadNames.size()); @@ -227,12 +214,12 @@ public class LocalTransportThreadModelTest { @Test(timeout = 30000) @Ignore public void testConcurrentMessageBufferAccess() throws Throwable { - EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l")); - EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1")); - EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2")); - EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3")); - EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e4")); - EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e5")); + EventLoopGroup l0 = new DefaultEventLoopGroup(4, new DefaultExecutorFactory("l0")); + EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e1")); + EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e2")); + EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e3")); + EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e4")); + EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e5")); try { final MessageForwarder1 h1 = new MessageForwarder1(); @@ -253,7 +240,7 @@ public class LocalTransportThreadModelTest { .addLast(e4, h5) .addLast(e5, h6); - l.register(ch).sync().channel().connect(localAddr).sync(); + l0.register(ch).sync().channel().connect(localAddr).sync(); final int ROUNDS = 1024; final int ELEMS_PER_ROUNDS = 8192; @@ -337,14 +324,14 @@ public class LocalTransportThreadModelTest { ch.close().sync(); } finally { - l.shutdownGracefully(); + l0.shutdownGracefully(); e1.shutdownGracefully(); e2.shutdownGracefully(); e3.shutdownGracefully(); e4.shutdownGracefully(); e5.shutdownGracefully(); - l.terminationFuture().sync(); + l0.terminationFuture().sync(); e1.terminationFuture().sync(); e2.terminationFuture().sync(); e3.terminationFuture().sync(); @@ -414,7 +401,7 @@ public class LocalTransportThreadModelTest { if (t == null) { this.t = Thread.currentThread(); } else { - Assert.assertSame(t, Thread.currentThread()); + assertSameExecutor(t, Thread.currentThread()); } ByteBuf out = ctx.alloc().buffer(4); @@ -428,7 +415,7 @@ public class LocalTransportThreadModelTest { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - Assert.assertSame(t, Thread.currentThread()); + assertSameExecutor(t, Thread.currentThread()); // Don't let the write request go to the server-side channel - just swallow. boolean swallow = this == ctx.pipeline().first(); @@ -472,7 +459,7 @@ public class LocalTransportThreadModelTest { if (t == null) { this.t = Thread.currentThread(); } else { - Assert.assertSame(t, Thread.currentThread()); + assertSameExecutor(t, Thread.currentThread()); } ByteBuf m = (ByteBuf) msg; @@ -488,7 +475,7 @@ public class LocalTransportThreadModelTest { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - Assert.assertSame(t, Thread.currentThread()); + assertSameExecutor(t, Thread.currentThread()); ByteBuf out = ctx.alloc().buffer(4); int m = (Integer) msg; @@ -524,7 +511,7 @@ public class LocalTransportThreadModelTest { if (t == null) { this.t = Thread.currentThread(); } else { - Assert.assertSame(t, Thread.currentThread()); + assertSameExecutor(t, Thread.currentThread()); } int actual = (Integer) msg; @@ -536,7 +523,7 @@ public class LocalTransportThreadModelTest { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - Assert.assertSame(t, Thread.currentThread()); + assertSameExecutor(t, Thread.currentThread()); int actual = (Integer) msg; int expected = outCnt ++; @@ -570,7 +557,7 @@ public class LocalTransportThreadModelTest { if (t == null) { this.t = Thread.currentThread(); } else { - Assert.assertSame(t, Thread.currentThread()); + assertSameExecutor(t, Thread.currentThread()); } int actual = (Integer) msg; @@ -581,7 +568,7 @@ public class LocalTransportThreadModelTest { @Override public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - Assert.assertSame(t, Thread.currentThread()); + assertSameExecutor(t, Thread.currentThread()); int actual = (Integer) msg; int expected = outCnt ++; @@ -597,4 +584,8 @@ public class LocalTransportThreadModelTest { super.exceptionCaught(ctx, cause); } } + + private static void assertSameExecutor(Thread expected, Thread actual) { + Assert.assertEquals(expected.getName().substring(0, 2), actual.getName().substring(0, 2)); + } } diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java index 19960ab343..5be2f4b77a 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java @@ -26,7 +26,7 @@ import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoopGroup; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.DefaultExecutorFactory; import io.netty.util.concurrent.EventExecutorGroup; import org.junit.AfterClass; import org.junit.Assert; @@ -116,12 +116,12 @@ public class LocalTransportThreadModelTest3 { } private static void testConcurrentAddRemove(boolean inbound) throws Exception { - EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l")); - EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1")); - EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2")); - EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3")); - EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e4")); - EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e5")); + EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultExecutorFactory("l")); + EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e1")); + EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e2")); + EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e3")); + EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e4")); + EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultExecutorFactory("e5")); final EventExecutorGroup[] groups = { e1, e2, e3, e4, e5 }; try {