diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java index edf4e2fd14..652e93be95 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java @@ -18,9 +18,9 @@ package io.netty.handler.codec.http.websocketx; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.channel.ChannelPromiseNotifier; import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.concurrent.ScheduledFuture; import java.nio.channels.ClosedChannelException; @@ -102,7 +102,7 @@ abstract class WebSocketProtocolHandler extends MessageToMessageDecoder(false, closeSent)); } else { ctx.write(msg, promise); } diff --git a/codec/src/main/java/io/netty/handler/codec/compression/Bzip2Encoder.java b/codec/src/main/java/io/netty/handler/codec/compression/Bzip2Encoder.java index 127ea8e6a8..f42d96bc2c 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/Bzip2Encoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/Bzip2Encoder.java @@ -21,9 +21,9 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; -import io.netty.channel.ChannelPromiseNotifier; import io.netty.handler.codec.MessageToByteEncoder; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.PromiseNotifier; import java.util.concurrent.TimeUnit; @@ -186,7 +186,7 @@ public class Bzip2Encoder extends MessageToByteEncoder { } else { executor.execute(() -> { ChannelFuture f = finishEncode(ctx(), promise); - f.addListener(new ChannelPromiseNotifier(promise)); + PromiseNotifier.cascade(f, promise); }); return promise; } diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java index 70fb97eace..4a756d8884 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java @@ -22,8 +22,8 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.channel.ChannelPromiseNotifier; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.PromiseNotifier; import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; @@ -170,7 +170,7 @@ public class JdkZlibEncoder extends ZlibEncoder { final ChannelPromise p = ctx.newPromise(); executor.execute(() -> { ChannelFuture f = finishEncode(ctx(), p); - f.addListener(new ChannelPromiseNotifier(promise)); + PromiseNotifier.cascade(f, promise); }); return p; } diff --git a/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameEncoder.java index 5efdbceaf7..1c1c7f6fe5 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameEncoder.java @@ -23,10 +23,10 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; -import io.netty.channel.ChannelPromiseNotifier; import io.netty.handler.codec.EncoderException; import io.netty.handler.codec.MessageToByteEncoder; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.internal.ObjectUtil; import net.jpountz.lz4.LZ4Compressor; import net.jpountz.lz4.LZ4Exception; @@ -359,7 +359,7 @@ public class Lz4FrameEncoder extends MessageToByteEncoder { } else { executor.execute(() -> { ChannelFuture f = finishEncode(ctx(), promise); - f.addListener(new ChannelPromiseNotifier(promise)); + PromiseNotifier.cascade(f, promise); }); return promise; } diff --git a/common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java b/common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java index 5e0193bed4..ac856a18cb 100644 --- a/common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java +++ b/common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java @@ -61,6 +61,57 @@ public class PromiseNotifier> implements GenericFutureLis this.logNotifyFailure = logNotifyFailure; } + /** + * Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise} + * will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled + * the {@link Promise} is cancelled and vise-versa. + * + * @param future the {@link Future} which will be used to listen to for notifying the {@link Promise}. + * @param promise the {@link Promise} which will be notified + * @param the type of the value. + * @param the type of the {@link Future} + * @return the passed in {@link Future} + */ + public static > F cascade(final F future, final Promise promise) { + return cascade(true, future, promise); + } + + /** + * Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise} + * will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled + * the {@link Promise} is cancelled and vise-versa. + * + * @param logNotifyFailure {@code true} if logging should be done in case notification fails. + * @param future the {@link Future} which will be used to listen to for notifying the {@link Promise}. + * @param promise the {@link Promise} which will be notified + * @param the type of the value. + * @param the type of the {@link Future} + * @return the passed in {@link Future} + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + public static > F cascade(boolean logNotifyFailure, final F future, + final Promise promise) { + promise.addListener(new FutureListener() { + @Override + public void operationComplete(Future f) { + if (f.isCancelled()) { + future.cancel(false); + } + } + }); + future.addListener(new PromiseNotifier(logNotifyFailure, promise) { + @Override + public void operationComplete(Future f) throws Exception { + if (promise.isCancelled() && f.isCancelled()) { + // Just return if we propagate a cancel from the promise to the future and both are notified already + return; + } + super.operationComplete(future); + } + }); + return future; + } + @Override public void operationComplete(F future) throws Exception { InternalLogger internalLogger = logNotifyFailure ? logger : null; diff --git a/common/src/test/java/io/netty/util/concurrent/PromiseNotifierTest.java b/common/src/test/java/io/netty/util/concurrent/PromiseNotifierTest.java index b85d4b42dc..0755b6b512 100644 --- a/common/src/test/java/io/netty/util/concurrent/PromiseNotifierTest.java +++ b/common/src/test/java/io/netty/util/concurrent/PromiseNotifierTest.java @@ -18,7 +18,9 @@ package io.netty.util.concurrent; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.*; public class PromiseNotifierTest { @@ -82,4 +84,16 @@ public class PromiseNotifierTest { verify(p2).tryFailure(t); } + @Test + public void testCancelPropagationWhenFusedFromFuture() { + Promise p1 = ImmediateEventExecutor.INSTANCE.newPromise(); + Promise p2 = ImmediateEventExecutor.INSTANCE.newPromise(); + + Promise returned = PromiseNotifier.cascade(p1, p2); + assertSame(p1, returned); + + assertTrue(returned.cancel(false)); + assertTrue(returned.isCancelled()); + assertTrue(p2.isCancelled()); + } } diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index cf9824b28f..333c2e45fc 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -33,7 +33,6 @@ import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; -import io.netty.channel.ChannelPromiseNotifier; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.UnsupportedMessageTypeException; @@ -1931,8 +1930,7 @@ public class SslHandler extends ByteToMessageDecoder { // because of a propagated Exception. // // See https://github.com/netty/netty/issues/5931 - safeClose(ctx, closeNotifyPromise, ctx.newPromise().addListener( - new ChannelPromiseNotifier(false, promise))); + safeClose(ctx, closeNotifyPromise, PromiseNotifier.cascade(false, ctx.newPromise(), promise)); } else { /// We already handling the close_notify so just attach the promise to the sslClosePromise. sslClosePromise.addListener((FutureListener) future -> promise.setSuccess()); @@ -2025,7 +2023,7 @@ public class SslHandler extends ByteToMessageDecoder { if (!oldHandshakePromise.isDone()) { // There's no need to handshake because handshake is in progress already. // Merge the new promise into the old one. - oldHandshakePromise.addListener(new PromiseNotifier<>(newHandshakePromise)); + PromiseNotifier.cascade(oldHandshakePromise, newHandshakePromise); } else { handshakePromise = newHandshakePromise; handshake(true); @@ -2195,7 +2193,7 @@ public class SslHandler extends ByteToMessageDecoder { // IllegalStateException. // Also we not want to log if the notification happens as this is expected in some cases. // See https://github.com/netty/netty/issues/5598 - future.addListener(new ChannelPromiseNotifier(false, promise)); + PromiseNotifier.cascade(false, future, promise); } /** diff --git a/handler/src/test/java/io/netty/handler/ssl/ParameterizedSslHandlerTest.java b/handler/src/test/java/io/netty/handler/ssl/ParameterizedSslHandlerTest.java index 883960ef2c..da76b7f822 100644 --- a/handler/src/test/java/io/netty/handler/ssl/ParameterizedSslHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/ParameterizedSslHandlerTest.java @@ -432,8 +432,8 @@ public class ParameterizedSslHandlerTest { protected void initChannel(Channel ch) throws Exception { SslHandler handler = sslServerCtx.newHandler(ch.alloc()); handler.setCloseNotifyReadTimeoutMillis(closeNotifyReadTimeout); - handler.sslCloseFuture().addListener( - new PromiseNotifier<>(serverPromise)); + PromiseNotifier.cascade(handler.sslCloseFuture(), serverPromise); + handler.handshakeFuture().addListener((FutureListener) future -> { if (!future.isSuccess()) { // Something bad happened during handshake fail the promise! @@ -468,8 +468,7 @@ public class ParameterizedSslHandlerTest { SslHandler handler = sslClientCtx.newHandler(ch.alloc()); handler.setCloseNotifyReadTimeoutMillis(closeNotifyReadTimeout); - handler.sslCloseFuture().addListener( - new PromiseNotifier<>(clientPromise)); + PromiseNotifier.cascade(handler.sslCloseFuture(), clientPromise); handler.handshakeFuture().addListener((FutureListener) future -> { if (future.isSuccess()) { closeSent.compareAndSet(false, true); diff --git a/transport/src/main/java/io/netty/channel/ChannelPromiseNotifier.java b/transport/src/main/java/io/netty/channel/ChannelPromiseNotifier.java index 92f87fb581..bcd8ba67e7 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPromiseNotifier.java +++ b/transport/src/main/java/io/netty/channel/ChannelPromiseNotifier.java @@ -19,7 +19,10 @@ import io.netty.util.concurrent.PromiseNotifier; /** * ChannelFutureListener implementation which takes other {@link ChannelPromise}(s) and notifies them on completion. + * + * @deprecated use {@link PromiseNotifier}. */ +@Deprecated public final class ChannelPromiseNotifier extends PromiseNotifier implements ChannelFutureListener {