Safer approach to total hits count in lucene results

This commit is contained in:
Andrea Cavalli 2021-03-27 03:35:27 +01:00
parent 7379a8d8ae
commit 3e6573d955
16 changed files with 166 additions and 464 deletions

View File

@ -78,22 +78,17 @@ public class LuceneIndex<T, U> implements LLSnapshottable {
return luceneIndex.deleteAll();
}
private SearchResultKeys<T> transformLuceneResult(LLSearchResult llSearchResult,
private Mono<SearchResultKeys<T>> transformLuceneResult(LLSearchResult llSearchResult,
@Nullable MultiSort<SearchResultKey<T>> sort,
LLScoreMode scoreMode,
@Nullable Long limit) {
Flux<Flux<LuceneSignal<SearchResultKey<T>>>> mappedKeys = llSearchResult
.results()
.map(flux -> flux.map(signal -> {
if (signal.isValue()) {
return LuceneSignal.value(
new SearchResultKey<T>(indicizer.getKey(signal.getValue().getKey()),
signal.getValue().getScore()
));
} else {
return LuceneSignal.totalHitsCount(signal.getTotalHitsCount());
}
}));
Flux<SearchResultKeys<T>> mappedKeys = llSearchResult
.getResults()
.map(flux -> new SearchResultKeys<>(flux.getResults().map(signal -> {
return new SearchResultKey<T>(indicizer.getKey(signal.getKey()),
signal.getScore()
);
}), flux.getTotalHitsCount()));
MultiSort<SearchResultKey<T>> finalSort;
if (scoreMode != LLScoreMode.COMPLETE_NO_SCORES && sort == null) {
finalSort = MultiSort.topScore();
@ -101,38 +96,30 @@ public class LuceneIndex<T, U> implements LLSnapshottable {
finalSort = sort;
}
MultiSort<LuceneSignal<SearchResultKey<T>>> mappedSort;
MultiSort<SearchResultKey<T>> mappedSort;
if (finalSort != null) {
mappedSort = new MultiSort<>(finalSort.getQuerySort(), (signal1, signal2) -> {
if (signal1.isValue() && signal2.isValue()) {
return finalSort.getResultSort().compare((signal1.getValue()), signal2.getValue());
} else {
return 0;
}
return finalSort.getResultSort().compare((signal1), signal2);
});
} else {
mappedSort = null;
}
return new SearchResultKeys<>(LuceneUtils.mergeSignalStream(mappedKeys, mappedSort, limit));
return LuceneUtils.mergeSignalStreamKeys(mappedKeys, mappedSort, limit);
}
private SearchResult<T, U> transformLuceneResultWithValues(LLSearchResult llSearchResult,
private Mono<SearchResult<T, U>> transformLuceneResultWithValues(LLSearchResult llSearchResult,
@Nullable MultiSort<SearchResultItem<T, U>> sort,
LLScoreMode scoreMode,
@Nullable Long limit,
ValueGetter<T, U> valueGetter) {
Flux<Flux<LuceneSignal<SearchResultItem<T, U>>>> mappedKeys = llSearchResult
.results()
.map(flux -> flux.flatMapSequential(signal -> {
if (signal.isValue()) {
var key = indicizer.getKey(signal.getValue().getKey());
Flux<SearchResult<T, U>> mappedKeys = llSearchResult
.getResults()
.map(flux -> new SearchResult<>(flux.getResults().flatMapSequential(signal -> {
var key = indicizer.getKey(signal.getKey());
return valueGetter
.get(key)
.map(value -> LuceneSignal.value(new SearchResultItem<>(key, value, signal.getValue().getScore())));
} else {
return Mono.just(LuceneSignal.totalHitsCount((signal.getTotalHitsCount())));
}
}));
.map(value -> new SearchResultItem<>(key, value, signal.getScore()));
}), flux.getTotalHitsCount()));
MultiSort<SearchResultItem<T, U>> finalSort;
if (scoreMode != LLScoreMode.COMPLETE_NO_SCORES && sort == null) {
finalSort = MultiSort.topScoreWithValues();
@ -140,20 +127,15 @@ public class LuceneIndex<T, U> implements LLSnapshottable {
finalSort = sort;
}
MultiSort<LuceneSignal<SearchResultItem<T, U>>> mappedSort;
MultiSort<SearchResultItem<T, U>> mappedSort;
if (finalSort != null) {
mappedSort = new MultiSort<>(finalSort.getQuerySort(), (signal1, signal2) -> {
if (signal1.isValue() && signal2.isValue()) {
return finalSort.getResultSort().compare((signal1.getValue()), signal2.getValue());
} else {
return 0;
}
return finalSort.getResultSort().compare((signal1), signal2);
});
} else {
mappedSort = null;
}
var sortedKeys = LuceneUtils.mergeSignalStream(mappedKeys, mappedSort, limit);
return new SearchResult<>(sortedKeys);
return LuceneUtils.mergeSignalStreamItems(mappedKeys, mappedSort, limit);
}
/**
@ -171,7 +153,7 @@ public class LuceneIndex<T, U> implements LLSnapshottable {
= indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue);
return luceneIndex
.moreLikeThis(resolveSnapshot(queryParams.getSnapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName(), mltDocumentFields)
.map(llSearchResult -> this.transformLuceneResult(llSearchResult,
.flatMap(llSearchResult -> this.transformLuceneResult(llSearchResult,
queryParams.getSort(),
queryParams.getScoreMode(),
queryParams.getLimit()
@ -199,7 +181,7 @@ public class LuceneIndex<T, U> implements LLSnapshottable {
indicizer.getKeyFieldName(),
mltDocumentFields
)
.map(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult,
.flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult,
queryParams.getSort(),
queryParams.getScoreMode(),
queryParams.getLimit(),
@ -218,7 +200,7 @@ public class LuceneIndex<T, U> implements LLSnapshottable {
ClientQueryParams<SearchResultKey<T>> queryParams) {
return luceneIndex
.search(resolveSnapshot(queryParams.getSnapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName())
.map(llSearchResult -> this.transformLuceneResult(llSearchResult,
.flatMap(llSearchResult -> this.transformLuceneResult(llSearchResult,
queryParams.getSort(),
queryParams.getScoreMode(),
queryParams.getLimit()
@ -237,7 +219,7 @@ public class LuceneIndex<T, U> implements LLSnapshottable {
ValueGetter<T, U> valueGetter) {
return luceneIndex
.search(resolveSnapshot(queryParams.getSnapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName())
.map(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult,
.flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult,
queryParams.getSort(),
queryParams.getScoreMode(),
queryParams.getLimit(),
@ -246,10 +228,8 @@ public class LuceneIndex<T, U> implements LLSnapshottable {
}
public Mono<Long> count(@Nullable CompositeSnapshot snapshot, Query query) {
return Mono.from(this.search(ClientQueryParams.<SearchResultKey<T>>builder().snapshot(snapshot).query(query).limit(0).build())
.flatMapMany(SearchResultKeys::results)
.filter(LuceneSignal::isTotalHitsCount)
.map(LuceneSignal::getTotalHitsCount));
return this.search(ClientQueryParams.<SearchResultKey<T>>builder().snapshot(snapshot).query(query).limit(0).build())
.map(SearchResultKeys::getTotalHitsCount);
}
public boolean isLowMemoryMode() {

View File

@ -1,22 +0,0 @@
package it.cavallium.dbengine.client;
public interface LuceneSignal<T> {
boolean isValue();
boolean isTotalHitsCount();
T getValue();
long getTotalHitsCount();
static <T> LuceneSignalValue<T> value(T value) {
return new LuceneSignalValue<>(value);
}
static <T> LuceneSignalTotalHitsCount<T> totalHitsCount(long totalHitsCount) {
return new LuceneSignalTotalHitsCount<>(totalHitsCount);
}
<U> LuceneSignalTotalHitsCount<U> mapTotalHitsCount();
}

View File

@ -1,36 +0,0 @@
package it.cavallium.dbengine.client;
public class LuceneSignalTotalHitsCount<T> implements LuceneSignal<T> {
private final long totalHitsCount;
public LuceneSignalTotalHitsCount(long totalHitsCount) {
this.totalHitsCount = totalHitsCount;
}
@Override
public boolean isValue() {
return false;
}
@Override
public boolean isTotalHitsCount() {
return true;
}
@Override
public T getValue() {
throw new UnsupportedOperationException("This object is not value");
}
@Override
public long getTotalHitsCount() {
return totalHitsCount;
}
@Override
public <U> LuceneSignalTotalHitsCount<U> mapTotalHitsCount() {
//noinspection unchecked
return (LuceneSignalTotalHitsCount<U>) this;
}
}

View File

@ -1,34 +0,0 @@
package it.cavallium.dbengine.client;
public class LuceneSignalValue<T> implements LuceneSignal<T> {
private final T value;
public LuceneSignalValue(T value) {
this.value = value;
}
public boolean isValue() {
return true;
}
@Override
public boolean isTotalHitsCount() {
return false;
}
@Override
public T getValue() {
return value;
}
@Override
public long getTotalHitsCount() {
throw new UnsupportedOperationException("This object is not TotalHitsCount");
}
@Override
public <U> LuceneSignalTotalHitsCount<U> mapTotalHitsCount() {
throw new UnsupportedOperationException("This object is not TotalHitsCount");
}
}

View File

@ -5,7 +5,6 @@ import it.cavallium.dbengine.client.query.current.data.RandomSort;
import it.cavallium.dbengine.client.query.current.data.ScoreSort;
import it.cavallium.dbengine.client.query.current.data.Sort;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLSignal;
import java.util.Comparator;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
@ -68,15 +67,9 @@ public class MultiSort<T> {
return new MultiSort<>(RandomSort.of(), (a, b) -> 0);
}
public static MultiSort<LLSignal> topScoreRaw() {
public static MultiSort<LLKeyScore> topScoreRaw() {
Comparator<LLKeyScore> comp = Comparator.comparingDouble(LLKeyScore::getScore).reversed();
return new MultiSort<>(ScoreSort.of(), (signal1, signal2) -> {
if (signal1.isValue() && signal2.isValue()) {
return comp.compare(signal1.getValue(), signal2.getValue());
} else {
return 0;
}
});
return new MultiSort<>(ScoreSort.of(), comp);
}
public static <T> MultiSort<SearchResultKey<T>> topScore() {

View File

@ -1,42 +1,15 @@
package it.cavallium.dbengine.client;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.Value;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
@EqualsAndHashCode
@ToString
@Value
public class SearchResult<T, U> {
private final Flux<LuceneSignal<SearchResultItem<T, U>>> results;
public SearchResult(Flux<LuceneSignal<SearchResultItem<T, U>>> results) {
this.results = results;
}
Flux<SearchResultItem<T, U>> results;
long totalHitsCount;
public static <T, U> SearchResult<T, U> empty() {
return new SearchResult<>(Flux.just(LuceneSignal.totalHitsCount(0L)));
}
public Flux<LuceneSignal<SearchResultItem<T, U>>> results() {
return this.results;
}
public Flux<SearchResultItem<T, U>> onlyValues() {
return this.results.filter(LuceneSignal::isValue).map(LuceneSignal::getValue);
}
/**
* You must subscribe to both publishers
*/
public Tuple2<Flux<SearchResultItem<T, U>>, Mono<Long>> splitShared() {
Flux<LuceneSignal<SearchResultItem<T, U>>> shared = results.publish().refCount(2);
return Tuples.of(
shared.filter(LuceneSignal::isValue).map(LuceneSignal::getValue).share(),
Mono.from(shared.filter(LuceneSignal::isTotalHitsCount).map(LuceneSignal::getTotalHitsCount)).cache()
);
return new SearchResult<>(Flux.empty(), 0L);
}
}

View File

@ -1,57 +1,25 @@
package it.cavallium.dbengine.client;
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.Value;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
@EqualsAndHashCode
@ToString
@Value
public class SearchResultKeys<T> {
private final Flux<LuceneSignal<SearchResultKey<T>>> results;
public SearchResultKeys(Flux<LuceneSignal<SearchResultKey<T>>> results) {
this.results = results;
}
Flux<SearchResultKey<T>> results;
long totalHitsCount;
public static <T, U> SearchResultKeys<T> empty() {
return new SearchResultKeys<>(Flux.just(LuceneSignal.totalHitsCount(0L)));
}
public Flux<LuceneSignal<SearchResultKey<T>>> results() {
return this.results;
return new SearchResultKeys<>(Flux.empty(), 0L);
}
public <U> SearchResult<T, U> withValues(ValueGetter<T, U> valuesGetter) {
return new SearchResult<>(
results.flatMapSequential(item -> {
if (item.isValue()) {
return valuesGetter
.get(item.getValue().getKey())
.map(value -> LuceneSignal.value(new SearchResultItem<>(item.getValue().getKey(), value, item.getValue().getScore())));
} else {
return Mono.just(item.mapTotalHitsCount());
}
})
);
}
public Flux<SearchResultKey<T>> onlyKeys() {
return this.results.filter(LuceneSignal::isValue).map(LuceneSignal::getValue);
}
/**
* You must subscribe to both publishers
*/
public Tuple2<Flux<SearchResultKey<T>>, Mono<Long>> splitShared() {
Flux<LuceneSignal<SearchResultKey<T>>> shared = results.publish().refCount(2);
return Tuples.of(
shared.filter(LuceneSignal::isValue).map(LuceneSignal::getValue).share(),
Mono.from(shared.filter(LuceneSignal::isTotalHitsCount).map(LuceneSignal::getTotalHitsCount)).cache()
results.flatMapSequential(item -> valuesGetter
.get(item.getKey())
.map(value -> new SearchResultItem<>(item.getKey(), value, item.getScore()))),
totalHitsCount
);
}
}

View File

@ -2,7 +2,7 @@ package it.cavallium.dbengine.database;
import java.util.Objects;
public class LLKeyScore implements LLSignal {
public class LLKeyScore {
private final String key;
private final float score;
@ -45,24 +45,4 @@ public class LLKeyScore implements LLSignal {
", score=" + score +
'}';
}
@Override
public boolean isValue() {
return true;
}
@Override
public boolean isTotalHitsCount() {
return false;
}
@Override
public LLKeyScore getValue() {
return this;
}
@Override
public long getTotalHitsCount() {
throw new UnsupportedOperationException();
}
}

View File

@ -5,6 +5,7 @@ import it.cavallium.dbengine.client.query.current.data.NoSort;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.client.query.current.data.ScoreMode;
import it.cavallium.dbengine.lucene.LuceneUtils;
import java.util.Set;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
@ -50,10 +51,9 @@ public interface LLLuceneIndex extends LLSnapshottable {
default Mono<Long> count(@Nullable LLSnapshot snapshot, Query query) {
QueryParams params = QueryParams.of(query, 0, Nullablefloat.empty(), NoSort.of(), ScoreMode.of(false, false));
return Mono.from(this.search(snapshot, params, null)
.flatMapMany(LLSearchResult::results)
.flatMap(s -> s)
.filter(LLSignal::isTotalHitsCount)
.map(LLSignal::getTotalHitsCount));
.flatMap(results -> LuceneUtils.mergeSignalStreamRaw(results.getResults(), null, null))
.map(LLSearchResultShard::getTotalHitsCount)
.defaultIfEmpty(0L));
}
boolean isLowMemoryMode();

View File

@ -1,58 +1,17 @@
package it.cavallium.dbengine.database;
import java.util.Objects;
import java.util.function.BiFunction;
import lombok.Value;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Value
public class LLSearchResult {
private final Flux<Flux<LLSignal>> results;
public LLSearchResult(Flux<Flux<LLSignal>> results) {
this.results = results;
}
public static LLSearchResult empty() {
return new LLSearchResult(Flux.just(Flux.just(new LLTotalHitsCount(0L))));
}
Flux<LLSearchResultShard> results;
@NotNull
public static BiFunction<LLSearchResult, LLSearchResult, LLSearchResult> accumulator() {
return (a, b) -> new LLSearchResult(Flux.merge(a.results, b.results));
}
public Flux<Flux<LLSignal>> results() {
return this.results;
}
public Mono<Void> completion() {
return results.flatMap(r -> r).then();
}
public boolean equals(final Object o) {
if (o == this) {
return true;
}
if (!(o instanceof LLSearchResult)) {
return false;
}
final LLSearchResult other = (LLSearchResult) o;
final Object this$results = this.results();
final Object other$results = other.results();
return Objects.equals(this$results, other$results);
}
public int hashCode() {
final int PRIME = 59;
int result = 1;
final Object $results = this.results();
result = result * PRIME + ($results == null ? 43 : $results.hashCode());
return result;
}
public String toString() {
return "LLSearchResult(results=" + this.results() + ")";
}
}

View File

@ -0,0 +1,11 @@
package it.cavallium.dbengine.database;
import lombok.Value;
import reactor.core.publisher.Flux;
@Value
public class LLSearchResultShard {
Flux<LLKeyScore> results;
long totalHitsCount;
}

View File

@ -1,20 +0,0 @@
package it.cavallium.dbengine.database;
public interface LLSignal {
boolean isValue();
boolean isTotalHitsCount();
LLKeyScore getValue();
long getTotalHitsCount();
static LLSignal value(LLKeyScore value) {
return value;
}
static LLTotalHitsCount totalHitsCount(long totalHitsCount) {
return new LLTotalHitsCount(totalHitsCount);
}
}

View File

@ -1,54 +0,0 @@
package it.cavallium.dbengine.database;
import java.util.Objects;
import java.util.StringJoiner;
public class LLTotalHitsCount implements LLSignal {
private final long value;
public LLTotalHitsCount(long value) {
this.value = value;
}
@Override
public boolean isValue() {
return false;
}
@Override
public boolean isTotalHitsCount() {
return true;
}
public LLKeyScore getValue() {
throw new UnsupportedOperationException();
}
@Override
public long getTotalHitsCount() {
return value;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LLTotalHitsCount that = (LLTotalHitsCount) o;
return value == that.value;
}
@Override
public int hashCode() {
return Objects.hash(value);
}
@Override
public String toString() {
return new StringJoiner(", ", LLTotalHitsCount.class.getSimpleName() + "[", "]").add("count=" + value).toString();
}
}

View File

@ -9,10 +9,9 @@ import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSearchCollectionStatisticsGetter;
import it.cavallium.dbengine.database.LLSearchResult;
import it.cavallium.dbengine.database.LLSignal;
import it.cavallium.dbengine.database.LLSearchResultShard;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.LLTotalHitsCount;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.ScheduledTaskLifecycle;
@ -392,15 +391,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
String keyFieldName,
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux,
long actionId) {
return moreLikeThis(snapshot,
queryParams,
keyFieldName,
mltDocumentFieldsFlux,
true,
actionId,
1
)
.flatMap(LLSearchResult::completion);
return moreLikeThis(snapshot, queryParams, keyFieldName, mltDocumentFieldsFlux, true, actionId, 1).then();
}
@SuppressWarnings({"unchecked", "rawtypes"})
@ -422,7 +413,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.flatMap(mltDocumentFields -> {
mltDocumentFields.entrySet().removeIf(entry -> entry.getValue().isEmpty());
if (mltDocumentFields.isEmpty()) {
return Mono.just(LLSearchResult.empty());
return Mono.just(new LLSearchResult(Flux.empty()));
}
return acquireSearcherWrapper(snapshot, doDistributedPre, actionId).flatMap(indexSearcher -> Mono
@ -495,7 +486,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
public Mono<Void> distributedPreSearch(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName, long actionId) {
return this
.search(snapshot, queryParams, keyFieldName, true, actionId, 1)
.flatMap(LLSearchResult::completion);
.then();
}
private Mono<LLSearchResult> search(@Nullable LLSnapshot snapshot,
@ -550,27 +541,58 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
Query luceneQuery,
Sort luceneSort,
ScoreMode luceneScoreMode) {
return new LLSearchResult(Flux.just(Flux.defer(() -> Flux.<LLSignal>create(sink -> {
AtomicBoolean cancelled = new AtomicBoolean();
Semaphore requests = new Semaphore(0);
sink.onDispose(() -> {
cancelled.set(true);
});
sink.onCancel(() -> {
cancelled.set(true);
});
sink.onRequest(delta -> {
requests.release((int) Math.min(delta, Integer.MAX_VALUE));
});
return new LLSearchResult(Mono.<LLSearchResultShard>create(monoSink -> {
long totalHitsCount;
try {
if (!cancelled.get()) {
if (doDistributedPre) {
//noinspection BlockingMethodInNonBlockingContext
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
sink.next(new LLTotalHitsCount(0L));
} else {
if (!doDistributedPre) {
AtomicLong totalHitsCountAtomic = new AtomicLong(0);
//noinspection BlockingMethodInNonBlockingContext
streamSearcher.search(indexSearcher,
luceneQuery,
0,
luceneSort,
luceneScoreMode,
minCompetitiveScore,
keyFieldName,
keyScore -> HandleResult.HALT,
totalHitsCountAtomic::set
);
totalHitsCount = totalHitsCountAtomic.get();
} else {
//noinspection BlockingMethodInNonBlockingContext
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
totalHitsCount = 0;
}
} catch (Exception ex) {
monoSink.error(ex);
return;
}
AtomicBoolean alreadySubscribed = new AtomicBoolean(false);
var resultsFlux = Flux.<LLKeyScore>create(sink -> {
if (!alreadySubscribed.compareAndSet(false, true)) {
sink.error(new IllegalStateException("Already subscribed to results"));
return;
}
AtomicBoolean cancelled = new AtomicBoolean();
Semaphore requests = new Semaphore(0);
sink.onDispose(() -> {
cancelled.set(true);
});
sink.onCancel(() -> {
cancelled.set(true);
});
sink.onRequest(delta -> {
requests.release((int) Math.min(delta, Integer.MAX_VALUE));
});
try {
if (!doDistributedPre) {
int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit);
AtomicLong atomicTotalHitsCount = new AtomicLong(0);
//noinspection BlockingMethodInNonBlockingContext
streamSearcher.search(indexSearcher,
luceneQuery,
@ -602,31 +624,18 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return HandleResult.HALT;
}
},
totalHitsCount -> {
try {
if (cancelled.get()) {
return;
}
while (!requests.tryAcquire(500, TimeUnit.MILLISECONDS)) {
if (cancelled.get()) {
return;
}
}
sink.next(new LLTotalHitsCount(totalHitsCount));
} catch (Exception ex) {
sink.error(ex);
cancelled.set(true);
requests.release(Integer.MAX_VALUE);
}
}
atomicTotalHitsCount::set
);
}
sink.complete();
} catch (Exception ex) {
sink.error(ex);
}
} catch (Exception ex) {
sink.error(ex);
}
}, OverflowStrategy.ERROR).subscribeOn(blockingLuceneSearchScheduler).publishOn(luceneQueryScheduler))));
}, OverflowStrategy.ERROR).subscribeOn(blockingLuceneSearchScheduler).publishOn(luceneQueryScheduler);
monoSink.success(new LLSearchResultShard(resultsFlux, totalHitsCount));
}).subscribeOn(blockingLuceneSearchScheduler).publishOn(luceneQueryScheduler).flux());
}
@Override

View File

@ -7,6 +7,7 @@ import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.database.LLDocument;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSearchResult;
import it.cavallium.dbengine.database.LLSearchResultShard;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
@ -262,9 +263,12 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
.reduce(LLSearchResult.accumulator())
.map(result -> {
if (actionId != -1) {
var resultsWithTermination = result
.results()
.map(flux -> flux.doOnTerminate(() -> completedAction(actionId)));
Flux<LLSearchResultShard> resultsWithTermination = result
.getResults()
.map(flux -> new LLSearchResultShard(flux
.getResults()
.doOnTerminate(() -> completedAction(actionId)), flux.getTotalHitsCount())
);
return new LLSearchResult(resultsWithTermination);
} else {
return result;
@ -323,9 +327,12 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
.reduce(LLSearchResult.accumulator())
.map(result -> {
if (actionId != -1) {
var resultsWithTermination = result
.results()
.map(flux -> flux.doOnTerminate(() -> completedAction(actionId)));
Flux<LLSearchResultShard> resultsWithTermination = result
.getResults()
.map(flux -> new LLSearchResultShard(flux
.getResults()
.doOnTerminate(() -> completedAction(actionId)), flux.getTotalHitsCount())
);
return new LLSearchResult(resultsWithTermination);
} else {
return result;

View File

@ -1,11 +1,13 @@
package it.cavallium.dbengine.lucene;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.client.LuceneSignal;
import it.cavallium.dbengine.client.MultiSort;
import it.cavallium.dbengine.client.SearchResult;
import it.cavallium.dbengine.client.SearchResultItem;
import it.cavallium.dbengine.client.SearchResultKey;
import it.cavallium.dbengine.client.SearchResultKeys;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLSignal;
import it.cavallium.dbengine.database.LLTotalHitsCount;
import it.cavallium.dbengine.database.LLSearchResultShard;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
@ -232,50 +234,36 @@ public class LuceneUtils {
return HandleResult.CONTINUE;
}
public static <T> Flux<LuceneSignal<T>> mergeSignalStream(Flux<Flux<LuceneSignal<T>>> mappedKeys,
MultiSort<LuceneSignal<T>> mappedSort,
public static <T> Mono<SearchResultKeys<T>> mergeSignalStreamKeys(Flux<SearchResultKeys<T>> mappedKeys,
MultiSort<SearchResultKey<T>> sort,
Long limit) {
Flux<Flux<LuceneSignal<T>>> sharedMappedSignals = mappedKeys
.map(sub -> sub
.publish()
.refCount(2)
)
.publish()
.refCount(2);
Flux<LuceneSignal<T>> sortedValues = LuceneUtils
.mergeStream(sharedMappedSignals.map(sub -> sub.filter(LuceneSignal::isValue)), mappedSort, limit);
//noinspection Convert2MethodRef
Mono<LuceneSignal<T>> sortedTotalSize = sharedMappedSignals
.flatMap(sub -> sub)
.filter(LuceneSignal::isTotalHitsCount)
.map(LuceneSignal::getTotalHitsCount)
.reduce(Long::sum)
.map(sum -> LuceneSignal.totalHitsCount(sum));
return Flux.merge(sortedValues, sortedTotalSize);
return mappedKeys.reduce(
new SearchResultKeys<>(Flux.empty(), 0L),
(a, b) -> new SearchResultKeys<T>(LuceneUtils
.mergeStream(Flux.just(a.getResults(), b.getResults()), sort, limit), a.getTotalHitsCount() + b.getTotalHitsCount())
);
}
public static Flux<LLSignal> mergeSignalStreamRaw(Flux<Flux<LLSignal>> mappedKeys,
MultiSort<LLSignal> mappedSort,
public static <T, U> Mono<SearchResult<T, U>> mergeSignalStreamItems(Flux<SearchResult<T, U>> mappedKeys,
MultiSort<SearchResultItem<T, U>> sort,
Long limit) {
Flux<Flux<LLSignal>> sharedMappedSignals = mappedKeys
.map(sub -> sub
.publish()
.refCount(2)
return mappedKeys.reduce(
new SearchResult<>(Flux.empty(), 0L),
(a, b) -> new SearchResult<T, U>(LuceneUtils
.mergeStream(Flux.just(a.getResults(), b.getResults()), sort, limit), a.getTotalHitsCount() + b.getTotalHitsCount())
);
}
public static Mono<LLSearchResultShard> mergeSignalStreamRaw(Flux<LLSearchResultShard> mappedKeys,
MultiSort<LLKeyScore> mappedSort,
Long limit) {
return mappedKeys.reduce(
new LLSearchResultShard(Flux.empty(), 0),
(s1, s2) -> new LLSearchResultShard(
LuceneUtils.mergeStream(Flux.just(s1.getResults(), s2.getResults()), mappedSort, limit),
s1.getTotalHitsCount() + s2.getTotalHitsCount()
)
.publish()
.refCount(2);
Flux<LLSignal> sortedValues = LuceneUtils
.mergeStream(sharedMappedSignals.map(sub -> sub.filter(LLSignal::isValue)), mappedSort, limit);
//noinspection Convert2MethodRef
Mono<LLSignal> sortedTotalSize = sharedMappedSignals
.flatMap(sub -> sub)
.filter(LLSignal::isTotalHitsCount)
.map(LLSignal::getTotalHitsCount)
.reduce(Long::sum)
.map(sum -> new LLTotalHitsCount(sum));
return Flux.merge(sortedValues, sortedTotalSize);
);
}
public static <T, U, V> ValueGetter<Entry<T, U>, V> getAsyncDbValueGetterDeep(