From da2b302d1bb5764194f4870e7579f19f0da3e7e3 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 26 Oct 2021 00:02:08 +0200 Subject: [PATCH] Replace sinks with references when needed --- .../disk/CachedIndexSearcherManager.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java index 0fef9b1..9e2a73e 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java @@ -12,6 +12,7 @@ import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.SearcherFactory; @@ -43,8 +44,8 @@ public class CachedIndexSearcherManager implements IndexSearcherManager { private final LoadingCache>> cachedSnapshotSearchers; private final Mono> cachedMainSearcher; - private final Empty closeRequested = Sinks.empty(); - private final Empty refresherClosed = Sinks.empty(); + private final AtomicBoolean closeRequested = new AtomicBoolean(); + private final Empty closeRequestedMono = Sinks.empty(); private final Mono closeMono; public CachedIndexSearcherManager(IndexWriter indexWriter, @@ -59,6 +60,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager { this.searcherManager = new SearcherManager(indexWriter, applyAllDeletes, writeAllDeletes, SEARCHER_FACTORY); + Empty refresherClosed = Sinks.empty(); Mono .fromRunnable(() -> { try { @@ -69,7 +71,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager { }) .subscribeOn(Schedulers.boundedElastic()) .repeatWhen(s -> s.delayElements(queryRefreshDebounceTime)) - .takeUntilOther(closeRequested.asMono()) + .takeUntilOther(closeRequestedMono.asMono()) .doAfterTerminate(refresherClosed::tryEmitEmpty) .subscribe(); @@ -88,7 +90,8 @@ public class CachedIndexSearcherManager implements IndexSearcherManager { this.closeMono = Mono .fromRunnable(() -> { logger.info("Closing IndexSearcherManager..."); - this.closeRequested.tryEmitEmpty(); + this.closeRequested.set(true); + this.closeRequestedMono.tryEmitEmpty(); }) .then(refresherClosed.asMono()) .then(Mono.fromRunnable(() -> { @@ -127,10 +130,9 @@ public class CachedIndexSearcherManager implements IndexSearcherManager { private Mono> generateCachedSearcher(@Nullable LLSnapshot snapshot) { // todo: check if defer is really needed return Mono.defer(() -> { - var onClose = this.closeRequested.asMono(); - var onQueryRefresh = Mono.delay(queryRefreshDebounceTime).then(); - var onInvalidateCache = Mono.firstWithSignal(onClose, onQueryRefresh).doOnNext(s -> System.err.println("Invalidation triggered")); - + if (closeRequested.get()) { + return Mono.empty(); + } return Mono.fromCallable(() -> { activeSearchers.register(); IndexSearcher indexSearcher; @@ -146,7 +148,6 @@ public class CachedIndexSearcherManager implements IndexSearcherManager { assert indexSearcher.getIndexReader().getRefCount() > 0; return new LLIndexSearcher(indexSearcher, decRef, this::dropCachedIndexSearcher).send(); }) - .takeUntilOther(onClose) .doOnDiscard(Send.class, Send::close); }); }