Let ChannelGroupFuture extends ChannelFuture and ChannelGroupFutureListener GenericFutureListener

This commit is contained in:
Norman Maurer 2013-03-11 08:38:56 +01:00
parent f2a35273e6
commit 17ebbdec20
11 changed files with 656 additions and 389 deletions

View File

@ -0,0 +1,74 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
private final Future succeededFuture = new SucceededFuture(this);
private final TaskScheduler scheduler;
protected AbstractEventExecutor(TaskScheduler scheduler) {
if (scheduler == null) {
throw new NullPointerException("scheduler");
}
this.scheduler = scheduler;
}
@Override
public EventExecutor next() {
return this;
}
@Override
public Promise newPromise() {
return new DefaultPromise(this);
}
@Override
public Future newSucceededFuture() {
return succeededFuture;
}
@Override
public Future newFailedFuture(Throwable cause) {
return new FailedFuture(this, cause);
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return scheduler.schedule(this, command, delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return scheduler.schedule(this, callable, delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return scheduler.scheduleAtFixedRate(this, command, initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return scheduler.scheduleWithFixedDelay(this, command, initialDelay, delay, unit);
}
}

View File

@ -0,0 +1,165 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
/**
* {@link EventExecutor} which executes the command in the caller thread.
*/
public final class ImmediateEventExecutor extends AbstractEventExecutor {
public ImmediateEventExecutor(TaskScheduler scheduler) {
super(scheduler);
}
@Override
public EventExecutorGroup parent() {
return null;
}
@Override
public boolean inEventLoop() {
return true;
}
@Override
public boolean inEventLoop(Thread thread) {
return true;
}
@Override
public void shutdown() {
// NOOP
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) {
return false;
}
@Override
public List<Runnable> shutdownNow() {
return Collections.emptyList();
}
@Override
public <T> Future<T> submit(Callable<T> task) {
if (task == null) {
throw new NullPointerException("task");
}
FutureTask<T> future = new FutureTask<T>(task);
future.run();
return future;
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) {
throw new NullPointerException("task");
}
FutureTask<T> future = new FutureTask<T>(task, result);
future.run();
return future;
}
@SuppressWarnings("unchecked")
@Override
public Future<?> submit(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
FutureTask<?> future = new FutureTask(task, null);
future.run();
return future;
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
if (tasks == null) {
throw new NullPointerException("tasks");
}
List<Future<T>> futures = new ArrayList<Future<T>>();
for (Callable<T> task: tasks) {
futures.add(submit(task));
}
return futures;
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) {
if (tasks == null) {
throw new NullPointerException("tasks");
}
List<Future<T>> futures = new ArrayList<Future<T>>();
for (Callable<T> task: tasks) {
futures.add(submit(task));
}
return futures;
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
if (tasks == null) {
throw new NullPointerException("tasks");
}
if (tasks.isEmpty()) {
throw new IllegalArgumentException("tasks must be non empty");
}
return invokeAll(tasks).get(0).get();
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException {
if (tasks == null) {
throw new NullPointerException("tasks");
}
if (tasks.isEmpty()) {
throw new IllegalArgumentException("tasks must be non empty");
}
return invokeAll(tasks).get(0).get();
}
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException("command");
}
command.run();
}
}

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2012 The Netty Project * Copyright 2013 The Netty Project
* *
* The Netty Project licenses this file to you under the Apache License, * The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance * version 2.0 (the "License"); you may not use this file except in compliance
@ -13,25 +13,22 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package io.netty.handler.ssl; package io.netty.util.concurrent;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
/** /**
* {@link Executor} which executes the command in the caller thread. * {@link Executor} which execute tasks in the callers thread.
*/ */
public final class ImmediateExecutor implements Executor { public final class ImmediateExecutor implements Executor {
public static final ImmediateExecutor INSTANCE = new ImmediateExecutor();
/** private ImmediateExecutor() {
* The default instance. // use static instance
*/ }
static final ImmediateExecutor INSTANCE = new ImmediateExecutor();
@Override @Override
public void execute(Runnable command) { public void execute(Runnable command) {
command.run(); command.run();
} }
private ImmediateExecutor() {
}
} }

View File

@ -24,12 +24,9 @@ import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -38,7 +35,7 @@ import java.util.concurrent.TimeUnit;
* Abstract base class for {@link EventExecutor}'s that execute all its submitted tasks in a single thread. * Abstract base class for {@link EventExecutor}'s that execute all its submitted tasks in a single thread.
* *
*/ */
public abstract class SingleThreadEventExecutor extends AbstractExecutorService implements EventExecutor { public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
private static final InternalLogger logger = private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class); InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
@ -72,12 +69,10 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
} }
private final EventExecutorGroup parent; private final EventExecutorGroup parent;
private final Future succeededFuture = new SucceededFuture(this);
private final Queue<Runnable> taskQueue; private final Queue<Runnable> taskQueue;
private final Thread thread; private final Thread thread;
private final Object stateLock = new Object(); private final Object stateLock = new Object();
private final Semaphore threadLock = new Semaphore(0); private final Semaphore threadLock = new Semaphore(0);
private final TaskScheduler scheduler;
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>(); private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
private volatile int state = ST_NOT_STARTED; private volatile int state = ST_NOT_STARTED;
private long lastAccessTimeNanos; private long lastAccessTimeNanos;
@ -92,15 +87,12 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
*/ */
protected SingleThreadEventExecutor( protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) { EventExecutorGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
super(scheduler);
if (threadFactory == null) { if (threadFactory == null) {
throw new NullPointerException("threadFactory"); throw new NullPointerException("threadFactory");
} }
if (scheduler == null) {
throw new NullPointerException("scheduler");
}
this.parent = parent; this.parent = parent;
this.scheduler = scheduler;
thread = threadFactory.newThread(new Runnable() { thread = threadFactory.newThread(new Runnable() {
@Override @Override
@ -166,11 +158,6 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
return parent; return parent;
} }
@Override
public EventExecutor next() {
return this;
}
/** /**
* Interrupt the current running {@link Thread}. * Interrupt the current running {@link Thread}.
*/ */
@ -479,39 +466,4 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
protected static void reject() { protected static void reject() {
throw new RejectedExecutionException("event executor terminated"); throw new RejectedExecutionException("event executor terminated");
} }
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return scheduler.schedule(this, command, delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return scheduler.schedule(this, callable, delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return scheduler.scheduleAtFixedRate(this, command, initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return scheduler.scheduleWithFixedDelay(this, command, initialDelay, delay, unit);
}
@Override
public Promise newPromise() {
return new DefaultPromise(this);
}
@Override
public Future newSucceededFuture() {
return succeededFuture;
}
@Override
public Future newFailedFuture(Throwable cause) {
return new FailedFuture(this, cause);
}
} }

View File

@ -13,17 +13,21 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package io.netty.handler.ssl; package io.netty.util.concurrent;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import org.easymock.EasyMock;
import org.junit.Test; import org.junit.Test;
public class ImmediateExecutorTest { import java.util.concurrent.Executors;
public class ImmediateEventExecutorTest {
@Test @Test
public void shouldExecuteImmediately() { public void shouldExecuteImmediately() {
ImmediateExecutor e = ImmediateExecutor.INSTANCE; TaskScheduler scheduler = new TaskScheduler(Executors.defaultThreadFactory());
ImmediateEventExecutor e = new ImmediateEventExecutor(scheduler);
long startTime = System.nanoTime(); long startTime = System.nanoTime();
e.execute(new Runnable() { e.execute(new Runnable() {
@Override @Override
@ -42,5 +46,6 @@ public class ImmediateExecutorTest {
} }
}); });
assertTrue(System.nanoTime() - startTime >= 1000000000L); assertTrue(System.nanoTime() - startTime >= 1000000000L);
scheduler.shutdown();
} }
} }

View File

@ -30,6 +30,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise; import io.netty.channel.DefaultChannelPromise;
import io.netty.channel.FileRegion; import io.netty.channel.FileRegion;
import io.netty.util.concurrent.ImmediateExecutor;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;

View File

@ -0,0 +1,52 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.group;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
/**
* {@link ChannelException} which holds {@link ChannelFuture}s that failed because of an error.
*/
public class ChannelGroupException extends ChannelException implements Iterable<Map.Entry<Integer, Throwable>> {
private static final long serialVersionUID = -4093064295562629453L;
private final Collection<Map.Entry<Integer, Throwable>> failed;
public ChannelGroupException(Collection<Map.Entry<Integer, Throwable>> causes) {
if (causes == null) {
throw new NullPointerException("causes");
}
if (causes.isEmpty()) {
throw new IllegalArgumentException("causes must be non empty");
}
this.failed = Collections.unmodifiableCollection(causes);
}
/**
* Returns a {@link Iterator} which contains all the {@link Throwable} that was a cause of the failure and the
* related id of the {@link Channel}.
*/
@Override
public Iterator<Map.Entry<Integer, Throwable>> iterator() {
return failed.iterator();
}
}

View File

@ -19,9 +19,10 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.TimeUnit;
/** /**
* The result of an asynchronous {@link ChannelGroup} operation. * The result of an asynchronous {@link ChannelGroup} operation.
@ -42,13 +43,13 @@ import java.util.concurrent.TimeUnit;
* {@link ChannelGroupFutureListener} so you can get notified when the I/O * {@link ChannelGroupFutureListener} so you can get notified when the I/O
* operation have been completed. * operation have been completed.
* *
* <h3>Prefer {@link #addListener(ChannelGroupFutureListener)} to {@link #await()}</h3> * <h3>Prefer {@link #addListener(GenericFutureListener)} to {@link #await()}</h3>
* *
* It is recommended to prefer {@link #addListener(ChannelGroupFutureListener)} to * It is recommended to prefer {@link #addListener(GenericFutureListener)} to
* {@link #await()} wherever possible to get notified when I/O operations are * {@link #await()} wherever possible to get notified when I/O operations are
* done and to do any follow-up tasks. * done and to do any follow-up tasks.
* <p> * <p>
* {@link #addListener(ChannelGroupFutureListener)} is non-blocking. It simply * {@link #addListener(GenericFutureListener)} is non-blocking. It simply
* adds the specified {@link ChannelGroupFutureListener} to the * adds the specified {@link ChannelGroupFutureListener} to the
* {@link ChannelGroupFuture}, and I/O thread will notify the listeners when * {@link ChannelGroupFuture}, and I/O thread will notify the listeners when
* the I/O operations associated with the future is done. * the I/O operations associated with the future is done.
@ -101,7 +102,7 @@ import java.util.concurrent.TimeUnit;
* make sure you do not call {@link #await()} in an I/O thread. Otherwise, * make sure you do not call {@link #await()} in an I/O thread. Otherwise,
* {@link IllegalStateException} will be raised to prevent a dead lock. * {@link IllegalStateException} will be raised to prevent a dead lock.
*/ */
public interface ChannelGroupFuture extends Iterable<ChannelFuture> { public interface ChannelGroupFuture extends Future, Iterable<ChannelFuture> {
/** /**
* Returns the {@link ChannelGroup} which is associated with this future. * Returns the {@link ChannelGroup} which is associated with this future.
@ -127,18 +128,15 @@ public interface ChannelGroupFuture extends Iterable<ChannelFuture> {
*/ */
ChannelFuture find(Channel channel); ChannelFuture find(Channel channel);
/**
* Returns {@code true} if and only if this future is
* complete, regardless of whether the operation was successful, failed,
* or canceled.
*/
boolean isDone();
/** /**
* Returns {@code true} if and only if all I/O operations associated with * Returns {@code true} if and only if all I/O operations associated with
* this future were successful without any failure. * this future were successful without any failure.
*/ */
boolean isCompleteSuccess(); @Override
boolean isSuccess();
@Override
ChannelGroupException cause();
/** /**
* Returns {@code true} if and only if the I/O operations associated with * Returns {@code true} if and only if the I/O operations associated with
@ -146,93 +144,35 @@ public interface ChannelGroupFuture extends Iterable<ChannelFuture> {
*/ */
boolean isPartialSuccess(); boolean isPartialSuccess();
/**
* Returns {@code true} if and only if all I/O operations associated with
* this future have failed without any success.
*/
boolean isCompleteFailure();
/** /**
* Returns {@code true} if and only if the I/O operations associated with * Returns {@code true} if and only if the I/O operations associated with
* this future have failed partially with some success. * this future have failed partially with some success.
*/ */
boolean isPartialFailure(); boolean isPartialFailure();
/** @Override
* Adds the specified listener to this future. The ChannelGroupFuture addListener(GenericFutureListener<? extends Future> listener);
* specified listener is notified when this future is
* {@linkplain #isDone() done}. If this future is already
* completed, the specified listener is notified immediately.
*/
ChannelGroupFuture addListener(ChannelGroupFutureListener listener);
/** @Override
* Removes the specified listener from this future. ChannelGroupFuture addListeners(GenericFutureListener<? extends Future>... listeners);
* The specified listener is no longer notified when this
* future is {@linkplain #isDone() done}. If this
* future is already completed, this method has no effect
* and returns silently.
*/
ChannelGroupFuture removeListener(ChannelGroupFutureListener listener);
/** @Override
* Waits for this future to be completed. ChannelGroupFuture removeListener(GenericFutureListener<? extends Future> listener);
*
* @throws InterruptedException @Override
* if the current thread was interrupted ChannelGroupFuture removeListeners(GenericFutureListener<? extends Future>... listeners);
*/
@Override
ChannelGroupFuture await() throws InterruptedException; ChannelGroupFuture await() throws InterruptedException;
/** @Override
* Waits for this future to be completed without
* interruption. This method catches an {@link InterruptedException} and
* discards it silently.
*/
ChannelGroupFuture awaitUninterruptibly(); ChannelGroupFuture awaitUninterruptibly();
/** @Override
* Waits for this future to be completed within the ChannelGroupFuture syncUninterruptibly();
* specified time limit.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*
* @throws InterruptedException
* if the current thread was interrupted
*/
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
/** @Override
* Waits for this future to be completed within the ChannelGroupFuture sync() throws InterruptedException;
* specified time limit.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*
* @throws InterruptedException
* if the current thread was interrupted
*/
boolean await(long timeoutMillis) throws InterruptedException;
/**
* Waits for this future to be completed within the
* specified time limit without interruption. This method catches an
* {@link InterruptedException} and discards it silently.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*/
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
/**
* Waits for this future to be completed within the
* specified time limit without interruption. This method catches an
* {@link InterruptedException} and discards it silently.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*/
boolean awaitUninterruptibly(long timeoutMillis);
/** /**
* Returns the {@link Iterator} that enumerates all {@link ChannelFuture}s * Returns the {@link Iterator} that enumerates all {@link ChannelFuture}s

View File

@ -15,22 +15,14 @@
*/ */
package io.netty.channel.group; package io.netty.channel.group;
import java.util.EventListener; import io.netty.util.concurrent.GenericFutureListener;
/** /**
* Listens to the result of a {@link ChannelGroupFuture}. The result of the * Listens to the result of a {@link ChannelGroupFuture}. The result of the
* asynchronous {@link ChannelGroup} I/O operations is notified once this * asynchronous {@link ChannelGroup} I/O operations is notified once this
* listener is added by calling {@link ChannelGroupFuture#addListener(ChannelGroupFutureListener)} * listener is added by calling {@link ChannelGroupFuture#addListener(GenericFutureListener)}
* and all I/O operations are complete. * and all I/O operations are complete.
*/ */
public interface ChannelGroupFutureListener extends EventListener { public interface ChannelGroupFutureListener extends GenericFutureListener<ChannelGroupFuture> {
/**
* Invoked when all I/O operations associated with the
* {@link ChannelGroupFuture} have been completed.
*
* @param future The source {@link ChannelGroupFuture} which called this
* callback.
*/
void operationComplete(ChannelGroupFuture future) throws Exception;
} }

View File

@ -21,15 +21,31 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.FileRegion; import io.netty.channel.FileRegion;
import io.netty.channel.ServerChannel; import io.netty.channel.ServerChannel;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.FailedFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.SucceededFuture;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import java.util.AbstractSet; import java.util.AbstractSet;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
@ -38,8 +54,9 @@ import java.util.concurrent.atomic.AtomicInteger;
public class DefaultChannelGroup extends AbstractSet<Channel> implements ChannelGroup { public class DefaultChannelGroup extends AbstractSet<Channel> implements ChannelGroup {
private static final AtomicInteger nextId = new AtomicInteger(); private static final AtomicInteger nextId = new AtomicInteger();
private static final ImmediateEventExecutor DEFAULT_EXECUTOR = new ImmediateEventExecutor();
private final String name; private final String name;
private final EventExecutor executor;
private final ConcurrentMap<Integer, Channel> serverChannels = PlatformDependent.newConcurrentHashMap(); private final ConcurrentMap<Integer, Channel> serverChannels = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<Integer, Channel> nonServerChannels = PlatformDependent.newConcurrentHashMap(); private final ConcurrentMap<Integer, Channel> nonServerChannels = PlatformDependent.newConcurrentHashMap();
private final ChannelFutureListener remover = new ChannelFutureListener() { private final ChannelFutureListener remover = new ChannelFutureListener() {
@ -56,16 +73,34 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
this("group-0x" + Integer.toHexString(nextId.incrementAndGet())); this("group-0x" + Integer.toHexString(nextId.incrementAndGet()));
} }
/**
* Creates a new group with a generated name amd the provided {@link EventExecutor} to notify the
* {@link ChannelGroupFuture}s.
*/
public DefaultChannelGroup(EventExecutor executor) {
this("group-0x" + Integer.toHexString(nextId.incrementAndGet()), executor);
}
/** /**
* Creates a new group with the specified {@code name}. Please note that * Creates a new group with the specified {@code name}. Please note that
* different groups can have the same name, which means no duplicate check * different groups can have the same name, which means no duplicate check
* is done against group names. * is done against group names.
*/ */
public DefaultChannelGroup(String name) { public DefaultChannelGroup(String name) {
this(name, DEFAULT_EXECUTOR);
}
/**
* Creates a new group with the specified {@code name} and {@link EventExecutor} to notify the
* {@link ChannelGroupFuture}s. Please note that different groups can have the same name, which means no
* duplicate check is done against group names.
*/
public DefaultChannelGroup(String name, EventExecutor executor) {
if (name == null) { if (name == null) {
throw new NullPointerException("name"); throw new NullPointerException("name");
} }
this.name = name; this.name = name;
this.executor = executor;
} }
@Override @Override
@ -187,7 +222,7 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
futures.put(c.id(), c.close()); futures.put(c.id(), c.close());
} }
return new DefaultChannelGroupFuture(this, futures); return new DefaultChannelGroupFuture(this, futures, executor);
} }
@Override @Override
@ -202,7 +237,7 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
futures.put(c.id(), c.disconnect()); futures.put(c.id(), c.disconnect());
} }
return new DefaultChannelGroupFuture(this, futures); return new DefaultChannelGroupFuture(this, futures, executor);
} }
@Override @Override
@ -218,7 +253,7 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
} }
BufUtil.release(message); BufUtil.release(message);
return new DefaultChannelGroupFuture(this, futures); return new DefaultChannelGroupFuture(this, futures, executor);
} }
@Override @Override
@ -234,7 +269,7 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
} }
BufUtil.release(region); BufUtil.release(region);
return new DefaultChannelGroupFuture(this, futures); return new DefaultChannelGroupFuture(this, futures, executor);
} }
@Override @Override
@ -244,7 +279,7 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
futures.put(c.id(), c.flush()); futures.put(c.id(), c.flush());
} }
return new DefaultChannelGroupFuture(this, futures); return new DefaultChannelGroupFuture(this, futures, executor);
} }
@Override @Override
@ -259,7 +294,7 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
futures.put(c.id(), c.deregister()); futures.put(c.id(), c.deregister());
} }
return new DefaultChannelGroupFuture(this, futures); return new DefaultChannelGroupFuture(this, futures, executor);
} }
@Override @Override
@ -287,4 +322,177 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
return getClass().getSimpleName() + return getClass().getSimpleName() +
"(name: " + name() + ", size: " + size() + ')'; "(name: " + name() + ", size: " + size() + ')';
} }
private static final class ImmediateEventExecutor implements EventExecutor {
private final Future successedFuture = new SucceededFuture(this);
@Override
public EventExecutor next() {
return this;
}
@Override
public EventExecutorGroup parent() {
return null;
}
@Override
public boolean inEventLoop() {
return true;
}
@Override
public boolean inEventLoop(Thread thread) {
return true;
}
@Override
public void shutdown() {
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Promise newPromise() {
return new DefaultPromise(this);
}
@Override
public Future newSucceededFuture() {
return successedFuture;
}
@Override
public Future newFailedFuture(Throwable cause) {
return new FailedFuture(this, cause);
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
long period, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
long delay, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public List<Runnable> shutdownNow() {
return Collections.emptyList();
}
@Override
public <T> java.util.concurrent.Future<T> submit(Callable<T> task) {
if (task == null) {
throw new NullPointerException("task");
}
FutureTask<T> future = new FutureTask<T>(task);
future.run();
return future;
}
@Override
public <T> java.util.concurrent.Future<T> submit(Runnable task, T result) {
if (task == null) {
throw new NullPointerException("task");
}
FutureTask<T> future = new FutureTask<T>(task, result);
future.run();
return future;
}
@SuppressWarnings("unchecked")
@Override
public java.util.concurrent.Future<?> submit(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
FutureTask<?> future = new FutureTask(task, null);
future.run();
return future;
}
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
if (tasks == null) {
throw new NullPointerException("tasks");
}
List<java.util.concurrent.Future<T>> futures = new ArrayList<java.util.concurrent.Future<T>>();
for (Callable<T> task: tasks) {
futures.add(submit(task));
}
return futures;
}
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) {
if (tasks == null) {
throw new NullPointerException("tasks");
}
List<java.util.concurrent.Future<T>> futures = new ArrayList<java.util.concurrent.Future<T>>();
for (Callable<T> task: tasks) {
futures.add(submit(task));
}
return futures;
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
if (tasks == null) {
throw new NullPointerException("tasks");
}
if (tasks.isEmpty()) {
throw new IllegalArgumentException("tasks must be non empty");
}
return invokeAll(tasks).get(0).get();
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException {
if (tasks == null) {
throw new NullPointerException("tasks");
}
if (tasks.isEmpty()) {
throw new IllegalArgumentException("tasks must be non empty");
}
return invokeAll(tasks).get(0).get();
}
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException("command");
}
command.run();
}
}
} }

View File

@ -18,8 +18,11 @@ package io.netty.channel.group;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -28,26 +31,17 @@ import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.*;
/** /**
* The default {@link ChannelGroupFuture} implementation. * The default {@link ChannelGroupFuture} implementation.
*/ */
public class DefaultChannelGroupFuture implements ChannelGroupFuture { final class DefaultChannelGroupFuture extends DefaultPromise implements ChannelGroupFuture {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(DefaultChannelGroupFuture.class);
private final ChannelGroup group; private final ChannelGroup group;
final Map<Integer, ChannelFuture> futures; private final Map<Integer, ChannelFuture> futures;
private ChannelGroupFutureListener firstListener; private int successCount;
private List<ChannelGroupFutureListener> otherListeners; private int failureCount;
private boolean done;
int successCount;
int failureCount;
private int waiters;
private final ChannelFutureListener childListener = new ChannelFutureListener() { private final ChannelFutureListener childListener = new ChannelFutureListener() {
@Override @Override
@ -66,7 +60,18 @@ public class DefaultChannelGroupFuture implements ChannelGroupFuture {
} }
if (callSetDone) { if (callSetDone) {
setDone(); if (failureCount > 0) {
List<Map.Entry<Integer, Throwable>> failed =
new ArrayList<Map.Entry<Integer, Throwable>>(failureCount);
for (ChannelFuture f: futures.values()) {
if (!f.isSuccess()) {
failed.add(new DefaultEntry<Integer, Throwable>(f.channel().id(), f.cause()));
}
}
setFailure0(new ChannelGroupException(failed));
} else {
setSuccess0();
}
} }
} }
}; };
@ -74,7 +79,8 @@ public class DefaultChannelGroupFuture implements ChannelGroupFuture {
/** /**
* Creates a new instance. * Creates a new instance.
*/ */
public DefaultChannelGroupFuture(ChannelGroup group, Collection<ChannelFuture> futures) { public DefaultChannelGroupFuture(ChannelGroup group, Collection<ChannelFuture> futures, EventExecutor executor) {
super(executor);
if (group == null) { if (group == null) {
throw new NullPointerException("group"); throw new NullPointerException("group");
} }
@ -97,11 +103,12 @@ public class DefaultChannelGroupFuture implements ChannelGroupFuture {
// Done on arrival? // Done on arrival?
if (this.futures.isEmpty()) { if (this.futures.isEmpty()) {
setDone(); setSuccess0();
} }
} }
DefaultChannelGroupFuture(ChannelGroup group, Map<Integer, ChannelFuture> futures) { DefaultChannelGroupFuture(ChannelGroup group, Map<Integer, ChannelFuture> futures, EventExecutor executor) {
super(executor);
this.group = group; this.group = group;
this.futures = Collections.unmodifiableMap(futures); this.futures = Collections.unmodifiableMap(futures);
for (ChannelFuture f: this.futures.values()) { for (ChannelFuture f: this.futures.values()) {
@ -110,7 +117,7 @@ public class DefaultChannelGroupFuture implements ChannelGroupFuture {
// Done on arrival? // Done on arrival?
if (this.futures.isEmpty()) { if (this.futures.isEmpty()) {
setDone(); setSuccess0();
} }
} }
@ -134,16 +141,6 @@ public class DefaultChannelGroupFuture implements ChannelGroupFuture {
return futures.values().iterator(); return futures.values().iterator();
} }
@Override
public synchronized boolean isDone() {
return done;
}
@Override
public synchronized boolean isCompleteSuccess() {
return successCount == futures.size();
}
@Override @Override
public synchronized boolean isPartialSuccess() { public synchronized boolean isPartialSuccess() {
return successCount != 0 && successCount != futures.size(); return successCount != 0 && successCount != futures.size();
@ -155,224 +152,108 @@ public class DefaultChannelGroupFuture implements ChannelGroupFuture {
} }
@Override @Override
public synchronized boolean isCompleteFailure() { public DefaultChannelGroupFuture addListener(GenericFutureListener<? extends Future> listener) {
int futureCnt = futures.size(); super.addListener(listener);
return futureCnt != 0 && failureCount == futureCnt;
}
@Override
public ChannelGroupFuture addListener(ChannelGroupFutureListener listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
boolean notifyNow = false;
synchronized (this) {
if (done) {
notifyNow = true;
} else {
if (firstListener == null) {
firstListener = listener;
} else {
if (otherListeners == null) {
otherListeners = new ArrayList<ChannelGroupFutureListener>(1);
}
otherListeners.add(listener);
}
}
}
if (notifyNow) {
notifyListener(listener);
}
return this; return this;
} }
@Override @Override
public ChannelGroupFuture removeListener(ChannelGroupFutureListener listener) { public DefaultChannelGroupFuture addListeners(GenericFutureListener<? extends Future>... listeners) {
if (listener == null) { super.addListeners(listeners);
throw new NullPointerException("listener");
}
synchronized (this) {
if (!done) {
if (listener == firstListener) {
if (otherListeners != null && !otherListeners.isEmpty()) {
firstListener = otherListeners.remove(0);
} else {
firstListener = null;
}
} else if (otherListeners != null) {
otherListeners.remove(listener);
}
}
}
return this; return this;
} }
@Override @Override
public ChannelGroupFuture await() throws InterruptedException { public DefaultChannelGroupFuture removeListener(GenericFutureListener<? extends Future> listener) {
if (Thread.interrupted()) { super.removeListener(listener);
throw new InterruptedException();
}
synchronized (this) {
while (!done) {
waiters++;
try {
wait();
} finally {
waiters--;
}
}
}
return this; return this;
} }
@Override @Override
public boolean await(long timeout, TimeUnit unit) public DefaultChannelGroupFuture removeListeners(GenericFutureListener<? extends Future>... listeners) {
throws InterruptedException { super.removeListeners(listeners);
return await0(unit.toNanos(timeout), true);
}
@Override
public boolean await(long timeoutMillis) throws InterruptedException {
return await0(MILLISECONDS.toNanos(timeoutMillis), true);
}
@Override
public ChannelGroupFuture awaitUninterruptibly() {
boolean interrupted = false;
synchronized (this) {
while (!done) {
waiters++;
try {
wait();
} catch (InterruptedException e) {
interrupted = true;
} finally {
waiters--;
}
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
return this; return this;
} }
@Override @Override
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { public DefaultChannelGroupFuture await() throws InterruptedException {
try { super.await();
return await0(unit.toNanos(timeout), false); return this;
} catch (InterruptedException e) {
throw new InternalError();
}
} }
@Override @Override
public boolean awaitUninterruptibly(long timeoutMillis) { public DefaultChannelGroupFuture awaitUninterruptibly() {
try { super.awaitUninterruptibly();
return await0(MILLISECONDS.toNanos(timeoutMillis), false); return this;
} catch (InterruptedException e) {
throw new InternalError();
}
} }
private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException { @Override
if (interruptable && Thread.interrupted()) { public DefaultChannelGroupFuture syncUninterruptibly() {
throw new InterruptedException(); super.syncUninterruptibly();
} return this;
long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
long waitTime = timeoutNanos;
boolean interrupted = false;
try {
synchronized (this) {
if (done || waitTime <= 0) {
return done;
}
waiters++;
try {
for (;;) {
try {
wait(waitTime / 1000000, (int) (waitTime % 1000000));
} catch (InterruptedException e) {
if (interruptable) {
throw e;
} else {
interrupted = true;
}
}
if (done) {
return true;
} else {
waitTime = timeoutNanos - (System.nanoTime() - startTime);
if (waitTime <= 0) {
return done;
}
}
}
} finally {
waiters--;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} }
void setDone() { @Override
synchronized (this) { public DefaultChannelGroupFuture sync() throws InterruptedException {
// Allow only once. super.sync();
if (done) { return this;
return;
}
done = true;
if (waiters > 0) {
notifyAll();
}
}
notifyListeners();
} }
private void notifyListeners() { @Override
// This method doesn't need synchronization because: public ChannelGroupException cause() {
// 1) This method is always called after synchronized (this) block. return (ChannelGroupException) super.cause();
// Hence any listener list modification happens-before this method.
// 2) This method is called only when 'done' is true. Once 'done'
// becomes true, the listener list is never modified - see add/removeListener()
if (firstListener != null) {
notifyListener(firstListener);
firstListener = null;
if (otherListeners != null) {
for (ChannelGroupFutureListener l: otherListeners) {
notifyListener(l);
}
otherListeners = null;
}
}
} }
private void notifyListener(ChannelGroupFutureListener l) { private void setSuccess0() {
try { super.setSuccess();
l.operationComplete(this); }
} catch (Throwable t) {
if (logger.isWarnEnabled()) { private void setFailure0(ChannelGroupException cause) {
logger.warn( super.setFailure(cause);
"An exception was thrown by " + }
ChannelFutureListener.class.getSimpleName() + '.', t);
} @Override
public Promise setSuccess() {
throw new IllegalStateException();
}
@Override
public boolean trySuccess() {
throw new IllegalStateException();
}
@Override
public Promise setFailure(Throwable cause) {
throw new IllegalStateException();
}
@Override
public boolean tryFailure(Throwable cause) {
throw new IllegalStateException();
}
private static final class DefaultEntry<K, V> implements Map.Entry<K, V> {
private final K key;
private final V value;
public DefaultEntry(K key, V value) {
this.key = key;
this.value = value;
}
@Override
public K getKey() {
return key;
}
@Override
public V getValue() {
return value;
}
@Override
public V setValue(V value) {
throw new UnsupportedOperationException("read-only");
} }
} }
} }