DefaultPromise concurrency bug

Motivation:
If the executor changes while listeners are added and notification of listeners is being done then listeners can be notified out of order and concurrently. We should ensure that only one executor is used at any given time to notify listeners and ensure the listeners are notified in FIFO order.

Modifications:
- Move the notifyingListeners member variable from DefaultPromise into the synchronized block to prevent concurrent notification of listeners and preserve FIFO notification order

Result:
If the executor is changed for a DefaultPromise the listener notification order should be FIFO.
This commit is contained in:
Scott Mitchell 2016-05-23 09:46:03 -07:00
parent 2618c2a649
commit f5d58e2e1a

View File

@ -64,7 +64,8 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
private short waiters;
/**
* Threading - EventExecutor. Only accessed inside the EventExecutor thread while notifying listeners.
* Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the
* executor changes.
*/
private boolean notifyingListeners;
@ -432,24 +433,31 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
private void notifyListeners0() {
Object listeners;
while (!notifyingListeners) {
synchronized (this) {
// Only proceed if there are listeners to notify and we are not already notifying listeners.
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
for (;;) {
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
}
synchronized (this) {
if (this.listeners == null) {
// Nothing can throw from within this method, so setting notifyingListeners back to false does not
// need to be in a finally block.
notifyingListeners = false;
return;
}
listeners = this.listeners;
this.listeners = null;
}
notifyingListeners = true;
try {
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
}
} finally {
notifyingListeners = false;
}
}
}