From 26cdfe2cdef8e6533260c12e87e33fe02f3cde2d Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 21 Jul 2021 13:37:32 +0200 Subject: [PATCH] Add PromiseNotifier static method which takes care of cancel propagation (#11494) Motivation: At the moment we not correctly propagate cancellation in some case when we use the PromiseNotifier. Modifications: - Add PromiseNotifier static method which takes care of cancellation - Add unit test - Deprecate ChannelPromiseNotifier Result: Correctly propagate cancellation of operation Co-authored-by: Nitesh Kant --- .../websocketx/WebSocketProtocolHandler.java | 4 +- .../codec/compression/Bzip2Encoder.java | 4 +- .../codec/compression/JZlibEncoder.java | 4 +- .../codec/compression/JdkZlibEncoder.java | 4 +- .../codec/compression/Lz4FrameEncoder.java | 4 +- .../util/concurrent/PromiseNotifier.java | 51 +++++++++++++++++++ .../util/concurrent/PromiseNotifierTest.java | 14 +++++ .../java/io/netty/handler/ssl/SslHandler.java | 8 ++- .../ssl/ParameterizedSslHandlerTest.java | 6 +-- .../netty/channel/ChannelPromiseNotifier.java | 3 ++ 10 files changed, 83 insertions(+), 19 deletions(-) diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java index 9f767a72cf..930edcc823 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java @@ -21,9 +21,9 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPromise; -import io.netty.channel.ChannelPromiseNotifier; import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.concurrent.ScheduledFuture; import java.net.SocketAddress; @@ -113,7 +113,7 @@ abstract class WebSocketProtocolHandler extends MessageToMessageDecoder(false, closeSent)); } else { ctx.write(msg, promise); } diff --git a/codec/src/main/java/io/netty/handler/codec/compression/Bzip2Encoder.java b/codec/src/main/java/io/netty/handler/codec/compression/Bzip2Encoder.java index 3f9b62dcbb..d836ddda5e 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/Bzip2Encoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/Bzip2Encoder.java @@ -21,9 +21,9 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; -import io.netty.channel.ChannelPromiseNotifier; import io.netty.handler.codec.MessageToByteEncoder; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.PromiseNotifier; import java.util.concurrent.TimeUnit; @@ -188,7 +188,7 @@ public class Bzip2Encoder extends MessageToByteEncoder { @Override public void run() { ChannelFuture f = finishEncode(ctx(), promise); - f.addListener(new ChannelPromiseNotifier(promise)); + PromiseNotifier.cascade(f, promise); } }); return promise; diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java index 531bed44fd..79b1783f09 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java @@ -23,8 +23,8 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.channel.ChannelPromiseNotifier; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.ObjectUtil; @@ -254,7 +254,7 @@ public class JZlibEncoder extends ZlibEncoder { @Override public void run() { ChannelFuture f = finishEncode(ctx(), p); - f.addListener(new ChannelPromiseNotifier(promise)); + PromiseNotifier.cascade(f, promise); } }); return p; diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java index 939a073128..08292ef954 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java @@ -20,8 +20,8 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.channel.ChannelPromiseNotifier; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SuppressJava6Requirement; @@ -166,7 +166,7 @@ public class JdkZlibEncoder extends ZlibEncoder { @Override public void run() { ChannelFuture f = finishEncode(ctx(), p); - f.addListener(new ChannelPromiseNotifier(promise)); + PromiseNotifier.cascade(f, promise); } }); return p; diff --git a/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameEncoder.java index d4ea1296a8..18c26d7161 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameEncoder.java @@ -23,10 +23,10 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; -import io.netty.channel.ChannelPromiseNotifier; import io.netty.handler.codec.EncoderException; import io.netty.handler.codec.MessageToByteEncoder; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.internal.ObjectUtil; import net.jpountz.lz4.LZ4Compressor; import net.jpountz.lz4.LZ4Exception; @@ -359,7 +359,7 @@ public class Lz4FrameEncoder extends MessageToByteEncoder { @Override public void run() { ChannelFuture f = finishEncode(ctx(), promise); - f.addListener(new ChannelPromiseNotifier(promise)); + PromiseNotifier.cascade(f, promise); } }); return promise; diff --git a/common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java b/common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java index a4599fa40e..9bd0017217 100644 --- a/common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java +++ b/common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java @@ -61,6 +61,57 @@ public class PromiseNotifier> implements GenericFutureLis this.logNotifyFailure = logNotifyFailure; } + /** + * Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise} + * will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled + * the {@link Promise} is cancelled and vise-versa. + * + * @param future the {@link Future} which will be used to listen to for notifying the {@link Promise}. + * @param promise the {@link Promise} which will be notified + * @param the type of the value. + * @param the type of the {@link Future} + * @return the passed in {@link Future} + */ + public static > F cascade(final F future, final Promise promise) { + return cascade(true, future, promise); + } + + /** + * Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise} + * will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled + * the {@link Promise} is cancelled and vise-versa. + * + * @param logNotifyFailure {@code true} if logging should be done in case notification fails. + * @param future the {@link Future} which will be used to listen to for notifying the {@link Promise}. + * @param promise the {@link Promise} which will be notified + * @param the type of the value. + * @param the type of the {@link Future} + * @return the passed in {@link Future} + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + public static > F cascade(boolean logNotifyFailure, final F future, + final Promise promise) { + promise.addListener(new FutureListener() { + @Override + public void operationComplete(Future f) { + if (f.isCancelled()) { + future.cancel(false); + } + } + }); + future.addListener(new PromiseNotifier(logNotifyFailure, promise) { + @Override + public void operationComplete(Future f) throws Exception { + if (promise.isCancelled() && f.isCancelled()) { + // Just return if we propagate a cancel from the promise to the future and both are notified already + return; + } + super.operationComplete(future); + } + }); + return future; + } + @Override public void operationComplete(F future) throws Exception { InternalLogger internalLogger = logNotifyFailure ? logger : null; diff --git a/common/src/test/java/io/netty/util/concurrent/PromiseNotifierTest.java b/common/src/test/java/io/netty/util/concurrent/PromiseNotifierTest.java index 14b5ec9aff..6eb4e7e003 100644 --- a/common/src/test/java/io/netty/util/concurrent/PromiseNotifierTest.java +++ b/common/src/test/java/io/netty/util/concurrent/PromiseNotifierTest.java @@ -19,7 +19,9 @@ package io.netty.util.concurrent; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.*; public class PromiseNotifierTest { @@ -93,4 +95,16 @@ public class PromiseNotifierTest { verify(p2).tryFailure(t); } + @Test + public void testCancelPropagationWhenFusedFromFuture() { + Promise p1 = ImmediateEventExecutor.INSTANCE.newPromise(); + Promise p2 = ImmediateEventExecutor.INSTANCE.newPromise(); + + Promise returned = PromiseNotifier.cascade(p1, p2); + assertSame(p1, returned); + + assertTrue(returned.cancel(false)); + assertTrue(returned.isCancelled()); + assertTrue(p2.isCancelled()); + } } diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 69895027de..15f7cbe115 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -33,7 +33,6 @@ import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; -import io.netty.channel.ChannelPromiseNotifier; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.UnsupportedMessageTypeException; @@ -1949,8 +1948,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH // because of a propagated Exception. // // See https://github.com/netty/netty/issues/5931 - safeClose(ctx, closeNotifyPromise, ctx.newPromise().addListener( - new ChannelPromiseNotifier(false, promise))); + safeClose(ctx, closeNotifyPromise, PromiseNotifier.cascade(false, ctx.newPromise(), promise)); } else { /// We already handling the close_notify so just attach the promise to the sslClosePromise. sslClosePromise.addListener(new FutureListener() { @@ -2053,7 +2051,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH if (!oldHandshakePromise.isDone()) { // There's no need to handshake because handshake is in progress already. // Merge the new promise into the old one. - oldHandshakePromise.addListener(new PromiseNotifier>(newHandshakePromise)); + PromiseNotifier.cascade(oldHandshakePromise, newHandshakePromise); } else { handshakePromise = newHandshakePromise; handshake(true); @@ -2234,7 +2232,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH // IllegalStateException. // Also we not want to log if the notification happens as this is expected in some cases. // See https://github.com/netty/netty/issues/5598 - future.addListener(new ChannelPromiseNotifier(false, promise)); + PromiseNotifier.cascade(false, future, promise); } /** diff --git a/handler/src/test/java/io/netty/handler/ssl/ParameterizedSslHandlerTest.java b/handler/src/test/java/io/netty/handler/ssl/ParameterizedSslHandlerTest.java index 95b9e36035..b0d4baf997 100644 --- a/handler/src/test/java/io/netty/handler/ssl/ParameterizedSslHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/ParameterizedSslHandlerTest.java @@ -436,8 +436,7 @@ public class ParameterizedSslHandlerTest { protected void initChannel(Channel ch) throws Exception { SslHandler handler = sslServerCtx.newHandler(ch.alloc()); handler.setCloseNotifyReadTimeoutMillis(closeNotifyReadTimeout); - handler.sslCloseFuture().addListener( - new PromiseNotifier>(serverPromise)); + PromiseNotifier.cascade(handler.sslCloseFuture(), serverPromise); handler.handshakeFuture().addListener(new FutureListener() { @Override public void operationComplete(Future future) { @@ -475,8 +474,7 @@ public class ParameterizedSslHandlerTest { SslHandler handler = sslClientCtx.newHandler(ch.alloc()); handler.setCloseNotifyReadTimeoutMillis(closeNotifyReadTimeout); - handler.sslCloseFuture().addListener( - new PromiseNotifier>(clientPromise)); + PromiseNotifier.cascade(handler.sslCloseFuture(), clientPromise); handler.handshakeFuture().addListener(new FutureListener() { @Override public void operationComplete(Future future) { diff --git a/transport/src/main/java/io/netty/channel/ChannelPromiseNotifier.java b/transport/src/main/java/io/netty/channel/ChannelPromiseNotifier.java index 92f87fb581..bcd8ba67e7 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPromiseNotifier.java +++ b/transport/src/main/java/io/netty/channel/ChannelPromiseNotifier.java @@ -19,7 +19,10 @@ import io.netty.util.concurrent.PromiseNotifier; /** * ChannelFutureListener implementation which takes other {@link ChannelPromise}(s) and notifies them on completion. + * + * @deprecated use {@link PromiseNotifier}. */ +@Deprecated public final class ChannelPromiseNotifier extends PromiseNotifier implements ChannelFutureListener {