DefaultPromise LateListener notification order
Motivation: There is a notification ordering issue in DefaultPromise when the lateListener collection is in use. The ordering issue can be observed in situations where a late listener is added to a Future returned from a write operation. It is possible that this future will run after a read operation scheduled on the I/O thread, even if the late listener is added on the I/O thread. This can lead to unexpected ordering where a listener for a write operation which must complete in order for the read operation to happen is notified after the read operation is done. Modifications: - If the lateListener collection becomes empty, it should be treated as though it was null when checking if lateListeners can be notified immediatley (instead of executing a task on the executor) Result: Ordering is more natural and will not be perceived as being out of order relative to other tasks on the same executor.
This commit is contained in:
parent
01da38d21a
commit
eed8fe5d27
@ -18,6 +18,7 @@ package io.netty.util.concurrent;
|
|||||||
import io.netty.util.Signal;
|
import io.netty.util.Signal;
|
||||||
import io.netty.util.internal.EmptyArrays;
|
import io.netty.util.internal.EmptyArrays;
|
||||||
import io.netty.util.internal.InternalThreadLocalMap;
|
import io.netty.util.internal.InternalThreadLocalMap;
|
||||||
|
import io.netty.util.internal.OneTimeTask;
|
||||||
import io.netty.util.internal.PlatformDependent;
|
import io.netty.util.internal.PlatformDependent;
|
||||||
import io.netty.util.internal.StringUtil;
|
import io.netty.util.internal.StringUtil;
|
||||||
import io.netty.util.internal.logging.InternalLogger;
|
import io.netty.util.internal.logging.InternalLogger;
|
||||||
@ -27,7 +28,7 @@ import java.util.ArrayDeque;
|
|||||||
import java.util.concurrent.CancellationException;
|
import java.util.concurrent.CancellationException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static java.util.concurrent.TimeUnit.*;
|
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||||
|
|
||||||
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
||||||
|
|
||||||
@ -576,7 +577,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
|||||||
|
|
||||||
if (listeners instanceof DefaultFutureListeners) {
|
if (listeners instanceof DefaultFutureListeners) {
|
||||||
final DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
|
final DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
|
||||||
execute(executor, new Runnable() {
|
execute(executor, new OneTimeTask() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
notifyListeners0(DefaultPromise.this, dfl);
|
notifyListeners0(DefaultPromise.this, dfl);
|
||||||
@ -586,7 +587,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
|||||||
} else {
|
} else {
|
||||||
final GenericFutureListener<? extends Future<V>> l =
|
final GenericFutureListener<? extends Future<V>> l =
|
||||||
(GenericFutureListener<? extends Future<V>>) listeners;
|
(GenericFutureListener<? extends Future<V>>) listeners;
|
||||||
execute(executor, new Runnable() {
|
execute(executor, new OneTimeTask() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
notifyListener0(DefaultPromise.this, l);
|
notifyListener0(DefaultPromise.this, l);
|
||||||
@ -612,7 +613,9 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
|||||||
private void notifyLateListener(final GenericFutureListener<?> l) {
|
private void notifyLateListener(final GenericFutureListener<?> l) {
|
||||||
final EventExecutor executor = executor();
|
final EventExecutor executor = executor();
|
||||||
if (executor.inEventLoop()) {
|
if (executor.inEventLoop()) {
|
||||||
if (listeners == null && lateListeners == null) {
|
// Execute immediately if late listeners is empty. This allows subsequent late listeners
|
||||||
|
// that are added after completion to be notified immediately and preserver order.
|
||||||
|
if (listeners == null && (lateListeners == null || lateListeners.isEmpty())) {
|
||||||
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
|
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
|
||||||
final int stackDepth = threadLocals.futureListenerStackDepth();
|
final int stackDepth = threadLocals.futureListenerStackDepth();
|
||||||
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
|
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
|
||||||
@ -658,7 +661,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
execute(eventExecutor, new Runnable() {
|
execute(eventExecutor, new OneTimeTask() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
notifyListener0(future, l);
|
notifyListener0(future, l);
|
||||||
@ -752,7 +755,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
|||||||
if (listeners instanceof GenericProgressiveFutureListener[]) {
|
if (listeners instanceof GenericProgressiveFutureListener[]) {
|
||||||
final GenericProgressiveFutureListener<?>[] array =
|
final GenericProgressiveFutureListener<?>[] array =
|
||||||
(GenericProgressiveFutureListener<?>[]) listeners;
|
(GenericProgressiveFutureListener<?>[]) listeners;
|
||||||
execute(executor, new Runnable() {
|
execute(executor, new OneTimeTask() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
notifyProgressiveListeners0(self, array, progress, total);
|
notifyProgressiveListeners0(self, array, progress, total);
|
||||||
@ -761,7 +764,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
|||||||
} else {
|
} else {
|
||||||
final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
|
final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
|
||||||
(GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
|
(GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
|
||||||
execute(executor, new Runnable() {
|
execute(executor, new OneTimeTask() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
notifyProgressiveListener0(self, l, progress, total);
|
notifyProgressiveListener0(self, l, progress, total);
|
||||||
|
@ -23,9 +23,13 @@ import java.util.concurrent.CountDownLatch;
|
|||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.*;
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertSame;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public class DefaultPromiseTest {
|
public class DefaultPromiseTest {
|
||||||
@ -84,54 +88,56 @@ public class DefaultPromiseTest {
|
|||||||
@Test
|
@Test
|
||||||
public void testListenerNotifyOrder() throws Exception {
|
public void testListenerNotifyOrder() throws Exception {
|
||||||
EventExecutor executor = new TestEventExecutor();
|
EventExecutor executor = new TestEventExecutor();
|
||||||
final BlockingQueue<FutureListener<Void>> listeners = new LinkedBlockingQueue<FutureListener<Void>>();
|
try {
|
||||||
int runs = 100000;
|
final BlockingQueue<FutureListener<Void>> listeners = new LinkedBlockingQueue<FutureListener<Void>>();
|
||||||
|
int runs = 100000;
|
||||||
|
|
||||||
for (int i = 0; i < runs; i++) {
|
for (int i = 0; i < runs; i++) {
|
||||||
final Promise<Void> promise = new DefaultPromise<Void>(executor);
|
final Promise<Void> promise = new DefaultPromise<Void>(executor);
|
||||||
final FutureListener<Void> listener1 = new FutureListener<Void>() {
|
final FutureListener<Void> listener1 = new FutureListener<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(Future<Void> future) throws Exception {
|
public void operationComplete(Future<Void> future) throws Exception {
|
||||||
listeners.add(this);
|
listeners.add(this);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
final FutureListener<Void> listener2 = new FutureListener<Void>() {
|
final FutureListener<Void> listener2 = new FutureListener<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(Future<Void> future) throws Exception {
|
public void operationComplete(Future<Void> future) throws Exception {
|
||||||
listeners.add(this);
|
listeners.add(this);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
final FutureListener<Void> listener4 = new FutureListener<Void>() {
|
final FutureListener<Void> listener4 = new FutureListener<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(Future<Void> future) throws Exception {
|
public void operationComplete(Future<Void> future) throws Exception {
|
||||||
listeners.add(this);
|
listeners.add(this);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
final FutureListener<Void> listener3 = new FutureListener<Void>() {
|
final FutureListener<Void> listener3 = new FutureListener<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(Future<Void> future) throws Exception {
|
public void operationComplete(Future<Void> future) throws Exception {
|
||||||
// Ensure listener4 is notified *after* this method returns to maintain the order.
|
listeners.add(this);
|
||||||
future.addListener(listener4);
|
future.addListener(listener4);
|
||||||
listeners.add(this);
|
}
|
||||||
}
|
};
|
||||||
};
|
|
||||||
|
|
||||||
GlobalEventExecutor.INSTANCE.execute(new Runnable() {
|
GlobalEventExecutor.INSTANCE.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
promise.setSuccess(null);
|
promise.setSuccess(null);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
promise.addListener(listener1).addListener(listener2).addListener(listener3);
|
promise.addListener(listener1).addListener(listener2).addListener(listener3);
|
||||||
|
|
||||||
assertSame("Fail during run " + i + " / " + runs, listener1, listeners.take());
|
assertSame("Fail 1 during run " + i + " / " + runs, listener1, listeners.take());
|
||||||
assertSame("Fail during run " + i + " / " + runs, listener2, listeners.take());
|
assertSame("Fail 2 during run " + i + " / " + runs, listener2, listeners.take());
|
||||||
assertSame("Fail during run " + i + " / " + runs, listener3, listeners.take());
|
assertSame("Fail 3 during run " + i + " / " + runs, listener3, listeners.take());
|
||||||
assertSame("Fail during run " + i + " / " + runs, listener4, listeners.take());
|
assertSame("Fail 4 during run " + i + " / " + runs, listener4, listeners.take());
|
||||||
assertTrue("Fail during run " + i + " / " + runs, listeners.isEmpty());
|
assertTrue("Fail during run " + i + " / " + runs, listeners.isEmpty());
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
executor.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
|
||||||
}
|
}
|
||||||
executor.shutdownGracefully().sync();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -145,7 +151,7 @@ public class DefaultPromiseTest {
|
|||||||
|
|
||||||
@Test(timeout = 2000)
|
@Test(timeout = 2000)
|
||||||
public void testPromiseListenerAddWhenCompleteFailure() throws Exception {
|
public void testPromiseListenerAddWhenCompleteFailure() throws Exception {
|
||||||
testPromiseListenerAddWhenComplete(new RuntimeException());
|
testPromiseListenerAddWhenComplete(fakeException());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 2000)
|
@Test(timeout = 2000)
|
||||||
@ -153,6 +159,103 @@ public class DefaultPromiseTest {
|
|||||||
testPromiseListenerAddWhenComplete(null);
|
testPromiseListenerAddWhenComplete(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 2000)
|
||||||
|
public void testLateListenerIsOrderedCorrectlySuccess() throws InterruptedException {
|
||||||
|
final EventExecutor executor = new TestEventExecutor();
|
||||||
|
try {
|
||||||
|
testLateListenerIsOrderedCorrectly(null);
|
||||||
|
} finally {
|
||||||
|
executor.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 2000)
|
||||||
|
public void testLateListenerIsOrderedCorrectlyFailure() throws InterruptedException {
|
||||||
|
final EventExecutor executor = new TestEventExecutor();
|
||||||
|
try {
|
||||||
|
testLateListenerIsOrderedCorrectly(fakeException());
|
||||||
|
} finally {
|
||||||
|
executor.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test is mean to simulate the following sequence of events, which all take place on the I/O thread:
|
||||||
|
* <ol>
|
||||||
|
* <li>A write is done</li>
|
||||||
|
* <li>The write operation completes, and the promise state is changed to done</li>
|
||||||
|
* <li>A listener is added to the return from the write. The {@link FutureListener#operationComplete()} updates
|
||||||
|
* state which must be invoked before the response to the previous write is read.</li>
|
||||||
|
* <li>The write operation</li>
|
||||||
|
* </ol>
|
||||||
|
*/
|
||||||
|
private static void testLateListenerIsOrderedCorrectly(Throwable cause) throws InterruptedException {
|
||||||
|
final EventExecutor executor = new TestEventExecutor();
|
||||||
|
try {
|
||||||
|
final AtomicInteger state = new AtomicInteger();
|
||||||
|
final CountDownLatch latch1 = new CountDownLatch(1);
|
||||||
|
final CountDownLatch latch2 = new CountDownLatch(2);
|
||||||
|
final Promise<Void> promise = new DefaultPromise<Void>(executor);
|
||||||
|
|
||||||
|
// Add a listener before completion so "lateListener" is used next time we add a listener.
|
||||||
|
promise.addListener(new FutureListener<Void>() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(Future<Void> future) throws Exception {
|
||||||
|
assertTrue(state.compareAndSet(0, 1));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Simulate write operation completing, which will execute listeners in another thread.
|
||||||
|
if (cause == null) {
|
||||||
|
promise.setSuccess(null);
|
||||||
|
} else {
|
||||||
|
promise.setFailure(cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add a "late listener"
|
||||||
|
promise.addListener(new FutureListener<Void>() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(Future<Void> future) throws Exception {
|
||||||
|
assertTrue(state.compareAndSet(1, 2));
|
||||||
|
latch1.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Wait for the listeners and late listeners to be completed.
|
||||||
|
latch1.await();
|
||||||
|
assertEquals(2, state.get());
|
||||||
|
|
||||||
|
// This is the important listener. A late listener that is added after all late listeners
|
||||||
|
// have completed, and needs to update state before a read operation (on the same executor).
|
||||||
|
executor.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
promise.addListener(new FutureListener<Void>() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(Future<Void> future) throws Exception {
|
||||||
|
assertTrue(state.compareAndSet(2, 3));
|
||||||
|
latch2.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Simulate a read operation being queued up in the executor.
|
||||||
|
executor.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
// This is the key, we depend upon the state being set in the next listener.
|
||||||
|
assertEquals(3, state.get());
|
||||||
|
latch2.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
latch2.await();
|
||||||
|
} finally {
|
||||||
|
executor.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static void testPromiseListenerAddWhenComplete(Throwable cause) throws InterruptedException {
|
private static void testPromiseListenerAddWhenComplete(Throwable cause) throws InterruptedException {
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
final Promise<Void> promise = new DefaultPromise<Void>(ImmediateEventExecutor.INSTANCE);
|
final Promise<Void> promise = new DefaultPromise<Void>(ImmediateEventExecutor.INSTANCE);
|
||||||
@ -228,4 +331,8 @@ public class DefaultPromiseTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static RuntimeException fakeException() {
|
||||||
|
return new RuntimeException("fake exception");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user