Allow to share a WorkerPool for boss and worker threads but also allow to have them separate. See #240

This commit is contained in:
norman 2012-04-11 09:15:02 +02:00
parent f88cd3120d
commit 16c625cfd0
2 changed files with 62 additions and 13 deletions

View File

@ -86,64 +86,110 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
private final WorkerPool<NioWorker> workerPool; private final WorkerPool<NioWorker> workerPool;
private final ChannelSink sink; private final ChannelSink sink;
private WorkerPool<NioWorker> bossWorkerPool;
/** /**
* Create a new {@link NioServerSocketChannelFactory} using * Create a new {@link NioServerSocketChannelFactory} using
* {@link Executors#newCachedThreadPool()} for the worker. * {@link Executors#newCachedThreadPool()} for the workers.
* *
* See {@link #NioServerSocketChannelFactory(Executor, Executor)} * See {@link #NioServerSocketChannelFactory(Executor, Executor)}
*/ */
public NioServerSocketChannelFactory() { public NioServerSocketChannelFactory() {
this(Executors.newCachedThreadPool()); this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
} }
/** /**
* Creates a new instance. Calling this constructor is same with calling * Creates a new instance. Calling this constructor is same with calling
* {@link #NioServerSocketChannelFactory(Executor, Executor, int)} with 2 * * {@link #NioServerSocketChannelFactory(Executor, Executor, int, int)} with 1
* the number of available processors in the machine. The number of * as boss count and 2 * the number of available processors in the machine. The number of
* available processors is obtained by {@link Runtime#availableProcessors()}. * available processors is obtained by {@link Runtime#availableProcessors()}.
* *
* @param bossExecutor
* the {@link Executor} which will execute the I/O worker threads that handle the accepting of new connections
* @param workerExecutor * @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads * the {@link Executor} which will execute the I/O worker threads
*/ */
public NioServerSocketChannelFactory(Executor workerExecutor) { public NioServerSocketChannelFactory(Executor bossExecutor, Executor workerExecutor) {
this(workerExecutor, SelectorUtil.DEFAULT_IO_THREADS); this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_ACCEPTING_THREADS, SelectorUtil.DEFAULT_IO_THREADS);
} }
/** /**
* Creates a new instance. * Creates a new instance.
* *
* @param bossExecutor
* the {@link Executor} which will execute the I/O worker threads that handle the accepting of new connections
* @param workerExecutor * @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads * the {@link Executor} which will execute the I/O worker threads
* @param bossCount
* the maximum number of I/O worker threads that handling the accepting of connections
* @param workerCount * @param workerCount
* the maximum number of I/O worker threads * the maximum number of I/O worker threads
*/ */
public NioServerSocketChannelFactory(Executor workerExecutor, public NioServerSocketChannelFactory(Executor bossExecutor, Executor workerExecutor, int bossCount,
int workerCount) { int workerCount) {
this(new NioWorkerPool(workerExecutor, workerCount, true)); this(new NioWorkerPool(bossExecutor, bossCount, true), new NioWorkerPool(workerExecutor, workerCount, true));
} }
/** /**
* Creates a new instance. * Creates a new instance.
* *
* @param bossWorkerPool
* the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads that handle the accepting of new connections
* @param workerPool * @param workerPool
* the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads * the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads
*/ */
public NioServerSocketChannelFactory(WorkerPool<NioWorker> workerPool) { public NioServerSocketChannelFactory(WorkerPool<NioWorker> bossWorkerPool, WorkerPool<NioWorker> workerPool) {
if (bossWorkerPool == null) {
throw new NullPointerException("bossWorkerPool");
}
if (workerPool == null) { if (workerPool == null) {
throw new NullPointerException("workerPool"); throw new NullPointerException("workerPool");
} }
this.bossWorkerPool = bossWorkerPool;
this.workerPool = workerPool; this.workerPool = workerPool;
sink = new NioServerSocketPipelineSink(); sink = new NioServerSocketPipelineSink();
} }
/**
* Creates a new instance which use the given {@link WorkerPool} for everything.
*
* @param genericExecutor
* the {@link Executor} which will execute the I/O worker threads ( this also includes handle the accepting of new connections)
* @param workerCount
* the maximum number of I/O worker threads
*
*/
public NioServerSocketChannelFactory(Executor genericExecutor, int workerCount) {
this(new NioWorkerPool(genericExecutor, workerCount, true));
}
/**
* Creates a new instance which use the given {@link WorkerPool} for everything.
*
* @param genericExecutor
* the {@link Executor} which will execute the I/O worker threads ( this also includes handle the accepting of new connections)
*
*/
public NioServerSocketChannelFactory(Executor genericExecutor) {
this(genericExecutor, SelectorUtil.DEFAULT_IO_ACCEPTING_THREADS + SelectorUtil.DEFAULT_IO_THREADS);
}
/**
* Creates a new instance which use the given {@link WorkerPool} for everything.
*
* @param genericWorkerPool
* the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads (that included accepting of new connections)
*/
public NioServerSocketChannelFactory(WorkerPool<NioWorker> genericWorkerPool) {
this(genericWorkerPool, genericWorkerPool);
}
@Override @Override
public ServerSocketChannel newChannel(ChannelPipeline pipeline) { public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
return NioServerSocketChannel.create(this, pipeline, sink, workerPool.nextWorker(), workerPool); return NioServerSocketChannel.create(this, pipeline, sink, bossWorkerPool.nextWorker(), workerPool);
} }
@Override @Override

View File

@ -28,6 +28,9 @@ public final class SelectorUtil {
public static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2; public static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;
public static final int DEFAULT_IO_ACCEPTING_THREADS = 1;
// Workaround for JDK NIO bug. // Workaround for JDK NIO bug.
// //
// See: // See: