From f2ed3e6ce8039d142e4c047fcc9cf09409105243 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Wed, 4 May 2016 22:58:40 -0700 Subject: [PATCH] DefaultPromise LateListener Logic Issues Motivation: The LateListener logic is prone to infinite loops and relies on being processed in the EventExecutor's thread for synchronization, but this EventExecutor may not be constant. An infinite loop can occur if the EventExecutor's execute method does not introduce a context switch in LateListener.run. The EventExecutor can be changed by classes which inherit from DefaultPromise. For example the DefaultChannelPromise will return w/e EventLoop the channel is registered to, but this EventLoop can change (re-registration). Modifications: - Remove the LateListener concept and instead use a single Object to maintain the listeners while still preserving notification order - Make the result member variable an atomic variable so it can be outside the synchronized(this) blocks - Cleanup/simplify existing state management code Result: Fixes https://github.com/netty/netty/issues/5185 --- .../netty/util/concurrent/DefaultPromise.java | 1115 +++++++---------- .../util/concurrent/DefaultPromiseTest.java | 6 +- .../channel/embedded/EmbeddedChannelTest.java | 20 +- 3 files changed, 498 insertions(+), 643 deletions(-) diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java index bb3eb02aa1..bd088391ac 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java @@ -24,45 +24,49 @@ import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.util.ArrayDeque; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import static io.netty.util.internal.ObjectUtil.checkNotNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; public class DefaultPromise extends AbstractFuture implements Promise { - private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class); private static final InternalLogger rejectedExecutionLogger = InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution"); - private static final int MAX_LISTENER_STACK_DEPTH = 8; + private static final AtomicReferenceFieldUpdater RESULT_UPDATER; private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class, "SUCCESS"); private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE"); private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(new CancellationException()); static { + AtomicReferenceFieldUpdater updater = + PlatformDependent.newAtomicReferenceFieldUpdater(DefaultPromise.class, "result"); + RESULT_UPDATER = updater == null ? AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, + Object.class, "result") : updater; CANCELLATION_CAUSE_HOLDER.cause.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE); } - private final EventExecutor executor; - private volatile Object result; - + private final EventExecutor executor; /** * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}. * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified. + * + * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor. */ private Object listeners; + /** + * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll(). + */ + private short waiters; /** - * The list of the listeners that were added after the promise is done. Initially {@code null} and lazily - * instantiated when the late listener is scheduled to be notified later. Also used as a cached {@link Runnable} - * that performs the notification of the listeners it contains. + * Threading - EventExecutor. Only accessed inside the EventExecutor thread while notifying listeners. */ - private LateListeners lateListeners; - - private short waiters; + private boolean notifyingListeners; /** * Creates a new instance. @@ -73,10 +77,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { * the {@link EventExecutor} which is used to notify the promise once it is complete */ public DefaultPromise(EventExecutor executor) { - if (executor == null) { - throw new NullPointerException("executor"); - } - this.executor = executor; + this.executor = checkNotNull(executor, "executor"); } protected DefaultPromise() { @@ -84,314 +85,6 @@ public class DefaultPromise extends AbstractFuture implements Promise { executor = null; } - protected EventExecutor executor() { - return executor; - } - - @Override - public boolean isCancelled() { - return isCancelled0(result); - } - - private static boolean isCancelled0(Object result) { - return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException; - } - - @Override - public boolean isCancellable() { - return result == null; - } - - @Override - public boolean isDone() { - return isDone0(result); - } - - private static boolean isDone0(Object result) { - return result != null && result != UNCANCELLABLE; - } - - @Override - public boolean isSuccess() { - Object result = this.result; - if (result == null || result == UNCANCELLABLE) { - return false; - } - return !(result instanceof CauseHolder); - } - - @Override - public Throwable cause() { - Object result = this.result; - if (result instanceof CauseHolder) { - return ((CauseHolder) result).cause; - } - return null; - } - - @Override - public Promise addListener(GenericFutureListener> listener) { - if (listener == null) { - throw new NullPointerException("listener"); - } - - if (isDone()) { - notifyLateListener(listener); - return this; - } - - synchronized (this) { - if (!isDone()) { - if (listeners == null) { - listeners = listener; - } else { - if (listeners instanceof DefaultFutureListeners) { - ((DefaultFutureListeners) listeners).add(listener); - } else { - final GenericFutureListener> firstListener = - (GenericFutureListener>) listeners; - listeners = new DefaultFutureListeners(firstListener, listener); - } - } - return this; - } - } - - notifyLateListener(listener); - return this; - } - - @Override - public Promise addListeners(GenericFutureListener>... listeners) { - if (listeners == null) { - throw new NullPointerException("listeners"); - } - - for (GenericFutureListener> l: listeners) { - if (l == null) { - break; - } - addListener(l); - } - return this; - } - - @Override - public Promise removeListener(GenericFutureListener> listener) { - if (listener == null) { - throw new NullPointerException("listener"); - } - - if (isDone()) { - return this; - } - - synchronized (this) { - if (!isDone()) { - if (listeners instanceof DefaultFutureListeners) { - ((DefaultFutureListeners) listeners).remove(listener); - } else if (listeners == listener) { - listeners = null; - } - } - } - - return this; - } - - @Override - public Promise removeListeners(GenericFutureListener>... listeners) { - if (listeners == null) { - throw new NullPointerException("listeners"); - } - - for (GenericFutureListener> l: listeners) { - if (l == null) { - break; - } - removeListener(l); - } - return this; - } - - @Override - public Promise sync() throws InterruptedException { - await(); - rethrowIfFailed(); - return this; - } - - @Override - public Promise syncUninterruptibly() { - awaitUninterruptibly(); - rethrowIfFailed(); - return this; - } - - private void rethrowIfFailed() { - Throwable cause = cause(); - if (cause == null) { - return; - } - - PlatformDependent.throwException(cause); - } - - @Override - public Promise await() throws InterruptedException { - if (isDone()) { - return this; - } - - if (Thread.interrupted()) { - throw new InterruptedException(toString()); - } - - synchronized (this) { - while (!isDone()) { - checkDeadLock(); - incWaiters(); - try { - wait(); - } finally { - decWaiters(); - } - } - } - return this; - } - - @Override - public boolean await(long timeout, TimeUnit unit) - throws InterruptedException { - return await0(unit.toNanos(timeout), true); - } - - @Override - public boolean await(long timeoutMillis) throws InterruptedException { - return await0(MILLISECONDS.toNanos(timeoutMillis), true); - } - - @Override - public Promise awaitUninterruptibly() { - if (isDone()) { - return this; - } - - boolean interrupted = false; - synchronized (this) { - while (!isDone()) { - checkDeadLock(); - incWaiters(); - try { - wait(); - } catch (InterruptedException e) { - // Interrupted while waiting. - interrupted = true; - } finally { - decWaiters(); - } - } - } - - if (interrupted) { - Thread.currentThread().interrupt(); - } - - return this; - } - - @Override - public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { - try { - return await0(unit.toNanos(timeout), false); - } catch (InterruptedException e) { - // Should not be raised at all. - throw new InternalError(); - } - } - - @Override - public boolean awaitUninterruptibly(long timeoutMillis) { - try { - return await0(MILLISECONDS.toNanos(timeoutMillis), false); - } catch (InterruptedException e) { - // Should not be raised at all. - throw new InternalError(); - } - } - - private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException { - if (isDone()) { - return true; - } - - if (timeoutNanos <= 0) { - return isDone(); - } - - if (interruptable && Thread.interrupted()) { - throw new InterruptedException(toString()); - } - - long startTime = System.nanoTime(); - long waitTime = timeoutNanos; - boolean interrupted = false; - - try { - synchronized (this) { - if (isDone()) { - return true; - } - - if (waitTime <= 0) { - return isDone(); - } - - checkDeadLock(); - incWaiters(); - try { - for (;;) { - try { - wait(waitTime / 1000000, (int) (waitTime % 1000000)); - } catch (InterruptedException e) { - if (interruptable) { - throw e; - } else { - interrupted = true; - } - } - - if (isDone()) { - return true; - } else { - waitTime = timeoutNanos - (System.nanoTime() - startTime); - if (waitTime <= 0) { - return isDone(); - } - } - } - } finally { - decWaiters(); - } - } - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } - - /** - * Do deadlock checks - */ - protected void checkDeadLock() { - EventExecutor e = executor(); - if (e != null && e.inEventLoop()) { - throw new BlockingOperationException(toString()); - } - } - @Override public Promise setSuccess(V result) { if (setSuccess0(result)) { @@ -428,96 +121,180 @@ public class DefaultPromise extends AbstractFuture implements Promise { return false; } - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - Object result = this.result; - if (isDone0(result) || result == UNCANCELLABLE) { - return false; - } - - synchronized (this) { - // Allow only once. - result = this.result; - if (isDone0(result) || result == UNCANCELLABLE) { - return false; - } - - this.result = CANCELLATION_CAUSE_HOLDER; - if (hasWaiters()) { - notifyAll(); - } - } - - notifyListeners(); - return true; - } - @Override public boolean setUncancellable() { + if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) { + return true; + } Object result = this.result; - if (isDone0(result)) { - return !isCancelled0(result); - } - - synchronized (this) { - // Allow only once. - result = this.result; - if (isDone0(result)) { - return !isCancelled0(result); - } - - this.result = UNCANCELLABLE; - } - return true; - } - - private boolean setFailure0(Throwable cause) { - if (cause == null) { - throw new NullPointerException("cause"); - } - - if (isDone()) { - return false; - } - - synchronized (this) { - // Allow only once. - if (isDone()) { - return false; - } - - result = new CauseHolder(cause); - if (hasWaiters()) { - notifyAll(); - } - } - return true; - } - - private boolean setSuccess0(V result) { - if (isDone()) { - return false; - } - - synchronized (this) { - // Allow only once. - if (isDone()) { - return false; - } - if (result == null) { - this.result = SUCCESS; - } else { - this.result = result; - } - if (hasWaiters()) { - notifyAll(); - } - } - return true; + return !isDone0(result) || !isCancelled0(result); + } + + @Override + public boolean isSuccess() { + Object result = this.result; + return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder); + } + + @Override + public boolean isCancellable() { + return result == null; + } + + @Override + public Throwable cause() { + Object result = this.result; + return (result instanceof CauseHolder) ? ((CauseHolder) result).cause : null; + } + + @Override + public Promise addListener(GenericFutureListener> listener) { + checkNotNull(listener, "listener"); + + synchronized (this) { + addListener0(listener); + } + + if (isDone()) { + notifyListeners(); + } + + return this; + } + + @Override + public Promise addListeners(GenericFutureListener>... listeners) { + checkNotNull(listeners, "listeners"); + + synchronized (this) { + for (GenericFutureListener> listener : listeners) { + if (listener == null) { + break; + } + addListener0(listener); + } + } + + if (isDone()) { + notifyListeners(); + } + + return this; + } + + @Override + public Promise removeListener(final GenericFutureListener> listener) { + checkNotNull(listener, "listener"); + + synchronized (this) { + removeListener0(listener); + } + + return this; + } + + @Override + public Promise removeListeners(final GenericFutureListener>... listeners) { + checkNotNull(listeners, "listeners"); + + synchronized (this) { + for (GenericFutureListener> listener : listeners) { + if (listener == null) { + break; + } + removeListener0(listener); + } + } + + return this; + } + + @Override + public Promise await() throws InterruptedException { + if (isDone()) { + return this; + } + + if (Thread.interrupted()) { + throw new InterruptedException(toString()); + } + + checkDeadLock(); + + synchronized (this) { + while (!isDone()) { + incWaiters(); + try { + wait(); + } finally { + decWaiters(); + } + } + } + return this; + } + + @Override + public Promise awaitUninterruptibly() { + if (isDone()) { + return this; + } + + checkDeadLock(); + + boolean interrupted = false; + synchronized (this) { + while (!isDone()) { + incWaiters(); + try { + wait(); + } catch (InterruptedException e) { + // Interrupted while waiting. + interrupted = true; + } finally { + decWaiters(); + } + } + } + + if (interrupted) { + Thread.currentThread().interrupt(); + } + + return this; + } + + @Override + public boolean await(long timeout, TimeUnit unit) throws InterruptedException { + return await0(unit.toNanos(timeout), true); + } + + @Override + public boolean await(long timeoutMillis) throws InterruptedException { + return await0(MILLISECONDS.toNanos(timeoutMillis), true); + } + + @Override + public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { + try { + return await0(unit.toNanos(timeout), false); + } catch (InterruptedException e) { + // Should not be raised at all. + throw new InternalError(); + } + } + + @Override + public boolean awaitUninterruptibly(long timeoutMillis) { + try { + return await0(MILLISECONDS.toNanos(timeoutMillis), false); + } catch (InterruptedException e) { + // Should not be raised at all. + throw new InternalError(); + } } @Override - @SuppressWarnings("unchecked") public V getNow() { Object result = this.result; if (result instanceof CauseHolder || result == SUCCESS) { @@ -526,127 +303,93 @@ public class DefaultPromise extends AbstractFuture implements Promise { return (V) result; } - private boolean hasWaiters() { - return waiters > 0; + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) { + checkNotifyWaiters(); + notifyListeners(); + return true; + } + return false; } - private void incWaiters() { - if (waiters == Short.MAX_VALUE) { - throw new IllegalStateException("too many waiters: " + this); - } - waiters ++; + @Override + public boolean isCancelled() { + return isCancelled0(result); } - private void decWaiters() { - waiters --; + @Override + public boolean isDone() { + return isDone0(result); } - private void notifyListeners() { - // This method doesn't need synchronization because: - // 1) This method is always called after synchronized (this) block. - // Hence any listener list modification happens-before this method. - // 2) This method is called only when 'done' is true. Once 'done' - // becomes true, the listener list is never modified - see add/removeListener() + @Override + public Promise sync() throws InterruptedException { + await(); + rethrowIfFailed(); + return this; + } - Object listeners = this.listeners; - if (listeners == null) { - return; - } + @Override + public Promise syncUninterruptibly() { + awaitUninterruptibly(); + rethrowIfFailed(); + return this; + } - EventExecutor executor = executor(); - if (executor.inEventLoop()) { - final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); - final int stackDepth = threadLocals.futureListenerStackDepth(); - if (stackDepth < MAX_LISTENER_STACK_DEPTH) { - threadLocals.setFutureListenerStackDepth(stackDepth + 1); - try { - if (listeners instanceof DefaultFutureListeners) { - notifyListeners0(this, (DefaultFutureListeners) listeners); - } else { - final GenericFutureListener> l = - (GenericFutureListener>) listeners; - notifyListener0(this, l); - } - } finally { - this.listeners = null; - threadLocals.setFutureListenerStackDepth(stackDepth); - } - return; - } - } + @Override + public String toString() { + return toStringBuilder().toString(); + } - if (listeners instanceof DefaultFutureListeners) { - final DefaultFutureListeners dfl = (DefaultFutureListeners) listeners; - safeExecute(executor, new OneTimeTask() { - @Override - public void run() { - notifyListeners0(DefaultPromise.this, dfl); - DefaultPromise.this.listeners = null; - } - }); + protected StringBuilder toStringBuilder() { + StringBuilder buf = new StringBuilder(64) + .append(StringUtil.simpleClassName(this)) + .append('@') + .append(Integer.toHexString(hashCode())); + + Object result = this.result; + if (result == SUCCESS) { + buf.append("(success)"); + } else if (result == UNCANCELLABLE) { + buf.append("(uncancellable)"); + } else if (result instanceof CauseHolder) { + buf.append("(failure: ") + .append(((CauseHolder) result).cause) + .append(')'); + } else if (result != null) { + buf.append("(success: ") + .append(result) + .append(')'); } else { - final GenericFutureListener> l = - (GenericFutureListener>) listeners; - safeExecute(executor, new OneTimeTask() { - @Override - public void run() { - notifyListener0(DefaultPromise.this, l); - DefaultPromise.this.listeners = null; - } - }); + buf.append("(incomplete)"); } + + return buf; } - private static void notifyListeners0(Future future, DefaultFutureListeners listeners) { - final GenericFutureListener[] a = listeners.listeners(); - final int size = listeners.size(); - for (int i = 0; i < size; i ++) { - notifyListener0(future, a[i]); + protected EventExecutor executor() { + return executor; + } + + protected void checkDeadLock() { + EventExecutor e = executor(); + if (e != null && e.inEventLoop()) { + throw new BlockingOperationException(toString()); } } /** - * Notifies the specified listener which were added after this promise is already done. - * This method ensures that the specified listener is not notified until {@link #listeners} becomes {@code null} - * to avoid the case where the late listeners are notified even before the early listeners are notified. + * Notify a listener that a future has completed. + *

+ * This method has a fixed depth of {@link #MAX_LISTENER_STACK_DEPTH} that will limit recursion to prevent + * {@link StackOverflowError} and will stop notifying listeners added after this threshold is exceeded. + * @param eventExecutor the executor to use to notify the listener {@code l}. + * @param future the future that is complete. + * @param l the listener to notify. */ - private void notifyLateListener(final GenericFutureListener l) { - final EventExecutor executor = executor(); - if (executor.inEventLoop()) { - // Execute immediately if late listeners is empty. This allows subsequent late listeners - // that are added after completion to be notified immediately and preserver order. - if (listeners == null && (lateListeners == null || lateListeners.isEmpty())) { - final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); - final int stackDepth = threadLocals.futureListenerStackDepth(); - if (stackDepth < MAX_LISTENER_STACK_DEPTH) { - threadLocals.setFutureListenerStackDepth(stackDepth + 1); - try { - notifyListener0(this, l); - } finally { - threadLocals.setFutureListenerStackDepth(stackDepth); - } - return; - } - } else { - LateListeners lateListeners = this.lateListeners; - if (lateListeners == null) { - this.lateListeners = lateListeners = new LateListeners(); - } - lateListeners.add(l); - executor.execute(lateListeners); - return; - } - } - - // Add the late listener to lateListeners in the executor thread for thread safety. - // We could just make LateListeners extend ConcurrentLinkedQueue, but it's an overkill considering - // that most asynchronous applications won't execute this code path. - executor.execute(new LateListenerNotifier(l)); - } - protected static void notifyListener( - final EventExecutor eventExecutor, final Future future, final GenericFutureListener l) { - + EventExecutor eventExecutor, final Future future, final GenericFutureListener l) { if (eventExecutor.inEventLoop()) { final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); @@ -669,21 +412,222 @@ public class DefaultPromise extends AbstractFuture implements Promise { }); } - private static void safeExecute(EventExecutor executor, Runnable task) { - try { - executor.execute(task); - } catch (Throwable t) { - rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t); + private void notifyListeners() { + // Modifications to listeners should be done in a synchronized block before this, and should be visible here. + if (listeners == null) { + return; + } + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + notifyListeners0(); + return; + } + safeExecute(executor, new OneTimeTask() { + @Override + public void run() { + notifyListeners0(); + } + }); + } + + private void notifyListeners0() { + Object listeners; + while (!notifyingListeners) { + synchronized (this) { + if (this.listeners == null) { + return; + } + listeners = this.listeners; + this.listeners = null; + } + notifyingListeners = true; + try { + if (listeners instanceof DefaultFutureListeners) { + notifyListeners0((DefaultFutureListeners) listeners); + } else { + notifyListener0(this, (GenericFutureListener>) listeners); + } + } finally { + notifyingListeners = false; + } + } + } + + private void notifyListeners0(DefaultFutureListeners listeners) { + GenericFutureListener[] a = listeners.listeners(); + int size = listeners.size(); + for (int i = 0; i < size; i ++) { + notifyListener0(this, a[i]); } } @SuppressWarnings({ "unchecked", "rawtypes" }) - static void notifyListener0(Future future, GenericFutureListener l) { + private static void notifyListener0(Future future, GenericFutureListener l) { try { l.operationComplete(future); } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t); + logger.warn("An exception was thrown by {}.operationComplete()", l.getClass().getName(), t); + } + } + + private void addListener0(GenericFutureListener> listener) { + if (listeners == null) { + listeners = listener; + } else if (listeners instanceof DefaultFutureListeners) { + ((DefaultFutureListeners) listeners).add(listener); + } else { + listeners = new DefaultFutureListeners((GenericFutureListener>) listeners, listener); + } + } + + private void removeListener0(GenericFutureListener> listener) { + if (listeners instanceof DefaultFutureListeners) { + ((DefaultFutureListeners) listeners).remove(listener); + } else if (listeners == listener) { + listeners = null; + } + } + + private boolean setSuccess0(V result) { + return setValue0(result == null ? SUCCESS : result); + } + + private boolean setFailure0(Throwable cause) { + return setValue0(new CauseHolder(checkNotNull(cause, "cause"))); + } + + private boolean setValue0(Object objResult) { + if (RESULT_UPDATER.compareAndSet(this, null, objResult) || + RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { + checkNotifyWaiters(); + return true; + } + return false; + } + + private synchronized void checkNotifyWaiters() { + if (waiters > 0) { + notifyAll(); + } + } + + private void incWaiters() { + if (waiters == Short.MAX_VALUE) { + throw new IllegalStateException("too many waiters: " + this); + } + ++waiters; + } + + private void decWaiters() { + --waiters; + } + + private void rethrowIfFailed() { + Throwable cause = cause(); + if (cause == null) { + return; + } + + PlatformDependent.throwException(cause); + } + + private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException { + if (isDone()) { + return true; + } + + if (timeoutNanos <= 0) { + return isDone(); + } + + if (interruptable && Thread.interrupted()) { + throw new InterruptedException(toString()); + } + + checkDeadLock(); + + long startTime = System.nanoTime(); + long waitTime = timeoutNanos; + boolean interrupted = false; + try { + for (;;) { + synchronized (this) { + incWaiters(); + try { + wait(waitTime / 1000000, (int) (waitTime % 1000000)); + } catch (InterruptedException e) { + if (interruptable) { + throw e; + } else { + interrupted = true; + } + } finally { + decWaiters(); + } + } + if (isDone()) { + return true; + } else { + waitTime = timeoutNanos - (System.nanoTime() - startTime); + if (waitTime <= 0) { + return isDone(); + } + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * Notify all progressive listeners. + *

+ * No attempt is made to ensure notification order if multiple calls are made to this method before + * the original invocation completes. + *

+ * This will do an iteration over all listeners to get all of type {@link GenericProgressiveFutureListener}s. + * @param progress the new progress. + * @param total the total progress. + */ + @SuppressWarnings("unchecked") + void notifyProgressiveListeners(final long progress, final long total) { + final Object listeners = progressiveListeners(); + if (listeners == null) { + return; + } + + final ProgressiveFuture self = (ProgressiveFuture) this; + + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + if (listeners instanceof GenericProgressiveFutureListener[]) { + notifyProgressiveListeners0( + self, (GenericProgressiveFutureListener[]) listeners, progress, total); + } else { + notifyProgressiveListener0( + self, (GenericProgressiveFutureListener>) listeners, progress, total); + } + } else { + if (listeners instanceof GenericProgressiveFutureListener[]) { + final GenericProgressiveFutureListener[] array = + (GenericProgressiveFutureListener[]) listeners; + safeExecute(executor, new OneTimeTask() { + @Override + public void run() { + notifyProgressiveListeners0(self, array, progress, total); + } + }); + } else { + final GenericProgressiveFutureListener> l = + (GenericProgressiveFutureListener>) listeners; + safeExecute(executor, new OneTimeTask() { + @Override + public void run() { + notifyProgressiveListener0(self, l, progress, total); + } + }); } } } @@ -733,47 +677,6 @@ public class DefaultPromise extends AbstractFuture implements Promise { } } - @SuppressWarnings("unchecked") - void notifyProgressiveListeners(final long progress, final long total) { - final Object listeners = progressiveListeners(); - if (listeners == null) { - return; - } - - final ProgressiveFuture self = (ProgressiveFuture) this; - - EventExecutor executor = executor(); - if (executor.inEventLoop()) { - if (listeners instanceof GenericProgressiveFutureListener[]) { - notifyProgressiveListeners0( - self, (GenericProgressiveFutureListener[]) listeners, progress, total); - } else { - notifyProgressiveListener0( - self, (GenericProgressiveFutureListener>) listeners, progress, total); - } - } else { - if (listeners instanceof GenericProgressiveFutureListener[]) { - final GenericProgressiveFutureListener[] array = - (GenericProgressiveFutureListener[]) listeners; - safeExecute(executor, new OneTimeTask() { - @Override - public void run() { - notifyProgressiveListeners0(self, array, progress, total); - } - }); - } else { - final GenericProgressiveFutureListener> l = - (GenericProgressiveFutureListener>) listeners; - safeExecute(executor, new OneTimeTask() { - @Override - public void run() { - notifyProgressiveListener0(self, l, progress, total); - } - }); - } - } - } - private static void notifyProgressiveListeners0( ProgressiveFuture future, GenericProgressiveFutureListener[] listeners, long progress, long total) { for (GenericProgressiveFutureListener l: listeners) { @@ -790,12 +693,18 @@ public class DefaultPromise extends AbstractFuture implements Promise { try { l.operationProgressed(future, progress, total); } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t); - } + logger.warn("An exception was thrown by {}.operationProgressed()", l.getClass().getName(), t); } } + private static boolean isCancelled0(Object result) { + return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException; + } + + private static boolean isDone0(Object result) { + return result != null && result != UNCANCELLABLE; + } + private static final class CauseHolder { final Throwable cause; CauseHolder(Throwable cause) { @@ -803,83 +712,11 @@ public class DefaultPromise extends AbstractFuture implements Promise { } } - @Override - public String toString() { - return toStringBuilder().toString(); - } - - protected StringBuilder toStringBuilder() { - StringBuilder buf = new StringBuilder(64) - .append(StringUtil.simpleClassName(this)) - .append('@') - .append(Integer.toHexString(hashCode())); - - Object result = this.result; - if (result == SUCCESS) { - buf.append("(success)"); - } else if (result == UNCANCELLABLE) { - buf.append("(uncancellable)"); - } else if (result instanceof CauseHolder) { - buf.append("(failure: ") - .append(((CauseHolder) result).cause) - .append(')'); - } else if (result != null) { - buf.append("(success: ") - .append(result) - .append(')'); - } else { - buf.append("(incomplete)"); - } - - return buf; - } - - private final class LateListeners extends ArrayDeque> implements Runnable { - - private static final long serialVersionUID = -687137418080392244L; - - LateListeners() { - super(2); - } - - @Override - public void run() { - final EventExecutor executor = executor(); - if (listeners == null || executor == ImmediateEventExecutor.INSTANCE) { - for (;;) { - GenericFutureListener l = poll(); - if (l == null) { - break; - } - notifyListener0(DefaultPromise.this, l); - } - } else { - // Reschedule until the initial notification is done to avoid the race condition - // where the notification is made in an incorrect order. - safeExecute(executor, this); - } - } - } - - private final class LateListenerNotifier implements Runnable { - private GenericFutureListener l; - - LateListenerNotifier(GenericFutureListener l) { - this.l = l; - } - - @Override - public void run() { - LateListeners lateListeners = DefaultPromise.this.lateListeners; - if (l != null) { - if (lateListeners == null) { - DefaultPromise.this.lateListeners = lateListeners = new LateListeners(); - } - lateListeners.add(l); - l = null; - } - - lateListeners.run(); + private static void safeExecute(EventExecutor executor, Runnable task) { + try { + executor.execute(task); + } catch (Throwable t) { + rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t); } } } diff --git a/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java b/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java index ab5d4f03fd..fa0154d9fb 100644 --- a/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java +++ b/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java @@ -84,7 +84,7 @@ public class DefaultPromiseTest { p[i].addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - DefaultPromise.notifyListener(ImmediateEventExecutor.INSTANCE, future, new FutureListener() { + future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (finalI + 1 < p.length) { @@ -192,8 +192,8 @@ public class DefaultPromiseTest { *

    *
  1. A write is done
  2. *
  3. The write operation completes, and the promise state is changed to done
  4. - *
  5. A listener is added to the return from the write. The {@link FutureListener#operationComplete()} updates - * state which must be invoked before the response to the previous write is read.
  6. + *
  7. A listener is added to the return from the write. The {@link FutureListener#operationComplete(Future)} + * updates state which must be invoked before the response to the previous write is read.
  8. *
  9. The write operation
  10. *
*/ diff --git a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java index 5d35360d14..658e46bc2f 100644 --- a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java +++ b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java @@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; @@ -39,10 +40,27 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; public class EmbeddedChannelTest { + @Test(timeout = 2000) + public void promiseDoesNotInfiniteLoop() throws InterruptedException { + EmbeddedChannel channel = new EmbeddedChannel(); + channel.closeFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + future.channel().close(); + } + }); + + channel.close().syncUninterruptibly(); + } + @Test public void testConstructWithChannelInitializer() { final Integer first = 1;