OioWorker failed to fire channelConnected event for OioAcceptedSocketChannel which is fixed now. This also fix a race which can could lead to missing events. See #287

This commit is contained in:
norman 2012-04-26 14:30:20 +02:00
parent 86217b692a
commit 880b01e45f

View File

@ -42,6 +42,8 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
*/ */
protected volatile Thread thread; protected volatile Thread thread;
private volatile boolean done;
public AbstractOioWorker(C channel) { public AbstractOioWorker(C channel) {
this.channel = channel; this.channel = channel;
channel.worker = this; channel.worker = this;
@ -50,8 +52,16 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
public void run() { public void run() {
thread = channel.workerThread = Thread.currentThread(); thread = channel.workerThread = Thread.currentThread();
boolean fireConnected = channel instanceof OioAcceptedSocketChannel;
while (channel.isOpen()) { while (channel.isOpen()) {
if (fireConnected) {
// Fire the channelConnected event for OioAcceptedSocketChannel.
// See #287
fireConnected = false;
fireChannelConnected(channel, channel.getRemoteAddress());
}
synchronized (channel.interestOpsLock) { synchronized (channel.interestOpsLock) {
while (!channel.isReadable()) { while (!channel.isReadable()) {
try { try {
@ -78,6 +88,7 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
if (!channel.isSocketClosed()) { if (!channel.isSocketClosed()) {
fireExceptionCaught(channel, t); fireExceptionCaught(channel, t);
} }
break; break;
} }
} }
@ -86,8 +97,17 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
// operations from interrupting this thread from now on. // operations from interrupting this thread from now on.
channel.workerThread = null; channel.workerThread = null;
// execute all events that are in the queue now
processEventQueue();
// Clean up. // Clean up.
close(channel, succeededFuture(channel), true); close(channel, succeededFuture(channel), true);
// Mark the worker event loop as done so we know that we need to run tasks directly and not queue them
// See #287
done = true;
} }
static boolean isIoThread(AbstractOioChannel channel) { static boolean isIoThread(AbstractOioChannel channel) {
@ -96,7 +116,11 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
public void executeInIoThread(Runnable task) { public void executeInIoThread(Runnable task) {
if (Thread.currentThread() == thread) { // check if the current thread is the worker thread
//
// Also check if the event loop of the worker is complete. If so we need to run the task now.
// See #287
if (Thread.currentThread() == thread || done) {
task.run(); task.run();
} else { } else {
boolean added = eventQueue.offer(task); boolean added = eventQueue.offer(task);
@ -107,7 +131,7 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
} }
} }
private void processEventQueue() throws IOException { private void processEventQueue() {
for (;;) { for (;;) {
final Runnable task = eventQueue.poll(); final Runnable task = eventQueue.poll();
if (task == null) { if (task == null) {