diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java index de8e4ad..fee7f02 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java @@ -78,22 +78,17 @@ public class LuceneIndex implements LLSnapshottable { return luceneIndex.deleteAll(); } - private SearchResultKeys transformLuceneResult(LLSearchResult llSearchResult, + private Mono> transformLuceneResult(LLSearchResult llSearchResult, @Nullable MultiSort> sort, LLScoreMode scoreMode, @Nullable Long limit) { - Flux>>> mappedKeys = llSearchResult - .results() - .map(flux -> flux.map(signal -> { - if (signal.isValue()) { - return LuceneSignal.value( - new SearchResultKey(indicizer.getKey(signal.getValue().getKey()), - signal.getValue().getScore() - )); - } else { - return LuceneSignal.totalHitsCount(signal.getTotalHitsCount()); - } - })); + Flux> mappedKeys = llSearchResult + .getResults() + .map(flux -> new SearchResultKeys<>(flux.getResults().map(signal -> { + return new SearchResultKey(indicizer.getKey(signal.getKey()), + signal.getScore() + ); + }), flux.getTotalHitsCount())); MultiSort> finalSort; if (scoreMode != LLScoreMode.COMPLETE_NO_SCORES && sort == null) { finalSort = MultiSort.topScore(); @@ -101,38 +96,30 @@ public class LuceneIndex implements LLSnapshottable { finalSort = sort; } - MultiSort>> mappedSort; + MultiSort> mappedSort; if (finalSort != null) { mappedSort = new MultiSort<>(finalSort.getQuerySort(), (signal1, signal2) -> { - if (signal1.isValue() && signal2.isValue()) { - return finalSort.getResultSort().compare((signal1.getValue()), signal2.getValue()); - } else { - return 0; - } + return finalSort.getResultSort().compare((signal1), signal2); }); } else { mappedSort = null; } - return new SearchResultKeys<>(LuceneUtils.mergeSignalStream(mappedKeys, mappedSort, limit)); + return LuceneUtils.mergeSignalStreamKeys(mappedKeys, mappedSort, limit); } - private SearchResult transformLuceneResultWithValues(LLSearchResult llSearchResult, + private Mono> transformLuceneResultWithValues(LLSearchResult llSearchResult, @Nullable MultiSort> sort, LLScoreMode scoreMode, @Nullable Long limit, ValueGetter valueGetter) { - Flux>>> mappedKeys = llSearchResult - .results() - .map(flux -> flux.flatMapSequential(signal -> { - if (signal.isValue()) { - var key = indicizer.getKey(signal.getValue().getKey()); + Flux> mappedKeys = llSearchResult + .getResults() + .map(flux -> new SearchResult<>(flux.getResults().flatMapSequential(signal -> { + var key = indicizer.getKey(signal.getKey()); return valueGetter .get(key) - .map(value -> LuceneSignal.value(new SearchResultItem<>(key, value, signal.getValue().getScore()))); - } else { - return Mono.just(LuceneSignal.totalHitsCount((signal.getTotalHitsCount()))); - } - })); + .map(value -> new SearchResultItem<>(key, value, signal.getScore())); + }), flux.getTotalHitsCount())); MultiSort> finalSort; if (scoreMode != LLScoreMode.COMPLETE_NO_SCORES && sort == null) { finalSort = MultiSort.topScoreWithValues(); @@ -140,20 +127,15 @@ public class LuceneIndex implements LLSnapshottable { finalSort = sort; } - MultiSort>> mappedSort; + MultiSort> mappedSort; if (finalSort != null) { mappedSort = new MultiSort<>(finalSort.getQuerySort(), (signal1, signal2) -> { - if (signal1.isValue() && signal2.isValue()) { - return finalSort.getResultSort().compare((signal1.getValue()), signal2.getValue()); - } else { - return 0; - } + return finalSort.getResultSort().compare((signal1), signal2); }); } else { mappedSort = null; } - var sortedKeys = LuceneUtils.mergeSignalStream(mappedKeys, mappedSort, limit); - return new SearchResult<>(sortedKeys); + return LuceneUtils.mergeSignalStreamItems(mappedKeys, mappedSort, limit); } /** @@ -171,7 +153,7 @@ public class LuceneIndex implements LLSnapshottable { = indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue); return luceneIndex .moreLikeThis(resolveSnapshot(queryParams.getSnapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName(), mltDocumentFields) - .map(llSearchResult -> this.transformLuceneResult(llSearchResult, + .flatMap(llSearchResult -> this.transformLuceneResult(llSearchResult, queryParams.getSort(), queryParams.getScoreMode(), queryParams.getLimit() @@ -199,7 +181,7 @@ public class LuceneIndex implements LLSnapshottable { indicizer.getKeyFieldName(), mltDocumentFields ) - .map(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, + .flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, queryParams.getSort(), queryParams.getScoreMode(), queryParams.getLimit(), @@ -218,7 +200,7 @@ public class LuceneIndex implements LLSnapshottable { ClientQueryParams> queryParams) { return luceneIndex .search(resolveSnapshot(queryParams.getSnapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName()) - .map(llSearchResult -> this.transformLuceneResult(llSearchResult, + .flatMap(llSearchResult -> this.transformLuceneResult(llSearchResult, queryParams.getSort(), queryParams.getScoreMode(), queryParams.getLimit() @@ -237,7 +219,7 @@ public class LuceneIndex implements LLSnapshottable { ValueGetter valueGetter) { return luceneIndex .search(resolveSnapshot(queryParams.getSnapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName()) - .map(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, + .flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, queryParams.getSort(), queryParams.getScoreMode(), queryParams.getLimit(), @@ -246,10 +228,8 @@ public class LuceneIndex implements LLSnapshottable { } public Mono count(@Nullable CompositeSnapshot snapshot, Query query) { - return Mono.from(this.search(ClientQueryParams.>builder().snapshot(snapshot).query(query).limit(0).build()) - .flatMapMany(SearchResultKeys::results) - .filter(LuceneSignal::isTotalHitsCount) - .map(LuceneSignal::getTotalHitsCount)); + return this.search(ClientQueryParams.>builder().snapshot(snapshot).query(query).limit(0).build()) + .map(SearchResultKeys::getTotalHitsCount); } public boolean isLowMemoryMode() { diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneSignal.java b/src/main/java/it/cavallium/dbengine/client/LuceneSignal.java deleted file mode 100644 index 71bb793..0000000 --- a/src/main/java/it/cavallium/dbengine/client/LuceneSignal.java +++ /dev/null @@ -1,22 +0,0 @@ -package it.cavallium.dbengine.client; - -public interface LuceneSignal { - - boolean isValue(); - - boolean isTotalHitsCount(); - - T getValue(); - - long getTotalHitsCount(); - - static LuceneSignalValue value(T value) { - return new LuceneSignalValue<>(value); - } - - static LuceneSignalTotalHitsCount totalHitsCount(long totalHitsCount) { - return new LuceneSignalTotalHitsCount<>(totalHitsCount); - } - - LuceneSignalTotalHitsCount mapTotalHitsCount(); -} diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneSignalTotalHitsCount.java b/src/main/java/it/cavallium/dbengine/client/LuceneSignalTotalHitsCount.java deleted file mode 100644 index 1be9008..0000000 --- a/src/main/java/it/cavallium/dbengine/client/LuceneSignalTotalHitsCount.java +++ /dev/null @@ -1,36 +0,0 @@ -package it.cavallium.dbengine.client; - -public class LuceneSignalTotalHitsCount implements LuceneSignal { - - private final long totalHitsCount; - - public LuceneSignalTotalHitsCount(long totalHitsCount) { - this.totalHitsCount = totalHitsCount; - } - - @Override - public boolean isValue() { - return false; - } - - @Override - public boolean isTotalHitsCount() { - return true; - } - - @Override - public T getValue() { - throw new UnsupportedOperationException("This object is not value"); - } - - @Override - public long getTotalHitsCount() { - return totalHitsCount; - } - - @Override - public LuceneSignalTotalHitsCount mapTotalHitsCount() { - //noinspection unchecked - return (LuceneSignalTotalHitsCount) this; - } -} diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneSignalValue.java b/src/main/java/it/cavallium/dbengine/client/LuceneSignalValue.java deleted file mode 100644 index 0b29bf9..0000000 --- a/src/main/java/it/cavallium/dbengine/client/LuceneSignalValue.java +++ /dev/null @@ -1,34 +0,0 @@ -package it.cavallium.dbengine.client; - -public class LuceneSignalValue implements LuceneSignal { - - private final T value; - - public LuceneSignalValue(T value) { - this.value = value; - } - - public boolean isValue() { - return true; - } - - @Override - public boolean isTotalHitsCount() { - return false; - } - - @Override - public T getValue() { - return value; - } - - @Override - public long getTotalHitsCount() { - throw new UnsupportedOperationException("This object is not TotalHitsCount"); - } - - @Override - public LuceneSignalTotalHitsCount mapTotalHitsCount() { - throw new UnsupportedOperationException("This object is not TotalHitsCount"); - } -} diff --git a/src/main/java/it/cavallium/dbengine/client/MultiSort.java b/src/main/java/it/cavallium/dbengine/client/MultiSort.java index b2f5053..ec88865 100644 --- a/src/main/java/it/cavallium/dbengine/client/MultiSort.java +++ b/src/main/java/it/cavallium/dbengine/client/MultiSort.java @@ -5,7 +5,6 @@ import it.cavallium.dbengine.client.query.current.data.RandomSort; import it.cavallium.dbengine.client.query.current.data.ScoreSort; import it.cavallium.dbengine.client.query.current.data.Sort; import it.cavallium.dbengine.database.LLKeyScore; -import it.cavallium.dbengine.database.LLSignal; import java.util.Comparator; import java.util.function.ToIntFunction; import java.util.function.ToLongFunction; @@ -68,15 +67,9 @@ public class MultiSort { return new MultiSort<>(RandomSort.of(), (a, b) -> 0); } - public static MultiSort topScoreRaw() { + public static MultiSort topScoreRaw() { Comparator comp = Comparator.comparingDouble(LLKeyScore::getScore).reversed(); - return new MultiSort<>(ScoreSort.of(), (signal1, signal2) -> { - if (signal1.isValue() && signal2.isValue()) { - return comp.compare(signal1.getValue(), signal2.getValue()); - } else { - return 0; - } - }); + return new MultiSort<>(ScoreSort.of(), comp); } public static MultiSort> topScore() { diff --git a/src/main/java/it/cavallium/dbengine/client/SearchResult.java b/src/main/java/it/cavallium/dbengine/client/SearchResult.java index d43612f..edd74b7 100644 --- a/src/main/java/it/cavallium/dbengine/client/SearchResult.java +++ b/src/main/java/it/cavallium/dbengine/client/SearchResult.java @@ -1,42 +1,15 @@ package it.cavallium.dbengine.client; -import lombok.EqualsAndHashCode; -import lombok.ToString; +import lombok.Value; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuples; -@EqualsAndHashCode -@ToString +@Value public class SearchResult { - private final Flux>> results; - - public SearchResult(Flux>> results) { - this.results = results; - } + Flux> results; + long totalHitsCount; public static SearchResult empty() { - return new SearchResult<>(Flux.just(LuceneSignal.totalHitsCount(0L))); - } - - public Flux>> results() { - return this.results; - } - - public Flux> onlyValues() { - return this.results.filter(LuceneSignal::isValue).map(LuceneSignal::getValue); - } - - /** - * You must subscribe to both publishers - */ - public Tuple2>, Mono> splitShared() { - Flux>> shared = results.publish().refCount(2); - return Tuples.of( - shared.filter(LuceneSignal::isValue).map(LuceneSignal::getValue).share(), - Mono.from(shared.filter(LuceneSignal::isTotalHitsCount).map(LuceneSignal::getTotalHitsCount)).cache() - ); + return new SearchResult<>(Flux.empty(), 0L); } } diff --git a/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java b/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java index 254c313..d42390b 100644 --- a/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java +++ b/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java @@ -1,57 +1,25 @@ package it.cavallium.dbengine.client; import it.cavallium.dbengine.database.collections.Joiner.ValueGetter; -import lombok.EqualsAndHashCode; -import lombok.ToString; +import lombok.Value; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuples; -@EqualsAndHashCode -@ToString +@Value public class SearchResultKeys { - private final Flux>> results; - - public SearchResultKeys(Flux>> results) { - this.results = results; - } + Flux> results; + long totalHitsCount; public static SearchResultKeys empty() { - return new SearchResultKeys<>(Flux.just(LuceneSignal.totalHitsCount(0L))); - } - - public Flux>> results() { - return this.results; + return new SearchResultKeys<>(Flux.empty(), 0L); } public SearchResult withValues(ValueGetter valuesGetter) { return new SearchResult<>( - results.flatMapSequential(item -> { - if (item.isValue()) { - return valuesGetter - .get(item.getValue().getKey()) - .map(value -> LuceneSignal.value(new SearchResultItem<>(item.getValue().getKey(), value, item.getValue().getScore()))); - } else { - return Mono.just(item.mapTotalHitsCount()); - } - }) - ); - } - - public Flux> onlyKeys() { - return this.results.filter(LuceneSignal::isValue).map(LuceneSignal::getValue); - } - - /** - * You must subscribe to both publishers - */ - public Tuple2>, Mono> splitShared() { - Flux>> shared = results.publish().refCount(2); - return Tuples.of( - shared.filter(LuceneSignal::isValue).map(LuceneSignal::getValue).share(), - Mono.from(shared.filter(LuceneSignal::isTotalHitsCount).map(LuceneSignal::getTotalHitsCount)).cache() + results.flatMapSequential(item -> valuesGetter + .get(item.getKey()) + .map(value -> new SearchResultItem<>(item.getKey(), value, item.getScore()))), + totalHitsCount ); } } diff --git a/src/main/java/it/cavallium/dbengine/database/LLKeyScore.java b/src/main/java/it/cavallium/dbengine/database/LLKeyScore.java index b6403e3..41bdc57 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLKeyScore.java +++ b/src/main/java/it/cavallium/dbengine/database/LLKeyScore.java @@ -2,7 +2,7 @@ package it.cavallium.dbengine.database; import java.util.Objects; -public class LLKeyScore implements LLSignal { +public class LLKeyScore { private final String key; private final float score; @@ -45,24 +45,4 @@ public class LLKeyScore implements LLSignal { ", score=" + score + '}'; } - - @Override - public boolean isValue() { - return true; - } - - @Override - public boolean isTotalHitsCount() { - return false; - } - - @Override - public LLKeyScore getValue() { - return this; - } - - @Override - public long getTotalHitsCount() { - throw new UnsupportedOperationException(); - } } diff --git a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java index b5b297b..fe09ad1 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java @@ -5,6 +5,7 @@ import it.cavallium.dbengine.client.query.current.data.NoSort; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.client.query.current.data.ScoreMode; +import it.cavallium.dbengine.lucene.LuceneUtils; import java.util.Set; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; @@ -50,10 +51,9 @@ public interface LLLuceneIndex extends LLSnapshottable { default Mono count(@Nullable LLSnapshot snapshot, Query query) { QueryParams params = QueryParams.of(query, 0, Nullablefloat.empty(), NoSort.of(), ScoreMode.of(false, false)); return Mono.from(this.search(snapshot, params, null) - .flatMapMany(LLSearchResult::results) - .flatMap(s -> s) - .filter(LLSignal::isTotalHitsCount) - .map(LLSignal::getTotalHitsCount)); + .flatMap(results -> LuceneUtils.mergeSignalStreamRaw(results.getResults(), null, null)) + .map(LLSearchResultShard::getTotalHitsCount) + .defaultIfEmpty(0L)); } boolean isLowMemoryMode(); diff --git a/src/main/java/it/cavallium/dbengine/database/LLSearchResult.java b/src/main/java/it/cavallium/dbengine/database/LLSearchResult.java index 7be4f67..dae2717 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLSearchResult.java +++ b/src/main/java/it/cavallium/dbengine/database/LLSearchResult.java @@ -1,58 +1,17 @@ package it.cavallium.dbengine.database; -import java.util.Objects; import java.util.function.BiFunction; +import lombok.Value; import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; +@Value public class LLSearchResult { - private final Flux> results; - - public LLSearchResult(Flux> results) { - this.results = results; - } - - public static LLSearchResult empty() { - return new LLSearchResult(Flux.just(Flux.just(new LLTotalHitsCount(0L)))); - } + Flux results; @NotNull public static BiFunction accumulator() { return (a, b) -> new LLSearchResult(Flux.merge(a.results, b.results)); } - - public Flux> results() { - return this.results; - } - - public Mono completion() { - return results.flatMap(r -> r).then(); - } - - public boolean equals(final Object o) { - if (o == this) { - return true; - } - if (!(o instanceof LLSearchResult)) { - return false; - } - final LLSearchResult other = (LLSearchResult) o; - final Object this$results = this.results(); - final Object other$results = other.results(); - return Objects.equals(this$results, other$results); - } - - public int hashCode() { - final int PRIME = 59; - int result = 1; - final Object $results = this.results(); - result = result * PRIME + ($results == null ? 43 : $results.hashCode()); - return result; - } - - public String toString() { - return "LLSearchResult(results=" + this.results() + ")"; - } } diff --git a/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java b/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java new file mode 100644 index 0000000..f2943e3 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java @@ -0,0 +1,11 @@ +package it.cavallium.dbengine.database; + +import lombok.Value; +import reactor.core.publisher.Flux; + +@Value +public class LLSearchResultShard { + + Flux results; + long totalHitsCount; +} diff --git a/src/main/java/it/cavallium/dbengine/database/LLSignal.java b/src/main/java/it/cavallium/dbengine/database/LLSignal.java deleted file mode 100644 index d7eb135..0000000 --- a/src/main/java/it/cavallium/dbengine/database/LLSignal.java +++ /dev/null @@ -1,20 +0,0 @@ -package it.cavallium.dbengine.database; - -public interface LLSignal { - - boolean isValue(); - - boolean isTotalHitsCount(); - - LLKeyScore getValue(); - - long getTotalHitsCount(); - - static LLSignal value(LLKeyScore value) { - return value; - } - - static LLTotalHitsCount totalHitsCount(long totalHitsCount) { - return new LLTotalHitsCount(totalHitsCount); - } -} diff --git a/src/main/java/it/cavallium/dbengine/database/LLTotalHitsCount.java b/src/main/java/it/cavallium/dbengine/database/LLTotalHitsCount.java deleted file mode 100644 index 411667a..0000000 --- a/src/main/java/it/cavallium/dbengine/database/LLTotalHitsCount.java +++ /dev/null @@ -1,54 +0,0 @@ -package it.cavallium.dbengine.database; - -import java.util.Objects; -import java.util.StringJoiner; - -public class LLTotalHitsCount implements LLSignal { - - private final long value; - - public LLTotalHitsCount(long value) { - this.value = value; - } - - @Override - public boolean isValue() { - return false; - } - - @Override - public boolean isTotalHitsCount() { - return true; - } - - public LLKeyScore getValue() { - throw new UnsupportedOperationException(); - } - - @Override - public long getTotalHitsCount() { - return value; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - LLTotalHitsCount that = (LLTotalHitsCount) o; - return value == that.value; - } - - @Override - public int hashCode() { - return Objects.hash(value); - } - - @Override - public String toString() { - return new StringJoiner(", ", LLTotalHitsCount.class.getSimpleName() + "[", "]").add("count=" + value).toString(); - } -} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 0e6d710..b2f0e80 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -9,10 +9,9 @@ import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLSearchCollectionStatisticsGetter; import it.cavallium.dbengine.database.LLSearchResult; -import it.cavallium.dbengine.database.LLSignal; +import it.cavallium.dbengine.database.LLSearchResultShard; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; -import it.cavallium.dbengine.database.LLTotalHitsCount; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.ScheduledTaskLifecycle; @@ -392,15 +391,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { String keyFieldName, Flux>> mltDocumentFieldsFlux, long actionId) { - return moreLikeThis(snapshot, - queryParams, - keyFieldName, - mltDocumentFieldsFlux, - true, - actionId, - 1 - ) - .flatMap(LLSearchResult::completion); + return moreLikeThis(snapshot, queryParams, keyFieldName, mltDocumentFieldsFlux, true, actionId, 1).then(); } @SuppressWarnings({"unchecked", "rawtypes"}) @@ -422,7 +413,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .flatMap(mltDocumentFields -> { mltDocumentFields.entrySet().removeIf(entry -> entry.getValue().isEmpty()); if (mltDocumentFields.isEmpty()) { - return Mono.just(LLSearchResult.empty()); + return Mono.just(new LLSearchResult(Flux.empty())); } return acquireSearcherWrapper(snapshot, doDistributedPre, actionId).flatMap(indexSearcher -> Mono @@ -495,7 +486,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { public Mono distributedPreSearch(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName, long actionId) { return this .search(snapshot, queryParams, keyFieldName, true, actionId, 1) - .flatMap(LLSearchResult::completion); + .then(); } private Mono search(@Nullable LLSnapshot snapshot, @@ -550,27 +541,58 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { Query luceneQuery, Sort luceneSort, ScoreMode luceneScoreMode) { - return new LLSearchResult(Flux.just(Flux.defer(() -> Flux.create(sink -> { - AtomicBoolean cancelled = new AtomicBoolean(); - Semaphore requests = new Semaphore(0); - sink.onDispose(() -> { - cancelled.set(true); - }); - sink.onCancel(() -> { - cancelled.set(true); - }); - sink.onRequest(delta -> { - requests.release((int) Math.min(delta, Integer.MAX_VALUE)); - }); + return new LLSearchResult(Mono.create(monoSink -> { + long totalHitsCount; try { - if (!cancelled.get()) { - if (doDistributedPre) { - //noinspection BlockingMethodInNonBlockingContext - allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery); - sink.next(new LLTotalHitsCount(0L)); - } else { + if (!doDistributedPre) { + AtomicLong totalHitsCountAtomic = new AtomicLong(0); + //noinspection BlockingMethodInNonBlockingContext + streamSearcher.search(indexSearcher, + luceneQuery, + 0, + luceneSort, + luceneScoreMode, + minCompetitiveScore, + keyFieldName, + keyScore -> HandleResult.HALT, + totalHitsCountAtomic::set + ); + totalHitsCount = totalHitsCountAtomic.get(); + } else { + //noinspection BlockingMethodInNonBlockingContext + allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery); + totalHitsCount = 0; + } + } catch (Exception ex) { + monoSink.error(ex); + return; + } + + AtomicBoolean alreadySubscribed = new AtomicBoolean(false); + var resultsFlux = Flux.create(sink -> { + + if (!alreadySubscribed.compareAndSet(false, true)) { + sink.error(new IllegalStateException("Already subscribed to results")); + return; + } + + AtomicBoolean cancelled = new AtomicBoolean(); + Semaphore requests = new Semaphore(0); + sink.onDispose(() -> { + cancelled.set(true); + }); + sink.onCancel(() -> { + cancelled.set(true); + }); + sink.onRequest(delta -> { + requests.release((int) Math.min(delta, Integer.MAX_VALUE)); + }); + + try { + if (!doDistributedPre) { int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit); + AtomicLong atomicTotalHitsCount = new AtomicLong(0); //noinspection BlockingMethodInNonBlockingContext streamSearcher.search(indexSearcher, luceneQuery, @@ -602,31 +624,18 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return HandleResult.HALT; } }, - totalHitsCount -> { - try { - if (cancelled.get()) { - return; - } - while (!requests.tryAcquire(500, TimeUnit.MILLISECONDS)) { - if (cancelled.get()) { - return; - } - } - sink.next(new LLTotalHitsCount(totalHitsCount)); - } catch (Exception ex) { - sink.error(ex); - cancelled.set(true); - requests.release(Integer.MAX_VALUE); - } - } + atomicTotalHitsCount::set ); } sink.complete(); + } catch (Exception ex) { + sink.error(ex); } - } catch (Exception ex) { - sink.error(ex); - } - }, OverflowStrategy.ERROR).subscribeOn(blockingLuceneSearchScheduler).publishOn(luceneQueryScheduler)))); + + }, OverflowStrategy.ERROR).subscribeOn(blockingLuceneSearchScheduler).publishOn(luceneQueryScheduler); + + monoSink.success(new LLSearchResultShard(resultsFlux, totalHitsCount)); + }).subscribeOn(blockingLuceneSearchScheduler).publishOn(luceneQueryScheduler).flux()); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 562fccc..2f85bd8 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -7,6 +7,7 @@ import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.database.LLDocument; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLSearchResult; +import it.cavallium.dbengine.database.LLSearchResultShard; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer; @@ -262,9 +263,12 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { .reduce(LLSearchResult.accumulator()) .map(result -> { if (actionId != -1) { - var resultsWithTermination = result - .results() - .map(flux -> flux.doOnTerminate(() -> completedAction(actionId))); + Flux resultsWithTermination = result + .getResults() + .map(flux -> new LLSearchResultShard(flux + .getResults() + .doOnTerminate(() -> completedAction(actionId)), flux.getTotalHitsCount()) + ); return new LLSearchResult(resultsWithTermination); } else { return result; @@ -323,9 +327,12 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { .reduce(LLSearchResult.accumulator()) .map(result -> { if (actionId != -1) { - var resultsWithTermination = result - .results() - .map(flux -> flux.doOnTerminate(() -> completedAction(actionId))); + Flux resultsWithTermination = result + .getResults() + .map(flux -> new LLSearchResultShard(flux + .getResults() + .doOnTerminate(() -> completedAction(actionId)), flux.getTotalHitsCount()) + ); return new LLSearchResult(resultsWithTermination); } else { return result; diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index ce38d5b..e21ae23 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -1,11 +1,13 @@ package it.cavallium.dbengine.lucene; import it.cavallium.dbengine.client.CompositeSnapshot; -import it.cavallium.dbengine.client.LuceneSignal; import it.cavallium.dbengine.client.MultiSort; +import it.cavallium.dbengine.client.SearchResult; +import it.cavallium.dbengine.client.SearchResultItem; +import it.cavallium.dbengine.client.SearchResultKey; +import it.cavallium.dbengine.client.SearchResultKeys; import it.cavallium.dbengine.database.LLKeyScore; -import it.cavallium.dbengine.database.LLSignal; -import it.cavallium.dbengine.database.LLTotalHitsCount; +import it.cavallium.dbengine.database.LLSearchResultShard; import it.cavallium.dbengine.database.collections.DatabaseMapDictionary; import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; import it.cavallium.dbengine.database.collections.Joiner.ValueGetter; @@ -232,50 +234,36 @@ public class LuceneUtils { return HandleResult.CONTINUE; } - public static Flux> mergeSignalStream(Flux>> mappedKeys, - MultiSort> mappedSort, + public static Mono> mergeSignalStreamKeys(Flux> mappedKeys, + MultiSort> sort, Long limit) { - Flux>> sharedMappedSignals = mappedKeys - .map(sub -> sub - .publish() - .refCount(2) - ) - .publish() - .refCount(2); - Flux> sortedValues = LuceneUtils - .mergeStream(sharedMappedSignals.map(sub -> sub.filter(LuceneSignal::isValue)), mappedSort, limit); - //noinspection Convert2MethodRef - Mono> sortedTotalSize = sharedMappedSignals - .flatMap(sub -> sub) - .filter(LuceneSignal::isTotalHitsCount) - .map(LuceneSignal::getTotalHitsCount) - .reduce(Long::sum) - .map(sum -> LuceneSignal.totalHitsCount(sum)); - return Flux.merge(sortedValues, sortedTotalSize); + return mappedKeys.reduce( + new SearchResultKeys<>(Flux.empty(), 0L), + (a, b) -> new SearchResultKeys(LuceneUtils + .mergeStream(Flux.just(a.getResults(), b.getResults()), sort, limit), a.getTotalHitsCount() + b.getTotalHitsCount()) + ); } - public static Flux mergeSignalStreamRaw(Flux> mappedKeys, - MultiSort mappedSort, + public static Mono> mergeSignalStreamItems(Flux> mappedKeys, + MultiSort> sort, Long limit) { - Flux> sharedMappedSignals = mappedKeys - .map(sub -> sub - .publish() - .refCount(2) + return mappedKeys.reduce( + new SearchResult<>(Flux.empty(), 0L), + (a, b) -> new SearchResult(LuceneUtils + .mergeStream(Flux.just(a.getResults(), b.getResults()), sort, limit), a.getTotalHitsCount() + b.getTotalHitsCount()) + ); + } + + public static Mono mergeSignalStreamRaw(Flux mappedKeys, + MultiSort mappedSort, + Long limit) { + return mappedKeys.reduce( + new LLSearchResultShard(Flux.empty(), 0), + (s1, s2) -> new LLSearchResultShard( + LuceneUtils.mergeStream(Flux.just(s1.getResults(), s2.getResults()), mappedSort, limit), + s1.getTotalHitsCount() + s2.getTotalHitsCount() ) - .publish() - .refCount(2); - - Flux sortedValues = LuceneUtils - .mergeStream(sharedMappedSignals.map(sub -> sub.filter(LLSignal::isValue)), mappedSort, limit); - //noinspection Convert2MethodRef - Mono sortedTotalSize = sharedMappedSignals - .flatMap(sub -> sub) - .filter(LLSignal::isTotalHitsCount) - .map(LLSignal::getTotalHitsCount) - .reduce(Long::sum) - .map(sum -> new LLTotalHitsCount(sum)); - - return Flux.merge(sortedValues, sortedTotalSize); + ); } public static ValueGetter, V> getAsyncDbValueGetterDeep(