Do now swallow an exception triggered by late listener notification
Related: #3449 Motivation: When a user shut down an EventExecutor/Loop prematurely, a Promise will fail to execute its listeners. When it happens, DefaultPromise will log a message at ERROR level, but there's no way to get notified about it programmatically. Modifications: Do not catch and log the RejectedExecutionException unconditionally, but only catch and log for non-late listener notifications, so that a user gets notified on submission failure at least when the listener is late. Result: Remedies #3449 to some extent, although we will need fundamental fix for that, such as #3566
This commit is contained in:
parent
d0943dcd30
commit
4fa5d2cf52
@ -577,7 +577,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
|||||||
|
|
||||||
if (listeners instanceof DefaultFutureListeners) {
|
if (listeners instanceof DefaultFutureListeners) {
|
||||||
final DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
|
final DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
|
||||||
execute(executor, new OneTimeTask() {
|
safeExecute(executor, new OneTimeTask() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
notifyListeners0(DefaultPromise.this, dfl);
|
notifyListeners0(DefaultPromise.this, dfl);
|
||||||
@ -587,7 +587,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
|||||||
} else {
|
} else {
|
||||||
final GenericFutureListener<? extends Future<V>> l =
|
final GenericFutureListener<? extends Future<V>> l =
|
||||||
(GenericFutureListener<? extends Future<V>>) listeners;
|
(GenericFutureListener<? extends Future<V>>) listeners;
|
||||||
execute(executor, new OneTimeTask() {
|
safeExecute(executor, new OneTimeTask() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
notifyListener0(DefaultPromise.this, l);
|
notifyListener0(DefaultPromise.this, l);
|
||||||
@ -633,7 +633,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
|||||||
this.lateListeners = lateListeners = new LateListeners();
|
this.lateListeners = lateListeners = new LateListeners();
|
||||||
}
|
}
|
||||||
lateListeners.add(l);
|
lateListeners.add(l);
|
||||||
execute(executor, lateListeners);
|
executor.execute(lateListeners);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -641,7 +641,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
|||||||
// Add the late listener to lateListeners in the executor thread for thread safety.
|
// Add the late listener to lateListeners in the executor thread for thread safety.
|
||||||
// We could just make LateListeners extend ConcurrentLinkedQueue, but it's an overkill considering
|
// We could just make LateListeners extend ConcurrentLinkedQueue, but it's an overkill considering
|
||||||
// that most asynchronous applications won't execute this code path.
|
// that most asynchronous applications won't execute this code path.
|
||||||
execute(executor, new LateListenerNotifier(l));
|
executor.execute(new LateListenerNotifier(l));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static void notifyListener(
|
protected static void notifyListener(
|
||||||
@ -661,7 +661,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
execute(eventExecutor, new OneTimeTask() {
|
safeExecute(eventExecutor, new OneTimeTask() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
notifyListener0(future, l);
|
notifyListener0(future, l);
|
||||||
@ -669,7 +669,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void execute(EventExecutor executor, Runnable task) {
|
private static void safeExecute(EventExecutor executor, Runnable task) {
|
||||||
try {
|
try {
|
||||||
executor.execute(task);
|
executor.execute(task);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
@ -755,7 +755,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
|||||||
if (listeners instanceof GenericProgressiveFutureListener[]) {
|
if (listeners instanceof GenericProgressiveFutureListener[]) {
|
||||||
final GenericProgressiveFutureListener<?>[] array =
|
final GenericProgressiveFutureListener<?>[] array =
|
||||||
(GenericProgressiveFutureListener<?>[]) listeners;
|
(GenericProgressiveFutureListener<?>[]) listeners;
|
||||||
execute(executor, new OneTimeTask() {
|
safeExecute(executor, new OneTimeTask() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
notifyProgressiveListeners0(self, array, progress, total);
|
notifyProgressiveListeners0(self, array, progress, total);
|
||||||
@ -764,7 +764,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
|||||||
} else {
|
} else {
|
||||||
final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
|
final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
|
||||||
(GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
|
(GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
|
||||||
execute(executor, new OneTimeTask() {
|
safeExecute(executor, new OneTimeTask() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
notifyProgressiveListener0(self, l, progress, total);
|
notifyProgressiveListener0(self, l, progress, total);
|
||||||
@ -856,7 +856,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
|||||||
} else {
|
} else {
|
||||||
// Reschedule until the initial notification is done to avoid the race condition
|
// Reschedule until the initial notification is done to avoid the race condition
|
||||||
// where the notification is made in an incorrect order.
|
// where the notification is made in an incorrect order.
|
||||||
execute(executor, this);
|
safeExecute(executor, this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user