diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java index d17ee6e60c..4ab4ecb537 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -30,6 +30,7 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; +import java.util.Queue; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -47,6 +48,7 @@ import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.LinkedTransferQueue; import org.jboss.netty.util.ThreadRenamingRunnable; /** @@ -167,55 +169,61 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { private final class Boss implements Runnable { - private final AtomicBoolean started = new AtomicBoolean(); - private volatile Selector selector; - private final Object selectorGuard = new Object(); + volatile Selector selector; + private boolean started; + private final AtomicBoolean wakenUp = new AtomicBoolean(); + private final Object startStopLock = new Object(); + private final Queue registerTaskQueue = new LinkedTransferQueue(); Boss() { super(); } void register(NioSocketChannel channel) { - // FIXME: Infinite loop on selector creation failure. - // Apply the same fix with what's applied in NioWorker.register() - boolean firstChannel = started.compareAndSet(false, true); + Runnable registerTask = new RegisterTask(this, channel); Selector selector; - if (firstChannel) { - try { - this.selector = selector = Selector.open(); - } catch (IOException e) { - throw new ChannelException( - "Failed to create a selector.", e); - } - } else { - selector = this.selector; - if (selector == null) { - do { - Thread.yield(); - selector = this.selector; - } while (selector == null); + + synchronized (startStopLock) { + if (!started) { + // Open a selector if this worker didn't start yet. + try { + this.selector = selector = Selector.open(); + } catch (Throwable t) { + throw new ChannelException( + "Failed to create a selector.", t); + } + + // Start the worker thread with the new Selector. + boolean success = false; + try { + bossExecutor.execute(new ThreadRenamingRunnable( + this, "New I/O client boss #" + id)); + success = true; + } finally { + if (!success) { + // Release the Selector if the execution fails. + try { + selector.close(); + } catch (Throwable t) { + logger.warn("Failed to close a selector.", t); + } + this.selector = selector = null; + // The method will return to the caller at this point. + } + } + } else { + // Use the existing selector if this worker has been started. + selector = this.selector; } + + assert selector != null && selector.isOpen(); + + started = true; + registerTaskQueue.offer(registerTask); } - if (firstChannel) { - try { - channel.socket.register(selector, SelectionKey.OP_CONNECT, channel); - } catch (ClosedChannelException e) { - throw new ChannelException( - "Failed to register a socket to the selector.", e); - } - bossExecutor.execute(new ThreadRenamingRunnable( - this, "New I/O client boss #" + id)); - } else { - synchronized (selectorGuard) { - selector.wakeup(); - try { - channel.socket.register(selector, SelectionKey.OP_CONNECT, channel); - } catch (ClosedChannelException e) { - throw new ChannelException( - "Failed to register a socket to the selector.", e); - } - } + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); } } @@ -223,12 +231,13 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { boolean shutdown = false; Selector selector = this.selector; for (;;) { - synchronized (selectorGuard) { - // This empty synchronization block prevents the selector - // from acquiring its lock. - } + wakenUp.set(false); + try { int selectedKeyCount = selector.select(500); + + processRegisterTaskQueue(); + if (selectedKeyCount > 0) { processSelectedKeys(selector.selectedKeys()); } @@ -242,18 +251,17 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { if (shutdown || bossExecutor instanceof ExecutorService && ((ExecutorService) bossExecutor).isShutdown()) { - synchronized (selectorGuard) { - if (selector.keys().isEmpty()) { + synchronized (startStopLock) { + if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) { + started = false; try { selector.close(); } catch (IOException e) { logger.warn( - "Failed to close a selector.", - e); + "Failed to close a selector.", e); } finally { this.selector = null; } - started.set(false); break; } else { shutdown = false; @@ -280,6 +288,17 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { } } + private void processRegisterTaskQueue() { + for (;;) { + final Runnable task = registerTaskQueue.poll(); + if (task == null) { + break; + } + + task.run(); + } + } + private void processSelectedKeys(Set selectedKeys) { for (Iterator i = selectedKeys.iterator(); i.hasNext();) { SelectionKey k = i.next(); @@ -318,4 +337,24 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { NioWorker.close(ch, ch.getSucceededFuture()); } } + + private final class RegisterTask implements Runnable { + private final Boss boss; + private final NioSocketChannel channel; + + RegisterTask(Boss boss, NioSocketChannel channel) { + this.boss = boss; + this.channel = channel; + } + + public void run() { + try { + channel.socket.register( + boss.selector, SelectionKey.OP_CONNECT, channel); + } catch (ClosedChannelException e) { + throw new ChannelException( + "Failed to register a socket to the selector.", e); + } + } + } }