diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java index 9fa11f9c46..0a7d654ef6 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java @@ -15,10 +15,12 @@ */ package io.netty.util.concurrent; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.util.Set; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.Callable; import java.util.concurrent.RunnableFuture; @@ -33,6 +35,7 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl static final long DEFAULT_SHUTDOWN_TIMEOUT = 15; private final EventExecutorGroup parent; + private final Collection selfCollection = Collections.singleton(this); protected AbstractEventExecutor() { this(null); @@ -62,6 +65,11 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl return new EventExecutorIterator(); } + @Override + public Set children() { + return (Set) selfCollection; + } + @Override public Future shutdownGracefully() { return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS); diff --git a/common/src/main/java/io/netty/util/concurrent/EventExecutor.java b/common/src/main/java/io/netty/util/concurrent/EventExecutor.java index 5f5c3729f1..f5a4341689 100644 --- a/common/src/main/java/io/netty/util/concurrent/EventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/EventExecutor.java @@ -15,6 +15,8 @@ */ package io.netty.util.concurrent; +import java.util.Set; + /** * The {@link EventExecutor} is a special {@link EventExecutorGroup} which comes * with some handy methods to see if a {@link Thread} is executed in a event loop. @@ -30,6 +32,12 @@ public interface EventExecutor extends EventExecutorGroup { @Override EventExecutor next(); + /** + * Returns an unmodifiable singleton set which contains itself. + */ + @Override + Set children(); + /** * Return the {@link EventExecutorGroup} which is the parent of this {@link EventExecutor}, */ diff --git a/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java index 30bd90d883..5ee3e84060 100644 --- a/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java @@ -17,6 +17,7 @@ package io.netty.util.concurrent; import java.util.Iterator; import java.util.List; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -30,8 +31,8 @@ import java.util.concurrent.TimeUnit; public interface EventExecutorGroup extends ScheduledExecutorService, Iterable { /** - * Returns {@code true} if and only if this executor was started to be - * {@linkplain #shutdownGracefully() shut down gracefuclly} or was {@linkplain #isShutdown() shut down}. + * Returns {@code true} if and only if all {@link EventExecutor}s managed by this {@link EventExecutorGroup} + * are being {@linkplain #shutdownGracefully() shut down gracefuclly} or was {@linkplain #isShutdown() shut down}. */ boolean isShuttingDown(); @@ -59,7 +60,8 @@ public interface EventExecutorGroup extends ScheduledExecutorService, Iterable shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit); /** - * Returns the {@link Future} which is notified when this executor has been terminated. + * Returns the {@link Future} which is notified when all {@link EventExecutor}s managed by this + * {@link EventExecutorGroup} have been terminated. */ Future terminationFuture(); @@ -78,17 +80,22 @@ public interface EventExecutorGroup extends ScheduledExecutorService, Iterable shutdownNow(); /** - * Returns one of the {@link EventExecutor}s that belong to this group. + * Returns one of the {@link EventExecutor}s managed by this {@link EventExecutorGroup}. */ EventExecutor next(); /** - * Returns a read-only {@link Iterator} over all {@link EventExecutor}, which are handled by this - * {@link EventExecutorGroup} at the time of invoke this method. + * @deprecated Use {@link #children()} instead. */ @Override + @Deprecated Iterator iterator(); + /** + * Returns the unmodifiable set of {@link EventExecutor}s managed by this {@link EventExecutorGroup}. + */ + Set children(); + @Override Future submit(Runnable task); diff --git a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java index 16264d38f7..5782b6fe28 100644 --- a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java @@ -17,7 +17,7 @@ package io.netty.util.concurrent; import java.util.Collections; import java.util.Iterator; -import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup { private final EventExecutor[] children; + private final Set readonlyChildren; private final AtomicInteger childIndex = new AtomicInteger(); private final AtomicInteger terminatedChildren = new AtomicInteger(); private final Promise terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE); @@ -63,7 +64,7 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } - children = new SingleThreadEventExecutor[nThreads]; + children = new EventExecutor[nThreads]; if (isPowerOfTwo(children.length)) { chooser = new PowerOfTwoEventExecutorChooser(); } else { @@ -91,6 +92,7 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { + // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } @@ -111,6 +113,10 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } + + Set childrenSet = new LinkedHashSet(children.length); + Collections.addAll(childrenSet, children); + readonlyChildren = Collections.unmodifiableSet(childrenSet); } protected ThreadFactory newDefaultThreadFactory() { @@ -135,13 +141,10 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto return children.length; } - /** - * Return a safe-copy of all of the children of this group. - */ - protected Set children() { - Set children = Collections.newSetFromMap(new LinkedHashMap()); - Collections.addAll(children, this.children); - return children; + @Override + @SuppressWarnings("unchecked") + public final Set children() { + return (Set) readonlyChildren; } /** diff --git a/transport/src/main/java/io/netty/channel/AbstractEventLoopGroup.java b/transport/src/main/java/io/netty/channel/AbstractEventLoopGroup.java new file mode 100644 index 0000000000..f8889b991c --- /dev/null +++ b/transport/src/main/java/io/netty/channel/AbstractEventLoopGroup.java @@ -0,0 +1,27 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel; + +import io.netty.util.concurrent.AbstractEventExecutorGroup; + +/** + * Skeletal implementation of {@link EventLoopGroup}. + */ +public abstract class AbstractEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup { + @Override + public abstract EventLoop next(); +} diff --git a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java index 39d494fd90..700625506c 100644 --- a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java @@ -47,9 +47,10 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i private final Object[] childArgs; private final int maxChannels; final Executor executor; - final Set activeChildren = - Collections.newSetFromMap(PlatformDependent.newConcurrentHashMap()); - final Queue idleChildren = new ConcurrentLinkedQueue(); + final Set activeChildren = + Collections.newSetFromMap(PlatformDependent.newConcurrentHashMap()); + private final Set readOnlyActiveChildren = Collections.unmodifiableSet(activeChildren); + final Queue idleChildren = new ConcurrentLinkedQueue(); private final ChannelException tooManyChannels; private volatile boolean shuttingDown; @@ -76,7 +77,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i * * @param maxChannels the maximum number of channels to handle with this instance. Once you try to register * a new {@link Channel} and the maximum is exceed it will throw an - * {@link ChannelException} on the {@link #register(Channel)} and + * {@link ChannelException}. on the {@link #register(Channel)} and * {@link #register(Channel, ChannelPromise)} method. * Use {@code 0} to use no limit */ @@ -137,8 +138,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i /** * Creates a new {@link EventLoop}. The default implementation creates a new {@link ThreadPerChannelEventLoop}. */ - protected ThreadPerChannelEventLoop newChild( - @SuppressWarnings("UnusedParameters") Object... args) throws Exception { + protected EventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) throws Exception { return new ThreadPerChannelEventLoop(this); } @@ -147,6 +147,12 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i return new ReadOnlyIterator(activeChildren.iterator()); } + @Override + @SuppressWarnings("unchecked") + public Set children() { + return (Set) readOnlyActiveChildren; + } + @Override public EventLoop next() { throw new UnsupportedOperationException(); @@ -299,7 +305,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i throw new RejectedExecutionException("shutting down"); } - ThreadPerChannelEventLoop loop = idleChildren.poll(); + EventLoop loop = idleChildren.poll(); if (loop == null) { if (maxChannels > 0 && activeChildren.size() >= maxChannels) { throw tooManyChannels;