From b1fbf39c879b71d11dee73e31f080a9b7d04740b Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 28 Jul 2022 23:41:10 +0200 Subject: [PATCH] Reduce refresh overhead --- .../it/cavallium/dbengine/database/LLUtils.java | 15 +++++++++++++++ .../database/disk/CachedIndexSearcherManager.java | 6 +++--- .../database/disk/SimpleIndexSearcherManager.java | 6 +++--- 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index dbf09d0..a950b6d 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.nio.charset.Charset; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -38,6 +39,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.ToIntFunction; @@ -71,9 +73,11 @@ import org.jetbrains.annotations.Nullable; import org.rocksdb.AbstractImmutableNativeReference; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; @SuppressWarnings("unused") @@ -720,6 +724,17 @@ public class LLUtils { })); } + public static Disposable scheduleRepeated(Scheduler scheduler, Runnable action, Duration delay) { + return scheduler.schedule(() -> { + try { + action.run(); + } catch (Throwable ex) { + logger.error(ex); + } + scheduleRepeated(scheduler, action, delay); + }, delay.toMillis(), TimeUnit.MILLISECONDS); + } + @Deprecated public record DirectBuffer(@NotNull Buffer buffer, @NotNull ByteBuffer byteBuffer) {} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java index 494242c..f2b4ed4 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java @@ -75,13 +75,13 @@ public class CachedIndexSearcherManager extends SimpleResource implements IndexS this.searcherManager = new SearcherManager(indexWriter, applyAllDeletes, writeAllDeletes, SEARCHER_FACTORY); - refreshSubscription = luceneHeavyTasksScheduler.schedulePeriodically(() -> { + refreshSubscription = LLUtils.scheduleRepeated(luceneHeavyTasksScheduler, () -> { try { - maybeRefreshBlocking(); + maybeRefresh(); } catch (Exception ex) { LOG.error("Failed to refresh the searcher manager", ex); } - }, queryRefreshDebounceTime.toMillis(), queryRefreshDebounceTime.toMillis(), TimeUnit.MILLISECONDS); + }, queryRefreshDebounceTime); this.cachedSnapshotSearchers = CacheBuilder.newBuilder() .expireAfterWrite(queryRefreshDebounceTime) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/SimpleIndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/SimpleIndexSearcherManager.java index f64e7de..c6a0649 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/SimpleIndexSearcherManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/SimpleIndexSearcherManager.java @@ -77,13 +77,13 @@ public class SimpleIndexSearcherManager extends SimpleResource implements IndexS this.searcherManager = new SearcherManager(indexWriter, applyAllDeletes, writeAllDeletes, SEARCHER_FACTORY); - refreshSubscription = luceneHeavyTasksScheduler.schedulePeriodically(() -> { + refreshSubscription = LLUtils.scheduleRepeated(luceneHeavyTasksScheduler, () -> { try { - maybeRefreshBlocking(); + maybeRefresh(); } catch (Exception ex) { LOG.error("Failed to refresh the searcher manager", ex); } - }, queryRefreshDebounceTime.toMillis(), queryRefreshDebounceTime.toMillis(), TimeUnit.MILLISECONDS); + }, queryRefreshDebounceTime); this.noSnapshotSearcherMono = retrieveSearcherInternal(null); }