Add cascadeTo methods to Future (#11623)

Motivation:

The need of cascade from a Future to a Promise exists. We should add some default implementation for it.

Modifications:

- Merge PromiseNotifier into Futures
- Add default cascadeTo(...) methods to Future
- Add tests to FuturesTest
- Replace usage of PromiseNotifier with Future.cascadeTo
- Use combination of map(...) and cascadeTo(...) in *Bootstrap to reduce code duplication

Result:

Provide default implementation of cascadeTo.
This commit is contained in:
Norman Maurer 2021-08-29 15:44:34 +02:00 committed by GitHub
parent 3fc44b0a8e
commit e04f48d802
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 190 additions and 396 deletions

View File

@ -21,7 +21,6 @@ import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.concurrent.ScheduledFuture;
import java.nio.channels.ClosedChannelException;
@ -92,7 +91,7 @@ abstract class WebSocketProtocolHandler extends MessageToMessageDecoder<WebSocke
flush(ctx);
applyCloseSentTimeout(ctx);
Promise<Void> promise = ctx.newPromise();
future.addListener(f -> ctx.close().addListener(new PromiseNotifier<>(promise)));
future.addListener(f -> ctx.close().cascadeTo(promise));
return promise;
}
@ -105,7 +104,7 @@ abstract class WebSocketProtocolHandler extends MessageToMessageDecoder<WebSocke
if (msg instanceof CloseWebSocketFrame) {
Promise<Void> promise = ctx.newPromise();
closeSent(promise);
ctx.write(msg).addListener(new PromiseNotifier<>(false, closeSent));
ctx.write(msg).cascadeTo(closeSent);
return promise;
}
return ctx.write(msg);

View File

@ -27,7 +27,6 @@ import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import java.util.Objects;
@ -240,7 +239,7 @@ public class WebSocketServerProtocolHandler extends WebSocketProtocolHandler {
frame.retain();
Promise<Void> promise = ctx.newPromise();
closeSent(promise);
handshaker.close(ctx, (CloseWebSocketFrame) frame).addListener(new PromiseNotifier<>(promise));
handshaker.close(ctx, (CloseWebSocketFrame) frame).cascadeTo(promise);
} else {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ctx.channel(), ChannelFutureListeners.CLOSE);
}

View File

@ -26,7 +26,6 @@ import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import org.junit.jupiter.api.Test;
import java.util.Collection;
@ -101,7 +100,7 @@ public class HttpServerUpgradeHandlerTest {
assertTrue(inReadCall);
writeUpgradeMessage = true;
Promise<Void> promise = ctx.newPromise();
ctx.channel().executor().execute(() -> ctx.write(msg).addListener(new PromiseNotifier<>(promise)));
ctx.channel().executor().execute(() -> ctx.write(msg).cascadeTo(promise));
promise.addListener(f -> writeFlushed = true);
return promise;
}

View File

@ -23,7 +23,6 @@ import io.netty.handler.codec.http2.Http2CodecUtil.SimpleChannelPromiseAggregato
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.internal.UnstableApi;
import java.util.ArrayDeque;
@ -486,7 +485,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder, Ht
// corresponding to 0 bytes and writing it to the channel (to preserve notification order).
Promise<Void> writePromise = ctx.newPromise();
writePromise.addListener(this);
ctx.write(queue.remove(0, writePromise)).addListener(new PromiseNotifier<>(writePromise));
ctx.write(queue.remove(0, writePromise)).cascadeTo(writePromise);
}
return;
}

View File

@ -22,7 +22,6 @@ import io.netty.handler.codec.http2.Http2FrameWriter.Configuration;
import io.netty.handler.codec.http2.Http2HeadersEncoder.SensitivityDetector;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.internal.UnstableApi;
import static io.netty.buffer.Unpooled.directBuffer;
@ -152,12 +151,10 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
writeFrameHeaderInternal(frameHeader, maxFrameSize, DATA, flags, streamId);
do {
// Write the header.
ctx.write(frameHeader.retainedSlice()).addListener(
new PromiseNotifier<>(promiseAggregator.newPromise()));
ctx.write(frameHeader.retainedSlice()).cascadeTo(promiseAggregator.newPromise());
// Write the payload.
ctx.write(data.readRetainedSlice(maxFrameSize)).addListener(
new PromiseNotifier<>(promiseAggregator.newPromise()));
ctx.write(data.readRetainedSlice(maxFrameSize)).cascadeTo(promiseAggregator.newPromise());
remainingData -= maxFrameSize;
// Stop iterating if remainingData == maxFrameSize so we can take care of reference counts below.
@ -173,12 +170,12 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
ByteBuf frameHeader2 = ctx.alloc().buffer(FRAME_HEADER_LENGTH);
flags.endOfStream(endStream);
writeFrameHeaderInternal(frameHeader2, remainingData, DATA, flags, streamId);
ctx.write(frameHeader2).addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
ctx.write(frameHeader2).cascadeTo(promiseAggregator.newPromise());
// Write the payload.
ByteBuf lastFrame = data.readSlice(remainingData);
data = null;
ctx.write(lastFrame).addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
ctx.write(lastFrame).cascadeTo(promiseAggregator.newPromise());
} else {
if (remainingData != maxFrameSize) {
if (frameHeader != null) {
@ -196,12 +193,12 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
lastFrame = frameHeader.slice();
frameHeader = null;
}
ctx.write(lastFrame).addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
ctx.write(lastFrame).cascadeTo(promiseAggregator.newPromise());
// Write the payload.
lastFrame = data.readableBytes() != maxFrameSize ? data.readSlice(maxFrameSize) : data;
data = null;
ctx.write(lastFrame).addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
ctx.write(lastFrame).cascadeTo(promiseAggregator.newPromise());
}
do {
@ -218,23 +215,23 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
flags.paddingPresent(framePaddingBytes > 0);
writeFrameHeaderInternal(frameHeader2, framePaddingBytes + frameDataBytes, DATA, flags, streamId);
writePaddingLength(frameHeader2, framePaddingBytes);
ctx.write(frameHeader2).addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
ctx.write(frameHeader2).cascadeTo(promiseAggregator.newPromise());
// Write the payload.
if (frameDataBytes != 0 && data != null) { // Make sure Data is not null
if (remainingData == 0) {
ByteBuf lastFrame = data.readSlice(frameDataBytes);
data = null;
ctx.write(lastFrame).addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
ctx.write(lastFrame).cascadeTo(promiseAggregator.newPromise());
} else {
ctx.write(data.readRetainedSlice(frameDataBytes))
.addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
.cascadeTo(promiseAggregator.newPromise());
}
}
// Write the frame padding.
if (paddingBytes(framePaddingBytes) > 0) {
ctx.write(ZERO_BUFFER.slice(0, paddingBytes(framePaddingBytes)))
.addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
.cascadeTo(promiseAggregator.newPromise());
}
} while (remainingData != 0 || padding != 0);
}
@ -285,7 +282,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
buf.writeInt(exclusive ? (int) (0x80000000L | streamDependency) : streamDependency);
// Adjust the weight so that it fits into a single byte on the wire.
buf.writeByte(weight - 1);
return ctx.write(buf).addListener(new PromiseNotifier<>(promise));
return ctx.write(buf).cascadeTo(promise);
} catch (Throwable t) {
return promise.setFailure(t);
}
@ -301,7 +298,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
ByteBuf buf = ctx.alloc().buffer(RST_STREAM_FRAME_LENGTH);
writeFrameHeaderInternal(buf, INT_FIELD_LENGTH, RST_STREAM, new Http2Flags(), streamId);
buf.writeInt((int) errorCode);
return ctx.write(buf).addListener(new PromiseNotifier<>(promise));
return ctx.write(buf).cascadeTo(promise);
} catch (Throwable t) {
return promise.setFailure(t);
}
@ -319,7 +316,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
buf.writeChar(entry.key());
buf.writeInt(entry.value().intValue());
}
return ctx.write(buf).addListener(new PromiseNotifier<>(promise));
return ctx.write(buf).cascadeTo(promise);
} catch (Throwable t) {
return promise.setFailure(t);
}
@ -330,7 +327,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
try {
ByteBuf buf = ctx.alloc().buffer(FRAME_HEADER_LENGTH);
writeFrameHeaderInternal(buf, 0, SETTINGS, new Http2Flags().ack(true), 0);
return ctx.write(buf).addListener(new PromiseNotifier<>(promise));
return ctx.write(buf).cascadeTo(promise);
} catch (Throwable t) {
return promise.setFailure(t);
}
@ -344,7 +341,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
// in the catch block.
writeFrameHeaderInternal(buf, PING_FRAME_PAYLOAD_LENGTH, PING, flags, 0);
buf.writeLong(data);
return ctx.write(buf).addListener(new PromiseNotifier<>(promise));
return ctx.write(buf).cascadeTo(promise);
}
@Override
@ -377,15 +374,14 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
// Write out the promised stream ID.
buf.writeInt(promisedStreamId);
ctx.write(buf).addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
ctx.write(buf).cascadeTo(promiseAggregator.newPromise());
// Write the first fragment.
ctx.write(fragment).addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
ctx.write(fragment).cascadeTo(promiseAggregator.newPromise());
// Write out the padding, if any.
if (paddingBytes(padding) > 0) {
ctx.write(ZERO_BUFFER.slice(0, paddingBytes(padding)))
.addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
ctx.write(ZERO_BUFFER.slice(0, paddingBytes(padding))).cascadeTo(promiseAggregator.newPromise());
}
if (!flags.endOfHeaders()) {
@ -420,7 +416,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
writeFrameHeaderInternal(buf, payloadLength, GO_AWAY, new Http2Flags(), 0);
buf.writeInt(lastStreamId);
buf.writeInt((int) errorCode);
ctx.write(buf).addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
ctx.write(buf).cascadeTo(promiseAggregator.newPromise());
} catch (Throwable t) {
try {
debugData.release();
@ -432,7 +428,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
}
try {
ctx.write(debugData).addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
ctx.write(debugData).cascadeTo(promiseAggregator.newPromise());
} catch (Throwable t) {
promiseAggregator.setFailure(t);
}
@ -449,7 +445,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
ByteBuf buf = ctx.alloc().buffer(WINDOW_UPDATE_FRAME_LENGTH);
writeFrameHeaderInternal(buf, INT_FIELD_LENGTH, WINDOW_UPDATE, new Http2Flags(), streamId);
buf.writeInt(windowSizeIncrement);
return ctx.write(buf).addListener(new PromiseNotifier<>(promise));
return ctx.write(buf).cascadeTo(promise);
} catch (Throwable t) {
return promise.setFailure(t);
}
@ -465,7 +461,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
// Assume nothing below will throw until buf is written. That way we don't have to take care of ownership
// in the catch block.
writeFrameHeaderInternal(buf, payload.readableBytes(), frameType, flags, streamId);
ctx.write(buf).addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
ctx.write(buf).cascadeTo(promiseAggregator.newPromise());
} catch (Throwable t) {
try {
payload.release();
@ -476,7 +472,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
return promiseAggregator;
}
try {
ctx.write(payload).addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
ctx.write(payload).cascadeTo(promiseAggregator.newPromise());
} catch (Throwable t) {
promiseAggregator.setFailure(t);
}
@ -522,15 +518,15 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
// Adjust the weight so that it fits into a single byte on the wire.
buf.writeByte(weight - 1);
}
ctx.write(buf).addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
ctx.write(buf).cascadeTo(promiseAggregator.newPromise());
// Write the first fragment.
ctx.write(fragment).addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
ctx.write(fragment).cascadeTo(promiseAggregator.newPromise());
// Write out the padding, if any.
if (paddingBytes(padding) > 0) {
ctx.write(ZERO_BUFFER.slice(0, paddingBytes(padding)))
.addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
.cascadeTo(promiseAggregator.newPromise());
}
if (!flags.endOfHeaders()) {
@ -568,17 +564,17 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
ByteBuf fragment = headerBlock.readRetainedSlice(fragmentReadableBytes);
if (headerBlock.isReadable()) {
ctx.write(buf.retain()).addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
ctx.write(buf.retain()).cascadeTo(promiseAggregator.newPromise());
} else {
// The frame header is different for the last frame, so re-allocate and release the old buffer
flags = flags.endOfHeaders(true);
buf.release();
buf = ctx.alloc().buffer(CONTINUATION_FRAME_HEADER_LENGTH);
writeFrameHeaderInternal(buf, fragmentReadableBytes, CONTINUATION, flags, streamId);
ctx.write(buf).addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
ctx.write(buf).cascadeTo(promiseAggregator.newPromise());
}
ctx.write(fragment).addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
ctx.write(fragment).cascadeTo(promiseAggregator.newPromise());
} while (headerBlock.isReadable());
}

View File

@ -27,7 +27,6 @@ import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
@ -925,7 +924,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
if (promise == null) {
ctx.close();
} else {
ctx.close().addListener(new PromiseNotifier<>(promise));
ctx.close().cascadeTo(promise);
}
} else if (promise != null) {
promise.setSuccess(null);

View File

@ -31,7 +31,6 @@ import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogLevel;
import io.netty.util.internal.logging.InternalLogger;
@ -340,7 +339,7 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
encoder().writeFrame(ctx, unknownFrame.frameType(), unknownFrame.stream().id(),
unknownFrame.flags(), unknownFrame.content(), promise);
} else if (!(msg instanceof Http2Frame)) {
ctx.write(msg).addListener(new PromiseNotifier<>(promise));
ctx.write(msg).cascadeTo(promise);
} else {
ReferenceCountUtil.release(msg);
promise.setFailure(new UnsupportedMessageTypeException(msg));

View File

@ -29,7 +29,6 @@ import io.netty.util.AsciiString;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@ -739,7 +738,7 @@ public abstract class Http2MultiplexTest<C extends Http2FrameCodec> {
channelOpen.set(channel.isOpen());
channelActive.set(channel.isActive());
});
childChannel.close().addListener(new PromiseNotifier<>(p)).syncUninterruptibly();
childChannel.close().cascadeTo(p).syncUninterruptibly();
assertFalse(channelOpen.get());
assertFalse(channelActive.get());

View File

@ -24,7 +24,6 @@ import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseCombiner;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.TypeParameterMatcher;
@ -102,7 +101,7 @@ public abstract class MessageToMessageEncoder<I> extends ChannelHandlerAdapter {
} finally {
final int sizeMinusOne = out.size() - 1;
if (sizeMinusOne == 0) {
PromiseNotifier.cascade(ctx.write(out.getUnsafe(0)), promise);
ctx.write(out.getUnsafe(0)).cascadeTo(promise);
} else {
writePromiseCombiner(ctx, out, promise);
}

View File

@ -22,7 +22,6 @@ import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import java.util.concurrent.TimeUnit;
@ -182,7 +181,7 @@ public class Bzip2Encoder extends MessageToByteEncoder<ByteBuf> {
Promise<Void> promise = ctx.newPromise();
executor.execute(() -> {
Future<Void> f = finishEncode(ctx());
PromiseNotifier.cascade(f, promise);
f.cascadeTo(promise);
});
return promise;
}
@ -195,11 +194,10 @@ public class Bzip2Encoder extends MessageToByteEncoder<ByteBuf> {
return ctx.close();
}
Promise<Void> promise = ctx.newPromise();
f.addListener(f1 -> ctx.close().addListener(new PromiseNotifier<>(false, promise)));
f.addListener(f1 -> ctx.close().cascadeTo(promise));
// Ensure the channel is closed even if the write operation completes in time.
ctx.executor().schedule(() -> {
ctx.close().addListener(new PromiseNotifier<>(false, promise));
}, 10, TimeUnit.SECONDS); // FIXME: Magic number
ctx.executor().schedule(() -> ctx.close().cascadeTo(promise),
10, TimeUnit.SECONDS); // FIXME: Magic number
return promise;
}

View File

@ -20,7 +20,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
@ -164,7 +163,7 @@ public class JdkZlibEncoder extends ZlibEncoder {
Promise<Void> p = ctx.newPromise();
executor.execute(() -> {
Future<Void> f = finishEncode(ctx());
PromiseNotifier.cascade(f, p);
f.cascadeTo(p);
});
return p;
}
@ -262,11 +261,10 @@ public class JdkZlibEncoder extends ZlibEncoder {
return ctx.close();
}
Promise<Void> promise = ctx.newPromise();
f.addListener(f1 -> ctx.close().addListener(new PromiseNotifier<>(false, promise)));
f.addListener(f1 -> ctx.close().cascadeTo(promise));
// Ensure the channel is closed even if the write operation completes in time.
ctx.executor().schedule(() -> {
ctx.close().addListener(new PromiseNotifier<>(false, promise));
}, 10, TimeUnit.SECONDS); // FIXME: Magic number
ctx.executor().schedule(() -> ctx.close().cascadeTo(promise),
10, TimeUnit.SECONDS); // FIXME: Magic number
return promise;
}

View File

@ -25,7 +25,6 @@ import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.internal.ObjectUtil;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Exception;
@ -348,7 +347,7 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
Promise<Void> promise = ctx.newPromise();
executor.execute(() -> {
Future<Void> f = finishEncode(ctx());
PromiseNotifier.cascade(f, promise);
f.cascadeTo(promise);
});
return promise;
}
@ -361,12 +360,10 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
return ctx.close();
}
Promise<Void> promise = ctx.newPromise();
f.addListener(f1 ->
ctx.close().addListener(new PromiseNotifier<>(false, promise)));
f.addListener(f1 -> ctx.close().cascadeTo(promise));
// Ensure the channel is closed even if the write operation completes in time.
ctx.executor().schedule(() -> {
ctx.close().addListener(new PromiseNotifier<>(false, promise));
}, 10, TimeUnit.SECONDS); // FIXME: Magic number
ctx.executor().schedule(() -> ctx.close().cascadeTo(promise),
10, TimeUnit.SECONDS); // FIXME: Magic number
return promise;
}

View File

@ -360,4 +360,17 @@ public interface Future<V> extends java.util.concurrent.Future<V> {
default <R> Future<R> flatMap(Function<V, Future<R>> mapper) {
return Futures.flatMap(this, mapper);
}
/**
* Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise}
* will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled
* the {@link Promise} is cancelled and vice-versa.
*
* @param promise the {@link Promise} which will be notified
* @return itself
*/
default Future<V> cascadeTo(final Promise<? super V> promise) {
Futures.cascade(this, promise);
return this;
}
}

View File

@ -33,7 +33,7 @@ import static java.util.Objects.requireNonNull;
*/
final class Futures {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Futures.class);
private static final PassThrough<?> PASS_THROUGH = new PassThrough<Object>();
private static final PassThrough<?> PASS_THROUGH = new PassThrough<>();
private static final PropagateCancel PROPAGATE_CANCEL = new PropagateCancel();
/**
@ -123,7 +123,7 @@ final class Futures {
recipient.cancel(false);
} else {
Throwable cause = completed.cause();
tryFailure(recipient, cause, logger);
recipient.tryFailure(cause);
}
}
@ -229,4 +229,25 @@ final class Futures {
}
}
}
/**
* Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise}
* will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled
* the {@link Promise} is cancelled and vice-versa.
*
* @param future the {@link Future} which will be used to listen to for notifying the {@link Promise}.
* @param promise the {@link Promise} which will be notified
* @param <V> the type of the value.
*/
static <V> void cascade(final Future<V> future, final Promise<? super V> promise) {
requireNonNull(future, "future");
requireNonNull(promise, "promise");
if (!future.isSuccess()) {
// Propagate cancellation if future is either incomplete or failed.
// Failed means it could be cancelled, so that needs to be propagated.
promise.addListener(future, propagateCancel());
}
future.addListener(promise, passThrough());
}
}

View File

@ -1,171 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import io.netty.util.internal.PromiseNotificationUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import static io.netty.util.internal.ObjectUtil.checkNotNullWithIAE;
import static java.util.Objects.requireNonNull;
/**
* A {@link FutureListener} implementation which takes other {@link Promise}s
* and notifies them on completion.
*
* @param <V> the type of value returned by the future
*/
public class PromiseNotifier<V> implements FutureListener<V> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PromiseNotifier.class);
private final Promise<? super V>[] promises;
private final boolean logNotifyFailure;
/**
* Create a new instance.
*
* @param promises the {@link Promise}s to notify once this {@link FutureListener} is notified.
*/
@SafeVarargs
public PromiseNotifier(Promise<? super V>... promises) {
this(true, promises);
}
/**
* Create a new instance.
*
* @param logNotifyFailure {@code true} if logging should be done in case notification fails.
* @param promises the {@link Promise}s to notify once this {@link FutureListener} is notified.
*/
@SafeVarargs
public PromiseNotifier(boolean logNotifyFailure, Promise<? super V>... promises) {
requireNonNull(promises, "promises");
for (Promise<? super V> promise: promises) {
checkNotNullWithIAE(promise, "promise");
}
this.promises = promises.clone();
this.logNotifyFailure = logNotifyFailure;
}
/**
* Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise}
* will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled
* the {@link Promise} is cancelled and vice-versa.
*
* @param future the {@link Future} which will be used to listen to for notifying the {@link Promise}.
* @param promise the {@link Promise} which will be notified
* @param <V> the type of the value.
* @return the passed in {@link Future}
*/
public static <V> Future<V> cascade(final Future<V> future, final Promise<? super V> promise) {
return cascade(true, future, promise);
}
/**
* Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise}
* will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled
* the {@link Promise} is cancelled and vice-versa.
*
* @param logNotifyFailure {@code true} if logging should be done in case notification fails.
* @param future the {@link Future} which will be used to listen to for notifying the {@link Promise}.
* @param promise the {@link Promise} which will be notified
* @param <V> the type of the value.
* @return the passed in {@link Future}
*/
public static <V> Future<V> cascade(boolean logNotifyFailure, final Future<V> future,
final Promise<? super V> promise) {
promise.addListener(future, PromiseNotifier::propagateCancel);
future.addListener(new PromiseNotifier<V>(logNotifyFailure, promise), PromiseNotifier::propagateComplete);
return future;
}
/**
* Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise} will be
* notified with the given result.
* Cancellation is propagated both ways such that if the {@link Future} is cancelled the {@link Promise}
* is cancelled and vice-versa.
*
* @param logNotifyFailure {@code true} if logging should be done in case notification fails.
* @param future the {@link Future} which will be used to listen to for notifying the {@link Promise}.
* @param promise the {@link Promise} which will be notified
* @param successResult the result that will be propagated to the promise on success
* @return the passed in {@link Future}
*/
public static <R> Future<Void> cascade(boolean logNotifyFailure, Future<Void> future,
Promise<R> promise, R successResult) {
promise.addListener(future, PromiseNotifier::propagateCancel);
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<? extends Void> f) throws Exception {
if (promise.isCancelled() && f.isCancelled()) {
// Just return if we propagate a cancel from the promise to the future and both are notified already
return;
}
if (f.isSuccess()) {
promise.setSuccess(successResult);
} else if (f.isCancelled()) {
InternalLogger internalLogger = null;
if (logNotifyFailure) {
internalLogger = InternalLoggerFactory.getInstance(PromiseNotifier.class);
}
PromiseNotificationUtil.tryCancel(promise, internalLogger);
} else {
Throwable cause = future.cause();
promise.tryFailure(cause);
}
}
});
return future;
}
static <V, F extends Future<?>> void propagateCancel(F target, Future<? extends V> source) {
if (source.isCancelled()) {
target.cancel(false);
}
}
static <V> void propagateComplete(PromiseNotifier<V> target, Future<? extends V> source) throws Exception {
boolean allCancelled = target.promises.length > 0;
for (Promise<? super V> promise : target.promises) {
allCancelled &= promise.isCancelled();
}
if (allCancelled && source.isCancelled()) {
// Just return if we propagate a cancel from the promise to the future and both are notified already
return;
}
target.operationComplete(source);
}
@Override
public void operationComplete(Future<? extends V> future) throws Exception {
InternalLogger internalLogger = logNotifyFailure ? logger : null;
if (future.isSuccess()) {
V result = future.get();
for (Promise<? super V> p: promises) {
PromiseNotificationUtil.trySuccess(p, result, internalLogger);
}
} else if (future.isCancelled()) {
for (Promise<? super V> p: promises) {
PromiseNotificationUtil.tryCancel(p, internalLogger);
}
} else {
Throwable cause = future.cause();
for (Promise<? super V> p: promises) {
PromiseNotificationUtil.tryFailure(p, cause, internalLogger);
}
}
}
}

View File

@ -25,6 +25,8 @@ import static io.netty.util.concurrent.ImmediateEventExecutor.INSTANCE;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
class FuturesTest {
@ -212,4 +214,65 @@ class FuturesTest {
mappingLatchExit.countDown();
assertThat(strFut.get(5, SECONDS)).isEqualTo("42");
}
@Test
public void cascadeToNullPromise() {
TestEventExecutor executor = new TestEventExecutor();
DefaultPromise<Void> promise = new DefaultPromise<>(executor);
assertThrows(NullPointerException.class, () -> promise.cascadeTo(null));
}
@Test
public void cascadeToSuccess() throws Exception {
TestEventExecutor executor = new TestEventExecutor();
DefaultPromise<Integer> promise = new DefaultPromise<>(executor);
DefaultPromise<Integer> promise2 = new DefaultPromise<>(executor);
promise.cascadeTo(promise2);
promise.setSuccess(1);
assertTrue(promise.isSuccess());
assertThat(promise2.get(1, SECONDS)).isEqualTo(1);
}
@Test
public void cascadeToFailure() throws Exception {
TestEventExecutor executor = new TestEventExecutor();
DefaultPromise<Integer> promise = new DefaultPromise<>(executor);
DefaultPromise<Integer> promise2 = new DefaultPromise<>(executor);
promise.cascadeTo(promise2);
Exception ex = new Exception();
promise.setFailure(ex);
assertTrue(promise.isFailed());
assertTrue(promise2.await(1, SECONDS));
assertTrue(promise2.isFailed());
assertSame(promise.cause(), promise2.cause());
}
@Test
public void cascadeToCancel() throws Exception {
TestEventExecutor executor = new TestEventExecutor();
DefaultPromise<Integer> promise = new DefaultPromise<>(executor);
DefaultPromise<Integer> promise2 = new DefaultPromise<>(executor);
promise.cascadeTo(promise2);
assertTrue(promise.cancel(false));
assertTrue(promise.isCancelled());
assertTrue(promise2.await(1, SECONDS));
assertTrue(promise2.isCancelled());
}
@Test
public void cascadeToCancelSecond() throws Exception {
TestEventExecutor executor = new TestEventExecutor();
DefaultPromise<Integer> promise = new DefaultPromise<>(executor);
DefaultPromise<Integer> promise2 = new DefaultPromise<>(executor);
promise.cascadeTo(promise2);
assertTrue(promise2.cancel(false));
assertTrue(promise2.isCancelled());
//
assertTrue(promise.await(1, SECONDS));
assertTrue(promise2.isCancelled());
}
}

View File

@ -1,94 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.*;
public class PromiseNotifierTest {
@Test
public void testNullPromisesArray() {
assertThrows(NullPointerException.class, () -> new PromiseNotifier<>((Promise<Void>[]) null));
}
@Test
public void testNullPromiseInArray() {
assertThrows(IllegalArgumentException.class, () -> new PromiseNotifier<>((Promise<Void>) null));
}
@Test
public void testListenerSuccess() throws Exception {
@SuppressWarnings("unchecked")
Promise<Void> p1 = mock(Promise.class);
@SuppressWarnings("unchecked")
Promise<Void> p2 = mock(Promise.class);
PromiseNotifier<Void> notifier = new PromiseNotifier<>(p1, p2);
@SuppressWarnings("unchecked")
Future<Void> future = mock(Future.class);
when(future.isSuccess()).thenReturn(true);
when(future.get()).thenReturn(null);
when(p1.trySuccess(null)).thenReturn(true);
when(p2.trySuccess(null)).thenReturn(true);
notifier.operationComplete(future);
verify(p1).trySuccess(null);
verify(p2).trySuccess(null);
}
@Test
public void testListenerFailure() throws Exception {
@SuppressWarnings("unchecked")
Promise<Void> p1 = mock(Promise.class);
@SuppressWarnings("unchecked")
Promise<Void> p2 = mock(Promise.class);
PromiseNotifier<Void> notifier = new PromiseNotifier<>(p1, p2);
@SuppressWarnings("unchecked")
Future<Void> future = mock(Future.class);
Throwable t = mock(Throwable.class);
when(future.isSuccess()).thenReturn(false);
when(future.isCancelled()).thenReturn(false);
when(future.cause()).thenReturn(t);
when(p1.tryFailure(t)).thenReturn(true);
when(p2.tryFailure(t)).thenReturn(true);
notifier.operationComplete(future);
verify(p1).tryFailure(t);
verify(p2).tryFailure(t);
}
@Test
public void testCancelPropagationWhenFusedFromFuture() {
Promise<Void> p1 = ImmediateEventExecutor.INSTANCE.newPromise();
Promise<Void> p2 = ImmediateEventExecutor.INSTANCE.newPromise();
Future<Void> returned = PromiseNotifier.cascade(p1, p2);
assertSame(p1, returned);
assertTrue(returned.cancel(false));
assertTrue(returned.isCancelled());
assertTrue(p2.isCancelled());
}
}

View File

@ -23,7 +23,6 @@ import io.netty.resolver.AddressResolverGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.internal.ObjectUtil;
import java.net.SocketAddress;
@ -53,7 +52,7 @@ public class ResolveAddressHandler implements ChannelHandler {
if (cause != null) {
promise.setFailure(cause);
} else {
ctx.connect(future.getNow(), localAddress).addListener(new PromiseNotifier<>(promise));
ctx.connect(future.getNow(), localAddress).cascadeTo(promise);
}
ctx.pipeline().remove(ResolveAddressHandler.this);
});

View File

@ -40,7 +40,6 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.ImmediateExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
@ -793,12 +792,12 @@ public class SslHandler extends ByteToMessageDecoder {
final ByteBuf b = out;
out = null;
if (promise != null) {
ctx.write(b).addListener(new PromiseNotifier<>(promise));
ctx.write(b).cascadeTo(promise);
} else {
ctx.write(b);
}
} else if (promise != null) {
ctx.write(Unpooled.EMPTY_BUFFER).addListener(new PromiseNotifier<>(promise));
ctx.write(Unpooled.EMPTY_BUFFER).cascadeTo(promise);
}
// else out is not readable we can re-use it and so save an extra allocation
@ -1885,7 +1884,7 @@ public class SslHandler extends ByteToMessageDecoder {
//
// See https://github.com/netty/netty/issues/5931
Promise<Void> cascade = ctx.newPromise();
PromiseNotifier.cascade(false, cascade, promise);
cascade.cascadeTo(promise);
safeClose(ctx, closeNotifyPromise, cascade);
} else {
/// We already handling the close_notify so just attach the promise to the sslClosePromise.
@ -1980,7 +1979,7 @@ public class SslHandler extends ByteToMessageDecoder {
if (!oldHandshakePromise.isDone()) {
// There's no need to handshake because handshake is in progress already.
// Merge the new promise into the old one.
PromiseNotifier.cascade(oldHandshakePromise, newHandshakePromise);
oldHandshakePromise.cascadeTo(newHandshakePromise);
} else {
handshakePromise = newHandshakePromise;
handshake(true);
@ -2072,7 +2071,7 @@ public class SslHandler extends ByteToMessageDecoder {
final ChannelHandlerContext ctx, final Future<Void> flushFuture,
final Promise<Void> promise) {
if (!ctx.channel().isActive()) {
ctx.close().addListener(new PromiseNotifier<>(promise));
ctx.close().cascadeTo(promise);
return;
}
@ -2150,7 +2149,7 @@ public class SslHandler extends ByteToMessageDecoder {
// IllegalStateException.
// Also we not want to log if the notification happens as this is expected in some cases.
// See https://github.com/netty/netty/issues/5598
PromiseNotifier.cascade(false, future, promise);
future.cascadeTo(promise);
}
/**

View File

@ -24,7 +24,6 @@ import io.netty.channel.ChannelPipeline;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -283,7 +282,7 @@ public class ChunkedWriteHandler implements ChannelHandler {
requiresFlush = false;
} else {
queue.remove();
ctx.write(pendingMessage).addListener(new PromiseNotifier<>(currentWrite.promise));
ctx.write(pendingMessage).cascadeTo(currentWrite.promise);
requiresFlush = true;
}

View File

@ -18,7 +18,6 @@ package io.netty.handler.traffic;
import io.netty.buffer.ByteBufConvertible;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;
@ -147,7 +146,7 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
long size = calculateSize(toSend.toSend);
trafficCounter.bytesRealWriteFlowControl(size);
queueSize -= size;
ctx.write(toSend.toSend).addListener(new PromiseNotifier<>(toSend.promise));
ctx.write(toSend.toSend).cascadeTo(toSend.promise);
}
} else {
for (ToSend toSend : messagesQueue) {
@ -184,7 +183,7 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
synchronized (this) {
if (delay == 0 && messagesQueue.isEmpty()) {
trafficCounter.bytesRealWriteFlowControl(size);
ctx.write(msg).addListener(new PromiseNotifier<>(promise));
ctx.write(msg).cascadeTo(promise);
return;
}
newToSend = new ToSend(delay + now, msg, promise);
@ -205,7 +204,7 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
long size = calculateSize(newToSend.toSend);
trafficCounter.bytesRealWriteFlowControl(size);
queueSize -= size;
ctx.write(newToSend.toSend).addListener(new PromiseNotifier<>(newToSend.promise));
ctx.write(newToSend.toSend).cascadeTo(newToSend.promise);
} else {
messagesQueue.addFirst(newToSend);
break;

View File

@ -24,7 +24,6 @@ import io.netty.util.Attribute;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -495,7 +494,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
perChannel.queueSize -= size;
queuesSize.addAndGet(-size);
ctx.write(toSend.toSend).addListener(new PromiseNotifier<>(toSend.promise));
ctx.write(toSend.toSend).cascadeTo(toSend.promise);
}
} else {
queuesSize.addAndGet(-perChannel.queueSize);
@ -714,7 +713,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
trafficCounter.bytesRealWriteFlowControl(size);
perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
ctx.write(msg).addListener(new PromiseNotifier<>(promise));
ctx.write(msg).cascadeTo(promise);
perChannel.lastWriteTimestamp = now;
return;
}
@ -749,7 +748,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
perChannel.queueSize -= size;
queuesSize.addAndGet(-size);
ctx.write(newToSend.toSend).addListener(new PromiseNotifier<>(newToSend.promise));
ctx.write(newToSend.toSend).cascadeTo(newToSend.promise);
perChannel.lastWriteTimestamp = now;
} else {
perChannel.messagesQueue.addFirst(newToSend);

View File

@ -21,7 +21,6 @@ import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import java.util.ArrayDeque;
import java.util.concurrent.ConcurrentHashMap;
@ -273,7 +272,7 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
trafficCounter.bytesRealWriteFlowControl(size);
perChannel.queueSize -= size;
queuesSize.addAndGet(-size);
ctx.write(toSend.toSend).addListener(new PromiseNotifier<>(toSend.promise));
ctx.write(toSend.toSend).cascadeTo(toSend.promise);
}
} else {
queuesSize.addAndGet(-perChannel.queueSize);
@ -345,7 +344,7 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
synchronized (perChannel) {
if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
trafficCounter.bytesRealWriteFlowControl(size);
ctx.write(msg).addListener(new PromiseNotifier<>(promise));
ctx.write(msg).cascadeTo(promise);
perChannel.lastWriteTimestamp = now;
return;
}
@ -379,7 +378,7 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler {
trafficCounter.bytesRealWriteFlowControl(size);
perChannel.queueSize -= size;
queuesSize.addAndGet(-size);
ctx.write(newToSend.toSend).addListener(new PromiseNotifier<>(newToSend.promise));
ctx.write(newToSend.toSend).cascadeTo(newToSend.promise);
perChannel.lastWriteTimestamp = now;
} else {
perChannel.messagesQueue.addFirst(newToSend);

View File

@ -41,7 +41,6 @@ import io.netty.handler.ssl.util.SimpleTrustManagerFactory;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.ResourcesUtil;
import org.junit.jupiter.api.Timeout;
@ -430,7 +429,7 @@ public class ParameterizedSslHandlerTest {
protected void initChannel(Channel ch) throws Exception {
SslHandler handler = sslServerCtx.newHandler(ch.alloc());
handler.setCloseNotifyReadTimeoutMillis(closeNotifyReadTimeout);
PromiseNotifier.cascade(handler.sslCloseFuture(), serverPromise);
handler.sslCloseFuture().cascadeTo(serverPromise);
handler.handshakeFuture().addListener(future -> {
if (future.isFailed()) {
@ -466,7 +465,7 @@ public class ParameterizedSslHandlerTest {
SslHandler handler = sslClientCtx.newHandler(ch.alloc());
handler.setCloseNotifyReadTimeoutMillis(closeNotifyReadTimeout);
PromiseNotifier.cascade(handler.sslCloseFuture(), clientPromise);
handler.sslCloseFuture().cascadeTo(clientPromise);
handler.handshakeFuture().addListener(future -> {
if (future.isSuccess()) {
closeSent.compareAndSet(false, true);

View File

@ -24,7 +24,6 @@ import io.netty.microbench.channel.EmbeddedChannelWriteReleaseHandlerContext;
import io.netty.microbench.util.AbstractMicrobenchmark;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@ -146,20 +145,19 @@ public class Http2FrameWriterDataBenchmark extends AbstractMicrobenchmark {
// Only the last frame is not retained. Until then, the outer finally must release.
ByteBuf frameHeader = header.slice(frameDataBytes, framePaddingBytes, lastFrame && endStream);
needToReleaseHeaders = !lastFrame;
ctx.write(lastFrame ? frameHeader : frameHeader.retain())
.addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
ctx.write(lastFrame ? frameHeader : frameHeader.retain()).cascadeTo(promiseAggregator.newPromise());
// Write the frame data.
ByteBuf frameData = data.readSlice(frameDataBytes);
// Only the last frame is not retained. Until then, the outer finally must release.
needToReleaseData = !lastFrame;
ctx.write(lastFrame ? frameData : frameData.retain())
.addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
.cascadeTo(promiseAggregator.newPromise());
// Write the frame padding.
if (paddingBytes(framePaddingBytes) > 0) {
ctx.write(ZERO_BUFFER.slice(0, paddingBytes(framePaddingBytes)))
.addListener(new PromiseNotifier<>(promiseAggregator.newPromise()));
.cascadeTo(promiseAggregator.newPromise());
}
} while (!lastFrame);
} catch (Throwable t) {

View File

@ -26,7 +26,6 @@ import io.netty.util.AttributeKey;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.internal.SocketUtils;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
@ -39,7 +38,6 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static io.netty.util.concurrent.PromiseNotifier.cascade;
import static java.util.Objects.requireNonNull;
/**
@ -250,7 +248,7 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C, F>, C
// At this point we know that the registration was complete and successful.
Channel channel = regFuture.getNow();
Promise<Void> promise = channel.newPromise();
cascade(true, promise, bindPromise, channel);
promise.map(v -> channel).cascadeTo(bindPromise);
doBind0(regFuture, channel, localAddress, promise);
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
@ -263,7 +261,7 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C, F>, C
} else {
Channel channel = future.getNow();
Promise<Void> promise = channel.newPromise();
cascade(true, promise, bindPromise, channel);
promise.map(v -> channel).cascadeTo(bindPromise);
doBind0(regFuture, channel, localAddress, promise);
}
});
@ -319,7 +317,7 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C, F>, C
// the pipeline in its channelRegistered() implementation.
channel.executor().execute(() -> {
if (regFuture.isSuccess()) {
PromiseNotifier.cascade(channel.bind(localAddress), promise)
channel.bind(localAddress).cascadeTo(promise)
.addListener(channel, ChannelFutureListeners.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());

View File

@ -36,7 +36,6 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import static io.netty.util.concurrent.PromiseNotifier.cascade;
import static java.util.Objects.requireNonNull;
/**
@ -268,7 +267,7 @@ public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel, ChannelFact
future = channel.connect(remoteAddress, localAddress);
}
future.addListener(channel, ChannelFutureListeners.CLOSE_ON_FAILURE);
cascade(true, future, promise, channel);
future.map(v -> channel).cascadeTo(promise);
});
}

View File

@ -21,7 +21,6 @@ import io.netty.buffer.CompositeByteBuf;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -59,7 +58,7 @@ public abstract class AbstractCoalescingBufferQueue {
* @param promise to complete when all the bytes have been consumed and written, can be void.
*/
public final void addFirst(ByteBuf buf, Promise<Void> promise) {
addFirst(buf, new PromiseNotifier<Void>(promise));
addFirst(buf, f -> f.cascadeTo(promise));
}
private void addFirst(ByteBuf buf, FutureListener<Void> listener) {
@ -86,7 +85,7 @@ public abstract class AbstractCoalescingBufferQueue {
public final void add(ByteBuf buf, Promise<Void> promise) {
// buffers are added before promises so that we naturally 'consume' the entire buffer during removal
// before we complete it's promise.
add(buf, new PromiseNotifier<Void>(promise));
add(buf, f -> f.cascadeTo(promise));
}
/**
@ -254,7 +253,7 @@ public abstract class AbstractCoalescingBufferQueue {
previousBuf = ((ByteBufConvertible) entry).asByteBuf();
} else if (entry instanceof Promise) {
decrementReadableBytes(previousBuf.readableBytes());
ctx.write(previousBuf).addListener(new PromiseNotifier<>((Promise<? super Void>) entry));
ctx.write(previousBuf).cascadeTo((Promise<? super Void>) entry);
previousBuf = null;
} else {
decrementReadableBytes(previousBuf.readableBytes());

View File

@ -25,7 +25,6 @@ import io.netty.util.ResourceLeakHint;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.internal.ObjectPool;
import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.StringUtil;
@ -430,7 +429,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
}
Promise<Void> promise = newPromise();
safeExecute(executor, () -> PromiseNotifier.cascade(findAndInvokeBind(localAddress), promise), promise, null);
safeExecute(executor, () -> findAndInvokeBind(localAddress).cascadeTo(promise), promise, null);
return promise;
}
@ -446,7 +445,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
return findAndInvokeDeregister();
}
Promise<Void> promise = newPromise();
safeExecute(executor, () -> PromiseNotifier.cascade(findAndInvokeDeregister(), promise), promise, null);
safeExecute(executor, () -> findAndInvokeDeregister().cascadeTo(promise), promise, null);
return promise;
}
private Future<Void> findAndInvokeBind(SocketAddress localAddress) {
@ -475,7 +474,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
}
Promise<Void> promise = newPromise();
safeExecute(executor, () ->
PromiseNotifier.cascade(findAndInvokeConnect(remoteAddress, localAddress), promise), promise, null);
findAndInvokeConnect(remoteAddress, localAddress).cascadeTo(promise), promise, null);
return promise;
}
@ -509,8 +508,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
return findAndInvokeDisconnect();
}
Promise<Void> promise = newPromise();
safeExecute(executor, () ->
PromiseNotifier.cascade(findAndInvokeDisconnect(), promise), promise, null);
safeExecute(executor, () -> findAndInvokeDisconnect().cascadeTo(promise), promise, null);
return promise;
}
@ -537,8 +535,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
return findAndInvokeClose();
}
Promise<Void> promise = newPromise();
safeExecute(executor, () ->
PromiseNotifier.cascade(findAndInvokeClose(), promise), promise, null);
safeExecute(executor, () -> findAndInvokeClose().cascadeTo(promise), promise, null);
return promise;
}
@ -565,8 +562,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
return findAndInvokeRegister();
}
Promise<Void> promise = newPromise();
safeExecute(executor, () ->
PromiseNotifier.cascade(findAndInvokeRegister(), promise), promise, null);
safeExecute(executor, () -> findAndInvokeRegister().cascadeTo(promise), promise, null);
return promise;
}
@ -912,7 +908,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
DefaultChannelHandlerContext next = findContext(ctx);
if (next == null) {
ReferenceCountUtil.release(msg);
failRemoved(ctx).addListener(new PromiseNotifier<>(promise));
failRemoved(ctx).cascadeTo(promise);
return;
}
write(next, msg, promise);
@ -944,7 +940,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
}
protected void write(DefaultChannelHandlerContext ctx, Object msg, Promise<Void> promise) {
PromiseNotifier.cascade(ctx.invokeWrite(msg), promise);
ctx.invokeWrite(msg).cascadeTo(promise);
}
}

View File

@ -19,7 +19,6 @@ import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseCombiner;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.internal.ObjectPool;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
@ -144,7 +143,7 @@ public final class PendingWriteQueue {
Object msg = write.msg;
Promise<Void> promise = write.promise;
recycle(write, false);
PromiseNotifier.cascade(ctx.write(msg), promise);
ctx.write(msg).cascadeTo(promise);
write = next;
}
}
@ -221,7 +220,7 @@ public final class PendingWriteQueue {
recycle(write, true);
Future<Void> future = ctx.write(msg);
PromiseNotifier.cascade(future, promise);
future.cascadeTo(promise);
return future;
}

View File

@ -37,7 +37,6 @@ import io.netty.channel.ChannelPipeline;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.concurrent.ScheduledFuture;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@ -345,7 +344,7 @@ public class EmbeddedChannelTest {
@Override
public Future<Void> write(final ChannelHandlerContext ctx, final Object msg) {
Promise<Void> promise = ctx.newPromise();
ctx.executor().execute(() -> ctx.write(msg).addListener(new PromiseNotifier<>(promise)));
ctx.executor().execute(() -> ctx.write(msg).cascadeTo(promise));
return promise;
}
});
@ -365,7 +364,7 @@ public class EmbeddedChannelTest {
public Future<Void> write(final ChannelHandlerContext ctx, final Object msg) {
Promise<Void> promise = ctx.newPromise();
ctx.executor().schedule(() -> {
ctx.writeAndFlush(msg).addListener(new PromiseNotifier<>(promise));
ctx.writeAndFlush(msg).cascadeTo(promise);
}, delay, TimeUnit.MILLISECONDS);
return promise;
}