Implement and test some utilities

This commit is contained in:
Andrea Cavalli 2022-03-04 01:26:18 +01:00
parent 77af845a8a
commit 090a47ae86
7 changed files with 667 additions and 254 deletions

View File

@ -5,13 +5,26 @@ interfacesData:
extendInterfaces: [PathDirectoryOptions]
PathDirectoryOptions:
extendInterfaces: [LuceneDirectoryOptions]
ClientBoundRequest:
extendInterfaces: [RPCEvent]
ClientBoundResponse:
extendInterfaces: [RPCEvent]
ServerBoundRequest:
extendInterfaces: [RPCEvent]
ServerBoundResponse:
extendInterfaces: [RPCEvent]
# versions must have only numbers, lowercase letters, dots, dashes. Maximum: 99.999.9999
versions:
0.0.0:
details:
changelog: "First version"
superTypes:
ServerBoundRequest: [
RPCEvent: [
Empty,
Binary,
BinaryOptional,
SingletonUpdateOldData,
GeneratedEntityId,
GetDatabase,
GetLuceneIndex,
Disconnect,
@ -19,19 +32,31 @@ versions:
SingletonGet,
SingletonSet,
SingletonUpdateInit,
SingletonUpdateEnd
SingletonUpdateEnd,
RPCCrash
]
ServerBoundRequest: [
GetDatabase,
GetLuceneIndex,
Disconnect,
GetSingleton,
SingletonGet,
SingletonSet,
SingletonUpdateInit
]
ClientBoundResponse: [
Empty,
GeneratedEntityId,
Binary,
BinaryOptional
BinaryOptional,
RPCCrash
]
ClientBoundRequest: [
SingletonUpdateOldData
]
ServerBoundResponse: [
Empty
Empty,
SingletonUpdateEnd
]
LuceneDirectoryOptions: [
ByteBuffersDirectory,
@ -94,12 +119,12 @@ versions:
javaClass: java.util.Map<java.lang.String, org.rocksdb.ColumnFamilyHandle>
serializer: it.cavallium.dbengine.database.remote.String2ColumnFamilyHandleMapSerializer
classes:
BoxedRPCEvent:
data:
val: RPCEvent
# Server-bound requests
BoxedServerBoundRequest:
data:
val: ServerBoundRequest
GetDatabase:
data:
name: String
@ -138,18 +163,16 @@ versions:
# Client-bound responses
BoxedClientBoundResponse:
data:
val: ClientBoundResponse
GeneratedEntityId:
data:
id: long
RPCCrash:
data:
code: int
message: -String
# Client-bound requests
BoxedClientBoundRequest:
data:
val: ClientBoundRequest
SingletonUpdateOldData:
data:
exist: boolean
@ -157,9 +180,6 @@ versions:
# Server-bound responses
BoxedServerBoundResponse:
data:
val: ServerBoundResponse
# Data

View File

@ -4,7 +4,6 @@ import io.micrometer.core.instrument.MeterRegistry;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Send;
import io.netty.handler.codec.PrematureChannelClosureException;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.incubator.codec.quic.QuicSslContextBuilder;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
@ -19,10 +18,8 @@ import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.remote.RPCCodecs.RPCClientBoundResponseDecoder;
import it.cavallium.dbengine.database.remote.RPCCodecs.RPCClientAlternateDecoder;
import it.cavallium.dbengine.database.remote.RPCCodecs.RPCServerAlternateDecoder;
import it.cavallium.dbengine.database.remote.RPCCodecs.RPCServerBoundRequestDecoder;
import it.cavallium.dbengine.database.remote.RPCCodecs.RPCEventCodec;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.rpc.current.data.BinaryOptional;
@ -33,12 +30,15 @@ import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
import it.cavallium.dbengine.rpc.current.data.GeneratedEntityId;
import it.cavallium.dbengine.rpc.current.data.GetDatabase;
import it.cavallium.dbengine.rpc.current.data.GetSingleton;
import it.cavallium.dbengine.rpc.current.data.RPCEvent;
import it.cavallium.dbengine.rpc.current.data.ServerBoundRequest;
import it.cavallium.dbengine.rpc.current.data.ServerBoundResponse;
import it.cavallium.dbengine.rpc.current.data.SingletonGet;
import it.cavallium.dbengine.rpc.current.data.SingletonSet;
import it.cavallium.dbengine.rpc.current.data.SingletonUpdateEnd;
import it.cavallium.dbengine.rpc.current.data.SingletonUpdateInit;
import it.cavallium.dbengine.rpc.current.data.SingletonUpdateOldData;
import it.cavallium.dbengine.rpc.current.data.nullables.NullableLLSnapshot;
import it.unimi.dsi.fastutil.bytes.ByteArrayList;
import it.unimi.dsi.fastutil.bytes.ByteList;
import java.io.File;
import java.net.SocketAddress;
@ -48,10 +48,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.logging.Level;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.core.publisher.Sinks.Empty;
import reactor.netty.incubator.quic.QuicClient;
import reactor.netty.incubator.quic.QuicConnection;
@ -136,73 +135,64 @@ public class LLQuicConnection implements LLDatabaseConnection {
.thenReturn(this);
}
private <T extends ClientBoundResponse> Mono<T> sendRequest(ServerBoundRequest req) {
return Mono
.<T>create(sink -> {
var sub = quicConnection
.createStream((in, out) -> {
var writerMono = out
.withConnection(conn -> conn.addHandler(new RPCServerBoundRequestDecoder()))
.sendObject(req)
.then();
var readerMono = in
.withConnection(conn -> conn.addHandler(new RPCClientBoundResponseDecoder()))
.receiveObject()
.doOnNext(result -> {
if (result != null) {
//noinspection unchecked
sink.success((T) result);
} else {
sink.success();
}
})
.take(1, true)
.singleOrEmpty()
.then();
return writerMono
.then(readerMono)
.doOnCancel(() -> sink.error(new PrematureChannelClosureException("Request failed")));
})
.log("a", Level.INFO)
.subscribeOn(Schedulers.parallel())
.subscribe(x -> {}, sink::error);
sink.onDispose(sub);
})
.log("x", Level.INFO);
@SuppressWarnings("unchecked")
private <T extends ClientBoundResponse> Mono<T> sendRequest(ServerBoundRequest serverBoundRequest) {
return QuicUtils.<RPCEvent, RPCEvent>sendSimpleRequest(quicConnection,
RPCEventCodec::new,
RPCEventCodec::new,
serverBoundRequest
).map(event -> (T) event);
}
private <T extends ClientBoundResponse, U extends ClientBoundRequest> Mono<T> sendUpdateRequest(ServerBoundRequest req,
private <T extends ClientBoundResponse, U extends ClientBoundRequest> Mono<T> sendUpdateRequest(ServerBoundRequest serverBoundReq,
Function<U, ServerBoundResponse> updaterFunction) {
return Mono
.<T>create(sink -> {
var sub = quicConnection
.createStream((in, out) -> {
var inConn = in.withConnection(conn -> conn.addHandler(new RPCServerAlternateDecoder()));
var outConn = out.withConnection(conn -> conn.addHandler(new RPCClientAlternateDecoder()));
var request2 = Sinks.one();
var writerMono = outConn
.sendObject(Mono.<Object>just(req).concatWith(request2.asMono()))
.then();
var responseMono = inConn
.receiveObject()
.switchOnFirst((first, flux) -> {
//noinspection unchecked
var req2 = updaterFunction.apply((U) first);
request2.tryEmitValue(req2);
//noinspection unchecked
return flux.skip(1).map(resp2 -> (T) resp2);
});
return writerMono
.thenMany(responseMono)
.doOnCancel(() -> sink.error(new PrematureChannelClosureException("Request failed")))
.then();
})
.log("a", Level.INFO)
.subscribeOn(Schedulers.parallel())
.subscribe(x -> {}, sink::error);
sink.onDispose(sub);
})
.log("x", Level.INFO);
return Mono.empty();
/*
return Mono.defer(() -> {
Empty<Void> streamTerminator = Sinks.empty();
return QuicUtils.createStream(quicConnection, stream -> {
Mono<Void> serverReq = Mono.defer(() -> stream.out()
.withConnection(conn -> conn.addHandler(new RPCCodecs.RPCServerBoundRequestDecoder()))
.sendObject(serverBoundReq)
.then()).doOnSubscribe(s -> System.out.println("out1"));
//noinspection unchecked
Mono<U> clientBoundReqMono = Mono.defer(() -> stream.in()
.withConnection(conn -> conn.addHandler(new RPCClientBoundRequestDecoder()))
.receiveObject()
.log("TO_CLIENT_REQ", Level.INFO)
.take(1, true)
.singleOrEmpty()
.map(req -> (U) req)
.doOnSubscribe(s -> System.out.println("in1"))
.switchIfEmpty((Mono<U>) QuicUtils.NO_RESPONSE_ERROR)
);
Mono<Void> serverBoundRespFlux = clientBoundReqMono
.map(updaterFunction)
.transform(respMono -> Mono.defer(() -> stream.out()
.withConnection(conn -> conn.addHandler(new RPCServerBoundResponseDecoder()))
.sendObject(respMono)
.then()
.doOnSubscribe(s -> System.out.println("out2"))
));
//noinspection unchecked
Mono<T> clientBoundResponseMono = Mono.defer(() -> stream.in()
.withConnection(conn -> conn.addHandler(new RPCClientBoundResponseDecoder()))
.receiveObject()
.map(resp -> (T) resp)
.log("TO_SERVER_RESP", Level.INFO)
.take(1, true)
.doOnSubscribe(s -> System.out.println("out2"))
.singleOrEmpty()
.switchIfEmpty((Mono<T>) QuicUtils.NO_RESPONSE_ERROR));
return serverReq
.then(serverBoundRespFlux)
.then(clientBoundResponseMono)
.doFinally(s -> streamTerminator.tryEmitEmpty());
}, streamTerminator.asMono()).single();
});
*/
}
@Override
@ -232,7 +222,7 @@ public class LLQuicConnection implements LLDatabaseConnection {
return sendRequest(new SingletonGet(singletonId, NullableLLSnapshot.ofNullable(snapshot)))
.cast(BinaryOptional.class)
.mapNotNull(b -> b.val().getNullable())
.map(binary -> toArrayNoCopy(binary.val()));
.map(binary -> QuicUtils.toArrayNoCopy(binary.val()));
}
@Override
@ -244,7 +234,34 @@ public class LLQuicConnection implements LLDatabaseConnection {
@Override
public Mono<Send<Buffer>> update(SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
UpdateReturnMode updateReturnMode) {
return null;
return LLQuicConnection.this.<BinaryOptional, SingletonUpdateOldData>sendUpdateRequest(new SingletonUpdateInit(singletonId, updateReturnMode), prev -> {
byte[] oldData = toArrayNoCopy(prev);
Send<Buffer> oldDataBuf;
if (oldData != null) {
oldDataBuf = allocator.copyOf(oldData).send();
} else {
oldDataBuf = null;
}
try (oldDataBuf) {
try (var result = updater.apply(oldDataBuf)) {
if (result == null) {
return new SingletonUpdateEnd(false, ByteList.of());
} else {
byte[] resultArray = new byte[result.readableBytes()];
result.readBytes(resultArray, 0, resultArray.length);
return new SingletonUpdateEnd(true, ByteList.of(resultArray));
}
}
} catch (SerializationException e) {
throw new IllegalStateException(e);
}
}).mapNotNull(result -> {
if (result.val().isPresent()) {
return allocator.copyOf(QuicUtils.toArrayNoCopy(result.val().get().val())).send();
} else {
return null;
}
});
}
@Override
@ -306,11 +323,12 @@ public class LLQuicConnection implements LLDatabaseConnection {
});
}
private static byte[] toArrayNoCopy(ByteList b) {
if (b instanceof ByteArrayList bal) {
return bal.elements();
@Nullable
private static byte[] toArrayNoCopy(SingletonUpdateOldData oldData) {
if (oldData.exist()) {
return QuicUtils.toArrayNoCopy(oldData.oldValue());
} else {
return b.toByteArray();
return null;
}
}

View File

@ -0,0 +1,48 @@
package it.cavallium.dbengine.database.remote;
import io.netty.handler.codec.ByteToMessageCodec;
import java.util.function.Supplier;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks.Empty;
import reactor.netty.NettyOutbound;
public class MappedStream <SEND, RECV> implements AutoCloseable {
private final Flux<RECV> inConn;
private final NettyOutbound outConn;
private final Supplier<ByteToMessageCodec<? super SEND>> outCodec;
private final Empty<Void> streamTerminator;
public MappedStream(NettyOutbound outConn, Supplier<ByteToMessageCodec<? super SEND>> outCodec, Flux<RECV> inConn, Empty<Void> streamTerminator) {
this.inConn = inConn;
this.outConn = outConn;
this.outCodec = outCodec;
this.streamTerminator = streamTerminator;
}
private NettyOutbound getOut() {
return outConn.withConnection(conn -> conn.addHandler(outCodec.get()));
}
public Mono<Void> send(SEND item) {
return getOut().sendObject(item).then();
}
public Mono<Void> sendMany(Flux<SEND> items) {
return getOut().sendObject(items).then();
}
public Mono<RECV> receive() {
return inConn.take(1, true).singleOrEmpty();
}
public Flux<RECV> receiveMany() {
return inConn.hide();
}
@Override
public void close() {
streamTerminator.tryEmitEmpty();
}
}

View File

@ -0,0 +1,12 @@
package it.cavallium.dbengine.database.remote;
public class NoResponseReceivedException extends IllegalStateException {
public NoResponseReceivedException() {
super("No response received");
}
public NoResponseReceivedException(Throwable cause) {
super("No response received", cause);
}
}

View File

@ -0,0 +1,188 @@
package it.cavallium.dbengine.database.remote;
import io.netty.handler.codec.ByteToMessageCodec;
import it.cavallium.data.generator.nativedata.NullableString;
import it.cavallium.dbengine.rpc.current.data.RPCCrash;
import it.cavallium.dbengine.rpc.current.data.RPCEvent;
import it.unimi.dsi.fastutil.bytes.ByteArrayList;
import it.unimi.dsi.fastutil.bytes.ByteList;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.logging.Level;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Empty;
import reactor.core.publisher.Sinks.One;
import reactor.netty.NettyInbound;
import reactor.netty.NettyOutbound;
import reactor.netty.incubator.quic.QuicConnection;
public class QuicUtils {
public static final Mono<?> NO_RESPONSE_ERROR = Mono.error(NoResponseReceivedException::new);
public static byte[] toArrayNoCopy(ByteList b) {
if (b instanceof ByteArrayList bal) {
return bal.elements();
} else {
return b.toByteArray();
}
}
public static String toString(ByteList b) {
return new String(QuicUtils.toArrayNoCopy(b), StandardCharsets.UTF_8);
}
public record QuicStream(NettyInbound in, NettyOutbound out) {}
public static Mono<RPCEvent> catchRPCErrors(Throwable error) {
return Mono.just(new RPCCrash(500, NullableString.ofNullableBlank(error.getMessage())));
}
private static <SEND, RECV> RECV extractResponse(SEND request, RECV response) {
return response;
}
/**
* Create a general purpose QUIC stream
*/
public static Mono<QuicStream> createStream(QuicConnection quicConnection, Mono<Void> streamTerminator) {
return Mono.defer(() -> {
One<QuicStream> inOutSink = Sinks.one();
return quicConnection
.createStream((in, out) -> Mono
.fromRunnable(() -> inOutSink.tryEmitValue(new QuicStream(in, out)).orThrow())
.then(streamTerminator))
.then(inOutSink.asMono());
});
}
/**
* Send a single request, receive a single response
*/
@SuppressWarnings("unchecked")
public static <SEND, RECV> Mono<MappedStream<SEND, RECV>> createMappedStream(
QuicConnection quicConnection,
Supplier<ByteToMessageCodec<? super SEND>> sendCodec,
Supplier<ByteToMessageCodec<? super RECV>> recvCodec) {
return Mono.defer(() -> {
Empty<Void> streamTerminator = Sinks.empty();
return QuicUtils
.createStream(quicConnection, streamTerminator.asMono())
.map(stream -> {
Flux<RECV> inConn = Flux.defer(() -> (Flux<RECV>) stream
.in()
.withConnection(conn -> conn.addHandler(recvCodec.get()))
.receiveObject()
.log("RECEIVE_OBJECT_FROM_SERVER", Level.FINEST))
.publish(1)
.refCount();
return new MappedStream<>(stream.out, sendCodec, inConn, streamTerminator);
})
.single();
});
}
/**
* Send a single request, receive a single response
*/
@SuppressWarnings("unchecked")
public static <SEND, RECV> Mono<RECV> sendSimpleRequest(QuicConnection quicConnection,
Supplier<ByteToMessageCodec<? super SEND>> sendCodec,
Supplier<ByteToMessageCodec<? super RECV>> recvCodec,
SEND req) {
return QuicUtils
.createMappedStream(quicConnection, sendCodec, recvCodec)
.flatMap(stream -> {
var recv = stream.receive().log("SR-Receive", Level.FINEST);
var send = stream.send(req).log("SR-Send", Level.FINEST);
return send
.then(recv)
.doFinally(s -> stream.close());
})
.switchIfEmpty((Mono<RECV>) NO_RESPONSE_ERROR)
.log("SR-Result", Level.FINEST);
}
/**
* Send n requests, receive n responses
*/
public static <SEND, RECV> Flux<RECV> sendSimpleRequestFlux(QuicConnection quicConnection,
Supplier<ByteToMessageCodec<? super SEND>> sendCodec,
Supplier<ByteToMessageCodec<? super RECV>> recvCodec,
Publisher<SEND> requestFlux) {
return QuicUtils
.createMappedStream(quicConnection, sendCodec, recvCodec)
.flatMapMany(stream -> {
var sends = Flux
.from(requestFlux)
.log("SR-Send", Level.FINEST)
.concatMap(request -> stream.send(request)
.thenReturn(request));
var receives = stream
.receiveMany()
.log("SR-Receive", Level.FINEST);
return Flux
.zip(sends, receives, QuicUtils::extractResponse)
.doFinally(s -> stream.close());
})
.log("SR-Result", Level.FINEST);
}
/**
* Send update
*/
public static <T> Mono<T> sendUpdate(QuicConnection quicConnection,
Supplier<ByteToMessageCodec<? super T>> codec,
T request,
Function<T, Mono<T>> updater) {
return QuicUtils
.createMappedStream(quicConnection, codec, codec)
.flatMapMany(stream -> {
//noinspection unchecked
var firstRequest = (Mono<T>) stream
.send(request)
.then();
var receives = stream
.receiveMany();
One<T> firstResponseSink = Sinks.one();
//noinspection unchecked
var firstResponse = (Mono<T>) receives
.elementAt(0)
.switchIfEmpty((Mono<? extends T>) NO_RESPONSE_ERROR)
.mapNotNull(value -> {
if (value instanceof RPCCrash crash) {
firstResponseSink.tryEmitEmpty();
//noinspection unchecked
return (T) crash;
} else {
firstResponseSink.tryEmitValue(value);
return null;
}
})
.doOnCancel(firstResponseSink::tryEmitEmpty);
//noinspection unchecked
var secondResponse = Mono
// FirstResponse returns only if it's RPCCrash.
// firstWithValue returns the crash first if it happens, otherwise it will
// return receives
.firstWithValue(
firstResponse,
receives.elementAt(1)
)
.switchIfEmpty((Mono<? extends T>) NO_RESPONSE_ERROR);
//noinspection unchecked
var secondRequest = (Mono<T>) firstResponseSink
.asMono()
.flatMap(updater)
.flatMap(stream::send);
return Flux
.merge(firstRequest, firstResponse.then(Mono.empty()), secondRequest, secondResponse)
.doFinally(s -> stream.close());
})
.singleOrEmpty();
}
}

View File

@ -6,34 +6,29 @@ import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import it.cavallium.dbengine.rpc.current.data.BoxedClientBoundRequest;
import it.cavallium.dbengine.rpc.current.data.BoxedClientBoundResponse;
import it.cavallium.dbengine.rpc.current.data.BoxedServerBoundRequest;
import it.cavallium.dbengine.rpc.current.data.BoxedServerBoundResponse;
import it.cavallium.dbengine.rpc.current.data.BoxedRPCEvent;
import it.cavallium.dbengine.rpc.current.data.ClientBoundRequest;
import it.cavallium.dbengine.rpc.current.data.ClientBoundResponse;
import it.cavallium.dbengine.rpc.current.data.IBasicType;
import it.cavallium.dbengine.rpc.current.data.RPCEvent;
import it.cavallium.dbengine.rpc.current.data.ServerBoundRequest;
import it.cavallium.dbengine.rpc.current.data.ServerBoundResponse;
import it.cavallium.dbengine.rpc.current.serializers.BoxedClientBoundRequestSerializer;
import it.cavallium.dbengine.rpc.current.serializers.BoxedClientBoundResponseSerializer;
import it.cavallium.dbengine.rpc.current.serializers.BoxedServerBoundRequestSerializer;
import it.cavallium.dbengine.rpc.current.serializers.BoxedServerBoundResponseSerializer;
import it.cavallium.dbengine.rpc.current.serializers.BoxedRPCEventSerializer;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.List;
public class RPCCodecs {
public static class RPCClientBoundRequestDecoder extends ByteToMessageCodec<ClientBoundRequest> {
public static class RPCEventCodec extends ByteToMessageCodec<RPCEvent> {
public static final ChannelHandler INSTANCE = new RPCClientBoundRequestDecoder();
public static final ChannelHandler INSTANCE = new RPCEventCodec();
@Override
protected void encode(ChannelHandlerContext ctx, ClientBoundRequest msg, ByteBuf out) throws Exception {
protected void encode(ChannelHandlerContext ctx, RPCEvent msg, ByteBuf out) throws Exception {
try (var bbos = new ByteBufOutputStream(out)) {
try (var dos = new DataOutputStream(bbos)) {
BoxedClientBoundRequestSerializer.INSTANCE.serialize(dos, BoxedClientBoundRequest.of(msg));
BoxedRPCEventSerializer.INSTANCE.serialize(dos, BoxedRPCEvent.of(msg));
}
}
}
@ -42,154 +37,7 @@ public class RPCCodecs {
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
try (var bbis = new ByteBufInputStream(msg)) {
try (var dis = new DataInputStream(bbis)) {
out.add(BoxedClientBoundRequestSerializer.INSTANCE.deserialize(dis).val());
}
}
}
}
public static class RPCServerBoundRequestDecoder extends ByteToMessageCodec<ServerBoundRequest> {
public static final ChannelHandler INSTANCE = new RPCServerBoundRequestDecoder();
@Override
protected void encode(ChannelHandlerContext ctx, ServerBoundRequest msg, ByteBuf out) throws Exception {
try (var bbos = new ByteBufOutputStream(out)) {
try (var dos = new DataOutputStream(bbos)) {
BoxedServerBoundRequestSerializer.INSTANCE.serialize(dos, BoxedServerBoundRequest.of(msg));
}
}
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
try (var bbis = new ByteBufInputStream(msg)) {
try (var dis = new DataInputStream(bbis)) {
out.add(BoxedServerBoundRequestSerializer.INSTANCE.deserialize(dis).val());
}
}
}
}
public static class RPCClientBoundResponseDecoder extends ByteToMessageCodec<ClientBoundResponse> {
public static final ChannelHandler INSTANCE = new RPCClientBoundResponseDecoder();
@Override
protected void encode(ChannelHandlerContext ctx, ClientBoundResponse msg, ByteBuf out) throws Exception {
try (var bbos = new ByteBufOutputStream(out)) {
try (var dos = new DataOutputStream(bbos)) {
BoxedClientBoundResponseSerializer.INSTANCE.serialize(dos, BoxedClientBoundResponse.of(msg));
}
}
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
try (var bbis = new ByteBufInputStream(msg)) {
try (var dis = new DataInputStream(bbis)) {
out.add(BoxedClientBoundResponseSerializer.INSTANCE.deserialize(dis).val());
}
}
}
}
public static class RPCServerBoundResponseDecoder extends ByteToMessageCodec<ServerBoundResponse> {
public static final ChannelHandler INSTANCE = new RPCServerBoundResponseDecoder();
@Override
protected void encode(ChannelHandlerContext ctx, ServerBoundResponse msg, ByteBuf out) throws Exception {
try (var bbos = new ByteBufOutputStream(out)) {
try (var dos = new DataOutputStream(bbos)) {
BoxedServerBoundResponseSerializer.INSTANCE.serialize(dos, BoxedServerBoundResponse.of(msg));
}
}
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
try (var bbis = new ByteBufInputStream(msg)) {
try (var dis = new DataInputStream(bbis)) {
out.add(BoxedServerBoundResponseSerializer.INSTANCE.deserialize(dis).val());
}
}
}
}
public static class RPCClientAlternateDecoder extends ByteToMessageCodec<IBasicType> {
public static final ChannelHandler INSTANCE = new RPCClientAlternateDecoder();
private boolean alternated = false;
@Override
protected void encode(ChannelHandlerContext ctx, IBasicType msg, ByteBuf out) throws Exception {
try (var bbos = new ByteBufOutputStream(out)) {
try (var dos = new DataOutputStream(bbos)) {
if (!alternated) {
BoxedServerBoundRequestSerializer.INSTANCE.serialize(dos,
BoxedServerBoundRequest.of((ServerBoundRequest) msg)
);
} else {
BoxedClientBoundResponseSerializer.INSTANCE.serialize(dos,
BoxedClientBoundResponse.of((ClientBoundResponse) msg)
);
}
alternated = !alternated;
}
}
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
try (var bbis = new ByteBufInputStream(msg)) {
try (var dis = new DataInputStream(bbis)) {
if (!alternated) {
out.add(BoxedServerBoundRequestSerializer.INSTANCE.deserialize(dis).val());
} else {
out.add(BoxedClientBoundResponseSerializer.INSTANCE.deserialize(dis).val());
}
alternated = !alternated;
}
}
}
}
public static class RPCServerAlternateDecoder extends ByteToMessageCodec<IBasicType> {
public static final ChannelHandler INSTANCE = new RPCServerAlternateDecoder();
private boolean alternated = false;
@Override
protected void encode(ChannelHandlerContext ctx, IBasicType msg, ByteBuf out) throws Exception {
try (var bbos = new ByteBufOutputStream(out)) {
try (var dos = new DataOutputStream(bbos)) {
if (!alternated) {
BoxedClientBoundRequestSerializer.INSTANCE.serialize(dos,
BoxedClientBoundRequest.of((ClientBoundRequest) msg)
);
} else {
BoxedServerBoundResponseSerializer.INSTANCE.serialize(dos,
BoxedServerBoundResponse.of((ServerBoundResponse) msg)
);
}
alternated = !alternated;
}
}
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
try (var bbis = new ByteBufInputStream(msg)) {
try (var dis = new DataInputStream(bbis)) {
if (!alternated) {
out.add(BoxedClientBoundRequestSerializer.INSTANCE.deserialize(dis).val());
} else {
out.add(BoxedServerBoundResponseSerializer.INSTANCE.deserialize(dis).val());
}
alternated = !alternated;
out.add(BoxedRPCEventSerializer.INSTANCE.deserialize(dis).val());
}
}
}

View File

@ -0,0 +1,279 @@
package it.cavallium.dbengine.database.remote;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.incubator.codec.quic.InsecureQuicTokenHandler;
import io.netty.incubator.codec.quic.QuicConnectionIdGenerator;
import io.netty.incubator.codec.quic.QuicSslContext;
import io.netty.incubator.codec.quic.QuicSslContextBuilder;
import it.cavallium.data.generator.nativedata.NullableString;
import it.cavallium.dbengine.database.remote.RPCCodecs.RPCEventCodec;
import it.cavallium.dbengine.rpc.current.data.Empty;
import it.cavallium.dbengine.rpc.current.data.RPCCrash;
import it.cavallium.dbengine.rpc.current.data.RPCEvent;
import it.cavallium.dbengine.rpc.current.data.SingletonGet;
import it.cavallium.dbengine.rpc.current.data.nullables.NullableLLSnapshot;
import java.net.InetSocketAddress;
import java.security.cert.CertificateException;
import java.time.Duration;
import java.util.List;
import java.util.logging.Level;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.netty.Connection;
import reactor.netty.incubator.quic.QuicClient;
import reactor.netty.incubator.quic.QuicConnection;
class QuicUtilsTest {
private static final int NORMAL = 0;
private static final int WAIT_TIME = 1;
private static final int FAIL_IMMEDIATELY = 2;
private static final int WAIT_TIME_THEN_FAIL = 3;
private Connection serverConn;
private QuicConnection clientConn;
private InetSocketAddress clientAddress;
private InetSocketAddress serverAddress;
@BeforeEach
void setUp() throws CertificateException {
var selfSignedCert = new SelfSignedCertificate();
this.clientAddress = new InetSocketAddress("localhost", 8081);
this.serverAddress = new InetSocketAddress("localhost", 8080);
QuicSslContext sslContext = QuicSslContextBuilder
.forServer(selfSignedCert.key(), null, selfSignedCert.cert())
.applicationProtocols("db/0.9")
.clientAuth(ClientAuth.NONE)
.build();
var qs = reactor.netty.incubator.quic.QuicServer
.create()
.tokenHandler(InsecureQuicTokenHandler.INSTANCE)
.bindAddress(() -> serverAddress)
.secure(sslContext)
.idleTimeout(Duration.ofSeconds(30))
.connectionIdAddressGenerator(QuicConnectionIdGenerator.randomGenerator())
.initialSettings(spec -> spec
.maxData(10000000)
.maxStreamDataBidirectionalLocal(1000000)
.maxStreamDataBidirectionalRemote(1000000)
.maxStreamsBidirectional(100)
.maxStreamsUnidirectional(100)
)
.handleStream((in, out) -> in
.withConnection(conn -> conn.addHandler(new RPCEventCodec()))
.receiveObject()
.cast(RPCEvent.class)
.log("recv", Level.FINEST)
.flatMapSequential(req -> (switch ((int) ((SingletonGet) req).singletonId()) {
case NORMAL -> Mono.<RPCEvent>just(Empty.of());
case FAIL_IMMEDIATELY -> Mono.<RPCEvent>error(new Throwable("Expected error"));
case WAIT_TIME -> Mono.delay(Duration.ofSeconds(3)).<RPCEvent>thenReturn(Empty.of());
case WAIT_TIME_THEN_FAIL -> Mono
.delay(Duration.ofSeconds(3))
.then(Mono.<RPCEvent>error(new Throwable("Expected error")));
default -> Mono.<RPCEvent>error(new UnsupportedOperationException("Unsupported request id " + req));
}).log("Server", Level.SEVERE, SignalType.ON_ERROR).onErrorResume(QuicUtils::catchRPCErrors))
.concatMap(message -> Mono.defer(() -> out
.withConnection(conn -> conn.addHandler(new RPCEventCodec()))
.sendObject(message)
.then())
.log("send", Level.FINEST)
)
);
this.serverConn = qs.bindNow();
var clientSslContext = QuicSslContextBuilder
.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.applicationProtocols("db/0.9")
.build();
this.clientConn = QuicClient.create()
.bindAddress(() -> new InetSocketAddress(0))
.remoteAddress(() -> serverAddress)
.secure(clientSslContext)
.idleTimeout(Duration.ofSeconds(30))
.initialSettings(spec -> spec
.maxData(10000000)
.maxStreamDataBidirectionalLocal(1000000)
)
.connectNow();
}
@AfterEach
void tearDown() {
if (clientConn != null) {
clientConn.disposeNow();
}
if (serverConn != null) {
serverConn.disposeNow();
}
}
@Test
void sendSimpleRequest() {
RPCEvent response = QuicUtils.<RPCEvent, RPCEvent>sendSimpleRequest(clientConn,
RPCEventCodec::new,
RPCEventCodec::new,
new SingletonGet(NORMAL, NullableLLSnapshot.empty())
).blockOptional().orElseThrow();
assertEquals(Empty.of(), response);
}
@Test
void sendSimpleRequestFlux() {
List<RPCEvent> results = QuicUtils.<RPCEvent, RPCEvent>sendSimpleRequestFlux(clientConn,
RPCEventCodec::new,
RPCEventCodec::new,
Flux.just(
new SingletonGet(NORMAL, NullableLLSnapshot.empty()),
new SingletonGet(NORMAL, NullableLLSnapshot.empty()),
new SingletonGet(NORMAL, NullableLLSnapshot.empty()),
new SingletonGet(NORMAL, NullableLLSnapshot.empty()),
new SingletonGet(NORMAL, NullableLLSnapshot.empty())
)
).collectList().blockOptional().orElseThrow();
assertEquals(5, results.size());
assertEquals(List.of(Empty.of(), Empty.of(), Empty.of(), Empty.of(), Empty.of()), results);
}
@Test
void sendUpdateFluxNormal() {
RPCEvent results = QuicUtils.<RPCEvent>sendUpdate(clientConn,
RPCEventCodec::new,
new SingletonGet(NORMAL, NullableLLSnapshot.empty()),
serverData -> Mono.fromCallable(() -> {
assertEquals(Empty.of(), serverData);
return new SingletonGet(NORMAL, NullableLLSnapshot.empty());
})
).blockOptional().orElseThrow();
assertEquals(Empty.of(), results);
}
@Test
void sendUpdateFluxSlowClient() {
RPCEvent results = QuicUtils.<RPCEvent>sendUpdate(clientConn,
RPCEventCodec::new,
new SingletonGet(NORMAL, NullableLLSnapshot.empty()),
serverData -> Mono.<RPCEvent>fromCallable(() -> {
assertEquals(Empty.of(), serverData);
return new SingletonGet(NORMAL, NullableLLSnapshot.empty());
}).delayElement(Duration.ofSeconds(2))
).blockOptional().orElseThrow();
assertEquals(Empty.of(), results);
}
@Test
void sendUpdateFluxSlowServer() {
RPCEvent results = QuicUtils.<RPCEvent>sendUpdate(clientConn,
RPCEventCodec::new,
new SingletonGet(WAIT_TIME, NullableLLSnapshot.empty()),
serverData -> Mono.fromCallable(() -> {
assertEquals(Empty.of(), serverData);
return new SingletonGet(WAIT_TIME, NullableLLSnapshot.empty());
})
).blockOptional().orElseThrow();
assertEquals(Empty.of(), results);
}
@Test
void sendUpdateFluxSlowClientAndServer() {
RPCEvent results = QuicUtils.<RPCEvent>sendUpdate(clientConn,
RPCEventCodec::new,
new SingletonGet(WAIT_TIME, NullableLLSnapshot.empty()),
serverData -> Mono.<RPCEvent>fromCallable(() -> {
assertEquals(Empty.of(), serverData);
return new SingletonGet(WAIT_TIME, NullableLLSnapshot.empty());
}).delayElement(Duration.ofSeconds(2))
).blockOptional().orElseThrow();
assertEquals(Empty.of(), results);
}
@Test
void sendUpdateClientFail() {
class ExpectedException extends Throwable {}
assertThrows(ExpectedException.class, () -> {
try {
RPCEvent results = QuicUtils
.<RPCEvent>sendUpdate(clientConn,
RPCEventCodec::new,
new SingletonGet(NORMAL, NullableLLSnapshot.empty()),
serverData -> Mono.error(new ExpectedException())
)
.blockOptional()
.orElseThrow();
} catch (Throwable e) {
throw e.getCause();
}
});
}
@Test
void sendUpdateServerFail1() {
RPCEvent results = QuicUtils.<RPCEvent>sendUpdate(clientConn,
RPCEventCodec::new,
new SingletonGet(FAIL_IMMEDIATELY, NullableLLSnapshot.empty()),
serverData -> Mono.fromCallable(() -> {
fail("Called update");
return new SingletonGet(NORMAL, NullableLLSnapshot.empty());
})
).blockOptional().orElseThrow();
assertEquals(RPCCrash.of(500, NullableString.of("Expected error")), results);
}
@Test
void sendUpdateServerFail2() {
RPCEvent results = QuicUtils.<RPCEvent>sendUpdate(clientConn,
RPCEventCodec::new,
new SingletonGet(NORMAL, NullableLLSnapshot.empty()),
serverData -> Mono.fromCallable(() -> {
assertEquals(Empty.of(), serverData);
return new SingletonGet(FAIL_IMMEDIATELY, NullableLLSnapshot.empty());
})
).blockOptional().orElseThrow();
assertEquals(RPCCrash.of(500, NullableString.of("Expected error")), results);
}
@Test
void sendSimpleRequestConcurrently() {
// Send the request a second time
var requestMono = QuicUtils.<RPCEvent, RPCEvent>sendSimpleRequest(clientConn,
RPCEventCodec::new,
RPCEventCodec::new,
new SingletonGet(NORMAL, NullableLLSnapshot.empty())
);
var results = Flux
.merge(requestMono, requestMono, requestMono, requestMono, requestMono)
.collectList()
.blockOptional()
.orElseThrow();
assertEquals(5, results.size());
assertEquals(List.of(Empty.of(), Empty.of(), Empty.of(), Empty.of(), Empty.of()), results);
}
@Test
void sendFailedRequest() {
RPCEvent response = QuicUtils.<RPCEvent, RPCEvent>sendSimpleRequest(clientConn,
RPCEventCodec::new,
RPCEventCodec::new,
new SingletonGet(FAIL_IMMEDIATELY, NullableLLSnapshot.empty())
).blockOptional().orElseThrow();
assertEquals(RPCCrash.of(500, NullableString.of("Expected error")), response);
}
@Test
void createStream() {
}
}