Add PromiseNotifier static method which takes care of cancel propagation (#11494)

Motivation:

At the moment we not correctly propagate cancellation in some case when we use the PromiseNotifier.

Modifications:

- Add PromiseNotifier static method which takes care of cancellation
- Add unit test
- Deprecate ChannelPromiseNotifier

Result:

Correctly propagate cancellation of operation

Co-authored-by: Nitesh Kant <nitesh_kant@apple.com>
This commit is contained in:
Norman Maurer 2021-07-21 13:37:32 +02:00
parent 9398e6481d
commit 23d8fde855
9 changed files with 82 additions and 17 deletions

View File

@ -18,9 +18,9 @@ package io.netty.handler.codec.http.websocketx;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseNotifier;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.concurrent.ScheduledFuture;
import java.nio.channels.ClosedChannelException;
@ -102,7 +102,7 @@ abstract class WebSocketProtocolHandler extends MessageToMessageDecoder<WebSocke
promise.setFailure(new ClosedChannelException());
} else if (msg instanceof CloseWebSocketFrame) {
closeSent(promise);
ctx.write(msg).addListener(new ChannelPromiseNotifier(false, closeSent));
ctx.write(msg).addListener(new PromiseNotifier<>(false, closeSent));
} else {
ctx.write(msg, promise);
}

View File

@ -21,9 +21,9 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseNotifier;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.PromiseNotifier;
import java.util.concurrent.TimeUnit;
@ -186,7 +186,7 @@ public class Bzip2Encoder extends MessageToByteEncoder<ByteBuf> {
} else {
executor.execute(() -> {
ChannelFuture f = finishEncode(ctx(), promise);
f.addListener(new ChannelPromiseNotifier(promise));
PromiseNotifier.cascade(f, promise);
});
return promise;
}

View File

@ -22,8 +22,8 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseNotifier;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.PromiseNotifier;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
@ -170,7 +170,7 @@ public class JdkZlibEncoder extends ZlibEncoder {
final ChannelPromise p = ctx.newPromise();
executor.execute(() -> {
ChannelFuture f = finishEncode(ctx(), p);
f.addListener(new ChannelPromiseNotifier(promise));
PromiseNotifier.cascade(f, promise);
});
return p;
}

View File

@ -23,10 +23,10 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseNotifier;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.internal.ObjectUtil;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Exception;
@ -359,7 +359,7 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
} else {
executor.execute(() -> {
ChannelFuture f = finishEncode(ctx(), promise);
f.addListener(new ChannelPromiseNotifier(promise));
PromiseNotifier.cascade(f, promise);
});
return promise;
}

View File

@ -61,6 +61,57 @@ public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureLis
this.logNotifyFailure = logNotifyFailure;
}
/**
* Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise}
* will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled
* the {@link Promise} is cancelled and vise-versa.
*
* @param future the {@link Future} which will be used to listen to for notifying the {@link Promise}.
* @param promise the {@link Promise} which will be notified
* @param <V> the type of the value.
* @param <F> the type of the {@link Future}
* @return the passed in {@link Future}
*/
public static <V, F extends Future<V>> F cascade(final F future, final Promise<? super V> promise) {
return cascade(true, future, promise);
}
/**
* Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise}
* will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled
* the {@link Promise} is cancelled and vise-versa.
*
* @param logNotifyFailure {@code true} if logging should be done in case notification fails.
* @param future the {@link Future} which will be used to listen to for notifying the {@link Promise}.
* @param promise the {@link Promise} which will be notified
* @param <V> the type of the value.
* @param <F> the type of the {@link Future}
* @return the passed in {@link Future}
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public static <V, F extends Future<V>> F cascade(boolean logNotifyFailure, final F future,
final Promise<? super V> promise) {
promise.addListener(new FutureListener() {
@Override
public void operationComplete(Future f) {
if (f.isCancelled()) {
future.cancel(false);
}
}
});
future.addListener(new PromiseNotifier(logNotifyFailure, promise) {
@Override
public void operationComplete(Future f) throws Exception {
if (promise.isCancelled() && f.isCancelled()) {
// Just return if we propagate a cancel from the promise to the future and both are notified already
return;
}
super.operationComplete(future);
}
});
return future;
}
@Override
public void operationComplete(F future) throws Exception {
InternalLogger internalLogger = logNotifyFailure ? logger : null;

View File

@ -18,7 +18,9 @@ package io.netty.util.concurrent;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.*;
public class PromiseNotifierTest {
@ -82,4 +84,16 @@ public class PromiseNotifierTest {
verify(p2).tryFailure(t);
}
@Test
public void testCancelPropagationWhenFusedFromFuture() {
Promise<Void> p1 = ImmediateEventExecutor.INSTANCE.newPromise();
Promise<Void> p2 = ImmediateEventExecutor.INSTANCE.newPromise();
Promise<Void> returned = PromiseNotifier.cascade(p1, p2);
assertSame(p1, returned);
assertTrue(returned.cancel(false));
assertTrue(returned.isCancelled());
assertTrue(p2.isCancelled());
}
}

View File

@ -33,7 +33,6 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseNotifier;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.UnsupportedMessageTypeException;
@ -1931,8 +1930,7 @@ public class SslHandler extends ByteToMessageDecoder {
// because of a propagated Exception.
//
// See https://github.com/netty/netty/issues/5931
safeClose(ctx, closeNotifyPromise, ctx.newPromise().addListener(
new ChannelPromiseNotifier(false, promise)));
safeClose(ctx, closeNotifyPromise, PromiseNotifier.cascade(false, ctx.newPromise(), promise));
} else {
/// We already handling the close_notify so just attach the promise to the sslClosePromise.
sslClosePromise.addListener((FutureListener<Channel>) future -> promise.setSuccess());
@ -2025,7 +2023,7 @@ public class SslHandler extends ByteToMessageDecoder {
if (!oldHandshakePromise.isDone()) {
// There's no need to handshake because handshake is in progress already.
// Merge the new promise into the old one.
oldHandshakePromise.addListener(new PromiseNotifier<>(newHandshakePromise));
PromiseNotifier.cascade(oldHandshakePromise, newHandshakePromise);
} else {
handshakePromise = newHandshakePromise;
handshake(true);
@ -2195,7 +2193,7 @@ public class SslHandler extends ByteToMessageDecoder {
// IllegalStateException.
// Also we not want to log if the notification happens as this is expected in some cases.
// See https://github.com/netty/netty/issues/5598
future.addListener(new ChannelPromiseNotifier(false, promise));
PromiseNotifier.cascade(false, future, promise);
}
/**

View File

@ -432,8 +432,8 @@ public class ParameterizedSslHandlerTest {
protected void initChannel(Channel ch) throws Exception {
SslHandler handler = sslServerCtx.newHandler(ch.alloc());
handler.setCloseNotifyReadTimeoutMillis(closeNotifyReadTimeout);
handler.sslCloseFuture().addListener(
new PromiseNotifier<>(serverPromise));
PromiseNotifier.cascade(handler.sslCloseFuture(), serverPromise);
handler.handshakeFuture().addListener((FutureListener<Channel>) future -> {
if (!future.isSuccess()) {
// Something bad happened during handshake fail the promise!
@ -468,8 +468,7 @@ public class ParameterizedSslHandlerTest {
SslHandler handler = sslClientCtx.newHandler(ch.alloc());
handler.setCloseNotifyReadTimeoutMillis(closeNotifyReadTimeout);
handler.sslCloseFuture().addListener(
new PromiseNotifier<>(clientPromise));
PromiseNotifier.cascade(handler.sslCloseFuture(), clientPromise);
handler.handshakeFuture().addListener((FutureListener<Channel>) future -> {
if (future.isSuccess()) {
closeSent.compareAndSet(false, true);

View File

@ -19,7 +19,10 @@ import io.netty.util.concurrent.PromiseNotifier;
/**
* ChannelFutureListener implementation which takes other {@link ChannelPromise}(s) and notifies them on completion.
*
* @deprecated use {@link PromiseNotifier}.
*/
@Deprecated
public final class ChannelPromiseNotifier
extends PromiseNotifier<Void, ChannelFuture>
implements ChannelFutureListener {