CavalliumDBEngine/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java

253 lines
7.9 KiB
Java
Raw Normal View History

2021-04-03 19:09:06 +02:00
package it.cavallium.dbengine.client;
2022-07-23 15:03:59 +02:00
import it.cavallium.dbengine.client.Hits.CloseableHits;
import it.cavallium.dbengine.client.Hits.LuceneHits;
2021-04-03 19:09:06 +02:00
import it.cavallium.dbengine.client.query.ClientQueryParams;
import it.cavallium.dbengine.client.query.current.data.Query;
2021-08-04 01:12:39 +02:00
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
2022-06-30 15:06:10 +02:00
import it.cavallium.dbengine.database.DiscardingCloseable;
2022-03-05 15:46:40 +01:00
import it.cavallium.dbengine.database.LLKeyScore;
2021-04-03 19:09:06 +02:00
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSearchResultShard;
2022-07-23 14:25:59 +02:00
import it.cavallium.dbengine.database.LLSearchResultShard.LuceneLLSearchResultShard;
import it.cavallium.dbengine.database.LLSearchResultShard.ResourcesLLSearchResultShard;
2021-04-03 19:09:06 +02:00
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLTerm;
2022-03-19 00:08:23 +01:00
import it.cavallium.dbengine.database.LLUpdateDocument;
2022-07-23 15:12:44 +02:00
import it.cavallium.dbengine.database.LLUtils;
2022-06-30 15:06:10 +02:00
import it.cavallium.dbengine.database.SafeCloseable;
2022-07-23 14:25:59 +02:00
import it.cavallium.dbengine.lucene.LuceneCloseable;
2022-03-05 15:46:40 +01:00
import it.cavallium.dbengine.lucene.LuceneUtils;
2021-11-19 19:03:31 +01:00
import it.cavallium.dbengine.lucene.collector.Buckets;
2021-11-18 17:13:53 +01:00
import it.cavallium.dbengine.lucene.searcher.BucketParams;
2022-03-05 15:46:40 +01:00
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
2022-03-11 17:59:46 +01:00
import java.time.Duration;
2021-11-19 19:03:31 +01:00
import java.util.List;
2021-04-03 19:09:06 +02:00
import java.util.Map;
import java.util.Map.Entry;
2022-03-05 15:46:40 +01:00
import java.util.Objects;
2022-03-19 00:08:23 +01:00
import java.util.logging.Level;
2022-07-23 14:25:59 +02:00
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
2021-06-07 16:21:12 +02:00
import org.jetbrains.annotations.NotNull;
2021-04-03 19:09:06 +02:00
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
2022-03-19 00:08:23 +01:00
import reactor.core.publisher.SignalType;
2021-04-03 19:09:06 +02:00
public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
2022-07-02 11:44:13 +02:00
private static final Duration MAX_COUNT_TIME = Duration.ofSeconds(30);
2022-07-23 14:25:59 +02:00
private static final Logger LOG = LogManager.getLogger(LuceneIndex.class);
2021-04-03 19:09:06 +02:00
private final LLLuceneIndex luceneIndex;
private final Indicizer<T,U> indicizer;
public LuceneIndexImpl(LLLuceneIndex luceneIndex, Indicizer<T, U> indicizer) {
this.luceneIndex = luceneIndex;
this.indicizer = indicizer;
}
private LLSnapshot resolveSnapshot(CompositeSnapshot snapshot) {
if (snapshot == null) {
return null;
} else {
return snapshot.getSnapshot(luceneIndex);
}
}
@Override
public Mono<Void> addDocument(T key, U value) {
return indicizer
2021-11-07 18:00:11 +01:00
.toDocument(key, value)
.flatMap(doc -> luceneIndex.addDocument(indicizer.toIndex(key), doc));
2021-04-03 19:09:06 +02:00
}
@Override
2022-03-19 00:08:23 +01:00
public Mono<Long> addDocuments(boolean atomic, Flux<Entry<T, U>> entries) {
2022-03-12 00:22:41 +01:00
return luceneIndex.addDocuments(atomic, entries.flatMap(entry -> indicizer
2022-03-05 15:46:40 +01:00
.toDocument(entry.getKey(), entry.getValue())
.map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc))));
2021-04-03 19:09:06 +02:00
}
@Override
public Mono<Void> deleteDocument(T key) {
LLTerm id = indicizer.toIndex(key);
return luceneIndex.deleteDocument(id);
2021-04-03 19:09:06 +02:00
}
@Override
2021-06-07 16:21:12 +02:00
public Mono<Void> updateDocument(T key, @NotNull U value) {
2021-04-03 19:09:06 +02:00
return indicizer
2021-11-07 17:46:40 +01:00
.toIndexRequest(key, value)
.flatMap(doc -> luceneIndex.update(indicizer.toIndex(key), doc));
2021-04-03 19:09:06 +02:00
}
@Override
2022-03-19 00:08:23 +01:00
public Mono<Long> updateDocuments(Flux<Entry<T, U>> entries) {
Flux<Entry<LLTerm, LLUpdateDocument>> mappedEntries = entries
.flatMap(entry -> Mono
.zip(Mono.just(indicizer.toIndex(entry.getKey())),
indicizer.toDocument(entry.getKey(), entry.getValue()).single(),
Map::entry
)
.single()
)
.log("impl-update-documents", Level.FINEST, false, SignalType.ON_NEXT, SignalType.ON_COMPLETE);
return luceneIndex.updateDocuments(mappedEntries);
2021-04-03 19:09:06 +02:00
}
@Override
public Mono<Void> deleteAll() {
return luceneIndex.deleteAll();
2021-04-03 19:09:06 +02:00
}
@Override
2021-11-08 11:17:52 +01:00
public Mono<Hits<HitKey<T>>> moreLikeThis(ClientQueryParams queryParams,
2021-04-03 19:09:06 +02:00
T key,
U mltDocumentValue) {
2022-01-28 19:31:25 +01:00
var mltDocumentFields
2021-04-03 19:09:06 +02:00
= indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue);
return luceneIndex
2021-05-21 00:19:40 +02:00
.moreLikeThis(resolveSnapshot(queryParams.snapshot()),
queryParams.toQueryParams(),
2021-04-03 19:09:06 +02:00
indicizer.getKeyFieldName(),
mltDocumentFields
)
2022-03-05 15:46:40 +01:00
.collectList()
2022-06-30 13:54:55 +02:00
.mapNotNull(shards -> mergeResults(queryParams, shards))
.map(llSearchResult -> mapResults(llSearchResult))
2022-06-30 13:54:55 +02:00
.defaultIfEmpty(Hits.empty())
2022-07-23 15:12:44 +02:00
.doOnDiscard(DiscardingCloseable.class, LLUtils::onDiscard);
2021-07-17 11:52:08 +02:00
}
@Override
2021-11-08 11:17:52 +01:00
public Mono<Hits<HitKey<T>>> search(ClientQueryParams queryParams) {
2021-04-03 19:09:06 +02:00
return luceneIndex
2021-05-21 00:19:40 +02:00
.search(resolveSnapshot(queryParams.snapshot()),
queryParams.toQueryParams(),
2021-04-03 19:09:06 +02:00
indicizer.getKeyFieldName()
)
2022-03-05 15:46:40 +01:00
.collectList()
2022-06-30 13:54:55 +02:00
.mapNotNull(shards -> mergeResults(queryParams, shards))
.map(llSearchResult -> mapResults(llSearchResult))
2022-06-20 23:31:42 +02:00
.defaultIfEmpty(Hits.empty())
2022-07-23 15:12:44 +02:00
.doOnDiscard(DiscardingCloseable.class, LLUtils::onDiscard);
2021-04-03 19:09:06 +02:00
}
2021-11-18 17:13:53 +01:00
@Override
2021-11-19 19:03:31 +01:00
public Mono<Buckets> computeBuckets(@Nullable CompositeSnapshot snapshot,
@NotNull List<Query> query,
2022-01-17 18:12:12 +01:00
@Nullable Query normalizationQuery,
2021-11-19 19:03:31 +01:00
BucketParams bucketParams) {
2022-03-05 15:46:40 +01:00
return luceneIndex.computeBuckets(resolveSnapshot(snapshot), query,
normalizationQuery, bucketParams).single();
2021-11-18 17:13:53 +01:00
}
2021-11-08 11:17:52 +01:00
private Hits<HitKey<T>> mapResults(LLSearchResultShard llSearchResult) {
2022-07-23 14:25:59 +02:00
Flux<HitKey<T>> scoresWithKeysFlux = llSearchResult.results()
2021-10-28 23:48:25 +02:00
.map(hit -> new HitKey<>(indicizer.getKey(hit.key()), hit.score()));
2021-04-03 19:09:06 +02:00
2022-07-23 14:25:59 +02:00
if (llSearchResult instanceof LuceneCloseable luceneCloseable) {
return new LuceneHits<>(scoresWithKeysFlux, llSearchResult.totalHitsCount(), luceneCloseable);
} else {
return new CloseableHits<>(scoresWithKeysFlux, llSearchResult.totalHitsCount(), llSearchResult);
}
2021-07-17 11:52:08 +02:00
}
2021-04-03 19:09:06 +02:00
@Override
2021-08-04 01:12:39 +02:00
public Mono<TotalHitsCount> count(@Nullable CompositeSnapshot snapshot, Query query) {
2022-07-02 11:44:13 +02:00
return luceneIndex
.count(resolveSnapshot(snapshot), query, MAX_COUNT_TIME)
2022-07-23 15:12:44 +02:00
.doOnDiscard(DiscardingCloseable.class, LLUtils::onDiscard);
2021-04-03 19:09:06 +02:00
}
@Override
public boolean isLowMemoryMode() {
return luceneIndex.isLowMemoryMode();
}
@Override
2022-06-30 13:54:55 +02:00
public void close() {
luceneIndex.close();
2021-04-03 19:09:06 +02:00
}
/**
* Flush writes to disk
*/
@Override
public Mono<Void> flush() {
return luceneIndex.flush();
2021-04-03 19:09:06 +02:00
}
@Override
public Mono<Void> waitForMerges() {
return luceneIndex.waitForMerges();
}
@Override
public Mono<Void> waitForLastMerges() {
return luceneIndex.waitForLastMerges();
}
2021-04-03 19:09:06 +02:00
/**
* Refresh index searcher
*/
@Override
2021-07-18 19:37:24 +02:00
public Mono<Void> refresh(boolean force) {
return luceneIndex.refresh(force);
2021-04-03 19:09:06 +02:00
}
@Override
public Mono<LLSnapshot> takeSnapshot() {
return luceneIndex.takeSnapshot();
2021-04-03 19:09:06 +02:00
}
@Override
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
return luceneIndex.releaseSnapshot(snapshot);
2021-04-03 19:09:06 +02:00
}
2022-03-05 15:46:40 +01:00
2022-07-23 14:25:59 +02:00
@SuppressWarnings({"unchecked", "rawtypes"})
2022-06-30 13:54:55 +02:00
@Nullable
2022-07-02 11:44:13 +02:00
private static LLSearchResultShard mergeResults(ClientQueryParams queryParams, List<LLSearchResultShard> shards) {
2022-03-16 00:32:00 +01:00
if (shards.size() == 0) {
2022-06-30 13:54:55 +02:00
return null;
2022-03-16 00:32:00 +01:00
} else if (shards.size() == 1) {
2022-06-30 13:54:55 +02:00
return shards.get(0);
2022-03-16 00:32:00 +01:00
}
2022-06-30 13:54:55 +02:00
TotalHitsCount count = null;
ObjectArrayList<Flux<LLKeyScore>> results = new ObjectArrayList<>(shards.size());
2022-07-23 14:25:59 +02:00
ObjectArrayList resources = new ObjectArrayList(shards.size());
boolean luceneResources = false;
2022-06-30 13:54:55 +02:00
for (LLSearchResultShard shard : shards) {
2022-07-23 14:25:59 +02:00
if (!luceneResources && shard instanceof LuceneCloseable) {
luceneResources = true;
}
2022-06-30 13:54:55 +02:00
if (count == null) {
count = shard.totalHitsCount();
2022-03-21 15:25:26 +01:00
} else {
2022-06-30 13:54:55 +02:00
count = LuceneUtils.sum(count, shard.totalHitsCount());
}
var maxLimit = queryParams.offset() + queryParams.limit();
results.add(shard.results().take(maxLimit, true));
resources.add(shard);
}
Objects.requireNonNull(count);
Flux<LLKeyScore> resultsFlux;
if (results.size() == 0) {
resultsFlux = Flux.empty();
} else if (results.size() == 1) {
resultsFlux = results.get(0);
} else {
resultsFlux = Flux.merge(results);
}
2022-07-23 14:25:59 +02:00
if (luceneResources) {
return new LuceneLLSearchResultShard(resultsFlux, count, (List<LuceneCloseable>) resources);
} else {
return new ResourcesLLSearchResultShard(resultsFlux, count, (List<SafeCloseable>) resources);
}
}
2021-04-03 19:09:06 +02:00
}