diff --git a/src/main/data-generator/quic-rpc.yaml b/src/main/data-generator/quic-rpc.yaml index 4315763..148ade0 100644 --- a/src/main/data-generator/quic-rpc.yaml +++ b/src/main/data-generator/quic-rpc.yaml @@ -33,7 +33,9 @@ versions: SingletonSet, SingletonUpdateInit, SingletonUpdateEnd, - RPCCrash + RPCCrash, + CloseDatabase, + CloseLuceneIndex ] ServerBoundRequest: [ GetDatabase, @@ -42,7 +44,9 @@ versions: GetSingleton, SingletonGet, SingletonSet, - SingletonUpdateInit + SingletonUpdateInit, + CloseDatabase, + CloseLuceneIndex ] ClientBoundResponse: [ Empty, @@ -115,6 +119,9 @@ versions: String2FieldAnalyzerMap: javaClass: java.util.Map serializer: it.cavallium.dbengine.database.remote.String2FieldAnalyzerMapSerializer + String2FieldSimilarityMap: + javaClass: java.util.Map + serializer: it.cavallium.dbengine.database.remote.String2FieldSimilarityMapSerializer String2ColumnFamilyHandleMap: javaClass: java.util.Map serializer: it.cavallium.dbengine.database.remote.String2ColumnFamilyHandleMapSerializer @@ -132,11 +139,11 @@ versions: databaseOptions: DatabaseOptions GetLuceneIndex: data: - clusterName: -String - shardName: -String - instancesCount: int + clusterName: String + structure: LuceneIndexStructure indicizerAnalyzers: IndicizerAnalyzers indicizerSimilarities: IndicizerSimilarities + luceneOptions: LuceneOptions Disconnect: { data: { } } GetSingleton: data: @@ -160,6 +167,12 @@ versions: data: exist: boolean value: byte[] + CloseDatabase: + data: + databaseId: long + CloseLuceneIndex: + data: + luceneIndexId: long # Client-bound responses @@ -183,6 +196,17 @@ versions: # Data + LuceneIndexStructure: + data: + totalShards: int + activeShards: int[] + SingleIndex: + data: + name: String + ClusteredShardIndex: + data: + clusterName: String + shard: int BinaryOptional: data: val: -Binary @@ -216,13 +240,10 @@ versions: data: defaultAnalyzer: TextFieldsAnalyzer fieldAnalyzer: String2FieldAnalyzerMap - indicizerSimilarities: IndicizerSimilarities - luceneOptions: LuceneOptions - luceneHacks: LuceneHacks IndicizerSimilarities: data: defaultSimilarity: TextFieldsSimilarity - fieldSimilarity: String2FieldAnalyzerMap + fieldSimilarity: String2FieldSimilarityMap LuceneOptions: data: extraFlags: StringMap @@ -253,8 +274,7 @@ versions: blockSize: int RocksDBSharedDirectory: data: - db: RocksDB - handles: String2ColumnFamilyHandleMap + managedPath: Path blockSize: int NRTCachingDirectory: data: diff --git a/src/main/java/it/cavallium/dbengine/client/Indicizer.java b/src/main/java/it/cavallium/dbengine/client/Indicizer.java index c0abac2..d2c523a 100644 --- a/src/main/java/it/cavallium/dbengine/client/Indicizer.java +++ b/src/main/java/it/cavallium/dbengine/client/Indicizer.java @@ -8,6 +8,8 @@ import it.cavallium.dbengine.database.LLUpdateDocument; import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.LLUpdateFields; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers; +import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities; import java.util.Map; import java.util.Set; import org.apache.lucene.util.BytesRef; diff --git a/src/main/java/it/cavallium/dbengine/client/IndicizerAnalyzers.java b/src/main/java/it/cavallium/dbengine/client/IndicizerAnalyzers.java index 0a2903e..dca3903 100644 --- a/src/main/java/it/cavallium/dbengine/client/IndicizerAnalyzers.java +++ b/src/main/java/it/cavallium/dbengine/client/IndicizerAnalyzers.java @@ -1,19 +1,20 @@ package it.cavallium.dbengine.client; import 
it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer; +import it.cavallium.dbengine.rpc.current.serializers.IndicizerAnalyzersSerializer; import java.util.Map; -public record IndicizerAnalyzers(TextFieldsAnalyzer defaultAnalyzer, Map fieldAnalyzer) { +public class IndicizerAnalyzers { - public static IndicizerAnalyzers of() { + public static it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers of() { return of(TextFieldsAnalyzer.ICUCollationKey); } - public static IndicizerAnalyzers of(TextFieldsAnalyzer defaultAnalyzer) { + public static it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers of(TextFieldsAnalyzer defaultAnalyzer) { return of(defaultAnalyzer, Map.of()); } - public static IndicizerAnalyzers of(TextFieldsAnalyzer defaultAnalyzer, Map fieldAnalyzer) { - return new IndicizerAnalyzers(defaultAnalyzer, fieldAnalyzer); + public static it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers of(TextFieldsAnalyzer defaultAnalyzer, Map fieldAnalyzer) { + return new it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers(defaultAnalyzer, fieldAnalyzer); } } diff --git a/src/main/java/it/cavallium/dbengine/client/IndicizerSimilarities.java b/src/main/java/it/cavallium/dbengine/client/IndicizerSimilarities.java index abe1347..11be179 100644 --- a/src/main/java/it/cavallium/dbengine/client/IndicizerSimilarities.java +++ b/src/main/java/it/cavallium/dbengine/client/IndicizerSimilarities.java @@ -1,20 +1,20 @@ package it.cavallium.dbengine.client; -import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer; import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity; import java.util.Map; -public record IndicizerSimilarities(TextFieldsSimilarity defaultSimilarity, Map fieldSimilarity) { +public class IndicizerSimilarities { - public static IndicizerSimilarities of() { + public static it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities of() { return of(TextFieldsSimilarity.BM25Standard); } - public static IndicizerSimilarities of(TextFieldsSimilarity defaultSimilarity) { + public static it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities of(TextFieldsSimilarity defaultSimilarity) { return of(defaultSimilarity, Map.of()); } - public static IndicizerSimilarities of(TextFieldsSimilarity defaultSimilarity, Map fieldSimilarity) { - return new IndicizerSimilarities(defaultSimilarity, fieldSimilarity); + public static it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities of(TextFieldsSimilarity defaultSimilarity, + Map fieldSimilarity) { + return it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities.of(defaultSimilarity, fieldSimilarity); } } diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java index 5e4de64..5af5531 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java @@ -1,20 +1,26 @@ package it.cavallium.dbengine.client; +import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.query.ClientQueryParams; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; +import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLSearchResultShard; import it.cavallium.dbengine.database.LLSnapshot; 
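The two factory classes above now hand back the generated it.cavallium.dbengine.rpc.current.data records instead of the old client-side records, so the same values can be passed to a local index or serialized over the QUIC RPC unchanged. A minimal usage sketch, assuming the per-field maps are keyed by field name ("title" is purely illustrative) and valued with the analyzer/similarity enums, as the String2FieldAnalyzerMap and String2FieldSimilarityMap entries in quic-rpc.yaml suggest:

import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import java.util.Map;

class IndicizerDefaultsSketch {

	// Both factories now return it.cavallium.dbengine.rpc.current.data.* records.
	static it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers analyzers() {
		// Same default as IndicizerAnalyzers.of(), plus one illustrative per-field override.
		return IndicizerAnalyzers.of(TextFieldsAnalyzer.ICUCollationKey,
				Map.of("title", TextFieldsAnalyzer.ICUCollationKey));
	}

	static it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities similarities() {
		// Equivalent to IndicizerSimilarities.of(): BM25Standard everywhere.
		return IndicizerSimilarities.of(TextFieldsSimilarity.BM25Standard);
	}
}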
import it.cavallium.dbengine.database.LLTerm; +import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.searcher.BucketParams; import it.unimi.dsi.fastutil.doubles.DoubleArrayList; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -49,12 +55,9 @@ public class LuceneIndexImpl implements LuceneIndex { @Override public Mono addDocuments(Flux> entries) { - return luceneIndex - .addDocuments(entries - .flatMap(entry -> indicizer - .toDocument(entry.getKey(), entry.getValue()) - .map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc))) - ); + return luceneIndex.addDocuments(entries.flatMap(entry -> indicizer + .toDocument(entry.getKey(), entry.getValue()) + .map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc)))); } @Override @@ -72,13 +75,9 @@ public class LuceneIndexImpl implements LuceneIndex { @Override public Mono updateDocuments(Flux> entries) { - return luceneIndex - .updateDocuments(entries - .flatMap(entry -> indicizer - .toDocument(entry.getKey(), entry.getValue()) - .map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc))) - .collectMap(Entry::getKey, Entry::getValue) - ); + return luceneIndex.updateDocuments(entries.flatMap(entry -> indicizer + .toDocument(entry.getKey(), entry.getValue()) + .map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc)))); } @Override @@ -99,6 +98,8 @@ public class LuceneIndexImpl implements LuceneIndex { indicizer.getKeyFieldName(), mltDocumentFields ) + .collectList() + .flatMap(LuceneIndexImpl::mergeResults) .map(this::mapResults) .single(); } @@ -110,6 +111,8 @@ public class LuceneIndexImpl implements LuceneIndex { queryParams.toQueryParams(), indicizer.getKeyFieldName() ) + .collectList() + .flatMap(LuceneIndexImpl::mergeResults) .map(this::mapResults) .single(); } @@ -119,17 +122,12 @@ public class LuceneIndexImpl implements LuceneIndex { @NotNull List query, @Nullable Query normalizationQuery, BucketParams bucketParams) { - return luceneIndex - .computeBuckets(resolveSnapshot(snapshot), - query, normalizationQuery, - bucketParams - ) - .single(); + return luceneIndex.computeBuckets(resolveSnapshot(snapshot), query, + normalizationQuery, bucketParams).single(); } private Hits> mapResults(LLSearchResultShard llSearchResult) { - var scoresWithKeysFlux = llSearchResult - .results() + var scoresWithKeysFlux = llSearchResult.results() .map(hit -> new HitKey<>(indicizer.getKey(hit.key()), hit.score())); return new Hits<>(scoresWithKeysFlux, llSearchResult.totalHitsCount(), llSearchResult::close); @@ -182,4 +180,34 @@ public class LuceneIndexImpl implements LuceneIndex { public Mono releaseSnapshot(LLSnapshot snapshot) { return luceneIndex.releaseSnapshot(snapshot); } + + private static Mono mergeResults(List shards) { + return Mono.fromCallable(() -> { + TotalHitsCount count = null; + ObjectArrayList> results = new ObjectArrayList<>(shards.size()); + ObjectArrayList> resources = new ObjectArrayList<>(shards.size()); + for (LLSearchResultShard shard : shards) { + if (count == null) { + count = shard.totalHitsCount(); + } else { + count = LuceneUtils.sum(count, shard.totalHitsCount()); + } + results.add(shard.results()); + resources.add(shard); + } + Objects.requireNonNull(count); + var resultsFlux = 
Flux.zip(results, parts -> { + var arr = new ArrayList(parts.length); + for (Object part : parts) { + arr.add((LLKeyScore) part); + } + return arr; + }).concatMapIterable(list -> list); + return new LLSearchResultShard(resultsFlux, count, () -> { + for (Resource resource : resources) { + resource.close(); + } + }); + }); + } } diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneOptions.java b/src/main/java/it/cavallium/dbengine/client/LuceneOptions.java deleted file mode 100644 index fdd9311..0000000 --- a/src/main/java/it/cavallium/dbengine/client/LuceneOptions.java +++ /dev/null @@ -1,21 +0,0 @@ -package it.cavallium.dbengine.client; - -import io.soabase.recordbuilder.core.RecordBuilder; -import java.time.Duration; -import java.util.Map; -import java.util.Optional; -import org.apache.lucene.store.Directory; -import org.jetbrains.annotations.Nullable; - -@RecordBuilder -public record LuceneOptions(Map extraFlags, - Duration queryRefreshDebounceTime, - Duration commitDebounceTime, - boolean lowMemory, - LuceneDirectoryOptions directoryOptions, - long indexWriterBufferSize, - boolean applyAllDeletes, - boolean writeAllDeletes, - boolean allowNonVolatileCollection, - int maxInMemoryResultEntries) { -} diff --git a/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java index 89dcefb..14b1fd9 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java @@ -2,13 +2,16 @@ package it.cavallium.dbengine.database; import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.BufferAllocator; -import it.cavallium.dbengine.client.IndicizerAnalyzers; -import it.cavallium.dbengine.client.IndicizerSimilarities; -import it.cavallium.dbengine.client.LuceneOptions; import it.cavallium.dbengine.lucene.LuceneHacks; +import it.cavallium.dbengine.lucene.LuceneRocksDBManager; import it.cavallium.dbengine.rpc.current.data.Column; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; +import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers; +import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities; +import it.cavallium.dbengine.rpc.current.data.LuceneIndexStructure; +import it.cavallium.dbengine.rpc.current.data.LuceneOptions; import java.util.List; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; @@ -25,9 +28,8 @@ public interface LLDatabaseConnection { List columns, DatabaseOptions databaseOptions); - Mono getLuceneIndex(@Nullable String clusterName, - @Nullable String shardName, - int instancesCount, + Mono getLuceneIndex(String clusterName, + LuceneIndexStructure indexStructure, IndicizerAnalyzers indicizerAnalyzers, IndicizerSimilarities indicizerSimilarities, LuceneOptions luceneOptions, diff --git a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java index bb91a3a..f1585e7 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java @@ -32,7 +32,7 @@ public interface LLLuceneIndex extends LLSnapshottable { Mono update(LLTerm id, LLIndexRequest request); - Mono updateDocuments(Mono> documents); + Mono updateDocuments(Flux> documents); Mono deleteAll(); @@ -43,7 +43,7 @@ public interface LLLuceneIndex extends LLSnapshottable { * The 
additional query will be used with the moreLikeThis query: "mltQuery AND additionalQuery" * @return the collection has one or more flux */ - Mono moreLikeThis(@Nullable LLSnapshot snapshot, + Flux moreLikeThis(@Nullable LLSnapshot snapshot, QueryParams queryParams, @Nullable String keyFieldName, Multimap mltDocumentFields); @@ -53,7 +53,7 @@ public interface LLLuceneIndex extends LLSnapshottable { * returned can be at most limit * 15 * @return the collection has one or more flux */ - Mono search(@Nullable LLSnapshot snapshot, + Flux search(@Nullable LLSnapshot snapshot, QueryParams queryParams, @Nullable String keyFieldName); diff --git a/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java index b7bbd2f..06a9879 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java @@ -8,10 +8,16 @@ import it.cavallium.dbengine.client.ConnectionSettings.ConnectionPart.Connection import it.cavallium.dbengine.client.ConnectionSettings.ConnectionPart.ConnectionPartRocksDB; import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; -import it.cavallium.dbengine.client.LuceneOptions; import it.cavallium.dbengine.lucene.LuceneHacks; +import it.cavallium.dbengine.lucene.LuceneRocksDBManager; +import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.rpc.current.data.Column; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; +import it.cavallium.dbengine.rpc.current.data.LuceneIndexStructure; +import it.cavallium.dbengine.rpc.current.data.LuceneOptions; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; +import it.unimi.dsi.fastutil.ints.IntSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -24,6 +30,7 @@ import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; public class LLMultiDatabaseConnection implements LLDatabaseConnection { @@ -108,25 +115,75 @@ public class LLMultiDatabaseConnection implements LLDatabaseConnection { } @Override - public Mono getLuceneIndex(@Nullable String clusterName, - @Nullable String shardName, - int instancesCount, - IndicizerAnalyzers indicizerAnalyzers, - IndicizerSimilarities indicizerSimilarities, + public Mono getLuceneIndex(String clusterName, + LuceneIndexStructure indexStructure, + it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers indicizerAnalyzers, + it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities indicizerSimilarities, LuceneOptions luceneOptions, @Nullable LuceneHacks luceneHacks) { - String indexShardName = Objects.requireNonNullElse(shardName, clusterName); - Objects.requireNonNull(indexShardName, "ClusterName and ShardName are both null"); - LLDatabaseConnection conn = luceneShardConnections.getOrDefault(indexShardName, defaultLuceneConnection); - Objects.requireNonNull(conn, "Null connection"); - return conn.getLuceneIndex(clusterName, - shardName, - instancesCount, - indicizerAnalyzers, - indicizerSimilarities, - luceneOptions, - luceneHacks - ); + IntSet registeredShards = new IntOpenHashSet(); + Map connectionToShardMap = new HashMap<>(); + for (int activeShard : indexStructure.activeShards()) { + if (activeShard >= 
indexStructure.totalShards()) { + throw new IllegalArgumentException( + "ActiveShard " + activeShard + " is bigger than total shards count " + indexStructure.totalShards()); + } + if (!registeredShards.add(activeShard)) { + throw new IllegalArgumentException("ActiveShard " + activeShard + " has been specified twice"); + } + var shardName = LuceneUtils.getStandardName(clusterName, activeShard); + var connection = luceneShardConnections.getOrDefault(shardName, defaultLuceneConnection); + Objects.requireNonNull(connection, "Null connection"); + connectionToShardMap.computeIfAbsent(connection, k -> new IntOpenHashSet()).add(activeShard); + } + if (connectionToShardMap.keySet().size() == 1) { + return connectionToShardMap + .keySet() + .stream() + .findFirst() + .orElseThrow() + .getLuceneIndex(clusterName, + indexStructure, + indicizerAnalyzers, + indicizerSimilarities, + luceneOptions, + luceneHacks + ); + } else { + return Flux + .fromIterable(connectionToShardMap.entrySet()) + .flatMap(entry -> { + var connectionIndexStructure = indexStructure + .setActiveShards(new IntArrayList(entry.getValue())); + + var connIndex = entry.getKey() + .getLuceneIndex(clusterName, + connectionIndexStructure, + indicizerAnalyzers, + indicizerSimilarities, + luceneOptions, + luceneHacks + ).cache().repeat(); + return Flux + .fromIterable(entry.getValue()) + .zipWith(connIndex); + }) + .collectList() + .map(indices -> { + var luceneIndices = new LLLuceneIndex[indexStructure.totalShards()]; + for (Tuple2 index : indices) { + luceneIndices[index.getT1()] = index.getT2(); + } + return new LLMultiLuceneIndex(clusterName, + indexStructure, + indicizerAnalyzers, + indicizerSimilarities, + luceneOptions, + luceneHacks, + luceneIndices + ); + }); + } } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java new file mode 100644 index 0000000..b4d6a9a --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java @@ -0,0 +1,234 @@ +package it.cavallium.dbengine.database; + +import com.google.common.collect.Multimap; +import io.net5.buffer.api.Resource; +import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers; +import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities; +import it.cavallium.dbengine.client.query.current.data.Query; +import it.cavallium.dbengine.client.query.current.data.QueryParams; +import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; +import it.cavallium.dbengine.lucene.LuceneHacks; +import it.cavallium.dbengine.lucene.LuceneUtils; +import it.cavallium.dbengine.lucene.collector.Buckets; +import it.cavallium.dbengine.lucene.searcher.BucketParams; +import it.cavallium.dbengine.rpc.current.data.LuceneIndexStructure; +import it.cavallium.dbengine.rpc.current.data.LuceneOptions; +import it.unimi.dsi.fastutil.doubles.DoubleArrayList; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class LLMultiLuceneIndex implements LLLuceneIndex { + + + private final ConcurrentHashMap> registeredSnapshots = new ConcurrentHashMap<>(); + private final 
AtomicLong nextSnapshotNumber = new AtomicLong(1); + + private final String clusterName; + private final LuceneIndexStructure indexStructure; + private final IndicizerAnalyzers indicizerAnalyzers; + private final IndicizerSimilarities indicizerSimilarities; + private final LuceneOptions luceneOptions; + private final LuceneHacks luceneHacks; + private final LLLuceneIndex[] luceneIndicesById; + private final List luceneIndicesSet; + private final int totalShards; + private final Flux luceneIndicesFlux; + + public LLMultiLuceneIndex(String clusterName, + LuceneIndexStructure indexStructure, + IndicizerAnalyzers indicizerAnalyzers, + IndicizerSimilarities indicizerSimilarities, + LuceneOptions luceneOptions, + LuceneHacks luceneHacks, + LLLuceneIndex[] luceneIndices) { + this.clusterName = clusterName; + this.indexStructure = indexStructure; + this.indicizerAnalyzers = indicizerAnalyzers; + this.indicizerSimilarities = indicizerSimilarities; + this.luceneOptions = luceneOptions; + this.luceneHacks = luceneHacks; + this.luceneIndicesById = luceneIndices; + this.totalShards = indexStructure.totalShards(); + var luceneIndicesSet = new HashSet(); + for (LLLuceneIndex luceneIndex : luceneIndices) { + if (luceneIndex != null) { + luceneIndicesSet.add(luceneIndex); + } + } + this.luceneIndicesSet = new ArrayList<>(luceneIndicesSet); + this.luceneIndicesFlux = Flux.fromIterable(luceneIndicesSet); + } + + @Override + public String getLuceneIndexName() { + return clusterName; + } + + private LLLuceneIndex getLuceneIndex(LLTerm id) { + return luceneIndicesById[LuceneUtils.getLuceneIndexId(id, totalShards)]; + } + + @Override + public Mono addDocument(LLTerm id, LLUpdateDocument doc) { + return getLuceneIndex(id).addDocument(id, doc); + } + + @Override + public Mono addDocuments(Flux> documents) { + return documents + .groupBy(term -> LuceneUtils.getLuceneIndexId(term.getKey(), totalShards)) + .flatMap(group -> { + var index = luceneIndicesById[group.key()]; + return index.addDocuments(group); + }) + .then(); + } + + @Override + public Mono deleteDocument(LLTerm id) { + return getLuceneIndex(id).deleteDocument(id); + } + + @Override + public Mono update(LLTerm id, LLIndexRequest request) { + return getLuceneIndex(id).update(id, request); + } + + @Override + public Mono updateDocuments(Flux> documents) { + return documents + .groupBy(term -> LuceneUtils.getLuceneIndexId(term.getKey(), totalShards)) + .flatMap(group -> { + var index = luceneIndicesById[group.key()]; + return index.updateDocuments(group); + }) + .then(); + } + + @Override + public Mono deleteAll() { + return luceneIndicesFlux.flatMap(LLLuceneIndex::deleteAll).then(); + } + + @Override + public Flux moreLikeThis(@Nullable LLSnapshot snapshot, + QueryParams queryParams, + @Nullable String keyFieldName, + Multimap mltDocumentFields) { + return luceneIndicesFlux.flatMap(luceneIndex -> luceneIndex.moreLikeThis(snapshot, + queryParams, + keyFieldName, + mltDocumentFields + )); + } + + private Mono mergeShards(List shards) { + return Mono.fromCallable(() -> { + List seriesValues = new ArrayList<>(); + DoubleArrayList totals = new DoubleArrayList(shards.get(0).totals()); + + for (Buckets shard : shards) { + if (seriesValues.isEmpty()) { + seriesValues.addAll(shard.seriesValues()); + } else { + for (int serieIndex = 0; serieIndex < seriesValues.size(); serieIndex++) { + DoubleArrayList mergedSerieValues = seriesValues.get(serieIndex); + for (int dataIndex = 0; dataIndex < mergedSerieValues.size(); dataIndex++) { + mergedSerieValues.set(dataIndex, 
mergedSerieValues.getDouble(dataIndex) + + shard.seriesValues().get(serieIndex).getDouble(dataIndex) + ); + } + } + } + for (int i = 0; i < totals.size(); i++) { + totals.set(i, totals.getDouble(i) + shard.totals().getDouble(i)); + } + } + return new Buckets(seriesValues, totals); + }); + } + + @Override + public Flux search(@Nullable LLSnapshot snapshot, + QueryParams queryParams, + @Nullable String keyFieldName) { + return luceneIndicesFlux.flatMap(luceneIndex -> luceneIndex.search(snapshot, + queryParams, + keyFieldName + )); + } + + @Override + public Mono computeBuckets(@Nullable LLSnapshot snapshot, + @NotNull List queries, + @Nullable Query normalizationQuery, + BucketParams bucketParams) { + return luceneIndicesFlux.flatMap(luceneIndex -> luceneIndex.computeBuckets(snapshot, + queries, + normalizationQuery, + bucketParams + )).collectList().flatMap(this::mergeShards); + } + + @Override + public Mono count(@Nullable LLSnapshot snapshot, Query query) { + return LLLuceneIndex.super.count(snapshot, query); + } + + @Override + public boolean isLowMemoryMode() { + return luceneOptions.lowMemory(); + } + + @Override + public Mono close() { + return luceneIndicesFlux.flatMap(LLLuceneIndex::close).then(); + } + + @Override + public Mono flush() { + return luceneIndicesFlux.flatMap(LLLuceneIndex::flush).then(); + } + + @Override + public Mono refresh(boolean force) { + return luceneIndicesFlux.flatMap(index -> index.refresh(force)).then(); + } + + @Override + public Mono takeSnapshot() { + return Mono + // Generate next snapshot index + .fromCallable(nextSnapshotNumber::getAndIncrement) + .flatMap(snapshotIndex -> luceneIndicesFlux + .flatMapSequential(LLLuceneIndex::takeSnapshot) + .collectList() + .doOnNext(instancesSnapshotsArray -> registeredSnapshots.put(snapshotIndex, instancesSnapshotsArray)) + .thenReturn(new LLSnapshot(snapshotIndex)) + ); + } + + @Override + public Mono releaseSnapshot(LLSnapshot snapshot) { + return Mono + .fromCallable(() -> registeredSnapshots.remove(snapshot.getSequenceNumber())) + .flatMapIterable(list -> list) + .index() + .flatMap(tuple -> { + int index = (int) (long) tuple.getT1(); + LLSnapshot instanceSnapshot = tuple.getT2(); + return luceneIndicesSet.get(index).releaseSnapshot(instanceSnapshot); + }) + .then(); + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java index bd917dd..54e71d0 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java @@ -2,15 +2,17 @@ package it.cavallium.dbengine.database.disk; import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.BufferAllocator; -import it.cavallium.dbengine.client.IndicizerAnalyzers; -import it.cavallium.dbengine.client.IndicizerSimilarities; -import it.cavallium.dbengine.client.LuceneOptions; import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.lucene.LuceneHacks; +import it.cavallium.dbengine.lucene.LuceneRocksDBManager; import it.cavallium.dbengine.netty.JMXNettyMonitoringManager; import it.cavallium.dbengine.rpc.current.data.Column; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; +import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers; +import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities; +import 
it.cavallium.dbengine.rpc.current.data.LuceneIndexStructure; +import it.cavallium.dbengine.rpc.current.data.LuceneOptions; import java.nio.file.Files; import java.nio.file.Path; import java.util.LinkedList; @@ -33,16 +35,19 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection { private final MeterRegistry meterRegistry; private final Path basePath; private final boolean inMemory; + private final LuceneRocksDBManager rocksDBManager; private final AtomicReference env = new AtomicReference<>(); public LLLocalDatabaseConnection(BufferAllocator allocator, MeterRegistry meterRegistry, Path basePath, - boolean inMemory) { + boolean inMemory, + LuceneRocksDBManager rocksDBManager) { this.allocator = allocator; this.meterRegistry = meterRegistry; this.basePath = basePath; this.inMemory = inMemory; + this.rocksDBManager = rocksDBManager; } @Override @@ -92,9 +97,8 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection { } @Override - public Mono getLuceneIndex(@Nullable String clusterName, - @Nullable String shardName, - int instancesCount, + public Mono getLuceneIndex(String clusterName, + LuceneIndexStructure indexStructure, IndicizerAnalyzers indicizerAnalyzers, IndicizerSimilarities indicizerSimilarities, LuceneOptions luceneOptions, @@ -102,32 +106,32 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection { return Mono .fromCallable(() -> { var env = this.env.get(); - if (clusterName == null && shardName == null) { - throw new IllegalArgumentException("Shard name and/or cluster name must be set"); + if (clusterName == null) { + throw new IllegalArgumentException("Cluster name must be set"); } - if (instancesCount != 1) { - if (shardName != null && !shardName.equals(clusterName)) { - throw new IllegalArgumentException("You shouldn't use a shard name for clustered instances"); - } + if (indexStructure.activeShards().size() != 1) { Objects.requireNonNull(env, "Environment not set"); return new LLLocalMultiLuceneIndex(env, meterRegistry, clusterName, - instancesCount, + indexStructure.activeShards(), + indexStructure.totalShards(), indicizerAnalyzers, indicizerSimilarities, luceneOptions, - luceneHacks + luceneHacks, + rocksDBManager ); } else { return new LLLocalLuceneIndex(env, meterRegistry, clusterName, - shardName, + indexStructure.activeShards().getInt(0), indicizerAnalyzers, indicizerSimilarities, luceneOptions, - luceneHacks + luceneHacks, + rocksDBManager ); } }) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index d724b21..49d9f38 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -11,9 +11,6 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Timer; import io.net5.buffer.api.Send; -import it.cavallium.dbengine.client.IndicizerAnalyzers; -import it.cavallium.dbengine.client.IndicizerSimilarities; -import it.cavallium.dbengine.client.LuceneOptions; import it.cavallium.dbengine.client.query.QueryParser; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; @@ -27,6 +24,7 @@ import it.cavallium.dbengine.database.LLUpdateDocument; import it.cavallium.dbengine.database.LLUpdateFields; import it.cavallium.dbengine.database.LLUtils; import 
it.cavallium.dbengine.lucene.LuceneHacks; +import it.cavallium.dbengine.lucene.LuceneRocksDBManager; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.mlt.MoreLikeThisTransformer; @@ -35,13 +33,14 @@ import it.cavallium.dbengine.lucene.searcher.BucketParams; import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher; import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; import it.cavallium.dbengine.lucene.searcher.LocalSearcher; +import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers; +import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities; +import it.cavallium.dbengine.rpc.current.data.LuceneOptions; import java.io.IOException; -import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; @@ -51,9 +50,6 @@ import org.apache.commons.lang3.time.StopWatch; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper; -import org.apache.lucene.codecs.Codec; -import org.apache.lucene.codecs.lucene90.Lucene90Codec; -import org.apache.lucene.codecs.lucene90.Lucene90Codec.Mode; import org.apache.lucene.index.ConcurrentMergeScheduler; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; @@ -64,7 +60,6 @@ import org.apache.lucene.index.SimpleMergedSegmentWarmer; import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.search.similarities.Similarity; -import org.apache.lucene.store.ByteBuffersDirectory; import org.apache.lucene.store.Directory; import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.util.InfoStream; @@ -103,12 +98,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { private final Timer mergeTime; private final Timer refreshTime; - private final String luceneIndexName; + private final String shardName; private final IndexWriter indexWriter; private final SnapshotsManager snapshotsManager; private final IndexSearcherManager searcherManager; private final PerFieldAnalyzerWrapper luceneAnalyzer; private final Similarity luceneSimilarity; + private final LuceneRocksDBManager rocksDBManager; private final Directory directory; private final boolean lowMemory; @@ -117,19 +113,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { public LLLocalLuceneIndex(LLTempLMDBEnv env, MeterRegistry meterRegistry, - @Nullable String clusterName, - @Nullable String shardName, + @NotNull String clusterName, + int shardIndex, IndicizerAnalyzers indicizerAnalyzers, IndicizerSimilarities indicizerSimilarities, LuceneOptions luceneOptions, - @Nullable LuceneHacks luceneHacks) throws IOException { - if (clusterName == null && shardName == null) { - throw new IllegalArgumentException("Clustern name and/or shard name must be set"); - } - String logName = Objects.requireNonNullElse(clusterName, shardName); - String luceneIndexName = Objects.requireNonNullElse(shardName, clusterName); + @Nullable LuceneHacks luceneHacks, + @Nullable LuceneRocksDBManager rocksDBManager) throws IOException { - if (luceneIndexName.length() == 0) { + if (clusterName.isBlank()) { throw new IOException("Empty lucene database name"); } if 
(!MMapDirectory.UNMAP_SUPPORTED) { @@ -138,13 +130,16 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { logger.debug("Lucene MMap is supported"); } this.lowMemory = luceneOptions.lowMemory(); - this.directory = luceneOptions.directoryOptions().createLuceneDirectory(luceneIndexName); - boolean compressCodec = !luceneOptions.directoryOptions().isStorageCompressed(); + this.directory = LuceneUtils.createLuceneDirectory(luceneOptions.directoryOptions(), + LuceneUtils.getStandardName(clusterName, shardIndex), + rocksDBManager); + //boolean compressCodec = !luceneOptions.directoryOptions().isStorageCompressed(); - this.luceneIndexName = luceneIndexName; + this.shardName = clusterName; var snapshotter = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); this.luceneAnalyzer = LuceneUtils.toPerFieldAnalyzerWrapper(indicizerAnalyzers); this.luceneSimilarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities); + this.rocksDBManager = rocksDBManager; var useLMDB = luceneOptions.allowNonVolatileCollection(); var maxInMemoryResultEntries = luceneOptions.maxInMemoryResultEntries(); @@ -170,7 +165,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { var concurrentMergeScheduler = new ConcurrentMergeScheduler(); // false means SSD, true means HDD concurrentMergeScheduler.setDefaultMaxMergesAndThreads(false); - if (luceneOptions.directoryOptions().getManagedPath().isEmpty()) { + if (LuceneUtils.getManagedPath(luceneOptions.directoryOptions()).isEmpty()) { concurrentMergeScheduler.disableAutoIOThrottle(); } else { concurrentMergeScheduler.enableAutoIOThrottle(); @@ -200,14 +195,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { luceneOptions.queryRefreshDebounceTime() ); - this.startedDocIndexings = meterRegistry.counter("index.write.doc.started.counter", "index.name", logName); - this.endeddDocIndexings = meterRegistry.counter("index.write.doc.ended.counter", "index.name", logName); - this.docIndexingTime = Timer.builder("index.write.doc.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry); - this.snapshotTime = Timer.builder("index.write.snapshot.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry); - this.flushTime = Timer.builder("index.write.flush.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry); - this.commitTime = Timer.builder("index.write.commit.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry); - this.mergeTime = Timer.builder("index.write.merge.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry); - this.refreshTime = Timer.builder("index.search.refresh.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry); + this.startedDocIndexings = meterRegistry.counter("index.write.doc.started.counter", "index.name", clusterName); + this.endeddDocIndexings = meterRegistry.counter("index.write.doc.ended.counter", "index.name", clusterName); + this.docIndexingTime = Timer.builder("index.write.doc.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", clusterName).register(meterRegistry); + this.snapshotTime = Timer.builder("index.write.snapshot.timer").publishPercentiles(0.2, 0.5, 
0.95).publishPercentileHistogram().tag("index.name", clusterName).register(meterRegistry); + this.flushTime = Timer.builder("index.write.flush.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", clusterName).register(meterRegistry); + this.commitTime = Timer.builder("index.write.commit.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", clusterName).register(meterRegistry); + this.mergeTime = Timer.builder("index.write.merge.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", clusterName).register(meterRegistry); + this.refreshTime = Timer.builder("index.search.refresh.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", clusterName).register(meterRegistry); // Start scheduled tasks var commitMillis = luceneOptions.commitDebounceTime().toMillis(); @@ -221,7 +216,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { @Override public String getLuceneIndexName() { - return luceneIndexName; + return shardName; } @Override @@ -328,8 +323,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } @Override - public Mono updateDocuments(Mono> documents) { - return documents.flatMap(this::updateDocuments).then(); + public Mono updateDocuments(Flux> documents) { + return documents + .collectMap(Entry::getKey, Entry::getValue) + .flatMap(this::updateDocuments).then(); } private Mono updateDocuments(Map documentsMap) { @@ -367,7 +364,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } @Override - public Mono moreLikeThis(@Nullable LLSnapshot snapshot, + public Flux moreLikeThis(@Nullable LLSnapshot snapshot, QueryParams queryParams, @Nullable String keyFieldName, Multimap mltDocumentFieldsFlux) { @@ -377,18 +374,20 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return localSearcher .collect(searcher, localQueryParams, keyFieldName, transformer) - .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)); + .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)) + .flux(); } @Override - public Mono search(@Nullable LLSnapshot snapshot, QueryParams queryParams, + public Flux search(@Nullable LLSnapshot snapshot, QueryParams queryParams, @Nullable String keyFieldName) { LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer); var searcher = searcherManager.retrieveSearcher(snapshot); return localSearcher .collect(searcher, localQueryParams, keyFieldName, NO_REWRITE) - .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)); + .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)) + .flux(); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index cad5e0e..c3b817b 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -5,9 +5,6 @@ import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterrupti import com.google.common.collect.Multimap; import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.Send; -import it.cavallium.dbengine.client.IndicizerAnalyzers; -import it.cavallium.dbengine.client.IndicizerSimilarities; 
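Pulling the reworked connection API together, a hedged sketch of how a caller opens a clustered index now that shardName and instancesCount are replaced by LuceneIndexStructure. The (totalShards, activeShards) constructor and the fastutil IntList argument type are assumptions inferred from the generated rpc.current.data records and the call sites above; "documents" is an illustrative cluster name. On a local connection, one active shard resolves to LLLocalLuceneIndex and several to LLLocalMultiLuceneIndex:

import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.rpc.current.data.LuceneIndexStructure;
import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import reactor.core.publisher.Mono;

class OpenClusteredIndexSketch {

	// Opens a 4-shard cluster with every shard active on this node.
	static Mono<? extends LLLuceneIndex> open(LLDatabaseConnection connection, LuceneOptions luceneOptions) {
		var structure = new LuceneIndexStructure(4, new IntArrayList(new int[] {0, 1, 2, 3})); // assumed constructor
		return connection.getLuceneIndex(
				"documents",                // clusterName (no more shardName/instancesCount)
				structure,                  // totalShards + activeShards
				IndicizerAnalyzers.of(),    // default analyzers, as an rpc.current.data record
				IndicizerSimilarities.of(), // default similarities
				luceneOptions,              // rpc.current.data.LuceneOptions (the client-side record was removed)
				null                        // no LuceneHacks
		);
	}
}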
-import it.cavallium.dbengine.client.LuceneOptions; import it.cavallium.dbengine.client.query.QueryParser; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; @@ -19,6 +16,7 @@ import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.LLUpdateDocument; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.lucene.LuceneHacks; +import it.cavallium.dbengine.lucene.LuceneRocksDBManager; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.mlt.MoreLikeThisTransformer; @@ -28,21 +26,25 @@ import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher; import it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite; import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; import it.cavallium.dbengine.lucene.searcher.MultiSearcher; +import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers; +import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities; +import it.cavallium.dbengine.rpc.current.data.LuceneOptions; +import it.unimi.dsi.fastutil.ints.IntList; import java.io.Closeable; import java.io.IOException; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper; import org.apache.lucene.search.similarities.PerFieldSimilarityWrapper; -import org.apache.lucene.util.StringHelper; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; @@ -55,10 +57,15 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { LLUtils.initHooks(); } + private final String clusterName; + private final boolean lowMemory; private final MeterRegistry meterRegistry; - private final ConcurrentHashMap registeredSnapshots = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> registeredSnapshots = new ConcurrentHashMap<>(); private final AtomicLong nextSnapshotNumber = new AtomicLong(1); - private final LLLocalLuceneIndex[] luceneIndices; + private final LLLocalLuceneIndex[] luceneIndicesById; + private final List luceneIndicesSet; + private final int totalShards; + private final Flux luceneIndicesFlux; private final PerFieldAnalyzerWrapper luceneAnalyzer; private final PerFieldSimilarityWrapper luceneSimilarity; @@ -68,38 +75,49 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { public LLLocalMultiLuceneIndex(LLTempLMDBEnv env, MeterRegistry meterRegistry, String clusterName, - int instancesCount, + IntList activeShards, + int totalShards, IndicizerAnalyzers indicizerAnalyzers, IndicizerSimilarities indicizerSimilarities, LuceneOptions luceneOptions, - @Nullable LuceneHacks luceneHacks) throws IOException { + @Nullable LuceneHacks luceneHacks, + LuceneRocksDBManager rocksDBManager) throws IOException { - if (instancesCount <= 1 || instancesCount > 100) { - throw new IOException("Unsupported instances count: " + instancesCount); + if (totalShards <= 1 || totalShards > 100) { + throw new IOException("Unsupported instances count: " + totalShards); } this.meterRegistry = meterRegistry; - LLLocalLuceneIndex[] luceneIndices = 
new LLLocalLuceneIndex[instancesCount]; - for (int i = 0; i < instancesCount; i++) { - String shardName; - if (i == 0) { - shardName = clusterName; - } else { - shardName = clusterName + "_" + String.format("%03d", i); + LLLocalLuceneIndex[] luceneIndices = new LLLocalLuceneIndex[totalShards]; + for (int i = 0; i < totalShards; i++) { + if (!activeShards.contains(i)) { + continue; } luceneIndices[i] = new LLLocalLuceneIndex(env, meterRegistry, clusterName, - shardName, + i, indicizerAnalyzers, indicizerSimilarities, luceneOptions, - luceneHacks + luceneHacks, + rocksDBManager ); } - this.luceneIndices = luceneIndices; + this.clusterName = clusterName; + this.totalShards = totalShards; + this.luceneIndicesById = luceneIndices; + var luceneIndicesSet = new HashSet(); + for (var luceneIndex : luceneIndices) { + if (luceneIndex != null) { + luceneIndicesSet.add(luceneIndex); + } + } + this.luceneIndicesSet = new ArrayList<>(luceneIndicesSet); + this.luceneIndicesFlux = Flux.fromIterable(luceneIndicesSet); this.luceneAnalyzer = LuceneUtils.toPerFieldAnalyzerWrapper(indicizerAnalyzers); this.luceneSimilarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities); + this.lowMemory = luceneOptions.lowMemory(); var useLMDB = luceneOptions.allowNonVolatileCollection(); var maxInMemoryResultEntries = luceneOptions.maxInMemoryResultEntries(); @@ -111,21 +129,16 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { } private LLLocalLuceneIndex getLuceneIndex(LLTerm id) { - return luceneIndices[getLuceneIndexId(id)]; - } - - private int getLuceneIndexId(LLTerm id) { - return Math.abs(StringHelper.murmurhash3_x86_32(id.getValueBytesRef(), 7) % luceneIndices.length); + return Objects.requireNonNull(luceneIndicesById[LuceneUtils.getLuceneIndexId(id, totalShards)]); } @Override public String getLuceneIndexName() { - return luceneIndices[0].getLuceneIndexName(); + return clusterName; } private Mono> getIndexSearchers(LLSnapshot snapshot) { - return Flux - .fromArray(luceneIndices) + return luceneIndicesFlux .index() // Resolve the snapshot of each shard .flatMap(tuple -> Mono @@ -141,38 +154,11 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { return getLuceneIndex(id).addDocument(id, doc); } - @SuppressWarnings({"unchecked"}) @Override public Mono addDocuments(Flux> documents) { return documents - .buffer(512) - .flatMap(inputEntries -> { - List>[] sortedEntries = new List[luceneIndices.length]; - Mono[] results = new Mono[luceneIndices.length]; - - // Sort entries - for(var inputEntry : inputEntries) { - int luceneIndexId = getLuceneIndexId(inputEntry.getKey()); - if (sortedEntries[luceneIndexId] == null) { - sortedEntries[luceneIndexId] = new ArrayList<>(); - } - sortedEntries[luceneIndexId].add(inputEntry); - } - - // Add documents - int luceneIndexId = 0; - for (List> docs : sortedEntries) { - if (docs != null && !docs.isEmpty()) { - LLLocalLuceneIndex luceneIndex = luceneIndices[luceneIndexId]; - results[luceneIndexId] = luceneIndex.addDocuments(Flux.fromIterable(docs)); - } else { - results[luceneIndexId] = Mono.empty(); - } - luceneIndexId++; - } - - return Mono.when(results); - }) + .groupBy(term -> getLuceneIndex(term.getKey())) + .flatMap(group -> group.key().addDocuments(group)) .then(); } @@ -187,35 +173,23 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { } @Override - public Mono updateDocuments(Mono> documents) { + public Mono updateDocuments(Flux> documents) { return documents - .flatMapMany(map -> { - var sortedMap = new HashMap>(); - 
map.forEach((key, value) -> sortedMap - .computeIfAbsent(getLuceneIndex(key), _unused -> new HashMap<>()) - .put(key, value) - ); - return Flux.fromIterable(Collections.unmodifiableMap(sortedMap).entrySet()); - }) - .flatMap(luceneIndexWithNewDocuments -> { - var luceneIndex = luceneIndexWithNewDocuments.getKey(); - var docs = luceneIndexWithNewDocuments.getValue(); - return luceneIndex.updateDocuments(Mono.just(docs)); - }) + .groupBy(term -> getLuceneIndex(term.getKey())) + .flatMap(group -> group.key().updateDocuments(group)) .then(); } @Override public Mono deleteAll() { - return Flux - .fromArray(luceneIndices) + return luceneIndicesFlux .flatMap(LLLocalLuceneIndex::deleteAll) .then(); } private LLSnapshot resolveSnapshot(LLSnapshot multiSnapshot, int instanceId) { if (multiSnapshot != null) { - return registeredSnapshots.get(multiSnapshot.getSequenceNumber())[instanceId]; + return registeredSnapshots.get(multiSnapshot.getSequenceNumber()).get(instanceId); } else { return null; } @@ -226,7 +200,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { } @Override - public Mono moreLikeThis(@Nullable LLSnapshot snapshot, + public Flux moreLikeThis(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName, Multimap mltDocumentFields) { @@ -238,11 +212,12 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { return multiSearcher .collectMulti(searchers, localQueryParams, keyFieldName, transformer) // Transform the result type - .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)); + .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)) + .flux(); } @Override - public Mono search(@Nullable LLSnapshot snapshot, + public Flux search(@Nullable LLSnapshot snapshot, QueryParams queryParams, @Nullable String keyFieldName) { LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer); @@ -252,7 +227,8 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { return multiSearcher .collectMulti(searchers, localQueryParams, keyFieldName, GlobalQueryRewrite.NO_REWRITE) // Transform the result type - .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)); + .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)) + .flux(); } @Override @@ -273,8 +249,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { @Override public Mono close() { - return Flux - .fromArray(luceneIndices) + return luceneIndicesFlux .flatMap(LLLocalLuceneIndex::close) .then(Mono.fromCallable(() -> { if (multiSearcher instanceof Closeable closeable) { @@ -289,16 +264,14 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { @Override public Mono flush() { - return Flux - .fromArray(luceneIndices) + return luceneIndicesFlux .flatMap(LLLocalLuceneIndex::flush) .then(); } @Override public Mono refresh(boolean force) { - return Flux - .fromArray(luceneIndices) + return luceneIndicesFlux .flatMap(index -> index.refresh(force)) .then(); } @@ -308,11 +281,9 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { return Mono // Generate next snapshot index .fromCallable(nextSnapshotNumber::getAndIncrement) - .flatMap(snapshotIndex -> Flux - .fromArray(luceneIndices) + .flatMap(snapshotIndex -> luceneIndicesFlux .flatMapSequential(LLLocalLuceneIndex::takeSnapshot) .collectList() - .map(list -> list.toArray(LLSnapshot[]::new)) 
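The per-class murmur-hash routing removed above now goes through the shared LuceneUtils.getLuceneIndexId(term, totalShards), whose body is not part of this diff. A plausible sketch, assuming it simply mirrors the helper it replaces (same seed, but modulo the cluster-wide shard count instead of the local array length):

import it.cavallium.dbengine.database.LLTerm;
import org.apache.lucene.util.StringHelper;

final class ShardRoutingSketch {

	// Hypothetical stand-in for LuceneUtils.getLuceneIndexId: a stable mapping from a
	// document key to one of totalShards shard indexes, so every node routes the same
	// term to the same shard regardless of which shards are active locally.
	static int getLuceneIndexId(LLTerm id, int totalShards) {
		return Math.abs(StringHelper.murmurhash3_x86_32(id.getValueBytesRef(), 7) % totalShards);
	}
}

addDocuments and updateDocuments then group incoming entries by this id, so each shard only receives the documents that hash to it.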
.doOnNext(instancesSnapshotsArray -> registeredSnapshots.put(snapshotIndex, instancesSnapshotsArray)) .thenReturn(new LLSnapshot(snapshotIndex)) ); @@ -322,18 +293,18 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { public Mono releaseSnapshot(LLSnapshot snapshot) { return Mono .fromCallable(() -> registeredSnapshots.remove(snapshot.getSequenceNumber())) - .flatMapMany(Flux::fromArray) + .flatMapIterable(list -> list) .index() .flatMapSequential(tuple -> { int index = (int) (long) tuple.getT1(); LLSnapshot instanceSnapshot = tuple.getT2(); - return luceneIndices[index].releaseSnapshot(instanceSnapshot); + return luceneIndicesSet.get(index).releaseSnapshot(instanceSnapshot); }) .then(); } @Override public boolean isLowMemoryMode() { - return luceneIndices[0].isLowMemoryMode(); + return lowMemory; } } diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java index 26e13ca..7b8d4c8 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java @@ -2,20 +2,22 @@ package it.cavallium.dbengine.database.memory; import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.BufferAllocator; -import it.cavallium.dbengine.client.IndicizerAnalyzers; -import it.cavallium.dbengine.client.IndicizerSimilarities; -import it.cavallium.dbengine.client.LuceneDirectoryOptions.ByteBuffersDirectory; -import it.cavallium.dbengine.client.LuceneOptions; -import it.cavallium.dbengine.client.LuceneOptionsBuilder; import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.disk.LLLocalLuceneIndex; import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; import it.cavallium.dbengine.lucene.LuceneHacks; +import it.cavallium.dbengine.lucene.LuceneRocksDBManager; import it.cavallium.dbengine.netty.JMXNettyMonitoringManager; +import it.cavallium.dbengine.rpc.current.data.ByteBuffersDirectory; import it.cavallium.dbengine.rpc.current.data.Column; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; +import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers; +import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities; +import it.cavallium.dbengine.rpc.current.data.LuceneIndexStructure; +import it.cavallium.dbengine.rpc.current.data.LuceneOptions; +import it.cavallium.dbengine.rpc.current.data.LuceneOptionsBuilder; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -80,15 +82,14 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection { } @Override - public Mono getLuceneIndex(@Nullable String clusterName, - @Nullable String shardName, - int instancesCount, + public Mono getLuceneIndex(String clusterName, + LuceneIndexStructure indexStructure, IndicizerAnalyzers indicizerAnalyzers, IndicizerSimilarities indicizerSimilarities, - LuceneOptions inputLuceneOptions, + LuceneOptions luceneOptions, @Nullable LuceneHacks luceneHacks) { var memoryLuceneOptions = LuceneOptionsBuilder - .builder(inputLuceneOptions) + .builder(luceneOptions) .directoryOptions(new ByteBuffersDirectory()) .build(); return Mono @@ -97,11 +98,12 @@ public class LLMemoryDatabaseConnection implements 
LLDatabaseConnection { return new LLLocalLuceneIndex(env, meterRegistry, clusterName, - shardName, + 0, indicizerAnalyzers, indicizerSimilarities, memoryLuceneOptions, - luceneHacks + luceneHacks, + null ); }) .subscribeOn(Schedulers.boundedElastic()); diff --git a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java index 3437ac3..b0dc4bf 100644 --- a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java @@ -1,35 +1,49 @@ package it.cavallium.dbengine.database.remote; +import com.google.common.collect.Multimap; import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.Send; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.incubator.codec.quic.QuicSslContextBuilder; -import it.cavallium.dbengine.client.IndicizerAnalyzers; -import it.cavallium.dbengine.client.IndicizerSimilarities; -import it.cavallium.dbengine.client.LuceneOptions; import it.cavallium.dbengine.client.MemoryStats; +import it.cavallium.dbengine.client.query.current.data.Query; +import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLDictionary; +import it.cavallium.dbengine.database.LLIndexRequest; import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.LLLuceneIndex; +import it.cavallium.dbengine.database.LLSearchResultShard; import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; +import it.cavallium.dbengine.database.LLTerm; +import it.cavallium.dbengine.database.LLUpdateDocument; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.remote.RPCCodecs.RPCEventCodec; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.lucene.LuceneHacks; +import it.cavallium.dbengine.lucene.LuceneRocksDBManager; +import it.cavallium.dbengine.lucene.collector.Buckets; +import it.cavallium.dbengine.lucene.searcher.BucketParams; import it.cavallium.dbengine.rpc.current.data.BinaryOptional; import it.cavallium.dbengine.rpc.current.data.ClientBoundRequest; import it.cavallium.dbengine.rpc.current.data.ClientBoundResponse; +import it.cavallium.dbengine.rpc.current.data.CloseDatabase; +import it.cavallium.dbengine.rpc.current.data.CloseLuceneIndex; import it.cavallium.dbengine.rpc.current.data.Column; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; import it.cavallium.dbengine.rpc.current.data.GeneratedEntityId; import it.cavallium.dbengine.rpc.current.data.GetDatabase; +import it.cavallium.dbengine.rpc.current.data.GetLuceneIndex; import it.cavallium.dbengine.rpc.current.data.GetSingleton; +import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers; +import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities; +import it.cavallium.dbengine.rpc.current.data.LuceneIndexStructure; +import it.cavallium.dbengine.rpc.current.data.LuceneOptions; import it.cavallium.dbengine.rpc.current.data.RPCEvent; import it.cavallium.dbengine.rpc.current.data.ServerBoundRequest; import 
 import it.cavallium.dbengine.rpc.current.data.ServerBoundResponse;
@@ -44,13 +58,13 @@ import java.io.File;
 import java.net.SocketAddress;
 import java.time.Duration;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
-import java.util.logging.Level;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks;
-import reactor.core.publisher.Sinks.Empty;
 import reactor.netty.incubator.quic.QuicClient;
 import reactor.netty.incubator.quic.QuicConnection;
@@ -144,6 +158,13 @@ public class LLQuicConnection implements LLDatabaseConnection {
 		).map(event -> (T) event);
 	}
 
+	private Mono<Void> sendEvent(ServerBoundRequest serverBoundRequest) {
+		return QuicUtils.sendSimpleEvent(quicConnection,
+				RPCEventCodec::new,
+				serverBoundRequest
+		);
+	}
+
 	private Mono sendUpdateRequest(ServerBoundRequest serverBoundReq,
 			Function updaterFunction) {
 		return Mono.empty();
@@ -293,22 +314,22 @@ public class LLQuicConnection implements LLDatabaseConnection {
 	@Override
 	public BufferAllocator getAllocator() {
-		return null;
+		return allocator;
 	}
 
 	@Override
 	public MeterRegistry getMeterRegistry() {
-		return null;
+		return meterRegistry;
 	}
 
 	@Override
 	public Mono<Void> close() {
-		return null;
+		return sendRequest(new CloseDatabase(id)).then();
 	}
 
 	@Override
 	public String getDatabaseName() {
-		return null;
+		return databaseName;
 	}
 
 	@Override
@@ -333,19 +354,109 @@ public class LLQuicConnection implements LLDatabaseConnection {
 	}
 
 	@Override
-	public Mono<? extends LLLuceneIndex> getLuceneIndex(@Nullable String clusterName,
-			@Nullable String shardName,
-			int instancesCount,
+	public Mono<? extends LLLuceneIndex> getLuceneIndex(String clusterName,
+			LuceneIndexStructure indexStructure,
 			IndicizerAnalyzers indicizerAnalyzers,
 			IndicizerSimilarities indicizerSimilarities,
 			LuceneOptions luceneOptions,
 			@Nullable LuceneHacks luceneHacks) {
-		return null;
+		return sendRequest(new GetLuceneIndex(clusterName, indexStructure, indicizerAnalyzers, indicizerSimilarities, luceneOptions))
+				.cast(GeneratedEntityId.class)
+				.map(GeneratedEntityId::id)
+				.map(id -> new LLLuceneIndex() {
+					@Override
+					public String getLuceneIndexName() {
+						return clusterName;
+					}
+
+					@Override
+					public Mono<Void> addDocument(LLTerm id, LLUpdateDocument doc) {
+						return null;
+					}
+
+					@Override
+					public Mono<Void> addDocuments(Flux<Entry<LLTerm, LLUpdateDocument>> documents) {
+						return null;
+					}
+
+					@Override
+					public Mono<Void> deleteDocument(LLTerm id) {
+						return null;
+					}
+
+					@Override
+					public Mono<Void> update(LLTerm id, LLIndexRequest request) {
+						return null;
+					}
+
+					@Override
+					public Mono<Void> updateDocuments(Flux<Entry<LLTerm, LLUpdateDocument>> documents) {
+						return null;
+					}
+
+					@Override
+					public Mono<Void> deleteAll() {
+						return null;
+					}
+
+					@Override
+					public Flux<LLSearchResultShard> moreLikeThis(@Nullable LLSnapshot snapshot,
+							QueryParams queryParams,
+							@Nullable String keyFieldName,
+							Multimap<String, String> mltDocumentFields) {
+						return null;
+					}
+
+					@Override
+					public Flux<LLSearchResultShard> search(@Nullable LLSnapshot snapshot,
+							QueryParams queryParams,
+							@Nullable String keyFieldName) {
+						return null;
+					}
+
+					@Override
+					public Mono<Buckets> computeBuckets(@Nullable LLSnapshot snapshot,
+							@NotNull List<Query> queries,
+							@Nullable Query normalizationQuery,
+							BucketParams bucketParams) {
+						return null;
+					}
+
+					@Override
+					public boolean isLowMemoryMode() {
+						return false;
+					}
+
+					@Override
+					public Mono<Void> close() {
+						return sendRequest(new CloseLuceneIndex(id)).then();
+					}
+
+					@Override
+					public Mono<Void> flush() {
+						return null;
+					}
+
+					@Override
+					public Mono<Void> refresh(boolean force) {
+						return null;
+					}
+
+					@Override
+					public Mono<LLSnapshot> takeSnapshot() {
+						return null;
+					}
+
+					@Override
+					public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
+						return null;
+					}
+				});
 	}
 
 	@Override
 	public Mono<Void> disconnect() {
-		return sendDisconnect().then(quicConnection.onDispose().timeout(Duration.ofMinutes(1)));
+		return sendDisconnect().then(Mono.fromRunnable(() -> quicConnection.dispose())).then(quicConnection.onDispose());
 	}
 
 	private Mono<Void> sendDisconnect() {
diff --git a/src/main/java/it/cavallium/dbengine/database/remote/QuicUtils.java b/src/main/java/it/cavallium/dbengine/database/remote/QuicUtils.java
index 6640837..7c351f1 100644
--- a/src/main/java/it/cavallium/dbengine/database/remote/QuicUtils.java
+++ b/src/main/java/it/cavallium/dbengine/database/remote/QuicUtils.java
@@ -10,6 +10,8 @@ import java.nio.charset.StandardCharsets;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.logging.Level;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 import org.reactivestreams.Publisher;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -38,7 +40,7 @@ public class QuicUtils {
 
 	public record QuicStream(NettyInbound in, NettyOutbound out) {}
 
-	public static Mono catchRPCErrors(Throwable error) {
+	public static Mono catchRPCErrors(@NotNull Throwable error) {
 		return Mono.just(new RPCCrash(500, NullableString.ofNullableBlank(error.getMessage())));
 	}
 
@@ -65,22 +67,27 @@ public class QuicUtils {
 	 */
 	@SuppressWarnings("unchecked")
 	public static Mono> createMappedStream(
-			QuicConnection quicConnection,
-			Supplier> sendCodec,
-			Supplier> recvCodec) {
+			@NotNull QuicConnection quicConnection,
+			@NotNull Supplier> sendCodec,
+			@Nullable Supplier> recvCodec) {
 		return Mono.defer(() -> {
 			Empty streamTerminator = Sinks.empty();
 			return QuicUtils
 					.createStream(quicConnection, streamTerminator.asMono())
 					.map(stream -> {
-						Flux inConn = Flux.defer(() -> (Flux) stream
-								.in()
-								.withConnection(conn -> conn.addHandler(recvCodec.get()))
-								.receiveObject()
-								.log("ClientBoundEvent", Level.FINEST)
-						)
-								.publish(1)
-								.refCount();
+						Flux inConn;
+						if (recvCodec == null) {
+							inConn = Flux.error(() -> new UnsupportedOperationException("Receiving responses is not supported"));
+						} else {
+							inConn = Flux.defer(() -> (Flux) stream
+									.in()
+									.withConnection(conn -> conn.addHandler(recvCodec.get()))
+									.receiveObject()
+									.log("ClientBoundEvent", Level.FINEST)
+							)
+									.publish(1)
+									.refCount();
+						}
 						return new MappedStream<>(stream.out, sendCodec, inConn, streamTerminator);
 					})
 					.single();
@@ -104,9 +111,35 @@ public class QuicUtils {
 				.then(recv)
 				.doFinally(s -> stream.close());
 			})
+			.map(QuicUtils::mapErrors)
 			.switchIfEmpty((Mono) NO_RESPONSE_ERROR);
 	}
 
+	/**
+	 * Send a single request, receive a single response
+	 */
+	public static <SEND> Mono<Void> sendSimpleEvent(QuicConnection quicConnection,
+			Supplier> sendCodec,
+			SEND req) {
+		return QuicUtils
+				.createMappedStream(quicConnection, sendCodec, null)
+				.flatMap(stream -> {
+					var send = stream.send(req).log("ServerBoundEvent", Level.FINEST);
+					return send.doFinally(s -> stream.close());
+				})
+				.map(QuicUtils::mapErrors)
+				.then();
+	}
+
+	private static <R> R mapErrors(R value) {
+		if (value instanceof RPCCrash crash) {
+			throw new RPCException(crash.code(), crash.message().orElse(null));
+		} else {
+			return value;
+		}
+	}
+
 	/**
 	 * Send n requests, receive n responses
 	 */
@@ -129,6 +162,7 @@ public class QuicUtils {
 					.zip(sends, receives, QuicUtils::extractResponse)
 					.doFinally(s -> stream.close());
 				})
+				.map(QuicUtils::mapErrors)
 				.log("ServerBoundEvent", Level.FINEST);
 	}
@@ -183,6 +217,7 @@ public class QuicUtils {
 					.merge(firstRequest, firstResponse.then(Mono.empty()), secondRequest, secondResponse)
 					.doFinally(s -> stream.close());
 				})
+				.map(QuicUtils::mapErrors)
 				.singleOrEmpty();
 	}
 }
diff --git a/src/main/java/it/cavallium/dbengine/database/remote/RPCException.java b/src/main/java/it/cavallium/dbengine/database/remote/RPCException.java
new file mode 100644
index 0000000..559941c
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/remote/RPCException.java
@@ -0,0 +1,10 @@
+package it.cavallium.dbengine.database.remote;
+
+import org.jetbrains.annotations.Nullable;
+
+public class RPCException extends RuntimeException {
+
+	public RPCException(int code, @Nullable String message) {
+		super("RPC error " + code + (message != null ? (": " + message) : ""));
+	}
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/remote/String2FieldSimilarityMapSerializer.java b/src/main/java/it/cavallium/dbengine/database/remote/String2FieldSimilarityMapSerializer.java
new file mode 100644
index 0000000..0d40f98
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/remote/String2FieldSimilarityMapSerializer.java
@@ -0,0 +1,37 @@
+package it.cavallium.dbengine.database.remote;
+
+import it.cavallium.data.generator.DataSerializer;
+import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.jetbrains.annotations.NotNull;
+
+public class String2FieldSimilarityMapSerializer implements DataSerializer<Map<String, TextFieldsSimilarity>> {
+
+	private static final TextFieldsSimilaritySerializer TEXT_FIELDS_SIMILARITY_SERIALIZER = new TextFieldsSimilaritySerializer();
+
+	@Override
+	public void serialize(DataOutput dataOutput, @NotNull Map<String, TextFieldsSimilarity> stringTextFieldsSimilarityMap)
+			throws IOException {
+		dataOutput.writeInt(stringTextFieldsSimilarityMap.size());
+		for (Entry<String, TextFieldsSimilarity> entry : stringTextFieldsSimilarityMap.entrySet()) {
+			dataOutput.writeUTF(entry.getKey());
+			TEXT_FIELDS_SIMILARITY_SERIALIZER.serialize(dataOutput, entry.getValue());
+		}
+	}
+
+	@Override
+	public @NotNull Map<String, TextFieldsSimilarity> deserialize(DataInput dataInput) throws IOException {
+		var size = dataInput.readInt();
+		var result = new HashMap<String, TextFieldsSimilarity>(size);
+		for (int i = 0; i < size; i++) {
+			result.put(dataInput.readUTF(), TEXT_FIELDS_SIMILARITY_SERIALIZER.deserialize(dataInput));
+		}
+		return Collections.unmodifiableMap(result);
+	}
+}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneRocksDBManager.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneRocksDBManager.java
new file mode 100644
index 0000000..47ff0bb
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneRocksDBManager.java
@@ -0,0 +1,45 @@
+package it.cavallium.dbengine.lucene;
+
+import it.cavallium.dbengine.lucene.directory.RocksDBInstance;
+import it.cavallium.dbengine.lucene.directory.RocksdbFileStore;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class LuceneRocksDBManager {
+
+	private static final Logger LOG = LogManager.getLogger(LuceneRocksDBManager.class);
+	private final List<Entry<Path, RocksDBInstance>> dbs = new ArrayList<>();
+
+	public synchronized RocksDBInstance getOrCreate(Path path) {
+		try {
+			for (var entry : dbs) {
+				if (Files.isSameFile(path, entry.getKey())) {
+					return entry.getValue();
+				}
+			}
+			RocksDBInstance db = RocksdbFileStore.createEmpty(path);
+			dbs.add(Map.entry(path, db));
+			return db;
+		} catch (IOException ex) {
+			throw new UnsupportedOperationException("Can't load RocksDB database at path: " + path, ex);
+		}
+	}
+
+	public synchronized void closeAll() {
+		for (Entry<Path, RocksDBInstance> db : dbs) {
+			try {
+				db.getValue().db().closeE();
+			} catch (Throwable ex) {
+				LOG.error("Failed to close lucene RocksDB database", ex);
+			}
+		}
+		dbs.clear();
+	}
+}
diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java
index b593c77..9bd0d20 100644
--- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java
+++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java
@@ -5,12 +5,11 @@ import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterrupti
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import it.cavallium.dbengine.client.CompositeSnapshot;
-import it.cavallium.dbengine.client.IndicizerAnalyzers;
-import it.cavallium.dbengine.client.IndicizerSimilarities;
 import it.cavallium.dbengine.client.query.QueryParser;
 import it.cavallium.dbengine.client.query.current.data.QueryParams;
 import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
 import it.cavallium.dbengine.database.LLKeyScore;
+import it.cavallium.dbengine.database.LLTerm;
 import it.cavallium.dbengine.database.LLUtils;
 import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
 import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
@@ -22,15 +21,27 @@ import it.cavallium.dbengine.lucene.analyzer.NCharGramEdgeAnalyzer;
 import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
 import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
 import it.cavallium.dbengine.lucene.analyzer.WordAnalyzer;
+import it.cavallium.dbengine.lucene.directory.RocksdbDirectory;
 import it.cavallium.dbengine.lucene.mlt.BigCompositeReader;
 import it.cavallium.dbengine.lucene.mlt.MultiMoreLikeThis;
 import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
 import it.cavallium.dbengine.lucene.similarity.NGramSimilarity;
+import it.cavallium.dbengine.rpc.current.data.ByteBuffersDirectory;
+import it.cavallium.dbengine.rpc.current.data.DirectIOFSDirectory;
+import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers;
+import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities;
+import it.cavallium.dbengine.rpc.current.data.LuceneDirectoryOptions;
+import it.cavallium.dbengine.rpc.current.data.MemoryMappedFSDirectory;
+import it.cavallium.dbengine.rpc.current.data.NIOFSDirectory;
+import it.cavallium.dbengine.rpc.current.data.NRTCachingDirectory;
+import it.cavallium.dbengine.rpc.current.data.RocksDBSharedDirectory;
 import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.file.Path;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -39,6 +50,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -51,7 +63,7 @@ import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexableField;
-import org.apache.lucene.queries.mlt.MoreLikeThisQuery;
+import org.apache.lucene.misc.store.DirectIODirectory;
 import org.apache.lucene.search.BooleanClause.Occur;
 import org.apache.lucene.search.BooleanQuery.Builder;
 import org.apache.lucene.search.Collector;
@@ -71,7 +83,11 @@ import org.apache.lucene.search.similarities.ClassicSimilarity;
 import org.apache.lucene.search.similarities.PerFieldSimilarityWrapper;
 import org.apache.lucene.search.similarities.Similarity;
 import org.apache.lucene.search.similarities.TFIDFSimilarity;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.Constants;
+import org.apache.lucene.util.StringHelper;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.novasearch.lucene.search.similarities.BM25Similarity;
@@ -544,4 +560,78 @@ public class LuceneUtils {
 	public static Collector withTimeout(Collector collector, Duration timeout) {
 		return new TimeLimitingCollector(collector, TimeLimitingCollector.getGlobalCounter(), timeout.toMillis());
 	}
+
+	public static String getStandardName(String clusterName, int shardIndex) {
+		return clusterName + "-shard" + shardIndex;
+	}
+
+	public static int getLuceneIndexId(LLTerm id, int totalShards) {
+		return Math.abs(StringHelper.murmurhash3_x86_32(id.getValueBytesRef(), 7) % totalShards);
+	}
+
+	public static Directory createLuceneDirectory(LuceneDirectoryOptions directoryOptions,
+			String directoryName,
+			LuceneRocksDBManager rocksDBManager)
+			throws IOException {
+		if (directoryOptions instanceof ByteBuffersDirectory) {
+			return new org.apache.lucene.store.ByteBuffersDirectory();
+		} else if (directoryOptions instanceof DirectIOFSDirectory directIOFSDirectory) {
+			FSDirectory delegateDirectory = (FSDirectory) createLuceneDirectory(directIOFSDirectory.delegate(),
+					directoryName,
+					rocksDBManager
+			);
+			if (Constants.LINUX || Constants.MAC_OS_X) {
+				try {
+					int mergeBufferSize = directIOFSDirectory.mergeBufferSize().orElse(DirectIODirectory.DEFAULT_MERGE_BUFFER_SIZE);
+					long minBytesDirect = directIOFSDirectory.minBytesDirect().orElse(DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT);
+					return new DirectIODirectory(delegateDirectory, mergeBufferSize, minBytesDirect);
+				} catch (UnsupportedOperationException ex) {
+					logger.warn("Failed to open FSDirectory with DIRECT flag", ex);
+					return delegateDirectory;
+				}
+			} else {
+				logger.warn("Failed to open FSDirectory with DIRECT flag because the operating system is Windows");
+				return delegateDirectory;
+			}
+		} else if (directoryOptions instanceof MemoryMappedFSDirectory memoryMappedFSDirectory) {
+			return FSDirectory.open(memoryMappedFSDirectory.managedPath().resolve(directoryName + ".lucene.db"));
+		} else if (directoryOptions instanceof NIOFSDirectory niofsDirectory) {
+			return org.apache.lucene.store.NIOFSDirectory.open(niofsDirectory
+					.managedPath()
+					.resolve(directoryName + ".lucene.db"));
+		} else if (directoryOptions instanceof NRTCachingDirectory nrtCachingDirectory) {
+			var delegateDirectory = createLuceneDirectory(nrtCachingDirectory.delegate(), directoryName, rocksDBManager);
+			return new org.apache.lucene.store.NRTCachingDirectory(delegateDirectory,
+					nrtCachingDirectory.maxMergeSizeBytes() / 1024D / 1024D,
+					nrtCachingDirectory.maxCachedBytes() / 1024D / 1024D
+			);
+		} else if (directoryOptions instanceof RocksDBSharedDirectory rocksDBSharedDirectory) {
+			var dbInstance = rocksDBManager.getOrCreate(rocksDBSharedDirectory.managedPath());
+			return new RocksdbDirectory(dbInstance.db(),
+					dbInstance.handles(),
+					directoryName,
+					rocksDBSharedDirectory.blockSize()
+			);
+		} else {
+			throw new UnsupportedOperationException("Unsupported directory: " + directoryName + ", " + directoryOptions);
+		}
+	}
+
+	public static Optional<Path> getManagedPath(LuceneDirectoryOptions directoryOptions) {
+		if (directoryOptions instanceof ByteBuffersDirectory) {
+			return Optional.empty();
+		} else if (directoryOptions instanceof DirectIOFSDirectory directIOFSDirectory) {
+			return getManagedPath(directIOFSDirectory.delegate());
+		} else if (directoryOptions instanceof MemoryMappedFSDirectory memoryMappedFSDirectory) {
+			return Optional.of(memoryMappedFSDirectory.managedPath());
+		} else if (directoryOptions instanceof NIOFSDirectory niofsDirectory) {
+			return Optional.of(niofsDirectory.managedPath());
+		} else if (directoryOptions instanceof NRTCachingDirectory nrtCachingDirectory) {
+			return getManagedPath(nrtCachingDirectory.delegate());
+		} else if (directoryOptions instanceof RocksDBSharedDirectory rocksDBSharedDirectory) {
+			return Optional.of(rocksDBSharedDirectory.managedPath());
+		} else {
+			throw new UnsupportedOperationException("Unsupported directory: " + directoryOptions);
+		}
+	}
 }
diff --git a/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java b/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java
index bc30e74..2ad028b 100644
--- a/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java
+++ b/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java
@@ -11,16 +11,17 @@ import it.cavallium.dbengine.DbTestUtils.TempDb;
 import it.cavallium.dbengine.DbTestUtils.TestAllocator;
 import it.cavallium.dbengine.client.IndicizerAnalyzers;
 import it.cavallium.dbengine.client.IndicizerSimilarities;
-import it.cavallium.dbengine.client.LuceneDirectoryOptions;
-import it.cavallium.dbengine.client.LuceneDirectoryOptions.ByteBuffersDirectory;
-import it.cavallium.dbengine.client.LuceneOptions;
 import it.cavallium.dbengine.database.ColumnUtils;
 import it.cavallium.dbengine.database.LLKeyValueDatabase;
 import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
 import it.cavallium.dbengine.lucene.LuceneHacks;
 import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
 import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
+import it.cavallium.dbengine.rpc.current.data.ByteBuffersDirectory;
 import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
+import it.cavallium.dbengine.rpc.current.data.LuceneIndexStructure;
+import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
+import it.unimi.dsi.fastutil.ints.IntList;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -28,7 +29,6 @@ import java.time.Duration;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import reactor.core.publisher.Mono;
@@ -70,7 +70,12 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator {
 					return null;
 				})
 				.subscribeOn(Schedulers.boundedElastic())
-				.then(new LLLocalDatabaseConnection(allocator.allocator(), new SimpleMeterRegistry(), wrkspcPath, true).connect())
+				.then(new LLLocalDatabaseConnection(allocator.allocator(),
+						new SimpleMeterRegistry(),
+						wrkspcPath,
+						true,
+						null
+				).connect())
 				.flatMap(conn -> {
 					SwappableLuceneSearcher searcher = new SwappableLuceneSearcher();
 					var luceneHacks = new LuceneHacks(() -> searcher, () -> searcher);
@@ -91,17 +96,15 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator {
 									Nullableboolean.empty()
 							)
 					),
-					conn.getLuceneIndex(null,
-							"testluceneindex1",
-							1,
+					conn.getLuceneIndex("testluceneindex1",
+							new LuceneIndexStructure(1, IntList.of(0)),
 							IndicizerAnalyzers.of(TextFieldsAnalyzer.ICUCollationKey),
 							IndicizerSimilarities.of(TextFieldsSimilarity.Boolean),
 							LUCENE_OPTS,
 							luceneHacks
 					),
 					conn.getLuceneIndex("testluceneindex16",
-							null,
-							3,
+							new LuceneIndexStructure(3, IntList.of(0, 1, 2)),
 							IndicizerAnalyzers.of(TextFieldsAnalyzer.ICUCollationKey),
 							IndicizerSimilarities.of(TextFieldsSimilarity.Boolean),
 							LUCENE_OPTS,
diff --git a/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java b/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java
index 56ebf47..c98a2f6 100644
--- a/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java
+++ b/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java
@@ -10,14 +10,16 @@ import it.cavallium.dbengine.DbTestUtils.TempDb;
 import it.cavallium.dbengine.DbTestUtils.TestAllocator;
 import it.cavallium.dbengine.client.IndicizerAnalyzers;
 import it.cavallium.dbengine.client.IndicizerSimilarities;
-import it.cavallium.dbengine.client.LuceneDirectoryOptions.ByteBuffersDirectory;
-import it.cavallium.dbengine.client.LuceneOptions;
 import it.cavallium.dbengine.database.ColumnUtils;
 import it.cavallium.dbengine.database.memory.LLMemoryDatabaseConnection;
 import it.cavallium.dbengine.lucene.LuceneHacks;
 import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
 import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
+import it.cavallium.dbengine.rpc.current.data.ByteBuffersDirectory;
 import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
+import it.cavallium.dbengine.rpc.current.data.LuceneIndexStructure;
+import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
+import it.unimi.dsi.fastutil.ints.IntList;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
@@ -63,17 +65,15 @@ public class MemoryTemporaryDbGenerator implements TemporaryDbGenerator {
 									Nullableboolean.empty()
 							)
 					),
-					conn.getLuceneIndex(null,
-							"testluceneindex1",
-							1,
+					conn.getLuceneIndex("testluceneindex1",
+							new LuceneIndexStructure(1, IntList.of(0)),
 							IndicizerAnalyzers.of(TextFieldsAnalyzer.ICUCollationKey),
 							IndicizerSimilarities.of(TextFieldsSimilarity.Boolean),
 							LUCENE_OPTS,
 							luceneHacks
 					),
 					conn.getLuceneIndex("testluceneindex16",
-							null,
-							3,
+							new LuceneIndexStructure(3, IntList.of(0, 1, 2)),
 							IndicizerAnalyzers.of(TextFieldsAnalyzer.ICUCollationKey),
 							IndicizerSimilarities.of(TextFieldsSimilarity.Boolean),
 							LUCENE_OPTS,
diff --git a/src/test/java/it/cavallium/dbengine/StringIndicizer.java b/src/test/java/it/cavallium/dbengine/StringIndicizer.java
index 551afe4..faba88e 100644
--- a/src/test/java/it/cavallium/dbengine/StringIndicizer.java
+++ b/src/test/java/it/cavallium/dbengine/StringIndicizer.java
@@ -3,13 +3,13 @@ package it.cavallium.dbengine;
 
 import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
 import it.cavallium.dbengine.client.Indicizer;
-import it.cavallium.dbengine.client.IndicizerAnalyzers;
-import it.cavallium.dbengine.client.IndicizerSimilarities;
 import it.cavallium.dbengine.database.LLUpdateDocument;
 import it.cavallium.dbengine.database.LLItem;
 import it.cavallium.dbengine.database.LLTerm;
 import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
 import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
+import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers;
+import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities;
 import java.util.LinkedList;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.Field.Store;
@@ -58,11 +58,11 @@ public class StringIndicizer extends Indicizer {
 
 	@Override
 	public IndicizerAnalyzers getPerFieldAnalyzer() {
-		return IndicizerAnalyzers.of(TextFieldsAnalyzer.ICUCollationKey);
+		return it.cavallium.dbengine.client.IndicizerAnalyzers.of(TextFieldsAnalyzer.ICUCollationKey);
	}
 
 	@Override
 	public IndicizerSimilarities getPerFieldSimilarity() {
-		return IndicizerSimilarities.of(TextFieldsSimilarity.BM25Standard);
+		return it.cavallium.dbengine.client.IndicizerSimilarities.of(TextFieldsSimilarity.BM25Standard);
 	}
 }
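
For reference, the reshaped getLuceneIndex entry point changed above is driven by a LuceneIndexStructure (total shard count plus the shard ids hosted by this node) instead of the old (clusterName, shardName, instancesCount) triple. The following caller-side sketch is not part of the patch; it mirrors the updated test generators and assumes an already-connected LLDatabaseConnection ("conn") and an existing rpc LuceneOptions value ("luceneOptions"), both hypothetical placeholders, passing null for the optional LuceneHacks.

import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import it.cavallium.dbengine.rpc.current.data.LuceneIndexStructure;
import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
import it.unimi.dsi.fastutil.ints.IntList;

class GetLuceneIndexSketch {

	// Request a 3-shard cluster named "testluceneindex16" where this node hosts shards 0, 1 and 2.
	void openShardedIndex(LLDatabaseConnection conn, LuceneOptions luceneOptions) {
		var indexMono = conn.getLuceneIndex("testluceneindex16",
				new LuceneIndexStructure(3, IntList.of(0, 1, 2)),
				IndicizerAnalyzers.of(TextFieldsAnalyzer.ICUCollationKey),
				IndicizerSimilarities.of(TextFieldsSimilarity.Boolean),
				luceneOptions,
				null // LuceneHacks is @Nullable
		);
		// Subscribing performs the connection-specific work: a local open for disk/memory
		// connections, or a GetLuceneIndex RPC for the QUIC connection.
		indexMono.subscribe();
	}
}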