diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java index b9f06d639d..e77bfc8640 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java @@ -88,6 +88,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory private final Executor bossExecutor; private final Executor workerExecutor; private final NioClientSocketPipelineSink sink; + private static final int DEFAULT_BOSS_COUNT = 1; /** * Creates a new instance. Calling this constructor is same with calling @@ -118,12 +119,36 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory public NioClientSocketChannelFactory( Executor bossExecutor, Executor workerExecutor, int workerCount) { + this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, workerCount); + } + + /** + * Creates a new instance. + * + * @param bossExecutor + * the {@link Executor} which will execute the boss thread + * @param workerExecutor + * the {@link Executor} which will execute the I/O worker threads + * @param bossCount + * the maximum number of boss threads + * @param workerCount + * the maximum number of I/O worker threads + */ + public NioClientSocketChannelFactory( + Executor bossExecutor, Executor workerExecutor, + int bossCount, int workerCount) { + if (bossExecutor == null) { throw new NullPointerException("bossExecutor"); } if (workerExecutor == null) { throw new NullPointerException("workerExecutor"); } + if (bossCount <= 0) { + throw new IllegalArgumentException( + "bossCount (" + bossCount + ") " + + "must be a positive integer."); + } if (workerCount <= 0) { throw new IllegalArgumentException( "workerCount (" + workerCount + ") " + @@ -132,9 +157,10 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory this.bossExecutor = bossExecutor; this.workerExecutor = workerExecutor; - sink = new NioClientSocketPipelineSink(bossExecutor, workerExecutor, workerCount); + sink = new NioClientSocketPipelineSink(bossExecutor, workerExecutor, bossCount, workerCount); } + @Override public SocketChannel newChannel(ChannelPipeline pipeline) { return NioClientSocketChannel.create(this, pipeline, sink, sink.nextWorker()); diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java index 276dd2c929..1688d8ece7 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -59,17 +59,31 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class); final Executor bossExecutor; - private final Boss boss = new Boss(); + private int numBosses = 1; + private final Boss bosses[]; private final NioWorker[] workers; + + private final AtomicInteger bossIndex = new AtomicInteger(); private final AtomicInteger workerIndex = new AtomicInteger(); - NioClientSocketPipelineSink( - Executor bossExecutor, Executor workerExecutor, int workerCount) { + NioClientSocketPipelineSink(Executor bossExecutor, Executor workerExecutor, int workerCount) { + this(bossExecutor, workerExecutor, 1, workerCount); + } + + NioClientSocketPipelineSink(Executor bossExecutor, Executor workerExecutor, int bossCount, int workerCount) { + this.bossExecutor = bossExecutor; + this.numBosses = bossCount; + workers = new NioWorker[workerCount]; for (int i = 0; i < workers.length; i ++) { workers[i] = new NioWorker(workerExecutor); } + + bosses = new Boss[numBosses]; + for (int i = 0; i < numBosses; ++i) { + bosses[i] = new Boss(); + } } @Override @@ -149,7 +163,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { }); cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); channel.connectFuture = cf; - boss.register(channel); + nextBoss().register(channel); } } catch (Throwable t) { @@ -163,6 +177,11 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { return workers[Math.abs( workerIndex.getAndIncrement() % workers.length)]; } + + Boss nextBoss() { + return bosses[Math.abs( + bossIndex.getAndIncrement() % bosses.length)]; + } private final class Boss implements Runnable {