Avoid calling reduce() multiple times

This commit is contained in:
Andrea Cavalli 2021-07-08 18:54:53 +02:00
parent e2b5432b8d
commit 5f3c8a2515
8 changed files with 201 additions and 136 deletions

View File

@ -22,6 +22,7 @@ import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -365,32 +366,62 @@ public class LuceneUtils {
* Transform a flux of results to take elements while the minimum competitive score is valid * Transform a flux of results to take elements while the minimum competitive score is valid
*/ */
public static Flux<LLKeyScore> filterTopDoc(Flux<LLKeyScore> flux, LocalQueryParams queryParams) { public static Flux<LLKeyScore> filterTopDoc(Flux<LLKeyScore> flux, LocalQueryParams queryParams) {
return flux; if (queryParams.scoreMode().needsScores() && queryParams.minCompetitiveScore() != null) {
/* if (queryParams.sort() != null && queryParams.sort().needsScores()) {
if (queryParams.sort() != null && queryParams.sort().needsScores() && queryParams.minCompetitiveScore() != null) { return flux.takeWhile(entry -> LuceneUtils.filterTopDoc(entry.score(), queryParams.minCompetitiveScore()));
return flux.takeWhile(entry -> LuceneUtils.filterTopDoc(entry.score(), queryParams.minCompetitiveScore())); } else {
return flux.filter(entry -> LuceneUtils.filterTopDoc(entry.score(), queryParams.minCompetitiveScore()));
}
} else { } else {
return flux; return flux;
}*/ }
} }
public static TopDocs mergeTopDocs(Sort sort, int startN, int topN, TopDocs[] topDocs, Comparator<ScoreDoc> tieBreaker) { public static TopDocs mergeTopDocs(Sort sort, @Nullable Integer startN, @Nullable Integer topN, TopDocs[] topDocs, Comparator<ScoreDoc> tieBreaker) {
if ((startN == null) != (topN == null)) {
throw new IllegalArgumentException("You must pass startN and topN together or nothing");
}
TopDocs result; TopDocs result;
if (sort != null) { if (sort != null) {
if (!(topDocs instanceof TopFieldDocs[])) { if (!(topDocs instanceof TopFieldDocs[])) {
throw new IllegalStateException("Expected TopFieldDocs[], got TopDocs[]"); throw new IllegalStateException("Expected TopFieldDocs[], got TopDocs[]");
} }
result = TopDocs.merge(sort, startN, if (startN == null) {
topN, int defaultTopN = 0;
(TopFieldDocs[]) topDocs, for (TopDocs td : topDocs) {
tieBreaker int length = td.scoreDocs.length;
); defaultTopN += length;
}
result = TopDocs.merge(sort, 0, defaultTopN,
(TopFieldDocs[]) topDocs,
tieBreaker
);
} else {
result = TopDocs.merge(sort, startN,
topN,
(TopFieldDocs[]) topDocs,
tieBreaker
);
}
} else { } else {
result = TopDocs.merge(startN, if (startN == null) {
topN, int defaultTopN = 0;
topDocs, for (TopDocs td : topDocs) {
tieBreaker int length = td.scoreDocs.length;
); defaultTopN += length;
}
result = TopDocs.merge(0,
defaultTopN,
topDocs,
tieBreaker
);
} else {
result = TopDocs.merge(startN,
topN,
topDocs,
tieBreaker
);
}
} }
return result; return result;
} }

View File

@ -6,6 +6,7 @@ import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SE
import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.LuceneUtils;
import org.apache.lucene.search.CollectorManager; import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.Sort; import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector; import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopFieldDocs; import org.apache.lucene.search.TopFieldDocs;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -26,8 +27,10 @@ public class ScoredLuceneMultiSearcher implements LuceneMultiSearcher {
} else { } else {
paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, false); paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, false);
} }
CollectorManager<TopFieldCollector, TopFieldDocs> sharedManager = TopFieldCollector CollectorManager<TopFieldCollector, TopDocs> sharedManager = new ScoringShardsCollectorManager(luceneSort,
.createSharedManager(luceneSort, LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()), null, 1000); LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()),
null, 1000, LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset()),
LuceneUtils.safeLongToInt(paginationInfo.firstPageLimit()));
return new ScoredSimpleLuceneShardSearcher(sharedManager, queryParams.query(), paginationInfo); return new ScoredSimpleLuceneShardSearcher(sharedManager, queryParams.query(), paginationInfo);
}); });
} }

View File

@ -28,13 +28,13 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
private final Object lock = new Object(); private final Object lock = new Object();
private final List<IndexSearcher> indexSearchersArray = new ArrayList<>(); private final List<IndexSearcher> indexSearchersArray = new ArrayList<>();
private final List<TopFieldCollector> collectors = new ArrayList<>(); private final List<TopFieldCollector> collectors = new ArrayList<>();
private final CollectorManager<TopFieldCollector, TopFieldDocs> sharedManager; private final CollectorManager<TopFieldCollector, TopDocs> firstPageSharedManager;
private final Query luceneQuery; private final Query luceneQuery;
private final PaginationInfo paginationInfo; private final PaginationInfo paginationInfo;
public ScoredSimpleLuceneShardSearcher(CollectorManager<TopFieldCollector, TopFieldDocs> sharedManager, public ScoredSimpleLuceneShardSearcher(CollectorManager<TopFieldCollector, TopDocs> firstPageSharedManager,
Query luceneQuery, PaginationInfo paginationInfo) { Query luceneQuery, PaginationInfo paginationInfo) {
this.sharedManager = sharedManager; this.firstPageSharedManager = firstPageSharedManager;
this.luceneQuery = luceneQuery; this.luceneQuery = luceneQuery;
this.paginationInfo = paginationInfo; this.paginationInfo = paginationInfo;
} }
@ -45,7 +45,7 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
TopFieldCollector collector; TopFieldCollector collector;
synchronized (lock) { synchronized (lock) {
//noinspection BlockingMethodInNonBlockingContext //noinspection BlockingMethodInNonBlockingContext
collector = sharedManager.newCollector(); collector = firstPageSharedManager.newCollector();
indexSearchersArray.add(indexSearcher); indexSearchersArray.add(indexSearcher);
collectors.add(collector); collectors.add(collector);
} }
@ -65,42 +65,9 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
TopDocs result; TopDocs result;
if (queryParams.isSorted()) { synchronized (lock) {
TopFieldDocs[] topDocs; //noinspection BlockingMethodInNonBlockingContext
synchronized (lock) { result = firstPageSharedManager.reduce(collectors);
topDocs = new TopFieldDocs[collectors.size()];
var i = 0;
for (TopFieldCollector collector : collectors) {
topDocs[i] = collector.topDocs();
for (ScoreDoc scoreDoc : topDocs[i].scoreDocs) {
scoreDoc.shardIndex = i;
}
i++;
}
}
result = LuceneUtils.mergeTopDocs(queryParams.sort(), LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset()),
LuceneUtils.safeLongToInt(paginationInfo.firstPageLimit()),
topDocs,
TIE_BREAKER
);
} else {
TopDocs[] topDocs;
synchronized (lock) {
topDocs = new TopDocs[collectors.size()];
var i = 0;
for (TopFieldCollector collector : collectors) {
topDocs[i] = collector.topDocs();
for (ScoreDoc scoreDoc : topDocs[i].scoreDocs) {
scoreDoc.shardIndex = i;
}
i++;
}
}
result = TopDocs.merge(LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset()),
LuceneUtils.safeLongToInt(paginationInfo.firstPageLimit()),
topDocs,
TIE_BREAKER
);
} }
IndexSearchers indexSearchers; IndexSearchers indexSearchers;
synchronized (lock) { synchronized (lock) {
@ -120,13 +87,13 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1), paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1),
(s, sink) -> { (s, sink) -> {
if (s.last() != null && s.remainingLimit() > 0) { if (s.last() != null && s.remainingLimit() > 0) {
CollectorManager<TopFieldCollector, TopFieldDocs> sharedManager;
Sort luceneSort = queryParams.sort(); Sort luceneSort = queryParams.sort();
if (luceneSort == null) { if (luceneSort == null) {
luceneSort = Sort.RELEVANCE; luceneSort = Sort.RELEVANCE;
} }
sharedManager = TopFieldCollector.createSharedManager(luceneSort, s.currentPageLimit(), CollectorManager<TopFieldCollector, TopDocs> sharedManager
(FieldDoc) s.last(), 1000); = new ScoringShardsCollectorManager(luceneSort, s.currentPageLimit(),
(FieldDoc) s.last(), 1000, 0, s.currentPageLimit());
//noinspection BlockingMethodInNonBlockingContext //noinspection BlockingMethodInNonBlockingContext
TopDocs pageTopDocs = Flux TopDocs pageTopDocs = Flux
.fromIterable(indexSearchersArray) .fromIterable(indexSearchersArray)
@ -136,28 +103,17 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
long shardIndex = tuple.getT1(); long shardIndex = tuple.getT1();
IndexSearcher indexSearcher = tuple.getT2(); IndexSearcher indexSearcher = tuple.getT2();
//noinspection BlockingMethodInNonBlockingContext //noinspection BlockingMethodInNonBlockingContext
var results = indexSearcher.search(luceneQuery, sharedManager); TopFieldCollector collector = sharedManager.newCollector();
for (ScoreDoc scoreDoc : results.scoreDocs) { //noinspection BlockingMethodInNonBlockingContext
scoreDoc.shardIndex = LuceneUtils.safeLongToInt(shardIndex); indexSearcher.search(luceneQuery, collector);
} return collector;
return results;
}) })
.subscribeOn(scheduler) .subscribeOn(scheduler)
) )
.collect(Collectors.toCollection(ObjectArrayList::new)) .collect(Collectors.toCollection(ObjectArrayList::new))
.map(topFieldDocs -> topFieldDocs.toArray(TopFieldDocs[]::new)) .flatMap(collectors -> Mono.fromCallable(() -> {
.flatMap(topFieldDocs -> Mono.fromCallable(() -> { //noinspection BlockingMethodInNonBlockingContext
if (queryParams.isSorted()) { return sharedManager.reduce(collectors);
return LuceneUtils.mergeTopDocs(queryParams.sort(), 0, s.currentPageLimit(),
topFieldDocs,
TIE_BREAKER
);
} else {
return TopDocs.merge(0, s.currentPageLimit(),
topFieldDocs,
TIE_BREAKER
);
}
}).subscribeOn(scheduler)) }).subscribeOn(scheduler))
.subscribeOn(Schedulers.immediate()) .subscribeOn(Schedulers.immediate())
.blockOptional().orElseThrow(); .blockOptional().orElseThrow();
@ -177,7 +133,11 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
); );
}); });
return new LuceneSearchResult(result.totalHits.value, firstPageHits.concatWith(nextHits)); return new LuceneSearchResult(result.totalHits.value,
firstPageHits
.concatWith(nextHits)
.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams))
);
}) })
.subscribeOn(scheduler); .subscribeOn(scheduler);
} }

View File

@ -0,0 +1,99 @@
package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.TIE_BREAKER;
import it.cavallium.dbengine.lucene.LuceneUtils;
import java.io.IOException;
import java.util.Collection;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopFieldDocs;
import org.jetbrains.annotations.Nullable;
public class ScoringShardsCollectorManager implements CollectorManager<TopFieldCollector, TopDocs> {
private final Sort sort;
private final int numHits;
private final FieldDoc after;
private final int totalHitsThreshold;
private final @Nullable Integer startN;
private final @Nullable Integer topN;
private final CollectorManager<TopFieldCollector, TopFieldDocs> sharedCollectorManager;
public ScoringShardsCollectorManager(final Sort sort,
final int numHits,
final FieldDoc after,
final int totalHitsThreshold,
int startN,
int topN) {
this(sort, numHits, after, totalHitsThreshold, (Integer) startN, (Integer) topN);
}
public ScoringShardsCollectorManager(final Sort sort,
final int numHits,
final FieldDoc after,
final int totalHitsThreshold) {
this(sort, numHits, after, totalHitsThreshold, null, null);
}
private ScoringShardsCollectorManager(final Sort sort,
final int numHits,
final FieldDoc after,
final int totalHitsThreshold,
@Nullable Integer startN,
@Nullable Integer topN) {
this.sort = sort;
this.numHits = numHits;
this.after = after;
this.totalHitsThreshold = totalHitsThreshold;
this.startN = startN;
this.topN = topN;
this.sharedCollectorManager = TopFieldCollector.createSharedManager(sort, numHits, after, totalHitsThreshold);
}
@Override
public TopFieldCollector newCollector() throws IOException {
return sharedCollectorManager.newCollector();
}
@Override
public TopDocs reduce(Collection<TopFieldCollector> collectors) throws IOException {
TopDocs result;
if (sort != null) {
TopFieldDocs[] topDocs = new TopFieldDocs[collectors.size()];
var i = 0;
for (TopFieldCollector collector : collectors) {
topDocs[i] = collector.topDocs();
for (ScoreDoc scoreDoc : topDocs[i].scoreDocs) {
scoreDoc.shardIndex = i;
}
i++;
}
result = LuceneUtils.mergeTopDocs(sort, startN,
topN,
topDocs,
TIE_BREAKER
);
} else {
TopDocs[] topDocs = new TopDocs[collectors.size()];
var i = 0;
for (TopFieldCollector collector : collectors) {
topDocs[i] = collector.topDocs();
for (ScoreDoc scoreDoc : topDocs[i].scoreDocs) {
scoreDoc.shardIndex = i;
}
i++;
}
result = LuceneUtils.mergeTopDocs(null, startN,
topN,
topDocs,
TIE_BREAKER
);
}
return result;
}
}

View File

@ -83,7 +83,10 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
); );
}); });
return new LuceneSearchResult(firstPageTopDocs.totalHits.value, firstPageMono.concatWith(nextHits)); return new LuceneSearchResult(firstPageTopDocs.totalHits.value, firstPageMono
.concatWith(nextHits)
.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams))
);
}) })
.subscribeOn(scheduler); .subscribeOn(scheduler);
} }

View File

@ -91,11 +91,12 @@ class TopDocsSearcher {
return topDocs; return topDocs;
} }
public static TopDocsCollector<? extends ScoreDoc> getTopDocsCollector(Sort luceneSort, @SuppressWarnings({"unchecked", "rawtypes"})
public static TopDocsCollector<ScoreDoc> getTopDocsCollector(Sort luceneSort,
int limit, int limit,
ScoreDoc after, ScoreDoc after,
int totalHitsThreshold) { int totalHitsThreshold) {
TopDocsCollector<? extends ScoreDoc> collector; TopDocsCollector<ScoreDoc> collector;
if (luceneSort == null) { if (luceneSort == null) {
if (after == null) { if (after == null) {
collector = TopScoreDocCollector.create(limit, totalHitsThreshold); collector = TopScoreDocCollector.create(limit, totalHitsThreshold);
@ -104,9 +105,9 @@ class TopDocsSearcher {
} }
} else { } else {
if (after == null) { if (after == null) {
collector = TopFieldCollector.create(luceneSort, limit, totalHitsThreshold); collector = (TopDocsCollector<ScoreDoc>) (TopDocsCollector) TopFieldCollector.create(luceneSort, limit, totalHitsThreshold);
} else if (after instanceof FieldDoc afterFieldDoc) { } else if (after instanceof FieldDoc afterFieldDoc) {
collector = TopFieldCollector.create(luceneSort, limit, afterFieldDoc, totalHitsThreshold); collector = (TopDocsCollector<ScoreDoc>) (TopDocsCollector) TopFieldCollector.create(luceneSort, limit, afterFieldDoc, totalHitsThreshold);
} else { } else {
throw new UnsupportedOperationException("GetTopDocs with \"luceneSort\" != null requires \"after\" to be a FieldDoc"); throw new UnsupportedOperationException("GetTopDocs with \"luceneSort\" != null requires \"after\" to be a FieldDoc");
} }

View File

@ -16,14 +16,14 @@ import org.apache.lucene.search.TopFieldDocs;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
public class UnscoredCollectorManager implements public class UnscoredCollectorManager implements
CollectorManager<TopDocsCollector<? extends ScoreDoc>, TopDocs> { CollectorManager<TopDocsCollector<ScoreDoc>, TopDocs> {
private final Supplier<TopDocsCollector<? extends ScoreDoc>> collectorSupplier; private final Supplier<TopDocsCollector<ScoreDoc>> collectorSupplier;
private final long offset; private final long offset;
private final long limit; private final long limit;
private final Sort sort; private final Sort sort;
public UnscoredCollectorManager(Supplier<TopDocsCollector<? extends ScoreDoc>> collectorSupplier, public UnscoredCollectorManager(Supplier<TopDocsCollector<ScoreDoc>> collectorSupplier,
long offset, long offset,
long limit, long limit,
@Nullable Sort sort) { @Nullable Sort sort) {
@ -34,12 +34,12 @@ public class UnscoredCollectorManager implements
} }
@Override @Override
public TopDocsCollector<? extends ScoreDoc> newCollector() throws IOException { public TopDocsCollector<ScoreDoc> newCollector() throws IOException {
return collectorSupplier.get(); return collectorSupplier.get();
} }
@Override @Override
public TopDocs reduce(Collection<TopDocsCollector<? extends ScoreDoc>> collection) throws IOException { public TopDocs reduce(Collection<TopDocsCollector<ScoreDoc>> collection) throws IOException {
int i = 0; int i = 0;
TopDocs[] topDocsArray; TopDocs[] topDocsArray;
if (sort != null) { if (sort != null) {

View File

@ -26,12 +26,12 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher {
private final Object lock = new Object(); private final Object lock = new Object();
private final List<IndexSearcher> indexSearchersArray = new ArrayList<>(); private final List<IndexSearcher> indexSearchersArray = new ArrayList<>();
private final List<TopDocsCollector<? extends ScoreDoc>> collectors = new ArrayList<>(); private final List<TopDocsCollector<ScoreDoc>> collectors = new ArrayList<>();
private final CollectorManager<TopDocsCollector<? extends ScoreDoc>, ? extends TopDocs> firstPageUnsortedCollectorManager; private final CollectorManager<TopDocsCollector<ScoreDoc>, TopDocs> firstPageUnsortedCollectorManager;
private final Query luceneQuery; private final Query luceneQuery;
private final PaginationInfo paginationInfo; private final PaginationInfo paginationInfo;
public UnscoredLuceneShardSearcher(CollectorManager<TopDocsCollector<? extends ScoreDoc>, ? extends TopDocs> unsortedCollectorManager, public UnscoredLuceneShardSearcher(CollectorManager<TopDocsCollector<ScoreDoc>, TopDocs> unsortedCollectorManager,
Query luceneQuery, Query luceneQuery,
PaginationInfo paginationInfo) { PaginationInfo paginationInfo) {
this.firstPageUnsortedCollectorManager = unsortedCollectorManager; this.firstPageUnsortedCollectorManager = unsortedCollectorManager;
@ -42,7 +42,7 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher {
@Override @Override
public Mono<Void> searchOn(IndexSearcher indexSearcher, LocalQueryParams queryParams, Scheduler scheduler) { public Mono<Void> searchOn(IndexSearcher indexSearcher, LocalQueryParams queryParams, Scheduler scheduler) {
return Mono.<Void>fromCallable(() -> { return Mono.<Void>fromCallable(() -> {
TopDocsCollector<? extends ScoreDoc> collector; TopDocsCollector<ScoreDoc> collector;
synchronized (lock) { synchronized (lock) {
//noinspection BlockingMethodInNonBlockingContext //noinspection BlockingMethodInNonBlockingContext
collector = firstPageUnsortedCollectorManager.newCollector(); collector = firstPageUnsortedCollectorManager.newCollector();
@ -59,28 +59,11 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher {
public Mono<LuceneSearchResult> collect(LocalQueryParams queryParams, String keyFieldName, Scheduler scheduler) { public Mono<LuceneSearchResult> collect(LocalQueryParams queryParams, String keyFieldName, Scheduler scheduler) {
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
TopDocs[] topDocs; TopDocs result;
synchronized (lock) { synchronized (lock) {
if (queryParams.isSorted()) { //noinspection BlockingMethodInNonBlockingContext
topDocs = new TopFieldDocs[collectors.size()]; result = firstPageUnsortedCollectorManager.reduce(collectors);
} else {
topDocs = new TopDocs[collectors.size()];
}
var i = 0;
for (TopDocsCollector<?> collector : collectors) {
topDocs[i] = collector.topDocs();
for (ScoreDoc scoreDoc : topDocs[i].scoreDocs) {
scoreDoc.shardIndex = i;
}
i++;
}
} }
TopDocs result = LuceneUtils.mergeTopDocs(queryParams.sort(),
LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset()),
LuceneUtils.safeLongToInt(paginationInfo.firstPageLimit()),
topDocs,
TIE_BREAKER
);
IndexSearchers indexSearchers; IndexSearchers indexSearchers;
synchronized (lock) { synchronized (lock) {
indexSearchers = IndexSearchers.of(indexSearchersArray); indexSearchers = IndexSearchers.of(indexSearchersArray);
@ -106,34 +89,19 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher {
//noinspection BlockingMethodInNonBlockingContext //noinspection BlockingMethodInNonBlockingContext
TopDocs pageTopDocs = Flux TopDocs pageTopDocs = Flux
.fromIterable(indexSearchersArray) .fromIterable(indexSearchersArray)
.index() .flatMapSequential(indexSearcher -> Mono
.flatMapSequential(tuple -> Mono
.fromCallable(() -> { .fromCallable(() -> {
long shardIndex = tuple.getT1();
IndexSearcher indexSearcher = tuple.getT2();
//noinspection BlockingMethodInNonBlockingContext //noinspection BlockingMethodInNonBlockingContext
var results = indexSearcher.search(luceneQuery, currentPageUnsortedCollectorManager); var collector = currentPageUnsortedCollectorManager.newCollector();
for (ScoreDoc scoreDoc : results.scoreDocs) { //noinspection BlockingMethodInNonBlockingContext
scoreDoc.shardIndex = LuceneUtils.safeLongToInt(shardIndex); indexSearcher.search(luceneQuery, collector);
} return collector;
return results;
}) })
.subscribeOn(scheduler) .subscribeOn(scheduler)
) )
.collect(Collectors.toCollection(ObjectArrayList::new)) .collect(Collectors.toCollection(ObjectArrayList::new))
.map(topFieldDocs -> { .flatMap(collectors -> Mono
if (queryParams.isSorted()) { .fromCallable(() -> currentPageUnsortedCollectorManager.reduce(collectors))
@SuppressWarnings("SuspiciousToArrayCall")
TopFieldDocs[] topFieldDocsArray = topFieldDocs.toArray(TopFieldDocs[]::new);
return topFieldDocsArray;
} else {
return topFieldDocs.toArray(TopDocs[]::new);
}
})
.flatMap(topFieldDocs -> Mono
.fromCallable(() -> LuceneUtils
.mergeTopDocs(queryParams.sort(), 0, s.currentPageLimit(), topFieldDocs, TIE_BREAKER)
)
.subscribeOn(scheduler) .subscribeOn(scheduler)
) )
.blockOptional().orElseThrow(); .blockOptional().orElseThrow();