Make sure AbstractNioWorker gets started if needed

This commit is contained in:
norman 2012-03-07 15:37:33 +01:00
parent 62028f0042
commit e207af30a3

View File

@ -127,13 +127,28 @@ abstract class AbstractNioWorker implements Worker {
void register(AbstractNioChannel<?> channel, ChannelFuture future) {
Runnable registerTask = createRegisterTask(channel, future);
Selector selector;
Selector selector = start();
boolean offered = registerTaskQueue.offer(registerTask);
assert offered;
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
/**
* Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for the {@link AbstractNioChannel}'s when they get registered
*
* @return selector
*/
private Selector start() {
synchronized (startStopLock) {
if (!started) {
// Open a selector if this worker didn't start yet.
try {
this.selector = selector = Selector.open();
this.selector = Selector.open();
} catch (Throwable t) {
throw new ChannelException("Failed to create a selector.", t);
}
@ -151,28 +166,19 @@ abstract class AbstractNioWorker implements Worker {
} catch (Throwable t) {
logger.warn("Failed to close a selector.", t);
}
this.selector = selector = null;
this.selector = null;
// The method will return to the caller at this point.
}
}
} else {
// Use the existing selector if this worker has been started.
selector = this.selector;
}
assert selector != null && selector.isOpen();
started = true;
boolean offered = registerTaskQueue.offer(registerTask);
assert offered;
}
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
return selector;
}
@Override
public void run() {
thread = Thread.currentThread();
@ -281,22 +287,22 @@ abstract class AbstractNioWorker implements Worker {
@Override
public void executeInIoThread(Runnable task) {
if (Thread.currentThread() == thread) {
task.run();
} else {
boolean added = eventQueue.offer(task);
assert added;
if (added) {
// wake up the selector to speed things
Selector selector = this.selector;
if (selector != null) {
selector.wakeup();
}
}
}
start();
if (Thread.currentThread() == thread) {
task.run();
} else {
boolean added = eventQueue.offer(task);
assert added;
if (added) {
// wake up the selector to speed things
Selector selector = this.selector;
if (selector != null) {
selector.wakeup();
}
}
}
}
private void processRegisterTaskQueue() throws IOException {