diff --git a/common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java b/common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java index db1604fce9..5b0112cdae 100644 --- a/common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java +++ b/common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java @@ -21,7 +21,7 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import static io.netty.util.internal.ObjectUtil.checkNotNull; /** - * {@link GenericFutureListener} implementation which takes other {@link Future}s + * {@link GenericFutureListener} implementation which takes other {@link Promise}s * and notifies them on completion. * * @param the type of value returned by the future @@ -31,6 +31,7 @@ public class PromiseNotifier> implements GenericFutureLis private static final InternalLogger logger = InternalLoggerFactory.getInstance(PromiseNotifier.class); private final Promise[] promises; + private final boolean logNotifyFailure; /** * Create a new instance. @@ -39,6 +40,17 @@ public class PromiseNotifier> implements GenericFutureLis */ @SafeVarargs public PromiseNotifier(Promise... promises) { + this(true, promises); + } + + /** + * Create a new instance. + * + * @param logNotifyFailure {@code true} if logging should be done in case notification fails. + * @param promises the {@link Promise}s to notify once this {@link GenericFutureListener} is notified. + */ + @SafeVarargs + public PromiseNotifier(boolean logNotifyFailure, Promise... promises) { checkNotNull(promises, "promises"); for (Promise promise: promises) { if (promise == null) { @@ -46,6 +58,7 @@ public class PromiseNotifier> implements GenericFutureLis } } this.promises = promises.clone(); + this.logNotifyFailure = logNotifyFailure; } @Override @@ -53,20 +66,20 @@ public class PromiseNotifier> implements GenericFutureLis if (future.isSuccess()) { V result = future.get(); for (Promise p: promises) { - if (!p.trySuccess(result)) { + if (!p.trySuccess(result) && logNotifyFailure) { logger.warn("Failed to mark a promise as success because it is done already: {}", p); } } } else if (future.isCancelled()) { for (Promise p: promises) { - if (!p.cancel(false)) { + if (!p.cancel(false) && logNotifyFailure) { logger.warn("Failed to cancel a promise because it is done already: {}", p); } } } else { Throwable cause = future.cause(); for (Promise p: promises) { - if (!p.tryFailure(cause)) { + if (!p.tryFailure(cause) && logNotifyFailure) { logger.warn("Failed to mark a promise as failure because it's done already: {}", p, cause); } } diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index c2db7a6c99..5eb4cd8bd3 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -1414,11 +1414,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH public void run() { logger.warn("{} Last write attempt timed out; force-closing the connection.", ctx.channel()); - // We notify the promise in the TryNotifyListener as there is a "race" where the close(...) call - // by the timeoutFuture and the close call in the flushFuture listener will be called. Because of - // this we need to use trySuccess() and tryFailure(...) as otherwise we can cause an - // IllegalStateException. - ctx.close(ctx.newPromise()).addListener(new ChannelPromiseNotifier(promise)); + addCloseListener(ctx.close(ctx.newPromise()), promise); } }, closeNotifyTimeoutMillis, TimeUnit.MILLISECONDS); } else { @@ -1435,16 +1431,21 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH } // Trigger the close in all cases to make sure the promise is notified // See https://github.com/netty/netty/issues/2358 - // - // We notify the promise in the ChannelPromiseNotifier as there is a "race" where the close(...) call - // by the timeoutFuture and the close call in the flushFuture listener will be called. Because of - // this we need to use trySuccess() and tryFailure(...) as otherwise we can cause an - // IllegalStateException. - ctx.close(ctx.newPromise()).addListener(new ChannelPromiseNotifier(promise)); + addCloseListener(ctx.close(ctx.newPromise()), promise); } }); } + private static void addCloseListener(ChannelFuture future, ChannelPromise promise) { + // We notify the promise in the ChannelPromiseNotifier as there is a "race" where the close(...) call + // by the timeoutFuture and the close call in the flushFuture listener will be called. Because of + // this we need to use trySuccess() and tryFailure(...) as otherwise we can cause an + // IllegalStateException. + // Also we not want to log if the notification happens as this is expected in some cases. + // See https://github.com/netty/netty/issues/5598 + future.addListener(new ChannelPromiseNotifier(false, promise)); + } + /** * Always prefer a direct buffer when it's pooled, so that we reduce the number of memory copies * in {@link OpenSslEngine}. diff --git a/transport/src/main/java/io/netty/channel/ChannelPromiseNotifier.java b/transport/src/main/java/io/netty/channel/ChannelPromiseNotifier.java index eb42101780..7e2b9c8ba8 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPromiseNotifier.java +++ b/transport/src/main/java/io/netty/channel/ChannelPromiseNotifier.java @@ -18,7 +18,7 @@ package io.netty.channel; import io.netty.util.concurrent.PromiseNotifier; /** - * ChannelFutureListener implementation which takes other {@link ChannelFuture}(s) and notifies them on completion. + * ChannelFutureListener implementation which takes other {@link ChannelPromise}(s) and notifies them on completion. */ public final class ChannelPromiseNotifier extends PromiseNotifier @@ -32,4 +32,14 @@ public final class ChannelPromiseNotifier public ChannelPromiseNotifier(ChannelPromise... promises) { super(promises); } + + /** + * Create a new instance + * + * @param logNotifyFailure {@code true} if logging should be done in case notification fails. + * @param promises the {@link ChannelPromise}s to notify once this {@link ChannelFutureListener} is notified. + */ + public ChannelPromiseNotifier(boolean logNotifyFailure, ChannelPromise... promises) { + super(logNotifyFailure, promises); + } }