Simplified NioWorker.register() - it was too complex

This commit is contained in:
Trustin Lee 2008-11-08 12:00:07 +00:00
parent c5a211e07a
commit 59e15efbbe

View File

@ -70,12 +70,12 @@ class NioWorker implements Runnable {
private final int bossId;
private final int id;
private final Executor executor;
private final AtomicBoolean started = new AtomicBoolean();
private boolean started;
private volatile Thread thread;
volatile Selector selector;
private final AtomicBoolean wakenUp = new AtomicBoolean();
private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
private final Object shutdownLock = new Object();
private final Object startStopLock = new Object();
private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue<Runnable>();
private final Queue<Runnable> writeTaskQueue = new LinkedTransferQueue<Runnable>();
@ -86,60 +86,56 @@ class NioWorker implements Runnable {
}
void register(NioSocketChannel channel, ChannelFuture future) {
boolean firstChannel;
Selector selector;
for (;;) {
firstChannel = started.compareAndSet(false, true);
if (firstChannel) {
boolean success = false;
try {
synchronized (shutdownLock) {
this.selector = selector = Selector.open();
}
success = true;
} catch (IOException e) {
throw new ChannelException(
"Failed to create a selector.", e);
} finally {
if (!success) {
started.compareAndSet(true, false);
}
}
break;
} else {
selector = this.selector;
if (selector == null) {
do {
Thread.yield();
selector = this.selector;
} while (selector == null && started.get());
if (selector != null && selector.isOpen()) {
break;
}
} else {
break;
}
}
}
boolean server = !(channel instanceof NioClientSocketChannel);
Runnable registerTask = new RegisterTask(channel, future, server);
synchronized (shutdownLock) {
Selector selector;
synchronized (startStopLock) {
if (!started) {
// Open a selector if this worker didn't start yet.
try {
this.selector = selector = Selector.open();
} catch (Throwable t) {
throw new ChannelException(
"Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
String threadName =
(server ? "New I/O server worker #"
: "New I/O client worker #") + bossId + '-' + id;
boolean success = false;
try {
executor.execute(new ThreadRenamingRunnable(this, threadName));
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
logger.warn("Failed to close a selector.", t);
}
this.selector = selector = null;
// The method will return to the caller at this point.
}
}
} else {
// Use the existing selector if this worker has been started.
selector = this.selector;
}
assert selector != null && selector.isOpen();
started = true;
registerTaskQueue.offer(registerTask);
}
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
if (firstChannel) {
String threadName =
(server ? "New I/O server worker #"
: "New I/O client worker #") + bossId + '-' + id;
executor.execute(new ThreadRenamingRunnable(this, threadName));
}
}
public void run() {
@ -176,9 +172,9 @@ class NioWorker implements Runnable {
if (shutdown ||
executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {
synchronized (shutdownLock) {
synchronized (startStopLock) {
if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
started.set(false);
started = false;
try {
selector.close();
} catch (IOException e) {
@ -203,7 +199,8 @@ class NioWorker implements Runnable {
logger.warn(
"Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures.
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {