Release cached searcher
This commit is contained in:
parent
20c19f662b
commit
23fa46c775
@ -140,18 +140,18 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
|
|||||||
return Mono.fromCallable(() -> {
|
return Mono.fromCallable(() -> {
|
||||||
activeSearchers.register();
|
activeSearchers.register();
|
||||||
IndexSearcher indexSearcher;
|
IndexSearcher indexSearcher;
|
||||||
|
boolean decRef;
|
||||||
if (snapshot == null) {
|
if (snapshot == null) {
|
||||||
indexSearcher = searcherManager.acquire();
|
indexSearcher = searcherManager.acquire();
|
||||||
|
decRef = true;
|
||||||
} else {
|
} else {
|
||||||
indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher();
|
indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher();
|
||||||
|
decRef = false;
|
||||||
}
|
}
|
||||||
indexSearcher.setSimilarity(similarity);
|
indexSearcher.setSimilarity(similarity);
|
||||||
assert indexSearcher.getIndexReader().getRefCount() > 0;
|
assert indexSearcher.getIndexReader().getRefCount() > 0;
|
||||||
return indexSearcher;
|
return new LLIndexSearcher(indexSearcher, decRef, this::dropCachedIndexSearcher).send();
|
||||||
})
|
})
|
||||||
// todo: re-enable caching if needed
|
|
||||||
//.cacheInvalidateWhen(tuple -> onInvalidateCache)
|
|
||||||
.map(indexSearcher -> new LLIndexSearcher(indexSearcher, this::dropCachedIndexSearcher).send())
|
|
||||||
.takeUntilOther(onClose)
|
.takeUntilOther(onClose)
|
||||||
.doOnDiscard(Send.class, Send::close);
|
.doOnDiscard(Send.class, Send::close);
|
||||||
});
|
});
|
||||||
|
@ -2,23 +2,24 @@ package it.cavallium.dbengine.database.disk;
|
|||||||
|
|
||||||
import io.net5.buffer.api.Drop;
|
import io.net5.buffer.api.Drop;
|
||||||
import io.net5.buffer.api.Owned;
|
import io.net5.buffer.api.Owned;
|
||||||
import io.net5.buffer.api.internal.ResourceSupport;
|
|
||||||
import it.cavallium.dbengine.database.LiveResourceSupport;
|
import it.cavallium.dbengine.database.LiveResourceSupport;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import org.apache.lucene.index.IndexReader;
|
import org.apache.lucene.index.IndexReader;
|
||||||
import org.apache.lucene.search.IndexSearcher;
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
import org.apache.lucene.search.SearcherManager;
|
|
||||||
import org.jetbrains.annotations.Nullable;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
public class LLIndexSearcher extends LiveResourceSupport<LLIndexSearcher, LLIndexSearcher> {
|
public class LLIndexSearcher extends LiveResourceSupport<LLIndexSearcher, LLIndexSearcher> {
|
||||||
|
|
||||||
private IndexSearcher indexSearcher;
|
private static final Logger logger = LoggerFactory.getLogger(LLIndexSearcher.class);
|
||||||
|
|
||||||
public LLIndexSearcher(IndexSearcher indexSearcher, Drop<LLIndexSearcher> drop) {
|
private IndexSearcher indexSearcher;
|
||||||
super(drop);
|
private final boolean decRef;
|
||||||
|
|
||||||
|
public LLIndexSearcher(IndexSearcher indexSearcher, boolean decRef, Drop<LLIndexSearcher> drop) {
|
||||||
|
super(new CloseOnDrop(drop));
|
||||||
this.indexSearcher = indexSearcher;
|
this.indexSearcher = indexSearcher;
|
||||||
|
this.decRef = decRef;
|
||||||
}
|
}
|
||||||
|
|
||||||
public IndexReader getIndexReader() {
|
public IndexReader getIndexReader() {
|
||||||
@ -43,10 +44,33 @@ public class LLIndexSearcher extends LiveResourceSupport<LLIndexSearcher, LLInde
|
|||||||
@Override
|
@Override
|
||||||
protected Owned<LLIndexSearcher> prepareSend() {
|
protected Owned<LLIndexSearcher> prepareSend() {
|
||||||
var indexSearcher = this.indexSearcher;
|
var indexSearcher = this.indexSearcher;
|
||||||
return drop -> new LLIndexSearcher(indexSearcher, drop);
|
return drop -> new LLIndexSearcher(indexSearcher, decRef, drop);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void makeInaccessible() {
|
protected void makeInaccessible() {
|
||||||
this.indexSearcher = null;
|
this.indexSearcher = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class CloseOnDrop implements Drop<LLIndexSearcher> {
|
||||||
|
|
||||||
|
private final Drop<LLIndexSearcher> delegate;
|
||||||
|
|
||||||
|
public CloseOnDrop(Drop<LLIndexSearcher> drop) {
|
||||||
|
if (drop instanceof CloseOnDrop closeOnDrop) {
|
||||||
|
this.delegate = closeOnDrop.delegate;
|
||||||
|
} else {
|
||||||
|
this.delegate = drop;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void drop(LLIndexSearcher obj) {
|
||||||
|
try {
|
||||||
|
obj.indexSearcher.getIndexReader().decRef();
|
||||||
|
} catch (IOException ex) {
|
||||||
|
logger.error("Failed to drop IndexReader", ex);
|
||||||
|
}
|
||||||
|
delegate.drop(obj);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -43,7 +43,7 @@ public class SimpleUnsortedUnscoredLuceneMultiSearcher implements LuceneMultiSea
|
|||||||
indexSearchers -> Flux
|
indexSearchers -> Flux
|
||||||
.fromIterable(indexSearchers.shards())
|
.fromIterable(indexSearchers.shards())
|
||||||
.flatMap(searcher -> {
|
.flatMap(searcher -> {
|
||||||
var llSearcher = Mono.fromCallable(() -> new LLIndexSearcher(searcher, d -> {}).send());
|
var llSearcher = Mono.fromCallable(() -> new LLIndexSearcher(searcher, false, d -> {}).send());
|
||||||
return localSearcher.collect(llSearcher, localQueryParams, keyFieldName, transformer);
|
return localSearcher.collect(llSearcher, localQueryParams, keyFieldName, transformer);
|
||||||
})
|
})
|
||||||
.collectList()
|
.collectList()
|
||||||
|
Loading…
Reference in New Issue
Block a user