Fix a regression which could lead to GenericFutureListeners never been notifed. Part of [#2186].
This regression was introduced by commit c97f2d2de00ad74835067cb6f5a62cd4651d1161
This commit is contained in:
parent
ddb6e3672c
commit
dbb2198839
@ -567,9 +567,9 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
|||||||
final GenericFutureListener<? extends Future<V>> l =
|
final GenericFutureListener<? extends Future<V>> l =
|
||||||
(GenericFutureListener<? extends Future<V>>) listeners;
|
(GenericFutureListener<? extends Future<V>>) listeners;
|
||||||
notifyListener0(this, l);
|
notifyListener0(this, l);
|
||||||
this.listeners = null;
|
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
this.listeners = null;
|
||||||
LISTENER_STACK_DEPTH.set(stackDepth);
|
LISTENER_STACK_DEPTH.set(stackDepth);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
|
@ -19,8 +19,10 @@ package io.netty.util.concurrent;
|
|||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.*;
|
import static org.hamcrest.CoreMatchers.*;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
@ -81,26 +83,9 @@ public class DefaultPromiseTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testListenerNotifyOrder() throws Exception {
|
public void testListenerNotifyOrder() throws Exception {
|
||||||
SingleThreadEventExecutor executor =
|
EventExecutor executor = new TestEventExecutor();
|
||||||
new SingleThreadEventExecutor(null, Executors.defaultThreadFactory(), true) {
|
|
||||||
@Override
|
|
||||||
protected void run() {
|
|
||||||
for (;;) {
|
|
||||||
Runnable task = takeTask();
|
|
||||||
if (task != null) {
|
|
||||||
task.run();
|
|
||||||
updateLastExecutionTime();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (confirmShutdown()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
final BlockingQueue<FutureListener<Void>> listeners = new LinkedBlockingQueue<FutureListener<Void>>();
|
final BlockingQueue<FutureListener<Void>> listeners = new LinkedBlockingQueue<FutureListener<Void>>();
|
||||||
int runs = 20000;
|
int runs = 100000;
|
||||||
|
|
||||||
for (int i = 0; i < runs; i++) {
|
for (int i = 0; i < runs; i++) {
|
||||||
final Promise<Void> promise = new DefaultPromise<Void>(executor);
|
final Promise<Void> promise = new DefaultPromise<Void>(executor);
|
||||||
@ -148,4 +133,67 @@ public class DefaultPromiseTest {
|
|||||||
}
|
}
|
||||||
executor.shutdownGracefully().sync();
|
executor.shutdownGracefully().sync();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListenerNotifyLater() throws Exception {
|
||||||
|
// Testing first execution path in DefaultPromise
|
||||||
|
testListenerNotifyLater(1);
|
||||||
|
|
||||||
|
// Testing second execution path in DefaultPromise
|
||||||
|
testListenerNotifyLater(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void testListenerNotifyLater(final int numListenersBefore) throws Exception {
|
||||||
|
EventExecutor executor = new TestEventExecutor();
|
||||||
|
int expectedCount = numListenersBefore + 2;
|
||||||
|
final CountDownLatch latch = new CountDownLatch(expectedCount);
|
||||||
|
final FutureListener<Void> listener = new FutureListener<Void>() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(Future<Void> future) throws Exception {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
final Promise<Void> promise = new DefaultPromise<Void>(executor);
|
||||||
|
executor.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
for (int i = 0; i < numListenersBefore; i++) {
|
||||||
|
promise.addListener(listener);
|
||||||
|
}
|
||||||
|
promise.setSuccess(null);
|
||||||
|
|
||||||
|
GlobalEventExecutor.INSTANCE.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
promise.addListener(listener);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
promise.addListener(listener);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
assertTrue("Should have notifed " + expectedCount + " listeners", latch.await(5, TimeUnit.SECONDS));
|
||||||
|
executor.shutdownGracefully().sync();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class TestEventExecutor extends SingleThreadEventExecutor {
|
||||||
|
TestEventExecutor() {
|
||||||
|
super(null, Executors.defaultThreadFactory(), true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void run() {
|
||||||
|
for (;;) {
|
||||||
|
Runnable task = takeTask();
|
||||||
|
if (task != null) {
|
||||||
|
task.run();
|
||||||
|
updateLastExecutionTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (confirmShutdown()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user