[#5598] Ensure SslHandler not log false-positives when try to close the channel due timeout.

Motivation:

When we try to close the Channel due a timeout we need to ensure we not log if the notification of the promise fails as it may be completed in the meantime.

Modifications:

Add another constructor to ChannelPromiseNotifier and PromiseNotifier which allows to log on notification failure.

Result:

No more miss-leading logs.
This commit is contained in:
Norman Maurer 2016-07-29 14:54:50 +02:00
parent 4a5463f49a
commit a6e1f6290c
3 changed files with 40 additions and 16 deletions

View File

@ -21,7 +21,7 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkNotNull;
/** /**
* {@link GenericFutureListener} implementation which takes other {@link Future}s * {@link GenericFutureListener} implementation which takes other {@link Promise}s
* and notifies them on completion. * and notifies them on completion.
* *
* @param <V> the type of value returned by the future * @param <V> the type of value returned by the future
@ -31,6 +31,7 @@ public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureLis
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PromiseNotifier.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(PromiseNotifier.class);
private final Promise<? super V>[] promises; private final Promise<? super V>[] promises;
private final boolean logNotifyFailure;
/** /**
* Create a new instance. * Create a new instance.
@ -39,6 +40,17 @@ public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureLis
*/ */
@SafeVarargs @SafeVarargs
public PromiseNotifier(Promise<? super V>... promises) { public PromiseNotifier(Promise<? super V>... promises) {
this(true, promises);
}
/**
* Create a new instance.
*
* @param logNotifyFailure {@code true} if logging should be done in case notification fails.
* @param promises the {@link Promise}s to notify once this {@link GenericFutureListener} is notified.
*/
@SafeVarargs
public PromiseNotifier(boolean logNotifyFailure, Promise<? super V>... promises) {
checkNotNull(promises, "promises"); checkNotNull(promises, "promises");
for (Promise<? super V> promise: promises) { for (Promise<? super V> promise: promises) {
if (promise == null) { if (promise == null) {
@ -46,6 +58,7 @@ public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureLis
} }
} }
this.promises = promises.clone(); this.promises = promises.clone();
this.logNotifyFailure = logNotifyFailure;
} }
@Override @Override
@ -53,20 +66,20 @@ public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureLis
if (future.isSuccess()) { if (future.isSuccess()) {
V result = future.get(); V result = future.get();
for (Promise<? super V> p: promises) { for (Promise<? super V> p: promises) {
if (!p.trySuccess(result)) { if (!p.trySuccess(result) && logNotifyFailure) {
logger.warn("Failed to mark a promise as success because it is done already: {}", p); logger.warn("Failed to mark a promise as success because it is done already: {}", p);
} }
} }
} else if (future.isCancelled()) { } else if (future.isCancelled()) {
for (Promise<? super V> p: promises) { for (Promise<? super V> p: promises) {
if (!p.cancel(false)) { if (!p.cancel(false) && logNotifyFailure) {
logger.warn("Failed to cancel a promise because it is done already: {}", p); logger.warn("Failed to cancel a promise because it is done already: {}", p);
} }
} }
} else { } else {
Throwable cause = future.cause(); Throwable cause = future.cause();
for (Promise<? super V> p: promises) { for (Promise<? super V> p: promises) {
if (!p.tryFailure(cause)) { if (!p.tryFailure(cause) && logNotifyFailure) {
logger.warn("Failed to mark a promise as failure because it's done already: {}", p, cause); logger.warn("Failed to mark a promise as failure because it's done already: {}", p, cause);
} }
} }

View File

@ -1414,11 +1414,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
public void run() { public void run() {
logger.warn("{} Last write attempt timed out; force-closing the connection.", ctx.channel()); logger.warn("{} Last write attempt timed out; force-closing the connection.", ctx.channel());
// We notify the promise in the TryNotifyListener as there is a "race" where the close(...) call addCloseListener(ctx.close(ctx.newPromise()), promise);
// by the timeoutFuture and the close call in the flushFuture listener will be called. Because of
// this we need to use trySuccess() and tryFailure(...) as otherwise we can cause an
// IllegalStateException.
ctx.close(ctx.newPromise()).addListener(new ChannelPromiseNotifier(promise));
} }
}, closeNotifyTimeoutMillis, TimeUnit.MILLISECONDS); }, closeNotifyTimeoutMillis, TimeUnit.MILLISECONDS);
} else { } else {
@ -1435,16 +1431,21 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
} }
// Trigger the close in all cases to make sure the promise is notified // Trigger the close in all cases to make sure the promise is notified
// See https://github.com/netty/netty/issues/2358 // See https://github.com/netty/netty/issues/2358
// addCloseListener(ctx.close(ctx.newPromise()), promise);
// We notify the promise in the ChannelPromiseNotifier as there is a "race" where the close(...) call
// by the timeoutFuture and the close call in the flushFuture listener will be called. Because of
// this we need to use trySuccess() and tryFailure(...) as otherwise we can cause an
// IllegalStateException.
ctx.close(ctx.newPromise()).addListener(new ChannelPromiseNotifier(promise));
} }
}); });
} }
private static void addCloseListener(ChannelFuture future, ChannelPromise promise) {
// We notify the promise in the ChannelPromiseNotifier as there is a "race" where the close(...) call
// by the timeoutFuture and the close call in the flushFuture listener will be called. Because of
// this we need to use trySuccess() and tryFailure(...) as otherwise we can cause an
// IllegalStateException.
// Also we not want to log if the notification happens as this is expected in some cases.
// See https://github.com/netty/netty/issues/5598
future.addListener(new ChannelPromiseNotifier(false, promise));
}
/** /**
* Always prefer a direct buffer when it's pooled, so that we reduce the number of memory copies * Always prefer a direct buffer when it's pooled, so that we reduce the number of memory copies
* in {@link OpenSslEngine}. * in {@link OpenSslEngine}.

View File

@ -18,7 +18,7 @@ package io.netty.channel;
import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.concurrent.PromiseNotifier;
/** /**
* ChannelFutureListener implementation which takes other {@link ChannelFuture}(s) and notifies them on completion. * ChannelFutureListener implementation which takes other {@link ChannelPromise}(s) and notifies them on completion.
*/ */
public final class ChannelPromiseNotifier public final class ChannelPromiseNotifier
extends PromiseNotifier<Void, ChannelFuture> extends PromiseNotifier<Void, ChannelFuture>
@ -32,4 +32,14 @@ public final class ChannelPromiseNotifier
public ChannelPromiseNotifier(ChannelPromise... promises) { public ChannelPromiseNotifier(ChannelPromise... promises) {
super(promises); super(promises);
} }
/**
* Create a new instance
*
* @param logNotifyFailure {@code true} if logging should be done in case notification fails.
* @param promises the {@link ChannelPromise}s to notify once this {@link ChannelFutureListener} is notified.
*/
public ChannelPromiseNotifier(boolean logNotifyFailure, ChannelPromise... promises) {
super(logNotifyFailure, promises);
}
} }