2021-09-06 15:06:51 +02:00
|
|
|
package it.cavallium.dbengine.database.disk;
|
|
|
|
|
|
|
|
import com.google.common.cache.CacheBuilder;
|
|
|
|
import com.google.common.cache.CacheLoader;
|
|
|
|
import com.google.common.cache.LoadingCache;
|
|
|
|
import it.cavallium.dbengine.database.LLSnapshot;
|
|
|
|
import java.io.IOException;
|
|
|
|
import java.time.Duration;
|
2021-09-08 21:34:52 +02:00
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
2021-09-06 18:29:10 +02:00
|
|
|
import java.util.concurrent.Phaser;
|
2021-09-07 02:36:11 +02:00
|
|
|
import java.util.concurrent.TimeUnit;
|
2021-09-07 11:39:13 +02:00
|
|
|
import java.util.concurrent.TimeoutException;
|
2021-09-08 21:34:52 +02:00
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
2021-09-06 15:06:51 +02:00
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
2021-09-06 17:35:02 +02:00
|
|
|
import java.util.concurrent.locks.LockSupport;
|
2021-09-06 15:06:51 +02:00
|
|
|
import java.util.function.Function;
|
|
|
|
import org.apache.lucene.index.IndexWriter;
|
|
|
|
import org.apache.lucene.search.IndexSearcher;
|
|
|
|
import org.apache.lucene.search.SearcherFactory;
|
|
|
|
import org.apache.lucene.search.SearcherManager;
|
|
|
|
import org.apache.lucene.search.similarities.Similarity;
|
2021-09-06 17:35:02 +02:00
|
|
|
import org.apache.lucene.store.AlreadyClosedException;
|
2021-09-06 15:06:51 +02:00
|
|
|
import org.jetbrains.annotations.NotNull;
|
|
|
|
import org.jetbrains.annotations.Nullable;
|
2021-09-06 18:52:21 +02:00
|
|
|
import org.slf4j.Logger;
|
|
|
|
import org.slf4j.LoggerFactory;
|
2021-09-06 15:06:51 +02:00
|
|
|
import reactor.core.publisher.Flux;
|
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
import reactor.core.publisher.Sinks;
|
|
|
|
import reactor.core.publisher.Sinks.Empty;
|
|
|
|
import reactor.core.scheduler.Schedulers;
|
|
|
|
|
2021-09-06 18:24:36 +02:00
|
|
|
public class CachedIndexSearcherManager {
|
2021-09-06 15:06:51 +02:00
|
|
|
|
2021-09-06 18:52:21 +02:00
|
|
|
private static final Logger logger = LoggerFactory.getLogger(CachedIndexSearcherManager.class);
|
|
|
|
|
2021-09-06 15:06:51 +02:00
|
|
|
private final SnapshotsManager snapshotsManager;
|
|
|
|
private final Similarity similarity;
|
|
|
|
private final SearcherManager searcherManager;
|
|
|
|
private final Duration queryRefreshDebounceTime;
|
2021-09-06 18:29:10 +02:00
|
|
|
private final Phaser activeSearchers = new Phaser(1);
|
2021-09-06 18:52:21 +02:00
|
|
|
private final Phaser activeRefreshes = new Phaser(1);
|
2021-09-06 15:06:51 +02:00
|
|
|
|
2021-09-06 17:35:02 +02:00
|
|
|
private final LoadingCache<LLSnapshot, Mono<CachedIndexSearcher>> cachedSnapshotSearchers;
|
|
|
|
private final Mono<CachedIndexSearcher> cachedMainSearcher;
|
2021-09-06 15:06:51 +02:00
|
|
|
|
|
|
|
private final Empty<Void> closeRequested = Sinks.empty();
|
|
|
|
private final Empty<Void> refresherClosed = Sinks.empty();
|
2021-09-08 21:34:52 +02:00
|
|
|
private final Mono<Void> closeMono;
|
2021-09-06 15:06:51 +02:00
|
|
|
|
2021-09-06 18:24:36 +02:00
|
|
|
public CachedIndexSearcherManager(IndexWriter indexWriter,
|
2021-09-06 15:06:51 +02:00
|
|
|
SnapshotsManager snapshotsManager,
|
|
|
|
Similarity similarity,
|
|
|
|
boolean applyAllDeletes,
|
|
|
|
boolean writeAllDeletes,
|
|
|
|
Duration queryRefreshDebounceTime) throws IOException {
|
|
|
|
this.snapshotsManager = snapshotsManager;
|
|
|
|
this.similarity = similarity;
|
|
|
|
this.queryRefreshDebounceTime = queryRefreshDebounceTime;
|
|
|
|
|
|
|
|
this.searcherManager = new SearcherManager(indexWriter,
|
|
|
|
applyAllDeletes,
|
|
|
|
writeAllDeletes,
|
|
|
|
new SearcherFactory()
|
|
|
|
);
|
|
|
|
|
|
|
|
Mono
|
2021-09-06 18:52:21 +02:00
|
|
|
.fromRunnable(() -> {
|
|
|
|
try {
|
2021-09-08 21:34:52 +02:00
|
|
|
maybeRefresh();
|
2021-09-06 18:52:21 +02:00
|
|
|
} catch (Exception ex) {
|
|
|
|
logger.error("Failed to refresh the searcher manager", ex);
|
|
|
|
}
|
|
|
|
})
|
2021-09-06 15:06:51 +02:00
|
|
|
.repeatWhen(s -> s.delayElements(queryRefreshDebounceTime, Schedulers.boundedElastic()))
|
|
|
|
.subscribeOn(Schedulers.boundedElastic())
|
|
|
|
.takeUntilOther(closeRequested.asMono())
|
|
|
|
.doAfterTerminate(refresherClosed::tryEmitEmpty)
|
|
|
|
.subscribe();
|
|
|
|
|
|
|
|
this.cachedSnapshotSearchers = CacheBuilder.newBuilder()
|
|
|
|
.expireAfterWrite(queryRefreshDebounceTime)
|
2021-09-06 15:08:07 +02:00
|
|
|
// Max 3 cached non-main index writers
|
|
|
|
.maximumSize(3)
|
2021-09-06 15:06:51 +02:00
|
|
|
.build(new CacheLoader<>() {
|
|
|
|
@Override
|
2021-09-06 17:35:02 +02:00
|
|
|
public Mono<CachedIndexSearcher> load(@NotNull LLSnapshot snapshot) {
|
2021-09-06 18:24:36 +02:00
|
|
|
return CachedIndexSearcherManager.this.generateCachedSearcher(snapshot);
|
2021-09-06 15:06:51 +02:00
|
|
|
}
|
|
|
|
});
|
|
|
|
this.cachedMainSearcher = this.generateCachedSearcher(null);
|
2021-09-08 21:34:52 +02:00
|
|
|
|
|
|
|
this.closeMono = Mono
|
|
|
|
.fromRunnable(() -> {
|
|
|
|
logger.info("Closing IndexSearcherManager...");
|
|
|
|
this.closeRequested.tryEmitEmpty();
|
|
|
|
})
|
|
|
|
.then(refresherClosed.asMono())
|
|
|
|
.then(Mono.<Void>fromRunnable(() -> {
|
|
|
|
logger.info("Closed IndexSearcherManager");
|
|
|
|
logger.info("Closing refreshes...");
|
|
|
|
if (!activeRefreshes.isTerminated()) {
|
|
|
|
try {
|
|
|
|
activeRefreshes.awaitAdvanceInterruptibly(activeRefreshes.arrive(), 15, TimeUnit.SECONDS);
|
|
|
|
} catch (Exception ex) {
|
|
|
|
if (ex instanceof TimeoutException) {
|
|
|
|
logger.error("Failed to terminate active refreshes: timeout");
|
|
|
|
} else {
|
|
|
|
logger.error("Failed to terminate active refreshes", ex);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
logger.info("Closed refreshes...");
|
|
|
|
logger.info("Closing active searchers...");
|
|
|
|
if (!activeSearchers.isTerminated()) {
|
|
|
|
try {
|
|
|
|
activeSearchers.awaitAdvanceInterruptibly(activeSearchers.arrive(), 15, TimeUnit.SECONDS);
|
|
|
|
} catch (Exception ex) {
|
|
|
|
if (ex instanceof TimeoutException) {
|
|
|
|
logger.error("Failed to terminate active searchers: timeout");
|
|
|
|
} else {
|
|
|
|
logger.error("Failed to terminate active searchers", ex);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
logger.info("Closed active searchers");
|
|
|
|
cachedSnapshotSearchers.invalidateAll();
|
|
|
|
cachedSnapshotSearchers.cleanUp();
|
|
|
|
})).cache();
|
2021-09-06 15:06:51 +02:00
|
|
|
}
|
|
|
|
|
2021-09-06 17:35:02 +02:00
|
|
|
private Mono<CachedIndexSearcher> generateCachedSearcher(@Nullable LLSnapshot snapshot) {
|
2021-09-06 15:06:51 +02:00
|
|
|
return Mono.fromCallable(() -> {
|
2021-09-08 21:34:52 +02:00
|
|
|
activeSearchers.register();
|
2021-09-06 15:06:51 +02:00
|
|
|
IndexSearcher indexSearcher;
|
2021-09-06 18:23:47 +02:00
|
|
|
SearcherManager associatedSearcherManager;
|
2021-09-06 15:06:51 +02:00
|
|
|
if (snapshot == null) {
|
|
|
|
indexSearcher = searcherManager.acquire();
|
|
|
|
indexSearcher.setSimilarity(similarity);
|
2021-09-06 18:23:47 +02:00
|
|
|
associatedSearcherManager = searcherManager;
|
2021-09-06 15:06:51 +02:00
|
|
|
} else {
|
|
|
|
indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher();
|
2021-09-06 18:23:47 +02:00
|
|
|
associatedSearcherManager = null;
|
2021-09-06 15:06:51 +02:00
|
|
|
}
|
2021-09-08 21:34:52 +02:00
|
|
|
AtomicBoolean alreadyDeregistered = new AtomicBoolean(false);
|
|
|
|
return new CachedIndexSearcher(indexSearcher, associatedSearcherManager,
|
|
|
|
() -> {
|
|
|
|
// This shouldn't happen more than once,
|
|
|
|
// but I put this AtomicBoolean to be sure that this will NEVER happen more than once.
|
|
|
|
if (alreadyDeregistered.compareAndSet(false, true)) {
|
|
|
|
activeSearchers.arriveAndDeregister();
|
|
|
|
} else {
|
|
|
|
logger.error("Disposed CachedIndexSearcher twice! This is an implementation bug!");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
);
|
2021-09-06 15:06:51 +02:00
|
|
|
})
|
|
|
|
.cacheInvalidateWhen(indexSearcher -> Mono
|
|
|
|
.firstWithSignal(
|
|
|
|
this.closeRequested.asMono(),
|
|
|
|
Mono.delay(queryRefreshDebounceTime, Schedulers.boundedElastic()).then()
|
|
|
|
),
|
|
|
|
indexSearcher -> {
|
|
|
|
try {
|
2021-09-06 17:42:12 +02:00
|
|
|
// Mark as removed from cache
|
2021-09-06 18:23:47 +02:00
|
|
|
indexSearcher.removeFromCache();
|
2021-09-06 18:52:21 +02:00
|
|
|
} catch (Exception ex) {
|
|
|
|
logger.error("Failed to release an old cached IndexSearcher", ex);
|
2021-09-06 15:06:51 +02:00
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
public void maybeRefreshBlocking() throws IOException {
|
2021-09-06 17:35:02 +02:00
|
|
|
try {
|
2021-09-06 18:52:21 +02:00
|
|
|
activeRefreshes.register();
|
2021-09-06 17:35:02 +02:00
|
|
|
searcherManager.maybeRefreshBlocking();
|
|
|
|
} catch (AlreadyClosedException ignored) {
|
|
|
|
|
2021-09-06 18:52:21 +02:00
|
|
|
} finally {
|
|
|
|
activeRefreshes.arriveAndDeregister();
|
2021-09-06 17:35:02 +02:00
|
|
|
}
|
2021-09-06 15:06:51 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
public void maybeRefresh() throws IOException {
|
2021-09-06 17:35:02 +02:00
|
|
|
try {
|
2021-09-06 18:52:21 +02:00
|
|
|
activeRefreshes.register();
|
2021-09-06 17:35:02 +02:00
|
|
|
searcherManager.maybeRefresh();
|
|
|
|
} catch (AlreadyClosedException ignored) {
|
|
|
|
|
2021-09-06 18:52:21 +02:00
|
|
|
} finally {
|
|
|
|
activeRefreshes.arriveAndDeregister();
|
2021-09-06 17:35:02 +02:00
|
|
|
}
|
2021-09-06 15:06:51 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
public <T> Flux<T> searchMany(@Nullable LLSnapshot snapshot, Function<IndexSearcher, Flux<T>> searcherFunction) {
|
|
|
|
return Flux.usingWhen(
|
|
|
|
this.captureIndexSearcher(snapshot),
|
2021-09-06 17:35:02 +02:00
|
|
|
indexSearcher -> searcherFunction.apply(indexSearcher.getIndexSearcher()),
|
2021-09-06 18:23:47 +02:00
|
|
|
this::releaseUsedIndexSearcher
|
2021-09-06 15:06:51 +02:00
|
|
|
);
|
|
|
|
}
|
|
|
|
|
|
|
|
public <T> Mono<T> search(@Nullable LLSnapshot snapshot, Function<IndexSearcher, Mono<T>> searcherFunction) {
|
|
|
|
return Mono.usingWhen(
|
|
|
|
this.captureIndexSearcher(snapshot),
|
2021-09-06 17:35:02 +02:00
|
|
|
indexSearcher -> searcherFunction.apply(indexSearcher.getIndexSearcher()),
|
2021-09-06 18:23:47 +02:00
|
|
|
this::releaseUsedIndexSearcher
|
2021-09-06 15:06:51 +02:00
|
|
|
);
|
|
|
|
}
|
|
|
|
|
2021-09-06 17:35:02 +02:00
|
|
|
public Mono<CachedIndexSearcher> captureIndexSearcher(@Nullable LLSnapshot snapshot) {
|
2021-09-06 15:06:51 +02:00
|
|
|
return this
|
|
|
|
.retrieveCachedIndexSearcher(snapshot)
|
|
|
|
// Increment reference count
|
2021-09-08 21:34:52 +02:00
|
|
|
.doOnNext(CachedIndexSearcher::incUsage);
|
2021-09-06 15:06:51 +02:00
|
|
|
}
|
|
|
|
|
2021-09-06 17:35:02 +02:00
|
|
|
private Mono<CachedIndexSearcher> retrieveCachedIndexSearcher(LLSnapshot snapshot) {
|
2021-09-06 15:06:51 +02:00
|
|
|
if (snapshot == null) {
|
|
|
|
return this.cachedMainSearcher;
|
|
|
|
} else {
|
|
|
|
return this.cachedSnapshotSearchers.getUnchecked(snapshot);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-06 18:23:47 +02:00
|
|
|
public Mono<Void> releaseUsedIndexSearcher(CachedIndexSearcher indexSearcher) {
|
2021-09-06 15:06:51 +02:00
|
|
|
return Mono.fromRunnable(() -> {
|
|
|
|
try {
|
2021-09-06 17:42:12 +02:00
|
|
|
// Decrement reference count
|
2021-09-06 18:23:47 +02:00
|
|
|
indexSearcher.decUsage();
|
2021-09-06 18:52:21 +02:00
|
|
|
} catch (Exception ex) {
|
|
|
|
logger.error("Failed to release an used IndexSearcher", ex);
|
2021-09-06 15:06:51 +02:00
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
public Mono<Void> close() {
|
2021-09-08 21:34:52 +02:00
|
|
|
return closeMono;
|
2021-09-06 15:06:51 +02:00
|
|
|
}
|
|
|
|
}
|