Do now swallow an exception triggered by late listener notification

Related: #3449

Motivation:

When a user shut down an EventExecutor/Loop prematurely, a Promise will
fail to execute its listeners. When it happens, DefaultPromise will log
a message at ERROR level, but there's no way to get notified about it
programmatically.

Modifications:

Do not catch and log the RejectedExecutionException unconditionally,
but only catch and log for non-late listener notifications, so that a
user gets notified on submission failure at least when the listener is
late.

Result:

Remedies #3449 to some extent, although we will need fundamental fix for
that, such as #3566
This commit is contained in:
Trustin Lee 2016-04-06 13:08:13 +09:00 committed by Norman Maurer
parent 2e68e37025
commit c453c3ce0d

View File

@ -577,7 +577,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
if (listeners instanceof DefaultFutureListeners) { if (listeners instanceof DefaultFutureListeners) {
final DefaultFutureListeners dfl = (DefaultFutureListeners) listeners; final DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
execute(executor, new OneTimeTask() { safeExecute(executor, new OneTimeTask() {
@Override @Override
public void run() { public void run() {
notifyListeners0(DefaultPromise.this, dfl); notifyListeners0(DefaultPromise.this, dfl);
@ -587,7 +587,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
} else { } else {
final GenericFutureListener<? extends Future<V>> l = final GenericFutureListener<? extends Future<V>> l =
(GenericFutureListener<? extends Future<V>>) listeners; (GenericFutureListener<? extends Future<V>>) listeners;
execute(executor, new OneTimeTask() { safeExecute(executor, new OneTimeTask() {
@Override @Override
public void run() { public void run() {
notifyListener0(DefaultPromise.this, l); notifyListener0(DefaultPromise.this, l);
@ -633,7 +633,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
this.lateListeners = lateListeners = new LateListeners(); this.lateListeners = lateListeners = new LateListeners();
} }
lateListeners.add(l); lateListeners.add(l);
execute(executor, lateListeners); executor.execute(lateListeners);
return; return;
} }
} }
@ -641,7 +641,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// Add the late listener to lateListeners in the executor thread for thread safety. // Add the late listener to lateListeners in the executor thread for thread safety.
// We could just make LateListeners extend ConcurrentLinkedQueue, but it's an overkill considering // We could just make LateListeners extend ConcurrentLinkedQueue, but it's an overkill considering
// that most asynchronous applications won't execute this code path. // that most asynchronous applications won't execute this code path.
execute(executor, new LateListenerNotifier(l)); executor.execute(new LateListenerNotifier(l));
} }
protected static void notifyListener( protected static void notifyListener(
@ -661,7 +661,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
} }
} }
execute(eventExecutor, new OneTimeTask() { safeExecute(eventExecutor, new OneTimeTask() {
@Override @Override
public void run() { public void run() {
notifyListener0(future, l); notifyListener0(future, l);
@ -669,7 +669,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
}); });
} }
private static void execute(EventExecutor executor, Runnable task) { private static void safeExecute(EventExecutor executor, Runnable task) {
try { try {
executor.execute(task); executor.execute(task);
} catch (Throwable t) { } catch (Throwable t) {
@ -755,7 +755,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
if (listeners instanceof GenericProgressiveFutureListener[]) { if (listeners instanceof GenericProgressiveFutureListener[]) {
final GenericProgressiveFutureListener<?>[] array = final GenericProgressiveFutureListener<?>[] array =
(GenericProgressiveFutureListener<?>[]) listeners; (GenericProgressiveFutureListener<?>[]) listeners;
execute(executor, new OneTimeTask() { safeExecute(executor, new OneTimeTask() {
@Override @Override
public void run() { public void run() {
notifyProgressiveListeners0(self, array, progress, total); notifyProgressiveListeners0(self, array, progress, total);
@ -764,7 +764,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
} else { } else {
final GenericProgressiveFutureListener<ProgressiveFuture<V>> l = final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
(GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners; (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
execute(executor, new OneTimeTask() { safeExecute(executor, new OneTimeTask() {
@Override @Override
public void run() { public void run() {
notifyProgressiveListener0(self, l, progress, total); notifyProgressiveListener0(self, l, progress, total);
@ -856,7 +856,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
} else { } else {
// Reschedule until the initial notification is done to avoid the race condition // Reschedule until the initial notification is done to avoid the race condition
// where the notification is made in an incorrect order. // where the notification is made in an incorrect order.
execute(executor, this); safeExecute(executor, this);
} }
} }
} }