Finished initial refactoring

This commit is contained in:
Andrea Cavalli 2021-09-20 12:51:27 +02:00
parent 4a883ca8ea
commit 3c5edbc06e
10 changed files with 105 additions and 75 deletions

View File

@ -389,12 +389,30 @@ public class LLUtils {
Function<V, Mono<U>> resourceClosure, Function<V, Mono<U>> resourceClosure,
boolean cleanupOnSuccess) { boolean cleanupOnSuccess) {
return Mono.usingWhen(resourceSupplier, resourceClosure, r -> { return Mono.usingWhen(resourceSupplier, resourceClosure, r -> {
if (cleanupOnSuccess) { if (cleanupOnSuccess) {
return Mono.fromRunnable(r::close); return Mono.fromRunnable(r::close);
} else { } else {
return Mono.empty(); return Mono.empty();
} }
}, (r, ex) -> Mono.fromRunnable(r::close), r -> Mono.fromRunnable(r::close)) }, (r, ex) -> Mono.fromRunnable(r::close), r -> Mono.fromRunnable(r::close))
.doOnDiscard(Resource.class, Resource::close)
.doOnDiscard(Send.class, Send::close);
}
/**
* cleanup resource
* @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful
*/
public static <U, T extends Resource<T>> Mono<U> usingSendResource(Mono<Send<T>> resourceSupplier,
Function<T, Mono<U>> resourceClosure,
boolean cleanupOnSuccess) {
return Mono.usingWhen(resourceSupplier.map(Send::receive), resourceClosure, r -> {
if (cleanupOnSuccess) {
return Mono.fromRunnable(r::close);
} else {
return Mono.empty();
}
}, (r, ex) -> Mono.fromRunnable(r::close), r -> Mono.fromRunnable(r::close))
.doOnDiscard(Resource.class, Resource::close) .doOnDiscard(Resource.class, Resource::close)
.doOnDiscard(Send.class, Send::close); .doOnDiscard(Send.class, Send::close);
} }

View File

@ -12,8 +12,8 @@ import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionary; import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
import it.cavallium.dbengine.database.collections.ValueGetter; import it.cavallium.dbengine.database.collections.ValueGetter;
import it.cavallium.dbengine.database.disk.LLIndexContexts;
import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.analyzer.NCharGramAnalyzer; import it.cavallium.dbengine.lucene.analyzer.NCharGramAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.NCharGramEdgeAnalyzer; import it.cavallium.dbengine.lucene.analyzer.NCharGramEdgeAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer; import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
@ -371,7 +371,7 @@ public class LuceneUtils {
} }
public static Flux<LLKeyScore> convertHits(Flux<ScoreDoc> hitsFlux, public static Flux<LLKeyScore> convertHits(Flux<ScoreDoc> hitsFlux,
LLIndexContexts indexSearchers, LLIndexSearchers indexSearchers,
String keyFieldName, String keyFieldName,
boolean preserveOrder) { boolean preserveOrder) {
if (preserveOrder) { if (preserveOrder) {
@ -396,7 +396,7 @@ public class LuceneUtils {
@Nullable @Nullable
private static LLKeyScore mapHitBlocking(ScoreDoc hit, private static LLKeyScore mapHitBlocking(ScoreDoc hit,
LLIndexContexts indexSearchers, LLIndexSearchers indexSearchers,
String keyFieldName) { String keyFieldName) {
if (Schedulers.isInNonBlockingThread()) { if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called mapHitBlocking in a nonblocking thread"); throw new UnsupportedOperationException("Called mapHitBlocking in a nonblocking thread");

View File

@ -1,8 +1,8 @@
package it.cavallium.dbengine.lucene.searcher; package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.disk.LLIndexContext;
import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher { public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher {
@ -12,13 +12,14 @@ public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher {
private static final LuceneLocalSearcher countSearcher = new CountLuceneLocalSearcher(); private static final LuceneLocalSearcher countSearcher = new CountLuceneLocalSearcher();
@Override @Override
public Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexContext>> indexSearcher, public Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexSearcher>> indexSearcher,
LocalQueryParams queryParams, LocalQueryParams queryParams,
String keyFieldName) { String keyFieldName,
LLSearchTransformer transformer) {
if (queryParams.limit() == 0) { if (queryParams.limit() == 0) {
return countSearcher.collect(indexSearcher, queryParams, keyFieldName); return countSearcher.collect(indexSearcher, queryParams, keyFieldName, transformer);
} else { } else {
return localSearcher.collect(indexSearcher, queryParams, keyFieldName); return localSearcher.collect(indexSearcher, queryParams, keyFieldName, transformer);
} }
} }
} }

View File

@ -1,8 +1,8 @@
package it.cavallium.dbengine.lucene.searcher; package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.disk.LLIndexContext;
import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -18,15 +18,16 @@ public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher {
= new SimpleUnsortedUnscoredLuceneMultiSearcher(new SimpleLuceneLocalSearcher()); = new SimpleUnsortedUnscoredLuceneMultiSearcher(new SimpleLuceneLocalSearcher());
@Override @Override
public Mono<Send<LuceneSearchResult>> collect(Flux<Send<LLIndexContext>> indexSearchersFlux, public Mono<Send<LuceneSearchResult>> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
LocalQueryParams queryParams, LocalQueryParams queryParams,
String keyFieldName) { String keyFieldName,
LLSearchTransformer transformer) {
if (queryParams.limit() == 0) { if (queryParams.limit() == 0) {
return countLuceneMultiSearcher.collect(indexSearchersFlux, queryParams, keyFieldName); return countLuceneMultiSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
} else if (queryParams.isSorted() || queryParams.isScored()) { } else if (queryParams.isSorted() || queryParams.isScored()) {
return scoredSimpleLuceneShardSearcher.collect(indexSearchersFlux, queryParams, keyFieldName); return scoredSimpleLuceneShardSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
} else { } else {
return unscoredPagedLuceneMultiSearcher.collect(indexSearchersFlux, queryParams, keyFieldName); return unscoredPagedLuceneMultiSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
} }
} }
} }

View File

@ -3,7 +3,6 @@ package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexContext;
import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -12,9 +11,10 @@ import reactor.core.scheduler.Schedulers;
public class CountLuceneLocalSearcher implements LuceneLocalSearcher { public class CountLuceneLocalSearcher implements LuceneLocalSearcher {
@Override @Override
public Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexContext>> indexSearcherMono, public Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexSearcher>> indexSearcherMono,
LocalQueryParams queryParams, LocalQueryParams queryParams,
String keyFieldName) { String keyFieldName,
LLSearchTransformer transformer) {
return Mono return Mono
.usingWhen( .usingWhen(
indexSearcherMono, indexSearcherMono,

View File

@ -1,7 +1,6 @@
package it.cavallium.dbengine.lucene.searcher; package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.disk.LLIndexContext;
import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;

View File

@ -1,7 +1,6 @@
package it.cavallium.dbengine.lucene.searcher; package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.disk.LLIndexContext;
import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;

View File

@ -6,8 +6,8 @@ import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SE
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexContext; import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexContexts; import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.LuceneUtils;
import java.util.Arrays; import java.util.Arrays;
import java.util.Objects; import java.util.Objects;
@ -23,15 +23,14 @@ public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher {
} }
@Override @Override
public Mono<Send<LuceneSearchResult>> collect(Flux<Send<LLIndexContext>> indexSearchersFlux, public Mono<Send<LuceneSearchResult>> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
LocalQueryParams queryParams, LocalQueryParams queryParams,
String keyFieldName) { String keyFieldName,
LLSearchTransformer transformer) {
Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null"); Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null");
PaginationInfo paginationInfo = getPaginationInfo(queryParams); PaginationInfo paginationInfo = getPaginationInfo(queryParams);
var indexSearchersMono = indexSearchersFlux.collectList().map(LLIndexContexts::of); return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> this
return LLUtils.usingResource(indexSearchersMono, indexSearchers -> this
// Search first page results // Search first page results
.searchFirstPage(indexSearchers, queryParams, paginationInfo) .searchFirstPage(indexSearchers, queryParams, paginationInfo)
// Compute the results of the first page // Compute the results of the first page
@ -66,7 +65,7 @@ public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher {
/** /**
* Search effectively the raw results of the first page * Search effectively the raw results of the first page
*/ */
private Mono<PageData> searchFirstPage(LLIndexContexts indexSearchers, private Mono<PageData> searchFirstPage(LLIndexSearchers indexSearchers,
LocalQueryParams queryParams, LocalQueryParams queryParams,
PaginationInfo paginationInfo) { PaginationInfo paginationInfo) {
var limit = LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()); var limit = LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit());
@ -81,7 +80,7 @@ public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher {
* Compute the results of the first page, extracting useful data * Compute the results of the first page, extracting useful data
*/ */
private Mono<FirstPageResults> computeFirstPageResults(Mono<PageData> firstPageDataMono, private Mono<FirstPageResults> computeFirstPageResults(Mono<PageData> firstPageDataMono,
LLIndexContexts indexSearchers, LLIndexSearchers indexSearchers,
String keyFieldName, String keyFieldName,
LocalQueryParams queryParams) { LocalQueryParams queryParams) {
return firstPageDataMono.map(firstPageData -> { return firstPageDataMono.map(firstPageData -> {
@ -98,7 +97,7 @@ public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher {
} }
private Mono<Send<LuceneSearchResult>> computeOtherResults(Mono<FirstPageResults> firstResultMono, private Mono<Send<LuceneSearchResult>> computeOtherResults(Mono<FirstPageResults> firstResultMono,
LLIndexContexts indexSearchers, LLIndexSearchers indexSearchers,
LocalQueryParams queryParams, LocalQueryParams queryParams,
String keyFieldName) { String keyFieldName) {
return firstResultMono.map(firstResult -> { return firstResultMono.map(firstResult -> {
@ -116,7 +115,7 @@ public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher {
/** /**
* Search effectively the merged raw results of the next pages * Search effectively the merged raw results of the next pages
*/ */
private Flux<LLKeyScore> searchOtherPages(LLIndexContexts indexSearchers, private Flux<LLKeyScore> searchOtherPages(LLIndexSearchers indexSearchers,
LocalQueryParams queryParams, String keyFieldName, CurrentPageInfo secondPageInfo) { LocalQueryParams queryParams, String keyFieldName, CurrentPageInfo secondPageInfo) {
return Flux return Flux
.defer(() -> { .defer(() -> {
@ -139,7 +138,7 @@ public class ScoredSimpleLuceneShardSearcher implements LuceneMultiSearcher {
* skip the first n results in the first page * skip the first n results in the first page
*/ */
private Mono<PageData> searchPage(LocalQueryParams queryParams, private Mono<PageData> searchPage(LocalQueryParams queryParams,
LLIndexContexts indexSearchers, LLIndexSearchers indexSearchers,
boolean allowPagination, boolean allowPagination,
int resultsOffset, int resultsOffset,
CurrentPageInfo s) { CurrentPageInfo s) {

View File

@ -7,10 +7,10 @@ import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SE
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexContext; import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexContexts; import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.database.disk.LLIndexSearchers.UnshardedIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.database.disk.LLIndexContexts.UnshardedIndexSearchers;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Objects; import java.util.Objects;
@ -25,14 +25,15 @@ import reactor.core.scheduler.Schedulers;
public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
@Override @Override
public Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexContext>> indexSearcherMono, public Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexSearcher>> indexSearcherMono,
LocalQueryParams queryParams, LocalQueryParams queryParams,
String keyFieldName) { String keyFieldName,
LLSearchTransformer transformer) {
Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null"); Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null");
PaginationInfo paginationInfo = getPaginationInfo(queryParams); PaginationInfo paginationInfo = getPaginationInfo(queryParams);
var indexSearchersMono = indexSearcherMono.map(LLIndexContexts::unsharded); var indexSearchersMono = indexSearcherMono.map(LLIndexSearchers::unsharded);
return LLUtils.usingResource(indexSearchersMono, indexSearchers -> this return LLUtils.usingResource(indexSearchersMono, indexSearchers -> this
// Search first page results // Search first page results
@ -61,7 +62,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
/** /**
* Search effectively the raw results of the first page * Search effectively the raw results of the first page
*/ */
private Mono<PageData> searchFirstPage(UnshardedIndexSearchers indexSearchers, private Mono<PageData> searchFirstPage(LLIndexSearchers indexSearchers,
LocalQueryParams queryParams, LocalQueryParams queryParams,
PaginationInfo paginationInfo) { PaginationInfo paginationInfo) {
var limit = LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()); var limit = LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit());
@ -76,7 +77,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
* Compute the results of the first page, extracting useful data * Compute the results of the first page, extracting useful data
*/ */
private Mono<FirstPageResults> computeFirstPageResults(Mono<PageData> firstPageDataMono, private Mono<FirstPageResults> computeFirstPageResults(Mono<PageData> firstPageDataMono,
LLIndexContexts indexSearchers, LLIndexSearchers indexSearchers,
String keyFieldName, String keyFieldName,
LocalQueryParams queryParams) { LocalQueryParams queryParams) {
return firstPageDataMono.map(firstPageData -> { return firstPageDataMono.map(firstPageData -> {
@ -132,7 +133,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
* skip the first n results in the first page * skip the first n results in the first page
*/ */
private CurrentPageInfo searchPageSync(LocalQueryParams queryParams, private CurrentPageInfo searchPageSync(LocalQueryParams queryParams,
UnshardedIndexSearchers indexSearchers, LLIndexSearchers indexSearchers,
boolean allowPagination, boolean allowPagination,
int resultsOffset, int resultsOffset,
CurrentPageInfo s, CurrentPageInfo s,
@ -141,13 +142,19 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
if (resultsOffset < 0) { if (resultsOffset < 0) {
throw new IndexOutOfBoundsException(resultsOffset); throw new IndexOutOfBoundsException(resultsOffset);
} }
UnshardedIndexSearchers unshardedIndexSearchers;
if (indexSearchers instanceof UnshardedIndexSearchers unshardedIndexSearchers1) {
unshardedIndexSearchers = unshardedIndexSearchers1;
} else {
throw new IllegalArgumentException();
}
if ((s.pageIndex() == 0 || s.last() != null) && s.remainingLimit() > 0) { if ((s.pageIndex() == 0 || s.last() != null) && s.remainingLimit() > 0) {
TopDocs pageTopDocs; TopDocs pageTopDocs;
try { try {
TopDocsCollector<ScoreDoc> collector = TopDocsSearcher.getTopDocsCollector(queryParams.sort(), TopDocsCollector<ScoreDoc> collector = TopDocsSearcher.getTopDocsCollector(queryParams.sort(),
s.currentPageLimit(), s.last(), LuceneUtils.totalHitsThreshold(), allowPagination, s.currentPageLimit(), s.last(), LuceneUtils.totalHitsThreshold(), allowPagination,
queryParams.isScored()); queryParams.isScored());
indexSearchers.shard().getIndexSearcher().search(queryParams.query(), collector); unshardedIndexSearchers.shard().getIndexSearcher().search(queryParams.query(), collector);
if (resultsOffset > 0) { if (resultsOffset > 0) {
pageTopDocs = collector.topDocs(resultsOffset, s.currentPageLimit()); pageTopDocs = collector.topDocs(resultsOffset, s.currentPageLimit());
} else { } else {

View File

@ -6,8 +6,8 @@ import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexContext;
import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
@ -23,10 +23,11 @@ public class SimpleUnsortedUnscoredLuceneMultiSearcher implements LuceneMultiSea
} }
@Override @Override
public Mono<Send<LuceneSearchResult>> collect(Flux<Send<LLIndexContext>> indexSearchersFlux, public Mono<Send<LuceneSearchResult>> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
LocalQueryParams queryParams, LocalQueryParams queryParams,
String keyFieldName) { String keyFieldName,
return Mono LLSearchTransformer transformer) {
var indexSearchersResource = Mono
.fromRunnable(() -> { .fromRunnable(() -> {
LLUtils.ensureBlocking(); LLUtils.ensureBlocking();
if (!queryParams.isSorted()) { if (!queryParams.isSorted()) {
@ -38,31 +39,36 @@ public class SimpleUnsortedUnscoredLuceneMultiSearcher implements LuceneMultiSea
+ " by SimpleUnsortedUnscoredLuceneMultiSearcher"); + " by SimpleUnsortedUnscoredLuceneMultiSearcher");
} }
}) })
.thenMany(indexSearchersFlux) .then(indexSearchersMono.map(Send::receive));
.flatMap(resSend -> localSearcher.collect(Mono.just(resSend).share(), queryParams, keyFieldName))
.collectList()
.map(results -> {
List<LuceneSearchResult> resultsToDrop = new ArrayList<>(results.size());
List<Flux<LLKeyScore>> resultsFluxes = new ArrayList<>(results.size());
boolean exactTotalHitsCount = true;
long totalHitsCountValue = 0;
for (Send<LuceneSearchResult> resultToReceive : results) {
LuceneSearchResult result = resultToReceive.receive();
resultsToDrop.add(result);
resultsFluxes.add(result.results());
exactTotalHitsCount &= result.totalHitsCount().exact();
totalHitsCountValue += result.totalHitsCount().value();
}
var totalHitsCount = new TotalHitsCount(totalHitsCountValue, exactTotalHitsCount); return LLUtils.usingResource(indexSearchersResource,
Flux<LLKeyScore> mergedFluxes = Flux.merge(resultsFluxes); indexSearchers -> Flux.fromIterable(indexSearchers.shards())
.flatMap(searcher -> localSearcher
.collect(Mono.just(searcher.send()), queryParams, keyFieldName, transformer))
.collectList()
.map(results -> {
List<LuceneSearchResult> resultsToDrop = new ArrayList<>(results.size());
List<Flux<LLKeyScore>> resultsFluxes = new ArrayList<>(results.size());
boolean exactTotalHitsCount = true;
long totalHitsCountValue = 0;
for (Send<LuceneSearchResult> resultToReceive : results) {
LuceneSearchResult result = resultToReceive.receive();
resultsToDrop.add(result);
resultsFluxes.add(result.results());
exactTotalHitsCount &= result.totalHitsCount().exact();
totalHitsCountValue += result.totalHitsCount().value();
}
return new LuceneSearchResult(totalHitsCount, mergedFluxes, d -> { var totalHitsCount = new TotalHitsCount(totalHitsCountValue, exactTotalHitsCount);
for (LuceneSearchResult luceneSearchResult : resultsToDrop) { Flux<LLKeyScore> mergedFluxes = Flux.merge(resultsFluxes);
luceneSearchResult.close();
} return new LuceneSearchResult(totalHitsCount, mergedFluxes, d -> {
}).send(); for (LuceneSearchResult luceneSearchResult : resultsToDrop) {
}) luceneSearchResult.close();
.doOnDiscard(Send.class, Send::close); }
}).send();
}),
true
);
} }
} }