Implement phaser

This commit is contained in:
Andrea Cavalli 2021-09-06 18:29:10 +02:00
parent d05994c8ff
commit 51b60168f7

View File

@ -6,6 +6,7 @@ import com.google.common.cache.LoadingCache;
import it.cavallium.dbengine.database.LLSnapshot;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Function;
@ -29,7 +30,7 @@ public class CachedIndexSearcherManager {
private final Similarity similarity;
private final SearcherManager searcherManager;
private final Duration queryRefreshDebounceTime;
private final AtomicInteger activeSearchers = new AtomicInteger(0);
private final Phaser activeSearchers = new Phaser(1);
private final LoadingCache<LLSnapshot, Mono<CachedIndexSearcher>> cachedSnapshotSearchers;
private final Mono<CachedIndexSearcher> cachedMainSearcher;
@ -86,7 +87,7 @@ public class CachedIndexSearcherManager {
indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher();
associatedSearcherManager = null;
}
return new CachedIndexSearcher(indexSearcher, associatedSearcherManager, activeSearchers::decrementAndGet);
return new CachedIndexSearcher(indexSearcher, associatedSearcherManager, activeSearchers::arriveAndDeregister);
})
.cacheInvalidateWhen(indexSearcher -> Mono
.firstWithSignal(
@ -152,7 +153,7 @@ public class CachedIndexSearcherManager {
.retrieveCachedIndexSearcher(snapshot)
// Increment reference count
.doOnNext(indexSearcher -> {
activeSearchers.incrementAndGet();
activeSearchers.register();
indexSearcher.incUsage();
});
}
@ -181,10 +182,7 @@ public class CachedIndexSearcherManager {
.fromRunnable(this.closeRequested::tryEmitEmpty)
.then(refresherClosed.asMono())
.then(Mono.fromRunnable(() -> {
while (activeSearchers.get() > 0) {
// Park for 100ms
LockSupport.parkNanos(100000000L);
}
activeSearchers.arriveAndAwaitAdvance();
cachedSnapshotSearchers.invalidateAll();
cachedSnapshotSearchers.cleanUp();
}));