2022-03-04 01:26:18 +01:00
|
|
|
package it.cavallium.dbengine.database.remote;
|
|
|
|
|
|
|
|
import io.netty.handler.codec.ByteToMessageCodec;
|
2022-03-20 14:33:27 +01:00
|
|
|
import io.netty5.buffer.api.Buffer;
|
|
|
|
import io.netty5.buffer.api.Send;
|
2022-03-04 01:26:18 +01:00
|
|
|
import it.cavallium.data.generator.nativedata.NullableString;
|
2022-05-21 15:28:52 +02:00
|
|
|
import it.cavallium.dbengine.database.OptionalBuf;
|
2022-03-04 01:26:18 +01:00
|
|
|
import it.cavallium.dbengine.rpc.current.data.RPCCrash;
|
|
|
|
import it.cavallium.dbengine.rpc.current.data.RPCEvent;
|
2022-03-20 14:33:27 +01:00
|
|
|
import it.cavallium.dbengine.rpc.current.data.nullables.NullableBytes;
|
2022-03-04 01:26:18 +01:00
|
|
|
import it.unimi.dsi.fastutil.bytes.ByteArrayList;
|
|
|
|
import it.unimi.dsi.fastutil.bytes.ByteList;
|
|
|
|
import java.nio.charset.StandardCharsets;
|
2022-03-20 14:33:27 +01:00
|
|
|
import java.util.Optional;
|
2022-03-04 01:26:18 +01:00
|
|
|
import java.util.function.Function;
|
|
|
|
import java.util.function.Supplier;
|
|
|
|
import java.util.logging.Level;
|
2022-03-05 15:46:40 +01:00
|
|
|
import org.jetbrains.annotations.NotNull;
|
|
|
|
import org.jetbrains.annotations.Nullable;
|
2022-03-04 01:26:18 +01:00
|
|
|
import org.reactivestreams.Publisher;
|
|
|
|
import reactor.core.publisher.Flux;
|
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
import reactor.core.publisher.Sinks;
|
|
|
|
import reactor.core.publisher.Sinks.Empty;
|
|
|
|
import reactor.core.publisher.Sinks.One;
|
|
|
|
import reactor.netty.NettyInbound;
|
|
|
|
import reactor.netty.NettyOutbound;
|
|
|
|
import reactor.netty.incubator.quic.QuicConnection;
|
|
|
|
|
|
|
|
public class QuicUtils {
|
|
|
|
|
|
|
|
public static final Mono<?> NO_RESPONSE_ERROR = Mono.error(NoResponseReceivedException::new);
|
|
|
|
|
|
|
|
public static byte[] toArrayNoCopy(ByteList b) {
|
|
|
|
if (b instanceof ByteArrayList bal) {
|
|
|
|
return bal.elements();
|
|
|
|
} else {
|
|
|
|
return b.toByteArray();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
public static String toString(ByteList b) {
|
|
|
|
return new String(QuicUtils.toArrayNoCopy(b), StandardCharsets.UTF_8);
|
|
|
|
}
|
|
|
|
|
2022-05-21 15:28:52 +02:00
|
|
|
public static NullableBytes toBytes(OptionalBuf valueSendOpt) {
|
2022-03-20 14:33:27 +01:00
|
|
|
if (valueSendOpt.isPresent()) {
|
2022-05-20 10:20:00 +02:00
|
|
|
try (var value = valueSendOpt.get()) {
|
2022-03-20 14:33:27 +01:00
|
|
|
var bytes = new byte[value.readableBytes()];
|
|
|
|
value.copyInto(value.readerOffset(), bytes, 0, bytes.length);
|
|
|
|
return NullableBytes.ofNullable(ByteList.of(bytes));
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
return NullableBytes.empty();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-05-20 10:20:00 +02:00
|
|
|
public static Mono<NullableBytes> toBytes(Mono<Buffer> valueSendOptMono) {
|
2022-03-20 14:33:27 +01:00
|
|
|
return valueSendOptMono.map(valueSendOpt -> {
|
2022-05-20 10:20:00 +02:00
|
|
|
try (var value = valueSendOpt) {
|
2022-03-20 14:33:27 +01:00
|
|
|
var bytes = new byte[value.readableBytes()];
|
|
|
|
value.copyInto(value.readerOffset(), bytes, 0, bytes.length);
|
|
|
|
return NullableBytes.ofNullable(ByteList.of(bytes));
|
|
|
|
}
|
|
|
|
}).defaultIfEmpty(NullableBytes.empty());
|
|
|
|
}
|
|
|
|
|
2022-03-04 01:26:18 +01:00
|
|
|
/**
 * Pairs the inbound side ({@code in}) and the outbound side ({@code out}) of a stream.
 */
public record QuicStream(NettyInbound in, NettyOutbound out) {}
|
|
|
|
|
2022-03-05 15:46:40 +01:00
|
|
|
public static Mono<RPCEvent> catchRPCErrors(@NotNull Throwable error) {
|
2022-03-04 01:26:18 +01:00
|
|
|
return Mono.just(new RPCCrash(500, NullableString.ofNullableBlank(error.getMessage())));
|
|
|
|
}
|
|
|
|
|
|
|
|
private static <SEND, RECV> RECV extractResponse(SEND request, RECV response) {
|
|
|
|
return response;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Create a general purpose QUIC stream
|
|
|
|
*/
|
|
|
|
public static Mono<QuicStream> createStream(QuicConnection quicConnection, Mono<Void> streamTerminator) {
|
|
|
|
return Mono.defer(() -> {
|
|
|
|
One<QuicStream> inOutSink = Sinks.one();
|
|
|
|
return quicConnection
|
|
|
|
.createStream((in, out) -> Mono
|
|
|
|
.fromRunnable(() -> inOutSink.tryEmitValue(new QuicStream(in, out)).orThrow())
|
|
|
|
.then(streamTerminator))
|
|
|
|
.then(inOutSink.asMono());
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Send a single request, receive a single response
|
|
|
|
*/
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
public static <SEND, RECV> Mono<MappedStream<SEND, RECV>> createMappedStream(
|
2022-03-05 15:46:40 +01:00
|
|
|
@NotNull QuicConnection quicConnection,
|
|
|
|
@NotNull Supplier<ByteToMessageCodec<? super SEND>> sendCodec,
|
|
|
|
@Nullable Supplier<ByteToMessageCodec<? super RECV>> recvCodec) {
|
2022-03-04 01:26:18 +01:00
|
|
|
return Mono.defer(() -> {
|
|
|
|
Empty<Void> streamTerminator = Sinks.empty();
|
|
|
|
return QuicUtils
|
|
|
|
.createStream(quicConnection, streamTerminator.asMono())
|
|
|
|
.map(stream -> {
|
2022-03-05 15:46:40 +01:00
|
|
|
Flux<RECV> inConn;
|
|
|
|
if (recvCodec == null) {
|
|
|
|
inConn = Flux.error(() -> new UnsupportedOperationException("Receiving responses is supported"));
|
|
|
|
} else {
|
|
|
|
inConn = Flux.defer(() -> (Flux<RECV>) stream
|
|
|
|
.in()
|
|
|
|
.withConnection(conn -> conn.addHandler(recvCodec.get()))
|
|
|
|
.receiveObject()
|
|
|
|
.log("ClientBoundEvent", Level.FINEST)
|
|
|
|
)
|
|
|
|
.publish(1)
|
|
|
|
.refCount();
|
|
|
|
}
|
2022-03-04 01:26:18 +01:00
|
|
|
return new MappedStream<>(stream.out, sendCodec, inConn, streamTerminator);
|
|
|
|
})
|
|
|
|
.single();
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Send a single request, receive a single response
|
|
|
|
*/
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
public static <SEND, RECV> Mono<RECV> sendSimpleRequest(QuicConnection quicConnection,
|
|
|
|
Supplier<ByteToMessageCodec<? super SEND>> sendCodec,
|
|
|
|
Supplier<ByteToMessageCodec<? super RECV>> recvCodec,
|
|
|
|
SEND req) {
|
|
|
|
return QuicUtils
|
|
|
|
.createMappedStream(quicConnection, sendCodec, recvCodec)
|
|
|
|
.flatMap(stream -> {
|
2022-03-04 01:28:18 +01:00
|
|
|
var recv = stream.receive().log("ClientBoundEvent", Level.FINEST);
|
|
|
|
var send = stream.send(req).log("ServerBoundEvent", Level.FINEST);
|
2022-03-04 01:26:18 +01:00
|
|
|
return send
|
|
|
|
.then(recv)
|
|
|
|
.doFinally(s -> stream.close());
|
|
|
|
})
|
2022-03-05 15:46:40 +01:00
|
|
|
.map(QuicUtils::mapErrors)
|
2022-03-04 01:28:18 +01:00
|
|
|
.switchIfEmpty((Mono<RECV>) NO_RESPONSE_ERROR);
|
2022-03-04 01:26:18 +01:00
|
|
|
}
|
|
|
|
|
2022-03-05 15:46:40 +01:00
|
|
|
/**
|
|
|
|
* Send a single request, receive a single response
|
|
|
|
*/
|
|
|
|
|
|
|
|
public static <SEND> Mono<Void> sendSimpleEvent(QuicConnection quicConnection,
|
|
|
|
Supplier<ByteToMessageCodec<? super SEND>> sendCodec,
|
|
|
|
SEND req) {
|
|
|
|
return QuicUtils
|
|
|
|
.createMappedStream(quicConnection, sendCodec, null)
|
|
|
|
.flatMap(stream -> {
|
|
|
|
var send = stream.send(req).log("ServerBoundEvent", Level.FINEST);
|
|
|
|
return send.doFinally(s -> stream.close());
|
|
|
|
})
|
|
|
|
.map(QuicUtils::mapErrors)
|
|
|
|
.then();
|
|
|
|
}
|
|
|
|
|
|
|
|
private static <R> R mapErrors(R value) {
|
|
|
|
if (value instanceof RPCCrash crash) {
|
2022-05-21 15:28:52 +02:00
|
|
|
throw new RPCException(crash.code(), crash.message().getNullable());
|
2022-03-05 15:46:40 +01:00
|
|
|
} else {
|
|
|
|
return value;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-03-04 01:26:18 +01:00
|
|
|
/**
|
|
|
|
* Send n requests, receive n responses
|
|
|
|
*/
|
|
|
|
public static <SEND, RECV> Flux<RECV> sendSimpleRequestFlux(QuicConnection quicConnection,
|
|
|
|
Supplier<ByteToMessageCodec<? super SEND>> sendCodec,
|
|
|
|
Supplier<ByteToMessageCodec<? super RECV>> recvCodec,
|
|
|
|
Publisher<SEND> requestFlux) {
|
|
|
|
return QuicUtils
|
|
|
|
.createMappedStream(quicConnection, sendCodec, recvCodec)
|
|
|
|
.flatMapMany(stream -> {
|
|
|
|
var sends = Flux
|
|
|
|
.from(requestFlux)
|
2022-03-04 01:28:18 +01:00
|
|
|
.log("ServerBoundEvent", Level.FINEST)
|
2022-03-04 01:26:18 +01:00
|
|
|
.concatMap(request -> stream.send(request)
|
|
|
|
.thenReturn(request));
|
|
|
|
var receives = stream
|
|
|
|
.receiveMany()
|
2022-03-04 01:28:18 +01:00
|
|
|
.log("ClientBoundEvent", Level.FINEST);
|
2022-03-04 01:26:18 +01:00
|
|
|
return Flux
|
|
|
|
.zip(sends, receives, QuicUtils::extractResponse)
|
|
|
|
.doFinally(s -> stream.close());
|
|
|
|
})
|
2022-03-05 15:46:40 +01:00
|
|
|
.map(QuicUtils::mapErrors)
|
2022-03-04 01:28:18 +01:00
|
|
|
.log("ServerBoundEvent", Level.FINEST);
|
2022-03-04 01:26:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Send update
|
|
|
|
*/
|
|
|
|
public static <T> Mono<T> sendUpdate(QuicConnection quicConnection,
|
|
|
|
Supplier<ByteToMessageCodec<? super T>> codec,
|
|
|
|
T request,
|
|
|
|
Function<T, Mono<T>> updater) {
|
|
|
|
return QuicUtils
|
|
|
|
.createMappedStream(quicConnection, codec, codec)
|
|
|
|
.flatMapMany(stream -> {
|
|
|
|
//noinspection unchecked
|
|
|
|
var firstRequest = (Mono<T>) stream
|
|
|
|
.send(request)
|
|
|
|
.then();
|
|
|
|
var receives = stream
|
|
|
|
.receiveMany();
|
|
|
|
One<T> firstResponseSink = Sinks.one();
|
|
|
|
//noinspection unchecked
|
|
|
|
var firstResponse = (Mono<T>) receives
|
|
|
|
.elementAt(0)
|
|
|
|
.switchIfEmpty((Mono<? extends T>) NO_RESPONSE_ERROR)
|
|
|
|
.mapNotNull(value -> {
|
|
|
|
if (value instanceof RPCCrash crash) {
|
|
|
|
firstResponseSink.tryEmitEmpty();
|
|
|
|
//noinspection unchecked
|
|
|
|
return (T) crash;
|
|
|
|
} else {
|
|
|
|
firstResponseSink.tryEmitValue(value);
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.doOnCancel(firstResponseSink::tryEmitEmpty);
|
|
|
|
//noinspection unchecked
|
|
|
|
var secondResponse = Mono
|
|
|
|
// FirstResponse returns only if it's RPCCrash.
|
|
|
|
// firstWithValue returns the crash first if it happens, otherwise it will
|
|
|
|
// return receives
|
|
|
|
.firstWithValue(
|
|
|
|
firstResponse,
|
|
|
|
receives.elementAt(1)
|
|
|
|
)
|
|
|
|
.switchIfEmpty((Mono<? extends T>) NO_RESPONSE_ERROR);
|
|
|
|
//noinspection unchecked
|
|
|
|
var secondRequest = (Mono<T>) firstResponseSink
|
|
|
|
.asMono()
|
|
|
|
.flatMap(updater)
|
|
|
|
.flatMap(stream::send);
|
|
|
|
return Flux
|
|
|
|
.merge(firstRequest, firstResponse.then(Mono.empty()), secondRequest, secondResponse)
|
|
|
|
.doFinally(s -> stream.close());
|
|
|
|
})
|
2022-03-05 15:46:40 +01:00
|
|
|
.map(QuicUtils::mapErrors)
|
2022-03-04 01:26:18 +01:00
|
|
|
.singleOrEmpty();
|
|
|
|
}
|
|
|
|
}
|