diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioBossPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioBossPool.java new file mode 100644 index 0000000000..2b7e987c6a --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioBossPool.java @@ -0,0 +1,92 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.nio; + +import org.jboss.netty.util.ExternalResourceReleasable; +import org.jboss.netty.util.internal.ExecutorUtil; + +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +public abstract class AbstractNioBossPool + implements BossPool, ExternalResourceReleasable { + + private final Boss[] bosses; + private final AtomicInteger bossIndex = new AtomicInteger(); + private final Executor bossExecutor; + private volatile boolean initDone; + + /** + * Create a new instance + * + * @param bossExecutor the {@link Executor} to use for the {@link Boss}'s + * @param bossCount the count of {@link Boss}'s to create + */ + AbstractNioBossPool(Executor bossExecutor, int bossCount) { + this(bossExecutor, bossCount, true); + } + + AbstractNioBossPool(Executor bossExecutor, int bossCount, boolean autoInit) { + if (bossExecutor == null) { + throw new NullPointerException("bossExecutor"); + } + if (bossCount <= 0) { + throw new IllegalArgumentException( + "bossCount (" + bossCount + ") " + + "must be a positive integer."); + } + bosses = new Boss[bossCount]; + this.bossExecutor = bossExecutor; + if (autoInit) { + init(); + } + } + + protected void init() { + if (initDone) { + throw new IllegalStateException("Init was done before"); + } + initDone = true; + + for (int i = 0; i < bosses.length; i++) { + bosses[i] = newBoss(bossExecutor); + } + } + + /** + * Create a new {@link Boss} which uses the given {@link Executor} to service IO + * + * + * @param executor the {@link Executor} to use + * @return worker the new {@link Boss} + */ + protected abstract E newBoss(Executor executor); + + @SuppressWarnings("unchecked") + public E nextBoss() { + return (E) bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)]; + } + + + public void releaseExternalResources() { + ExecutorUtil.terminate(bossExecutor); + for (Boss boss: bosses) { + if (boss instanceof ExternalResourceReleasable) { + ((ExternalResourceReleasable) boss).releaseExternalResources(); + } + } + } +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorker.java index de33a1f83c..a71b500e1e 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorker.java @@ -24,6 +24,7 @@ import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.ExternalResourceReleasable; +import org.jboss.netty.util.ThreadNameDeterminer; import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.internal.DeadLockProofWorker; @@ -123,8 +124,12 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable { protected final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool(); AbstractNioWorker(Executor executor) { + this(executor, null); + } + + AbstractNioWorker(Executor executor, ThreadNameDeterminer determiner) { this.executor = executor; - openSelector(); + openSelector(determiner); } void register(AbstractNioChannel channel, ChannelFuture future) { @@ -189,7 +194,7 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable { * Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for * the {@link AbstractNioChannel}'s when they get registered */ - private void openSelector() { + private void openSelector(ThreadNameDeterminer determiner) { try { selector = Selector.open(); } catch (Throwable t) { @@ -199,7 +204,7 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable { // Start the worker thread with the new Selector. boolean success = false; try { - DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O worker #" + id)); + DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O worker #" + id, determiner)); success = true; } finally { if (!success) { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorkerPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorkerPool.java index ab790e7394..a5f12de1b3 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorkerPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorkerPool.java @@ -33,7 +33,7 @@ public abstract class AbstractNioWorkerPool private final AbstractNioWorker[] workers; private final AtomicInteger workerIndex = new AtomicInteger(); private final Executor workerExecutor; - + private volatile boolean initDone; /** * Create a new instance @@ -42,30 +42,61 @@ public abstract class AbstractNioWorkerPool * @param workerCount the count of {@link Worker}'s to create */ AbstractNioWorkerPool(Executor workerExecutor, int workerCount) { + this(workerExecutor, workerCount, true); + } + + AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean autoInit) { if (workerExecutor == null) { throw new NullPointerException("workerExecutor"); } if (workerCount <= 0) { throw new IllegalArgumentException( - "workerCount (" + workerCount + ") " + - "must be a positive integer."); + "workerCount (" + workerCount + ") " + "must be a positive integer."); } workers = new AbstractNioWorker[workerCount]; + this.workerExecutor = workerExecutor; + if (autoInit) { + init(); + } + } + protected void init() { + if (initDone) { + throw new IllegalStateException("Init was done before"); + } + initDone = true; for (int i = 0; i < workers.length; i++) { - workers[i] = createWorker(workerExecutor); + workers[i] = newWorker(workerExecutor); } - this.workerExecutor = workerExecutor; } /** - * Create a new {@link Worker} which uses the given {@link Executor} to service IO + * Only here for backward compability and will be removed in later releases. Please use + * {@link #newWorker(java.util.concurrent.Executor)} + * + * + * @param executor the {@link Executor} to use + * @return worker the new {@link Worker} + * @deprecated use {@link #newWorker(java.util.concurrent.Executor)} + */ + @Deprecated + protected E createWorker(Executor executor) { + throw new IllegalStateException("This will be removed. Override this and the newWorker(..) method!"); + } + + /** + * Create a new {@link Worker} which uses the given {@link Executor} to service IO. + * + * This method will be made abstract in further releases (once {@link #createWorker(java.util.concurrent.Executor)} + * was removed). * * * @param executor the {@link Executor} to use * @return worker the new {@link Worker} */ - protected abstract E createWorker(Executor executor); + protected E newWorker(Executor executor) { + return createWorker(executor); + } @SuppressWarnings("unchecked") public E nextWorker() { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/Boss.java b/src/main/java/org/jboss/netty/channel/socket/nio/Boss.java new file mode 100644 index 0000000000..9f71128362 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/Boss.java @@ -0,0 +1,22 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.nio; + +/** + * Serves the boss tasks like connecting/accepting + */ +public interface Boss extends Runnable { +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/BossPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/BossPool.java new file mode 100644 index 0000000000..7640614f84 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/BossPool.java @@ -0,0 +1,28 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.nio; + +/** + * A Pool that holds {@link Boss} instances + */ +public interface BossPool { + /** + * Return the next {@link Boss} to use + * + */ + E nextBoss(); + +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientBoss.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientBoss.java new file mode 100644 index 0000000000..3cf9aa17d0 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientBoss.java @@ -0,0 +1,454 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.nio; + +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.logging.InternalLogger; +import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.ThreadNameDeterminer; +import org.jboss.netty.util.ThreadRenamingRunnable; +import org.jboss.netty.util.Timeout; +import org.jboss.netty.util.Timer; +import org.jboss.netty.util.TimerTask; +import org.jboss.netty.util.internal.DeadLockProofWorker; + +import java.io.IOException; +import java.net.ConnectException; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.jboss.netty.channel.Channels.*; + +/** + * {@link Boss} implementation that handles the connection attempts of clients + */ +public final class NioClientBoss implements Boss { + + private static final AtomicInteger nextId = new AtomicInteger(); + + private final int id = nextId.incrementAndGet(); + + static final InternalLogger logger = + InternalLoggerFactory.getInstance(NioClientBoss.class); + + private volatile Selector selector; + private boolean started; + private final AtomicBoolean wakenUp = new AtomicBoolean(); + private final Object startStopLock = new Object(); + private final Queue registerTaskQueue = new ConcurrentLinkedQueue(); + private final TimerTask wakeupTask = new TimerTask() { + public void run(Timeout timeout) throws Exception { + // This is needed to prevent a possible race that can lead to a NPE + // when the selector is closed before this is run + // + // See https://github.com/netty/netty/issues/685 + Selector selector = NioClientBoss.this.selector; + + if (selector != null) { + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + } + } + }; + private final Executor bossExecutor; + private final ThreadNameDeterminer determiner; + private final Timer timer; + + NioClientBoss(Executor bossExecutor, Timer timer, ThreadNameDeterminer determiner) { + this.bossExecutor = bossExecutor; + this.determiner = determiner; + this.timer = timer; + } + + void register(NioClientSocketChannel channel) { + Runnable registerTask = new RegisterTask(this, channel); + Selector selector; + + synchronized (startStopLock) { + if (!started) { + // Open a selector if this worker didn't start yet. + try { + this.selector = selector = Selector.open(); + } catch (Throwable t) { + throw new ChannelException( + "Failed to create a selector.", t); + } + + // Start the worker thread with the new Selector. + boolean success = false; + try { + DeadLockProofWorker.start(bossExecutor, + new ThreadRenamingRunnable(this, + "New I/O client boss #" + id , determiner)); + + success = true; + } finally { + if (!success) { + // Release the Selector if the execution fails. + try { + selector.close(); + } catch (Throwable t) { + if (logger.isWarnEnabled()) { + logger.warn("Failed to close a selector.", t); + } + } + this.selector = selector = null; + // The method will return to the caller at this point. + } + } + } else { + // Use the existing selector if this worker has been started. + selector = this.selector; + } + + assert selector != null && selector.isOpen(); + + started = true; + boolean offered = registerTaskQueue.offer(registerTask); + assert offered; + } + int timeout = channel.getConfig().getConnectTimeoutMillis(); + if (timeout > 0) { + if (!channel.isConnected()) { + channel.timoutTimer = timer.newTimeout(wakeupTask, + timeout, TimeUnit.MILLISECONDS); + } + } + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + + } + + public void run() { + boolean shutdown = false; + int selectReturnsImmediately = 0; + + Selector selector = this.selector; + + // use 80% of the timeout for measure + final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100; + boolean wakenupFromLoop = false; + for (;;) { + wakenUp.set(false); + + try { + long beforeSelect = System.nanoTime(); + int selected = SelectorUtil.select(selector); + if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) { + long timeBlocked = System.nanoTime() - beforeSelect; + + if (timeBlocked < minSelectTimeout) { + boolean notConnected = false; + // loop over all keys as the selector may was unblocked because of a closed channel + for (SelectionKey key: selector.keys()) { + SelectableChannel ch = key.channel(); + try { + if (ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) { + notConnected = true; + // cancel the key just to be on the safe side + key.cancel(); + } + } catch (CancelledKeyException e) { + // ignore + } + } + if (notConnected) { + selectReturnsImmediately = 0; + } else { + // returned before the minSelectTimeout elapsed with nothing select. + // this may be the cause of the jdk epoll(..) bug, so increment the counter + // which we use later to see if its really the jdk bug. + selectReturnsImmediately ++; + + } + + } else { + selectReturnsImmediately = 0; + } + + if (selectReturnsImmediately == 1024) { + // The selector returned immediately for 10 times in a row, + // so recreate one selector as it seems like we hit the + // famous epoll(..) jdk bug. + selector = recreateSelector(); + selectReturnsImmediately = 0; + wakenupFromLoop = false; + // try to select again + continue; + } + } else { + // reset counter + selectReturnsImmediately = 0; + } + + // 'wakenUp.compareAndSet(false, true)' is always evaluated + // before calling 'selector.wakeup()' to reduce the wake-up + // overhead. (Selector.wakeup() is an expensive operation.) + // + // However, there is a race condition in this approach. + // The race condition is triggered when 'wakenUp' is set to + // true too early. + // + // 'wakenUp' is set to true too early if: + // 1) Selector is waken up between 'wakenUp.set(false)' and + // 'selector.select(...)'. (BAD) + // 2) Selector is waken up between 'selector.select(...)' and + // 'if (wakenUp.get()) { ... }'. (OK) + // + // In the first case, 'wakenUp' is set to true and the + // following 'selector.select(...)' will wake up immediately. + // Until 'wakenUp' is set to false again in the next round, + // 'wakenUp.compareAndSet(false, true)' will fail, and therefore + // any attempt to wake up the Selector will fail, too, causing + // the following 'selector.select(...)' call to block + // unnecessarily. + // + // To fix this problem, we wake up the selector again if wakenUp + // is true immediately after selector.select(...). + // It is inefficient in that it wakes up the selector for both + // the first case (BAD - wake-up required) and the second case + // (OK - no wake-up required). + + if (wakenUp.get()) { + wakenupFromLoop = true; + selector.wakeup(); + } else { + wakenupFromLoop = false; + } + processRegisterTaskQueue(); + processSelectedKeys(selector.selectedKeys()); + + // Handle connection timeout every 10 milliseconds approximately. + long currentTimeNanos = System.nanoTime(); + processConnectTimeout(selector.keys(), currentTimeNanos); + + // Exit the loop when there's nothing to handle. + // The shutdown flag is used to delay the shutdown of this + // loop to avoid excessive Selector creation when + // connection attempts are made in a one-by-one manner + // instead of concurrent manner. + if (selector.keys().isEmpty()) { + if (shutdown || + bossExecutor instanceof ExecutorService && ((ExecutorService) bossExecutor).isShutdown()) { + + synchronized (startStopLock) { + if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) { + started = false; + try { + selector.close(); + } catch (IOException e) { + if (logger.isWarnEnabled()) { + logger.warn( + "Failed to close a selector.", e); + } + + } finally { + this.selector = null; + } + break; + } else { + shutdown = false; + } + } + } else { + // Give one more second. + shutdown = true; + } + } else { + shutdown = false; + } + } catch (Throwable t) { + if (logger.isWarnEnabled()) { + logger.warn( + "Unexpected exception in the selector loop.", t); + } + + + // Prevent possible consecutive immediate failures. + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore. + } + } + } + } + + private void processRegisterTaskQueue() { + for (;;) { + final Runnable task = registerTaskQueue.poll(); + if (task == null) { + break; + } + + task.run(); + } + } + + private void processSelectedKeys(Set selectedKeys) { + // check if the set is empty and if so just return to not create garbage by + // creating a new Iterator every time even if there is nothing to process. + // See https://github.com/netty/netty/issues/597 + if (selectedKeys.isEmpty()) { + return; + } + for (Iterator i = selectedKeys.iterator(); i.hasNext();) { + SelectionKey k = i.next(); + i.remove(); + + if (!k.isValid()) { + close(k); + continue; + } + + try { + if (k.isConnectable()) { + connect(k); + } + } catch (Throwable t) { + NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); + ch.connectFuture.setFailure(t); + fireExceptionCaught(ch, t); + k.cancel(); // Some JDK implementations run into an infinite loop without this. + ch.worker.close(ch, succeededFuture(ch)); + } + } + } + + private void processConnectTimeout(Set keys, long currentTimeNanos) { + ConnectException cause = null; + for (SelectionKey k: keys) { + if (!k.isValid()) { + // Comment the close call again as it gave us major problems + // with ClosedChannelExceptions. + // + // See: + // * https://github.com/netty/netty/issues/142 + // * https://github.com/netty/netty/issues/138 + // + // close(k); + continue; + } + + NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); + if (ch.connectDeadlineNanos > 0 && + currentTimeNanos >= ch.connectDeadlineNanos) { + + if (cause == null) { + cause = new ConnectException("connection timed out"); + } + + ch.connectFuture.setFailure(cause); + fireExceptionCaught(ch, cause); + ch.worker.close(ch, succeededFuture(ch)); + } + } + } + + private void connect(SelectionKey k) throws IOException { + NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); + if (ch.channel.finishConnect()) { + k.cancel(); + if (ch.timoutTimer != null) { + ch.timoutTimer.cancel(); + } + ch.worker.register(ch, ch.connectFuture); + } + } + + private void close(SelectionKey k) { + NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); + ch.worker.close(ch, succeededFuture(ch)); + } + + // Create a new selector and "transfer" all channels from the old + // selector to the new one + private Selector recreateSelector() throws IOException { + Selector newSelector = Selector.open(); + Selector selector = this.selector; + this.selector = newSelector; + + // loop over all the keys that are registered with the old Selector + // and register them with the new one + for (SelectionKey key: selector.keys()) { + SelectableChannel ch = key.channel(); + int ops = key.interestOps(); + Object att = key.attachment(); + // cancel the old key + key.cancel(); + + try { + // register the channel with the new selector now + ch.register(newSelector, ops, att); + } catch (ClosedChannelException e) { + // close the Channel if we can't register it + close(key); + } + } + + try { + // time to close the old selector as everything else is registered to the new one + selector.close(); + } catch (Throwable t) { + if (logger.isWarnEnabled()) { + logger.warn("Failed to close a selector.", t); + } + } + if (logger.isWarnEnabled()) { + logger.warn("Recreated Selector because of possible jdk epoll(..) bug"); + } + return newSelector; + } + + private static final class RegisterTask implements Runnable { + private final NioClientBoss boss; + private final NioClientSocketChannel channel; + + RegisterTask(NioClientBoss boss, NioClientSocketChannel channel) { + this.boss = boss; + this.channel = channel; + } + + public void run() { + try { + channel.channel.register( + boss.selector, SelectionKey.OP_CONNECT, channel); + } catch (ClosedChannelException e) { + channel.worker.close(channel, succeededFuture(channel)); + } + + int connectTimeout = channel.getConfig().getConnectTimeoutMillis(); + if (connectTimeout > 0) { + channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L; + } + } + } +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientBossPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientBossPool.java new file mode 100644 index 0000000000..489b12089f --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientBossPool.java @@ -0,0 +1,69 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.nio; + +import org.jboss.netty.util.HashedWheelTimer; +import org.jboss.netty.util.ThreadNameDeterminer; +import org.jboss.netty.util.Timer; + +import java.util.concurrent.Executor; + +/** + * Holds {@link NioClientBoss} instances to use + */ +public class NioClientBossPool extends AbstractNioBossPool { + private final ThreadNameDeterminer determiner; + private final Timer timer; + + /** + * Create a new instance + * + * @param bossExecutor the Executor to use for server the {@link NioClientBoss} + * @param bossCount the number of {@link NioClientBoss} instances this {@link NioClientBossPool} will hold + * @param timer the Timer to use for handle connect timeouts + * @param determiner the {@link ThreadNameDeterminer} to use for name the threads. Use null + * if you not want to set one explicit. + */ + public NioClientBossPool(Executor bossExecutor, int bossCount, Timer timer, ThreadNameDeterminer determiner) { + super(bossExecutor, bossCount, false); + this.determiner = determiner; + this.timer = timer; + init(); + } + + + /** + * Create a new instance using a new {@link HashedWheelTimer} and no {@link ThreadNameDeterminer} + * + * @param bossExecutor the Executor to use for server the {@link NioClientBoss} + * @param bossCount the number of {@link NioClientBoss} instances this {@link NioClientBoss} will hold + */ + public NioClientBossPool(Executor bossExecutor, int bossCount) { + this(bossExecutor, bossCount, new HashedWheelTimer(), null); + } + + @Override + protected NioClientBoss newBoss(Executor executor) { + return new NioClientBoss(executor, timer, determiner); + } + + @Override + public void releaseExternalResources() { + super.releaseExternalResources(); + timer.stop(); + } +} + diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java index f1fec87258..7e9ba0d918 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java @@ -28,7 +28,6 @@ import org.jboss.netty.channel.socket.SocketChannel; import org.jboss.netty.util.ExternalResourceReleasable; import org.jboss.netty.util.HashedWheelTimer; import org.jboss.netty.util.Timer; -import org.jboss.netty.util.internal.ExecutorUtil; /** * A {@link ClientSocketChannelFactory} which creates a client-side NIO-based @@ -85,10 +84,9 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory private static final int DEFAULT_BOSS_COUNT = 1; - private final Executor bossExecutor; + private final BossPool bossPool; private final WorkerPool workerPool; private final NioClientSocketPipelineSink sink; - private final Timer timer; /** * Creates a new {@link NioClientSocketChannelFactory} which uses {@link Executors#newCachedThreadPool()} @@ -183,34 +181,41 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory public NioClientSocketChannelFactory( Executor bossExecutor, int bossCount, WorkerPool workerPool, Timer timer) { + this(new NioClientBossPool(bossExecutor, bossCount, timer, null), workerPool); + } - if (bossExecutor == null) { - throw new NullPointerException("bossExecutor"); + /** + * Creates a new instance. + * + * @param bossPool + * the {@link BossPool} to use to handle the connects + * @param workerPool + * the {@link WorkerPool} to use to do the IO + */ + public NioClientSocketChannelFactory( + BossPool bossPool, + WorkerPool workerPool) { + + if (bossPool == null) { + throw new NullPointerException("bossPool"); } if (workerPool == null) { throw new NullPointerException("workerPool"); } - if (bossCount <= 0) { - throw new IllegalArgumentException( - "bossCount (" + bossCount + ") " + - "must be a positive integer."); - } - - - this.bossExecutor = bossExecutor; + this.bossPool = bossPool; this.workerPool = workerPool; - this.timer = timer; - sink = new NioClientSocketPipelineSink( - bossExecutor, bossCount, workerPool, timer); + sink = new NioClientSocketPipelineSink(bossPool); } + public SocketChannel newChannel(ChannelPipeline pipeline) { - return new NioClientSocketChannel(this, pipeline, sink, sink.nextWorker()); + return new NioClientSocketChannel(this, pipeline, sink, workerPool.nextWorker()); } public void releaseExternalResources() { - ExecutorUtil.terminate(bossExecutor); - timer.stop(); + if (bossPool instanceof ExternalResourceReleasable) { + ((ExternalResourceReleasable) bossPool).releaseExternalResources(); + } if (workerPool instanceof ExternalResourceReleasable) { ((ExternalResourceReleasable) workerPool).releaseExternalResources(); } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java index 6b451ad3ac..4baca7f5e1 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -16,7 +16,6 @@ package org.jboss.netty.channel.socket.nio; import org.jboss.netty.channel.ChannelEvent; -import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelPipeline; @@ -25,62 +24,22 @@ import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; -import org.jboss.netty.util.ThreadRenamingRunnable; -import org.jboss.netty.util.Timeout; -import org.jboss.netty.util.Timer; -import org.jboss.netty.util.TimerTask; -import org.jboss.netty.util.internal.DeadLockProofWorker; -import java.io.IOException; -import java.net.ConnectException; import java.net.SocketAddress; -import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import static org.jboss.netty.channel.Channels.*; class NioClientSocketPipelineSink extends AbstractNioChannelSink { - private static final AtomicInteger nextId = new AtomicInteger(); - static final InternalLogger logger = InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class); - final Executor bossExecutor; - final int id = nextId.incrementAndGet(); - private final Boss[] bosses; + private final BossPool bossPool; - private final AtomicInteger bossIndex = new AtomicInteger(); - - private final WorkerPool workerPool; - - private final Timer timer; - - NioClientSocketPipelineSink( - Executor bossExecutor, int bossCount, WorkerPool workerPool, Timer timer) { - - this.bossExecutor = bossExecutor; - this.timer = timer; - bosses = new Boss[bossCount]; - for (int i = 0; i < bosses.length; i ++) { - bosses[i] = new Boss(i); - } - - this.workerPool = workerPool; + NioClientSocketPipelineSink(BossPool bossPool) { + this.bossPool = bossPool; } public void eventSunk( @@ -168,407 +127,8 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { } } - private Boss nextBoss() { - return bosses[Math.abs( - bossIndex.getAndIncrement() % bosses.length)]; + private NioClientBoss nextBoss() { + return bossPool.nextBoss(); } - NioWorker nextWorker() { - return workerPool.nextWorker(); - } - - private final class Boss implements Runnable { - - volatile Selector selector; - private boolean started; - private final AtomicBoolean wakenUp = new AtomicBoolean(); - private final Object startStopLock = new Object(); - private final Queue registerTaskQueue = new ConcurrentLinkedQueue(); - private final int subId; - private final TimerTask wakeupTask = new TimerTask() { - public void run(Timeout timeout) throws Exception { - // This is needed to prevent a possible race that can lead to a NPE - // when the selector is closed before this is run - // - // See https://github.com/netty/netty/issues/685 - Selector selector = Boss.this.selector; - - if (selector != null) { - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - } - } - }; - - Boss(int subId) { - this.subId = subId; - } - - void register(NioClientSocketChannel channel) { - Runnable registerTask = new RegisterTask(this, channel); - Selector selector; - - synchronized (startStopLock) { - if (!started) { - // Open a selector if this worker didn't start yet. - try { - this.selector = selector = Selector.open(); - } catch (Throwable t) { - throw new ChannelException( - "Failed to create a selector.", t); - } - - // Start the worker thread with the new Selector. - boolean success = false; - try { - DeadLockProofWorker.start(bossExecutor, - new ThreadRenamingRunnable(this, - "New I/O client boss #" + id + '-' + subId)); - - success = true; - } finally { - if (!success) { - // Release the Selector if the execution fails. - try { - selector.close(); - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to close a selector.", t); - } - } - this.selector = selector = null; - // The method will return to the caller at this point. - } - } - } else { - // Use the existing selector if this worker has been started. - selector = this.selector; - } - - assert selector != null && selector.isOpen(); - - started = true; - boolean offered = registerTaskQueue.offer(registerTask); - assert offered; - } - int timeout = channel.getConfig().getConnectTimeoutMillis(); - if (timeout > 0) { - if (!channel.isConnected()) { - channel.timoutTimer = timer.newTimeout(wakeupTask, - timeout, TimeUnit.MILLISECONDS); - } - } - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - - } - - public void run() { - boolean shutdown = false; - int selectReturnsImmediately = 0; - - Selector selector = this.selector; - - // use 80% of the timeout for measure - final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100; - boolean wakenupFromLoop = false; - for (;;) { - wakenUp.set(false); - - try { - long beforeSelect = System.nanoTime(); - int selected = SelectorUtil.select(selector); - if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) { - long timeBlocked = System.nanoTime() - beforeSelect; - - if (timeBlocked < minSelectTimeout) { - boolean notConnected = false; - // loop over all keys as the selector may was unblocked because of a closed channel - for (SelectionKey key: selector.keys()) { - SelectableChannel ch = key.channel(); - try { - if (ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) { - notConnected = true; - // cancel the key just to be on the safe side - key.cancel(); - } - } catch (CancelledKeyException e) { - // ignore - } - } - if (notConnected) { - selectReturnsImmediately = 0; - } else { - // returned before the minSelectTimeout elapsed with nothing select. - // this may be the cause of the jdk epoll(..) bug, so increment the counter - // which we use later to see if its really the jdk bug. - selectReturnsImmediately ++; - - } - - } else { - selectReturnsImmediately = 0; - } - - if (selectReturnsImmediately == 1024) { - // The selector returned immediately for 10 times in a row, - // so recreate one selector as it seems like we hit the - // famous epoll(..) jdk bug. - selector = recreateSelector(); - selectReturnsImmediately = 0; - wakenupFromLoop = false; - // try to select again - continue; - } - } else { - // reset counter - selectReturnsImmediately = 0; - } - - // 'wakenUp.compareAndSet(false, true)' is always evaluated - // before calling 'selector.wakeup()' to reduce the wake-up - // overhead. (Selector.wakeup() is an expensive operation.) - // - // However, there is a race condition in this approach. - // The race condition is triggered when 'wakenUp' is set to - // true too early. - // - // 'wakenUp' is set to true too early if: - // 1) Selector is waken up between 'wakenUp.set(false)' and - // 'selector.select(...)'. (BAD) - // 2) Selector is waken up between 'selector.select(...)' and - // 'if (wakenUp.get()) { ... }'. (OK) - // - // In the first case, 'wakenUp' is set to true and the - // following 'selector.select(...)' will wake up immediately. - // Until 'wakenUp' is set to false again in the next round, - // 'wakenUp.compareAndSet(false, true)' will fail, and therefore - // any attempt to wake up the Selector will fail, too, causing - // the following 'selector.select(...)' call to block - // unnecessarily. - // - // To fix this problem, we wake up the selector again if wakenUp - // is true immediately after selector.select(...). - // It is inefficient in that it wakes up the selector for both - // the first case (BAD - wake-up required) and the second case - // (OK - no wake-up required). - - if (wakenUp.get()) { - wakenupFromLoop = true; - selector.wakeup(); - } else { - wakenupFromLoop = false; - } - processRegisterTaskQueue(); - processSelectedKeys(selector.selectedKeys()); - - // Handle connection timeout every 10 milliseconds approximately. - long currentTimeNanos = System.nanoTime(); - processConnectTimeout(selector.keys(), currentTimeNanos); - - // Exit the loop when there's nothing to handle. - // The shutdown flag is used to delay the shutdown of this - // loop to avoid excessive Selector creation when - // connection attempts are made in a one-by-one manner - // instead of concurrent manner. - if (selector.keys().isEmpty()) { - if (shutdown || - bossExecutor instanceof ExecutorService && ((ExecutorService) bossExecutor).isShutdown()) { - - synchronized (startStopLock) { - if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) { - started = false; - try { - selector.close(); - } catch (IOException e) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to close a selector.", e); - } - - } finally { - this.selector = null; - } - break; - } else { - shutdown = false; - } - } - } else { - // Give one more second. - shutdown = true; - } - } else { - shutdown = false; - } - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn( - "Unexpected exception in the selector loop.", t); - } - - - // Prevent possible consecutive immediate failures. - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore. - } - } - } - } - - private void processRegisterTaskQueue() { - for (;;) { - final Runnable task = registerTaskQueue.poll(); - if (task == null) { - break; - } - - task.run(); - } - } - - private void processSelectedKeys(Set selectedKeys) { - // check if the set is empty and if so just return to not create garbage by - // creating a new Iterator every time even if there is nothing to process. - // See https://github.com/netty/netty/issues/597 - if (selectedKeys.isEmpty()) { - return; - } - for (Iterator i = selectedKeys.iterator(); i.hasNext();) { - SelectionKey k = i.next(); - i.remove(); - - if (!k.isValid()) { - close(k); - continue; - } - - try { - if (k.isConnectable()) { - connect(k); - } - } catch (Throwable t) { - NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); - ch.connectFuture.setFailure(t); - fireExceptionCaught(ch, t); - k.cancel(); // Some JDK implementations run into an infinite loop without this. - ch.worker.close(ch, succeededFuture(ch)); - } - } - } - - private void processConnectTimeout(Set keys, long currentTimeNanos) { - ConnectException cause = null; - for (SelectionKey k: keys) { - if (!k.isValid()) { - // Comment the close call again as it gave us major problems - // with ClosedChannelExceptions. - // - // See: - // * https://github.com/netty/netty/issues/142 - // * https://github.com/netty/netty/issues/138 - // - // close(k); - continue; - } - - NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); - if (ch.connectDeadlineNanos > 0 && - currentTimeNanos >= ch.connectDeadlineNanos) { - - if (cause == null) { - cause = new ConnectException("connection timed out"); - } - - ch.connectFuture.setFailure(cause); - fireExceptionCaught(ch, cause); - ch.worker.close(ch, succeededFuture(ch)); - } - } - } - - private void connect(SelectionKey k) throws IOException { - NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); - if (ch.channel.finishConnect()) { - k.cancel(); - if (ch.timoutTimer != null) { - ch.timoutTimer.cancel(); - } - ch.worker.register(ch, ch.connectFuture); - } - } - - private void close(SelectionKey k) { - NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); - ch.worker.close(ch, succeededFuture(ch)); - } - - // Create a new selector and "transfer" all channels from the old - // selector to the new one - private Selector recreateSelector() throws IOException { - Selector newSelector = Selector.open(); - Selector selector = this.selector; - this.selector = newSelector; - - // loop over all the keys that are registered with the old Selector - // and register them with the new one - for (SelectionKey key: selector.keys()) { - SelectableChannel ch = key.channel(); - int ops = key.interestOps(); - Object att = key.attachment(); - // cancel the old key - key.cancel(); - - try { - // register the channel with the new selector now - ch.register(newSelector, ops, att); - } catch (ClosedChannelException e) { - // close the Channel if we can't register it - close(key); - } - } - - try { - // time to close the old selector as everything else is registered to the new one - selector.close(); - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to close a selector.", t); - } - } - if (logger.isWarnEnabled()) { - logger.warn("Recreated Selector because of possible jdk epoll(..) bug"); - } - return newSelector; - } - - } - - private static final class RegisterTask implements Runnable { - private final Boss boss; - private final NioClientSocketChannel channel; - - RegisterTask(Boss boss, NioClientSocketChannel channel) { - this.boss = boss; - this.channel = channel; - } - - public void run() { - try { - channel.channel.register( - boss.selector, SelectionKey.OP_CONNECT, channel); - } catch (ClosedChannelException e) { - channel.worker.close(channel, succeededFuture(channel)); - } - - int connectTimeout = channel.getConfig().getConnectTimeoutMillis(); - if (connectTimeout > 0) { - channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L; - } - } - } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerBoss.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerBoss.java new file mode 100644 index 0000000000..d29e491933 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerBoss.java @@ -0,0 +1,365 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.nio; + +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelSink; +import org.jboss.netty.logging.InternalLogger; +import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.ThreadNameDeterminer; +import org.jboss.netty.util.ThreadRenamingRunnable; +import org.jboss.netty.util.internal.DeadLockProofWorker; + +import java.io.IOException; +import java.net.SocketAddress; +import java.net.SocketTimeoutException; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.jboss.netty.channel.Channels.*; + +/** + * Boss implementation which handles accepting of new connections + */ +public final class NioServerBoss implements Boss { + + private static final AtomicInteger nextId = new AtomicInteger(); + + static final InternalLogger logger = + InternalLoggerFactory.getInstance(NioServerBoss.class); + + private final int id = nextId.incrementAndGet(); + + private volatile Selector selector; + private final Executor bossExecutor; + /** + * Queue of channel registration tasks. + */ + private final Queue bindTaskQueue = new ConcurrentLinkedQueue(); + + /** + * Monitor object used to synchronize selector open/close. + */ + private final Object startStopLock = new Object(); + + /** + * Boolean that controls determines if a blocked Selector.select should + * break out of its selection process. In our case we use a timeone for + * the select method and the select method will block for that time unless + * waken up. + */ + private final AtomicBoolean wakenUp = new AtomicBoolean(); + + private Thread currentThread; + + + private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation + static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. + + NioServerBoss(Executor bossExecutor) { + this(bossExecutor, null); + } + + NioServerBoss(Executor bossExecutor, ThreadNameDeterminer determiner) { + this.bossExecutor = bossExecutor; + openSelector(determiner); + } + + void bind(final NioServerSocketChannel channel, final ChannelFuture future, + final SocketAddress localAddress) { + synchronized (startStopLock) { + if (selector == null) { + // the selector was null this means the Worker has already been shutdown. + throw new RejectedExecutionException("Worker has already been shutdown"); + } + + boolean offered = bindTaskQueue.offer(new Runnable() { + public void run() { + boolean bound = false; + boolean registered = false; + try { + channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog()); + bound = true; + + future.setSuccess(); + fireChannelBound(channel, channel.getLocalAddress()); + channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel); + + registered = true; + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } finally { + if (!registered && bound) { + close(channel, future); + } + } + + } + }); + assert offered; + + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + } + } + + void close(NioServerSocketChannel channel, ChannelFuture future) { + boolean bound = channel.isBound(); + + try { + channel.socket.close(); + cancelledKeys ++; + + if (channel.setClosed()) { + future.setSuccess(); + + if (bound) { + fireChannelUnbound(channel); + } + fireChannelClosed(channel); + } else { + future.setSuccess(); + } + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } + + private void openSelector(ThreadNameDeterminer determiner) { + try { + selector = Selector.open(); + } catch (Throwable t) { + throw new ChannelException("Failed to create a selector.", t); + } + + // Start the worker thread with the new Selector. + boolean success = false; + try { + DeadLockProofWorker.start(bossExecutor, new ThreadRenamingRunnable(this, + "New I/O server boss #" + id, determiner)); + success = true; + } finally { + if (!success) { + // Release the Selector if the execution fails. + try { + selector.close(); + } catch (Throwable t) { + logger.warn("Failed to close a selector.", t); + } + selector = null; + // The method will return to the caller at this point. + } + } + assert selector != null && selector.isOpen(); + } + + public void run() { + currentThread = Thread.currentThread(); + boolean shutdown = false; + for (;;) { + wakenUp.set(false); + + try { + // Just do a blocking select without any timeout + // as this thread does not execute anything else. + selector.select(); + + // 'wakenUp.compareAndSet(false, true)' is always evaluated + // before calling 'selector.wakeup()' to reduce the wake-up + // overhead. (Selector.wakeup() is an expensive operation.) + // + // However, there is a race condition in this approach. + // The race condition is triggered when 'wakenUp' is set to + // true too early. + // + // 'wakenUp' is set to true too early if: + // 1) Selector is waken up between 'wakenUp.set(false)' and + // 'selector.select(...)'. (BAD) + // 2) Selector is waken up between 'selector.select(...)' and + // 'if (wakenUp.get()) { ... }'. (OK) + // + // In the first case, 'wakenUp' is set to true and the + // following 'selector.select(...)' will wake up immediately. + // Until 'wakenUp' is set to false again in the next round, + // 'wakenUp.compareAndSet(false, true)' will fail, and therefore + // any attempt to wake up the Selector will fail, too, causing + // the following 'selector.select(...)' call to block + // unnecessarily. + // + // To fix this problem, we wake up the selector again if wakenUp + // is true immediately after selector.select(...). + // It is inefficient in that it wakes up the selector for both + // the first case (BAD - wake-up required) and the second case + // (OK - no wake-up required). + if (wakenUp.get()) { + selector.wakeup(); + } + processBindTaskQueue(); + processSelectedKeys(selector.selectedKeys()); + + // Exit the loop when there's nothing to handle. + // The shutdown flag is used to delay the shutdown of this + // loop to avoid excessive Selector creation when + // connections are registered in a one-by-one manner instead of + // concurrent manner. + if (selector.keys().isEmpty()) { + if (shutdown || bossExecutor instanceof ExecutorService && + ((ExecutorService) bossExecutor).isShutdown()) { + + synchronized (startStopLock) { + if (selector.keys().isEmpty()) { + try { + selector.close(); + } catch (IOException e) { + logger.warn( + "Failed to close a selector.", e); + } finally { + selector = null; + } + break; + } else { + shutdown = false; + } + } + } + } else { + shutdown = false; + } + } catch (Throwable e) { + if (logger.isWarnEnabled()) { + logger.warn( + "Failed to accept a connection.", e); + } + + try { + Thread.sleep(1000); + } catch (InterruptedException e1) { + // Ignore + } + } + } + } + + private void processBindTaskQueue() throws IOException { + for (;;) { + final Runnable task = bindTaskQueue.poll(); + if (task == null) { + break; + } + + task.run(); + cleanUpCancelledKeys(); + } + } + private boolean cleanUpCancelledKeys() throws IOException { + if (cancelledKeys >= CLEANUP_INTERVAL) { + cancelledKeys = 0; + selector.selectNow(); + return true; + } + return false; + } + + private void processSelectedKeys(Set selectedKeys) { + if (selectedKeys.isEmpty()) { + return; + } + for (Iterator i = selectedKeys.iterator(); i.hasNext();) { + SelectionKey k = i.next(); + i.remove(); + NioServerSocketChannel channel = (NioServerSocketChannel) k.attachment(); + + try { + // accept connections in a for loop until no new connection is ready + for (;;) { + SocketChannel acceptedSocket = channel.socket.accept(); + if (acceptedSocket == null) { + break; + } + registerAcceptedChannel(channel, acceptedSocket, currentThread); + + } + } catch (CancelledKeyException e) { + // Raised by accept() when the server socket was closed. + k.cancel(); + channel.close(); + } catch (SocketTimeoutException e) { + // Thrown every second to get ClosedChannelException + // raised. + } catch (ClosedChannelException e) { + // Closed as requested. + } catch (Throwable t) { + if (logger.isWarnEnabled()) { + logger.warn( + "Failed to accept a connection.", t); + } + + try { + Thread.sleep(1000); + } catch (InterruptedException e1) { + // Ignore + } + } + } + } + + private static void registerAcceptedChannel(NioServerSocketChannel parent, SocketChannel acceptedSocket, + Thread currentThread) { + try { + ChannelSink sink = parent.getPipeline().getSink(); + ChannelPipeline pipeline = + parent.getConfig().getPipelineFactory().getPipeline(); + NioWorker worker = parent.workerPool.nextWorker(); + worker.register(new NioAcceptedSocketChannel( + parent.getFactory(), pipeline, parent, sink + , acceptedSocket, + worker, currentThread), null); + } catch (Exception e) { + if (logger.isWarnEnabled()) { + logger.warn( + "Failed to initialize an accepted socket.", e); + } + + try { + acceptedSocket.close(); + } catch (IOException e2) { + if (logger.isWarnEnabled()) { + logger.warn( + "Failed to close a partially accepted socket.", + e2); + } + + } + } + } +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerBossPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerBossPool.java new file mode 100644 index 0000000000..b5d0e492f2 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerBossPool.java @@ -0,0 +1,58 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.nio; + +import org.jboss.netty.util.ThreadNameDeterminer; + +import java.util.concurrent.Executor; + + +/** + * Holds {@link NioServerBoss} instances to use + */ +public class NioServerBossPool extends AbstractNioBossPool { + private final ThreadNameDeterminer determiner; + + + /** + * Create a new instance + * + * @param bossExecutor the {@link Executor} to use for server the {@link NioServerBoss} + * @param bossCount the number of {@link NioServerBoss} instances this {@link NioServerBossPool} will hold + * @param determiner the {@link ThreadNameDeterminer} to use for name the threads. Use null + * if you not want to set one explicit. + */ + public NioServerBossPool(Executor bossExecutor, int bossCount, ThreadNameDeterminer determiner) { + super(bossExecutor, bossCount); + this.determiner = determiner; + } + + /** + * Create a new instance using no {@link ThreadNameDeterminer} + * + * @param bossExecutor the {@link Executor} to use for server the {@link NioServerBoss} + * @param bossCount the number of {@link NioServerBoss} instances this {@link NioServerBossPool} will hold + */ + public NioServerBossPool(Executor bossExecutor, int bossCount) { + this(bossExecutor, bossCount, null); + } + + + @Override + protected NioServerBoss newBoss(Executor executor) { + return new NioServerBoss(executor, determiner); + } +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketChannel.java index 15bbf586d8..fbb3049988 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketChannel.java @@ -38,18 +38,19 @@ class NioServerSocketChannel extends AbstractServerChannel InternalLoggerFactory.getInstance(NioServerSocketChannel.class); final ServerSocketChannel socket; - final Runnable boss; + final Boss boss; + final WorkerPool workerPool; private final ServerSocketChannelConfig config; NioServerSocketChannel( ChannelFactory factory, ChannelPipeline pipeline, - ChannelSink sink, Runnable boss) { + ChannelSink sink, Boss boss, WorkerPool workerPool) { super(factory, pipeline, sink); this.boss = boss; - + this.workerPool = workerPool; try { socket = ServerSocketChannel.open(); } catch (IOException e) { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketChannelFactory.java index 95090146b4..eca809cc1a 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketChannelFactory.java @@ -83,9 +83,9 @@ import org.jboss.netty.util.ExternalResourceReleasable; */ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory { - final Executor bossExecutor; private final WorkerPool workerPool; private final NioServerSocketPipelineSink sink; + private final BossPool bossPool; /** * Create a new {@link NioServerSocketChannelFactory} using {@link Executors#newCachedThreadPool()} @@ -174,24 +174,41 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory */ public NioServerSocketChannelFactory( Executor bossExecutor, int bossCount, WorkerPool workerPool) { - if (bossExecutor == null) { + this(new NioServerBossPool(bossExecutor, bossCount, null), workerPool); + + } + + /** + * Create a new instance. + * + * @param bossPool + * the {@link BossPool} which will be used to obtain the {@link NioServerBoss} that execute + * the I/O for accept new connections + * @param workerPool + * the {@link WorkerPool} which will be used to obtain the {@link NioWorker} that execute + * the I/O worker threads + */ + public NioServerSocketChannelFactory(BossPool bossPool, WorkerPool workerPool) { + if (bossPool == null) { throw new NullPointerException("bossExecutor"); } if (workerPool == null) { throw new NullPointerException("workerPool"); } - this.bossExecutor = bossExecutor; + this.bossPool = bossPool; this.workerPool = workerPool; - sink = new NioServerSocketPipelineSink(bossExecutor, bossCount, workerPool); + sink = new NioServerSocketPipelineSink(); } public ServerSocketChannel newChannel(ChannelPipeline pipeline) { - return new NioServerSocketChannel(this, pipeline, sink, sink.nextBoss()); + return new NioServerSocketChannel(this, pipeline, sink, bossPool.nextBoss(), workerPool); } public void releaseExternalResources() { - sink.releaseExternalResources(); + if (bossPool instanceof ExternalResourceReleasable) { + ((ExternalResourceReleasable) bossPool).releaseExternalResources(); + } if (workerPool instanceof ExternalResourceReleasable) { ((ExternalResourceReleasable) workerPool).releaseExternalResources(); } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java index 022d1a5dbf..c30f1aaa1f 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -17,59 +17,15 @@ package org.jboss.netty.channel.socket.nio; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelEvent; -import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.logging.InternalLogger; -import org.jboss.netty.logging.InternalLoggerFactory; -import org.jboss.netty.util.ExternalResourceReleasable; -import org.jboss.netty.util.ThreadRenamingRunnable; -import org.jboss.netty.util.internal.DeadLockProofWorker; -import org.jboss.netty.util.internal.ExecutorUtil; -import java.io.IOException; import java.net.SocketAddress; -import java.net.SocketTimeoutException; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import static org.jboss.netty.channel.Channels.*; - -class NioServerSocketPipelineSink extends AbstractNioChannelSink implements ExternalResourceReleasable { - - private static final AtomicInteger nextId = new AtomicInteger(); - - static final InternalLogger logger = - InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class); - - final int id = nextId.incrementAndGet(); - private final Boss[] bosses; - private final AtomicInteger bossIndex = new AtomicInteger(); - - private final WorkerPool workerPool; - - NioServerSocketPipelineSink(Executor bossExecutor, int bossCount, WorkerPool workerPool) { - this.workerPool = workerPool; - bosses = new Boss[bossCount]; - for (int i = 0; i < bossCount; i++) { - bosses[i] = new Boss(bossExecutor); - } - } +class NioServerSocketPipelineSink extends AbstractNioChannelSink { public void eventSunk( @@ -97,14 +53,14 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink implements Exte switch (state) { case OPEN: if (Boolean.FALSE.equals(value)) { - ((Boss) channel.boss).close(channel, future); + ((NioServerBoss) channel.boss).close(channel, future); } break; case BOUND: if (value != null) { - ((Boss) channel.boss).bind(channel, future, (SocketAddress) value); + ((NioServerBoss) channel.boss).bind(channel, future, (SocketAddress) value); } else { - ((Boss) channel.boss).close(channel, future); + ((NioServerBoss) channel.boss).close(channel, future); } break; default: @@ -144,321 +100,4 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink implements Exte channel.worker.writeFromUserCode(channel); } } - - NioWorker nextWorker() { - return workerPool.nextWorker(); - } - - Boss nextBoss() { - return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)]; - } - - public void releaseExternalResources() { - for (Boss boss: bosses) { - ExecutorUtil.terminate(boss.bossExecutor); - } - } - - final class Boss implements Runnable { - volatile Selector selector; - private final Executor bossExecutor; - /** - * Queue of channel registration tasks. - */ - private final Queue bindTaskQueue = new ConcurrentLinkedQueue(); - - /** - * Monitor object used to synchronize selector open/close. - */ - private final Object startStopLock = new Object(); - - /** - * Boolean that controls determines if a blocked Selector.select should - * break out of its selection process. In our case we use a timeone for - * the select method and the select method will block for that time unless - * waken up. - */ - private final AtomicBoolean wakenUp = new AtomicBoolean(); - - private Thread currentThread; - - - private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation - static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. - - Boss(Executor bossExecutor) { - this.bossExecutor = bossExecutor; - openSelector(); - } - - void bind(final NioServerSocketChannel channel, final ChannelFuture future, - final SocketAddress localAddress) { - synchronized (startStopLock) { - if (selector == null) { - // the selector was null this means the Worker has already been shutdown. - throw new RejectedExecutionException("Worker has already been shutdown"); - } - - boolean offered = bindTaskQueue.offer(new Runnable() { - public void run() { - boolean bound = false; - boolean registered = false; - try { - channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog()); - bound = true; - - future.setSuccess(); - fireChannelBound(channel, channel.getLocalAddress()); - channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel); - - registered = true; - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } finally { - if (!registered && bound) { - close(channel, future); - } - } - - } - }); - assert offered; - - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - } - } - - void close(NioServerSocketChannel channel, ChannelFuture future) { - boolean bound = channel.isBound(); - - try { - channel.socket.close(); - cancelledKeys ++; - - if (channel.setClosed()) { - future.setSuccess(); - - if (bound) { - fireChannelUnbound(channel); - } - fireChannelClosed(channel); - } else { - future.setSuccess(); - } - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } - - private void openSelector() { - try { - selector = Selector.open(); - } catch (Throwable t) { - throw new ChannelException("Failed to create a selector.", t); - } - - // Start the worker thread with the new Selector. - boolean success = false; - try { - DeadLockProofWorker.start(bossExecutor, new ThreadRenamingRunnable(this, - "New I/O server boss #" + id)); - success = true; - } finally { - if (!success) { - // Release the Selector if the execution fails. - try { - selector.close(); - } catch (Throwable t) { - logger.warn("Failed to close a selector.", t); - } - selector = null; - // The method will return to the caller at this point. - } - } - assert selector != null && selector.isOpen(); - } - - public void run() { - currentThread = Thread.currentThread(); - boolean shutdown = false; - for (;;) { - wakenUp.set(false); - - try { - // Just do a blocking select without any timeout - // as this thread does not execute anything else. - selector.select(); - - // 'wakenUp.compareAndSet(false, true)' is always evaluated - // before calling 'selector.wakeup()' to reduce the wake-up - // overhead. (Selector.wakeup() is an expensive operation.) - // - // However, there is a race condition in this approach. - // The race condition is triggered when 'wakenUp' is set to - // true too early. - // - // 'wakenUp' is set to true too early if: - // 1) Selector is waken up between 'wakenUp.set(false)' and - // 'selector.select(...)'. (BAD) - // 2) Selector is waken up between 'selector.select(...)' and - // 'if (wakenUp.get()) { ... }'. (OK) - // - // In the first case, 'wakenUp' is set to true and the - // following 'selector.select(...)' will wake up immediately. - // Until 'wakenUp' is set to false again in the next round, - // 'wakenUp.compareAndSet(false, true)' will fail, and therefore - // any attempt to wake up the Selector will fail, too, causing - // the following 'selector.select(...)' call to block - // unnecessarily. - // - // To fix this problem, we wake up the selector again if wakenUp - // is true immediately after selector.select(...). - // It is inefficient in that it wakes up the selector for both - // the first case (BAD - wake-up required) and the second case - // (OK - no wake-up required). - if (wakenUp.get()) { - selector.wakeup(); - } - processBindTaskQueue(); - processSelectedKeys(selector.selectedKeys()); - - // Exit the loop when there's nothing to handle. - // The shutdown flag is used to delay the shutdown of this - // loop to avoid excessive Selector creation when - // connections are registered in a one-by-one manner instead of - // concurrent manner. - if (selector.keys().isEmpty()) { - if (shutdown || bossExecutor instanceof ExecutorService && - ((ExecutorService) bossExecutor).isShutdown()) { - - synchronized (startStopLock) { - if (selector.keys().isEmpty()) { - try { - selector.close(); - } catch (IOException e) { - logger.warn( - "Failed to close a selector.", e); - } finally { - selector = null; - } - break; - } else { - shutdown = false; - } - } - } - } else { - shutdown = false; - } - } catch (Throwable e) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to accept a connection.", e); - } - - try { - Thread.sleep(1000); - } catch (InterruptedException e1) { - // Ignore - } - } - } - } - - private void processBindTaskQueue() throws IOException { - for (;;) { - final Runnable task = bindTaskQueue.poll(); - if (task == null) { - break; - } - - task.run(); - cleanUpCancelledKeys(); - } - } - private boolean cleanUpCancelledKeys() throws IOException { - if (cancelledKeys >= CLEANUP_INTERVAL) { - cancelledKeys = 0; - selector.selectNow(); - return true; - } - return false; - } - - private void processSelectedKeys(Set selectedKeys) { - if (selectedKeys.isEmpty()) { - return; - } - for (Iterator i = selectedKeys.iterator(); i.hasNext();) { - SelectionKey k = i.next(); - i.remove(); - NioServerSocketChannel channel = (NioServerSocketChannel) k.attachment(); - - try { - // accept connections in a for loop until no new connection is ready - for (;;) { - SocketChannel acceptedSocket = channel.socket.accept(); - if (acceptedSocket == null) { - break; - } - registerAcceptedChannel(channel, acceptedSocket, currentThread); - - } - } catch (CancelledKeyException e) { - // Raised by accept() when the server socket was closed. - k.cancel(); - channel.close(); - } catch (SocketTimeoutException e) { - // Thrown every second to get ClosedChannelException - // raised. - } catch (ClosedChannelException e) { - // Closed as requested. - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to accept a connection.", t); - } - - try { - Thread.sleep(1000); - } catch (InterruptedException e1) { - // Ignore - } - } - } - } - - private void registerAcceptedChannel(NioServerSocketChannel parent, SocketChannel acceptedSocket, - Thread currentThread) { - try { - ChannelPipeline pipeline = - parent.getConfig().getPipelineFactory().getPipeline(); - NioWorker worker = nextWorker(); - worker.register(new NioAcceptedSocketChannel( - parent.getFactory(), pipeline, parent, - NioServerSocketPipelineSink.this, acceptedSocket, - worker, currentThread), null); - } catch (Exception e) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to initialize an accepted socket.", e); - } - - try { - acceptedSocket.close(); - } catch (IOException e2) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to close a partially accepted socket.", - e2); - } - - } - } - } - } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 93545f118b..c5f1fa88c2 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -31,6 +31,7 @@ import org.jboss.netty.buffer.ChannelBufferFactory; import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ReceiveBufferSizePredictor; +import org.jboss.netty.util.ThreadNameDeterminer; public class NioWorker extends AbstractNioWorker { @@ -40,6 +41,10 @@ public class NioWorker extends AbstractNioWorker { super(executor); } + public NioWorker(Executor executor, ThreadNameDeterminer determiner) { + super(executor, determiner); + } + @Override protected boolean read(SelectionKey k) { final SocketChannel ch = (SocketChannel) k.channel(); diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorkerPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorkerPool.java index 6311c457b0..d84cd5d0ad 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorkerPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorkerPool.java @@ -15,6 +15,8 @@ */ package org.jboss.netty.channel.socket.nio; +import org.jboss.netty.util.ThreadNameDeterminer; + import java.util.concurrent.Executor; @@ -25,13 +27,22 @@ import java.util.concurrent.Executor; */ public class NioWorkerPool extends AbstractNioWorkerPool { + private final ThreadNameDeterminer determiner; + public NioWorkerPool(Executor workerExecutor, int workerCount) { - super(workerExecutor, workerCount); + this(workerExecutor, workerCount, null); + } + + + public NioWorkerPool(Executor workerExecutor, int workerCount, ThreadNameDeterminer determiner) { + super(workerExecutor, workerCount, false); + this.determiner = determiner; + init(); } @Override protected NioWorker createWorker(Executor executor) { - return new NioWorker(executor); + return new NioWorker(executor, determiner); } } diff --git a/src/main/java/org/jboss/netty/util/ThreadRenamingRunnable.java b/src/main/java/org/jboss/netty/util/ThreadRenamingRunnable.java index d52dbcfc85..a8839207d5 100644 --- a/src/main/java/org/jboss/netty/util/ThreadRenamingRunnable.java +++ b/src/main/java/org/jboss/netty/util/ThreadRenamingRunnable.java @@ -34,6 +34,7 @@ public class ThreadRenamingRunnable implements Runnable { private static volatile ThreadNameDeterminer threadNameDeterminer = ThreadNameDeterminer.PROPOSED; + private final ThreadNameDeterminer determiner; /** * Returns the {@link ThreadNameDeterminer} which overrides the proposed @@ -68,7 +69,7 @@ public class ThreadRenamingRunnable implements Runnable { * and changes the thread name to the specified thread name when the * specified {@code runnable} is running. */ - public ThreadRenamingRunnable(Runnable runnable, String proposedThreadName) { + public ThreadRenamingRunnable(Runnable runnable, String proposedThreadName, ThreadNameDeterminer determiner) { if (runnable == null) { throw new NullPointerException("runnable"); } @@ -76,9 +77,14 @@ public class ThreadRenamingRunnable implements Runnable { throw new NullPointerException("proposedThreadName"); } this.runnable = runnable; + this.determiner = determiner; this.proposedThreadName = proposedThreadName; } + public ThreadRenamingRunnable(Runnable runnable, String proposedThreadName) { + this(runnable, proposedThreadName, null); + } + public void run() { final Thread currentThread = Thread.currentThread(); final String oldThreadName = currentThread.getName(); @@ -113,8 +119,12 @@ public class ThreadRenamingRunnable implements Runnable { String newThreadName = null; try { + ThreadNameDeterminer nameDeterminer = determiner; + if (nameDeterminer == null) { + nameDeterminer = getThreadNameDeterminer(); + } newThreadName = - getThreadNameDeterminer().determineThreadName( + nameDeterminer.determineThreadName( currentThreadName, proposedThreadName); } catch (Throwable t) { logger.warn("Failed to determine the thread name", t); diff --git a/src/test/java/org/jboss/netty/util/ThreadRenamingRunnableTest.java b/src/test/java/org/jboss/netty/util/ThreadRenamingRunnableTest.java index f5b3a7f7cd..8c9c41094e 100644 --- a/src/test/java/org/jboss/netty/util/ThreadRenamingRunnableTest.java +++ b/src/test/java/org/jboss/netty/util/ThreadRenamingRunnableTest.java @@ -17,10 +17,13 @@ package org.jboss.netty.util; import static org.easymock.EasyMock.*; import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; import java.security.Permission; import java.util.concurrent.Executor; +import org.junit.After; +import org.junit.AfterClass; import org.junit.Test; public class ThreadRenamingRunnableTest { @@ -84,6 +87,58 @@ public class ThreadRenamingRunnableTest { } } + // Tests mainly changed which were introduced as part of #711 + @Test + public void testThreadNameDeterminer() { + final String oldThreadName = Thread.currentThread().getName(); + final String newThreadName = "new"; + final String proposed = "proposed"; + + ThreadNameDeterminer determiner = new ThreadNameDeterminer() { + public String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception { + assertEquals(proposed, proposedThreadName); + assertEquals(oldThreadName, currentThreadName); + + return newThreadName; + } + }; + ThreadRenamingRunnable.setThreadNameDeterminer(new ThreadNameDeterminer() { + public String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception { + assertEquals(proposed, proposedThreadName); + assertEquals(oldThreadName, currentThreadName); + return proposed; + } + }); + + Executor e = new ImmediateExecutor(); + try { + e.execute(new ThreadRenamingRunnable(new Runnable() { + public void run() { + assertEquals("Should use the given ThreadNameDEterminer", + newThreadName, Thread.currentThread().getName()); + } + }, proposed, determiner)); + } finally { + assertEquals(oldThreadName, Thread.currentThread().getName()); + } + try { + e.execute(new ThreadRenamingRunnable(new Runnable() { + public void run() { + assertEquals("Should use the static set ThreadNameDeterminer", + proposed, Thread.currentThread().getName()); + } + }, proposed)); + } finally { + assertEquals(oldThreadName, Thread.currentThread().getName()); + } + } + + @AfterClass + public static void after() { + // reset to default + ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.PROPOSED); + } + private static class ImmediateExecutor implements Executor { ImmediateExecutor() {