Advanced Lucene options

Andrea Cavalli 2021-07-01 21:19:52 +02:00
parent b80feabb97
commit 8a1e4028f7
17 changed files with 499 additions and 138 deletions

View File

@ -1,4 +1,4 @@
package it.cavallium.dbengine.database.disk;
package it.cavallium.dbengine.client;
import io.soabase.recordbuilder.core.RecordBuilder;
import it.cavallium.dbengine.database.Column;

View File

@ -0,0 +1,6 @@
package it.cavallium.dbengine.client;
import io.soabase.recordbuilder.core.RecordBuilder;
@RecordBuilder
public record DirectIOOptions(boolean alwaysForceDirectIO, int mergeBufferSize, long minBytesDirect) {}

View File

@ -0,0 +1,17 @@
package it.cavallium.dbengine.client;
import io.soabase.recordbuilder.core.RecordBuilder;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import org.jetbrains.annotations.Nullable;
@RecordBuilder
public record LuceneOptions(Map<String, String> extraFlags,
Duration queryRefreshDebounceTime,
Duration commitDebounceTime,
boolean lowMemory,
boolean inMemory,
Optional<DirectIOOptions> directIOOptions,
boolean allowMemoryMapping,
Optional<NRTCachingOptions> nrtCachingOptions) {}

View File

@ -0,0 +1,6 @@
package it.cavallium.dbengine.client;
import io.soabase.recordbuilder.core.RecordBuilder;
@RecordBuilder
public record NRTCachingOptions(double maxMergeSizeMB, double maxCachedMB) {}
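For orientation, a hedged sketch of how these option records might be assembled; the builder classes (DirectIOOptionsBuilder, LuceneOptionsBuilder) follow the naming convention that @RecordBuilder generates, and all concrete values are illustrative. The NRT cache sizes mirror the 5 MB / 60 MB values previously hardcoded in LLLocalLuceneIndex:
var directIOOptions = DirectIOOptionsBuilder.builder()
		.alwaysForceDirectIO(false)
		.mergeBufferSize(DirectIODirectory.DEFAULT_MERGE_BUFFER_SIZE)
		.minBytesDirect(DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT)
		.build();
var luceneOptions = LuceneOptionsBuilder.builder()
		.extraFlags(Map.of())
		.queryRefreshDebounceTime(Duration.ofSeconds(1))
		.commitDebounceTime(Duration.ofSeconds(10))
		.lowMemory(false)
		.inMemory(false)
		.directIOOptions(Optional.of(directIOOptions))
		.allowMemoryMapping(true)
		.nrtCachingOptions(Optional.of(new NRTCachingOptions(5.0, 60.0)))
		.build();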

View File

@ -1,14 +1,11 @@
package it.cavallium.dbengine.database;
import io.netty.buffer.ByteBufAllocator;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.database.disk.DatabaseOptions;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import java.time.Duration;
import it.cavallium.dbengine.client.LuceneOptions;
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Mono;
@SuppressWarnings("UnusedReturnValue")
@ -26,10 +23,7 @@ public interface LLDatabaseConnection {
int instancesCount,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
Duration queryRefreshDebounceTime,
Duration commitDebounceTime,
boolean lowMemory,
boolean inMemory);
LuceneOptions luceneOptions);
Mono<Void> disconnect();
}

View File

@ -3,18 +3,17 @@ package it.cavallium.dbengine.database.disk;
import io.netty.buffer.ByteBufAllocator;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneOptions;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import it.cavallium.dbengine.netty.JMXNettyMonitoringManager;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@ -70,10 +69,7 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
int instancesCount,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
Duration queryRefreshDebounceTime,
Duration commitDebounceTime,
boolean lowMemory,
boolean inMemory) {
LuceneOptions luceneOptions) {
return Mono
.fromCallable(() -> {
if (instancesCount != 1) {
@ -82,20 +78,14 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
instancesCount,
indicizerAnalyzers,
indicizerSimilarities,
queryRefreshDebounceTime,
commitDebounceTime,
lowMemory,
inMemory
luceneOptions
);
} else {
return new LLLocalLuceneIndex(basePath.resolve("lucene"),
name,
indicizerAnalyzers,
indicizerSimilarities,
queryRefreshDebounceTime,
commitDebounceTime,
lowMemory,
inMemory,
luceneOptions,
null
);
}

View File

@ -5,6 +5,7 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
@ -13,12 +14,10 @@ import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.unimi.dsi.fastutil.bytes.ByteList;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.disk;
import io.netty.buffer.ByteBufAllocator;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.UpdateMode;
@ -183,7 +184,22 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
handle.close();
}
db.closeE();
try {
db.closeE();
} catch (RocksDBException ex) {
if ("Cannot close DB with unreleased snapshot.".equals(ex.getMessage())) {
snapshotsHandles.forEach((id, snapshot) -> {
try {
db.releaseSnapshot(snapshot);
} catch (Exception ex2) {
// Ignore the error, just log it at debug level
logger.debug("Failed to release snapshot " + id, ex2);
}
});
db.closeE();
}
throw ex;
}
}
private void flushDb(RocksDB db, List<ColumnFamilyHandle> handles) throws RocksDBException {

View File

@ -1,7 +1,10 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.client.DirectIOOptions;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneOptions;
import it.cavallium.dbengine.client.NRTCachingOptions;
import it.cavallium.dbengine.client.query.QueryParser;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.database.EnglishItalianStopFilter;
@ -14,10 +17,9 @@ import it.cavallium.dbengine.database.LLSearchResultShard;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.ScheduledTaskLifecycle;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import it.cavallium.dbengine.lucene.searcher.AdaptiveStreamSearcher;
import it.cavallium.dbengine.lucene.searcher.AllowOnlyQueryParsingCollectorStreamSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneSearchInstance;
@ -26,11 +28,7 @@ import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.HandleResult;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
@ -42,14 +40,11 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.misc.store.DirectIODirectory;
@ -67,8 +62,6 @@ import org.apache.lucene.search.similarities.TFIDFSimilarity;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FSLockFactory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.NRTCachingDirectory;
@ -79,7 +72,6 @@ import org.warp.commonutils.log.LoggerFactory;
import org.warp.commonutils.type.ShortNamedThreadFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@ -88,8 +80,6 @@ import reactor.util.function.Tuples;
public class LLLocalLuceneIndex implements LLLuceneIndex {
private static final boolean ALLOW_MMAP = false;
protected static final Logger logger = LoggerFactory.getLogger(LLLocalLuceneIndex.class);
private static final LuceneStreamSearcher streamSearcher = new AdaptiveStreamSearcher();
private static final AllowOnlyQueryParsingCollectorStreamSearcher allowOnlyQueryParsingCollectorStreamSearcher
@ -133,9 +123,8 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
String name,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
Duration queryRefreshDebounceTime,
Duration commitDebounceTime,
boolean lowMemory, boolean inMemory, @Nullable LLSearchCollectionStatisticsGetter distributedCollectionStatisticsGetter) throws IOException {
LuceneOptions luceneOptions,
@Nullable LLSearchCollectionStatisticsGetter distributedCollectionStatisticsGetter) throws IOException {
if (name.length() == 0) {
throw new IOException("Empty lucene database name");
}
@ -145,27 +134,58 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
} else {
logger.debug("Lucene MMap is supported");
}
if (inMemory) {
boolean lowMemory = luceneOptions.lowMemory();
if (luceneOptions.inMemory()) {
this.directory = new ByteBuffersDirectory();
} else {
Directory directory;
{
FSDirectory fsDirectory = ALLOW_MMAP ? FSDirectory.open(directoryPath) : new NIOFSDirectory(directoryPath);
if (Constants.LINUX || Constants.MAC_OS_X) {
if (!lowMemory) {
directory = new DirectIODirectory(fsDirectory, 5 * 1024 * 1024, 60 * 1024 * 1024);
} else {
directory = new DirectIODirectory(fsDirectory);
}
Directory forcedDirectFsDirectory = null;
if (luceneOptions.directIOOptions().isPresent()) {
DirectIOOptions directIOOptions = luceneOptions.directIOOptions().get();
if (directIOOptions.alwaysForceDirectIO()) {
try {
forcedDirectFsDirectory = new AlwaysDirectIOFSDirectory(directoryPath);
} catch (UnsupportedOperationException ex) {
logger.warn("Failed to open FSDirectory with DIRECT flag", ex);
}
}
}
if (forcedDirectFsDirectory != null) {
directory = forcedDirectFsDirectory;
} else {
directory = fsDirectory;
FSDirectory fsDirectory;
if (luceneOptions.allowMemoryMapping()) {
fsDirectory = FSDirectory.open(directoryPath);
} else {
fsDirectory = new NIOFSDirectory(directoryPath);
}
if (Constants.LINUX || Constants.MAC_OS_X) {
try {
int mergeBufferSize;
long minBytesDirect;
if (luceneOptions.directIOOptions().isPresent()) {
var directIOOptions = luceneOptions.directIOOptions().get();
mergeBufferSize = directIOOptions.mergeBufferSize();
minBytesDirect = directIOOptions.minBytesDirect();
} else {
mergeBufferSize = DirectIODirectory.DEFAULT_MERGE_BUFFER_SIZE;
minBytesDirect = DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT;
}
directory = new DirectIODirectory(fsDirectory, mergeBufferSize, minBytesDirect);
} catch (UnsupportedOperationException ex) {
logger.warn("Failed to open FSDirectory with DIRECT flag", ex);
directory = fsDirectory;
}
} else {
directory = fsDirectory;
}
}
}
if (!lowMemory) {
directory = new NRTCachingDirectory(directory, 5.0, 60.0);
} else {
directory = new NRTCachingDirectory(directory, 1.0, 6.0);
if (luceneOptions.nrtCachingOptions().isPresent()) {
NRTCachingOptions nrtCachingOptions = luceneOptions.nrtCachingOptions().get();
directory = new NRTCachingDirectory(directory, nrtCachingOptions.maxMergeSizeMB(), nrtCachingOptions.maxCachedMB());
}
this.directory = directory;
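Taken together, the directory selection above reduces to the following hedged sketch (the helper name chooseDirectory is illustrative and not part of this commit):
static Directory chooseDirectory(LuceneOptions opts, Path path) throws IOException {
	if (opts.inMemory()) {
		return new ByteBuffersDirectory();
	}
	Directory dir = null;
	// 1. If requested, force Direct I/O for every file, falling back on failure
	if (opts.directIOOptions().map(DirectIOOptions::alwaysForceDirectIO).orElse(false)) {
		try {
			dir = new AlwaysDirectIOFSDirectory(path);
		} catch (UnsupportedOperationException ex) {
			// fall through to the regular FSDirectory path
		}
	}
	if (dir == null) {
		// 2. Memory-map if allowed, otherwise plain NIO
		FSDirectory fsDir = opts.allowMemoryMapping() ? FSDirectory.open(path) : new NIOFSDirectory(path);
		dir = fsDir;
		// 3. On Linux/macOS, use Direct I/O for merges with configured or default thresholds
		if (Constants.LINUX || Constants.MAC_OS_X) {
			try {
				int mergeBufferSize = opts.directIOOptions().map(DirectIOOptions::mergeBufferSize)
						.orElse(DirectIODirectory.DEFAULT_MERGE_BUFFER_SIZE);
				long minBytesDirect = opts.directIOOptions().map(DirectIOOptions::minBytesDirect)
						.orElse(DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT);
				dir = new DirectIODirectory(fsDir, mergeBufferSize, minBytesDirect);
			} catch (UnsupportedOperationException ex) {
				dir = fsDir; // Direct I/O unavailable; keep the plain FSDirectory
			}
		}
	}
	// 4. Optionally wrap in an NRT cache
	if (opts.nrtCachingOptions().isPresent()) {
		var nrt = opts.nrtCachingOptions().get();
		dir = new NRTCachingDirectory(dir, nrt.maxMergeSizeMB(), nrt.maxCachedMB());
	}
	return dir;
}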
@ -201,8 +221,8 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
this.scheduledTasksLifecycle = new ScheduledTaskLifecycle();
// Start scheduled tasks
registerScheduledFixedTask(this::scheduledCommit, commitDebounceTime);
registerScheduledFixedTask(this::scheduledQueryRefresh, queryRefreshDebounceTime);
registerScheduledFixedTask(this::scheduledCommit, luceneOptions.commitDebounceTime());
registerScheduledFixedTask(this::scheduledQueryRefresh, luceneOptions.queryRefreshDebounceTime());
}
private Similarity getSimilarity() {
@ -210,14 +230,43 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
private void registerScheduledFixedTask(Runnable task, Duration duration) {
scheduledTasksLifecycle.registerScheduledTask(luceneHeavyTasksScheduler.schedulePeriodically(() -> {
new PeriodicTask(task, duration).start();
}
private class PeriodicTask implements Runnable {
private final Runnable task;
private final Duration duration;
private volatile boolean cancelled = false;
public PeriodicTask(Runnable task, Duration duration) {
this.task = task;
this.duration = duration;
}
public void start() {
luceneHeavyTasksScheduler.schedule(this,
duration.toMillis(),
TimeUnit.MILLISECONDS
);
}
@Override
public void run() {
scheduledTasksLifecycle.startScheduledTask();
try {
if (scheduledTasksLifecycle.isCancelled() || cancelled) return;
task.run();
if (scheduledTasksLifecycle.isCancelled() || cancelled) return;
luceneHeavyTasksScheduler.schedule(this, duration.toMillis(), TimeUnit.MILLISECONDS);
} finally {
scheduledTasksLifecycle.endScheduledTask();
}
}, duration.toMillis(), duration.toMillis(), TimeUnit.MILLISECONDS));
}
public void cancel() {
cancelled = true;
}
}
@Override
@ -250,10 +299,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.defer(() -> {
if (ex instanceof IllegalStateException && "No index commit to snapshot".equals(ex.getMessage())) {
return Mono.fromCallable(() -> {
//noinspection BlockingMethodInNonBlockingContext
indexWriter.commit();
//noinspection BlockingMethodInNonBlockingContext
return snapshotter.snapshot();
scheduledTasksLifecycle.startScheduledTask();
try {
//noinspection BlockingMethodInNonBlockingContext
indexWriter.commit();
//noinspection BlockingMethodInNonBlockingContext
return snapshotter.snapshot();
} finally {
scheduledTasksLifecycle.endScheduledTask();
}
}).subscribeOn(luceneHeavyTasksScheduler);
} else {
return Mono.error(ex);
@ -265,26 +319,36 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
return Mono.<Void>fromCallable(() -> {
var indexSnapshot = this.snapshots.remove(snapshot.getSequenceNumber());
if (indexSnapshot == null) {
throw new IOException("LLSnapshot " + snapshot.getSequenceNumber() + " not found!");
scheduledTasksLifecycle.startScheduledTask();
try {
var indexSnapshot = this.snapshots.remove(snapshot.getSequenceNumber());
if (indexSnapshot == null) {
throw new IOException("LLSnapshot " + snapshot.getSequenceNumber() + " not found!");
}
indexSnapshot.close();
var luceneIndexSnapshot = indexSnapshot.getSnapshot();
snapshotter.release(luceneIndexSnapshot);
// Delete unused files after releasing the snapshot
indexWriter.deleteUnusedFiles();
return null;
} finally {
scheduledTasksLifecycle.endScheduledTask();
}
indexSnapshot.close();
var luceneIndexSnapshot = indexSnapshot.getSnapshot();
snapshotter.release(luceneIndexSnapshot);
// Delete unused files after releasing the snapshot
indexWriter.deleteUnusedFiles();
return null;
}).subscribeOn(Schedulers.boundedElastic());
}
@Override
public Mono<Void> addDocument(LLTerm key, LLDocument doc) {
return Mono.<Void>fromCallable(() -> {
indexWriter.addDocument(LLUtils.toDocument(doc));
return null;
scheduledTasksLifecycle.startScheduledTask();
try {
indexWriter.addDocument(LLUtils.toDocument(doc));
return null;
} finally {
scheduledTasksLifecycle.endScheduledTask();
}
}).subscribeOn(Schedulers.boundedElastic());
}
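Every mutating operation in this file now wraps its blocking Lucene call in the same lifecycle guard, so cancelAndWait() can hold shutdown until in-flight work drains. Condensed from the methods above:
scheduledTasksLifecycle.startScheduledTask(); // throws IllegalStateException once the lifecycle is cancelled
try {
	// blocking Lucene work, e.g. indexWriter.addDocument(...)
} finally {
	scheduledTasksLifecycle.endScheduledTask(); // releases the guard so cancelAndWait() can complete
}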
@ -294,8 +358,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.collectList()
.flatMap(documentsList -> Mono
.<Void>fromCallable(() -> {
indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList));
return null;
scheduledTasksLifecycle.startScheduledTask();
try {
indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList));
return null;
} finally {
scheduledTasksLifecycle.endScheduledTask();
}
})
.subscribeOn(Schedulers.boundedElastic())
);
@ -305,15 +374,25 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override
public Mono<Void> deleteDocument(LLTerm id) {
return Mono.<Void>fromCallable(() -> {
indexWriter.deleteDocuments(LLUtils.toTerm(id));
return null;
scheduledTasksLifecycle.startScheduledTask();
try {
indexWriter.deleteDocuments(LLUtils.toTerm(id));
return null;
} finally {
scheduledTasksLifecycle.endScheduledTask();
}
}).subscribeOn(Schedulers.boundedElastic());
}
@Override
public Mono<Void> updateDocument(LLTerm id, LLDocument document) {
return Mono.<Void>fromCallable(() -> {
indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document));
scheduledTasksLifecycle.startScheduledTask();
try {
indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document));
} finally {
scheduledTasksLifecycle.endScheduledTask();
}
return null;
}).subscribeOn(Schedulers.boundedElastic());
}
@ -326,12 +405,17 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private Mono<Void> updateDocuments(Map<LLTerm, LLDocument> documentsMap) {
return Mono
.<Void>fromCallable(() -> {
for (Entry<LLTerm, LLDocument> entry : documentsMap.entrySet()) {
LLTerm key = entry.getKey();
LLDocument value = entry.getValue();
indexWriter.updateDocument(LLUtils.toTerm(key), LLUtils.toDocument(value));
scheduledTasksLifecycle.startScheduledTask();
try {
for (Entry<LLTerm, LLDocument> entry : documentsMap.entrySet()) {
LLTerm key = entry.getKey();
LLDocument value = entry.getValue();
indexWriter.updateDocument(LLUtils.toTerm(key), LLUtils.toDocument(value));
}
return null;
} finally {
scheduledTasksLifecycle.endScheduledTask();
}
return null;
})
.subscribeOn(Schedulers.boundedElastic());
}
@ -339,13 +423,18 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override
public Mono<Void> deleteAll() {
return Mono.<Void>fromCallable(() -> {
//noinspection BlockingMethodInNonBlockingContext
indexWriter.deleteAll();
//noinspection BlockingMethodInNonBlockingContext
indexWriter.forceMergeDeletes(true);
//noinspection BlockingMethodInNonBlockingContext
indexWriter.commit();
return null;
scheduledTasksLifecycle.startScheduledTask();
try {
//noinspection BlockingMethodInNonBlockingContext
indexWriter.deleteAll();
//noinspection BlockingMethodInNonBlockingContext
indexWriter.forceMergeDeletes(true);
//noinspection BlockingMethodInNonBlockingContext
indexWriter.commit();
return null;
} finally {
scheduledTasksLifecycle.endScheduledTask();
}
}).subscribeOn(luceneHeavyTasksScheduler);
}
@ -431,7 +520,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
int scoreDivisor) {
Query luceneAdditionalQuery;
try {
luceneAdditionalQuery = QueryParser.toQuery(queryParams.getQuery());
luceneAdditionalQuery = QueryParser.toQuery(queryParams.query());
} catch (Exception e) {
return Mono.error(e);
}
@ -479,14 +568,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.subscribeOn(Schedulers.boundedElastic())
.map(luceneQuery -> luceneSearch(doDistributedPre,
indexSearcher,
queryParams.getOffset(),
queryParams.getLimit(),
queryParams.getMinCompetitiveScore().getNullable(),
queryParams.offset(),
queryParams.limit(),
queryParams.minCompetitiveScore().getNullable(),
keyFieldName,
scoreDivisor,
luceneQuery,
QueryParser.toSort(queryParams.getSort()),
QueryParser.toScoreMode(queryParams.getScoreMode()),
QueryParser.toSort(queryParams.sort()),
QueryParser.toScoreMode(queryParams.scoreMode()),
releaseSearcherWrapper(snapshot, indexSearcher)
))
.onErrorResume(ex -> releaseSearcherWrapper(snapshot, indexSearcher).then(Mono.error(ex)))
@ -520,6 +609,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.then();
}
@SuppressWarnings("RedundantTypeArguments")
private Mono<LLSearchResult> search(@Nullable LLSnapshot snapshot,
QueryParams queryParams, String keyFieldName,
boolean doDistributedPre, long actionId, int scoreDivisor) {
@ -527,10 +617,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.acquireSearcherWrapper(snapshot, doDistributedPre, actionId)
.flatMap(indexSearcher -> Mono
.fromCallable(() -> {
Objects.requireNonNull(queryParams.getScoreMode(), "ScoreMode must not be null");
Query luceneQuery = QueryParser.toQuery(queryParams.getQuery());
Sort luceneSort = QueryParser.toSort(queryParams.getSort());
org.apache.lucene.search.ScoreMode luceneScoreMode = QueryParser.toScoreMode(queryParams.getScoreMode());
Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null");
Query luceneQuery = QueryParser.toQuery(queryParams.query());
Sort luceneSort = QueryParser.toSort(queryParams.sort());
org.apache.lucene.search.ScoreMode luceneScoreMode = QueryParser.toScoreMode(queryParams.scoreMode());
return Tuples.of(luceneQuery, Optional.ofNullable(luceneSort), luceneScoreMode);
})
.subscribeOn(Schedulers.boundedElastic())
@ -542,9 +632,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return luceneSearch(doDistributedPre,
indexSearcher,
queryParams.getOffset(),
queryParams.getLimit(),
queryParams.getMinCompetitiveScore().getNullable(),
queryParams.offset(),
queryParams.limit(),
queryParams.minCompetitiveScore().getNullable(),
keyFieldName,
scoreDivisor,
luceneQuery,
@ -678,6 +768,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.<Void>fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
try {
if (scheduledTasksLifecycle.isCancelled()) return null;
//noinspection BlockingMethodInNonBlockingContext
indexWriter.commit();
} finally {
@ -694,6 +785,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.<Void>fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
try {
if (scheduledTasksLifecycle.isCancelled()) return null;
//noinspection BlockingMethodInNonBlockingContext
searcherManager.maybeRefresh();
} finally {

View File

@ -5,6 +5,7 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader.InvalidCacheLoadException;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneOptions;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.database.LLDocument;
import it.cavallium.dbengine.database.LLLuceneIndex;
@ -65,9 +66,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
int instancesCount,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
Duration queryRefreshDebounceTime,
Duration commitDebounceTime,
boolean lowMemory, boolean inMemory) throws IOException {
LuceneOptions luceneOptions) throws IOException {
if (instancesCount <= 1 || instancesCount > 100) {
throw new IOException("Unsupported instances count: " + instancesCount);
@ -86,9 +85,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
instanceName,
indicizerAnalyzers,
indicizerSimilarities,
queryRefreshDebounceTime,
commitDebounceTime,
lowMemory, inMemory, (indexSearcher, field, distributedPre, actionId) -> distributedCustomCollectionStatistics(finalI,
luceneOptions, (indexSearcher, field, distributedPre, actionId) -> distributedCustomCollectionStatistics(finalI,
indexSearcher,
field,
distributedPre,

View File

@ -0,0 +1,24 @@
package it.cavallium.dbengine.lucene;
import java.io.IOException;
import java.nio.file.Path;
import java.util.OptionalLong;
import org.apache.lucene.misc.store.DirectIODirectory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
public class AlwaysDirectIOFSDirectory extends DirectIODirectory {
public AlwaysDirectIOFSDirectory(Path path, int mergeBufferSize, long minBytesDirect) throws IOException {
super(FSDirectory.open(path), mergeBufferSize, minBytesDirect);
}
public AlwaysDirectIOFSDirectory(Path path) throws IOException {
super(FSDirectory.open(path));
}
@Override
protected boolean useDirectIO(String name, IOContext context, OptionalLong fileLength) {
return true;
}
}
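Since useDirectIO unconditionally returns true, every index file is opened with the DIRECT flag, not only large merges. A hedged usage sketch (the path is hypothetical); construction is guarded the same way LLLocalLuceneIndex does above, because Direct I/O may be unsupported on the current platform:
Directory directory;
try {
	directory = new AlwaysDirectIOFSDirectory(Path.of("/data/lucene-index"));
} catch (UnsupportedOperationException ex) {
	directory = FSDirectory.open(Path.of("/data/lucene-index")); // fall back to the default FSDirectory
}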

View File

@ -0,0 +1,142 @@
package it.cavallium.dbengine.lucene;
import static it.cavallium.dbengine.lucene.LuceneUtils.alignUnsigned;
import static it.cavallium.dbengine.lucene.LuceneUtils.readInternalAligned;
import com.sun.nio.file.ExtendedOpenOption;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import org.apache.lucene.store.BufferedIndexInput;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FSLockFactory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.util.IOUtils;
@SuppressWarnings({"RedundantArrayCreation", "unused", "unused", "RedundantCast"})
public class DirectNIOFSDirectory extends FSDirectory {
private final OpenOption[] openOptions = {StandardOpenOption.READ, ExtendedOpenOption.DIRECT};
public DirectNIOFSDirectory(Path path, LockFactory lockFactory) throws IOException {
super(path, lockFactory);
}
public DirectNIOFSDirectory(Path path) throws IOException {
this(path, FSLockFactory.getDefault());
}
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
this.ensureOpen();
this.ensureCanRead(name);
Path path = this.getDirectory().resolve(name);
FileChannel fc = FileChannel.open(path, openOptions);
boolean success = false;
try {
	IndexInput indexInput = new NIOFSIndexInput("NIOFSIndexInput(path=\"" + path + "\")", fc, context);
	success = true;
	return indexInput;
} finally {
	if (!success) {
		IOUtils.closeWhileHandlingException(fc);
	}
}
}
static final class NIOFSIndexInput extends BufferedIndexInput {
private static final int CHUNK_SIZE = 16384;
protected final FileChannel channel;
boolean isClone = false;
protected final long off;
protected final long end;
public NIOFSIndexInput(String resourceDesc, FileChannel fc, IOContext context) throws IOException {
super(resourceDesc, context);
this.channel = fc;
this.off = 0L;
this.end = fc.size();
}
public NIOFSIndexInput(String resourceDesc, FileChannel fc, long off, long length, int bufferSize) {
super(resourceDesc, bufferSize);
this.channel = fc;
this.off = off;
this.end = off + length;
this.isClone = true;
}
public void close() throws IOException {
if (!this.isClone) {
this.channel.close();
}
}
public DirectNIOFSDirectory.NIOFSIndexInput clone() {
DirectNIOFSDirectory.NIOFSIndexInput clone = (DirectNIOFSDirectory.NIOFSIndexInput)super.clone();
clone.isClone = true;
return clone;
}
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
if (offset >= 0L && length >= 0L && offset + length <= this.length()) {
return new DirectNIOFSDirectory.NIOFSIndexInput(this.getFullSliceDescription(sliceDescription), this.channel, this.off + offset, length, this.getBufferSize());
} else {
throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: offset=" + offset + ",length=" + length + ",fileLength=" + this.length() + ": " + this);
}
}
public final long length() {
return this.end - this.off;
}
protected void readInternal(ByteBuffer b) throws IOException {
long pos = this.getFilePointer() + this.off;
if (pos + (long)b.remaining() > this.end) {
throw new EOFException("read past EOF: " + this);
}
try {
if (pos % 4096 == 0 && b.remaining() % 4096 == 0) {
readInternalAligned(this, this.channel, pos, b, b.remaining(), b.remaining(), end);
} else {
long startOffsetAligned = alignUnsigned(pos, false);
int size = b.remaining();
long endOffsetAligned = alignUnsigned(pos + size, true);
long expectedTempBufferSize = endOffsetAligned - startOffsetAligned;
if (expectedTempBufferSize > Integer.MAX_VALUE || expectedTempBufferSize < 0) {
throw new IllegalStateException("Invalid temp buffer size: " + expectedTempBufferSize);
}
ByteBuffer alignedBuf = ByteBuffer.allocate((int) expectedTempBufferSize);
int sliceStartOffset = (int) (pos - startOffsetAligned);
int sliceEndOffset = sliceStartOffset + size;
readInternalAligned(this, this.channel, startOffsetAligned, alignedBuf, (int) expectedTempBufferSize, sliceEndOffset, end);
var slice = alignedBuf.slice(sliceStartOffset, sliceEndOffset - sliceStartOffset);
b.put(slice.array(), slice.arrayOffset(), sliceEndOffset - sliceStartOffset);
b.limit(b.position());
}
} catch (IOException ex) {
throw new IOException(ex.getMessage() + ": " + this, ex);
}
}
protected void seekInternal(long pos) throws IOException {
if (pos > this.length()) {
throw new EOFException("read past EOF: pos=" + pos + " vs length=" + this.length() + ": " + this);
}
}
}
}
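A hedged usage sketch for this class (path and file name are hypothetical): files are opened with ExtendedOpenOption.DIRECT, and readInternal transparently widens unaligned reads to 4096-byte boundaries:
try (var dir = new DirectNIOFSDirectory(Path.of("/data/lucene-index"));
		var input = dir.openInput("_0.cfe", IOContext.READONCE)) {
	byte[] header = new byte[100];
	input.readBytes(header, 0, header.length); // unaligned length; the aligned-read path pads it
}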

View File

@ -24,7 +24,10 @@ import it.cavallium.dbengine.lucene.analyzer.WordAnalyzer;
import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.HandleResult;
import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.ResultItemConsumer;
import it.cavallium.dbengine.lucene.similarity.NGramSimilarity;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@ -291,4 +294,68 @@ public class LuceneUtils {
}
};
}
public static int alignUnsigned(int number, boolean expand) {
if (number % 4096 != 0) {
if (expand) {
return number + (4096 - (number % 4096));
} else {
return number - (number % 4096);
}
} else {
return number;
}
}
public static long alignUnsigned(long number, boolean expand) {
if (number % 4096L != 0) {
if (expand) {
return number + (4096L - (number % 4096L));
} else {
return number - (number % 4096L);
}
} else {
return number;
}
}
public static void readInternalAligned(Object ref, FileChannel channel, long pos, ByteBuffer b, int readLength, int usefulLength, long end) throws IOException {
int startBufPosition = b.position();
int readData = 0;
int i;
for(; readLength > 0; readLength -= i) {
int toRead = readLength;
b.limit(b.position() + toRead);
assert b.remaining() == toRead;
var beforeReadBufPosition = b.position();
channel.read(b, pos);
b.limit(Math.min(startBufPosition + usefulLength, b.position() + toRead));
var afterReadBufPosition = b.position();
i = (afterReadBufPosition - beforeReadBufPosition);
readData += i;
if (i < toRead && i > 0) {
if (readData < usefulLength) {
throw new EOFException("read past EOF: " + ref + " buffer: " + b + " chunkLen: " + toRead + " end: " + end);
}
if (readData == usefulLength) {
b.limit(b.position());
// File end reached
return;
}
}
if (i < 0) {
throw new EOFException("read past EOF: " + ref + " buffer: " + b + " chunkLen: " + toRead + " end: " + end);
}
assert i > 0 : "FileChannel.read with non zero-length bb.remaining() must always read at least one byte (FileChannel is in blocking mode, see spec of ReadableByteChannel)";
pos += (long)i;
}
assert readLength == 0;
}
}
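To make the alignment math concrete, a worked example using the 4096-byte block size above (values are illustrative):
long pos = 5000;                                         // requested read position
int size = 100;                                          // bytes requested
long start = LuceneUtils.alignUnsigned(pos, false);      // 4096
long end = LuceneUtils.alignUnsigned(pos + size, true);  // 8192
int sliceStart = (int) (pos - start);                    // 904
int sliceEnd = sliceStart + size;                        // 1004
// The channel reads the aligned range [4096, 8192); only bytes [904, 1004) reach the caller.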

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine.lucene;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.StampedLock;
import org.warp.commonutils.concurrency.atomicity.Atomic;
@ -9,24 +10,20 @@ import reactor.core.Disposable;
public class ScheduledTaskLifecycle {
private final StampedLock lock;
private final ConcurrentHashMap<Disposable, Object> tasks = new ConcurrentHashMap<>();
private volatile boolean cancelled = false;
public ScheduledTaskLifecycle() {
this.lock = new StampedLock();
}
/**
* Register a scheduled task
*/
public void registerScheduledTask(Disposable task) {
this.tasks.put(task, new Object());
}
/**
* Mark this task as running.
* After calling this method, call {@link #endScheduledTask()} inside a finally block!
*/
public void startScheduledTask() {
if (cancelled) {
throw new IllegalStateException("Already closed");
}
this.lock.readLock();
}
@ -41,21 +38,13 @@ public class ScheduledTaskLifecycle {
* Cancel all scheduled tasks and wait for all running tasks to finish
*/
public void cancelAndWait() {
for (var task : tasks.keySet()) {
task.dispose();
}
for (var task : tasks.keySet()) {
while (!task.isDisposed()) {
try {
//noinspection BusyWait
Thread.sleep(500);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
cancelled = true;
// Acquire a write lock to wait for all tasks to end
lock.unlockWrite(lock.writeLock());
}
public boolean isCancelled() {
return cancelled;
}
}
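The rewritten cancelAndWait() leans on StampedLock semantics: every running task holds a read lock, so after setting the cancelled flag it only needs to acquire (and immediately release) the write lock, which is not granted until all read locks are gone. A minimal standalone sketch of the idea (names are illustrative):
StampedLock lock = new StampedLock();
// Task side (startScheduledTask / endScheduledTask):
long stamp = lock.readLock();
try {
	// task body
} finally {
	lock.unlockRead(stamp);
}
// Shutdown side (cancelAndWait): blocks until every read lock is released
lock.unlockWrite(lock.writeLock());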

View File

@ -14,7 +14,7 @@ import it.cavallium.dbengine.database.collections.DatabaseStageEntry;
import it.cavallium.dbengine.database.collections.DatabaseStageMap;
import it.cavallium.dbengine.database.collections.SubStageGetterHashMap;
import it.cavallium.dbengine.database.collections.SubStageGetterMap;
import it.cavallium.dbengine.database.disk.DatabaseOptions;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;

View File

@ -13,7 +13,7 @@ import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
import it.cavallium.dbengine.database.collections.SubStageGetterMap;
import it.cavallium.dbengine.database.disk.DatabaseOptions;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;

View File

@ -0,0 +1,22 @@
package it.cavallium.dbengine;
import it.cavallium.dbengine.lucene.DirectNIOFSDirectory;
import it.cavallium.dbengine.lucene.LuceneUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class TestAlignedRead {
@Test
public void testAlignment() {
Assertions.assertEquals(0, LuceneUtils.alignUnsigned(0, true));
Assertions.assertEquals(0, LuceneUtils.alignUnsigned(0, false));
Assertions.assertEquals(4096, LuceneUtils.alignUnsigned(1, true));
Assertions.assertEquals(0, LuceneUtils.alignUnsigned(1, false));
Assertions.assertEquals(4096, LuceneUtils.alignUnsigned(4096, true));
Assertions.assertEquals(4096, LuceneUtils.alignUnsigned(4096, false));
}
}