Performance optimization

This commit is contained in:
Andrea Cavalli 2021-07-27 19:34:51 +02:00
parent b816e1f9e5
commit e738eda331
11 changed files with 242 additions and 79 deletions

View File

@ -353,22 +353,28 @@ public class LuceneUtils {
return Flux return Flux
.fromArray(hits) .fromArray(hits)
.flatMapSequential(hit -> Mono.fromCallable(() -> { .parallel()
.runOn(scheduler)
.map(hit -> {
int shardDocId = hit.doc; int shardDocId = hit.doc;
int shardIndex = hit.shardIndex; int shardIndex = hit.shardIndex;
float score = hit.score; float score = hit.score;
var indexSearcher = indexSearchers.shard(shardIndex); var indexSearcher = indexSearchers.shard(shardIndex);
try { try {
//noinspection BlockingMethodInNonBlockingContext
String collectedDoc = keyOfTopDoc(shardDocId, indexSearcher.getIndexReader(), keyFieldName); String collectedDoc = keyOfTopDoc(shardDocId, indexSearcher.getIndexReader(), keyFieldName);
return new LLKeyScore(shardDocId, score, Mono.just(collectedDoc)); return new LLKeyScore(shardDocId, score, Mono.just(collectedDoc));
} catch (NoSuchElementException ex) { } catch (NoSuchElementException ex) {
logger.debug("Error: document " + shardDocId + " key is not present!"); logger.debug("Error: document " + shardDocId + " key is not present!");
return null; // Errored key score, to filter out next
return new LLKeyScore(-1, -1, Mono.empty());
} catch (Exception ex) { } catch (Exception ex) {
return new LLKeyScore(shardDocId, score, Mono.error(ex)); return new LLKeyScore(shardDocId, score, Mono.error(ex));
} }
})) })
.subscribeOn(scheduler); // Filter out the errored key scores
.filter(ks -> !(ks.docId() == -1 && ks.score() == -1))
.sequential();
} }
/** /**

View File

@ -19,15 +19,9 @@ import org.jetbrains.annotations.Nullable;
public class UnscoredCollector extends TopDocsCollector<ScoreDoc> implements LeafCollector { public class UnscoredCollector extends TopDocsCollector<ScoreDoc> implements LeafCollector {
private final IntArrayList docIds = new IntArrayList(); private final IntArrayList docIds = new IntArrayList();
private final int limit; private final int limit;
private final boolean doAfterDocCalculation;
private final int afterDocId;
private LeafReaderContext currentLeafReaderContext; private LeafReaderContext currentLeafReaderContext;
private boolean isLastElementOrdered = true; public UnscoredCollector(int limit) {
private int biggestDocId = -1;
private int biggestDocIdIndex;
public UnscoredCollector(@Nullable Integer afterDocId, int limit) {
super(null); super(null);
if (!ALLOW_UNSCORED_PAGINATION_MODE) { if (!ALLOW_UNSCORED_PAGINATION_MODE) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
@ -36,25 +30,6 @@ public class UnscoredCollector extends TopDocsCollector<ScoreDoc> implements Lea
throw new IllegalArgumentException(); throw new IllegalArgumentException();
} }
this.limit = limit; this.limit = limit;
if (afterDocId != null) {
this.doAfterDocCalculation = true;
this.afterDocId = afterDocId;
} else {
this.doAfterDocCalculation = false;
this.afterDocId = -1;
}
}
public UnscoredCollector(@Nullable Integer afterDocId) {
super(null);
this.limit = -1;
if (afterDocId != null) {
this.doAfterDocCalculation = true;
this.afterDocId = afterDocId;
} else {
this.doAfterDocCalculation = false;
this.afterDocId = -1;
}
} }
@Override @Override
@ -64,28 +39,10 @@ public class UnscoredCollector extends TopDocsCollector<ScoreDoc> implements Lea
@Override @Override
public void collect(int localDocId) { public void collect(int localDocId) {
totalHits++; totalHits++;
boolean canCollect; boolean canCollect = limit == -1 || docIds.size() < limit;
if (limit == -1 || docIds.size() < limit) {
if (doAfterDocCalculation) {
canCollect = localDocId > (this.afterDocId - currentLeafReaderContext.docBase);
} else {
canCollect = true;
}
} else {
canCollect = false;
}
if (canCollect) { if (canCollect) {
int docId = currentLeafReaderContext.docBase + localDocId; int docId = currentLeafReaderContext.docBase + localDocId;
if (docIds.add(docId)) { docIds.add(docId);
if (docId > biggestDocId) {
isLastElementOrdered = true;
int docIndex = docIds.size() - 1;
biggestDocId = docId;
biggestDocIdIndex = docIndex;
} else {
isLastElementOrdered = false;
}
}
} }
} }
@ -138,13 +95,6 @@ public class UnscoredCollector extends TopDocsCollector<ScoreDoc> implements Lea
results[i] = new ScoreDoc(docId, 1.0f); results[i] = new ScoreDoc(docId, 1.0f);
i++; i++;
} }
if (!isLastElementOrdered || start + howMany < docIds.size()) {
int lastIndex = results.length - 1;
var previousLastDoc = results[lastIndex];
var biggestDoc = results[biggestDocIdIndex];
results[lastIndex] = biggestDoc;
results[biggestDocIdIndex] = previousLastDoc;
}
} }
@Override @Override

View File

@ -1,6 +1,5 @@
package it.cavallium.dbengine.lucene.searcher; package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.IndexSearcher;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
@ -9,6 +8,8 @@ public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher {
private static final LuceneLocalSearcher localSearcher = new SimpleLuceneLocalSearcher(); private static final LuceneLocalSearcher localSearcher = new SimpleLuceneLocalSearcher();
private static final LuceneLocalSearcher unscoredPagedLuceneLocalSearcher = new LocalLuceneWrapper(new UnscoredUnsortedContinuousLuceneMultiSearcher());
private static final LuceneLocalSearcher countSearcher = new CountLuceneLocalSearcher(); private static final LuceneLocalSearcher countSearcher = new CountLuceneLocalSearcher();
@Override @Override
@ -19,6 +20,14 @@ public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher {
Scheduler scheduler) { Scheduler scheduler) {
if (queryParams.limit() == 0) { if (queryParams.limit() == 0) {
return countSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, keyFieldName, scheduler); return countSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, keyFieldName, scheduler);
} else if (!queryParams.isScored() && queryParams.offset() == 0 && queryParams.limit() >= 2147483630
&& !queryParams.isSorted()) {
return unscoredPagedLuceneLocalSearcher.collect(indexSearcher,
releaseIndexSearcher,
queryParams,
keyFieldName,
scheduler
);
} else { } else {
return localSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, keyFieldName, scheduler); return localSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, keyFieldName, scheduler);
} }

View File

@ -6,7 +6,9 @@ public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher {
private static final LuceneMultiSearcher scoredLuceneMultiSearcher = new ScoredLuceneMultiSearcher(); private static final LuceneMultiSearcher scoredLuceneMultiSearcher = new ScoredLuceneMultiSearcher();
private static final LuceneMultiSearcher unscoredLuceneMultiSearcher = new UnscoredLuceneMultiSearcher(); private static final LuceneMultiSearcher unscoredPagedLuceneMultiSearcher = new UnscoredPagedLuceneMultiSearcher();
private static final LuceneMultiSearcher unscoredIterableLuceneMultiSearcher = new UnscoredUnsortedContinuousLuceneMultiSearcher();
private static final LuceneMultiSearcher countLuceneMultiSearcher = new CountLuceneMultiSearcher(); private static final LuceneMultiSearcher countLuceneMultiSearcher = new CountLuceneMultiSearcher();
@ -16,8 +18,10 @@ public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher {
return countLuceneMultiSearcher.createShardSearcher(queryParams); return countLuceneMultiSearcher.createShardSearcher(queryParams);
} else if (queryParams.isScored()) { } else if (queryParams.isScored()) {
return scoredLuceneMultiSearcher.createShardSearcher(queryParams); return scoredLuceneMultiSearcher.createShardSearcher(queryParams);
} else if (queryParams.offset() == 0 && queryParams.limit() >= 2147483630 && !queryParams.isSorted()) {
return unscoredIterableLuceneMultiSearcher.createShardSearcher(queryParams);
} else { } else {
return unscoredLuceneMultiSearcher.createShardSearcher(queryParams); return unscoredPagedLuceneMultiSearcher.createShardSearcher(queryParams);
} }
} }
} }

View File

@ -0,0 +1,28 @@
package it.cavallium.dbengine.lucene.searcher;
import org.apache.lucene.search.IndexSearcher;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
public class LocalLuceneWrapper implements LuceneLocalSearcher {
private final LuceneMultiSearcher luceneMultiSearcher;
public LocalLuceneWrapper(LuceneMultiSearcher luceneMultiSearcher) {
this.luceneMultiSearcher = luceneMultiSearcher;
}
@Override
public Mono<LuceneSearchResult> collect(IndexSearcher indexSearcher,
Mono<Void> releaseIndexSearcher,
LocalQueryParams queryParams,
String keyFieldName,
Scheduler scheduler) {
var shardSearcher = luceneMultiSearcher.createShardSearcher(queryParams);
return shardSearcher
.flatMap(luceneShardSearcher -> luceneShardSearcher
.searchOn(indexSearcher, releaseIndexSearcher, queryParams, scheduler)
.then(luceneShardSearcher.collect(queryParams, keyFieldName, scheduler))
);
}
}

View File

@ -40,6 +40,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()), LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()),
null, null,
LuceneUtils.totalHitsThreshold(), LuceneUtils.totalHitsThreshold(),
!paginationInfo.forceSinglePage(),
queryParams.isScored()); queryParams.isScored());
//noinspection BlockingMethodInNonBlockingContext //noinspection BlockingMethodInNonBlockingContext
indexSearcher.search(queryParams.query(), firstPageCollector); indexSearcher.search(queryParams.query(), firstPageCollector);
@ -73,6 +74,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
s.currentPageLimit(), s.currentPageLimit(),
s.last(), s.last(),
LuceneUtils.totalHitsThreshold(), LuceneUtils.totalHitsThreshold(),
true,
queryParams.isScored() queryParams.isScored()
); );
//noinspection BlockingMethodInNonBlockingContext //noinspection BlockingMethodInNonBlockingContext

View File

@ -32,21 +32,21 @@ class TopDocsSearcher {
int limit, int limit,
ScoreDoc after, ScoreDoc after,
int totalHitsThreshold, int totalHitsThreshold,
boolean allowPagination,
boolean computeScores) { boolean computeScores) {
TopDocsCollector<ScoreDoc> collector; TopDocsCollector<ScoreDoc> collector;
if (after != null && !allowPagination) {
throw new IllegalArgumentException("\"allowPagination\" is false, but \"after\" is set");
}
if (luceneSort == null) { if (luceneSort == null) {
if (after == null) { if (after == null) {
if (computeScores || !ALLOW_UNSCORED_PAGINATION_MODE) { if (computeScores || allowPagination || !ALLOW_UNSCORED_PAGINATION_MODE) {
collector = TopScoreDocCollector.create(limit, totalHitsThreshold); collector = TopScoreDocCollector.create(limit, totalHitsThreshold);
} else { } else {
collector = new UnscoredCollector(null, limit); collector = new UnscoredCollector(limit);
} }
} else { } else {
if (computeScores || !ALLOW_UNSCORED_PAGINATION_MODE) { collector = TopScoreDocCollector.create(limit, after, totalHitsThreshold);
collector = TopScoreDocCollector.create(limit, after, totalHitsThreshold);
} else {
collector = new UnscoredCollector(after.doc, limit);
}
} }
} else { } else {
if (!computeScores) { if (!computeScores) {

View File

@ -6,7 +6,7 @@ import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SE
import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.LuceneUtils;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
public class UnscoredLuceneMultiSearcher implements LuceneMultiSearcher { public class UnscoredPagedLuceneMultiSearcher implements LuceneMultiSearcher {
@Override @Override
public Mono<LuceneShardSearcher> createShardSearcher(LocalQueryParams queryParams) { public Mono<LuceneShardSearcher> createShardSearcher(LocalQueryParams queryParams) {
@ -21,13 +21,14 @@ public class UnscoredLuceneMultiSearcher implements LuceneMultiSearcher {
} else { } else {
paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, false); paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, false);
} }
UnscoredCollectorManager unsortedCollectorManager = new UnscoredCollectorManager(() -> TopDocsSearcher.getTopDocsCollector(queryParams.sort(), UnscoredTopDocsCollectorManager unsortedCollectorManager = new UnscoredTopDocsCollectorManager(() -> TopDocsSearcher.getTopDocsCollector(queryParams.sort(),
LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()), LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()),
null, null,
LuceneUtils.totalHitsThreshold(), LuceneUtils.totalHitsThreshold(),
!paginationInfo.forceSinglePage(),
queryParams.isScored() queryParams.isScored()
), queryParams.offset(), queryParams.limit(), queryParams.sort()); ), queryParams.offset(), queryParams.limit(), queryParams.sort());
return new UnscoredLuceneShardSearcher(unsortedCollectorManager, queryParams.query(), paginationInfo); return new UnscoredPagedLuceneShardSearcher(unsortedCollectorManager, queryParams.query(), paginationInfo);
}); });
} }
} }

View File

@ -1,7 +1,6 @@
package it.cavallium.dbengine.lucene.searcher; package it.cavallium.dbengine.lucene.searcher;
import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.EMPTY_STATUS; import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.EMPTY_STATUS;
import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.TIE_BREAKER;
import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.LuceneUtils;
@ -16,13 +15,11 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopDocsCollector; import org.apache.lucene.search.TopDocsCollector;
import org.apache.lucene.search.TopFieldDocs;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
class UnscoredLuceneShardSearcher implements LuceneShardSearcher { class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher {
private final Object lock = new Object(); private final Object lock = new Object();
private final List<IndexSearcher> indexSearchersArray = new ArrayList<>(); private final List<IndexSearcher> indexSearchersArray = new ArrayList<>();
@ -32,7 +29,7 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher {
private final Query luceneQuery; private final Query luceneQuery;
private final PaginationInfo paginationInfo; private final PaginationInfo paginationInfo;
public UnscoredLuceneShardSearcher(CollectorManager<TopDocsCollector<ScoreDoc>, TopDocs> unsortedCollectorManager, public UnscoredPagedLuceneShardSearcher(CollectorManager<TopDocsCollector<ScoreDoc>, TopDocs> unsortedCollectorManager,
Query luceneQuery, Query luceneQuery,
PaginationInfo paginationInfo) { PaginationInfo paginationInfo) {
this.firstPageUnsortedCollectorManager = unsortedCollectorManager; this.firstPageUnsortedCollectorManager = unsortedCollectorManager;
@ -90,9 +87,9 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher {
if (s.last() != null && s.remainingLimit() > 0 && s.currentPageLimit() > 0) { if (s.last() != null && s.remainingLimit() > 0 && s.currentPageLimit() > 0) {
Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null"); Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null");
Query luceneQuery = queryParams.query(); Query luceneQuery = queryParams.query();
UnscoredCollectorManager currentPageUnsortedCollectorManager = new UnscoredCollectorManager( UnscoredTopDocsCollectorManager currentPageUnsortedCollectorManager = new UnscoredTopDocsCollectorManager(
() -> TopDocsSearcher.getTopDocsCollector(queryParams.sort(), s.currentPageLimit(), () -> TopDocsSearcher.getTopDocsCollector(queryParams.sort(), s.currentPageLimit(),
s.last(), LuceneUtils.totalHitsThreshold(), queryParams.isScored()), s.last(), LuceneUtils.totalHitsThreshold(), true, queryParams.isScored()),
0, s.currentPageLimit(), queryParams.sort()); 0, s.currentPageLimit(), queryParams.sort());
//noinspection BlockingMethodInNonBlockingContext //noinspection BlockingMethodInNonBlockingContext
TopDocs pageTopDocs = Flux TopDocs pageTopDocs = Flux

View File

@ -16,7 +16,7 @@ import org.apache.lucene.search.TopDocsCollector;
import org.apache.lucene.search.TopFieldDocs; import org.apache.lucene.search.TopFieldDocs;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
public class UnscoredCollectorManager implements public class UnscoredTopDocsCollectorManager implements
CollectorManager<TopDocsCollector<ScoreDoc>, TopDocs> { CollectorManager<TopDocsCollector<ScoreDoc>, TopDocs> {
private final Supplier<TopDocsCollector<ScoreDoc>> collectorSupplier; private final Supplier<TopDocsCollector<ScoreDoc>> collectorSupplier;
@ -24,7 +24,7 @@ public class UnscoredCollectorManager implements
private final long limit; private final long limit;
private final Sort sort; private final Sort sort;
public UnscoredCollectorManager(Supplier<TopDocsCollector<ScoreDoc>> collectorSupplier, public UnscoredTopDocsCollectorManager(Supplier<TopDocsCollector<ScoreDoc>> collectorSupplier,
long offset, long offset,
long limit, long limit,
@Nullable Sort sort) { @Nullable Sort sort) {

View File

@ -0,0 +1,166 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.SimpleCollector;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.publisher.Sinks.Many;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMultiSearcher {
@Override
public Mono<LuceneShardSearcher> createShardSearcher(LocalQueryParams queryParams) {
return Mono
.fromCallable(() -> {
AtomicBoolean alreadySubscribed = new AtomicBoolean(false);
Many<ScoreDoc> scoreDocsSink = Sinks.many().unicast().onBackpressureBuffer();
// 1 is the collect phase
AtomicInteger remainingCollectors = new AtomicInteger(1);
if (queryParams.isScored()) {
throw new UnsupportedOperationException("Can't use the unscored searcher to do a scored or sorted query");
}
var cm = new CollectorManager<Collector, Void>() {
class IterableCollector extends SimpleCollector {
private int shardIndex;
@Override
public void collect(int i) {
var scoreDoc = new ScoreDoc(i, 0, shardIndex);
synchronized (scoreDocsSink) {
while (scoreDocsSink.tryEmitNext(scoreDoc) == EmitResult.FAIL_OVERFLOW) {
LockSupport.parkNanos(10);
}
}
}
@Override
public ScoreMode scoreMode() {
return ScoreMode.COMPLETE_NO_SCORES;
}
public void setShardIndex(int shardIndex) {
this.shardIndex = shardIndex;
}
}
@Override
public IterableCollector newCollector() {
return new IterableCollector();
}
@Override
public Void reduce(Collection<Collector> collection) {
throw new UnsupportedOperationException();
}
};
return new LuceneShardSearcher() {
private final Object lock = new Object();
private final List<IndexSearcher> indexSearchersArray = new ArrayList<>();
private final List<Mono<Void>> indexSearcherReleasersArray = new ArrayList<>();
@Override
public Mono<Void> searchOn(IndexSearcher indexSearcher,
Mono<Void> releaseIndexSearcher,
LocalQueryParams queryParams,
Scheduler scheduler) {
return Mono
.<Void>fromCallable(() -> {
//noinspection BlockingMethodInNonBlockingContext
var collector = cm.newCollector();
int collectorShardIndex;
synchronized (lock) {
collectorShardIndex = indexSearchersArray.size();
indexSearchersArray.add(indexSearcher);
indexSearcherReleasersArray.add(releaseIndexSearcher);
}
collector.setShardIndex(collectorShardIndex);
remainingCollectors.incrementAndGet();
Schedulers.boundedElastic().schedule(() -> {
try {
indexSearcher.search(queryParams.query(), collector);
synchronized (scoreDocsSink) {
decrementRemainingCollectors(scoreDocsSink, remainingCollectors);
}
} catch (IOException e) {
scoreDocsSink.tryEmitError(e);
}
});
return null;
})
.subscribeOn(scheduler);
}
@Override
public Mono<LuceneSearchResult> collect(LocalQueryParams queryParams,
String keyFieldName,
Scheduler scheduler) {
return Mono
.fromCallable(() -> {
synchronized (scoreDocsSink) {
decrementRemainingCollectors(scoreDocsSink, remainingCollectors);
}
if (!alreadySubscribed.compareAndSet(false, true)) {
throw new UnsupportedOperationException("Already subscribed!");
}
IndexSearchers indexSearchers;
Mono<Void> release;
synchronized (lock) {
indexSearchers = IndexSearchers.of(indexSearchersArray);
release = Mono.when(indexSearcherReleasersArray);
}
AtomicBoolean resultsAlreadySubscribed = new AtomicBoolean(false);
var resultsFlux = Mono
.<Void>fromCallable(() -> {
if (!resultsAlreadySubscribed.compareAndSet(false, true)) {
throw new UnsupportedOperationException("Already subscribed!");
}
return null;
})
.thenMany(scoreDocsSink.asFlux())
.buffer(1024, ObjectArrayList::new)
.flatMap(scoreDocs -> LuceneUtils.convertHits(scoreDocs.toArray(ScoreDoc[]::new),
indexSearchers,
keyFieldName,
scheduler
));
return new LuceneSearchResult(1, resultsFlux, release);
});
}
};
});
}
private static void decrementRemainingCollectors(Many<ScoreDoc> scoreDocsSink, AtomicInteger remainingCollectors) {
if (remainingCollectors.decrementAndGet() <= 0) {
scoreDocsSink.tryEmitComplete();
}
}
}