Let EventExecutor return our Future to allow the user to work also with FutureListener here. Also add a special ScheduledFuture that extends our Future for this purpose.
This commit is contained in:
parent
2970383bac
commit
ce87b627be
@ -15,14 +15,15 @@
|
||||
*/
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
import java.util.concurrent.AbstractExecutorService;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
|
||||
private final Future succeededFuture = new SucceededFuture(this);
|
||||
/**
|
||||
* Abstract base class for {@link EventExecutor} implementations that use a {@link TaskScheduler} to support
|
||||
* scheduling tasks.
|
||||
*/
|
||||
public abstract class AbstractEventExecutor extends AbstractEventExecutorWithoutScheduler {
|
||||
private final TaskScheduler scheduler;
|
||||
|
||||
protected AbstractEventExecutor(TaskScheduler scheduler) {
|
||||
@ -32,26 +33,6 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl
|
||||
this.scheduler = scheduler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventExecutor next() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise newPromise() {
|
||||
return new DefaultPromise(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future newSucceededFuture() {
|
||||
return succeededFuture;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future newFailedFuture(Throwable cause) {
|
||||
return new FailedFuture(this, cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
|
||||
return scheduler.schedule(this, command, delay, unit);
|
||||
|
@ -0,0 +1,94 @@
|
||||
/*
|
||||
* Copyright 2013 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
import java.util.concurrent.AbstractExecutorService;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.RunnableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Abstract base class for {@link EventExecutor} implementations.
|
||||
*/
|
||||
public abstract class AbstractEventExecutorWithoutScheduler extends AbstractExecutorService implements EventExecutor {
|
||||
|
||||
@Override
|
||||
public EventExecutor next() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> Promise<V> newPromise() {
|
||||
return new DefaultPromise<V>(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> Future<V> newSucceededFuture(V result) {
|
||||
return new SucceededFuture<V>(this, result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> Future<V> newFailedFuture(Throwable cause) {
|
||||
return new FailedFuture<V>(this, cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<?> submit(Runnable task) {
|
||||
return (Future<?>) super.submit(task);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Future<T> submit(Runnable task, T result) {
|
||||
return (Future<T>) super.submit(task, result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Future<T> submit(Callable<T> task) {
|
||||
return (Future<T>) super.submit(task);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
|
||||
return new PromiseTask<T>(this, runnable, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
|
||||
return new PromiseTask<T>(this, callable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> schedule(Runnable command, long delay,
|
||||
TimeUnit unit) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,61 @@
|
||||
/*
|
||||
* Copyright 2013 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* Abstract {@link Future} implementation which does not allow for cancellation.
|
||||
*
|
||||
* @param <V>
|
||||
*/
|
||||
public abstract class AbstractFuture<V> implements Future<V> {
|
||||
|
||||
@Override
|
||||
public boolean cancel(boolean mayInterruptIfRunning) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCancelled() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V get() throws InterruptedException, ExecutionException {
|
||||
await();
|
||||
|
||||
Throwable cause = cause();
|
||||
if (cause == null) {
|
||||
return getNow();
|
||||
}
|
||||
throw new ExecutionException(cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
|
||||
if (await(timeout, unit)) {
|
||||
Throwable cause = cause();
|
||||
if (cause == null) {
|
||||
return getNow();
|
||||
}
|
||||
throw new ExecutionException(cause);
|
||||
}
|
||||
throw new TimeoutException();
|
||||
}
|
||||
}
|
@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit;
|
||||
/**
|
||||
* A skeletal {@link Future} implementation which represents a {@link Future} which has been completed already.
|
||||
*/
|
||||
public abstract class CompleteFuture implements Future {
|
||||
public abstract class CompleteFuture<V> extends AbstractFuture<V> {
|
||||
|
||||
private final EventExecutor executor;
|
||||
|
||||
@ -39,7 +39,7 @@ public abstract class CompleteFuture implements Future {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future addListener(GenericFutureListener<? extends Future> listener) {
|
||||
public Future<V> addListener(GenericFutureListener<? extends Future<V>> listener) {
|
||||
if (listener == null) {
|
||||
throw new NullPointerException("listener");
|
||||
}
|
||||
@ -48,11 +48,11 @@ public abstract class CompleteFuture implements Future {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future addListeners(GenericFutureListener<? extends Future>... listeners) {
|
||||
public Future<V> addListeners(GenericFutureListener<? extends Future<V>>... listeners) {
|
||||
if (listeners == null) {
|
||||
throw new NullPointerException("listeners");
|
||||
}
|
||||
for (GenericFutureListener<? extends Future> l: listeners) {
|
||||
for (GenericFutureListener<? extends Future<V>> l: listeners) {
|
||||
if (l == null) {
|
||||
break;
|
||||
}
|
||||
@ -62,19 +62,19 @@ public abstract class CompleteFuture implements Future {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future removeListener(GenericFutureListener<? extends Future> listener) {
|
||||
public Future<V> removeListener(GenericFutureListener<? extends Future<V>> listener) {
|
||||
// NOOP
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future removeListeners(GenericFutureListener<? extends Future>... listeners) {
|
||||
public Future<V> removeListeners(GenericFutureListener<? extends Future<V>>... listeners) {
|
||||
// NOOP
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future await() throws InterruptedException {
|
||||
public Future<V> await() throws InterruptedException {
|
||||
if (Thread.interrupted()) {
|
||||
throw new InterruptedException();
|
||||
}
|
||||
@ -90,12 +90,12 @@ public abstract class CompleteFuture implements Future {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future sync() throws InterruptedException {
|
||||
public Future<V> sync() throws InterruptedException {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future syncUninterruptibly() {
|
||||
public Future<V> syncUninterruptibly() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -108,7 +108,7 @@ public abstract class CompleteFuture implements Future {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future awaitUninterruptibly() {
|
||||
public Future<V> awaitUninterruptibly() {
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -16,14 +16,14 @@
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
|
||||
public abstract class CompletePromise extends CompleteFuture implements Promise {
|
||||
public abstract class CompletePromise<V> extends CompleteFuture<V> implements Promise<V> {
|
||||
|
||||
protected CompletePromise(EventExecutor executor) {
|
||||
super(executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise setFailure(Throwable cause) {
|
||||
public Promise<V> setFailure(Throwable cause) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
@ -33,52 +33,52 @@ public abstract class CompletePromise extends CompleteFuture implements Promise
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise setSuccess() {
|
||||
public Promise<V> setSuccess(V result) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean trySuccess() {
|
||||
public boolean trySuccess(V result) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise await() throws InterruptedException {
|
||||
public Promise<V> await() throws InterruptedException {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise awaitUninterruptibly() {
|
||||
public Promise<V> awaitUninterruptibly() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise syncUninterruptibly() {
|
||||
public Promise<V> syncUninterruptibly() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise sync() throws InterruptedException {
|
||||
public Promise<V> sync() throws InterruptedException {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise addListener(GenericFutureListener<? extends Future> listener) {
|
||||
return (Promise) super.addListener(listener);
|
||||
public Promise<V> addListener(GenericFutureListener<? extends Future<V>> listener) {
|
||||
return (Promise<V>) super.addListener(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise addListeners(GenericFutureListener<? extends Future>... listeners) {
|
||||
return (Promise) super.addListeners(listeners);
|
||||
public Promise<V> addListeners(GenericFutureListener<? extends Future<V>>... listeners) {
|
||||
return (Promise<V>) super.addListeners(listeners);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise removeListener(GenericFutureListener<? extends Future> listener) {
|
||||
return (Promise) super.removeListener(listener);
|
||||
public Promise<V> removeListener(GenericFutureListener<? extends Future<V>> listener) {
|
||||
return (Promise<V>) super.removeListener(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise removeListeners(GenericFutureListener<? extends Future>... listeners) {
|
||||
return (Promise) super.removeListeners(listeners);
|
||||
public Promise<V> removeListeners(GenericFutureListener<? extends Future<V>>... listeners) {
|
||||
return (Promise<V>) super.removeListeners(listeners);
|
||||
}
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit;
|
||||
import static java.util.concurrent.TimeUnit.*;
|
||||
|
||||
|
||||
public class DefaultPromise implements Promise {
|
||||
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
||||
|
||||
private static final InternalLogger logger =
|
||||
InternalLoggerFactory.getInstance(DefaultPromise.class);
|
||||
@ -37,12 +37,10 @@ public class DefaultPromise implements Promise {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
private static final Signal SUCCESS = new Signal(DefaultPromise.class.getName() + ".SUCCESS");
|
||||
|
||||
private final EventExecutor executor;
|
||||
|
||||
private volatile Throwable cause;
|
||||
private volatile Object result;
|
||||
private Object listeners; // Can be ChannelFutureListener or DefaultChannelPromiseListeners
|
||||
|
||||
/**
|
||||
@ -78,22 +76,29 @@ public class DefaultPromise implements Promise {
|
||||
|
||||
@Override
|
||||
public boolean isDone() {
|
||||
return cause != null;
|
||||
return result != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSuccess() {
|
||||
return cause == SUCCESS;
|
||||
Object result = this.result;
|
||||
if (result == null) {
|
||||
return false;
|
||||
}
|
||||
return !(result instanceof CauseHolder);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Throwable cause() {
|
||||
Throwable cause = this.cause;
|
||||
return cause == SUCCESS? null : cause;
|
||||
Object cause = result;
|
||||
if (cause instanceof CauseHolder) {
|
||||
return ((CauseHolder) cause).cause;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise addListener(GenericFutureListener<? extends Future> listener) {
|
||||
public Promise<V> addListener(GenericFutureListener<? extends Future<V>> listener) {
|
||||
if (listener == null) {
|
||||
throw new NullPointerException("listener");
|
||||
}
|
||||
@ -112,7 +117,7 @@ public class DefaultPromise implements Promise {
|
||||
((DefaultPromiseListeners) listeners).add(listener);
|
||||
} else {
|
||||
listeners = new DefaultPromiseListeners(
|
||||
(GenericFutureListener<? extends Future>) listeners, listener);
|
||||
(GenericFutureListener<? extends Future<V>>) listeners, listener);
|
||||
}
|
||||
}
|
||||
return this;
|
||||
@ -124,12 +129,12 @@ public class DefaultPromise implements Promise {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise addListeners(GenericFutureListener<? extends Future>... listeners) {
|
||||
public Promise<V> addListeners(GenericFutureListener<? extends Future<V>>... listeners) {
|
||||
if (listeners == null) {
|
||||
throw new NullPointerException("listeners");
|
||||
}
|
||||
|
||||
for (GenericFutureListener<? extends Future> l: listeners) {
|
||||
for (GenericFutureListener<? extends Future<V>> l: listeners) {
|
||||
if (l == null) {
|
||||
break;
|
||||
}
|
||||
@ -139,7 +144,7 @@ public class DefaultPromise implements Promise {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise removeListener(GenericFutureListener<? extends Future> listener) {
|
||||
public Promise<V> removeListener(GenericFutureListener<? extends Future<V>> listener) {
|
||||
if (listener == null) {
|
||||
throw new NullPointerException("listener");
|
||||
}
|
||||
@ -162,12 +167,12 @@ public class DefaultPromise implements Promise {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise removeListeners(GenericFutureListener<? extends Future>... listeners) {
|
||||
public Promise<V> removeListeners(GenericFutureListener<? extends Future<V>>... listeners) {
|
||||
if (listeners == null) {
|
||||
throw new NullPointerException("listeners");
|
||||
}
|
||||
|
||||
for (GenericFutureListener<? extends Future> l: listeners) {
|
||||
for (GenericFutureListener<? extends Future<V>> l: listeners) {
|
||||
if (l == null) {
|
||||
break;
|
||||
}
|
||||
@ -177,14 +182,14 @@ public class DefaultPromise implements Promise {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise sync() throws InterruptedException {
|
||||
public Promise<V> sync() throws InterruptedException {
|
||||
await();
|
||||
rethrowIfFailed();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise syncUninterruptibly() {
|
||||
public Promise<V> syncUninterruptibly() {
|
||||
awaitUninterruptibly();
|
||||
rethrowIfFailed();
|
||||
return this;
|
||||
@ -200,7 +205,7 @@ public class DefaultPromise implements Promise {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise await() throws InterruptedException {
|
||||
public Promise<V> await() throws InterruptedException {
|
||||
if (isDone()) {
|
||||
return this;
|
||||
}
|
||||
@ -235,7 +240,7 @@ public class DefaultPromise implements Promise {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise awaitUninterruptibly() {
|
||||
public Promise<V> awaitUninterruptibly() {
|
||||
if (isDone()) {
|
||||
return this;
|
||||
}
|
||||
@ -352,8 +357,8 @@ public class DefaultPromise implements Promise {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise setSuccess() {
|
||||
if (set(SUCCESS)) {
|
||||
public Promise<V> setSuccess(V result) {
|
||||
if (setSuccess0(result)) {
|
||||
notifyListeners();
|
||||
return this;
|
||||
}
|
||||
@ -361,8 +366,8 @@ public class DefaultPromise implements Promise {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean trySuccess() {
|
||||
if (set(SUCCESS)) {
|
||||
public boolean trySuccess(V result) {
|
||||
if (setSuccess0(result)) {
|
||||
notifyListeners();
|
||||
return true;
|
||||
}
|
||||
@ -370,8 +375,8 @@ public class DefaultPromise implements Promise {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise setFailure(Throwable cause) {
|
||||
if (set(cause)) {
|
||||
public Promise<V> setFailure(Throwable cause) {
|
||||
if (setFailure0(cause)) {
|
||||
notifyListeners();
|
||||
return this;
|
||||
}
|
||||
@ -380,14 +385,14 @@ public class DefaultPromise implements Promise {
|
||||
|
||||
@Override
|
||||
public boolean tryFailure(Throwable cause) {
|
||||
if (set(cause)) {
|
||||
if (setFailure0(cause)) {
|
||||
notifyListeners();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean set(Throwable cause) {
|
||||
private boolean setFailure0(Throwable cause) {
|
||||
if (isDone()) {
|
||||
return false;
|
||||
}
|
||||
@ -398,7 +403,7 @@ public class DefaultPromise implements Promise {
|
||||
return false;
|
||||
}
|
||||
|
||||
this.cause = cause;
|
||||
result = new CauseHolder(cause);
|
||||
if (hasWaiters()) {
|
||||
notifyAll();
|
||||
}
|
||||
@ -406,6 +411,38 @@ public class DefaultPromise implements Promise {
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean setSuccess0(V result) {
|
||||
if (isDone()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
// Allow only once.
|
||||
if (isDone()) {
|
||||
return false;
|
||||
}
|
||||
if (result == null) {
|
||||
this.result = SUCCESS;
|
||||
} else {
|
||||
this.result = result;
|
||||
}
|
||||
if (hasWaiters()) {
|
||||
notifyAll();
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public V getNow() {
|
||||
Object result = this.result;
|
||||
if (result instanceof CauseHolder || result == SUCCESS) {
|
||||
return null;
|
||||
}
|
||||
return (V) result;
|
||||
}
|
||||
|
||||
private boolean hasWaiters() {
|
||||
return (state & 0xFFFFFF0000000000L) != 0;
|
||||
}
|
||||
@ -438,7 +475,7 @@ public class DefaultPromise implements Promise {
|
||||
if (listeners instanceof DefaultPromiseListeners) {
|
||||
notifyListeners0(this, (DefaultPromiseListeners) listeners);
|
||||
} else {
|
||||
notifyListener0(this, (GenericFutureListener<? extends Future>) listeners);
|
||||
notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
|
||||
}
|
||||
listeners = null;
|
||||
} else {
|
||||
@ -450,24 +487,26 @@ public class DefaultPromise implements Promise {
|
||||
if (listeners instanceof DefaultPromiseListeners) {
|
||||
notifyListeners0(DefaultPromise.this, (DefaultPromiseListeners) listeners);
|
||||
} else {
|
||||
notifyListener0(DefaultPromise.this, (GenericFutureListener<? extends Future>) listeners);
|
||||
notifyListener0(DefaultPromise.this, (GenericFutureListener<? extends Future<V>>) listeners);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private static void notifyListeners0(final Future future,
|
||||
@SuppressWarnings("unchecked")
|
||||
private static void notifyListeners0(final Future<?> future,
|
||||
DefaultPromiseListeners listeners) {
|
||||
final GenericFutureListener<? extends Future>[] a = listeners.listeners();
|
||||
final GenericFutureListener<? extends Future<?>>[] a = listeners.listeners();
|
||||
final int size = listeners.size();
|
||||
for (int i = 0; i < size; i ++) {
|
||||
notifyListener0(future, a[i]);
|
||||
}
|
||||
}
|
||||
|
||||
public static void notifyListener(final EventExecutor eventExecutor, final Future future,
|
||||
final GenericFutureListener<? extends Future> l) {
|
||||
@SuppressWarnings("unchecked")
|
||||
public static void notifyListener(final EventExecutor eventExecutor, final Future<?> future,
|
||||
final GenericFutureListener<? extends Future<?>> l) {
|
||||
if (eventExecutor.inEventLoop()) {
|
||||
final Integer stackDepth = LISTENER_STACK_DEPTH.get();
|
||||
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
|
||||
@ -501,4 +540,11 @@ public class DefaultPromise implements Promise {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static final class CauseHolder {
|
||||
final Throwable cause;
|
||||
private CauseHolder(Throwable cause) {
|
||||
this.cause = cause;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -20,19 +20,19 @@ import java.util.Arrays;
|
||||
import java.util.EventListener;
|
||||
|
||||
final class DefaultPromiseListeners {
|
||||
private GenericFutureListener<? extends Future>[] listeners;
|
||||
private GenericFutureListener<? extends Future<?>>[] listeners;
|
||||
private int size;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
DefaultPromiseListeners(GenericFutureListener<? extends Future> firstListener,
|
||||
GenericFutureListener<? extends Future> secondListener) {
|
||||
DefaultPromiseListeners(GenericFutureListener<? extends Future<?>> firstListener,
|
||||
GenericFutureListener<? extends Future<?>> secondListener) {
|
||||
|
||||
listeners = new GenericFutureListener[] { firstListener, secondListener };
|
||||
size = 2;
|
||||
}
|
||||
|
||||
void add(GenericFutureListener<? extends Future> l) {
|
||||
GenericFutureListener<? extends Future>[] listeners = this.listeners;
|
||||
void add(GenericFutureListener<? extends Future<?>> l) {
|
||||
GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
|
||||
final int size = this.size;
|
||||
if (size == listeners.length) {
|
||||
this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
|
||||
@ -57,7 +57,7 @@ final class DefaultPromiseListeners {
|
||||
}
|
||||
}
|
||||
|
||||
GenericFutureListener<? extends Future>[] listeners() {
|
||||
GenericFutureListener<? extends Future<?>>[] listeners() {
|
||||
return listeners;
|
||||
}
|
||||
|
||||
|
@ -15,7 +15,9 @@
|
||||
*/
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* The {@link EventExecutor} is a special {@link ScheduledExecutorService} which comes
|
||||
@ -24,7 +26,7 @@ import java.util.concurrent.ScheduledExecutorService;
|
||||
* access methods.
|
||||
*
|
||||
*/
|
||||
public interface EventExecutor extends EventExecutorGroup, ScheduledExecutorService, FutureFactory {
|
||||
public interface EventExecutor extends EventExecutorGroup, ScheduledExecutorService {
|
||||
|
||||
/**
|
||||
* Returns a reference to itself.
|
||||
@ -47,4 +49,44 @@ public interface EventExecutor extends EventExecutorGroup, ScheduledExecutorServ
|
||||
* {@code false} otherwise.
|
||||
*/
|
||||
boolean inEventLoop(Thread thread);
|
||||
|
||||
/**
|
||||
* Return a new {@link Promise}.
|
||||
*/
|
||||
<V> Promise<V> newPromise();
|
||||
|
||||
/**
|
||||
* Create a new {@link Future} which is marked as successes already. So {@link Future#isSuccess()}
|
||||
* will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also
|
||||
* every call of blocking methods will just return without blocking.
|
||||
*/
|
||||
<V> Future<V> newSucceededFuture(V result);
|
||||
|
||||
/**
|
||||
* Create a new {@link Future} which is marked as fakued already. So {@link Future#isSuccess()}
|
||||
* will return {@code false}. All {@link FutureListener} added to it will be notified directly. Also
|
||||
* every call of blocking methods will just return without blocking.
|
||||
*/
|
||||
<V> Future<V> newFailedFuture(Throwable cause);
|
||||
|
||||
@Override
|
||||
Future<?> submit(Runnable task);
|
||||
|
||||
@Override
|
||||
<T> Future<T> submit(Runnable task, T result);
|
||||
|
||||
@Override
|
||||
<T> Future<T> submit(Callable<T> task);
|
||||
|
||||
@Override
|
||||
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
|
||||
|
||||
@Override
|
||||
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
|
||||
|
||||
@Override
|
||||
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
|
||||
|
||||
@Override
|
||||
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ import io.netty.util.internal.PlatformDependent;
|
||||
* recommended to use {@link EventExecutor#newFailedFuture(Throwable)}
|
||||
* instead of calling the constructor of this future.
|
||||
*/
|
||||
public final class FailedFuture extends CompleteFuture {
|
||||
public final class FailedFuture<V> extends CompleteFuture<V> {
|
||||
|
||||
private final Throwable cause;
|
||||
|
||||
@ -51,14 +51,19 @@ public final class FailedFuture extends CompleteFuture {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future sync() {
|
||||
public Future<V> sync() {
|
||||
PlatformDependent.throwException(cause);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future syncUninterruptibly() {
|
||||
public Future<V> syncUninterruptibly() {
|
||||
PlatformDependent.throwException(cause);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V getNow() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -15,19 +15,14 @@
|
||||
*/
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
/**
|
||||
* The result of an asynchronous operation.
|
||||
*/
|
||||
public interface Future {
|
||||
|
||||
/**
|
||||
* Returns {@code true} if and only if this future is
|
||||
* complete, regardless of whether the operation was successful or failed.
|
||||
*/
|
||||
boolean isDone();
|
||||
public interface Future<V> extends java.util.concurrent.Future<V> {
|
||||
|
||||
/**
|
||||
* Returns {@code true} if and only if the I/O operation was completed
|
||||
@ -51,7 +46,7 @@ public interface Future {
|
||||
* {@linkplain #isDone() done}. If this future is already
|
||||
* completed, the specified listener is notified immediately.
|
||||
*/
|
||||
Future addListener(GenericFutureListener<? extends Future> listener);
|
||||
Future<V> addListener(GenericFutureListener<? extends Future<V>> listener);
|
||||
|
||||
/**
|
||||
* Adds the specified listeners to this future. The
|
||||
@ -59,7 +54,7 @@ public interface Future {
|
||||
* {@linkplain #isDone() done}. If this future is already
|
||||
* completed, the specified listeners are notified immediately.
|
||||
*/
|
||||
Future addListeners(GenericFutureListener<? extends Future>... listeners);
|
||||
Future<V> addListeners(GenericFutureListener<? extends Future<V>>... listeners);
|
||||
|
||||
/**
|
||||
* Removes the specified listener from this future.
|
||||
@ -68,7 +63,7 @@ public interface Future {
|
||||
* listener is not associated with this future, this method
|
||||
* does nothing and returns silently.
|
||||
*/
|
||||
Future removeListener(GenericFutureListener<? extends Future> listener);
|
||||
Future<V> removeListener(GenericFutureListener<? extends Future<V>> listener);
|
||||
|
||||
/**
|
||||
* Removes the specified listeners from this future.
|
||||
@ -77,19 +72,19 @@ public interface Future {
|
||||
* listeners are not associated with this future, this method
|
||||
* does nothing and returns silently.
|
||||
*/
|
||||
Future removeListeners(GenericFutureListener<? extends Future>... listeners);
|
||||
Future<V> removeListeners(GenericFutureListener<? extends Future<V>>... listeners);
|
||||
|
||||
/**
|
||||
* Waits for this future until it is done, and rethrows the cause of the failure if this future
|
||||
* failed.
|
||||
*/
|
||||
Future sync() throws InterruptedException;
|
||||
Future<V> sync() throws InterruptedException;
|
||||
|
||||
/**
|
||||
* Waits for this future until it is done, and rethrows the cause of the failure if this future
|
||||
* failed.
|
||||
*/
|
||||
Future syncUninterruptibly();
|
||||
Future<V> syncUninterruptibly();
|
||||
|
||||
/**
|
||||
* Waits for this future to be completed.
|
||||
@ -97,14 +92,14 @@ public interface Future {
|
||||
* @throws InterruptedException
|
||||
* if the current thread was interrupted
|
||||
*/
|
||||
Future await() throws InterruptedException;
|
||||
Future<V> await() throws InterruptedException;
|
||||
|
||||
/**
|
||||
* Waits for this future to be completed without
|
||||
* interruption. This method catches an {@link InterruptedException} and
|
||||
* discards it silently.
|
||||
*/
|
||||
Future awaitUninterruptibly();
|
||||
Future<V> awaitUninterruptibly();
|
||||
|
||||
/**
|
||||
* Waits for this future to be completed within the
|
||||
@ -149,4 +144,20 @@ public interface Future {
|
||||
* the specified time limit
|
||||
*/
|
||||
boolean awaitUninterruptibly(long timeoutMillis);
|
||||
|
||||
/**
|
||||
* Return the result without blocking. If the future is not done yet this will return {@code null}.
|
||||
*
|
||||
* As it is possible that a {@code null} value is used to mark the future as successful you also need to check
|
||||
* if the future is really done with {@link #isDone()} and not relay on the returned {@code null} value.
|
||||
*/
|
||||
V getNow();
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*
|
||||
* If the cancelation was successful it will fail the future with an {@link CancellationException}.
|
||||
*/
|
||||
@Override
|
||||
boolean cancel(boolean mayInterruptIfRunning);
|
||||
}
|
||||
|
@ -1,40 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
/**
|
||||
* Allows to create {@link Future}'s on demand.
|
||||
*/
|
||||
public interface FutureFactory {
|
||||
/**
|
||||
* Return a new {@link Promise}.
|
||||
*/
|
||||
Promise newPromise();
|
||||
|
||||
/**
|
||||
* Create a new {@link Future} which is marked as successes already. So {@link Future#isSuccess()}
|
||||
* will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also
|
||||
* every call of blocking methods will just return without blocking.
|
||||
*/
|
||||
Future newSucceededFuture();
|
||||
|
||||
/**
|
||||
* Create a new {@link Future} which is marked as fakued already. So {@link Future#isSuccess()}
|
||||
* will return {@code false}. All {@link FutureListener} added to it will be notified directly. Also
|
||||
* every call of blocking methods will just return without blocking.
|
||||
*/
|
||||
Future newFailedFuture(Throwable cause);
|
||||
}
|
@ -25,4 +25,4 @@ package io.netty.util.concurrent;
|
||||
* });
|
||||
* </pre>
|
||||
*/
|
||||
public interface FutureListener extends GenericFutureListener<Future> { }
|
||||
public interface FutureListener<V> extends GenericFutureListener<Future<V>> { }
|
||||
|
@ -21,7 +21,7 @@ import java.util.EventListener;
|
||||
* Listens to the result of a {@link Future}. The result of the asynchronous operation is notified once this listener
|
||||
* is added by calling {@link Future#addListener(GenericFutureListener)}.
|
||||
*/
|
||||
public interface GenericFutureListener<F extends Future> extends EventListener {
|
||||
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
|
||||
|
||||
/**
|
||||
* Invoked when the operation associated with the {@link Future} has been completed.
|
||||
|
@ -15,14 +15,8 @@
|
||||
*/
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.FutureTask;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
@ -74,87 +68,6 @@ public final class ImmediateEventExecutor extends AbstractEventExecutor {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Future<T> submit(Callable<T> task) {
|
||||
if (task == null) {
|
||||
throw new NullPointerException("task");
|
||||
}
|
||||
FutureTask<T> future = new FutureTask<T>(task);
|
||||
future.run();
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Future<T> submit(Runnable task, T result) {
|
||||
if (task == null) {
|
||||
throw new NullPointerException("task");
|
||||
}
|
||||
FutureTask<T> future = new FutureTask<T>(task, result);
|
||||
future.run();
|
||||
return future;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public Future<?> submit(Runnable task) {
|
||||
if (task == null) {
|
||||
throw new NullPointerException("task");
|
||||
}
|
||||
FutureTask<?> future = new FutureTask(task, null);
|
||||
future.run();
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
|
||||
if (tasks == null) {
|
||||
throw new NullPointerException("tasks");
|
||||
}
|
||||
List<Future<T>> futures = new ArrayList<Future<T>>();
|
||||
for (Callable<T> task: tasks) {
|
||||
futures.add(submit(task));
|
||||
}
|
||||
return futures;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
|
||||
long timeout, TimeUnit unit) {
|
||||
if (tasks == null) {
|
||||
throw new NullPointerException("tasks");
|
||||
}
|
||||
|
||||
List<Future<T>> futures = new ArrayList<Future<T>>();
|
||||
for (Callable<T> task: tasks) {
|
||||
futures.add(submit(task));
|
||||
}
|
||||
return futures;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
|
||||
throws InterruptedException, ExecutionException {
|
||||
if (tasks == null) {
|
||||
throw new NullPointerException("tasks");
|
||||
}
|
||||
if (tasks.isEmpty()) {
|
||||
throw new IllegalArgumentException("tasks must be non empty");
|
||||
}
|
||||
return invokeAll(tasks).get(0).get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
|
||||
throws InterruptedException, ExecutionException {
|
||||
if (tasks == null) {
|
||||
throw new NullPointerException("tasks");
|
||||
}
|
||||
if (tasks.isEmpty()) {
|
||||
throw new IllegalArgumentException("tasks must be non empty");
|
||||
}
|
||||
return invokeAll(tasks).get(0).get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Runnable command) {
|
||||
if (command == null) {
|
||||
|
@ -18,7 +18,7 @@ package io.netty.util.concurrent;
|
||||
/**
|
||||
* Special {@link Future} which is writable.
|
||||
*/
|
||||
public interface Promise extends Future {
|
||||
public interface Promise<V> extends Future<V> {
|
||||
|
||||
/**
|
||||
* Marks this future as a success and notifies all
|
||||
@ -26,7 +26,7 @@ public interface Promise extends Future {
|
||||
*
|
||||
* If it is success or failed already it will throw an {@link IllegalStateException}.
|
||||
*/
|
||||
Promise setSuccess();
|
||||
Promise<V> setSuccess(V result);
|
||||
|
||||
/**
|
||||
* Marks this future as a success and notifies all
|
||||
@ -36,7 +36,7 @@ public interface Promise extends Future {
|
||||
* a success. Otherwise {@code false} because this future is
|
||||
* already marked as either a success or a failure.
|
||||
*/
|
||||
boolean trySuccess();
|
||||
boolean trySuccess(V result);
|
||||
|
||||
/**
|
||||
* Marks this future as a failure and notifies all
|
||||
@ -44,7 +44,7 @@ public interface Promise extends Future {
|
||||
*
|
||||
* If it is success or failed already it will throw an {@link IllegalStateException}.
|
||||
*/
|
||||
Promise setFailure(Throwable cause);
|
||||
Promise<V> setFailure(Throwable cause);
|
||||
|
||||
/**
|
||||
* Marks this future as a failure and notifies all
|
||||
@ -57,26 +57,26 @@ public interface Promise extends Future {
|
||||
boolean tryFailure(Throwable cause);
|
||||
|
||||
@Override
|
||||
Promise addListener(GenericFutureListener<? extends Future> listener);
|
||||
Promise<V> addListener(GenericFutureListener<? extends Future<V>> listener);
|
||||
|
||||
@Override
|
||||
Promise addListeners(GenericFutureListener<? extends Future>... listeners);
|
||||
Promise<V> addListeners(GenericFutureListener<? extends Future<V>>... listeners);
|
||||
|
||||
@Override
|
||||
Promise removeListener(GenericFutureListener<? extends Future> listener);
|
||||
Promise<V> removeListener(GenericFutureListener<? extends Future<V>> listener);
|
||||
|
||||
@Override
|
||||
Promise removeListeners(GenericFutureListener<? extends Future>... listeners);
|
||||
Promise<V> removeListeners(GenericFutureListener<? extends Future<V>>... listeners);
|
||||
|
||||
@Override
|
||||
Promise await() throws InterruptedException;
|
||||
Promise<V> await() throws InterruptedException;
|
||||
|
||||
@Override
|
||||
Promise awaitUninterruptibly();
|
||||
Promise<V> awaitUninterruptibly();
|
||||
|
||||
@Override
|
||||
Promise sync() throws InterruptedException;
|
||||
Promise<V> sync() throws InterruptedException;
|
||||
|
||||
@Override
|
||||
Promise syncUninterruptibly();
|
||||
Promise<V> syncUninterruptibly();
|
||||
}
|
||||
|
@ -0,0 +1,91 @@
|
||||
/*
|
||||
* Copyright 2013 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.RunnableFuture;
|
||||
|
||||
class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
|
||||
protected final Callable<V> task;
|
||||
|
||||
PromiseTask(EventExecutor executor, Runnable runnable, V result) {
|
||||
this(executor, Executors.callable(runnable, result));
|
||||
}
|
||||
|
||||
PromiseTask(EventExecutor executor, Callable<V> callable) {
|
||||
super(executor);
|
||||
task = callable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return System.identityHashCode(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
return this == obj;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
V result = task.call();
|
||||
setSuccessInternal(result);
|
||||
} catch (Throwable e) {
|
||||
setFailureInternal(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise<V> setFailure(Throwable cause) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
protected final Promise<V> setFailureInternal(Throwable cause) {
|
||||
super.setFailure(cause);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tryFailure(Throwable cause) {
|
||||
return false;
|
||||
}
|
||||
|
||||
protected final boolean tryFailureInternal(Throwable cause) {
|
||||
return super.tryFailure(cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise<V> setSuccess(V result) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
protected final Promise<V> setSuccessInternal(V result) {
|
||||
super.setSuccess(result);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean trySuccess(V result) {
|
||||
return false;
|
||||
}
|
||||
|
||||
protected final boolean trySuccessInternal(V result) {
|
||||
return super.trySuccess(result);
|
||||
}
|
||||
}
|
@ -0,0 +1,22 @@
|
||||
/*
|
||||
* Copyright 2013 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
/**
|
||||
* The result of an scheduled asynchronous operation.
|
||||
*/
|
||||
public interface ScheduledFuture<V> extends Future<V>, java.util.concurrent.ScheduledFuture<V> {
|
||||
}
|
@ -17,18 +17,20 @@ package io.netty.util.concurrent;
|
||||
|
||||
/**
|
||||
* The {@link CompleteFuture} which is succeeded already. It is
|
||||
* recommended to use {@link EventExecutor#newSucceededFuture()} instead of
|
||||
* recommended to use {@link EventExecutor#newSucceededFuture(Object)} instead of
|
||||
* calling the constructor of this future.
|
||||
*/
|
||||
public final class SucceededFuture extends CompleteFuture {
|
||||
public final class SucceededFuture<V> extends CompleteFuture<V> {
|
||||
private final V result;
|
||||
|
||||
/**
|
||||
* Creates a new instance.
|
||||
*
|
||||
* @param executor the {@link EventExecutor} associated with this future
|
||||
*/
|
||||
public SucceededFuture(EventExecutor executor) {
|
||||
public SucceededFuture(EventExecutor executor, V result) {
|
||||
super(executor);
|
||||
this.result = result;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -40,4 +42,9 @@ public final class SucceededFuture extends CompleteFuture {
|
||||
public boolean isSuccess() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V getNow() {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
@ -21,14 +21,15 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.DelayQueue;
|
||||
import java.util.concurrent.Delayed;
|
||||
import java.util.concurrent.FutureTask;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public final class TaskScheduler {
|
||||
@ -39,7 +40,6 @@ public final class TaskScheduler {
|
||||
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
|
||||
private static final long START_TIME = System.nanoTime();
|
||||
private static final AtomicLong nextTaskId = new AtomicLong();
|
||||
|
||||
private static long nanoTime() {
|
||||
return System.nanoTime() - START_TIME;
|
||||
}
|
||||
@ -96,7 +96,7 @@ public final class TaskScheduler {
|
||||
}
|
||||
|
||||
private void runTask(ScheduledFutureTask<?> task) {
|
||||
EventExecutor executor = task.executor;
|
||||
EventExecutor executor = task.executor();
|
||||
if (executor == null) {
|
||||
task.run();
|
||||
} else {
|
||||
@ -104,7 +104,7 @@ public final class TaskScheduler {
|
||||
task.cancel(false);
|
||||
} else {
|
||||
try {
|
||||
task.executor.execute(task);
|
||||
executor.execute(task);
|
||||
} catch (RejectedExecutionException e) {
|
||||
task.cancel(false);
|
||||
}
|
||||
@ -210,7 +210,8 @@ public final class TaskScheduler {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("delay: %d (expected: >= 0)", delay));
|
||||
}
|
||||
return schedule(new ScheduledFutureTask<Void>(executor, command, null, deadlineNanos(unit.toNanos(delay))));
|
||||
return schedule(new ScheduledFutureTask<Void>(this, executor,
|
||||
command, null, deadlineNanos(unit.toNanos(delay))));
|
||||
}
|
||||
|
||||
public <V> ScheduledFuture<V> schedule(
|
||||
@ -228,7 +229,7 @@ public final class TaskScheduler {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("delay: %d (expected: >= 0)", delay));
|
||||
}
|
||||
return schedule(new ScheduledFutureTask<V>(executor, callable, deadlineNanos(unit.toNanos(delay))));
|
||||
return schedule(new ScheduledFutureTask<V>(this, executor, callable, deadlineNanos(unit.toNanos(delay))));
|
||||
}
|
||||
|
||||
public ScheduledFuture<?> scheduleAtFixedRate(
|
||||
@ -252,7 +253,8 @@ public final class TaskScheduler {
|
||||
}
|
||||
|
||||
return schedule(new ScheduledFutureTask<Void>(
|
||||
executor, command, null, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
|
||||
this, executor, Executors.<Void>callable(command, null),
|
||||
deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
|
||||
}
|
||||
|
||||
public ScheduledFuture<?> scheduleWithFixedDelay(
|
||||
@ -276,7 +278,8 @@ public final class TaskScheduler {
|
||||
}
|
||||
|
||||
return schedule(new ScheduledFutureTask<Void>(
|
||||
executor, command, null, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
|
||||
this, executor, Executors.<Void>callable(command, null),
|
||||
deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
|
||||
}
|
||||
|
||||
private <V> ScheduledFuture<V> schedule(ScheduledFutureTask<V> task) {
|
||||
@ -300,8 +303,8 @@ public final class TaskScheduler {
|
||||
}
|
||||
|
||||
if (started) {
|
||||
schedule(new ScheduledFutureTask<Void>(
|
||||
null, new PurgeTask(), null,
|
||||
schedule(new ScheduledFutureTask<V>(this, new ImmediateEventExecutor(this)
|
||||
, Executors.<V>callable(new PurgeTask(), null),
|
||||
deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
|
||||
}
|
||||
|
||||
@ -324,36 +327,37 @@ public final class TaskScheduler {
|
||||
taskQueue.clear();
|
||||
}
|
||||
|
||||
private class ScheduledFutureTask<V> extends FutureTask<V> implements ScheduledFuture<V> {
|
||||
private static class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
|
||||
|
||||
private final EventExecutor executor;
|
||||
private final long id = nextTaskId.getAndIncrement();
|
||||
private long deadlineNanos;
|
||||
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
|
||||
private final long periodNanos;
|
||||
private final TaskScheduler scheduler;
|
||||
|
||||
ScheduledFutureTask(EventExecutor executor, Runnable runnable, V result, long nanoTime) {
|
||||
super(runnable, result);
|
||||
this.executor = executor;
|
||||
deadlineNanos = nanoTime;
|
||||
periodNanos = 0;
|
||||
private final AtomicBoolean cancellable = new AtomicBoolean(true);
|
||||
|
||||
ScheduledFutureTask(TaskScheduler scheduler, EventExecutor executor,
|
||||
Runnable runnable, V result, long nanoTime) {
|
||||
this(scheduler, executor, Executors.callable(runnable, result), nanoTime);
|
||||
}
|
||||
|
||||
ScheduledFutureTask(EventExecutor executor, Runnable runnable, V result, long nanoTime, long period) {
|
||||
super(runnable, result);
|
||||
ScheduledFutureTask(TaskScheduler scheduler, EventExecutor executor,
|
||||
Callable<V> callable, long nanoTime, long period) {
|
||||
super(executor, callable);
|
||||
if (period == 0) {
|
||||
throw new IllegalArgumentException("period: 0 (expected: != 0)");
|
||||
}
|
||||
this.executor = executor;
|
||||
deadlineNanos = nanoTime;
|
||||
periodNanos = period;
|
||||
this.scheduler = scheduler;
|
||||
}
|
||||
|
||||
ScheduledFutureTask(EventExecutor executor, Callable<V> callable, long nanoTime) {
|
||||
super(callable);
|
||||
this.executor = executor;
|
||||
ScheduledFutureTask(TaskScheduler scheduler, EventExecutor executor, Callable<V> callable, long nanoTime) {
|
||||
super(executor, callable);
|
||||
deadlineNanos = nanoTime;
|
||||
periodNanos = 0;
|
||||
this.scheduler = scheduler;
|
||||
}
|
||||
|
||||
public long deadlineNanos() {
|
||||
@ -369,16 +373,6 @@ public final class TaskScheduler {
|
||||
return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return System.identityHashCode(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
return this == obj;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Delayed o) {
|
||||
if (this == o) {
|
||||
@ -402,21 +396,47 @@ public final class TaskScheduler {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (periodNanos == 0) {
|
||||
super.run();
|
||||
} else {
|
||||
boolean reset = runAndReset();
|
||||
if (reset && !isShutdown()) {
|
||||
long p = periodNanos;
|
||||
if (p > 0) {
|
||||
deadlineNanos += p;
|
||||
} else {
|
||||
deadlineNanos = nanoTime() - p;
|
||||
try {
|
||||
if (periodNanos == 0) {
|
||||
if (cancellable.compareAndSet(true, false)) {
|
||||
V result = task.call();
|
||||
setSuccessInternal(result);
|
||||
}
|
||||
} else {
|
||||
task.call();
|
||||
if (!scheduler.isShutdown()) {
|
||||
long p = periodNanos;
|
||||
if (p > 0) {
|
||||
deadlineNanos += p;
|
||||
} else {
|
||||
deadlineNanos = nanoTime() - p;
|
||||
}
|
||||
if (!isDone()) {
|
||||
scheduler.schedule(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Throwable cause) {
|
||||
setFailureInternal(cause);
|
||||
}
|
||||
}
|
||||
|
||||
schedule(this);
|
||||
@Override
|
||||
public boolean isCancelled() {
|
||||
if (cause() instanceof CancellationException) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean cancel(boolean mayInterruptIfRunning) {
|
||||
if (!isDone()) {
|
||||
if (cancellable.compareAndSet(true, false)) {
|
||||
return tryFailureInternal(new CancellationException());
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -162,7 +162,7 @@ import java.util.concurrent.TimeUnit;
|
||||
* }
|
||||
* </pre>
|
||||
*/
|
||||
public interface ChannelFuture extends Future {
|
||||
public interface ChannelFuture extends Future<Void> {
|
||||
|
||||
/**
|
||||
* Returns a channel where the I/O operation associated with this
|
||||
@ -171,16 +171,16 @@ public interface ChannelFuture extends Future {
|
||||
Channel channel();
|
||||
|
||||
@Override
|
||||
ChannelFuture addListener(GenericFutureListener<? extends Future> listener);
|
||||
ChannelFuture addListener(GenericFutureListener<? extends Future<Void>> listener);
|
||||
|
||||
@Override
|
||||
ChannelFuture addListeners(GenericFutureListener<? extends Future>... listeners);
|
||||
ChannelFuture addListeners(GenericFutureListener<? extends Future<Void>>... listeners);
|
||||
|
||||
@Override
|
||||
ChannelFuture removeListener(GenericFutureListener<? extends Future> listener);
|
||||
ChannelFuture removeListener(GenericFutureListener<? extends Future<Void>> listener);
|
||||
|
||||
@Override
|
||||
ChannelFuture removeListeners(GenericFutureListener<? extends Future>... listeners);
|
||||
ChannelFuture removeListeners(GenericFutureListener<? extends Future<Void>>... listeners);
|
||||
|
||||
@Override
|
||||
ChannelFuture sync() throws InterruptedException;
|
||||
|
@ -22,25 +22,32 @@ import io.netty.util.concurrent.Promise;
|
||||
/**
|
||||
* Special {@link ChannelFuture} which is writable.
|
||||
*/
|
||||
public interface ChannelPromise extends ChannelFuture, Promise {
|
||||
public interface ChannelPromise extends ChannelFuture, Promise<Void> {
|
||||
|
||||
@Override
|
||||
Channel channel();
|
||||
|
||||
@Override
|
||||
ChannelPromise setSuccess(Void result);
|
||||
|
||||
ChannelPromise setSuccess();
|
||||
|
||||
boolean trySuccess();
|
||||
|
||||
@Override
|
||||
ChannelPromise setFailure(Throwable cause);
|
||||
|
||||
@Override
|
||||
ChannelPromise addListener(GenericFutureListener<? extends Future> listener);
|
||||
ChannelPromise addListener(GenericFutureListener<? extends Future<Void>> listener);
|
||||
|
||||
@Override
|
||||
ChannelPromise addListeners(GenericFutureListener<? extends Future>... listeners);
|
||||
ChannelPromise addListeners(GenericFutureListener<? extends Future<Void>>... listeners);
|
||||
|
||||
@Override
|
||||
ChannelPromise removeListener(GenericFutureListener<? extends Future> listener);
|
||||
ChannelPromise removeListener(GenericFutureListener<? extends Future<Void>> listener);
|
||||
|
||||
@Override
|
||||
ChannelPromise removeListeners(GenericFutureListener<? extends Future>... listeners);
|
||||
ChannelPromise removeListeners(GenericFutureListener<? extends Future<Void>>... listeners);
|
||||
|
||||
@Override
|
||||
ChannelPromise sync() throws InterruptedException;
|
||||
|
@ -17,12 +17,13 @@ package io.netty.channel;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.util.concurrent.FutureFactory;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.FutureListener;
|
||||
|
||||
/**
|
||||
* Provides common methods between {@link Channel} and {@link ChannelHandlerContext}.
|
||||
*/
|
||||
interface ChannelPropertyAccess extends FutureFactory {
|
||||
interface ChannelPropertyAccess {
|
||||
|
||||
/**
|
||||
* Return the assigned {@link ChannelPipeline}
|
||||
@ -34,12 +35,22 @@ interface ChannelPropertyAccess extends FutureFactory {
|
||||
*/
|
||||
ByteBufAllocator alloc();
|
||||
|
||||
@Override
|
||||
/**
|
||||
* Return a new {@link ChannelPromise}.
|
||||
*/
|
||||
ChannelPromise newPromise();
|
||||
|
||||
@Override
|
||||
/**
|
||||
* Create a new {@link ChannelFuture} which is marked as successes already. So {@link ChannelFuture#isSuccess()}
|
||||
* will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also
|
||||
* every call of blocking methods will just return without blocking.
|
||||
*/
|
||||
ChannelFuture newSucceededFuture();
|
||||
|
||||
@Override
|
||||
/**
|
||||
* Create a new {@link ChannelFuture} which is marked as fakued already. So {@link Future#isSuccess()}
|
||||
* will return {@code false}. All {@link FutureListener} added to it will be notified directly. Also
|
||||
* every call of blocking methods will just return without blocking.
|
||||
*/
|
||||
ChannelFuture newFailedFuture(Throwable cause);
|
||||
}
|
||||
|
@ -24,7 +24,7 @@ import io.netty.util.concurrent.GenericFutureListener;
|
||||
* A skeletal {@link ChannelFuture} implementation which represents a
|
||||
* {@link ChannelFuture} which has been completed already.
|
||||
*/
|
||||
abstract class CompleteChannelFuture extends CompleteFuture implements ChannelFuture {
|
||||
abstract class CompleteChannelFuture extends CompleteFuture<Void> implements ChannelFuture {
|
||||
|
||||
private final Channel channel;
|
||||
|
||||
@ -52,25 +52,25 @@ abstract class CompleteChannelFuture extends CompleteFuture implements ChannelFu
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture addListener(GenericFutureListener<? extends Future> listener) {
|
||||
public ChannelFuture addListener(GenericFutureListener<? extends Future<Void>> listener) {
|
||||
super.addListener(listener);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture addListeners(GenericFutureListener<? extends Future>... listeners) {
|
||||
public ChannelFuture addListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
|
||||
super.addListeners(listeners);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture removeListener(GenericFutureListener<? extends Future> listener) {
|
||||
public ChannelFuture removeListener(GenericFutureListener<? extends Future<Void>> listener) {
|
||||
super.removeListener(listener);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture removeListeners(GenericFutureListener<? extends Future>... listeners) {
|
||||
public ChannelFuture removeListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
|
||||
super.removeListeners(listeners);
|
||||
return this;
|
||||
}
|
||||
@ -99,4 +99,9 @@ abstract class CompleteChannelFuture extends CompleteFuture implements ChannelFu
|
||||
public Channel channel() {
|
||||
return channel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void getNow() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -46,6 +46,16 @@ abstract class CompleteChannelPromise extends CompleteChannelFuture implements C
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean trySuccess(Void result) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise setSuccess(Void result) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise await() throws InterruptedException {
|
||||
return (ChannelPromise) super.await();
|
||||
@ -57,22 +67,22 @@ abstract class CompleteChannelPromise extends CompleteChannelFuture implements C
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise addListener(GenericFutureListener<? extends Future> listener) {
|
||||
public ChannelPromise addListener(GenericFutureListener<? extends Future<Void>> listener) {
|
||||
return (ChannelPromise) super.addListener(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise addListeners(GenericFutureListener<? extends Future>... listeners) {
|
||||
public ChannelPromise addListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
|
||||
return (ChannelPromise) super.addListeners(listeners);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise removeListener(GenericFutureListener<? extends Future> listener) {
|
||||
public ChannelPromise removeListener(GenericFutureListener<? extends Future<Void>> listener) {
|
||||
return (ChannelPromise) super.removeListener(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise removeListeners(GenericFutureListener<? extends Future>... listeners) {
|
||||
public ChannelPromise removeListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
|
||||
return (ChannelPromise) super.removeListeners(listeners);
|
||||
}
|
||||
|
||||
|
@ -25,7 +25,7 @@ import io.netty.util.concurrent.GenericFutureListener;
|
||||
* The default {@link ChannelPromise} implementation. It is recommended to use {@link Channel#newPromise()} to create
|
||||
* a new {@link ChannelPromise} rather than calling the constructor explicitly.
|
||||
*/
|
||||
public class DefaultChannelPromise extends DefaultPromise implements ChannelPromise, FlushCheckpoint {
|
||||
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
|
||||
|
||||
private final Channel channel;
|
||||
|
||||
@ -67,10 +67,25 @@ public class DefaultChannelPromise extends DefaultPromise implements ChannelProm
|
||||
|
||||
@Override
|
||||
public ChannelPromise setSuccess() {
|
||||
super.setSuccess();
|
||||
return setSuccess(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise setSuccess(Void result) {
|
||||
super.setSuccess(result);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean trySuccess() {
|
||||
return trySuccess(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean trySuccess(Void result) {
|
||||
return super.trySuccess(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise setFailure(Throwable cause) {
|
||||
super.setFailure(cause);
|
||||
@ -78,25 +93,25 @@ public class DefaultChannelPromise extends DefaultPromise implements ChannelProm
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise addListener(GenericFutureListener<? extends Future> listener) {
|
||||
public ChannelPromise addListener(GenericFutureListener<? extends Future<Void>> listener) {
|
||||
super.addListener(listener);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise addListeners(GenericFutureListener<? extends Future>... listeners) {
|
||||
public ChannelPromise addListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
|
||||
super.addListeners(listeners);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise removeListener(GenericFutureListener<? extends Future> listener) {
|
||||
public ChannelPromise removeListener(GenericFutureListener<? extends Future<Void>> listener) {
|
||||
super.removeListener(listener);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise removeListeners(GenericFutureListener<? extends Future>... listeners) {
|
||||
public ChannelPromise removeListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
|
||||
super.removeListeners(listeners);
|
||||
return this;
|
||||
}
|
||||
|
@ -15,12 +15,13 @@
|
||||
*/
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.util.concurrent.AbstractFuture;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.GenericFutureListener;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
final class VoidChannelPromise implements ChannelFuture.Unsafe, ChannelPromise {
|
||||
final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelFuture.Unsafe, ChannelPromise {
|
||||
|
||||
private final Channel channel;
|
||||
|
||||
@ -37,25 +38,25 @@ final class VoidChannelPromise implements ChannelFuture.Unsafe, ChannelPromise {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise addListener(GenericFutureListener<? extends Future> listener) {
|
||||
public ChannelPromise addListener(GenericFutureListener<? extends Future<Void>> listener) {
|
||||
fail();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise addListeners(GenericFutureListener<? extends Future>... listeners) {
|
||||
public ChannelPromise addListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
|
||||
fail();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise removeListener(GenericFutureListener<? extends Future> listener) {
|
||||
public ChannelPromise removeListener(GenericFutureListener<? extends Future<Void>> listener) {
|
||||
// NOOP
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise removeListeners(GenericFutureListener<? extends Future>... listeners) {
|
||||
public ChannelPromise removeListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
|
||||
// NOOP
|
||||
return this;
|
||||
}
|
||||
@ -152,4 +153,19 @@ final class VoidChannelPromise implements ChannelFuture.Unsafe, ChannelPromise {
|
||||
private static void fail() {
|
||||
throw new IllegalStateException("void future");
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise setSuccess(Void result) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean trySuccess(Void result) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void getNow() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -20,24 +20,16 @@ import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.util.concurrent.DefaultPromise;
|
||||
import io.netty.util.concurrent.FailedFuture;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
import io.netty.util.concurrent.SucceededFuture;
|
||||
import io.netty.util.concurrent.AbstractEventExecutorWithoutScheduler;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.AbstractExecutorService;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
final class EmbeddedEventLoop extends AbstractExecutorService implements EventLoop {
|
||||
final class EmbeddedEventLoop extends AbstractEventExecutorWithoutScheduler implements EventLoop {
|
||||
|
||||
private final SucceededFuture succeededFuture = new SucceededFuture(this);
|
||||
private final Queue<Runnable> tasks = new ArrayDeque<Runnable>(2);
|
||||
|
||||
@Override
|
||||
@ -59,30 +51,6 @@ final class EmbeddedEventLoop extends AbstractExecutorService implements EventLo
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> schedule(Runnable command, long delay,
|
||||
TimeUnit unit) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay,
|
||||
TimeUnit unit) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
|
||||
long initialDelay, long period, TimeUnit unit) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
|
||||
long initialDelay, long delay, TimeUnit unit) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
// NOOP
|
||||
@ -140,19 +108,4 @@ final class EmbeddedEventLoop extends AbstractExecutorService implements EventLo
|
||||
public EventLoopGroup parent() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise newPromise() {
|
||||
return new DefaultPromise(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future newSucceededFuture() {
|
||||
return succeededFuture;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future newFailedFuture(Throwable cause) {
|
||||
return new FailedFuture(this, cause);
|
||||
}
|
||||
}
|
||||
|
@ -102,7 +102,7 @@ import java.util.Iterator;
|
||||
* make sure you do not call {@link #await()} in an I/O thread. Otherwise,
|
||||
* {@link IllegalStateException} will be raised to prevent a dead lock.
|
||||
*/
|
||||
public interface ChannelGroupFuture extends Future, Iterable<ChannelFuture> {
|
||||
public interface ChannelGroupFuture extends Future<Void>, Iterable<ChannelFuture> {
|
||||
|
||||
/**
|
||||
* Returns the {@link ChannelGroup} which is associated with this future.
|
||||
@ -151,16 +151,16 @@ public interface ChannelGroupFuture extends Future, Iterable<ChannelFuture> {
|
||||
boolean isPartialFailure();
|
||||
|
||||
@Override
|
||||
ChannelGroupFuture addListener(GenericFutureListener<? extends Future> listener);
|
||||
ChannelGroupFuture addListener(GenericFutureListener<? extends Future<Void>> listener);
|
||||
|
||||
@Override
|
||||
ChannelGroupFuture addListeners(GenericFutureListener<? extends Future>... listeners);
|
||||
ChannelGroupFuture addListeners(GenericFutureListener<? extends Future<Void>>... listeners);
|
||||
|
||||
@Override
|
||||
ChannelGroupFuture removeListener(GenericFutureListener<? extends Future> listener);
|
||||
ChannelGroupFuture removeListener(GenericFutureListener<? extends Future<Void>> listener);
|
||||
|
||||
@Override
|
||||
ChannelGroupFuture removeListeners(GenericFutureListener<? extends Future>... listeners);
|
||||
ChannelGroupFuture removeListeners(GenericFutureListener<? extends Future<Void>>... listeners);
|
||||
|
||||
@Override
|
||||
ChannelGroupFuture await() throws InterruptedException;
|
||||
|
@ -21,14 +21,9 @@ import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.FileRegion;
|
||||
import io.netty.channel.ServerChannel;
|
||||
import io.netty.util.concurrent.DefaultPromise;
|
||||
import io.netty.util.concurrent.AbstractEventExecutorWithoutScheduler;
|
||||
import io.netty.util.concurrent.EventExecutor;
|
||||
import io.netty.util.concurrent.EventExecutorGroup;
|
||||
import io.netty.util.concurrent.FailedFuture;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.ImmediateEventExecutor;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
import io.netty.util.concurrent.SucceededFuture;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
|
||||
import java.util.AbstractSet;
|
||||
@ -39,13 +34,8 @@ import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.FutureTask;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
@ -323,12 +313,7 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
|
||||
"(name: " + name() + ", size: " + size() + ')';
|
||||
}
|
||||
|
||||
private static final class ImmediateEventExecutor implements EventExecutor {
|
||||
private final Future successedFuture = new SucceededFuture(this);
|
||||
@Override
|
||||
public EventExecutor next() {
|
||||
return this;
|
||||
}
|
||||
private static final class ImmediateEventExecutor extends AbstractEventExecutorWithoutScheduler {
|
||||
|
||||
@Override
|
||||
public EventExecutorGroup parent() {
|
||||
@ -360,133 +345,15 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
public boolean awaitTermination(long timeout, TimeUnit unit) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise newPromise() {
|
||||
return new DefaultPromise(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future newSucceededFuture() {
|
||||
return successedFuture;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future newFailedFuture(Throwable cause) {
|
||||
return new FailedFuture(this, cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
|
||||
long period, TimeUnit unit) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
|
||||
long delay, TimeUnit unit) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Runnable> shutdownNow() {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> java.util.concurrent.Future<T> submit(Callable<T> task) {
|
||||
if (task == null) {
|
||||
throw new NullPointerException("task");
|
||||
}
|
||||
FutureTask<T> future = new FutureTask<T>(task);
|
||||
future.run();
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> java.util.concurrent.Future<T> submit(Runnable task, T result) {
|
||||
if (task == null) {
|
||||
throw new NullPointerException("task");
|
||||
}
|
||||
FutureTask<T> future = new FutureTask<T>(task, result);
|
||||
future.run();
|
||||
return future;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public java.util.concurrent.Future<?> submit(Runnable task) {
|
||||
if (task == null) {
|
||||
throw new NullPointerException("task");
|
||||
}
|
||||
FutureTask<?> future = new FutureTask(task, null);
|
||||
future.run();
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
|
||||
if (tasks == null) {
|
||||
throw new NullPointerException("tasks");
|
||||
}
|
||||
List<java.util.concurrent.Future<T>> futures = new ArrayList<java.util.concurrent.Future<T>>();
|
||||
for (Callable<T> task: tasks) {
|
||||
futures.add(submit(task));
|
||||
}
|
||||
return futures;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
|
||||
long timeout, TimeUnit unit) {
|
||||
if (tasks == null) {
|
||||
throw new NullPointerException("tasks");
|
||||
}
|
||||
|
||||
List<java.util.concurrent.Future<T>> futures = new ArrayList<java.util.concurrent.Future<T>>();
|
||||
for (Callable<T> task: tasks) {
|
||||
futures.add(submit(task));
|
||||
}
|
||||
return futures;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
|
||||
throws InterruptedException, ExecutionException {
|
||||
if (tasks == null) {
|
||||
throw new NullPointerException("tasks");
|
||||
}
|
||||
if (tasks.isEmpty()) {
|
||||
throw new IllegalArgumentException("tasks must be non empty");
|
||||
}
|
||||
return invokeAll(tasks).get(0).get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
|
||||
throws InterruptedException, ExecutionException {
|
||||
if (tasks == null) {
|
||||
throw new NullPointerException("tasks");
|
||||
}
|
||||
if (tasks.isEmpty()) {
|
||||
throw new IllegalArgumentException("tasks must be non empty");
|
||||
}
|
||||
return invokeAll(tasks).get(0).get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Runnable command) {
|
||||
if (command == null) {
|
||||
|
@ -22,7 +22,6 @@ import io.netty.util.concurrent.DefaultPromise;
|
||||
import io.netty.util.concurrent.EventExecutor;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.GenericFutureListener;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
@ -36,7 +35,7 @@ import java.util.Map;
|
||||
/**
|
||||
* The default {@link ChannelGroupFuture} implementation.
|
||||
*/
|
||||
final class DefaultChannelGroupFuture extends DefaultPromise implements ChannelGroupFuture {
|
||||
final class DefaultChannelGroupFuture extends DefaultPromise<Void> implements ChannelGroupFuture {
|
||||
|
||||
private final ChannelGroup group;
|
||||
private final Map<Integer, ChannelFuture> futures;
|
||||
@ -152,25 +151,25 @@ final class DefaultChannelGroupFuture extends DefaultPromise implements ChannelG
|
||||
}
|
||||
|
||||
@Override
|
||||
public DefaultChannelGroupFuture addListener(GenericFutureListener<? extends Future> listener) {
|
||||
public DefaultChannelGroupFuture addListener(GenericFutureListener<? extends Future<Void>> listener) {
|
||||
super.addListener(listener);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DefaultChannelGroupFuture addListeners(GenericFutureListener<? extends Future>... listeners) {
|
||||
public DefaultChannelGroupFuture addListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
|
||||
super.addListeners(listeners);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DefaultChannelGroupFuture removeListener(GenericFutureListener<? extends Future> listener) {
|
||||
public DefaultChannelGroupFuture removeListener(GenericFutureListener<? extends Future<Void>> listener) {
|
||||
super.removeListener(listener);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DefaultChannelGroupFuture removeListeners(GenericFutureListener<? extends Future>... listeners) {
|
||||
public DefaultChannelGroupFuture removeListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
|
||||
super.removeListeners(listeners);
|
||||
return this;
|
||||
}
|
||||
@ -205,7 +204,7 @@ final class DefaultChannelGroupFuture extends DefaultPromise implements ChannelG
|
||||
}
|
||||
|
||||
private void setSuccess0() {
|
||||
super.setSuccess();
|
||||
super.setSuccess(null);
|
||||
}
|
||||
|
||||
private void setFailure0(ChannelGroupException cause) {
|
||||
@ -213,17 +212,17 @@ final class DefaultChannelGroupFuture extends DefaultPromise implements ChannelG
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise setSuccess() {
|
||||
public DefaultChannelGroupFuture setSuccess(Void result) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean trySuccess() {
|
||||
public boolean trySuccess(Void result) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Promise setFailure(Throwable cause) {
|
||||
public DefaultChannelGroupFuture setFailure(Throwable cause) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
|
@ -39,6 +39,7 @@ public class DefaultChannelPipelineTest {
|
||||
public void testMessageCatchAllInboundSink() throws Exception {
|
||||
LocalChannel channel = new LocalChannel();
|
||||
LocalEventLoopGroup group = new LocalEventLoopGroup();
|
||||
|
||||
group.register(channel).awaitUninterruptibly();
|
||||
final AtomicBoolean forwarded = new AtomicBoolean();
|
||||
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
|
||||
|
Loading…
Reference in New Issue
Block a user