Partially implement server
This commit is contained in:
parent
665cd730fd
commit
cb966d96e3
57
pom.xml
57
pom.xml
@ -110,7 +110,60 @@
|
||||
<artifactId>log4j-core</artifactId>
|
||||
<version>2.17.1</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-api</artifactId>
|
||||
<version>5.8.2</version>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.hamcrest</groupId>
|
||||
<artifactId>hamcrest-core</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-engine</artifactId>
|
||||
<version>5.8.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-params</artifactId>
|
||||
<version>5.8.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<version>3.22.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<!-- This will get hamcrest-core automatically -->
|
||||
<dependency>
|
||||
<groupId>org.hamcrest</groupId>
|
||||
<artifactId>hamcrest-library</artifactId>
|
||||
<version>2.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-slf4j-impl</artifactId>
|
||||
<version>2.17.1</version>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-api</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<build>
|
||||
<testSourceDirectory>src/test/java</testSourceDirectory>
|
||||
<extensions>
|
||||
@ -175,7 +228,7 @@
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-engine</artifactId>
|
||||
<version>5.8.0-M1</version>
|
||||
<version>5.8.2</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<configuration>
|
||||
|
@ -0,0 +1,5 @@
|
||||
package it.cavallium.dbengine.database.remote;
|
||||
|
||||
import it.unimi.dsi.fastutil.bytes.ByteList;
|
||||
|
||||
public record DatabasePartName(long dbRef, ByteList resourceName) {}
|
@ -0,0 +1,260 @@
|
||||
package it.cavallium.dbengine.database.remote;
|
||||
|
||||
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
|
||||
import io.net5.buffer.api.Buffer;
|
||||
import io.net5.buffer.api.DefaultBufferAllocators;
|
||||
import io.net5.buffer.api.Send;
|
||||
import io.netty.handler.ssl.ClientAuth;
|
||||
import io.netty.incubator.codec.quic.InsecureQuicTokenHandler;
|
||||
import io.netty.incubator.codec.quic.QuicConnectionIdGenerator;
|
||||
import io.netty.incubator.codec.quic.QuicSslContext;
|
||||
import io.netty.incubator.codec.quic.QuicSslContextBuilder;
|
||||
import it.cavallium.dbengine.database.LLDatabaseConnection;
|
||||
import it.cavallium.dbengine.database.LLKeyValueDatabase;
|
||||
import it.cavallium.dbengine.database.LLLuceneIndex;
|
||||
import it.cavallium.dbengine.database.LLSingleton;
|
||||
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
|
||||
import it.cavallium.dbengine.database.remote.RPCCodecs.RPCEventCodec;
|
||||
import it.cavallium.dbengine.lucene.LuceneRocksDBManager;
|
||||
import it.cavallium.dbengine.rpc.current.data.Binary;
|
||||
import it.cavallium.dbengine.rpc.current.data.BinaryOptional;
|
||||
import it.cavallium.dbengine.rpc.current.data.CloseDatabase;
|
||||
import it.cavallium.dbengine.rpc.current.data.CloseLuceneIndex;
|
||||
import it.cavallium.dbengine.rpc.current.data.Empty;
|
||||
import it.cavallium.dbengine.rpc.current.data.GeneratedEntityId;
|
||||
import it.cavallium.dbengine.rpc.current.data.GetDatabase;
|
||||
import it.cavallium.dbengine.rpc.current.data.GetLuceneIndex;
|
||||
import it.cavallium.dbengine.rpc.current.data.GetSingleton;
|
||||
import it.cavallium.dbengine.rpc.current.data.RPCEvent;
|
||||
import it.cavallium.dbengine.rpc.current.data.ServerBoundRequest;
|
||||
import it.cavallium.dbengine.rpc.current.data.SingletonUpdateEnd;
|
||||
import it.cavallium.dbengine.rpc.current.data.SingletonUpdateInit;
|
||||
import it.cavallium.dbengine.rpc.current.data.SingletonUpdateOldData;
|
||||
import it.cavallium.dbengine.rpc.current.data.nullables.NullableBinary;
|
||||
import it.unimi.dsi.fastutil.bytes.ByteList;
|
||||
import java.io.File;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.logging.Level;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.Signal;
|
||||
import reactor.core.publisher.SignalType;
|
||||
import reactor.core.publisher.Sinks;
|
||||
import reactor.core.publisher.Sinks.Many;
|
||||
import reactor.netty.Connection;
|
||||
import reactor.netty.DisposableChannel;
|
||||
import reactor.netty.incubator.quic.QuicServer;
|
||||
|
||||
public class QuicRPCServer {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(QuicRPCServer.class);
|
||||
private final QuicServer quicServer;
|
||||
protected final LuceneRocksDBManager rocksDBManager;
|
||||
protected final LLDatabaseConnection localDb;
|
||||
private final AtomicReference<Connection> connectionAtomicReference = new AtomicReference<>();
|
||||
|
||||
private final ReferencedResources<String, GetDatabase, LLKeyValueDatabase> dbs = new ReferencedResources<>(this::obtainDatabase, LLKeyValueDatabase::close);
|
||||
private final ReferencedResources<DatabasePartName, GetSingleton, LLSingleton> singletons = new ReferencedResources<>(this::obtainSingleton, s -> Mono.empty());
|
||||
private final ReferencedResources<String, GetLuceneIndex, LLLuceneIndex> indices = new ReferencedResources<>(this::obtainLuceneIndex, LLLuceneIndex::close);
|
||||
|
||||
public QuicRPCServer(LuceneRocksDBManager rocksDBManager, LLDatabaseConnection localDb, QuicServer quicServer) {
|
||||
this.rocksDBManager = rocksDBManager;
|
||||
this.localDb = localDb;
|
||||
this.quicServer = quicServer.handleStream((in, out) -> in
|
||||
.withConnection(conn -> conn.addHandler(new RPCEventCodec()))
|
||||
.receiveObject()
|
||||
.cast(RPCEvent.class)
|
||||
.log("ServerBoundRequest", Level.FINEST, SignalType.ON_NEXT)
|
||||
.switchOnFirst((Signal<? extends RPCEvent> first, Flux<RPCEvent> flux) -> {
|
||||
if (first.hasValue()) {
|
||||
ServerBoundRequest value = (ServerBoundRequest) first.get();
|
||||
if (value instanceof GetDatabase getDatabase) {
|
||||
return handleGetDatabase(getDatabase)
|
||||
.transform(this::catchRPCErrorsFlux);
|
||||
} else if (value instanceof GetSingleton getSingleton) {
|
||||
return handleGetSingleton(getSingleton)
|
||||
.transform(this::catchRPCErrorsFlux);
|
||||
} else if (value instanceof SingletonUpdateInit singletonUpdateInit) {
|
||||
return handleSingletonUpdateInit(singletonUpdateInit, flux.skip(1))
|
||||
.transform(this::catchRPCErrorsFlux);
|
||||
} else if (value instanceof CloseDatabase closeDatabase) {
|
||||
return handleCloseDatabase(closeDatabase)
|
||||
.transform(this::catchRPCErrorsFlux);
|
||||
} else if (value instanceof CloseLuceneIndex closeLuceneIndex) {
|
||||
return handleCloseLuceneIndex(closeLuceneIndex)
|
||||
.transform(this::catchRPCErrorsFlux);
|
||||
} else if (value instanceof GetLuceneIndex getLuceneIndex) {
|
||||
return handleGetLuceneIndex(getLuceneIndex)
|
||||
.transform(this::catchRPCErrorsFlux);
|
||||
} else {
|
||||
return QuicUtils.catchRPCErrors(new UnsupportedOperationException("Unsupported request type: " + first));
|
||||
}
|
||||
} else {
|
||||
return flux;
|
||||
}
|
||||
})
|
||||
.doOnError(ex -> LOG.error("Failed to handle a request", ex))
|
||||
.onErrorResume(QuicUtils::catchRPCErrors)
|
||||
.concatMap(response -> out
|
||||
.withConnection(conn -> conn.addHandler(new RPCEventCodec()))
|
||||
.sendObject(response)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
private Flux<RPCEvent> catchRPCErrorsFlux(Publisher<? extends RPCEvent> flux) {
|
||||
return Flux
|
||||
.from(flux)
|
||||
.doOnError(ex -> LOG.error("Failed to handle a request", ex))
|
||||
.cast(RPCEvent.class)
|
||||
.onErrorResume(QuicUtils::catchRPCErrors);
|
||||
|
||||
}
|
||||
|
||||
public Mono<Void> bind() {
|
||||
return quicServer.bind().doOnNext(connectionAtomicReference::set).then();
|
||||
}
|
||||
|
||||
public Mono<Void> dispose() {
|
||||
return Mono
|
||||
.fromSupplier(connectionAtomicReference::get)
|
||||
.doOnNext(DisposableChannel::dispose)
|
||||
.flatMap(DisposableChannel::onDispose);
|
||||
}
|
||||
|
||||
public Mono<Void> onDispose() {
|
||||
return Mono.fromSupplier(connectionAtomicReference::get).flatMap(DisposableChannel::onDispose);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws URISyntaxException {
|
||||
var rocksDBManager = new LuceneRocksDBManager();
|
||||
var localDb = new LLLocalDatabaseConnection(DefaultBufferAllocators.preferredAllocator(),
|
||||
new CompositeMeterRegistry(), Path.of("."), false, rocksDBManager);
|
||||
String keyFileLocation = System.getProperty("it.cavalliumdb.keyFile", null);
|
||||
String keyFilePassword = System.getProperty("it.cavalliumdb.keyPassword", null);
|
||||
String certFileLocation = System.getProperty("it.cavalliumdb.certFile", null);
|
||||
String clientCertsLocation = System.getProperty("it.cavalliumdb.clientCerts", null);
|
||||
String bindAddressText = Objects.requireNonNull(System.getProperty("it.cavalliumdb.bindAddress", null), "Empty bind address");
|
||||
var bindURI = new URI("inet://" + bindAddressText);
|
||||
var bindAddress = new InetSocketAddress(bindURI.getHost(), bindURI.getPort());
|
||||
|
||||
QuicSslContext sslContext = QuicSslContextBuilder
|
||||
.forServer(new File(keyFileLocation), keyFilePassword, new File(certFileLocation))
|
||||
.trustManager(new File(clientCertsLocation))
|
||||
.applicationProtocols("db/0.9")
|
||||
.clientAuth(ClientAuth.REQUIRE)
|
||||
.build();
|
||||
var qs = reactor.netty.incubator.quic.QuicServer
|
||||
.create()
|
||||
.tokenHandler(InsecureQuicTokenHandler.INSTANCE)
|
||||
.bindAddress(() -> bindAddress)
|
||||
.secure(sslContext)
|
||||
.idleTimeout(Duration.ofSeconds(30))
|
||||
.connectionIdAddressGenerator(QuicConnectionIdGenerator.randomGenerator())
|
||||
.initialSettings(spec -> spec
|
||||
.maxData(10000000)
|
||||
.maxStreamDataBidirectionalLocal(1000000)
|
||||
.maxStreamDataBidirectionalRemote(1000000)
|
||||
.maxStreamsBidirectional(100)
|
||||
.maxStreamsUnidirectional(100)
|
||||
);
|
||||
QuicRPCServer server = new QuicRPCServer(rocksDBManager, localDb, qs);
|
||||
server.bind().block();
|
||||
server.onDispose().block();
|
||||
localDb.disconnect().block();
|
||||
rocksDBManager.closeAll();
|
||||
}
|
||||
|
||||
private Flux<RPCEvent> handleSingletonUpdateInit(
|
||||
SingletonUpdateInit singletonUpdateInit,
|
||||
Flux<RPCEvent> otherRequests) {
|
||||
return singletons
|
||||
.getResource(singletonUpdateInit.singletonId())
|
||||
.flatMapMany(singleton -> {
|
||||
Many<RPCEvent> clientBound = Sinks.many().unicast().onBackpressureBuffer();
|
||||
Mono<RPCEvent> update = singleton.update(prev -> {
|
||||
clientBound
|
||||
.tryEmitNext(new SingletonUpdateOldData(prev != null, prev != null ? toByteList(prev) : ByteList.of()))
|
||||
.orThrow();
|
||||
SingletonUpdateEnd newValue = (SingletonUpdateEnd) otherRequests.singleOrEmpty().block();
|
||||
Objects.requireNonNull(newValue);
|
||||
if (!newValue.exist()) {
|
||||
return null;
|
||||
} else {
|
||||
return localDb.getAllocator().copyOf(QuicUtils.toArrayNoCopy(newValue.value()));
|
||||
}
|
||||
}, singletonUpdateInit.updateReturnMode())
|
||||
.map(result -> new BinaryOptional(result != null ? NullableBinary.of(Binary.of(toByteList(result))) : NullableBinary.empty()));
|
||||
return Flux.merge(update, clientBound.asFlux());
|
||||
});
|
||||
}
|
||||
|
||||
private static ByteList toByteList(Send<Buffer> prev) {
|
||||
try (var prevVal = prev.receive()) {
|
||||
byte[] result = new byte[prevVal.readableBytes()];
|
||||
prevVal.readBytes(result, 0, result.length);
|
||||
return ByteList.of(result);
|
||||
}
|
||||
}
|
||||
|
||||
private Mono<GeneratedEntityId> handleGetSingleton(GetSingleton getSingleton) {
|
||||
var id = new DatabasePartName(getSingleton.databaseId(), getSingleton.singletonListColumnName());
|
||||
return this.singletons.getReference(id, getSingleton).map(GeneratedEntityId::new);
|
||||
}
|
||||
|
||||
private Mono<LLSingleton> obtainSingleton(DatabasePartName id, GetSingleton getSingleton) {
|
||||
Mono<LLKeyValueDatabase> dbMono = dbs.getResource(id.dbRef());
|
||||
return dbMono.flatMap(db -> db.getSingleton(
|
||||
QuicUtils.toArrayNoCopy(getSingleton.singletonListColumnName()),
|
||||
QuicUtils.toArrayNoCopy(getSingleton.name()),
|
||||
QuicUtils.toArrayNoCopy(getSingleton.defaultValue())
|
||||
));
|
||||
}
|
||||
|
||||
private Mono<GeneratedEntityId> handleGetDatabase(GetDatabase getDatabase) {
|
||||
return this.dbs.getReference(getDatabase.name(), getDatabase).map(GeneratedEntityId::of);
|
||||
}
|
||||
|
||||
private Mono<? extends LLKeyValueDatabase> obtainDatabase(String id, GetDatabase getDatabase) {
|
||||
// Disable optimistic transactions, since network transactions require a lot of time
|
||||
var options = getDatabase.databaseOptions().setOptimistic(false);
|
||||
return localDb.getDatabase(id, getDatabase.columns(), options);
|
||||
}
|
||||
|
||||
public Mono<GeneratedEntityId> handleGetLuceneIndex(GetLuceneIndex getLuceneIndex) {
|
||||
return this.indices
|
||||
.getReference(getLuceneIndex.clusterName(), getLuceneIndex)
|
||||
.map(GeneratedEntityId::new);
|
||||
}
|
||||
|
||||
private Mono<? extends LLLuceneIndex> obtainLuceneIndex(String id, GetLuceneIndex getLuceneIndex) {
|
||||
return localDb.getLuceneIndex(getLuceneIndex.clusterName(),
|
||||
getLuceneIndex.structure(),
|
||||
getLuceneIndex.indicizerAnalyzers(),
|
||||
getLuceneIndex.indicizerSimilarities(),
|
||||
getLuceneIndex.luceneOptions(),
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
private Mono<RPCEvent> handleCloseDatabase(CloseDatabase closeDatabase) {
|
||||
return this.dbs.getResource(closeDatabase.databaseId()).flatMap(LLKeyValueDatabase::close).thenReturn(Empty.of());
|
||||
}
|
||||
|
||||
private Mono<RPCEvent> handleCloseLuceneIndex(CloseLuceneIndex closeLuceneIndex) {
|
||||
return this.indices
|
||||
.getResource(closeLuceneIndex.luceneIndexId())
|
||||
.flatMap(LLLuceneIndex::close)
|
||||
.thenReturn(Empty.of());
|
||||
}
|
||||
|
||||
}
|
@ -1,119 +0,0 @@
|
||||
package it.cavallium.dbengine.database.remote;
|
||||
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
|
||||
import io.micrometer.core.instrument.noop.NoopMeter;
|
||||
import io.net5.buffer.api.DefaultBufferAllocators;
|
||||
import io.netty.handler.ssl.ClientAuth;
|
||||
import io.netty.incubator.codec.quic.InsecureQuicTokenHandler;
|
||||
import io.netty.incubator.codec.quic.QuicConnectionIdGenerator;
|
||||
import io.netty.incubator.codec.quic.QuicSslContext;
|
||||
import io.netty.incubator.codec.quic.QuicSslContextBuilder;
|
||||
import it.cavallium.dbengine.database.LLKeyValueDatabase;
|
||||
import it.cavallium.dbengine.database.LLLuceneIndex;
|
||||
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
|
||||
import it.cavallium.dbengine.database.remote.RPCCodecs.RPCClientBoundResponseDecoder;
|
||||
import it.cavallium.dbengine.database.remote.RPCCodecs.RPCServerBoundRequestDecoder;
|
||||
import it.cavallium.dbengine.rpc.current.data.GeneratedEntityId;
|
||||
import it.cavallium.dbengine.rpc.current.data.GetDatabase;
|
||||
import it.cavallium.dbengine.rpc.current.data.ServerBoundRequest;
|
||||
import java.io.File;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.logging.Level;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.SignalType;
|
||||
|
||||
public class QuicServer {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(QuicServer.class);
|
||||
private static LLLocalDatabaseConnection localDb;
|
||||
|
||||
private static final AtomicLong nextResourceId = new AtomicLong(1);
|
||||
private static final ConcurrentHashMap<String, Long> dbToId = new ConcurrentHashMap<>();
|
||||
private static final ConcurrentHashMap<Long, LLKeyValueDatabase> dbById = new ConcurrentHashMap<>();
|
||||
private static final ConcurrentHashMap<String, Long> indexToId = new ConcurrentHashMap<>();
|
||||
private static final ConcurrentHashMap<Long, LLLuceneIndex> indexById = new ConcurrentHashMap<>();
|
||||
|
||||
public static void main(String[] args) throws URISyntaxException {
|
||||
localDb = new LLLocalDatabaseConnection(DefaultBufferAllocators.preferredAllocator(),
|
||||
new CompositeMeterRegistry(), Path.of("."), false);
|
||||
String keyFileLocation = System.getProperty("it.cavalliumdb.keyFile", null);
|
||||
String keyFilePassword = System.getProperty("it.cavalliumdb.keyPassword", null);
|
||||
String certFileLocation = System.getProperty("it.cavalliumdb.certFile", null);
|
||||
String clientCertsLocation = System.getProperty("it.cavalliumdb.clientCerts", null);
|
||||
String bindAddress = Objects.requireNonNull(System.getProperty("it.cavalliumdb.bindAddress", null), "Empty bind address");
|
||||
var bindURI = new URI("inet://" + bindAddress);
|
||||
|
||||
QuicSslContext sslContext = QuicSslContextBuilder
|
||||
.forServer(new File(keyFileLocation), keyFilePassword, new File(certFileLocation))
|
||||
.trustManager(new File(clientCertsLocation))
|
||||
.applicationProtocols("db/0.9")
|
||||
.clientAuth(ClientAuth.REQUIRE)
|
||||
.build();
|
||||
var qs = reactor.netty.incubator.quic.QuicServer
|
||||
.create()
|
||||
.tokenHandler(InsecureQuicTokenHandler.INSTANCE)
|
||||
.bindAddress(() -> new InetSocketAddress(bindURI.getHost(), bindURI.getPort()))
|
||||
.secure(sslContext)
|
||||
.idleTimeout(Duration.ofSeconds(30))
|
||||
.connectionIdAddressGenerator(QuicConnectionIdGenerator.randomGenerator())
|
||||
.initialSettings(spec -> spec
|
||||
.maxData(10000000)
|
||||
.maxStreamDataBidirectionalLocal(1000000)
|
||||
.maxStreamDataBidirectionalRemote(1000000)
|
||||
.maxStreamsBidirectional(100)
|
||||
.maxStreamsUnidirectional(100)
|
||||
)
|
||||
.handleStream((in, out) -> {
|
||||
var inConn = in.withConnection(conn -> conn.addHandler(new RPCClientBoundResponseDecoder()));
|
||||
var outConn = out.withConnection(conn -> conn.addHandler(new RPCServerBoundRequestDecoder()));
|
||||
return inConn
|
||||
.receiveObject()
|
||||
.cast(ServerBoundRequest.class)
|
||||
.log("req", Level.INFO, SignalType.ON_NEXT)
|
||||
.switchOnFirst((first, flux) -> {
|
||||
if (first.hasValue()) {
|
||||
var value = first.get();
|
||||
if (value instanceof GetDatabase getDatabase) {
|
||||
return handleGetDatabase(getDatabase);
|
||||
} else {
|
||||
return Mono.error(new UnsupportedOperationException("Unsupported request type: " + first));
|
||||
}
|
||||
} else {
|
||||
return flux;
|
||||
}
|
||||
})
|
||||
.doOnError(ex -> LOG.error("Failed to handle a request", ex))
|
||||
.onErrorResume(ex -> Mono.empty())
|
||||
.transform(outConn::sendObject);
|
||||
});
|
||||
var conn = qs.bindNow();
|
||||
conn.onDispose().block();
|
||||
}
|
||||
|
||||
private static Mono<GeneratedEntityId> handleGetDatabase(GetDatabase getDatabase) {
|
||||
Mono<GeneratedEntityId> dbCreationMono = localDb
|
||||
.getDatabase(getDatabase.name(), getDatabase.columns(), getDatabase.databaseOptions())
|
||||
.flatMap(db -> Mono.fromCallable(() -> {
|
||||
long id = nextResourceId.getAndIncrement();
|
||||
dbById.put(id, db);
|
||||
dbToId.put(getDatabase.name(), id);
|
||||
return GeneratedEntityId.of(id);
|
||||
}));
|
||||
|
||||
Mono<GeneratedEntityId> existingDbMono = Mono
|
||||
.fromSupplier(() -> dbToId.get(getDatabase.name()))
|
||||
.map(GeneratedEntityId::of);
|
||||
|
||||
return existingDbMono.switchIfEmpty(dbCreationMono);
|
||||
}
|
||||
}
|
@ -0,0 +1,3 @@
|
||||
package it.cavallium.dbengine.database.remote;
|
||||
|
||||
public record ReferencedResource<T>(Long reference, T resource) {}
|
@ -0,0 +1,117 @@
|
||||
package it.cavallium.dbengine.database.remote;
|
||||
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
/**
|
||||
*
|
||||
* @param <I> Identifier type
|
||||
* @param <V> Value type
|
||||
*/
|
||||
public class ReferencedResources<I, E, V> {
|
||||
|
||||
private final ResourceGetter<I, E, V> resourceGetter;
|
||||
private final ResourceFreer<V> resourceFree;
|
||||
private final AtomicLong nextReferenceId = new AtomicLong(1);
|
||||
private final ConcurrentHashMap<I, Long> identifierToReferenceId = new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<Long, I> identifierByReferenceId = new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<Long, V> resourceByReferenceId = new ConcurrentHashMap<>();
|
||||
|
||||
public ReferencedResources(ResourceGetter<I, E, V> resourceGetter, ResourceFreer<V> resourceFree) {
|
||||
this.resourceGetter = resourceGetter;
|
||||
this.resourceFree = resourceFree;
|
||||
}
|
||||
|
||||
protected Mono<? extends V> obtain(I identifier, E extraParams) {
|
||||
return resourceGetter.obtain(identifier, extraParams);
|
||||
}
|
||||
|
||||
protected Mono<Void> free(V resource) {
|
||||
return resourceFree.free(resource);
|
||||
}
|
||||
|
||||
public Mono<Long> getReference(I identifier, @Nullable E extraParams) {
|
||||
Mono<Long> existingDbMono = Mono.fromSupplier(() -> identifierToReferenceId.get(identifier));
|
||||
|
||||
if (extraParams != null) {
|
||||
// Defer to avoid building this chain when not needed
|
||||
Mono<Long> dbCreationMono = Mono.defer(() -> this
|
||||
.obtain(identifier, extraParams)
|
||||
.flatMap(db -> Mono.fromCallable(() -> {
|
||||
long referenceId = nextReferenceId.getAndIncrement();
|
||||
resourceByReferenceId.put(referenceId, db);
|
||||
identifierToReferenceId.put(identifier, referenceId);
|
||||
identifierByReferenceId.put(referenceId, identifier);
|
||||
return referenceId;
|
||||
})));
|
||||
|
||||
return existingDbMono.switchIfEmpty(dbCreationMono);
|
||||
} else {
|
||||
return existingDbMono
|
||||
.switchIfEmpty(Mono.error(() -> new NoSuchElementException("Resource not found: " + identifier)));
|
||||
}
|
||||
}
|
||||
|
||||
public Mono<ReferencedResource<V>> getResource(I identifier, @Nullable E extraParams) {
|
||||
Mono<ReferencedResource<V>> existingDbMono = Mono.fromSupplier(() -> {
|
||||
var referenceId = identifierToReferenceId.get(identifier);
|
||||
if (referenceId == null) {
|
||||
return null;
|
||||
}
|
||||
var resource = resourceByReferenceId.get(referenceId);
|
||||
if (resource == null) {
|
||||
return null;
|
||||
}
|
||||
return new ReferencedResource<>(referenceId, resource);
|
||||
});
|
||||
|
||||
if (extraParams != null) {
|
||||
// Defer to avoid building this chain when not needed
|
||||
Mono<ReferencedResource<V>> dbCreationMono = Mono.defer(() -> this
|
||||
.obtain(identifier, extraParams)
|
||||
.map(resource -> {
|
||||
long referenceId = nextReferenceId.getAndIncrement();
|
||||
resourceByReferenceId.put(referenceId, resource);
|
||||
identifierToReferenceId.put(identifier, referenceId);
|
||||
identifierByReferenceId.put(referenceId, identifier);
|
||||
return new ReferencedResource<>(referenceId, resource);
|
||||
}));
|
||||
|
||||
return existingDbMono.switchIfEmpty(dbCreationMono);
|
||||
} else {
|
||||
return existingDbMono
|
||||
.switchIfEmpty(Mono.error(() -> new NoSuchElementException("Resource not found: " + identifier)));
|
||||
}
|
||||
}
|
||||
|
||||
public Mono<V> getResource(long referenceId) {
|
||||
Mono<V> existingDbMono = Mono.fromSupplier(() -> resourceByReferenceId.get(referenceId));
|
||||
return existingDbMono.switchIfEmpty(Mono.error(() ->
|
||||
new NoSuchElementException("Resource not found: " + referenceId)));
|
||||
}
|
||||
|
||||
public Mono<Void> releaseResource(I identifier) {
|
||||
return Mono.fromSupplier(() -> {
|
||||
var referenceId = identifierToReferenceId.remove(identifier);
|
||||
if (referenceId == null) {
|
||||
return null;
|
||||
}
|
||||
identifierByReferenceId.remove(referenceId);
|
||||
return resourceByReferenceId.remove(referenceId);
|
||||
}).flatMap(this::free);
|
||||
}
|
||||
|
||||
public Mono<Void> releaseResource(long referenceId) {
|
||||
return Mono.<V>fromSupplier(() -> {
|
||||
var identifier = identifierByReferenceId.remove(referenceId);
|
||||
if (identifier == null) {
|
||||
return null;
|
||||
}
|
||||
identifierToReferenceId.remove(identifier);
|
||||
return resourceByReferenceId.remove(referenceId);
|
||||
}).flatMap(this::free);
|
||||
}
|
||||
}
|
@ -0,0 +1,8 @@
|
||||
package it.cavallium.dbengine.database.remote;
|
||||
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public interface ResourceFreer<V> {
|
||||
|
||||
Mono<Void> free(V resource);
|
||||
}
|
@ -0,0 +1,8 @@
|
||||
package it.cavallium.dbengine.database.remote;
|
||||
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public interface ResourceGetter<I, E, V> {
|
||||
|
||||
Mono<? extends V> obtain(I identifier, E extra);
|
||||
}
|
@ -12,7 +12,7 @@
|
||||
<Logger name="io.net5" level="INFO" additivity="false"/>
|
||||
-->
|
||||
|
||||
<Root level="INFO">
|
||||
<Root level="DEBUG">
|
||||
<filters>
|
||||
<MarkerFilter marker="NETWORK_PACKETS" onMatch="DENY" onMismatch="NEUTRAL"/>
|
||||
</filters>
|
||||
|
@ -0,0 +1,112 @@
|
||||
package it.cavallium.dbengine.database.remote;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
|
||||
import io.net5.buffer.api.BufferAllocator;
|
||||
import io.net5.buffer.api.DefaultBufferAllocators;
|
||||
import it.cavallium.data.generator.nativedata.Nullableboolean;
|
||||
import it.cavallium.data.generator.nativedata.Nullableint;
|
||||
import it.cavallium.data.generator.nativedata.Nullablelong;
|
||||
import it.cavallium.dbengine.client.IndicizerAnalyzers;
|
||||
import it.cavallium.dbengine.client.IndicizerSimilarities;
|
||||
import it.cavallium.dbengine.database.ColumnUtils;
|
||||
import it.cavallium.dbengine.database.LLDatabaseConnection;
|
||||
import it.cavallium.dbengine.rpc.current.data.ByteBuffersDirectory;
|
||||
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
|
||||
import it.cavallium.dbengine.rpc.current.data.LuceneIndexStructure;
|
||||
import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
|
||||
import it.unimi.dsi.fastutil.ints.IntList;
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class LLQuicConnectionTest {
|
||||
|
||||
private BufferAllocator allocator;
|
||||
private CompositeMeterRegistry meterRegistry;
|
||||
private TestDbServer server;
|
||||
private LLDatabaseConnection client;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
this.allocator = DefaultBufferAllocators.preferredAllocator();
|
||||
this.meterRegistry = new CompositeMeterRegistry();
|
||||
this.server = TestDbServer.create(allocator, meterRegistry);
|
||||
server.bind().block();
|
||||
this.client = TestDbClient.create(allocator, meterRegistry, server.address()).connect().block();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void tearDown() {
|
||||
if (client != null) {
|
||||
client.disconnect().block();
|
||||
}
|
||||
if (server != null) {
|
||||
server.dispose().block();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void getAllocator() {
|
||||
assertEquals(allocator, client.getAllocator());
|
||||
}
|
||||
|
||||
@Test
|
||||
void getMeterRegistry() {
|
||||
assertEquals(meterRegistry, client.getMeterRegistry());
|
||||
}
|
||||
|
||||
@Test
|
||||
void getDatabase() {
|
||||
var dbName = "test-temp-db";
|
||||
var singletonsColumnName = "singletons";
|
||||
var db = client.getDatabase(dbName,
|
||||
List.of(ColumnUtils.special(singletonsColumnName)),
|
||||
new DatabaseOptions(List.of(),
|
||||
Map.of(),
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
Nullableint.empty(),
|
||||
Nullablelong.empty(),
|
||||
Nullablelong.empty(),
|
||||
Nullableboolean.empty()
|
||||
)
|
||||
).blockOptional().orElseThrow();
|
||||
assertEquals(dbName, db.getDatabaseName());
|
||||
assertEquals(allocator, db.getAllocator());
|
||||
assertEquals(meterRegistry, db.getMeterRegistry());
|
||||
assertDoesNotThrow(() -> db.close().block());
|
||||
}
|
||||
|
||||
@Test
|
||||
void getLuceneIndex() {
|
||||
var shardName = "test-lucene-shard";
|
||||
var index = client.getLuceneIndex(shardName,
|
||||
new LuceneIndexStructure(1, IntList.of(0)),
|
||||
IndicizerAnalyzers.of(),
|
||||
IndicizerSimilarities.of(),
|
||||
new LuceneOptions(Map.of(),
|
||||
Duration.ofSeconds(1),
|
||||
Duration.ofSeconds(1),
|
||||
false,
|
||||
new ByteBuffersDirectory(),
|
||||
-1,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
100
|
||||
),
|
||||
null).blockOptional().orElseThrow();
|
||||
assertEquals(shardName, index.getLuceneIndexName());
|
||||
assertDoesNotThrow(() -> index.close().block());
|
||||
}
|
||||
}
|
@ -0,0 +1,18 @@
|
||||
package it.cavallium.dbengine.database.remote;
|
||||
|
||||
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
|
||||
import io.net5.buffer.api.BufferAllocator;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
public class TestDbClient {
|
||||
|
||||
public static LLQuicConnection create(BufferAllocator allocator,
|
||||
CompositeMeterRegistry meterRegistry,
|
||||
InetSocketAddress serverAddress) {
|
||||
return new LLQuicConnection(allocator,
|
||||
meterRegistry,
|
||||
new InetSocketAddress(0),
|
||||
serverAddress
|
||||
);
|
||||
}
|
||||
}
|
@ -0,0 +1,71 @@
|
||||
package it.cavallium.dbengine.database.remote;
|
||||
|
||||
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
|
||||
import io.net5.buffer.api.BufferAllocator;
|
||||
import io.netty.handler.ssl.ClientAuth;
|
||||
import io.netty.handler.ssl.util.SelfSignedCertificate;
|
||||
import io.netty.incubator.codec.quic.InsecureQuicTokenHandler;
|
||||
import io.netty.incubator.codec.quic.QuicConnectionIdGenerator;
|
||||
import io.netty.incubator.codec.quic.QuicSslContext;
|
||||
import io.netty.incubator.codec.quic.QuicSslContextBuilder;
|
||||
import it.cavallium.dbengine.database.LLDatabaseConnection;
|
||||
import it.cavallium.dbengine.database.memory.LLMemoryDatabaseConnection;
|
||||
import it.cavallium.dbengine.lucene.LuceneRocksDBManager;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.time.Duration;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import reactor.netty.incubator.quic.QuicServer;
|
||||
|
||||
public class TestDbServer extends QuicRPCServer {
|
||||
|
||||
private static final InetSocketAddress BIND_ADDRESS = new InetSocketAddress(8080);
|
||||
|
||||
public TestDbServer(LuceneRocksDBManager rocksDBManager, LLDatabaseConnection localDb, QuicServer quicServer) {
|
||||
super(rocksDBManager, localDb, quicServer);
|
||||
}
|
||||
|
||||
public static TestDbServer create(BufferAllocator allocator, CompositeMeterRegistry meterRegistry) {
|
||||
var rocksDBManager = new LuceneRocksDBManager();
|
||||
var localDb = new LLMemoryDatabaseConnection(allocator, meterRegistry);
|
||||
SelfSignedCertificate selfSignedCert;
|
||||
try {
|
||||
selfSignedCert = new SelfSignedCertificate();
|
||||
} catch (CertificateException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
QuicSslContext sslContext = QuicSslContextBuilder
|
||||
.forServer(selfSignedCert.key(), null, selfSignedCert.cert())
|
||||
.applicationProtocols("db/0.9")
|
||||
.clientAuth(ClientAuth.NONE)
|
||||
.build();
|
||||
var qs = reactor.netty.incubator.quic.QuicServer
|
||||
.create()
|
||||
.tokenHandler(InsecureQuicTokenHandler.INSTANCE)
|
||||
.bindAddress(() -> BIND_ADDRESS)
|
||||
.secure(sslContext)
|
||||
.idleTimeout(Duration.ofSeconds(30))
|
||||
.connectionIdAddressGenerator(QuicConnectionIdGenerator.randomGenerator())
|
||||
.initialSettings(spec -> spec
|
||||
.maxData(10000000)
|
||||
.maxStreamDataBidirectionalLocal(1000000)
|
||||
.maxStreamDataBidirectionalRemote(1000000)
|
||||
.maxStreamsBidirectional(100)
|
||||
.maxStreamsUnidirectional(100)
|
||||
);
|
||||
return new TestDbServer(rocksDBManager, localDb, qs);
|
||||
}
|
||||
|
||||
public InetSocketAddress address() {
|
||||
return BIND_ADDRESS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> dispose() {
|
||||
var closeManager = Mono
|
||||
.<Void>fromRunnable(rocksDBManager::closeAll)
|
||||
.subscribeOn(Schedulers.boundedElastic());
|
||||
return super.dispose().then(closeManager).then(localDb.disconnect());
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user