Use the right scheduler

This commit is contained in:
Andrea Cavalli 2022-04-04 20:12:29 +02:00
parent a45f357bca
commit 1dfe0d5a77
2 changed files with 6 additions and 1 deletions

View File

@ -29,6 +29,7 @@ import org.warp.commonutils.type.ShortNamedThreadFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Empty;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class CachedIndexSearcherManager implements IndexSearcherManager {
@ -42,6 +43,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
private final SearcherFactory SEARCHER_FACTORY = new ExecutorSearcherFactory(searchExecutor);
private final SnapshotsManager snapshotsManager;
private final Scheduler luceneHeavyTasksScheduler;
private final Similarity similarity;
private final SearcherManager searcherManager;
private final Duration queryRefreshDebounceTime;
@ -58,11 +60,13 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
public CachedIndexSearcherManager(IndexWriter indexWriter,
SnapshotsManager snapshotsManager,
Scheduler luceneHeavyTasksScheduler,
Similarity similarity,
boolean applyAllDeletes,
boolean writeAllDeletes,
Duration queryRefreshDebounceTime) throws IOException {
this.snapshotsManager = snapshotsManager;
this.luceneHeavyTasksScheduler = luceneHeavyTasksScheduler;
this.similarity = similarity;
this.queryRefreshDebounceTime = queryRefreshDebounceTime;
@ -77,7 +81,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
logger.error("Failed to refresh the searcher manager", ex);
}
})
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.subscribeOn(luceneHeavyTasksScheduler)
.publishOn(Schedulers.parallel())
.repeatWhen(s -> s.delayElements(queryRefreshDebounceTime))
.takeUntilOther(closeRequestedMono.asMono())

View File

@ -206,6 +206,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
this.snapshotsManager = new SnapshotsManager(indexWriter, snapshotter);
this.searcherManager = new CachedIndexSearcherManager(indexWriter,
snapshotsManager,
luceneHeavyTasksScheduler,
getLuceneSimilarity(),
luceneOptions.applyAllDeletes().orElse(true),
luceneOptions.writeAllDeletes().orElse(false),