diff --git a/pom.xml b/pom.xml
index e8465e4..b1a2a9a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -110,7 +110,60 @@
log4j-core
2.17.1
-
+
+ org.junit.jupiter
+ junit-jupiter-api
+ 5.8.2
+ test
+
+
+ org.hamcrest
+ hamcrest-core
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ 5.8.2
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-params
+ 5.8.2
+ test
+
+
+ org.assertj
+ assertj-core
+ 3.22.0
+ test
+
+
+
+ org.hamcrest
+ hamcrest-library
+ 2.2
+ test
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+ 2.17.1
+ test
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.apache.logging.log4j
+ log4j-api
+
+
+
+
src/test/java
@@ -175,7 +228,7 @@
org.junit.jupiter
junit-jupiter-engine
- 5.8.0-M1
+ 5.8.2
diff --git a/src/main/java/it/cavallium/dbengine/database/remote/DatabasePartName.java b/src/main/java/it/cavallium/dbengine/database/remote/DatabasePartName.java
new file mode 100644
index 0000000..e666273
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/remote/DatabasePartName.java
@@ -0,0 +1,5 @@
+package it.cavallium.dbengine.database.remote;
+
+import it.unimi.dsi.fastutil.bytes.ByteList;
+
+public record DatabasePartName(long dbRef, ByteList resourceName) {}
diff --git a/src/main/java/it/cavallium/dbengine/database/remote/QuicRPCServer.java b/src/main/java/it/cavallium/dbengine/database/remote/QuicRPCServer.java
new file mode 100644
index 0000000..bf282a1
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/remote/QuicRPCServer.java
@@ -0,0 +1,260 @@
+package it.cavallium.dbengine.database.remote;
+
+import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
+import io.net5.buffer.api.Buffer;
+import io.net5.buffer.api.DefaultBufferAllocators;
+import io.net5.buffer.api.Send;
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.incubator.codec.quic.InsecureQuicTokenHandler;
+import io.netty.incubator.codec.quic.QuicConnectionIdGenerator;
+import io.netty.incubator.codec.quic.QuicSslContext;
+import io.netty.incubator.codec.quic.QuicSslContextBuilder;
+import it.cavallium.dbengine.database.LLDatabaseConnection;
+import it.cavallium.dbengine.database.LLKeyValueDatabase;
+import it.cavallium.dbengine.database.LLLuceneIndex;
+import it.cavallium.dbengine.database.LLSingleton;
+import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
+import it.cavallium.dbengine.database.remote.RPCCodecs.RPCEventCodec;
+import it.cavallium.dbengine.lucene.LuceneRocksDBManager;
+import it.cavallium.dbengine.rpc.current.data.Binary;
+import it.cavallium.dbengine.rpc.current.data.BinaryOptional;
+import it.cavallium.dbengine.rpc.current.data.CloseDatabase;
+import it.cavallium.dbengine.rpc.current.data.CloseLuceneIndex;
+import it.cavallium.dbengine.rpc.current.data.Empty;
+import it.cavallium.dbengine.rpc.current.data.GeneratedEntityId;
+import it.cavallium.dbengine.rpc.current.data.GetDatabase;
+import it.cavallium.dbengine.rpc.current.data.GetLuceneIndex;
+import it.cavallium.dbengine.rpc.current.data.GetSingleton;
+import it.cavallium.dbengine.rpc.current.data.RPCEvent;
+import it.cavallium.dbengine.rpc.current.data.ServerBoundRequest;
+import it.cavallium.dbengine.rpc.current.data.SingletonUpdateEnd;
+import it.cavallium.dbengine.rpc.current.data.SingletonUpdateInit;
+import it.cavallium.dbengine.rpc.current.data.SingletonUpdateOldData;
+import it.cavallium.dbengine.rpc.current.data.nullables.NullableBinary;
+import it.unimi.dsi.fastutil.bytes.ByteList;
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Signal;
+import reactor.core.publisher.SignalType;
+import reactor.core.publisher.Sinks;
+import reactor.core.publisher.Sinks.Many;
+import reactor.netty.Connection;
+import reactor.netty.DisposableChannel;
+import reactor.netty.incubator.quic.QuicServer;
+
+public class QuicRPCServer {
+
+ private static final Logger LOG = LogManager.getLogger(QuicRPCServer.class);
+ private final QuicServer quicServer;
+ protected final LuceneRocksDBManager rocksDBManager;
+ protected final LLDatabaseConnection localDb;
+ private final AtomicReference connectionAtomicReference = new AtomicReference<>();
+
+ private final ReferencedResources dbs = new ReferencedResources<>(this::obtainDatabase, LLKeyValueDatabase::close);
+ private final ReferencedResources singletons = new ReferencedResources<>(this::obtainSingleton, s -> Mono.empty());
+ private final ReferencedResources indices = new ReferencedResources<>(this::obtainLuceneIndex, LLLuceneIndex::close);
+
+ public QuicRPCServer(LuceneRocksDBManager rocksDBManager, LLDatabaseConnection localDb, QuicServer quicServer) {
+ this.rocksDBManager = rocksDBManager;
+ this.localDb = localDb;
+ this.quicServer = quicServer.handleStream((in, out) -> in
+ .withConnection(conn -> conn.addHandler(new RPCEventCodec()))
+ .receiveObject()
+ .cast(RPCEvent.class)
+ .log("ServerBoundRequest", Level.FINEST, SignalType.ON_NEXT)
+ .switchOnFirst((Signal extends RPCEvent> first, Flux flux) -> {
+ if (first.hasValue()) {
+ ServerBoundRequest value = (ServerBoundRequest) first.get();
+ if (value instanceof GetDatabase getDatabase) {
+ return handleGetDatabase(getDatabase)
+ .transform(this::catchRPCErrorsFlux);
+ } else if (value instanceof GetSingleton getSingleton) {
+ return handleGetSingleton(getSingleton)
+ .transform(this::catchRPCErrorsFlux);
+ } else if (value instanceof SingletonUpdateInit singletonUpdateInit) {
+ return handleSingletonUpdateInit(singletonUpdateInit, flux.skip(1))
+ .transform(this::catchRPCErrorsFlux);
+ } else if (value instanceof CloseDatabase closeDatabase) {
+ return handleCloseDatabase(closeDatabase)
+ .transform(this::catchRPCErrorsFlux);
+ } else if (value instanceof CloseLuceneIndex closeLuceneIndex) {
+ return handleCloseLuceneIndex(closeLuceneIndex)
+ .transform(this::catchRPCErrorsFlux);
+ } else if (value instanceof GetLuceneIndex getLuceneIndex) {
+ return handleGetLuceneIndex(getLuceneIndex)
+ .transform(this::catchRPCErrorsFlux);
+ } else {
+ return QuicUtils.catchRPCErrors(new UnsupportedOperationException("Unsupported request type: " + first));
+ }
+ } else {
+ return flux;
+ }
+ })
+ .doOnError(ex -> LOG.error("Failed to handle a request", ex))
+ .onErrorResume(QuicUtils::catchRPCErrors)
+ .concatMap(response -> out
+ .withConnection(conn -> conn.addHandler(new RPCEventCodec()))
+ .sendObject(response)
+ )
+ );
+ }
+
+ private Flux catchRPCErrorsFlux(Publisher extends RPCEvent> flux) {
+ return Flux
+ .from(flux)
+ .doOnError(ex -> LOG.error("Failed to handle a request", ex))
+ .cast(RPCEvent.class)
+ .onErrorResume(QuicUtils::catchRPCErrors);
+
+ }
+
+ public Mono bind() {
+ return quicServer.bind().doOnNext(connectionAtomicReference::set).then();
+ }
+
+ public Mono dispose() {
+ return Mono
+ .fromSupplier(connectionAtomicReference::get)
+ .doOnNext(DisposableChannel::dispose)
+ .flatMap(DisposableChannel::onDispose);
+ }
+
+ public Mono onDispose() {
+ return Mono.fromSupplier(connectionAtomicReference::get).flatMap(DisposableChannel::onDispose);
+ }
+
+ public static void main(String[] args) throws URISyntaxException {
+ var rocksDBManager = new LuceneRocksDBManager();
+ var localDb = new LLLocalDatabaseConnection(DefaultBufferAllocators.preferredAllocator(),
+ new CompositeMeterRegistry(), Path.of("."), false, rocksDBManager);
+ String keyFileLocation = System.getProperty("it.cavalliumdb.keyFile", null);
+ String keyFilePassword = System.getProperty("it.cavalliumdb.keyPassword", null);
+ String certFileLocation = System.getProperty("it.cavalliumdb.certFile", null);
+ String clientCertsLocation = System.getProperty("it.cavalliumdb.clientCerts", null);
+ String bindAddressText = Objects.requireNonNull(System.getProperty("it.cavalliumdb.bindAddress", null), "Empty bind address");
+ var bindURI = new URI("inet://" + bindAddressText);
+ var bindAddress = new InetSocketAddress(bindURI.getHost(), bindURI.getPort());
+
+ QuicSslContext sslContext = QuicSslContextBuilder
+ .forServer(new File(keyFileLocation), keyFilePassword, new File(certFileLocation))
+ .trustManager(new File(clientCertsLocation))
+ .applicationProtocols("db/0.9")
+ .clientAuth(ClientAuth.REQUIRE)
+ .build();
+ var qs = reactor.netty.incubator.quic.QuicServer
+ .create()
+ .tokenHandler(InsecureQuicTokenHandler.INSTANCE)
+ .bindAddress(() -> bindAddress)
+ .secure(sslContext)
+ .idleTimeout(Duration.ofSeconds(30))
+ .connectionIdAddressGenerator(QuicConnectionIdGenerator.randomGenerator())
+ .initialSettings(spec -> spec
+ .maxData(10000000)
+ .maxStreamDataBidirectionalLocal(1000000)
+ .maxStreamDataBidirectionalRemote(1000000)
+ .maxStreamsBidirectional(100)
+ .maxStreamsUnidirectional(100)
+ );
+ QuicRPCServer server = new QuicRPCServer(rocksDBManager, localDb, qs);
+ server.bind().block();
+ server.onDispose().block();
+ localDb.disconnect().block();
+ rocksDBManager.closeAll();
+ }
+
+ private Flux handleSingletonUpdateInit(
+ SingletonUpdateInit singletonUpdateInit,
+ Flux otherRequests) {
+ return singletons
+ .getResource(singletonUpdateInit.singletonId())
+ .flatMapMany(singleton -> {
+ Many clientBound = Sinks.many().unicast().onBackpressureBuffer();
+ Mono update = singleton.update(prev -> {
+ clientBound
+ .tryEmitNext(new SingletonUpdateOldData(prev != null, prev != null ? toByteList(prev) : ByteList.of()))
+ .orThrow();
+ SingletonUpdateEnd newValue = (SingletonUpdateEnd) otherRequests.singleOrEmpty().block();
+ Objects.requireNonNull(newValue);
+ if (!newValue.exist()) {
+ return null;
+ } else {
+ return localDb.getAllocator().copyOf(QuicUtils.toArrayNoCopy(newValue.value()));
+ }
+ }, singletonUpdateInit.updateReturnMode())
+ .map(result -> new BinaryOptional(result != null ? NullableBinary.of(Binary.of(toByteList(result))) : NullableBinary.empty()));
+ return Flux.merge(update, clientBound.asFlux());
+ });
+ }
+
+ private static ByteList toByteList(Send prev) {
+ try (var prevVal = prev.receive()) {
+ byte[] result = new byte[prevVal.readableBytes()];
+ prevVal.readBytes(result, 0, result.length);
+ return ByteList.of(result);
+ }
+ }
+
+ private Mono handleGetSingleton(GetSingleton getSingleton) {
+ var id = new DatabasePartName(getSingleton.databaseId(), getSingleton.singletonListColumnName());
+ return this.singletons.getReference(id, getSingleton).map(GeneratedEntityId::new);
+ }
+
+ private Mono obtainSingleton(DatabasePartName id, GetSingleton getSingleton) {
+ Mono dbMono = dbs.getResource(id.dbRef());
+ return dbMono.flatMap(db -> db.getSingleton(
+ QuicUtils.toArrayNoCopy(getSingleton.singletonListColumnName()),
+ QuicUtils.toArrayNoCopy(getSingleton.name()),
+ QuicUtils.toArrayNoCopy(getSingleton.defaultValue())
+ ));
+ }
+
+ private Mono handleGetDatabase(GetDatabase getDatabase) {
+ return this.dbs.getReference(getDatabase.name(), getDatabase).map(GeneratedEntityId::of);
+ }
+
+ private Mono extends LLKeyValueDatabase> obtainDatabase(String id, GetDatabase getDatabase) {
+ // Disable optimistic transactions, since network transactions require a lot of time
+ var options = getDatabase.databaseOptions().setOptimistic(false);
+ return localDb.getDatabase(id, getDatabase.columns(), options);
+ }
+
+ public Mono handleGetLuceneIndex(GetLuceneIndex getLuceneIndex) {
+ return this.indices
+ .getReference(getLuceneIndex.clusterName(), getLuceneIndex)
+ .map(GeneratedEntityId::new);
+ }
+
+ private Mono extends LLLuceneIndex> obtainLuceneIndex(String id, GetLuceneIndex getLuceneIndex) {
+ return localDb.getLuceneIndex(getLuceneIndex.clusterName(),
+ getLuceneIndex.structure(),
+ getLuceneIndex.indicizerAnalyzers(),
+ getLuceneIndex.indicizerSimilarities(),
+ getLuceneIndex.luceneOptions(),
+ null
+ );
+ }
+
+ private Mono handleCloseDatabase(CloseDatabase closeDatabase) {
+ return this.dbs.getResource(closeDatabase.databaseId()).flatMap(LLKeyValueDatabase::close).thenReturn(Empty.of());
+ }
+
+ private Mono handleCloseLuceneIndex(CloseLuceneIndex closeLuceneIndex) {
+ return this.indices
+ .getResource(closeLuceneIndex.luceneIndexId())
+ .flatMap(LLLuceneIndex::close)
+ .thenReturn(Empty.of());
+ }
+
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/remote/QuicServer.java b/src/main/java/it/cavallium/dbengine/database/remote/QuicServer.java
deleted file mode 100644
index 55ca4b2..0000000
--- a/src/main/java/it/cavallium/dbengine/database/remote/QuicServer.java
+++ /dev/null
@@ -1,119 +0,0 @@
-package it.cavallium.dbengine.database.remote;
-
-import io.micrometer.core.instrument.MeterRegistry;
-import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
-import io.micrometer.core.instrument.noop.NoopMeter;
-import io.net5.buffer.api.DefaultBufferAllocators;
-import io.netty.handler.ssl.ClientAuth;
-import io.netty.incubator.codec.quic.InsecureQuicTokenHandler;
-import io.netty.incubator.codec.quic.QuicConnectionIdGenerator;
-import io.netty.incubator.codec.quic.QuicSslContext;
-import io.netty.incubator.codec.quic.QuicSslContextBuilder;
-import it.cavallium.dbengine.database.LLKeyValueDatabase;
-import it.cavallium.dbengine.database.LLLuceneIndex;
-import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
-import it.cavallium.dbengine.database.remote.RPCCodecs.RPCClientBoundResponseDecoder;
-import it.cavallium.dbengine.database.remote.RPCCodecs.RPCServerBoundRequestDecoder;
-import it.cavallium.dbengine.rpc.current.data.GeneratedEntityId;
-import it.cavallium.dbengine.rpc.current.data.GetDatabase;
-import it.cavallium.dbengine.rpc.current.data.ServerBoundRequest;
-import java.io.File;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.file.Path;
-import java.time.Duration;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import reactor.core.publisher.Mono;
-import reactor.core.publisher.SignalType;
-
-public class QuicServer {
-
- private static final Logger LOG = LogManager.getLogger(QuicServer.class);
- private static LLLocalDatabaseConnection localDb;
-
- private static final AtomicLong nextResourceId = new AtomicLong(1);
- private static final ConcurrentHashMap dbToId = new ConcurrentHashMap<>();
- private static final ConcurrentHashMap dbById = new ConcurrentHashMap<>();
- private static final ConcurrentHashMap indexToId = new ConcurrentHashMap<>();
- private static final ConcurrentHashMap indexById = new ConcurrentHashMap<>();
-
- public static void main(String[] args) throws URISyntaxException {
- localDb = new LLLocalDatabaseConnection(DefaultBufferAllocators.preferredAllocator(),
- new CompositeMeterRegistry(), Path.of("."), false);
- String keyFileLocation = System.getProperty("it.cavalliumdb.keyFile", null);
- String keyFilePassword = System.getProperty("it.cavalliumdb.keyPassword", null);
- String certFileLocation = System.getProperty("it.cavalliumdb.certFile", null);
- String clientCertsLocation = System.getProperty("it.cavalliumdb.clientCerts", null);
- String bindAddress = Objects.requireNonNull(System.getProperty("it.cavalliumdb.bindAddress", null), "Empty bind address");
- var bindURI = new URI("inet://" + bindAddress);
-
- QuicSslContext sslContext = QuicSslContextBuilder
- .forServer(new File(keyFileLocation), keyFilePassword, new File(certFileLocation))
- .trustManager(new File(clientCertsLocation))
- .applicationProtocols("db/0.9")
- .clientAuth(ClientAuth.REQUIRE)
- .build();
- var qs = reactor.netty.incubator.quic.QuicServer
- .create()
- .tokenHandler(InsecureQuicTokenHandler.INSTANCE)
- .bindAddress(() -> new InetSocketAddress(bindURI.getHost(), bindURI.getPort()))
- .secure(sslContext)
- .idleTimeout(Duration.ofSeconds(30))
- .connectionIdAddressGenerator(QuicConnectionIdGenerator.randomGenerator())
- .initialSettings(spec -> spec
- .maxData(10000000)
- .maxStreamDataBidirectionalLocal(1000000)
- .maxStreamDataBidirectionalRemote(1000000)
- .maxStreamsBidirectional(100)
- .maxStreamsUnidirectional(100)
- )
- .handleStream((in, out) -> {
- var inConn = in.withConnection(conn -> conn.addHandler(new RPCClientBoundResponseDecoder()));
- var outConn = out.withConnection(conn -> conn.addHandler(new RPCServerBoundRequestDecoder()));
- return inConn
- .receiveObject()
- .cast(ServerBoundRequest.class)
- .log("req", Level.INFO, SignalType.ON_NEXT)
- .switchOnFirst((first, flux) -> {
- if (first.hasValue()) {
- var value = first.get();
- if (value instanceof GetDatabase getDatabase) {
- return handleGetDatabase(getDatabase);
- } else {
- return Mono.error(new UnsupportedOperationException("Unsupported request type: " + first));
- }
- } else {
- return flux;
- }
- })
- .doOnError(ex -> LOG.error("Failed to handle a request", ex))
- .onErrorResume(ex -> Mono.empty())
- .transform(outConn::sendObject);
- });
- var conn = qs.bindNow();
- conn.onDispose().block();
- }
-
- private static Mono handleGetDatabase(GetDatabase getDatabase) {
- Mono dbCreationMono = localDb
- .getDatabase(getDatabase.name(), getDatabase.columns(), getDatabase.databaseOptions())
- .flatMap(db -> Mono.fromCallable(() -> {
- long id = nextResourceId.getAndIncrement();
- dbById.put(id, db);
- dbToId.put(getDatabase.name(), id);
- return GeneratedEntityId.of(id);
- }));
-
- Mono existingDbMono = Mono
- .fromSupplier(() -> dbToId.get(getDatabase.name()))
- .map(GeneratedEntityId::of);
-
- return existingDbMono.switchIfEmpty(dbCreationMono);
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/remote/ReferencedResource.java b/src/main/java/it/cavallium/dbengine/database/remote/ReferencedResource.java
new file mode 100644
index 0000000..3d738bc
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/remote/ReferencedResource.java
@@ -0,0 +1,3 @@
+package it.cavallium.dbengine.database.remote;
+
+public record ReferencedResource(Long reference, T resource) {}
diff --git a/src/main/java/it/cavallium/dbengine/database/remote/ReferencedResources.java b/src/main/java/it/cavallium/dbengine/database/remote/ReferencedResources.java
new file mode 100644
index 0000000..ab87960
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/remote/ReferencedResources.java
@@ -0,0 +1,117 @@
+package it.cavallium.dbengine.database.remote;
+
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.jetbrains.annotations.Nullable;
+import reactor.core.publisher.Mono;
+
+/**
+ *
+ * @param Identifier type
+ * @param Value type
+ */
+public class ReferencedResources {
+
+ private final ResourceGetter resourceGetter;
+ private final ResourceFreer resourceFree;
+ private final AtomicLong nextReferenceId = new AtomicLong(1);
+ private final ConcurrentHashMap identifierToReferenceId = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap identifierByReferenceId = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap resourceByReferenceId = new ConcurrentHashMap<>();
+
+ public ReferencedResources(ResourceGetter resourceGetter, ResourceFreer resourceFree) {
+ this.resourceGetter = resourceGetter;
+ this.resourceFree = resourceFree;
+ }
+
+ protected Mono extends V> obtain(I identifier, E extraParams) {
+ return resourceGetter.obtain(identifier, extraParams);
+ }
+
+ protected Mono free(V resource) {
+ return resourceFree.free(resource);
+ }
+
+ public Mono getReference(I identifier, @Nullable E extraParams) {
+ Mono existingDbMono = Mono.fromSupplier(() -> identifierToReferenceId.get(identifier));
+
+ if (extraParams != null) {
+ // Defer to avoid building this chain when not needed
+ Mono dbCreationMono = Mono.defer(() -> this
+ .obtain(identifier, extraParams)
+ .flatMap(db -> Mono.fromCallable(() -> {
+ long referenceId = nextReferenceId.getAndIncrement();
+ resourceByReferenceId.put(referenceId, db);
+ identifierToReferenceId.put(identifier, referenceId);
+ identifierByReferenceId.put(referenceId, identifier);
+ return referenceId;
+ })));
+
+ return existingDbMono.switchIfEmpty(dbCreationMono);
+ } else {
+ return existingDbMono
+ .switchIfEmpty(Mono.error(() -> new NoSuchElementException("Resource not found: " + identifier)));
+ }
+ }
+
+ public Mono> getResource(I identifier, @Nullable E extraParams) {
+ Mono> existingDbMono = Mono.fromSupplier(() -> {
+ var referenceId = identifierToReferenceId.get(identifier);
+ if (referenceId == null) {
+ return null;
+ }
+ var resource = resourceByReferenceId.get(referenceId);
+ if (resource == null) {
+ return null;
+ }
+ return new ReferencedResource<>(referenceId, resource);
+ });
+
+ if (extraParams != null) {
+ // Defer to avoid building this chain when not needed
+ Mono> dbCreationMono = Mono.defer(() -> this
+ .obtain(identifier, extraParams)
+ .map(resource -> {
+ long referenceId = nextReferenceId.getAndIncrement();
+ resourceByReferenceId.put(referenceId, resource);
+ identifierToReferenceId.put(identifier, referenceId);
+ identifierByReferenceId.put(referenceId, identifier);
+ return new ReferencedResource<>(referenceId, resource);
+ }));
+
+ return existingDbMono.switchIfEmpty(dbCreationMono);
+ } else {
+ return existingDbMono
+ .switchIfEmpty(Mono.error(() -> new NoSuchElementException("Resource not found: " + identifier)));
+ }
+ }
+
+ public Mono getResource(long referenceId) {
+ Mono existingDbMono = Mono.fromSupplier(() -> resourceByReferenceId.get(referenceId));
+ return existingDbMono.switchIfEmpty(Mono.error(() ->
+ new NoSuchElementException("Resource not found: " + referenceId)));
+ }
+
+ public Mono releaseResource(I identifier) {
+ return Mono.fromSupplier(() -> {
+ var referenceId = identifierToReferenceId.remove(identifier);
+ if (referenceId == null) {
+ return null;
+ }
+ identifierByReferenceId.remove(referenceId);
+ return resourceByReferenceId.remove(referenceId);
+ }).flatMap(this::free);
+ }
+
+ public Mono releaseResource(long referenceId) {
+ return Mono.fromSupplier(() -> {
+ var identifier = identifierByReferenceId.remove(referenceId);
+ if (identifier == null) {
+ return null;
+ }
+ identifierToReferenceId.remove(identifier);
+ return resourceByReferenceId.remove(referenceId);
+ }).flatMap(this::free);
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/remote/ResourceFreer.java b/src/main/java/it/cavallium/dbengine/database/remote/ResourceFreer.java
new file mode 100644
index 0000000..c355ac0
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/remote/ResourceFreer.java
@@ -0,0 +1,8 @@
+package it.cavallium.dbengine.database.remote;
+
+import reactor.core.publisher.Mono;
+
+public interface ResourceFreer {
+
+ Mono free(V resource);
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/remote/ResourceGetter.java b/src/main/java/it/cavallium/dbengine/database/remote/ResourceGetter.java
new file mode 100644
index 0000000..201eca0
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/remote/ResourceGetter.java
@@ -0,0 +1,8 @@
+package it.cavallium.dbengine.database.remote;
+
+import reactor.core.publisher.Mono;
+
+public interface ResourceGetter {
+
+ Mono extends V> obtain(I identifier, E extra);
+}
diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml
index 5cfb8ca..c84b52b 100644
--- a/src/main/resources/log4j2.xml
+++ b/src/main/resources/log4j2.xml
@@ -12,7 +12,7 @@
-->
-
+
diff --git a/src/test/java/it/cavallium/dbengine/database/remote/LLQuicConnectionTest.java b/src/test/java/it/cavallium/dbengine/database/remote/LLQuicConnectionTest.java
new file mode 100644
index 0000000..3e3028a
--- /dev/null
+++ b/src/test/java/it/cavallium/dbengine/database/remote/LLQuicConnectionTest.java
@@ -0,0 +1,112 @@
+package it.cavallium.dbengine.database.remote;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
+import io.net5.buffer.api.BufferAllocator;
+import io.net5.buffer.api.DefaultBufferAllocators;
+import it.cavallium.data.generator.nativedata.Nullableboolean;
+import it.cavallium.data.generator.nativedata.Nullableint;
+import it.cavallium.data.generator.nativedata.Nullablelong;
+import it.cavallium.dbengine.client.IndicizerAnalyzers;
+import it.cavallium.dbengine.client.IndicizerSimilarities;
+import it.cavallium.dbengine.database.ColumnUtils;
+import it.cavallium.dbengine.database.LLDatabaseConnection;
+import it.cavallium.dbengine.rpc.current.data.ByteBuffersDirectory;
+import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
+import it.cavallium.dbengine.rpc.current.data.LuceneIndexStructure;
+import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
+import it.unimi.dsi.fastutil.ints.IntList;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class LLQuicConnectionTest {
+
+ private BufferAllocator allocator;
+ private CompositeMeterRegistry meterRegistry;
+ private TestDbServer server;
+ private LLDatabaseConnection client;
+
+ @BeforeEach
+ void setUp() {
+ this.allocator = DefaultBufferAllocators.preferredAllocator();
+ this.meterRegistry = new CompositeMeterRegistry();
+ this.server = TestDbServer.create(allocator, meterRegistry);
+ server.bind().block();
+ this.client = TestDbClient.create(allocator, meterRegistry, server.address()).connect().block();
+ }
+
+ @AfterEach
+ void tearDown() {
+ if (client != null) {
+ client.disconnect().block();
+ }
+ if (server != null) {
+ server.dispose().block();
+ }
+ }
+
+ @Test
+ void getAllocator() {
+ assertEquals(allocator, client.getAllocator());
+ }
+
+ @Test
+ void getMeterRegistry() {
+ assertEquals(meterRegistry, client.getMeterRegistry());
+ }
+
+ @Test
+ void getDatabase() {
+ var dbName = "test-temp-db";
+ var singletonsColumnName = "singletons";
+ var db = client.getDatabase(dbName,
+ List.of(ColumnUtils.special(singletonsColumnName)),
+ new DatabaseOptions(List.of(),
+ Map.of(),
+ true,
+ true,
+ true,
+ true,
+ true,
+ true,
+ Nullableint.empty(),
+ Nullablelong.empty(),
+ Nullablelong.empty(),
+ Nullableboolean.empty()
+ )
+ ).blockOptional().orElseThrow();
+ assertEquals(dbName, db.getDatabaseName());
+ assertEquals(allocator, db.getAllocator());
+ assertEquals(meterRegistry, db.getMeterRegistry());
+ assertDoesNotThrow(() -> db.close().block());
+ }
+
+ @Test
+ void getLuceneIndex() {
+ var shardName = "test-lucene-shard";
+ var index = client.getLuceneIndex(shardName,
+ new LuceneIndexStructure(1, IntList.of(0)),
+ IndicizerAnalyzers.of(),
+ IndicizerSimilarities.of(),
+ new LuceneOptions(Map.of(),
+ Duration.ofSeconds(1),
+ Duration.ofSeconds(1),
+ false,
+ new ByteBuffersDirectory(),
+ -1,
+ false,
+ false,
+ false,
+ 100
+ ),
+ null).blockOptional().orElseThrow();
+ assertEquals(shardName, index.getLuceneIndexName());
+ assertDoesNotThrow(() -> index.close().block());
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/it/cavallium/dbengine/database/remote/TestDbClient.java b/src/test/java/it/cavallium/dbengine/database/remote/TestDbClient.java
new file mode 100644
index 0000000..48aef83
--- /dev/null
+++ b/src/test/java/it/cavallium/dbengine/database/remote/TestDbClient.java
@@ -0,0 +1,18 @@
+package it.cavallium.dbengine.database.remote;
+
+import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
+import io.net5.buffer.api.BufferAllocator;
+import java.net.InetSocketAddress;
+
+public class TestDbClient {
+
+ public static LLQuicConnection create(BufferAllocator allocator,
+ CompositeMeterRegistry meterRegistry,
+ InetSocketAddress serverAddress) {
+ return new LLQuicConnection(allocator,
+ meterRegistry,
+ new InetSocketAddress(0),
+ serverAddress
+ );
+ }
+}
diff --git a/src/test/java/it/cavallium/dbengine/database/remote/TestDbServer.java b/src/test/java/it/cavallium/dbengine/database/remote/TestDbServer.java
new file mode 100644
index 0000000..a678abc
--- /dev/null
+++ b/src/test/java/it/cavallium/dbengine/database/remote/TestDbServer.java
@@ -0,0 +1,71 @@
+package it.cavallium.dbengine.database.remote;
+
+import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
+import io.net5.buffer.api.BufferAllocator;
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.handler.ssl.util.SelfSignedCertificate;
+import io.netty.incubator.codec.quic.InsecureQuicTokenHandler;
+import io.netty.incubator.codec.quic.QuicConnectionIdGenerator;
+import io.netty.incubator.codec.quic.QuicSslContext;
+import io.netty.incubator.codec.quic.QuicSslContextBuilder;
+import it.cavallium.dbengine.database.LLDatabaseConnection;
+import it.cavallium.dbengine.database.memory.LLMemoryDatabaseConnection;
+import it.cavallium.dbengine.lucene.LuceneRocksDBManager;
+import java.net.InetSocketAddress;
+import java.security.cert.CertificateException;
+import java.time.Duration;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.netty.incubator.quic.QuicServer;
+
+public class TestDbServer extends QuicRPCServer {
+
+ private static final InetSocketAddress BIND_ADDRESS = new InetSocketAddress(8080);
+
+ public TestDbServer(LuceneRocksDBManager rocksDBManager, LLDatabaseConnection localDb, QuicServer quicServer) {
+ super(rocksDBManager, localDb, quicServer);
+ }
+
+ public static TestDbServer create(BufferAllocator allocator, CompositeMeterRegistry meterRegistry) {
+ var rocksDBManager = new LuceneRocksDBManager();
+ var localDb = new LLMemoryDatabaseConnection(allocator, meterRegistry);
+ SelfSignedCertificate selfSignedCert;
+ try {
+ selfSignedCert = new SelfSignedCertificate();
+ } catch (CertificateException e) {
+ throw new RuntimeException(e);
+ }
+ QuicSslContext sslContext = QuicSslContextBuilder
+ .forServer(selfSignedCert.key(), null, selfSignedCert.cert())
+ .applicationProtocols("db/0.9")
+ .clientAuth(ClientAuth.NONE)
+ .build();
+ var qs = reactor.netty.incubator.quic.QuicServer
+ .create()
+ .tokenHandler(InsecureQuicTokenHandler.INSTANCE)
+ .bindAddress(() -> BIND_ADDRESS)
+ .secure(sslContext)
+ .idleTimeout(Duration.ofSeconds(30))
+ .connectionIdAddressGenerator(QuicConnectionIdGenerator.randomGenerator())
+ .initialSettings(spec -> spec
+ .maxData(10000000)
+ .maxStreamDataBidirectionalLocal(1000000)
+ .maxStreamDataBidirectionalRemote(1000000)
+ .maxStreamsBidirectional(100)
+ .maxStreamsUnidirectional(100)
+ );
+ return new TestDbServer(rocksDBManager, localDb, qs);
+ }
+
+ public InetSocketAddress address() {
+ return BIND_ADDRESS;
+ }
+
+ @Override
+ public Mono dispose() {
+ var closeManager = Mono
+ .fromRunnable(rocksDBManager::closeAll)
+ .subscribeOn(Schedulers.boundedElastic());
+ return super.dispose().then(closeManager).then(localDb.disconnect());
+ }
+}