CavalliumDBEngine/src/main/java/it/cavallium/dbengine/database/disk/PooledIndexSearcherManager.java

194 lines
6.2 KiB
Java
Raw Normal View History

2021-09-06 15:06:51 +02:00
package it.cavallium.dbengine.database.disk;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import it.cavallium.dbengine.database.LLSnapshot;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
2021-09-06 17:35:02 +02:00
import java.util.concurrent.locks.LockSupport;
2021-09-06 15:06:51 +02:00
import java.util.function.Function;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.similarities.Similarity;
2021-09-06 17:35:02 +02:00
import org.apache.lucene.store.AlreadyClosedException;
2021-09-06 15:06:51 +02:00
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Empty;
import reactor.core.scheduler.Schedulers;
public class PooledIndexSearcherManager {
private final SnapshotsManager snapshotsManager;
private final Similarity similarity;
private final SearcherManager searcherManager;
private final Duration queryRefreshDebounceTime;
2021-09-06 17:35:02 +02:00
private final AtomicInteger activeSearchers = new AtomicInteger(0);
2021-09-06 15:06:51 +02:00
2021-09-06 17:35:02 +02:00
private final LoadingCache<LLSnapshot, Mono<CachedIndexSearcher>> cachedSnapshotSearchers;
private final Mono<CachedIndexSearcher> cachedMainSearcher;
2021-09-06 15:06:51 +02:00
private final Empty<Void> closeRequested = Sinks.empty();
private final Empty<Void> refresherClosed = Sinks.empty();
public PooledIndexSearcherManager(IndexWriter indexWriter,
SnapshotsManager snapshotsManager,
Similarity similarity,
boolean applyAllDeletes,
boolean writeAllDeletes,
Duration queryRefreshDebounceTime) throws IOException {
this.snapshotsManager = snapshotsManager;
this.similarity = similarity;
this.queryRefreshDebounceTime = queryRefreshDebounceTime;
this.searcherManager = new SearcherManager(indexWriter,
applyAllDeletes,
writeAllDeletes,
new SearcherFactory()
);
Mono
.fromRunnable(this::scheduledQueryRefresh)
.repeatWhen(s -> s.delayElements(queryRefreshDebounceTime, Schedulers.boundedElastic()))
.subscribeOn(Schedulers.boundedElastic())
.takeUntilOther(closeRequested.asMono())
.doAfterTerminate(refresherClosed::tryEmitEmpty)
.subscribe();
this.cachedSnapshotSearchers = CacheBuilder.newBuilder()
.expireAfterWrite(queryRefreshDebounceTime)
2021-09-06 15:08:07 +02:00
// Max 3 cached non-main index writers
.maximumSize(3)
2021-09-06 15:06:51 +02:00
.build(new CacheLoader<>() {
@Override
2021-09-06 17:35:02 +02:00
public Mono<CachedIndexSearcher> load(@NotNull LLSnapshot snapshot) {
2021-09-06 15:06:51 +02:00
return PooledIndexSearcherManager.this.generateCachedSearcher(snapshot);
}
});
this.cachedMainSearcher = this.generateCachedSearcher(null);
}
2021-09-06 17:35:02 +02:00
private Mono<CachedIndexSearcher> generateCachedSearcher(@Nullable LLSnapshot snapshot) {
2021-09-06 15:06:51 +02:00
return Mono.fromCallable(() -> {
IndexSearcher indexSearcher;
SearcherManager associatedSearcherManager;
2021-09-06 15:06:51 +02:00
if (snapshot == null) {
indexSearcher = searcherManager.acquire();
indexSearcher.setSimilarity(similarity);
associatedSearcherManager = searcherManager;
2021-09-06 15:06:51 +02:00
} else {
indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher();
associatedSearcherManager = null;
2021-09-06 15:06:51 +02:00
}
return new CachedIndexSearcher(indexSearcher, associatedSearcherManager, activeSearchers::decrementAndGet);
2021-09-06 15:06:51 +02:00
})
.cacheInvalidateWhen(indexSearcher -> Mono
.firstWithSignal(
this.closeRequested.asMono(),
Mono.delay(queryRefreshDebounceTime, Schedulers.boundedElastic()).then()
),
indexSearcher -> {
try {
2021-09-06 17:42:12 +02:00
// Mark as removed from cache
indexSearcher.removeFromCache();
2021-09-06 15:06:51 +02:00
} catch (IOException e) {
e.printStackTrace();
}
});
}
@SuppressWarnings("unused")
private void scheduledQueryRefresh() {
try {
boolean refreshStarted = searcherManager.maybeRefresh();
// if refreshStarted == false, another thread is currently already refreshing
2021-09-06 17:35:02 +02:00
} catch (AlreadyClosedException ignored) {
2021-09-06 15:06:51 +02:00
} catch (IOException ex) {
ex.printStackTrace();
}
}
public void maybeRefreshBlocking() throws IOException {
2021-09-06 17:35:02 +02:00
try {
searcherManager.maybeRefreshBlocking();
} catch (AlreadyClosedException ignored) {
}
2021-09-06 15:06:51 +02:00
}
public void maybeRefresh() throws IOException {
2021-09-06 17:35:02 +02:00
try {
searcherManager.maybeRefresh();
} catch (AlreadyClosedException ignored) {
}
2021-09-06 15:06:51 +02:00
}
public <T> Flux<T> searchMany(@Nullable LLSnapshot snapshot, Function<IndexSearcher, Flux<T>> searcherFunction) {
return Flux.usingWhen(
this.captureIndexSearcher(snapshot),
2021-09-06 17:35:02 +02:00
indexSearcher -> searcherFunction.apply(indexSearcher.getIndexSearcher()),
this::releaseUsedIndexSearcher
2021-09-06 15:06:51 +02:00
);
}
public <T> Mono<T> search(@Nullable LLSnapshot snapshot, Function<IndexSearcher, Mono<T>> searcherFunction) {
return Mono.usingWhen(
this.captureIndexSearcher(snapshot),
2021-09-06 17:35:02 +02:00
indexSearcher -> searcherFunction.apply(indexSearcher.getIndexSearcher()),
this::releaseUsedIndexSearcher
2021-09-06 15:06:51 +02:00
);
}
2021-09-06 17:35:02 +02:00
public Mono<CachedIndexSearcher> captureIndexSearcher(@Nullable LLSnapshot snapshot) {
2021-09-06 15:06:51 +02:00
return this
.retrieveCachedIndexSearcher(snapshot)
// Increment reference count
2021-09-06 17:35:02 +02:00
.doOnNext(indexSearcher -> {
activeSearchers.incrementAndGet();
indexSearcher.incUsage();
});
2021-09-06 15:06:51 +02:00
}
2021-09-06 17:35:02 +02:00
private Mono<CachedIndexSearcher> retrieveCachedIndexSearcher(LLSnapshot snapshot) {
2021-09-06 15:06:51 +02:00
if (snapshot == null) {
return this.cachedMainSearcher;
} else {
return this.cachedSnapshotSearchers.getUnchecked(snapshot);
}
}
public Mono<Void> releaseUsedIndexSearcher(CachedIndexSearcher indexSearcher) {
2021-09-06 15:06:51 +02:00
return Mono.fromRunnable(() -> {
try {
2021-09-06 17:42:12 +02:00
// Decrement reference count
indexSearcher.decUsage();
2021-09-06 15:06:51 +02:00
} catch (IOException e) {
e.printStackTrace();
}
});
}
public Mono<Void> close() {
return Mono
.fromRunnable(this.closeRequested::tryEmitEmpty)
.then(refresherClosed.asMono())
.then(Mono.fromRunnable(() -> {
2021-09-06 17:35:02 +02:00
while (activeSearchers.get() > 0) {
// Park for 100ms
LockSupport.parkNanos(100000000L);
}
2021-09-06 15:06:51 +02:00
cachedSnapshotSearchers.invalidateAll();
cachedSnapshotSearchers.cleanUp();
}));
}
}