diff --git a/src/main/java/it/cavallium/dbengine/database/disk/DatabaseOptions.java b/src/main/java/it/cavallium/dbengine/client/DatabaseOptions.java similarity index 91% rename from src/main/java/it/cavallium/dbengine/database/disk/DatabaseOptions.java rename to src/main/java/it/cavallium/dbengine/client/DatabaseOptions.java index 1503be9..814732c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/DatabaseOptions.java +++ b/src/main/java/it/cavallium/dbengine/client/DatabaseOptions.java @@ -1,4 +1,4 @@ -package it.cavallium.dbengine.database.disk; +package it.cavallium.dbengine.client; import io.soabase.recordbuilder.core.RecordBuilder; import it.cavallium.dbengine.database.Column; diff --git a/src/main/java/it/cavallium/dbengine/client/DirectIOOptions.java b/src/main/java/it/cavallium/dbengine/client/DirectIOOptions.java new file mode 100644 index 0000000..b7222f1 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/client/DirectIOOptions.java @@ -0,0 +1,6 @@ +package it.cavallium.dbengine.client; + +import io.soabase.recordbuilder.core.RecordBuilder; + +@RecordBuilder +public record DirectIOOptions(boolean alwaysForceDirectIO, int mergeBufferSize, long minBytesDirect) {} diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneOptions.java b/src/main/java/it/cavallium/dbengine/client/LuceneOptions.java new file mode 100644 index 0000000..fc69f9c --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/client/LuceneOptions.java @@ -0,0 +1,17 @@ +package it.cavallium.dbengine.client; + +import io.soabase.recordbuilder.core.RecordBuilder; +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import org.jetbrains.annotations.Nullable; + +@RecordBuilder +public record LuceneOptions(Map extraFlags, + Duration queryRefreshDebounceTime, + Duration commitDebounceTime, + boolean lowMemory, + boolean inMemory, + Optional directIOOptions, + boolean allowMemoryMapping, + Optional nrtCachingOptions) {} diff --git a/src/main/java/it/cavallium/dbengine/client/NRTCachingOptions.java b/src/main/java/it/cavallium/dbengine/client/NRTCachingOptions.java new file mode 100644 index 0000000..9b3108a --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/client/NRTCachingOptions.java @@ -0,0 +1,6 @@ +package it.cavallium.dbengine.client; + +import io.soabase.recordbuilder.core.RecordBuilder; + +@RecordBuilder +public record NRTCachingOptions(double maxMergeSizeMB, double maxCachedMB) {} diff --git a/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java index 82ae674..39709b0 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java @@ -1,14 +1,11 @@ package it.cavallium.dbengine.database; import io.netty.buffer.ByteBufAllocator; +import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; -import it.cavallium.dbengine.database.disk.DatabaseOptions; -import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer; -import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity; -import java.time.Duration; +import it.cavallium.dbengine.client.LuceneOptions; import java.util.List; -import java.util.Map; import reactor.core.publisher.Mono; @SuppressWarnings("UnusedReturnValue") @@ -26,10 +23,7 @@ public interface LLDatabaseConnection { int instancesCount, IndicizerAnalyzers indicizerAnalyzers, IndicizerSimilarities indicizerSimilarities, - Duration queryRefreshDebounceTime, - Duration commitDebounceTime, - boolean lowMemory, - boolean inMemory); + LuceneOptions luceneOptions); Mono disconnect(); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java index cd96849..ec86c5a 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java @@ -3,18 +3,17 @@ package it.cavallium.dbengine.database.disk; import io.netty.buffer.ByteBufAllocator; import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; +import it.cavallium.dbengine.client.LuceneOptions; import it.cavallium.dbengine.database.Column; +import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLLuceneIndex; -import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer; -import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity; import it.cavallium.dbengine.netty.JMXNettyMonitoringManager; import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.util.LinkedList; import java.util.List; -import java.util.Map; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -70,10 +69,7 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection { int instancesCount, IndicizerAnalyzers indicizerAnalyzers, IndicizerSimilarities indicizerSimilarities, - Duration queryRefreshDebounceTime, - Duration commitDebounceTime, - boolean lowMemory, - boolean inMemory) { + LuceneOptions luceneOptions) { return Mono .fromCallable(() -> { if (instancesCount != 1) { @@ -82,20 +78,14 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection { instancesCount, indicizerAnalyzers, indicizerSimilarities, - queryRefreshDebounceTime, - commitDebounceTime, - lowMemory, - inMemory + luceneOptions ); } else { return new LLLocalLuceneIndex(basePath.resolve("lucene"), name, indicizerAnalyzers, indicizerSimilarities, - queryRefreshDebounceTime, - commitDebounceTime, - lowMemory, - inMemory, + luceneOptions, null ); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 1eb03f5..490cb8f 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -5,6 +5,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.util.ReferenceCounted; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.database.Column; +import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; @@ -13,12 +14,10 @@ import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; -import it.unimi.dsi.fastutil.bytes.ByteList; import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index d4f474b..4d6e262 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.disk; import io.netty.buffer.ByteBufAllocator; import it.cavallium.dbengine.database.Column; +import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.UpdateMode; @@ -183,7 +184,22 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { handle.close(); } - db.closeE(); + try { + db.closeE(); + } catch (RocksDBException ex) { + if ("Cannot close DB with unreleased snapshot.".equals(ex.getMessage())) { + snapshotsHandles.forEach((id, snapshot) -> {; + try { + db.releaseSnapshot(snapshot); + } catch (Exception ex2) { + // ignore exception + logger.debug("Failed to release snapshot " + id, ex2); + } + }); + db.closeE(); + } + throw ex; + } } private void flushDb(RocksDB db, List handles) throws RocksDBException { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 379bea8..64505ae 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -1,7 +1,10 @@ package it.cavallium.dbengine.database.disk; +import it.cavallium.dbengine.client.DirectIOOptions; import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; +import it.cavallium.dbengine.client.LuceneOptions; +import it.cavallium.dbengine.client.NRTCachingOptions; import it.cavallium.dbengine.client.query.QueryParser; import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.database.EnglishItalianStopFilter; @@ -14,10 +17,9 @@ import it.cavallium.dbengine.database.LLSearchResultShard; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.ScheduledTaskLifecycle; -import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer; -import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity; import it.cavallium.dbengine.lucene.searcher.AdaptiveStreamSearcher; import it.cavallium.dbengine.lucene.searcher.AllowOnlyQueryParsingCollectorStreamSearcher; import it.cavallium.dbengine.lucene.searcher.LuceneSearchInstance; @@ -26,11 +28,7 @@ import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.HandleResult; import java.io.IOException; import java.nio.file.Path; import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.time.temporal.TemporalUnit; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; @@ -42,14 +40,11 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper; -import org.apache.lucene.document.Document; import org.apache.lucene.index.ConcurrentMergeScheduler; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; -import org.apache.lucene.index.MergeScheduler; import org.apache.lucene.index.SerialMergeScheduler; import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.misc.store.DirectIODirectory; @@ -67,8 +62,6 @@ import org.apache.lucene.search.similarities.TFIDFSimilarity; import org.apache.lucene.store.ByteBuffersDirectory; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; -import org.apache.lucene.store.FSLockFactory; -import org.apache.lucene.store.IOContext; import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.store.NIOFSDirectory; import org.apache.lucene.store.NRTCachingDirectory; @@ -79,7 +72,6 @@ import org.warp.commonutils.log.LoggerFactory; import org.warp.commonutils.type.ShortNamedThreadFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink.OverflowStrategy; -import reactor.core.publisher.GroupedFlux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; @@ -88,8 +80,6 @@ import reactor.util.function.Tuples; public class LLLocalLuceneIndex implements LLLuceneIndex { - private static final boolean ALLOW_MMAP = false; - protected static final Logger logger = LoggerFactory.getLogger(LLLocalLuceneIndex.class); private static final LuceneStreamSearcher streamSearcher = new AdaptiveStreamSearcher(); private static final AllowOnlyQueryParsingCollectorStreamSearcher allowOnlyQueryParsingCollectorStreamSearcher @@ -133,9 +123,8 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { String name, IndicizerAnalyzers indicizerAnalyzers, IndicizerSimilarities indicizerSimilarities, - Duration queryRefreshDebounceTime, - Duration commitDebounceTime, - boolean lowMemory, boolean inMemory, @Nullable LLSearchCollectionStatisticsGetter distributedCollectionStatisticsGetter) throws IOException { + LuceneOptions luceneOptions, + @Nullable LLSearchCollectionStatisticsGetter distributedCollectionStatisticsGetter) throws IOException { if (name.length() == 0) { throw new IOException("Empty lucene database name"); } @@ -145,27 +134,58 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } else { logger.debug("Lucene MMap is supported"); } - if (inMemory) { + boolean lowMemory = luceneOptions.lowMemory(); + if (luceneOptions.inMemory()) { this.directory = new ByteBuffersDirectory(); } else { Directory directory; { - FSDirectory fsDirectory = ALLOW_MMAP ? FSDirectory.open(directoryPath) : new NIOFSDirectory(directoryPath); - if (Constants.LINUX || Constants.MAC_OS_X) { - if (!lowMemory) { - directory = new DirectIODirectory(fsDirectory, 5 * 1024 * 1024, 60 * 1024 * 1024); - } else { - directory = new DirectIODirectory(fsDirectory); - } + Directory forcedDirectFsDirectory = null; + if (luceneOptions.directIOOptions().isPresent()) { + DirectIOOptions directIOOptions = luceneOptions.directIOOptions().get(); + if (directIOOptions.alwaysForceDirectIO()) { + try { + forcedDirectFsDirectory = new AlwaysDirectIOFSDirectory(directoryPath); + } catch (UnsupportedOperationException ex) { + logger.warn("Failed to open FSDirectory with DIRECT flag", ex); + } + } + } + if (forcedDirectFsDirectory != null) { + directory = forcedDirectFsDirectory; } else { - directory = fsDirectory; + FSDirectory fsDirectory; + if (luceneOptions.allowMemoryMapping()) { + fsDirectory = FSDirectory.open(directoryPath); + } else { + fsDirectory = new NIOFSDirectory(directoryPath); + } + if (Constants.LINUX || Constants.MAC_OS_X) { + try { + int mergeBufferSize; + long minBytesDirect; + if (luceneOptions.directIOOptions().isPresent()) { + var directIOOptions = luceneOptions.directIOOptions().get(); + mergeBufferSize = directIOOptions.mergeBufferSize(); + minBytesDirect = directIOOptions.minBytesDirect(); + } else { + mergeBufferSize = DirectIODirectory.DEFAULT_MERGE_BUFFER_SIZE; + minBytesDirect = DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT; + } + directory = new DirectIODirectory(fsDirectory, mergeBufferSize, minBytesDirect); + } catch (UnsupportedOperationException ex) { + logger.warn("Failed to open FSDirectory with DIRECT flag", ex); + directory = fsDirectory; + } + } else { + directory = fsDirectory; + } } } - if (!lowMemory) { - directory = new NRTCachingDirectory(directory, 5.0, 60.0); - } else { - directory = new NRTCachingDirectory(directory, 1.0, 6.0); + if (luceneOptions.nrtCachingOptions().isPresent()) { + NRTCachingOptions nrtCachingOptions = luceneOptions.nrtCachingOptions().get(); + directory = new NRTCachingDirectory(directory, nrtCachingOptions.maxMergeSizeMB(), nrtCachingOptions.maxCachedMB()); } this.directory = directory; @@ -201,8 +221,8 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { this.scheduledTasksLifecycle = new ScheduledTaskLifecycle(); // Start scheduled tasks - registerScheduledFixedTask(this::scheduledCommit, commitDebounceTime); - registerScheduledFixedTask(this::scheduledQueryRefresh, queryRefreshDebounceTime); + registerScheduledFixedTask(this::scheduledCommit, luceneOptions.commitDebounceTime()); + registerScheduledFixedTask(this::scheduledQueryRefresh, luceneOptions.queryRefreshDebounceTime()); } private Similarity getSimilarity() { @@ -210,14 +230,43 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } private void registerScheduledFixedTask(Runnable task, Duration duration) { - scheduledTasksLifecycle.registerScheduledTask(luceneHeavyTasksScheduler.schedulePeriodically(() -> { + new PeriodicTask(task, duration).start(); + } + + private class PeriodicTask implements Runnable { + + private final Runnable task; + private final Duration duration; + private volatile boolean cancelled = false; + + public PeriodicTask(Runnable task, Duration duration) { + this.task = task; + this.duration = duration; + } + + public void start() { + luceneHeavyTasksScheduler.schedule(this, + duration.toMillis(), + TimeUnit.MILLISECONDS + ); + } + + @Override + public void run() { scheduledTasksLifecycle.startScheduledTask(); try { + if (scheduledTasksLifecycle.isCancelled() || cancelled) return; task.run(); + if (scheduledTasksLifecycle.isCancelled() || cancelled) return; + luceneHeavyTasksScheduler.schedule(this, duration.toMillis(), TimeUnit.MILLISECONDS); } finally { scheduledTasksLifecycle.endScheduledTask(); } - }, duration.toMillis(), duration.toMillis(), TimeUnit.MILLISECONDS)); + } + + public void cancel() { + cancelled = true; + } } @Override @@ -250,10 +299,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .defer(() -> { if (ex instanceof IllegalStateException && "No index commit to snapshot".equals(ex.getMessage())) { return Mono.fromCallable(() -> { - //noinspection BlockingMethodInNonBlockingContext - indexWriter.commit(); - //noinspection BlockingMethodInNonBlockingContext - return snapshotter.snapshot(); + scheduledTasksLifecycle.startScheduledTask(); + try { + //noinspection BlockingMethodInNonBlockingContext + indexWriter.commit(); + //noinspection BlockingMethodInNonBlockingContext + return snapshotter.snapshot(); + } finally { + scheduledTasksLifecycle.endScheduledTask(); + } }).subscribeOn(luceneHeavyTasksScheduler); } else { return Mono.error(ex); @@ -265,26 +319,36 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { @Override public Mono releaseSnapshot(LLSnapshot snapshot) { return Mono.fromCallable(() -> { - var indexSnapshot = this.snapshots.remove(snapshot.getSequenceNumber()); - if (indexSnapshot == null) { - throw new IOException("LLSnapshot " + snapshot.getSequenceNumber() + " not found!"); + scheduledTasksLifecycle.startScheduledTask(); + try { + var indexSnapshot = this.snapshots.remove(snapshot.getSequenceNumber()); + if (indexSnapshot == null) { + throw new IOException("LLSnapshot " + snapshot.getSequenceNumber() + " not found!"); + } + + indexSnapshot.close(); + + var luceneIndexSnapshot = indexSnapshot.getSnapshot(); + snapshotter.release(luceneIndexSnapshot); + // Delete unused files after releasing the snapshot + indexWriter.deleteUnusedFiles(); + return null; + } finally { + scheduledTasksLifecycle.endScheduledTask(); } - - indexSnapshot.close(); - - var luceneIndexSnapshot = indexSnapshot.getSnapshot(); - snapshotter.release(luceneIndexSnapshot); - // Delete unused files after releasing the snapshot - indexWriter.deleteUnusedFiles(); - return null; }).subscribeOn(Schedulers.boundedElastic()); } @Override public Mono addDocument(LLTerm key, LLDocument doc) { return Mono.fromCallable(() -> { - indexWriter.addDocument(LLUtils.toDocument(doc)); - return null; + scheduledTasksLifecycle.startScheduledTask(); + try { + indexWriter.addDocument(LLUtils.toDocument(doc)); + return null; + } finally { + scheduledTasksLifecycle.endScheduledTask(); + } }).subscribeOn(Schedulers.boundedElastic()); } @@ -294,8 +358,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .collectList() .flatMap(documentsList -> Mono .fromCallable(() -> { - indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList)); - return null; + scheduledTasksLifecycle.startScheduledTask(); + try { + indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList)); + return null; + } finally { + scheduledTasksLifecycle.endScheduledTask(); + } }) .subscribeOn(Schedulers.boundedElastic()) ); @@ -305,15 +374,25 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { @Override public Mono deleteDocument(LLTerm id) { return Mono.fromCallable(() -> { - indexWriter.deleteDocuments(LLUtils.toTerm(id)); - return null; + scheduledTasksLifecycle.startScheduledTask(); + try { + indexWriter.deleteDocuments(LLUtils.toTerm(id)); + return null; + } finally { + scheduledTasksLifecycle.endScheduledTask(); + } }).subscribeOn(Schedulers.boundedElastic()); } @Override public Mono updateDocument(LLTerm id, LLDocument document) { return Mono.fromCallable(() -> { - indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document)); + scheduledTasksLifecycle.startScheduledTask(); + try { + indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document)); + } finally { + scheduledTasksLifecycle.endScheduledTask(); + } return null; }).subscribeOn(Schedulers.boundedElastic()); } @@ -326,12 +405,17 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { private Mono updateDocuments(Map documentsMap) { return Mono .fromCallable(() -> { - for (Entry entry : documentsMap.entrySet()) { - LLTerm key = entry.getKey(); - LLDocument value = entry.getValue(); - indexWriter.updateDocument(LLUtils.toTerm(key), LLUtils.toDocument(value)); + scheduledTasksLifecycle.startScheduledTask(); + try { + for (Entry entry : documentsMap.entrySet()) { + LLTerm key = entry.getKey(); + LLDocument value = entry.getValue(); + indexWriter.updateDocument(LLUtils.toTerm(key), LLUtils.toDocument(value)); + } + return null; + } finally { + scheduledTasksLifecycle.endScheduledTask(); } - return null; }) .subscribeOn(Schedulers.boundedElastic()); } @@ -339,13 +423,18 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { @Override public Mono deleteAll() { return Mono.fromCallable(() -> { - //noinspection BlockingMethodInNonBlockingContext - indexWriter.deleteAll(); - //noinspection BlockingMethodInNonBlockingContext - indexWriter.forceMergeDeletes(true); - //noinspection BlockingMethodInNonBlockingContext - indexWriter.commit(); - return null; + scheduledTasksLifecycle.startScheduledTask(); + try { + //noinspection BlockingMethodInNonBlockingContext + indexWriter.deleteAll(); + //noinspection BlockingMethodInNonBlockingContext + indexWriter.forceMergeDeletes(true); + //noinspection BlockingMethodInNonBlockingContext + indexWriter.commit(); + return null; + } finally { + scheduledTasksLifecycle.endScheduledTask(); + } }).subscribeOn(luceneHeavyTasksScheduler); } @@ -431,7 +520,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { int scoreDivisor) { Query luceneAdditionalQuery; try { - luceneAdditionalQuery = QueryParser.toQuery(queryParams.getQuery()); + luceneAdditionalQuery = QueryParser.toQuery(queryParams.query()); } catch (Exception e) { return Mono.error(e); } @@ -479,14 +568,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .subscribeOn(Schedulers.boundedElastic()) .map(luceneQuery -> luceneSearch(doDistributedPre, indexSearcher, - queryParams.getOffset(), - queryParams.getLimit(), - queryParams.getMinCompetitiveScore().getNullable(), + queryParams.offset(), + queryParams.limit(), + queryParams.minCompetitiveScore().getNullable(), keyFieldName, scoreDivisor, luceneQuery, - QueryParser.toSort(queryParams.getSort()), - QueryParser.toScoreMode(queryParams.getScoreMode()), + QueryParser.toSort(queryParams.sort()), + QueryParser.toScoreMode(queryParams.scoreMode()), releaseSearcherWrapper(snapshot, indexSearcher) )) .onErrorResume(ex -> releaseSearcherWrapper(snapshot, indexSearcher).then(Mono.error(ex))) @@ -520,6 +609,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .then(); } + @SuppressWarnings("RedundantTypeArguments") private Mono search(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName, boolean doDistributedPre, long actionId, int scoreDivisor) { @@ -527,10 +617,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .acquireSearcherWrapper(snapshot, doDistributedPre, actionId) .flatMap(indexSearcher -> Mono .fromCallable(() -> { - Objects.requireNonNull(queryParams.getScoreMode(), "ScoreMode must not be null"); - Query luceneQuery = QueryParser.toQuery(queryParams.getQuery()); - Sort luceneSort = QueryParser.toSort(queryParams.getSort()); - org.apache.lucene.search.ScoreMode luceneScoreMode = QueryParser.toScoreMode(queryParams.getScoreMode()); + Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null"); + Query luceneQuery = QueryParser.toQuery(queryParams.query()); + Sort luceneSort = QueryParser.toSort(queryParams.sort()); + org.apache.lucene.search.ScoreMode luceneScoreMode = QueryParser.toScoreMode(queryParams.scoreMode()); return Tuples.of(luceneQuery, Optional.ofNullable(luceneSort), luceneScoreMode); }) .subscribeOn(Schedulers.boundedElastic()) @@ -542,9 +632,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return luceneSearch(doDistributedPre, indexSearcher, - queryParams.getOffset(), - queryParams.getLimit(), - queryParams.getMinCompetitiveScore().getNullable(), + queryParams.offset(), + queryParams.limit(), + queryParams.minCompetitiveScore().getNullable(), keyFieldName, scoreDivisor, luceneQuery, @@ -678,6 +768,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .fromCallable(() -> { scheduledTasksLifecycle.startScheduledTask(); try { + if (scheduledTasksLifecycle.isCancelled()) return null; //noinspection BlockingMethodInNonBlockingContext indexWriter.commit(); } finally { @@ -694,6 +785,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .fromCallable(() -> { scheduledTasksLifecycle.startScheduledTask(); try { + if (scheduledTasksLifecycle.isCancelled()) return null; //noinspection BlockingMethodInNonBlockingContext searcherManager.maybeRefresh(); } finally { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 7659320..f82476d 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -5,6 +5,7 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader.InvalidCacheLoadException; import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; +import it.cavallium.dbengine.client.LuceneOptions; import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.database.LLDocument; import it.cavallium.dbengine.database.LLLuceneIndex; @@ -65,9 +66,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { int instancesCount, IndicizerAnalyzers indicizerAnalyzers, IndicizerSimilarities indicizerSimilarities, - Duration queryRefreshDebounceTime, - Duration commitDebounceTime, - boolean lowMemory, boolean inMemory) throws IOException { + LuceneOptions luceneOptions) throws IOException { if (instancesCount <= 1 || instancesCount > 100) { throw new IOException("Unsupported instances count: " + instancesCount); @@ -86,9 +85,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { instanceName, indicizerAnalyzers, indicizerSimilarities, - queryRefreshDebounceTime, - commitDebounceTime, - lowMemory, inMemory, (indexSearcher, field, distributedPre, actionId) -> distributedCustomCollectionStatistics(finalI, + luceneOptions, (indexSearcher, field, distributedPre, actionId) -> distributedCustomCollectionStatistics(finalI, indexSearcher, field, distributedPre, diff --git a/src/main/java/it/cavallium/dbengine/lucene/AlwaysDirectIOFSDirectory.java b/src/main/java/it/cavallium/dbengine/lucene/AlwaysDirectIOFSDirectory.java new file mode 100644 index 0000000..e6bb6ee --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/AlwaysDirectIOFSDirectory.java @@ -0,0 +1,24 @@ +package it.cavallium.dbengine.lucene; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.OptionalLong; +import org.apache.lucene.misc.store.DirectIODirectory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.IOContext; + +public class AlwaysDirectIOFSDirectory extends DirectIODirectory { + + public AlwaysDirectIOFSDirectory(Path path, int mergeBufferSize, long minBytesDirect) throws IOException { + super(FSDirectory.open(path), mergeBufferSize, minBytesDirect); + } + + public AlwaysDirectIOFSDirectory(Path path) throws IOException { + super(FSDirectory.open(path)); + } + + @Override + protected boolean useDirectIO(String name, IOContext context, OptionalLong fileLength) { + return true; + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/DirectNIOFSDirectory.java b/src/main/java/it/cavallium/dbengine/lucene/DirectNIOFSDirectory.java new file mode 100644 index 0000000..89f213c --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/DirectNIOFSDirectory.java @@ -0,0 +1,142 @@ +package it.cavallium.dbengine.lucene; + +import static it.cavallium.dbengine.lucene.LuceneUtils.alignUnsigned; +import static it.cavallium.dbengine.lucene.LuceneUtils.readInternalAligned; + +import com.sun.nio.file.ExtendedOpenOption; +import java.io.Closeable; +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.OpenOption; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import org.apache.lucene.store.BufferedIndexInput; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.FSLockFactory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.LockFactory; +import org.apache.lucene.util.IOUtils; + +@SuppressWarnings({"RedundantArrayCreation", "unused", "unused", "RedundantCast"}) +public class DirectNIOFSDirectory extends FSDirectory { + + private final OpenOption[] openOptions = {StandardOpenOption.READ, ExtendedOpenOption.DIRECT}; + + public DirectNIOFSDirectory(Path path, LockFactory lockFactory) throws IOException { + super(path, lockFactory); + } + + public DirectNIOFSDirectory(Path path) throws IOException { + this(path, FSLockFactory.getDefault()); + } + + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + this.ensureOpen(); + this.ensureCanRead(name); + Path path = this.getDirectory().resolve(name); + FileChannel fc = FileChannel.open(path, openOptions); + boolean success = false; + + DirectNIOFSDirectory.NIOFSIndexInput var7; + try { + DirectNIOFSDirectory.NIOFSIndexInput indexInput = new DirectNIOFSDirectory.NIOFSIndexInput("NIOFSIndexInput(path=\"" + path + "\")", fc, context); + success = true; + var7 = indexInput; + } finally { + if (!success) { + IOUtils.closeWhileHandlingException(new Closeable[]{fc}); + } + + } + + return var7; + } + + static final class NIOFSIndexInput extends BufferedIndexInput { + private static final int CHUNK_SIZE = 16384; + protected final FileChannel channel; + boolean isClone = false; + protected final long off; + protected final long end; + + public NIOFSIndexInput(String resourceDesc, FileChannel fc, IOContext context) throws IOException { + super(resourceDesc, context); + this.channel = fc; + this.off = 0L; + this.end = fc.size(); + } + + public NIOFSIndexInput(String resourceDesc, FileChannel fc, long off, long length, int bufferSize) { + super(resourceDesc, bufferSize); + this.channel = fc; + this.off = off; + this.end = off + length; + this.isClone = true; + } + + public void close() throws IOException { + if (!this.isClone) { + this.channel.close(); + } + + } + + public DirectNIOFSDirectory.NIOFSIndexInput clone() { + DirectNIOFSDirectory.NIOFSIndexInput clone = (DirectNIOFSDirectory.NIOFSIndexInput)super.clone(); + clone.isClone = true; + return clone; + } + + public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + if (offset >= 0L && length >= 0L && offset + length <= this.length()) { + return new DirectNIOFSDirectory.NIOFSIndexInput(this.getFullSliceDescription(sliceDescription), this.channel, this.off + offset, length, this.getBufferSize()); + } else { + throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: offset=" + offset + ",length=" + length + ",fileLength=" + this.length() + ": " + this); + } + } + + public final long length() { + return this.end - this.off; + } + + protected void readInternal(ByteBuffer b) throws IOException { + long pos = this.getFilePointer() + this.off; + if (pos + (long)b.remaining() > this.end) { + throw new EOFException("read past EOF: " + this); + } + + try { + if (pos % 4096 == 0 && b.remaining() % 4096 == 0) { + readInternalAligned(this, this.channel, pos, b, b.remaining(), b.remaining(), end); + } else { + long startOffsetAligned = alignUnsigned(pos, false); + int size = b.remaining(); + long endOffsetAligned = alignUnsigned(pos + size, true); + long expectedTempBufferSize = endOffsetAligned - startOffsetAligned; + if (expectedTempBufferSize > Integer.MAX_VALUE || expectedTempBufferSize < 0) { + throw new IllegalStateException("Invalid temp buffer size: " + expectedTempBufferSize); + } + ByteBuffer alignedBuf = ByteBuffer.allocate((int) expectedTempBufferSize); + int sliceStartOffset = (int) (pos - startOffsetAligned); + int sliceEndOffset = sliceStartOffset + (int) size; + readInternalAligned(this, this.channel, startOffsetAligned, alignedBuf, (int) expectedTempBufferSize, sliceEndOffset, end); + var slice = alignedBuf.slice(sliceStartOffset, sliceEndOffset - sliceStartOffset); + b.put(slice.array(), slice.arrayOffset(), sliceEndOffset - sliceStartOffset); + b.limit(b.position()); + } + } catch (IOException var7) { + throw new IOException(var7.getMessage() + ": " + this, var7); + } + } + + protected void seekInternal(long pos) throws IOException { + if (pos > this.length()) { + throw new EOFException("read past EOF: pos=" + pos + " vs length=" + this.length() + ": " + this); + } + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index 8dc5ecf..ea8d06a 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -24,7 +24,10 @@ import it.cavallium.dbengine.lucene.analyzer.WordAnalyzer; import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.HandleResult; import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.ResultItemConsumer; import it.cavallium.dbengine.lucene.similarity.NGramSimilarity; +import java.io.EOFException; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -291,4 +294,68 @@ public class LuceneUtils { } }; } + + public static int alignUnsigned(int number, boolean expand) { + if (number % 4096 != 0) { + if (expand) { + return number + (4096 - (number % 4096)); + } else { + return number - (number % 4096); + } + } else { + return number; + } + } + + public static long alignUnsigned(long number, boolean expand) { + if (number % 4096L != 0) { + if (expand) { + return number + (4096L - (number % 4096L)); + } else { + return number - (number % 4096L); + } + } else { + return number; + } + } + + public static void readInternalAligned(Object ref, FileChannel channel, long pos, ByteBuffer b, int readLength, int usefulLength, long end) throws IOException { + int startBufPosition = b.position(); + int readData = 0; + int i; + for(; readLength > 0; readLength -= i) { + int toRead = readLength; + b.limit(b.position() + toRead); + + assert b.remaining() == toRead; + + var beforeReadBufPosition = b.position(); + channel.read(b, pos); + b.limit(Math.min(startBufPosition + usefulLength, b.position() + toRead)); + var afterReadBufPosition = b.position(); + i = (afterReadBufPosition - beforeReadBufPosition); + readData += i; + + if (i < toRead && i > 0) { + if (readData < usefulLength) { + throw new EOFException("read past EOF: " + ref + " buffer: " + b + " chunkLen: " + toRead + " end: " + end); + } + if (readData == usefulLength) { + b.limit(b.position()); + // File end reached + return; + } + } + + if (i < 0) { + throw new EOFException("read past EOF: " + ref + " buffer: " + b + " chunkLen: " + toRead + " end: " + end); + } + + assert i > 0 : "FileChannel.read with non zero-length bb.remaining() must always read at least one byte (FileChannel is in blocking mode, see spec of ReadableByteChannel)"; + + pos += (long)i; + } + + assert readLength == 0; + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/ScheduledTaskLifecycle.java b/src/main/java/it/cavallium/dbengine/lucene/ScheduledTaskLifecycle.java index 8433b5b..e2b88f9 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/ScheduledTaskLifecycle.java +++ b/src/main/java/it/cavallium/dbengine/lucene/ScheduledTaskLifecycle.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.lucene; +import java.nio.channels.ClosedChannelException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.StampedLock; import org.warp.commonutils.concurrency.atomicity.Atomic; @@ -9,24 +10,20 @@ import reactor.core.Disposable; public class ScheduledTaskLifecycle { private final StampedLock lock; - private final ConcurrentHashMap tasks = new ConcurrentHashMap<>(); + private volatile boolean cancelled = false; public ScheduledTaskLifecycle() { this.lock = new StampedLock(); } - /** - * Register a scheduled task - */ - public void registerScheduledTask(Disposable task) { - this.tasks.put(task, new Object()); - } - /** * Mark this task as running. * After calling this method, please call {@method endScheduledTask} inside a finally block! */ public void startScheduledTask() { + if (cancelled) { + throw new IllegalStateException("Already closed"); + } this.lock.readLock(); } @@ -41,21 +38,13 @@ public class ScheduledTaskLifecycle { * Cancel all scheduled tasks and wait all running methods to finish */ public void cancelAndWait() { - for (var task : tasks.keySet()) { - task.dispose(); - } - for (var task : tasks.keySet()) { - while (!task.isDisposed()) { - try { - //noinspection BusyWait - Thread.sleep(500); - } catch (InterruptedException ex) { - throw new RuntimeException(ex); - } - } - } + cancelled = true; // Acquire a write lock to wait all tasks to end lock.unlockWrite(lock.writeLock()); } + + public boolean isCancelled() { + return cancelled; + } } diff --git a/src/test/java/it/cavallium/dbengine/DbTestUtils.java b/src/test/java/it/cavallium/dbengine/DbTestUtils.java index 003897d..ed6e205 100644 --- a/src/test/java/it/cavallium/dbengine/DbTestUtils.java +++ b/src/test/java/it/cavallium/dbengine/DbTestUtils.java @@ -14,7 +14,7 @@ import it.cavallium.dbengine.database.collections.DatabaseStageEntry; import it.cavallium.dbengine.database.collections.DatabaseStageMap; import it.cavallium.dbengine.database.collections.SubStageGetterHashMap; import it.cavallium.dbengine.database.collections.SubStageGetterMap; -import it.cavallium.dbengine.database.disk.DatabaseOptions; +import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; diff --git a/src/test/java/it/cavallium/dbengine/OldDatabaseTests.java b/src/test/java/it/cavallium/dbengine/OldDatabaseTests.java index 9b8dcc9..5c3c65e 100644 --- a/src/test/java/it/cavallium/dbengine/OldDatabaseTests.java +++ b/src/test/java/it/cavallium/dbengine/OldDatabaseTests.java @@ -13,7 +13,7 @@ import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.collections.DatabaseMapDictionary; import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; import it.cavallium.dbengine.database.collections.SubStageGetterMap; -import it.cavallium.dbengine.database.disk.DatabaseOptions; +import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; diff --git a/src/test/java/it/cavallium/dbengine/TestAlignedRead.java b/src/test/java/it/cavallium/dbengine/TestAlignedRead.java new file mode 100644 index 0000000..ffd586f --- /dev/null +++ b/src/test/java/it/cavallium/dbengine/TestAlignedRead.java @@ -0,0 +1,22 @@ +package it.cavallium.dbengine; + +import it.cavallium.dbengine.lucene.DirectNIOFSDirectory; +import it.cavallium.dbengine.lucene.LuceneUtils; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestAlignedRead { + @Test + public void testAlignment() { + Assertions.assertEquals(0, LuceneUtils.alignUnsigned(0, true)); + Assertions.assertEquals(0, LuceneUtils.alignUnsigned(0, false)); + Assertions.assertEquals(4096, LuceneUtils.alignUnsigned(1, true)); + Assertions.assertEquals(0, LuceneUtils.alignUnsigned(1, false)); + Assertions.assertEquals(4096, LuceneUtils.alignUnsigned(4096, true)); + Assertions.assertEquals(4096, LuceneUtils.alignUnsigned(4096, false)); + } +}