From f03f7296d40f2be1cdcabe14e5214022e4ca2e7c Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 25 Sep 2021 14:09:10 +0200 Subject: [PATCH] Separate LeafCollector --- .../searcher/AdaptiveLuceneMultiSearcher.java | 2 +- ...UnscoredContinuousLuceneMultiSearcher.java | 62 +++++++++++-------- 2 files changed, 36 insertions(+), 28 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java index f151cce..3b6a835 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java @@ -28,7 +28,7 @@ public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher { } else if (queryParams.isSorted() || queryParams.isScored()) { return scoredSimpleLuceneShardSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); } else { - if (queryParams.offset() + queryParams.limit() <= queryParams.pageLimits().getPageLimit(0)) { + if (((long) queryParams.offset() + (long) queryParams.limit()) <= (long) queryParams.pageLimits().getPageLimit(0)) { // Run single-page searches using the paged multi searcher return unsortedUnscoredPagedLuceneMultiSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); } else { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredContinuousLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredContinuousLuceneMultiSearcher.java index a8226d8..08403c9 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredContinuousLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredContinuousLuceneMultiSearcher.java @@ -21,6 +21,8 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.Collector; import org.apache.lucene.search.CollectorManager; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.LeafCollector; +import org.apache.lucene.search.Scorable; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.SimpleCollector; @@ -35,9 +37,11 @@ import reactor.util.concurrent.Queues; public class UnsortedUnscoredContinuousLuceneMultiSearcher implements LuceneMultiSearcher { - private static final Scheduler UNSCORED_UNSORTED_EXECUTOR = Schedulers.newBoundedElastic(Runtime - .getRuntime() - .availableProcessors(), Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "UnscoredUnsortedExecutor"); + private static final Scheduler UNSCORED_UNSORTED_EXECUTOR = Schedulers.newBoundedElastic( + Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, + Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, + "UnscoredUnsortedExecutor" + ); private static final Supplier> QUEUE_SUPPLIER = Queues.get(1024); @Override @@ -67,35 +71,39 @@ public class UnsortedUnscoredContinuousLuceneMultiSearcher implements LuceneMult var cm = new CollectorManager() { - class IterableCollector extends SimpleCollector { + class IterableCollector implements Collector { private int shardIndex; - private LeafReaderContext context; @Override - public void collect(int i) { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called collect in a nonblocking thread"); - } - var scoreDoc = new ScoreDoc(context.docBase + i, 0, shardIndex); - boolean shouldRetry; - do { - var currentError = scoreDocsSink.tryEmitNext(scoreDoc); - shouldRetry = currentError == EmitResult.FAIL_NON_SERIALIZED - || currentError == EmitResult.FAIL_OVERFLOW - || currentError == EmitResult.FAIL_ZERO_SUBSCRIBER; - if (shouldRetry) { - LockSupport.parkNanos(10); - } - if (!shouldRetry && currentError.isFailure()) { - currentError.orThrow(); - } - } while (shouldRetry); - } + public LeafCollector getLeafCollector(LeafReaderContext leafReaderContext) throws IOException { + return new LeafCollector() { + @Override + public void setScorer(Scorable scorable) throws IOException { - @Override - protected void doSetNextReader(LeafReaderContext context) { - this.context = context; + } + + @Override + public void collect(int i) throws IOException { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called collect in a nonblocking thread"); + } + var scoreDoc = new ScoreDoc(leafReaderContext.docBase + i, 0, shardIndex); + boolean shouldRetry; + do { + var currentError = scoreDocsSink.tryEmitNext(scoreDoc); + shouldRetry = currentError == EmitResult.FAIL_NON_SERIALIZED + || currentError == EmitResult.FAIL_OVERFLOW + || currentError == EmitResult.FAIL_ZERO_SUBSCRIBER; + if (shouldRetry) { + LockSupport.parkNanos(10); + } + if (!shouldRetry && currentError.isFailure()) { + currentError.orThrow(); + } + } while (shouldRetry); + } + }; } @Override