Fix hanging on ScoredSimpleLuceneShardSearcher

This commit is contained in:
Andrea Cavalli 2021-08-25 10:23:42 +02:00
parent 6e31aa01f9
commit 6fe9f9c24f
2 changed files with 71 additions and 51 deletions

View File

@ -20,7 +20,7 @@ public interface LuceneShardSearcher {
/**
* @param queryParams the query parameters
* @param keyFieldName the name of the key field
* @param scheduler a blocking scheduler
* @param collectorScheduler a blocking scheduler
*/
Mono<LuceneSearchResult> collect(LocalQueryParams queryParams, String keyFieldName, Scheduler scheduler);
Mono<LuceneSearchResult> collect(LocalQueryParams queryParams, String keyFieldName, Scheduler collectorScheduler);
}

View File

@ -8,6 +8,9 @@ import it.cavallium.dbengine.lucene.LuceneUtils;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.FieldDoc;
@ -20,6 +23,8 @@ import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopFieldDocs;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Empty;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@ -61,7 +66,7 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
}
@Override
public Mono<LuceneSearchResult> collect(LocalQueryParams queryParams, String keyFieldName, Scheduler scheduler) {
public Mono<LuceneSearchResult> collect(LocalQueryParams queryParams, String keyFieldName, Scheduler collectorScheduler) {
if (!queryParams.isScored()) {
return Mono.error(
new UnsupportedOperationException("Can't execute an unscored query with a scored lucene shard searcher")
@ -72,7 +77,7 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
TopDocs result;
Mono<Void> release;
synchronized (lock) {
//noinspection BlockingMethodInNonBlockingContext
result = firstPageSharedManager.reduce(collectors);
release = Mono.when(indexSearcherReleasersArray);
}
@ -81,7 +86,7 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
indexSearchers = IndexSearchers.of(indexSearchersArray);
}
Flux<LLKeyScore> firstPageHits = LuceneUtils
.convertHits(result.scoreDocs, indexSearchers, keyFieldName, scheduler, true);
.convertHits(result.scoreDocs, indexSearchers, keyFieldName, collectorScheduler, true);
Flux<LLKeyScore> nextHits = Flux.defer(() -> {
if (paginationInfo.forceSinglePage()
@ -89,54 +94,69 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
return Flux.empty();
}
return Flux
.<TopDocs, CurrentPageInfo>generate(
() -> new CurrentPageInfo(LuceneUtils.getLastFieldDoc(result.scoreDocs),
paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1),
(s, sink) -> {
if (s.last() != null && s.remainingLimit() > 0) {
Sort luceneSort = queryParams.sort();
if (luceneSort == null) {
luceneSort = Sort.RELEVANCE;
.<TopDocs>create(emitter -> {
Empty<Void> cancelEvent = Sinks.empty();
AtomicReference<CurrentPageInfo> currentPageInfoAtomicReference = new AtomicReference<>(new CurrentPageInfo(LuceneUtils.getLastFieldDoc(result.scoreDocs),
paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1));
emitter.onRequest(requests -> {
synchronized (currentPageInfoAtomicReference) {
var s = currentPageInfoAtomicReference.get();
while (requests > 0 && !emitter.isCancelled()) {
requests--;
if (s.last() != null && s.remainingLimit() > 0) {
Sort luceneSort = queryParams.sort();
if (luceneSort == null) {
luceneSort = Sort.RELEVANCE;
}
CollectorManager<TopFieldCollector, TopDocs> sharedManager
= new ScoringShardsCollectorManager(luceneSort, s.currentPageLimit(),
(FieldDoc) s.last(), LuceneUtils.totalHitsThreshold(), 0, s.currentPageLimit());
TopDocs pageTopDocs = Flux
.fromIterable(indexSearchersArray)
.index()
.flatMapSequential(tuple -> Mono
.fromCallable(() -> {
long shardIndex = tuple.getT1();
IndexSearcher indexSearcher = tuple.getT2();
TopFieldCollector collector = sharedManager.newCollector();
indexSearcher.search(luceneQuery, collector);
return collector;
})
.subscribeOn(Schedulers.immediate())
)
.collect(Collectors.toCollection(ObjectArrayList::new))
.flatMap(collectors -> Mono
.fromCallable(() -> sharedManager.reduce(collectors))
.subscribeOn(Schedulers.immediate())
)
.single()
.takeUntilOther(cancelEvent.asMono())
.block();
if (!emitter.isCancelled()) {
Objects.requireNonNull(pageTopDocs);
var pageLastDoc = LuceneUtils.getLastFieldDoc(pageTopDocs.scoreDocs);
emitter.next(pageTopDocs);
s = new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(), s.pageIndex() + 1);
} else {
s = EMPTY_STATUS;
requests = 0;
}
} else {
emitter.complete();
s = EMPTY_STATUS;
requests = 0;
}
CollectorManager<TopFieldCollector, TopDocs> sharedManager
= new ScoringShardsCollectorManager(luceneSort, s.currentPageLimit(),
(FieldDoc) s.last(), LuceneUtils.totalHitsThreshold(), 0, s.currentPageLimit());
//noinspection BlockingMethodInNonBlockingContext
TopDocs pageTopDocs = Flux
.fromIterable(indexSearchersArray)
.index()
.flatMapSequential(tuple -> Mono
.fromCallable(() -> {
long shardIndex = tuple.getT1();
IndexSearcher indexSearcher = tuple.getT2();
//noinspection BlockingMethodInNonBlockingContext
TopFieldCollector collector = sharedManager.newCollector();
//noinspection BlockingMethodInNonBlockingContext
indexSearcher.search(luceneQuery, collector);
return collector;
})
.subscribeOn(scheduler)
)
.collect(Collectors.toCollection(ObjectArrayList::new))
.flatMap(collectors -> Mono.fromCallable(() -> {
//noinspection BlockingMethodInNonBlockingContext
return sharedManager.reduce(collectors);
}).subscribeOn(scheduler))
.subscribeOn(Schedulers.immediate())
.blockOptional().orElseThrow();
var pageLastDoc = LuceneUtils.getLastFieldDoc(pageTopDocs.scoreDocs);
sink.next(pageTopDocs);
return new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(), s.pageIndex() + 1);
} else {
sink.complete();
return EMPTY_STATUS;
}
},
s -> {}
)
.subscribeOn(scheduler)
currentPageInfoAtomicReference.set(s);
}
});
emitter.onCancel(cancelEvent::tryEmitEmpty);
})
.subscribeOn(collectorScheduler)
.flatMapSequential(topFieldDoc -> LuceneUtils
.convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, scheduler, true)
.convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, collectorScheduler, true)
);
});
@ -147,7 +167,7 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
release
);
})
.subscribeOn(scheduler);
.subscribeOn(Schedulers.boundedElastic());
}
}