Merge pull request #70 from jeffgriffith/master
Modification to allow multiple boss threads in client
This commit is contained in:
commit
5c2c8d9d1d
@ -88,6 +88,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
|
||||
private final Executor bossExecutor;
|
||||
private final Executor workerExecutor;
|
||||
private final NioClientSocketPipelineSink sink;
|
||||
private static final int DEFAULT_BOSS_COUNT = 1;
|
||||
|
||||
/**
|
||||
* Creates a new instance. Calling this constructor is same with calling
|
||||
@ -118,12 +119,36 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
|
||||
public NioClientSocketChannelFactory(
|
||||
Executor bossExecutor, Executor workerExecutor,
|
||||
int workerCount) {
|
||||
this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, workerCount);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance.
|
||||
*
|
||||
* @param bossExecutor
|
||||
* the {@link Executor} which will execute the boss thread
|
||||
* @param workerExecutor
|
||||
* the {@link Executor} which will execute the I/O worker threads
|
||||
* @param bossCount
|
||||
* the maximum number of boss threads
|
||||
* @param workerCount
|
||||
* the maximum number of I/O worker threads
|
||||
*/
|
||||
public NioClientSocketChannelFactory(
|
||||
Executor bossExecutor, Executor workerExecutor,
|
||||
int bossCount, int workerCount) {
|
||||
|
||||
if (bossExecutor == null) {
|
||||
throw new NullPointerException("bossExecutor");
|
||||
}
|
||||
if (workerExecutor == null) {
|
||||
throw new NullPointerException("workerExecutor");
|
||||
}
|
||||
if (bossCount <= 0) {
|
||||
throw new IllegalArgumentException(
|
||||
"bossCount (" + bossCount + ") " +
|
||||
"must be a positive integer.");
|
||||
}
|
||||
if (workerCount <= 0) {
|
||||
throw new IllegalArgumentException(
|
||||
"workerCount (" + workerCount + ") " +
|
||||
@ -132,9 +157,10 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
|
||||
|
||||
this.bossExecutor = bossExecutor;
|
||||
this.workerExecutor = workerExecutor;
|
||||
sink = new NioClientSocketPipelineSink(bossExecutor, workerExecutor, workerCount);
|
||||
sink = new NioClientSocketPipelineSink(bossExecutor, workerExecutor, bossCount, workerCount);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public SocketChannel newChannel(ChannelPipeline pipeline) {
|
||||
return NioClientSocketChannel.create(this, pipeline, sink, sink.nextWorker());
|
||||
|
@ -58,18 +58,33 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
|
||||
static final InternalLogger logger =
|
||||
InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class);
|
||||
|
||||
private static final int DEFAULT_BOSS_COUNT = 1;
|
||||
final Executor bossExecutor;
|
||||
private final Boss boss = new Boss();
|
||||
private final int numBosses;
|
||||
private final Boss bosses[];
|
||||
private final NioWorker[] workers;
|
||||
|
||||
private final AtomicInteger bossIndex = new AtomicInteger();
|
||||
private final AtomicInteger workerIndex = new AtomicInteger();
|
||||
|
||||
NioClientSocketPipelineSink(
|
||||
Executor bossExecutor, Executor workerExecutor, int workerCount) {
|
||||
NioClientSocketPipelineSink(Executor bossExecutor, Executor workerExecutor, int workerCount) {
|
||||
this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, workerCount);
|
||||
}
|
||||
|
||||
NioClientSocketPipelineSink(Executor bossExecutor, Executor workerExecutor, int bossCount, int workerCount) {
|
||||
|
||||
this.bossExecutor = bossExecutor;
|
||||
this.numBosses = bossCount;
|
||||
|
||||
workers = new NioWorker[workerCount];
|
||||
for (int i = 0; i < workers.length; i ++) {
|
||||
workers[i] = new NioWorker(workerExecutor);
|
||||
}
|
||||
|
||||
bosses = new Boss[numBosses];
|
||||
for (int i = 0; i < numBosses; ++i) {
|
||||
bosses[i] = new Boss();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -149,7 +164,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
|
||||
});
|
||||
cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
|
||||
channel.connectFuture = cf;
|
||||
boss.register(channel);
|
||||
nextBoss().register(channel);
|
||||
}
|
||||
|
||||
} catch (Throwable t) {
|
||||
@ -163,6 +178,11 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
|
||||
return workers[Math.abs(
|
||||
workerIndex.getAndIncrement() % workers.length)];
|
||||
}
|
||||
|
||||
Boss nextBoss() {
|
||||
return bosses[Math.abs(
|
||||
bossIndex.getAndIncrement() % bosses.length)];
|
||||
}
|
||||
|
||||
private final class Boss implements Runnable {
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user