Allow for multiple client boss threads.

This commit is contained in:
Jeff Griffith 2011-11-21 15:03:18 -05:00
parent 1e0dee3e9b
commit 0d0764d082
2 changed files with 50 additions and 5 deletions

View File

@ -88,6 +88,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
private final Executor bossExecutor; private final Executor bossExecutor;
private final Executor workerExecutor; private final Executor workerExecutor;
private final NioClientSocketPipelineSink sink; private final NioClientSocketPipelineSink sink;
private static final int DEFAULT_BOSS_COUNT = 1;
/** /**
* Creates a new instance. Calling this constructor is same with calling * Creates a new instance. Calling this constructor is same with calling
@ -118,12 +119,36 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
public NioClientSocketChannelFactory( public NioClientSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor, Executor bossExecutor, Executor workerExecutor,
int workerCount) { int workerCount) {
this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, workerCount);
}
/**
* Creates a new instance.
*
* @param bossExecutor
* the {@link Executor} which will execute the boss thread
* @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads
* @param bossCount
* the maximum number of boss threads
* @param workerCount
* the maximum number of I/O worker threads
*/
public NioClientSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor,
int bossCount, int workerCount) {
if (bossExecutor == null) { if (bossExecutor == null) {
throw new NullPointerException("bossExecutor"); throw new NullPointerException("bossExecutor");
} }
if (workerExecutor == null) { if (workerExecutor == null) {
throw new NullPointerException("workerExecutor"); throw new NullPointerException("workerExecutor");
} }
if (bossCount <= 0) {
throw new IllegalArgumentException(
"bossCount (" + bossCount + ") " +
"must be a positive integer.");
}
if (workerCount <= 0) { if (workerCount <= 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"workerCount (" + workerCount + ") " + "workerCount (" + workerCount + ") " +
@ -132,9 +157,10 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
this.bossExecutor = bossExecutor; this.bossExecutor = bossExecutor;
this.workerExecutor = workerExecutor; this.workerExecutor = workerExecutor;
sink = new NioClientSocketPipelineSink(bossExecutor, workerExecutor, workerCount); sink = new NioClientSocketPipelineSink(bossExecutor, workerExecutor, bossCount, workerCount);
} }
@Override @Override
public SocketChannel newChannel(ChannelPipeline pipeline) { public SocketChannel newChannel(ChannelPipeline pipeline) {
return NioClientSocketChannel.create(this, pipeline, sink, sink.nextWorker()); return NioClientSocketChannel.create(this, pipeline, sink, sink.nextWorker());

View File

@ -59,17 +59,31 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class); InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class);
final Executor bossExecutor; final Executor bossExecutor;
private final Boss boss = new Boss(); private int numBosses = 1;
private final Boss bosses[];
private final NioWorker[] workers; private final NioWorker[] workers;
private final AtomicInteger bossIndex = new AtomicInteger();
private final AtomicInteger workerIndex = new AtomicInteger(); private final AtomicInteger workerIndex = new AtomicInteger();
NioClientSocketPipelineSink( NioClientSocketPipelineSink(Executor bossExecutor, Executor workerExecutor, int workerCount) {
Executor bossExecutor, Executor workerExecutor, int workerCount) { this(bossExecutor, workerExecutor, 1, workerCount);
}
NioClientSocketPipelineSink(Executor bossExecutor, Executor workerExecutor, int bossCount, int workerCount) {
this.bossExecutor = bossExecutor; this.bossExecutor = bossExecutor;
this.numBosses = bossCount;
workers = new NioWorker[workerCount]; workers = new NioWorker[workerCount];
for (int i = 0; i < workers.length; i ++) { for (int i = 0; i < workers.length; i ++) {
workers[i] = new NioWorker(workerExecutor); workers[i] = new NioWorker(workerExecutor);
} }
bosses = new Boss[numBosses];
for (int i = 0; i < numBosses; ++i) {
bosses[i] = new Boss();
}
} }
@Override @Override
@ -149,7 +163,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
}); });
cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
channel.connectFuture = cf; channel.connectFuture = cf;
boss.register(channel); nextBoss().register(channel);
} }
} catch (Throwable t) { } catch (Throwable t) {
@ -164,6 +178,11 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
workerIndex.getAndIncrement() % workers.length)]; workerIndex.getAndIncrement() % workers.length)];
} }
Boss nextBoss() {
return bosses[Math.abs(
bossIndex.getAndIncrement() % bosses.length)];
}
private final class Boss implements Runnable { private final class Boss implements Runnable {
volatile Selector selector; volatile Selector selector;