Remove unnecessary schedulers

This commit is contained in:
Andrea Cavalli 2021-07-31 12:28:53 +02:00
parent 4e782403f5
commit 7597e54bac

View File

@ -102,7 +102,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
true true
); );
// Scheduler used to get callback values of LuceneStreamSearcher without creating deadlocks // Scheduler used to get callback values of LuceneStreamSearcher without creating deadlocks
private final Scheduler luceneWriterScheduler; private static final Scheduler luceneWriterScheduler = Schedulers.boundedElastic();
private final String luceneIndexName; private final String luceneIndexName;
private final SnapshotDeletionPolicy snapshotter; private final SnapshotDeletionPolicy snapshotter;
@ -236,13 +236,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
new SearcherFactory() new SearcherFactory()
); );
this.luceneWriterScheduler = Schedulers.newBoundedElastic( /*this.luceneWriterScheduler = Schedulers.newBoundedElastic(
writerSchedulerMaxThreadCount, writerSchedulerMaxThreadCount,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
"lucene-writer", "lucene-writer",
60, 60,
true true
); );*/
// Create scheduled tasks lifecycle manager // Create scheduled tasks lifecycle manager
this.scheduledTasksLifecycle = new ScheduledTaskLifecycle(); this.scheduledTasksLifecycle = new ScheduledTaskLifecycle();
@ -384,18 +384,17 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
public Mono<Void> addDocuments(Flux<Entry<LLTerm, LLDocument>> documents) { public Mono<Void> addDocuments(Flux<Entry<LLTerm, LLDocument>> documents) {
return documents return documents
.collectList() .collectList()
.publishOn(luceneWriterScheduler)
.flatMap(documentsList -> Mono .flatMap(documentsList -> Mono
.<Void>fromCallable(() -> { .fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask(); scheduledTasksLifecycle.startScheduledTask();
try { try {
//noinspection BlockingMethodInNonBlockingContext
indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList)); indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList));
return null; return null;
} finally { } finally {
scheduledTasksLifecycle.endScheduledTask(); scheduledTasksLifecycle.endScheduledTask();
} }
}) })
.subscribeOn(luceneWriterScheduler)
); );
} }