OioWorker failed to fire channelConnected event for OioAcceptedSocketChannel which is fixed now. This also fix a race which can could lead to missing events. See #287

This commit is contained in:
norman 2012-04-26 14:53:31 +02:00
parent a8b9e27c92
commit 9d555b0b97
2 changed files with 37 additions and 9 deletions

View File

@ -36,6 +36,8 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
protected final C channel;
private volatile boolean done;
/**
* If this worker has been started thread will be a reference to the thread
* used when starting. i.e. the current thread when the run method is executed.
@ -66,19 +68,20 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
}
}
boolean cont = false;
try {
boolean cont = process();
cont = process();
} catch (Throwable t) {
if (!channel.isSocketClosed()) {
fireExceptionCaught(channel, t);
}
} finally {
processEventQueue();
if (!cont) {
break;
}
} catch (Throwable t) {
if (!channel.isSocketClosed()) {
fireExceptionCaught(channel, t);
}
break;
}
}
@ -88,6 +91,14 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
// Clean up.
close(channel, succeededFuture(channel), true);
// Mark the worker event loop as done so we know that we need to run tasks directly and not queue them
// See #287
done = true;
// just to make we don't have something left
processEventQueue();
}
static boolean isIoThread(AbstractOioChannel channel) {
@ -96,7 +107,11 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
@Override
public void executeInIoThread(Runnable task) {
if (Thread.currentThread() == thread) {
// check if the current thread is the worker thread
//
// Also check if the event loop of the worker is complete. If so we need to run the task now.
// See #287
if (Thread.currentThread() == thread || done) {
task.run();
} else {
boolean added = eventQueue.offer(task);
@ -107,7 +122,7 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
}
}
private void processEventQueue() throws IOException {
private void processEventQueue() {
for (;;) {
final Runnable task = eventQueue.poll();
if (task == null) {

View File

@ -39,6 +39,19 @@ class OioWorker extends AbstractOioWorker<OioSocketChannel> {
super(channel);
}
@Override
public void run() {
boolean fireConnected = channel instanceof OioAcceptedSocketChannel;
if (fireConnected && channel.isOpen()) {
// Fire the channelConnected event for OioAcceptedSocketChannel.
// See #287
fireChannelConnected(channel, channel.getRemoteAddress());
}
super.run();
}
@Override
boolean process() throws IOException {
byte[] buf;