package it.cavallium.dbengine.database.remote; import io.netty.handler.codec.ByteToMessageCodec; import io.netty5.buffer.api.Buffer; import io.netty5.util.Send; import it.cavallium.data.generator.nativedata.NullableString; import it.cavallium.dbengine.database.OptionalBuf; import it.cavallium.dbengine.rpc.current.data.RPCCrash; import it.cavallium.dbengine.rpc.current.data.RPCEvent; import it.cavallium.dbengine.rpc.current.data.nullables.NullableBytes; import it.unimi.dsi.fastutil.bytes.ByteArrayList; import it.unimi.dsi.fastutil.bytes.ByteList; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.function.Function; import java.util.function.Supplier; import java.util.logging.Level; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.Empty; import reactor.core.publisher.Sinks.One; import reactor.netty.NettyInbound; import reactor.netty.NettyOutbound; import reactor.netty.incubator.quic.QuicConnection; public class QuicUtils { public static final Mono NO_RESPONSE_ERROR = Mono.error(NoResponseReceivedException::new); public static byte[] toArrayNoCopy(ByteList b) { if (b instanceof ByteArrayList bal) { return bal.elements(); } else { return b.toByteArray(); } } public static String toString(ByteList b) { return new String(QuicUtils.toArrayNoCopy(b), StandardCharsets.UTF_8); } public static NullableBytes toBytes(OptionalBuf valueSendOpt) { if (valueSendOpt.isPresent()) { try (var value = valueSendOpt.get()) { var bytes = new byte[value.readableBytes()]; value.copyInto(value.readerOffset(), bytes, 0, bytes.length); return NullableBytes.ofNullable(ByteList.of(bytes)); } } else { return NullableBytes.empty(); } } public static Mono toBytes(Mono valueSendOptMono) { return valueSendOptMono.map(valueSendOpt -> { try (var value = valueSendOpt) { var bytes = new byte[value.readableBytes()]; value.copyInto(value.readerOffset(), bytes, 0, bytes.length); return NullableBytes.ofNullable(ByteList.of(bytes)); } }).defaultIfEmpty(NullableBytes.empty()); } public record QuicStream(NettyInbound in, NettyOutbound out) {} public static Mono catchRPCErrors(@NotNull Throwable error) { return Mono.just(new RPCCrash(500, NullableString.ofNullableBlank(error.getMessage()))); } private static RECV extractResponse(SEND request, RECV response) { return response; } /** * Create a general purpose QUIC stream */ public static Mono createStream(QuicConnection quicConnection, Mono streamTerminator) { return Mono.defer(() -> { One inOutSink = Sinks.one(); return quicConnection .createStream((in, out) -> Mono .fromRunnable(() -> inOutSink.tryEmitValue(new QuicStream(in, out)).orThrow()) .then(streamTerminator)) .then(inOutSink.asMono()); }); } /** * Send a single request, receive a single response */ @SuppressWarnings("unchecked") public static Mono> createMappedStream( @NotNull QuicConnection quicConnection, @NotNull Supplier> sendCodec, @Nullable Supplier> recvCodec) { return Mono.defer(() -> { Empty streamTerminator = Sinks.empty(); return QuicUtils .createStream(quicConnection, streamTerminator.asMono()) .map(stream -> { Flux inConn; if (recvCodec == null) { inConn = Flux.error(() -> new UnsupportedOperationException("Receiving responses is supported")); } else { inConn = Flux.defer(() -> (Flux) stream .in() .withConnection(conn -> conn.addHandler(recvCodec.get())) .receiveObject() .log("ClientBoundEvent", Level.FINEST) ) .publish(1) .refCount(); } return new MappedStream<>(stream.out, sendCodec, inConn, streamTerminator); }) .single(); }); } /** * Send a single request, receive a single response */ @SuppressWarnings("unchecked") public static Mono sendSimpleRequest(QuicConnection quicConnection, Supplier> sendCodec, Supplier> recvCodec, SEND req) { return QuicUtils .createMappedStream(quicConnection, sendCodec, recvCodec) .flatMap(stream -> { var recv = stream.receive().log("ClientBoundEvent", Level.FINEST); var send = stream.send(req).log("ServerBoundEvent", Level.FINEST); return send .then(recv) .doFinally(s -> stream.close()); }) .map(QuicUtils::mapErrors) .switchIfEmpty((Mono) NO_RESPONSE_ERROR); } /** * Send a single request, receive a single response */ public static Mono sendSimpleEvent(QuicConnection quicConnection, Supplier> sendCodec, SEND req) { return QuicUtils .createMappedStream(quicConnection, sendCodec, null) .flatMap(stream -> { var send = stream.send(req).log("ServerBoundEvent", Level.FINEST); return send.doFinally(s -> stream.close()); }) .map(QuicUtils::mapErrors) .then(); } private static R mapErrors(R value) { if (value instanceof RPCCrash crash) { throw new RPCException(crash.code(), crash.message().getNullable()); } else { return value; } } /** * Send n requests, receive n responses */ public static Flux sendSimpleRequestFlux(QuicConnection quicConnection, Supplier> sendCodec, Supplier> recvCodec, Publisher requestFlux) { return QuicUtils .createMappedStream(quicConnection, sendCodec, recvCodec) .flatMapMany(stream -> { var sends = Flux .from(requestFlux) .log("ServerBoundEvent", Level.FINEST) .concatMap(request -> stream.send(request) .thenReturn(request)); var receives = stream .receiveMany() .log("ClientBoundEvent", Level.FINEST); return Flux .zip(sends, receives, QuicUtils::extractResponse) .doFinally(s -> stream.close()); }) .map(QuicUtils::mapErrors) .log("ServerBoundEvent", Level.FINEST); } /** * Send update */ public static Mono sendUpdate(QuicConnection quicConnection, Supplier> codec, T request, Function> updater) { return QuicUtils .createMappedStream(quicConnection, codec, codec) .flatMapMany(stream -> { //noinspection unchecked var firstRequest = (Mono) stream .send(request) .then(); var receives = stream .receiveMany(); One firstResponseSink = Sinks.one(); //noinspection unchecked var firstResponse = (Mono) receives .elementAt(0) .switchIfEmpty((Mono) NO_RESPONSE_ERROR) .mapNotNull(value -> { if (value instanceof RPCCrash crash) { firstResponseSink.tryEmitEmpty(); //noinspection unchecked return (T) crash; } else { firstResponseSink.tryEmitValue(value); return null; } }) .doOnCancel(firstResponseSink::tryEmitEmpty); //noinspection unchecked var secondResponse = Mono // FirstResponse returns only if it's RPCCrash. // firstWithValue returns the crash first if it happens, otherwise it will // return receives .firstWithValue( firstResponse, receives.elementAt(1) ) .switchIfEmpty((Mono) NO_RESPONSE_ERROR); //noinspection unchecked var secondRequest = (Mono) firstResponseSink .asMono() .flatMap(updater) .flatMap(stream::send); return Flux .merge(firstRequest, firstResponse.then(Mono.empty()), secondRequest, secondResponse) .doFinally(s -> stream.close()); }) .map(QuicUtils::mapErrors) .singleOrEmpty(); } }