Add PromiseNotifier static method which takes care of cancel propagation (#11494)

Motivation:

At the moment we not correctly propagate cancellation in some case when we use the PromiseNotifier.

Modifications:

- Add PromiseNotifier static method which takes care of cancellation
- Add unit test
- Deprecate ChannelPromiseNotifier

Result:

Correctly propagate cancellation of operation

Co-authored-by: Nitesh Kant <nitesh_kant@apple.com>
This commit is contained in:
Norman Maurer 2021-07-21 13:37:32 +02:00 committed by GitHub
parent 3a41a97b0e
commit 26cdfe2cde
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 83 additions and 19 deletions

View File

@ -21,9 +21,9 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseNotifier;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.concurrent.ScheduledFuture;
import java.net.SocketAddress;
@ -113,7 +113,7 @@ abstract class WebSocketProtocolHandler extends MessageToMessageDecoder<WebSocke
promise.setFailure(new ClosedChannelException());
} else if (msg instanceof CloseWebSocketFrame) {
closeSent(promise.unvoid());
ctx.write(msg).addListener(new ChannelPromiseNotifier(false, closeSent));
ctx.write(msg).addListener(new PromiseNotifier<Void, ChannelFuture>(false, closeSent));
} else {
ctx.write(msg, promise);
}

View File

@ -21,9 +21,9 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseNotifier;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.PromiseNotifier;
import java.util.concurrent.TimeUnit;
@ -188,7 +188,7 @@ public class Bzip2Encoder extends MessageToByteEncoder<ByteBuf> {
@Override
public void run() {
ChannelFuture f = finishEncode(ctx(), promise);
f.addListener(new ChannelPromiseNotifier(promise));
PromiseNotifier.cascade(f, promise);
}
});
return promise;

View File

@ -23,8 +23,8 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseNotifier;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.ObjectUtil;
@ -254,7 +254,7 @@ public class JZlibEncoder extends ZlibEncoder {
@Override
public void run() {
ChannelFuture f = finishEncode(ctx(), p);
f.addListener(new ChannelPromiseNotifier(promise));
PromiseNotifier.cascade(f, promise);
}
});
return p;

View File

@ -20,8 +20,8 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseNotifier;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SuppressJava6Requirement;
@ -166,7 +166,7 @@ public class JdkZlibEncoder extends ZlibEncoder {
@Override
public void run() {
ChannelFuture f = finishEncode(ctx(), p);
f.addListener(new ChannelPromiseNotifier(promise));
PromiseNotifier.cascade(f, promise);
}
});
return p;

View File

@ -23,10 +23,10 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseNotifier;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.internal.ObjectUtil;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Exception;
@ -359,7 +359,7 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
@Override
public void run() {
ChannelFuture f = finishEncode(ctx(), promise);
f.addListener(new ChannelPromiseNotifier(promise));
PromiseNotifier.cascade(f, promise);
}
});
return promise;

View File

@ -61,6 +61,57 @@ public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureLis
this.logNotifyFailure = logNotifyFailure;
}
/**
* Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise}
* will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled
* the {@link Promise} is cancelled and vise-versa.
*
* @param future the {@link Future} which will be used to listen to for notifying the {@link Promise}.
* @param promise the {@link Promise} which will be notified
* @param <V> the type of the value.
* @param <F> the type of the {@link Future}
* @return the passed in {@link Future}
*/
public static <V, F extends Future<V>> F cascade(final F future, final Promise<? super V> promise) {
return cascade(true, future, promise);
}
/**
* Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise}
* will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled
* the {@link Promise} is cancelled and vise-versa.
*
* @param logNotifyFailure {@code true} if logging should be done in case notification fails.
* @param future the {@link Future} which will be used to listen to for notifying the {@link Promise}.
* @param promise the {@link Promise} which will be notified
* @param <V> the type of the value.
* @param <F> the type of the {@link Future}
* @return the passed in {@link Future}
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public static <V, F extends Future<V>> F cascade(boolean logNotifyFailure, final F future,
final Promise<? super V> promise) {
promise.addListener(new FutureListener() {
@Override
public void operationComplete(Future f) {
if (f.isCancelled()) {
future.cancel(false);
}
}
});
future.addListener(new PromiseNotifier(logNotifyFailure, promise) {
@Override
public void operationComplete(Future f) throws Exception {
if (promise.isCancelled() && f.isCancelled()) {
// Just return if we propagate a cancel from the promise to the future and both are notified already
return;
}
super.operationComplete(future);
}
});
return future;
}
@Override
public void operationComplete(F future) throws Exception {
InternalLogger internalLogger = logNotifyFailure ? logger : null;

View File

@ -19,7 +19,9 @@ package io.netty.util.concurrent;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.*;
public class PromiseNotifierTest {
@ -93,4 +95,16 @@ public class PromiseNotifierTest {
verify(p2).tryFailure(t);
}
@Test
public void testCancelPropagationWhenFusedFromFuture() {
Promise<Void> p1 = ImmediateEventExecutor.INSTANCE.newPromise();
Promise<Void> p2 = ImmediateEventExecutor.INSTANCE.newPromise();
Promise<Void> returned = PromiseNotifier.cascade(p1, p2);
assertSame(p1, returned);
assertTrue(returned.cancel(false));
assertTrue(returned.isCancelled());
assertTrue(p2.isCancelled());
}
}

View File

@ -33,7 +33,6 @@ import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseNotifier;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.UnsupportedMessageTypeException;
@ -1949,8 +1948,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
// because of a propagated Exception.
//
// See https://github.com/netty/netty/issues/5931
safeClose(ctx, closeNotifyPromise, ctx.newPromise().addListener(
new ChannelPromiseNotifier(false, promise)));
safeClose(ctx, closeNotifyPromise, PromiseNotifier.cascade(false, ctx.newPromise(), promise));
} else {
/// We already handling the close_notify so just attach the promise to the sslClosePromise.
sslClosePromise.addListener(new FutureListener<Channel>() {
@ -2053,7 +2051,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
if (!oldHandshakePromise.isDone()) {
// There's no need to handshake because handshake is in progress already.
// Merge the new promise into the old one.
oldHandshakePromise.addListener(new PromiseNotifier<Channel, Future<Channel>>(newHandshakePromise));
PromiseNotifier.cascade(oldHandshakePromise, newHandshakePromise);
} else {
handshakePromise = newHandshakePromise;
handshake(true);
@ -2234,7 +2232,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
// IllegalStateException.
// Also we not want to log if the notification happens as this is expected in some cases.
// See https://github.com/netty/netty/issues/5598
future.addListener(new ChannelPromiseNotifier(false, promise));
PromiseNotifier.cascade(false, future, promise);
}
/**

View File

@ -436,8 +436,7 @@ public class ParameterizedSslHandlerTest {
protected void initChannel(Channel ch) throws Exception {
SslHandler handler = sslServerCtx.newHandler(ch.alloc());
handler.setCloseNotifyReadTimeoutMillis(closeNotifyReadTimeout);
handler.sslCloseFuture().addListener(
new PromiseNotifier<Channel, Future<Channel>>(serverPromise));
PromiseNotifier.cascade(handler.sslCloseFuture(), serverPromise);
handler.handshakeFuture().addListener(new FutureListener<Channel>() {
@Override
public void operationComplete(Future<Channel> future) {
@ -475,8 +474,7 @@ public class ParameterizedSslHandlerTest {
SslHandler handler = sslClientCtx.newHandler(ch.alloc());
handler.setCloseNotifyReadTimeoutMillis(closeNotifyReadTimeout);
handler.sslCloseFuture().addListener(
new PromiseNotifier<Channel, Future<Channel>>(clientPromise));
PromiseNotifier.cascade(handler.sslCloseFuture(), clientPromise);
handler.handshakeFuture().addListener(new FutureListener<Channel>() {
@Override
public void operationComplete(Future<Channel> future) {

View File

@ -19,7 +19,10 @@ import io.netty.util.concurrent.PromiseNotifier;
/**
* ChannelFutureListener implementation which takes other {@link ChannelPromise}(s) and notifies them on completion.
*
* @deprecated use {@link PromiseNotifier}.
*/
@Deprecated
public final class ChannelPromiseNotifier
extends PromiseNotifier<Void, ChannelFuture>
implements ChannelFutureListener {