Warn if boss/worker threads are not up within 10 seconds

.. because it is very likely to be a user mistake.

- Fixes #1304
This commit is contained in:
Trustin Lee 2013-10-17 21:12:03 +09:00
parent cc9c7d1607
commit b2d624a3ba
3 changed files with 105 additions and 8 deletions

View File

@ -15,19 +15,29 @@
*/
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.internal.ExecutorUtil;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class AbstractNioBossPool<E extends Boss>
implements BossPool<E>, ExternalResourceReleasable {
/**
* The boss pool raises an exception unless all boss threads start and run within this timeout (in seconds.)
*/
private static final int INITIALIZATION_TIMEOUT = 10;
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractNioBossPool.class);
private final Boss[] bosses;
private final AtomicInteger bossIndex = new AtomicInteger();
private final Executor bossExecutor;
private volatile boolean initDone;
private volatile boolean initialized;
/**
* Create a new instance
@ -56,14 +66,51 @@ public abstract class AbstractNioBossPool<E extends Boss>
}
protected void init() {
if (initDone) {
throw new IllegalStateException("Init was done before");
if (initialized) {
throw new IllegalStateException("initialized already");
}
initDone = true;
initialized = true;
for (int i = 0; i < bosses.length; i++) {
bosses[i] = newBoss(bossExecutor);
}
waitForBossThreads();
}
private void waitForBossThreads() {
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(INITIALIZATION_TIMEOUT);
boolean warn = false;
for (Boss boss: bosses) {
if (!(boss instanceof AbstractNioSelector)) {
continue;
}
AbstractNioSelector selector = (AbstractNioSelector) boss;
long waitTime = deadline - System.nanoTime();
try {
if (waitTime <= 0) {
if (selector.thread == null) {
warn = true;
break;
}
} else if (!selector.startupLatch.await(waitTime, TimeUnit.NANOSECONDS)) {
warn = true;
break;
}
} catch (InterruptedException ignore) {
// Stop waiting for the boss threads and let someone else take care of the interruption.
Thread.currentThread().interrupt();
break;
}
}
if (!warn) {
logger.warn(
"Failed to get all boss threads ready within " + INITIALIZATION_TIMEOUT + " second(s). " +
"Make sure to specify the executor which has more threads than the requested bossCount. " +
"If unsure, use Executors.newCachedThreadPool().");
}
}
/**

View File

@ -66,6 +66,11 @@ abstract class AbstractNioSelector implements NioSelector {
*/
protected volatile Thread thread;
/**
* Count down to 0 when the I/O thread starts and {@link #thread} is set to non-null.
*/
final CountDownLatch startupLatch = new CountDownLatch(1);
/**
* The NIO {@link Selector}.
*/
@ -188,6 +193,7 @@ abstract class AbstractNioSelector implements NioSelector {
public void run() {
thread = Thread.currentThread();
startupLatch.countDown();
int selectReturnsImmediately = 0;
Selector selector = this.selector;

View File

@ -17,10 +17,13 @@
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.socket.Worker;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.internal.ExecutorUtil;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -30,10 +33,17 @@ import java.util.concurrent.atomic.AtomicInteger;
public abstract class AbstractNioWorkerPool<E extends AbstractNioWorker>
implements WorkerPool<E>, ExternalResourceReleasable {
/**
* The worker pool raises an exception unless all worker threads start and run within this timeout (in seconds.)
*/
private static final int INITIALIZATION_TIMEOUT = 10;
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractNioWorkerPool.class);
private final AbstractNioWorker[] workers;
private final AtomicInteger workerIndex = new AtomicInteger();
private final Executor workerExecutor;
private volatile boolean initDone;
private volatile boolean initialized;
/**
* Create a new instance
@ -59,15 +69,49 @@ public abstract class AbstractNioWorkerPool<E extends AbstractNioWorker>
init();
}
}
protected void init() {
if (initDone) {
throw new IllegalStateException("Init was done before");
if (initialized) {
throw new IllegalStateException("initialized already");
}
initDone = true;
initialized = true;
for (int i = 0; i < workers.length; i++) {
workers[i] = newWorker(workerExecutor);
}
waitForWorkerThreads();
}
private void waitForWorkerThreads() {
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(INITIALIZATION_TIMEOUT);
boolean warn = false;
for (AbstractNioSelector worker: workers) {
long waitTime = deadline - System.nanoTime();
try {
if (waitTime <= 0) {
if (worker.thread == null) {
warn = true;
break;
}
} else if (!worker.startupLatch.await(waitTime, TimeUnit.NANOSECONDS)) {
warn = true;
break;
}
} catch (InterruptedException ignore) {
// Stop waiting for the worker threads and let someone else take care of the interruption.
Thread.currentThread().interrupt();
break;
}
}
if (!warn) {
logger.warn(
"Failed to get all worker threads ready within " + INITIALIZATION_TIMEOUT + " second(s). " +
"Make sure to specify the executor which has more threads than the requested workerCount. " +
"If unsure, use Executors.newCachedThreadPool().");
}
}
/**