Reduce refresh overhead

This commit is contained in:
Andrea Cavalli 2022-07-28 23:41:10 +02:00
parent 8ff3381c72
commit b1fbf39c87
3 changed files with 21 additions and 6 deletions

View File

@ -31,6 +31,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -38,6 +39,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.ToIntFunction;
@ -71,9 +73,11 @@ import org.jetbrains.annotations.Nullable;
import org.rocksdb.AbstractImmutableNativeReference;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@SuppressWarnings("unused")
@ -720,6 +724,17 @@ public class LLUtils {
}));
}
public static Disposable scheduleRepeated(Scheduler scheduler, Runnable action, Duration delay) {
return scheduler.schedule(() -> {
try {
action.run();
} catch (Throwable ex) {
logger.error(ex);
}
scheduleRepeated(scheduler, action, delay);
}, delay.toMillis(), TimeUnit.MILLISECONDS);
}
@Deprecated
public record DirectBuffer(@NotNull Buffer buffer, @NotNull ByteBuffer byteBuffer) {}

View File

@ -75,13 +75,13 @@ public class CachedIndexSearcherManager extends SimpleResource implements IndexS
this.searcherManager = new SearcherManager(indexWriter, applyAllDeletes, writeAllDeletes, SEARCHER_FACTORY);
refreshSubscription = luceneHeavyTasksScheduler.schedulePeriodically(() -> {
refreshSubscription = LLUtils.scheduleRepeated(luceneHeavyTasksScheduler, () -> {
try {
maybeRefreshBlocking();
maybeRefresh();
} catch (Exception ex) {
LOG.error("Failed to refresh the searcher manager", ex);
}
}, queryRefreshDebounceTime.toMillis(), queryRefreshDebounceTime.toMillis(), TimeUnit.MILLISECONDS);
}, queryRefreshDebounceTime);
this.cachedSnapshotSearchers = CacheBuilder.newBuilder()
.expireAfterWrite(queryRefreshDebounceTime)

View File

@ -77,13 +77,13 @@ public class SimpleIndexSearcherManager extends SimpleResource implements IndexS
this.searcherManager = new SearcherManager(indexWriter, applyAllDeletes, writeAllDeletes, SEARCHER_FACTORY);
refreshSubscription = luceneHeavyTasksScheduler.schedulePeriodically(() -> {
refreshSubscription = LLUtils.scheduleRepeated(luceneHeavyTasksScheduler, () -> {
try {
maybeRefreshBlocking();
maybeRefresh();
} catch (Exception ex) {
LOG.error("Failed to refresh the searcher manager", ex);
}
}, queryRefreshDebounceTime.toMillis(), queryRefreshDebounceTime.toMillis(), TimeUnit.MILLISECONDS);
}, queryRefreshDebounceTime);
this.noSnapshotSearcherMono = retrieveSearcherInternal(null);
}