package it.cavallium.dbengine.client; import it.cavallium.dbengine.client.query.ClientQueryParams; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLScoreMode; import it.cavallium.dbengine.database.LLSearchResult; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.collections.Joiner.ValueGetter; import it.cavallium.dbengine.lucene.LuceneUtils; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; public class LuceneIndexImpl implements LuceneIndex { private final LLLuceneIndex luceneIndex; private final Indicizer indicizer; public LuceneIndexImpl(LLLuceneIndex luceneIndex, Indicizer indicizer) { this.luceneIndex = luceneIndex; this.indicizer = indicizer; } private LLSnapshot resolveSnapshot(CompositeSnapshot snapshot) { if (snapshot == null) { return null; } else { return snapshot.getSnapshot(luceneIndex); } } @Override public Mono addDocument(T key, U value) { return indicizer .toDocument(key, value) .flatMap(doc -> luceneIndex.addDocument(indicizer.toIndex(key), doc)); } @Override public Mono addDocuments(Flux> entries) { return luceneIndex .addDocuments(entries .flatMap(entry -> indicizer .toDocument(entry.getKey(), entry.getValue()) .map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc))) .groupBy(Entry::getKey, Entry::getValue) ); } @Override public Mono deleteDocument(T key) { LLTerm id = indicizer.toIndex(key); return luceneIndex.deleteDocument(id); } @Override public Mono updateDocument(T key, U value) { return indicizer .toDocument(key, value) .flatMap(doc -> luceneIndex.updateDocument(indicizer.toIndex(key), doc)); } @Override public Mono updateDocuments(Flux> entries) { return luceneIndex .updateDocuments(entries .flatMap(entry -> indicizer .toDocument(entry.getKey(), entry.getValue()) .map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc))) .groupBy(Entry::getKey, Entry::getValue) ); } @Override public Mono deleteAll() { return luceneIndex.deleteAll(); } private static QueryParams fixOffset(LLLuceneIndex luceneIndex, QueryParams queryParams) { if (luceneIndex.supportsOffset()) { return queryParams; } else { return queryParams.setOffset(0); } } private static long fixTransformOffset(LLLuceneIndex luceneIndex, long offset) { if (luceneIndex.supportsOffset()) { return 0; } else { return offset; } } private Mono> transformLuceneResult(LLSearchResult llSearchResult, @Nullable MultiSort> sort, LLScoreMode scoreMode, long offset, @Nullable Long limit) { Flux> mappedKeys = llSearchResult .getResults() .map(flux -> new SearchResultKeys<>(flux .getResults() .map(signal -> new SearchResultKey<>(indicizer.getKey(signal.getKey()), signal.getScore())), flux.getTotalHitsCount() )); MultiSort> finalSort; if (scoreMode != LLScoreMode.COMPLETE_NO_SCORES && sort == null) { finalSort = MultiSort.topScore(); } else { finalSort = sort; } MultiSort> mappedSort; if (finalSort != null) { mappedSort = new MultiSort<>( finalSort.getQuerySort(), (signal1, signal2) -> finalSort.getResultSort().compare((signal1), signal2) ); } else { mappedSort = null; } return LuceneUtils.mergeSignalStreamKeys(mappedKeys, mappedSort, offset, limit); } private Mono> transformLuceneResultWithValues(LLSearchResult llSearchResult, @Nullable MultiSort> sort, LLScoreMode scoreMode, long offset, @Nullable Long limit, ValueGetter valueGetter) { Flux> mappedKeys = llSearchResult .getResults() .map(flux -> new SearchResult<>(flux.getResults().flatMapSequential(signal -> { var key = indicizer.getKey(signal.getKey()); return valueGetter .get(key) .map(value -> new SearchResultItem<>(key, value, signal.getScore())); }), flux.getTotalHitsCount())); MultiSort> finalSort; if (scoreMode != LLScoreMode.COMPLETE_NO_SCORES && sort == null) { finalSort = MultiSort.topScoreWithValues(); } else { finalSort = sort; } MultiSort> mappedSort; if (finalSort != null) { mappedSort = new MultiSort<>( finalSort.getQuerySort(), (signal1, signal2) -> finalSort.getResultSort().compare((signal1), signal2) ); } else { mappedSort = null; } return LuceneUtils.mergeSignalStreamItems(mappedKeys, mappedSort, offset, limit); } /** * * @param queryParams the limit is valid for each lucene instance. * If you have 15 instances, the number of elements returned * can be at most limit * 15 * @return the collection has one or more flux */ @Override public Mono> moreLikeThis(ClientQueryParams> queryParams, T key, U mltDocumentValue) { Flux>> mltDocumentFields = indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue); return luceneIndex .moreLikeThis(resolveSnapshot(queryParams.getSnapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName(), mltDocumentFields) .flatMap(llSearchResult -> this.transformLuceneResult(llSearchResult, queryParams.getSort(), queryParams.getScoreMode(), fixTransformOffset(luceneIndex, queryParams.getOffset()), queryParams.getLimit() )); } /** * * @param queryParams the limit is valid for each lucene instance. * If you have 15 instances, the number of elements returned * can be at most limit * 15 * @return the collection has one or more flux */ @Override public Mono> moreLikeThisWithValues(ClientQueryParams> queryParams, T key, U mltDocumentValue, ValueGetter valueGetter) { Flux>> mltDocumentFields = indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue); return luceneIndex .moreLikeThis(resolveSnapshot(queryParams.getSnapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName(), mltDocumentFields ) .flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, queryParams.getSort(), queryParams.getScoreMode(), fixTransformOffset(luceneIndex, queryParams.getOffset()), queryParams.getLimit(), valueGetter )); } /** * * @param queryParams the limit is valid for each lucene instance. * If you have 15 instances, the number of elements returned * can be at most limit * 15 * @return the collection has one or more flux */ @Override public Mono> search(ClientQueryParams> queryParams) { return luceneIndex .search(resolveSnapshot(queryParams.getSnapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName() ) .flatMap(llSearchResult -> this.transformLuceneResult(llSearchResult, queryParams.getSort(), queryParams.getScoreMode(), fixTransformOffset(luceneIndex, queryParams.getOffset()), queryParams.getLimit() )); } /** * * @param queryParams the limit is valid for each lucene instance. * If you have 15 instances, the number of elements returned * can be at most limit * 15 * @return the collection has one or more flux */ @Override public Mono> searchWithValues(ClientQueryParams> queryParams, ValueGetter valueGetter) { return luceneIndex .search(resolveSnapshot(queryParams.getSnapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName()) .flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, queryParams.getSort(), queryParams.getScoreMode(), fixTransformOffset(luceneIndex, queryParams.getOffset()), queryParams.getLimit(), valueGetter )); } @Override public Mono count(@Nullable CompositeSnapshot snapshot, Query query) { return this.search(ClientQueryParams.>builder().snapshot(snapshot).query(query).limit(0).build()) .map(SearchResultKeys::getTotalHitsCount); } @Override public boolean isLowMemoryMode() { return luceneIndex.isLowMemoryMode(); } @Override public Mono close() { return luceneIndex.close(); } /** * Flush writes to disk */ @Override public Mono flush() { return luceneIndex.flush(); } /** * Refresh index searcher */ @Override public Mono refresh() { return luceneIndex.refresh(); } @Override public Mono takeSnapshot() { return luceneIndex.takeSnapshot(); } @Override public Mono releaseSnapshot(LLSnapshot snapshot) { return luceneIndex.releaseSnapshot(snapshot); } }