Add EventExecutor.children() in favor of iterator()

Motivation:

EventExecutor.iterator() is fixed to return Iterator<EventExecutor> and there's no way to change that as long as we don't extend Iterable.  However, a user should have a way to cast the returned set of executors painlessly.  Currently, it is only possible with an explicit cast like (Iterator<NioEventLoop>).

Modifications:

Instead, I added a new method called 'children()' which returns an immutable collection of child executors whose method signature looks like the following:

    <E extends EventExecutor> Set<E> children();

Result:

A user can now do this:

    Set<NioEventLoop> loops = group.children();
    for (NioEventLoop l: loops) { ... }

Unfortunately, this is not possible:

    for (NioEventLoop l: group.children()) { ... }

However, it's still a gain that a user doesn't need to down-cast explicitly and to add the '@SuppressWarnings` annotation.
This commit is contained in:
Trustin Lee 2014-03-24 12:32:55 +09:00
parent 1e3b7d8273
commit 7dc63ccd95
6 changed files with 81 additions and 22 deletions

View File

@ -15,10 +15,12 @@
*/ */
package io.netty.util.concurrent; package io.netty.util.concurrent;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.RunnableFuture; import java.util.concurrent.RunnableFuture;
@ -33,6 +35,7 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl
static final long DEFAULT_SHUTDOWN_TIMEOUT = 15; static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;
private final EventExecutorGroup parent; private final EventExecutorGroup parent;
private final Collection<AbstractEventExecutor> selfCollection = Collections.singleton(this);
protected AbstractEventExecutor() { protected AbstractEventExecutor() {
this(null); this(null);
@ -62,6 +65,11 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl
return new EventExecutorIterator(); return new EventExecutorIterator();
} }
@Override
public <E extends EventExecutor> Set<E> children() {
return (Set<E>) selfCollection;
}
@Override @Override
public Future<?> shutdownGracefully() { public Future<?> shutdownGracefully() {
return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS); return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);

View File

@ -15,6 +15,8 @@
*/ */
package io.netty.util.concurrent; package io.netty.util.concurrent;
import java.util.Set;
/** /**
* The {@link EventExecutor} is a special {@link EventExecutorGroup} which comes * The {@link EventExecutor} is a special {@link EventExecutorGroup} which comes
* with some handy methods to see if a {@link Thread} is executed in a event loop. * with some handy methods to see if a {@link Thread} is executed in a event loop.
@ -30,6 +32,12 @@ public interface EventExecutor extends EventExecutorGroup {
@Override @Override
EventExecutor next(); EventExecutor next();
/**
* Returns an unmodifiable singleton set which contains itself.
*/
@Override
<E extends EventExecutor> Set<E> children();
/** /**
* Return the {@link EventExecutorGroup} which is the parent of this {@link EventExecutor}, * Return the {@link EventExecutorGroup} which is the parent of this {@link EventExecutor},
*/ */

View File

@ -17,6 +17,7 @@ package io.netty.util.concurrent;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -30,8 +31,8 @@ import java.util.concurrent.TimeUnit;
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> { public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
/** /**
* Returns {@code true} if and only if this executor was started to be * Returns {@code true} if and only if all {@link EventExecutor}s managed by this {@link EventExecutorGroup}
* {@linkplain #shutdownGracefully() shut down gracefuclly} or was {@linkplain #isShutdown() shut down}. * are being {@linkplain #shutdownGracefully() shut down gracefuclly} or was {@linkplain #isShutdown() shut down}.
*/ */
boolean isShuttingDown(); boolean isShuttingDown();
@ -59,7 +60,8 @@ public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<E
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit); Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
/** /**
* Returns the {@link Future} which is notified when this executor has been terminated. * Returns the {@link Future} which is notified when all {@link EventExecutor}s managed by this
* {@link EventExecutorGroup} have been terminated.
*/ */
Future<?> terminationFuture(); Future<?> terminationFuture();
@ -78,17 +80,22 @@ public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<E
List<Runnable> shutdownNow(); List<Runnable> shutdownNow();
/** /**
* Returns one of the {@link EventExecutor}s that belong to this group. * Returns one of the {@link EventExecutor}s managed by this {@link EventExecutorGroup}.
*/ */
EventExecutor next(); EventExecutor next();
/** /**
* Returns a read-only {@link Iterator} over all {@link EventExecutor}, which are handled by this * @deprecated Use {@link #children()} instead.
* {@link EventExecutorGroup} at the time of invoke this method.
*/ */
@Override @Override
@Deprecated
Iterator<EventExecutor> iterator(); Iterator<EventExecutor> iterator();
/**
* Returns the unmodifiable set of {@link EventExecutor}s managed by this {@link EventExecutorGroup}.
*/
<E extends EventExecutor> Set<E> children();
@Override @Override
Future<?> submit(Runnable task); Future<?> submit(Runnable task);

View File

@ -17,7 +17,7 @@ package io.netty.util.concurrent;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup { public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children; private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
private final AtomicInteger childIndex = new AtomicInteger(); private final AtomicInteger childIndex = new AtomicInteger();
private final AtomicInteger terminatedChildren = new AtomicInteger(); private final AtomicInteger terminatedChildren = new AtomicInteger();
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE); private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
@ -63,7 +64,7 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
} }
children = new SingleThreadEventExecutor[nThreads]; children = new EventExecutor[nThreads];
if (isPowerOfTwo(children.length)) { if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser(); chooser = new PowerOfTwoEventExecutorChooser();
} else { } else {
@ -91,6 +92,7 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
} }
} catch (InterruptedException interrupted) { } catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
break; break;
} }
@ -111,6 +113,10 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
for (EventExecutor e: children) { for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener); e.terminationFuture().addListener(terminationListener);
} }
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
} }
protected ThreadFactory newDefaultThreadFactory() { protected ThreadFactory newDefaultThreadFactory() {
@ -135,13 +141,10 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
return children.length; return children.length;
} }
/** @Override
* Return a safe-copy of all of the children of this group. @SuppressWarnings("unchecked")
*/ public final <E extends EventExecutor> Set<E> children() {
protected Set<EventExecutor> children() { return (Set<E>) readonlyChildren;
Set<EventExecutor> children = Collections.newSetFromMap(new LinkedHashMap<EventExecutor, Boolean>());
Collections.addAll(children, this.children);
return children;
} }
/** /**

View File

@ -0,0 +1,27 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.concurrent.AbstractEventExecutorGroup;
/**
* Skeletal implementation of {@link EventLoopGroup}.
*/
public abstract class AbstractEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
@Override
public abstract EventLoop next();
}

View File

@ -47,9 +47,10 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
private final Object[] childArgs; private final Object[] childArgs;
private final int maxChannels; private final int maxChannels;
final Executor executor; final Executor executor;
final Set<ThreadPerChannelEventLoop> activeChildren = final Set<EventLoop> activeChildren =
Collections.newSetFromMap(PlatformDependent.<ThreadPerChannelEventLoop, Boolean>newConcurrentHashMap()); Collections.newSetFromMap(PlatformDependent.<EventLoop, Boolean>newConcurrentHashMap());
final Queue<ThreadPerChannelEventLoop> idleChildren = new ConcurrentLinkedQueue<ThreadPerChannelEventLoop>(); private final Set<EventLoop> readOnlyActiveChildren = Collections.unmodifiableSet(activeChildren);
final Queue<EventLoop> idleChildren = new ConcurrentLinkedQueue<EventLoop>();
private final ChannelException tooManyChannels; private final ChannelException tooManyChannels;
private volatile boolean shuttingDown; private volatile boolean shuttingDown;
@ -76,7 +77,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
* *
* @param maxChannels the maximum number of channels to handle with this instance. Once you try to register * @param maxChannels the maximum number of channels to handle with this instance. Once you try to register
* a new {@link Channel} and the maximum is exceed it will throw an * a new {@link Channel} and the maximum is exceed it will throw an
* {@link ChannelException} on the {@link #register(Channel)} and * {@link ChannelException}. on the {@link #register(Channel)} and
* {@link #register(Channel, ChannelPromise)} method. * {@link #register(Channel, ChannelPromise)} method.
* Use {@code 0} to use no limit * Use {@code 0} to use no limit
*/ */
@ -137,8 +138,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
/** /**
* Creates a new {@link EventLoop}. The default implementation creates a new {@link ThreadPerChannelEventLoop}. * Creates a new {@link EventLoop}. The default implementation creates a new {@link ThreadPerChannelEventLoop}.
*/ */
protected ThreadPerChannelEventLoop newChild( protected EventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) throws Exception {
@SuppressWarnings("UnusedParameters") Object... args) throws Exception {
return new ThreadPerChannelEventLoop(this); return new ThreadPerChannelEventLoop(this);
} }
@ -147,6 +147,12 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
return new ReadOnlyIterator<EventExecutor>(activeChildren.iterator()); return new ReadOnlyIterator<EventExecutor>(activeChildren.iterator());
} }
@Override
@SuppressWarnings("unchecked")
public <E extends EventExecutor> Set<E> children() {
return (Set<E>) readOnlyActiveChildren;
}
@Override @Override
public EventLoop next() { public EventLoop next() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
@ -299,7 +305,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i
throw new RejectedExecutionException("shutting down"); throw new RejectedExecutionException("shutting down");
} }
ThreadPerChannelEventLoop loop = idleChildren.poll(); EventLoop loop = idleChildren.poll();
if (loop == null) { if (loop == null) {
if (maxChannels > 0 && activeChildren.size() >= maxChannels) { if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
throw tooManyChannels; throw tooManyChannels;