package it.cavallium.dbengine.client; import it.cavallium.dbengine.client.Hits.CloseableHits; import it.cavallium.dbengine.client.Hits.LuceneHits; import it.cavallium.dbengine.client.query.ClientQueryParams; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLSearchResultShard; import it.cavallium.dbengine.database.LLSearchResultShard.LuceneLLSearchResultShard; import it.cavallium.dbengine.database.LLSearchResultShard.ResourcesLLSearchResultShard; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.SafeCloseable; import it.cavallium.dbengine.lucene.LuceneCloseable; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.searcher.BucketParams; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.function.Function; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; public class LuceneIndexImpl implements LuceneIndex { private static final Duration MAX_COUNT_TIME = Duration.ofSeconds(30); private static final Logger LOG = LogManager.getLogger(LuceneIndex.class); private final LLLuceneIndex luceneIndex; private final Indicizer indicizer; public LuceneIndexImpl(LLLuceneIndex luceneIndex, Indicizer indicizer) { this.luceneIndex = luceneIndex; this.indicizer = indicizer; } private LLSnapshot resolveSnapshot(CompositeSnapshot snapshot) { if (snapshot == null) { return null; } else { return snapshot.getSnapshot(luceneIndex); } } @Override public void addDocument(T key, U value) { luceneIndex.addDocument(indicizer.toIndex(key), indicizer.toDocument(key, value)); } @Override public long addDocuments(boolean atomic, Stream> entries) { return luceneIndex.addDocuments(atomic, entries.map(entry -> Map.entry(indicizer.toIndex(entry.getKey()), indicizer.toDocument(entry.getKey(), entry.getValue())))); } @Override public void deleteDocument(T key) { LLTerm id = indicizer.toIndex(key); luceneIndex.deleteDocument(id); } @Override public void updateDocument(T key, @NotNull U value) { luceneIndex.update(indicizer.toIndex(key), indicizer.toIndexRequest(key, value)); } @Override public long updateDocuments(Stream> entries) { return luceneIndex.updateDocuments(entries.map(entry -> Map.entry(indicizer.toIndex(entry.getKey()), indicizer.toDocument(entry.getKey(), entry.getValue())))); } @Override public void deleteAll() { luceneIndex.deleteAll(); } @Override public Hits> moreLikeThis(ClientQueryParams queryParams, T key, U mltDocumentValue) { var mltDocumentFields = indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue); var results = luceneIndex .moreLikeThis(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName(), mltDocumentFields ) .toList(); LLSearchResultShard mergedResults = mergeResults(queryParams, results); if (mergedResults != null) { return mapResults(mergedResults); } else { return Hits.empty(); } } @Override public Hits> search(ClientQueryParams queryParams) { var results = luceneIndex .search(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName() ) .toList(); var mergedResults = mergeResults(queryParams, results); if (mergedResults != null) { return mapResults(mergedResults); } else { return Hits.empty(); } } @Override public Buckets computeBuckets(@Nullable CompositeSnapshot snapshot, @NotNull List query, @Nullable Query normalizationQuery, BucketParams bucketParams) { return luceneIndex.computeBuckets(resolveSnapshot(snapshot), query, normalizationQuery, bucketParams); } private Hits> mapResults(LLSearchResultShard llSearchResult) { Stream> scoresWithKeysFlux = llSearchResult.results() .map(hit -> new HitKey<>(indicizer.getKey(hit.key()), hit.score())); if (llSearchResult instanceof LuceneCloseable luceneCloseable) { return new LuceneHits<>(scoresWithKeysFlux, llSearchResult.totalHitsCount(), luceneCloseable); } else { return new CloseableHits<>(scoresWithKeysFlux, llSearchResult.totalHitsCount(), llSearchResult); } } @Override public TotalHitsCount count(@Nullable CompositeSnapshot snapshot, Query query) { return luceneIndex.count(resolveSnapshot(snapshot), query, MAX_COUNT_TIME); } @Override public boolean isLowMemoryMode() { return luceneIndex.isLowMemoryMode(); } @Override public void close() { luceneIndex.close(); } /** * Flush writes to disk */ @Override public void flush() { luceneIndex.flush(); } @Override public void waitForMerges() { luceneIndex.waitForMerges(); } @Override public void waitForLastMerges() { luceneIndex.waitForLastMerges(); } /** * Refresh index searcher */ @Override public void refresh(boolean force) { luceneIndex.refresh(force); } @Override public LLSnapshot takeSnapshot() { return luceneIndex.takeSnapshot(); } @Override public void releaseSnapshot(LLSnapshot snapshot) { luceneIndex.releaseSnapshot(snapshot); } @SuppressWarnings({"unchecked", "rawtypes"}) @Nullable private static LLSearchResultShard mergeResults(ClientQueryParams queryParams, List shards) { if (shards.size() == 0) { return null; } else if (shards.size() == 1) { return shards.get(0); } TotalHitsCount count = null; ObjectArrayList> results = new ObjectArrayList<>(shards.size()); ObjectArrayList resources = new ObjectArrayList(shards.size()); boolean luceneResources = false; for (LLSearchResultShard shard : shards) { if (!luceneResources && shard instanceof LuceneCloseable) { luceneResources = true; } if (count == null) { count = shard.totalHitsCount(); } else { count = LuceneUtils.sum(count, shard.totalHitsCount()); } var maxLimit = queryParams.offset() + queryParams.limit(); results.add(shard.results().limit(maxLimit)); resources.add(shard); } Objects.requireNonNull(count); Stream resultsFlux; if (results.size() == 0) { resultsFlux = Stream.empty(); } else if (results.size() == 1) { resultsFlux = results.get(0); } else { resultsFlux = results.parallelStream().flatMap(Function.identity()); } if (luceneResources) { return new LuceneLLSearchResultShard(resultsFlux, count, (List) resources); } else { return new ResourcesLLSearchResultShard(resultsFlux, count, (List) resources); } } }