2020-12-07 22:15:18 +01:00
|
|
|
package it.cavallium.dbengine.database.disk;
|
|
|
|
|
2020-12-31 12:04:53 +01:00
|
|
|
import it.cavallium.dbengine.database.LLDocument;
|
|
|
|
import it.cavallium.dbengine.database.LLKeyScore;
|
|
|
|
import it.cavallium.dbengine.database.LLLuceneIndex;
|
2021-01-29 17:19:01 +01:00
|
|
|
import it.cavallium.dbengine.database.LLScoreMode;
|
|
|
|
import it.cavallium.dbengine.database.LLSearchResult;
|
2020-12-31 12:04:53 +01:00
|
|
|
import it.cavallium.dbengine.database.LLSnapshot;
|
|
|
|
import it.cavallium.dbengine.database.LLSort;
|
|
|
|
import it.cavallium.dbengine.database.LLTerm;
|
|
|
|
import it.cavallium.dbengine.database.LLUtils;
|
|
|
|
import it.cavallium.dbengine.database.LuceneUtils;
|
|
|
|
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
|
2020-12-07 22:15:18 +01:00
|
|
|
import it.cavallium.dbengine.database.luceneutil.AdaptiveStreamSearcher;
|
2020-12-31 12:04:53 +01:00
|
|
|
import it.cavallium.dbengine.database.luceneutil.LuceneStreamSearcher;
|
2021-01-29 17:19:01 +01:00
|
|
|
import it.cavallium.dbengine.database.luceneutil.PagedStreamSearcher;
|
2020-12-31 12:04:53 +01:00
|
|
|
import it.cavallium.luceneserializer.luceneserializer.QueryParser;
|
2020-12-07 22:15:18 +01:00
|
|
|
import java.io.IOException;
|
|
|
|
import java.nio.file.Path;
|
|
|
|
import java.time.Duration;
|
|
|
|
import java.util.Map;
|
|
|
|
import java.util.Objects;
|
2021-01-29 17:19:01 +01:00
|
|
|
import java.util.Optional;
|
2020-12-07 22:15:18 +01:00
|
|
|
import java.util.Set;
|
2021-01-29 17:19:01 +01:00
|
|
|
import java.util.concurrent.ArrayBlockingQueue;
|
2020-12-07 22:15:18 +01:00
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
import java.util.concurrent.Executors;
|
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
import org.apache.lucene.index.IndexCommit;
|
|
|
|
import org.apache.lucene.index.IndexWriter;
|
|
|
|
import org.apache.lucene.index.IndexWriterConfig;
|
|
|
|
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
|
|
|
|
import org.apache.lucene.index.SnapshotDeletionPolicy;
|
|
|
|
import org.apache.lucene.queries.mlt.MoreLikeThis;
|
|
|
|
import org.apache.lucene.search.IndexSearcher;
|
|
|
|
import org.apache.lucene.search.Query;
|
2021-01-29 17:19:01 +01:00
|
|
|
import org.apache.lucene.search.ScoreMode;
|
2020-12-07 22:15:18 +01:00
|
|
|
import org.apache.lucene.search.SearcherManager;
|
|
|
|
import org.apache.lucene.search.Sort;
|
|
|
|
import org.apache.lucene.store.Directory;
|
2020-12-31 20:10:47 +01:00
|
|
|
import org.apache.lucene.store.FSDirectory;
|
2020-12-07 22:15:18 +01:00
|
|
|
import org.jetbrains.annotations.Nullable;
|
2020-12-12 23:41:09 +01:00
|
|
|
import org.warp.commonutils.concurrency.executor.ScheduledTaskLifecycle;
|
2020-12-07 22:15:18 +01:00
|
|
|
import org.warp.commonutils.type.ShortNamedThreadFactory;
|
|
|
|
import reactor.core.publisher.Flux;
|
|
|
|
import reactor.core.publisher.Mono;
|
2021-01-29 17:19:01 +01:00
|
|
|
import reactor.core.publisher.Sinks;
|
|
|
|
import reactor.core.publisher.Sinks.EmissionException;
|
|
|
|
import reactor.core.publisher.Sinks.EmitResult;
|
|
|
|
import reactor.core.publisher.Sinks.Many;
|
|
|
|
import reactor.core.publisher.Sinks.One;
|
2020-12-07 22:15:18 +01:00
|
|
|
import reactor.core.scheduler.Schedulers;
|
|
|
|
import reactor.util.function.Tuples;
|
|
|
|
|
|
|
|
public class LLLocalLuceneIndex implements LLLuceneIndex {
|
|
|
|
|
|
|
|
/**
 * Searcher shared by all instances of this class; {@code moreLikeThis} and {@code search}
 * delegate the execution of their queries to it.
 */
private static final LuceneStreamSearcher streamSearcher = new AdaptiveStreamSearcher();
|
2020-12-12 23:41:09 +01:00
|
|
|
/**
|
|
|
|
* Global lucene index scheduler.
|
|
|
|
* There is only a single thread globally to not overwhelm the disk with
|
|
|
|
* parallel commits or parallel refreshes.
|
|
|
|
*/
|
|
|
|
private static final ScheduledExecutorService scheduler
|
|
|
|
= Executors.newSingleThreadScheduledExecutor(new ShortNamedThreadFactory("Lucene"));
|
|
|
|
|
2020-12-07 22:15:18 +01:00
|
|
|
private final String luceneIndexName;
|
|
|
|
private final SnapshotDeletionPolicy snapshotter;
|
|
|
|
private final IndexWriter indexWriter;
|
|
|
|
private final SearcherManager searcherManager;
|
|
|
|
private final Directory directory;
|
|
|
|
/**
|
|
|
|
* Last snapshot sequence number. 0 is not used
|
|
|
|
*/
|
|
|
|
private final AtomicLong lastSnapshotSeqNo = new AtomicLong(0);
|
|
|
|
/**
|
|
|
|
* Snapshot seq no to index commit point
|
|
|
|
*/
|
|
|
|
private final ConcurrentHashMap<Long, LuceneIndexSnapshot> snapshots = new ConcurrentHashMap<>();
|
|
|
|
private final boolean lowMemory;
|
|
|
|
|
2020-12-12 23:41:09 +01:00
|
|
|
private final ScheduledTaskLifecycle scheduledTasksLifecycle;
|
|
|
|
|
2020-12-07 22:15:18 +01:00
|
|
|
public LLLocalLuceneIndex(Path luceneBasePath,
|
|
|
|
String name,
|
|
|
|
TextFieldsAnalyzer analyzer,
|
|
|
|
Duration queryRefreshDebounceTime,
|
|
|
|
Duration commitDebounceTime,
|
|
|
|
boolean lowMemory) throws IOException {
|
|
|
|
if (name.length() == 0) {
|
|
|
|
throw new IOException("Empty lucene database name");
|
|
|
|
}
|
|
|
|
Path directoryPath = luceneBasePath.resolve(name + ".lucene.db");
|
2020-12-31 20:10:47 +01:00
|
|
|
this.directory = FSDirectory.open(directoryPath);
|
2020-12-07 22:15:18 +01:00
|
|
|
this.luceneIndexName = name;
|
|
|
|
this.snapshotter = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
|
|
|
|
this.lowMemory = lowMemory;
|
|
|
|
IndexWriterConfig indexWriterConfig = new IndexWriterConfig(LuceneUtils.getAnalyzer(analyzer));
|
|
|
|
indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
|
|
|
|
indexWriterConfig.setIndexDeletionPolicy(snapshotter);
|
|
|
|
indexWriterConfig.setCommitOnClose(true);
|
|
|
|
if (lowMemory) {
|
|
|
|
indexWriterConfig.setRAMBufferSizeMB(32);
|
|
|
|
indexWriterConfig.setRAMPerThreadHardLimitMB(32);
|
|
|
|
} else {
|
|
|
|
indexWriterConfig.setRAMBufferSizeMB(128);
|
|
|
|
indexWriterConfig.setRAMPerThreadHardLimitMB(512);
|
|
|
|
}
|
|
|
|
this.indexWriter = new IndexWriter(directory, indexWriterConfig);
|
|
|
|
this.searcherManager = new SearcherManager(indexWriter, false, false, null);
|
2020-12-12 23:41:09 +01:00
|
|
|
|
|
|
|
// Create scheduled tasks lifecycle manager
|
|
|
|
this.scheduledTasksLifecycle = new ScheduledTaskLifecycle();
|
|
|
|
|
|
|
|
// Start scheduled tasks
|
|
|
|
registerScheduledFixedTask(this::scheduledCommit, commitDebounceTime);
|
|
|
|
registerScheduledFixedTask(this::scheduledQueryRefresh, queryRefreshDebounceTime);
|
|
|
|
}
|
|
|
|
|
|
|
|
private void registerScheduledFixedTask(Runnable task, Duration duration) {
|
2020-12-31 12:05:04 +01:00
|
|
|
scheduledTasksLifecycle.registerScheduledTask(scheduler.scheduleAtFixedRate(() -> {
|
|
|
|
scheduledTasksLifecycle.startScheduledTask();
|
|
|
|
try {
|
|
|
|
task.run();
|
|
|
|
} finally {
|
|
|
|
scheduledTasksLifecycle.endScheduledTask();
|
|
|
|
}
|
|
|
|
}, duration.toMillis(), duration.toMillis(), TimeUnit.MILLISECONDS));
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public String getLuceneIndexName() {
|
|
|
|
return luceneIndexName;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-30 01:41:04 +01:00
|
|
|
public Mono<LLSnapshot> takeSnapshot() {
|
|
|
|
return Mono
|
|
|
|
.fromCallable(lastSnapshotSeqNo::incrementAndGet)
|
|
|
|
.subscribeOn(Schedulers.boundedElastic())
|
|
|
|
.flatMap(snapshotSeqNo -> takeLuceneSnapshot()
|
|
|
|
.flatMap(snapshot -> Mono
|
|
|
|
.fromCallable(() -> {
|
|
|
|
this.snapshots.put(snapshotSeqNo, new LuceneIndexSnapshot(snapshot));
|
|
|
|
return new LLSnapshot(snapshotSeqNo);
|
|
|
|
})
|
|
|
|
.subscribeOn(Schedulers.boundedElastic())
|
|
|
|
)
|
|
|
|
);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Use internally. This method commits before taking the snapshot if there are no commits in a new database,
|
|
|
|
* avoiding the exception.
|
|
|
|
*/
|
2021-01-30 01:41:04 +01:00
|
|
|
private Mono<IndexCommit> takeLuceneSnapshot() {
|
|
|
|
return Mono.fromCallable(() -> {
|
|
|
|
try {
|
2020-12-07 22:15:18 +01:00
|
|
|
return snapshotter.snapshot();
|
2021-01-30 01:41:04 +01:00
|
|
|
} catch (IllegalStateException ex) {
|
|
|
|
if ("No index commit to snapshot".equals(ex.getMessage())) {
|
|
|
|
indexWriter.commit();
|
|
|
|
return snapshotter.snapshot();
|
|
|
|
} else {
|
|
|
|
throw ex;
|
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
2021-01-30 01:41:04 +01:00
|
|
|
}).subscribeOn(Schedulers.boundedElastic());
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-30 01:41:04 +01:00
|
|
|
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
|
|
|
|
return Mono.<Void>fromCallable(() -> {
|
|
|
|
var indexSnapshot = this.snapshots.remove(snapshot.getSequenceNumber());
|
|
|
|
if (indexSnapshot == null) {
|
|
|
|
throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!");
|
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
|
2021-01-30 01:41:04 +01:00
|
|
|
indexSnapshot.close();
|
2020-12-07 22:15:18 +01:00
|
|
|
|
2021-01-30 01:41:04 +01:00
|
|
|
var luceneIndexSnapshot = indexSnapshot.getSnapshot();
|
|
|
|
snapshotter.release(luceneIndexSnapshot);
|
|
|
|
// Delete unused files after releasing the snapshot
|
|
|
|
indexWriter.deleteUnusedFiles();
|
|
|
|
return null;
|
|
|
|
}).subscribeOn(Schedulers.boundedElastic());
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-30 01:41:04 +01:00
|
|
|
public Mono<Void> addDocument(LLTerm key, LLDocument doc) {
|
|
|
|
return Mono.<Void>fromCallable(() -> {
|
|
|
|
indexWriter.addDocument(LLUtils.toDocument(doc));
|
|
|
|
return null;
|
|
|
|
}).subscribeOn(Schedulers.boundedElastic());
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-30 01:41:04 +01:00
|
|
|
public Mono<Void> addDocuments(Iterable<LLTerm> keys, Iterable<LLDocument> docs) {
|
|
|
|
return Mono.<Void>fromCallable(() -> {
|
2020-12-07 22:15:18 +01:00
|
|
|
indexWriter.addDocuments(LLUtils.toDocuments(docs));
|
2021-01-30 01:41:04 +01:00
|
|
|
return null;
|
|
|
|
}).subscribeOn(Schedulers.boundedElastic());
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-30 01:41:04 +01:00
|
|
|
public Mono<Void> deleteDocument(LLTerm id) {
|
|
|
|
return Mono.<Void>fromCallable(() -> {
|
|
|
|
indexWriter.deleteDocuments(LLUtils.toTerm(id));
|
|
|
|
return null;
|
|
|
|
}).subscribeOn(Schedulers.boundedElastic());
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-30 01:41:04 +01:00
|
|
|
public Mono<Void> updateDocument(LLTerm id, LLDocument document) {
|
|
|
|
return Mono.<Void>fromCallable(() -> {
|
|
|
|
indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document));
|
|
|
|
return null;
|
|
|
|
}).subscribeOn(Schedulers.boundedElastic());
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-30 01:41:04 +01:00
|
|
|
public Mono<Void> updateDocuments(Iterable<LLTerm> ids, Iterable<LLDocument> documents) {
|
|
|
|
return Mono.<Void>fromCallable(() -> {
|
|
|
|
var idIt = ids.iterator();
|
|
|
|
var docIt = documents.iterator();
|
|
|
|
while (idIt.hasNext()) {
|
|
|
|
var id = idIt.next();
|
|
|
|
var doc = docIt.next();
|
|
|
|
|
|
|
|
indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(doc));
|
|
|
|
}
|
|
|
|
return null;
|
|
|
|
}).subscribeOn(Schedulers.boundedElastic());
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-30 01:41:04 +01:00
|
|
|
public Mono<Void> deleteAll() {
|
|
|
|
return Mono.<Void>fromCallable(() -> {
|
|
|
|
indexWriter.deleteAll();
|
|
|
|
indexWriter.commit();
|
|
|
|
indexWriter.forceMergeDeletes(true);
|
|
|
|
indexWriter.flush();
|
|
|
|
indexWriter.commit();
|
|
|
|
return null;
|
|
|
|
}).subscribeOn(Schedulers.boundedElastic());
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
2021-01-29 17:19:01 +01:00
|
|
|
private Mono<IndexSearcher> acquireSearcherWrapper(LLSnapshot snapshot) {
|
|
|
|
return Mono.fromCallable(() -> {
|
|
|
|
if (snapshot == null) {
|
|
|
|
return searcherManager.acquire();
|
|
|
|
} else {
|
|
|
|
return resolveSnapshot(snapshot).getIndexSearcher();
|
|
|
|
}
|
|
|
|
}).subscribeOn(Schedulers.boundedElastic());
|
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
|
2021-01-29 17:19:01 +01:00
|
|
|
private Mono<Void> releaseSearcherWrapper(LLSnapshot snapshot, IndexSearcher indexSearcher) {
|
|
|
|
return Mono.<Void>fromRunnable(() -> {
|
|
|
|
if (snapshot == null) {
|
|
|
|
try {
|
|
|
|
searcherManager.release(indexSearcher);
|
|
|
|
} catch (IOException e) {
|
|
|
|
e.printStackTrace();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}).subscribeOn(Schedulers.boundedElastic());
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
2021-01-29 17:19:01 +01:00
|
|
|
@SuppressWarnings({"Convert2MethodRef", "unchecked", "rawtypes"})
|
2020-12-07 22:15:18 +01:00
|
|
|
@Override
|
2021-01-29 17:19:01 +01:00
|
|
|
public Mono<LLSearchResult> moreLikeThis(@Nullable LLSnapshot snapshot,
|
|
|
|
Map<String, Set<String>> mltDocumentFields,
|
|
|
|
int limit,
|
|
|
|
String keyFieldName) {
|
2020-12-07 22:15:18 +01:00
|
|
|
if (mltDocumentFields.isEmpty()) {
|
2021-01-29 17:19:01 +01:00
|
|
|
return Mono.just(LLSearchResult.empty());
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
2021-01-29 17:19:01 +01:00
|
|
|
return acquireSearcherWrapper(snapshot)
|
|
|
|
.flatMap(indexSearcher -> Mono
|
|
|
|
.fromCallable(() -> {
|
|
|
|
var mlt = new MoreLikeThis(indexSearcher.getIndexReader());
|
|
|
|
mlt.setAnalyzer(indexWriter.getAnalyzer());
|
|
|
|
mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new));
|
|
|
|
mlt.setMinTermFreq(1);
|
|
|
|
//mlt.setMinDocFreq(1);
|
|
|
|
mlt.setBoost(true);
|
|
|
|
|
|
|
|
// Get the reference doc and apply it to MoreLikeThis, to generate the query
|
|
|
|
return mlt.like((Map) mltDocumentFields);
|
|
|
|
})
|
|
|
|
.subscribeOn(Schedulers.boundedElastic())
|
|
|
|
.flatMap(query -> Mono
|
|
|
|
.fromCallable(() -> {
|
|
|
|
One<Long> totalHitsCountSink = Sinks.one();
|
|
|
|
Many<LLKeyScore> topKeysSink = Sinks.many().unicast().onBackpressureBuffer(new ArrayBlockingQueue<>(1000));
|
|
|
|
|
|
|
|
streamSearcher.search(indexSearcher,
|
|
|
|
query,
|
|
|
|
limit,
|
|
|
|
null,
|
|
|
|
ScoreMode.COMPLETE,
|
|
|
|
keyFieldName,
|
|
|
|
keyScore -> {
|
|
|
|
EmitResult result = topKeysSink.tryEmitNext(keyScore);
|
|
|
|
if (result.isFailure()) {
|
|
|
|
throw new EmissionException(result);
|
|
|
|
}
|
|
|
|
},
|
|
|
|
totalHitsCount -> {
|
|
|
|
EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
|
|
|
|
if (result.isFailure()) {
|
|
|
|
throw new EmissionException(result);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
return new LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux()));
|
|
|
|
}).subscribeOn(Schedulers.boundedElastic())
|
|
|
|
).then()
|
|
|
|
.materialize()
|
|
|
|
.flatMap(value -> releaseSearcherWrapper(snapshot, indexSearcher).thenReturn(value))
|
|
|
|
.dematerialize()
|
|
|
|
);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
2021-01-29 17:19:01 +01:00
|
|
|
@SuppressWarnings("Convert2MethodRef")
|
2020-12-07 22:15:18 +01:00
|
|
|
@Override
|
2021-01-29 17:19:01 +01:00
|
|
|
public Mono<LLSearchResult> search(@Nullable LLSnapshot snapshot, String queryString, int limit,
|
|
|
|
@Nullable LLSort sort, LLScoreMode scoreMode, String keyFieldName) {
|
|
|
|
|
|
|
|
return acquireSearcherWrapper(snapshot)
|
|
|
|
.flatMap(indexSearcher -> Mono
|
|
|
|
.fromCallable(() -> {
|
|
|
|
Query query = QueryParser.parse(queryString);
|
|
|
|
Sort luceneSort = LLUtils.toSort(sort);
|
|
|
|
org.apache.lucene.search.ScoreMode luceneScoreMode = LLUtils.toScoreMode(scoreMode);
|
|
|
|
return Tuples.of(query, Optional.ofNullable(luceneSort), luceneScoreMode);
|
|
|
|
})
|
|
|
|
.subscribeOn(Schedulers.boundedElastic())
|
|
|
|
.flatMap(tuple -> Mono
|
|
|
|
.fromCallable(() -> {
|
|
|
|
Query query = tuple.getT1();
|
|
|
|
Sort luceneSort = tuple.getT2().orElse(null);
|
|
|
|
ScoreMode luceneScoreMode = tuple.getT3();
|
|
|
|
|
|
|
|
One<Long> totalHitsCountSink = Sinks.one();
|
|
|
|
Many<LLKeyScore> topKeysSink = Sinks.many().unicast().onBackpressureBuffer(new ArrayBlockingQueue<>(PagedStreamSearcher.MAX_ITEMS_PER_PAGE));
|
|
|
|
|
|
|
|
streamSearcher.search(indexSearcher,
|
|
|
|
query,
|
|
|
|
limit,
|
|
|
|
luceneSort,
|
|
|
|
luceneScoreMode,
|
|
|
|
keyFieldName,
|
|
|
|
keyScore -> {
|
|
|
|
EmitResult result = topKeysSink.tryEmitNext(keyScore);
|
|
|
|
if (result.isFailure()) {
|
|
|
|
throw new EmissionException(result);
|
|
|
|
}
|
|
|
|
},
|
|
|
|
totalHitsCount -> {
|
|
|
|
EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
|
|
|
|
if (result.isFailure()) {
|
|
|
|
throw new EmissionException(result);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
return new LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux()));
|
|
|
|
}).subscribeOn(Schedulers.boundedElastic())
|
|
|
|
)
|
|
|
|
.materialize()
|
|
|
|
.flatMap(value -> releaseSearcherWrapper(snapshot, indexSearcher).thenReturn(value))
|
|
|
|
.dematerialize()
|
|
|
|
);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-30 01:41:04 +01:00
|
|
|
public Mono<Void> close() {
|
|
|
|
return Mono
|
|
|
|
.<Void>fromCallable(() -> {
|
|
|
|
scheduledTasksLifecycle.cancelAndWait();
|
|
|
|
indexWriter.close();
|
|
|
|
directory.close();
|
|
|
|
return null;
|
|
|
|
})
|
|
|
|
.subscribeOn(Schedulers.boundedElastic());
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
private void scheduledCommit() {
|
|
|
|
try {
|
|
|
|
if (indexWriter.hasUncommittedChanges()) {
|
|
|
|
indexWriter.commit();
|
|
|
|
}
|
|
|
|
} catch (IOException ex) {
|
|
|
|
ex.printStackTrace();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-01-29 17:19:01 +01:00
|
|
|
@SuppressWarnings("unused")
|
2020-12-07 22:15:18 +01:00
|
|
|
private void scheduledQueryRefresh() {
|
|
|
|
try {
|
2021-01-29 17:19:01 +01:00
|
|
|
boolean refreshStarted = searcherManager.maybeRefresh();
|
|
|
|
// if refreshStarted == false, another thread is currently already refreshing
|
2020-12-07 22:15:18 +01:00
|
|
|
} catch (IOException ex) {
|
|
|
|
ex.printStackTrace();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private LuceneIndexSnapshot resolveSnapshot(@Nullable LLSnapshot snapshot) {
|
|
|
|
if (snapshot == null) {
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
return Objects.requireNonNull(snapshots.get(snapshot.getSequenceNumber()),
|
|
|
|
() -> "Can't resolve snapshot " + snapshot.getSequenceNumber()
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public boolean isLowMemoryMode() {
|
|
|
|
return lowMemory;
|
|
|
|
}
|
|
|
|
}
|