From e0e87ce2bc267a0bf8c512e5a1a5e9f2affc87bc Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 6 Mar 2012 19:26:10 +0100 Subject: [PATCH] Allow to share Workers by using a WorkerPool. --- .../channel/socket/nio/AbstractNioWorker.java | 13 ++- .../socket/nio/AbstractWorkerPool.java | 83 +++++++++++++++++++ .../nio/NioClientSocketChannelFactory.java | 31 ++++--- .../nio/NioClientSocketPipelineSink.java | 18 ++-- .../socket/nio/NioDatagramChannelFactory.java | 35 ++++---- .../socket/nio/NioDatagramPipelineSink.java | 13 +-- .../channel/socket/nio/NioDatagramWorker.java | 5 +- .../socket/nio/NioDatagramWorkerPool.java | 37 +++++++++ .../nio/NioServerSocketChannelFactory.java | 38 ++++++--- .../nio/NioServerSocketPipelineSink.java | 14 +--- .../netty/channel/socket/nio/NioWorker.java | 7 +- .../channel/socket/nio/NioWorkerPool.java | 37 +++++++++ .../socket/nio/ShareableWorkerPool.java | 48 +++++++++++ .../netty/channel/socket/nio/WorkerPool.java | 35 ++++++++ 14 files changed, 340 insertions(+), 74 deletions(-) create mode 100644 transport/src/main/java/io/netty/channel/socket/nio/AbstractWorkerPool.java create mode 100644 transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorkerPool.java create mode 100644 transport/src/main/java/io/netty/channel/socket/nio/NioWorkerPool.java create mode 100644 transport/src/main/java/io/netty/channel/socket/nio/ShareableWorkerPool.java create mode 100644 transport/src/main/java/io/netty/channel/socket/nio/WorkerPool.java diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index 43aa1bca84..956305427f 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -115,8 +115,15 @@ abstract class AbstractNioWorker implements Worker { private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool(); + private final boolean allowShutdownOnIdle; + AbstractNioWorker(Executor executor) { + this(executor, true); + } + + public AbstractNioWorker(Executor executor, boolean allowShutdownOnIdle) { this.executor = executor; + this.allowShutdownOnIdle = allowShutdownOnIdle; } void register(AbstractNioChannel channel, ChannelFuture future) { @@ -251,8 +258,10 @@ abstract class AbstractNioWorker implements Worker { } } } else { - // Give one more second. - shutdown = true; + if (allowShutdownOnIdle) { + // Give one more second. + shutdown = true; + } } } else { shutdown = false; diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractWorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractWorkerPool.java new file mode 100644 index 0000000000..39b5556703 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractWorkerPool.java @@ -0,0 +1,83 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel.socket.nio; + +import io.netty.channel.Channel; +import io.netty.channel.socket.Worker; +import io.netty.util.ExternalResourceReleasable; +import io.netty.util.internal.ExecutorUtil; + +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Abstract base class for {@link WorkerPool} implementations that create the {@link Worker}'s up-front and return them in a "fair" fashion when calling + * {@link #nextWorker()} + * + */ +public abstract class AbstractWorkerPool implements WorkerPool , ExternalResourceReleasable{ + + private final AbstractNioWorker[] workers; + private final AtomicInteger workerIndex = new AtomicInteger(); + private final Executor workerExecutor; + + /** + * Create a new instance + * + * @param workerExecutor the {@link Executor} to use for the {@link Worker}'s + * @param allowShutdownOnIdle allow the {@link Worker}'s to shutdown when there is not {@link Channel} is registered with it + * @param workerCount the count of {@link Worker}'s to create + */ + AbstractWorkerPool(Executor workerExecutor, int workerCount, boolean allowShutDownOnIdle) { + if (workerExecutor == null) { + throw new NullPointerException("workerExecutor"); + } + if (workerCount <= 0) { + throw new IllegalArgumentException( + "workerCount (" + workerCount + ") " + + "must be a positive integer."); + } + workers = new AbstractNioWorker[workerCount]; + + for (int i = 0; i < workers.length; i++) { + workers[i] = createWorker(workerExecutor, allowShutDownOnIdle); + } + this.workerExecutor = workerExecutor; + + } + + /** + * Create a new {@link Worker} which uses the given {@link Executor} to service IO + * + * + * @param executor the {@link Executor} to use + * @param allowShutdownOnIdle allow the {@link Worker} to shutdown when there is not {@link Channel} is registered with it + * @return worker the new {@link Worker} + */ + protected abstract E createWorker(Executor executor, boolean allowShutdownOnIdle); + + @SuppressWarnings("unchecked") + public E nextWorker() { + return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]; + } + + @Override + public void releaseExternalResources() { + ExecutorUtil.terminate(workerExecutor); + } + +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannelFactory.java b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannelFactory.java index f91e07cfc0..4eea343134 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannelFactory.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannelFactory.java @@ -25,6 +25,7 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.group.ChannelGroup; import io.netty.channel.socket.ClientSocketChannelFactory; import io.netty.channel.socket.SocketChannel; +import io.netty.util.ExternalResourceReleasable; import io.netty.util.internal.ExecutorUtil; /** @@ -82,7 +83,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory private static final int DEFAULT_BOSS_COUNT = 1; private final Executor bossExecutor; - private final Executor workerExecutor; + private final WorkerPool workerPool; private final NioClientSocketPipelineSink sink; /** @@ -136,29 +137,32 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory Executor bossExecutor, Executor workerExecutor, int bossCount, int workerCount) { + this(bossExecutor, bossCount, new NioWorkerPool(workerExecutor, workerCount, true)); + } + + public NioClientSocketChannelFactory( + Executor bossExecutor, int bossCount, + WorkerPool workerPool) { + if (bossExecutor == null) { throw new NullPointerException("bossExecutor"); } - if (workerExecutor == null) { - throw new NullPointerException("workerExecutor"); + if (workerPool == null) { + throw new NullPointerException("workerPool"); } if (bossCount <= 0) { throw new IllegalArgumentException( "bossCount (" + bossCount + ") " + "must be a positive integer."); } - if (workerCount <= 0) { - throw new IllegalArgumentException( - "workerCount (" + workerCount + ") " + - "must be a positive integer."); - } + this.bossExecutor = bossExecutor; - this.workerExecutor = workerExecutor; + this.workerPool = workerPool; sink = new NioClientSocketPipelineSink( - bossExecutor, workerExecutor, bossCount, workerCount); + bossExecutor, bossCount, workerPool); } - + @Override public SocketChannel newChannel(ChannelPipeline pipeline) { @@ -167,6 +171,9 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory @Override public void releaseExternalResources() { - ExecutorUtil.terminate(bossExecutor, workerExecutor); + ExecutorUtil.terminate(bossExecutor); + if (workerPool instanceof ExternalResourceReleasable) { + ((ExternalResourceReleasable) workerPool).releaseExternalResources(); + } } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java index 512d17da00..68ec7b4351 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -51,15 +51,13 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { final Executor bossExecutor; - private final Boss[] bosses; - private final NioWorker[] workers; - + private final Boss[] bosses; private final AtomicInteger bossIndex = new AtomicInteger(); - private final AtomicInteger workerIndex = new AtomicInteger(); + + private final WorkerPool workerPool; NioClientSocketPipelineSink( - Executor bossExecutor, Executor workerExecutor, - int bossCount, int workerCount) { + Executor bossExecutor, int bossCount, WorkerPool workerPool) { this.bossExecutor = bossExecutor; @@ -68,10 +66,7 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { bosses[i] = new Boss(); } - workers = new NioWorker[workerCount]; - for (int i = 0; i < workers.length; i ++) { - workers[i] = new NioWorker(workerExecutor); - } + this.workerPool = workerPool; } @Override @@ -162,8 +157,7 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { } NioWorker nextWorker() { - return workers[Math.abs( - workerIndex.getAndIncrement() % workers.length)]; + return workerPool.nextWorker(); } Boss nextBoss() { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelFactory.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelFactory.java index d0063a21c7..151d10317a 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelFactory.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelFactory.java @@ -24,8 +24,9 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.group.ChannelGroup; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramChannelFactory; +import io.netty.channel.socket.Worker; import io.netty.channel.socket.oio.OioDatagramChannelFactory; -import io.netty.util.internal.ExecutorUtil; +import io.netty.util.ExternalResourceReleasable; /** * A {@link DatagramChannelFactory} that creates a NIO-based connectionless @@ -75,8 +76,8 @@ import io.netty.util.internal.ExecutorUtil; */ public class NioDatagramChannelFactory implements DatagramChannelFactory { - private final Executor workerExecutor; private final NioDatagramPipelineSink sink; + private final WorkerPool workerPool; /** * Creates a new instance. Calling this constructor is same with calling @@ -101,21 +102,20 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory { */ public NioDatagramChannelFactory(final Executor workerExecutor, final int workerCount) { - if (workerCount <= 0) { - throw new IllegalArgumentException(String - .format("workerCount (%s) must be a positive integer.", - workerCount)); - } - - if (workerExecutor == null) { - throw new NullPointerException( - "workerExecutor argument must not be null"); - } - this.workerExecutor = workerExecutor; - - sink = new NioDatagramPipelineSink(workerExecutor, workerCount); + this(new NioDatagramWorkerPool(workerExecutor, workerCount, true)); } + /** + * Creates a new instance. + * + * @param workerPool + * the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads + */ + public NioDatagramChannelFactory(WorkerPool workerPool) { + this.workerPool = workerPool; + sink = new NioDatagramPipelineSink(workerPool); + } + @Override public DatagramChannel newChannel(final ChannelPipeline pipeline) { return NioDatagramChannel.create(this, pipeline, sink, sink.nextWorker()); @@ -123,6 +123,9 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory { @Override public void releaseExternalResources() { - ExecutorUtil.terminate(workerExecutor); + if (workerPool instanceof ExternalResourceReleasable) { + ((ExternalResourceReleasable) workerPool).releaseExternalResources(); + } + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java index 401d6dbf8b..32fc568a42 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java @@ -20,7 +20,6 @@ import static io.netty.channel.Channels.*; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; @@ -36,8 +35,7 @@ import io.netty.channel.MessageEvent; */ class NioDatagramPipelineSink extends AbstractNioChannelSink { - private final NioDatagramWorker[] workers; - private final AtomicInteger workerIndex = new AtomicInteger(); + private final WorkerPool workerPool; /** * Creates a new {@link NioDatagramPipelineSink} with a the number of {@link NioDatagramWorker}s specified in workerCount. @@ -49,11 +47,8 @@ class NioDatagramPipelineSink extends AbstractNioChannelSink { * @param workerCount * the number of {@link NioDatagramWorker}s for this sink */ - NioDatagramPipelineSink(final Executor workerExecutor, final int workerCount) { - workers = new NioDatagramWorker[workerCount]; - for (int i = 0; i < workers.length; i ++) { - workers[i] = new NioDatagramWorker(workerExecutor); - } + NioDatagramPipelineSink(final WorkerPool workerPool) { + this.workerPool = workerPool; } /** @@ -191,7 +186,7 @@ class NioDatagramPipelineSink extends AbstractNioChannelSink { } NioDatagramWorker nextWorker() { - return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]; + return workerPool.nextWorker(); } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java index 6191df2e08..e0c35f8072 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java @@ -38,7 +38,7 @@ import java.util.concurrent.Executor; * A class responsible for registering channels with {@link Selector}. * It also implements the {@link Selector} loop. */ -class NioDatagramWorker extends AbstractNioWorker { +public class NioDatagramWorker extends AbstractNioWorker { /** * Sole constructor. @@ -50,6 +50,9 @@ class NioDatagramWorker extends AbstractNioWorker { super(executor); } + NioDatagramWorker(final Executor executor, boolean allowShutdownOnIdle) { + super(executor, allowShutdownOnIdle); + } @Override protected boolean read(final SelectionKey key) { final NioDatagramChannel channel = (NioDatagramChannel) key.attachment(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorkerPool.java new file mode 100644 index 0000000000..938c898b77 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorkerPool.java @@ -0,0 +1,37 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.nio; + +import java.util.concurrent.Executor; + + +/** + * Default implementation which hands of {@link NioDatagramWorker}'s + * + * + */ +public class NioDatagramWorkerPool extends AbstractWorkerPool{ + + protected NioDatagramWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) { + super(executor, workerCount, allowShutdownOnIdle); + } + + @Override + protected NioDatagramWorker createWorker(Executor executor, boolean allowShutdownOnIdle) { + return new NioDatagramWorker(executor, allowShutdownOnIdle); + } + +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java index 5715f5a974..0a0918e925 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java @@ -26,6 +26,8 @@ import io.netty.channel.ChannelSink; import io.netty.channel.group.ChannelGroup; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannelFactory; +import io.netty.channel.socket.Worker; +import io.netty.util.ExternalResourceReleasable; import io.netty.util.internal.ExecutorUtil; /** @@ -84,7 +86,7 @@ import io.netty.util.internal.ExecutorUtil; public class NioServerSocketChannelFactory implements ServerSocketChannelFactory { final Executor bossExecutor; - private final Executor workerExecutor; + private final WorkerPool workerPool; private final ChannelSink sink; /** @@ -116,22 +118,32 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory public NioServerSocketChannelFactory( Executor bossExecutor, Executor workerExecutor, int workerCount) { + this(bossExecutor, new NioWorkerPool(workerExecutor, workerCount, true)); + } + + /** + * Creates a new instance. + * + * @param bossExecutor + * the {@link Executor} which will execute the boss threads + * @param workerPool + * the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads + */ + public NioServerSocketChannelFactory( + Executor bossExecutor, WorkerPool workerPool) { if (bossExecutor == null) { throw new NullPointerException("bossExecutor"); } - if (workerExecutor == null) { - throw new NullPointerException("workerExecutor"); - } - if (workerCount <= 0) { - throw new IllegalArgumentException( - "workerCount (" + workerCount + ") " + - "must be a positive integer."); + if (workerPool == null) { + throw new NullPointerException("workerPool"); } + this.bossExecutor = bossExecutor; - this.workerExecutor = workerExecutor; - sink = new NioServerSocketPipelineSink(workerExecutor, workerCount); + this.workerPool = workerPool; + sink = new NioServerSocketPipelineSink(workerPool); } + @Override public ServerSocketChannel newChannel(ChannelPipeline pipeline) { return NioServerSocketChannel.create(this, pipeline, sink); @@ -139,6 +151,10 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory @Override public void releaseExternalResources() { - ExecutorUtil.terminate(bossExecutor, workerExecutor); + ExecutorUtil.terminate(bossExecutor); + if (workerPool instanceof ExternalResourceReleasable) { + ((ExternalResourceReleasable) workerPool).releaseExternalResources(); + } + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java index 965c585827..dd847310c5 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -27,7 +27,6 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; @@ -45,14 +44,10 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink { static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class); - private final NioWorker[] workers; - private final AtomicInteger workerIndex = new AtomicInteger(); + private final WorkerPool workerPool; - NioServerSocketPipelineSink(Executor workerExecutor, int workerCount) { - workers = new NioWorker[workerCount]; - for (int i = 0; i < workers.length; i ++) { - workers[i] = new NioWorker(workerExecutor); - } + NioServerSocketPipelineSink(WorkerPool workerPool) { + this.workerPool = workerPool; } @Override @@ -189,8 +184,7 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink { } NioWorker nextWorker() { - return workers[Math.abs( - workerIndex.getAndIncrement() % workers.length)]; + return workerPool.nextWorker(); } private final class Boss implements Runnable { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java index f9fb541cdd..8909774ab6 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java @@ -35,13 +35,18 @@ import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.concurrent.Executor; -class NioWorker extends AbstractNioWorker { +public class NioWorker extends AbstractNioWorker { private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool(); NioWorker(Executor executor) { super(executor); } + + NioWorker(Executor executor, boolean allowShutdownOnIdle) { + super(executor, allowShutdownOnIdle); + } + @Override protected boolean read(SelectionKey k) { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioWorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/NioWorkerPool.java new file mode 100644 index 0000000000..cf124080ea --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioWorkerPool.java @@ -0,0 +1,37 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.nio; + +import java.util.concurrent.Executor; + + +/** + * Default implementation which hands of {@link NioWorker}'s + * + * + */ +public class NioWorkerPool extends AbstractWorkerPool{ + + NioWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) { + super(executor, workerCount, allowShutdownOnIdle); + } + + @Override + protected NioWorker createWorker(Executor executor, boolean allowShutdownOnIdle) { + return new NioWorker(executor, allowShutdownOnIdle); + } + +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/ShareableWorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/ShareableWorkerPool.java new file mode 100644 index 0000000000..2c856cd562 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/ShareableWorkerPool.java @@ -0,0 +1,48 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.nio; + +import io.netty.channel.socket.Worker; +import io.netty.util.ExternalResourceReleasable; + +/** + * This implementation of a {@link WorkerPool} should be used if you plan to share a {@link WorkerPool} between different Factories. You will need to call {@link #destroy()} by your own once + * you want to release any resources of it. + * + * + */ +public final class ShareableWorkerPool implements WorkerPool{ + + private final WorkerPool wrapped; + + public ShareableWorkerPool(WorkerPool wrapped) { + this.wrapped = wrapped; + } + + @Override + public E nextWorker() { + return wrapped.nextWorker(); + } + + /** + * Destroy the {@link ShareableWorkerPool} and release all resources. After this is called its not usable anymore + */ + public void destroy() { + if (wrapped instanceof ExternalResourceReleasable) { + ((ExternalResourceReleasable) wrapped).releaseExternalResources(); + } + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/WorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/WorkerPool.java new file mode 100644 index 0000000000..f99f936544 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/WorkerPool.java @@ -0,0 +1,35 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel.socket.nio; + +import io.netty.channel.socket.Worker; + +/** + * The {@link WorkerPool} is responsible to hand of {@link Worker}'s on demand + * + */ +public interface WorkerPool { + + /** + * Return the next {@link Worker} to use + * + * @return worker + */ + E nextWorker(); + + +}