Add dedicated scheduler

This commit is contained in:
Andrea Cavalli 2022-04-05 00:37:44 +02:00
parent 1cd5fc8eed
commit 24cf7ea58d

View File

@ -93,7 +93,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
new ShortNamedThreadFactory("heavy-tasks").setDaemon(true).withGroup(new ThreadGroup("lucene-heavy-tasks")),
Math.toIntExact(Duration.ofHours(1).toSeconds())
));
private static final Scheduler bulkScheduler = uninterruptibleScheduler(Schedulers.boundedElastic());
private static final Scheduler luceneWriteScheduler = uninterruptibleScheduler(Schedulers.newBoundedElastic(
DEFAULT_BOUNDED_ELASTIC_SIZE,
DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
new ShortNamedThreadFactory("lucene-write").setDaemon(true).withGroup(new ThreadGroup("lucene-write")),
Math.toIntExact(Duration.ofHours(1).toSeconds())
));
private static final Scheduler bulkScheduler = luceneWriteScheduler;
static {
LLUtils.initHooks();
@ -258,7 +264,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private <V> Mono<V> runSafe(Callable<V> callable) {
return Mono
.fromCallable(callable)
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.subscribeOn(luceneWriteScheduler)
.publishOn(Schedulers.parallel());
}
@ -422,7 +428,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
shutdownLock.unlock();
}
return null;
}).subscribeOn(luceneHeavyTasksScheduler).transform(this::ensureOpen);
}).subscribeOn(luceneHeavyTasksScheduler).publishOn(Schedulers.parallel()).transform(this::ensureOpen);
}
@Override