Fix race-condition which could lead to a NPE or Exception while register a Channel with a Worker. See #469

This commit is contained in:
norman 2012-07-23 11:21:32 +02:00
parent 36acda0be9
commit 96182f97e8

View File

@ -136,14 +136,15 @@ abstract class AbstractNioWorker implements Worker {
void register(AbstractNioChannel<?> channel, ChannelFuture future) { void register(AbstractNioChannel<?> channel, ChannelFuture future) {
Runnable registerTask = createRegisterTask(channel, future); Runnable registerTask = createRegisterTask(channel, future);
Selector selector = start();
synchronized (startStopLock) {
Selector selector = start();
boolean offered = registerTaskQueue.offer(registerTask); boolean offered = registerTaskQueue.offer(registerTask);
assert offered; assert offered;
if (wakenUp.compareAndSet(false, true)) { if (wakenUp.compareAndSet(false, true)) {
synchronized (startStopLock) {
// wake up the selector to speed things // wake up the selector to speed things
selector = this.selector; selector = this.selector;
@ -164,38 +165,36 @@ abstract class AbstractNioWorker implements Worker {
* @return selector * @return selector
*/ */
private Selector start() { private Selector start() {
synchronized (startStopLock) { if (!started) {
if (!started) { // Open a selector if this worker didn't start yet.
// Open a selector if this worker didn't start yet. try {
try { selector = Selector.open();
selector = Selector.open(); } catch (Throwable t) {
} catch (Throwable t) { throw new ChannelException("Failed to create a selector.", t);
throw new ChannelException("Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
boolean success = false;
try {
DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O worker #" + id));
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
logger.warn("Failed to close a selector.", t);
}
selector = null;
// The method will return to the caller at this point.
}
}
} }
assert selector != null && selector.isOpen(); // Start the worker thread with the new Selector.
boolean success = false;
started = true; try {
DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O worker #" + id));
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
logger.warn("Failed to close a selector.", t);
}
selector = null;
// The method will return to the caller at this point.
}
}
} }
assert selector != null && selector.isOpen();
started = true;
return selector; return selector;
} }
@ -322,17 +321,20 @@ abstract class AbstractNioWorker implements Worker {
if (!alwaysAsync && Thread.currentThread() == thread) { if (!alwaysAsync && Thread.currentThread() == thread) {
task.run(); task.run();
} else { } else {
start(); synchronized (startStopLock) {
boolean added = eventQueue.offer(task); start();
boolean added = eventQueue.offer(task);
assert added; assert added;
if (added) { if (added) {
// wake up the selector to speed things // wake up the selector to speed things
Selector selector = this.selector; Selector selector = this.selector;
if (selector != null) { if (selector != null) {
selector.wakeup(); selector.wakeup();
}
} }
} }
} }
} }