From cb966d96e37ec383e955f0aed08f35124e27f5e9 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sun, 6 Mar 2022 12:22:50 +0100 Subject: [PATCH] Partially implement server --- pom.xml | 57 +++- .../database/remote/DatabasePartName.java | 5 + .../database/remote/QuicRPCServer.java | 260 ++++++++++++++++++ .../dbengine/database/remote/QuicServer.java | 119 -------- .../database/remote/ReferencedResource.java | 3 + .../database/remote/ReferencedResources.java | 117 ++++++++ .../database/remote/ResourceFreer.java | 8 + .../database/remote/ResourceGetter.java | 8 + src/main/resources/log4j2.xml | 2 +- .../database/remote/LLQuicConnectionTest.java | 112 ++++++++ .../database/remote/TestDbClient.java | 18 ++ .../database/remote/TestDbServer.java | 71 +++++ 12 files changed, 658 insertions(+), 122 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/database/remote/DatabasePartName.java create mode 100644 src/main/java/it/cavallium/dbengine/database/remote/QuicRPCServer.java delete mode 100644 src/main/java/it/cavallium/dbengine/database/remote/QuicServer.java create mode 100644 src/main/java/it/cavallium/dbengine/database/remote/ReferencedResource.java create mode 100644 src/main/java/it/cavallium/dbengine/database/remote/ReferencedResources.java create mode 100644 src/main/java/it/cavallium/dbengine/database/remote/ResourceFreer.java create mode 100644 src/main/java/it/cavallium/dbengine/database/remote/ResourceGetter.java create mode 100644 src/test/java/it/cavallium/dbengine/database/remote/LLQuicConnectionTest.java create mode 100644 src/test/java/it/cavallium/dbengine/database/remote/TestDbClient.java create mode 100644 src/test/java/it/cavallium/dbengine/database/remote/TestDbServer.java diff --git a/pom.xml b/pom.xml index e8465e4..b1a2a9a 100644 --- a/pom.xml +++ b/pom.xml @@ -110,7 +110,60 @@ log4j-core 2.17.1 - + + org.junit.jupiter + junit-jupiter-api + 5.8.2 + test + + + org.hamcrest + hamcrest-core + + + + + org.junit.jupiter + junit-jupiter-engine + 5.8.2 + test + + + org.junit.jupiter + junit-jupiter-params + 5.8.2 + test + + + org.assertj + assertj-core + 3.22.0 + test + + + + org.hamcrest + hamcrest-library + 2.2 + test + + + org.apache.logging.log4j + log4j-slf4j-impl + 2.17.1 + test + + + org.slf4j + slf4j-api + + + org.apache.logging.log4j + log4j-api + + + + src/test/java @@ -175,7 +228,7 @@ org.junit.jupiter junit-jupiter-engine - 5.8.0-M1 + 5.8.2 diff --git a/src/main/java/it/cavallium/dbengine/database/remote/DatabasePartName.java b/src/main/java/it/cavallium/dbengine/database/remote/DatabasePartName.java new file mode 100644 index 0000000..e666273 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/remote/DatabasePartName.java @@ -0,0 +1,5 @@ +package it.cavallium.dbengine.database.remote; + +import it.unimi.dsi.fastutil.bytes.ByteList; + +public record DatabasePartName(long dbRef, ByteList resourceName) {} diff --git a/src/main/java/it/cavallium/dbengine/database/remote/QuicRPCServer.java b/src/main/java/it/cavallium/dbengine/database/remote/QuicRPCServer.java new file mode 100644 index 0000000..bf282a1 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/remote/QuicRPCServer.java @@ -0,0 +1,260 @@ +package it.cavallium.dbengine.database.remote; + +import io.micrometer.core.instrument.composite.CompositeMeterRegistry; +import io.net5.buffer.api.Buffer; +import io.net5.buffer.api.DefaultBufferAllocators; +import io.net5.buffer.api.Send; +import io.netty.handler.ssl.ClientAuth; +import io.netty.incubator.codec.quic.InsecureQuicTokenHandler; +import io.netty.incubator.codec.quic.QuicConnectionIdGenerator; +import io.netty.incubator.codec.quic.QuicSslContext; +import io.netty.incubator.codec.quic.QuicSslContextBuilder; +import it.cavallium.dbengine.database.LLDatabaseConnection; +import it.cavallium.dbengine.database.LLKeyValueDatabase; +import it.cavallium.dbengine.database.LLLuceneIndex; +import it.cavallium.dbengine.database.LLSingleton; +import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; +import it.cavallium.dbengine.database.remote.RPCCodecs.RPCEventCodec; +import it.cavallium.dbengine.lucene.LuceneRocksDBManager; +import it.cavallium.dbengine.rpc.current.data.Binary; +import it.cavallium.dbengine.rpc.current.data.BinaryOptional; +import it.cavallium.dbengine.rpc.current.data.CloseDatabase; +import it.cavallium.dbengine.rpc.current.data.CloseLuceneIndex; +import it.cavallium.dbengine.rpc.current.data.Empty; +import it.cavallium.dbengine.rpc.current.data.GeneratedEntityId; +import it.cavallium.dbengine.rpc.current.data.GetDatabase; +import it.cavallium.dbengine.rpc.current.data.GetLuceneIndex; +import it.cavallium.dbengine.rpc.current.data.GetSingleton; +import it.cavallium.dbengine.rpc.current.data.RPCEvent; +import it.cavallium.dbengine.rpc.current.data.ServerBoundRequest; +import it.cavallium.dbengine.rpc.current.data.SingletonUpdateEnd; +import it.cavallium.dbengine.rpc.current.data.SingletonUpdateInit; +import it.cavallium.dbengine.rpc.current.data.SingletonUpdateOldData; +import it.cavallium.dbengine.rpc.current.data.nullables.NullableBinary; +import it.unimi.dsi.fastutil.bytes.ByteList; +import java.io.File; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Signal; +import reactor.core.publisher.SignalType; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.Many; +import reactor.netty.Connection; +import reactor.netty.DisposableChannel; +import reactor.netty.incubator.quic.QuicServer; + +public class QuicRPCServer { + + private static final Logger LOG = LogManager.getLogger(QuicRPCServer.class); + private final QuicServer quicServer; + protected final LuceneRocksDBManager rocksDBManager; + protected final LLDatabaseConnection localDb; + private final AtomicReference connectionAtomicReference = new AtomicReference<>(); + + private final ReferencedResources dbs = new ReferencedResources<>(this::obtainDatabase, LLKeyValueDatabase::close); + private final ReferencedResources singletons = new ReferencedResources<>(this::obtainSingleton, s -> Mono.empty()); + private final ReferencedResources indices = new ReferencedResources<>(this::obtainLuceneIndex, LLLuceneIndex::close); + + public QuicRPCServer(LuceneRocksDBManager rocksDBManager, LLDatabaseConnection localDb, QuicServer quicServer) { + this.rocksDBManager = rocksDBManager; + this.localDb = localDb; + this.quicServer = quicServer.handleStream((in, out) -> in + .withConnection(conn -> conn.addHandler(new RPCEventCodec())) + .receiveObject() + .cast(RPCEvent.class) + .log("ServerBoundRequest", Level.FINEST, SignalType.ON_NEXT) + .switchOnFirst((Signal first, Flux flux) -> { + if (first.hasValue()) { + ServerBoundRequest value = (ServerBoundRequest) first.get(); + if (value instanceof GetDatabase getDatabase) { + return handleGetDatabase(getDatabase) + .transform(this::catchRPCErrorsFlux); + } else if (value instanceof GetSingleton getSingleton) { + return handleGetSingleton(getSingleton) + .transform(this::catchRPCErrorsFlux); + } else if (value instanceof SingletonUpdateInit singletonUpdateInit) { + return handleSingletonUpdateInit(singletonUpdateInit, flux.skip(1)) + .transform(this::catchRPCErrorsFlux); + } else if (value instanceof CloseDatabase closeDatabase) { + return handleCloseDatabase(closeDatabase) + .transform(this::catchRPCErrorsFlux); + } else if (value instanceof CloseLuceneIndex closeLuceneIndex) { + return handleCloseLuceneIndex(closeLuceneIndex) + .transform(this::catchRPCErrorsFlux); + } else if (value instanceof GetLuceneIndex getLuceneIndex) { + return handleGetLuceneIndex(getLuceneIndex) + .transform(this::catchRPCErrorsFlux); + } else { + return QuicUtils.catchRPCErrors(new UnsupportedOperationException("Unsupported request type: " + first)); + } + } else { + return flux; + } + }) + .doOnError(ex -> LOG.error("Failed to handle a request", ex)) + .onErrorResume(QuicUtils::catchRPCErrors) + .concatMap(response -> out + .withConnection(conn -> conn.addHandler(new RPCEventCodec())) + .sendObject(response) + ) + ); + } + + private Flux catchRPCErrorsFlux(Publisher flux) { + return Flux + .from(flux) + .doOnError(ex -> LOG.error("Failed to handle a request", ex)) + .cast(RPCEvent.class) + .onErrorResume(QuicUtils::catchRPCErrors); + + } + + public Mono bind() { + return quicServer.bind().doOnNext(connectionAtomicReference::set).then(); + } + + public Mono dispose() { + return Mono + .fromSupplier(connectionAtomicReference::get) + .doOnNext(DisposableChannel::dispose) + .flatMap(DisposableChannel::onDispose); + } + + public Mono onDispose() { + return Mono.fromSupplier(connectionAtomicReference::get).flatMap(DisposableChannel::onDispose); + } + + public static void main(String[] args) throws URISyntaxException { + var rocksDBManager = new LuceneRocksDBManager(); + var localDb = new LLLocalDatabaseConnection(DefaultBufferAllocators.preferredAllocator(), + new CompositeMeterRegistry(), Path.of("."), false, rocksDBManager); + String keyFileLocation = System.getProperty("it.cavalliumdb.keyFile", null); + String keyFilePassword = System.getProperty("it.cavalliumdb.keyPassword", null); + String certFileLocation = System.getProperty("it.cavalliumdb.certFile", null); + String clientCertsLocation = System.getProperty("it.cavalliumdb.clientCerts", null); + String bindAddressText = Objects.requireNonNull(System.getProperty("it.cavalliumdb.bindAddress", null), "Empty bind address"); + var bindURI = new URI("inet://" + bindAddressText); + var bindAddress = new InetSocketAddress(bindURI.getHost(), bindURI.getPort()); + + QuicSslContext sslContext = QuicSslContextBuilder + .forServer(new File(keyFileLocation), keyFilePassword, new File(certFileLocation)) + .trustManager(new File(clientCertsLocation)) + .applicationProtocols("db/0.9") + .clientAuth(ClientAuth.REQUIRE) + .build(); + var qs = reactor.netty.incubator.quic.QuicServer + .create() + .tokenHandler(InsecureQuicTokenHandler.INSTANCE) + .bindAddress(() -> bindAddress) + .secure(sslContext) + .idleTimeout(Duration.ofSeconds(30)) + .connectionIdAddressGenerator(QuicConnectionIdGenerator.randomGenerator()) + .initialSettings(spec -> spec + .maxData(10000000) + .maxStreamDataBidirectionalLocal(1000000) + .maxStreamDataBidirectionalRemote(1000000) + .maxStreamsBidirectional(100) + .maxStreamsUnidirectional(100) + ); + QuicRPCServer server = new QuicRPCServer(rocksDBManager, localDb, qs); + server.bind().block(); + server.onDispose().block(); + localDb.disconnect().block(); + rocksDBManager.closeAll(); + } + + private Flux handleSingletonUpdateInit( + SingletonUpdateInit singletonUpdateInit, + Flux otherRequests) { + return singletons + .getResource(singletonUpdateInit.singletonId()) + .flatMapMany(singleton -> { + Many clientBound = Sinks.many().unicast().onBackpressureBuffer(); + Mono update = singleton.update(prev -> { + clientBound + .tryEmitNext(new SingletonUpdateOldData(prev != null, prev != null ? toByteList(prev) : ByteList.of())) + .orThrow(); + SingletonUpdateEnd newValue = (SingletonUpdateEnd) otherRequests.singleOrEmpty().block(); + Objects.requireNonNull(newValue); + if (!newValue.exist()) { + return null; + } else { + return localDb.getAllocator().copyOf(QuicUtils.toArrayNoCopy(newValue.value())); + } + }, singletonUpdateInit.updateReturnMode()) + .map(result -> new BinaryOptional(result != null ? NullableBinary.of(Binary.of(toByteList(result))) : NullableBinary.empty())); + return Flux.merge(update, clientBound.asFlux()); + }); + } + + private static ByteList toByteList(Send prev) { + try (var prevVal = prev.receive()) { + byte[] result = new byte[prevVal.readableBytes()]; + prevVal.readBytes(result, 0, result.length); + return ByteList.of(result); + } + } + + private Mono handleGetSingleton(GetSingleton getSingleton) { + var id = new DatabasePartName(getSingleton.databaseId(), getSingleton.singletonListColumnName()); + return this.singletons.getReference(id, getSingleton).map(GeneratedEntityId::new); + } + + private Mono obtainSingleton(DatabasePartName id, GetSingleton getSingleton) { + Mono dbMono = dbs.getResource(id.dbRef()); + return dbMono.flatMap(db -> db.getSingleton( + QuicUtils.toArrayNoCopy(getSingleton.singletonListColumnName()), + QuicUtils.toArrayNoCopy(getSingleton.name()), + QuicUtils.toArrayNoCopy(getSingleton.defaultValue()) + )); + } + + private Mono handleGetDatabase(GetDatabase getDatabase) { + return this.dbs.getReference(getDatabase.name(), getDatabase).map(GeneratedEntityId::of); + } + + private Mono obtainDatabase(String id, GetDatabase getDatabase) { + // Disable optimistic transactions, since network transactions require a lot of time + var options = getDatabase.databaseOptions().setOptimistic(false); + return localDb.getDatabase(id, getDatabase.columns(), options); + } + + public Mono handleGetLuceneIndex(GetLuceneIndex getLuceneIndex) { + return this.indices + .getReference(getLuceneIndex.clusterName(), getLuceneIndex) + .map(GeneratedEntityId::new); + } + + private Mono obtainLuceneIndex(String id, GetLuceneIndex getLuceneIndex) { + return localDb.getLuceneIndex(getLuceneIndex.clusterName(), + getLuceneIndex.structure(), + getLuceneIndex.indicizerAnalyzers(), + getLuceneIndex.indicizerSimilarities(), + getLuceneIndex.luceneOptions(), + null + ); + } + + private Mono handleCloseDatabase(CloseDatabase closeDatabase) { + return this.dbs.getResource(closeDatabase.databaseId()).flatMap(LLKeyValueDatabase::close).thenReturn(Empty.of()); + } + + private Mono handleCloseLuceneIndex(CloseLuceneIndex closeLuceneIndex) { + return this.indices + .getResource(closeLuceneIndex.luceneIndexId()) + .flatMap(LLLuceneIndex::close) + .thenReturn(Empty.of()); + } + +} diff --git a/src/main/java/it/cavallium/dbengine/database/remote/QuicServer.java b/src/main/java/it/cavallium/dbengine/database/remote/QuicServer.java deleted file mode 100644 index 55ca4b2..0000000 --- a/src/main/java/it/cavallium/dbengine/database/remote/QuicServer.java +++ /dev/null @@ -1,119 +0,0 @@ -package it.cavallium.dbengine.database.remote; - -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.composite.CompositeMeterRegistry; -import io.micrometer.core.instrument.noop.NoopMeter; -import io.net5.buffer.api.DefaultBufferAllocators; -import io.netty.handler.ssl.ClientAuth; -import io.netty.incubator.codec.quic.InsecureQuicTokenHandler; -import io.netty.incubator.codec.quic.QuicConnectionIdGenerator; -import io.netty.incubator.codec.quic.QuicSslContext; -import io.netty.incubator.codec.quic.QuicSslContextBuilder; -import it.cavallium.dbengine.database.LLKeyValueDatabase; -import it.cavallium.dbengine.database.LLLuceneIndex; -import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; -import it.cavallium.dbengine.database.remote.RPCCodecs.RPCClientBoundResponseDecoder; -import it.cavallium.dbengine.database.remote.RPCCodecs.RPCServerBoundRequestDecoder; -import it.cavallium.dbengine.rpc.current.data.GeneratedEntityId; -import it.cavallium.dbengine.rpc.current.data.GetDatabase; -import it.cavallium.dbengine.rpc.current.data.ServerBoundRequest; -import java.io.File; -import java.net.InetSocketAddress; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.file.Path; -import java.time.Duration; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import reactor.core.publisher.Mono; -import reactor.core.publisher.SignalType; - -public class QuicServer { - - private static final Logger LOG = LogManager.getLogger(QuicServer.class); - private static LLLocalDatabaseConnection localDb; - - private static final AtomicLong nextResourceId = new AtomicLong(1); - private static final ConcurrentHashMap dbToId = new ConcurrentHashMap<>(); - private static final ConcurrentHashMap dbById = new ConcurrentHashMap<>(); - private static final ConcurrentHashMap indexToId = new ConcurrentHashMap<>(); - private static final ConcurrentHashMap indexById = new ConcurrentHashMap<>(); - - public static void main(String[] args) throws URISyntaxException { - localDb = new LLLocalDatabaseConnection(DefaultBufferAllocators.preferredAllocator(), - new CompositeMeterRegistry(), Path.of("."), false); - String keyFileLocation = System.getProperty("it.cavalliumdb.keyFile", null); - String keyFilePassword = System.getProperty("it.cavalliumdb.keyPassword", null); - String certFileLocation = System.getProperty("it.cavalliumdb.certFile", null); - String clientCertsLocation = System.getProperty("it.cavalliumdb.clientCerts", null); - String bindAddress = Objects.requireNonNull(System.getProperty("it.cavalliumdb.bindAddress", null), "Empty bind address"); - var bindURI = new URI("inet://" + bindAddress); - - QuicSslContext sslContext = QuicSslContextBuilder - .forServer(new File(keyFileLocation), keyFilePassword, new File(certFileLocation)) - .trustManager(new File(clientCertsLocation)) - .applicationProtocols("db/0.9") - .clientAuth(ClientAuth.REQUIRE) - .build(); - var qs = reactor.netty.incubator.quic.QuicServer - .create() - .tokenHandler(InsecureQuicTokenHandler.INSTANCE) - .bindAddress(() -> new InetSocketAddress(bindURI.getHost(), bindURI.getPort())) - .secure(sslContext) - .idleTimeout(Duration.ofSeconds(30)) - .connectionIdAddressGenerator(QuicConnectionIdGenerator.randomGenerator()) - .initialSettings(spec -> spec - .maxData(10000000) - .maxStreamDataBidirectionalLocal(1000000) - .maxStreamDataBidirectionalRemote(1000000) - .maxStreamsBidirectional(100) - .maxStreamsUnidirectional(100) - ) - .handleStream((in, out) -> { - var inConn = in.withConnection(conn -> conn.addHandler(new RPCClientBoundResponseDecoder())); - var outConn = out.withConnection(conn -> conn.addHandler(new RPCServerBoundRequestDecoder())); - return inConn - .receiveObject() - .cast(ServerBoundRequest.class) - .log("req", Level.INFO, SignalType.ON_NEXT) - .switchOnFirst((first, flux) -> { - if (first.hasValue()) { - var value = first.get(); - if (value instanceof GetDatabase getDatabase) { - return handleGetDatabase(getDatabase); - } else { - return Mono.error(new UnsupportedOperationException("Unsupported request type: " + first)); - } - } else { - return flux; - } - }) - .doOnError(ex -> LOG.error("Failed to handle a request", ex)) - .onErrorResume(ex -> Mono.empty()) - .transform(outConn::sendObject); - }); - var conn = qs.bindNow(); - conn.onDispose().block(); - } - - private static Mono handleGetDatabase(GetDatabase getDatabase) { - Mono dbCreationMono = localDb - .getDatabase(getDatabase.name(), getDatabase.columns(), getDatabase.databaseOptions()) - .flatMap(db -> Mono.fromCallable(() -> { - long id = nextResourceId.getAndIncrement(); - dbById.put(id, db); - dbToId.put(getDatabase.name(), id); - return GeneratedEntityId.of(id); - })); - - Mono existingDbMono = Mono - .fromSupplier(() -> dbToId.get(getDatabase.name())) - .map(GeneratedEntityId::of); - - return existingDbMono.switchIfEmpty(dbCreationMono); - } -} diff --git a/src/main/java/it/cavallium/dbengine/database/remote/ReferencedResource.java b/src/main/java/it/cavallium/dbengine/database/remote/ReferencedResource.java new file mode 100644 index 0000000..3d738bc --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/remote/ReferencedResource.java @@ -0,0 +1,3 @@ +package it.cavallium.dbengine.database.remote; + +public record ReferencedResource(Long reference, T resource) {} diff --git a/src/main/java/it/cavallium/dbengine/database/remote/ReferencedResources.java b/src/main/java/it/cavallium/dbengine/database/remote/ReferencedResources.java new file mode 100644 index 0000000..ab87960 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/remote/ReferencedResources.java @@ -0,0 +1,117 @@ +package it.cavallium.dbengine.database.remote; + +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import org.jetbrains.annotations.Nullable; +import reactor.core.publisher.Mono; + +/** + * + * @param Identifier type + * @param Value type + */ +public class ReferencedResources { + + private final ResourceGetter resourceGetter; + private final ResourceFreer resourceFree; + private final AtomicLong nextReferenceId = new AtomicLong(1); + private final ConcurrentHashMap identifierToReferenceId = new ConcurrentHashMap<>(); + private final ConcurrentHashMap identifierByReferenceId = new ConcurrentHashMap<>(); + private final ConcurrentHashMap resourceByReferenceId = new ConcurrentHashMap<>(); + + public ReferencedResources(ResourceGetter resourceGetter, ResourceFreer resourceFree) { + this.resourceGetter = resourceGetter; + this.resourceFree = resourceFree; + } + + protected Mono obtain(I identifier, E extraParams) { + return resourceGetter.obtain(identifier, extraParams); + } + + protected Mono free(V resource) { + return resourceFree.free(resource); + } + + public Mono getReference(I identifier, @Nullable E extraParams) { + Mono existingDbMono = Mono.fromSupplier(() -> identifierToReferenceId.get(identifier)); + + if (extraParams != null) { + // Defer to avoid building this chain when not needed + Mono dbCreationMono = Mono.defer(() -> this + .obtain(identifier, extraParams) + .flatMap(db -> Mono.fromCallable(() -> { + long referenceId = nextReferenceId.getAndIncrement(); + resourceByReferenceId.put(referenceId, db); + identifierToReferenceId.put(identifier, referenceId); + identifierByReferenceId.put(referenceId, identifier); + return referenceId; + }))); + + return existingDbMono.switchIfEmpty(dbCreationMono); + } else { + return existingDbMono + .switchIfEmpty(Mono.error(() -> new NoSuchElementException("Resource not found: " + identifier))); + } + } + + public Mono> getResource(I identifier, @Nullable E extraParams) { + Mono> existingDbMono = Mono.fromSupplier(() -> { + var referenceId = identifierToReferenceId.get(identifier); + if (referenceId == null) { + return null; + } + var resource = resourceByReferenceId.get(referenceId); + if (resource == null) { + return null; + } + return new ReferencedResource<>(referenceId, resource); + }); + + if (extraParams != null) { + // Defer to avoid building this chain when not needed + Mono> dbCreationMono = Mono.defer(() -> this + .obtain(identifier, extraParams) + .map(resource -> { + long referenceId = nextReferenceId.getAndIncrement(); + resourceByReferenceId.put(referenceId, resource); + identifierToReferenceId.put(identifier, referenceId); + identifierByReferenceId.put(referenceId, identifier); + return new ReferencedResource<>(referenceId, resource); + })); + + return existingDbMono.switchIfEmpty(dbCreationMono); + } else { + return existingDbMono + .switchIfEmpty(Mono.error(() -> new NoSuchElementException("Resource not found: " + identifier))); + } + } + + public Mono getResource(long referenceId) { + Mono existingDbMono = Mono.fromSupplier(() -> resourceByReferenceId.get(referenceId)); + return existingDbMono.switchIfEmpty(Mono.error(() -> + new NoSuchElementException("Resource not found: " + referenceId))); + } + + public Mono releaseResource(I identifier) { + return Mono.fromSupplier(() -> { + var referenceId = identifierToReferenceId.remove(identifier); + if (referenceId == null) { + return null; + } + identifierByReferenceId.remove(referenceId); + return resourceByReferenceId.remove(referenceId); + }).flatMap(this::free); + } + + public Mono releaseResource(long referenceId) { + return Mono.fromSupplier(() -> { + var identifier = identifierByReferenceId.remove(referenceId); + if (identifier == null) { + return null; + } + identifierToReferenceId.remove(identifier); + return resourceByReferenceId.remove(referenceId); + }).flatMap(this::free); + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/remote/ResourceFreer.java b/src/main/java/it/cavallium/dbengine/database/remote/ResourceFreer.java new file mode 100644 index 0000000..c355ac0 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/remote/ResourceFreer.java @@ -0,0 +1,8 @@ +package it.cavallium.dbengine.database.remote; + +import reactor.core.publisher.Mono; + +public interface ResourceFreer { + + Mono free(V resource); +} diff --git a/src/main/java/it/cavallium/dbengine/database/remote/ResourceGetter.java b/src/main/java/it/cavallium/dbengine/database/remote/ResourceGetter.java new file mode 100644 index 0000000..201eca0 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/remote/ResourceGetter.java @@ -0,0 +1,8 @@ +package it.cavallium.dbengine.database.remote; + +import reactor.core.publisher.Mono; + +public interface ResourceGetter { + + Mono obtain(I identifier, E extra); +} diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml index 5cfb8ca..c84b52b 100644 --- a/src/main/resources/log4j2.xml +++ b/src/main/resources/log4j2.xml @@ -12,7 +12,7 @@ --> - + diff --git a/src/test/java/it/cavallium/dbengine/database/remote/LLQuicConnectionTest.java b/src/test/java/it/cavallium/dbengine/database/remote/LLQuicConnectionTest.java new file mode 100644 index 0000000..3e3028a --- /dev/null +++ b/src/test/java/it/cavallium/dbengine/database/remote/LLQuicConnectionTest.java @@ -0,0 +1,112 @@ +package it.cavallium.dbengine.database.remote; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import io.micrometer.core.instrument.composite.CompositeMeterRegistry; +import io.net5.buffer.api.BufferAllocator; +import io.net5.buffer.api.DefaultBufferAllocators; +import it.cavallium.data.generator.nativedata.Nullableboolean; +import it.cavallium.data.generator.nativedata.Nullableint; +import it.cavallium.data.generator.nativedata.Nullablelong; +import it.cavallium.dbengine.client.IndicizerAnalyzers; +import it.cavallium.dbengine.client.IndicizerSimilarities; +import it.cavallium.dbengine.database.ColumnUtils; +import it.cavallium.dbengine.database.LLDatabaseConnection; +import it.cavallium.dbengine.rpc.current.data.ByteBuffersDirectory; +import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; +import it.cavallium.dbengine.rpc.current.data.LuceneIndexStructure; +import it.cavallium.dbengine.rpc.current.data.LuceneOptions; +import it.unimi.dsi.fastutil.ints.IntList; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class LLQuicConnectionTest { + + private BufferAllocator allocator; + private CompositeMeterRegistry meterRegistry; + private TestDbServer server; + private LLDatabaseConnection client; + + @BeforeEach + void setUp() { + this.allocator = DefaultBufferAllocators.preferredAllocator(); + this.meterRegistry = new CompositeMeterRegistry(); + this.server = TestDbServer.create(allocator, meterRegistry); + server.bind().block(); + this.client = TestDbClient.create(allocator, meterRegistry, server.address()).connect().block(); + } + + @AfterEach + void tearDown() { + if (client != null) { + client.disconnect().block(); + } + if (server != null) { + server.dispose().block(); + } + } + + @Test + void getAllocator() { + assertEquals(allocator, client.getAllocator()); + } + + @Test + void getMeterRegistry() { + assertEquals(meterRegistry, client.getMeterRegistry()); + } + + @Test + void getDatabase() { + var dbName = "test-temp-db"; + var singletonsColumnName = "singletons"; + var db = client.getDatabase(dbName, + List.of(ColumnUtils.special(singletonsColumnName)), + new DatabaseOptions(List.of(), + Map.of(), + true, + true, + true, + true, + true, + true, + Nullableint.empty(), + Nullablelong.empty(), + Nullablelong.empty(), + Nullableboolean.empty() + ) + ).blockOptional().orElseThrow(); + assertEquals(dbName, db.getDatabaseName()); + assertEquals(allocator, db.getAllocator()); + assertEquals(meterRegistry, db.getMeterRegistry()); + assertDoesNotThrow(() -> db.close().block()); + } + + @Test + void getLuceneIndex() { + var shardName = "test-lucene-shard"; + var index = client.getLuceneIndex(shardName, + new LuceneIndexStructure(1, IntList.of(0)), + IndicizerAnalyzers.of(), + IndicizerSimilarities.of(), + new LuceneOptions(Map.of(), + Duration.ofSeconds(1), + Duration.ofSeconds(1), + false, + new ByteBuffersDirectory(), + -1, + false, + false, + false, + 100 + ), + null).blockOptional().orElseThrow(); + assertEquals(shardName, index.getLuceneIndexName()); + assertDoesNotThrow(() -> index.close().block()); + } +} \ No newline at end of file diff --git a/src/test/java/it/cavallium/dbengine/database/remote/TestDbClient.java b/src/test/java/it/cavallium/dbengine/database/remote/TestDbClient.java new file mode 100644 index 0000000..48aef83 --- /dev/null +++ b/src/test/java/it/cavallium/dbengine/database/remote/TestDbClient.java @@ -0,0 +1,18 @@ +package it.cavallium.dbengine.database.remote; + +import io.micrometer.core.instrument.composite.CompositeMeterRegistry; +import io.net5.buffer.api.BufferAllocator; +import java.net.InetSocketAddress; + +public class TestDbClient { + + public static LLQuicConnection create(BufferAllocator allocator, + CompositeMeterRegistry meterRegistry, + InetSocketAddress serverAddress) { + return new LLQuicConnection(allocator, + meterRegistry, + new InetSocketAddress(0), + serverAddress + ); + } +} diff --git a/src/test/java/it/cavallium/dbengine/database/remote/TestDbServer.java b/src/test/java/it/cavallium/dbengine/database/remote/TestDbServer.java new file mode 100644 index 0000000..a678abc --- /dev/null +++ b/src/test/java/it/cavallium/dbengine/database/remote/TestDbServer.java @@ -0,0 +1,71 @@ +package it.cavallium.dbengine.database.remote; + +import io.micrometer.core.instrument.composite.CompositeMeterRegistry; +import io.net5.buffer.api.BufferAllocator; +import io.netty.handler.ssl.ClientAuth; +import io.netty.handler.ssl.util.SelfSignedCertificate; +import io.netty.incubator.codec.quic.InsecureQuicTokenHandler; +import io.netty.incubator.codec.quic.QuicConnectionIdGenerator; +import io.netty.incubator.codec.quic.QuicSslContext; +import io.netty.incubator.codec.quic.QuicSslContextBuilder; +import it.cavallium.dbengine.database.LLDatabaseConnection; +import it.cavallium.dbengine.database.memory.LLMemoryDatabaseConnection; +import it.cavallium.dbengine.lucene.LuceneRocksDBManager; +import java.net.InetSocketAddress; +import java.security.cert.CertificateException; +import java.time.Duration; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.netty.incubator.quic.QuicServer; + +public class TestDbServer extends QuicRPCServer { + + private static final InetSocketAddress BIND_ADDRESS = new InetSocketAddress(8080); + + public TestDbServer(LuceneRocksDBManager rocksDBManager, LLDatabaseConnection localDb, QuicServer quicServer) { + super(rocksDBManager, localDb, quicServer); + } + + public static TestDbServer create(BufferAllocator allocator, CompositeMeterRegistry meterRegistry) { + var rocksDBManager = new LuceneRocksDBManager(); + var localDb = new LLMemoryDatabaseConnection(allocator, meterRegistry); + SelfSignedCertificate selfSignedCert; + try { + selfSignedCert = new SelfSignedCertificate(); + } catch (CertificateException e) { + throw new RuntimeException(e); + } + QuicSslContext sslContext = QuicSslContextBuilder + .forServer(selfSignedCert.key(), null, selfSignedCert.cert()) + .applicationProtocols("db/0.9") + .clientAuth(ClientAuth.NONE) + .build(); + var qs = reactor.netty.incubator.quic.QuicServer + .create() + .tokenHandler(InsecureQuicTokenHandler.INSTANCE) + .bindAddress(() -> BIND_ADDRESS) + .secure(sslContext) + .idleTimeout(Duration.ofSeconds(30)) + .connectionIdAddressGenerator(QuicConnectionIdGenerator.randomGenerator()) + .initialSettings(spec -> spec + .maxData(10000000) + .maxStreamDataBidirectionalLocal(1000000) + .maxStreamDataBidirectionalRemote(1000000) + .maxStreamsBidirectional(100) + .maxStreamsUnidirectional(100) + ); + return new TestDbServer(rocksDBManager, localDb, qs); + } + + public InetSocketAddress address() { + return BIND_ADDRESS; + } + + @Override + public Mono dispose() { + var closeManager = Mono + .fromRunnable(rocksDBManager::closeAll) + .subscribeOn(Schedulers.boundedElastic()); + return super.dispose().then(closeManager).then(localDb.disconnect()); + } +}