From e0e87ce2bc267a0bf8c512e5a1a5e9f2affc87bc Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 6 Mar 2012 19:26:10 +0100 Subject: [PATCH 01/11] Allow to share Workers by using a WorkerPool. --- .../channel/socket/nio/AbstractNioWorker.java | 13 ++- .../socket/nio/AbstractWorkerPool.java | 83 +++++++++++++++++++ .../nio/NioClientSocketChannelFactory.java | 31 ++++--- .../nio/NioClientSocketPipelineSink.java | 18 ++-- .../socket/nio/NioDatagramChannelFactory.java | 35 ++++---- .../socket/nio/NioDatagramPipelineSink.java | 13 +-- .../channel/socket/nio/NioDatagramWorker.java | 5 +- .../socket/nio/NioDatagramWorkerPool.java | 37 +++++++++ .../nio/NioServerSocketChannelFactory.java | 38 ++++++--- .../nio/NioServerSocketPipelineSink.java | 14 +--- .../netty/channel/socket/nio/NioWorker.java | 7 +- .../channel/socket/nio/NioWorkerPool.java | 37 +++++++++ .../socket/nio/ShareableWorkerPool.java | 48 +++++++++++ .../netty/channel/socket/nio/WorkerPool.java | 35 ++++++++ 14 files changed, 340 insertions(+), 74 deletions(-) create mode 100644 transport/src/main/java/io/netty/channel/socket/nio/AbstractWorkerPool.java create mode 100644 transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorkerPool.java create mode 100644 transport/src/main/java/io/netty/channel/socket/nio/NioWorkerPool.java create mode 100644 transport/src/main/java/io/netty/channel/socket/nio/ShareableWorkerPool.java create mode 100644 transport/src/main/java/io/netty/channel/socket/nio/WorkerPool.java diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index 43aa1bca84..956305427f 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -115,8 +115,15 @@ abstract class AbstractNioWorker implements Worker { private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool(); + private final boolean allowShutdownOnIdle; + AbstractNioWorker(Executor executor) { + this(executor, true); + } + + public AbstractNioWorker(Executor executor, boolean allowShutdownOnIdle) { this.executor = executor; + this.allowShutdownOnIdle = allowShutdownOnIdle; } void register(AbstractNioChannel channel, ChannelFuture future) { @@ -251,8 +258,10 @@ abstract class AbstractNioWorker implements Worker { } } } else { - // Give one more second. - shutdown = true; + if (allowShutdownOnIdle) { + // Give one more second. + shutdown = true; + } } } else { shutdown = false; diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractWorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractWorkerPool.java new file mode 100644 index 0000000000..39b5556703 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractWorkerPool.java @@ -0,0 +1,83 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel.socket.nio; + +import io.netty.channel.Channel; +import io.netty.channel.socket.Worker; +import io.netty.util.ExternalResourceReleasable; +import io.netty.util.internal.ExecutorUtil; + +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Abstract base class for {@link WorkerPool} implementations that create the {@link Worker}'s up-front and return them in a "fair" fashion when calling + * {@link #nextWorker()} + * + */ +public abstract class AbstractWorkerPool implements WorkerPool , ExternalResourceReleasable{ + + private final AbstractNioWorker[] workers; + private final AtomicInteger workerIndex = new AtomicInteger(); + private final Executor workerExecutor; + + /** + * Create a new instance + * + * @param workerExecutor the {@link Executor} to use for the {@link Worker}'s + * @param allowShutdownOnIdle allow the {@link Worker}'s to shutdown when there is not {@link Channel} is registered with it + * @param workerCount the count of {@link Worker}'s to create + */ + AbstractWorkerPool(Executor workerExecutor, int workerCount, boolean allowShutDownOnIdle) { + if (workerExecutor == null) { + throw new NullPointerException("workerExecutor"); + } + if (workerCount <= 0) { + throw new IllegalArgumentException( + "workerCount (" + workerCount + ") " + + "must be a positive integer."); + } + workers = new AbstractNioWorker[workerCount]; + + for (int i = 0; i < workers.length; i++) { + workers[i] = createWorker(workerExecutor, allowShutDownOnIdle); + } + this.workerExecutor = workerExecutor; + + } + + /** + * Create a new {@link Worker} which uses the given {@link Executor} to service IO + * + * + * @param executor the {@link Executor} to use + * @param allowShutdownOnIdle allow the {@link Worker} to shutdown when there is not {@link Channel} is registered with it + * @return worker the new {@link Worker} + */ + protected abstract E createWorker(Executor executor, boolean allowShutdownOnIdle); + + @SuppressWarnings("unchecked") + public E nextWorker() { + return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]; + } + + @Override + public void releaseExternalResources() { + ExecutorUtil.terminate(workerExecutor); + } + +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannelFactory.java b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannelFactory.java index f91e07cfc0..4eea343134 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannelFactory.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannelFactory.java @@ -25,6 +25,7 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.group.ChannelGroup; import io.netty.channel.socket.ClientSocketChannelFactory; import io.netty.channel.socket.SocketChannel; +import io.netty.util.ExternalResourceReleasable; import io.netty.util.internal.ExecutorUtil; /** @@ -82,7 +83,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory private static final int DEFAULT_BOSS_COUNT = 1; private final Executor bossExecutor; - private final Executor workerExecutor; + private final WorkerPool workerPool; private final NioClientSocketPipelineSink sink; /** @@ -136,29 +137,32 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory Executor bossExecutor, Executor workerExecutor, int bossCount, int workerCount) { + this(bossExecutor, bossCount, new NioWorkerPool(workerExecutor, workerCount, true)); + } + + public NioClientSocketChannelFactory( + Executor bossExecutor, int bossCount, + WorkerPool workerPool) { + if (bossExecutor == null) { throw new NullPointerException("bossExecutor"); } - if (workerExecutor == null) { - throw new NullPointerException("workerExecutor"); + if (workerPool == null) { + throw new NullPointerException("workerPool"); } if (bossCount <= 0) { throw new IllegalArgumentException( "bossCount (" + bossCount + ") " + "must be a positive integer."); } - if (workerCount <= 0) { - throw new IllegalArgumentException( - "workerCount (" + workerCount + ") " + - "must be a positive integer."); - } + this.bossExecutor = bossExecutor; - this.workerExecutor = workerExecutor; + this.workerPool = workerPool; sink = new NioClientSocketPipelineSink( - bossExecutor, workerExecutor, bossCount, workerCount); + bossExecutor, bossCount, workerPool); } - + @Override public SocketChannel newChannel(ChannelPipeline pipeline) { @@ -167,6 +171,9 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory @Override public void releaseExternalResources() { - ExecutorUtil.terminate(bossExecutor, workerExecutor); + ExecutorUtil.terminate(bossExecutor); + if (workerPool instanceof ExternalResourceReleasable) { + ((ExternalResourceReleasable) workerPool).releaseExternalResources(); + } } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java index 512d17da00..68ec7b4351 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -51,15 +51,13 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { final Executor bossExecutor; - private final Boss[] bosses; - private final NioWorker[] workers; - + private final Boss[] bosses; private final AtomicInteger bossIndex = new AtomicInteger(); - private final AtomicInteger workerIndex = new AtomicInteger(); + + private final WorkerPool workerPool; NioClientSocketPipelineSink( - Executor bossExecutor, Executor workerExecutor, - int bossCount, int workerCount) { + Executor bossExecutor, int bossCount, WorkerPool workerPool) { this.bossExecutor = bossExecutor; @@ -68,10 +66,7 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { bosses[i] = new Boss(); } - workers = new NioWorker[workerCount]; - for (int i = 0; i < workers.length; i ++) { - workers[i] = new NioWorker(workerExecutor); - } + this.workerPool = workerPool; } @Override @@ -162,8 +157,7 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { } NioWorker nextWorker() { - return workers[Math.abs( - workerIndex.getAndIncrement() % workers.length)]; + return workerPool.nextWorker(); } Boss nextBoss() { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelFactory.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelFactory.java index d0063a21c7..151d10317a 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelFactory.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelFactory.java @@ -24,8 +24,9 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.group.ChannelGroup; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramChannelFactory; +import io.netty.channel.socket.Worker; import io.netty.channel.socket.oio.OioDatagramChannelFactory; -import io.netty.util.internal.ExecutorUtil; +import io.netty.util.ExternalResourceReleasable; /** * A {@link DatagramChannelFactory} that creates a NIO-based connectionless @@ -75,8 +76,8 @@ import io.netty.util.internal.ExecutorUtil; */ public class NioDatagramChannelFactory implements DatagramChannelFactory { - private final Executor workerExecutor; private final NioDatagramPipelineSink sink; + private final WorkerPool workerPool; /** * Creates a new instance. Calling this constructor is same with calling @@ -101,21 +102,20 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory { */ public NioDatagramChannelFactory(final Executor workerExecutor, final int workerCount) { - if (workerCount <= 0) { - throw new IllegalArgumentException(String - .format("workerCount (%s) must be a positive integer.", - workerCount)); - } - - if (workerExecutor == null) { - throw new NullPointerException( - "workerExecutor argument must not be null"); - } - this.workerExecutor = workerExecutor; - - sink = new NioDatagramPipelineSink(workerExecutor, workerCount); + this(new NioDatagramWorkerPool(workerExecutor, workerCount, true)); } + /** + * Creates a new instance. + * + * @param workerPool + * the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads + */ + public NioDatagramChannelFactory(WorkerPool workerPool) { + this.workerPool = workerPool; + sink = new NioDatagramPipelineSink(workerPool); + } + @Override public DatagramChannel newChannel(final ChannelPipeline pipeline) { return NioDatagramChannel.create(this, pipeline, sink, sink.nextWorker()); @@ -123,6 +123,9 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory { @Override public void releaseExternalResources() { - ExecutorUtil.terminate(workerExecutor); + if (workerPool instanceof ExternalResourceReleasable) { + ((ExternalResourceReleasable) workerPool).releaseExternalResources(); + } + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java index 401d6dbf8b..32fc568a42 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java @@ -20,7 +20,6 @@ import static io.netty.channel.Channels.*; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; @@ -36,8 +35,7 @@ import io.netty.channel.MessageEvent; */ class NioDatagramPipelineSink extends AbstractNioChannelSink { - private final NioDatagramWorker[] workers; - private final AtomicInteger workerIndex = new AtomicInteger(); + private final WorkerPool workerPool; /** * Creates a new {@link NioDatagramPipelineSink} with a the number of {@link NioDatagramWorker}s specified in workerCount. @@ -49,11 +47,8 @@ class NioDatagramPipelineSink extends AbstractNioChannelSink { * @param workerCount * the number of {@link NioDatagramWorker}s for this sink */ - NioDatagramPipelineSink(final Executor workerExecutor, final int workerCount) { - workers = new NioDatagramWorker[workerCount]; - for (int i = 0; i < workers.length; i ++) { - workers[i] = new NioDatagramWorker(workerExecutor); - } + NioDatagramPipelineSink(final WorkerPool workerPool) { + this.workerPool = workerPool; } /** @@ -191,7 +186,7 @@ class NioDatagramPipelineSink extends AbstractNioChannelSink { } NioDatagramWorker nextWorker() { - return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]; + return workerPool.nextWorker(); } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java index 6191df2e08..e0c35f8072 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java @@ -38,7 +38,7 @@ import java.util.concurrent.Executor; * A class responsible for registering channels with {@link Selector}. * It also implements the {@link Selector} loop. */ -class NioDatagramWorker extends AbstractNioWorker { +public class NioDatagramWorker extends AbstractNioWorker { /** * Sole constructor. @@ -50,6 +50,9 @@ class NioDatagramWorker extends AbstractNioWorker { super(executor); } + NioDatagramWorker(final Executor executor, boolean allowShutdownOnIdle) { + super(executor, allowShutdownOnIdle); + } @Override protected boolean read(final SelectionKey key) { final NioDatagramChannel channel = (NioDatagramChannel) key.attachment(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorkerPool.java new file mode 100644 index 0000000000..938c898b77 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorkerPool.java @@ -0,0 +1,37 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.nio; + +import java.util.concurrent.Executor; + + +/** + * Default implementation which hands of {@link NioDatagramWorker}'s + * + * + */ +public class NioDatagramWorkerPool extends AbstractWorkerPool{ + + protected NioDatagramWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) { + super(executor, workerCount, allowShutdownOnIdle); + } + + @Override + protected NioDatagramWorker createWorker(Executor executor, boolean allowShutdownOnIdle) { + return new NioDatagramWorker(executor, allowShutdownOnIdle); + } + +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java index 5715f5a974..0a0918e925 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java @@ -26,6 +26,8 @@ import io.netty.channel.ChannelSink; import io.netty.channel.group.ChannelGroup; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannelFactory; +import io.netty.channel.socket.Worker; +import io.netty.util.ExternalResourceReleasable; import io.netty.util.internal.ExecutorUtil; /** @@ -84,7 +86,7 @@ import io.netty.util.internal.ExecutorUtil; public class NioServerSocketChannelFactory implements ServerSocketChannelFactory { final Executor bossExecutor; - private final Executor workerExecutor; + private final WorkerPool workerPool; private final ChannelSink sink; /** @@ -116,22 +118,32 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory public NioServerSocketChannelFactory( Executor bossExecutor, Executor workerExecutor, int workerCount) { + this(bossExecutor, new NioWorkerPool(workerExecutor, workerCount, true)); + } + + /** + * Creates a new instance. + * + * @param bossExecutor + * the {@link Executor} which will execute the boss threads + * @param workerPool + * the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads + */ + public NioServerSocketChannelFactory( + Executor bossExecutor, WorkerPool workerPool) { if (bossExecutor == null) { throw new NullPointerException("bossExecutor"); } - if (workerExecutor == null) { - throw new NullPointerException("workerExecutor"); - } - if (workerCount <= 0) { - throw new IllegalArgumentException( - "workerCount (" + workerCount + ") " + - "must be a positive integer."); + if (workerPool == null) { + throw new NullPointerException("workerPool"); } + this.bossExecutor = bossExecutor; - this.workerExecutor = workerExecutor; - sink = new NioServerSocketPipelineSink(workerExecutor, workerCount); + this.workerPool = workerPool; + sink = new NioServerSocketPipelineSink(workerPool); } + @Override public ServerSocketChannel newChannel(ChannelPipeline pipeline) { return NioServerSocketChannel.create(this, pipeline, sink); @@ -139,6 +151,10 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory @Override public void releaseExternalResources() { - ExecutorUtil.terminate(bossExecutor, workerExecutor); + ExecutorUtil.terminate(bossExecutor); + if (workerPool instanceof ExternalResourceReleasable) { + ((ExternalResourceReleasable) workerPool).releaseExternalResources(); + } + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java index 965c585827..dd847310c5 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -27,7 +27,6 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; @@ -45,14 +44,10 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink { static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class); - private final NioWorker[] workers; - private final AtomicInteger workerIndex = new AtomicInteger(); + private final WorkerPool workerPool; - NioServerSocketPipelineSink(Executor workerExecutor, int workerCount) { - workers = new NioWorker[workerCount]; - for (int i = 0; i < workers.length; i ++) { - workers[i] = new NioWorker(workerExecutor); - } + NioServerSocketPipelineSink(WorkerPool workerPool) { + this.workerPool = workerPool; } @Override @@ -189,8 +184,7 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink { } NioWorker nextWorker() { - return workers[Math.abs( - workerIndex.getAndIncrement() % workers.length)]; + return workerPool.nextWorker(); } private final class Boss implements Runnable { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java index f9fb541cdd..8909774ab6 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java @@ -35,13 +35,18 @@ import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.concurrent.Executor; -class NioWorker extends AbstractNioWorker { +public class NioWorker extends AbstractNioWorker { private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool(); NioWorker(Executor executor) { super(executor); } + + NioWorker(Executor executor, boolean allowShutdownOnIdle) { + super(executor, allowShutdownOnIdle); + } + @Override protected boolean read(SelectionKey k) { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioWorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/NioWorkerPool.java new file mode 100644 index 0000000000..cf124080ea --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioWorkerPool.java @@ -0,0 +1,37 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.nio; + +import java.util.concurrent.Executor; + + +/** + * Default implementation which hands of {@link NioWorker}'s + * + * + */ +public class NioWorkerPool extends AbstractWorkerPool{ + + NioWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) { + super(executor, workerCount, allowShutdownOnIdle); + } + + @Override + protected NioWorker createWorker(Executor executor, boolean allowShutdownOnIdle) { + return new NioWorker(executor, allowShutdownOnIdle); + } + +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/ShareableWorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/ShareableWorkerPool.java new file mode 100644 index 0000000000..2c856cd562 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/ShareableWorkerPool.java @@ -0,0 +1,48 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.nio; + +import io.netty.channel.socket.Worker; +import io.netty.util.ExternalResourceReleasable; + +/** + * This implementation of a {@link WorkerPool} should be used if you plan to share a {@link WorkerPool} between different Factories. You will need to call {@link #destroy()} by your own once + * you want to release any resources of it. + * + * + */ +public final class ShareableWorkerPool implements WorkerPool{ + + private final WorkerPool wrapped; + + public ShareableWorkerPool(WorkerPool wrapped) { + this.wrapped = wrapped; + } + + @Override + public E nextWorker() { + return wrapped.nextWorker(); + } + + /** + * Destroy the {@link ShareableWorkerPool} and release all resources. After this is called its not usable anymore + */ + public void destroy() { + if (wrapped instanceof ExternalResourceReleasable) { + ((ExternalResourceReleasable) wrapped).releaseExternalResources(); + } + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/WorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/WorkerPool.java new file mode 100644 index 0000000000..f99f936544 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/WorkerPool.java @@ -0,0 +1,35 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel.socket.nio; + +import io.netty.channel.socket.Worker; + +/** + * The {@link WorkerPool} is responsible to hand of {@link Worker}'s on demand + * + */ +public interface WorkerPool { + + /** + * Return the next {@link Worker} to use + * + * @return worker + */ + E nextWorker(); + + +} From bc47850bbe36ec6a75ff2a08ae75b6d321cb1b82 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 6 Mar 2012 19:26:32 +0100 Subject: [PATCH 02/11] Allow to obtain the Worker that was used to serve the IO of a Channel --- .../netty/channel/socket/nio/AbstractNioChannel.java | 10 ++++++++++ .../netty/channel/socket/nio/NioDatagramChannel.java | 5 +++++ .../io/netty/channel/socket/nio/NioSocketChannel.java | 5 +++++ 3 files changed, 20 insertions(+) diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java index 27fe0a8e44..df5334f39c 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java @@ -115,6 +115,16 @@ abstract class AbstractNioChannel } + @Override + public NioDatagramWorker getWorker() { + return (NioDatagramWorker) super.getWorker(); + } + @Override public boolean isBound() { return isOpen() && channel.socket().isBound(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 37f4e669ff..5d36fdc65b 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -43,6 +43,11 @@ class NioSocketChannel extends AbstractNioChannel config = new DefaultNioSocketChannelConfig(socket.socket()); } + @Override + public NioWorker getWorker() { + return (NioWorker) super.getWorker(); + } + @Override public NioSocketChannelConfig getConfig() { return config; From caff7c941b1db5ef1d143fce78b71da603dbf6a1 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 6 Mar 2012 19:31:47 +0100 Subject: [PATCH 03/11] Checkstyle fixes --- .../socket/nio/AbstractWorkerPool.java | 83 ------------------- .../socket/nio/NioDatagramWorkerPool.java | 2 +- .../channel/socket/nio/NioWorkerPool.java | 2 +- .../socket/nio/ShareableWorkerPool.java | 2 +- 4 files changed, 3 insertions(+), 86 deletions(-) delete mode 100644 transport/src/main/java/io/netty/channel/socket/nio/AbstractWorkerPool.java diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractWorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractWorkerPool.java deleted file mode 100644 index 39b5556703..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractWorkerPool.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.channel.socket.nio; - -import io.netty.channel.Channel; -import io.netty.channel.socket.Worker; -import io.netty.util.ExternalResourceReleasable; -import io.netty.util.internal.ExecutorUtil; - -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Abstract base class for {@link WorkerPool} implementations that create the {@link Worker}'s up-front and return them in a "fair" fashion when calling - * {@link #nextWorker()} - * - */ -public abstract class AbstractWorkerPool implements WorkerPool , ExternalResourceReleasable{ - - private final AbstractNioWorker[] workers; - private final AtomicInteger workerIndex = new AtomicInteger(); - private final Executor workerExecutor; - - /** - * Create a new instance - * - * @param workerExecutor the {@link Executor} to use for the {@link Worker}'s - * @param allowShutdownOnIdle allow the {@link Worker}'s to shutdown when there is not {@link Channel} is registered with it - * @param workerCount the count of {@link Worker}'s to create - */ - AbstractWorkerPool(Executor workerExecutor, int workerCount, boolean allowShutDownOnIdle) { - if (workerExecutor == null) { - throw new NullPointerException("workerExecutor"); - } - if (workerCount <= 0) { - throw new IllegalArgumentException( - "workerCount (" + workerCount + ") " + - "must be a positive integer."); - } - workers = new AbstractNioWorker[workerCount]; - - for (int i = 0; i < workers.length; i++) { - workers[i] = createWorker(workerExecutor, allowShutDownOnIdle); - } - this.workerExecutor = workerExecutor; - - } - - /** - * Create a new {@link Worker} which uses the given {@link Executor} to service IO - * - * - * @param executor the {@link Executor} to use - * @param allowShutdownOnIdle allow the {@link Worker} to shutdown when there is not {@link Channel} is registered with it - * @return worker the new {@link Worker} - */ - protected abstract E createWorker(Executor executor, boolean allowShutdownOnIdle); - - @SuppressWarnings("unchecked") - public E nextWorker() { - return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]; - } - - @Override - public void releaseExternalResources() { - ExecutorUtil.terminate(workerExecutor); - } - -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorkerPool.java index 938c898b77..16f881fc63 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorkerPool.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorkerPool.java @@ -23,7 +23,7 @@ import java.util.concurrent.Executor; * * */ -public class NioDatagramWorkerPool extends AbstractWorkerPool{ +public class NioDatagramWorkerPool extends AbstractNioWorkerPool { protected NioDatagramWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) { super(executor, workerCount, allowShutdownOnIdle); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioWorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/NioWorkerPool.java index cf124080ea..4d474799c7 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioWorkerPool.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioWorkerPool.java @@ -23,7 +23,7 @@ import java.util.concurrent.Executor; * * */ -public class NioWorkerPool extends AbstractWorkerPool{ +public class NioWorkerPool extends AbstractNioWorkerPool { NioWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) { super(executor, workerCount, allowShutdownOnIdle); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/ShareableWorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/ShareableWorkerPool.java index 2c856cd562..c3fb95b89e 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/ShareableWorkerPool.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/ShareableWorkerPool.java @@ -24,7 +24,7 @@ import io.netty.util.ExternalResourceReleasable; * * */ -public final class ShareableWorkerPool implements WorkerPool{ +public final class ShareableWorkerPool implements WorkerPool { private final WorkerPool wrapped; From fd7e165fb6e10914f57915de18bec75a5f1d69ff Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 6 Mar 2012 20:06:29 +0100 Subject: [PATCH 04/11] Commit missing file --- .../socket/nio/AbstractNioWorkerPool.java | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorkerPool.java diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorkerPool.java new file mode 100644 index 0000000000..45a181f543 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorkerPool.java @@ -0,0 +1,83 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel.socket.nio; + +import io.netty.channel.Channel; +import io.netty.channel.socket.Worker; +import io.netty.util.ExternalResourceReleasable; +import io.netty.util.internal.ExecutorUtil; + +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Abstract base class for {@link WorkerPool} implementations that create the {@link Worker}'s up-front and return them in a "fair" fashion when calling + * {@link #nextWorker()} + * + */ +public abstract class AbstractNioWorkerPool implements WorkerPool , ExternalResourceReleasable { + + private final AbstractNioWorker[] workers; + private final AtomicInteger workerIndex = new AtomicInteger(); + private final Executor workerExecutor; + + /** + * Create a new instance + * + * @param workerExecutor the {@link Executor} to use for the {@link Worker}'s + * @param allowShutdownOnIdle allow the {@link Worker}'s to shutdown when there is not {@link Channel} is registered with it + * @param workerCount the count of {@link Worker}'s to create + */ + AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean allowShutDownOnIdle) { + if (workerExecutor == null) { + throw new NullPointerException("workerExecutor"); + } + if (workerCount <= 0) { + throw new IllegalArgumentException( + "workerCount (" + workerCount + ") " + + "must be a positive integer."); + } + workers = new AbstractNioWorker[workerCount]; + + for (int i = 0; i < workers.length; i++) { + workers[i] = createWorker(workerExecutor, allowShutDownOnIdle); + } + this.workerExecutor = workerExecutor; + + } + + /** + * Create a new {@link Worker} which uses the given {@link Executor} to service IO + * + * + * @param executor the {@link Executor} to use + * @param allowShutdownOnIdle allow the {@link Worker} to shutdown when there is not {@link Channel} is registered with it + * @return worker the new {@link Worker} + */ + protected abstract E createWorker(Executor executor, boolean allowShutdownOnIdle); + + @SuppressWarnings("unchecked") + public E nextWorker() { + return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]; + } + + @Override + public void releaseExternalResources() { + ExecutorUtil.terminate(workerExecutor); + } + +} From 0b0edea6bcc2d6978b035071acd50e2f7e91bdad Mon Sep 17 00:00:00 2001 From: norman Date: Wed, 7 Mar 2012 13:29:55 +0100 Subject: [PATCH 05/11] Make classes public --- .../java/io/netty/channel/socket/nio/NioDatagramChannel.java | 2 +- .../java/io/netty/channel/socket/nio/NioDatagramWorker.java | 1 + .../java/io/netty/channel/socket/nio/NioSocketChannel.java | 2 +- .../src/main/java/io/netty/channel/socket/nio/NioWorker.java | 4 ++-- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index f620154176..1bc3cc8119 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -31,7 +31,7 @@ import java.nio.channels.DatagramChannel; /** * Provides an NIO based {@link io.netty.channel.socket.DatagramChannel}. */ -final class NioDatagramChannel extends AbstractNioChannel +public final class NioDatagramChannel extends AbstractNioChannel implements io.netty.channel.socket.DatagramChannel { /** diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java index e0c35f8072..31f9c44f84 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java @@ -53,6 +53,7 @@ public class NioDatagramWorker extends AbstractNioWorker { NioDatagramWorker(final Executor executor, boolean allowShutdownOnIdle) { super(executor, allowShutdownOnIdle); } + @Override protected boolean read(final SelectionKey key) { final NioDatagramChannel channel = (NioDatagramChannel) key.attachment(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 5d36fdc65b..9cce753d74 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -23,7 +23,7 @@ import io.netty.channel.ChannelSink; import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; -class NioSocketChannel extends AbstractNioChannel +public class NioSocketChannel extends AbstractNioChannel implements io.netty.channel.socket.SocketChannel { private static final int ST_OPEN = 0; diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java index 8909774ab6..b68043df7e 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java @@ -39,11 +39,11 @@ public class NioWorker extends AbstractNioWorker { private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool(); - NioWorker(Executor executor) { + public NioWorker(Executor executor) { super(executor); } - NioWorker(Executor executor, boolean allowShutdownOnIdle) { + public NioWorker(Executor executor, boolean allowShutdownOnIdle) { super(executor, allowShutdownOnIdle); } From e8c64ea593b4b69a55e8d5486b3109dc071118bc Mon Sep 17 00:00:00 2001 From: norman Date: Wed, 7 Mar 2012 13:30:45 +0100 Subject: [PATCH 06/11] Make classes public --- .../java/io/netty/channel/socket/nio/NioDatagramWorkerPool.java | 2 +- .../main/java/io/netty/channel/socket/nio/NioWorkerPool.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorkerPool.java index 16f881fc63..5f410000b2 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorkerPool.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorkerPool.java @@ -25,7 +25,7 @@ import java.util.concurrent.Executor; */ public class NioDatagramWorkerPool extends AbstractNioWorkerPool { - protected NioDatagramWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) { + public NioDatagramWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) { super(executor, workerCount, allowShutdownOnIdle); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioWorkerPool.java b/transport/src/main/java/io/netty/channel/socket/nio/NioWorkerPool.java index 4d474799c7..a8eb002234 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioWorkerPool.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioWorkerPool.java @@ -25,7 +25,7 @@ import java.util.concurrent.Executor; */ public class NioWorkerPool extends AbstractNioWorkerPool { - NioWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) { + public NioWorkerPool(Executor executor, int workerCount, boolean allowShutdownOnIdle) { super(executor, workerCount, allowShutdownOnIdle); } From 6375b84c9db0e5f736e22a5d085f84c0dc4d8c11 Mon Sep 17 00:00:00 2001 From: norman Date: Wed, 7 Mar 2012 14:13:48 +0100 Subject: [PATCH 07/11] Change Worker.executeInIoThread() to not need a Channel as paramater --- .../channel/sctp/AbstractSctpChannelSink.java | 5 +- .../io/netty/channel/sctp/SctpWorker.java | 60 +++++++++---------- .../java/io/netty/channel/socket/Worker.java | 5 +- .../socket/nio/AbstractNioChannelSink.java | 6 +- .../channel/socket/nio/AbstractNioWorker.java | 20 ++----- .../socket/oio/AbstractOioChannelSink.java | 5 +- .../channel/socket/oio/AbstractOioWorker.java | 34 +++++------ 7 files changed, 61 insertions(+), 74 deletions(-) diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java index f01321c2ce..35403acb18 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java @@ -20,6 +20,7 @@ import io.netty.channel.AbstractChannelSink; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.ChannelRunnableWrapper; public abstract class AbstractSctpChannelSink extends AbstractChannelSink { @@ -28,7 +29,9 @@ public abstract class AbstractSctpChannelSink extends AbstractChannelSink { Channel ch = pipeline.getChannel(); if (ch instanceof SctpChannelImpl) { SctpChannelImpl channel = (SctpChannelImpl) ch; - return channel.worker.executeInIoThread(channel, task); + ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(channel, task); + channel.worker.executeInIoThread(task); + return wrapper; } else { return super.execute(pipeline, task); diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java index 1d0127d047..602b8c8105 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java @@ -15,7 +15,29 @@ */ package io.netty.channel.sctp; -import static io.netty.channel.Channels.*; +import static io.netty.channel.Channels.fireChannelBound; +import static io.netty.channel.Channels.fireChannelClosed; +import static io.netty.channel.Channels.fireChannelConnected; +import static io.netty.channel.Channels.fireChannelDisconnected; +import static io.netty.channel.Channels.fireChannelInterestChanged; +import static io.netty.channel.Channels.fireChannelUnbound; +import static io.netty.channel.Channels.fireExceptionCaught; +import static io.netty.channel.Channels.fireMessageReceived; +import static io.netty.channel.Channels.fireWriteComplete; +import static io.netty.channel.Channels.succeededFuture; +import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ChannelBufferFactory; +import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.MessageEvent; +import io.netty.channel.ReceiveBufferSizePredictor; +import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer; +import io.netty.channel.socket.Worker; +import io.netty.logging.InternalLogger; +import io.netty.logging.InternalLoggerFactory; +import io.netty.util.internal.DeadLockProofWorker; +import io.netty.util.internal.QueueFactory; import java.io.IOException; import java.net.SocketAddress; @@ -31,28 +53,12 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import com.sun.nio.sctp.MessageInfo; -import io.netty.buffer.ChannelBuffer; -import io.netty.buffer.ChannelBufferFactory; -import io.netty.channel.Channel; -import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFuture; -import io.netty.channel.MessageEvent; -import io.netty.channel.ReceiveBufferSizePredictor; -import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer; -import io.netty.channel.socket.ChannelRunnableWrapper; -import io.netty.channel.socket.Worker; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; -import io.netty.util.internal.DeadLockProofWorker; -import io.netty.util.internal.QueueFactory; - /** */ @SuppressWarnings("unchecked") @@ -248,25 +254,17 @@ class SctpWorker implements Worker { } @Override - public ChannelFuture executeInIoThread(Channel channel, Runnable task) { - if (channel instanceof SctpChannelImpl && isIoThread((SctpChannelImpl) channel)) { - try { - task.run(); - return succeededFuture(channel); - } catch (Throwable t) { - return failedFuture(channel, t); - } + public void executeInIoThread(Runnable task) { + if (Thread.currentThread() == thread) { + task.run(); } else { - ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task); - boolean added = eventQueue.offer(channelRunnable); - + boolean added = eventQueue.offer(task); + if (added) { // wake up the selector to speed things selector.wakeup(); - } else { - channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task)); } - return channelRunnable; + } } diff --git a/transport/src/main/java/io/netty/channel/socket/Worker.java b/transport/src/main/java/io/netty/channel/socket/Worker.java index 271897881a..64dc433038 100644 --- a/transport/src/main/java/io/netty/channel/socket/Worker.java +++ b/transport/src/main/java/io/netty/channel/socket/Worker.java @@ -16,9 +16,6 @@ package io.netty.channel.socket; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; - /** * A {@link Worker} is responsible to dispatch IO operations * @@ -30,5 +27,5 @@ public interface Worker extends Runnable { * * @param task the {@link Runnable} to execute */ - ChannelFuture executeInIoThread(Channel channel, Runnable task); + void executeInIoThread(Runnable task); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java index 1a29f5ef19..3b389ce5a5 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java @@ -21,6 +21,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.ChannelRunnableWrapper; public abstract class AbstractNioChannelSink extends AbstractChannelSink { @@ -29,8 +30,9 @@ public abstract class AbstractNioChannelSink extends AbstractChannelSink { Channel ch = pipeline.getChannel(); if (ch instanceof AbstractNioChannel) { AbstractNioChannel channel = (AbstractNioChannel) ch; - - return channel.worker.executeInIoThread(ch, task); + ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task); + channel.worker.executeInIoThread(wrapper); + return wrapper; } return super.execute(pipeline, task); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index 956305427f..a877b15629 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -21,7 +21,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.MessageEvent; -import io.netty.channel.socket.ChannelRunnableWrapper; import io.netty.channel.socket.Worker; import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; import io.netty.logging.InternalLogger; @@ -42,7 +41,6 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -282,28 +280,20 @@ abstract class AbstractNioWorker implements Worker { } @Override - public ChannelFuture executeInIoThread(Channel channel, Runnable task) { - if (channel instanceof AbstractNioChannel && isIoThread((AbstractNioChannel) channel)) { - try { - task.run(); - return succeededFuture(channel); - } catch (Throwable t) { - return failedFuture(channel, t); - } + public void executeInIoThread(Runnable task) { + if (Thread.currentThread() == thread) { + task.run(); } else { - ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task); - boolean added = eventQueue.offer(channelRunnable); + boolean added = eventQueue.offer(task); + assert added; if (added) { // wake up the selector to speed things Selector selector = this.selector; if (selector != null) { selector.wakeup(); } - } else { - channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task)); } - return channelRunnable; } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java index d57c198534..e20d9ea5bb 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java @@ -21,6 +21,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.ChannelRunnableWrapper; import io.netty.channel.socket.Worker; public abstract class AbstractOioChannelSink extends AbstractChannelSink { @@ -32,7 +33,9 @@ public abstract class AbstractOioChannelSink extends AbstractChannelSink { AbstractOioChannel channel = (AbstractOioChannel) ch; Worker worker = channel.worker; if (worker != null) { - return channel.worker.executeInIoThread(ch, task); + ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task); + channel.worker.executeInIoThread(wrapper); + return wrapper; } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java index 930abbce59..e0b12fef11 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java @@ -19,13 +19,11 @@ import static io.netty.channel.Channels.*; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.Channels; -import io.netty.channel.socket.ChannelRunnableWrapper; import io.netty.channel.socket.Worker; import io.netty.util.internal.QueueFactory; import java.io.IOException; import java.util.Queue; -import java.util.concurrent.RejectedExecutionException; /** * Abstract base class for Oio-Worker implementations @@ -34,10 +32,16 @@ import java.util.concurrent.RejectedExecutionException; */ abstract class AbstractOioWorker implements Worker { - private final Queue eventQueue = QueueFactory.createQueue(ChannelRunnableWrapper.class); + private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); protected final C channel; + /** + * If this worker has been started thread will be a reference to the thread + * used when starting. i.e. the current thread when the run method is executed. + */ + protected volatile Thread thread; + public AbstractOioWorker(C channel) { this.channel = channel; channel.worker = this; @@ -45,7 +49,7 @@ abstract class AbstractOioWorker implements Worker @Override public void run() { - channel.workerThread = Thread.currentThread(); + thread = channel.workerThread = Thread.currentThread(); while (channel.isOpen()) { synchronized (channel.interestOpsLock) { @@ -91,31 +95,21 @@ abstract class AbstractOioWorker implements Worker } @Override - public ChannelFuture executeInIoThread(Channel channel, Runnable task) { - if (channel instanceof AbstractOioChannel && isIoThread((AbstractOioChannel) channel)) { - try { - task.run(); - return succeededFuture(channel); - } catch (Throwable t) { - return failedFuture(channel, t); - } + public void executeInIoThread(Runnable task) { + if (Thread.currentThread() == thread) { + task.run(); } else { - ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task); - boolean added = eventQueue.offer(channelRunnable); + boolean added = eventQueue.offer(task); if (added) { // as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest - - } else { - channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task)); - } - return channelRunnable; + } } } private void processEventQueue() throws IOException { for (;;) { - final ChannelRunnableWrapper task = eventQueue.poll(); + final Runnable task = eventQueue.poll(); if (task == null) { break; } From 62028f0042868af9211e188d0f2fc7cc06edc08f Mon Sep 17 00:00:00 2001 From: norman Date: Wed, 7 Mar 2012 14:15:42 +0100 Subject: [PATCH 08/11] execute the wrapped Runnable --- .../java/io/netty/channel/sctp/AbstractSctpChannelSink.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java index 35403acb18..906fc35c99 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java @@ -30,7 +30,7 @@ public abstract class AbstractSctpChannelSink extends AbstractChannelSink { if (ch instanceof SctpChannelImpl) { SctpChannelImpl channel = (SctpChannelImpl) ch; ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(channel, task); - channel.worker.executeInIoThread(task); + channel.worker.executeInIoThread(wrapper); return wrapper; } else { From e207af30a357e1d5c4e87767f053920820b64d49 Mon Sep 17 00:00:00 2001 From: norman Date: Wed, 7 Mar 2012 15:37:33 +0100 Subject: [PATCH 09/11] Make sure AbstractNioWorker gets started if needed --- .../channel/socket/nio/AbstractNioWorker.java | 64 ++++++++++--------- 1 file changed, 35 insertions(+), 29 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index a877b15629..fcf092f9b7 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -127,13 +127,28 @@ abstract class AbstractNioWorker implements Worker { void register(AbstractNioChannel channel, ChannelFuture future) { Runnable registerTask = createRegisterTask(channel, future); - Selector selector; + Selector selector = start(); + + boolean offered = registerTaskQueue.offer(registerTask); + assert offered; + + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + } + + /** + * Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for the {@link AbstractNioChannel}'s when they get registered + * + * @return selector + */ + private Selector start() { synchronized (startStopLock) { if (!started) { // Open a selector if this worker didn't start yet. try { - this.selector = selector = Selector.open(); + this.selector = Selector.open(); } catch (Throwable t) { throw new ChannelException("Failed to create a selector.", t); } @@ -151,28 +166,19 @@ abstract class AbstractNioWorker implements Worker { } catch (Throwable t) { logger.warn("Failed to close a selector.", t); } - this.selector = selector = null; + this.selector = null; // The method will return to the caller at this point. } } - } else { - // Use the existing selector if this worker has been started. - selector = this.selector; } assert selector != null && selector.isOpen(); started = true; - boolean offered = registerTaskQueue.offer(registerTask); - assert offered; - } - - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); } + return selector; } - @Override public void run() { thread = Thread.currentThread(); @@ -281,22 +287,22 @@ abstract class AbstractNioWorker implements Worker { @Override public void executeInIoThread(Runnable task) { - if (Thread.currentThread() == thread) { - task.run(); - } else { - boolean added = eventQueue.offer(task); - - assert added; - if (added) { - // wake up the selector to speed things - Selector selector = this.selector; - if (selector != null) { - selector.wakeup(); - } - } - } - - + start(); + if (Thread.currentThread() == thread) { + task.run(); + } else { + boolean added = eventQueue.offer(task); + + assert added; + if (added) { + // wake up the selector to speed things + Selector selector = this.selector; + if (selector != null) { + selector.wakeup(); + } + } + } + } private void processRegisterTaskQueue() throws IOException { From 875d5ce5138a6c1d9e5e20c8d893d07d30cd0124 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 7 Mar 2012 17:52:49 +0100 Subject: [PATCH 10/11] Allow to force the execution of the Runnable in a async fashion even if the IO-Thread is the current Thread --- .../io/netty/channel/socket/nio/AbstractNioWorker.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index fcf092f9b7..ba4f028988 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -287,10 +287,14 @@ abstract class AbstractNioWorker implements Worker { @Override public void executeInIoThread(Runnable task) { - start(); - if (Thread.currentThread() == thread) { + executeInIoThread(task, false); + } + + public void executeInIoThread(Runnable task, boolean alwaysAsync) { + if (!alwaysAsync && Thread.currentThread() == thread) { task.run(); } else { + start(); boolean added = eventQueue.offer(task); assert added; From 59ff76bd6696432650bc81f4d904854fcbc139cd Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 7 Mar 2012 17:55:37 +0100 Subject: [PATCH 11/11] add javadocs --- .../io/netty/channel/socket/nio/AbstractNioWorker.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index ba4f028988..af779c8b20 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -290,6 +290,13 @@ abstract class AbstractNioWorker implements Worker { executeInIoThread(task, false); } + /** + * Execute the {@link Runnable} in a IO-Thread + * + * @param task the {@link Runnable} to execute + * @param alwaysAsync true if the {@link Runnable} should be executed in an async + * fashion even if the current Thread == IO Thread + */ public void executeInIoThread(Runnable task, boolean alwaysAsync) { if (!alwaysAsync && Thread.currentThread() == thread) { task.run();