From e04f48d802e7ca18a9978ba448d0b98a1cd25d76 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sun, 29 Aug 2021 15:44:34 +0200 Subject: [PATCH] Add cascadeTo methods to Future (#11623) Motivation: The need of cascade from a Future to a Promise exists. We should add some default implementation for it. Modifications: - Merge PromiseNotifier into Futures - Add default cascadeTo(...) methods to Future - Add tests to FuturesTest - Replace usage of PromiseNotifier with Future.cascadeTo - Use combination of map(...) and cascadeTo(...) in *Bootstrap to reduce code duplication Result: Provide default implementation of cascadeTo. --- .../websocketx/WebSocketProtocolHandler.java | 5 +- .../WebSocketServerProtocolHandler.java | 3 +- .../http/HttpServerUpgradeHandlerTest.java | 3 +- .../http2/DefaultHttp2ConnectionEncoder.java | 3 +- .../codec/http2/DefaultHttp2FrameWriter.java | 62 +++---- .../codec/http2/Http2ConnectionHandler.java | 3 +- .../handler/codec/http2/Http2FrameCodec.java | 3 +- .../codec/http2/Http2MultiplexTest.java | 3 +- .../codec/MessageToMessageEncoder.java | 3 +- .../codec/compression/Bzip2Encoder.java | 10 +- .../codec/compression/JdkZlibEncoder.java | 10 +- .../codec/compression/Lz4FrameEncoder.java | 11 +- .../java/io/netty/util/concurrent/Future.java | 13 ++ .../io/netty/util/concurrent/Futures.java | 25 ++- .../util/concurrent/PromiseNotifier.java | 171 ------------------ .../io/netty/util/concurrent/FuturesTest.java | 63 +++++++ .../util/concurrent/PromiseNotifierTest.java | 94 ---------- .../address/ResolveAddressHandler.java | 3 +- .../java/io/netty/handler/ssl/SslHandler.java | 13 +- .../handler/stream/ChunkedWriteHandler.java | 3 +- .../traffic/ChannelTrafficShapingHandler.java | 7 +- .../GlobalChannelTrafficShapingHandler.java | 7 +- .../traffic/GlobalTrafficShapingHandler.java | 7 +- .../ssl/ParameterizedSslHandlerTest.java | 5 +- .../http2/Http2FrameWriterDataBenchmark.java | 8 +- .../io/netty/bootstrap/AbstractBootstrap.java | 8 +- .../java/io/netty/bootstrap/Bootstrap.java | 3 +- .../AbstractCoalescingBufferQueue.java | 7 +- .../channel/DefaultChannelHandlerContext.java | 20 +- .../io/netty/channel/PendingWriteQueue.java | 5 +- .../channel/embedded/EmbeddedChannelTest.java | 5 +- 31 files changed, 190 insertions(+), 396 deletions(-) delete mode 100644 common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java delete mode 100644 common/src/test/java/io/netty/util/concurrent/PromiseNotifierTest.java diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java index f80c6ffa55..7f4df07b15 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java @@ -21,7 +21,6 @@ import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.concurrent.ScheduledFuture; import java.nio.channels.ClosedChannelException; @@ -92,7 +91,7 @@ abstract class WebSocketProtocolHandler extends MessageToMessageDecoder promise = ctx.newPromise(); - future.addListener(f -> ctx.close().addListener(new PromiseNotifier<>(promise))); + future.addListener(f -> ctx.close().cascadeTo(promise)); return promise; } @@ -105,7 +104,7 @@ abstract class WebSocketProtocolHandler extends MessageToMessageDecoder promise = ctx.newPromise(); closeSent(promise); - ctx.write(msg).addListener(new PromiseNotifier<>(false, closeSent)); + ctx.write(msg).cascadeTo(closeSent); return promise; } return ctx.write(msg); diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java index 458af8c252..d6ba5e19be 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java @@ -27,7 +27,6 @@ import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.util.AttributeKey; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import java.util.Objects; @@ -240,7 +239,7 @@ public class WebSocketServerProtocolHandler extends WebSocketProtocolHandler { frame.retain(); Promise promise = ctx.newPromise(); closeSent(promise); - handshaker.close(ctx, (CloseWebSocketFrame) frame).addListener(new PromiseNotifier<>(promise)); + handshaker.close(ctx, (CloseWebSocketFrame) frame).cascadeTo(promise); } else { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ctx.channel(), ChannelFutureListeners.CLOSE); } diff --git a/codec-http/src/test/java/io/netty/handler/codec/http/HttpServerUpgradeHandlerTest.java b/codec-http/src/test/java/io/netty/handler/codec/http/HttpServerUpgradeHandlerTest.java index 6671fcf38b..507e05c8c9 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/http/HttpServerUpgradeHandlerTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/http/HttpServerUpgradeHandlerTest.java @@ -26,7 +26,6 @@ import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import org.junit.jupiter.api.Test; import java.util.Collection; @@ -101,7 +100,7 @@ public class HttpServerUpgradeHandlerTest { assertTrue(inReadCall); writeUpgradeMessage = true; Promise promise = ctx.newPromise(); - ctx.channel().executor().execute(() -> ctx.write(msg).addListener(new PromiseNotifier<>(promise))); + ctx.channel().executor().execute(() -> ctx.write(msg).cascadeTo(promise)); promise.addListener(f -> writeFlushed = true); return promise; } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java index 4ad0644afe..dc02d63edb 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java @@ -23,7 +23,6 @@ import io.netty.handler.codec.http2.Http2CodecUtil.SimpleChannelPromiseAggregato import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.internal.UnstableApi; import java.util.ArrayDeque; @@ -486,7 +485,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder, Ht // corresponding to 0 bytes and writing it to the channel (to preserve notification order). Promise writePromise = ctx.newPromise(); writePromise.addListener(this); - ctx.write(queue.remove(0, writePromise)).addListener(new PromiseNotifier<>(writePromise)); + ctx.write(queue.remove(0, writePromise)).cascadeTo(writePromise); } return; } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java index 27f25b971a..4e02365939 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java @@ -22,7 +22,6 @@ import io.netty.handler.codec.http2.Http2FrameWriter.Configuration; import io.netty.handler.codec.http2.Http2HeadersEncoder.SensitivityDetector; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.internal.UnstableApi; import static io.netty.buffer.Unpooled.directBuffer; @@ -152,12 +151,10 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize writeFrameHeaderInternal(frameHeader, maxFrameSize, DATA, flags, streamId); do { // Write the header. - ctx.write(frameHeader.retainedSlice()).addListener( - new PromiseNotifier<>(promiseAggregator.newPromise())); + ctx.write(frameHeader.retainedSlice()).cascadeTo(promiseAggregator.newPromise()); // Write the payload. - ctx.write(data.readRetainedSlice(maxFrameSize)).addListener( - new PromiseNotifier<>(promiseAggregator.newPromise())); + ctx.write(data.readRetainedSlice(maxFrameSize)).cascadeTo(promiseAggregator.newPromise()); remainingData -= maxFrameSize; // Stop iterating if remainingData == maxFrameSize so we can take care of reference counts below. @@ -173,12 +170,12 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize ByteBuf frameHeader2 = ctx.alloc().buffer(FRAME_HEADER_LENGTH); flags.endOfStream(endStream); writeFrameHeaderInternal(frameHeader2, remainingData, DATA, flags, streamId); - ctx.write(frameHeader2).addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + ctx.write(frameHeader2).cascadeTo(promiseAggregator.newPromise()); // Write the payload. ByteBuf lastFrame = data.readSlice(remainingData); data = null; - ctx.write(lastFrame).addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + ctx.write(lastFrame).cascadeTo(promiseAggregator.newPromise()); } else { if (remainingData != maxFrameSize) { if (frameHeader != null) { @@ -196,12 +193,12 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize lastFrame = frameHeader.slice(); frameHeader = null; } - ctx.write(lastFrame).addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + ctx.write(lastFrame).cascadeTo(promiseAggregator.newPromise()); // Write the payload. lastFrame = data.readableBytes() != maxFrameSize ? data.readSlice(maxFrameSize) : data; data = null; - ctx.write(lastFrame).addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + ctx.write(lastFrame).cascadeTo(promiseAggregator.newPromise()); } do { @@ -218,23 +215,23 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize flags.paddingPresent(framePaddingBytes > 0); writeFrameHeaderInternal(frameHeader2, framePaddingBytes + frameDataBytes, DATA, flags, streamId); writePaddingLength(frameHeader2, framePaddingBytes); - ctx.write(frameHeader2).addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + ctx.write(frameHeader2).cascadeTo(promiseAggregator.newPromise()); // Write the payload. if (frameDataBytes != 0 && data != null) { // Make sure Data is not null if (remainingData == 0) { ByteBuf lastFrame = data.readSlice(frameDataBytes); data = null; - ctx.write(lastFrame).addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + ctx.write(lastFrame).cascadeTo(promiseAggregator.newPromise()); } else { ctx.write(data.readRetainedSlice(frameDataBytes)) - .addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + .cascadeTo(promiseAggregator.newPromise()); } } // Write the frame padding. if (paddingBytes(framePaddingBytes) > 0) { ctx.write(ZERO_BUFFER.slice(0, paddingBytes(framePaddingBytes))) - .addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + .cascadeTo(promiseAggregator.newPromise()); } } while (remainingData != 0 || padding != 0); } @@ -285,7 +282,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize buf.writeInt(exclusive ? (int) (0x80000000L | streamDependency) : streamDependency); // Adjust the weight so that it fits into a single byte on the wire. buf.writeByte(weight - 1); - return ctx.write(buf).addListener(new PromiseNotifier<>(promise)); + return ctx.write(buf).cascadeTo(promise); } catch (Throwable t) { return promise.setFailure(t); } @@ -301,7 +298,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize ByteBuf buf = ctx.alloc().buffer(RST_STREAM_FRAME_LENGTH); writeFrameHeaderInternal(buf, INT_FIELD_LENGTH, RST_STREAM, new Http2Flags(), streamId); buf.writeInt((int) errorCode); - return ctx.write(buf).addListener(new PromiseNotifier<>(promise)); + return ctx.write(buf).cascadeTo(promise); } catch (Throwable t) { return promise.setFailure(t); } @@ -319,7 +316,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize buf.writeChar(entry.key()); buf.writeInt(entry.value().intValue()); } - return ctx.write(buf).addListener(new PromiseNotifier<>(promise)); + return ctx.write(buf).cascadeTo(promise); } catch (Throwable t) { return promise.setFailure(t); } @@ -330,7 +327,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize try { ByteBuf buf = ctx.alloc().buffer(FRAME_HEADER_LENGTH); writeFrameHeaderInternal(buf, 0, SETTINGS, new Http2Flags().ack(true), 0); - return ctx.write(buf).addListener(new PromiseNotifier<>(promise)); + return ctx.write(buf).cascadeTo(promise); } catch (Throwable t) { return promise.setFailure(t); } @@ -344,7 +341,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize // in the catch block. writeFrameHeaderInternal(buf, PING_FRAME_PAYLOAD_LENGTH, PING, flags, 0); buf.writeLong(data); - return ctx.write(buf).addListener(new PromiseNotifier<>(promise)); + return ctx.write(buf).cascadeTo(promise); } @Override @@ -377,15 +374,14 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize // Write out the promised stream ID. buf.writeInt(promisedStreamId); - ctx.write(buf).addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + ctx.write(buf).cascadeTo(promiseAggregator.newPromise()); // Write the first fragment. - ctx.write(fragment).addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + ctx.write(fragment).cascadeTo(promiseAggregator.newPromise()); // Write out the padding, if any. if (paddingBytes(padding) > 0) { - ctx.write(ZERO_BUFFER.slice(0, paddingBytes(padding))) - .addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + ctx.write(ZERO_BUFFER.slice(0, paddingBytes(padding))).cascadeTo(promiseAggregator.newPromise()); } if (!flags.endOfHeaders()) { @@ -420,7 +416,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize writeFrameHeaderInternal(buf, payloadLength, GO_AWAY, new Http2Flags(), 0); buf.writeInt(lastStreamId); buf.writeInt((int) errorCode); - ctx.write(buf).addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + ctx.write(buf).cascadeTo(promiseAggregator.newPromise()); } catch (Throwable t) { try { debugData.release(); @@ -432,7 +428,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize } try { - ctx.write(debugData).addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + ctx.write(debugData).cascadeTo(promiseAggregator.newPromise()); } catch (Throwable t) { promiseAggregator.setFailure(t); } @@ -449,7 +445,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize ByteBuf buf = ctx.alloc().buffer(WINDOW_UPDATE_FRAME_LENGTH); writeFrameHeaderInternal(buf, INT_FIELD_LENGTH, WINDOW_UPDATE, new Http2Flags(), streamId); buf.writeInt(windowSizeIncrement); - return ctx.write(buf).addListener(new PromiseNotifier<>(promise)); + return ctx.write(buf).cascadeTo(promise); } catch (Throwable t) { return promise.setFailure(t); } @@ -465,7 +461,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize // Assume nothing below will throw until buf is written. That way we don't have to take care of ownership // in the catch block. writeFrameHeaderInternal(buf, payload.readableBytes(), frameType, flags, streamId); - ctx.write(buf).addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + ctx.write(buf).cascadeTo(promiseAggregator.newPromise()); } catch (Throwable t) { try { payload.release(); @@ -476,7 +472,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize return promiseAggregator; } try { - ctx.write(payload).addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + ctx.write(payload).cascadeTo(promiseAggregator.newPromise()); } catch (Throwable t) { promiseAggregator.setFailure(t); } @@ -522,15 +518,15 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize // Adjust the weight so that it fits into a single byte on the wire. buf.writeByte(weight - 1); } - ctx.write(buf).addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + ctx.write(buf).cascadeTo(promiseAggregator.newPromise()); // Write the first fragment. - ctx.write(fragment).addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + ctx.write(fragment).cascadeTo(promiseAggregator.newPromise()); // Write out the padding, if any. if (paddingBytes(padding) > 0) { ctx.write(ZERO_BUFFER.slice(0, paddingBytes(padding))) - .addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + .cascadeTo(promiseAggregator.newPromise()); } if (!flags.endOfHeaders()) { @@ -568,17 +564,17 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize ByteBuf fragment = headerBlock.readRetainedSlice(fragmentReadableBytes); if (headerBlock.isReadable()) { - ctx.write(buf.retain()).addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + ctx.write(buf.retain()).cascadeTo(promiseAggregator.newPromise()); } else { // The frame header is different for the last frame, so re-allocate and release the old buffer flags = flags.endOfHeaders(true); buf.release(); buf = ctx.alloc().buffer(CONTINUATION_FRAME_HEADER_LENGTH); writeFrameHeaderInternal(buf, fragmentReadableBytes, CONTINUATION, flags, streamId); - ctx.write(buf).addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + ctx.write(buf).cascadeTo(promiseAggregator.newPromise()); } - ctx.write(fragment).addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + ctx.write(fragment).cascadeTo(promiseAggregator.newPromise()); } while (headerBlock.isReadable()); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index 22ae39d6d1..7149f72dbe 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -27,7 +27,6 @@ import io.netty.util.CharsetUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; @@ -925,7 +924,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http if (promise == null) { ctx.close(); } else { - ctx.close().addListener(new PromiseNotifier<>(promise)); + ctx.close().cascadeTo(promise); } } else if (promise != null) { promise.setSuccess(null); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java index 60c692f58d..9436426a10 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java @@ -31,7 +31,6 @@ import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectMap; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogLevel; import io.netty.util.internal.logging.InternalLogger; @@ -340,7 +339,7 @@ public class Http2FrameCodec extends Http2ConnectionHandler { encoder().writeFrame(ctx, unknownFrame.frameType(), unknownFrame.stream().id(), unknownFrame.flags(), unknownFrame.content(), promise); } else if (!(msg instanceof Http2Frame)) { - ctx.write(msg).addListener(new PromiseNotifier<>(promise)); + ctx.write(msg).cascadeTo(promise); } else { ReferenceCountUtil.release(msg); promise.setFailure(new UnsupportedMessageTypeException(msg)); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTest.java index 0d3fbfedc1..a9f81ffa42 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTest.java @@ -29,7 +29,6 @@ import io.netty.util.AsciiString; import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import org.hamcrest.CoreMatchers; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -739,7 +738,7 @@ public abstract class Http2MultiplexTest { channelOpen.set(channel.isOpen()); channelActive.set(channel.isActive()); }); - childChannel.close().addListener(new PromiseNotifier<>(p)).syncUninterruptibly(); + childChannel.close().cascadeTo(p).syncUninterruptibly(); assertFalse(channelOpen.get()); assertFalse(channelActive.get()); diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java index 0022db9d7b..b825b831c2 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java @@ -24,7 +24,6 @@ import io.netty.util.ReferenceCounted; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.PromiseCombiner; -import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.internal.StringUtil; import io.netty.util.internal.TypeParameterMatcher; @@ -102,7 +101,7 @@ public abstract class MessageToMessageEncoder extends ChannelHandlerAdapter { } finally { final int sizeMinusOne = out.size() - 1; if (sizeMinusOne == 0) { - PromiseNotifier.cascade(ctx.write(out.getUnsafe(0)), promise); + ctx.write(out.getUnsafe(0)).cascadeTo(promise); } else { writePromiseCombiner(ctx, out, promise); } diff --git a/codec/src/main/java/io/netty/handler/codec/compression/Bzip2Encoder.java b/codec/src/main/java/io/netty/handler/codec/compression/Bzip2Encoder.java index 0761e20f1f..c3d0ec47c2 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/Bzip2Encoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/Bzip2Encoder.java @@ -22,7 +22,6 @@ import io.netty.handler.codec.MessageToByteEncoder; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import java.util.concurrent.TimeUnit; @@ -182,7 +181,7 @@ public class Bzip2Encoder extends MessageToByteEncoder { Promise promise = ctx.newPromise(); executor.execute(() -> { Future f = finishEncode(ctx()); - PromiseNotifier.cascade(f, promise); + f.cascadeTo(promise); }); return promise; } @@ -195,11 +194,10 @@ public class Bzip2Encoder extends MessageToByteEncoder { return ctx.close(); } Promise promise = ctx.newPromise(); - f.addListener(f1 -> ctx.close().addListener(new PromiseNotifier<>(false, promise))); + f.addListener(f1 -> ctx.close().cascadeTo(promise)); // Ensure the channel is closed even if the write operation completes in time. - ctx.executor().schedule(() -> { - ctx.close().addListener(new PromiseNotifier<>(false, promise)); - }, 10, TimeUnit.SECONDS); // FIXME: Magic number + ctx.executor().schedule(() -> ctx.close().cascadeTo(promise), + 10, TimeUnit.SECONDS); // FIXME: Magic number return promise; } diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java index 0b2338fbb2..168a024bd7 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java @@ -20,7 +20,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; @@ -164,7 +163,7 @@ public class JdkZlibEncoder extends ZlibEncoder { Promise p = ctx.newPromise(); executor.execute(() -> { Future f = finishEncode(ctx()); - PromiseNotifier.cascade(f, p); + f.cascadeTo(p); }); return p; } @@ -262,11 +261,10 @@ public class JdkZlibEncoder extends ZlibEncoder { return ctx.close(); } Promise promise = ctx.newPromise(); - f.addListener(f1 -> ctx.close().addListener(new PromiseNotifier<>(false, promise))); + f.addListener(f1 -> ctx.close().cascadeTo(promise)); // Ensure the channel is closed even if the write operation completes in time. - ctx.executor().schedule(() -> { - ctx.close().addListener(new PromiseNotifier<>(false, promise)); - }, 10, TimeUnit.SECONDS); // FIXME: Magic number + ctx.executor().schedule(() -> ctx.close().cascadeTo(promise), + 10, TimeUnit.SECONDS); // FIXME: Magic number return promise; } diff --git a/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameEncoder.java index b7c192a42a..adc1741912 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameEncoder.java @@ -25,7 +25,6 @@ import io.netty.handler.codec.MessageToByteEncoder; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.internal.ObjectUtil; import net.jpountz.lz4.LZ4Compressor; import net.jpountz.lz4.LZ4Exception; @@ -348,7 +347,7 @@ public class Lz4FrameEncoder extends MessageToByteEncoder { Promise promise = ctx.newPromise(); executor.execute(() -> { Future f = finishEncode(ctx()); - PromiseNotifier.cascade(f, promise); + f.cascadeTo(promise); }); return promise; } @@ -361,12 +360,10 @@ public class Lz4FrameEncoder extends MessageToByteEncoder { return ctx.close(); } Promise promise = ctx.newPromise(); - f.addListener(f1 -> - ctx.close().addListener(new PromiseNotifier<>(false, promise))); + f.addListener(f1 -> ctx.close().cascadeTo(promise)); // Ensure the channel is closed even if the write operation completes in time. - ctx.executor().schedule(() -> { - ctx.close().addListener(new PromiseNotifier<>(false, promise)); - }, 10, TimeUnit.SECONDS); // FIXME: Magic number + ctx.executor().schedule(() -> ctx.close().cascadeTo(promise), + 10, TimeUnit.SECONDS); // FIXME: Magic number return promise; } diff --git a/common/src/main/java/io/netty/util/concurrent/Future.java b/common/src/main/java/io/netty/util/concurrent/Future.java index 6f1271fd00..7f6a8ca730 100644 --- a/common/src/main/java/io/netty/util/concurrent/Future.java +++ b/common/src/main/java/io/netty/util/concurrent/Future.java @@ -360,4 +360,17 @@ public interface Future extends java.util.concurrent.Future { default Future flatMap(Function> mapper) { return Futures.flatMap(this, mapper); } + + /** + * Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise} + * will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled + * the {@link Promise} is cancelled and vice-versa. + * + * @param promise the {@link Promise} which will be notified + * @return itself + */ + default Future cascadeTo(final Promise promise) { + Futures.cascade(this, promise); + return this; + } } diff --git a/common/src/main/java/io/netty/util/concurrent/Futures.java b/common/src/main/java/io/netty/util/concurrent/Futures.java index 751e444cd6..609916c619 100644 --- a/common/src/main/java/io/netty/util/concurrent/Futures.java +++ b/common/src/main/java/io/netty/util/concurrent/Futures.java @@ -33,7 +33,7 @@ import static java.util.Objects.requireNonNull; */ final class Futures { private static final InternalLogger logger = InternalLoggerFactory.getInstance(Futures.class); - private static final PassThrough PASS_THROUGH = new PassThrough(); + private static final PassThrough PASS_THROUGH = new PassThrough<>(); private static final PropagateCancel PROPAGATE_CANCEL = new PropagateCancel(); /** @@ -123,7 +123,7 @@ final class Futures { recipient.cancel(false); } else { Throwable cause = completed.cause(); - tryFailure(recipient, cause, logger); + recipient.tryFailure(cause); } } @@ -229,4 +229,25 @@ final class Futures { } } } + + /** + * Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise} + * will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled + * the {@link Promise} is cancelled and vice-versa. + * + * @param future the {@link Future} which will be used to listen to for notifying the {@link Promise}. + * @param promise the {@link Promise} which will be notified + * @param the type of the value. + */ + static void cascade(final Future future, final Promise promise) { + requireNonNull(future, "future"); + requireNonNull(promise, "promise"); + + if (!future.isSuccess()) { + // Propagate cancellation if future is either incomplete or failed. + // Failed means it could be cancelled, so that needs to be propagated. + promise.addListener(future, propagateCancel()); + } + future.addListener(promise, passThrough()); + } } diff --git a/common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java b/common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java deleted file mode 100644 index 823a73f072..0000000000 --- a/common/src/main/java/io/netty/util/concurrent/PromiseNotifier.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.util.concurrent; - -import io.netty.util.internal.PromiseNotificationUtil; -import io.netty.util.internal.logging.InternalLogger; -import io.netty.util.internal.logging.InternalLoggerFactory; - -import static io.netty.util.internal.ObjectUtil.checkNotNullWithIAE; -import static java.util.Objects.requireNonNull; - -/** - * A {@link FutureListener} implementation which takes other {@link Promise}s - * and notifies them on completion. - * - * @param the type of value returned by the future - */ -public class PromiseNotifier implements FutureListener { - - private static final InternalLogger logger = InternalLoggerFactory.getInstance(PromiseNotifier.class); - private final Promise[] promises; - private final boolean logNotifyFailure; - - /** - * Create a new instance. - * - * @param promises the {@link Promise}s to notify once this {@link FutureListener} is notified. - */ - @SafeVarargs - public PromiseNotifier(Promise... promises) { - this(true, promises); - } - - /** - * Create a new instance. - * - * @param logNotifyFailure {@code true} if logging should be done in case notification fails. - * @param promises the {@link Promise}s to notify once this {@link FutureListener} is notified. - */ - @SafeVarargs - public PromiseNotifier(boolean logNotifyFailure, Promise... promises) { - requireNonNull(promises, "promises"); - for (Promise promise: promises) { - checkNotNullWithIAE(promise, "promise"); - } - this.promises = promises.clone(); - this.logNotifyFailure = logNotifyFailure; - } - - /** - * Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise} - * will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled - * the {@link Promise} is cancelled and vice-versa. - * - * @param future the {@link Future} which will be used to listen to for notifying the {@link Promise}. - * @param promise the {@link Promise} which will be notified - * @param the type of the value. - * @return the passed in {@link Future} - */ - public static Future cascade(final Future future, final Promise promise) { - return cascade(true, future, promise); - } - - /** - * Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise} - * will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled - * the {@link Promise} is cancelled and vice-versa. - * - * @param logNotifyFailure {@code true} if logging should be done in case notification fails. - * @param future the {@link Future} which will be used to listen to for notifying the {@link Promise}. - * @param promise the {@link Promise} which will be notified - * @param the type of the value. - * @return the passed in {@link Future} - */ - public static Future cascade(boolean logNotifyFailure, final Future future, - final Promise promise) { - promise.addListener(future, PromiseNotifier::propagateCancel); - future.addListener(new PromiseNotifier(logNotifyFailure, promise), PromiseNotifier::propagateComplete); - return future; - } - - /** - * Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise} will be - * notified with the given result. - * Cancellation is propagated both ways such that if the {@link Future} is cancelled the {@link Promise} - * is cancelled and vice-versa. - * - * @param logNotifyFailure {@code true} if logging should be done in case notification fails. - * @param future the {@link Future} which will be used to listen to for notifying the {@link Promise}. - * @param promise the {@link Promise} which will be notified - * @param successResult the result that will be propagated to the promise on success - * @return the passed in {@link Future} - */ - public static Future cascade(boolean logNotifyFailure, Future future, - Promise promise, R successResult) { - promise.addListener(future, PromiseNotifier::propagateCancel); - future.addListener(new FutureListener() { - @Override - public void operationComplete(Future f) throws Exception { - if (promise.isCancelled() && f.isCancelled()) { - // Just return if we propagate a cancel from the promise to the future and both are notified already - return; - } - if (f.isSuccess()) { - promise.setSuccess(successResult); - } else if (f.isCancelled()) { - InternalLogger internalLogger = null; - if (logNotifyFailure) { - internalLogger = InternalLoggerFactory.getInstance(PromiseNotifier.class); - } - PromiseNotificationUtil.tryCancel(promise, internalLogger); - } else { - Throwable cause = future.cause(); - promise.tryFailure(cause); - } - } - }); - return future; - } - - static > void propagateCancel(F target, Future source) { - if (source.isCancelled()) { - target.cancel(false); - } - } - - static void propagateComplete(PromiseNotifier target, Future source) throws Exception { - boolean allCancelled = target.promises.length > 0; - for (Promise promise : target.promises) { - allCancelled &= promise.isCancelled(); - } - if (allCancelled && source.isCancelled()) { - // Just return if we propagate a cancel from the promise to the future and both are notified already - return; - } - target.operationComplete(source); - } - - @Override - public void operationComplete(Future future) throws Exception { - InternalLogger internalLogger = logNotifyFailure ? logger : null; - if (future.isSuccess()) { - V result = future.get(); - for (Promise p: promises) { - PromiseNotificationUtil.trySuccess(p, result, internalLogger); - } - } else if (future.isCancelled()) { - for (Promise p: promises) { - PromiseNotificationUtil.tryCancel(p, internalLogger); - } - } else { - Throwable cause = future.cause(); - for (Promise p: promises) { - PromiseNotificationUtil.tryFailure(p, cause, internalLogger); - } - } - } -} diff --git a/common/src/test/java/io/netty/util/concurrent/FuturesTest.java b/common/src/test/java/io/netty/util/concurrent/FuturesTest.java index 7de12e997b..0cdceea76b 100644 --- a/common/src/test/java/io/netty/util/concurrent/FuturesTest.java +++ b/common/src/test/java/io/netty/util/concurrent/FuturesTest.java @@ -25,6 +25,8 @@ import static io.netty.util.concurrent.ImmediateEventExecutor.INSTANCE; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; class FuturesTest { @@ -212,4 +214,65 @@ class FuturesTest { mappingLatchExit.countDown(); assertThat(strFut.get(5, SECONDS)).isEqualTo("42"); } + + @Test + public void cascadeToNullPromise() { + TestEventExecutor executor = new TestEventExecutor(); + DefaultPromise promise = new DefaultPromise<>(executor); + assertThrows(NullPointerException.class, () -> promise.cascadeTo(null)); + } + + @Test + public void cascadeToSuccess() throws Exception { + TestEventExecutor executor = new TestEventExecutor(); + DefaultPromise promise = new DefaultPromise<>(executor); + DefaultPromise promise2 = new DefaultPromise<>(executor); + promise.cascadeTo(promise2); + promise.setSuccess(1); + assertTrue(promise.isSuccess()); + assertThat(promise2.get(1, SECONDS)).isEqualTo(1); + } + + @Test + public void cascadeToFailure() throws Exception { + TestEventExecutor executor = new TestEventExecutor(); + DefaultPromise promise = new DefaultPromise<>(executor); + DefaultPromise promise2 = new DefaultPromise<>(executor); + promise.cascadeTo(promise2); + + Exception ex = new Exception(); + promise.setFailure(ex); + assertTrue(promise.isFailed()); + assertTrue(promise2.await(1, SECONDS)); + assertTrue(promise2.isFailed()); + assertSame(promise.cause(), promise2.cause()); + } + + @Test + public void cascadeToCancel() throws Exception { + TestEventExecutor executor = new TestEventExecutor(); + DefaultPromise promise = new DefaultPromise<>(executor); + DefaultPromise promise2 = new DefaultPromise<>(executor); + promise.cascadeTo(promise2); + + assertTrue(promise.cancel(false)); + assertTrue(promise.isCancelled()); + assertTrue(promise2.await(1, SECONDS)); + assertTrue(promise2.isCancelled()); + } + + @Test + public void cascadeToCancelSecond() throws Exception { + TestEventExecutor executor = new TestEventExecutor(); + DefaultPromise promise = new DefaultPromise<>(executor); + DefaultPromise promise2 = new DefaultPromise<>(executor); + promise.cascadeTo(promise2); + + assertTrue(promise2.cancel(false)); + assertTrue(promise2.isCancelled()); + + // + assertTrue(promise.await(1, SECONDS)); + assertTrue(promise2.isCancelled()); + } } diff --git a/common/src/test/java/io/netty/util/concurrent/PromiseNotifierTest.java b/common/src/test/java/io/netty/util/concurrent/PromiseNotifierTest.java deleted file mode 100644 index 11824c0ac6..0000000000 --- a/common/src/test/java/io/netty/util/concurrent/PromiseNotifierTest.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.util.concurrent; - -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertSame; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.*; - -public class PromiseNotifierTest { - - @Test - public void testNullPromisesArray() { - assertThrows(NullPointerException.class, () -> new PromiseNotifier<>((Promise[]) null)); - } - - @Test - public void testNullPromiseInArray() { - assertThrows(IllegalArgumentException.class, () -> new PromiseNotifier<>((Promise) null)); - } - - @Test - public void testListenerSuccess() throws Exception { - @SuppressWarnings("unchecked") - Promise p1 = mock(Promise.class); - @SuppressWarnings("unchecked") - Promise p2 = mock(Promise.class); - - PromiseNotifier notifier = new PromiseNotifier<>(p1, p2); - - @SuppressWarnings("unchecked") - Future future = mock(Future.class); - when(future.isSuccess()).thenReturn(true); - when(future.get()).thenReturn(null); - when(p1.trySuccess(null)).thenReturn(true); - when(p2.trySuccess(null)).thenReturn(true); - - notifier.operationComplete(future); - verify(p1).trySuccess(null); - verify(p2).trySuccess(null); - } - - @Test - public void testListenerFailure() throws Exception { - @SuppressWarnings("unchecked") - Promise p1 = mock(Promise.class); - @SuppressWarnings("unchecked") - Promise p2 = mock(Promise.class); - - PromiseNotifier notifier = new PromiseNotifier<>(p1, p2); - - @SuppressWarnings("unchecked") - Future future = mock(Future.class); - Throwable t = mock(Throwable.class); - when(future.isSuccess()).thenReturn(false); - when(future.isCancelled()).thenReturn(false); - when(future.cause()).thenReturn(t); - when(p1.tryFailure(t)).thenReturn(true); - when(p2.tryFailure(t)).thenReturn(true); - - notifier.operationComplete(future); - verify(p1).tryFailure(t); - verify(p2).tryFailure(t); - } - - @Test - public void testCancelPropagationWhenFusedFromFuture() { - Promise p1 = ImmediateEventExecutor.INSTANCE.newPromise(); - Promise p2 = ImmediateEventExecutor.INSTANCE.newPromise(); - - Future returned = PromiseNotifier.cascade(p1, p2); - assertSame(p1, returned); - - assertTrue(returned.cancel(false)); - assertTrue(returned.isCancelled()); - assertTrue(p2.isCancelled()); - } -} diff --git a/handler/src/main/java/io/netty/handler/address/ResolveAddressHandler.java b/handler/src/main/java/io/netty/handler/address/ResolveAddressHandler.java index bdd719e830..fc88c16683 100644 --- a/handler/src/main/java/io/netty/handler/address/ResolveAddressHandler.java +++ b/handler/src/main/java/io/netty/handler/address/ResolveAddressHandler.java @@ -23,7 +23,6 @@ import io.netty.resolver.AddressResolverGroup; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.internal.ObjectUtil; import java.net.SocketAddress; @@ -53,7 +52,7 @@ public class ResolveAddressHandler implements ChannelHandler { if (cause != null) { promise.setFailure(cause); } else { - ctx.connect(future.getNow(), localAddress).addListener(new PromiseNotifier<>(promise)); + ctx.connect(future.getNow(), localAddress).cascadeTo(promise); } ctx.pipeline().remove(ResolveAddressHandler.this); }); diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 8f201e6a86..660f8a7887 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -40,7 +40,6 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.ImmediateExecutor; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; @@ -793,12 +792,12 @@ public class SslHandler extends ByteToMessageDecoder { final ByteBuf b = out; out = null; if (promise != null) { - ctx.write(b).addListener(new PromiseNotifier<>(promise)); + ctx.write(b).cascadeTo(promise); } else { ctx.write(b); } } else if (promise != null) { - ctx.write(Unpooled.EMPTY_BUFFER).addListener(new PromiseNotifier<>(promise)); + ctx.write(Unpooled.EMPTY_BUFFER).cascadeTo(promise); } // else out is not readable we can re-use it and so save an extra allocation @@ -1885,7 +1884,7 @@ public class SslHandler extends ByteToMessageDecoder { // // See https://github.com/netty/netty/issues/5931 Promise cascade = ctx.newPromise(); - PromiseNotifier.cascade(false, cascade, promise); + cascade.cascadeTo(promise); safeClose(ctx, closeNotifyPromise, cascade); } else { /// We already handling the close_notify so just attach the promise to the sslClosePromise. @@ -1980,7 +1979,7 @@ public class SslHandler extends ByteToMessageDecoder { if (!oldHandshakePromise.isDone()) { // There's no need to handshake because handshake is in progress already. // Merge the new promise into the old one. - PromiseNotifier.cascade(oldHandshakePromise, newHandshakePromise); + oldHandshakePromise.cascadeTo(newHandshakePromise); } else { handshakePromise = newHandshakePromise; handshake(true); @@ -2072,7 +2071,7 @@ public class SslHandler extends ByteToMessageDecoder { final ChannelHandlerContext ctx, final Future flushFuture, final Promise promise) { if (!ctx.channel().isActive()) { - ctx.close().addListener(new PromiseNotifier<>(promise)); + ctx.close().cascadeTo(promise); return; } @@ -2150,7 +2149,7 @@ public class SslHandler extends ByteToMessageDecoder { // IllegalStateException. // Also we not want to log if the notification happens as this is expected in some cases. // See https://github.com/netty/netty/issues/5598 - PromiseNotifier.cascade(false, future, promise); + future.cascadeTo(promise); } /** diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index 1f78dc680c..70494fa37b 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -24,7 +24,6 @@ import io.netty.channel.ChannelPipeline; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -283,7 +282,7 @@ public class ChunkedWriteHandler implements ChannelHandler { requiresFlush = false; } else { queue.remove(); - ctx.write(pendingMessage).addListener(new PromiseNotifier<>(currentWrite.promise)); + ctx.write(pendingMessage).cascadeTo(currentWrite.promise); requiresFlush = true; } diff --git a/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java index 6704bf4bb9..5ebd59ad86 100644 --- a/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java @@ -18,7 +18,6 @@ package io.netty.handler.traffic; import io.netty.buffer.ByteBufConvertible; import io.netty.channel.ChannelHandlerContext; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import java.util.ArrayDeque; import java.util.concurrent.TimeUnit; @@ -147,7 +146,7 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler long size = calculateSize(toSend.toSend); trafficCounter.bytesRealWriteFlowControl(size); queueSize -= size; - ctx.write(toSend.toSend).addListener(new PromiseNotifier<>(toSend.promise)); + ctx.write(toSend.toSend).cascadeTo(toSend.promise); } } else { for (ToSend toSend : messagesQueue) { @@ -184,7 +183,7 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler synchronized (this) { if (delay == 0 && messagesQueue.isEmpty()) { trafficCounter.bytesRealWriteFlowControl(size); - ctx.write(msg).addListener(new PromiseNotifier<>(promise)); + ctx.write(msg).cascadeTo(promise); return; } newToSend = new ToSend(delay + now, msg, promise); @@ -205,7 +204,7 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler long size = calculateSize(newToSend.toSend); trafficCounter.bytesRealWriteFlowControl(size); queueSize -= size; - ctx.write(newToSend.toSend).addListener(new PromiseNotifier<>(newToSend.promise)); + ctx.write(newToSend.toSend).cascadeTo(newToSend.promise); } else { messagesQueue.addFirst(newToSend); break; diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java index b85ab82e53..b2126d8240 100644 --- a/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java @@ -24,7 +24,6 @@ import io.netty.util.Attribute; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -495,7 +494,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size); perChannel.queueSize -= size; queuesSize.addAndGet(-size); - ctx.write(toSend.toSend).addListener(new PromiseNotifier<>(toSend.promise)); + ctx.write(toSend.toSend).cascadeTo(toSend.promise); } } else { queuesSize.addAndGet(-perChannel.queueSize); @@ -714,7 +713,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) { trafficCounter.bytesRealWriteFlowControl(size); perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size); - ctx.write(msg).addListener(new PromiseNotifier<>(promise)); + ctx.write(msg).cascadeTo(promise); perChannel.lastWriteTimestamp = now; return; } @@ -749,7 +748,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size); perChannel.queueSize -= size; queuesSize.addAndGet(-size); - ctx.write(newToSend.toSend).addListener(new PromiseNotifier<>(newToSend.promise)); + ctx.write(newToSend.toSend).cascadeTo(newToSend.promise); perChannel.lastWriteTimestamp = now; } else { perChannel.messagesQueue.addFirst(newToSend); diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java index 6a984db38a..89a58ac011 100644 --- a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java @@ -21,7 +21,6 @@ import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import java.util.ArrayDeque; import java.util.concurrent.ConcurrentHashMap; @@ -273,7 +272,7 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { trafficCounter.bytesRealWriteFlowControl(size); perChannel.queueSize -= size; queuesSize.addAndGet(-size); - ctx.write(toSend.toSend).addListener(new PromiseNotifier<>(toSend.promise)); + ctx.write(toSend.toSend).cascadeTo(toSend.promise); } } else { queuesSize.addAndGet(-perChannel.queueSize); @@ -345,7 +344,7 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { synchronized (perChannel) { if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) { trafficCounter.bytesRealWriteFlowControl(size); - ctx.write(msg).addListener(new PromiseNotifier<>(promise)); + ctx.write(msg).cascadeTo(promise); perChannel.lastWriteTimestamp = now; return; } @@ -379,7 +378,7 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { trafficCounter.bytesRealWriteFlowControl(size); perChannel.queueSize -= size; queuesSize.addAndGet(-size); - ctx.write(newToSend.toSend).addListener(new PromiseNotifier<>(newToSend.promise)); + ctx.write(newToSend.toSend).cascadeTo(newToSend.promise); perChannel.lastWriteTimestamp = now; } else { perChannel.messagesQueue.addFirst(newToSend); diff --git a/handler/src/test/java/io/netty/handler/ssl/ParameterizedSslHandlerTest.java b/handler/src/test/java/io/netty/handler/ssl/ParameterizedSslHandlerTest.java index fc88cb9c0b..73ccc896f4 100644 --- a/handler/src/test/java/io/netty/handler/ssl/ParameterizedSslHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/ParameterizedSslHandlerTest.java @@ -41,7 +41,6 @@ import io.netty.handler.ssl.util.SimpleTrustManagerFactory; import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.ResourcesUtil; import org.junit.jupiter.api.Timeout; @@ -430,7 +429,7 @@ public class ParameterizedSslHandlerTest { protected void initChannel(Channel ch) throws Exception { SslHandler handler = sslServerCtx.newHandler(ch.alloc()); handler.setCloseNotifyReadTimeoutMillis(closeNotifyReadTimeout); - PromiseNotifier.cascade(handler.sslCloseFuture(), serverPromise); + handler.sslCloseFuture().cascadeTo(serverPromise); handler.handshakeFuture().addListener(future -> { if (future.isFailed()) { @@ -466,7 +465,7 @@ public class ParameterizedSslHandlerTest { SslHandler handler = sslClientCtx.newHandler(ch.alloc()); handler.setCloseNotifyReadTimeoutMillis(closeNotifyReadTimeout); - PromiseNotifier.cascade(handler.sslCloseFuture(), clientPromise); + handler.sslCloseFuture().cascadeTo(clientPromise); handler.handshakeFuture().addListener(future -> { if (future.isSuccess()) { closeSent.compareAndSet(false, true); diff --git a/microbench/src/main/java/io/netty/handler/codec/http2/Http2FrameWriterDataBenchmark.java b/microbench/src/main/java/io/netty/handler/codec/http2/Http2FrameWriterDataBenchmark.java index 4355691fee..f0ffd2ca65 100644 --- a/microbench/src/main/java/io/netty/handler/codec/http2/Http2FrameWriterDataBenchmark.java +++ b/microbench/src/main/java/io/netty/handler/codec/http2/Http2FrameWriterDataBenchmark.java @@ -24,7 +24,6 @@ import io.netty.microbench.channel.EmbeddedChannelWriteReleaseHandlerContext; import io.netty.microbench.util.AbstractMicrobenchmark; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -146,20 +145,19 @@ public class Http2FrameWriterDataBenchmark extends AbstractMicrobenchmark { // Only the last frame is not retained. Until then, the outer finally must release. ByteBuf frameHeader = header.slice(frameDataBytes, framePaddingBytes, lastFrame && endStream); needToReleaseHeaders = !lastFrame; - ctx.write(lastFrame ? frameHeader : frameHeader.retain()) - .addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + ctx.write(lastFrame ? frameHeader : frameHeader.retain()).cascadeTo(promiseAggregator.newPromise()); // Write the frame data. ByteBuf frameData = data.readSlice(frameDataBytes); // Only the last frame is not retained. Until then, the outer finally must release. needToReleaseData = !lastFrame; ctx.write(lastFrame ? frameData : frameData.retain()) - .addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + .cascadeTo(promiseAggregator.newPromise()); // Write the frame padding. if (paddingBytes(framePaddingBytes) > 0) { ctx.write(ZERO_BUFFER.slice(0, paddingBytes(framePaddingBytes))) - .addListener(new PromiseNotifier<>(promiseAggregator.newPromise())); + .cascadeTo(promiseAggregator.newPromise()); } } while (!lastFrame); } catch (Throwable t) { diff --git a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java index 8959b8caf9..e229e48ff0 100644 --- a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java @@ -26,7 +26,6 @@ import io.netty.util.AttributeKey; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.internal.SocketUtils; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; @@ -39,7 +38,6 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import static io.netty.util.concurrent.PromiseNotifier.cascade; import static java.util.Objects.requireNonNull; /** @@ -250,7 +248,7 @@ public abstract class AbstractBootstrap, C // At this point we know that the registration was complete and successful. Channel channel = regFuture.getNow(); Promise promise = channel.newPromise(); - cascade(true, promise, bindPromise, channel); + promise.map(v -> channel).cascadeTo(bindPromise); doBind0(regFuture, channel, localAddress, promise); } else { // Registration future is almost always fulfilled already, but just in case it's not. @@ -263,7 +261,7 @@ public abstract class AbstractBootstrap, C } else { Channel channel = future.getNow(); Promise promise = channel.newPromise(); - cascade(true, promise, bindPromise, channel); + promise.map(v -> channel).cascadeTo(bindPromise); doBind0(regFuture, channel, localAddress, promise); } }); @@ -319,7 +317,7 @@ public abstract class AbstractBootstrap, C // the pipeline in its channelRegistered() implementation. channel.executor().execute(() -> { if (regFuture.isSuccess()) { - PromiseNotifier.cascade(channel.bind(localAddress), promise) + channel.bind(localAddress).cascadeTo(promise) .addListener(channel, ChannelFutureListeners.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); diff --git a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java index f825377d2b..f4e2442037 100644 --- a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java @@ -36,7 +36,6 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; -import static io.netty.util.concurrent.PromiseNotifier.cascade; import static java.util.Objects.requireNonNull; /** @@ -268,7 +267,7 @@ public class Bootstrap extends AbstractBootstrap channel).cascadeTo(promise); }); } diff --git a/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java b/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java index a9a5464312..abcb50930d 100644 --- a/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java +++ b/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java @@ -21,7 +21,6 @@ import io.netty.buffer.CompositeByteBuf; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -59,7 +58,7 @@ public abstract class AbstractCoalescingBufferQueue { * @param promise to complete when all the bytes have been consumed and written, can be void. */ public final void addFirst(ByteBuf buf, Promise promise) { - addFirst(buf, new PromiseNotifier(promise)); + addFirst(buf, f -> f.cascadeTo(promise)); } private void addFirst(ByteBuf buf, FutureListener listener) { @@ -86,7 +85,7 @@ public abstract class AbstractCoalescingBufferQueue { public final void add(ByteBuf buf, Promise promise) { // buffers are added before promises so that we naturally 'consume' the entire buffer during removal // before we complete it's promise. - add(buf, new PromiseNotifier(promise)); + add(buf, f -> f.cascadeTo(promise)); } /** @@ -254,7 +253,7 @@ public abstract class AbstractCoalescingBufferQueue { previousBuf = ((ByteBufConvertible) entry).asByteBuf(); } else if (entry instanceof Promise) { decrementReadableBytes(previousBuf.readableBytes()); - ctx.write(previousBuf).addListener(new PromiseNotifier<>((Promise) entry)); + ctx.write(previousBuf).cascadeTo((Promise) entry); previousBuf = null; } else { decrementReadableBytes(previousBuf.readableBytes()); diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index a9de017d3e..36a8e4aad2 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -25,7 +25,6 @@ import io.netty.util.ResourceLeakHint; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.internal.ObjectPool; import io.netty.util.internal.ThrowableUtil; import io.netty.util.internal.StringUtil; @@ -430,7 +429,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou } Promise promise = newPromise(); - safeExecute(executor, () -> PromiseNotifier.cascade(findAndInvokeBind(localAddress), promise), promise, null); + safeExecute(executor, () -> findAndInvokeBind(localAddress).cascadeTo(promise), promise, null); return promise; } @@ -446,7 +445,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou return findAndInvokeDeregister(); } Promise promise = newPromise(); - safeExecute(executor, () -> PromiseNotifier.cascade(findAndInvokeDeregister(), promise), promise, null); + safeExecute(executor, () -> findAndInvokeDeregister().cascadeTo(promise), promise, null); return promise; } private Future findAndInvokeBind(SocketAddress localAddress) { @@ -475,7 +474,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou } Promise promise = newPromise(); safeExecute(executor, () -> - PromiseNotifier.cascade(findAndInvokeConnect(remoteAddress, localAddress), promise), promise, null); + findAndInvokeConnect(remoteAddress, localAddress).cascadeTo(promise), promise, null); return promise; } @@ -509,8 +508,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou return findAndInvokeDisconnect(); } Promise promise = newPromise(); - safeExecute(executor, () -> - PromiseNotifier.cascade(findAndInvokeDisconnect(), promise), promise, null); + safeExecute(executor, () -> findAndInvokeDisconnect().cascadeTo(promise), promise, null); return promise; } @@ -537,8 +535,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou return findAndInvokeClose(); } Promise promise = newPromise(); - safeExecute(executor, () -> - PromiseNotifier.cascade(findAndInvokeClose(), promise), promise, null); + safeExecute(executor, () -> findAndInvokeClose().cascadeTo(promise), promise, null); return promise; } @@ -565,8 +562,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou return findAndInvokeRegister(); } Promise promise = newPromise(); - safeExecute(executor, () -> - PromiseNotifier.cascade(findAndInvokeRegister(), promise), promise, null); + safeExecute(executor, () -> findAndInvokeRegister().cascadeTo(promise), promise, null); return promise; } @@ -912,7 +908,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou DefaultChannelHandlerContext next = findContext(ctx); if (next == null) { ReferenceCountUtil.release(msg); - failRemoved(ctx).addListener(new PromiseNotifier<>(promise)); + failRemoved(ctx).cascadeTo(promise); return; } write(next, msg, promise); @@ -944,7 +940,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou } protected void write(DefaultChannelHandlerContext ctx, Object msg, Promise promise) { - PromiseNotifier.cascade(ctx.invokeWrite(msg), promise); + ctx.invokeWrite(msg).cascadeTo(promise); } } diff --git a/transport/src/main/java/io/netty/channel/PendingWriteQueue.java b/transport/src/main/java/io/netty/channel/PendingWriteQueue.java index 1b7b67f872..b8a46caa9a 100644 --- a/transport/src/main/java/io/netty/channel/PendingWriteQueue.java +++ b/transport/src/main/java/io/netty/channel/PendingWriteQueue.java @@ -19,7 +19,6 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.PromiseCombiner; -import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.internal.ObjectPool; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; @@ -144,7 +143,7 @@ public final class PendingWriteQueue { Object msg = write.msg; Promise promise = write.promise; recycle(write, false); - PromiseNotifier.cascade(ctx.write(msg), promise); + ctx.write(msg).cascadeTo(promise); write = next; } } @@ -221,7 +220,7 @@ public final class PendingWriteQueue { recycle(write, true); Future future = ctx.write(msg); - PromiseNotifier.cascade(future, promise); + future.cascadeTo(promise); return future; } diff --git a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java index f7ff8ecf96..934ab5a29c 100644 --- a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java +++ b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java @@ -37,7 +37,6 @@ import io.netty.channel.ChannelPipeline; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.concurrent.ScheduledFuture; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -345,7 +344,7 @@ public class EmbeddedChannelTest { @Override public Future write(final ChannelHandlerContext ctx, final Object msg) { Promise promise = ctx.newPromise(); - ctx.executor().execute(() -> ctx.write(msg).addListener(new PromiseNotifier<>(promise))); + ctx.executor().execute(() -> ctx.write(msg).cascadeTo(promise)); return promise; } }); @@ -365,7 +364,7 @@ public class EmbeddedChannelTest { public Future write(final ChannelHandlerContext ctx, final Object msg) { Promise promise = ctx.newPromise(); ctx.executor().schedule(() -> { - ctx.writeAndFlush(msg).addListener(new PromiseNotifier<>(promise)); + ctx.writeAndFlush(msg).cascadeTo(promise); }, delay, TimeUnit.MILLISECONDS); return promise; }