diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcher.java b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcher.java new file mode 100644 index 0000000..47a96c9 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcher.java @@ -0,0 +1,55 @@ +package it.cavallium.dbengine.database.disk; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.search.IndexSearcher; + +public class CachedIndexSearcher { + + private final IndexSearcher indexSearcher; + private boolean inCache = true; + private int usages = 0; + + public CachedIndexSearcher(IndexSearcher indexSearcher) { + this.indexSearcher = indexSearcher; + } + + public void incUsage() { + synchronized (this) { + usages++; + } + } + + /** + * + * @return true if closed + */ + public boolean decUsage() { + synchronized (this) { + usages--; + return isClosed(); + } + } + + /** + * + * @return true if closed + */ + public boolean removeFromCache() { + synchronized (this) { + inCache = false; + return isClosed(); + } + } + + private boolean isClosed() { + return this.inCache || this.usages > 0; + } + + public IndexReader getIndexReader() { + return indexSearcher.getIndexReader(); + } + + public IndexSearcher getIndexSearcher() { + return indexSearcher; + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index e0f5635..6628192 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -376,7 +376,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .flatMap(indexSearcher -> { Mono releaseMono = searcherManager.releaseUsedIndexSearcher(snapshot, indexSearcher); return localSearcher - .collect(indexSearcher, releaseMono, modifiedLocalQuery, keyFieldName) + .collect(indexSearcher.getIndexSearcher(), releaseMono, modifiedLocalQuery, keyFieldName) .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release())) .onErrorResume(ex -> releaseMono.then(Mono.error(ex))); }) @@ -392,7 +392,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .flatMap(indexSearcher -> { Mono releaseMono = searcherManager.releaseUsedIndexSearcher(snapshot, indexSearcher); return shardSearcher - .searchOn(indexSearcher, releaseMono, modifiedLocalQuery) + .searchOn(indexSearcher.getIndexSearcher(), releaseMono, modifiedLocalQuery) .onErrorResume(ex -> releaseMono.then(Mono.error(ex))); }) ); @@ -469,7 +469,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return searcherManager.captureIndexSearcher(snapshot).flatMap(indexSearcher -> { Mono releaseMono = searcherManager.releaseUsedIndexSearcher(snapshot, indexSearcher); return localSearcher - .collect(indexSearcher, releaseMono, localQueryParams, keyFieldName) + .collect(indexSearcher.getIndexSearcher(), releaseMono, localQueryParams, keyFieldName) .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release())) .onErrorResume(ex -> releaseMono.then(Mono.error(ex))); }); @@ -482,7 +482,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return searcherManager.captureIndexSearcher(snapshot) .flatMap(indexSearcher -> { Mono releaseMono = searcherManager.releaseUsedIndexSearcher(snapshot, indexSearcher); - return shardSearcher.searchOn(indexSearcher, releaseMono, localQueryParams) + return shardSearcher.searchOn(indexSearcher.getIndexSearcher(), releaseMono, localQueryParams) .onErrorResume(ex -> releaseMono.then(Mono.error(ex))); }); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/PooledIndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/PooledIndexSearcherManager.java index bb58c8b..e70b773 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/PooledIndexSearcherManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/PooledIndexSearcherManager.java @@ -7,12 +7,14 @@ import it.cavallium.dbengine.database.LLSnapshot; import java.io.IOException; import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; import java.util.function.Function; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.SearcherFactory; import org.apache.lucene.search.SearcherManager; import org.apache.lucene.search.similarities.Similarity; +import org.apache.lucene.store.AlreadyClosedException; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Publisher; @@ -28,9 +30,10 @@ public class PooledIndexSearcherManager { private final Similarity similarity; private final SearcherManager searcherManager; private final Duration queryRefreshDebounceTime; + private final AtomicInteger activeSearchers = new AtomicInteger(0); - private final LoadingCache> cachedSnapshotSearchers; - private final Mono cachedMainSearcher; + private final LoadingCache> cachedSnapshotSearchers; + private final Mono cachedMainSearcher; private final Empty closeRequested = Sinks.empty(); private final Empty refresherClosed = Sinks.empty(); @@ -65,14 +68,14 @@ public class PooledIndexSearcherManager { .maximumSize(3) .build(new CacheLoader<>() { @Override - public Mono load(@NotNull LLSnapshot snapshot) { + public Mono load(@NotNull LLSnapshot snapshot) { return PooledIndexSearcherManager.this.generateCachedSearcher(snapshot); } }); this.cachedMainSearcher = this.generateCachedSearcher(null); } - private Mono generateCachedSearcher(@Nullable LLSnapshot snapshot) { + private Mono generateCachedSearcher(@Nullable LLSnapshot snapshot) { return Mono.fromCallable(() -> { IndexSearcher indexSearcher; if (snapshot == null) { @@ -81,7 +84,7 @@ public class PooledIndexSearcherManager { } else { indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(); } - return indexSearcher; + return new CachedIndexSearcher(indexSearcher); }) .cacheInvalidateWhen(indexSearcher -> Mono .firstWithSignal( @@ -93,9 +96,13 @@ public class PooledIndexSearcherManager { //noinspection SynchronizationOnLocalVariableOrMethodParameter synchronized (indexSearcher) { // Close - if (indexSearcher.getIndexReader().getRefCount() <= 0) { - if (snapshot == null) { - searcherManager.release(indexSearcher); + if (indexSearcher.removeFromCache()) { + try { + if (snapshot == null) { + searcherManager.release(indexSearcher.getIndexSearcher()); + } + } finally { + activeSearchers.decrementAndGet(); } } } @@ -110,23 +117,33 @@ public class PooledIndexSearcherManager { try { boolean refreshStarted = searcherManager.maybeRefresh(); // if refreshStarted == false, another thread is currently already refreshing + } catch (AlreadyClosedException ignored) { + } catch (IOException ex) { ex.printStackTrace(); } } public void maybeRefreshBlocking() throws IOException { - searcherManager.maybeRefreshBlocking(); + try { + searcherManager.maybeRefreshBlocking(); + } catch (AlreadyClosedException ignored) { + + } } public void maybeRefresh() throws IOException { - searcherManager.maybeRefresh(); + try { + searcherManager.maybeRefresh(); + } catch (AlreadyClosedException ignored) { + + } } public Flux searchMany(@Nullable LLSnapshot snapshot, Function> searcherFunction) { return Flux.usingWhen( this.captureIndexSearcher(snapshot), - searcherFunction, + indexSearcher -> searcherFunction.apply(indexSearcher.getIndexSearcher()), indexSearcher -> this.releaseUsedIndexSearcher(snapshot, indexSearcher) ); } @@ -134,19 +151,22 @@ public class PooledIndexSearcherManager { public Mono search(@Nullable LLSnapshot snapshot, Function> searcherFunction) { return Mono.usingWhen( this.captureIndexSearcher(snapshot), - searcherFunction, + indexSearcher -> searcherFunction.apply(indexSearcher.getIndexSearcher()), indexSearcher -> this.releaseUsedIndexSearcher(snapshot, indexSearcher) ); } - public Mono captureIndexSearcher(@Nullable LLSnapshot snapshot) { + public Mono captureIndexSearcher(@Nullable LLSnapshot snapshot) { return this .retrieveCachedIndexSearcher(snapshot) // Increment reference count - .doOnNext(indexSearcher -> indexSearcher.getIndexReader().incRef()); + .doOnNext(indexSearcher -> { + activeSearchers.incrementAndGet(); + indexSearcher.incUsage(); + }); } - private Mono retrieveCachedIndexSearcher(LLSnapshot snapshot) { + private Mono retrieveCachedIndexSearcher(LLSnapshot snapshot) { if (snapshot == null) { return this.cachedMainSearcher; } else { @@ -154,16 +174,20 @@ public class PooledIndexSearcherManager { } } - public Mono releaseUsedIndexSearcher(@Nullable LLSnapshot snapshot, IndexSearcher indexSearcher) { + public Mono releaseUsedIndexSearcher(@Nullable LLSnapshot snapshot, CachedIndexSearcher indexSearcher) { return Mono.fromRunnable(() -> { try { synchronized (indexSearcher) { // Decrement reference count indexSearcher.getIndexReader().decRef(); // Close - if (indexSearcher.getIndexReader().getRefCount() <= 0) { - if (snapshot == null) { - searcherManager.release(indexSearcher); + if (indexSearcher.decUsage()) { + try { + if (snapshot == null) { + searcherManager.release(indexSearcher.getIndexSearcher()); + } + } finally { + activeSearchers.decrementAndGet(); } } } @@ -178,6 +202,10 @@ public class PooledIndexSearcherManager { .fromRunnable(this.closeRequested::tryEmitEmpty) .then(refresherClosed.asMono()) .then(Mono.fromRunnable(() -> { + while (activeSearchers.get() > 0) { + // Park for 100ms + LockSupport.parkNanos(100000000L); + } cachedSnapshotSearchers.invalidateAll(); cachedSnapshotSearchers.cleanUp(); }));