diff --git a/src/main/java/org/jboss/netty/channel/socket/Worker.java b/src/main/java/org/jboss/netty/channel/socket/Worker.java index 824f933b48..9e627a99cf 100644 --- a/src/main/java/org/jboss/netty/channel/socket/Worker.java +++ b/src/main/java/org/jboss/netty/channel/socket/Worker.java @@ -16,9 +16,6 @@ package org.jboss.netty.channel.socket; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; - /** * A {@link Worker} is responsible to dispatch IO operations * @@ -32,5 +29,5 @@ public interface Worker extends Runnable { * @param task * the {@link Runnable} to execute */ - ChannelFuture executeInIoThread(Channel channel, Runnable task); + void executeInIoThread(Runnable task); } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioChannelSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioChannelSink.java index 3c01a5ca94..7eadf5b294 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioChannelSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioChannelSink.java @@ -21,6 +21,7 @@ import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.socket.ChannelRunnableWrapper; public abstract class AbstractNioChannelSink extends AbstractChannelSink { @@ -29,8 +30,9 @@ public abstract class AbstractNioChannelSink extends AbstractChannelSink { Channel ch = pipeline.getChannel(); if (ch instanceof AbstractNioChannel) { AbstractNioChannel channel = (AbstractNioChannel) ch; - - return channel.worker.executeInIoThread(ch, task); + ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task); + channel.worker.executeInIoThread(wrapper); + return wrapper; } return super.execute(pipeline, task); diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorker.java index 0b31a5bff9..111f9bc1e7 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorker.java @@ -21,7 +21,6 @@ import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.socket.ChannelRunnableWrapper; import org.jboss.netty.channel.socket.Worker; import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; import org.jboss.netty.logging.InternalLogger; @@ -42,7 +41,6 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -116,20 +114,42 @@ abstract class AbstractNioWorker implements Worker { private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool(); + private final boolean allowShutdownOnIdle; + AbstractNioWorker(Executor executor) { + this(executor, true); + } + + public AbstractNioWorker(Executor executor, boolean allowShutdownOnIdle) { this.executor = executor; + this.allowShutdownOnIdle = allowShutdownOnIdle; } void register(AbstractNioChannel channel, ChannelFuture future) { Runnable registerTask = createRegisterTask(channel, future); - Selector selector; + Selector selector = start(); + + boolean offered = registerTaskQueue.offer(registerTask); + assert offered; + + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + } + + /** + * Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for the {@link AbstractNioChannel}'s when they get registered + * + * @return selector + */ + private Selector start() { synchronized (startStopLock) { if (!started) { // Open a selector if this worker didn't start yet. try { - this.selector = selector = Selector.open(); + this.selector = Selector.open(); } catch (Throwable t) { throw new ChannelException("Failed to create a selector.", t); } @@ -147,25 +167,17 @@ abstract class AbstractNioWorker implements Worker { } catch (Throwable t) { logger.warn("Failed to close a selector.", t); } - this.selector = selector = null; + this.selector = null; // The method will return to the caller at this point. } } - } else { - // Use the existing selector if this worker has been started. - selector = this.selector; } assert selector != null && selector.isOpen(); started = true; - boolean offered = registerTaskQueue.offer(registerTask); - assert offered; - } - - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); } + return selector; } @@ -251,8 +263,10 @@ abstract class AbstractNioWorker implements Worker { } } } else { - // Give one more second. - shutdown = true; + if (allowShutdownOnIdle) { + // Give one more second. + shutdown = true; + } } } else { shutdown = false; @@ -271,33 +285,40 @@ abstract class AbstractNioWorker implements Worker { } } } - - public ChannelFuture executeInIoThread(Channel channel, Runnable task) { - if (channel instanceof AbstractNioChannel && isIoThread((AbstractNioChannel) channel)) { - try { - task.run(); - return succeededFuture(channel); - } catch (Throwable t) { - return failedFuture(channel, t); - } - } else { - ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task); - boolean added = eventQueue.offer(channelRunnable); - - if (added) { - // wake up the selector to speed things - // wake up the selector to speed things - Selector selector = this.selector; - if (selector != null) { - selector.wakeup(); - } - } else { - channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task)); - } - return channelRunnable; - } + + public void executeInIoThread(Runnable task) { + executeInIoThread(task, false); } + /** + * Execute the {@link Runnable} in a IO-Thread + * + * @param task + * the {@link Runnable} to execute + * @param alwaysAsync + * true if the {@link Runnable} should be executed + * in an async fashion even if the current Thread == IO Thread + */ + public void executeInIoThread(Runnable task, boolean alwaysAsync) { + if (!alwaysAsync && Thread.currentThread() == thread) { + task.run(); + } else { + start(); + boolean added = eventQueue.offer(task); + + assert added; + if (added) { + // wake up the selector to speed things + Selector selector = this.selector; + if (selector != null) { + selector.wakeup(); + } + } + } + + } + + private void processRegisterTaskQueue() throws IOException { for (;;) { final Runnable task = registerTaskQueue.poll(); diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorkerPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorkerPool.java new file mode 100644 index 0000000000..621c190fec --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorkerPool.java @@ -0,0 +1,83 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.jboss.netty.channel.socket.nio; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.socket.Worker; +import org.jboss.netty.util.ExternalResourceReleasable; +import org.jboss.netty.util.internal.ExecutorUtil; + +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Abstract base class for {@link WorkerPool} implementations that create the {@link Worker}'s up-front and return them in a "fair" fashion when calling + * {@link #nextWorker()} + * + */ +public abstract class AbstractNioWorkerPool implements WorkerPool , ExternalResourceReleasable { + + private final AbstractNioWorker[] workers; + private final AtomicInteger workerIndex = new AtomicInteger(); + private final Executor workerExecutor; + + /** + * Create a new instance + * + * @param workerExecutor the {@link Executor} to use for the {@link Worker}'s + * @param allowShutdownOnIdle allow the {@link Worker}'s to shutdown when there is not {@link Channel} is registered with it + * @param workerCount the count of {@link Worker}'s to create + */ + AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean allowShutDownOnIdle) { + if (workerExecutor == null) { + throw new NullPointerException("workerExecutor"); + } + if (workerCount <= 0) { + throw new IllegalArgumentException( + "workerCount (" + workerCount + ") " + + "must be a positive integer."); + } + workers = new AbstractNioWorker[workerCount]; + + for (int i = 0; i < workers.length; i++) { + workers[i] = createWorker(workerExecutor, allowShutDownOnIdle); + } + this.workerExecutor = workerExecutor; + + } + + /** + * Create a new {@link Worker} which uses the given {@link Executor} to service IO + * + * + * @param executor the {@link Executor} to use + * @param allowShutdownOnIdle allow the {@link Worker} to shutdown when there is not {@link Channel} is registered with it + * @return worker the new {@link Worker} + */ + protected abstract E createWorker(Executor executor, boolean allowShutdownOnIdle); + + @SuppressWarnings("unchecked") + public E nextWorker() { + return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]; + } + + + public void releaseExternalResources() { + ExecutorUtil.terminate(workerExecutor); + } + +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java index f9038165ec..effffac269 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketChannelFactory.java @@ -25,6 +25,7 @@ import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.jboss.netty.channel.socket.SocketChannel; +import org.jboss.netty.util.ExternalResourceReleasable; import org.jboss.netty.util.internal.ExecutorUtil; /** @@ -83,7 +84,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory private static final int DEFAULT_BOSS_COUNT = 1; private final Executor bossExecutor; - private final Executor workerExecutor; + private final WorkerPool workerPool; private final NioClientSocketPipelineSink sink; /** @@ -135,27 +136,30 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory public NioClientSocketChannelFactory( Executor bossExecutor, Executor workerExecutor, int bossCount, int workerCount) { + this(bossExecutor, bossCount, new NioWorkerPool(workerExecutor, workerCount, true)); + } + + public NioClientSocketChannelFactory( + Executor bossExecutor, int bossCount, + WorkerPool workerPool) { + if (bossExecutor == null) { throw new NullPointerException("bossExecutor"); } - if (workerExecutor == null) { - throw new NullPointerException("workerExecutor"); + if (workerPool == null) { + throw new NullPointerException("workerPool"); } if (bossCount <= 0) { throw new IllegalArgumentException( "bossCount (" + bossCount + ") " + "must be a positive integer."); } - if (workerCount <= 0) { - throw new IllegalArgumentException( - "workerCount (" + workerCount + ") " + - "must be a positive integer."); - } + this.bossExecutor = bossExecutor; - this.workerExecutor = workerExecutor; + this.workerPool = workerPool; sink = new NioClientSocketPipelineSink( - bossExecutor, workerExecutor, bossCount, workerCount); + bossExecutor, bossCount, workerPool); } public SocketChannel newChannel(ChannelPipeline pipeline) { @@ -163,6 +167,9 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory } public void releaseExternalResources() { - ExecutorUtil.terminate(bossExecutor, workerExecutor); + ExecutorUtil.terminate(bossExecutor); + if (workerPool instanceof ExternalResourceReleasable) { + ((ExternalResourceReleasable) workerPool).releaseExternalResources(); + } } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java index 2d6b3a441f..06e0bb51a7 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -41,7 +41,6 @@ import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; -import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.internal.DeadLockProofWorker; import org.jboss.netty.util.internal.QueueFactory; @@ -49,33 +48,27 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { static final InternalLogger logger = InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class); - private static final AtomicInteger nextId = new AtomicInteger(); - final int id = nextId.incrementAndGet(); final Executor bossExecutor; private final Boss[] bosses; - private final NioWorker[] workers; private final AtomicInteger bossIndex = new AtomicInteger(); - private final AtomicInteger workerIndex = new AtomicInteger(); + + private final WorkerPool workerPool; NioClientSocketPipelineSink( - Executor bossExecutor, Executor workerExecutor, - int bossCount, int workerCount) { + Executor bossExecutor, int bossCount, WorkerPool workerPool) { + this.bossExecutor = bossExecutor; - + bosses = new Boss[bossCount]; for (int i = 0; i < bosses.length; i ++) { - bosses[i] = new Boss(i + 1); + bosses[i] = new Boss(); } - - workers = new NioWorker[workerCount]; - for (int i = 0; i < workers.length; i ++) { - workers[i] = new NioWorker(workerExecutor); - } - } + this.workerPool = workerPool; + } public void eventSunk( ChannelPipeline pipeline, ChannelEvent e) throws Exception { if (e instanceof ChannelStateEvent) { @@ -167,21 +160,18 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { } NioWorker nextWorker() { - return workers[Math.abs( - workerIndex.getAndIncrement() % workers.length)]; + return workerPool.nextWorker(); } private final class Boss implements Runnable { volatile Selector selector; private boolean started; - private final int subId; private final AtomicBoolean wakenUp = new AtomicBoolean(); private final Object startStopLock = new Object(); private final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class);; - Boss(int subId) { - this.subId = subId; + Boss() { } void register(NioClientSocketChannel channel) { @@ -202,9 +192,8 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { boolean success = false; try { DeadLockProofWorker.start( - bossExecutor, - new ThreadRenamingRunnable( - this, "New I/O client boss #" + id + '-' + subId)); + bossExecutor, this); + success = true; } finally { if (!success) { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java index b219855f71..b1ac2e348b 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannelFactory.java @@ -25,7 +25,7 @@ import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.socket.DatagramChannel; import org.jboss.netty.channel.socket.DatagramChannelFactory; import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory; -import org.jboss.netty.util.internal.ExecutorUtil; +import org.jboss.netty.util.ExternalResourceReleasable; /** * A {@link DatagramChannelFactory} that creates a NIO-based connectionless @@ -76,8 +76,8 @@ import org.jboss.netty.util.internal.ExecutorUtil; */ public class NioDatagramChannelFactory implements DatagramChannelFactory { - private final Executor workerExecutor; private final NioDatagramPipelineSink sink; + private final WorkerPool workerPool; /** * Creates a new instance. Calling this constructor is same with calling @@ -94,34 +94,34 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory { /** * Creates a new instance. - * + * * @param workerExecutor - * the {@link Executor} which will execute the I/O worker threads + * the {@link Executor} which will execute the I/O worker threads * @param workerCount - * the maximum number of I/O worker threads + * the maximum number of I/O worker threads */ - public NioDatagramChannelFactory(final Executor workerExecutor, - final int workerCount) { - if (workerCount <= 0) { - throw new IllegalArgumentException(String - .format("workerCount (%s) must be a positive integer.", - workerCount)); - } - - if (workerExecutor == null) { - throw new NullPointerException( - "workerExecutor argument must not be null"); - } - this.workerExecutor = workerExecutor; - - sink = new NioDatagramPipelineSink(workerExecutor, workerCount); + public NioDatagramChannelFactory(final Executor workerExecutor, final int workerCount) { + this(new NioDatagramWorkerPool(workerExecutor, workerCount, true)); } + /** + * Creates a new instance. + * + * @param workerPool + * the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads + */ + public NioDatagramChannelFactory(WorkerPool workerPool) { + this.workerPool = workerPool; + sink = new NioDatagramPipelineSink(workerPool); + } + public DatagramChannel newChannel(final ChannelPipeline pipeline) { return new NioDatagramChannel(this, pipeline, sink, sink.nextWorker()); } public void releaseExternalResources() { - ExecutorUtil.terminate(workerExecutor); + if (workerPool instanceof ExternalResourceReleasable) { + ((ExternalResourceReleasable) workerPool).releaseExternalResources(); + } } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java index 4f098fc223..8ea55b2944 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java @@ -20,7 +20,6 @@ import static org.jboss.netty.channel.Channels.*; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelFuture; @@ -36,8 +35,7 @@ import org.jboss.netty.channel.MessageEvent; */ class NioDatagramPipelineSink extends AbstractNioChannelSink { - private final NioDatagramWorker[] workers; - private final AtomicInteger workerIndex = new AtomicInteger(); + private final WorkerPool workerPool; /** * Creates a new {@link NioDatagramPipelineSink} with a the number of {@link NioDatagramWorker}s specified in workerCount. @@ -49,11 +47,8 @@ class NioDatagramPipelineSink extends AbstractNioChannelSink { * @param workerCount * the number of {@link NioDatagramWorker}s for this sink */ - NioDatagramPipelineSink(final Executor workerExecutor, final int workerCount) { - workers = new NioDatagramWorker[workerCount]; - for (int i = 0; i < workers.length; i ++) { - workers[i] = new NioDatagramWorker(workerExecutor); - } + NioDatagramPipelineSink(final WorkerPool workerPool) { + this.workerPool = workerPool; } /** @@ -190,7 +185,7 @@ class NioDatagramPipelineSink extends AbstractNioChannelSink { } NioDatagramWorker nextWorker() { - return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]; + return workerPool.nextWorker(); } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java index 65dd7050b4..1a38a2ecdf 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java @@ -38,7 +38,7 @@ import java.util.concurrent.Executor; * A class responsible for registering channels with {@link Selector}. * It also implements the {@link Selector} loop. */ -class NioDatagramWorker extends AbstractNioWorker { +public class NioDatagramWorker extends AbstractNioWorker { /** * Sole constructor. @@ -50,6 +50,10 @@ class NioDatagramWorker extends AbstractNioWorker { super(executor); } + NioDatagramWorker(final Executor executor, boolean allowShutdownOnIdle) { + super(executor, allowShutdownOnIdle); + } + @Override protected boolean read(final SelectionKey key) { final NioDatagramChannel channel = (NioDatagramChannel) key.attachment(); diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorkerPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorkerPool.java new file mode 100644 index 0000000000..87c3ef4d42 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorkerPool.java @@ -0,0 +1,37 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.nio; + +import java.util.concurrent.Executor; + + +/** + * Default implementation which hands of {@link NioDatagramWorker}'s + * + * + */ +public class NioDatagramWorkerPool extends AbstractNioWorkerPool { + + public NioDatagramWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) { + super(executor, workerCount, allowShutdownOnIdle); + } + + @Override + protected NioDatagramWorker createWorker(Executor executor, boolean allowShutdownOnIdle) { + return new NioDatagramWorker(executor, allowShutdownOnIdle); + } + +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketChannelFactory.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketChannelFactory.java index 832fb06aaf..26cb114ec3 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketChannelFactory.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketChannelFactory.java @@ -26,6 +26,7 @@ import org.jboss.netty.channel.ChannelSink; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.socket.ServerSocketChannel; import org.jboss.netty.channel.socket.ServerSocketChannelFactory; +import org.jboss.netty.util.ExternalResourceReleasable; import org.jboss.netty.util.internal.ExecutorUtil; /** @@ -85,7 +86,7 @@ import org.jboss.netty.util.internal.ExecutorUtil; public class NioServerSocketChannelFactory implements ServerSocketChannelFactory { final Executor bossExecutor; - private final Executor workerExecutor; + private final WorkerPool workerPool; private final ChannelSink sink; /** @@ -117,20 +118,29 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory public NioServerSocketChannelFactory( Executor bossExecutor, Executor workerExecutor, int workerCount) { + this(bossExecutor, new NioWorkerPool(workerExecutor, workerCount, true)); + } + + /** + * Creates a new instance. + * + * @param bossExecutor + * the {@link Executor} which will execute the boss threads + * @param workerPool + * the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads + */ + public NioServerSocketChannelFactory( + Executor bossExecutor, WorkerPool workerPool) { if (bossExecutor == null) { throw new NullPointerException("bossExecutor"); } - if (workerExecutor == null) { - throw new NullPointerException("workerExecutor"); - } - if (workerCount <= 0) { - throw new IllegalArgumentException( - "workerCount (" + workerCount + ") " + - "must be a positive integer."); + if (workerPool == null) { + throw new NullPointerException("workerPool"); } + this.bossExecutor = bossExecutor; - this.workerExecutor = workerExecutor; - sink = new NioServerSocketPipelineSink(workerExecutor, workerCount); + this.workerPool = workerPool; + sink = new NioServerSocketPipelineSink(workerPool); } public ServerSocketChannel newChannel(ChannelPipeline pipeline) { @@ -138,6 +148,9 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory } public void releaseExternalResources() { - ExecutorUtil.terminate(bossExecutor, workerExecutor); + ExecutorUtil.terminate(bossExecutor); + if (workerPool instanceof ExternalResourceReleasable) { + ((ExternalResourceReleasable) workerPool).releaseExternalResources(); + } } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java index 9f2b3e8fbf..771718f366 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -27,7 +27,6 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelEvent; @@ -38,26 +37,19 @@ import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; -import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.internal.DeadLockProofWorker; class NioServerSocketPipelineSink extends AbstractNioChannelSink { static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class); - private static final AtomicInteger nextId = new AtomicInteger(); + private final WorkerPool workerPool; - private final int id = nextId.incrementAndGet(); - private final NioWorker[] workers; - private final AtomicInteger workerIndex = new AtomicInteger(); - - NioServerSocketPipelineSink(Executor workerExecutor, int workerCount) { - workers = new NioWorker[workerCount]; - for (int i = 0; i < workers.length; i ++) { - workers[i] = new NioWorker(workerExecutor); - } + NioServerSocketPipelineSink(WorkerPool workerPool) { + this.workerPool = workerPool; } + public void eventSunk( ChannelPipeline pipeline, ChannelEvent e) throws Exception { Channel channel = e.getChannel(); @@ -145,10 +137,7 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink { Executor bossExecutor = ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor; DeadLockProofWorker.start( - bossExecutor, - new ThreadRenamingRunnable( - new Boss(channel), - "New I/O server boss #" + id + " (" + channel + ')')); + bossExecutor, new Boss(channel)); bossStarted = true; } catch (Throwable t) { future.setFailure(t); @@ -195,8 +184,7 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink { } NioWorker nextWorker() { - return workers[Math.abs( - workerIndex.getAndIncrement() % workers.length)]; + return workerPool.nextWorker(); } private final class Boss implements Runnable { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index b5e8837775..21c4ca2e74 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -35,13 +35,18 @@ import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.concurrent.Executor; -class NioWorker extends AbstractNioWorker { +public class NioWorker extends AbstractNioWorker { private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool(); - NioWorker(Executor executor) { + public NioWorker(Executor executor) { super(executor); } + + public NioWorker(Executor executor, boolean allowShutdownOnIdle) { + super(executor, allowShutdownOnIdle); + } + @Override protected boolean read(SelectionKey k) { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorkerPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorkerPool.java new file mode 100644 index 0000000000..be15bb469b --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorkerPool.java @@ -0,0 +1,37 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.nio; + +import java.util.concurrent.Executor; + + +/** + * Default implementation which hands of {@link NioWorker}'s + * + * + */ +public class NioWorkerPool extends AbstractNioWorkerPool { + + public NioWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) { + super(executor, workerCount, allowShutdownOnIdle); + } + + @Override + protected NioWorker createWorker(Executor executor, boolean allowShutdownOnIdle) { + return new NioWorker(executor, allowShutdownOnIdle); + } + +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/ShareableWorkerPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/ShareableWorkerPool.java new file mode 100644 index 0000000000..114d151f7b --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/ShareableWorkerPool.java @@ -0,0 +1,47 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.channel.socket.nio; + +import org.jboss.netty.channel.socket.Worker; +import org.jboss.netty.util.ExternalResourceReleasable; + +/** + * This implementation of a {@link WorkerPool} should be used if you plan to share a {@link WorkerPool} between different Factories. You will need to call {@link #destroy()} by your own once + * you want to release any resources of it. + * + * + */ +public final class ShareableWorkerPool implements WorkerPool { + + private final WorkerPool wrapped; + + public ShareableWorkerPool(WorkerPool wrapped) { + this.wrapped = wrapped; + } + + public E nextWorker() { + return wrapped.nextWorker(); + } + + /** + * Destroy the {@link ShareableWorkerPool} and release all resources. After this is called its not usable anymore + */ + public void destroy() { + if (wrapped instanceof ExternalResourceReleasable) { + ((ExternalResourceReleasable) wrapped).releaseExternalResources(); + } + } +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/WorkerPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/WorkerPool.java new file mode 100644 index 0000000000..21a5681548 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/WorkerPool.java @@ -0,0 +1,35 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.jboss.netty.channel.socket.nio; + +import org.jboss.netty.channel.socket.Worker; + +/** + * The {@link WorkerPool} is responsible to hand of {@link Worker}'s on demand + * + */ +public interface WorkerPool { + + /** + * Return the next {@link Worker} to use + * + * @return worker + */ + E nextWorker(); + + +} diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioChannelSink.java b/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioChannelSink.java index 17378eed3d..1eb70b9e71 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioChannelSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioChannelSink.java @@ -21,6 +21,7 @@ import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.socket.ChannelRunnableWrapper; import org.jboss.netty.channel.socket.Worker; public abstract class AbstractOioChannelSink extends AbstractChannelSink { @@ -32,7 +33,9 @@ public abstract class AbstractOioChannelSink extends AbstractChannelSink { AbstractOioChannel channel = (AbstractOioChannel) ch; Worker worker = channel.worker; if (worker != null) { - return channel.worker.executeInIoThread(ch, task); + ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task); + channel.worker.executeInIoThread(wrapper); + return wrapper; } } diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioWorker.java b/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioWorker.java index 1559e4e15e..05094db46f 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/AbstractOioWorker.java @@ -19,13 +19,11 @@ import static org.jboss.netty.channel.Channels.*; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.socket.ChannelRunnableWrapper; import org.jboss.netty.channel.socket.Worker; import org.jboss.netty.util.internal.QueueFactory; import java.io.IOException; import java.util.Queue; -import java.util.concurrent.RejectedExecutionException; /** * Abstract base class for Oio-Worker implementations @@ -34,10 +32,16 @@ import java.util.concurrent.RejectedExecutionException; */ abstract class AbstractOioWorker implements Worker { - private final Queue eventQueue = QueueFactory.createQueue(ChannelRunnableWrapper.class); + private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); protected final C channel; + /** + * If this worker has been started thread will be a reference to the thread + * used when starting. i.e. the current thread when the run method is executed. + */ + protected volatile Thread thread; + public AbstractOioWorker(C channel) { this.channel = channel; channel.worker = this; @@ -45,8 +49,8 @@ abstract class AbstractOioWorker implements Worker public void run() { - channel.workerThread = Thread.currentThread(); - + thread = channel.workerThread = Thread.currentThread(); + while (channel.isOpen()) { synchronized (channel.interestOpsLock) { while (!channel.isReadable()) { @@ -91,31 +95,21 @@ abstract class AbstractOioWorker implements Worker } - public ChannelFuture executeInIoThread(Channel channel, Runnable task) { - if (channel instanceof AbstractOioChannel && isIoThread((AbstractOioChannel) channel)) { - try { - task.run(); - return succeededFuture(channel); - } catch (Throwable t) { - return failedFuture(channel, t); - } + public void executeInIoThread(Runnable task) { + if (Thread.currentThread() == thread) { + task.run(); } else { - ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task); - boolean added = eventQueue.offer(channelRunnable); + boolean added = eventQueue.offer(task); if (added) { // as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest - - } else { - channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task)); } - return channelRunnable; } } private void processEventQueue() throws IOException { for (;;) { - final ChannelRunnableWrapper task = eventQueue.poll(); + final Runnable task = eventQueue.poll(); if (task == null) { break; }