Pool searchers

This commit is contained in:
Andrea Cavalli 2021-09-06 15:06:51 +02:00
parent 6c73c1e86d
commit d5964a7bed
4 changed files with 360 additions and 198 deletions

View File

@ -5,13 +5,10 @@ import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneOptions;
import it.cavallium.dbengine.client.NRTCachingOptions;
import it.cavallium.dbengine.client.query.QueryParser;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.database.EnglishItalianStopFilter;
import it.cavallium.dbengine.database.LLDocument;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSearchResult;
import it.cavallium.dbengine.database.LLSearchResultShard;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLTerm;
@ -20,10 +17,8 @@ import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.ScheduledTaskLifecycle;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
import it.cavallium.dbengine.lucene.searcher.LuceneLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneShardSearcher;
import java.io.IOException;
import java.nio.file.Path;
@ -31,15 +26,9 @@ import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
@ -51,14 +40,9 @@ import org.apache.lucene.queries.mlt.MoreLikeThis;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.search.similarities.TFIDFSimilarity;
import org.apache.lucene.store.ByteBuffersDirectory;
@ -76,7 +60,6 @@ import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public class LLLocalLuceneIndex implements LLLuceneIndex {
@ -95,20 +78,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
);
private final String luceneIndexName;
private final SnapshotDeletionPolicy snapshotter;
private final IndexWriter indexWriter;
private final SearcherManager searcherManager;
private final Directory directory;
/**
* Last snapshot sequence number. 0 is not used
*/
private final AtomicLong lastSnapshotSeqNo = new AtomicLong(0);
/**
* LLSnapshot seq no to index commit point
*/
private final ConcurrentHashMap<Long, LuceneIndexSnapshot> snapshots = new ConcurrentHashMap<>();
private final boolean lowMemory;
private final SnapshotsManager snapshotsManager;
private final PooledIndexSearcherManager searcherManager;
private final Similarity similarity;
private final Directory directory;
private final boolean lowMemory;
private final ScheduledTaskLifecycle scheduledTasksLifecycle;
@ -191,10 +166,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
this.luceneIndexName = name;
this.snapshotter = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
var snapshotter = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
this.lowMemory = lowMemory;
this.similarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities);
// Create scheduled tasks lifecycle manager
this.scheduledTasksLifecycle = new ScheduledTaskLifecycle();
IndexWriterConfig indexWriterConfig = new IndexWriterConfig(LuceneUtils.toPerFieldAnalyzerWrapper(indicizerAnalyzers));
indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
indexWriterConfig.setIndexDeletionPolicy(snapshotter);
@ -217,18 +195,17 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
indexWriterConfig.setReaderPooling(false);
indexWriterConfig.setSimilarity(getSimilarity());
this.indexWriter = new IndexWriter(directory, indexWriterConfig);
this.searcherManager = new SearcherManager(indexWriter,
this.snapshotsManager = new SnapshotsManager(indexWriter, snapshotter, scheduledTasksLifecycle);
this.searcherManager = new PooledIndexSearcherManager(indexWriter,
snapshotsManager,
getSimilarity(),
luceneOptions.applyAllDeletes(),
luceneOptions.writeAllDeletes(),
new SearcherFactory()
luceneOptions.queryRefreshDebounceTime()
);
// Create scheduled tasks lifecycle manager
this.scheduledTasksLifecycle = new ScheduledTaskLifecycle();
// Start scheduled tasks
registerScheduledFixedTask(this::scheduledCommit, luceneOptions.commitDebounceTime());
registerScheduledFixedTask(this::scheduledQueryRefresh, luceneOptions.queryRefreshDebounceTime());
}
private Similarity getSimilarity() {
@ -284,67 +261,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override
public Mono<LLSnapshot> takeSnapshot() {
return takeLuceneSnapshot()
.flatMap(snapshot -> Mono
.fromCallable(() -> {
var snapshotSeqNo = lastSnapshotSeqNo.incrementAndGet();
this.snapshots.put(snapshotSeqNo, new LuceneIndexSnapshot(snapshot));
return new LLSnapshot(snapshotSeqNo);
})
.subscribeOn(Schedulers.boundedElastic())
);
}
/**
* Use internally. This method commits before taking the snapshot if there are no commits in a new database,
* avoiding the exception.
*/
private Mono<IndexCommit> takeLuceneSnapshot() {
return Mono
.fromCallable(snapshotter::snapshot)
.subscribeOn(Schedulers.boundedElastic())
.onErrorResume(ex -> Mono
.defer(() -> {
if (ex instanceof IllegalStateException && "No index commit to snapshot".equals(ex.getMessage())) {
return Mono.fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
try {
//noinspection BlockingMethodInNonBlockingContext
indexWriter.commit();
//noinspection BlockingMethodInNonBlockingContext
return snapshotter.snapshot();
} finally {
scheduledTasksLifecycle.endScheduledTask();
}
}).subscribeOn(luceneHeavyTasksScheduler);
} else {
return Mono.error(ex);
}
})
);
return snapshotsManager.takeSnapshot().subscribeOn(luceneHeavyTasksScheduler);
}
@Override
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
return Mono.<Void>fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
try {
var indexSnapshot = this.snapshots.remove(snapshot.getSequenceNumber());
if (indexSnapshot == null) {
throw new IOException("LLSnapshot " + snapshot.getSequenceNumber() + " not found!");
}
indexSnapshot.close();
var luceneIndexSnapshot = indexSnapshot.getSnapshot();
snapshotter.release(luceneIndexSnapshot);
// Delete unused files after releasing the snapshot
indexWriter.deleteUnusedFiles();
return null;
} finally {
scheduledTasksLifecycle.endScheduledTask();
}
}).subscribeOn(Schedulers.boundedElastic());
return snapshotsManager.releaseSnapshot(snapshot);
}
@Override
@ -444,40 +366,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}).subscribeOn(luceneHeavyTasksScheduler);
}
private Mono<IndexSearcher> acquireSearcherWrapper(LLSnapshot snapshot) {
return Mono.fromCallable(() -> {
IndexSearcher indexSearcher;
if (snapshot == null) {
indexSearcher = searcherManager.acquire();
indexSearcher.setSimilarity(getSimilarity());
} else {
indexSearcher = resolveSnapshot(snapshot).getIndexSearcher();
}
return indexSearcher;
}).subscribeOn(Schedulers.boundedElastic());
}
private Mono<Void> releaseSearcherWrapper(LLSnapshot snapshot, IndexSearcher indexSearcher) {
return Mono.<Void>fromRunnable(() -> {
if (snapshot == null) {
try {
searcherManager.release(indexSearcher);
} catch (IOException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.boundedElastic());
}
@Override
public Mono<LLSearchResultShard> moreLikeThis(@Nullable LLSnapshot snapshot,
QueryParams queryParams,
String keyFieldName,
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux) {
return getMoreLikeThisQuery(snapshot, LuceneUtils.toLocalQueryParams(queryParams), mltDocumentFieldsFlux)
.flatMap(modifiedLocalQuery -> this.acquireSearcherWrapper(snapshot)
.flatMap(modifiedLocalQuery -> searcherManager.captureIndexSearcher(snapshot)
.flatMap(indexSearcher -> {
Mono<Void> releaseMono = releaseSearcherWrapper(snapshot, indexSearcher);
Mono<Void> releaseMono = searcherManager.releaseUsedIndexSearcher(snapshot, indexSearcher);
return localSearcher
.collect(indexSearcher, releaseMono, modifiedLocalQuery, keyFieldName)
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release()))
@ -491,9 +388,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux,
LuceneShardSearcher shardSearcher) {
return getMoreLikeThisQuery(snapshot, LuceneUtils.toLocalQueryParams(queryParams), mltDocumentFieldsFlux)
.flatMap(modifiedLocalQuery -> this.acquireSearcherWrapper(snapshot)
.flatMap(modifiedLocalQuery -> searcherManager.captureIndexSearcher(snapshot)
.flatMap(indexSearcher -> {
Mono<Void> releaseMono = releaseSearcherWrapper(snapshot, indexSearcher);
Mono<Void> releaseMono = searcherManager.releaseUsedIndexSearcher(snapshot, indexSearcher);
return shardSearcher
.searchOn(indexSearcher, releaseMono, modifiedLocalQuery)
.onErrorResume(ex -> releaseMono.then(Mono.error(ex)));
@ -523,75 +420,68 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
localQueryParams.scoreMode()
));
}
return Mono
.usingWhen(
this.acquireSearcherWrapper(snapshot),
indexSearcher -> Mono
.fromCallable(() -> {
var mlt = new MoreLikeThis(indexSearcher.getIndexReader());
mlt.setAnalyzer(indexWriter.getAnalyzer());
mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new));
mlt.setMinTermFreq(1);
mlt.setMinDocFreq(3);
mlt.setMaxDocFreqPct(20);
mlt.setBoost(localQueryParams.scoreMode().needsScores());
mlt.setStopWords(EnglishItalianStopFilter.getStopWordsString());
var similarity = getSimilarity();
if (similarity instanceof TFIDFSimilarity) {
mlt.setSimilarity((TFIDFSimilarity) similarity);
} else {
logger.trace("Using an unsupported similarity algorithm for MoreLikeThis:"
+ " {}. You must use a similarity instance based on TFIDFSimilarity!", similarity);
}
return this.searcherManager.search(snapshot, indexSearcher -> Mono.fromCallable(() -> {
var mlt = new MoreLikeThis(indexSearcher.getIndexReader());
mlt.setAnalyzer(indexWriter.getAnalyzer());
mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new));
mlt.setMinTermFreq(1);
mlt.setMinDocFreq(3);
mlt.setMaxDocFreqPct(20);
mlt.setBoost(localQueryParams.scoreMode().needsScores());
mlt.setStopWords(EnglishItalianStopFilter.getStopWordsString());
var similarity = getSimilarity();
if (similarity instanceof TFIDFSimilarity) {
mlt.setSimilarity((TFIDFSimilarity) similarity);
} else {
logger.trace("Using an unsupported similarity algorithm for MoreLikeThis:"
+ " {}. You must use a similarity instance based on TFIDFSimilarity!", similarity);
}
// Get the reference docId and apply it to MoreLikeThis, to generate the query
@SuppressWarnings({"unchecked", "rawtypes"})
var mltQuery = mlt.like((Map) mltDocumentFields);
Query luceneQuery;
if (!(luceneAdditionalQuery instanceof MatchAllDocsQuery)) {
luceneQuery = new BooleanQuery.Builder()
.add(mltQuery, Occur.MUST)
.add(new ConstantScoreQuery(luceneAdditionalQuery), Occur.MUST)
.build();
} else {
luceneQuery = mltQuery;
}
// Get the reference docId and apply it to MoreLikeThis, to generate the query
@SuppressWarnings({"unchecked", "rawtypes"})
var mltQuery = mlt.like((Map) mltDocumentFields);
Query luceneQuery;
if (!(luceneAdditionalQuery instanceof MatchAllDocsQuery)) {
luceneQuery = new BooleanQuery.Builder()
.add(mltQuery, Occur.MUST)
.add(new ConstantScoreQuery(luceneAdditionalQuery), Occur.MUST)
.build();
} else {
luceneQuery = mltQuery;
}
return luceneQuery;
})
.subscribeOn(Schedulers.boundedElastic())
.map(luceneQuery -> new LocalQueryParams(luceneQuery,
localQueryParams.offset(),
localQueryParams.limit(),
localQueryParams.minCompetitiveScore(),
localQueryParams.sort(),
localQueryParams.scoreMode()
)),
indexSearcher -> releaseSearcherWrapper(snapshot, indexSearcher)
);
return luceneQuery;
})
.subscribeOn(Schedulers.boundedElastic())
.map(luceneQuery -> new LocalQueryParams(luceneQuery,
localQueryParams.offset(),
localQueryParams.limit(),
localQueryParams.minCompetitiveScore(),
localQueryParams.sort(),
localQueryParams.scoreMode()
)));
});
}
@Override
public Mono<LLSearchResultShard> search(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName) {
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams);
return this.acquireSearcherWrapper(snapshot)
.flatMap(indexSearcher -> {
Mono<Void> releaseMono = releaseSearcherWrapper(snapshot, indexSearcher);
return localSearcher
.collect(indexSearcher, releaseMono, localQueryParams, keyFieldName)
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release()))
.onErrorResume(ex -> releaseMono.then(Mono.error(ex)));
});
return searcherManager.captureIndexSearcher(snapshot).flatMap(indexSearcher -> {
Mono<Void> releaseMono = searcherManager.releaseUsedIndexSearcher(snapshot, indexSearcher);
return localSearcher
.collect(indexSearcher, releaseMono, localQueryParams, keyFieldName)
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release()))
.onErrorResume(ex -> releaseMono.then(Mono.error(ex)));
});
}
public Mono<Void> distributedSearch(@Nullable LLSnapshot snapshot,
QueryParams queryParams,
LuceneShardSearcher shardSearcher) {
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams);
return this.acquireSearcherWrapper(snapshot)
return searcherManager.captureIndexSearcher(snapshot)
.flatMap(indexSearcher -> {
Mono<Void> releaseMono = releaseSearcherWrapper(snapshot, indexSearcher);
Mono<Void> releaseMono = searcherManager.releaseUsedIndexSearcher(snapshot, indexSearcher);
return shardSearcher.searchOn(indexSearcher, releaseMono, localQueryParams)
.onErrorResume(ex -> releaseMono.then(Mono.error(ex)));
});
@ -603,14 +493,18 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.<Void>fromCallable(() -> {
logger.debug("Closing IndexWriter...");
scheduledTasksLifecycle.cancelAndWait();
return null;
})
.subscribeOn(luceneHeavyTasksScheduler)
.then(searcherManager.close())
.then(Mono.<Void>fromCallable(() -> {
//noinspection BlockingMethodInNonBlockingContext
indexWriter.close();
//noinspection BlockingMethodInNonBlockingContext
directory.close();
logger.debug("IndexWriter closed");
return null;
})
.subscribeOn(luceneHeavyTasksScheduler);
}).subscribeOn(luceneHeavyTasksScheduler));
}
@Override
@ -663,25 +557,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
}
@SuppressWarnings("unused")
private void scheduledQueryRefresh() {
try {
boolean refreshStarted = searcherManager.maybeRefresh();
// if refreshStarted == false, another thread is currently already refreshing
} catch (IOException ex) {
ex.printStackTrace();
}
}
private LuceneIndexSnapshot resolveSnapshot(@Nullable LLSnapshot snapshot) {
if (snapshot == null) {
return null;
}
return Objects.requireNonNull(snapshots.get(snapshot.getSequenceNumber()),
() -> "Can't resolve snapshot " + snapshot.getSequenceNumber()
);
}
@Override
public boolean isLowMemoryMode() {
return lowMemory;

View File

@ -0,0 +1,183 @@
package it.cavallium.dbengine.database.disk;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import it.cavallium.dbengine.database.LLSnapshot;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.similarities.Similarity;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Empty;
import reactor.core.scheduler.Schedulers;
public class PooledIndexSearcherManager {
private final SnapshotsManager snapshotsManager;
private final Similarity similarity;
private final SearcherManager searcherManager;
private final Duration queryRefreshDebounceTime;
private final LoadingCache<LLSnapshot, Mono<IndexSearcher>> cachedSnapshotSearchers;
private final Mono<IndexSearcher> cachedMainSearcher;
private final Empty<Void> closeRequested = Sinks.empty();
private final Empty<Void> refresherClosed = Sinks.empty();
public PooledIndexSearcherManager(IndexWriter indexWriter,
SnapshotsManager snapshotsManager,
Similarity similarity,
boolean applyAllDeletes,
boolean writeAllDeletes,
Duration queryRefreshDebounceTime) throws IOException {
this.snapshotsManager = snapshotsManager;
this.similarity = similarity;
this.queryRefreshDebounceTime = queryRefreshDebounceTime;
this.searcherManager = new SearcherManager(indexWriter,
applyAllDeletes,
writeAllDeletes,
new SearcherFactory()
);
Mono
.fromRunnable(this::scheduledQueryRefresh)
.repeatWhen(s -> s.delayElements(queryRefreshDebounceTime, Schedulers.boundedElastic()))
.subscribeOn(Schedulers.boundedElastic())
.takeUntilOther(closeRequested.asMono())
.doAfterTerminate(refresherClosed::tryEmitEmpty)
.subscribe();
this.cachedSnapshotSearchers = CacheBuilder.newBuilder()
.expireAfterWrite(queryRefreshDebounceTime)
.build(new CacheLoader<>() {
@Override
public Mono<IndexSearcher> load(@NotNull LLSnapshot snapshot) {
return PooledIndexSearcherManager.this.generateCachedSearcher(snapshot);
}
});
this.cachedMainSearcher = this.generateCachedSearcher(null);
}
private Mono<IndexSearcher> generateCachedSearcher(@Nullable LLSnapshot snapshot) {
return Mono.fromCallable(() -> {
IndexSearcher indexSearcher;
if (snapshot == null) {
indexSearcher = searcherManager.acquire();
indexSearcher.setSimilarity(similarity);
} else {
indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher();
}
return indexSearcher;
})
.cacheInvalidateWhen(indexSearcher -> Mono
.firstWithSignal(
this.closeRequested.asMono(),
Mono.delay(queryRefreshDebounceTime, Schedulers.boundedElastic()).then()
),
indexSearcher -> {
try {
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (indexSearcher) {
// Close
if (indexSearcher.getIndexReader().getRefCount() <= 0) {
if (snapshot == null) {
searcherManager.release(indexSearcher);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
});
}
@SuppressWarnings("unused")
private void scheduledQueryRefresh() {
try {
boolean refreshStarted = searcherManager.maybeRefresh();
// if refreshStarted == false, another thread is currently already refreshing
} catch (IOException ex) {
ex.printStackTrace();
}
}
public void maybeRefreshBlocking() throws IOException {
searcherManager.maybeRefreshBlocking();
}
public void maybeRefresh() throws IOException {
searcherManager.maybeRefresh();
}
public <T> Flux<T> searchMany(@Nullable LLSnapshot snapshot, Function<IndexSearcher, Flux<T>> searcherFunction) {
return Flux.usingWhen(
this.captureIndexSearcher(snapshot),
searcherFunction,
indexSearcher -> this.releaseUsedIndexSearcher(snapshot, indexSearcher)
);
}
public <T> Mono<T> search(@Nullable LLSnapshot snapshot, Function<IndexSearcher, Mono<T>> searcherFunction) {
return Mono.usingWhen(
this.captureIndexSearcher(snapshot),
searcherFunction,
indexSearcher -> this.releaseUsedIndexSearcher(snapshot, indexSearcher)
);
}
public Mono<IndexSearcher> captureIndexSearcher(@Nullable LLSnapshot snapshot) {
return this
.retrieveCachedIndexSearcher(snapshot)
// Increment reference count
.doOnNext(indexSearcher -> indexSearcher.getIndexReader().incRef());
}
private Mono<IndexSearcher> retrieveCachedIndexSearcher(LLSnapshot snapshot) {
if (snapshot == null) {
return this.cachedMainSearcher;
} else {
return this.cachedSnapshotSearchers.getUnchecked(snapshot);
}
}
public Mono<Void> releaseUsedIndexSearcher(@Nullable LLSnapshot snapshot, IndexSearcher indexSearcher) {
return Mono.fromRunnable(() -> {
try {
synchronized (indexSearcher) {
// Decrement reference count
indexSearcher.getIndexReader().decRef();
// Close
if (indexSearcher.getIndexReader().getRefCount() <= 0) {
if (snapshot == null) {
searcherManager.release(indexSearcher);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
});
}
public Mono<Void> close() {
return Mono
.fromRunnable(this.closeRequested::tryEmitEmpty)
.then(refresherClosed.asMono())
.then(Mono.fromRunnable(() -> {
cachedSnapshotSearchers.invalidateAll();
cachedSnapshotSearchers.cleanUp();
}));
}
}

View File

@ -0,0 +1,103 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.lucene.ScheduledTaskLifecycle;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class SnapshotsManager {
private final IndexWriter indexWriter;
private final SnapshotDeletionPolicy snapshotter;
private final ScheduledTaskLifecycle scheduledTasksLifecycle;
/**
* Last snapshot sequence number. 0 is not used
*/
private final AtomicLong lastSnapshotSeqNo = new AtomicLong(0);
/**
* LLSnapshot seq no to index commit point
*/
private final ConcurrentHashMap<Long, LuceneIndexSnapshot> snapshots = new ConcurrentHashMap<>();
public SnapshotsManager(IndexWriter indexWriter,
SnapshotDeletionPolicy snapshotter,
ScheduledTaskLifecycle scheduledTasksLifecycle) {
this.indexWriter = indexWriter;
this.snapshotter = snapshotter;
this.scheduledTasksLifecycle = scheduledTasksLifecycle;
}
public LuceneIndexSnapshot resolveSnapshot(@Nullable LLSnapshot snapshot) {
if (snapshot == null) {
return null;
}
return Objects.requireNonNull(snapshots.get(snapshot.getSequenceNumber()),
() -> "Can't resolve snapshot " + snapshot.getSequenceNumber()
);
}
public Mono<LLSnapshot> takeSnapshot() {
return takeLuceneSnapshot().map(snapshot -> {
var snapshotSeqNo = lastSnapshotSeqNo.incrementAndGet();
this.snapshots.put(snapshotSeqNo, new LuceneIndexSnapshot(snapshot));
return new LLSnapshot(snapshotSeqNo);
});
}
/**
* Use internally. This method commits before taking the snapshot if there are no commits in a new database,
* avoiding the exception.
*/
private Mono<IndexCommit> takeLuceneSnapshot() {
return Mono
.fromCallable(snapshotter::snapshot)
.subscribeOn(Schedulers.boundedElastic())
.onErrorResume(ex -> Mono
.defer(() -> {
if (ex instanceof IllegalStateException && "No index commit to snapshot".equals(ex.getMessage())) {
return Mono.fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
try {
indexWriter.commit();
return snapshotter.snapshot();
} finally {
scheduledTasksLifecycle.endScheduledTask();
}
});
} else {
return Mono.error(ex);
}
})
);
}
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
return Mono.<Void>fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
try {
var indexSnapshot = this.snapshots.remove(snapshot.getSequenceNumber());
if (indexSnapshot == null) {
throw new IOException("LLSnapshot " + snapshot.getSequenceNumber() + " not found!");
}
indexSnapshot.close();
var luceneIndexSnapshot = indexSnapshot.getSnapshot();
snapshotter.release(luceneIndexSnapshot);
// Delete unused files after releasing the snapshot
indexWriter.deleteUnusedFiles();
return null;
} finally {
scheduledTasksLifecycle.endScheduledTask();
}
}).subscribeOn(Schedulers.boundedElastic());
}
}

View File

@ -19,7 +19,8 @@ public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher {
LocalQueryParams queryParams,
String keyFieldName) {
if (Schedulers.isInNonBlockingThread()) {
return Mono.error(() -> new UnsupportedOperationException("Called collect in a nonblocking thread"));
return releaseIndexSearcher
.then(Mono.error(() -> new UnsupportedOperationException("Called collect in a nonblocking thread")));
}
if (queryParams.limit() == 0) {
return countSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, keyFieldName);