diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java index 33b0c4fa67..f86f3a5065 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java @@ -86,64 +86,110 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory private final WorkerPool workerPool; private final ChannelSink sink; + private WorkerPool bossWorkerPool; /** * Create a new {@link NioServerSocketChannelFactory} using - * {@link Executors#newCachedThreadPool()} for the worker. + * {@link Executors#newCachedThreadPool()} for the workers. * * See {@link #NioServerSocketChannelFactory(Executor, Executor)} */ public NioServerSocketChannelFactory() { - this(Executors.newCachedThreadPool()); + this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); } /** * Creates a new instance. Calling this constructor is same with calling - * {@link #NioServerSocketChannelFactory(Executor, Executor, int)} with 2 * - * the number of available processors in the machine. The number of + * {@link #NioServerSocketChannelFactory(Executor, Executor, int, int)} with 1 + * as boss count and 2 * the number of available processors in the machine. The number of * available processors is obtained by {@link Runtime#availableProcessors()}. * - + * @param bossExecutor + * the {@link Executor} which will execute the I/O worker threads that handle the accepting of new connections * @param workerExecutor * the {@link Executor} which will execute the I/O worker threads */ - public NioServerSocketChannelFactory(Executor workerExecutor) { - this(workerExecutor, SelectorUtil.DEFAULT_IO_THREADS); + public NioServerSocketChannelFactory(Executor bossExecutor, Executor workerExecutor) { + this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_ACCEPTING_THREADS, SelectorUtil.DEFAULT_IO_THREADS); } /** * Creates a new instance. - * + * + * @param bossExecutor + * the {@link Executor} which will execute the I/O worker threads that handle the accepting of new connections * @param workerExecutor * the {@link Executor} which will execute the I/O worker threads + * @param bossCount + * the maximum number of I/O worker threads that handling the accepting of connections * @param workerCount * the maximum number of I/O worker threads */ - public NioServerSocketChannelFactory(Executor workerExecutor, + public NioServerSocketChannelFactory(Executor bossExecutor, Executor workerExecutor, int bossCount, int workerCount) { - this(new NioWorkerPool(workerExecutor, workerCount, true)); + this(new NioWorkerPool(bossExecutor, bossCount, true), new NioWorkerPool(workerExecutor, workerCount, true)); } /** * Creates a new instance. * + * @param bossWorkerPool + * the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads that handle the accepting of new connections * @param workerPool * the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads */ - public NioServerSocketChannelFactory(WorkerPool workerPool) { + public NioServerSocketChannelFactory(WorkerPool bossWorkerPool, WorkerPool workerPool) { + if (bossWorkerPool == null) { + throw new NullPointerException("bossWorkerPool"); + } if (workerPool == null) { throw new NullPointerException("workerPool"); } - + this.bossWorkerPool = bossWorkerPool; this.workerPool = workerPool; sink = new NioServerSocketPipelineSink(); } + /** + * Creates a new instance which use the given {@link WorkerPool} for everything. + * + * @param genericExecutor + * the {@link Executor} which will execute the I/O worker threads ( this also includes handle the accepting of new connections) + * @param workerCount + * the maximum number of I/O worker threads + * + */ + public NioServerSocketChannelFactory(Executor genericExecutor, int workerCount) { + this(new NioWorkerPool(genericExecutor, workerCount, true)); + } + + /** + * Creates a new instance which use the given {@link WorkerPool} for everything. + * + * @param genericExecutor + * the {@link Executor} which will execute the I/O worker threads ( this also includes handle the accepting of new connections) + * + */ + public NioServerSocketChannelFactory(Executor genericExecutor) { + this(genericExecutor, SelectorUtil.DEFAULT_IO_ACCEPTING_THREADS + SelectorUtil.DEFAULT_IO_THREADS); + } + + + /** + * Creates a new instance which use the given {@link WorkerPool} for everything. + * + * @param genericWorkerPool + * the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads (that included accepting of new connections) + */ + public NioServerSocketChannelFactory(WorkerPool genericWorkerPool) { + this(genericWorkerPool, genericWorkerPool); + } + @Override public ServerSocketChannel newChannel(ChannelPipeline pipeline) { - return NioServerSocketChannel.create(this, pipeline, sink, workerPool.nextWorker(), workerPool); + return NioServerSocketChannel.create(this, pipeline, sink, bossWorkerPool.nextWorker(), workerPool); } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/nio/SelectorUtil.java b/transport/src/main/java/io/netty/channel/socket/nio/SelectorUtil.java index 8f03731e11..9e41302ea7 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/SelectorUtil.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/SelectorUtil.java @@ -28,6 +28,9 @@ public final class SelectorUtil { public static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2; + public static final int DEFAULT_IO_ACCEPTING_THREADS = 1; + + // Workaround for JDK NIO bug. // // See: