Replace sinks with references when needed

This commit is contained in:
Andrea Cavalli 2021-10-26 00:02:08 +02:00
parent 1899ef1723
commit da2b302d1b

View File

@ -12,6 +12,7 @@ import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherFactory;
@ -43,8 +44,8 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
private final LoadingCache<LLSnapshot, Mono<Send<LLIndexSearcher>>> cachedSnapshotSearchers;
private final Mono<Send<LLIndexSearcher>> cachedMainSearcher;
private final Empty<Void> closeRequested = Sinks.empty();
private final Empty<Void> refresherClosed = Sinks.empty();
private final AtomicBoolean closeRequested = new AtomicBoolean();
private final Empty<Void> closeRequestedMono = Sinks.empty();
private final Mono<Void> closeMono;
public CachedIndexSearcherManager(IndexWriter indexWriter,
@ -59,6 +60,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
this.searcherManager = new SearcherManager(indexWriter, applyAllDeletes, writeAllDeletes, SEARCHER_FACTORY);
Empty<Void> refresherClosed = Sinks.empty();
Mono
.fromRunnable(() -> {
try {
@ -69,7 +71,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
})
.subscribeOn(Schedulers.boundedElastic())
.repeatWhen(s -> s.delayElements(queryRefreshDebounceTime))
.takeUntilOther(closeRequested.asMono())
.takeUntilOther(closeRequestedMono.asMono())
.doAfterTerminate(refresherClosed::tryEmitEmpty)
.subscribe();
@ -88,7 +90,8 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
this.closeMono = Mono
.fromRunnable(() -> {
logger.info("Closing IndexSearcherManager...");
this.closeRequested.tryEmitEmpty();
this.closeRequested.set(true);
this.closeRequestedMono.tryEmitEmpty();
})
.then(refresherClosed.asMono())
.then(Mono.<Void>fromRunnable(() -> {
@ -127,10 +130,9 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
private Mono<Send<LLIndexSearcher>> generateCachedSearcher(@Nullable LLSnapshot snapshot) {
// todo: check if defer is really needed
return Mono.defer(() -> {
var onClose = this.closeRequested.asMono();
var onQueryRefresh = Mono.delay(queryRefreshDebounceTime).then();
var onInvalidateCache = Mono.firstWithSignal(onClose, onQueryRefresh).doOnNext(s -> System.err.println("Invalidation triggered"));
if (closeRequested.get()) {
return Mono.empty();
}
return Mono.fromCallable(() -> {
activeSearchers.register();
IndexSearcher indexSearcher;
@ -146,7 +148,6 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
assert indexSearcher.getIndexReader().getRefCount() > 0;
return new LLIndexSearcher(indexSearcher, decRef, this::dropCachedIndexSearcher).send();
})
.takeUntilOther(onClose)
.doOnDiscard(Send.class, Send::close);
});
}