2021-09-06 15:06:51 +02:00
|
|
|
package it.cavallium.dbengine.database.disk;
|
|
|
|
|
2021-12-16 16:14:44 +01:00
|
|
|
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
|
|
|
|
|
2021-09-06 15:06:51 +02:00
|
|
|
import com.google.common.cache.CacheBuilder;
|
|
|
|
import com.google.common.cache.CacheLoader;
|
|
|
|
import com.google.common.cache.LoadingCache;
|
|
|
|
import it.cavallium.dbengine.database.LLSnapshot;
|
2022-05-21 22:41:48 +02:00
|
|
|
import it.cavallium.dbengine.database.LLUtils;
|
2021-09-06 15:06:51 +02:00
|
|
|
import java.io.IOException;
|
2022-06-14 13:10:38 +02:00
|
|
|
import java.lang.ref.Cleaner;
|
2021-09-06 15:06:51 +02:00
|
|
|
import java.time.Duration;
|
2022-02-25 15:46:32 +01:00
|
|
|
import java.util.concurrent.ExecutorService;
|
2021-11-09 02:14:21 +01:00
|
|
|
import java.util.concurrent.Executors;
|
2021-09-07 02:36:11 +02:00
|
|
|
import java.util.concurrent.TimeUnit;
|
2021-10-26 00:02:08 +02:00
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
2022-02-25 15:46:32 +01:00
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
import java.util.concurrent.locks.LockSupport;
|
2021-12-17 01:48:49 +01:00
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
|
import org.apache.logging.log4j.Logger;
|
2021-09-06 15:06:51 +02:00
|
|
|
import org.apache.lucene.index.IndexWriter;
|
|
|
|
import org.apache.lucene.search.IndexSearcher;
|
|
|
|
import org.apache.lucene.search.SearcherFactory;
|
|
|
|
import org.apache.lucene.search.SearcherManager;
|
|
|
|
import org.apache.lucene.search.similarities.Similarity;
|
2021-09-06 17:35:02 +02:00
|
|
|
import org.apache.lucene.store.AlreadyClosedException;
|
2021-09-06 15:06:51 +02:00
|
|
|
import org.jetbrains.annotations.NotNull;
|
|
|
|
import org.jetbrains.annotations.Nullable;
|
2022-05-10 00:31:16 +02:00
|
|
|
import it.cavallium.dbengine.utils.ShortNamedThreadFactory;
|
2021-09-06 15:06:51 +02:00
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
import reactor.core.publisher.Sinks;
|
|
|
|
import reactor.core.publisher.Sinks.Empty;
|
2022-04-04 20:12:29 +02:00
|
|
|
import reactor.core.scheduler.Scheduler;
|
2021-09-06 15:06:51 +02:00
|
|
|
import reactor.core.scheduler.Schedulers;
|
|
|
|
|
2021-09-18 18:34:21 +02:00
|
|
|
public class CachedIndexSearcherManager implements IndexSearcherManager {
|
2021-09-06 15:06:51 +02:00
|
|
|
|
2022-06-14 17:46:49 +02:00
|
|
|
private static final Logger LOG = LogManager.getLogger(CachedIndexSearcherManager.class);
|
2022-06-16 18:40:17 +02:00
|
|
|
private static final ExecutorService SEARCH_EXECUTOR = Executors.newFixedThreadPool(
|
2022-02-25 15:46:32 +01:00
|
|
|
Runtime.getRuntime().availableProcessors(),
|
|
|
|
new ShortNamedThreadFactory("lucene-search")
|
|
|
|
.setDaemon(true).withGroup(new ThreadGroup("lucene-search"))
|
|
|
|
);
|
2022-06-16 18:40:17 +02:00
|
|
|
private static final SearcherFactory SEARCHER_FACTORY = new ExecutorSearcherFactory(SEARCH_EXECUTOR);
|
2021-09-06 18:52:21 +02:00
|
|
|
|
2021-09-06 15:06:51 +02:00
|
|
|
private final SnapshotsManager snapshotsManager;
|
2022-04-04 20:12:29 +02:00
|
|
|
private final Scheduler luceneHeavyTasksScheduler;
|
2021-09-06 15:06:51 +02:00
|
|
|
private final Similarity similarity;
|
|
|
|
private final SearcherManager searcherManager;
|
|
|
|
private final Duration queryRefreshDebounceTime;
|
2022-02-25 15:46:32 +01:00
|
|
|
|
|
|
|
private final AtomicLong activeSearchers = new AtomicLong(0);
|
|
|
|
private final AtomicLong activeRefreshes = new AtomicLong(0);
|
2021-09-06 15:06:51 +02:00
|
|
|
|
2022-06-14 13:10:38 +02:00
|
|
|
private final LoadingCache<LLSnapshot, Mono<LLIndexSearcher>> cachedSnapshotSearchers;
|
|
|
|
private final Mono<LLIndexSearcher> cachedMainSearcher;
|
2021-09-06 15:06:51 +02:00
|
|
|
|
2021-10-26 00:02:08 +02:00
|
|
|
private final AtomicBoolean closeRequested = new AtomicBoolean();
|
|
|
|
private final Empty<Void> closeRequestedMono = Sinks.empty();
|
2021-09-08 21:34:52 +02:00
|
|
|
private final Mono<Void> closeMono;
|
2021-09-06 15:06:51 +02:00
|
|
|
|
2022-06-14 17:46:49 +02:00
|
|
|
private static final Cleaner CLEANER = Cleaner.create();
|
2022-06-14 13:10:38 +02:00
|
|
|
|
2021-09-06 18:24:36 +02:00
|
|
|
public CachedIndexSearcherManager(IndexWriter indexWriter,
|
2021-09-06 15:06:51 +02:00
|
|
|
SnapshotsManager snapshotsManager,
|
2022-04-04 20:12:29 +02:00
|
|
|
Scheduler luceneHeavyTasksScheduler,
|
2021-09-06 15:06:51 +02:00
|
|
|
Similarity similarity,
|
|
|
|
boolean applyAllDeletes,
|
|
|
|
boolean writeAllDeletes,
|
|
|
|
Duration queryRefreshDebounceTime) throws IOException {
|
|
|
|
this.snapshotsManager = snapshotsManager;
|
2022-04-04 20:12:29 +02:00
|
|
|
this.luceneHeavyTasksScheduler = luceneHeavyTasksScheduler;
|
2021-09-06 15:06:51 +02:00
|
|
|
this.similarity = similarity;
|
|
|
|
this.queryRefreshDebounceTime = queryRefreshDebounceTime;
|
|
|
|
|
2021-09-25 13:06:24 +02:00
|
|
|
this.searcherManager = new SearcherManager(indexWriter, applyAllDeletes, writeAllDeletes, SEARCHER_FACTORY);
|
2021-09-06 15:06:51 +02:00
|
|
|
|
2021-10-26 00:02:08 +02:00
|
|
|
Empty<Void> refresherClosed = Sinks.empty();
|
2021-09-06 15:06:51 +02:00
|
|
|
Mono
|
2021-09-06 18:52:21 +02:00
|
|
|
.fromRunnable(() -> {
|
|
|
|
try {
|
2021-09-08 21:34:52 +02:00
|
|
|
maybeRefresh();
|
2021-09-06 18:52:21 +02:00
|
|
|
} catch (Exception ex) {
|
2022-06-14 17:46:49 +02:00
|
|
|
LOG.error("Failed to refresh the searcher manager", ex);
|
2021-09-06 18:52:21 +02:00
|
|
|
}
|
|
|
|
})
|
2022-04-04 20:12:29 +02:00
|
|
|
.subscribeOn(luceneHeavyTasksScheduler)
|
2021-12-16 16:14:44 +01:00
|
|
|
.publishOn(Schedulers.parallel())
|
2021-09-09 11:43:37 +02:00
|
|
|
.repeatWhen(s -> s.delayElements(queryRefreshDebounceTime))
|
2021-10-26 00:02:08 +02:00
|
|
|
.takeUntilOther(closeRequestedMono.asMono())
|
2021-09-06 15:06:51 +02:00
|
|
|
.doAfterTerminate(refresherClosed::tryEmitEmpty)
|
2022-05-21 22:41:48 +02:00
|
|
|
.transform(LLUtils::handleDiscard)
|
2021-09-06 15:06:51 +02:00
|
|
|
.subscribe();
|
|
|
|
|
|
|
|
this.cachedSnapshotSearchers = CacheBuilder.newBuilder()
|
|
|
|
.expireAfterWrite(queryRefreshDebounceTime)
|
2021-09-06 15:08:07 +02:00
|
|
|
// Max 3 cached non-main index writers
|
|
|
|
.maximumSize(3)
|
2021-09-06 15:06:51 +02:00
|
|
|
.build(new CacheLoader<>() {
|
|
|
|
@Override
|
2022-06-14 13:10:38 +02:00
|
|
|
public Mono<LLIndexSearcher> load(@NotNull LLSnapshot snapshot) {
|
2021-09-06 18:24:36 +02:00
|
|
|
return CachedIndexSearcherManager.this.generateCachedSearcher(snapshot);
|
2021-09-06 15:06:51 +02:00
|
|
|
}
|
|
|
|
});
|
|
|
|
this.cachedMainSearcher = this.generateCachedSearcher(null);
|
2021-09-08 21:34:52 +02:00
|
|
|
|
|
|
|
this.closeMono = Mono
|
|
|
|
.fromRunnable(() -> {
|
2022-06-14 17:46:49 +02:00
|
|
|
LOG.debug("Closing IndexSearcherManager...");
|
2021-10-26 00:02:08 +02:00
|
|
|
this.closeRequested.set(true);
|
|
|
|
this.closeRequestedMono.tryEmitEmpty();
|
2021-09-08 21:34:52 +02:00
|
|
|
})
|
|
|
|
.then(refresherClosed.asMono())
|
|
|
|
.then(Mono.<Void>fromRunnable(() -> {
|
2022-06-14 17:46:49 +02:00
|
|
|
LOG.debug("Closed IndexSearcherManager");
|
|
|
|
LOG.debug("Closing refreshes...");
|
2022-02-25 15:46:32 +01:00
|
|
|
long initTime = System.nanoTime();
|
|
|
|
while (activeRefreshes.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
|
|
|
|
LockSupport.parkNanos(50000000);
|
2021-09-08 21:34:52 +02:00
|
|
|
}
|
2022-06-14 17:46:49 +02:00
|
|
|
LOG.debug("Closed refreshes...");
|
|
|
|
LOG.debug("Closing active searchers...");
|
2022-02-25 15:46:32 +01:00
|
|
|
initTime = System.nanoTime();
|
|
|
|
while (activeSearchers.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
|
|
|
|
LockSupport.parkNanos(50000000);
|
2021-09-08 21:34:52 +02:00
|
|
|
}
|
2022-06-14 17:46:49 +02:00
|
|
|
LOG.debug("Closed active searchers");
|
|
|
|
LOG.debug("Stopping searcher executor...");
|
2021-09-08 21:34:52 +02:00
|
|
|
cachedSnapshotSearchers.invalidateAll();
|
|
|
|
cachedSnapshotSearchers.cleanUp();
|
2022-06-16 18:40:17 +02:00
|
|
|
SEARCH_EXECUTOR.shutdown();
|
2022-02-25 15:46:32 +01:00
|
|
|
try {
|
|
|
|
//noinspection BlockingMethodInNonBlockingContext
|
2022-06-16 18:40:17 +02:00
|
|
|
if (!SEARCH_EXECUTOR.awaitTermination(15, TimeUnit.SECONDS)) {
|
|
|
|
SEARCH_EXECUTOR.shutdownNow();
|
2022-02-25 15:46:32 +01:00
|
|
|
}
|
|
|
|
} catch (InterruptedException e) {
|
2022-06-14 17:46:49 +02:00
|
|
|
LOG.error("Failed to stop executor", e);
|
2022-02-25 15:46:32 +01:00
|
|
|
}
|
2022-06-14 17:46:49 +02:00
|
|
|
LOG.debug("Stopped searcher executor");
|
2021-12-16 16:14:44 +01:00
|
|
|
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())))
|
|
|
|
.publishOn(Schedulers.parallel())
|
|
|
|
.cache();
|
2021-09-06 15:06:51 +02:00
|
|
|
}
|
|
|
|
|
2022-06-14 13:10:38 +02:00
|
|
|
private Mono<LLIndexSearcher> generateCachedSearcher(@Nullable LLSnapshot snapshot) {
|
2021-12-12 16:19:50 +01:00
|
|
|
return Mono.fromCallable(() -> {
|
2022-06-14 13:10:38 +02:00
|
|
|
if (closeRequested.get()) {
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
activeSearchers.incrementAndGet();
|
2022-06-21 14:35:07 +02:00
|
|
|
try {
|
|
|
|
IndexSearcher indexSearcher;
|
|
|
|
boolean fromSnapshot;
|
|
|
|
if (snapshot == null) {
|
|
|
|
indexSearcher = searcherManager.acquire();
|
|
|
|
fromSnapshot = false;
|
|
|
|
} else {
|
|
|
|
indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(SEARCH_EXECUTOR);
|
|
|
|
fromSnapshot = true;
|
|
|
|
}
|
|
|
|
indexSearcher.setSimilarity(similarity);
|
|
|
|
assert indexSearcher.getIndexReader().getRefCount() > 0;
|
|
|
|
var closed = new AtomicBoolean();
|
|
|
|
LLIndexSearcher llIndexSearcher;
|
|
|
|
if (fromSnapshot) {
|
|
|
|
llIndexSearcher = new SnapshotIndexSearcher(indexSearcher, closed);
|
|
|
|
} else {
|
|
|
|
llIndexSearcher = new MainIndexSearcher(indexSearcher, closed);
|
|
|
|
}
|
|
|
|
CLEANER.register(llIndexSearcher, () -> {
|
|
|
|
if (closed.compareAndSet(false, true)) {
|
|
|
|
LOG.warn("An index searcher was not closed!");
|
|
|
|
if (!fromSnapshot) {
|
|
|
|
try {
|
|
|
|
searcherManager.release(indexSearcher);
|
|
|
|
} catch (IOException e) {
|
|
|
|
LOG.error("Failed to release the index searcher", e);
|
|
|
|
}
|
2022-06-14 13:10:38 +02:00
|
|
|
}
|
2021-12-12 16:19:50 +01:00
|
|
|
}
|
2022-06-21 14:35:07 +02:00
|
|
|
});
|
|
|
|
return llIndexSearcher;
|
|
|
|
} catch (Throwable ex) {
|
|
|
|
activeSearchers.decrementAndGet();
|
|
|
|
throw ex;
|
|
|
|
}
|
2022-06-14 13:10:38 +02:00
|
|
|
});
|
2021-09-18 18:34:21 +02:00
|
|
|
}
|
|
|
|
|
2021-10-01 19:17:33 +02:00
|
|
|
private void dropCachedIndexSearcher() {
|
2021-09-18 18:34:21 +02:00
|
|
|
// This shouldn't happen more than once per searcher.
|
2022-02-25 15:46:32 +01:00
|
|
|
activeSearchers.decrementAndGet();
|
2021-09-06 15:06:51 +02:00
|
|
|
}
|
|
|
|
|
2021-09-18 18:34:21 +02:00
|
|
|
@Override
|
2021-09-06 15:06:51 +02:00
|
|
|
public void maybeRefreshBlocking() throws IOException {
|
2021-09-06 17:35:02 +02:00
|
|
|
try {
|
2022-02-25 15:46:32 +01:00
|
|
|
activeRefreshes.incrementAndGet();
|
2021-09-06 17:35:02 +02:00
|
|
|
searcherManager.maybeRefreshBlocking();
|
|
|
|
} catch (AlreadyClosedException ignored) {
|
|
|
|
|
2021-09-06 18:52:21 +02:00
|
|
|
} finally {
|
2022-02-25 15:46:32 +01:00
|
|
|
activeRefreshes.decrementAndGet();
|
2021-09-06 17:35:02 +02:00
|
|
|
}
|
2021-09-06 15:06:51 +02:00
|
|
|
}
|
|
|
|
|
2021-09-18 18:34:21 +02:00
|
|
|
@Override
|
2021-09-06 15:06:51 +02:00
|
|
|
public void maybeRefresh() throws IOException {
|
2021-09-06 17:35:02 +02:00
|
|
|
try {
|
2022-02-25 15:46:32 +01:00
|
|
|
activeRefreshes.incrementAndGet();
|
2021-09-06 17:35:02 +02:00
|
|
|
searcherManager.maybeRefresh();
|
|
|
|
} catch (AlreadyClosedException ignored) {
|
|
|
|
|
2021-09-06 18:52:21 +02:00
|
|
|
} finally {
|
2022-02-25 15:46:32 +01:00
|
|
|
activeRefreshes.decrementAndGet();
|
2021-09-06 17:35:02 +02:00
|
|
|
}
|
2021-09-06 15:06:51 +02:00
|
|
|
}
|
|
|
|
|
2021-09-18 18:34:21 +02:00
|
|
|
@Override
|
2022-06-14 13:10:38 +02:00
|
|
|
public Mono<LLIndexSearcher> retrieveSearcher(@Nullable LLSnapshot snapshot) {
|
2021-09-06 15:06:51 +02:00
|
|
|
if (snapshot == null) {
|
|
|
|
return this.cachedMainSearcher;
|
|
|
|
} else {
|
|
|
|
return this.cachedSnapshotSearchers.getUnchecked(snapshot);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-18 18:34:21 +02:00
|
|
|
@Override
|
2021-09-06 15:06:51 +02:00
|
|
|
public Mono<Void> close() {
|
2021-09-08 21:34:52 +02:00
|
|
|
return closeMono;
|
2021-09-06 15:06:51 +02:00
|
|
|
}
|
2021-09-25 13:06:24 +02:00
|
|
|
|
2022-06-13 23:25:43 +02:00
|
|
|
public long getActiveSearchers() {
|
|
|
|
return activeSearchers.get();
|
|
|
|
}
|
|
|
|
|
|
|
|
public long getActiveRefreshes() {
|
|
|
|
return activeRefreshes.get();
|
|
|
|
}
|
2022-06-14 13:10:38 +02:00
|
|
|
|
|
|
|
private class MainIndexSearcher extends LLIndexSearcher {
|
|
|
|
|
|
|
|
public MainIndexSearcher(IndexSearcher indexSearcher, AtomicBoolean released) {
|
2022-06-14 17:46:49 +02:00
|
|
|
super(indexSearcher, released);
|
2022-06-14 13:10:38 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void onClose() throws IOException {
|
|
|
|
dropCachedIndexSearcher();
|
2022-06-14 17:46:49 +02:00
|
|
|
if (getClosed().compareAndSet(false, true)) {
|
2022-06-14 13:10:38 +02:00
|
|
|
searcherManager.release(indexSearcher);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private class SnapshotIndexSearcher extends LLIndexSearcher {
|
|
|
|
|
2022-06-14 17:46:49 +02:00
|
|
|
public SnapshotIndexSearcher(IndexSearcher indexSearcher, AtomicBoolean closed) {
|
|
|
|
super(indexSearcher, closed);
|
2022-06-14 13:10:38 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void onClose() {
|
|
|
|
dropCachedIndexSearcher();
|
|
|
|
}
|
|
|
|
}
|
2021-09-06 15:06:51 +02:00
|
|
|
}
|