[#5598] Ensure SslHandler not log false-positives when try to close the channel due timeout.
Motivation: When we try to close the Channel due a timeout we need to ensure we not log if the notification of the promise fails as it may be completed in the meantime. Modifications: Add another constructor to ChannelPromiseNotifier and PromiseNotifier which allows to log on notification failure. Result: No more miss-leading logs.
This commit is contained in:
parent
82b617dfe9
commit
f585806a74
@ -21,7 +21,7 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
|
|||||||
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@link GenericFutureListener} implementation which takes other {@link Future}s
|
* {@link GenericFutureListener} implementation which takes other {@link Promise}s
|
||||||
* and notifies them on completion.
|
* and notifies them on completion.
|
||||||
*
|
*
|
||||||
* @param <V> the type of value returned by the future
|
* @param <V> the type of value returned by the future
|
||||||
@ -31,6 +31,7 @@ public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureLis
|
|||||||
|
|
||||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PromiseNotifier.class);
|
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PromiseNotifier.class);
|
||||||
private final Promise<? super V>[] promises;
|
private final Promise<? super V>[] promises;
|
||||||
|
private final boolean logNotifyFailure;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new instance.
|
* Create a new instance.
|
||||||
@ -39,6 +40,17 @@ public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureLis
|
|||||||
*/
|
*/
|
||||||
@SafeVarargs
|
@SafeVarargs
|
||||||
public PromiseNotifier(Promise<? super V>... promises) {
|
public PromiseNotifier(Promise<? super V>... promises) {
|
||||||
|
this(true, promises);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new instance.
|
||||||
|
*
|
||||||
|
* @param logNotifyFailure {@code true} if logging should be done in case notification fails.
|
||||||
|
* @param promises the {@link Promise}s to notify once this {@link GenericFutureListener} is notified.
|
||||||
|
*/
|
||||||
|
@SafeVarargs
|
||||||
|
public PromiseNotifier(boolean logNotifyFailure, Promise<? super V>... promises) {
|
||||||
checkNotNull(promises, "promises");
|
checkNotNull(promises, "promises");
|
||||||
for (Promise<? super V> promise: promises) {
|
for (Promise<? super V> promise: promises) {
|
||||||
if (promise == null) {
|
if (promise == null) {
|
||||||
@ -46,6 +58,7 @@ public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureLis
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.promises = promises.clone();
|
this.promises = promises.clone();
|
||||||
|
this.logNotifyFailure = logNotifyFailure;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -53,20 +66,20 @@ public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureLis
|
|||||||
if (future.isSuccess()) {
|
if (future.isSuccess()) {
|
||||||
V result = future.get();
|
V result = future.get();
|
||||||
for (Promise<? super V> p: promises) {
|
for (Promise<? super V> p: promises) {
|
||||||
if (!p.trySuccess(result)) {
|
if (!p.trySuccess(result) && logNotifyFailure) {
|
||||||
logger.warn("Failed to mark a promise as success because it is done already: {}", p);
|
logger.warn("Failed to mark a promise as success because it is done already: {}", p);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (future.isCancelled()) {
|
} else if (future.isCancelled()) {
|
||||||
for (Promise<? super V> p: promises) {
|
for (Promise<? super V> p: promises) {
|
||||||
if (!p.cancel(false)) {
|
if (!p.cancel(false) && logNotifyFailure) {
|
||||||
logger.warn("Failed to cancel a promise because it is done already: {}", p);
|
logger.warn("Failed to cancel a promise because it is done already: {}", p);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Throwable cause = future.cause();
|
Throwable cause = future.cause();
|
||||||
for (Promise<? super V> p: promises) {
|
for (Promise<? super V> p: promises) {
|
||||||
if (!p.tryFailure(cause)) {
|
if (!p.tryFailure(cause) && logNotifyFailure) {
|
||||||
logger.warn("Failed to mark a promise as failure because it's done already: {}", p, cause);
|
logger.warn("Failed to mark a promise as failure because it's done already: {}", p, cause);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1414,11 +1414,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
public void run() {
|
public void run() {
|
||||||
logger.warn("{} Last write attempt timed out; force-closing the connection.", ctx.channel());
|
logger.warn("{} Last write attempt timed out; force-closing the connection.", ctx.channel());
|
||||||
|
|
||||||
// We notify the promise in the TryNotifyListener as there is a "race" where the close(...) call
|
addCloseListener(ctx.close(ctx.newPromise()), promise);
|
||||||
// by the timeoutFuture and the close call in the flushFuture listener will be called. Because of
|
|
||||||
// this we need to use trySuccess() and tryFailure(...) as otherwise we can cause an
|
|
||||||
// IllegalStateException.
|
|
||||||
ctx.close(ctx.newPromise()).addListener(new ChannelPromiseNotifier(promise));
|
|
||||||
}
|
}
|
||||||
}, closeNotifyTimeoutMillis, TimeUnit.MILLISECONDS);
|
}, closeNotifyTimeoutMillis, TimeUnit.MILLISECONDS);
|
||||||
} else {
|
} else {
|
||||||
@ -1435,16 +1431,21 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
}
|
}
|
||||||
// Trigger the close in all cases to make sure the promise is notified
|
// Trigger the close in all cases to make sure the promise is notified
|
||||||
// See https://github.com/netty/netty/issues/2358
|
// See https://github.com/netty/netty/issues/2358
|
||||||
//
|
addCloseListener(ctx.close(ctx.newPromise()), promise);
|
||||||
// We notify the promise in the ChannelPromiseNotifier as there is a "race" where the close(...) call
|
|
||||||
// by the timeoutFuture and the close call in the flushFuture listener will be called. Because of
|
|
||||||
// this we need to use trySuccess() and tryFailure(...) as otherwise we can cause an
|
|
||||||
// IllegalStateException.
|
|
||||||
ctx.close(ctx.newPromise()).addListener(new ChannelPromiseNotifier(promise));
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void addCloseListener(ChannelFuture future, ChannelPromise promise) {
|
||||||
|
// We notify the promise in the ChannelPromiseNotifier as there is a "race" where the close(...) call
|
||||||
|
// by the timeoutFuture and the close call in the flushFuture listener will be called. Because of
|
||||||
|
// this we need to use trySuccess() and tryFailure(...) as otherwise we can cause an
|
||||||
|
// IllegalStateException.
|
||||||
|
// Also we not want to log if the notification happens as this is expected in some cases.
|
||||||
|
// See https://github.com/netty/netty/issues/5598
|
||||||
|
future.addListener(new ChannelPromiseNotifier(false, promise));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Always prefer a direct buffer when it's pooled, so that we reduce the number of memory copies
|
* Always prefer a direct buffer when it's pooled, so that we reduce the number of memory copies
|
||||||
* in {@link OpenSslEngine}.
|
* in {@link OpenSslEngine}.
|
||||||
|
@ -18,7 +18,7 @@ package io.netty.channel;
|
|||||||
import io.netty.util.concurrent.PromiseNotifier;
|
import io.netty.util.concurrent.PromiseNotifier;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ChannelFutureListener implementation which takes other {@link ChannelFuture}(s) and notifies them on completion.
|
* ChannelFutureListener implementation which takes other {@link ChannelPromise}(s) and notifies them on completion.
|
||||||
*/
|
*/
|
||||||
public final class ChannelPromiseNotifier
|
public final class ChannelPromiseNotifier
|
||||||
extends PromiseNotifier<Void, ChannelFuture>
|
extends PromiseNotifier<Void, ChannelFuture>
|
||||||
@ -32,4 +32,14 @@ public final class ChannelPromiseNotifier
|
|||||||
public ChannelPromiseNotifier(ChannelPromise... promises) {
|
public ChannelPromiseNotifier(ChannelPromise... promises) {
|
||||||
super(promises);
|
super(promises);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new instance
|
||||||
|
*
|
||||||
|
* @param logNotifyFailure {@code true} if logging should be done in case notification fails.
|
||||||
|
* @param promises the {@link ChannelPromise}s to notify once this {@link ChannelFutureListener} is notified.
|
||||||
|
*/
|
||||||
|
public ChannelPromiseNotifier(boolean logNotifyFailure, ChannelPromise... promises) {
|
||||||
|
super(logNotifyFailure, promises);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user