diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java index 12b07ba8c5..ee64834d71 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java @@ -85,24 +85,44 @@ import org.jboss.netty.util.internal.ExecutorUtil; */ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory { + private static final int DEFAULT_BOSS_COUNT = 1; + private final Executor bossExecutor; private final Executor workerExecutor; private final NioClientSocketPipelineSink sink; /** * Creates a new instance. Calling this constructor is same with calling - * {@link #NioClientSocketChannelFactory(Executor, Executor, int)} with 2 * - * the number of available processors in the machine. The number of + * {@link #NioClientSocketChannelFactory(Executor, Executor, int, int)} with + * 1 and (2 * the number of available processors in the machine) for + * bossCount and workerCount respectively. The number of * available processors is obtained by {@link Runtime#availableProcessors()}. * * @param bossExecutor * the {@link Executor} which will execute the boss thread * @param workerExecutor - * the {@link Executor} which will execute the I/O worker threads + * the {@link Executor} which will execute the worker threads */ public NioClientSocketChannelFactory( Executor bossExecutor, Executor workerExecutor) { - this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_THREADS); + this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, SelectorUtil.DEFAULT_IO_THREADS); + } + + /** + * Creates a new instance. Calling this constructor is same with calling + * {@link #NioClientSocketChannelFactory(Executor, Executor, int, int)} with + * 1 as bossCount. + * + * @param bossExecutor + * the {@link Executor} which will execute the boss thread + * @param workerExecutor + * the {@link Executor} which will execute the worker threads + * @param workerCount + * the maximum number of I/O worker threads + */ + public NioClientSocketChannelFactory( + Executor bossExecutor, Executor workerExecutor, int workerCount) { + this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, workerCount); } /** @@ -111,19 +131,26 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory * @param bossExecutor * the {@link Executor} which will execute the boss thread * @param workerExecutor - * the {@link Executor} which will execute the I/O worker threads + * the {@link Executor} which will execute the worker threads + * @param bossCount + * the maximum number of boss threads * @param workerCount * the maximum number of I/O worker threads */ public NioClientSocketChannelFactory( Executor bossExecutor, Executor workerExecutor, - int workerCount) { + int bossCount, int workerCount) { if (bossExecutor == null) { throw new NullPointerException("bossExecutor"); } if (workerExecutor == null) { throw new NullPointerException("workerExecutor"); } + if (bossCount <= 0) { + throw new IllegalArgumentException( + "bossCount (" + bossCount + ") " + + "must be a positive integer."); + } if (workerCount <= 0) { throw new IllegalArgumentException( "workerCount (" + workerCount + ") " + @@ -132,7 +159,8 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory this.bossExecutor = bossExecutor; this.workerExecutor = workerExecutor; - sink = new NioClientSocketPipelineSink(bossExecutor, workerExecutor, workerCount); + sink = new NioClientSocketPipelineSink( + bossExecutor, workerExecutor, bossCount, workerCount); } public SocketChannel newChannel(ChannelPipeline pipeline) { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java index 6286da56c7..9ff9d196a2 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -62,13 +62,23 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { final int id = nextId.incrementAndGet(); final Executor bossExecutor; - private final Boss boss = new Boss(); + + private final Boss[] bosses; private final NioWorker[] workers; + + private final AtomicInteger bossIndex = new AtomicInteger(); private final AtomicInteger workerIndex = new AtomicInteger(); NioClientSocketPipelineSink( - Executor bossExecutor, Executor workerExecutor, int workerCount) { + Executor bossExecutor, Executor workerExecutor, + int bossCount, int workerCount) { this.bossExecutor = bossExecutor; + + bosses = new Boss[bossCount]; + for (int i = 0; i < bosses.length; i ++) { + bosses[i] = new Boss(i + 1); + } + workers = new NioWorker[workerCount]; for (int i = 0; i < workers.length; i ++) { workers[i] = new NioWorker(id, i + 1, workerExecutor); @@ -150,7 +160,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { }); cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); channel.connectFuture = cf; - boss.register(channel); + nextBoss().register(channel); } } catch (Throwable t) { @@ -159,6 +169,11 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { channel.worker.close(channel, succeededFuture(channel)); } } + + Boss nextBoss() { + return bosses[Math.abs( + bossIndex.getAndIncrement() % bosses.length)]; + } NioWorker nextWorker() { return workers[Math.abs( @@ -169,12 +184,13 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { volatile Selector selector; private boolean started; + private final int subId; private final AtomicBoolean wakenUp = new AtomicBoolean(); private final Object startStopLock = new Object(); private final Queue registerTaskQueue = new LinkedTransferQueue(); - Boss() { - super(); + Boss(int subId) { + this.subId = subId; } void register(NioClientSocketChannel channel) { @@ -197,7 +213,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { DeadLockProofWorker.start( bossExecutor, new ThreadRenamingRunnable( - this, "New I/O client boss #" + id)); + this, "New I/O client boss #" + id + '-' + subId)); success = true; } finally { if (!success) {