[#1065] Provide Future/Promise without channel reference

This commit is contained in:
Norman Maurer 2013-03-05 21:41:19 +01:00
parent 0f8fbac9f8
commit 88cc8c1739
68 changed files with 1616 additions and 906 deletions

View File

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
package io.netty.util.concurrent;
/**
* An {@link IllegalStateException} which is raised when a user performed a blocking operation

View File

@ -0,0 +1,129 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import java.util.concurrent.TimeUnit;
/**
* A skeletal {@link Future} implementation which represents a {@link Future} which has been completed already.
*/
public abstract class CompleteFuture implements Future {
private final EventExecutor executor;
/**
* Creates a new instance.
*
* @param executor the {@link EventExecutor} associated with this future
*/
protected CompleteFuture(EventExecutor executor) {
this.executor = executor;
}
protected EventExecutor executor() {
return executor;
}
@Override
public Future addListener(GenericFutureListener<? extends Future> listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
DefaultPromise.notifyListener(executor(), this, listener);
return this;
}
@Override
public Future addListeners(GenericFutureListener<? extends Future>... listeners) {
if (listeners == null) {
throw new NullPointerException("listeners");
}
for (GenericFutureListener<? extends Future> l: listeners) {
if (l == null) {
break;
}
DefaultPromise.notifyListener(executor(), this, l);
}
return this;
}
@Override
public Future removeListener(GenericFutureListener<? extends Future> listener) {
// NOOP
return this;
}
@Override
public Future removeListeners(GenericFutureListener<? extends Future>... listeners) {
// NOOP
return this;
}
@Override
public Future await() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
return this;
}
@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
return true;
}
@Override
public Future sync() throws InterruptedException {
return this;
}
@Override
public Future syncUninterruptibly() {
return this;
}
@Override
public boolean await(long timeoutMillis) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
return true;
}
@Override
public Future awaitUninterruptibly() {
return this;
}
@Override
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
return true;
}
@Override
public boolean awaitUninterruptibly(long timeoutMillis) {
return true;
}
@Override
public boolean isDone() {
return true;
}
}

View File

@ -0,0 +1,84 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
public abstract class CompletePromise extends CompleteFuture implements Promise {
protected CompletePromise(EventExecutor executor) {
super(executor);
}
@Override
public Promise setFailure(Throwable cause) {
throw new IllegalStateException();
}
@Override
public boolean tryFailure(Throwable cause) {
return false;
}
@Override
public Promise setSuccess() {
throw new IllegalStateException();
}
@Override
public boolean trySuccess() {
return false;
}
@Override
public Promise await() throws InterruptedException {
return this;
}
@Override
public Promise awaitUninterruptibly() {
return this;
}
@Override
public Promise syncUninterruptibly() {
return this;
}
@Override
public Promise sync() throws InterruptedException {
return this;
}
@Override
public Promise addListener(GenericFutureListener<? extends Future> listener) {
return (Promise) super.addListener(listener);
}
@Override
public Promise addListeners(GenericFutureListener<? extends Future>... listeners) {
return (Promise) super.addListeners(listeners);
}
@Override
public Promise removeListener(GenericFutureListener<? extends Future> listener) {
return (Promise) super.removeListener(listener);
}
@Override
public Promise removeListeners(GenericFutureListener<? extends Future>... listeners) {
return (Promise) super.removeListeners(listeners);
}
}

View File

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
package io.netty.util.concurrent;
import java.util.concurrent.ThreadFactory;
@ -25,10 +25,10 @@ import java.util.concurrent.ThreadFactory;
final class DefaultEventExecutor extends SingleThreadEventExecutor {
/**
* @see SingleThreadEventExecutor#SingleThreadEventExecutor(EventExecutorGroup, ThreadFactory, ChannelTaskScheduler)
* @see SingleThreadEventExecutor#SingleThreadEventExecutor(EventExecutorGroup, ThreadFactory, TaskScheduler)
*/
DefaultEventExecutor(
DefaultEventExecutorGroup parent, ThreadFactory threadFactory, ChannelTaskScheduler scheduler) {
DefaultEventExecutorGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
super(parent, threadFactory, scheduler);
}

View File

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
package io.netty.util.concurrent;
import java.util.concurrent.ThreadFactory;
@ -39,7 +39,7 @@ public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup {
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, ChannelTaskScheduler scheduler, Object... args) throws Exception {
ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception {
return new DefaultEventExecutor(this, threadFactory, scheduler);
}
}

View File

@ -0,0 +1,504 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import io.netty.util.Signal;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.*;
public class DefaultPromise implements Promise {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(DefaultPromise.class);
private static final int MAX_LISTENER_STACK_DEPTH = 8;
private static final ThreadLocal<Integer> LISTENER_STACK_DEPTH = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 0;
}
};
private static final Signal SUCCESS = new Signal(DefaultPromise.class.getName() + ".SUCCESS");
private final EventExecutor executor;
private volatile Throwable cause;
private Object listeners; // Can be ChannelFutureListener or DefaultChannelPromiseListeners
/**
* The the most significant 24 bits of this field represents the number of waiter threads waiting for this promise
* with await*() and sync*(). Subclasses can use the other 40 bits of this field to represents its own state, and
* are responsible for retaining the most significant 24 bits as is when modifying this field.
*/
protected long state;
/**
* Creates a new instance.
*
* It is preferable to use {@link EventExecutor#newPromise()} to create a new promise
*
* @param executor
* the {@link EventExecutor} which is used to notify the promise once it is complete
*/
public DefaultPromise(EventExecutor executor) {
if (executor == null) {
throw new NullPointerException("executor");
}
this.executor = executor;
}
protected DefaultPromise() {
// only for subclasses
executor = null;
}
protected EventExecutor executor() {
return executor;
}
@Override
public boolean isDone() {
return cause != null;
}
@Override
public boolean isSuccess() {
return cause == SUCCESS;
}
@Override
public Throwable cause() {
Throwable cause = this.cause;
return cause == SUCCESS? null : cause;
}
@Override
public Promise addListener(GenericFutureListener<? extends Future> listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
if (isDone()) {
notifyListener(executor(), this, listener);
return this;
}
synchronized (this) {
if (!isDone()) {
if (listeners == null) {
listeners = listener;
} else {
if (listeners instanceof DefaultPromiseListeners) {
((DefaultPromiseListeners) listeners).add(listener);
} else {
listeners = new DefaultPromiseListeners(
(GenericFutureListener<? extends Future>) listeners, listener);
}
}
return this;
}
}
notifyListener(executor(), this, listener);
return this;
}
@Override
public Promise addListeners(GenericFutureListener<? extends Future>... listeners) {
if (listeners == null) {
throw new NullPointerException("listeners");
}
for (GenericFutureListener<? extends Future> l: listeners) {
if (l == null) {
break;
}
addListener(l);
}
return this;
}
@Override
public Promise removeListener(GenericFutureListener<? extends Future> listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
if (isDone()) {
return this;
}
synchronized (this) {
if (!isDone()) {
if (listeners instanceof DefaultPromiseListeners) {
((DefaultPromiseListeners) listeners).remove(listener);
} else if (listeners == listener) {
listeners = null;
}
}
}
return this;
}
@Override
public Promise removeListeners(GenericFutureListener<? extends Future>... listeners) {
if (listeners == null) {
throw new NullPointerException("listeners");
}
for (GenericFutureListener<? extends Future> l: listeners) {
if (l == null) {
break;
}
removeListener(l);
}
return this;
}
@Override
public Promise sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
@Override
public Promise syncUninterruptibly() {
awaitUninterruptibly();
rethrowIfFailed();
return this;
}
private void rethrowIfFailed() {
Throwable cause = cause();
if (cause == null) {
return;
}
PlatformDependent.throwException(cause);
}
@Override
public Promise await() throws InterruptedException {
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException();
}
synchronized (this) {
while (!isDone()) {
checkDeadLock();
incWaiters();
try {
wait();
} finally {
decWaiters();
}
}
}
return this;
}
@Override
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return await0(unit.toNanos(timeout), true);
}
@Override
public boolean await(long timeoutMillis) throws InterruptedException {
return await0(MILLISECONDS.toNanos(timeoutMillis), true);
}
@Override
public Promise awaitUninterruptibly() {
if (isDone()) {
return this;
}
boolean interrupted = false;
synchronized (this) {
while (!isDone()) {
checkDeadLock();
incWaiters();
try {
wait();
} catch (InterruptedException e) {
interrupted = true;
} finally {
decWaiters();
}
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
return this;
}
@Override
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
try {
return await0(unit.toNanos(timeout), false);
} catch (InterruptedException e) {
throw new InternalError();
}
}
@Override
public boolean awaitUninterruptibly(long timeoutMillis) {
try {
return await0(MILLISECONDS.toNanos(timeoutMillis), false);
} catch (InterruptedException e) {
throw new InternalError();
}
}
private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
if (isDone()) {
return true;
}
if (timeoutNanos <= 0) {
return isDone();
}
if (interruptable && Thread.interrupted()) {
throw new InterruptedException();
}
long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
long waitTime = timeoutNanos;
boolean interrupted = false;
try {
synchronized (this) {
if (isDone()) {
return true;
}
if (waitTime <= 0) {
return isDone();
}
checkDeadLock();
incWaiters();
try {
for (;;) {
try {
wait(waitTime / 1000000, (int) (waitTime % 1000000));
} catch (InterruptedException e) {
if (interruptable) {
throw e;
} else {
interrupted = true;
}
}
if (isDone()) {
return true;
} else {
waitTime = timeoutNanos - (System.nanoTime() - startTime);
if (waitTime <= 0) {
return isDone();
}
}
}
} finally {
decWaiters();
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
/**
* Do deadlock checks
*/
protected void checkDeadLock() {
EventExecutor e = executor();
if (e != null && e.inEventLoop()) {
throw new BlockingOperationException();
}
}
@Override
public Promise setSuccess() {
if (set(SUCCESS)) {
notifyListeners();
return this;
}
throw new IllegalStateException();
}
@Override
public boolean trySuccess() {
if (set(SUCCESS)) {
notifyListeners();
return true;
}
return false;
}
@Override
public Promise setFailure(Throwable cause) {
if (set(cause)) {
notifyListeners();
return this;
}
throw new IllegalStateException();
}
@Override
public boolean tryFailure(Throwable cause) {
if (set(cause)) {
notifyListeners();
return true;
}
return false;
}
private boolean set(Throwable cause) {
if (isDone()) {
return false;
}
synchronized (this) {
// Allow only once.
if (isDone()) {
return false;
}
this.cause = cause;
if (hasWaiters()) {
notifyAll();
}
}
return true;
}
private boolean hasWaiters() {
return (state & 0xFFFFFF0000000000L) != 0;
}
private void incWaiters() {
long newState = state + 0x10000000000L;
if ((newState & 0xFFFFFF0000000000L) == 0) {
throw new IllegalStateException("too many waiters");
}
state = newState;
}
private void decWaiters() {
state -= 0x10000000000L;
}
private void notifyListeners() {
// This method doesn't need synchronization because:
// 1) This method is always called after synchronized (this) block.
// Hence any listener list modification happens-before this method.
// 2) This method is called only when 'done' is true. Once 'done'
// becomes true, the listener list is never modified - see add/removeListener()
if (listeners == null) {
return;
}
EventExecutor executor = executor();
if (executor.inEventLoop()) {
if (listeners instanceof DefaultPromiseListeners) {
notifyListeners0(this, (DefaultPromiseListeners) listeners);
} else {
notifyListener0(this, (GenericFutureListener<? extends Future>) listeners);
}
listeners = null;
} else {
final Object listeners = this.listeners;
this.listeners = null;
executor.execute(new Runnable() {
@Override
public void run() {
if (listeners instanceof DefaultPromiseListeners) {
notifyListeners0(DefaultPromise.this, (DefaultPromiseListeners) listeners);
} else {
notifyListener0(DefaultPromise.this, (GenericFutureListener<? extends Future>) listeners);
}
}
});
}
}
private static void notifyListeners0(final Future future,
DefaultPromiseListeners listeners) {
final GenericFutureListener<? extends Future>[] a = listeners.listeners();
final int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(future, a[i]);
}
}
public static void notifyListener(final EventExecutor eventExecutor, final Future future,
final GenericFutureListener<? extends Future> l) {
if (eventExecutor.inEventLoop()) {
final Integer stackDepth = LISTENER_STACK_DEPTH.get();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
LISTENER_STACK_DEPTH.set(stackDepth + 1);
try {
notifyListener0(future, l);
} finally {
LISTENER_STACK_DEPTH.set(stackDepth);
}
return;
}
}
eventExecutor.execute(new Runnable() {
@Override
public void run() {
notifyListener(eventExecutor, future, l);
}
});
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by " +
GenericFutureListener.class.getSimpleName() + '.', t);
}
}
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import java.util.Arrays;
import java.util.EventListener;
final class DefaultPromiseListeners {
private GenericFutureListener<? extends Future>[] listeners;
private int size;
@SuppressWarnings("unchecked")
DefaultPromiseListeners(GenericFutureListener<? extends Future> firstListener,
GenericFutureListener<? extends Future> secondListener) {
listeners = new GenericFutureListener[] { firstListener, secondListener };
size = 2;
}
void add(GenericFutureListener<? extends Future> l) {
GenericFutureListener<? extends Future>[] listeners = this.listeners;
final int size = this.size;
if (size == listeners.length) {
this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
}
listeners[size] = l;
this.size = size + 1;
}
void remove(EventListener l) {
final EventListener[] listeners = this.listeners;
int size = this.size;
for (int i = 0; i < size; i ++) {
if (listeners[i] == l) {
int listenersToMove = size - i - 1;
if (listenersToMove > 0) {
System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
}
listeners[-- size] = null;
this.size = size;
return;
}
}
}
GenericFutureListener<? extends Future>[] listeners() {
return listeners;
}
int size() {
return size;
}
}

View File

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
package io.netty.util.concurrent;
import java.util.concurrent.ScheduledExecutorService;
@ -24,7 +24,7 @@ import java.util.concurrent.ScheduledExecutorService;
* access methods.
*
*/
public interface EventExecutor extends EventExecutorGroup, ScheduledExecutorService {
public interface EventExecutor extends EventExecutorGroup, ScheduledExecutorService, FutureFactory {
/**
* Returns a reference to itself.

View File

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
package io.netty.util.concurrent;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

View File

@ -0,0 +1,64 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import io.netty.util.internal.PlatformDependent;
/**
* The {@link CompleteFuture} which is failed already. It is
* recommended to use {@link EventExecutor#newFailedFuture(Throwable)}
* instead of calling the constructor of this future.
*/
public final class FailedFuture extends CompleteFuture {
private final Throwable cause;
/**
* Creates a new instance.
*
* @param executor the {@link EventExecutor} associated with this future
* @param cause the cause of failure
*/
public FailedFuture(EventExecutor executor, Throwable cause) {
super(executor);
if (cause == null) {
throw new NullPointerException("cause");
}
this.cause = cause;
}
@Override
public Throwable cause() {
return cause;
}
@Override
public boolean isSuccess() {
return false;
}
@Override
public Future sync() {
PlatformDependent.throwException(cause);
return this;
}
@Override
public Future syncUninterruptibly() {
PlatformDependent.throwException(cause);
return this;
}
}

View File

@ -0,0 +1,152 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import java.util.concurrent.TimeUnit;
/**
* The result of an asynchronous operation.
*/
public interface Future {
/**
* Returns {@code true} if and only if this future is
* complete, regardless of whether the operation was successful or failed.
*/
boolean isDone();
/**
* Returns {@code true} if and only if the I/O operation was completed
* successfully.
*/
boolean isSuccess();
/**
* Returns the cause of the failed I/O operation if the I/O operation has
* failed.
*
* @return the cause of the failure.
* {@code null} if succeeded or this future is not
* completed yet.
*/
Throwable cause();
/**
* Adds the specified listener to this future. The
* specified listener is notified when this future is
* {@linkplain #isDone() done}. If this future is already
* completed, the specified listener is notified immediately.
*/
Future addListener(GenericFutureListener<? extends Future> listener);
/**
* Adds the specified listeners to this future. The
* specified listeners are notified when this future is
* {@linkplain #isDone() done}. If this future is already
* completed, the specified listeners are notified immediately.
*/
Future addListeners(GenericFutureListener<? extends Future>... listeners);
/**
* Removes the specified listener from this future.
* The specified listener is no longer notified when this
* future is {@linkplain #isDone() done}. If the specified
* listener is not associated with this future, this method
* does nothing and returns silently.
*/
Future removeListener(GenericFutureListener<? extends Future> listener);
/**
* Removes the specified listeners from this future.
* The specified listeners are no longer notified when this
* future is {@linkplain #isDone() done}. If the specified
* listeners are not associated with this future, this method
* does nothing and returns silently.
*/
Future removeListeners(GenericFutureListener<? extends Future>... listeners);
/**
* Waits for this future until it is done, and rethrows the cause of the failure if this future
* failed.
*/
Future sync() throws InterruptedException;
/**
* Waits for this future until it is done, and rethrows the cause of the failure if this future
* failed.
*/
Future syncUninterruptibly();
/**
* Waits for this future to be completed.
*
* @throws InterruptedException
* if the current thread was interrupted
*/
Future await() throws InterruptedException;
/**
* Waits for this future to be completed without
* interruption. This method catches an {@link InterruptedException} and
* discards it silently.
*/
Future awaitUninterruptibly();
/**
* Waits for this future to be completed within the
* specified time limit.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*
* @throws InterruptedException
* if the current thread was interrupted
*/
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
/**
* Waits for this future to be completed within the
* specified time limit.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*
* @throws InterruptedException
* if the current thread was interrupted
*/
boolean await(long timeoutMillis) throws InterruptedException;
/**
* Waits for this future to be completed within the
* specified time limit without interruption. This method catches an
* {@link InterruptedException} and discards it silently.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*/
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
/**
* Waits for this future to be completed within the
* specified time limit without interruption. This method catches an
* {@link InterruptedException} and discards it silently.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*/
boolean awaitUninterruptibly(long timeoutMillis);
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
/**
* Allows to create {@link Future}'s on demand.
*/
public interface FutureFactory {
/**
* Return a new {@link Promise}.
*/
Promise newPromise();
/**
* Create a new {@link Future} which is marked as successes already. So {@link Future#isSuccess()}
* will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also
* every call of blocking methods will just return without blocking.
*/
Future newSucceededFuture();
/**
* Create a new {@link Future} which is marked as fakued already. So {@link Future#isSuccess()}
* will return {@code false}. All {@link FutureListener} added to it will be notified directly. Also
* every call of blocking methods will just return without blocking.
*/
Future newFailedFuture(Throwable cause);
}

View File

@ -0,0 +1,28 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
/**
* A subtype of {@link GenericFutureListener} that hides type parameter for convenience.
* <pre>
* Future f = new DefaultPromise(..);
* f.addListener(new FutureListener() {
* public void operationComplete(Future f) { .. }
* });
* </pre>
*/
public interface FutureListener extends GenericFutureListener<Future> { }

View File

@ -0,0 +1,32 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import java.util.EventListener;
/**
* Listens to the result of a {@link Future}. The result of the asynchronous operation is notified once this listener
* is added by calling {@link Future#addListener(GenericFutureListener)}.
*/
public interface GenericFutureListener<F extends Future> extends EventListener {
/**
* Invoked when the operation associated with the {@link Future} has been completed.
*
* @param future the source {@link Future} which called this callback
*/
void operationComplete(F future) throws Exception;
}

View File

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
package io.netty.util.concurrent;
import java.util.Collections;
import java.util.LinkedHashMap;
@ -31,7 +31,7 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
public static final int DEFAULT_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
private static final AtomicInteger poolId = new AtomicInteger();
final ChannelTaskScheduler scheduler;
final TaskScheduler scheduler;
private final EventExecutor[] children;
private final AtomicInteger childIndex = new AtomicInteger();
@ -42,7 +42,7 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
* of {@link #DEFAULT_POOL_SIZE}
* @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each
* {@link #newChild(ThreadFactory, ChannelTaskScheduler, Object...)}
* {@link #newChild(ThreadFactory, TaskScheduler, Object...)}
* call.
*/
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
@ -58,7 +58,7 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
threadFactory = new DefaultThreadFactory();
}
scheduler = new ChannelTaskScheduler(threadFactory);
scheduler = new TaskScheduler(threadFactory);
children = new SingleThreadEventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
@ -67,7 +67,8 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
children[i] = newChild(threadFactory, scheduler, args);
success = true;
} catch (Exception e) {
throw new EventLoopException("failed to create a child event loop", e);
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
@ -98,7 +99,7 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
*
*/
protected abstract EventExecutor newChild(
ThreadFactory threadFactory, ChannelTaskScheduler scheduler, Object... args) throws Exception;
ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception;
@Override
public void shutdown() {

View File

@ -0,0 +1,82 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
/**
* Special {@link Future} which is writable.
*/
public interface Promise extends Future {
/**
* Marks this future as a success and notifies all
* listeners.
*
* If it is success or failed already it will throw an {@link IllegalStateException}.
*/
Promise setSuccess();
/**
* Marks this future as a success and notifies all
* listeners.
*
* @return {@code true} if and only if successfully marked this future as
* a success. Otherwise {@code false} because this future is
* already marked as either a success or a failure.
*/
boolean trySuccess();
/**
* Marks this future as a failure and notifies all
* listeners.
*
* If it is success or failed already it will throw an {@link IllegalStateException}.
*/
Promise setFailure(Throwable cause);
/**
* Marks this future as a failure and notifies all
* listeners.
*
* @return {@code true} if and only if successfully marked this future as
* a failure. Otherwise {@code false} because this future is
* already marked as either a success or a failure.
*/
boolean tryFailure(Throwable cause);
@Override
Promise addListener(GenericFutureListener<? extends Future> listener);
@Override
Promise addListeners(GenericFutureListener<? extends Future>... listeners);
@Override
Promise removeListener(GenericFutureListener<? extends Future> listener);
@Override
Promise removeListeners(GenericFutureListener<? extends Future>... listeners);
@Override
Promise await() throws InterruptedException;
@Override
Promise awaitUninterruptibly();
@Override
Promise sync() throws InterruptedException;
@Override
Promise syncUninterruptibly();
}

View File

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
package io.netty.util.concurrent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -72,11 +72,12 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
}
private final EventExecutorGroup parent;
private final Future succeededFuture = new SucceededFuture(this);
private final Queue<Runnable> taskQueue;
private final Thread thread;
private final Object stateLock = new Object();
private final Semaphore threadLock = new Semaphore(0);
private final ChannelTaskScheduler scheduler;
private final TaskScheduler scheduler;
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
private volatile int state = ST_NOT_STARTED;
private long lastAccessTimeNanos;
@ -86,11 +87,11 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
*
* @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
* @param threadFactory the {@link ThreadFactory} which will be used for the used {@link Thread}
* @param scheduler the {@link ChannelTaskScheduler} which will be used to schedule Tasks for later
* @param scheduler the {@link TaskScheduler} which will be used to schedule Tasks for later
* execution
*/
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, ChannelTaskScheduler scheduler) {
EventExecutorGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
@ -498,4 +499,19 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return scheduler.scheduleWithFixedDelay(this, command, initialDelay, delay, unit);
}
@Override
public Promise newPromise() {
return new DefaultPromise(this);
}
@Override
public Future newSucceededFuture() {
return succeededFuture;
}
@Override
public Future newFailedFuture(Throwable cause) {
return new FailedFuture(this, cause);
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
/**
* The {@link CompleteFuture} which is succeeded already. It is
* recommended to use {@link EventExecutor#newSucceededFuture()} instead of
* calling the constructor of this future.
*/
public final class SucceededFuture extends CompleteFuture {
/**
* Creates a new instance.
*
* @param executor the {@link EventExecutor} associated with this future
*/
public SucceededFuture(EventExecutor executor) {
super(executor);
}
@Override
public Throwable cause() {
return null;
}
@Override
public boolean isSuccess() {
return true;
}
}

View File

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
package io.netty.util.concurrent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -31,10 +31,10 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public final class ChannelTaskScheduler {
public final class TaskScheduler {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(ChannelTaskScheduler.class);
InternalLoggerFactory.getInstance(TaskScheduler.class);
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
private static final long START_TIME = System.nanoTime();
@ -55,7 +55,7 @@ public final class ChannelTaskScheduler {
/** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */
private volatile int state;
public ChannelTaskScheduler(ThreadFactory threadFactory) {
public TaskScheduler(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}

View File

@ -0,0 +1,20 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Utility classes for concurrent / async tasks.
*/
package io.netty.util.concurrent;

View File

@ -127,6 +127,22 @@ public final class PlatformDependent {
return HAS_JAVASSIST;
}
/**
* Raises an exception bypassing compiler checks for checked exceptions.
*/
public static void throwException(Throwable t) {
if (hasUnsafe()) {
PlatformDependent0.throwException(t);
} else {
PlatformDependent.<RuntimeException>throwException0(t);
}
}
@SuppressWarnings("unchecked")
private static <E extends Throwable> void throwException0(Throwable t) throws E {
throw (E) t;
}
/**
* Creates a new fastest {@link ConcurrentMap} implementaion for the current platform.
*/

View File

@ -119,6 +119,10 @@ final class PlatformDependent0 {
return UNSAFE != null;
}
static void throwException(Throwable t) {
UNSAFE.throwException(t);
}
static void freeDirectBuffer(ByteBuffer buffer) {
Cleaner cleaner;
try {

View File

@ -25,7 +25,7 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOperationHandler;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelStateHandlerAdapter;
import io.netty.channel.EventExecutor;
import io.netty.util.concurrent.EventExecutor;
import io.netty.channel.FileRegion;
import java.net.SocketAddress;

View File

@ -16,7 +16,7 @@
package io.netty.handler.traffic;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.EventExecutor;
import io.netty.util.concurrent.EventExecutor;
import java.util.concurrent.ScheduledExecutorService;

View File

@ -23,8 +23,8 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventExecutorGroup;
import io.netty.channel.EventExecutorGroup;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.channel.socket.SocketChannel;
import org.junit.AfterClass;
import org.junit.BeforeClass;

View File

@ -24,8 +24,8 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultEventExecutorGroup;
import io.netty.channel.EventExecutorGroup;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

View File

@ -33,6 +33,7 @@ import io.netty.channel.sctp.SctpChannelConfig;
import io.netty.channel.sctp.SctpMessage;
import io.netty.channel.sctp.SctpNotificationHandler;
import io.netty.channel.sctp.SctpServerChannel;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -276,16 +277,8 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
free = false;
return 1;
} catch (Throwable cause) {
if (cause instanceof Error) {
throw (Error) cause;
}
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
if (cause instanceof Exception) {
throw (Exception) cause;
}
throw new ChannelException(cause);
PlatformDependent.throwException(cause);
return -1;
} finally {
if (free) {
buffer.release();

View File

@ -33,6 +33,7 @@ import io.netty.channel.sctp.SctpChannelConfig;
import io.netty.channel.sctp.SctpMessage;
import io.netty.channel.sctp.SctpNotificationHandler;
import io.netty.channel.sctp.SctpServerChannel;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -197,16 +198,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel
free = false;
readMessages ++;
} catch (Throwable cause) {
if (cause instanceof Error) {
throw (Error) cause;
}
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
if (cause instanceof Exception) {
throw (Exception) cause;
}
throw new ChannelException(cause);
PlatformDependent.throwException(cause);
} finally {
if (free) {
buffer.release();

View File

@ -78,7 +78,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private final Integer id;
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
private final ChannelFuture succeededFuture = new SucceededChannelFuture(this);
private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);
private final VoidChannelPromise voidPromise = new VoidChannelPromise(this);
private final CloseFuture closeFuture = new CloseFuture(this);
@ -313,7 +313,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override
public ChannelFuture newFailedFuture(Throwable cause) {
return new FailedChannelFuture(this, cause);
return new FailedChannelFuture(this, null, cause);
}
@Override

View File

@ -210,13 +210,13 @@ public final class ChannelFlushPromiseNotifier {
}
}
abstract static class FlushCheckpoint {
abstract long flushCheckpoint();
abstract void flushCheckpoint(long checkpoint);
abstract ChannelPromise future();
interface FlushCheckpoint {
long flushCheckpoint();
void flushCheckpoint(long checkpoint);
ChannelPromise future();
}
private static class DefaultFlushCheckpoint extends FlushCheckpoint {
private static class DefaultFlushCheckpoint implements FlushCheckpoint {
private long checkpoint;
private final ChannelPromise future;
@ -226,17 +226,17 @@ public final class ChannelFlushPromiseNotifier {
}
@Override
long flushCheckpoint() {
public long flushCheckpoint() {
return checkpoint;
}
@Override
void flushCheckpoint(long checkpoint) {
public void flushCheckpoint(long checkpoint) {
this.checkpoint = checkpoint;
}
@Override
ChannelPromise future() {
public ChannelPromise future() {
return future;
}
}

View File

@ -16,6 +16,9 @@
package io.netty.channel;
import io.netty.bootstrap.Bootstrap;
import io.netty.util.concurrent.BlockingOperationException;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.TimeUnit;
@ -61,13 +64,13 @@ import java.util.concurrent.TimeUnit;
* operation. It also allows you to add {@link ChannelFutureListener}s so you
* can get notified when the I/O operation is completed.
*
* <h3>Prefer {@link #addListener(ChannelFutureListener)} to {@link #await()}</h3>
* <h3>Prefer {@link #addListener(GenericFutureListener)} to {@link #await()}</h3>
*
* It is recommended to prefer {@link #addListener(ChannelFutureListener)} to
* It is recommended to prefer {@link #addListener(GenericFutureListener)} to
* {@link #await()} wherever possible to get notified when an I/O operation is
* done and to do any follow-up tasks.
* <p>
* {@link #addListener(ChannelFutureListener)} is non-blocking. It simply adds
* {@link #addListener(GenericFutureListener)} is non-blocking. It simply adds
* the specified {@link ChannelFutureListener} to the {@link ChannelFuture}, and
* I/O thread will notify the listeners when the I/O operation associated with
* the future is done. {@link ChannelFutureListener} yields the best
@ -159,7 +162,7 @@ import java.util.concurrent.TimeUnit;
* }
* </pre>
*/
public interface ChannelFuture {
public interface ChannelFuture extends Future {
/**
* Returns a channel where the I/O operation associated with this
@ -167,135 +170,30 @@ public interface ChannelFuture {
*/
Channel channel();
/**
* Returns {@code true} if and only if this future is
* complete, regardless of whether the operation was successful or failed.
*/
boolean isDone();
@Override
ChannelFuture addListener(GenericFutureListener<? extends Future> listener);
/**
* Returns {@code true} if and only if the I/O operation was completed
* successfully.
*/
boolean isSuccess();
@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future>... listeners);
/**
* Returns the cause of the failed I/O operation if the I/O operation has
* failed.
*
* @return the cause of the failure.
* {@code null} if succeeded or this future is not
* completed yet.
*/
Throwable cause();
@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future> listener);
/**
* Adds the specified listener to this future. The
* specified listener is notified when this future is
* {@linkplain #isDone() done}. If this future is already
* completed, the specified listener is notified immediately.
*/
ChannelFuture addListener(ChannelFutureListener listener);
@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future>... listeners);
/**
* Adds the specified listeners to this future. The
* specified listeners are notified when this future is
* {@linkplain #isDone() done}. If this future is already
* completed, the specified listeners are notified immediately.
*/
ChannelFuture addListeners(ChannelFutureListener... listeners);
/**
* Removes the specified listener from this future.
* The specified listener is no longer notified when this
* future is {@linkplain #isDone() done}. If the specified
* listener is not associated with this future, this method
* does nothing and returns silently.
*/
ChannelFuture removeListener(ChannelFutureListener listener);
/**
* Removes the specified listeners from this future.
* The specified listeners are no longer notified when this
* future is {@linkplain #isDone() done}. If the specified
* listeners are not associated with this future, this method
* does nothing and returns silently.
*/
ChannelFuture removeListeners(ChannelFutureListener... listeners);
/**
* Waits for this future until it is done, and rethrows the cause of the failure if this future
* failed. If the cause of the failure is a checked exception, it is wrapped with a new
* {@link ChannelException} before being thrown.
*/
@Override
ChannelFuture sync() throws InterruptedException;
/**
* Waits for this future until it is done, and rethrows the cause of the failure if this future
* failed. If the cause of the failure is a checked exception, it is wrapped with a new
* {@link ChannelException} before being thrown.
*/
@Override
ChannelFuture syncUninterruptibly();
/**
* Waits for this future to be completed.
*
* @throws InterruptedException
* if the current thread was interrupted
*/
@Override
ChannelFuture await() throws InterruptedException;
/**
* Waits for this future to be completed without
* interruption. This method catches an {@link InterruptedException} and
* discards it silently.
*/
@Override
ChannelFuture awaitUninterruptibly();
/**
* Waits for this future to be completed within the
* specified time limit.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*
* @throws InterruptedException
* if the current thread was interrupted
*/
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
/**
* Waits for this future to be completed within the
* specified time limit.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*
* @throws InterruptedException
* if the current thread was interrupted
*/
boolean await(long timeoutMillis) throws InterruptedException;
/**
* Waits for this future to be completed within the
* specified time limit without interruption. This method catches an
* {@link InterruptedException} and discards it silently.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*/
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
/**
* Waits for this future to be completed within the
* specified time limit without interruption. This method catches an
* {@link InterruptedException} and discards it silently.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*/
boolean awaitUninterruptibly(long timeoutMillis);
/**
* A {@link ChannelFuture} which is not allowed to be sent to {@link ChannelPipeline} due to
* implementation details.

View File

@ -15,22 +15,24 @@
*/
package io.netty.channel;
import java.util.EventListener;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
/**
* Listens to the result of a {@link ChannelFuture}. The result of the
* asynchronous {@link Channel} I/O operation is notified once this listener
* is added by calling {@link ChannelFuture#addListener(ChannelFutureListener)}.
* is added by calling {@link ChannelFuture#addListener(GenericFutureListener)}.
*
* <h3>Return the control to the caller quickly</h3>
*
* {@link #operationComplete(ChannelFuture)} is directly called by an I/O
* {@link #operationComplete(Future)} is directly called by an I/O
* thread. Therefore, performing a time consuming task or a blocking operation
* in the handler method can cause an unexpected pause during I/O. If you need
* to perform a blocking operation on I/O completion, try to execute the
* operation in a different thread using a thread pool.
*/
public interface ChannelFutureListener extends EventListener {
public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> {
/**
* A {@link ChannelFutureListener} that closes the {@link Channel} which is
@ -68,13 +70,4 @@ public interface ChannelFutureListener extends EventListener {
}
}
};
/**
* Invoked when the I/O operation associated with the {@link ChannelFuture}
* has been completed.
*
* @param future the source {@link ChannelFuture} which called this
* callback
*/
void operationComplete(ChannelFuture future) throws Exception;
}

View File

@ -21,6 +21,7 @@ import io.netty.buffer.MessageBuf;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.AttributeMap;
import io.netty.util.concurrent.EventExecutor;
import java.nio.channels.Channels;
import java.util.Set;

View File

@ -16,7 +16,7 @@
package io.netty.channel;
import java.net.SocketAddress;
import io.netty.util.concurrent.EventExecutor;
/**
* Interface which is shared by others which need to execute outbound logic.
*/

View File

@ -18,6 +18,7 @@ package io.netty.channel;
import io.netty.buffer.Buf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.util.concurrent.EventExecutorGroup;
import java.io.InputStream;
import java.io.OutputStream;

View File

@ -15,58 +15,32 @@
*/
package io.netty.channel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
/**
* Special {@link ChannelFuture} which is writable.
*/
public interface ChannelPromise extends ChannelFuture {
public interface ChannelPromise extends ChannelFuture, Promise {
/**
* Marks this future as a success and notifies all
* listeners.
*
* If it is success or failed already it will throw an {@link IllegalStateException}.
*/
@Override
ChannelPromise setSuccess();
/**
* Marks this future as a success and notifies all
* listeners.
*
* @return {@code true} if and only if successfully marked this future as
* a success. Otherwise {@code false} because this future is
* already marked as either a success or a failure.
*/
boolean trySuccess();
/**
* Marks this future as a failure and notifies all
* listeners.
*
* If it is success or failed already it will throw an {@link IllegalStateException}.
*/
@Override
ChannelPromise setFailure(Throwable cause);
/**
* Marks this future as a failure and notifies all
* listeners.
*
* @return {@code true} if and only if successfully marked this future as
* a failure. Otherwise {@code false} because this future is
* already marked as either a success or a failure.
*/
boolean tryFailure(Throwable cause);
@Override
ChannelPromise addListener(GenericFutureListener<? extends Future> listener);
@Override
ChannelPromise addListener(ChannelFutureListener listener);
ChannelPromise addListeners(GenericFutureListener<? extends Future>... listeners);
@Override
ChannelPromise addListeners(ChannelFutureListener... listeners);
ChannelPromise removeListener(GenericFutureListener<? extends Future> listener);
@Override
ChannelPromise removeListener(ChannelFutureListener listener);
@Override
ChannelPromise removeListeners(ChannelFutureListener... listeners);
ChannelPromise removeListeners(GenericFutureListener<? extends Future>... listeners);
@Override
ChannelPromise sync() throws InterruptedException;

View File

@ -17,11 +17,12 @@ package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.concurrent.FutureFactory;
/**
* Provides common methods between {@link Channel} and {@link ChannelHandlerContext}.
*/
interface ChannelPropertyAccess {
interface ChannelPropertyAccess extends FutureFactory {
/**
* Return the assigned {@link ChannelPipeline}
@ -33,22 +34,12 @@ interface ChannelPropertyAccess {
*/
ByteBufAllocator alloc();
/**
* Create a new {@link ChannelPromise}
*/
@Override
ChannelPromise newPromise();
/**
* Create a new {@link ChannelFuture} which is marked as successes already. So {@link ChannelFuture#isSuccess()}
* will return {@code true}. All {@link ChannelFutureListener} added to it will be notified directly. Also
* every call of blocking methods will just return without blocking.
*/
@Override
ChannelFuture newSucceededFuture();
/**
* Create a new {@link ChannelFuture} which is marked as fakued already. So {@link ChannelFuture#isSuccess()}
* will return {@code false}. All {@link ChannelFutureListener} added to it will be notified directly. Also
* every call of blocking methods will just return without blocking.
*/
@Override
ChannelFuture newFailedFuture(Throwable cause);
}

View File

@ -15,13 +15,16 @@
*/
package io.netty.channel;
import java.util.concurrent.TimeUnit;
import io.netty.util.concurrent.CompleteFuture;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
/**
* A skeletal {@link ChannelFuture} implementation which represents a
* {@link ChannelFuture} which has been completed already.
*/
abstract class CompleteChannelFuture implements ChannelFuture {
abstract class CompleteChannelFuture extends CompleteFuture implements ChannelFuture {
private final Channel channel;
@ -30,7 +33,8 @@ abstract class CompleteChannelFuture implements ChannelFuture {
*
* @param channel the {@link Channel} associated with this future
*/
protected CompleteChannelFuture(Channel channel) {
protected CompleteChannelFuture(Channel channel, EventExecutor executor) {
super(executor);
if (channel == null) {
throw new NullPointerException("channel");
}
@ -38,86 +42,61 @@ abstract class CompleteChannelFuture implements ChannelFuture {
}
@Override
public ChannelFuture addListener(final ChannelFutureListener listener) {
if (listener == null) {
throw new NullPointerException("listener");
protected EventExecutor executor() {
EventExecutor e = super.executor();
if (e == null) {
return channel().eventLoop();
} else {
return e;
}
DefaultChannelPromise.notifyListener(this, listener);
}
@Override
public ChannelFuture addListener(GenericFutureListener<? extends Future> listener) {
super.addListener(listener);
return this;
}
@Override
public ChannelFuture addListeners(ChannelFutureListener... listeners) {
if (listeners == null) {
throw new NullPointerException("listeners");
}
for (ChannelFutureListener l: listeners) {
if (l == null) {
break;
}
DefaultChannelPromise.notifyListener(this, l);
}
public ChannelFuture addListeners(GenericFutureListener<? extends Future>... listeners) {
super.addListeners(listeners);
return this;
}
@Override
public ChannelFuture removeListener(ChannelFutureListener listener) {
// NOOP
public ChannelFuture removeListener(GenericFutureListener<? extends Future> listener) {
super.removeListener(listener);
return this;
}
@Override
public ChannelFuture removeListeners(ChannelFutureListener... listeners) {
// NOOP
public ChannelFuture removeListeners(GenericFutureListener<? extends Future>... listeners) {
super.removeListeners(listeners);
return this;
}
@Override
public ChannelFuture syncUninterruptibly() {
return this;
}
@Override
public ChannelFuture sync() throws InterruptedException {
return this;
}
@Override
public ChannelFuture await() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
return this;
}
@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
return true;
}
@Override
public boolean await(long timeoutMillis) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
return true;
}
@Override
public ChannelFuture awaitUninterruptibly() {
return this;
}
@Override
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
return true;
}
@Override
public boolean awaitUninterruptibly(long timeoutMillis) {
return true;
}
@Override
public Channel channel() {
return channel;
}
@Override
public boolean isDone() {
return true;
}
}

View File

@ -16,10 +16,14 @@
package io.netty.channel;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
abstract class CompleteChannelPromise extends CompleteChannelFuture implements ChannelPromise {
protected CompleteChannelPromise(Channel channel) {
super(channel);
protected CompleteChannelPromise(Channel channel, EventExecutor executor) {
super(channel, executor);
}
@Override
@ -42,26 +46,6 @@ abstract class CompleteChannelPromise extends CompleteChannelFuture implements C
return false;
}
@Override
public ChannelPromise addListener(final ChannelFutureListener listener) {
return (ChannelPromise) super.addListener(listener);
}
@Override
public ChannelPromise addListeners(ChannelFutureListener... listeners) {
return (ChannelPromise) super.addListeners(listeners);
}
@Override
public ChannelPromise removeListener(ChannelFutureListener listener) {
return (ChannelPromise) super.removeListener(listener);
}
@Override
public ChannelPromise removeListeners(ChannelFutureListener... listeners) {
return (ChannelPromise) super.removeListeners(listeners);
}
@Override
public ChannelPromise await() throws InterruptedException {
return (ChannelPromise) super.await();
@ -71,4 +55,34 @@ abstract class CompleteChannelPromise extends CompleteChannelFuture implements C
public ChannelPromise awaitUninterruptibly() {
return (ChannelPromise) super.awaitUninterruptibly();
}
@Override
public ChannelPromise addListener(GenericFutureListener<? extends Future> listener) {
return (ChannelPromise) super.addListener(listener);
}
@Override
public ChannelPromise addListeners(GenericFutureListener<? extends Future>... listeners) {
return (ChannelPromise) super.addListeners(listeners);
}
@Override
public ChannelPromise removeListener(GenericFutureListener<? extends Future> listener) {
return (ChannelPromise) super.removeListener(listener);
}
@Override
public ChannelPromise removeListeners(GenericFutureListener<? extends Future>... listeners) {
return (ChannelPromise) super.removeListeners(listeners);
}
@Override
public ChannelPromise sync() throws InterruptedException {
return this;
}
@Override
public ChannelPromise syncUninterruptibly() {
return this;
}
}

View File

@ -21,6 +21,9 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.DefaultAttributeMap;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.PlatformDependent;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
@ -52,6 +55,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
final EventExecutor executor;
private ChannelFuture succeededFuture;
private final MessageBuf<Object> inMsgBuf;
private final ByteBuf inByteBuf;
@ -541,17 +545,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} catch (ExecutionException ex) {
// In the arbitrary case, we can throw Error, RuntimeException, and Exception
Throwable t = ex.getCause();
if (t instanceof Error) { throw (Error) t; }
if (t instanceof RuntimeException) { throw (RuntimeException) t; }
if (t instanceof Exception) { throw (Exception) t; }
throw new ChannelPipelineException(t);
PlatformDependent.throwException(ex.getCause());
} catch (InterruptedException ex) {
// Interrupt the calling thread (note that this method is not called from the event loop)
Thread.currentThread().interrupt();
return null;
}
return null;
}
/**
@ -574,11 +575,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
future.get();
} catch (ExecutionException ex) {
// In the arbitrary case, we can throw Error, RuntimeException, and Exception
Throwable t = ex.getCause();
if (t instanceof Error) { throw (Error) t; }
if (t instanceof RuntimeException) { throw (RuntimeException) t; }
throw new ChannelPipelineException(t);
PlatformDependent.throwException(ex.getCause());
} catch (InterruptedException ex) {
// Interrupt the calling thread (note that this method is not called from the event loop)
@ -1580,17 +1577,21 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ChannelPromise newPromise() {
return new DefaultChannelPromise(channel());
return new DefaultChannelPromise(channel(), executor());
}
@Override
public ChannelFuture newSucceededFuture() {
return channel().newSucceededFuture();
ChannelFuture succeededFuture = this.succeededFuture;
if (succeededFuture == null) {
this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
}
return succeededFuture;
}
@Override
public ChannelFuture newFailedFuture(Throwable cause) {
return channel().newFailedFuture(cause);
return new FailedChannelFuture(channel(), executor(), cause);
}
private void validateFuture(ChannelFuture future) {

View File

@ -21,6 +21,8 @@ import io.netty.buffer.MessageBuf;
import io.netty.buffer.ReferenceCounted;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel.Unsafe;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

View File

@ -16,44 +16,18 @@
package io.netty.channel;
import io.netty.channel.ChannelFlushPromiseNotifier.FlushCheckpoint;
import io.netty.util.Signal;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.*;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
/**
* The default {@link ChannelPromise} implementation. It is recommended to use {@link Channel#newPromise()} to create
* a new {@link ChannelPromise} rather than calling the constructor explicitly.
*/
public class DefaultChannelPromise extends FlushCheckpoint implements ChannelPromise {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(DefaultChannelPromise.class);
private static final int MAX_LISTENER_STACK_DEPTH = 8;
private static final ThreadLocal<Integer> LISTENER_STACK_DEPTH = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 0;
}
};
private static final Signal SUCCESS = new Signal(DefaultChannelPromise.class.getName() + ".SUCCESS");
public class DefaultChannelPromise extends DefaultPromise implements ChannelPromise, FlushCheckpoint {
private final Channel channel;
private volatile Throwable cause;
private Object listeners; // Can be ChannelFutureListener or DefaultChannelPromiseListeners
/**
* The first 24 bits of this field represents the number of waiters waiting for this promise with await*().
* The other 40 bits of this field represents the flushCheckpoint used by ChannelFlushPromiseNotifier and
* AbstractChannel.Unsafe.flush().
*/
private long flushCheckpoint;
/**
* Creates a new instance.
@ -65,500 +39,114 @@ public class DefaultChannelPromise extends FlushCheckpoint implements ChannelPro
this.channel = channel;
}
/**
* Creates a new instance.
*
* @param channel
* the {@link Channel} associated with this future
*/
public DefaultChannelPromise(Channel channel, EventExecutor executor) {
super(executor);
this.channel = channel;
}
@Override
protected EventExecutor executor() {
EventExecutor e = super.executor();
if (e == null) {
return channel().eventLoop();
} else {
return e;
}
}
@Override
public Channel channel() {
return channel;
}
@Override
public boolean isDone() {
return cause != null;
}
@Override
public boolean isSuccess() {
return cause == SUCCESS;
}
@Override
public Throwable cause() {
Throwable cause = this.cause;
return cause == SUCCESS? null : cause;
}
@Override
public ChannelPromise addListener(final ChannelFutureListener listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
if (isDone()) {
notifyListener(this, listener);
return this;
}
synchronized (this) {
if (!isDone()) {
if (listeners == null) {
listeners = listener;
} else {
if (listeners instanceof DefaultChannelPromiseListeners) {
((DefaultChannelPromiseListeners) listeners).add(listener);
} else {
listeners = new DefaultChannelPromiseListeners((ChannelFutureListener) listeners, listener);
}
}
return this;
}
}
notifyListener(this, listener);
public ChannelPromise setSuccess() {
super.setSuccess();
return this;
}
@Override
public ChannelPromise addListeners(ChannelFutureListener... listeners) {
if (listeners == null) {
throw new NullPointerException("listeners");
}
for (ChannelFutureListener l: listeners) {
if (l == null) {
break;
}
addListener(l);
}
public ChannelPromise setFailure(Throwable cause) {
super.setFailure(cause);
return this;
}
@Override
public ChannelPromise removeListener(ChannelFutureListener listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
if (isDone()) {
return this;
}
synchronized (this) {
if (!isDone()) {
if (listeners instanceof DefaultChannelPromiseListeners) {
((DefaultChannelPromiseListeners) listeners).remove(listener);
} else if (listeners == listener) {
listeners = null;
}
}
}
public ChannelPromise addListener(GenericFutureListener<? extends Future> listener) {
super.addListener(listener);
return this;
}
@Override
public ChannelPromise removeListeners(ChannelFutureListener... listeners) {
if (listeners == null) {
throw new NullPointerException("listeners");
}
public ChannelPromise addListeners(GenericFutureListener<? extends Future>... listeners) {
super.addListeners(listeners);
return this;
}
for (ChannelFutureListener l: listeners) {
if (l == null) {
break;
}
removeListener(l);
}
@Override
public ChannelPromise removeListener(GenericFutureListener<? extends Future> listener) {
super.removeListener(listener);
return this;
}
@Override
public ChannelPromise removeListeners(GenericFutureListener<? extends Future>... listeners) {
super.removeListeners(listeners);
return this;
}
@Override
public ChannelPromise sync() throws InterruptedException {
await();
rethrowIfFailed();
super.sync();
return this;
}
@Override
public ChannelPromise syncUninterruptibly() {
awaitUninterruptibly();
rethrowIfFailed();
super.syncUninterruptibly();
return this;
}
private void rethrowIfFailed() {
Throwable cause = cause();
if (cause == null) {
return;
}
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
if (cause instanceof Error) {
throw (Error) cause;
}
throw new ChannelException(cause);
}
@Override
public ChannelPromise await() throws InterruptedException {
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException();
}
synchronized (this) {
while (!isDone()) {
checkDeadLock();
incWaiters();
try {
wait();
} finally {
decWaiters();
}
}
}
super.await();
return this;
}
@Override
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return await0(unit.toNanos(timeout), true);
}
@Override
public boolean await(long timeoutMillis) throws InterruptedException {
return await0(MILLISECONDS.toNanos(timeoutMillis), true);
}
@Override
public ChannelPromise awaitUninterruptibly() {
if (isDone()) {
return this;
}
boolean interrupted = false;
synchronized (this) {
while (!isDone()) {
checkDeadLock();
incWaiters();
try {
wait();
} catch (InterruptedException e) {
interrupted = true;
} finally {
decWaiters();
}
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
super.awaitUninterruptibly();
return this;
}
@Override
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
try {
return await0(unit.toNanos(timeout), false);
} catch (InterruptedException e) {
throw new InternalError();
}
public long flushCheckpoint() {
return state & 0x000000FFFFFFFFFFL;
}
@Override
public boolean awaitUninterruptibly(long timeoutMillis) {
try {
return await0(MILLISECONDS.toNanos(timeoutMillis), false);
} catch (InterruptedException e) {
throw new InternalError();
}
}
private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
if (isDone()) {
return true;
}
if (timeoutNanos <= 0) {
return isDone();
}
if (interruptable && Thread.interrupted()) {
throw new InterruptedException();
}
long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
long waitTime = timeoutNanos;
boolean interrupted = false;
try {
synchronized (this) {
if (isDone()) {
return true;
}
if (waitTime <= 0) {
return isDone();
}
checkDeadLock();
incWaiters();
try {
for (;;) {
try {
wait(waitTime / 1000000, (int) (waitTime % 1000000));
} catch (InterruptedException e) {
if (interruptable) {
throw e;
} else {
interrupted = true;
}
}
if (isDone()) {
return true;
} else {
waitTime = timeoutNanos - (System.nanoTime() - startTime);
if (waitTime <= 0) {
return isDone();
}
}
}
} finally {
decWaiters();
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
private void checkDeadLock() {
if (channel().isRegistered() && channel().eventLoop().inEventLoop()) {
throw new BlockingOperationException();
}
}
@Override
public ChannelPromise setSuccess() {
if (set(SUCCESS)) {
notifyListeners();
return this;
}
throw new IllegalStateException();
}
@Override
public boolean trySuccess() {
if (set(SUCCESS)) {
notifyListeners();
return true;
}
return false;
}
@Override
public ChannelPromise setFailure(Throwable cause) {
if (set(cause)) {
notifyListeners();
return this;
}
throw new IllegalStateException();
}
@Override
public boolean tryFailure(Throwable cause) {
if (set(cause)) {
notifyListeners();
return true;
}
return false;
}
private boolean set(Throwable cause) {
if (isDone()) {
return false;
}
synchronized (this) {
// Allow only once.
if (isDone()) {
return false;
}
this.cause = cause;
if (hasWaiters()) {
notifyAll();
}
}
return true;
}
private void notifyListeners() {
// This method doesn't need synchronization because:
// 1) This method is always called after synchronized (this) block.
// Hence any listener list modification happens-before this method.
// 2) This method is called only when 'done' is true. Once 'done'
// becomes true, the listener list is never modified - see add/removeListener()
if (listeners == null) {
return;
}
if (channel().eventLoop().inEventLoop()) {
if (listeners instanceof DefaultChannelPromiseListeners) {
notifyListeners0(this, (DefaultChannelPromiseListeners) listeners);
} else {
notifyListener0(this, (ChannelFutureListener) listeners);
}
listeners = null;
} else {
final Object listeners = this.listeners;
this.listeners = null;
channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
if (listeners instanceof DefaultChannelPromiseListeners) {
notifyListeners0(DefaultChannelPromise.this, (DefaultChannelPromiseListeners) listeners);
} else {
notifyListener0(DefaultChannelPromise.this, (ChannelFutureListener) listeners);
}
}
});
}
}
private static void notifyListeners0(ChannelFuture f, DefaultChannelPromiseListeners listeners) {
final ChannelFutureListener[] a = listeners.listeners();
final int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(f, a[i]);
}
}
static void notifyListener(final ChannelFuture f, final ChannelFutureListener l) {
EventLoop loop = f.channel().eventLoop();
if (loop.inEventLoop()) {
final Integer stackDepth = LISTENER_STACK_DEPTH.get();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
LISTENER_STACK_DEPTH.set(stackDepth + 1);
try {
notifyListener0(f, l);
} finally {
LISTENER_STACK_DEPTH.set(stackDepth);
}
return;
}
}
loop.execute(new Runnable() {
@Override
public void run() {
notifyListener(f, l);
}
});
}
private static void notifyListener0(ChannelFuture f, ChannelFutureListener l) {
try {
l.operationComplete(f);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by " +
ChannelFutureListener.class.getSimpleName() + '.', t);
}
}
}
@Override
long flushCheckpoint() {
return flushCheckpoint & 0x000000FFFFFFFFFFL;
}
@Override
void flushCheckpoint(long checkpoint) {
public void flushCheckpoint(long checkpoint) {
if ((checkpoint & 0xFFFFFF0000000000L) != 0) {
throw new IllegalStateException("flushCheckpoint overflow");
}
flushCheckpoint = flushCheckpoint & 0xFFFFFF0000000000L | checkpoint;
}
private boolean hasWaiters() {
return (flushCheckpoint & 0xFFFFFF0000000000L) != 0;
}
private void incWaiters() {
long waiters = waiters() + 1;
if ((waiters & 0xFFFFFFFFFF000000L) != 0) {
throw new IllegalStateException("too many waiters");
}
flushCheckpoint = flushCheckpoint() | waiters << 40L;
}
private void decWaiters() {
flushCheckpoint = flushCheckpoint() | waiters() - 1L << 40L;
}
private long waiters() {
return flushCheckpoint >>> 40;
state = state & 0xFFFFFF0000000000L | checkpoint;
}
@Override
ChannelPromise future() {
public ChannelPromise future() {
return this;
}
private static final class DefaultChannelPromiseListeners {
private ChannelFutureListener[] listeners;
private int size;
DefaultChannelPromiseListeners(ChannelFutureListener firstListener, ChannelFutureListener secondListener) {
listeners = new ChannelFutureListener[] { firstListener, secondListener };
size = 2;
}
void add(ChannelFutureListener l) {
ChannelFutureListener[] listeners = this.listeners;
final int size = this.size;
if (size == listeners.length) {
this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
}
listeners[size] = l;
this.size = size + 1;
}
void remove(ChannelFutureListener l) {
final ChannelFutureListener[] listeners = this.listeners;
int size = this.size;
for (int i = 0; i < size; i ++) {
if (listeners[i] == l) {
int listenersToMove = size - i - 1;
if (listenersToMove > 0) {
System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
}
listeners[-- size] = null;
this.size = size;
return;
}
}
}
ChannelFutureListener[] listeners() {
return listeners;
}
int size() {
return size;
@Override
protected void checkDeadLock() {
if (channel().isRegistered()) {
super.checkDeadLock();
}
}
}

View File

@ -15,6 +15,8 @@
*/
package io.netty.channel;
import io.netty.util.concurrent.EventExecutor;
/**
* Will handle all the I/O-Operations for a {@link Channel} once it was registered.
*

View File

@ -15,8 +15,10 @@
*/
package io.netty.channel;
import io.netty.util.concurrent.EventExecutorGroup;
/**
* Special {@link EventExecutorGroup} which allows to register {@link Channel}'s that get
* Special {@link io.netty.util.concurrent.EventExecutorGroup} which allows to register {@link Channel}'s that get
* processed for later selection during the event loop.
*
*/

View File

@ -15,6 +15,9 @@
*/
package io.netty.channel;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.PlatformDependent;
/**
* The {@link CompleteChannelFuture} which is failed already. It is
* recommended to use {@link Channel#newFailedFuture(Throwable)}
@ -30,8 +33,8 @@ final class FailedChannelFuture extends CompleteChannelFuture {
* @param channel the {@link Channel} associated with this future
* @param cause the cause of failure
*/
public FailedChannelFuture(Channel channel, Throwable cause) {
super(channel);
public FailedChannelFuture(Channel channel, EventExecutor executor, Throwable cause) {
super(channel, executor);
if (cause == null) {
throw new NullPointerException("cause");
}
@ -50,23 +53,13 @@ final class FailedChannelFuture extends CompleteChannelFuture {
@Override
public ChannelFuture sync() {
return rethrow();
PlatformDependent.throwException(cause);
return this;
}
@Override
public ChannelFuture syncUninterruptibly() {
return rethrow();
}
private ChannelFuture rethrow() {
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
if (cause instanceof Error) {
throw (Error) cause;
}
throw new ChannelException(cause);
PlatformDependent.throwException(cause);
return this;
}
}

View File

@ -15,6 +15,8 @@
*/
package io.netty.channel;
import io.netty.util.concurrent.MultithreadEventExecutorGroup;
import java.util.concurrent.ThreadFactory;
/**

View File

@ -15,6 +15,10 @@
*/
package io.netty.channel;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.TaskScheduler;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import java.util.concurrent.ThreadFactory;
/**
@ -25,10 +29,10 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
/**
*
* @see SingleThreadEventExecutor#SingleThreadEventExecutor(EventExecutorGroup, ThreadFactory, ChannelTaskScheduler)
* @see SingleThreadEventExecutor#SingleThreadEventExecutor(EventExecutorGroup, ThreadFactory, TaskScheduler)
*/
protected SingleThreadEventLoop(
EventLoopGroup parent, ThreadFactory threadFactory, ChannelTaskScheduler scheduler) {
EventLoopGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
super(parent, threadFactory, scheduler);
}

View File

@ -15,6 +15,8 @@
*/
package io.netty.channel;
import io.netty.util.concurrent.EventExecutor;
/**
* The {@link CompleteChannelFuture} which is succeeded already. It is
* recommended to use {@link Channel#newSucceededFuture()} instead of
@ -27,8 +29,8 @@ final class SucceededChannelFuture extends CompleteChannelFuture {
*
* @param channel the {@link Channel} associated with this future
*/
public SucceededChannelFuture(Channel channel) {
super(channel);
public SucceededChannelFuture(Channel channel, EventExecutor executor) {
super(channel, executor);
}
@Override
@ -40,14 +42,4 @@ final class SucceededChannelFuture extends CompleteChannelFuture {
public boolean isSuccess() {
return true;
}
@Override
public ChannelFuture sync() {
return this;
}
@Override
public ChannelFuture syncUninterruptibly() {
return this;
}
}

View File

@ -15,6 +15,9 @@
*/
package io.netty.channel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.TimeUnit;
final class VoidChannelPromise implements ChannelFuture.Unsafe, ChannelPromise {
@ -34,25 +37,25 @@ final class VoidChannelPromise implements ChannelFuture.Unsafe, ChannelPromise {
}
@Override
public ChannelPromise addListener(final ChannelFutureListener listener) {
public ChannelPromise addListener(GenericFutureListener<? extends Future> listener) {
fail();
return this;
}
@Override
public ChannelPromise addListeners(final ChannelFutureListener... listeners) {
public ChannelPromise addListeners(GenericFutureListener<? extends Future>... listeners) {
fail();
return this;
}
@Override
public ChannelPromise removeListener(ChannelFutureListener listener) {
public ChannelPromise removeListener(GenericFutureListener<? extends Future> listener) {
// NOOP
return this;
}
@Override
public ChannelPromise removeListeners(ChannelFutureListener... listeners) {
public ChannelPromise removeListeners(GenericFutureListener<? extends Future>... listeners) {
// NOOP
return this;
}

View File

@ -19,7 +19,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelTaskScheduler;
import io.netty.util.concurrent.TaskScheduler;
import io.netty.channel.SingleThreadEventLoop;
import java.util.ArrayList;
@ -59,7 +59,7 @@ final class AioEventLoop extends SingleThreadEventLoop {
}
};
AioEventLoop(AioEventLoopGroup parent, ThreadFactory threadFactory, ChannelTaskScheduler scheduler) {
AioEventLoop(AioEventLoopGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
super(parent, threadFactory, scheduler);
}

View File

@ -16,8 +16,8 @@
package io.netty.channel.aio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelTaskScheduler;
import io.netty.channel.EventExecutor;
import io.netty.util.concurrent.TaskScheduler;
import io.netty.util.concurrent.EventExecutor;
import io.netty.channel.EventLoopException;
import io.netty.channel.MultithreadEventLoopGroup;
@ -108,7 +108,7 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup {
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, ChannelTaskScheduler scheduler, Object... args) throws Exception {
ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception {
return new AioEventLoop(this, threadFactory, scheduler);
}

View File

@ -22,7 +22,6 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
@ -33,6 +32,7 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelStateHandlerAdapter;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -178,14 +178,7 @@ public abstract class AbstractEmbeddedChannel<O> extends AbstractChannel {
lastException = null;
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
if (t instanceof Error) {
throw (Error) t;
}
throw new ChannelException(t);
PlatformDependent.throwException(t);
}
protected final void ensureOpen() {

View File

@ -20,6 +20,11 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.FailedFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.SucceededFuture;
import java.util.ArrayDeque;
import java.util.Collections;
@ -32,6 +37,7 @@ import java.util.concurrent.TimeUnit;
final class EmbeddedEventLoop extends AbstractExecutorService implements EventLoop {
private final SucceededFuture succeededFuture = new SucceededFuture(this);
private final Queue<Runnable> tasks = new ArrayDeque<Runnable>(2);
@Override
@ -134,4 +140,19 @@ final class EmbeddedEventLoop extends AbstractExecutorService implements EventLo
public EventLoopGroup parent() {
return this;
}
@Override
public Promise newPromise() {
return new DefaultPromise(this);
}
@Override
public Future newSucceededFuture() {
return succeededFuture;
}
@Override
public Future newFailedFuture(Throwable cause) {
return new FailedFuture(this, cause);
}
}

View File

@ -26,7 +26,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.channel.SingleThreadEventExecutor;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.channel.SingleThreadEventLoop;
import java.net.SocketAddress;

View File

@ -16,14 +16,14 @@
package io.netty.channel.local;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.ChannelTaskScheduler;
import io.netty.util.concurrent.TaskScheduler;
import java.util.concurrent.ThreadFactory;
final class LocalEventLoop extends SingleThreadEventLoop {
LocalEventLoop(
LocalEventLoopGroup parent, ThreadFactory threadFactory, ChannelTaskScheduler scheduler) {
LocalEventLoopGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
super(parent, threadFactory, scheduler);
}

View File

@ -15,9 +15,9 @@
*/
package io.netty.channel.local;
import io.netty.channel.EventExecutor;
import io.netty.util.concurrent.EventExecutor;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.ChannelTaskScheduler;
import io.netty.util.concurrent.TaskScheduler;
import java.util.concurrent.ThreadFactory;
@ -54,7 +54,7 @@ public class LocalEventLoopGroup extends MultithreadEventLoopGroup {
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, ChannelTaskScheduler scheduler, Object... args) throws Exception {
ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception {
return new LocalEventLoop(this, threadFactory, scheduler);
}
}

View File

@ -22,7 +22,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.channel.ServerChannel;
import io.netty.channel.SingleThreadEventExecutor;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.channel.SingleThreadEventLoop;
import java.net.SocketAddress;

View File

@ -18,7 +18,7 @@ package io.netty.channel.nio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelTaskScheduler;
import io.netty.util.concurrent.TaskScheduler;
import io.netty.channel.EventLoopException;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.nio.AbstractNioChannel.NioUnsafe;
@ -76,7 +76,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
NioEventLoop(
NioEventLoopGroup parent, ThreadFactory threadFactory,
ChannelTaskScheduler scheduler, SelectorProvider selectorProvider) {
TaskScheduler scheduler, SelectorProvider selectorProvider) {
super(parent, threadFactory, scheduler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");

View File

@ -16,8 +16,8 @@
package io.netty.channel.nio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelTaskScheduler;
import io.netty.channel.EventExecutor;
import io.netty.util.concurrent.TaskScheduler;
import io.netty.util.concurrent.EventExecutor;
import io.netty.channel.MultithreadEventLoopGroup;
import java.nio.channels.Selector;
@ -74,7 +74,7 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, ChannelTaskScheduler scheduler, Object... args) throws Exception {
ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception {
return new NioEventLoop(this, threadFactory, scheduler, (SelectorProvider) args[0]);
}
}

View File

@ -20,7 +20,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelTaskScheduler;
import io.netty.util.concurrent.TaskScheduler;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.internal.PlatformDependent;
@ -41,7 +41,7 @@ public class OioEventLoopGroup implements EventLoopGroup {
private static final StackTraceElement[] STACK_ELEMENTS = new StackTraceElement[0];
private final int maxChannels;
final ChannelTaskScheduler scheduler;
final TaskScheduler scheduler;
final ThreadFactory threadFactory;
final Set<OioEventLoop> activeChildren = Collections.newSetFromMap(
PlatformDependent.<OioEventLoop, Boolean>newConcurrentHashMap());
@ -91,7 +91,7 @@ public class OioEventLoopGroup implements EventLoopGroup {
this.maxChannels = maxChannels;
this.threadFactory = threadFactory;
scheduler = new ChannelTaskScheduler(threadFactory);
scheduler = new TaskScheduler(threadFactory);
tooManyChannels = new ChannelException("too many channels (max: " + maxChannels + ')');
tooManyChannels.setStackTrace(STACK_ELEMENTS);

View File

@ -205,16 +205,8 @@ public final class NioDatagramChannel
free = false;
return 1;
} catch (Throwable cause) {
if (cause instanceof Error) {
throw (Error) cause;
}
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
if (cause instanceof Exception) {
throw (Exception) cause;
}
throw new ChannelException(cause);
PlatformDependent.throwException(cause);
return -1;
} finally {
if (free) {
buffer.release();

View File

@ -27,6 +27,7 @@ import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.DefaultDatagramChannelConfig;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -218,16 +219,8 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
}
return -1;
} catch (Throwable cause) {
if (cause instanceof Error) {
throw (Error) cause;
}
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
if (cause instanceof Exception) {
throw (Exception) cause;
}
throw new ChannelException(cause);
PlatformDependent.throwException(cause);
return -1;
} finally {
if (free) {
buffer.release();

View File

@ -18,6 +18,9 @@ package io.netty.channel;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import org.junit.Test;
import static org.junit.Assert.*;

View File

@ -15,13 +15,13 @@
*/
package io.netty.channel;
import static org.easymock.EasyMock.*;
import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import static org.easymock.EasyMock.*;
import static org.junit.Assert.*;
public class CompleteChannelFutureTest {
@ -62,7 +62,7 @@ public class CompleteChannelFutureTest {
private static class CompleteChannelFutureImpl extends CompleteChannelFuture {
CompleteChannelFutureImpl(Channel channel) {
super(channel);
super(channel, null);
}
@Override

View File

@ -15,17 +15,17 @@
*/
package io.netty.channel;
import org.junit.Test;
import static org.easymock.EasyMock.*;
import static org.junit.Assert.*;
import org.junit.Test;
public class FailedChannelFutureTest {
@Test
public void testConstantProperties() {
Channel channel = createMock(Channel.class);
Exception e = new Exception();
FailedChannelFuture future = new FailedChannelFuture(channel, e);
FailedChannelFuture future = new FailedChannelFuture(channel, null, e);
assertFalse(future.isSuccess());
assertSame(e, future.cause());
@ -33,6 +33,6 @@ public class FailedChannelFutureTest {
@Test(expected = NullPointerException.class)
public void shouldDisallowNullException() {
new FailedChannelFuture(createMock(Channel.class), null);
new FailedChannelFuture(createMock(Channel.class), null, null);
}
}

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel;
import io.netty.util.concurrent.TaskScheduler;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -249,7 +250,7 @@ public class SingleThreadEventLoopTest {
SingleThreadEventLoopImpl() {
super(null, Executors.defaultThreadFactory(),
new ChannelTaskScheduler(Executors.defaultThreadFactory()));
new TaskScheduler(Executors.defaultThreadFactory()));
}
@Override

View File

@ -15,16 +15,16 @@
*/
package io.netty.channel;
import org.junit.Test;
import static org.easymock.EasyMock.*;
import static org.junit.Assert.*;
import org.junit.Test;
public class SucceededChannelFutureTest {
@Test
public void testConstantProperties() {
Channel channel = createMock(Channel.class);
SucceededChannelFuture future = new SucceededChannelFuture(channel);
SucceededChannelFuture future = new SucceededChannelFuture(channel, null);
assertTrue(future.isSuccess());
assertNull(future.cause());

View File

@ -29,8 +29,8 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundByteHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventExecutorGroup;
import io.netty.channel.EventExecutorGroup;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.channel.EventLoopGroup;
import org.junit.AfterClass;
import org.junit.Assert;