Separate LeafCollector
This commit is contained in:
parent
5443e330bb
commit
f03f7296d4
@ -28,7 +28,7 @@ public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher {
|
|||||||
} else if (queryParams.isSorted() || queryParams.isScored()) {
|
} else if (queryParams.isSorted() || queryParams.isScored()) {
|
||||||
return scoredSimpleLuceneShardSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
|
return scoredSimpleLuceneShardSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
|
||||||
} else {
|
} else {
|
||||||
if (queryParams.offset() + queryParams.limit() <= queryParams.pageLimits().getPageLimit(0)) {
|
if (((long) queryParams.offset() + (long) queryParams.limit()) <= (long) queryParams.pageLimits().getPageLimit(0)) {
|
||||||
// Run single-page searches using the paged multi searcher
|
// Run single-page searches using the paged multi searcher
|
||||||
return unsortedUnscoredPagedLuceneMultiSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
|
return unsortedUnscoredPagedLuceneMultiSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
|
||||||
} else {
|
} else {
|
||||||
|
@ -21,6 +21,8 @@ import org.apache.lucene.index.LeafReaderContext;
|
|||||||
import org.apache.lucene.search.Collector;
|
import org.apache.lucene.search.Collector;
|
||||||
import org.apache.lucene.search.CollectorManager;
|
import org.apache.lucene.search.CollectorManager;
|
||||||
import org.apache.lucene.search.IndexSearcher;
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
|
import org.apache.lucene.search.LeafCollector;
|
||||||
|
import org.apache.lucene.search.Scorable;
|
||||||
import org.apache.lucene.search.ScoreDoc;
|
import org.apache.lucene.search.ScoreDoc;
|
||||||
import org.apache.lucene.search.ScoreMode;
|
import org.apache.lucene.search.ScoreMode;
|
||||||
import org.apache.lucene.search.SimpleCollector;
|
import org.apache.lucene.search.SimpleCollector;
|
||||||
@ -35,9 +37,11 @@ import reactor.util.concurrent.Queues;
|
|||||||
|
|
||||||
public class UnsortedUnscoredContinuousLuceneMultiSearcher implements LuceneMultiSearcher {
|
public class UnsortedUnscoredContinuousLuceneMultiSearcher implements LuceneMultiSearcher {
|
||||||
|
|
||||||
private static final Scheduler UNSCORED_UNSORTED_EXECUTOR = Schedulers.newBoundedElastic(Runtime
|
private static final Scheduler UNSCORED_UNSORTED_EXECUTOR = Schedulers.newBoundedElastic(
|
||||||
.getRuntime()
|
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
|
||||||
.availableProcessors(), Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "UnscoredUnsortedExecutor");
|
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
|
||||||
|
"UnscoredUnsortedExecutor"
|
||||||
|
);
|
||||||
private static final Supplier<Queue<ScoreDoc>> QUEUE_SUPPLIER = Queues.get(1024);
|
private static final Supplier<Queue<ScoreDoc>> QUEUE_SUPPLIER = Queues.get(1024);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -67,35 +71,39 @@ public class UnsortedUnscoredContinuousLuceneMultiSearcher implements LuceneMult
|
|||||||
|
|
||||||
var cm = new CollectorManager<Collector, Void>() {
|
var cm = new CollectorManager<Collector, Void>() {
|
||||||
|
|
||||||
class IterableCollector extends SimpleCollector {
|
class IterableCollector implements Collector {
|
||||||
|
|
||||||
private int shardIndex;
|
private int shardIndex;
|
||||||
private LeafReaderContext context;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void collect(int i) {
|
public LeafCollector getLeafCollector(LeafReaderContext leafReaderContext) throws IOException {
|
||||||
if (Schedulers.isInNonBlockingThread()) {
|
return new LeafCollector() {
|
||||||
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
|
@Override
|
||||||
}
|
public void setScorer(Scorable scorable) throws IOException {
|
||||||
var scoreDoc = new ScoreDoc(context.docBase + i, 0, shardIndex);
|
|
||||||
boolean shouldRetry;
|
|
||||||
do {
|
|
||||||
var currentError = scoreDocsSink.tryEmitNext(scoreDoc);
|
|
||||||
shouldRetry = currentError == EmitResult.FAIL_NON_SERIALIZED
|
|
||||||
|| currentError == EmitResult.FAIL_OVERFLOW
|
|
||||||
|| currentError == EmitResult.FAIL_ZERO_SUBSCRIBER;
|
|
||||||
if (shouldRetry) {
|
|
||||||
LockSupport.parkNanos(10);
|
|
||||||
}
|
|
||||||
if (!shouldRetry && currentError.isFailure()) {
|
|
||||||
currentError.orThrow();
|
|
||||||
}
|
|
||||||
} while (shouldRetry);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
}
|
||||||
protected void doSetNextReader(LeafReaderContext context) {
|
|
||||||
this.context = context;
|
@Override
|
||||||
|
public void collect(int i) throws IOException {
|
||||||
|
if (Schedulers.isInNonBlockingThread()) {
|
||||||
|
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
|
||||||
|
}
|
||||||
|
var scoreDoc = new ScoreDoc(leafReaderContext.docBase + i, 0, shardIndex);
|
||||||
|
boolean shouldRetry;
|
||||||
|
do {
|
||||||
|
var currentError = scoreDocsSink.tryEmitNext(scoreDoc);
|
||||||
|
shouldRetry = currentError == EmitResult.FAIL_NON_SERIALIZED
|
||||||
|
|| currentError == EmitResult.FAIL_OVERFLOW
|
||||||
|
|| currentError == EmitResult.FAIL_ZERO_SUBSCRIBER;
|
||||||
|
if (shouldRetry) {
|
||||||
|
LockSupport.parkNanos(10);
|
||||||
|
}
|
||||||
|
if (!shouldRetry && currentError.isFailure()) {
|
||||||
|
currentError.orThrow();
|
||||||
|
}
|
||||||
|
} while (shouldRetry);
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
Reference in New Issue
Block a user