package it.cavallium.dbengine.client; import it.cavallium.dbengine.client.query.ClientQueryParams; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLScoreMode; import it.cavallium.dbengine.database.LLSearchResult; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.collections.Joiner.ValueGetter; import it.cavallium.dbengine.lucene.LuceneUtils; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; public class LuceneIndexImpl implements LuceneIndex { private final LLLuceneIndex luceneIndex; private final Indicizer indicizer; public LuceneIndexImpl(LLLuceneIndex luceneIndex, Indicizer indicizer) { this.luceneIndex = luceneIndex; this.indicizer = indicizer; } private LLSnapshot resolveSnapshot(CompositeSnapshot snapshot) { if (snapshot == null) { return null; } else { return snapshot.getSnapshot(luceneIndex); } } @Override public Mono addDocument(T key, U value) { return indicizer .toDocument(key, value) .flatMap(doc -> luceneIndex.addDocument(indicizer.toIndex(key), doc)); } @Override public Mono addDocuments(Flux> entries) { return luceneIndex .addDocuments(entries .flatMap(entry -> indicizer .toDocument(entry.getKey(), entry.getValue()) .map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc))) ); } @Override public Mono deleteDocument(T key) { LLTerm id = indicizer.toIndex(key); return luceneIndex.deleteDocument(id); } @Override public Mono updateDocument(T key, @NotNull U value) { return indicizer .toDocument(key, value) .flatMap(doc -> luceneIndex.updateDocument(indicizer.toIndex(key), doc)); } @Override public Mono updateDocuments(Flux> entries) { return luceneIndex .updateDocuments(entries .flatMap(entry -> indicizer .toDocument(entry.getKey(), entry.getValue()) .map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc))) .collectMap(Entry::getKey, Entry::getValue) ); } @Override public Mono deleteAll() { return luceneIndex.deleteAll(); } private static QueryParams fixOffset(LLLuceneIndex luceneIndex, QueryParams queryParams) { if (luceneIndex.supportsOffset()) { return queryParams; } else { return queryParams.setOffset(0).setLimit(queryParams.limit() + queryParams.offset()); } } private static long fixTransformOffset(LLLuceneIndex luceneIndex, long offset) { if (luceneIndex.supportsOffset()) { return 0; } else { return offset; } } private Mono> transformLuceneResult(LLSearchResult llSearchResult, @Nullable MultiSort, ?> sort, LLScoreMode scoreMode, long offset, @Nullable Long limit) { Flux> mappedKeys = llSearchResult .results() .map(flux -> new SearchResultKeys<>(flux .results() .map(signal -> new SearchResultKey<>(signal.key().map(indicizer::getKey), signal.score())), flux.totalHitsCount() )); MultiSort, ?> finalSort; if (scoreMode != LLScoreMode.COMPLETE_NO_SCORES && sort == null) { finalSort = MultiSort.topScore(); } else { finalSort = sort; } return LuceneUtils.mergeSignalStreamKeys(mappedKeys, finalSort, offset, limit); } private Mono> transformLuceneResultWithValues(LLSearchResult llSearchResult, @Nullable MultiSort, V> sort, LLScoreMode scoreMode, long offset, @Nullable Long limit, ValueGetter valueGetter) { Flux> mappedKeys = llSearchResult .results() .map(flux -> new SearchResult<>(flux .results() .map(signal -> { var key = signal.key().map(indicizer::getKey); return new SearchResultItem<>(key, key.flatMap(valueGetter::get), signal.score()); }), flux.totalHitsCount())); MultiSort, ?> finalSort; if (scoreMode != LLScoreMode.COMPLETE_NO_SCORES && sort == null) { finalSort = MultiSort.topScoreWithValues(); } else { finalSort = sort; } return LuceneUtils.mergeSignalStreamItems(mappedKeys, finalSort, offset, limit); } /** * * @param queryParams the limit is valid for each lucene instance. * If you have 15 instances, the number of elements returned * can be at most limit * 15 * @return the collection has one or more flux */ @Override public Mono> moreLikeThis(ClientQueryParams, V> queryParams, T key, U mltDocumentValue) { Flux>> mltDocumentFields = indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue); return luceneIndex .moreLikeThis(resolveSnapshot(queryParams.snapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName(), mltDocumentFields) .flatMap(llSearchResult -> this.transformLuceneResult(llSearchResult, queryParams.sort(), queryParams.scoreMode(), fixTransformOffset(luceneIndex, queryParams.offset()), queryParams.limit() )); } /** * * @param queryParams the limit is valid for each lucene instance. * If you have 15 instances, the number of elements returned * can be at most limit * 15 * @return the collection has one or more flux */ @Override public Mono> moreLikeThisWithValues(ClientQueryParams, V> queryParams, T key, U mltDocumentValue, ValueGetter valueGetter) { Flux>> mltDocumentFields = indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue); return luceneIndex .moreLikeThis(resolveSnapshot(queryParams.snapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName(), mltDocumentFields ) .flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, queryParams.sort(), queryParams.scoreMode(), fixTransformOffset(luceneIndex, queryParams.offset()), queryParams.limit(), valueGetter )); } /** * * @param queryParams the limit is valid for each lucene instance. * If you have 15 instances, the number of elements returned * can be at most limit * 15 * @return the collection has one or more flux */ @Override public Mono> search(ClientQueryParams, V> queryParams) { return luceneIndex .search(resolveSnapshot(queryParams.snapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName() ) .flatMap(llSearchResult -> this.transformLuceneResult(llSearchResult, queryParams.sort(), queryParams.scoreMode(), fixTransformOffset(luceneIndex, queryParams.offset()), queryParams.limit() )); } /** * * @param queryParams the limit is valid for each lucene instance. * If you have 15 instances, the number of elements returned * can be at most limit * 15 * @return the collection has one or more flux */ @Override public Mono> searchWithValues(ClientQueryParams, V> queryParams, ValueGetter valueGetter) { return luceneIndex .search(resolveSnapshot(queryParams.snapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName()) .flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, queryParams.sort(), queryParams.scoreMode(), fixTransformOffset(luceneIndex, queryParams.offset()), queryParams.limit(), valueGetter )); } @Override public Mono count(@Nullable CompositeSnapshot snapshot, Query query) { return this.search(ClientQueryParams., Object>builder().snapshot(snapshot).query(query).limit(0).build()) .map(SearchResultKeys::totalHitsCount); } @Override public boolean isLowMemoryMode() { return luceneIndex.isLowMemoryMode(); } @Override public Mono close() { return luceneIndex.close(); } /** * Flush writes to disk */ @Override public Mono flush() { return luceneIndex.flush(); } /** * Refresh index searcher */ @Override public Mono refresh() { return luceneIndex.refresh(); } @Override public Mono takeSnapshot() { return luceneIndex.takeSnapshot(); } @Override public Mono releaseSnapshot(LLSnapshot snapshot) { return luceneIndex.releaseSnapshot(snapshot); } }