Fix unsorted unscored streaming multi searcher

This commit is contained in:
Andrea Cavalli 2021-12-08 11:58:06 +01:00
parent b5aa8b4baa
commit 6644e040dd
4 changed files with 18 additions and 10 deletions

View File

@ -35,7 +35,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
private static final Logger logger = LoggerFactory.getLogger(CachedIndexSearcherManager.class); private static final Logger logger = LoggerFactory.getLogger(CachedIndexSearcherManager.class);
private final Executor SEARCH_EXECUTOR = Executors private final Executor SEARCH_EXECUTOR = Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ShortNamedThreadFactory("lucene-search")); .newCachedThreadPool(new ShortNamedThreadFactory("lucene-search"));
private final SearcherFactory SEARCHER_FACTORY = new ExecutorSearcherFactory(SEARCH_EXECUTOR); private final SearcherFactory SEARCHER_FACTORY = new ExecutorSearcherFactory(SEARCH_EXECUTOR);
private final SnapshotsManager snapshotsManager; private final SnapshotsManager snapshotsManager;

View File

@ -350,6 +350,9 @@ public class LuceneUtils {
); );
} }
/**
* HitsFlux must run on a blocking scheduler
*/
public static Flux<LLKeyScore> convertHits(Flux<ScoreDoc> hitsFlux, public static Flux<LLKeyScore> convertHits(Flux<ScoreDoc> hitsFlux,
List<IndexSearcher> indexSearchers, List<IndexSearcher> indexSearchers,
String keyFieldName, String keyFieldName,

View File

@ -2,6 +2,8 @@ package it.cavallium.dbengine.lucene.collector;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.lucene.searcher.LongSemaphore; import it.cavallium.dbengine.lucene.searcher.LongSemaphore;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.CollectionTerminatedException;
@ -42,14 +44,17 @@ public class ReactiveLeafCollector implements LeafCollector {
// Wait if no requests from downstream are found // Wait if no requests from downstream are found
try { try {
requested.acquire(); while (!requested.tryAcquire(1, TimeUnit.SECONDS)) {
if (scoreDocsSink.isCancelled()) {
throw new CollectionTerminatedException();
}
}
// Send the response
var scoreDoc = new ScoreDoc(leafReaderContext.docBase + i, 0, shardIndex);
scoreDocsSink.next(scoreDoc);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new CollectionTerminatedException(); throw new CompletionException(e);
} }
// Send the response
var scoreDoc = new ScoreDoc(leafReaderContext.docBase + i, 0, shardIndex);
scoreDocsSink.next(scoreDoc);
} }
} }

View File

@ -77,7 +77,7 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {
try { try {
LLUtils.ensureBlocking(); LLUtils.ensureBlocking();
var collectorManager = cmm.get(shardIndex); var collectorManager = cmm.get(shardIndex);
shard.search(localQueryParams.query(), collectorManager); shard.search(localQueryParams.query(), collectorManager.newCollector());
} catch (IOException e) { } catch (IOException e) {
throw new CompletionException(e); throw new CompletionException(e);
} }
@ -92,13 +92,13 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {
scoreDocsSink.complete(); scoreDocsSink.complete();
} }
}); });
scoreDocsSink.onCancel(() -> { scoreDocsSink.onDispose(() -> {
for (CompletableFuture<?> future : futures) { for (CompletableFuture<?> future : futures) {
future.cancel(true); future.cancel(true);
} }
combinedFuture.cancel(true); combinedFuture.cancel(true);
}); });
}, OverflowStrategy.ERROR); }, OverflowStrategy.BUFFER).subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.boundedElastic());
Flux<LLKeyScore> resultsFlux = LuceneUtils.convertHits(scoreDocsFlux, shards, keyFieldName, false); Flux<LLKeyScore> resultsFlux = LuceneUtils.convertHits(scoreDocsFlux, shards, keyFieldName, false);