From 7aa2cfad65ca6e9d7cbdd0f07ce3d13de861553d Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 4 Dec 2012 15:51:31 +0100 Subject: [PATCH] [#775] Rework closing logic of Worker/Boss Added a shutdown() method on ChannelFactory, Boss, Worker, BossPool, WorkerPool and Bootstrap which can be used to just shutdown the instance and so release all internal created resources. This method is also called when releaseExternalResources() which will also release external created resources like Executors. This commit also fixes the problem that the Worker/Boss Thread will never be released if you use an Executor (no ExecutorService). --- .../org/jboss/netty/bootstrap/Bootstrap.java | 10 + .../jboss/netty/channel/ChannelFactory.java | 7 + .../DefaultLocalClientChannelFactory.java | 4 + .../DefaultLocalServerChannelFactory.java | 4 + .../jboss/netty/channel/socket/Worker.java | 7 - ...tpTunnelingClientSocketChannelFactory.java | 4 + .../socket/nio/AbstractNioBossPool.java | 8 +- .../socket/nio/AbstractNioSelector.java | 418 ++++++++++++++++++ .../channel/socket/nio/AbstractNioWorker.java | 409 ++--------------- .../socket/nio/AbstractNioWorkerPool.java | 8 +- .../jboss/netty/channel/socket/nio/Boss.java | 9 +- .../netty/channel/socket/nio/BossPool.java | 10 +- .../channel/socket/nio/NioClientBoss.java | 351 ++------------- .../channel/socket/nio/NioClientBossPool.java | 10 + .../nio/NioClientSocketChannelFactory.java | 15 + .../nio/NioClientSocketPipelineSink.java | 2 +- .../socket/nio/NioDatagramChannel.java | 2 +- .../socket/nio/NioDatagramChannelFactory.java | 20 +- .../channel/socket/nio/NioDatagramWorker.java | 23 +- .../netty/channel/socket/nio/NioSelector.java | 34 ++ .../channel/socket/nio/NioSelectorPool.java | 31 ++ .../channel/socket/nio/NioServerBoss.java | 339 +++----------- .../nio/NioServerSocketChannelFactory.java | 16 +- .../nio/NioServerSocketPipelineSink.java | 2 +- .../netty/channel/socket/nio/NioWorker.java | 38 +- .../socket/nio/ShareableWorkerPool.java | 5 + .../netty/channel/socket/nio/WorkerPool.java | 10 +- .../channel/socket/oio/AbstractOioWorker.java | 5 - .../oio/OioClientSocketChannelFactory.java | 9 + .../socket/oio/OioDatagramChannelFactory.java | 9 + .../oio/OioServerSocketChannelFactory.java | 11 +- .../embedder/EmbeddedChannelFactory.java | 4 + .../AbstractSocketClientBootstrapTest.java | 40 +- .../AbstractSocketServerBootstrapTest.java | 26 +- .../jboss/netty/bootstrap/BootstrapTest.java | 10 + .../socket/AbstractDatagramMulticastTest.java | 21 +- .../channel/socket/AbstractDatagramTest.java | 23 +- .../socket/AbstractSocketEchoTest.java | 6 + .../NioClientSocketShutdownTimeTest.java | 2 +- .../socket/nio/AbstractNioWorkerTest.java | 7 +- .../socket/nio/NioDatagramWorkerTest.java | 7 + .../channel/socket/nio/NioWorkerTest.java | 7 + .../AbstractSocketFixedLengthEchoTest.java | 5 + ...tSocketCompatibleObjectStreamEchoTest.java | 22 +- .../AbstractSocketObjectStreamEchoTest.java | 23 +- .../spdy/AbstractSocketSpdyEchoTest.java | 23 +- .../string/AbstractSocketStringEchoTest.java | 22 +- .../ssl/AbstractSocketSslEchoTest.java | 30 +- 48 files changed, 867 insertions(+), 1241 deletions(-) create mode 100644 src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioSelector.java create mode 100644 src/main/java/org/jboss/netty/channel/socket/nio/NioSelector.java create mode 100644 src/main/java/org/jboss/netty/channel/socket/nio/NioSelectorPool.java diff --git a/src/main/java/org/jboss/netty/bootstrap/Bootstrap.java b/src/main/java/org/jboss/netty/bootstrap/Bootstrap.java index 818e75a5cd..7ade53c3a0 100644 --- a/src/main/java/org/jboss/netty/bootstrap/Bootstrap.java +++ b/src/main/java/org/jboss/netty/bootstrap/Bootstrap.java @@ -319,6 +319,16 @@ public class Bootstrap implements ExternalResourceReleasable { } } + /** + * This method simply delegates the call to + * {@link ChannelFactory#shutdown()}. + */ + public void shutdown() { + ChannelFactory factory = this.factory; + if (factory != null) { + factory.shutdown(); + } + } /** * Returns {@code true} if and only if the specified {@code map} is an * ordered map, like {@link LinkedHashMap} is. diff --git a/src/main/java/org/jboss/netty/channel/ChannelFactory.java b/src/main/java/org/jboss/netty/channel/ChannelFactory.java index 027007d08d..936b9b3177 100644 --- a/src/main/java/org/jboss/netty/channel/ChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/ChannelFactory.java @@ -67,6 +67,11 @@ public interface ChannelFactory extends ExternalResourceReleasable { */ Channel newChannel(ChannelPipeline pipeline); + /** + * Shudown the ChannelFactory and all the resource it created internal. + */ + void shutdown(); + /** * Releases the external resources that this factory depends on to function. * An external resource is a resource that this factory didn't create by @@ -76,6 +81,8 @@ public interface ChannelFactory extends ExternalResourceReleasable { * this factory or any other part of your application. An unexpected * behavior will be resulted in if the resources are released when there's * an open channel which is managed by this factory. + * + * This will also call {@link #shutdown()} before do any action */ void releaseExternalResources(); } diff --git a/src/main/java/org/jboss/netty/channel/local/DefaultLocalClientChannelFactory.java b/src/main/java/org/jboss/netty/channel/local/DefaultLocalClientChannelFactory.java index c9cfa28e22..d4f078b081 100644 --- a/src/main/java/org/jboss/netty/channel/local/DefaultLocalClientChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/local/DefaultLocalClientChannelFactory.java @@ -44,4 +44,8 @@ public class DefaultLocalClientChannelFactory implements LocalClientChannelFacto public void releaseExternalResources() { // No external resources. } + + public void shutdown() { + // nothing to shutdown + } } diff --git a/src/main/java/org/jboss/netty/channel/local/DefaultLocalServerChannelFactory.java b/src/main/java/org/jboss/netty/channel/local/DefaultLocalServerChannelFactory.java index cb1f0325fb..0702d7ef70 100644 --- a/src/main/java/org/jboss/netty/channel/local/DefaultLocalServerChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/local/DefaultLocalServerChannelFactory.java @@ -41,4 +41,8 @@ public class DefaultLocalServerChannelFactory implements LocalServerChannelFacto public void releaseExternalResources() { group.close().awaitUninterruptibly(); } + + public void shutdown() { + // nothing to shutdown + } } diff --git a/src/main/java/org/jboss/netty/channel/socket/Worker.java b/src/main/java/org/jboss/netty/channel/socket/Worker.java index 125d3c0cc1..26bb1b471b 100644 --- a/src/main/java/org/jboss/netty/channel/socket/Worker.java +++ b/src/main/java/org/jboss/netty/channel/socket/Worker.java @@ -16,8 +16,6 @@ package org.jboss.netty.channel.socket; -import java.nio.channels.Selector; - /** * A {@link Worker} is responsible to dispatch IO operations * @@ -33,9 +31,4 @@ public interface Worker extends Runnable { */ void executeInIoThread(Runnable task); - /** - * Replaces the current {@link Selector} with a new {@link Selector} to work around the infamous epoll 100% CPU - * bug. - */ - void rebuildSelector(); } diff --git a/src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingClientSocketChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingClientSocketChannelFactory.java index 5c0f79a299..552fc41f90 100644 --- a/src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingClientSocketChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingClientSocketChannelFactory.java @@ -47,4 +47,8 @@ public class HttpTunnelingClientSocketChannelFactory implements ClientSocketChan public void releaseExternalResources() { clientSocketChannelFactory.releaseExternalResources(); } + + public void shutdown() { + clientSocketChannelFactory.shutdown(); + } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioBossPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioBossPool.java index 536c1f55c9..c8e3be7725 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioBossPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioBossPool.java @@ -87,11 +87,13 @@ public abstract class AbstractNioBossPool } public void releaseExternalResources() { + shutdown(); ExecutorUtil.terminate(bossExecutor); + } + + public void shutdown() { for (Boss boss: bosses) { - if (boss instanceof ExternalResourceReleasable) { - ((ExternalResourceReleasable) boss).releaseExternalResources(); - } + boss.shutdown(); } } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioSelector.java b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioSelector.java new file mode 100644 index 0000000000..6ebf167432 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioSelector.java @@ -0,0 +1,418 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.nio; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.logging.InternalLogger; +import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.ThreadNameDeterminer; +import org.jboss.netty.util.ThreadRenamingRunnable; +import org.jboss.netty.util.internal.DeadLockProofWorker; + +import java.io.IOException; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +abstract class AbstractNioSelector implements NioSelector { + + private static final AtomicInteger nextId = new AtomicInteger(); + + private final int id = nextId.incrementAndGet(); + + /** + * Internal Netty logger. + */ + protected static final InternalLogger logger = InternalLoggerFactory + .getInstance(AbstractNioSelector.class); + + private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. + + /** + * Executor used to execute {@link Runnable}s such as channel registration + * task. + */ + private final Executor executor; + + /** + * If this worker has been started thread will be a reference to the thread + * used when starting. i.e. the current thread when the run method is executed. + */ + protected volatile Thread thread; + + /** + * The NIO {@link Selector}. + */ + protected volatile Selector selector; + + /** + * Boolean that controls determines if a blocked Selector.select should + * break out of its selection process. In our case we use a timeone for + * the select method and the select method will block for that time unless + * waken up. + */ + protected final AtomicBoolean wakenUp = new AtomicBoolean(); + + private final Queue taskQueue = new ConcurrentLinkedQueue(); + + private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation + + private final CountDownLatch shutdownLatch = new CountDownLatch(1); + private volatile boolean shutdown; + + AbstractNioSelector(Executor executor) { + this(executor, null); + } + + AbstractNioSelector(Executor executor, ThreadNameDeterminer determiner) { + this.executor = executor; + openSelector(determiner); + } + + public void register(Channel channel, ChannelFuture future) { + Runnable task = createRegisterTask(channel, future); + registerTask(task); + } + + protected final void registerTask(Runnable task) { + taskQueue.add(task); + + Selector selector = this.selector; + + if (selector != null) { + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + } else { + if (taskQueue.remove(task)) { + // the selector was null this means the Worker has already been shutdown. + throw new RejectedExecutionException("Worker has already been shutdown"); + } + } + } + + protected final boolean isIoThread() { + return Thread.currentThread() == thread; + } + + public void rebuildSelector() { + if (!isIoThread()) { + taskQueue.add(new Runnable() { + public void run() { + rebuildSelector(); + } + }); + return; + } + + final Selector oldSelector = selector; + final Selector newSelector; + + if (oldSelector == null) { + return; + } + + try { + newSelector = Selector.open(); + } catch (Exception e) { + logger.warn("Failed to create a new Selector.", e); + return; + } + + // Register all channels to the new Selector. + int nChannels = 0; + for (;;) { + try { + for (SelectionKey key: oldSelector.keys()) { + try { + if (key.channel().keyFor(newSelector) != null) { + continue; + } + + int interestOps = key.interestOps(); + key.cancel(); + key.channel().register(newSelector, interestOps, key.attachment()); + nChannels ++; + } catch (Exception e) { + logger.warn("Failed to re-register a Channel to the new Selector,", e); + close(key); + } + } + } catch (ConcurrentModificationException e) { + // Probably due to concurrent modification of the key set. + continue; + } + + break; + } + + selector = newSelector; + + try { + // time to close the old selector as everything else is registered to the new one + oldSelector.close(); + } catch (Throwable t) { + if (logger.isWarnEnabled()) { + logger.warn("Failed to close the old Selector.", t); + } + } + + logger.info("Migrated " + nChannels + " channel(s) to the new Selector,"); + } + + public void run() { + thread = Thread.currentThread(); + + int selectReturnsImmediately = 0; + Selector selector = this.selector; + + if (selector == null) { + return; + } + // use 80% of the timeout for measure + final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100; + boolean wakenupFromLoop = false; + for (;;) { + wakenUp.set(false); + + try { + long beforeSelect = System.nanoTime(); + int selected = select(selector); + if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) { + long timeBlocked = System.nanoTime() - beforeSelect; + + if (timeBlocked < minSelectTimeout) { + boolean notConnected = false; + // loop over all keys as the selector may was unblocked because of a closed channel + for (SelectionKey key: selector.keys()) { + SelectableChannel ch = key.channel(); + try { + if (ch instanceof DatagramChannel && !((DatagramChannel) ch).isConnected() || + ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) { + notConnected = true; + // cancel the key just to be on the safe side + key.cancel(); + } + } catch (CancelledKeyException e) { + // ignore + } + } + if (notConnected) { + selectReturnsImmediately = 0; + } else { + // returned before the minSelectTimeout elapsed with nothing select. + // this may be the cause of the jdk epoll(..) bug, so increment the counter + // which we use later to see if its really the jdk bug. + selectReturnsImmediately ++; + } + } else { + selectReturnsImmediately = 0; + } + + if (selectReturnsImmediately == 1024) { + // The selector returned immediately for 10 times in a row, + // so recreate one selector as it seems like we hit the + // famous epoll(..) jdk bug. + rebuildSelector(); + selector = this.selector; + selectReturnsImmediately = 0; + wakenupFromLoop = false; + // try to select again + continue; + } + } else { + // reset counter + selectReturnsImmediately = 0; + } + + // 'wakenUp.compareAndSet(false, true)' is always evaluated + // before calling 'selector.wakeup()' to reduce the wake-up + // overhead. (Selector.wakeup() is an expensive operation.) + // + // However, there is a race condition in this approach. + // The race condition is triggered when 'wakenUp' is set to + // true too early. + // + // 'wakenUp' is set to true too early if: + // 1) Selector is waken up between 'wakenUp.set(false)' and + // 'selector.select(...)'. (BAD) + // 2) Selector is waken up between 'selector.select(...)' and + // 'if (wakenUp.get()) { ... }'. (OK) + // + // In the first case, 'wakenUp' is set to true and the + // following 'selector.select(...)' will wake up immediately. + // Until 'wakenUp' is set to false again in the next round, + // 'wakenUp.compareAndSet(false, true)' will fail, and therefore + // any attempt to wake up the Selector will fail, too, causing + // the following 'selector.select(...)' call to block + // unnecessarily. + // + // To fix this problem, we wake up the selector again if wakenUp + // is true immediately after selector.select(...). + // It is inefficient in that it wakes up the selector for both + // the first case (BAD - wake-up required) and the second case + // (OK - no wake-up required). + + if (wakenUp.get()) { + wakenupFromLoop = true; + selector.wakeup(); + } else { + wakenupFromLoop = false; + } + + cancelledKeys = 0; + processTaskQueue(); + selector = this.selector; // processTaskQueue() can call rebuildSelector() + + if (shutdown) { + this.selector = null; + + // process one time again + processTaskQueue(); + + for (Iterator i = selector.keys().iterator(); i.hasNext();) { + close(i.next()); + } + + try { + selector.close(); + } catch (IOException e) { + logger.warn( + "Failed to close a selector.", e); + } + shutdownLatch.countDown(); + break; + } else { + process(selector); + } + } catch (Throwable t) { + logger.warn( + "Unexpected exception in the selector loop.", t); + + // Prevent possible consecutive immediate failures that lead to + // excessive CPU consumption. + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore. + } + } + } + } + + /** + * Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for + * the {@link AbstractNioChannel}'s when they get registered + */ + private void openSelector(ThreadNameDeterminer determiner) { + try { + selector = Selector.open(); + } catch (Throwable t) { + throw new ChannelException("Failed to create a selector.", t); + } + + // Start the worker thread with the new Selector. + boolean success = false; + try { + DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner)); + success = true; + } finally { + if (!success) { + // Release the Selector if the execution fails. + try { + selector.close(); + } catch (Throwable t) { + logger.warn("Failed to close a selector.", t); + } + selector = null; + // The method will return to the caller at this point. + } + } + assert selector != null && selector.isOpen(); + } + + private void processTaskQueue() { + for (;;) { + final Runnable task = taskQueue.poll(); + if (task == null) { + break; + } + task.run(); + try { + cleanUpCancelledKeys(); + } catch (IOException e) { + // Ignore + } + } + } + + protected final void increaseCancelledKeys() { + cancelledKeys ++; + } + + protected final boolean cleanUpCancelledKeys() throws IOException { + if (cancelledKeys >= CLEANUP_INTERVAL) { + cancelledKeys = 0; + selector.selectNow(); + return true; + } + return false; + } + + public void shutdown() { + if (isIoThread()) { + throw new IllegalStateException("Must not be called from a I/O-Thread to prevent deadlocks!"); + } + + Selector selector = this.selector; + shutdown = true; + if (selector != null) { + selector.wakeup(); + } + try { + shutdownLatch.await(); + } catch (InterruptedException e) { + logger.error("Interrupted while wait for resources to be released #" + id); + Thread.currentThread().interrupt(); + } + } + + protected abstract void process(Selector selector) throws IOException; + + protected int select(Selector selector) throws IOException { + return SelectorUtil.select(selector); + } + + protected abstract void close(SelectionKey k); + + protected abstract ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner); + + protected abstract Runnable createRegisterTask(Channel channel, ChannelFuture future); +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorker.java index 0ca7c11109..3c242b2905 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorker.java @@ -16,358 +16,39 @@ package org.jboss.netty.channel.socket.nio; import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.socket.Worker; import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; -import org.jboss.netty.logging.InternalLogger; -import org.jboss.netty.logging.InternalLoggerFactory; -import org.jboss.netty.util.ExternalResourceReleasable; import org.jboss.netty.util.ThreadNameDeterminer; import org.jboss.netty.util.ThreadRenamingRunnable; -import org.jboss.netty.util.internal.DeadLockProofWorker; import java.io.IOException; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; -import java.nio.channels.DatagramChannel; import java.nio.channels.NotYetConnectedException; -import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; import java.nio.channels.WritableByteChannel; -import java.util.ConcurrentModificationException; import java.util.Iterator; import java.util.Queue; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; + import static org.jboss.netty.channel.Channels.*; -abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable { - - private static final AtomicInteger nextId = new AtomicInteger(); - - final int id = nextId.incrementAndGet(); - - /** - * Internal Netty logger. - */ - private static final InternalLogger logger = InternalLoggerFactory - .getInstance(AbstractNioWorker.class); - - static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. - - /** - * Executor used to execute {@link Runnable}s such as channel registration - * task. - */ - private final Executor executor; - - /** - * If this worker has been started thread will be a reference to the thread - * used when starting. i.e. the current thread when the run method is executed. - */ - protected volatile Thread thread; - - /** - * The NIO {@link Selector}. - */ - volatile Selector selector; - - /** - * Boolean that controls determines if a blocked Selector.select should - * break out of its selection process. In our case we use a timeone for - * the select method and the select method will block for that time unless - * waken up. - */ - protected final AtomicBoolean wakenUp = new AtomicBoolean(); - - /** - * Monitor object used to synchronize selector open/close. - */ - private final Object startStopLock = new Object(); - - final Queue taskQueue = new ConcurrentLinkedQueue(); - - private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation +abstract class AbstractNioWorker extends AbstractNioSelector implements Worker { protected final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool(); AbstractNioWorker(Executor executor) { - this(executor, null); + super(executor); } AbstractNioWorker(Executor executor, ThreadNameDeterminer determiner) { - this.executor = executor; - openSelector(determiner); - } - - void register(AbstractNioChannel channel, ChannelFuture future) { - - synchronized (startStopLock) { - if (selector == null) { - // the selector was null this means the Worker has already been shutdown. - throw new RejectedExecutionException("Worker has already been shutdown"); - } - - taskQueue.add(createRegisterTask(channel, future)); - - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - } - } - - public void rebuildSelector() { - if (Thread.currentThread() != thread) { - executeInIoThread(new Runnable() { - public void run() { - rebuildSelector(); - } - }, true); - return; - } - - final Selector oldSelector = selector; - final Selector newSelector; - - if (oldSelector == null) { - return; - } - - try { - newSelector = Selector.open(); - } catch (Exception e) { - logger.warn("Failed to create a new Selector.", e); - return; - } - - // Register all channels to the new Selector. - int nChannels = 0; - for (;;) { - try { - for (SelectionKey key: oldSelector.keys()) { - try { - if (key.channel().keyFor(newSelector) != null) { - continue; - } - - int interestOps = key.interestOps(); - key.cancel(); - key.channel().register(newSelector, interestOps, key.attachment()); - nChannels ++; - } catch (Exception e) { - logger.warn("Failed to re-register a Channel to the new Selector,", e); - AbstractNioChannel channel = (AbstractNioChannel) key.attachment(); - close(channel, succeededFuture(channel)); - } - } - } catch (ConcurrentModificationException e) { - // Probably due to concurrent modification of the key set. - continue; - } - - break; - } - - selector = newSelector; - - try { - // time to close the old selector as everything else is registered to the new one - oldSelector.close(); - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to close the old Selector.", t); - } - } - - logger.info("Migrated " + nChannels + " channel(s) to the new Selector,"); - } - - /** - * Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for - * the {@link AbstractNioChannel}'s when they get registered - */ - private void openSelector(ThreadNameDeterminer determiner) { - try { - selector = Selector.open(); - } catch (Throwable t) { - throw new ChannelException("Failed to create a selector.", t); - } - - // Start the worker thread with the new Selector. - boolean success = false; - try { - DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O worker #" + id, determiner)); - success = true; - } finally { - if (!success) { - // Release the Selector if the execution fails. - try { - selector.close(); - } catch (Throwable t) { - logger.warn("Failed to close a selector.", t); - } - selector = null; - // The method will return to the caller at this point. - } - } - assert selector != null && selector.isOpen(); - } - - public void run() { - thread = Thread.currentThread(); - - boolean shutdown = false; - int selectReturnsImmediately = 0; - Selector selector = this.selector; - - // use 80% of the timeout for measure - final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100; - boolean wakenupFromLoop = false; - for (;;) { - wakenUp.set(false); - - try { - long beforeSelect = System.nanoTime(); - int selected = SelectorUtil.select(selector); - if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) { - long timeBlocked = System.nanoTime() - beforeSelect; - - if (timeBlocked < minSelectTimeout) { - boolean notConnected = false; - // loop over all keys as the selector may was unblocked because of a closed channel - for (SelectionKey key: selector.keys()) { - SelectableChannel ch = key.channel(); - try { - if (ch instanceof DatagramChannel && !((DatagramChannel) ch).isConnected() || - ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) { - notConnected = true; - // cancel the key just to be on the safe side - key.cancel(); - } - } catch (CancelledKeyException e) { - // ignore - } - } - if (notConnected) { - selectReturnsImmediately = 0; - } else { - // returned before the minSelectTimeout elapsed with nothing select. - // this may be the cause of the jdk epoll(..) bug, so increment the counter - // which we use later to see if its really the jdk bug. - selectReturnsImmediately ++; - } - } else { - selectReturnsImmediately = 0; - } - - if (selectReturnsImmediately == 1024) { - // The selector returned immediately for 10 times in a row, - // so recreate one selector as it seems like we hit the - // famous epoll(..) jdk bug. - rebuildSelector(); - selector = this.selector; - selectReturnsImmediately = 0; - wakenupFromLoop = false; - // try to select again - continue; - } - } else { - // reset counter - selectReturnsImmediately = 0; - } - - // 'wakenUp.compareAndSet(false, true)' is always evaluated - // before calling 'selector.wakeup()' to reduce the wake-up - // overhead. (Selector.wakeup() is an expensive operation.) - // - // However, there is a race condition in this approach. - // The race condition is triggered when 'wakenUp' is set to - // true too early. - // - // 'wakenUp' is set to true too early if: - // 1) Selector is waken up between 'wakenUp.set(false)' and - // 'selector.select(...)'. (BAD) - // 2) Selector is waken up between 'selector.select(...)' and - // 'if (wakenUp.get()) { ... }'. (OK) - // - // In the first case, 'wakenUp' is set to true and the - // following 'selector.select(...)' will wake up immediately. - // Until 'wakenUp' is set to false again in the next round, - // 'wakenUp.compareAndSet(false, true)' will fail, and therefore - // any attempt to wake up the Selector will fail, too, causing - // the following 'selector.select(...)' call to block - // unnecessarily. - // - // To fix this problem, we wake up the selector again if wakenUp - // is true immediately after selector.select(...). - // It is inefficient in that it wakes up the selector for both - // the first case (BAD - wake-up required) and the second case - // (OK - no wake-up required). - - if (wakenUp.get()) { - wakenupFromLoop = true; - selector.wakeup(); - } else { - wakenupFromLoop = false; - } - - cancelledKeys = 0; - processTaskQueue(); - selector = this.selector; // processTaskQueue() can call rebuildSelector() - processSelectedKeys(selector.selectedKeys()); - - // Exit the loop when there's nothing to handle. - // The shutdown flag is used to delay the shutdown of this - // loop to avoid excessive Selector creation when - // connections are registered in a one-by-one manner instead of - // concurrent manner. - if (selector.keys().isEmpty()) { - if (shutdown || - executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) { - - synchronized (startStopLock) { - if (taskQueue.isEmpty() && selector.keys().isEmpty()) { - try { - selector.close(); - } catch (IOException e) { - logger.warn( - "Failed to close a selector.", e); - } finally { - this.selector = null; - } - break; - } else { - shutdown = false; - } - } - } - } else { - shutdown = false; - } - } catch (Throwable t) { - logger.warn( - "Unexpected exception in the selector loop.", t); - - // Prevent possible consecutive immediate failures that lead to - // excessive CPU consumption. - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore. - } - } - } + super(executor, determiner); } public void executeInIoThread(Runnable task) { @@ -384,48 +65,33 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable { * in an async fashion even if the current Thread == IO Thread */ public void executeInIoThread(Runnable task, boolean alwaysAsync) { - if (!alwaysAsync && Thread.currentThread() == thread) { + if (!alwaysAsync && isIoThread()) { task.run(); } else { - taskQueue.offer(task); - - synchronized (startStopLock) { - // check if the selector was shutdown already or was not started yet. If so execute all - // submitted tasks in the calling thread - if (selector == null) { - // execute everything in the event queue as the - for (;;) { - Runnable r = taskQueue.poll(); - if (r == null) { - break; - } - r.run(); - } - } else { - if (wakenUp.compareAndSet(false, true)) { - // wake up the selector to speed things - Selector selector = this.selector; - if (selector != null) { - selector.wakeup(); - } - } - } - } + registerTask(task); } } - private void processTaskQueue() throws IOException { - for (;;) { - final Runnable task = taskQueue.poll(); - if (task == null) { - break; - } - task.run(); - cleanUpCancelledKeys(); - } + @Override + protected void close(SelectionKey k) { + AbstractNioChannel ch = (AbstractNioChannel) k.attachment(); + close(ch, succeededFuture(ch)); } - private void processSelectedKeys(Set selectedKeys) throws IOException { + @Override + protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) { + return new ThreadRenamingRunnable(this, "New I/O worker #" + id, determiner); + } + + @Override + public void run() { + super.run(); + sendBufferPool.releaseExternalResources(); + } + + @Override + protected void process(Selector selector) throws IOException { + Set selectedKeys = selector.selectedKeys(); // check if the set is empty and if so just return to not create garbage by // creating a new Iterator every time even if there is nothing to process. // See https://github.com/netty/netty/issues/597 @@ -456,20 +122,6 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable { } } - private boolean cleanUpCancelledKeys() throws IOException { - if (cancelledKeys >= CLEANUP_INTERVAL) { - cancelledKeys = 0; - selector.selectNow(); - return true; - } - return false; - } - - private void close(SelectionKey k) { - AbstractNioChannel ch = (AbstractNioChannel) k.attachment(); - close(ch, succeededFuture(ch)); - } - void writeFromUserCode(final AbstractNioChannel channel) { if (!channel.isConnected()) { cleanUpWriteBuffer(channel); @@ -668,14 +320,14 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable { } } - void close(AbstractNioChannel channel, ChannelFuture future) { + protected void close(AbstractNioChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); boolean bound = channel.isBound(); boolean iothread = isIoThread(channel); try { channel.channel.close(); - cancelledKeys ++; + increaseCancelledKeys(); if (channel.setClosed()) { future.setSuccess(); @@ -835,10 +487,6 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable { } } - public void releaseExternalResources() { - sendBufferPool.releaseExternalResources(); - } - /** * Read is called when a Selector has been notified that the underlying channel * was something to be read. The channel would previously have registered its interest @@ -848,9 +496,4 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable { */ protected abstract boolean read(SelectionKey k); - /** - * Create a new {@link Runnable} which will register the {@link AbstractNioWorker} with the {@link Channel} - */ - protected abstract Runnable createRegisterTask(AbstractNioChannel channel, ChannelFuture future); - } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorkerPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorkerPool.java index 8fbaaf78ba..66e18f946c 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorkerPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorkerPool.java @@ -105,15 +105,19 @@ public abstract class AbstractNioWorkerPool } public void rebuildSelectors() { - for (Worker worker: workers) { + for (AbstractNioWorker worker: workers) { worker.rebuildSelector(); } } public void releaseExternalResources() { + shutdown(); ExecutorUtil.terminate(workerExecutor); + } + + public void shutdown() { for (AbstractNioWorker worker: workers) { - worker.releaseExternalResources(); + worker.shutdown(); } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/Boss.java b/src/main/java/org/jboss/netty/channel/socket/nio/Boss.java index a2458d930e..8fd0d0c74c 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/Boss.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/Boss.java @@ -15,15 +15,8 @@ */ package org.jboss.netty.channel.socket.nio; -import java.nio.channels.Selector; - /** * Serves the boss tasks like connecting/accepting */ -public interface Boss extends Runnable { - /** - * Replaces the current {@link Selector} with a new {@link Selector} to work around the infamous epoll 100% CPU - * bug. - */ - void rebuildSelector(); +public interface Boss extends NioSelector { } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/BossPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/BossPool.java index f24792ad71..0e91686a6c 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/BossPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/BossPool.java @@ -15,21 +15,13 @@ */ package org.jboss.netty.channel.socket.nio; -import java.nio.channels.Selector; - /** * A Pool that holds {@link Boss} instances */ -public interface BossPool { +public interface BossPool extends NioSelectorPool { /** * Return the next {@link Boss} to use * */ E nextBoss(); - - /** - * Replaces the current {@link Selector}s of the {@link Boss}es with new {@link Selector}s to work around the - * infamous epoll 100% CPU bug. - */ - void rebuildSelectors(); } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientBoss.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientBoss.java index e657445b17..0cedc2e6f9 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientBoss.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientBoss.java @@ -15,55 +15,31 @@ */ package org.jboss.netty.channel.socket.nio; -import org.jboss.netty.channel.ChannelException; -import org.jboss.netty.logging.InternalLogger; -import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.util.ThreadNameDeterminer; import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.Timeout; import org.jboss.netty.util.Timer; import org.jboss.netty.util.TimerTask; -import org.jboss.netty.util.internal.DeadLockProofWorker; import java.io.IOException; import java.net.ConnectException; -import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.util.ConcurrentModificationException; import java.util.Iterator; -import java.util.Queue; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import static org.jboss.netty.channel.Channels.*; /** * {@link Boss} implementation that handles the connection attempts of clients */ -public final class NioClientBoss implements Boss { +public final class NioClientBoss extends AbstractNioSelector implements Boss { - private static final AtomicInteger nextId = new AtomicInteger(); - - private final int id = nextId.incrementAndGet(); - - static final InternalLogger logger = - InternalLoggerFactory.getInstance(NioClientBoss.class); - - private volatile Selector selector; - private volatile Thread thread; - private boolean started; - private final AtomicBoolean wakenUp = new AtomicBoolean(); - private final Object startStopLock = new Object(); - private final Queue taskQueue = new ConcurrentLinkedQueue(); private final TimerTask wakeupTask = new TimerTask() { public void run(Timeout timeout) throws Exception { // This is needed to prevent a possible race that can lead to a NPE @@ -79,242 +55,35 @@ public final class NioClientBoss implements Boss { } } }; - private final Executor bossExecutor; - private final ThreadNameDeterminer determiner; + private final Timer timer; NioClientBoss(Executor bossExecutor, Timer timer, ThreadNameDeterminer determiner) { - this.bossExecutor = bossExecutor; - this.determiner = determiner; + super(bossExecutor, determiner); this.timer = timer; } - void register(NioClientSocketChannel channel) { - Runnable registerTask = new RegisterTask(this, channel); - Selector selector; - - synchronized (startStopLock) { - if (!started) { - // Open a selector if this worker didn't start yet. - try { - this.selector = selector = Selector.open(); - } catch (Throwable t) { - throw new ChannelException( - "Failed to create a selector.", t); - } - - // Start the worker thread with the new Selector. - boolean success = false; - try { - DeadLockProofWorker.start(bossExecutor, - new ThreadRenamingRunnable(this, - "New I/O client boss #" + id , determiner)); - - success = true; - } finally { - if (!success) { - // Release the Selector if the execution fails. - try { - selector.close(); - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to close a selector.", t); - } - } - this.selector = selector = null; - // The method will return to the caller at this point. - } - } - } else { - // Use the existing selector if this worker has been started. - selector = this.selector; - } - - assert selector != null && selector.isOpen(); - - started = true; - boolean offered = taskQueue.offer(registerTask); - assert offered; - } - int timeout = channel.getConfig().getConnectTimeoutMillis(); - if (timeout > 0) { - if (!channel.isConnected()) { - channel.timoutTimer = timer.newTimeout(wakeupTask, - timeout, TimeUnit.MILLISECONDS); - } - } - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } + @Override + protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) { + return new ThreadRenamingRunnable(this, "New I/O boss #" + id, determiner); } - public void run() { - thread = Thread.currentThread(); - - boolean shutdown = false; - int selectReturnsImmediately = 0; - - Selector selector = this.selector; - - // use 80% of the timeout for measure - final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100; - boolean wakenupFromLoop = false; - for (;;) { - wakenUp.set(false); - - try { - long beforeSelect = System.nanoTime(); - int selected = SelectorUtil.select(selector); - if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) { - long timeBlocked = System.nanoTime() - beforeSelect; - - if (timeBlocked < minSelectTimeout) { - boolean notConnected = false; - // loop over all keys as the selector may was unblocked because of a closed channel - for (SelectionKey key: selector.keys()) { - SelectableChannel ch = key.channel(); - try { - if (ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) { - notConnected = true; - // cancel the key just to be on the safe side - key.cancel(); - } - } catch (CancelledKeyException e) { - // ignore - } - } - if (notConnected) { - selectReturnsImmediately = 0; - } else { - // returned before the minSelectTimeout elapsed with nothing select. - // this may be the cause of the jdk epoll(..) bug, so increment the counter - // which we use later to see if its really the jdk bug. - selectReturnsImmediately ++; - } - } else { - selectReturnsImmediately = 0; - } - - if (selectReturnsImmediately == 1024) { - // The selector returned immediately for 10 times in a row, - // so recreate one selector as it seems like we hit the - // famous epoll(..) jdk bug. - rebuildSelector(); - selector = this.selector; - selectReturnsImmediately = 0; - wakenupFromLoop = false; - // try to select again - continue; - } - } else { - // reset counter - selectReturnsImmediately = 0; - } - - // 'wakenUp.compareAndSet(false, true)' is always evaluated - // before calling 'selector.wakeup()' to reduce the wake-up - // overhead. (Selector.wakeup() is an expensive operation.) - // - // However, there is a race condition in this approach. - // The race condition is triggered when 'wakenUp' is set to - // true too early. - // - // 'wakenUp' is set to true too early if: - // 1) Selector is waken up between 'wakenUp.set(false)' and - // 'selector.select(...)'. (BAD) - // 2) Selector is waken up between 'selector.select(...)' and - // 'if (wakenUp.get()) { ... }'. (OK) - // - // In the first case, 'wakenUp' is set to true and the - // following 'selector.select(...)' will wake up immediately. - // Until 'wakenUp' is set to false again in the next round, - // 'wakenUp.compareAndSet(false, true)' will fail, and therefore - // any attempt to wake up the Selector will fail, too, causing - // the following 'selector.select(...)' call to block - // unnecessarily. - // - // To fix this problem, we wake up the selector again if wakenUp - // is true immediately after selector.select(...). - // It is inefficient in that it wakes up the selector for both - // the first case (BAD - wake-up required) and the second case - // (OK - no wake-up required). - - if (wakenUp.get()) { - wakenupFromLoop = true; - selector.wakeup(); - } else { - wakenupFromLoop = false; - } - processTaskQueue(); - selector = this.selector; // processTaskQueue() can call rebuildSelector() - processSelectedKeys(selector.selectedKeys()); - - // Handle connection timeout every 10 milliseconds approximately. - long currentTimeNanos = System.nanoTime(); - processConnectTimeout(selector.keys(), currentTimeNanos); - - // Exit the loop when there's nothing to handle. - // The shutdown flag is used to delay the shutdown of this - // loop to avoid excessive Selector creation when - // connection attempts are made in a one-by-one manner - // instead of concurrent manner. - if (selector.keys().isEmpty()) { - if (shutdown || - bossExecutor instanceof ExecutorService && ((ExecutorService) bossExecutor).isShutdown()) { - - synchronized (startStopLock) { - if (taskQueue.isEmpty() && selector.keys().isEmpty()) { - started = false; - try { - selector.close(); - } catch (IOException e) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to close a selector.", e); - } - } finally { - this.selector = null; - } - break; - } else { - shutdown = false; - } - } - } else { - // Give one more second. - shutdown = true; - } - } else { - shutdown = false; - } - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn( - "Unexpected exception in the selector loop.", t); - } - - // Prevent possible consecutive immediate failures. - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore. - } - } - } + @Override + protected Runnable createRegisterTask(Channel channel, ChannelFuture future) { + return new RegisterTask(this, (NioClientSocketChannel) channel); } - private void processTaskQueue() { - for (;;) { - final Runnable task = taskQueue.poll(); - if (task == null) { - break; - } + @Override + protected void process(Selector selector) throws IOException { + processSelectedKeys(selector.selectedKeys()); - task.run(); - } + // Handle connection timeout every 10 milliseconds approximately. + long currentTimeNanos = System.nanoTime(); + processConnectTimeout(selector.keys(), currentTimeNanos); } - private static void processSelectedKeys(Set selectedKeys) { + private void processSelectedKeys(Set selectedKeys) { + // check if the set is empty and if so just return to not create garbage by // creating a new Iterator every time even if there is nothing to process. // See https://github.com/netty/netty/issues/597 @@ -385,82 +154,13 @@ public final class NioClientBoss implements Boss { } } - private static void close(SelectionKey k) { + @Override + protected void close(SelectionKey k) { NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); ch.worker.close(ch, succeededFuture(ch)); } - public void rebuildSelector() { - if (Thread.currentThread() != thread) { - Selector selector = this.selector; - if (selector != null) { - taskQueue.add(new Runnable() { - public void run() { - rebuildSelector(); - } - }); - selector.wakeup(); - } - return; - } - - final Selector oldSelector = selector; - final Selector newSelector; - - if (oldSelector == null) { - return; - } - - try { - newSelector = Selector.open(); - } catch (Exception e) { - logger.warn("Failed to create a new Selector.", e); - return; - } - - // Register all channels to the new Selector. - int nChannels = 0; - for (;;) { - try { - for (SelectionKey key: oldSelector.keys()) { - try { - if (key.channel().keyFor(newSelector) != null) { - continue; - } - - int interestOps = key.interestOps(); - key.cancel(); - key.channel().register(newSelector, interestOps, key.attachment()); - nChannels ++; - } catch (Exception e) { - logger.warn("Failed to re-register a Channel to the new Selector,", e); - NioClientSocketChannel ch = (NioClientSocketChannel) key.attachment(); - ch.worker.close(ch, succeededFuture(ch)); - } - } - } catch (ConcurrentModificationException e) { - // Probably due to concurrent modification of the key set. - continue; - } - - break; - } - - selector = newSelector; - - try { - // time to close the old selector as everything else is registered to the new one - oldSelector.close(); - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to close the old Selector.", t); - } - } - - logger.info("Migrated " + nChannels + " channel(s) to the new Selector,"); - } - - private static final class RegisterTask implements Runnable { + private final class RegisterTask implements Runnable { private final NioClientBoss boss; private final NioClientSocketChannel channel; @@ -470,6 +170,13 @@ public final class NioClientBoss implements Boss { } public void run() { + int timeout = channel.getConfig().getConnectTimeoutMillis(); + if (timeout > 0) { + if (!channel.isConnected()) { + channel.timoutTimer = timer.newTimeout(wakeupTask, + timeout, TimeUnit.MILLISECONDS); + } + } try { channel.channel.register( boss.selector, SelectionKey.OP_CONNECT, channel); diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientBossPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientBossPool.java index 2ad4f0741f..b52b57c161 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientBossPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientBossPool.java @@ -27,6 +27,7 @@ import java.util.concurrent.Executor; public class NioClientBossPool extends AbstractNioBossPool { private final ThreadNameDeterminer determiner; private final Timer timer; + private boolean stopTimer; /** * Create a new instance @@ -52,6 +53,7 @@ public class NioClientBossPool extends AbstractNioBossPool { */ public NioClientBossPool(Executor bossExecutor, int bossCount) { this(bossExecutor, bossCount, new HashedWheelTimer(), null); + stopTimer = true; } @Override @@ -59,6 +61,14 @@ public class NioClientBossPool extends AbstractNioBossPool { return new NioClientBoss(executor, timer, determiner); } + @Override + public void shutdown() { + super.shutdown(); + if (stopTimer) { + timer.stop(); + } + } + @Override public void releaseExternalResources() { super.releaseExternalResources(); diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java index d81786e900..39b6c8b08d 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java @@ -87,6 +87,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory private final BossPool bossPool; private final WorkerPool workerPool; private final NioClientSocketPipelineSink sink; + private boolean releasePools; /** * Creates a new {@link NioClientSocketChannelFactory} which uses {@link Executors#newCachedThreadPool()} @@ -96,6 +97,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory */ public NioClientSocketChannelFactory() { this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); + releasePools = true; } /** @@ -211,7 +213,20 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory return new NioClientSocketChannel(this, pipeline, sink, workerPool.nextWorker()); } + public void shutdown() { + bossPool.shutdown(); + workerPool.shutdown(); + if (releasePools) { + releasePools(); + } + } + public void releaseExternalResources() { + shutdown(); + releasePools(); + } + + private void releasePools() { if (bossPool instanceof ExternalResourceReleasable) { ((ExternalResourceReleasable) bossPool).releaseExternalResources(); } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java index b30428a73e..1b66845e03 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -116,7 +116,7 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { }); cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); channel.connectFuture = cf; - nextBoss().register(channel); + nextBoss().register(channel, cf); } } catch (Throwable t) { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java index ef64a320c7..2e0cd5c790 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java @@ -43,7 +43,7 @@ import static org.jboss.netty.channel.Channels.*; /** * Provides an NIO based {@link org.jboss.netty.channel.socket.DatagramChannel}. */ -public final class NioDatagramChannel extends AbstractNioChannel +public class NioDatagramChannel extends AbstractNioChannel implements org.jboss.netty.channel.socket.DatagramChannel { /** diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java index cc3a53c3b4..4d5925feda 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java @@ -81,6 +81,7 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory { private final NioDatagramPipelineSink sink; private final WorkerPool workerPool; private final InternetProtocolFamily family; + private boolean releasePool; /** * Create a new {@link NioDatagramChannelFactory} with a {@link Executors#newCachedThreadPool()} @@ -91,7 +92,7 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory { * See {@link #NioDatagramChannelFactory(Executor)} */ public NioDatagramChannelFactory() { - this(Executors.newCachedThreadPool(), null); + this((InternetProtocolFamily) null); } /** @@ -100,7 +101,10 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory { * See {@link #NioDatagramChannelFactory(Executor)} */ public NioDatagramChannelFactory(InternetProtocolFamily family) { - this(Executors.newCachedThreadPool(), family); + workerPool = new NioDatagramWorkerPool(Executors.newCachedThreadPool(), SelectorUtil.DEFAULT_IO_THREADS); + this.family = family; + sink = new NioDatagramPipelineSink(workerPool); + releasePool = true; } /** @@ -203,7 +207,19 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory { return new NioDatagramChannel(this, pipeline, sink, sink.nextWorker(), family); } + public void shutdown() { + workerPool.shutdown(); + if (releasePool) { + releasePool(); + } + } + public void releaseExternalResources() { + shutdown(); + releasePool(); + } + + private void releasePool() { if (workerPool instanceof ExternalResourceReleasable) { ((ExternalResourceReleasable) workerPool).releaseExternalResources(); } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java index f3ca2f6371..e5071f42f1 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java @@ -17,6 +17,7 @@ package org.jboss.netty.channel.socket.nio; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBufferFactory; +import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.MessageEvent; @@ -108,26 +109,13 @@ public class NioDatagramWorker extends AbstractNioWorker { return true; } - @Override - public void releaseExternalResources() { - super.releaseExternalResources(); - bufferAllocator.releaseExternalResources(); - } - @Override protected boolean scheduleWriteIfNecessary(final AbstractNioChannel channel) { final Thread workerThread = thread; if (workerThread == null || Thread.currentThread() != workerThread) { if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { // "add" the channels writeTask to the writeTaskQueue. - taskQueue.add(channel.writeTask); - } - - final Selector selector = this.selector; - if (selector != null) { - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } + registerTask(channel.writeTask); } return true; } @@ -159,7 +147,7 @@ public class NioDatagramWorker extends AbstractNioWorker { } @Override - protected Runnable createRegisterTask(AbstractNioChannel channel, ChannelFuture future) { + protected Runnable createRegisterTask(Channel channel, ChannelFuture future) { return new ChannelRegistionTask((NioDatagramChannel) channel, future); } @@ -350,4 +338,9 @@ public class NioDatagramWorker extends AbstractNioWorker { fireWriteComplete(channel, writtenBytes); } + @Override + public void run() { + super.run(); + bufferAllocator.releaseExternalResources(); + } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioSelector.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioSelector.java new file mode 100644 index 0000000000..e9534f0d59 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioSelector.java @@ -0,0 +1,34 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.nio; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; + + +public interface NioSelector extends Runnable { + + void register(Channel channel, ChannelFuture future); + + /** + * Replaces the current {@link java.nio.channels.Selector} with a + * new {@link java.nio.channels.Selector} to work around the infamous epoll 100% CPU + * bug. + */ + void rebuildSelector(); + + void shutdown(); +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioSelectorPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioSelectorPool.java new file mode 100644 index 0000000000..009223d9cb --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioSelectorPool.java @@ -0,0 +1,31 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.nio; + + +public interface NioSelectorPool { + + /** + * Replaces the current {@link java.nio.channels.Selector}s of the {@link Boss}es with new + * {@link java.nio.channels.Selector}s to work around the infamous epoll 100% CPU bug. + */ + void rebuildSelectors(); + + /** + * Shutdown the {@link NioSelectorPool} and all internal created resources + */ + void shutdown(); +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerBoss.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerBoss.java index 154c46ffe6..f79c659b13 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerBoss.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerBoss.java @@ -15,15 +15,12 @@ */ package org.jboss.netty.channel.socket.nio; -import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelSink; -import org.jboss.netty.logging.InternalLogger; -import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.ThreadNameDeterminer; import org.jboss.netty.util.ThreadRenamingRunnable; -import org.jboss.netty.util.internal.DeadLockProofWorker; import java.io.IOException; import java.net.SocketAddress; @@ -33,102 +30,34 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; -import java.util.ConcurrentModificationException; import java.util.Iterator; -import java.util.Queue; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import static org.jboss.netty.channel.Channels.*; /** * Boss implementation which handles accepting of new connections */ -public final class NioServerBoss implements Boss { - - private static final AtomicInteger nextId = new AtomicInteger(); - - static final InternalLogger logger = - InternalLoggerFactory.getInstance(NioServerBoss.class); - - private final int id = nextId.incrementAndGet(); - - private volatile Selector selector; - private volatile Thread thread; - - private final Executor bossExecutor; - /** - * Queue of channel registration tasks. - */ - private final Queue taskQueue = new ConcurrentLinkedQueue(); - - /** - * Monitor object used to synchronize selector open/close. - */ - private final Object startStopLock = new Object(); - - /** - * Boolean that controls determines if a blocked Selector.select should - * break out of its selection process. In our case we use a timeone for - * the select method and the select method will block for that time unless - * waken up. - */ - private final AtomicBoolean wakenUp = new AtomicBoolean(); - - private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation - static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. +public final class NioServerBoss extends AbstractNioSelector implements Boss { NioServerBoss(Executor bossExecutor) { - this(bossExecutor, null); + super(bossExecutor); } NioServerBoss(Executor bossExecutor, ThreadNameDeterminer determiner) { - this.bossExecutor = bossExecutor; - openSelector(determiner); + super(bossExecutor, determiner); } void bind(final NioServerSocketChannel channel, final ChannelFuture future, final SocketAddress localAddress) { - synchronized (startStopLock) { - if (selector == null) { - // the selector was null this means the Worker has already been shutdown. - throw new RejectedExecutionException("Worker has already been shutdown"); - } + registerTask(new RegisterTask(channel, future, localAddress)); + } - boolean offered = taskQueue.offer(new Runnable() { - public void run() { - boolean bound = false; - boolean registered = false; - try { - channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog()); - bound = true; - - future.setSuccess(); - fireChannelBound(channel, channel.getLocalAddress()); - channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel); - - registered = true; - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } finally { - if (!registered && bound) { - close(channel, future); - } - } - } - }); - assert offered; - - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - } + @Override + protected void close(SelectionKey k) { + NioServerSocketChannel ch = (NioServerSocketChannel) k.attachment(); + close(ch, succeededFuture(ch)); } void close(NioServerSocketChannel channel, ChannelFuture future) { @@ -136,7 +65,7 @@ public final class NioServerBoss implements Boss { try { channel.socket.close(); - cancelledKeys ++; + increaseCancelledKeys(); if (channel.setClosed()) { future.setSuccess(); @@ -154,144 +83,9 @@ public final class NioServerBoss implements Boss { } } - private void openSelector(ThreadNameDeterminer determiner) { - try { - selector = Selector.open(); - } catch (Throwable t) { - throw new ChannelException("Failed to create a selector.", t); - } - - // Start the worker thread with the new Selector. - boolean success = false; - try { - DeadLockProofWorker.start(bossExecutor, new ThreadRenamingRunnable(this, - "New I/O server boss #" + id, determiner)); - success = true; - } finally { - if (!success) { - // Release the Selector if the execution fails. - try { - selector.close(); - } catch (Throwable t) { - logger.warn("Failed to close a selector.", t); - } - selector = null; - // The method will return to the caller at this point. - } - } - assert selector != null && selector.isOpen(); - } - - public void run() { - thread = Thread.currentThread(); - boolean shutdown = false; - for (;;) { - wakenUp.set(false); - - try { - // Just do a blocking select without any timeout - // as this thread does not execute anything else. - selector.select(); - - // 'wakenUp.compareAndSet(false, true)' is always evaluated - // before calling 'selector.wakeup()' to reduce the wake-up - // overhead. (Selector.wakeup() is an expensive operation.) - // - // However, there is a race condition in this approach. - // The race condition is triggered when 'wakenUp' is set to - // true too early. - // - // 'wakenUp' is set to true too early if: - // 1) Selector is waken up between 'wakenUp.set(false)' and - // 'selector.select(...)'. (BAD) - // 2) Selector is waken up between 'selector.select(...)' and - // 'if (wakenUp.get()) { ... }'. (OK) - // - // In the first case, 'wakenUp' is set to true and the - // following 'selector.select(...)' will wake up immediately. - // Until 'wakenUp' is set to false again in the next round, - // 'wakenUp.compareAndSet(false, true)' will fail, and therefore - // any attempt to wake up the Selector will fail, too, causing - // the following 'selector.select(...)' call to block - // unnecessarily. - // - // To fix this problem, we wake up the selector again if wakenUp - // is true immediately after selector.select(...). - // It is inefficient in that it wakes up the selector for both - // the first case (BAD - wake-up required) and the second case - // (OK - no wake-up required). - if (wakenUp.get()) { - selector.wakeup(); - } - processTaskQueue(); - selector = this.selector; // processTaskQueue() can call rebuildSelector() - - processSelectedKeys(selector.selectedKeys()); - - // Exit the loop when there's nothing to handle. - // The shutdown flag is used to delay the shutdown of this - // loop to avoid excessive Selector creation when - // connections are registered in a one-by-one manner instead of - // concurrent manner. - if (selector.keys().isEmpty()) { - if (shutdown || bossExecutor instanceof ExecutorService && - ((ExecutorService) bossExecutor).isShutdown()) { - - synchronized (startStopLock) { - if (selector.keys().isEmpty()) { - try { - selector.close(); - } catch (IOException e) { - logger.warn( - "Failed to close a selector.", e); - } finally { - selector = null; - } - break; - } else { - shutdown = false; - } - } - } - } else { - shutdown = false; - } - } catch (Throwable e) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to accept a connection.", e); - } - - try { - Thread.sleep(1000); - } catch (InterruptedException e1) { - // Ignore - } - } - } - } - - private void processTaskQueue() throws IOException { - for (;;) { - final Runnable task = taskQueue.poll(); - if (task == null) { - break; - } - - task.run(); - cleanUpCancelledKeys(); - } - } - private boolean cleanUpCancelledKeys() throws IOException { - if (cancelledKeys >= CLEANUP_INTERVAL) { - cancelledKeys = 0; - selector.selectNow(); - return true; - } - return false; - } - - private void processSelectedKeys(Set selectedKeys) { + @Override + protected void process(Selector selector) { + Set selectedKeys = selector.selectedKeys(); if (selectedKeys.isEmpty()) { return; } @@ -362,73 +156,56 @@ public final class NioServerBoss implements Boss { } } - public void rebuildSelector() { - if (Thread.currentThread() != thread) { - Selector selector = this.selector; - if (selector != null) { - taskQueue.add(new Runnable() { - public void run() { - rebuildSelector(); - } - }); - selector.wakeup(); - } - return; + @Override + protected int select(Selector selector) throws IOException { + // Just do a blocking select without any timeout + // as this thread does not execute anything else. + return selector.select(); + } + + @Override + protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) { + return new ThreadRenamingRunnable(this, + "New I/O server boss #" + id, determiner); + } + + @Override + protected Runnable createRegisterTask(Channel channel, ChannelFuture future) { + return new RegisterTask((NioServerSocketChannel) channel, future, null); + } + + private final class RegisterTask implements Runnable { + private final NioServerSocketChannel channel; + private final ChannelFuture future; + private final SocketAddress localAddress; + + public RegisterTask(final NioServerSocketChannel channel, final ChannelFuture future, + final SocketAddress localAddress) { + this.channel = channel; + this.future = future; + this.localAddress = localAddress; } - final Selector oldSelector = selector; - final Selector newSelector; - - if (oldSelector == null) { - return; - } - - try { - newSelector = Selector.open(); - } catch (Exception e) { - logger.warn("Failed to create a new Selector.", e); - return; - } - - // Register all channels to the new Selector. - int nChannels = 0; - for (;;) { + public void run() { + boolean bound = false; + boolean registered = false; try { - for (SelectionKey key: oldSelector.keys()) { - try { - if (key.channel().keyFor(newSelector) != null) { - continue; - } + channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog()); + bound = true; - int interestOps = key.interestOps(); - key.cancel(); - key.channel().register(newSelector, interestOps, key.attachment()); - nChannels ++; - } catch (Exception e) { - logger.warn("Failed to re-register a Channel to the new Selector,", e); - NioServerSocketChannel ch = (NioServerSocketChannel) key.attachment(); - close(ch, succeededFuture(ch)); - } + future.setSuccess(); + fireChannelBound(channel, channel.getLocalAddress()); + channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel); + + registered = true; + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } finally { + if (!registered && bound) { + close(channel, future); } - } catch (ConcurrentModificationException e) { - // Probably due to concurrent modification of the key set. - continue; - } - - break; - } - - selector = newSelector; - - try { - // time to close the old selector as everything else is registered to the new one - oldSelector.close(); - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to close the old Selector.", t); } } - - logger.info("Migrated " + nChannels + " channel(s) to the new Selector,"); } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketChannelFactory.java index e9204aa6a0..c17a474b70 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketChannelFactory.java @@ -86,6 +86,7 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory private final WorkerPool workerPool; private final NioServerSocketPipelineSink sink; private final BossPool bossPool; + private boolean releasePools; /** * Create a new {@link NioServerSocketChannelFactory} using {@link Executors#newCachedThreadPool()} @@ -95,6 +96,7 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory */ public NioServerSocketChannelFactory() { this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); + releasePools = true; } /** @@ -194,7 +196,6 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory if (workerPool == null) { throw new NullPointerException("workerPool"); } - this.bossPool = bossPool; this.workerPool = workerPool; sink = new NioServerSocketPipelineSink(); @@ -204,7 +205,20 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory return new NioServerSocketChannel(this, pipeline, sink, bossPool.nextBoss(), workerPool); } + public void shutdown() { + bossPool.shutdown(); + workerPool.shutdown(); + if (releasePools) { + releasePools(); + } + } + public void releaseExternalResources() { + shutdown(); + releasePools(); + } + + private void releasePools() { if (bossPool instanceof ExternalResourceReleasable) { ((ExternalResourceReleasable) bossPool).releaseExternalResources(); } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java index 85a7e5e3b6..b48b7589c4 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -37,7 +37,7 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink { } } - private void handleServerSocket(ChannelEvent e) { + private static void handleServerSocket(ChannelEvent e) { if (!(e instanceof ChannelStateEvent)) { return; } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 6701a127ab..6488213998 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -17,6 +17,7 @@ package org.jboss.netty.channel.socket.nio; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBufferFactory; +import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ReceiveBufferSizePredictor; @@ -27,7 +28,6 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.concurrent.Executor; @@ -103,27 +103,7 @@ public class NioWorker extends AbstractNioWorker { final Thread workerThread = thread; if (currentThread != workerThread) { if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { - taskQueue.add(channel.writeTask); - } - - if (!(channel instanceof NioAcceptedSocketChannel) || - ((NioAcceptedSocketChannel) channel).bossThread != currentThread) { - final Selector workerSelector = selector; - if (workerSelector != null) { - if (wakenUp.compareAndSet(false, true)) { - workerSelector.wakeup(); - } - } - } else { - // A write request can be made from an acceptor thread (boss) - // when a user attempted to write something in: - // - // * channelOpen() - // * channelBound() - // * channelConnected(). - // - // In this case, there's no need to wake up the selector because - // the channel is not even registered yet at this moment. + registerTask(channel.writeTask); } return true; @@ -133,13 +113,7 @@ public class NioWorker extends AbstractNioWorker { } @Override - public void releaseExternalResources() { - super.releaseExternalResources(); - recvBufferPool.releaseExternalResources(); - } - - @Override - protected Runnable createRegisterTask(AbstractNioChannel channel, ChannelFuture future) { + protected Runnable createRegisterTask(Channel channel, ChannelFuture future) { boolean server = !(channel instanceof NioClientSocketChannel); return new RegisterTask((NioSocketChannel) channel, future, server); } @@ -198,4 +172,10 @@ public class NioWorker extends AbstractNioWorker { } } } + + @Override + public void run() { + super.run(); + recvBufferPool.releaseExternalResources(); + } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/ShareableWorkerPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/ShareableWorkerPool.java index 84c16543c8..e73ccffb7b 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/ShareableWorkerPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/ShareableWorkerPool.java @@ -45,8 +45,13 @@ public final class ShareableWorkerPool implements WorkerPool { +public interface WorkerPool extends NioSelectorPool { /** * Return the next {@link Worker} to use @@ -32,10 +30,4 @@ public interface WorkerPool { * @return worker */ E nextWorker(); - - /** - * Replaces the current {@link Selector}s of the {@link Worker}s with new {@link Selector}s to work around the - * infamous epoll 100% CPU bug. - */ - void rebuildSelectors(); } diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioWorker.java b/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioWorker.java index 4074cfe252..7c61f988cd 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioWorker.java @@ -53,7 +53,6 @@ abstract class AbstractOioWorker implements Worker public void run() { thread = channel.workerThread = Thread.currentThread(); - while (channel.isOpen()) { synchronized (channel.interestOpsLock) { while (!channel.isReadable()) { @@ -106,10 +105,6 @@ abstract class AbstractOioWorker implements Worker processEventQueue(); } - public void rebuildSelector() { - // OIO has no selector. - } - static boolean isIoThread(AbstractOioChannel channel) { return Thread.currentThread() == channel.workerThread; } diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioClientSocketChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioClientSocketChannelFactory.java index 80e4052707..79d085a2c7 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/OioClientSocketChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioClientSocketChannelFactory.java @@ -76,6 +76,7 @@ public class OioClientSocketChannelFactory implements ClientSocketChannelFactory private final Executor workerExecutor; final OioClientSocketPipelineSink sink; + private boolean shutdownExecutor; /** * Creates a new instance with a {@link Executors#newCachedThreadPool()} as worker executor. @@ -84,6 +85,7 @@ public class OioClientSocketChannelFactory implements ClientSocketChannelFactory */ public OioClientSocketChannelFactory() { this(Executors.newCachedThreadPool()); + shutdownExecutor = true; } /** @@ -104,7 +106,14 @@ public class OioClientSocketChannelFactory implements ClientSocketChannelFactory return new OioClientSocketChannel(this, pipeline, sink); } + public void shutdown() { + if (shutdownExecutor) { + ExecutorUtil.terminate(workerExecutor); + } + } + public void releaseExternalResources() { + shutdown(); ExecutorUtil.terminate(workerExecutor); } } diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramChannelFactory.java index 98e002f367..a72c04088b 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramChannelFactory.java @@ -75,6 +75,7 @@ public class OioDatagramChannelFactory implements DatagramChannelFactory { private final Executor workerExecutor; final OioDatagramPipelineSink sink; + private boolean shutdownExecutor; /** * Creates a new instance with a {@link Executors#newCachedThreadPool()} @@ -83,6 +84,7 @@ public class OioDatagramChannelFactory implements DatagramChannelFactory { */ public OioDatagramChannelFactory() { this(Executors.newCachedThreadPool()); + shutdownExecutor = true; } /** @@ -103,7 +105,14 @@ public class OioDatagramChannelFactory implements DatagramChannelFactory { return new OioDatagramChannel(this, pipeline, sink); } + public void shutdown() { + if (shutdownExecutor) { + ExecutorUtil.terminate(workerExecutor); + } + } + public void releaseExternalResources() { + shutdown(); ExecutorUtil.terminate(workerExecutor); } } diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioServerSocketChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioServerSocketChannelFactory.java index 2e74fe4e16..38e32c8dbd 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/OioServerSocketChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioServerSocketChannelFactory.java @@ -89,6 +89,7 @@ public class OioServerSocketChannelFactory implements ServerSocketChannelFactory final Executor bossExecutor; private final Executor workerExecutor; private final ChannelSink sink; + private boolean shutdownExecutor; /** * Create a new {@link OioServerSocketChannelFactory} with a {@link Executors#newCachedThreadPool()} @@ -98,6 +99,7 @@ public class OioServerSocketChannelFactory implements ServerSocketChannelFactory */ public OioServerSocketChannelFactory() { this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); + this.shutdownExecutor = true; } /** @@ -125,7 +127,14 @@ public class OioServerSocketChannelFactory implements ServerSocketChannelFactory return new OioServerSocketChannel(this, pipeline, sink); } + public void shutdown() { + if (shutdownExecutor) { + ExecutorUtil.terminate(workerExecutor); + } + } + public void releaseExternalResources() { - ExecutorUtil.terminate(bossExecutor, workerExecutor); + shutdown(); + ExecutorUtil.terminate(workerExecutor); } } diff --git a/src/main/java/org/jboss/netty/handler/codec/embedder/EmbeddedChannelFactory.java b/src/main/java/org/jboss/netty/handler/codec/embedder/EmbeddedChannelFactory.java index 4bc78296ed..9b91d03376 100644 --- a/src/main/java/org/jboss/netty/handler/codec/embedder/EmbeddedChannelFactory.java +++ b/src/main/java/org/jboss/netty/handler/codec/embedder/EmbeddedChannelFactory.java @@ -35,4 +35,8 @@ final class EmbeddedChannelFactory implements ChannelFactory { public void releaseExternalResources() { // No external resources } + + public void shutdown() { + // Nothing to shutdown + } } diff --git a/src/test/java/org/jboss/netty/bootstrap/AbstractSocketClientBootstrapTest.java b/src/test/java/org/jboss/netty/bootstrap/AbstractSocketClientBootstrapTest.java index 087a338a77..a6530d064f 100644 --- a/src/test/java/org/jboss/netty/bootstrap/AbstractSocketClientBootstrapTest.java +++ b/src/test/java/org/jboss/netty/bootstrap/AbstractSocketClientBootstrapTest.java @@ -22,18 +22,15 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.ServerSocketChannel; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; + import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelPipelineException; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.util.DummyHandler; import org.jboss.netty.util.TestUtil; -import org.jboss.netty.util.internal.ExecutorUtil; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; @@ -42,43 +39,32 @@ import org.junit.Test; */ public abstract class AbstractSocketClientBootstrapTest { - private static ExecutorService executor; - - @BeforeClass - public static void init() { - executor = Executors.newCachedThreadPool(); - } - - @AfterClass - public static void destroy() { - ExecutorUtil.terminate(executor); - } - protected abstract ChannelFactory newClientSocketChannelFactory(Executor executor); @Test(timeout = 10000) public void testFailedConnectionAttempt() throws Exception { ClientBootstrap bootstrap = new ClientBootstrap(); - bootstrap.setFactory(newClientSocketChannelFactory(executor)); + bootstrap.setFactory(newClientSocketChannelFactory(Executors.newCachedThreadPool())); bootstrap.getPipeline().addLast("dummy", new DummyHandler()); bootstrap.setOption("remoteAddress", new InetSocketAddress("255.255.255.255", 1)); ChannelFuture future = bootstrap.connect(); future.awaitUninterruptibly(); assertFalse(future.isSuccess()); assertTrue(future.getCause() instanceof IOException); + bootstrap.releaseExternalResources(); } @Test(timeout = 10000) public void testSuccessfulConnectionAttempt() throws Throwable { + ClientBootstrap bootstrap = + new ClientBootstrap(newClientSocketChannelFactory(Executors.newCachedThreadPool())); + ServerSocketChannel serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(0)); try { serverSocket.configureBlocking(false); - ClientBootstrap bootstrap = - new ClientBootstrap(newClientSocketChannelFactory(executor)); - bootstrap.getPipeline().addLast("dummy", new DummyHandler()); bootstrap.setOption( "remoteAddress", @@ -102,19 +88,22 @@ public abstract class AbstractSocketClientBootstrapTest { } catch (IOException e) { // Ignore. } + bootstrap.shutdown(); + bootstrap.releaseExternalResources(); } } @Test(timeout = 10000) public void testSuccessfulConnectionAttemptWithLocalAddress() throws Throwable { + ClientBootstrap bootstrap = + new ClientBootstrap(newClientSocketChannelFactory(Executors.newCachedThreadPool())); + ServerSocketChannel serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(0)); try { serverSocket.configureBlocking(false); - ClientBootstrap bootstrap = - new ClientBootstrap(newClientSocketChannelFactory(executor)); bootstrap.getPipeline().addLast("dummy", new DummyHandler()); bootstrap.setOption( @@ -140,6 +129,8 @@ public abstract class AbstractSocketClientBootstrapTest { } catch (IOException e) { // Ignore. } + bootstrap.shutdown(); + bootstrap.releaseExternalResources(); } } @@ -152,7 +143,10 @@ public abstract class AbstractSocketClientBootstrapTest { expect(pipelineFactory.getPipeline()).andThrow(new ChannelPipelineException()); replay(pipelineFactory); - bootstrap.connect(new InetSocketAddress(TestUtil.getLocalHost(), 1)); + ChannelFuture future = bootstrap.connect(new InetSocketAddress(TestUtil.getLocalHost(), 1)); + future.awaitUninterruptibly(); + bootstrap.shutdown(); + bootstrap.releaseExternalResources(); } @Test(expected = IllegalStateException.class) diff --git a/src/test/java/org/jboss/netty/bootstrap/AbstractSocketServerBootstrapTest.java b/src/test/java/org/jboss/netty/bootstrap/AbstractSocketServerBootstrapTest.java index d44961b514..0ca97fb1ff 100644 --- a/src/test/java/org/jboss/netty/bootstrap/AbstractSocketServerBootstrapTest.java +++ b/src/test/java/org/jboss/netty/bootstrap/AbstractSocketServerBootstrapTest.java @@ -23,7 +23,6 @@ import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.jboss.netty.channel.Channel; @@ -38,9 +37,6 @@ import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.channel.socket.SocketChannelConfig; import org.jboss.netty.util.DummyHandler; import org.jboss.netty.util.TestUtil; -import org.jboss.netty.util.internal.ExecutorUtil; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; @@ -80,38 +76,28 @@ public abstract class AbstractSocketServerBootstrapTest { } } - private static ExecutorService executor; - - @BeforeClass - public static void init() { - executor = Executors.newCachedThreadPool(); - } - - @AfterClass - public static void destroy() { - ExecutorUtil.terminate(executor); - } - protected abstract ChannelFactory newServerSocketChannelFactory(Executor executor); @Test(timeout = 30000, expected = ChannelException.class) public void testFailedBindAttempt() throws Exception { + ServerBootstrap bootstrap = new ServerBootstrap(); + final ServerSocket ss = new ServerSocket(0); final int boundPort = ss.getLocalPort(); try { - ServerBootstrap bootstrap = new ServerBootstrap(); - bootstrap.setFactory(newServerSocketChannelFactory(executor)); + bootstrap.setFactory(newServerSocketChannelFactory(Executors.newCachedThreadPool())); bootstrap.setOption("localAddress", new InetSocketAddress(boundPort)); bootstrap.bind().close().awaitUninterruptibly(); } finally { ss.close(); + bootstrap.releaseExternalResources(); } } @Test(timeout = 30000) public void testSuccessfulBindAttempt() throws Exception { ServerBootstrap bootstrap = new ServerBootstrap( - newServerSocketChannelFactory(executor)); + newServerSocketChannelFactory(Executors.newCachedThreadPool())); bootstrap.setParentHandler(new ParentChannelHandler()); bootstrap.setOption("localAddress", new InetSocketAddress(0)); @@ -172,6 +158,7 @@ public abstract class AbstractSocketServerBootstrapTest { // Confirm the received child events. assertEquals("12", pch.result.toString()); + bootstrap.releaseExternalResources(); } @Test(expected = ChannelPipelineException.class) @@ -184,6 +171,7 @@ public abstract class AbstractSocketServerBootstrapTest { replay(pipelineFactory); bootstrap.connect(new InetSocketAddress(TestUtil.getLocalHost(), 1)); + bootstrap.releaseExternalResources(); } @Test(expected = IllegalStateException.class) diff --git a/src/test/java/org/jboss/netty/bootstrap/BootstrapTest.java b/src/test/java/org/jboss/netty/bootstrap/BootstrapTest.java index 71adf1c67e..2e59a1f12a 100644 --- a/src/test/java/org/jboss/netty/bootstrap/BootstrapTest.java +++ b/src/test/java/org/jboss/netty/bootstrap/BootstrapTest.java @@ -60,6 +60,7 @@ public class BootstrapTest { } catch (IllegalStateException e) { // Success. } + b.releaseExternalResources(); } @Test(expected = NullPointerException.class) @@ -93,6 +94,7 @@ public class BootstrapTest { ChannelPipelineFactory oldPipelineFactory = b.getPipelineFactory(); b.setPipeline(createMock(ChannelPipeline.class)); assertNotSame(oldPipelineFactory, b.getPipelineFactory()); + b.releaseExternalResources(); } @Test(expected = IllegalStateException.class) @@ -100,6 +102,7 @@ public class BootstrapTest { Bootstrap b = new Bootstrap(); b.setPipelineFactory(createMock(ChannelPipelineFactory.class)); b.getPipeline(); + b.releaseExternalResources(); } @Test(expected = IllegalStateException.class) @@ -107,6 +110,7 @@ public class BootstrapTest { Bootstrap b = new Bootstrap(); b.setPipelineFactory(createMock(ChannelPipelineFactory.class)); b.getPipelineAsMap(); + b.releaseExternalResources(); } @Test(expected = NullPointerException.class) @@ -145,6 +149,7 @@ public class BootstrapTest { assertSame(p.get("d"), e.getValue()); assertFalse(m.hasNext()); + b.releaseExternalResources(); } @Test(expected = IllegalArgumentException.class) @@ -157,6 +162,7 @@ public class BootstrapTest { Bootstrap b = new Bootstrap(); b.setPipelineAsMap(m); + b.releaseExternalResources(); } @Test @@ -191,6 +197,7 @@ public class BootstrapTest { } catch (NoSuchElementException e) { // Success. } + b.releaseExternalResources(); } @Test @@ -210,6 +217,7 @@ public class BootstrapTest { assertEquals("x", o.get("s")); assertEquals(true, o.get("b")); assertEquals(42, o.get("i")); + b.releaseExternalResources(); } @Test @@ -229,6 +237,7 @@ public class BootstrapTest { assertNotSame(o1, o2); assertEquals(o1, o2); + b.releaseExternalResources(); } @Test @@ -241,6 +250,7 @@ public class BootstrapTest { b.setOption("s", null); assertNull(b.getOption("s")); assertTrue(b.getOptions().isEmpty()); + b.releaseExternalResources(); } @Test(expected = NullPointerException.class) diff --git a/src/test/java/org/jboss/netty/channel/socket/AbstractDatagramMulticastTest.java b/src/test/java/org/jboss/netty/channel/socket/AbstractDatagramMulticastTest.java index 5de24fe0d1..3df461e2b3 100644 --- a/src/test/java/org/jboss/netty/channel/socket/AbstractDatagramMulticastTest.java +++ b/src/test/java/org/jboss/netty/channel/socket/AbstractDatagramMulticastTest.java @@ -24,7 +24,6 @@ import java.net.SocketException; import java.util.Enumeration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -36,34 +35,22 @@ import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.util.TestUtil; -import org.jboss.netty.util.internal.ExecutorUtil; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; public abstract class AbstractDatagramMulticastTest { - private static ExecutorService executor; - @BeforeClass - public static void init() { - executor = Executors.newCachedThreadPool(); - } - - @AfterClass - public static void destroy() { - ExecutorUtil.terminate(executor); - } - protected abstract DatagramChannelFactory newServerSocketChannelFactory(Executor executor); protected abstract DatagramChannelFactory newClientSocketChannelFactory(Executor executor); @Test public void testMulticast() throws Throwable { - ConnectionlessBootstrap sb = new ConnectionlessBootstrap(newServerSocketChannelFactory(executor)); - ConnectionlessBootstrap cb = new ConnectionlessBootstrap(newClientSocketChannelFactory(executor)); + ConnectionlessBootstrap sb = new ConnectionlessBootstrap( + newServerSocketChannelFactory(Executors.newCachedThreadPool())); + ConnectionlessBootstrap cb = new ConnectionlessBootstrap( + newClientSocketChannelFactory(Executors.newCachedThreadPool())); DatagramChannel sc = null; DatagramChannel cc = null; try { diff --git a/src/test/java/org/jboss/netty/channel/socket/AbstractDatagramTest.java b/src/test/java/org/jboss/netty/channel/socket/AbstractDatagramTest.java index 99dac4e795..a507e1c6fa 100644 --- a/src/test/java/org/jboss/netty/channel/socket/AbstractDatagramTest.java +++ b/src/test/java/org/jboss/netty/channel/socket/AbstractDatagramTest.java @@ -38,26 +38,16 @@ import org.junit.Test; public abstract class AbstractDatagramTest { - private static ExecutorService executor; - - - @BeforeClass - public static void init() { - executor = Executors.newCachedThreadPool(); - } - - @AfterClass - public static void destroy() { - ExecutorUtil.terminate(executor); - } protected abstract DatagramChannelFactory newServerSocketChannelFactory(Executor executor); protected abstract DatagramChannelFactory newClientSocketChannelFactory(Executor executor); @Test public void testSimpleSend() throws Throwable { - ConnectionlessBootstrap sb = new ConnectionlessBootstrap(newServerSocketChannelFactory(executor)); - ConnectionlessBootstrap cb = new ConnectionlessBootstrap(newClientSocketChannelFactory(executor)); + ConnectionlessBootstrap sb = new ConnectionlessBootstrap( + newServerSocketChannelFactory(Executors.newCachedThreadPool())); + ConnectionlessBootstrap cb = new ConnectionlessBootstrap( + newClientSocketChannelFactory(Executors.newCachedThreadPool())); final CountDownLatch latch = new CountDownLatch(1); sb.getPipeline().addFirst("handler", new SimpleChannelUpstreamHandler() { @@ -83,6 +73,9 @@ public abstract class AbstractDatagramTest { assertTrue(latch.await(10, TimeUnit.SECONDS)); sc.close().awaitUninterruptibly(); cc.close().awaitUninterruptibly(); - + cb.shutdown(); + sb.shutdown(); + cb.releaseExternalResources(); + sb.releaseExternalResources(); } } diff --git a/src/test/java/org/jboss/netty/channel/socket/AbstractSocketEchoTest.java b/src/test/java/org/jboss/netty/channel/socket/AbstractSocketEchoTest.java index 0411078f8a..15598ab4db 100644 --- a/src/test/java/org/jboss/netty/channel/socket/AbstractSocketEchoTest.java +++ b/src/test/java/org/jboss/netty/channel/socket/AbstractSocketEchoTest.java @@ -124,6 +124,11 @@ public abstract class AbstractSocketEchoTest { sh.channel.close().awaitUninterruptibly(); ch.channel.close().awaitUninterruptibly(); sc.close().awaitUninterruptibly(); + cb.shutdown(); + sb.shutdown(); + + cb.releaseExternalResources(); + sb.releaseExternalResources(); if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { throw sh.exception.get(); @@ -137,6 +142,7 @@ public abstract class AbstractSocketEchoTest { if (ch.exception.get() != null) { throw ch.exception.get(); } + } private static class EchoHandler extends SimpleChannelUpstreamHandler { diff --git a/src/test/java/org/jboss/netty/channel/socket/NioClientSocketShutdownTimeTest.java b/src/test/java/org/jboss/netty/channel/socket/NioClientSocketShutdownTimeTest.java index 26547a4aac..55ab6a2a5d 100644 --- a/src/test/java/org/jboss/netty/channel/socket/NioClientSocketShutdownTimeTest.java +++ b/src/test/java/org/jboss/netty/channel/socket/NioClientSocketShutdownTimeTest.java @@ -74,7 +74,7 @@ public class NioClientSocketShutdownTimeTest { // Ignore. } } - + b.releaseExternalResources(); long shutdownTime = stopTime - startTime; assertTrue("Shutdown takes too long: " + shutdownTime + " ms", shutdownTime < 500); } diff --git a/src/test/java/org/jboss/netty/channel/socket/nio/AbstractNioWorkerTest.java b/src/test/java/org/jboss/netty/channel/socket/nio/AbstractNioWorkerTest.java index 8a27a342ad..d1cea86ee3 100644 --- a/src/test/java/org/jboss/netty/channel/socket/nio/AbstractNioWorkerTest.java +++ b/src/test/java/org/jboss/netty/channel/socket/nio/AbstractNioWorkerTest.java @@ -32,7 +32,7 @@ public abstract class AbstractNioWorkerTest { @Test public void testShutdownWorkerThrowsException() throws InterruptedException { - AbstractNioChannel mockChannel = createMock(AbstractNioChannel.class); + AbstractNioChannel mockChannel = createMockChannel(); replay(mockChannel); ChannelFuture mockFuture = createMock(ChannelFuture.class); @@ -41,8 +41,7 @@ public abstract class AbstractNioWorkerTest { ExecutorService executor = Executors.newCachedThreadPool(); AbstractNioWorker worker = createWorker(executor); - - executor.shutdownNow(); + worker.shutdown(); // give the Selector time to detect the shutdown Thread.sleep(SelectorUtil.DEFAULT_SELECT_TIMEOUT * 10); @@ -57,4 +56,6 @@ public abstract class AbstractNioWorkerTest { } protected abstract AbstractNioWorker createWorker(Executor executor); + + protected abstract AbstractNioChannel createMockChannel(); } diff --git a/src/test/java/org/jboss/netty/channel/socket/nio/NioDatagramWorkerTest.java b/src/test/java/org/jboss/netty/channel/socket/nio/NioDatagramWorkerTest.java index b8fe92369d..d2d7818a4c 100644 --- a/src/test/java/org/jboss/netty/channel/socket/nio/NioDatagramWorkerTest.java +++ b/src/test/java/org/jboss/netty/channel/socket/nio/NioDatagramWorkerTest.java @@ -15,6 +15,8 @@ */ package org.jboss.netty.channel.socket.nio; +import org.easymock.EasyMock; + import java.util.concurrent.Executor; public class NioDatagramWorkerTest extends AbstractNioWorkerTest { @@ -23,4 +25,9 @@ public class NioDatagramWorkerTest extends AbstractNioWorkerTest { protected AbstractNioWorker createWorker(Executor executor) { return new NioDatagramWorker(executor); } + + @Override + protected AbstractNioChannel createMockChannel() { + return EasyMock.createMock(NioDatagramChannel.class); + } } diff --git a/src/test/java/org/jboss/netty/channel/socket/nio/NioWorkerTest.java b/src/test/java/org/jboss/netty/channel/socket/nio/NioWorkerTest.java index 0c11543aea..2cde51dcec 100644 --- a/src/test/java/org/jboss/netty/channel/socket/nio/NioWorkerTest.java +++ b/src/test/java/org/jboss/netty/channel/socket/nio/NioWorkerTest.java @@ -15,6 +15,8 @@ */ package org.jboss.netty.channel.socket.nio; +import org.easymock.EasyMock; + import java.util.concurrent.Executor; public class NioWorkerTest extends AbstractNioWorkerTest { @@ -23,4 +25,9 @@ public class NioWorkerTest extends AbstractNioWorkerTest { protected AbstractNioWorker createWorker(Executor executor) { return new NioWorker(executor); } + + @Override + protected AbstractNioChannel createMockChannel() { + return EasyMock.createMock(NioSocketChannel.class); + } } diff --git a/src/test/java/org/jboss/netty/handler/codec/frame/AbstractSocketFixedLengthEchoTest.java b/src/test/java/org/jboss/netty/handler/codec/frame/AbstractSocketFixedLengthEchoTest.java index 2484610bad..e95766ead1 100644 --- a/src/test/java/org/jboss/netty/handler/codec/frame/AbstractSocketFixedLengthEchoTest.java +++ b/src/test/java/org/jboss/netty/handler/codec/frame/AbstractSocketFixedLengthEchoTest.java @@ -123,6 +123,11 @@ public abstract class AbstractSocketFixedLengthEchoTest { sh.channel.close().awaitUninterruptibly(); ch.channel.close().awaitUninterruptibly(); sc.close().awaitUninterruptibly(); + cb.shutdown(); + sb.shutdown(); + + cb.releaseExternalResources(); + sb.releaseExternalResources(); if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { throw sh.exception.get(); diff --git a/src/test/java/org/jboss/netty/handler/codec/serialization/AbstractSocketCompatibleObjectStreamEchoTest.java b/src/test/java/org/jboss/netty/handler/codec/serialization/AbstractSocketCompatibleObjectStreamEchoTest.java index 28519ce9a0..0ff16337de 100644 --- a/src/test/java/org/jboss/netty/handler/codec/serialization/AbstractSocketCompatibleObjectStreamEchoTest.java +++ b/src/test/java/org/jboss/netty/handler/codec/serialization/AbstractSocketCompatibleObjectStreamEchoTest.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.Random; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; @@ -36,9 +35,6 @@ import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.util.TestUtil; -import org.jboss.netty.util.internal.ExecutorUtil; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; public abstract class AbstractSocketCompatibleObjectStreamEchoTest { @@ -46,7 +42,6 @@ public abstract class AbstractSocketCompatibleObjectStreamEchoTest { static final Random random = new Random(); static final String[] data = new String[1024]; - private static ExecutorService executor; static { for (int i = 0; i < data.length; i ++) { @@ -60,15 +55,6 @@ public abstract class AbstractSocketCompatibleObjectStreamEchoTest { } } - @BeforeClass - public static void init() { - executor = Executors.newCachedThreadPool(); - } - - @AfterClass - public static void destroy() { - ExecutorUtil.terminate(executor); - } protected abstract ChannelFactory newServerSocketChannelFactory(Executor executor); protected abstract ChannelFactory newClientSocketChannelFactory(Executor executor); @@ -76,8 +62,8 @@ public abstract class AbstractSocketCompatibleObjectStreamEchoTest { @Test @SuppressWarnings("deprecation") public void testCompatibleObjectEcho() throws Throwable { - ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(executor)); - ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(executor)); + ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(Executors.newCachedThreadPool())); + ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(Executors.newCachedThreadPool())); EchoHandler sh = new EchoHandler(); EchoHandler ch = new EchoHandler(); @@ -134,6 +120,10 @@ public abstract class AbstractSocketCompatibleObjectStreamEchoTest { sh.channel.close().awaitUninterruptibly(); ch.channel.close().awaitUninterruptibly(); sc.close().awaitUninterruptibly(); + cb.shutdown(); + sb.shutdown(); + cb.releaseExternalResources(); + sb.releaseExternalResources(); if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { throw sh.exception.get(); diff --git a/src/test/java/org/jboss/netty/handler/codec/serialization/AbstractSocketObjectStreamEchoTest.java b/src/test/java/org/jboss/netty/handler/codec/serialization/AbstractSocketObjectStreamEchoTest.java index 4f41b84773..9bb845beb0 100644 --- a/src/test/java/org/jboss/netty/handler/codec/serialization/AbstractSocketObjectStreamEchoTest.java +++ b/src/test/java/org/jboss/netty/handler/codec/serialization/AbstractSocketObjectStreamEchoTest.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.Random; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; @@ -36,9 +35,6 @@ import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.util.TestUtil; -import org.jboss.netty.util.internal.ExecutorUtil; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; public abstract class AbstractSocketObjectStreamEchoTest { @@ -46,8 +42,6 @@ public abstract class AbstractSocketObjectStreamEchoTest { static final Random random = new Random(); static final String[] data = new String[1024]; - private static ExecutorService executor; - static { for (int i = 0; i < data.length; i ++) { int eLen = random.nextInt(512); @@ -60,23 +54,14 @@ public abstract class AbstractSocketObjectStreamEchoTest { } } - @BeforeClass - public static void init() { - executor = Executors.newCachedThreadPool(); - } - - @AfterClass - public static void destroy() { - ExecutorUtil.terminate(executor); - } protected abstract ChannelFactory newServerSocketChannelFactory(Executor executor); protected abstract ChannelFactory newClientSocketChannelFactory(Executor executor); @Test public void testObjectEcho() throws Throwable { - ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(executor)); - ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(executor)); + ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(Executors.newCachedThreadPool())); + ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(Executors.newCachedThreadPool())); EchoHandler sh = new EchoHandler(); EchoHandler ch = new EchoHandler(); @@ -135,6 +120,10 @@ public abstract class AbstractSocketObjectStreamEchoTest { sh.channel.close().awaitUninterruptibly(); ch.channel.close().awaitUninterruptibly(); sc.close().awaitUninterruptibly(); + cb.shutdown(); + sb.shutdown(); + cb.releaseExternalResources(); + sb.releaseExternalResources(); if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { throw sh.exception.get(); diff --git a/src/test/java/org/jboss/netty/handler/codec/spdy/AbstractSocketSpdyEchoTest.java b/src/test/java/org/jboss/netty/handler/codec/spdy/AbstractSocketSpdyEchoTest.java index 4f0fe2f91a..b8bc100715 100644 --- a/src/test/java/org/jboss/netty/handler/codec/spdy/AbstractSocketSpdyEchoTest.java +++ b/src/test/java/org/jboss/netty/handler/codec/spdy/AbstractSocketSpdyEchoTest.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.Random; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; @@ -40,9 +39,6 @@ import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.util.TestUtil; -import org.jboss.netty.util.internal.ExecutorUtil; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; public abstract class AbstractSocketSpdyEchoTest { @@ -50,8 +46,6 @@ public abstract class AbstractSocketSpdyEchoTest { private static final Random random = new Random(); static final int ignoredBytes = 20; - private static ExecutorService executor; - private static ChannelBuffer createFrames(int version) { int length = version < 3 ? 1176 : 1174; ChannelBuffer frames = ChannelBuffers.buffer(length); @@ -174,15 +168,6 @@ public abstract class AbstractSocketSpdyEchoTest { return frames; } - @BeforeClass - public static void init() { - executor = Executors.newCachedThreadPool(); - } - - @AfterClass - public static void destroy() { - ExecutorUtil.terminate(executor); - } protected abstract ChannelFactory newServerSocketChannelFactory(Executor executor); protected abstract ChannelFactory newClientSocketChannelFactory(Executor executor); @@ -195,8 +180,8 @@ public abstract class AbstractSocketSpdyEchoTest { } private void testSpdyEcho(int version) throws Throwable { - ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(executor)); - ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(executor)); + ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(Executors.newCachedThreadPool())); + ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(Executors.newCachedThreadPool())); ChannelBuffer frames = createFrames(version); @@ -236,6 +221,10 @@ public abstract class AbstractSocketSpdyEchoTest { sh.channel.close().awaitUninterruptibly(); ch.channel.close().awaitUninterruptibly(); sc.close().awaitUninterruptibly(); + cb.shutdown(); + sb.shutdown(); + cb.releaseExternalResources(); + sb.releaseExternalResources(); if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { throw sh.exception.get(); diff --git a/src/test/java/org/jboss/netty/handler/codec/string/AbstractSocketStringEchoTest.java b/src/test/java/org/jboss/netty/handler/codec/string/AbstractSocketStringEchoTest.java index 9d2a8660e0..598362862d 100644 --- a/src/test/java/org/jboss/netty/handler/codec/string/AbstractSocketStringEchoTest.java +++ b/src/test/java/org/jboss/netty/handler/codec/string/AbstractSocketStringEchoTest.java @@ -49,8 +49,6 @@ public abstract class AbstractSocketStringEchoTest { static final Random random = new Random(); static final String[] data = new String[1024]; - private static ExecutorService executor; - static { for (int i = 0; i < data.length; i ++) { int eLen = random.nextInt(512); @@ -63,23 +61,13 @@ public abstract class AbstractSocketStringEchoTest { } } - @BeforeClass - public static void init() { - executor = Executors.newCachedThreadPool(); - } - - @AfterClass - public static void destroy() { - ExecutorUtil.terminate(executor); - } - protected abstract ChannelFactory newServerSocketChannelFactory(Executor executor); protected abstract ChannelFactory newClientSocketChannelFactory(Executor executor); @Test public void testStringEcho() throws Throwable { - ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(executor)); - ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(executor)); + ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(Executors.newCachedThreadPool())); + ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(Executors.newCachedThreadPool())); EchoHandler sh = new EchoHandler(); EchoHandler ch = new EchoHandler(); @@ -139,6 +127,12 @@ public abstract class AbstractSocketStringEchoTest { sh.channel.close().awaitUninterruptibly(); ch.channel.close().awaitUninterruptibly(); sc.close().awaitUninterruptibly(); + cc.close().awaitUninterruptibly(); + cb.shutdown(); + sb.shutdown(); + + cb.releaseExternalResources(); + sb.releaseExternalResources(); if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { throw sh.exception.get(); diff --git a/src/test/java/org/jboss/netty/handler/ssl/AbstractSocketSslEchoTest.java b/src/test/java/org/jboss/netty/handler/ssl/AbstractSocketSslEchoTest.java index 1bb7f9c993..48cf47ee94 100644 --- a/src/test/java/org/jboss/netty/handler/ssl/AbstractSocketSslEchoTest.java +++ b/src/test/java/org/jboss/netty/handler/ssl/AbstractSocketSslEchoTest.java @@ -45,9 +45,6 @@ import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.TestUtil; -import org.jboss.netty.util.internal.ExecutorUtil; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; public abstract class AbstractSocketSslEchoTest { @@ -57,23 +54,10 @@ public abstract class AbstractSocketSslEchoTest { private static final Random random = new Random(); static final byte[] data = new byte[1048576]; - private static ExecutorService executor; - private static ExecutorService eventExecutor; - static { random.nextBytes(data); } - @BeforeClass - public static void init() { - executor = Executors.newCachedThreadPool(); - eventExecutor = new OrderedMemoryAwareThreadPoolExecutor(16, 0, 0); - } - - @AfterClass - public static void destroy() { - ExecutorUtil.terminate(executor, eventExecutor); - } protected abstract ChannelFactory newServerSocketChannelFactory(Executor executor); protected abstract ChannelFactory newClientSocketChannelFactory(Executor executor); @@ -84,8 +68,8 @@ public abstract class AbstractSocketSslEchoTest { @Test public void testSslEcho() throws Throwable { - ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(executor)); - ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(executor)); + ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(Executors.newCachedThreadPool())); + ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(Executors.newCachedThreadPool())); EchoHandler sh = new EchoHandler(true); EchoHandler ch = new EchoHandler(false); @@ -103,8 +87,9 @@ public abstract class AbstractSocketSslEchoTest { sb.getPipeline().addLast("handler", sh); cb.getPipeline().addFirst("ssl", new SslHandler(cse)); cb.getPipeline().addLast("handler", ch); - + ExecutorService eventExecutor = null; if (isExecutorRequired()) { + eventExecutor = new OrderedMemoryAwareThreadPoolExecutor(16, 0, 0); sb.getPipeline().addFirst("executor", new ExecutionHandler(eventExecutor)); cb.getPipeline().addFirst("executor", new ExecutionHandler(eventExecutor)); } @@ -171,7 +156,14 @@ public abstract class AbstractSocketSslEchoTest { sh.channel.close().awaitUninterruptibly(); ch.channel.close().awaitUninterruptibly(); sc.close().awaitUninterruptibly(); + cb.shutdown(); + sb.shutdown(); + cb.releaseExternalResources(); + sb.releaseExternalResources(); + if (eventExecutor != null) { + eventExecutor.shutdown(); + } if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { throw sh.exception.get(); }