diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index c722687..b39e18a 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -353,22 +353,28 @@ public class LuceneUtils { return Flux .fromArray(hits) - .flatMapSequential(hit -> Mono.fromCallable(() -> { + .parallel() + .runOn(scheduler) + .map(hit -> { int shardDocId = hit.doc; int shardIndex = hit.shardIndex; float score = hit.score; var indexSearcher = indexSearchers.shard(shardIndex); try { + //noinspection BlockingMethodInNonBlockingContext String collectedDoc = keyOfTopDoc(shardDocId, indexSearcher.getIndexReader(), keyFieldName); return new LLKeyScore(shardDocId, score, Mono.just(collectedDoc)); } catch (NoSuchElementException ex) { logger.debug("Error: document " + shardDocId + " key is not present!"); - return null; + // Errored key score, to filter out next + return new LLKeyScore(-1, -1, Mono.empty()); } catch (Exception ex) { return new LLKeyScore(shardDocId, score, Mono.error(ex)); } - })) - .subscribeOn(scheduler); + }) + // Filter out the errored key scores + .filter(ks -> !(ks.docId() == -1 && ks.score() == -1)) + .sequential(); } /** diff --git a/src/main/java/it/cavallium/dbengine/lucene/UnscoredCollector.java b/src/main/java/it/cavallium/dbengine/lucene/UnscoredCollector.java index c3a6c89..d6caaf7 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/UnscoredCollector.java +++ b/src/main/java/it/cavallium/dbengine/lucene/UnscoredCollector.java @@ -19,15 +19,9 @@ import org.jetbrains.annotations.Nullable; public class UnscoredCollector extends TopDocsCollector implements LeafCollector { private final IntArrayList docIds = new IntArrayList(); private final int limit; - private final boolean doAfterDocCalculation; - private final int afterDocId; private LeafReaderContext currentLeafReaderContext; - private boolean isLastElementOrdered = true; - private int biggestDocId = -1; - private int biggestDocIdIndex; - - public UnscoredCollector(@Nullable Integer afterDocId, int limit) { + public UnscoredCollector(int limit) { super(null); if (!ALLOW_UNSCORED_PAGINATION_MODE) { throw new UnsupportedOperationException(); @@ -36,25 +30,6 @@ public class UnscoredCollector extends TopDocsCollector implements Lea throw new IllegalArgumentException(); } this.limit = limit; - if (afterDocId != null) { - this.doAfterDocCalculation = true; - this.afterDocId = afterDocId; - } else { - this.doAfterDocCalculation = false; - this.afterDocId = -1; - } - } - - public UnscoredCollector(@Nullable Integer afterDocId) { - super(null); - this.limit = -1; - if (afterDocId != null) { - this.doAfterDocCalculation = true; - this.afterDocId = afterDocId; - } else { - this.doAfterDocCalculation = false; - this.afterDocId = -1; - } } @Override @@ -64,28 +39,10 @@ public class UnscoredCollector extends TopDocsCollector implements Lea @Override public void collect(int localDocId) { totalHits++; - boolean canCollect; - if (limit == -1 || docIds.size() < limit) { - if (doAfterDocCalculation) { - canCollect = localDocId > (this.afterDocId - currentLeafReaderContext.docBase); - } else { - canCollect = true; - } - } else { - canCollect = false; - } + boolean canCollect = limit == -1 || docIds.size() < limit; if (canCollect) { int docId = currentLeafReaderContext.docBase + localDocId; - if (docIds.add(docId)) { - if (docId > biggestDocId) { - isLastElementOrdered = true; - int docIndex = docIds.size() - 1; - biggestDocId = docId; - biggestDocIdIndex = docIndex; - } else { - isLastElementOrdered = false; - } - } + docIds.add(docId); } } @@ -138,13 +95,6 @@ public class UnscoredCollector extends TopDocsCollector implements Lea results[i] = new ScoreDoc(docId, 1.0f); i++; } - if (!isLastElementOrdered || start + howMany < docIds.size()) { - int lastIndex = results.length - 1; - var previousLastDoc = results[lastIndex]; - var biggestDoc = results[biggestDocIdIndex]; - results[lastIndex] = biggestDoc; - results[biggestDocIdIndex] = previousLastDoc; - } } @Override diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java index f8f6b4a..5c48b54 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneLocalSearcher.java @@ -1,6 +1,5 @@ package it.cavallium.dbengine.lucene.searcher; -import it.cavallium.dbengine.client.query.current.data.QueryParams; import org.apache.lucene.search.IndexSearcher; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -9,6 +8,8 @@ public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher { private static final LuceneLocalSearcher localSearcher = new SimpleLuceneLocalSearcher(); + private static final LuceneLocalSearcher unscoredPagedLuceneLocalSearcher = new LocalLuceneWrapper(new UnscoredUnsortedContinuousLuceneMultiSearcher()); + private static final LuceneLocalSearcher countSearcher = new CountLuceneLocalSearcher(); @Override @@ -19,6 +20,14 @@ public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher { Scheduler scheduler) { if (queryParams.limit() == 0) { return countSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, keyFieldName, scheduler); + } else if (!queryParams.isScored() && queryParams.offset() == 0 && queryParams.limit() >= 2147483630 + && !queryParams.isSorted()) { + return unscoredPagedLuceneLocalSearcher.collect(indexSearcher, + releaseIndexSearcher, + queryParams, + keyFieldName, + scheduler + ); } else { return localSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, keyFieldName, scheduler); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java index df7a71b..c8ec0e1 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLuceneMultiSearcher.java @@ -6,7 +6,9 @@ public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher { private static final LuceneMultiSearcher scoredLuceneMultiSearcher = new ScoredLuceneMultiSearcher(); - private static final LuceneMultiSearcher unscoredLuceneMultiSearcher = new UnscoredLuceneMultiSearcher(); + private static final LuceneMultiSearcher unscoredPagedLuceneMultiSearcher = new UnscoredPagedLuceneMultiSearcher(); + + private static final LuceneMultiSearcher unscoredIterableLuceneMultiSearcher = new UnscoredUnsortedContinuousLuceneMultiSearcher(); private static final LuceneMultiSearcher countLuceneMultiSearcher = new CountLuceneMultiSearcher(); @@ -16,8 +18,10 @@ public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher { return countLuceneMultiSearcher.createShardSearcher(queryParams); } else if (queryParams.isScored()) { return scoredLuceneMultiSearcher.createShardSearcher(queryParams); + } else if (queryParams.offset() == 0 && queryParams.limit() >= 2147483630 && !queryParams.isSorted()) { + return unscoredIterableLuceneMultiSearcher.createShardSearcher(queryParams); } else { - return unscoredLuceneMultiSearcher.createShardSearcher(queryParams); + return unscoredPagedLuceneMultiSearcher.createShardSearcher(queryParams); } } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalLuceneWrapper.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalLuceneWrapper.java new file mode 100644 index 0000000..493cd6b --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LocalLuceneWrapper.java @@ -0,0 +1,28 @@ +package it.cavallium.dbengine.lucene.searcher; + +import org.apache.lucene.search.IndexSearcher; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; + +public class LocalLuceneWrapper implements LuceneLocalSearcher { + + private final LuceneMultiSearcher luceneMultiSearcher; + + public LocalLuceneWrapper(LuceneMultiSearcher luceneMultiSearcher) { + this.luceneMultiSearcher = luceneMultiSearcher; + } + + @Override + public Mono collect(IndexSearcher indexSearcher, + Mono releaseIndexSearcher, + LocalQueryParams queryParams, + String keyFieldName, + Scheduler scheduler) { + var shardSearcher = luceneMultiSearcher.createShardSearcher(queryParams); + return shardSearcher + .flatMap(luceneShardSearcher -> luceneShardSearcher + .searchOn(indexSearcher, releaseIndexSearcher, queryParams, scheduler) + .then(luceneShardSearcher.collect(queryParams, keyFieldName, scheduler)) + ); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java index becee15..25ac540 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java @@ -40,6 +40,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()), null, LuceneUtils.totalHitsThreshold(), + !paginationInfo.forceSinglePage(), queryParams.isScored()); //noinspection BlockingMethodInNonBlockingContext indexSearcher.search(queryParams.query(), firstPageCollector); @@ -73,6 +74,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { s.currentPageLimit(), s.last(), LuceneUtils.totalHitsThreshold(), + true, queryParams.isScored() ); //noinspection BlockingMethodInNonBlockingContext diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java index d3847cc..bc09d4d 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java @@ -32,21 +32,21 @@ class TopDocsSearcher { int limit, ScoreDoc after, int totalHitsThreshold, + boolean allowPagination, boolean computeScores) { TopDocsCollector collector; + if (after != null && !allowPagination) { + throw new IllegalArgumentException("\"allowPagination\" is false, but \"after\" is set"); + } if (luceneSort == null) { if (after == null) { - if (computeScores || !ALLOW_UNSCORED_PAGINATION_MODE) { + if (computeScores || allowPagination || !ALLOW_UNSCORED_PAGINATION_MODE) { collector = TopScoreDocCollector.create(limit, totalHitsThreshold); } else { - collector = new UnscoredCollector(null, limit); + collector = new UnscoredCollector(limit); } } else { - if (computeScores || !ALLOW_UNSCORED_PAGINATION_MODE) { - collector = TopScoreDocCollector.create(limit, after, totalHitsThreshold); - } else { - collector = new UnscoredCollector(after.doc, limit); - } + collector = TopScoreDocCollector.create(limit, after, totalHitsThreshold); } } else { if (!computeScores) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneMultiSearcher.java similarity index 75% rename from src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneMultiSearcher.java rename to src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneMultiSearcher.java index 37a0f6e..8118348 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneMultiSearcher.java @@ -6,7 +6,7 @@ import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SE import it.cavallium.dbengine.lucene.LuceneUtils; import reactor.core.publisher.Mono; -public class UnscoredLuceneMultiSearcher implements LuceneMultiSearcher { +public class UnscoredPagedLuceneMultiSearcher implements LuceneMultiSearcher { @Override public Mono createShardSearcher(LocalQueryParams queryParams) { @@ -21,13 +21,14 @@ public class UnscoredLuceneMultiSearcher implements LuceneMultiSearcher { } else { paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, false); } - UnscoredCollectorManager unsortedCollectorManager = new UnscoredCollectorManager(() -> TopDocsSearcher.getTopDocsCollector(queryParams.sort(), + UnscoredTopDocsCollectorManager unsortedCollectorManager = new UnscoredTopDocsCollectorManager(() -> TopDocsSearcher.getTopDocsCollector(queryParams.sort(), LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()), null, LuceneUtils.totalHitsThreshold(), + !paginationInfo.forceSinglePage(), queryParams.isScored() ), queryParams.offset(), queryParams.limit(), queryParams.sort()); - return new UnscoredLuceneShardSearcher(unsortedCollectorManager, queryParams.query(), paginationInfo); + return new UnscoredPagedLuceneShardSearcher(unsortedCollectorManager, queryParams.query(), paginationInfo); }); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneShardSearcher.java similarity index 90% rename from src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneShardSearcher.java rename to src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneShardSearcher.java index 7e4b3c1..2dcc931 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneShardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredPagedLuceneShardSearcher.java @@ -1,7 +1,6 @@ package it.cavallium.dbengine.lucene.searcher; import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.EMPTY_STATUS; -import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.TIE_BREAKER; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.lucene.LuceneUtils; @@ -16,13 +15,11 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocsCollector; -import org.apache.lucene.search.TopFieldDocs; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; -class UnscoredLuceneShardSearcher implements LuceneShardSearcher { +class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher { private final Object lock = new Object(); private final List indexSearchersArray = new ArrayList<>(); @@ -32,7 +29,7 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher { private final Query luceneQuery; private final PaginationInfo paginationInfo; - public UnscoredLuceneShardSearcher(CollectorManager, TopDocs> unsortedCollectorManager, + public UnscoredPagedLuceneShardSearcher(CollectorManager, TopDocs> unsortedCollectorManager, Query luceneQuery, PaginationInfo paginationInfo) { this.firstPageUnsortedCollectorManager = unsortedCollectorManager; @@ -90,9 +87,9 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher { if (s.last() != null && s.remainingLimit() > 0 && s.currentPageLimit() > 0) { Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null"); Query luceneQuery = queryParams.query(); - UnscoredCollectorManager currentPageUnsortedCollectorManager = new UnscoredCollectorManager( + UnscoredTopDocsCollectorManager currentPageUnsortedCollectorManager = new UnscoredTopDocsCollectorManager( () -> TopDocsSearcher.getTopDocsCollector(queryParams.sort(), s.currentPageLimit(), - s.last(), LuceneUtils.totalHitsThreshold(), queryParams.isScored()), + s.last(), LuceneUtils.totalHitsThreshold(), true, queryParams.isScored()), 0, s.currentPageLimit(), queryParams.sort()); //noinspection BlockingMethodInNonBlockingContext TopDocs pageTopDocs = Flux diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredCollectorManager.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredTopDocsCollectorManager.java similarity index 92% rename from src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredCollectorManager.java rename to src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredTopDocsCollectorManager.java index 91abcc8..94940ef 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredCollectorManager.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredTopDocsCollectorManager.java @@ -16,7 +16,7 @@ import org.apache.lucene.search.TopDocsCollector; import org.apache.lucene.search.TopFieldDocs; import org.jetbrains.annotations.Nullable; -public class UnscoredCollectorManager implements +public class UnscoredTopDocsCollectorManager implements CollectorManager, TopDocs> { private final Supplier> collectorSupplier; @@ -24,7 +24,7 @@ public class UnscoredCollectorManager implements private final long limit; private final Sort sort; - public UnscoredCollectorManager(Supplier> collectorSupplier, + public UnscoredTopDocsCollectorManager(Supplier> collectorSupplier, long offset, long limit, @Nullable Sort sort) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredUnsortedContinuousLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredUnsortedContinuousLuceneMultiSearcher.java new file mode 100644 index 0000000..d748ea0 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredUnsortedContinuousLuceneMultiSearcher.java @@ -0,0 +1,166 @@ +package it.cavallium.dbengine.lucene.searcher; + +import it.cavallium.dbengine.lucene.LuceneUtils; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.CollectorManager; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.SimpleCollector; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.EmitResult; +import reactor.core.publisher.Sinks.Many; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; + +public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMultiSearcher { + + @Override + public Mono createShardSearcher(LocalQueryParams queryParams) { + return Mono + .fromCallable(() -> { + AtomicBoolean alreadySubscribed = new AtomicBoolean(false); + Many scoreDocsSink = Sinks.many().unicast().onBackpressureBuffer(); + // 1 is the collect phase + AtomicInteger remainingCollectors = new AtomicInteger(1); + + if (queryParams.isScored()) { + throw new UnsupportedOperationException("Can't use the unscored searcher to do a scored or sorted query"); + } + + var cm = new CollectorManager() { + + class IterableCollector extends SimpleCollector { + + private int shardIndex; + + @Override + public void collect(int i) { + var scoreDoc = new ScoreDoc(i, 0, shardIndex); + synchronized (scoreDocsSink) { + while (scoreDocsSink.tryEmitNext(scoreDoc) == EmitResult.FAIL_OVERFLOW) { + LockSupport.parkNanos(10); + } + } + } + + @Override + public ScoreMode scoreMode() { + return ScoreMode.COMPLETE_NO_SCORES; + } + + public void setShardIndex(int shardIndex) { + this.shardIndex = shardIndex; + } + } + + @Override + public IterableCollector newCollector() { + return new IterableCollector(); + } + + @Override + public Void reduce(Collection collection) { + throw new UnsupportedOperationException(); + } + }; + + return new LuceneShardSearcher() { + private final Object lock = new Object(); + private final List indexSearchersArray = new ArrayList<>(); + private final List> indexSearcherReleasersArray = new ArrayList<>(); + @Override + public Mono searchOn(IndexSearcher indexSearcher, + Mono releaseIndexSearcher, + LocalQueryParams queryParams, + Scheduler scheduler) { + return Mono + .fromCallable(() -> { + //noinspection BlockingMethodInNonBlockingContext + var collector = cm.newCollector(); + int collectorShardIndex; + synchronized (lock) { + collectorShardIndex = indexSearchersArray.size(); + indexSearchersArray.add(indexSearcher); + indexSearcherReleasersArray.add(releaseIndexSearcher); + } + collector.setShardIndex(collectorShardIndex); + remainingCollectors.incrementAndGet(); + Schedulers.boundedElastic().schedule(() -> { + try { + indexSearcher.search(queryParams.query(), collector); + + synchronized (scoreDocsSink) { + decrementRemainingCollectors(scoreDocsSink, remainingCollectors); + } + } catch (IOException e) { + scoreDocsSink.tryEmitError(e); + } + }); + return null; + }) + .subscribeOn(scheduler); + } + + @Override + public Mono collect(LocalQueryParams queryParams, + String keyFieldName, + Scheduler scheduler) { + return Mono + .fromCallable(() -> { + synchronized (scoreDocsSink) { + decrementRemainingCollectors(scoreDocsSink, remainingCollectors); + } + + if (!alreadySubscribed.compareAndSet(false, true)) { + throw new UnsupportedOperationException("Already subscribed!"); + } + + IndexSearchers indexSearchers; + Mono release; + synchronized (lock) { + indexSearchers = IndexSearchers.of(indexSearchersArray); + release = Mono.when(indexSearcherReleasersArray); + } + + AtomicBoolean resultsAlreadySubscribed = new AtomicBoolean(false); + + var resultsFlux = Mono + .fromCallable(() -> { + if (!resultsAlreadySubscribed.compareAndSet(false, true)) { + throw new UnsupportedOperationException("Already subscribed!"); + } + return null; + }) + .thenMany(scoreDocsSink.asFlux()) + .buffer(1024, ObjectArrayList::new) + .flatMap(scoreDocs -> LuceneUtils.convertHits(scoreDocs.toArray(ScoreDoc[]::new), + indexSearchers, + keyFieldName, + scheduler + )); + + return new LuceneSearchResult(1, resultsFlux, release); + }); + } + }; + }); + } + + private static void decrementRemainingCollectors(Many scoreDocsSink, AtomicInteger remainingCollectors) { + if (remainingCollectors.decrementAndGet() <= 0) { + scoreDocsSink.tryEmitComplete(); + } + } +}