Backport the pull request #70

This commit is contained in:
Trustin Lee 2011-11-22 16:34:35 +09:00
parent 13d0d84852
commit 0d4dfefeb0
2 changed files with 57 additions and 13 deletions

View File

@ -85,24 +85,44 @@ import org.jboss.netty.util.internal.ExecutorUtil;
*/
public class NioClientSocketChannelFactory implements ClientSocketChannelFactory {
private static final int DEFAULT_BOSS_COUNT = 1;
private final Executor bossExecutor;
private final Executor workerExecutor;
private final NioClientSocketPipelineSink sink;
/**
* Creates a new instance. Calling this constructor is same with calling
* {@link #NioClientSocketChannelFactory(Executor, Executor, int)} with 2 *
* the number of available processors in the machine. The number of
* {@link #NioClientSocketChannelFactory(Executor, Executor, int, int)} with
* 1 and (2 * the number of available processors in the machine) for
* <tt>bossCount</tt> and <tt>workerCount</tt> respectively. The number of
* available processors is obtained by {@link Runtime#availableProcessors()}.
*
* @param bossExecutor
* the {@link Executor} which will execute the boss thread
* @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads
* the {@link Executor} which will execute the worker threads
*/
public NioClientSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor) {
this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_THREADS);
this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, SelectorUtil.DEFAULT_IO_THREADS);
}
/**
* Creates a new instance. Calling this constructor is same with calling
* {@link #NioClientSocketChannelFactory(Executor, Executor, int, int)} with
* 1 as <tt>bossCount</tt>.
*
* @param bossExecutor
* the {@link Executor} which will execute the boss thread
* @param workerExecutor
* the {@link Executor} which will execute the worker threads
* @param workerCount
* the maximum number of I/O worker threads
*/
public NioClientSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor, int workerCount) {
this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, workerCount);
}
/**
@ -111,19 +131,26 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
* @param bossExecutor
* the {@link Executor} which will execute the boss thread
* @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads
* the {@link Executor} which will execute the worker threads
* @param bossCount
* the maximum number of boss threads
* @param workerCount
* the maximum number of I/O worker threads
*/
public NioClientSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor,
int workerCount) {
int bossCount, int workerCount) {
if (bossExecutor == null) {
throw new NullPointerException("bossExecutor");
}
if (workerExecutor == null) {
throw new NullPointerException("workerExecutor");
}
if (bossCount <= 0) {
throw new IllegalArgumentException(
"bossCount (" + bossCount + ") " +
"must be a positive integer.");
}
if (workerCount <= 0) {
throw new IllegalArgumentException(
"workerCount (" + workerCount + ") " +
@ -132,7 +159,8 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
this.bossExecutor = bossExecutor;
this.workerExecutor = workerExecutor;
sink = new NioClientSocketPipelineSink(bossExecutor, workerExecutor, workerCount);
sink = new NioClientSocketPipelineSink(
bossExecutor, workerExecutor, bossCount, workerCount);
}
public SocketChannel newChannel(ChannelPipeline pipeline) {

View File

@ -62,13 +62,23 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
final int id = nextId.incrementAndGet();
final Executor bossExecutor;
private final Boss boss = new Boss();
private final Boss[] bosses;
private final NioWorker[] workers;
private final AtomicInteger bossIndex = new AtomicInteger();
private final AtomicInteger workerIndex = new AtomicInteger();
NioClientSocketPipelineSink(
Executor bossExecutor, Executor workerExecutor, int workerCount) {
Executor bossExecutor, Executor workerExecutor,
int bossCount, int workerCount) {
this.bossExecutor = bossExecutor;
bosses = new Boss[bossCount];
for (int i = 0; i < bosses.length; i ++) {
bosses[i] = new Boss(i + 1);
}
workers = new NioWorker[workerCount];
for (int i = 0; i < workers.length; i ++) {
workers[i] = new NioWorker(id, i + 1, workerExecutor);
@ -150,7 +160,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
});
cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
channel.connectFuture = cf;
boss.register(channel);
nextBoss().register(channel);
}
} catch (Throwable t) {
@ -159,6 +169,11 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
channel.worker.close(channel, succeededFuture(channel));
}
}
Boss nextBoss() {
return bosses[Math.abs(
bossIndex.getAndIncrement() % bosses.length)];
}
NioWorker nextWorker() {
return workers[Math.abs(
@ -169,12 +184,13 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
volatile Selector selector;
private boolean started;
private final int subId;
private final AtomicBoolean wakenUp = new AtomicBoolean();
private final Object startStopLock = new Object();
private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue<Runnable>();
Boss() {
super();
Boss(int subId) {
this.subId = subId;
}
void register(NioClientSocketChannel channel) {
@ -197,7 +213,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
DeadLockProofWorker.start(
bossExecutor,
new ThreadRenamingRunnable(
this, "New I/O client boss #" + id));
this, "New I/O client boss #" + id + '-' + subId));
success = true;
} finally {
if (!success) {