package it.cavallium.dbengine.database.disk; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; import static it.cavallium.dbengine.database.LLUtils.MARKER_LUCENE; import static it.cavallium.dbengine.database.LLUtils.toDocument; import static it.cavallium.dbengine.database.LLUtils.toFields; import static it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.NO_TRANSFORMATION; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Timer; import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.DirectIOOptions; import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; import it.cavallium.dbengine.client.LuceneOptions; import it.cavallium.dbengine.client.NRTCachingOptions; import it.cavallium.dbengine.client.query.QueryParser; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.database.LLIndexRequest; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLSearchResultShard; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLSoftUpdateDocument; import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.LLUpdateDocument; import it.cavallium.dbengine.database.LLUpdateFields; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory; import it.cavallium.dbengine.lucene.LuceneHacks; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher; import it.cavallium.dbengine.lucene.searcher.BucketParams; import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer; import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; import it.cavallium.dbengine.lucene.searcher.LocalSearcher; import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang3.time.StopWatch; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper; import org.apache.lucene.index.ConcurrentMergeScheduler; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; import org.apache.lucene.index.MergeScheduler; import org.apache.lucene.index.SerialMergeScheduler; import org.apache.lucene.index.SimpleMergedSegmentWarmer; import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.misc.store.DirectIODirectory; import org.apache.lucene.search.similarities.Similarity; import org.apache.lucene.store.ByteBuffersDirectory; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.store.NIOFSDirectory; import org.apache.lucene.store.NRTCachingDirectory; import org.apache.lucene.util.Constants; import org.apache.lucene.util.InfoStream; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple2; public class LLLocalLuceneIndex implements LLLuceneIndex { protected static final Logger logger = LogManager.getLogger(LLLocalLuceneIndex.class); private final LocalSearcher localSearcher; private final DecimalBucketMultiSearcher decimalBucketMultiSearcher = new DecimalBucketMultiSearcher(); /** * Global lucene index scheduler. * There is only a single thread globally to not overwhelm the disk with * concurrent commits or concurrent refreshes. */ private static final ReentrantLock shutdownLock = new ReentrantLock(); private static final Scheduler luceneHeavyTasksScheduler = uninterruptibleScheduler(Schedulers.single(Schedulers.boundedElastic())); private final Counter startedDocIndexings; private final Counter endeddDocIndexings; private final Timer docIndexingTime; private final Timer snapshotTime; private final Timer flushTime; private final Timer commitTime; private final Timer mergeTime; private final Timer refreshTime; private final String luceneIndexName; private final IndexWriter indexWriter; private final SnapshotsManager snapshotsManager; private final IndexSearcherManager searcherManager; private final PerFieldAnalyzerWrapper luceneAnalyzer; private final Similarity luceneSimilarity; private final Directory directory; private final boolean lowMemory; private final Phaser activeTasks = new Phaser(1); private final AtomicBoolean closeRequested = new AtomicBoolean(); public LLLocalLuceneIndex(LLTempLMDBEnv env, @Nullable Path luceneBasePath, MeterRegistry meterRegistry, @Nullable String clusterName, @Nullable String shardName, IndicizerAnalyzers indicizerAnalyzers, IndicizerSimilarities indicizerSimilarities, LuceneOptions luceneOptions, @Nullable LuceneHacks luceneHacks) throws IOException { if (clusterName == null && shardName == null) { throw new IllegalArgumentException("Clustern name and/or shard name must be set"); } String logName = Objects.requireNonNullElse(clusterName, shardName); String luceneIndexName = Objects.requireNonNullElse(shardName, clusterName); Path directoryPath; if (luceneOptions.inMemory() != (luceneBasePath == null)) { throw new IllegalArgumentException(); } else if (luceneBasePath != null) { directoryPath = luceneBasePath.resolve(shardName + ".lucene.db"); } else { directoryPath = null; } if (luceneIndexName.length() == 0) { throw new IOException("Empty lucene database name"); } if (!MMapDirectory.UNMAP_SUPPORTED) { logger.error("Unmap is unsupported, lucene will run slower: {}", MMapDirectory.UNMAP_NOT_SUPPORTED_REASON); } else { logger.debug("Lucene MMap is supported"); } boolean lowMemory = luceneOptions.lowMemory(); if (luceneOptions.inMemory()) { this.directory = new ByteBuffersDirectory(); } else { Directory directory; { Directory forcedDirectFsDirectory = null; if (luceneOptions.directIOOptions().isPresent()) { DirectIOOptions directIOOptions = luceneOptions.directIOOptions().get(); if (directIOOptions.alwaysForceDirectIO()) { try { forcedDirectFsDirectory = new AlwaysDirectIOFSDirectory(directoryPath); } catch (UnsupportedOperationException ex) { logger.warn("Failed to open FSDirectory with DIRECT flag", ex); } } } if (forcedDirectFsDirectory != null) { directory = forcedDirectFsDirectory; } else { FSDirectory fsDirectory; if (luceneOptions.allowMemoryMapping()) { fsDirectory = FSDirectory.open(directoryPath); } else { fsDirectory = new NIOFSDirectory(directoryPath); } if (Constants.LINUX || Constants.MAC_OS_X) { try { int mergeBufferSize; long minBytesDirect; if (luceneOptions.directIOOptions().isPresent()) { var directIOOptions = luceneOptions.directIOOptions().get(); mergeBufferSize = directIOOptions.mergeBufferSize(); minBytesDirect = directIOOptions.minBytesDirect(); } else { mergeBufferSize = DirectIODirectory.DEFAULT_MERGE_BUFFER_SIZE; minBytesDirect = DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT; } directory = new DirectIODirectory(fsDirectory, mergeBufferSize, minBytesDirect); } catch (UnsupportedOperationException ex) { logger.warn("Failed to open FSDirectory with DIRECT flag", ex); directory = fsDirectory; } } else { directory = fsDirectory; } } } if (luceneOptions.nrtCachingOptions().isPresent()) { NRTCachingOptions nrtCachingOptions = luceneOptions.nrtCachingOptions().get(); directory = new NRTCachingDirectory(directory, nrtCachingOptions.maxMergeSizeMB(), nrtCachingOptions.maxCachedMB()); } this.directory = directory; } this.luceneIndexName = luceneIndexName; var snapshotter = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); this.lowMemory = lowMemory; this.luceneAnalyzer = LuceneUtils.toPerFieldAnalyzerWrapper(indicizerAnalyzers); this.luceneSimilarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities); var useLMDB = luceneOptions.allowNonVolatileCollection(); var maxInMemoryResultEntries = luceneOptions.maxInMemoryResultEntries(); if (luceneHacks != null && luceneHacks.customLocalSearcher() != null) { localSearcher = luceneHacks.customLocalSearcher().get(); } else { localSearcher = new AdaptiveLocalSearcher(env, useLMDB, maxInMemoryResultEntries); } var indexWriterConfig = new IndexWriterConfig(luceneAnalyzer); indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND); indexWriterConfig.setIndexDeletionPolicy(snapshotter); indexWriterConfig.setCommitOnClose(true); var mergePolicy = new TieredMergePolicy(); indexWriterConfig.setMergePolicy(mergePolicy); indexWriterConfig.setMergedSegmentWarmer(new SimpleMergedSegmentWarmer(InfoStream.getDefault())); int writerSchedulerMaxThreadCount; MergeScheduler mergeScheduler; if (lowMemory) { mergeScheduler = new SerialMergeScheduler(); writerSchedulerMaxThreadCount = 1; } else { var concurrentMergeScheduler = new ConcurrentMergeScheduler(); // false means SSD, true means HDD concurrentMergeScheduler.setDefaultMaxMergesAndThreads(false); if (luceneOptions.inMemory()) { concurrentMergeScheduler.disableAutoIOThrottle(); } else { concurrentMergeScheduler.enableAutoIOThrottle(); } writerSchedulerMaxThreadCount = concurrentMergeScheduler.getMaxThreadCount(); mergeScheduler = concurrentMergeScheduler; } logger.trace("WriterSchedulerMaxThreadCount: {}", writerSchedulerMaxThreadCount); indexWriterConfig.setMergeScheduler(mergeScheduler); if (luceneOptions.indexWriterBufferSize() == -1) { //todo: allow to configure maxbuffereddocs fallback indexWriterConfig.setMaxBufferedDocs(80000); // disable ram buffer size after enabling maxBufferedDocs indexWriterConfig.setRAMBufferSizeMB(-1); } else { indexWriterConfig.setRAMBufferSizeMB(luceneOptions.indexWriterBufferSize() / 1024D / 1024D); } indexWriterConfig.setReaderPooling(false); indexWriterConfig.setSimilarity(getLuceneSimilarity()); this.indexWriter = new IndexWriter(directory, indexWriterConfig); this.snapshotsManager = new SnapshotsManager(indexWriter, snapshotter); this.searcherManager = new CachedIndexSearcherManager(indexWriter, snapshotsManager, getLuceneSimilarity(), luceneOptions.applyAllDeletes(), luceneOptions.writeAllDeletes(), luceneOptions.queryRefreshDebounceTime() ); this.startedDocIndexings = meterRegistry.counter("index.write.doc.started.counter", "index.name", logName); this.endeddDocIndexings = meterRegistry.counter("index.write.doc.ended.counter", "index.name", logName); this.docIndexingTime = Timer.builder("index.write.doc.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry); this.snapshotTime = Timer.builder("index.write.snapshot.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry); this.flushTime = Timer.builder("index.write.flush.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry); this.commitTime = Timer.builder("index.write.commit.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry); this.mergeTime = Timer.builder("index.write.merge.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry); this.refreshTime = Timer.builder("index.search.refresh.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry); // Start scheduled tasks var commitMillis = luceneOptions.commitDebounceTime().toMillis(); luceneHeavyTasksScheduler.schedulePeriodically(this::scheduledCommit, commitMillis, commitMillis, TimeUnit.MILLISECONDS); } private Similarity getLuceneSimilarity() { return luceneSimilarity; } @Override public String getLuceneIndexName() { return luceneIndexName; } @Override public Mono takeSnapshot() { return snapshotsManager.takeSnapshot().elapsed().map(elapsed -> { snapshotTime.record(elapsed.getT1(), TimeUnit.MILLISECONDS); return elapsed.getT2(); }).transform(this::ensureOpen); } private Mono ensureOpen(Mono mono) { return Mono.fromCallable(() -> { if (closeRequested.get()) { throw new IllegalStateException("Lucene index is closed"); } else { return null; } }).then(mono).doFirst(activeTasks::register).doFinally(s -> activeTasks.arriveAndDeregister()); } private Mono runSafe(Callable callable) { return Mono .fromCallable(callable) .subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())) .publishOn(Schedulers.parallel()); } @Override public Mono releaseSnapshot(LLSnapshot snapshot) { return snapshotsManager .releaseSnapshot(snapshot) .elapsed() .doOnNext(elapsed -> snapshotTime.record(elapsed.getT1(), TimeUnit.MILLISECONDS)) .then(); } @Override public Mono addDocument(LLTerm key, LLUpdateDocument doc) { return this.runSafe(() -> docIndexingTime.recordCallable(() -> { startedDocIndexings.increment(); try { indexWriter.addDocument(toDocument(doc)); } finally { endeddDocIndexings.increment(); } return null; })).transform(this::ensureOpen); } @Override public Mono addDocuments(Flux> documents) { return documents.collectList().flatMap(documentsList -> this.runSafe(() -> { var count = documentsList.size(); StopWatch stopWatch = StopWatch.createStarted(); try { startedDocIndexings.increment(count); try { indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList)); } finally { endeddDocIndexings.increment(count); } } finally { docIndexingTime.record(stopWatch.getTime(TimeUnit.MILLISECONDS) / Math.max(count, 1), TimeUnit.MILLISECONDS); } return null; })).transform(this::ensureOpen); } @Override public Mono deleteDocument(LLTerm id) { return this.runSafe(() -> docIndexingTime.recordCallable(() -> { startedDocIndexings.increment(); try { indexWriter.deleteDocuments(LLUtils.toTerm(id)); } finally { endeddDocIndexings.increment(); } return null; })).transform(this::ensureOpen); } @Override public Mono update(LLTerm id, LLIndexRequest request) { return this .runSafe(() -> docIndexingTime.recordCallable(() -> { startedDocIndexings.increment(); try { switch (request) { case LLUpdateDocument updateDocument -> indexWriter.updateDocument(LLUtils.toTerm(id), toDocument(updateDocument)); case LLSoftUpdateDocument softUpdateDocument -> indexWriter.softUpdateDocument(LLUtils.toTerm(id), toDocument(softUpdateDocument.items()), toFields(softUpdateDocument.softDeleteItems())); case LLUpdateFields updateFields -> indexWriter.updateDocValues(LLUtils.toTerm(id), toFields(updateFields.items())); case null, default -> throw new UnsupportedOperationException("Unexpected request type: " + request); } } finally { endeddDocIndexings.increment(); } return null; })) .transform(this::ensureOpen); } @Override public Mono updateDocuments(Mono> documents) { return documents.flatMap(this::updateDocuments).then(); } private Mono updateDocuments(Map documentsMap) { return this.runSafe(() -> { for (Entry entry : documentsMap.entrySet()) { LLTerm key = entry.getKey(); LLUpdateDocument value = entry.getValue(); startedDocIndexings.increment(); try { docIndexingTime.recordCallable(() -> { indexWriter.updateDocument(LLUtils.toTerm(key), toDocument(value)); return null; }); } finally { endeddDocIndexings.increment(); } } return null; }).transform(this::ensureOpen); } @Override public Mono deleteAll() { return this.runSafe(() -> { shutdownLock.lock(); try { indexWriter.deleteAll(); indexWriter.forceMergeDeletes(true); indexWriter.commit(); } finally { shutdownLock.unlock(); } return null; }).subscribeOn(luceneHeavyTasksScheduler).transform(this::ensureOpen); } @Override public Mono moreLikeThis(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName, Flux>> mltDocumentFieldsFlux) { LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer); var searcher = this.searcherManager.retrieveSearcher(snapshot); var transformer = new MoreLikeThisTransformer(mltDocumentFieldsFlux); return localSearcher .collect(searcher, localQueryParams, keyFieldName, transformer) .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)) .doOnDiscard(Send.class, Send::close) .doOnDiscard(Resource.class, Resource::close); } @Override public Mono search(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName) { LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer); var searcher = searcherManager.retrieveSearcher(snapshot); return localSearcher .collect(searcher, localQueryParams, keyFieldName, NO_TRANSFORMATION) .map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result::close)) .doOnDiscard(Send.class, Send::close) .doOnDiscard(Resource.class, Resource::close); } @Override public Mono computeBuckets(@Nullable LLSnapshot snapshot, @NotNull List queries, @Nullable Query normalizationQuery, BucketParams bucketParams) { List localQueries = new ArrayList<>(queries.size()); for (Query query : queries) { localQueries.add(QueryParser.toQuery(query, luceneAnalyzer)); } var localNormalizationQuery = QueryParser.toQuery(normalizationQuery, luceneAnalyzer); var searchers = searcherManager .retrieveSearcher(snapshot) .map(indexSearcher -> LLIndexSearchers.unsharded(indexSearcher).send()); return decimalBucketMultiSearcher .collectMulti(searchers, bucketParams, localQueries, localNormalizationQuery) .doOnDiscard(Send.class, Send::close) .doOnDiscard(Resource.class, Resource::close); } public Mono> retrieveSearcher(@Nullable LLSnapshot snapshot) { return searcherManager .retrieveSearcher(snapshot) .doOnDiscard(Send.class, Send::close) .doOnDiscard(Resource.class, Resource::close); } @Override public Mono close() { return Mono .fromCallable(() -> { logger.info("Waiting IndexWriter tasks..."); activeTasks.arriveAndAwaitAdvance(); logger.info("IndexWriter tasks ended"); return null; }) .subscribeOn(luceneHeavyTasksScheduler) .then(searcherManager.close()) .then(Mono.fromCallable(() -> { shutdownLock.lock(); try { logger.info("Closing IndexWriter..."); indexWriter.close(); directory.close(); logger.info("IndexWriter closed"); } finally { shutdownLock.unlock(); } return null; }).subscribeOn(luceneHeavyTasksScheduler)) // Avoid closing multiple times .transformDeferred(mono -> { if (this.closeRequested.compareAndSet(false, true)) { logger.trace("Set closeRequested to true. Further update/write calls will result in an error"); return mono; } else { logger.debug("Tried to close more than once"); return Mono.empty(); } }); } @Override public Mono flush() { return Mono .fromCallable(() -> { if (activeTasks.isTerminated()) return null; shutdownLock.lock(); try { flushTime.recordCallable(() -> { indexWriter.flush(); return null; }); } finally { shutdownLock.unlock(); } return null; }) .subscribeOn(luceneHeavyTasksScheduler) .transform(this::ensureOpen); } @Override public Mono refresh(boolean force) { return Mono .fromCallable(() -> { activeTasks.register(); try { if (activeTasks.isTerminated()) return null; shutdownLock.lock(); try { refreshTime.recordCallable(() -> { if (force) { searcherManager.maybeRefreshBlocking(); } else { searcherManager.maybeRefresh(); } return null; }); } finally { shutdownLock.unlock(); } } finally { activeTasks.arriveAndDeregister(); } return null; }) .subscribeOn(luceneHeavyTasksScheduler); } private void scheduledCommit() { shutdownLock.lock(); try { commitTime.recordCallable(() -> { indexWriter.commit(); return null; }); } catch (Exception ex) { logger.error(MARKER_LUCENE, "Failed to execute a scheduled commit", ex); } finally { shutdownLock.unlock(); } } private void scheduledMerge() { // Do not use. Merges are done automatically by merge policies shutdownLock.lock(); try { mergeTime.recordCallable(() -> { indexWriter.maybeMerge(); return null; }); } catch (Exception ex) { logger.error(MARKER_LUCENE, "Failed to execute a scheduled merge", ex); } finally { shutdownLock.unlock(); } } @Override public boolean isLowMemoryMode() { return lowMemory; } private class MoreLikeThisTransformer implements LLSearchTransformer { private final Flux>> mltDocumentFieldsFlux; public MoreLikeThisTransformer(Flux>> mltDocumentFieldsFlux) { this.mltDocumentFieldsFlux = mltDocumentFieldsFlux; } @Override public Mono transform(Mono inputMono) { return inputMono.flatMap(input -> LuceneUtils.getMoreLikeThisQuery(input.indexSearchers(), input.queryParams(), luceneAnalyzer, luceneSimilarity, mltDocumentFieldsFlux)); } } }