Better manage cached index searchers
This commit is contained in:
parent
b7d975ccfb
commit
e59705bd0b
@ -0,0 +1,55 @@
|
||||
package it.cavallium.dbengine.database.disk;
|
||||
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
|
||||
public class CachedIndexSearcher {
|
||||
|
||||
private final IndexSearcher indexSearcher;
|
||||
private boolean inCache = true;
|
||||
private int usages = 0;
|
||||
|
||||
public CachedIndexSearcher(IndexSearcher indexSearcher) {
|
||||
this.indexSearcher = indexSearcher;
|
||||
}
|
||||
|
||||
public void incUsage() {
|
||||
synchronized (this) {
|
||||
usages++;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return true if closed
|
||||
*/
|
||||
public boolean decUsage() {
|
||||
synchronized (this) {
|
||||
usages--;
|
||||
return isClosed();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return true if closed
|
||||
*/
|
||||
public boolean removeFromCache() {
|
||||
synchronized (this) {
|
||||
inCache = false;
|
||||
return isClosed();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isClosed() {
|
||||
return this.inCache || this.usages > 0;
|
||||
}
|
||||
|
||||
public IndexReader getIndexReader() {
|
||||
return indexSearcher.getIndexReader();
|
||||
}
|
||||
|
||||
public IndexSearcher getIndexSearcher() {
|
||||
return indexSearcher;
|
||||
}
|
||||
}
|
@ -376,7 +376,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
.flatMap(indexSearcher -> {
|
||||
Mono<Void> releaseMono = searcherManager.releaseUsedIndexSearcher(snapshot, indexSearcher);
|
||||
return localSearcher
|
||||
.collect(indexSearcher, releaseMono, modifiedLocalQuery, keyFieldName)
|
||||
.collect(indexSearcher.getIndexSearcher(), releaseMono, modifiedLocalQuery, keyFieldName)
|
||||
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release()))
|
||||
.onErrorResume(ex -> releaseMono.then(Mono.error(ex)));
|
||||
})
|
||||
@ -392,7 +392,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
.flatMap(indexSearcher -> {
|
||||
Mono<Void> releaseMono = searcherManager.releaseUsedIndexSearcher(snapshot, indexSearcher);
|
||||
return shardSearcher
|
||||
.searchOn(indexSearcher, releaseMono, modifiedLocalQuery)
|
||||
.searchOn(indexSearcher.getIndexSearcher(), releaseMono, modifiedLocalQuery)
|
||||
.onErrorResume(ex -> releaseMono.then(Mono.error(ex)));
|
||||
})
|
||||
);
|
||||
@ -469,7 +469,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
return searcherManager.captureIndexSearcher(snapshot).flatMap(indexSearcher -> {
|
||||
Mono<Void> releaseMono = searcherManager.releaseUsedIndexSearcher(snapshot, indexSearcher);
|
||||
return localSearcher
|
||||
.collect(indexSearcher, releaseMono, localQueryParams, keyFieldName)
|
||||
.collect(indexSearcher.getIndexSearcher(), releaseMono, localQueryParams, keyFieldName)
|
||||
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release()))
|
||||
.onErrorResume(ex -> releaseMono.then(Mono.error(ex)));
|
||||
});
|
||||
@ -482,7 +482,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
return searcherManager.captureIndexSearcher(snapshot)
|
||||
.flatMap(indexSearcher -> {
|
||||
Mono<Void> releaseMono = searcherManager.releaseUsedIndexSearcher(snapshot, indexSearcher);
|
||||
return shardSearcher.searchOn(indexSearcher, releaseMono, localQueryParams)
|
||||
return shardSearcher.searchOn(indexSearcher.getIndexSearcher(), releaseMono, localQueryParams)
|
||||
.onErrorResume(ex -> releaseMono.then(Mono.error(ex)));
|
||||
});
|
||||
}
|
||||
|
@ -7,12 +7,14 @@ import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
import java.util.function.Function;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.SearcherFactory;
|
||||
import org.apache.lucene.search.SearcherManager;
|
||||
import org.apache.lucene.search.similarities.Similarity;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.reactivestreams.Publisher;
|
||||
@ -28,9 +30,10 @@ public class PooledIndexSearcherManager {
|
||||
private final Similarity similarity;
|
||||
private final SearcherManager searcherManager;
|
||||
private final Duration queryRefreshDebounceTime;
|
||||
private final AtomicInteger activeSearchers = new AtomicInteger(0);
|
||||
|
||||
private final LoadingCache<LLSnapshot, Mono<IndexSearcher>> cachedSnapshotSearchers;
|
||||
private final Mono<IndexSearcher> cachedMainSearcher;
|
||||
private final LoadingCache<LLSnapshot, Mono<CachedIndexSearcher>> cachedSnapshotSearchers;
|
||||
private final Mono<CachedIndexSearcher> cachedMainSearcher;
|
||||
|
||||
private final Empty<Void> closeRequested = Sinks.empty();
|
||||
private final Empty<Void> refresherClosed = Sinks.empty();
|
||||
@ -65,14 +68,14 @@ public class PooledIndexSearcherManager {
|
||||
.maximumSize(3)
|
||||
.build(new CacheLoader<>() {
|
||||
@Override
|
||||
public Mono<IndexSearcher> load(@NotNull LLSnapshot snapshot) {
|
||||
public Mono<CachedIndexSearcher> load(@NotNull LLSnapshot snapshot) {
|
||||
return PooledIndexSearcherManager.this.generateCachedSearcher(snapshot);
|
||||
}
|
||||
});
|
||||
this.cachedMainSearcher = this.generateCachedSearcher(null);
|
||||
}
|
||||
|
||||
private Mono<IndexSearcher> generateCachedSearcher(@Nullable LLSnapshot snapshot) {
|
||||
private Mono<CachedIndexSearcher> generateCachedSearcher(@Nullable LLSnapshot snapshot) {
|
||||
return Mono.fromCallable(() -> {
|
||||
IndexSearcher indexSearcher;
|
||||
if (snapshot == null) {
|
||||
@ -81,7 +84,7 @@ public class PooledIndexSearcherManager {
|
||||
} else {
|
||||
indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher();
|
||||
}
|
||||
return indexSearcher;
|
||||
return new CachedIndexSearcher(indexSearcher);
|
||||
})
|
||||
.cacheInvalidateWhen(indexSearcher -> Mono
|
||||
.firstWithSignal(
|
||||
@ -93,9 +96,13 @@ public class PooledIndexSearcherManager {
|
||||
//noinspection SynchronizationOnLocalVariableOrMethodParameter
|
||||
synchronized (indexSearcher) {
|
||||
// Close
|
||||
if (indexSearcher.getIndexReader().getRefCount() <= 0) {
|
||||
if (indexSearcher.removeFromCache()) {
|
||||
try {
|
||||
if (snapshot == null) {
|
||||
searcherManager.release(indexSearcher);
|
||||
searcherManager.release(indexSearcher.getIndexSearcher());
|
||||
}
|
||||
} finally {
|
||||
activeSearchers.decrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -110,23 +117,33 @@ public class PooledIndexSearcherManager {
|
||||
try {
|
||||
boolean refreshStarted = searcherManager.maybeRefresh();
|
||||
// if refreshStarted == false, another thread is currently already refreshing
|
||||
} catch (AlreadyClosedException ignored) {
|
||||
|
||||
} catch (IOException ex) {
|
||||
ex.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public void maybeRefreshBlocking() throws IOException {
|
||||
try {
|
||||
searcherManager.maybeRefreshBlocking();
|
||||
} catch (AlreadyClosedException ignored) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public void maybeRefresh() throws IOException {
|
||||
try {
|
||||
searcherManager.maybeRefresh();
|
||||
} catch (AlreadyClosedException ignored) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public <T> Flux<T> searchMany(@Nullable LLSnapshot snapshot, Function<IndexSearcher, Flux<T>> searcherFunction) {
|
||||
return Flux.usingWhen(
|
||||
this.captureIndexSearcher(snapshot),
|
||||
searcherFunction,
|
||||
indexSearcher -> searcherFunction.apply(indexSearcher.getIndexSearcher()),
|
||||
indexSearcher -> this.releaseUsedIndexSearcher(snapshot, indexSearcher)
|
||||
);
|
||||
}
|
||||
@ -134,19 +151,22 @@ public class PooledIndexSearcherManager {
|
||||
public <T> Mono<T> search(@Nullable LLSnapshot snapshot, Function<IndexSearcher, Mono<T>> searcherFunction) {
|
||||
return Mono.usingWhen(
|
||||
this.captureIndexSearcher(snapshot),
|
||||
searcherFunction,
|
||||
indexSearcher -> searcherFunction.apply(indexSearcher.getIndexSearcher()),
|
||||
indexSearcher -> this.releaseUsedIndexSearcher(snapshot, indexSearcher)
|
||||
);
|
||||
}
|
||||
|
||||
public Mono<IndexSearcher> captureIndexSearcher(@Nullable LLSnapshot snapshot) {
|
||||
public Mono<CachedIndexSearcher> captureIndexSearcher(@Nullable LLSnapshot snapshot) {
|
||||
return this
|
||||
.retrieveCachedIndexSearcher(snapshot)
|
||||
// Increment reference count
|
||||
.doOnNext(indexSearcher -> indexSearcher.getIndexReader().incRef());
|
||||
.doOnNext(indexSearcher -> {
|
||||
activeSearchers.incrementAndGet();
|
||||
indexSearcher.incUsage();
|
||||
});
|
||||
}
|
||||
|
||||
private Mono<IndexSearcher> retrieveCachedIndexSearcher(LLSnapshot snapshot) {
|
||||
private Mono<CachedIndexSearcher> retrieveCachedIndexSearcher(LLSnapshot snapshot) {
|
||||
if (snapshot == null) {
|
||||
return this.cachedMainSearcher;
|
||||
} else {
|
||||
@ -154,16 +174,20 @@ public class PooledIndexSearcherManager {
|
||||
}
|
||||
}
|
||||
|
||||
public Mono<Void> releaseUsedIndexSearcher(@Nullable LLSnapshot snapshot, IndexSearcher indexSearcher) {
|
||||
public Mono<Void> releaseUsedIndexSearcher(@Nullable LLSnapshot snapshot, CachedIndexSearcher indexSearcher) {
|
||||
return Mono.fromRunnable(() -> {
|
||||
try {
|
||||
synchronized (indexSearcher) {
|
||||
// Decrement reference count
|
||||
indexSearcher.getIndexReader().decRef();
|
||||
// Close
|
||||
if (indexSearcher.getIndexReader().getRefCount() <= 0) {
|
||||
if (indexSearcher.decUsage()) {
|
||||
try {
|
||||
if (snapshot == null) {
|
||||
searcherManager.release(indexSearcher);
|
||||
searcherManager.release(indexSearcher.getIndexSearcher());
|
||||
}
|
||||
} finally {
|
||||
activeSearchers.decrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -178,6 +202,10 @@ public class PooledIndexSearcherManager {
|
||||
.fromRunnable(this.closeRequested::tryEmitEmpty)
|
||||
.then(refresherClosed.asMono())
|
||||
.then(Mono.fromRunnable(() -> {
|
||||
while (activeSearchers.get() > 0) {
|
||||
// Park for 100ms
|
||||
LockSupport.parkNanos(100000000L);
|
||||
}
|
||||
cachedSnapshotSearchers.invalidateAll();
|
||||
cachedSnapshotSearchers.cleanUp();
|
||||
}));
|
||||
|
Loading…
Reference in New Issue
Block a user