Allow to share a WorkerPool for boss and worker threads but also allow to have them separate. See #240

This commit is contained in:
norman 2012-04-11 09:15:02 +02:00
parent 470c1a898a
commit 5b53b66fbf
2 changed files with 62 additions and 13 deletions

View File

@ -86,64 +86,110 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
private final WorkerPool<NioWorker> workerPool;
private final ChannelSink sink;
private WorkerPool<NioWorker> bossWorkerPool;
/**
* Create a new {@link NioServerSocketChannelFactory} using
* {@link Executors#newCachedThreadPool()} for the worker.
* {@link Executors#newCachedThreadPool()} for the workers.
*
* See {@link #NioServerSocketChannelFactory(Executor, Executor)}
*/
public NioServerSocketChannelFactory() {
this(Executors.newCachedThreadPool());
this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
}
/**
* Creates a new instance. Calling this constructor is same with calling
* {@link #NioServerSocketChannelFactory(Executor, Executor, int)} with 2 *
* the number of available processors in the machine. The number of
* {@link #NioServerSocketChannelFactory(Executor, Executor, int, int)} with 1
* as boss count and 2 * the number of available processors in the machine. The number of
* available processors is obtained by {@link Runtime#availableProcessors()}.
*
* @param bossExecutor
* the {@link Executor} which will execute the I/O worker threads that handle the accepting of new connections
* @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads
*/
public NioServerSocketChannelFactory(Executor workerExecutor) {
this(workerExecutor, SelectorUtil.DEFAULT_IO_THREADS);
public NioServerSocketChannelFactory(Executor bossExecutor, Executor workerExecutor) {
this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_ACCEPTING_THREADS, SelectorUtil.DEFAULT_IO_THREADS);
}
/**
* Creates a new instance.
*
* @param bossExecutor
* the {@link Executor} which will execute the I/O worker threads that handle the accepting of new connections
* @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads
* @param bossCount
* the maximum number of I/O worker threads that handling the accepting of connections
* @param workerCount
* the maximum number of I/O worker threads
*/
public NioServerSocketChannelFactory(Executor workerExecutor,
public NioServerSocketChannelFactory(Executor bossExecutor, Executor workerExecutor, int bossCount,
int workerCount) {
this(new NioWorkerPool(workerExecutor, workerCount, true));
this(new NioWorkerPool(bossExecutor, bossCount, true), new NioWorkerPool(workerExecutor, workerCount, true));
}
/**
* Creates a new instance.
*
* @param bossWorkerPool
* the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads that handle the accepting of new connections
* @param workerPool
* the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads
*/
public NioServerSocketChannelFactory(WorkerPool<NioWorker> workerPool) {
public NioServerSocketChannelFactory(WorkerPool<NioWorker> bossWorkerPool, WorkerPool<NioWorker> workerPool) {
if (bossWorkerPool == null) {
throw new NullPointerException("bossWorkerPool");
}
if (workerPool == null) {
throw new NullPointerException("workerPool");
}
this.bossWorkerPool = bossWorkerPool;
this.workerPool = workerPool;
sink = new NioServerSocketPipelineSink();
}
/**
* Creates a new instance which use the given {@link WorkerPool} for everything.
*
* @param genericExecutor
* the {@link Executor} which will execute the I/O worker threads ( this also includes handle the accepting of new connections)
* @param workerCount
* the maximum number of I/O worker threads
*
*/
public NioServerSocketChannelFactory(Executor genericExecutor, int workerCount) {
this(new NioWorkerPool(genericExecutor, workerCount, true));
}
/**
* Creates a new instance which use the given {@link WorkerPool} for everything.
*
* @param genericExecutor
* the {@link Executor} which will execute the I/O worker threads ( this also includes handle the accepting of new connections)
*
*/
public NioServerSocketChannelFactory(Executor genericExecutor) {
this(genericExecutor, SelectorUtil.DEFAULT_IO_ACCEPTING_THREADS + SelectorUtil.DEFAULT_IO_THREADS);
}
/**
* Creates a new instance which use the given {@link WorkerPool} for everything.
*
* @param genericWorkerPool
* the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads (that included accepting of new connections)
*/
public NioServerSocketChannelFactory(WorkerPool<NioWorker> genericWorkerPool) {
this(genericWorkerPool, genericWorkerPool);
}
@Override
public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
return NioServerSocketChannel.create(this, pipeline, sink, workerPool.nextWorker(), workerPool);
return NioServerSocketChannel.create(this, pipeline, sink, bossWorkerPool.nextWorker(), workerPool);
}
@Override

View File

@ -28,6 +28,9 @@ public final class SelectorUtil {
public static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;
public static final int DEFAULT_IO_ACCEPTING_THREADS = 1;
// Workaround for JDK NIO bug.
//
// See: