diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index e939210ff8..35e1fe319a 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -70,12 +70,12 @@ class NioWorker implements Runnable { private final int bossId; private final int id; private final Executor executor; - private final AtomicBoolean started = new AtomicBoolean(); + private boolean started; private volatile Thread thread; volatile Selector selector; private final AtomicBoolean wakenUp = new AtomicBoolean(); private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); - private final Object shutdownLock = new Object(); + private final Object startStopLock = new Object(); private final Queue registerTaskQueue = new LinkedTransferQueue(); private final Queue writeTaskQueue = new LinkedTransferQueue(); @@ -86,60 +86,56 @@ class NioWorker implements Runnable { } void register(NioSocketChannel channel, ChannelFuture future) { - boolean firstChannel; - Selector selector; - for (;;) { - firstChannel = started.compareAndSet(false, true); - if (firstChannel) { - boolean success = false; - try { - synchronized (shutdownLock) { - this.selector = selector = Selector.open(); - } - success = true; - } catch (IOException e) { - throw new ChannelException( - "Failed to create a selector.", e); - } finally { - if (!success) { - started.compareAndSet(true, false); - } - } - - break; - } else { - selector = this.selector; - if (selector == null) { - do { - Thread.yield(); - selector = this.selector; - } while (selector == null && started.get()); - - if (selector != null && selector.isOpen()) { - break; - } - } else { - break; - } - } - } - + boolean server = !(channel instanceof NioClientSocketChannel); Runnable registerTask = new RegisterTask(channel, future, server); - synchronized (shutdownLock) { + Selector selector; + + synchronized (startStopLock) { + if (!started) { + // Open a selector if this worker didn't start yet. + try { + this.selector = selector = Selector.open(); + } catch (Throwable t) { + throw new ChannelException( + "Failed to create a selector.", t); + } + + // Start the worker thread with the new Selector. + String threadName = + (server ? "New I/O server worker #" + : "New I/O client worker #") + bossId + '-' + id; + + boolean success = false; + try { + executor.execute(new ThreadRenamingRunnable(this, threadName)); + success = true; + } finally { + if (!success) { + // Release the Selector if the execution fails. + try { + selector.close(); + } catch (Throwable t) { + logger.warn("Failed to close a selector.", t); + } + this.selector = selector = null; + // The method will return to the caller at this point. + } + } + } else { + // Use the existing selector if this worker has been started. + selector = this.selector; + } + + assert selector != null && selector.isOpen(); + + started = true; registerTaskQueue.offer(registerTask); } + if (wakenUp.compareAndSet(false, true)) { selector.wakeup(); } - - if (firstChannel) { - String threadName = - (server ? "New I/O server worker #" - : "New I/O client worker #") + bossId + '-' + id; - - executor.execute(new ThreadRenamingRunnable(this, threadName)); - } } public void run() { @@ -176,9 +172,9 @@ class NioWorker implements Runnable { if (shutdown || executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) { - synchronized (shutdownLock) { + synchronized (startStopLock) { if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) { - started.set(false); + started = false; try { selector.close(); } catch (IOException e) { @@ -203,7 +199,8 @@ class NioWorker implements Runnable { logger.warn( "Unexpected exception in the selector loop.", t); - // Prevent possible consecutive immediate failures. + // Prevent possible consecutive immediate failures that lead to + // excessive CPU consumption. try { Thread.sleep(1000); } catch (InterruptedException e) {