From eb308cfff66a6264d3eaf81aca28b918cb8fce1e Mon Sep 17 00:00:00 2001 From: Vladimir Krivosheev Date: Tue, 20 Aug 2013 18:42:56 +0200 Subject: [PATCH] ability to use Executor instead of ThreadFactory --- .../util/concurrent/DefaultEventExecutor.java | 6 +- .../concurrent/DefaultEventExecutorGroup.java | 6 +- .../MultithreadEventExecutorGroup.java | 23 ++- .../concurrent/SingleThreadEventExecutor.java | 145 +++++++++++------- .../concurrent/ThreadPerTaskExecutor.java | 35 +++++ .../channel/MultithreadEventLoopGroup.java | 10 +- .../netty/channel/SingleThreadEventLoop.java | 8 + .../channel/ThreadPerChannelEventLoop.java | 2 +- .../ThreadPerChannelEventLoopGroup.java | 26 +++- .../netty/channel/local/LocalEventLoop.java | 6 +- .../channel/local/LocalEventLoopGroup.java | 5 +- .../io/netty/channel/nio/NioEventLoop.java | 6 +- .../netty/channel/nio/NioEventLoopGroup.java | 16 +- .../netty/channel/oio/OioEventLoopGroup.java | 16 ++ .../channel/SingleThreadEventLoopTest.java | 3 +- 15 files changed, 225 insertions(+), 88 deletions(-) create mode 100644 common/src/main/java/io/netty/util/concurrent/ThreadPerTaskExecutor.java diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutor.java index f2f704a3a8..3591b24e76 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutor.java @@ -15,7 +15,7 @@ */ package io.netty.util.concurrent; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.Executor; /** * Default {@link SingleThreadEventExecutor} implementation which just execute all submitted task in a @@ -24,8 +24,8 @@ import java.util.concurrent.ThreadFactory; */ final class DefaultEventExecutor extends SingleThreadEventExecutor { - DefaultEventExecutor(DefaultEventExecutorGroup parent, ThreadFactory threadFactory) { - super(parent, threadFactory, true); + DefaultEventExecutor(DefaultEventExecutorGroup parent, Executor executor) { + super(parent, executor, true); } @Override diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutorGroup.java index faf88a0b8b..5343be1652 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultEventExecutorGroup.java @@ -15,6 +15,7 @@ */ package io.netty.util.concurrent; +import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; /** @@ -41,8 +42,7 @@ public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup { } @Override - protected EventExecutor newChild( - ThreadFactory threadFactory, Object... args) throws Exception { - return new DefaultEventExecutor(this, threadFactory); + protected EventExecutor newChild(Executor executor, Object... args) throws Exception { + return new DefaultEventExecutor(this, executor); } } diff --git a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java index 38efd4912e..ff951cb520 100644 --- a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java @@ -19,6 +19,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Set; +import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -39,22 +40,33 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto * * @param nThreads the number of threads that will be used by this instance. * @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used. - * @param args arguments which will passed to each {@link #newChild(ThreadFactory, Object...)} call + * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { + this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args); + } + + /** + * Create a new instance. + * + * @param nThreads the number of threads that will be used by this instance. + * @param executor the Executor to use, or {@code null} if the default should be used. + * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call + */ + protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } - if (threadFactory == null) { - threadFactory = newDefaultThreadFactory(); + if (executor == null) { + executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } children = new SingleThreadEventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { - children[i] = newChild(threadFactory, args); + children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type @@ -130,8 +142,7 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto * called for each thread that will serve this {@link MultithreadEventExecutorGroup}. * */ - protected abstract EventExecutor newChild( - ThreadFactory threadFactory, Object... args) throws Exception; + protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception; @Override public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 37bf980815..10235eefbf 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -27,6 +27,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -60,7 +61,9 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { private final Queue taskQueue; final Queue> delayedTaskQueue = new PriorityQueue>(); - private final Thread thread; + private volatile Thread thread; + private final Executor executor; + private volatile boolean interrupted; private final Object stateLock = new Object(); private final Semaphore threadLock = new Semaphore(0); private final Set shutdownHooks = new LinkedHashSet(); @@ -84,64 +87,27 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { */ protected SingleThreadEventExecutor( EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { + this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp); + } - if (threadFactory == null) { - throw new NullPointerException("threadFactory"); + /** + * Create a new instance + * + * @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it + * @param executor the {@link Executor} which will be used for executing + * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the + * executor thread + */ + protected SingleThreadEventExecutor( + EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) { + + if (executor == null) { + throw new NullPointerException("executor"); } this.parent = parent; this.addTaskWakesUp = addTaskWakesUp; - - thread = threadFactory.newThread(new Runnable() { - @Override - public void run() { - boolean success = false; - updateLastExecutionTime(); - try { - SingleThreadEventExecutor.this.run(); - success = true; - } catch (Throwable t) { - logger.warn("Unexpected exception from an event executor: ", t); - } finally { - if (state < ST_SHUTTING_DOWN) { - state = ST_SHUTTING_DOWN; - } - - // Check if confirmShutdown() was called at the end of the loop. - if (success && gracefulShutdownStartTime == 0) { - logger.error( - "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + - SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " + - "before run() implementation terminates."); - } - - try { - // Run all remaining tasks and shutdown hooks. - for (;;) { - if (confirmShutdown()) { - break; - } - } - } finally { - try { - cleanup(); - } finally { - synchronized (stateLock) { - state = ST_TERMINATED; - } - threadLock.release(); - if (!taskQueue.isEmpty()) { - logger.warn( - "An event executor terminated with " + - "non-empty task queue (" + taskQueue.size() + ')'); - } - - terminationFuture.setSuccess(null); - } - } - } - } - }); + this.executor = executor; taskQueue = newTaskQueue(); } @@ -165,7 +131,12 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { * Interrupt the current running {@link Thread}. */ protected void interruptThread() { - thread.interrupt(); + Thread currentThread = thread; + if (currentThread == null) { + interrupted = true; + } else { + currentThread.interrupt(); + } } /** @@ -515,7 +486,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { switch (state) { case ST_NOT_STARTED: state = ST_SHUTTING_DOWN; - thread.start(); + doStartThread(); break; case ST_STARTED: state = ST_SHUTTING_DOWN; @@ -560,7 +531,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { switch (state) { case ST_NOT_STARTED: state = ST_SHUTDOWN; - thread.start(); + doStartThread(); break; case ST_STARTED: case ST_SHUTTING_DOWN: @@ -814,11 +785,69 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { delayedTaskQueue.add(new ScheduledFutureTask( this, delayedTaskQueue, Executors.callable(new PurgeTask(), null), ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL)); - thread.start(); + doStartThread(); } } } + private void doStartThread() { + assert thread == null; + executor.execute(new Runnable() { + @Override + public void run() { + thread = Thread.currentThread(); + if (interrupted) { + thread.interrupt(); + } + + boolean success = false; + updateLastExecutionTime(); + try { + SingleThreadEventExecutor.this.run(); + success = true; + } catch (Throwable t) { + logger.warn("Unexpected exception from an event executor: ", t); + } finally { + if (state < ST_SHUTTING_DOWN) { + state = ST_SHUTTING_DOWN; + } + + // Check if confirmShutdown() was called at the end of the loop. + if (success && gracefulShutdownStartTime == 0) { + logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " + + "before run() implementation terminates."); + } + + try { + // Run all remaining tasks and shutdown hooks. + for (;;) { + if (confirmShutdown()) { + break; + } + } + } finally { + try { + cleanup(); + } finally { + synchronized (stateLock) { + state = ST_TERMINATED; + } + threadLock.release(); + if (!taskQueue.isEmpty()) { + logger.warn( + "An event executor terminated with " + + "non-empty task queue (" + taskQueue.size() + ')'); + } + + terminationFuture.setSuccess(null); + } + } + } + } + }); + } + private final class PurgeTask implements Runnable { @Override public void run() { diff --git a/common/src/main/java/io/netty/util/concurrent/ThreadPerTaskExecutor.java b/common/src/main/java/io/netty/util/concurrent/ThreadPerTaskExecutor.java new file mode 100644 index 0000000000..21210ae067 --- /dev/null +++ b/common/src/main/java/io/netty/util/concurrent/ThreadPerTaskExecutor.java @@ -0,0 +1,35 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.concurrent; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadFactory; + +public final class ThreadPerTaskExecutor implements Executor { + private final ThreadFactory threadFactory; + + public ThreadPerTaskExecutor(ThreadFactory threadFactory) { + if (threadFactory == null) { + throw new NullPointerException("threadFactory"); + } + this.threadFactory = threadFactory; + } + + @Override + public void execute(Runnable command) { + threadFactory.newThread(command).start(); + } +} diff --git a/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java b/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java index 8ee66f46ea..b8261288e4 100644 --- a/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java @@ -21,6 +21,7 @@ import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; /** @@ -42,11 +43,18 @@ public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutor } } + /** + * @see {@link MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)} + */ + protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { + super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); + } + /** * @see {@link MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)} */ protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) { - super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args); + super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args); } @Override diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index d334fbfaf5..2d3245468d 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -18,6 +18,7 @@ package io.netty.channel; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.SingleThreadEventExecutor; +import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; /** @@ -33,6 +34,13 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im super(parent, threadFactory, addTaskWakesUp); } + /** + * @see {@link SingleThreadEventExecutor#SingleThreadEventExecutor(EventExecutorGroup, Executor, boolean)} + */ + protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) { + super(parent, executor, addTaskWakesUp); + } + @Override public EventLoopGroup parent() { return (EventLoopGroup) super.parent(); diff --git a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java index a3a3d11b34..78397665d5 100644 --- a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java +++ b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java @@ -26,7 +26,7 @@ public class ThreadPerChannelEventLoop extends SingleThreadEventLoop { private Channel ch; public ThreadPerChannelEventLoop(ThreadPerChannelEventLoopGroup parent) { - super(parent, parent.threadFactory, true); + super(parent, parent.executor, true); this.parent = parent; } diff --git a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java index 4c1cacebc5..39d494fd90 100644 --- a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java @@ -23,6 +23,7 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.Promise; +import io.netty.util.concurrent.ThreadPerTaskExecutor; import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.ReadOnlyIterator; @@ -32,6 +33,7 @@ import java.util.Iterator; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; @@ -44,7 +46,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i private final Object[] childArgs; private final int maxChannels; - final ThreadFactory threadFactory; + final Executor executor; final Set activeChildren = Collections.newSetFromMap(PlatformDependent.newConcurrentHashMap()); final Queue idleChildren = new ConcurrentLinkedQueue(); @@ -95,12 +97,28 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i * @param args arguments which will passed to each {@link #newChild(Object...)} call. */ protected ThreadPerChannelEventLoopGroup(int maxChannels, ThreadFactory threadFactory, Object... args) { + this(maxChannels, new ThreadPerTaskExecutor(threadFactory), args); + } + + /** + * Create a new {@link ThreadPerChannelEventLoopGroup}. + * + * @param maxChannels the maximum number of channels to handle with this instance. Once you try to register + * a new {@link Channel} and the maximum is exceed it will throw an + * {@link ChannelException} on the {@link #register(Channel)} and + * {@link #register(Channel, ChannelPromise)} method. + * Use {@code 0} to use no limit + * @param executor the {@link Executor} used to create new {@link Thread} instances that handle the + * registered {@link Channel}s + * @param args arguments which will passed to each {@link #newChild(Object...)} call. + */ + protected ThreadPerChannelEventLoopGroup(int maxChannels, Executor executor, Object... args) { if (maxChannels < 0) { throw new IllegalArgumentException(String.format( "maxChannels: %d (expected: >= 0)", maxChannels)); } - if (threadFactory == null) { - throw new NullPointerException("threadFactory"); + if (executor == null) { + throw new NullPointerException("executor"); } if (args == null) { @@ -110,7 +128,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i } this.maxChannels = maxChannels; - this.threadFactory = threadFactory; + this.executor = executor; tooManyChannels = new ChannelException("too many channels (max: " + maxChannels + ')'); tooManyChannels.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE); diff --git a/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java b/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java index 418d5b2f96..b9aa21976c 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java +++ b/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java @@ -17,12 +17,12 @@ package io.netty.channel.local; import io.netty.channel.SingleThreadEventLoop; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.Executor; final class LocalEventLoop extends SingleThreadEventLoop { - LocalEventLoop(LocalEventLoopGroup parent, ThreadFactory threadFactory) { - super(parent, threadFactory, true); + LocalEventLoop(LocalEventLoopGroup parent, Executor executor) { + super(parent, executor, true); } @Override diff --git a/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java b/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java index dd3c7d316b..2e0a56d413 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java @@ -18,6 +18,7 @@ package io.netty.channel.local; import io.netty.channel.MultithreadEventLoopGroup; import io.netty.util.concurrent.EventExecutor; +import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; /** @@ -53,7 +54,7 @@ public class LocalEventLoopGroup extends MultithreadEventLoopGroup { @Override protected EventExecutor newChild( - ThreadFactory threadFactory, Object... args) throws Exception { - return new LocalEventLoop(this, threadFactory); + Executor executor, Object... args) throws Exception { + return new LocalEventLoop(this, executor); } } diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java index 45a4977b3a..e6817bc565 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java @@ -39,7 +39,7 @@ import java.util.Iterator; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -111,8 +111,8 @@ public final class NioEventLoop extends SingleThreadEventLoop { private int cancelledKeys; private boolean needsToSelectAgain; - NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { - super(parent, threadFactory, false); + NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider) { + super(parent, executor, false); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java index 96eb461eff..805aca589b 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java @@ -21,6 +21,7 @@ import io.netty.util.concurrent.EventExecutor; import java.nio.channels.Selector; import java.nio.channels.spi.SelectorProvider; +import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; /** @@ -41,7 +42,7 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup { * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}. */ public NioEventLoopGroup(int nThreads) { - this(nThreads, null); + this(nThreads, (Executor) null); } /** @@ -52,6 +53,10 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup { this(nThreads, threadFactory, SelectorProvider.provider()); } + public NioEventLoopGroup(int nThreads, Executor executor) { + this(nThreads, executor, SelectorProvider.provider()); + } + /** * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given * {@link SelectorProvider}. @@ -61,6 +66,11 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup { super(nThreads, threadFactory, selectorProvider); } + public NioEventLoopGroup( + int nThreads, Executor executor, final SelectorProvider selectorProvider) { + super(nThreads, executor, selectorProvider); + } + /** * Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks. @@ -83,7 +93,7 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup { @Override protected EventExecutor newChild( - ThreadFactory threadFactory, Object... args) throws Exception { - return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]); + Executor executor, Object... args) throws Exception { + return new NioEventLoop(this, executor, (SelectorProvider) args[0]); } } diff --git a/transport/src/main/java/io/netty/channel/oio/OioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/oio/OioEventLoopGroup.java index cc0a9f3203..e803f8b325 100644 --- a/transport/src/main/java/io/netty/channel/oio/OioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/oio/OioEventLoopGroup.java @@ -23,6 +23,7 @@ import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.ThreadPerChannelEventLoopGroup; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -52,6 +53,21 @@ public class OioEventLoopGroup extends ThreadPerChannelEventLoopGroup { this(maxChannels, Executors.defaultThreadFactory()); } + /** + * Create a new {@link OioEventLoopGroup}. + * + * @param maxChannels the maximum number of channels to handle with this instance. Once you try to register + * a new {@link Channel} and the maximum is exceed it will throw an + * {@link ChannelException} on the {@link #register(Channel)} and + * {@link #register(Channel, ChannelPromise)} method. + * Use {@code 0} to use no limit + * @param executor the {@link Executor} used to create new {@link Thread} instances that handle the + * registered {@link Channel}s + */ + public OioEventLoopGroup(int maxChannels, Executor executor) { + super(maxChannels, executor); + } + /** * Create a new {@link OioEventLoopGroup}. * diff --git a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java index 281cedc154..53eacaef4d 100644 --- a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java @@ -39,7 +39,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.*; public class SingleThreadEventLoopTest {