2020-12-07 22:15:18 +01:00
|
|
|
package it.cavallium.dbengine.database.disk;
|
|
|
|
|
2021-12-16 16:14:44 +01:00
|
|
|
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
|
|
|
|
|
2022-01-28 19:31:25 +01:00
|
|
|
import com.google.common.collect.Multimap;
|
2021-10-30 11:13:46 +02:00
|
|
|
import io.micrometer.core.instrument.MeterRegistry;
|
2022-03-16 13:47:56 +01:00
|
|
|
import io.netty5.buffer.api.Send;
|
2021-11-19 19:03:31 +01:00
|
|
|
import it.cavallium.dbengine.client.query.QueryParser;
|
|
|
|
import it.cavallium.dbengine.client.query.current.data.Query;
|
2021-03-02 01:53:36 +01:00
|
|
|
import it.cavallium.dbengine.client.query.current.data.QueryParams;
|
2021-11-07 17:46:40 +01:00
|
|
|
import it.cavallium.dbengine.database.LLIndexRequest;
|
2021-01-24 03:15:05 +01:00
|
|
|
import it.cavallium.dbengine.database.LLLuceneIndex;
|
2021-03-27 03:35:27 +01:00
|
|
|
import it.cavallium.dbengine.database.LLSearchResultShard;
|
2021-01-24 03:15:05 +01:00
|
|
|
import it.cavallium.dbengine.database.LLSnapshot;
|
|
|
|
import it.cavallium.dbengine.database.LLTerm;
|
2022-01-28 19:31:25 +01:00
|
|
|
import it.cavallium.dbengine.database.LLUpdateDocument;
|
2022-01-26 14:22:54 +01:00
|
|
|
import it.cavallium.dbengine.database.LLUtils;
|
2021-10-13 12:25:32 +02:00
|
|
|
import it.cavallium.dbengine.lucene.LuceneHacks;
|
2022-03-05 15:46:40 +01:00
|
|
|
import it.cavallium.dbengine.lucene.LuceneRocksDBManager;
|
2021-07-06 01:30:37 +02:00
|
|
|
import it.cavallium.dbengine.lucene.LuceneUtils;
|
2021-11-19 19:03:31 +01:00
|
|
|
import it.cavallium.dbengine.lucene.collector.Buckets;
|
2022-01-28 19:31:25 +01:00
|
|
|
import it.cavallium.dbengine.lucene.mlt.MoreLikeThisTransformer;
|
2021-10-13 12:25:32 +02:00
|
|
|
import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher;
|
2021-11-18 17:13:53 +01:00
|
|
|
import it.cavallium.dbengine.lucene.searcher.BucketParams;
|
|
|
|
import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher;
|
2022-01-28 21:12:10 +01:00
|
|
|
import it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite;
|
2021-07-06 01:30:37 +02:00
|
|
|
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
|
2021-10-13 12:25:32 +02:00
|
|
|
import it.cavallium.dbengine.lucene.searcher.MultiSearcher;
|
2022-03-05 15:46:40 +01:00
|
|
|
import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers;
|
|
|
|
import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities;
|
|
|
|
import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
|
|
|
|
import it.unimi.dsi.fastutil.ints.IntList;
|
2021-10-13 00:23:56 +02:00
|
|
|
import java.io.Closeable;
|
2020-12-07 22:15:18 +01:00
|
|
|
import java.io.IOException;
|
2021-05-11 21:59:05 +02:00
|
|
|
import java.util.ArrayList;
|
2022-03-05 15:46:40 +01:00
|
|
|
import java.util.HashSet;
|
2021-05-28 16:04:59 +02:00
|
|
|
import java.util.List;
|
2021-05-11 21:59:05 +02:00
|
|
|
import java.util.Map;
|
2021-05-28 16:04:59 +02:00
|
|
|
import java.util.Map.Entry;
|
2022-03-05 15:46:40 +01:00
|
|
|
import java.util.Objects;
|
2021-02-03 13:48:30 +01:00
|
|
|
import java.util.Optional;
|
2021-02-04 22:42:57 +01:00
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
2020-12-07 22:15:18 +01:00
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
2022-03-19 00:08:23 +01:00
|
|
|
import java.util.logging.Level;
|
2022-03-13 11:01:51 +01:00
|
|
|
import java.util.stream.Collectors;
|
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
|
import org.apache.logging.log4j.Logger;
|
2021-09-20 11:52:21 +02:00
|
|
|
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
|
|
|
|
import org.apache.lucene.search.similarities.PerFieldSimilarityWrapper;
|
2021-11-19 19:03:31 +01:00
|
|
|
import org.jetbrains.annotations.NotNull;
|
2020-12-07 22:15:18 +01:00
|
|
|
import org.jetbrains.annotations.Nullable;
|
|
|
|
import reactor.core.publisher.Flux;
|
|
|
|
import reactor.core.publisher.Mono;
|
2022-03-19 00:08:23 +01:00
|
|
|
import reactor.core.publisher.SignalType;
|
2021-10-13 00:23:56 +02:00
|
|
|
import reactor.core.scheduler.Schedulers;
|
2020-12-07 22:15:18 +01:00
|
|
|
|
|
|
|
public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
|
|
|
|
2022-03-13 11:01:51 +01:00
|
|
|
/** Logger of this class. */
private static final Logger LOG = LogManager.getLogger(LLLocalMultiLuceneIndex.class);

/**
 * When {@code true}, the bulk add/update methods split the input into fixed-size batches instead of
 * using Reactor's {@code groupBy}. Read once from the system property
 * {@code it.cavallium.dbengine.bypassGroupByBug}; defaults to {@code false}.
 */
private static final boolean BYPASS_GROUPBY_BUG = Boolean.parseBoolean(System.getProperty(
		"it.cavallium.dbengine.bypassGroupByBug",
		"false"
));

static {
	LLUtils.initHooks();
}

/** Name given at construction; returned by {@link #getLuceneIndexName()}. */
private final String clusterName;
/** Value of {@code luceneOptions.lowMemory()} at construction time. */
private final boolean lowMemory;
/** Meter registry received at construction and handed to every shard. */
private final MeterRegistry meterRegistry;
/** Multi-shard snapshot sequence number -> snapshots taken on the single shards. */
private final ConcurrentHashMap<Long, List<LLSnapshot>> registeredSnapshots = new ConcurrentHashMap<>();
/** Sequence number assigned to the next multi-shard snapshot; the first one is 1. */
private final AtomicLong nextSnapshotNumber = new AtomicLong(1);
/** Shards indexed by shard id; slots of shards that are not active in this instance are {@code null}. */
private final LLLocalLuceneIndex[] luceneIndicesById;
/** The active (non-null) shards; list positions match the positions inside registered snapshot lists. */
private final List<LLLocalLuceneIndex> luceneIndicesSet;
/** Total number of shards of the index, active or not. */
private final int totalShards;
/** Reactive view over the active shards. */
private final Flux<LLLocalLuceneIndex> luceneIndicesFlux;
/** Per-field analyzer built from the configured analyzers. */
private final PerFieldAnalyzerWrapper luceneAnalyzer;
/** Per-field similarity built from the configured similarities. */
private final PerFieldSimilarityWrapper luceneSimilarity;

/** Searcher used to run a query on all the shards and merge the results. */
private final MultiSearcher multiSearcher;
/** Searcher used by {@code computeBuckets}. */
private final DecimalBucketMultiSearcher decimalBucketMultiSearcher = new DecimalBucketMultiSearcher();
|
2021-06-25 20:07:19 +02:00
|
|
|
|
2022-04-06 02:41:32 +02:00
|
|
|
public LLLocalMultiLuceneIndex(LLTempHugePqEnv env,
|
2021-10-30 11:13:46 +02:00
|
|
|
MeterRegistry meterRegistry,
|
2021-12-30 17:28:06 +01:00
|
|
|
String clusterName,
|
2022-03-05 15:46:40 +01:00
|
|
|
IntList activeShards,
|
|
|
|
int totalShards,
|
2021-05-28 16:04:59 +02:00
|
|
|
IndicizerAnalyzers indicizerAnalyzers,
|
|
|
|
IndicizerSimilarities indicizerSimilarities,
|
2021-10-13 00:23:56 +02:00
|
|
|
LuceneOptions luceneOptions,
|
2022-03-05 15:46:40 +01:00
|
|
|
@Nullable LuceneHacks luceneHacks,
|
|
|
|
LuceneRocksDBManager rocksDBManager) throws IOException {
|
2020-12-07 22:15:18 +01:00
|
|
|
|
2022-03-05 15:46:40 +01:00
|
|
|
if (totalShards <= 1 || totalShards > 100) {
|
|
|
|
throw new IOException("Unsupported instances count: " + totalShards);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
2021-10-30 11:13:46 +02:00
|
|
|
this.meterRegistry = meterRegistry;
|
2022-03-05 15:46:40 +01:00
|
|
|
LLLocalLuceneIndex[] luceneIndices = new LLLocalLuceneIndex[totalShards];
|
|
|
|
for (int i = 0; i < totalShards; i++) {
|
|
|
|
if (!activeShards.contains(i)) {
|
|
|
|
continue;
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
2021-11-21 12:31:23 +01:00
|
|
|
luceneIndices[i] = new LLLocalLuceneIndex(env,
|
2021-10-30 11:13:46 +02:00
|
|
|
meterRegistry,
|
2021-12-30 17:28:06 +01:00
|
|
|
clusterName,
|
2022-03-05 15:46:40 +01:00
|
|
|
i,
|
2021-05-28 16:04:59 +02:00
|
|
|
indicizerAnalyzers,
|
|
|
|
indicizerSimilarities,
|
2021-10-13 00:23:56 +02:00
|
|
|
luceneOptions,
|
2022-03-05 15:46:40 +01:00
|
|
|
luceneHacks,
|
|
|
|
rocksDBManager
|
2020-12-07 22:15:18 +01:00
|
|
|
);
|
|
|
|
}
|
2022-03-05 15:46:40 +01:00
|
|
|
this.clusterName = clusterName;
|
|
|
|
this.totalShards = totalShards;
|
|
|
|
this.luceneIndicesById = luceneIndices;
|
|
|
|
var luceneIndicesSet = new HashSet<LLLocalLuceneIndex>();
|
|
|
|
for (var luceneIndex : luceneIndices) {
|
|
|
|
if (luceneIndex != null) {
|
|
|
|
luceneIndicesSet.add(luceneIndex);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
this.luceneIndicesSet = new ArrayList<>(luceneIndicesSet);
|
|
|
|
this.luceneIndicesFlux = Flux.fromIterable(luceneIndicesSet);
|
2021-09-20 11:52:21 +02:00
|
|
|
this.luceneAnalyzer = LuceneUtils.toPerFieldAnalyzerWrapper(indicizerAnalyzers);
|
|
|
|
this.luceneSimilarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities);
|
2022-03-05 15:46:40 +01:00
|
|
|
this.lowMemory = luceneOptions.lowMemory();
|
2021-10-13 00:23:56 +02:00
|
|
|
|
2022-04-06 02:41:32 +02:00
|
|
|
var useHugePq = luceneOptions.allowNonVolatileCollection();
|
2021-12-12 23:40:30 +01:00
|
|
|
var maxInMemoryResultEntries = luceneOptions.maxInMemoryResultEntries();
|
2021-10-13 00:23:56 +02:00
|
|
|
if (luceneHacks != null && luceneHacks.customMultiSearcher() != null) {
|
|
|
|
multiSearcher = luceneHacks.customMultiSearcher().get();
|
|
|
|
} else {
|
2022-04-06 02:41:32 +02:00
|
|
|
multiSearcher = new AdaptiveMultiSearcher(env, useHugePq, maxInMemoryResultEntries);
|
2021-10-13 00:23:56 +02:00
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
private LLLocalLuceneIndex getLuceneIndex(LLTerm id) {
|
2022-03-05 15:46:40 +01:00
|
|
|
return Objects.requireNonNull(luceneIndicesById[LuceneUtils.getLuceneIndexId(id, totalShards)]);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public String getLuceneIndexName() {
|
2022-03-05 15:46:40 +01:00
|
|
|
return clusterName;
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
2022-06-14 13:10:38 +02:00
|
|
|
private Mono<LLIndexSearchers> getIndexSearchers(LLSnapshot snapshot) {
|
2022-06-14 17:46:49 +02:00
|
|
|
return luceneIndicesFlux.index()
|
2021-09-19 19:59:37 +02:00
|
|
|
// Resolve the snapshot of each shard
|
|
|
|
.flatMap(tuple -> Mono
|
|
|
|
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
|
2021-09-22 11:03:39 +02:00
|
|
|
.flatMap(luceneSnapshot -> tuple.getT2().retrieveSearcher(luceneSnapshot.orElse(null)))
|
2021-09-20 11:35:01 +02:00
|
|
|
)
|
|
|
|
.collectList()
|
2022-06-14 17:46:49 +02:00
|
|
|
.doOnDiscard(LLIndexSearcher.class, indexSearcher -> {
|
|
|
|
try {
|
|
|
|
indexSearcher.close();
|
|
|
|
} catch (IOException ex) {
|
|
|
|
LOG.error("Failed to close an index searcher", ex);
|
|
|
|
}
|
|
|
|
})
|
2022-06-14 13:10:38 +02:00
|
|
|
.map(LLIndexSearchers::of);
|
2021-09-19 19:59:37 +02:00
|
|
|
}
|
|
|
|
|
2020-12-07 22:15:18 +01:00
|
|
|
@Override
|
2021-11-07 17:46:40 +01:00
|
|
|
public Mono<Void> addDocument(LLTerm id, LLUpdateDocument doc) {
|
2021-01-30 01:41:04 +01:00
|
|
|
return getLuceneIndex(id).addDocument(id, doc);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-03-19 00:08:23 +01:00
|
|
|
public Mono<Long> addDocuments(boolean atomic, Flux<Entry<LLTerm, LLUpdateDocument>> documents) {
|
2022-03-13 11:01:51 +01:00
|
|
|
if (BYPASS_GROUPBY_BUG) {
|
|
|
|
return documents
|
|
|
|
.buffer(8192)
|
|
|
|
.flatMap(inputEntries -> {
|
|
|
|
List<Entry<LLTerm, LLUpdateDocument>>[] sortedEntries = new List[totalShards];
|
2022-03-19 00:08:23 +01:00
|
|
|
Mono<Long>[] results = new Mono[totalShards];
|
2022-03-13 11:01:51 +01:00
|
|
|
|
|
|
|
// Sort entries
|
|
|
|
for(var inputEntry : inputEntries) {
|
|
|
|
int luceneIndexId = LuceneUtils.getLuceneIndexId(inputEntry.getKey(), totalShards);
|
|
|
|
if (sortedEntries[luceneIndexId] == null) {
|
|
|
|
sortedEntries[luceneIndexId] = new ArrayList<>();
|
|
|
|
}
|
|
|
|
sortedEntries[luceneIndexId].add(inputEntry);
|
|
|
|
}
|
|
|
|
|
|
|
|
// Add documents
|
|
|
|
int luceneIndexId = 0;
|
|
|
|
for (List<Entry<LLTerm, LLUpdateDocument>> docs : sortedEntries) {
|
|
|
|
if (docs != null && !docs.isEmpty()) {
|
|
|
|
LLLocalLuceneIndex luceneIndex = Objects.requireNonNull(luceneIndicesById[luceneIndexId]);
|
|
|
|
results[luceneIndexId] = luceneIndex.addDocuments(atomic, Flux.fromIterable(docs));
|
|
|
|
} else {
|
|
|
|
results[luceneIndexId] = Mono.empty();
|
|
|
|
}
|
|
|
|
luceneIndexId++;
|
|
|
|
}
|
|
|
|
|
2022-03-19 00:08:23 +01:00
|
|
|
return Flux.merge(results).reduce(0L, Long::sum);
|
2022-03-13 11:01:51 +01:00
|
|
|
})
|
2022-03-19 00:08:23 +01:00
|
|
|
.reduce(0L, Long::sum);
|
2022-03-13 11:01:51 +01:00
|
|
|
} else {
|
|
|
|
return documents
|
|
|
|
.groupBy(term -> getLuceneIndex(term.getKey()))
|
|
|
|
.flatMap(group -> group.key().addDocuments(atomic, group))
|
2022-03-19 00:08:23 +01:00
|
|
|
.reduce(0L, Long::sum);
|
2022-03-13 11:01:51 +01:00
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-30 01:41:04 +01:00
|
|
|
public Mono<Void> deleteDocument(LLTerm id) {
|
|
|
|
return getLuceneIndex(id).deleteDocument(id);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-11-07 17:46:40 +01:00
|
|
|
public Mono<Void> update(LLTerm id, LLIndexRequest request) {
|
|
|
|
return getLuceneIndex(id).update(id, request);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-03-19 00:08:23 +01:00
|
|
|
public Mono<Long> updateDocuments(Flux<Entry<LLTerm, LLUpdateDocument>> documents) {
|
|
|
|
documents = documents
|
|
|
|
.log("local-multi-update-documents", Level.FINEST, false, SignalType.ON_NEXT, SignalType.ON_COMPLETE);
|
2022-03-13 11:01:51 +01:00
|
|
|
if (BYPASS_GROUPBY_BUG) {
|
2022-03-18 19:16:06 +01:00
|
|
|
int bufferSize = 8192;
|
2022-03-13 11:01:51 +01:00
|
|
|
return documents
|
2022-03-18 19:16:06 +01:00
|
|
|
.window(bufferSize)
|
|
|
|
.flatMap(bufferFlux -> bufferFlux
|
|
|
|
.collect(Collectors.groupingBy(inputEntry -> LuceneUtils.getLuceneIndexId(inputEntry.getKey(), totalShards),
|
|
|
|
Collectors.collectingAndThen(Collectors.toList(), docs -> {
|
2022-03-19 00:08:23 +01:00
|
|
|
var luceneIndex = getLuceneIndex(docs.get(0).getKey());
|
2022-03-18 19:16:06 +01:00
|
|
|
return luceneIndex.updateDocuments(Flux.fromIterable(docs));
|
|
|
|
}))
|
|
|
|
)
|
|
|
|
.map(Map::values)
|
2022-03-19 00:08:23 +01:00
|
|
|
.flatMap(parts -> Flux.merge(parts).reduce(0L, Long::sum))
|
2022-03-18 19:16:06 +01:00
|
|
|
)
|
2022-03-19 00:08:23 +01:00
|
|
|
.reduce(0L, Long::sum);
|
2022-03-13 11:01:51 +01:00
|
|
|
} else {
|
|
|
|
return documents
|
|
|
|
.groupBy(term -> getLuceneIndex(term.getKey()))
|
|
|
|
.flatMap(group -> group.key().updateDocuments(group))
|
2022-03-19 00:08:23 +01:00
|
|
|
.reduce(0L, Long::sum);
|
2022-03-13 11:01:51 +01:00
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-30 01:41:04 +01:00
|
|
|
public Mono<Void> deleteAll() {
|
2022-03-05 15:46:40 +01:00
|
|
|
return luceneIndicesFlux
|
2021-01-30 01:41:04 +01:00
|
|
|
.flatMap(LLLocalLuceneIndex::deleteAll)
|
|
|
|
.then();
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
private LLSnapshot resolveSnapshot(LLSnapshot multiSnapshot, int instanceId) {
|
|
|
|
if (multiSnapshot != null) {
|
2022-03-05 15:46:40 +01:00
|
|
|
return registeredSnapshots.get(multiSnapshot.getSequenceNumber()).get(instanceId);
|
2020-12-07 22:15:18 +01:00
|
|
|
} else {
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-02-03 13:48:30 +01:00
|
|
|
private Optional<LLSnapshot> resolveSnapshotOptional(LLSnapshot multiSnapshot, int instanceId) {
|
|
|
|
return Optional.ofNullable(resolveSnapshot(multiSnapshot, instanceId));
|
|
|
|
}
|
|
|
|
|
2020-12-07 22:15:18 +01:00
|
|
|
@Override
|
2022-03-05 15:46:40 +01:00
|
|
|
public Flux<LLSearchResultShard> moreLikeThis(@Nullable LLSnapshot snapshot,
|
2021-03-02 01:53:36 +01:00
|
|
|
QueryParams queryParams,
|
|
|
|
String keyFieldName,
|
2022-01-28 19:31:25 +01:00
|
|
|
Multimap<String, String> mltDocumentFields) {
|
2021-11-16 23:19:23 +01:00
|
|
|
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer);
|
2021-09-20 11:35:01 +02:00
|
|
|
var searchers = this.getIndexSearchers(snapshot);
|
2022-01-28 19:31:25 +01:00
|
|
|
var transformer = new MoreLikeThisTransformer(mltDocumentFields, luceneAnalyzer, luceneSimilarity);
|
2021-09-19 19:59:37 +02:00
|
|
|
|
|
|
|
// Collect all the shards results into a single global result
|
|
|
|
return multiSearcher
|
2021-09-20 11:35:01 +02:00
|
|
|
.collectMulti(searchers, localQueryParams, keyFieldName, transformer)
|
2021-09-19 19:59:37 +02:00
|
|
|
// Transform the result type
|
2022-03-05 15:46:40 +01:00
|
|
|
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close))
|
|
|
|
.flux();
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-03-05 15:46:40 +01:00
|
|
|
public Flux<LLSearchResultShard> search(@Nullable LLSnapshot snapshot,
|
2021-03-02 01:53:36 +01:00
|
|
|
QueryParams queryParams,
|
2022-02-26 03:28:20 +01:00
|
|
|
@Nullable String keyFieldName) {
|
2021-11-16 23:19:23 +01:00
|
|
|
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer);
|
2021-09-20 11:35:01 +02:00
|
|
|
var searchers = getIndexSearchers(snapshot);
|
2021-04-14 02:37:03 +02:00
|
|
|
|
2021-09-19 19:59:37 +02:00
|
|
|
// Collect all the shards results into a single global result
|
2021-07-06 00:30:14 +02:00
|
|
|
return multiSearcher
|
2022-01-28 21:12:10 +01:00
|
|
|
.collectMulti(searchers, localQueryParams, keyFieldName, GlobalQueryRewrite.NO_REWRITE)
|
2021-09-19 19:59:37 +02:00
|
|
|
// Transform the result type
|
2022-03-05 15:46:40 +01:00
|
|
|
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close))
|
|
|
|
.flux();
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
2021-11-18 17:13:53 +01:00
|
|
|
@Override
|
2021-11-19 19:03:31 +01:00
|
|
|
public Mono<Buckets> computeBuckets(@Nullable LLSnapshot snapshot,
|
|
|
|
@NotNull List<Query> queries,
|
|
|
|
@Nullable Query normalizationQuery,
|
2021-11-18 17:13:53 +01:00
|
|
|
BucketParams bucketParams) {
|
2021-11-19 19:03:31 +01:00
|
|
|
List<org.apache.lucene.search.Query> localQueries = new ArrayList<>(queries.size());
|
|
|
|
for (Query query : queries) {
|
|
|
|
localQueries.add(QueryParser.toQuery(query, luceneAnalyzer));
|
|
|
|
}
|
|
|
|
var localNormalizationQuery = QueryParser.toQuery(normalizationQuery, luceneAnalyzer);
|
2021-11-18 17:13:53 +01:00
|
|
|
var searchers = getIndexSearchers(snapshot);
|
|
|
|
|
|
|
|
// Collect all the shards results into a single global result
|
2022-01-26 14:22:54 +01:00
|
|
|
return decimalBucketMultiSearcher.collectMulti(searchers, bucketParams, localQueries, localNormalizationQuery);
|
2021-11-18 17:13:53 +01:00
|
|
|
}
|
|
|
|
|
2020-12-07 22:15:18 +01:00
|
|
|
@Override
|
2021-01-30 01:41:04 +01:00
|
|
|
public Mono<Void> close() {
|
2022-03-05 15:46:40 +01:00
|
|
|
return luceneIndicesFlux
|
2021-01-30 01:41:04 +01:00
|
|
|
.flatMap(LLLocalLuceneIndex::close)
|
2021-10-13 00:23:56 +02:00
|
|
|
.then(Mono.fromCallable(() -> {
|
|
|
|
if (multiSearcher instanceof Closeable closeable) {
|
2021-12-16 16:14:44 +01:00
|
|
|
//noinspection BlockingMethodInNonBlockingContext
|
2021-10-13 00:23:56 +02:00
|
|
|
closeable.close();
|
|
|
|
}
|
|
|
|
return null;
|
2021-12-16 16:14:44 +01:00
|
|
|
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())))
|
|
|
|
.publishOn(Schedulers.parallel())
|
2021-01-30 01:41:04 +01:00
|
|
|
.then();
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
2021-02-03 13:48:30 +01:00
|
|
|
@Override
|
|
|
|
public Mono<Void> flush() {
|
2022-03-05 15:46:40 +01:00
|
|
|
return luceneIndicesFlux
|
2021-02-03 13:48:30 +01:00
|
|
|
.flatMap(LLLocalLuceneIndex::flush)
|
|
|
|
.then();
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-07-18 19:37:24 +02:00
|
|
|
public Mono<Void> refresh(boolean force) {
|
2022-03-05 15:46:40 +01:00
|
|
|
return luceneIndicesFlux
|
2021-07-18 19:37:24 +02:00
|
|
|
.flatMap(index -> index.refresh(force))
|
2021-02-03 13:48:30 +01:00
|
|
|
.then();
|
|
|
|
}
|
|
|
|
|
2020-12-07 22:15:18 +01:00
|
|
|
@Override
|
2021-01-30 01:41:04 +01:00
|
|
|
public Mono<LLSnapshot> takeSnapshot() {
|
|
|
|
return Mono
|
2021-06-25 20:07:19 +02:00
|
|
|
// Generate next snapshot index
|
|
|
|
.fromCallable(nextSnapshotNumber::getAndIncrement)
|
2022-03-05 15:46:40 +01:00
|
|
|
.flatMap(snapshotIndex -> luceneIndicesFlux
|
2021-06-25 20:07:19 +02:00
|
|
|
.flatMapSequential(LLLocalLuceneIndex::takeSnapshot)
|
|
|
|
.collectList()
|
|
|
|
.doOnNext(instancesSnapshotsArray -> registeredSnapshots.put(snapshotIndex, instancesSnapshotsArray))
|
|
|
|
.thenReturn(new LLSnapshot(snapshotIndex))
|
|
|
|
);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-30 01:41:04 +01:00
|
|
|
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
|
|
|
|
return Mono
|
2021-06-25 20:07:19 +02:00
|
|
|
.fromCallable(() -> registeredSnapshots.remove(snapshot.getSequenceNumber()))
|
2022-03-05 15:46:40 +01:00
|
|
|
.flatMapIterable(list -> list)
|
2021-06-25 20:07:19 +02:00
|
|
|
.index()
|
|
|
|
.flatMapSequential(tuple -> {
|
|
|
|
int index = (int) (long) tuple.getT1();
|
|
|
|
LLSnapshot instanceSnapshot = tuple.getT2();
|
2022-03-05 15:46:40 +01:00
|
|
|
return luceneIndicesSet.get(index).releaseSnapshot(instanceSnapshot);
|
2021-01-30 01:41:04 +01:00
|
|
|
})
|
2021-06-25 20:07:19 +02:00
|
|
|
.then();
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public boolean isLowMemoryMode() {
|
2022-03-05 15:46:40 +01:00
|
|
|
return lowMemory;
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
}
|