diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 1d25a9f..da872bf 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -102,7 +102,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { true ); // Scheduler used to get callback values of LuceneStreamSearcher without creating deadlocks - private final Scheduler luceneWriterScheduler; + private static final Scheduler luceneWriterScheduler = Schedulers.boundedElastic(); private final String luceneIndexName; private final SnapshotDeletionPolicy snapshotter; @@ -236,13 +236,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { new SearcherFactory() ); - this.luceneWriterScheduler = Schedulers.newBoundedElastic( + /*this.luceneWriterScheduler = Schedulers.newBoundedElastic( writerSchedulerMaxThreadCount, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "lucene-writer", 60, true - ); + );*/ // Create scheduled tasks lifecycle manager this.scheduledTasksLifecycle = new ScheduledTaskLifecycle(); @@ -384,18 +384,17 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { public Mono addDocuments(Flux> documents) { return documents .collectList() + .publishOn(luceneWriterScheduler) .flatMap(documentsList -> Mono - .fromCallable(() -> { + .fromCallable(() -> { scheduledTasksLifecycle.startScheduledTask(); try { - //noinspection BlockingMethodInNonBlockingContext indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList)); return null; } finally { scheduledTasksLifecycle.endScheduledTask(); } }) - .subscribeOn(luceneWriterScheduler) ); }