diff --git a/transport/src/main/java/io/netty/channel/oio/OioEventLoop.java b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java similarity index 55% rename from transport/src/main/java/io/netty/channel/oio/OioEventLoop.java rename to transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java index 59634274dc..e810fcc0d0 100644 --- a/transport/src/main/java/io/netty/channel/oio/OioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java @@ -13,26 +13,19 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.netty.channel.oio; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelPromise; -import io.netty.channel.SingleThreadEventLoop; - +package io.netty.channel; /** * {@link SingleThreadEventLoop} which is used to handle OIO {@link Channel}'s. So in general there will be - * one {@link OioEventLoop} per {@link Channel}. + * one {@link ThreadPerChannelEventLoop} per {@link Channel}. * */ -class OioEventLoop extends SingleThreadEventLoop { +public class ThreadPerChannelEventLoop extends SingleThreadEventLoop { - private final OioEventLoopGroup parent; - private AbstractOioChannel ch; + private final ThreadPerChannelEventLoopGroup parent; + private Channel ch; - OioEventLoop(OioEventLoopGroup parent) { + public ThreadPerChannelEventLoop(ThreadPerChannelEventLoopGroup parent) { super(parent, parent.threadFactory, parent.scheduler); this.parent = parent; } @@ -41,9 +34,10 @@ class OioEventLoop extends SingleThreadEventLoop { public ChannelFuture register(Channel channel, ChannelPromise promise) { return super.register(channel, promise).addListener(new ChannelFutureListener() { @Override + @SuppressWarnings("unchecked") public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { - ch = (AbstractOioChannel) future.channel(); + ch = future.channel(); } else { deregister(); } @@ -54,38 +48,15 @@ class OioEventLoop extends SingleThreadEventLoop { @Override protected void run() { for (;;) { - AbstractOioChannel ch = this.ch; - if (ch == null || !ch.isActive()) { - Runnable task; - try { - task = takeTask(); - task.run(); - } catch (InterruptedException e) { - // Waken up by interruptThread() - } - } else { - long startTime = System.nanoTime(); - for (;;) { - final Runnable task = pollTask(); - if (task == null) { - break; - } - - task.run(); - - // Ensure running tasks doesn't take too much time. - if (System.nanoTime() - startTime > AbstractOioChannel.SO_TIMEOUT * 1000000L) { - break; - } - } - - // Handle deregistration - if (!ch.isRegistered()) { - runAllTasks(); - deregister(); - } + Runnable task; + try { + task = takeTask(); + task.run(); + } catch (InterruptedException e) { + // Waken up by interruptThread() } + Channel ch = this.ch; if (isShutdown()) { if (ch != null) { ch.unsafe().close(ch.unsafe().voidFuture()); @@ -93,6 +64,14 @@ class OioEventLoop extends SingleThreadEventLoop { if (confirmShutdown()) { break; } + } else { + if (ch != null) { + // Handle deregistration + if (!ch.isRegistered()) { + runAllTasks(); + deregister(); + } + } } } } diff --git a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java new file mode 100644 index 0000000000..4bdc246b93 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java @@ -0,0 +1,237 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + + +import io.netty.util.concurrent.TaskScheduler; +import io.netty.util.internal.PlatformDependent; + +import java.util.Collections; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * An {@link EventLoopGroup} that creates one {@link EventLoop} per {@link Channel}. + */ +public class ThreadPerChannelEventLoopGroup implements EventLoopGroup { + + private static final Object[] NO_ARGS = new Object[0]; + private static final StackTraceElement[] STACK_ELEMENTS = new StackTraceElement[0]; + + private final Object[] childArgs; + private final int maxChannels; + final TaskScheduler scheduler; + final ThreadFactory threadFactory; + final Set activeChildren = + Collections.newSetFromMap(PlatformDependent.newConcurrentHashMap()); + final Queue idleChildren = new ConcurrentLinkedQueue(); + private final ChannelException tooManyChannels; + + /** + * Create a new {@link ThreadPerChannelEventLoopGroup} with no limit in place. + */ + protected ThreadPerChannelEventLoopGroup() { + this(0); + } + + /** + * Create a new {@link ThreadPerChannelEventLoopGroup}. + * + * @param maxChannels the maximum number of channels to handle with this instance. Once you try to register + * a new {@link Channel} and the maximum is exceed it will throw an + * {@link ChannelException} on the {@link #register(Channel)} and + * {@link #register(Channel, ChannelPromise)} method. + * Use {@code 0} to use no limit + */ + protected ThreadPerChannelEventLoopGroup(int maxChannels) { + this(maxChannels, Executors.defaultThreadFactory()); + } + + /** + * Create a new {@link ThreadPerChannelEventLoopGroup}. + * + * @param maxChannels the maximum number of channels to handle with this instance. Once you try to register + * a new {@link Channel} and the maximum is exceed it will throw an + * {@link ChannelException} on the {@link #register(Channel)} and + * {@link #register(Channel, ChannelPromise)} method. + * Use {@code 0} to use no limit + * @param threadFactory the {@link ThreadFactory} used to create new {@link Thread} instances that handle the + * registered {@link Channel}s + * @param args arguments which will passed to each {@link #newChild(Object...)} call. + */ + protected ThreadPerChannelEventLoopGroup(int maxChannels, ThreadFactory threadFactory, Object... args) { + if (maxChannels < 0) { + throw new IllegalArgumentException(String.format( + "maxChannels: %d (expected: >= 0)", maxChannels)); + } + if (threadFactory == null) { + throw new NullPointerException("threadFactory"); + } + + if (args == null) { + childArgs = NO_ARGS; + } else { + childArgs = args.clone(); + } + + this.maxChannels = maxChannels; + this.threadFactory = threadFactory; + + scheduler = new TaskScheduler(threadFactory); + + tooManyChannels = new ChannelException("too many channels (max: " + maxChannels + ')'); + tooManyChannels.setStackTrace(STACK_ELEMENTS); + } + + /** + * Creates a new {@link EventLoop}. The default implementation creates a new {@link ThreadPerChannelEventLoop}. + */ + protected ThreadPerChannelEventLoop newChild( + @SuppressWarnings("UnusedParameters") Object... args) throws Exception { + return new ThreadPerChannelEventLoop(this); + } + + @Override + public EventLoop next() { + throw new UnsupportedOperationException(); + } + + @Override + public void shutdown() { + scheduler.shutdown(); + for (EventLoop l: activeChildren) { + l.shutdown(); + } + for (EventLoop l: idleChildren) { + l.shutdown(); + } + } + + @Override + public boolean isShutdown() { + if (!scheduler.isShutdown()) { + return false; + } + for (EventLoop l: activeChildren) { + if (!l.isShutdown()) { + return false; + } + } + for (EventLoop l: idleChildren) { + if (!l.isShutdown()) { + return false; + } + } + return true; + } + + @Override + public boolean isTerminated() { + if (!scheduler.isTerminated()) { + return false; + } + for (EventLoop l: activeChildren) { + if (!l.isTerminated()) { + return false; + } + } + for (EventLoop l: idleChildren) { + if (!l.isTerminated()) { + return false; + } + } + return true; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException { + long deadline = System.nanoTime() + unit.toNanos(timeout); + for (;;) { + long timeLeft = deadline - System.nanoTime(); + if (timeLeft <= 0) { + return isTerminated(); + } + if (scheduler.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) { + break; + } + } + for (EventLoop l: activeChildren) { + for (;;) { + long timeLeft = deadline - System.nanoTime(); + if (timeLeft <= 0) { + return isTerminated(); + } + if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) { + break; + } + } + } + for (EventLoop l: idleChildren) { + for (;;) { + long timeLeft = deadline - System.nanoTime(); + if (timeLeft <= 0) { + return isTerminated(); + } + if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) { + break; + } + } + } + return isTerminated(); + } + + @Override + public ChannelFuture register(Channel channel) { + if (channel == null) { + throw new NullPointerException("channel"); + } + try { + return nextChild().register(channel); + } catch (Throwable t) { + return channel.newFailedFuture(t); + } + } + + @Override + public ChannelFuture register(Channel channel, ChannelPromise promise) { + if (channel == null) { + throw new NullPointerException("channel"); + } + try { + return nextChild().register(channel, promise); + } catch (Throwable t) { + promise.setFailure(t); + return promise; + } + } + + private EventLoop nextChild() throws Exception { + ThreadPerChannelEventLoop loop = idleChildren.poll(); + if (loop == null) { + if (maxChannels > 0 && activeChildren.size() >= maxChannels) { + throw tooManyChannels; + } + loop = newChild(childArgs); + } + activeChildren.add(loop); + return loop; + } +} diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java index 8a3abd7a71..9a45d3523d 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java @@ -19,6 +19,7 @@ import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; +import io.netty.channel.ThreadPerChannelEventLoop; import java.net.ConnectException; import java.net.SocketAddress; @@ -91,7 +92,7 @@ public abstract class AbstractOioChannel extends AbstractChannel { @Override protected boolean isCompatible(EventLoop loop) { - return loop instanceof OioEventLoop; + return loop instanceof ThreadPerChannelEventLoop; } @Override diff --git a/transport/src/main/java/io/netty/channel/oio/OioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/oio/OioEventLoopGroup.java index 22a95fa250..cc0a9f3203 100644 --- a/transport/src/main/java/io/netty/channel/oio/OioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/oio/OioEventLoopGroup.java @@ -18,35 +18,19 @@ package io.netty.channel.oio; import io.netty.channel.Channel; import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPromise; -import io.netty.util.concurrent.TaskScheduler; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; -import io.netty.util.internal.PlatformDependent; +import io.netty.channel.ThreadPerChannelEventLoopGroup; -import java.util.Collections; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; /** * {@link EventLoopGroup} which is used to handle OIO {@link Channel}'s. Each {@link Channel} will be handled by its * own {@link EventLoop} to not block others. */ -public class OioEventLoopGroup implements EventLoopGroup { - - private static final StackTraceElement[] STACK_ELEMENTS = new StackTraceElement[0]; - private final int maxChannels; - final TaskScheduler scheduler; - final ThreadFactory threadFactory; - final Set activeChildren = Collections.newSetFromMap( - PlatformDependent.newConcurrentHashMap()); - final Queue idleChildren = new ConcurrentLinkedQueue(); - private final ChannelException tooManyChannels; +public class OioEventLoopGroup extends ThreadPerChannelEventLoopGroup { /** * Create a new {@link OioEventLoopGroup} with no limit in place. @@ -80,148 +64,6 @@ public class OioEventLoopGroup implements EventLoopGroup { * registered {@link Channel}s */ public OioEventLoopGroup(int maxChannels, ThreadFactory threadFactory) { - if (maxChannels < 0) { - throw new IllegalArgumentException(String.format( - "maxChannels: %d (expected: >= 0)", maxChannels)); - } - if (threadFactory == null) { - throw new NullPointerException("threadFactory"); - } - - this.maxChannels = maxChannels; - this.threadFactory = threadFactory; - - scheduler = new TaskScheduler(threadFactory); - - tooManyChannels = new ChannelException("too many channels (max: " + maxChannels + ')'); - tooManyChannels.setStackTrace(STACK_ELEMENTS); - } - - @Override - public EventLoop next() { - throw new UnsupportedOperationException(); - } - - @Override - public void shutdown() { - scheduler.shutdown(); - for (EventLoop l: activeChildren) { - l.shutdown(); - } - for (EventLoop l: idleChildren) { - l.shutdown(); - } - } - - @Override - public boolean isShutdown() { - if (!scheduler.isShutdown()) { - return false; - } - for (EventLoop l: activeChildren) { - if (!l.isShutdown()) { - return false; - } - } - for (EventLoop l: idleChildren) { - if (!l.isShutdown()) { - return false; - } - } - return true; - } - - @Override - public boolean isTerminated() { - if (!scheduler.isTerminated()) { - return false; - } - for (EventLoop l: activeChildren) { - if (!l.isTerminated()) { - return false; - } - } - for (EventLoop l: idleChildren) { - if (!l.isTerminated()) { - return false; - } - } - return true; - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) - throws InterruptedException { - long deadline = System.nanoTime() + unit.toNanos(timeout); - for (;;) { - long timeLeft = deadline - System.nanoTime(); - if (timeLeft <= 0) { - return isTerminated(); - } - if (scheduler.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) { - break; - } - } - for (EventLoop l: activeChildren) { - for (;;) { - long timeLeft = deadline - System.nanoTime(); - if (timeLeft <= 0) { - return isTerminated(); - } - if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) { - break; - } - } - } - for (EventLoop l: idleChildren) { - for (;;) { - long timeLeft = deadline - System.nanoTime(); - if (timeLeft <= 0) { - return isTerminated(); - } - if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) { - break; - } - } - } - return isTerminated(); - } - - @Override - public ChannelFuture register(Channel channel) { - if (channel == null) { - throw new NullPointerException("channel"); - } - try { - return nextChild().register(channel); - } catch (Throwable t) { - return channel.newFailedFuture(t); - } - } - - @Override - public ChannelFuture register(Channel channel, ChannelPromise promise) { - if (channel == null) { - throw new NullPointerException("channel"); - } - try { - return nextChild().register(channel, promise); - } catch (Throwable t) { - promise.setFailure(t); - - return promise; - } - } - - private EventLoop nextChild() { - OioEventLoop loop = idleChildren.poll(); - if (loop == null) { - if (maxChannels > 0 && activeChildren.size() >= maxChannels) { - throw tooManyChannels; - } - loop = new OioEventLoop(this); - } - activeChildren.add(loop); - return loop; + super(maxChannels, threadFactory); } }