From 8e15020f5b3d097ef58811b27b90e02592c1c6c1 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 25 Sep 2021 13:07:52 +0200 Subject: [PATCH] Improve performance of infinite queries --- .../searcher/AdaptiveLuceneMultiSearcher.java | 15 +- ...UnscoredContinuousLuceneMultiSearcher.java | 163 ++++++++++++++++++ 2 files changed, 174 insertions(+), 4 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredContinuousLuceneMultiSearcher.java diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java index 8558a66..f151cce 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java @@ -1,9 +1,7 @@ package it.cavallium.dbengine.lucene.searcher; import io.net5.buffer.api.Send; -import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearchers; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher { @@ -14,9 +12,12 @@ public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher { private static final LuceneMultiSearcher scoredSimpleLuceneShardSearcher = new ScoredSimpleLuceneShardSearcher(); - private static final LuceneMultiSearcher unscoredPagedLuceneMultiSearcher + private static final LuceneMultiSearcher unsortedUnscoredPagedLuceneMultiSearcher = new SimpleUnsortedUnscoredLuceneMultiSearcher(new SimpleLuceneLocalSearcher()); + private static final LuceneMultiSearcher unsortedUnscoredContinuousLuceneMultiSearcher + = new UnsortedUnscoredContinuousLuceneMultiSearcher(); + @Override public Mono> collectMulti(Mono> indexSearchersMono, LocalQueryParams queryParams, @@ -27,7 +28,13 @@ public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher { } else if (queryParams.isSorted() || queryParams.isScored()) { return scoredSimpleLuceneShardSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); } else { - return unscoredPagedLuceneMultiSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); + if (queryParams.offset() + queryParams.limit() <= queryParams.pageLimits().getPageLimit(0)) { + // Run single-page searches using the paged multi searcher + return unsortedUnscoredPagedLuceneMultiSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); + } else { + // Run large/unbounded searches using the continuous multi searcher + return unsortedUnscoredContinuousLuceneMultiSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); + } } } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredContinuousLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredContinuousLuceneMultiSearcher.java new file mode 100644 index 0000000..6342035 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredContinuousLuceneMultiSearcher.java @@ -0,0 +1,163 @@ +package it.cavallium.dbengine.lucene.searcher; + +import io.net5.buffer.api.Resource; +import io.net5.buffer.api.Send; +import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; +import it.cavallium.dbengine.database.LLKeyScore; +import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.LLIndexSearcher; +import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.lucene.LuceneUtils; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; +import java.util.function.Supplier; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.CollectorManager; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.SimpleCollector; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.EmitResult; +import reactor.core.publisher.Sinks.Many; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import reactor.util.concurrent.Queues; + +public class UnsortedUnscoredContinuousLuceneMultiSearcher implements LuceneMultiSearcher { + + private static final Scheduler UNSCORED_UNSORTED_EXECUTOR = Schedulers.newBoundedElastic(Runtime + .getRuntime() + .availableProcessors(), Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "UnscoredUnsortedExecutor"); + private static final Supplier> QUEUE_SUPPLIER = Queues.get(1024); + + @Override + public Mono> collectMulti(Mono> indexSearchersMono, + LocalQueryParams queryParams, + String keyFieldName, + LLSearchTransformer transformer) { + var indexSearchersSendResource = Mono + .fromRunnable(() -> { + LLUtils.ensureBlocking(); + if (queryParams.isSorted() && queryParams.limit() > 0) { + throw new UnsupportedOperationException("Sorted queries are not supported" + + " by UnsortedUnscoredContinuousLuceneMultiSearcher"); + } + if (queryParams.isScored() && queryParams.limit() > 0) { + throw new UnsupportedOperationException("Scored queries are not supported" + + " by UnsortedUnscoredContinuousLuceneMultiSearcher"); + } + }) + .then(indexSearchersMono); + var localQueryParams = getLocalQueryParams(queryParams); + + return LLUtils.usingSendResource(indexSearchersSendResource, + indexSearchers -> Mono.fromCallable(() -> { + + Many scoreDocsSink = Sinks.many().unicast().onBackpressureBuffer(QUEUE_SUPPLIER.get()); + + var cm = new CollectorManager() { + + class IterableCollector extends SimpleCollector { + + private int shardIndex; + private LeafReaderContext context; + + @Override + public void collect(int i) { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called collect in a nonblocking thread"); + } + var scoreDoc = new ScoreDoc(context.docBase + i, 0, shardIndex); + synchronized (scoreDocsSink) { + while (scoreDocsSink.tryEmitNext(scoreDoc) == EmitResult.FAIL_OVERFLOW) { + LockSupport.parkNanos(10); + } + } + } + + @Override + protected void doSetNextReader(LeafReaderContext context) { + this.context = context; + } + + @Override + public ScoreMode scoreMode() { + return ScoreMode.COMPLETE_NO_SCORES; + } + + public void setShardIndex(int shardIndex) { + this.shardIndex = shardIndex; + } + } + + @Override + public IterableCollector newCollector() { + return new IterableCollector(); + } + + @Override + public Void reduce(Collection collection) { + throw new UnsupportedOperationException(); + } + }; + + AtomicInteger runningTasks = new AtomicInteger(0); + var shards = indexSearchers.shards(); + + runningTasks.addAndGet(shards.size()); + int mutableShardIndex = 0; + for (IndexSearcher shard : shards) { + int shardIndex = mutableShardIndex++; + UNSCORED_UNSORTED_EXECUTOR.schedule(() -> { + try { + var collector = cm.newCollector(); + collector.setShardIndex(shardIndex); + shard.search(localQueryParams.query(), collector); + } catch (Throwable e) { + while (scoreDocsSink.tryEmitError(e) == EmitResult.FAIL_NON_SERIALIZED) { + LockSupport.parkNanos(10); + } + } finally { + if (runningTasks.decrementAndGet() <= 0) { + while (scoreDocsSink.tryEmitComplete() == EmitResult.FAIL_NON_SERIALIZED) { + LockSupport.parkNanos(10); + } + } + } + }); + } + + Flux resultsFlux = LuceneUtils.convertHits(scoreDocsSink.asFlux(), shards, keyFieldName, false); + + var totalHitsCount = new TotalHitsCount(0, false); + Flux mergedFluxes = resultsFlux + .skip(queryParams.offset()) + .take(queryParams.limit(), true); + + return new LuceneSearchResult(totalHitsCount, mergedFluxes, d -> { + indexSearchers.close(); + }).send(); + }), false); + } + + private LocalQueryParams getLocalQueryParams(LocalQueryParams queryParams) { + return new LocalQueryParams(queryParams.query(), + 0, + LuceneUtils.safeLongToInt((long) queryParams.offset() + (long) queryParams.limit()), + queryParams.pageLimits(), + queryParams.minCompetitiveScore(), + queryParams.sort(), + queryParams.scoreMode() + ); + } +}