CavalliumDBEngine/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java

622 lines
23 KiB
Java
Raw Normal View History

2020-12-07 22:15:18 +01:00
package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
2021-09-10 12:13:52 +02:00
import static it.cavallium.dbengine.database.LLUtils.MARKER_LUCENE;
import static it.cavallium.dbengine.database.LLUtils.toDocument;
import static it.cavallium.dbengine.database.LLUtils.toFields;
import static it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.NO_TRANSFORMATION;
2021-09-10 12:13:52 +02:00
2021-12-30 17:28:06 +01:00
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
2021-12-30 17:28:06 +01:00
import io.micrometer.core.instrument.Timer;
2021-11-08 11:17:52 +01:00
import io.net5.buffer.api.Resource;
2021-09-18 18:34:21 +02:00
import io.net5.buffer.api.Send;
2021-07-01 21:19:52 +02:00
import it.cavallium.dbengine.client.DirectIOOptions;
2021-05-28 16:04:59 +02:00
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
2021-07-01 21:19:52 +02:00
import it.cavallium.dbengine.client.LuceneOptions;
import it.cavallium.dbengine.client.NRTCachingOptions;
2021-11-19 19:03:31 +01:00
import it.cavallium.dbengine.client.query.QueryParser;
import it.cavallium.dbengine.client.query.current.data.Query;
2021-03-02 01:53:36 +01:00
import it.cavallium.dbengine.client.query.current.data.QueryParams;
2021-11-07 17:46:40 +01:00
import it.cavallium.dbengine.database.LLIndexRequest;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSearchResultShard;
import it.cavallium.dbengine.database.LLSnapshot;
2021-12-30 17:28:06 +01:00
import it.cavallium.dbengine.database.LLSoftUpdateDocument;
import it.cavallium.dbengine.database.LLTerm;
2021-12-30 17:28:06 +01:00
import it.cavallium.dbengine.database.LLUpdateDocument;
2021-11-07 17:46:40 +01:00
import it.cavallium.dbengine.database.LLUpdateFields;
import it.cavallium.dbengine.database.LLUtils;
2021-07-01 21:19:52 +02:00
import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory;
2021-12-30 17:28:06 +01:00
import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.lucene.LuceneUtils;
2021-11-19 19:03:31 +01:00
import it.cavallium.dbengine.lucene.collector.Buckets;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher;
2021-11-18 17:13:53 +01:00
import it.cavallium.dbengine.lucene.searcher.BucketParams;
import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher;
2021-12-30 17:28:06 +01:00
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer;
2021-07-06 01:30:37 +02:00
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
import it.cavallium.dbengine.lucene.searcher.LocalSearcher;
2020-12-07 22:15:18 +01:00
import java.io.IOException;
import java.nio.file.Path;
2021-11-19 19:03:31 +01:00
import java.util.ArrayList;
2021-11-18 17:13:53 +01:00
import java.util.List;
2020-12-07 22:15:18 +01:00
import java.util.Map;
import java.util.Map.Entry;
2021-12-30 17:28:06 +01:00
import java.util.Objects;
2020-12-07 22:15:18 +01:00
import java.util.Set;
import java.util.concurrent.Callable;
2021-09-06 18:52:21 +02:00
import java.util.concurrent.Phaser;
2020-12-07 22:15:18 +01:00
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
2021-12-31 00:58:47 +01:00
import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
2021-06-25 20:06:58 +02:00
import org.apache.lucene.index.ConcurrentMergeScheduler;
2020-12-07 22:15:18 +01:00
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
2021-07-06 14:33:47 +02:00
import org.apache.lucene.index.MergeScheduler;
2021-06-25 20:06:58 +02:00
import org.apache.lucene.index.SerialMergeScheduler;
2021-12-31 00:58:47 +01:00
import org.apache.lucene.index.SimpleMergedSegmentWarmer;
2020-12-07 22:15:18 +01:00
import org.apache.lucene.index.SnapshotDeletionPolicy;
2021-12-30 22:29:06 +01:00
import org.apache.lucene.index.TieredMergePolicy;
2021-05-25 01:12:24 +02:00
import org.apache.lucene.misc.store.DirectIODirectory;
import org.apache.lucene.search.similarities.Similarity;
2021-05-25 01:12:24 +02:00
import org.apache.lucene.store.ByteBuffersDirectory;
2020-12-07 22:15:18 +01:00
import org.apache.lucene.store.Directory;
2020-12-31 20:10:47 +01:00
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.MMapDirectory;
2021-05-25 01:12:24 +02:00
import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.util.Constants;
2021-11-19 19:03:31 +01:00
import org.jetbrains.annotations.NotNull;
2020-12-07 22:15:18 +01:00
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
2020-12-07 22:15:18 +01:00
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
2020-12-07 22:15:18 +01:00
public class LLLocalLuceneIndex implements LLLuceneIndex {
protected static final Logger logger = LogManager.getLogger(LLLocalLuceneIndex.class);
private final LocalSearcher localSearcher;
2021-11-18 17:13:53 +01:00
private final DecimalBucketMultiSearcher decimalBucketMultiSearcher = new DecimalBucketMultiSearcher();
/**
* Global lucene index scheduler.
* There is only a single thread globally to not overwhelm the disk with
2021-02-03 14:37:02 +01:00
* concurrent commits or concurrent refreshes.
*/
private static final ReentrantLock shutdownLock = new ReentrantLock();
private static final Scheduler luceneHeavyTasksScheduler = uninterruptibleScheduler(Schedulers.single(Schedulers.boundedElastic()));
2021-12-30 17:28:06 +01:00
private final Counter startedDocIndexings;
private final Counter endeddDocIndexings;
private final Timer docIndexingTime;
private final Timer snapshotTime;
private final Timer flushTime;
private final Timer commitTime;
private final Timer mergeTime;
private final Timer refreshTime;
2020-12-07 22:15:18 +01:00
private final String luceneIndexName;
private final IndexWriter indexWriter;
2021-09-06 15:06:51 +02:00
private final SnapshotsManager snapshotsManager;
2021-09-18 18:34:21 +02:00
private final IndexSearcherManager searcherManager;
private final PerFieldAnalyzerWrapper luceneAnalyzer;
private final Similarity luceneSimilarity;
2020-12-07 22:15:18 +01:00
private final Directory directory;
private final boolean lowMemory;
2021-09-06 18:52:21 +02:00
private final Phaser activeTasks = new Phaser(1);
private final AtomicBoolean closeRequested = new AtomicBoolean();
2021-11-21 12:31:23 +01:00
public LLLocalLuceneIndex(LLTempLMDBEnv env,
@Nullable Path luceneBasePath,
MeterRegistry meterRegistry,
2021-12-30 17:28:06 +01:00
@Nullable String clusterName,
@Nullable String shardName,
2021-05-28 16:04:59 +02:00
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
LuceneOptions luceneOptions,
@Nullable LuceneHacks luceneHacks) throws IOException {
2021-12-30 17:28:06 +01:00
if (clusterName == null && shardName == null) {
throw new IllegalArgumentException("Clustern name and/or shard name must be set");
}
String logName = Objects.requireNonNullElse(clusterName, shardName);
String luceneIndexName = Objects.requireNonNullElse(shardName, clusterName);
2021-07-10 20:52:01 +02:00
Path directoryPath;
if (luceneOptions.inMemory() != (luceneBasePath == null)) {
throw new IllegalArgumentException();
} else if (luceneBasePath != null) {
2021-12-30 17:28:06 +01:00
directoryPath = luceneBasePath.resolve(shardName + ".lucene.db");
2021-07-10 20:52:01 +02:00
} else {
directoryPath = null;
}
2021-12-30 17:28:06 +01:00
if (luceneIndexName.length() == 0) {
2020-12-07 22:15:18 +01:00
throw new IOException("Empty lucene database name");
}
2021-05-25 01:12:24 +02:00
if (!MMapDirectory.UNMAP_SUPPORTED) {
logger.error("Unmap is unsupported, lucene will run slower: {}", MMapDirectory.UNMAP_NOT_SUPPORTED_REASON);
} else {
logger.debug("Lucene MMap is supported");
}
2021-07-01 21:19:52 +02:00
boolean lowMemory = luceneOptions.lowMemory();
if (luceneOptions.inMemory()) {
2021-05-25 01:12:24 +02:00
this.directory = new ByteBuffersDirectory();
} else {
Directory directory;
{
2021-07-01 21:19:52 +02:00
Directory forcedDirectFsDirectory = null;
if (luceneOptions.directIOOptions().isPresent()) {
DirectIOOptions directIOOptions = luceneOptions.directIOOptions().get();
if (directIOOptions.alwaysForceDirectIO()) {
try {
forcedDirectFsDirectory = new AlwaysDirectIOFSDirectory(directoryPath);
} catch (UnsupportedOperationException ex) {
logger.warn("Failed to open FSDirectory with DIRECT flag", ex);
}
}
}
if (forcedDirectFsDirectory != null) {
directory = forcedDirectFsDirectory;
} else {
FSDirectory fsDirectory;
if (luceneOptions.allowMemoryMapping()) {
fsDirectory = FSDirectory.open(directoryPath);
2021-05-25 01:12:24 +02:00
} else {
2021-07-01 21:19:52 +02:00
fsDirectory = new NIOFSDirectory(directoryPath);
}
if (Constants.LINUX || Constants.MAC_OS_X) {
try {
int mergeBufferSize;
long minBytesDirect;
if (luceneOptions.directIOOptions().isPresent()) {
var directIOOptions = luceneOptions.directIOOptions().get();
mergeBufferSize = directIOOptions.mergeBufferSize();
minBytesDirect = directIOOptions.minBytesDirect();
} else {
mergeBufferSize = DirectIODirectory.DEFAULT_MERGE_BUFFER_SIZE;
minBytesDirect = DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT;
}
directory = new DirectIODirectory(fsDirectory, mergeBufferSize, minBytesDirect);
} catch (UnsupportedOperationException ex) {
logger.warn("Failed to open FSDirectory with DIRECT flag", ex);
directory = fsDirectory;
}
} else {
directory = fsDirectory;
2021-05-25 01:12:24 +02:00
}
}
}
2021-07-01 21:19:52 +02:00
if (luceneOptions.nrtCachingOptions().isPresent()) {
NRTCachingOptions nrtCachingOptions = luceneOptions.nrtCachingOptions().get();
2021-09-18 18:34:21 +02:00
directory = new NRTCachingDirectory(directory, nrtCachingOptions.maxMergeSizeMB(),
nrtCachingOptions.maxCachedMB());
2021-05-25 01:12:24 +02:00
}
this.directory = directory;
}
2021-06-06 02:23:51 +02:00
2021-12-30 17:28:06 +01:00
this.luceneIndexName = luceneIndexName;
2021-09-06 15:06:51 +02:00
var snapshotter = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
2020-12-07 22:15:18 +01:00
this.lowMemory = lowMemory;
this.luceneAnalyzer = LuceneUtils.toPerFieldAnalyzerWrapper(indicizerAnalyzers);
this.luceneSimilarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities);
2021-12-12 16:41:49 +01:00
var useLMDB = luceneOptions.allowNonVolatileCollection();
2021-12-12 23:40:30 +01:00
var maxInMemoryResultEntries = luceneOptions.maxInMemoryResultEntries();
if (luceneHacks != null && luceneHacks.customLocalSearcher() != null) {
localSearcher = luceneHacks.customLocalSearcher().get();
} else {
2021-12-12 23:40:30 +01:00
localSearcher = new AdaptiveLocalSearcher(env, useLMDB, maxInMemoryResultEntries);
}
2021-06-06 02:23:51 +02:00
var indexWriterConfig = new IndexWriterConfig(luceneAnalyzer);
2020-12-07 22:15:18 +01:00
indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
indexWriterConfig.setIndexDeletionPolicy(snapshotter);
indexWriterConfig.setCommitOnClose(true);
2021-12-30 22:29:06 +01:00
var mergePolicy = new TieredMergePolicy();
indexWriterConfig.setMergePolicy(mergePolicy);
2021-12-31 00:58:47 +01:00
indexWriterConfig.setMergedSegmentWarmer(new SimpleMergedSegmentWarmer(null));
int writerSchedulerMaxThreadCount;
2021-07-06 14:33:47 +02:00
MergeScheduler mergeScheduler;
2020-12-07 22:15:18 +01:00
if (lowMemory) {
2021-07-06 14:33:47 +02:00
mergeScheduler = new SerialMergeScheduler();
writerSchedulerMaxThreadCount = 1;
2020-12-07 22:15:18 +01:00
} else {
2021-07-06 14:33:47 +02:00
var concurrentMergeScheduler = new ConcurrentMergeScheduler();
2021-12-12 16:19:50 +01:00
// false means SSD, true means HDD
2021-07-23 15:20:33 +02:00
concurrentMergeScheduler.setDefaultMaxMergesAndThreads(false);
if (luceneOptions.inMemory()) {
concurrentMergeScheduler.disableAutoIOThrottle();
} else {
concurrentMergeScheduler.enableAutoIOThrottle();
}
writerSchedulerMaxThreadCount = concurrentMergeScheduler.getMaxThreadCount();
2021-07-06 14:33:47 +02:00
mergeScheduler = concurrentMergeScheduler;
2020-12-07 22:15:18 +01:00
}
logger.trace("WriterSchedulerMaxThreadCount: {}", writerSchedulerMaxThreadCount);
2021-07-06 14:33:47 +02:00
indexWriterConfig.setMergeScheduler(mergeScheduler);
if (luceneOptions.indexWriterBufferSize() == -1) {
//todo: allow to configure maxbuffereddocs fallback
2021-12-16 02:38:56 +01:00
indexWriterConfig.setMaxBufferedDocs(80000);
// disable ram buffer size after enabling maxBufferedDocs
indexWriterConfig.setRAMBufferSizeMB(-1);
} else {
indexWriterConfig.setRAMBufferSizeMB(luceneOptions.indexWriterBufferSize() / 1024D / 1024D);
}
2021-12-17 23:51:10 +01:00
indexWriterConfig.setReaderPooling(false);
indexWriterConfig.setSimilarity(getLuceneSimilarity());
2020-12-07 22:15:18 +01:00
this.indexWriter = new IndexWriter(directory, indexWriterConfig);
2021-09-06 18:52:21 +02:00
this.snapshotsManager = new SnapshotsManager(indexWriter, snapshotter);
2021-09-06 18:24:36 +02:00
this.searcherManager = new CachedIndexSearcherManager(indexWriter,
2021-09-06 15:06:51 +02:00
snapshotsManager,
getLuceneSimilarity(),
2021-07-06 14:33:47 +02:00
luceneOptions.applyAllDeletes(),
luceneOptions.writeAllDeletes(),
2021-09-06 15:06:51 +02:00
luceneOptions.queryRefreshDebounceTime()
2021-07-06 14:33:47 +02:00
);
2021-12-30 17:28:06 +01:00
this.startedDocIndexings = meterRegistry.counter("index.write.doc.started.counter", "index.name", logName);
this.endeddDocIndexings = meterRegistry.counter("index.write.doc.ended.counter", "index.name", logName);
this.docIndexingTime = Timer.builder("index.write.doc.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry);
this.snapshotTime = Timer.builder("index.write.snapshot.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry);
this.flushTime = Timer.builder("index.write.flush.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry);
this.commitTime = Timer.builder("index.write.commit.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry);
this.mergeTime = Timer.builder("index.write.merge.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry);
this.refreshTime = Timer.builder("index.search.refresh.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry);
// Start scheduled tasks
2021-09-06 18:52:21 +02:00
var commitMillis = luceneOptions.commitDebounceTime().toMillis();
luceneHeavyTasksScheduler.schedulePeriodically(this::scheduledCommit, commitMillis, commitMillis,
TimeUnit.MILLISECONDS);
}
private Similarity getLuceneSimilarity() {
return luceneSimilarity;
}
2020-12-07 22:15:18 +01:00
@Override
public String getLuceneIndexName() {
return luceneIndexName;
}
@Override
public Mono<LLSnapshot> takeSnapshot() {
2021-12-30 17:28:06 +01:00
return snapshotsManager.takeSnapshot().elapsed().map(elapsed -> {
snapshotTime.record(elapsed.getT1(), TimeUnit.MILLISECONDS);
return elapsed.getT2();
}).transform(this::ensureOpen);
}
private <V> Mono<V> ensureOpen(Mono<V> mono) {
return Mono.<Void>fromCallable(() -> {
if (closeRequested.get()) {
throw new IllegalStateException("Lucene index is closed");
} else {
return null;
}
}).then(mono).doFirst(activeTasks::register).doFinally(s -> activeTasks.arriveAndDeregister());
}
private <V> Mono<V> runSafe(Callable<V> callable) {
return Mono
.fromCallable(callable)
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))
.publishOn(Schedulers.parallel());
}
2020-12-07 22:15:18 +01:00
@Override
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
2021-12-30 17:28:06 +01:00
return snapshotsManager
.releaseSnapshot(snapshot)
.elapsed()
.doOnNext(elapsed -> snapshotTime.record(elapsed.getT1(), TimeUnit.MILLISECONDS))
.then();
2020-12-07 22:15:18 +01:00
}
@Override
2021-11-07 17:46:40 +01:00
public Mono<Void> addDocument(LLTerm key, LLUpdateDocument doc) {
2021-12-30 17:28:06 +01:00
return this.<Void>runSafe(() -> docIndexingTime.recordCallable(() -> {
startedDocIndexings.increment();
try {
indexWriter.addDocument(toDocument(doc));
} finally {
endeddDocIndexings.increment();
}
return null;
2021-12-30 17:28:06 +01:00
})).transform(this::ensureOpen);
2020-12-07 22:15:18 +01:00
}
@Override
2021-11-07 17:46:40 +01:00
public Mono<Void> addDocuments(Flux<Entry<LLTerm, LLUpdateDocument>> documents) {
2021-12-31 00:58:47 +01:00
return documents.collectList().flatMap(documentsList -> this.<Void>runSafe(() -> {
var count = documentsList.size();
StopWatch stopWatch = StopWatch.createStarted();
2021-12-30 17:28:06 +01:00
try {
2021-12-31 00:58:47 +01:00
startedDocIndexings.increment(count);
try {
indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList));
} finally {
endeddDocIndexings.increment(count);
}
2021-12-30 17:28:06 +01:00
} finally {
2021-12-31 00:58:47 +01:00
docIndexingTime.record(stopWatch.getTime(TimeUnit.MILLISECONDS) / Math.max(count, 1), TimeUnit.MILLISECONDS);
2021-12-30 17:28:06 +01:00
}
return null;
2021-12-31 00:58:47 +01:00
})).transform(this::ensureOpen);
2020-12-07 22:15:18 +01:00
}
2020-12-07 22:15:18 +01:00
@Override
public Mono<Void> deleteDocument(LLTerm id) {
2021-12-30 17:28:06 +01:00
return this.<Void>runSafe(() -> docIndexingTime.recordCallable(() -> {
startedDocIndexings.increment();
try {
indexWriter.deleteDocuments(LLUtils.toTerm(id));
} finally {
endeddDocIndexings.increment();
}
return null;
2021-12-30 17:28:06 +01:00
})).transform(this::ensureOpen);
2020-12-07 22:15:18 +01:00
}
@Override
2021-11-07 17:46:40 +01:00
public Mono<Void> update(LLTerm id, LLIndexRequest request) {
return this
2021-12-30 17:28:06 +01:00
.<Void>runSafe(() -> docIndexingTime.recordCallable(() -> {
startedDocIndexings.increment();
try {
switch (request) {
case LLUpdateDocument updateDocument ->
indexWriter.updateDocument(LLUtils.toTerm(id), toDocument(updateDocument));
case LLSoftUpdateDocument softUpdateDocument ->
indexWriter.softUpdateDocument(LLUtils.toTerm(id), toDocument(softUpdateDocument.items()),
toFields(softUpdateDocument.softDeleteItems()));
case LLUpdateFields updateFields -> indexWriter.updateDocValues(LLUtils.toTerm(id),
toFields(updateFields.items()));
case null, default -> throw new UnsupportedOperationException("Unexpected request type: " + request);
}
} finally {
endeddDocIndexings.increment();
2021-11-07 17:46:40 +01:00
}
return null;
2021-12-30 17:28:06 +01:00
}))
.transform(this::ensureOpen);
2020-12-07 22:15:18 +01:00
}
@Override
2021-11-07 17:46:40 +01:00
public Mono<Void> updateDocuments(Mono<Map<LLTerm, LLUpdateDocument>> documents) {
return documents.flatMap(this::updateDocuments).then();
}
2021-11-07 17:46:40 +01:00
private Mono<Void> updateDocuments(Map<LLTerm, LLUpdateDocument> documentsMap) {
return this.<Void>runSafe(() -> {
2021-11-07 17:46:40 +01:00
for (Entry<LLTerm, LLUpdateDocument> entry : documentsMap.entrySet()) {
LLTerm key = entry.getKey();
2021-11-07 17:46:40 +01:00
LLUpdateDocument value = entry.getValue();
2021-12-30 17:28:06 +01:00
startedDocIndexings.increment();
try {
docIndexingTime.recordCallable(() -> {
indexWriter.updateDocument(LLUtils.toTerm(key), toDocument(value));
return null;
});
} finally {
endeddDocIndexings.increment();
}
}
return null;
}).transform(this::ensureOpen);
2020-12-07 22:15:18 +01:00
}
@Override
public Mono<Void> deleteAll() {
return this.<Void>runSafe(() -> {
shutdownLock.lock();
try {
indexWriter.deleteAll();
indexWriter.forceMergeDeletes(true);
indexWriter.commit();
} finally {
shutdownLock.unlock();
}
return null;
}).subscribeOn(luceneHeavyTasksScheduler).transform(this::ensureOpen);
2020-12-07 22:15:18 +01:00
}
@Override
2021-11-08 11:17:52 +01:00
public Mono<LLSearchResultShard> moreLikeThis(@Nullable LLSnapshot snapshot,
2021-03-02 01:53:36 +01:00
QueryParams queryParams,
String keyFieldName,
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux) {
2021-11-16 23:19:23 +01:00
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer);
var searcher = this.searcherManager.retrieveSearcher(snapshot);
var transformer = new MoreLikeThisTransformer(mltDocumentFieldsFlux);
2021-07-06 01:30:37 +02:00
2021-11-08 11:17:52 +01:00
return localSearcher
.collect(searcher, localQueryParams, keyFieldName, transformer)
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close))
.doOnDiscard(Send.class, Send::close)
.doOnDiscard(Resource.class, Resource::close);
2021-07-06 01:30:37 +02:00
}
2020-12-07 22:15:18 +01:00
@Override
2021-11-08 11:17:52 +01:00
public Mono<LLSearchResultShard> search(@Nullable LLSnapshot snapshot, QueryParams queryParams,
2021-09-18 18:34:21 +02:00
String keyFieldName) {
2021-11-16 23:19:23 +01:00
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer);
var searcher = searcherManager.retrieveSearcher(snapshot);
2021-11-08 11:17:52 +01:00
return localSearcher
.collect(searcher, localQueryParams, keyFieldName, NO_TRANSFORMATION)
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close))
2021-11-08 12:06:32 +01:00
.doOnDiscard(Send.class, Send::close)
.doOnDiscard(Resource.class, Resource::close);
2021-11-18 17:13:53 +01:00
}
@Override
2021-11-19 19:03:31 +01:00
public Mono<Buckets> computeBuckets(@Nullable LLSnapshot snapshot,
@NotNull List<Query> queries,
@Nullable Query normalizationQuery,
2021-11-18 17:13:53 +01:00
BucketParams bucketParams) {
2021-11-19 19:03:31 +01:00
List<org.apache.lucene.search.Query> localQueries = new ArrayList<>(queries.size());
for (Query query : queries) {
localQueries.add(QueryParser.toQuery(query, luceneAnalyzer));
}
var localNormalizationQuery = QueryParser.toQuery(normalizationQuery, luceneAnalyzer);
2021-11-18 17:13:53 +01:00
var searchers = searcherManager
.retrieveSearcher(snapshot)
.map(indexSearcher -> LLIndexSearchers.unsharded(indexSearcher).send());
return decimalBucketMultiSearcher
2021-11-19 19:03:31 +01:00
.collectMulti(searchers, bucketParams, localQueries, localNormalizationQuery)
2021-11-18 17:13:53 +01:00
.doOnDiscard(Send.class, Send::close)
.doOnDiscard(Resource.class, Resource::close);
}
public Mono<Send<LLIndexSearcher>> retrieveSearcher(@Nullable LLSnapshot snapshot) {
2021-09-18 18:34:21 +02:00
return searcherManager
.retrieveSearcher(snapshot)
2021-11-08 12:06:32 +01:00
.doOnDiscard(Send.class, Send::close)
.doOnDiscard(Resource.class, Resource::close);
2020-12-07 22:15:18 +01:00
}
@Override
public Mono<Void> close() {
return Mono
.<Void>fromCallable(() -> {
2021-09-07 02:36:11 +02:00
logger.info("Waiting IndexWriter tasks...");
2021-09-06 18:52:21 +02:00
activeTasks.arriveAndAwaitAdvance();
2021-09-07 02:36:11 +02:00
logger.info("IndexWriter tasks ended");
2021-09-06 15:06:51 +02:00
return null;
})
.subscribeOn(luceneHeavyTasksScheduler)
.then(searcherManager.close())
.then(Mono.<Void>fromCallable(() -> {
shutdownLock.lock();
try {
logger.info("Closing IndexWriter...");
indexWriter.close();
directory.close();
logger.info("IndexWriter closed");
} finally {
shutdownLock.unlock();
}
return null;
}).subscribeOn(luceneHeavyTasksScheduler))
// Avoid closing multiple times
.transformDeferred(mono -> {
if (this.closeRequested.compareAndSet(false, true)) {
logger.trace("Set closeRequested to true. Further update/write calls will result in an error");
return mono;
} else {
logger.debug("Tried to close more than once");
return Mono.empty();
}
});
2020-12-07 22:15:18 +01:00
}
2021-02-03 13:48:30 +01:00
@Override
public Mono<Void> flush() {
return Mono
.<Void>fromCallable(() -> {
if (activeTasks.isTerminated()) return null;
shutdownLock.lock();
try {
2021-12-30 17:28:06 +01:00
flushTime.recordCallable(() -> {
indexWriter.flush();
return null;
});
} finally {
shutdownLock.unlock();
}
2021-02-03 13:48:30 +01:00
return null;
})
.subscribeOn(luceneHeavyTasksScheduler)
.transform(this::ensureOpen);
2021-02-03 13:48:30 +01:00
}
@Override
2021-07-18 19:37:24 +02:00
public Mono<Void> refresh(boolean force) {
2021-02-03 13:48:30 +01:00
return Mono
.<Void>fromCallable(() -> {
2021-09-06 18:52:21 +02:00
activeTasks.register();
2021-02-03 13:48:30 +01:00
try {
2021-09-06 18:52:21 +02:00
if (activeTasks.isTerminated()) return null;
shutdownLock.lock();
try {
2021-12-30 17:28:06 +01:00
refreshTime.recordCallable(() -> {
if (force) {
searcherManager.maybeRefreshBlocking();
} else {
searcherManager.maybeRefresh();
}
return null;
});
} finally {
shutdownLock.unlock();
2021-07-18 19:37:24 +02:00
}
2021-02-03 13:48:30 +01:00
} finally {
2021-09-06 18:52:21 +02:00
activeTasks.arriveAndDeregister();
2021-02-03 13:48:30 +01:00
}
return null;
})
2021-03-03 10:57:45 +01:00
.subscribeOn(luceneHeavyTasksScheduler);
2021-02-03 13:48:30 +01:00
}
2020-12-07 22:15:18 +01:00
private void scheduledCommit() {
shutdownLock.lock();
2020-12-07 22:15:18 +01:00
try {
2021-12-30 17:28:06 +01:00
commitTime.recordCallable(() -> {
indexWriter.commit();
return null;
});
} catch (Exception ex) {
2021-09-10 12:13:52 +02:00
logger.error(MARKER_LUCENE, "Failed to execute a scheduled commit", ex);
} finally {
shutdownLock.unlock();
2020-12-07 22:15:18 +01:00
}
}
2021-12-30 22:29:06 +01:00
private void scheduledMerge() { // Do not use. Merges are done automatically by merge policies
shutdownLock.lock();
2021-12-15 16:47:59 +01:00
try {
2021-12-30 17:28:06 +01:00
mergeTime.recordCallable(() -> {
indexWriter.maybeMerge();
return null;
});
} catch (Exception ex) {
2021-12-15 16:47:59 +01:00
logger.error(MARKER_LUCENE, "Failed to execute a scheduled merge", ex);
} finally {
shutdownLock.unlock();
2021-12-15 16:47:59 +01:00
}
}
2020-12-07 22:15:18 +01:00
@Override
public boolean isLowMemoryMode() {
return lowMemory;
}
private class MoreLikeThisTransformer implements LLSearchTransformer {
private final Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux;
public MoreLikeThisTransformer(Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux) {
this.mltDocumentFieldsFlux = mltDocumentFieldsFlux;
}
@Override
public Mono<LocalQueryParams> transform(Mono<TransformerInput> inputMono) {
return inputMono.flatMap(input -> LuceneUtils.getMoreLikeThisQuery(input.indexSearchers(), input.queryParams(),
luceneAnalyzer, luceneSimilarity, mltDocumentFieldsFlux));
}
}
2020-12-07 22:15:18 +01:00
}