Add indicizer
This commit is contained in:
parent
86ed2088a6
commit
d86c92cb61
@ -1,4 +1,4 @@
|
||||
package it.cavallium.dbengine.database.indicizer;
|
||||
package it.cavallium.dbengine.client;
|
||||
|
||||
import it.cavallium.dbengine.database.LLDocument;
|
||||
import it.cavallium.dbengine.database.LLTerm;
|
214
src/main/java/it/cavallium/dbengine/client/LuceneIndex.java
Normal file
214
src/main/java/it/cavallium/dbengine/client/LuceneIndex.java
Normal file
@ -0,0 +1,214 @@
|
||||
package it.cavallium.dbengine.client;
|
||||
|
||||
import it.cavallium.dbengine.database.LLLuceneIndex;
|
||||
import it.cavallium.dbengine.database.LLScoreMode;
|
||||
import it.cavallium.dbengine.database.LLSearchResult;
|
||||
import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import it.cavallium.dbengine.database.LLSnapshottable;
|
||||
import it.cavallium.dbengine.database.LLSort;
|
||||
import it.cavallium.dbengine.database.LLTerm;
|
||||
import it.cavallium.dbengine.database.LuceneUtils;
|
||||
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
|
||||
import it.cavallium.dbengine.lucene.serializer.Query;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.function.Tuple2;
|
||||
|
||||
public class LuceneIndex<T, U> implements LLSnapshottable {
|
||||
|
||||
private final LLLuceneIndex luceneIndex;
|
||||
private final Indicizer<T,U> indicizer;
|
||||
|
||||
public LuceneIndex(LLLuceneIndex luceneIndex, Indicizer<T, U> indicizer) {
|
||||
this.luceneIndex = luceneIndex;
|
||||
this.indicizer = indicizer;
|
||||
}
|
||||
|
||||
|
||||
private LLSnapshot resolveSnapshot(CompositeSnapshot snapshot) {
|
||||
if (snapshot == null) {
|
||||
return null;
|
||||
} else {
|
||||
return snapshot.getSnapshot(luceneIndex);
|
||||
}
|
||||
}
|
||||
|
||||
public Mono<Void> addDocument(T key, U value) {
|
||||
return indicizer
|
||||
.toDocument(key, value)
|
||||
.flatMap(doc -> luceneIndex.addDocument(indicizer.toIndex(key), doc));
|
||||
}
|
||||
|
||||
public Mono<Void> addDocuments(Flux<Entry<T, U>> entries) {
|
||||
return luceneIndex
|
||||
.addDocuments(entries
|
||||
.flatMap(entry -> indicizer
|
||||
.toDocument(entry.getKey(), entry.getValue())
|
||||
.map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc)))
|
||||
.groupBy(Entry::getKey, Entry::getValue)
|
||||
);
|
||||
}
|
||||
|
||||
public Mono<Void> deleteDocument(T key) {
|
||||
LLTerm id = indicizer.toIndex(key);
|
||||
return luceneIndex.deleteDocument(id);
|
||||
}
|
||||
|
||||
public Mono<Void> updateDocument(T key, U value) {
|
||||
return indicizer
|
||||
.toDocument(key, value)
|
||||
.flatMap(doc -> luceneIndex.updateDocument(indicizer.toIndex(key), doc));
|
||||
}
|
||||
|
||||
public Mono<Void> updateDocuments(Flux<Entry<T, U>> entries) {
|
||||
return luceneIndex
|
||||
.updateDocuments(entries
|
||||
.flatMap(entry -> indicizer
|
||||
.toDocument(entry.getKey(), entry.getValue())
|
||||
.map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc)))
|
||||
.groupBy(Entry::getKey, Entry::getValue)
|
||||
);
|
||||
}
|
||||
|
||||
public Mono<Void> deleteAll() {
|
||||
return luceneIndex.deleteAll();
|
||||
}
|
||||
|
||||
private SearchResultKeys<T> transformLuceneResult(LLSearchResult llSearchResult,
|
||||
@Nullable MultiSort<SearchResultKey<T>> sort,
|
||||
@Nullable Integer limit) {
|
||||
var mappedKeys = llSearchResult
|
||||
.results()
|
||||
.map(flux -> flux.map(item -> new SearchResultKey<>(indicizer.getKey(item.getKey()), item.getScore())));
|
||||
var sortedKeys = LuceneUtils.mergeStream(mappedKeys, sort, limit);
|
||||
return new SearchResultKeys<>(llSearchResult.totalHitsCount(), sortedKeys);
|
||||
}
|
||||
|
||||
private SearchResult<T, U> transformLuceneResultWithValues(LLSearchResult llSearchResult,
|
||||
@Nullable MultiSort<SearchResultItem<T, U>> sort,
|
||||
@Nullable Integer limit,
|
||||
ValueGetter<T, U> valueGetter) {
|
||||
var mappedKeys = llSearchResult
|
||||
.results()
|
||||
.map(flux -> flux.flatMap(item -> {
|
||||
var key = indicizer.getKey(item.getKey());
|
||||
return valueGetter.get(key).map(value -> new SearchResultItem<>(key, value, item.getScore()));
|
||||
}));
|
||||
var sortedKeys = LuceneUtils.mergeStream(mappedKeys, sort, limit);
|
||||
return new SearchResult<>(llSearchResult.totalHitsCount(), sortedKeys);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param limit the limit is valid for each lucene instance.
|
||||
* If you have 15 instances, the number of elements returned
|
||||
* can be at most <code>limit * 15</code>
|
||||
* @return the collection has one or more flux
|
||||
*/
|
||||
public Mono<SearchResultKeys<T>> moreLikeThis(@Nullable CompositeSnapshot snapshot,
|
||||
U mltDocumentValue,
|
||||
int limit) {
|
||||
Flux<Tuple2<String, Set<String>>> mltDocumentFields = indicizer.getMoreLikeThisDocumentFields(mltDocumentValue);
|
||||
return luceneIndex
|
||||
.moreLikeThis(resolveSnapshot(snapshot), mltDocumentFields, limit, indicizer.getKeyFieldName())
|
||||
.map(llSearchResult -> this.transformLuceneResult(llSearchResult, MultiSort.topScore(), limit));
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param limit the limit is valid for each lucene instance.
|
||||
* If you have 15 instances, the number of elements returned
|
||||
* can be at most <code>limit * 15</code>
|
||||
* @return the collection has one or more flux
|
||||
*/
|
||||
public Mono<SearchResult<T, U>> moreLikeThisWithValues(@Nullable CompositeSnapshot snapshot,
|
||||
U mltDocumentValue,
|
||||
int limit,
|
||||
ValueGetter<T, U> valueGetter) {
|
||||
Flux<Tuple2<String, Set<String>>> mltDocumentFields = indicizer.getMoreLikeThisDocumentFields(mltDocumentValue);
|
||||
return luceneIndex
|
||||
.moreLikeThis(resolveSnapshot(snapshot), mltDocumentFields, limit, indicizer.getKeyFieldName())
|
||||
.map(llSearchResult ->
|
||||
this.transformLuceneResultWithValues(llSearchResult, MultiSort.topScoreWithValues(), limit, valueGetter));
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param limit the limit is valid for each lucene instance.
|
||||
* If you have 15 instances, the number of elements returned
|
||||
* can be at most <code>limit * 15</code>
|
||||
* @return the collection has one or more flux
|
||||
*/
|
||||
public Mono<SearchResultKeys<T>> search(@Nullable CompositeSnapshot snapshot,
|
||||
Query query,
|
||||
int limit,
|
||||
@Nullable MultiSort<SearchResultKey<T>> sort,
|
||||
LLScoreMode scoreMode) {
|
||||
LLSort querySort = sort != null ? sort.getQuerySort() : null;
|
||||
return luceneIndex
|
||||
.search(resolveSnapshot(snapshot), query, limit, querySort, scoreMode, indicizer.getKeyFieldName())
|
||||
.map(llSearchResult -> this.transformLuceneResult(llSearchResult, sort, limit));
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param limit the limit is valid for each lucene instance.
|
||||
* If you have 15 instances, the number of elements returned
|
||||
* can be at most <code>limit * 15</code>
|
||||
* @return the collection has one or more flux
|
||||
*/
|
||||
public Mono<SearchResult<T, U>> searchWithValues(@Nullable CompositeSnapshot snapshot,
|
||||
Query query,
|
||||
int limit,
|
||||
@Nullable MultiSort<SearchResultItem<T, U>> sort,
|
||||
LLScoreMode scoreMode,
|
||||
ValueGetter<T, U> valueGetter) {
|
||||
LLSort querySort = sort != null ? sort.getQuerySort() : null;
|
||||
return luceneIndex
|
||||
.search(resolveSnapshot(snapshot), query, limit, querySort, scoreMode, indicizer.getKeyFieldName())
|
||||
.map(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, sort, limit, valueGetter));
|
||||
}
|
||||
|
||||
public Mono<Long> count(@Nullable CompositeSnapshot snapshot, Query query) {
|
||||
return this.search(snapshot, query, 0, null, null)
|
||||
.flatMap(SearchResultKeys::totalHitsCount)
|
||||
.single();
|
||||
}
|
||||
|
||||
public boolean isLowMemoryMode() {
|
||||
return luceneIndex.isLowMemoryMode();
|
||||
}
|
||||
|
||||
public Mono<Void> close() {
|
||||
return luceneIndex.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush writes to disk
|
||||
*/
|
||||
public Mono<Void> flush() {
|
||||
return luceneIndex.flush();
|
||||
}
|
||||
|
||||
/**
|
||||
* Refresh index searcher
|
||||
*/
|
||||
public Mono<Void> refresh() {
|
||||
return luceneIndex.refresh();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<LLSnapshot> takeSnapshot() {
|
||||
return luceneIndex.takeSnapshot();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
|
||||
return luceneIndex.releaseSnapshot(snapshot);
|
||||
}
|
||||
}
|
81
src/main/java/it/cavallium/dbengine/client/MultiSort.java
Normal file
81
src/main/java/it/cavallium/dbengine/client/MultiSort.java
Normal file
@ -0,0 +1,81 @@
|
||||
package it.cavallium.dbengine.client;
|
||||
|
||||
import it.cavallium.dbengine.database.LLSort;
|
||||
import java.util.Comparator;
|
||||
import java.util.function.ToIntFunction;
|
||||
import java.util.function.ToLongFunction;
|
||||
|
||||
public class MultiSort<T> {
|
||||
|
||||
private final LLSort querySort;
|
||||
private final Comparator<T> resultSort;
|
||||
|
||||
public MultiSort(LLSort querySort, Comparator<T> resultSort) {
|
||||
this.querySort = querySort;
|
||||
this.resultSort = resultSort;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sort a lucene field and the results by a numeric sort field and an int value
|
||||
* @param fieldName Lucene SortedNumericSortField field name
|
||||
* @param toIntFunction function to retrieve the integer value of each result
|
||||
* @param reverse descending sort
|
||||
* @param <T> result type
|
||||
* @return MultiSort object
|
||||
*/
|
||||
public static <T> MultiSort<T> sortedNumericInt(String fieldName, ToIntFunction<T> toIntFunction, boolean reverse) {
|
||||
// Create lucene sort
|
||||
LLSort querySort = LLSort.newSortedNumericSortField(fieldName, reverse);
|
||||
|
||||
// Create result sort
|
||||
Comparator<T> resultSort = Comparator.comparingInt(toIntFunction);
|
||||
if (reverse) {
|
||||
resultSort = resultSort.reversed();
|
||||
}
|
||||
|
||||
// Return the multi sort
|
||||
return new MultiSort<>(querySort, resultSort);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sort a lucene field and the results by a numeric sort field and an long value
|
||||
* @param fieldName Lucene SortedNumericSortField field name
|
||||
* @param toLongFunction function to retrieve the long value of each result
|
||||
* @param reverse descending sort
|
||||
* @param <T> result type
|
||||
* @return MultiSort object
|
||||
*/
|
||||
public static <T> MultiSort<T> sortedNumericLong(String fieldName, ToLongFunction<T> toLongFunction, boolean reverse) {
|
||||
// Create lucene sort
|
||||
LLSort querySort = LLSort.newSortedNumericSortField(fieldName, reverse);
|
||||
|
||||
// Create result sort
|
||||
Comparator<T> resultSort = Comparator.comparingLong(toLongFunction);
|
||||
if (!reverse) {
|
||||
resultSort = resultSort.reversed();
|
||||
}
|
||||
|
||||
// Return the multi sort
|
||||
return new MultiSort<>(querySort, resultSort);
|
||||
}
|
||||
|
||||
public static <T> MultiSort<T> randomSortField() {
|
||||
return new MultiSort<>(LLSort.newRandomSortField(), (a, b) -> 0);
|
||||
}
|
||||
|
||||
public static <T> MultiSort<SearchResultKey<T>> topScore() {
|
||||
return new MultiSort<>(null, Comparator.<SearchResultKey<T>>comparingDouble(SearchResultKey::getScore).reversed());
|
||||
}
|
||||
|
||||
public static <T, U> MultiSort<SearchResultItem<T, U>> topScoreWithValues() {
|
||||
return new MultiSort<>(null, Comparator.<SearchResultItem<T, U>>comparingDouble(SearchResultItem::getScore).reversed());
|
||||
}
|
||||
|
||||
public LLSort getQuerySort() {
|
||||
return querySort;
|
||||
}
|
||||
|
||||
public Comparator<T> getResultSort() {
|
||||
return resultSort;
|
||||
}
|
||||
}
|
54
src/main/java/it/cavallium/dbengine/client/SearchResult.java
Normal file
54
src/main/java/it/cavallium/dbengine/client/SearchResult.java
Normal file
@ -0,0 +1,54 @@
|
||||
package it.cavallium.dbengine.client;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.StringJoiner;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class SearchResult<T, U> {
|
||||
|
||||
private final Mono<Long> totalHitsCount;
|
||||
private final Flux<SearchResultItem<T, U>> results;
|
||||
|
||||
public SearchResult(Mono<Long> totalHitsCount, Flux<SearchResultItem<T, U>> results) {
|
||||
this.totalHitsCount = totalHitsCount;
|
||||
this.results = results;
|
||||
}
|
||||
|
||||
public static <T, U> SearchResult<T, U> empty() {
|
||||
return new SearchResult<>(Mono.just(0L), Flux.empty());
|
||||
}
|
||||
|
||||
public Mono<Long> totalHitsCount() {
|
||||
return this.totalHitsCount;
|
||||
}
|
||||
|
||||
public Flux<SearchResultItem<T, U>> results() {
|
||||
return this.results;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
SearchResult<?, ?> that = (SearchResult<?, ?>) o;
|
||||
return Objects.equals(totalHitsCount, that.totalHitsCount) && Objects.equals(results, that.results);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(totalHitsCount, results);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new StringJoiner(", ", SearchResult.class.getSimpleName() + "[", "]")
|
||||
.add("totalHitsCount=" + totalHitsCount)
|
||||
.add("results=" + results)
|
||||
.toString();
|
||||
}
|
||||
}
|
@ -0,0 +1,54 @@
|
||||
package it.cavallium.dbengine.client;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.StringJoiner;
|
||||
|
||||
public class SearchResultItem<T, U> {
|
||||
private final T key;
|
||||
private final U value;
|
||||
private final float score;
|
||||
|
||||
public SearchResultItem(T key, U value, float score) {
|
||||
this.key = key;
|
||||
this.value = value;
|
||||
this.score = score;
|
||||
}
|
||||
|
||||
public float getScore() {
|
||||
return score;
|
||||
}
|
||||
|
||||
public T getKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
public U getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
SearchResultItem<?, ?> that = (SearchResultItem<?, ?>) o;
|
||||
return Float.compare(that.score, score) == 0 && Objects.equals(key, that.key) && Objects.equals(value, that.value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(key, value, score);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new StringJoiner(", ", SearchResultItem.class.getSimpleName() + "[", "]")
|
||||
.add("key=" + key)
|
||||
.add("value=" + value)
|
||||
.add("score=" + score)
|
||||
.toString();
|
||||
}
|
||||
}
|
@ -0,0 +1,47 @@
|
||||
package it.cavallium.dbengine.client;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.StringJoiner;
|
||||
|
||||
public class SearchResultKey<T> {
|
||||
private final T key;
|
||||
private final float score;
|
||||
|
||||
public SearchResultKey(T key, float score) {
|
||||
this.key = key;
|
||||
this.score = score;
|
||||
}
|
||||
|
||||
public float getScore() {
|
||||
return score;
|
||||
}
|
||||
|
||||
public T getKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
SearchResultKey<?> that = (SearchResultKey<?>) o;
|
||||
return Float.compare(that.score, score) == 0 && Objects.equals(key, that.key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(key, score);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new StringJoiner(", ", SearchResultKey.class.getSimpleName() + "[", "]")
|
||||
.add("key=" + key)
|
||||
.add("score=" + score)
|
||||
.toString();
|
||||
}
|
||||
}
|
@ -0,0 +1,63 @@
|
||||
package it.cavallium.dbengine.client;
|
||||
|
||||
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
|
||||
import java.util.Objects;
|
||||
import java.util.StringJoiner;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class SearchResultKeys<T> {
|
||||
|
||||
private final Mono<Long> totalHitsCount;
|
||||
private final Flux<SearchResultKey<T>> results;
|
||||
|
||||
public SearchResultKeys(Mono<Long> totalHitsCount, Flux<SearchResultKey<T>> results) {
|
||||
this.totalHitsCount = totalHitsCount;
|
||||
this.results = results;
|
||||
}
|
||||
|
||||
public static <T, U> SearchResultKeys<T> empty() {
|
||||
return new SearchResultKeys<>(Mono.just(0L), Flux.empty());
|
||||
}
|
||||
|
||||
public Mono<Long> totalHitsCount() {
|
||||
return this.totalHitsCount;
|
||||
}
|
||||
|
||||
public Flux<SearchResultKey<T>> results() {
|
||||
return this.results;
|
||||
}
|
||||
|
||||
public <U> SearchResult<T, U> withValues(ValueGetter<T, U> valuesGetter) {
|
||||
return new SearchResult<>(totalHitsCount,
|
||||
results.flatMap(item -> valuesGetter
|
||||
.get(item.getKey())
|
||||
.map(value -> new SearchResultItem<>(item.getKey(), value, item.getScore())))
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
SearchResultKeys<?> that = (SearchResultKeys<?>) o;
|
||||
return Objects.equals(totalHitsCount, that.totalHitsCount) && Objects.equals(results, that.results);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(totalHitsCount, results);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new StringJoiner(", ", SearchResultKeys.class.getSimpleName() + "[", "]")
|
||||
.add("totalHitsCount=" + totalHitsCount)
|
||||
.add("results=" + results)
|
||||
.toString();
|
||||
}
|
||||
}
|
@ -1,5 +1,6 @@
|
||||
package it.cavallium.dbengine.database;
|
||||
|
||||
import it.cavallium.dbengine.client.MultiSort;
|
||||
import it.cavallium.dbengine.database.analyzer.N4CharGramAnalyzer;
|
||||
import it.cavallium.dbengine.database.analyzer.N4CharGramEdgeAnalyzer;
|
||||
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
|
||||
@ -10,6 +11,8 @@ import org.apache.lucene.analysis.TokenStream;
|
||||
import org.apache.lucene.analysis.en.EnglishPossessiveFilter;
|
||||
import org.apache.lucene.analysis.en.KStemFilter;
|
||||
import org.apache.lucene.analysis.miscellaneous.ASCIIFoldingFilter;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
public class LuceneUtils {
|
||||
private static final Analyzer lucene4CharGramAnalyzerEdgeInstance = new N4CharGramEdgeAnalyzer();
|
||||
@ -57,4 +60,29 @@ public class LuceneUtils {
|
||||
tokenStream = new LowerCaseFilter(tokenStream);
|
||||
return tokenStream;
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge streams together maintaining absolute order
|
||||
*/
|
||||
public static <T> Flux<T> mergeStream(Flux<Flux<T>> mappedMultiResults,
|
||||
@Nullable MultiSort<T> sort,
|
||||
@Nullable Integer limit) {
|
||||
if (limit != null && limit == 0) {
|
||||
return mappedMultiResults.flatMap(f -> f).ignoreElements().flux();
|
||||
}
|
||||
return mappedMultiResults.collectList().flatMapMany(mappedMultiResultsList -> {
|
||||
Flux<T> mergedFlux;
|
||||
if (sort == null) {
|
||||
mergedFlux = Flux.merge(mappedMultiResultsList);
|
||||
} else {
|
||||
//noinspection unchecked
|
||||
mergedFlux = Flux.mergeOrdered(32, sort.getResultSort(), mappedMultiResultsList.toArray(Flux[]::new));
|
||||
}
|
||||
if (limit == null) {
|
||||
return mergedFlux;
|
||||
} else {
|
||||
return mergedFlux.take(limit);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -346,7 +346,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
query,
|
||||
limit,
|
||||
null,
|
||||
ScoreMode.COMPLETE,
|
||||
ScoreMode.TOP_SCORES,
|
||||
keyFieldName,
|
||||
keyScore -> {
|
||||
EmitResult result = topKeysSink.tryEmitNext(keyScore);
|
||||
|
@ -1,89 +0,0 @@
|
||||
package it.cavallium.dbengine.database.indicizer;
|
||||
|
||||
import it.cavallium.dbengine.database.DatabaseMemoryMode;
|
||||
import it.cavallium.dbengine.database.LLScoreMode;
|
||||
import it.cavallium.dbengine.database.LLSearchResult;
|
||||
import it.cavallium.dbengine.database.LLSort;
|
||||
import it.cavallium.dbengine.database.collections.Joiner;
|
||||
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
|
||||
import it.cavallium.dbengine.lucene.serializer.Query;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@SuppressWarnings("SpellCheckingInspection")
|
||||
public class JoinedIndicizerWriter<KEY, DBTYPE, JOINEDTYPE> implements LuceneIndicizerWriter<KEY, DBTYPE> {
|
||||
|
||||
private final LuceneIndicizerWriter<KEY, JOINEDTYPE> indicizerWriter;
|
||||
private final Joiner<KEY, DBTYPE, JOINEDTYPE> joiner;
|
||||
private final ValueGetter<KEY, DBTYPE> dbValueGetter;
|
||||
|
||||
public JoinedIndicizerWriter(LuceneIndicizerWriter<KEY, JOINEDTYPE> indicizerWriter,
|
||||
Joiner<KEY, DBTYPE, JOINEDTYPE> joiner,
|
||||
ValueGetter<KEY, DBTYPE> dbValueGetter) {
|
||||
this.indicizerWriter = indicizerWriter;
|
||||
this.joiner = joiner;
|
||||
this.dbValueGetter = dbValueGetter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> add(KEY key, DBTYPE value) {
|
||||
return joiner
|
||||
.join(dbValueGetter, value)
|
||||
.flatMap(joinedValue -> this.indicizerWriter.add(key, joinedValue));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> remove(KEY key) {
|
||||
return this.indicizerWriter.remove(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> update(KEY key, DBTYPE value) {
|
||||
return joiner
|
||||
.join(dbValueGetter, value)
|
||||
.flatMap(joinedValue -> this.indicizerWriter.update(key, joinedValue));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> clearIndex() {
|
||||
return this.indicizerWriter.clearIndex();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<LLSearchResult> moreLikeThis(@Nullable CompositeSnapshot snapshot, DBTYPE mltDocumentValue, int limit) {
|
||||
return joiner
|
||||
.join(dbValueGetter, mltDocumentValue)
|
||||
.flatMap(val -> this.indicizerWriter.moreLikeThis(snapshot, val, limit));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<LLSearchResult> search(@Nullable CompositeSnapshot snapshot,
|
||||
Query query,
|
||||
int limit,
|
||||
@Nullable LLSort sort,
|
||||
LLScoreMode scoreMode) {
|
||||
return this.indicizerWriter
|
||||
.search(snapshot, query, limit, sort, scoreMode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Long> count(@Nullable CompositeSnapshot snapshot, Query query) {
|
||||
return this.indicizerWriter.count(snapshot, query);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> close() {
|
||||
return this.indicizerWriter.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<KEY> getKey(String key) {
|
||||
return this.indicizerWriter.getKey(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DatabaseMemoryMode getMemoryMode() {
|
||||
return this.indicizerWriter.getMemoryMode();
|
||||
}
|
||||
}
|
@ -1,51 +0,0 @@
|
||||
package it.cavallium.dbengine.database.indicizer;
|
||||
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.DatabaseMemoryMode;
|
||||
import it.cavallium.dbengine.database.LLScoreMode;
|
||||
import it.cavallium.dbengine.database.LLSearchResult;
|
||||
import it.cavallium.dbengine.database.LLSort;
|
||||
import it.cavallium.dbengine.lucene.serializer.Query;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.function.Tuple2;
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public interface LuceneIndicizerWriter<T, U> {
|
||||
|
||||
Mono<Void> add(T key, U value);
|
||||
|
||||
default Mono<Void> addMulti(@NotNull Flux<Tuple2<T, U>> values) {
|
||||
return values.flatMap(tuple -> this.add(tuple.getT1(), tuple.getT2())).then();
|
||||
}
|
||||
|
||||
Mono<Void> remove(T key);
|
||||
|
||||
Mono<Void> update(T key, U value);
|
||||
|
||||
default Mono<Void> updateMulti(@NotNull Flux<Tuple2<T, U>> values) {
|
||||
return values.flatMap(tuple -> this.update(tuple.getT1(), tuple.getT2())).then();
|
||||
}
|
||||
|
||||
Mono<Void> clearIndex();
|
||||
|
||||
Mono<LLSearchResult> moreLikeThis(@Nullable CompositeSnapshot snapshot,
|
||||
U mltDocumentValue,
|
||||
int limit);
|
||||
|
||||
Mono<LLSearchResult> search(@Nullable CompositeSnapshot snapshot,
|
||||
Query query,
|
||||
int limit,
|
||||
@Nullable LLSort sort,
|
||||
LLScoreMode scoreMode);
|
||||
|
||||
Mono<Long> count(@Nullable CompositeSnapshot snapshot, Query query);
|
||||
|
||||
Mono<Void> close();
|
||||
|
||||
Mono<T> getKey(String key);
|
||||
|
||||
DatabaseMemoryMode getMemoryMode();
|
||||
}
|
@ -1,94 +0,0 @@
|
||||
package it.cavallium.dbengine.database.indicizer;
|
||||
|
||||
import it.cavallium.dbengine.database.DatabaseMemoryMode;
|
||||
import it.cavallium.dbengine.database.LLLuceneIndex;
|
||||
import it.cavallium.dbengine.database.LLScoreMode;
|
||||
import it.cavallium.dbengine.database.LLSearchResult;
|
||||
import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import it.cavallium.dbengine.database.LLSort;
|
||||
import it.cavallium.dbengine.database.LLTerm;
|
||||
import it.cavallium.dbengine.lucene.serializer.Query;
|
||||
import java.util.Set;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.function.Tuple2;
|
||||
|
||||
public class StandardLuceneIndicizerWriter<T, U> implements LuceneIndicizerWriter<T, U> {
|
||||
|
||||
protected final LLLuceneIndex luceneIndex;
|
||||
protected final Indicizer<T, U> indicizer;
|
||||
|
||||
public StandardLuceneIndicizerWriter(@NotNull LLLuceneIndex luceneIndex, @NotNull Indicizer<T, U> indicizer) {
|
||||
this.luceneIndex = luceneIndex;
|
||||
this.indicizer = indicizer;
|
||||
}
|
||||
|
||||
private LLSnapshot resolveSnapshot(CompositeSnapshot snapshot) {
|
||||
if (snapshot == null) {
|
||||
return null;
|
||||
} else {
|
||||
return snapshot.getSnapshot(luceneIndex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> add(@NotNull T key, @NotNull U value) {
|
||||
LLTerm docKey = indicizer.toIndex(key);
|
||||
return indicizer.toDocument(key, value).flatMap(doc -> luceneIndex.addDocument(docKey, doc));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> remove(@NotNull T key) {
|
||||
LLTerm term = indicizer.toIndex(key);
|
||||
return luceneIndex.deleteDocument(term);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> update(@NotNull T key, @NotNull U value) {
|
||||
LLTerm term = indicizer.toIndex(key);
|
||||
return indicizer.toDocument(key, value).flatMap(doc -> luceneIndex.updateDocument(term, doc));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> clearIndex() {
|
||||
return luceneIndex.deleteAll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<LLSearchResult> moreLikeThis(@Nullable CompositeSnapshot snapshot, U mltDocumentValue, int limit) {
|
||||
Flux<Tuple2<String, Set<String>>> mltDocumentFields = indicizer.getMoreLikeThisDocumentFields(mltDocumentValue);
|
||||
return luceneIndex.moreLikeThis(resolveSnapshot(snapshot), mltDocumentFields, limit, indicizer.getKeyFieldName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<LLSearchResult> search(@Nullable CompositeSnapshot snapshot,
|
||||
Query query,
|
||||
int limit,
|
||||
@Nullable LLSort sort,
|
||||
LLScoreMode scoreMode) {
|
||||
return luceneIndex.search(resolveSnapshot(snapshot), query, limit, sort, scoreMode, indicizer.getKeyFieldName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Long> count(@Nullable CompositeSnapshot snapshot, Query query) {
|
||||
return luceneIndex.count(resolveSnapshot(snapshot), query);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> close() {
|
||||
return luceneIndex.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<T> getKey(String key) {
|
||||
return Mono.just(indicizer.getKey(key));
|
||||
}
|
||||
|
||||
@Override
|
||||
public DatabaseMemoryMode getMemoryMode() {
|
||||
return luceneIndex.isLowMemoryMode() ? DatabaseMemoryMode.LOW : DatabaseMemoryMode.NORMAL;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user