diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 9140397..29cee39 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -135,7 +135,9 @@ public class LLUtils { public static Document toDocument(LLItem[] document) { Document d = new Document(); for (LLItem item : document) { - d.add(LLUtils.toField(item)); + if (item != null) { + d.add(LLUtils.toField(item)); + } } return d; } diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveCollectorMultiManager.java b/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveCollectorMultiManager.java index afd13e9..125b6da 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveCollectorMultiManager.java +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveCollectorMultiManager.java @@ -15,9 +15,11 @@ import reactor.core.publisher.Sinks.Many; public class ReactiveCollectorMultiManager implements CollectorMultiManager { private final FluxSink scoreDocsSink; + private final Thread requestThread; - public ReactiveCollectorMultiManager(FluxSink scoreDocsSink) { + public ReactiveCollectorMultiManager(FluxSink scoreDocsSink, Thread requestThread) { this.scoreDocsSink = scoreDocsSink; + this.requestThread = requestThread; } public CollectorManager get(int shardIndex) { @@ -29,7 +31,7 @@ public class ReactiveCollectorMultiManager implements CollectorMultiManager scoreDocsSink; private final int shardIndex; + private final Thread requestThread; - public ReactiveLeafCollector(LeafReaderContext leafReaderContext, FluxSink scoreDocsSink, int shardIndex) { + public ReactiveLeafCollector(LeafReaderContext leafReaderContext, + FluxSink scoreDocsSink, + int shardIndex, + Thread requestThread) { this.leafReaderContext = leafReaderContext; this.scoreDocsSink = scoreDocsSink; this.shardIndex = shardIndex; + this.requestThread = requestThread; } @Override @@ -30,15 +36,25 @@ public class ReactiveLeafCollector implements LeafCollector { @Override public void collect(int i) { - LLUtils.ensureBlocking(); - var scoreDoc = new ScoreDoc(leafReaderContext.docBase + i, 0, shardIndex); - while (scoreDocsSink.requestedFromDownstream() < 0 && !scoreDocsSink.isCancelled()) { - // 10ms - LockSupport.parkNanos(10L * 1000000L); + // Assert that we are running on the request thread + assert Thread.currentThread() == requestThread; + // Assert that this is a non-blocking context + assert !Schedulers.isInNonBlockingThread(); + + // Wait if no requests from downstream are found + boolean cancelled; + while (!(cancelled = scoreDocsSink.isCancelled()) && scoreDocsSink.requestedFromDownstream() <= 0) { + // 1000ms + LockSupport.parkNanos(1000L * 1000000L); } - scoreDocsSink.next(scoreDoc); - if (scoreDocsSink.isCancelled()) { + // Cancel execution throwing this specific lucene error + if (cancelled) { throw new CollectionTerminatedException(); } + + // Send the response + var scoreDoc = new ScoreDoc(leafReaderContext.docBase + i, 0, shardIndex); + scoreDocsSink.next(scoreDoc); + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java index 3fdcfb7..9f6bf8a 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java @@ -9,11 +9,13 @@ import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.collector.ReactiveCollectorMultiManager; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.ScoreDoc; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { @@ -32,57 +34,60 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { .fromCallable(() -> new TransformerInput(indexSearchers, queryParams))); } - return queryParamsMono - .flatMap(queryParams2 -> { - var localQueryParams = getLocalQueryParams(queryParams2); - if (queryParams2.isSorted() && queryParams2.limitLong() > 0) { - return Mono.error(new UnsupportedOperationException("Sorted queries are not supported" - + " by UnsortedUnscoredContinuousLuceneMultiSearcher")); + return queryParamsMono.map(queryParams2 -> { + var localQueryParams = getLocalQueryParams(queryParams2); + if (queryParams2.isSorted() && queryParams2.limitLong() > 0) { + throw new UnsupportedOperationException("Sorted queries are not supported" + + " by UnsortedUnscoredContinuousLuceneMultiSearcher"); + } + if (queryParams2.needsScores() && queryParams2.limitLong() > 0) { + throw new UnsupportedOperationException("Scored queries are not supported" + + " by UnsortedUnscoredContinuousLuceneMultiSearcher"); + } + var shards = indexSearchers.shards(); + + Flux scoreDocsFlux = Flux.create(scoreDocsSink -> { + LLUtils.ensureBlocking(); + var currentThread = Thread.currentThread(); + var cmm = new ReactiveCollectorMultiManager(scoreDocsSink, currentThread); + + // Unpark the paused request thread + scoreDocsSink.onRequest(n -> LockSupport.unpark(currentThread)); + + int mutableShardIndex = 0; + for (IndexSearcher shard : shards) { + int shardIndex = mutableShardIndex++; + try { + var collectorManager = cmm.get(shardIndex); + assert queryParams.computePreciseHitsCount() == cmm.scoreMode().isExhaustive(); + + var executor = shard.getExecutor(); + if (executor == null) { + shard.search(localQueryParams.query(), collectorManager); + } else { + // Avoid using the index searcher executor to avoid blocking on its threads + shard.search(localQueryParams.query(), collectorManager.newCollector()); + } + } catch (Throwable e) { + scoreDocsSink.error(e); + } finally { + scoreDocsSink.complete(); } - if (queryParams2.needsScores() && queryParams2.limitLong() > 0) { - return Mono.error(new UnsupportedOperationException("Scored queries are not supported" - + " by UnsortedUnscoredContinuousLuceneMultiSearcher")); - } - return Mono.fromCallable(() -> { - LLUtils.ensureBlocking(); - - var shards = indexSearchers.shards(); - - Flux scoreDocsFlux = Flux.create(scoreDocsSink -> { - var cmm = new ReactiveCollectorMultiManager(scoreDocsSink); - - AtomicInteger runningTasks = new AtomicInteger(0); - - runningTasks.addAndGet(shards.size()); - int mutableShardIndex = 0; - for (IndexSearcher shard : shards) { - int shardIndex = mutableShardIndex++; - try { - var collector = cmm.get(shardIndex); - assert queryParams.computePreciseHitsCount() == cmm.scoreMode().isExhaustive(); - - shard.search(localQueryParams.query(), collector); - } catch (Throwable e) { - scoreDocsSink.error(e); - } finally { - if (runningTasks.decrementAndGet() <= 0) { - scoreDocsSink.complete(); - } - } - } - }, OverflowStrategy.BUFFER); + } + }, OverflowStrategy.ERROR) + .subscribeOn(Schedulers.boundedElastic(), false) + .limitRate(1024); - Flux resultsFlux = LuceneUtils.convertHits(scoreDocsFlux, shards, keyFieldName, false); + Flux resultsFlux = LuceneUtils.convertHits(scoreDocsFlux, shards, keyFieldName, false); - var totalHitsCount = new TotalHitsCount(0, false); - Flux mergedFluxes = resultsFlux - .skip(queryParams2.offsetLong()) - .take(queryParams2.limitLong(), true); + var totalHitsCount = new TotalHitsCount(0, false); + Flux mergedFluxes = resultsFlux + .skip(queryParams2.offsetLong()) + .take(queryParams2.limitLong(), true); - return new LuceneSearchResult(totalHitsCount, mergedFluxes, indexSearchers::close); - }); - }); + return new LuceneSearchResult(totalHitsCount, mergedFluxes, indexSearchers::close); + }); }, false); }