From 880b01e45f2e62eba04a6910a2f0ce5fc02e567a Mon Sep 17 00:00:00 2001 From: norman Date: Thu, 26 Apr 2012 14:30:20 +0200 Subject: [PATCH] OioWorker failed to fire channelConnected event for OioAcceptedSocketChannel which is fixed now. This also fix a race which can could lead to missing events. See #287 --- .../channel/socket/oio/AbstractOioWorker.java | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioWorker.java b/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioWorker.java index 05094db46f..f335903a48 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioWorker.java @@ -42,6 +42,8 @@ abstract class AbstractOioWorker implements Worker */ protected volatile Thread thread; + private volatile boolean done; + public AbstractOioWorker(C channel) { this.channel = channel; channel.worker = this; @@ -50,8 +52,16 @@ abstract class AbstractOioWorker implements Worker public void run() { thread = channel.workerThread = Thread.currentThread(); + boolean fireConnected = channel instanceof OioAcceptedSocketChannel; while (channel.isOpen()) { + if (fireConnected) { + // Fire the channelConnected event for OioAcceptedSocketChannel. + // See #287 + fireConnected = false; + fireChannelConnected(channel, channel.getRemoteAddress()); + } + synchronized (channel.interestOpsLock) { while (!channel.isReadable()) { try { @@ -78,6 +88,7 @@ abstract class AbstractOioWorker implements Worker if (!channel.isSocketClosed()) { fireExceptionCaught(channel, t); } + break; } } @@ -85,9 +96,18 @@ abstract class AbstractOioWorker implements Worker // Setting the workerThread to null will prevent any channel // operations from interrupting this thread from now on. channel.workerThread = null; + + // execute all events that are in the queue now + processEventQueue(); // Clean up. close(channel, succeededFuture(channel), true); + + // Mark the worker event loop as done so we know that we need to run tasks directly and not queue them + // See #287 + done = true; + + } static boolean isIoThread(AbstractOioChannel channel) { @@ -96,7 +116,11 @@ abstract class AbstractOioWorker implements Worker public void executeInIoThread(Runnable task) { - if (Thread.currentThread() == thread) { + // check if the current thread is the worker thread + // + // Also check if the event loop of the worker is complete. If so we need to run the task now. + // See #287 + if (Thread.currentThread() == thread || done) { task.run(); } else { boolean added = eventQueue.offer(task); @@ -107,7 +131,7 @@ abstract class AbstractOioWorker implements Worker } } - private void processEventQueue() throws IOException { + private void processEventQueue() { for (;;) { final Runnable task = eventQueue.poll(); if (task == null) {