OioWorker failed to fire channelConnected event for OioAcceptedSocketChannel which is fixed now. This also fix a race which can could lead to missing events. See #287

This commit is contained in:
norman 2012-04-26 14:41:47 +02:00
parent 880b01e45f
commit 8777c3c02b
2 changed files with 23 additions and 21 deletions

View File

@ -52,16 +52,8 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
public void run() {
thread = channel.workerThread = Thread.currentThread();
boolean fireConnected = channel instanceof OioAcceptedSocketChannel;
while (channel.isOpen()) {
if (fireConnected) {
// Fire the channelConnected event for OioAcceptedSocketChannel.
// See #287
fireConnected = false;
fireChannelConnected(channel, channel.getRemoteAddress());
}
synchronized (channel.interestOpsLock) {
while (!channel.isReadable()) {
try {
@ -76,29 +68,25 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
}
}
boolean cont = false;
try {
boolean cont = process();
cont = process();
} catch (Throwable t) {
if (!channel.isSocketClosed()) {
fireExceptionCaught(channel, t);
}
} finally {
processEventQueue();
if (!cont) {
break;
}
} catch (Throwable t) {
if (!channel.isSocketClosed()) {
fireExceptionCaught(channel, t);
}
break;
}
}
// Setting the workerThread to null will prevent any channel
// operations from interrupting this thread from now on.
channel.workerThread = null;
// execute all events that are in the queue now
processEventQueue();
// Clean up.
close(channel, succeededFuture(channel), true);
@ -106,7 +94,9 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
// Mark the worker event loop as done so we know that we need to run tasks directly and not queue them
// See #287
done = true;
// just to make we don't have something left
processEventQueue();
}

View File

@ -40,6 +40,18 @@ class OioWorker extends AbstractOioWorker<OioSocketChannel> {
super(channel);
}
@Override
public void run() {
boolean fireConnected = channel instanceof OioAcceptedSocketChannel;
if (fireConnected && channel.isOpen()) {
// Fire the channelConnected event for OioAcceptedSocketChannel.
// See #287
fireChannelConnected(channel, channel.getRemoteAddress());
}
super.run();
}
@Override
boolean process() throws IOException {
byte[] buf;