diff --git a/pom.xml b/pom.xml
index 96812a9..c01dcc5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -521,7 +521,6 @@
io.soabase.recordbuilder.processor.RecordBuilderProcessor
- false
--enable-preview
17
17
diff --git a/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java
index f33407e..6fb29c1 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java
@@ -24,7 +24,8 @@ public interface LLDatabaseConnection {
List columns,
DatabaseOptions databaseOptions);
- Mono extends LLLuceneIndex> getLuceneIndex(String name,
+ Mono extends LLLuceneIndex> getLuceneIndex(@Nullable String clusterName,
+ @Nullable String shardName,
int instancesCount,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java
index 647579e..a7d7541 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java
@@ -2,24 +2,22 @@ package it.cavallium.dbengine.database.disk;
import io.micrometer.core.instrument.MeterRegistry;
import io.net5.buffer.api.BufferAllocator;
+import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneOptions;
import it.cavallium.dbengine.database.Column;
-import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.netty.JMXNettyMonitoringManager;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import org.checkerframework.checker.units.qual.A;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -88,7 +86,8 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
}
@Override
- public Mono getLuceneIndex(String name,
+ public Mono getLuceneIndex(@Nullable String clusterName,
+ @Nullable String shardName,
int instancesCount,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
@@ -97,12 +96,18 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
return Mono
.fromCallable(() -> {
var env = this.env.get();
+ if (clusterName == null && shardName == null) {
+ throw new IllegalArgumentException("Shard name and/or cluster name must be set");
+ }
if (instancesCount != 1) {
+ if (shardName != null && !shardName.equals(clusterName)) {
+ throw new IllegalArgumentException("You shouldn't use a shard name for clustered instances");
+ }
Objects.requireNonNull(env, "Environment not set");
return new LLLocalMultiLuceneIndex(env,
luceneOptions.inMemory() ? null : basePath.resolve("lucene"),
meterRegistry,
- name,
+ clusterName,
instancesCount,
indicizerAnalyzers,
indicizerSimilarities,
@@ -112,7 +117,8 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
} else {
return new LLLocalLuceneIndex(env, luceneOptions.inMemory() ? null : basePath.resolve("lucene"),
meterRegistry,
- name,
+ clusterName,
+ shardName,
indicizerAnalyzers,
indicizerSimilarities,
luceneOptions,
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
index 37e30e6..090d725 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
@@ -8,6 +8,8 @@ import static it.cavallium.dbengine.database.LLUtils.toStringSafe;
import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNullElse;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.Timer;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Resource;
@@ -40,6 +42,7 @@ import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.commons.lang3.tuple.Pair;
@@ -131,6 +134,22 @@ public class LLLocalDictionary implements LLDictionary {
private final boolean nettyDirect;
private final BufferAllocator alloc;
+ private final Counter startedUpdates;
+ private final Counter endedUpdates;
+ private final Timer updateTime;
+ private final Counter startedGet;
+ private final Counter endedGet;
+ private final Timer getTime;
+ private final Counter startedContains;
+ private final Counter endedContains;
+ private final Timer containsTime;
+ private final Counter startedPut;
+ private final Counter endedPut;
+ private final Timer putTime;
+ private final Counter startedRemove;
+ private final Counter endedRemove;
+ private final Timer removeTime;
+
public LLLocalDictionary(
BufferAllocator allocator,
@NotNull RocksDBColumn db,
@@ -151,6 +170,48 @@ public class LLLocalDictionary implements LLDictionary {
this.databaseOptions = databaseOptions;
alloc = allocator;
this.nettyDirect = databaseOptions.allowNettyDirect() && alloc.getAllocationType() == OFF_HEAP;
+ var meterRegistry = db.getMeterRegistry();
+
+ this.startedGet = meterRegistry.counter("db.read.map.get.started.counter", "db.name", databaseName, "db.column", columnName);
+ this.endedGet = meterRegistry.counter("db.read.map.get.ended.counter", "db.name", databaseName, "db.column", columnName);
+ this.getTime = Timer
+ .builder("db.read.get.timer")
+ .publishPercentiles(0.2, 0.5, 0.95)
+ .publishPercentileHistogram()
+ .tags("db.name", databaseName, "db.column", columnName)
+ .register(meterRegistry);
+ this.startedContains = meterRegistry.counter("db.read.map.get.started.counter", "db.name", databaseName, "db.column", columnName);
+ this.endedContains = meterRegistry.counter("db.read.map.get.ended.counter", "db.name", databaseName, "db.column", columnName);
+ this.containsTime = Timer
+ .builder("db.read.get.timer")
+ .publishPercentiles(0.2, 0.5, 0.95)
+ .publishPercentileHistogram()
+ .tags("db.name", databaseName, "db.column", columnName)
+ .register(meterRegistry);
+ this.startedUpdates = meterRegistry.counter("db.write.map.update.started.counter", "db.name", databaseName, "db.column", columnName);
+ this.endedUpdates = meterRegistry.counter("db.write.map.update.ended.counter", "db.name", databaseName, "db.column", columnName);
+ this.updateTime = Timer
+ .builder("db.write.map.update.timer")
+ .publishPercentiles(0.2, 0.5, 0.95)
+ .publishPercentileHistogram()
+ .tags("db.name", databaseName, "db.column", columnName)
+ .register(meterRegistry);
+ this.startedPut = meterRegistry.counter("db.write.map.put.started.counter", "db.name", databaseName, "db.column", columnName);
+ this.endedPut = meterRegistry.counter("db.write.map.put.ended.counter", "db.name", databaseName, "db.column", columnName);
+ this.putTime = Timer
+ .builder("db.write.map.put.timer")
+ .publishPercentiles(0.2, 0.5, 0.95)
+ .publishPercentileHistogram()
+ .tags("db.name", databaseName, "db.column", columnName)
+ .register(meterRegistry);
+ this.startedRemove = meterRegistry.counter("db.write.map.remove.started.counter", "db.name", databaseName, "db.column", columnName);
+ this.endedRemove = meterRegistry.counter("db.write.map.remove.ended.counter", "db.name", databaseName, "db.column", columnName);
+ this.removeTime = Timer
+ .builder("db.write.map.remove.timer")
+ .publishPercentiles(0.2, 0.5, 0.95)
+ .publishPercentileHistogram()
+ .tags("db.name", databaseName, "db.column", columnName)
+ .register(meterRegistry);
}
@Override
@@ -205,7 +266,13 @@ public class LLLocalDictionary implements LLDictionary {
try (var key = keySend.receive()) {
try {
var readOptions = requireNonNullElse(resolveSnapshot(snapshot), EMPTY_READ_OPTIONS);
- var result = db.get(readOptions, key, existsAlmostCertainly);
+ Buffer result;
+ startedGet.increment();
+ try {
+ result = getTime.recordCallable(() -> db.get(readOptions, key, existsAlmostCertainly));
+ } finally {
+ endedGet.increment();
+ }
logger.trace(MARKER_ROCKSDB, "Read {}: {}", () -> toStringSafe(key), () -> toStringSafe(result));
if (result != null) {
sink.next(result.send());
@@ -236,52 +303,65 @@ public class LLLocalDictionary implements LLDictionary {
public boolean containsRange(@Nullable LLSnapshot snapshot, LLRange range) throws RocksDBException {
assert !Schedulers.isInNonBlockingThread() : "Called containsRange in a nonblocking thread";
- if (range.isSingle()) {
- var unmodifiableReadOpts = resolveSnapshot(snapshot);
- return db.exists(unmodifiableReadOpts, range.getSingleUnsafe());
- } else {
- // Temporary resources to release after finished
- AbstractSlice> slice1 = null;
- AbstractSlice> slice2 = null;
- try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
- readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
- readOpts.setFillCache(false);
- if (range.hasMin()) {
- var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe());
- if (nettyDirect && rangeMinInternalByteBuffer != null) {
- readOpts.setIterateLowerBound(slice1 = new DirectSlice(rangeMinInternalByteBuffer,
- range.getMinUnsafe().readableBytes()));
- } else {
- readOpts.setIterateLowerBound(slice1 = new Slice(LLUtils.toArray(range.getMinUnsafe())));
- }
- }
- if (range.hasMax()) {
- var rangeMaxInternalByteBuffer = asReadOnlyDirect(range.getMaxUnsafe());
- if (nettyDirect && rangeMaxInternalByteBuffer != null) {
- readOpts.setIterateUpperBound(slice2 = new DirectSlice(rangeMaxInternalByteBuffer,
- range.getMaxUnsafe().readableBytes()));
- } else {
- readOpts.setIterateUpperBound(slice2 = new Slice(LLUtils.toArray(range.getMaxUnsafe())));
- }
- }
- try (RocksIterator rocksIterator = db.newIterator(readOpts)) {
- if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
- var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe());
- if (nettyDirect && rangeMinInternalByteBuffer != null) {
- rocksIterator.seek(rangeMinInternalByteBuffer);
- } else {
- rocksIterator.seek(LLUtils.toArray(range.getMinUnsafe()));
+ startedContains.increment();
+ try {
+ var result = containsTime.recordCallable(() -> {
+ if (range.isSingle()) {
+ var unmodifiableReadOpts = resolveSnapshot(snapshot);
+ return db.exists(unmodifiableReadOpts, range.getSingleUnsafe());
+ } else {
+ // Temporary resources to release after finished
+ AbstractSlice> slice1 = null;
+ AbstractSlice> slice2 = null;
+ try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
+ readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
+ readOpts.setFillCache(false);
+ if (range.hasMin()) {
+ var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe());
+ if (nettyDirect && rangeMinInternalByteBuffer != null) {
+ readOpts.setIterateLowerBound(slice1 = new DirectSlice(rangeMinInternalByteBuffer,
+ range.getMinUnsafe().readableBytes()));
+ } else {
+ readOpts.setIterateLowerBound(slice1 = new Slice(LLUtils.toArray(range.getMinUnsafe())));
+ }
}
- } else {
- rocksIterator.seekToFirst();
+ if (range.hasMax()) {
+ var rangeMaxInternalByteBuffer = asReadOnlyDirect(range.getMaxUnsafe());
+ if (nettyDirect && rangeMaxInternalByteBuffer != null) {
+ readOpts.setIterateUpperBound(slice2 = new DirectSlice(rangeMaxInternalByteBuffer,
+ range.getMaxUnsafe().readableBytes()));
+ } else {
+ readOpts.setIterateUpperBound(slice2 = new Slice(LLUtils.toArray(range.getMaxUnsafe())));
+ }
+ }
+ try (RocksIterator rocksIterator = db.newIterator(readOpts)) {
+ if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
+ var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe());
+ if (nettyDirect && rangeMinInternalByteBuffer != null) {
+ rocksIterator.seek(rangeMinInternalByteBuffer);
+ } else {
+ rocksIterator.seek(LLUtils.toArray(range.getMinUnsafe()));
+ }
+ } else {
+ rocksIterator.seekToFirst();
+ }
+ rocksIterator.status();
+ return rocksIterator.isValid();
+ }
+ } finally {
+ if (slice1 != null) slice1.close();
+ if (slice2 != null) slice2.close();
}
- rocksIterator.status();
- return rocksIterator.isValid();
}
- } finally {
- if (slice1 != null) slice1.close();
- if (slice2 != null) slice2.close();
- }
+ });
+ assert result != null;
+ return result;
+ } catch (RocksDBException | RuntimeException e) {
+ throw e;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ } finally {
+ endedContains.increment();
}
}
@@ -298,8 +378,21 @@ public class LLLocalDictionary implements LLDictionary {
}
private boolean containsKey(@Nullable LLSnapshot snapshot, Buffer key) throws RocksDBException {
- var unmodifiableReadOpts = resolveSnapshot(snapshot);
- return db.exists(unmodifiableReadOpts, key);
+ startedContains.increment();
+ try {
+ var result = containsTime.recordCallable(() -> {
+ var unmodifiableReadOpts = resolveSnapshot(snapshot);
+ return db.exists(unmodifiableReadOpts, key);
+ });
+ assert result != null;
+ return result;
+ } catch (RocksDBException | RuntimeException e) {
+ throw e;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ } finally {
+ endedContains.increment();
+ }
}
@Override
@@ -340,7 +433,14 @@ public class LLLocalDictionary implements LLDictionary {
safeCloseable.close();
}
})
- .onErrorMap(cause -> new IOException("Failed to write", cause));
+ .onErrorMap(cause -> new IOException("Failed to write", cause))
+ .elapsed()
+ .map(tuple -> {
+ putTime.record(tuple.getT1(), TimeUnit.MILLISECONDS);
+ return tuple.getT2();
+ })
+ .doFirst(startedPut::increment)
+ .doFinally(s -> endedPut.increment());
}
@Override
@@ -366,8 +466,15 @@ public class LLLocalDictionary implements LLDictionary {
case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT;
case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS;
};
- UpdateAtomicResult result = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, keySend, updater,
- existsAlmostCertainly, returnMode);
+ UpdateAtomicResult result;
+ startedUpdates.increment();
+ try {
+ result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS,
+ EMPTY_WRITE_OPTIONS, keySend, updater, existsAlmostCertainly, returnMode));
+ } finally {
+ endedUpdates.increment();
+ }
+ assert result != null;
return switch (updateReturnMode) {
case NOTHING -> null;
case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current();
@@ -395,8 +502,15 @@ public class LLLocalDictionary implements LLDictionary {
throw new UnsupportedOperationException("update() is disallowed because the database doesn't support"
+ "safe atomic operations");
}
- UpdateAtomicResult result = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, keySend, updater,
- existsAlmostCertainly, UpdateAtomicResultMode.DELTA);
+ UpdateAtomicResult result;
+ startedUpdates.increment();
+ try {
+ result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS,
+ EMPTY_WRITE_OPTIONS, keySend, updater, existsAlmostCertainly, UpdateAtomicResultMode.DELTA));
+ } finally {
+ endedUpdates.increment();
+ }
+ assert result != null;
return ((UpdateAtomicResultDelta) result).delta();
}).onErrorMap(cause -> new IOException("Failed to read or write", cause)),
keySend -> Mono.fromRunnable(keySend::close)).doOnDiscard(UpdateAtomicResult.class, uar -> {
@@ -431,7 +545,10 @@ public class LLLocalDictionary implements LLDictionary {
)
.singleOrEmpty(),
keySend -> Mono.fromRunnable(keySend::close)
- );
+ ).elapsed().map(tuple -> {
+ removeTime.record(tuple.getT1(), TimeUnit.MILLISECONDS);
+ return tuple.getT2();
+ }).doFirst(startedRemove::increment).doFinally(s -> endedRemove.increment());
}
private Mono> getPreviousData(Mono> keyMono, LLDictionaryResultType resultType,
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
index 1760e41..baf1b95 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
@@ -6,10 +6,11 @@ import static it.cavallium.dbengine.database.LLUtils.toDocument;
import static it.cavallium.dbengine.database.LLUtils.toFields;
import static it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.NO_TRANSFORMATION;
+import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Timer;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
-import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.DirectIOOptions;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
@@ -19,36 +20,33 @@ import it.cavallium.dbengine.client.query.QueryParser;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.database.LLIndexRequest;
-import it.cavallium.dbengine.database.LLSoftUpdateDocument;
-import it.cavallium.dbengine.database.LLUpdateDocument;
-import it.cavallium.dbengine.database.LLItem;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSearchResultShard;
import it.cavallium.dbengine.database.LLSnapshot;
+import it.cavallium.dbengine.database.LLSoftUpdateDocument;
import it.cavallium.dbengine.database.LLTerm;
+import it.cavallium.dbengine.database.LLUpdateDocument;
import it.cavallium.dbengine.database.LLUpdateFields;
import it.cavallium.dbengine.database.LLUtils;
-import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory;
+import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.collector.Buckets;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.BucketParams;
import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher;
+import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer;
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
import it.cavallium.dbengine.lucene.searcher.LocalSearcher;
-import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer;
-import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -60,11 +58,9 @@ import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
-import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.index.SnapshotDeletionPolicy;
-import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.misc.store.DirectIODirectory;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.ByteBuffersDirectory;
@@ -76,8 +72,6 @@ import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.util.Constants;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.warp.commonutils.functional.IORunnable;
-import org.warp.commonutils.type.ShortNamedThreadFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
@@ -97,8 +91,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private static final ReentrantLock shutdownLock = new ReentrantLock();
private static final Scheduler luceneHeavyTasksScheduler = uninterruptibleScheduler(Schedulers.single(Schedulers.boundedElastic()));
+ private final Counter startedDocIndexings;
+ private final Counter endeddDocIndexings;
+ private final Timer docIndexingTime;
+ private final Timer snapshotTime;
+ private final Timer flushTime;
+ private final Timer commitTime;
+ private final Timer mergeTime;
+ private final Timer refreshTime;
- private final MeterRegistry meterRegistry;
private final String luceneIndexName;
private final IndexWriter indexWriter;
private final SnapshotsManager snapshotsManager;
@@ -114,21 +115,27 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
public LLLocalLuceneIndex(LLTempLMDBEnv env,
@Nullable Path luceneBasePath,
MeterRegistry meterRegistry,
- String name,
+ @Nullable String clusterName,
+ @Nullable String shardName,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
LuceneOptions luceneOptions,
@Nullable LuceneHacks luceneHacks) throws IOException {
- this.meterRegistry = meterRegistry;
+ if (clusterName == null && shardName == null) {
+ throw new IllegalArgumentException("Clustern name and/or shard name must be set");
+ }
+ String logName = Objects.requireNonNullElse(clusterName, shardName);
+ String luceneIndexName = Objects.requireNonNullElse(shardName, clusterName);
+
Path directoryPath;
if (luceneOptions.inMemory() != (luceneBasePath == null)) {
throw new IllegalArgumentException();
} else if (luceneBasePath != null) {
- directoryPath = luceneBasePath.resolve(name + ".lucene.db");
+ directoryPath = luceneBasePath.resolve(shardName + ".lucene.db");
} else {
directoryPath = null;
}
- if (name.length() == 0) {
+ if (luceneIndexName.length() == 0) {
throw new IOException("Empty lucene database name");
}
if (!MMapDirectory.UNMAP_SUPPORTED) {
@@ -194,7 +201,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
this.directory = directory;
}
- this.luceneIndexName = name;
+ this.luceneIndexName = luceneIndexName;
var snapshotter = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
this.lowMemory = lowMemory;
this.luceneAnalyzer = LuceneUtils.toPerFieldAnalyzerWrapper(indicizerAnalyzers);
@@ -251,6 +258,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
luceneOptions.queryRefreshDebounceTime()
);
+ this.startedDocIndexings = meterRegistry.counter("index.write.doc.started.counter", "index.name", logName);
+ this.endeddDocIndexings = meterRegistry.counter("index.write.doc.ended.counter", "index.name", logName);
+ this.docIndexingTime = Timer.builder("index.write.doc.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry);
+ this.snapshotTime = Timer.builder("index.write.snapshot.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry);
+ this.flushTime = Timer.builder("index.write.flush.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry);
+ this.commitTime = Timer.builder("index.write.commit.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry);
+ this.mergeTime = Timer.builder("index.write.merge.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry);
+ this.refreshTime = Timer.builder("index.search.refresh.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry);
+
// Start scheduled tasks
var commitMillis = luceneOptions.commitDebounceTime().toMillis();
luceneHeavyTasksScheduler.schedulePeriodically(this::scheduledCommit, commitMillis, commitMillis,
@@ -271,7 +287,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override
public Mono takeSnapshot() {
- return snapshotsManager.takeSnapshot().transform(this::ensureOpen);
+ return snapshotsManager.takeSnapshot().elapsed().map(elapsed -> {
+ snapshotTime.record(elapsed.getT1(), TimeUnit.MILLISECONDS);
+ return elapsed.getT2();
+ }).transform(this::ensureOpen);
}
private Mono ensureOpen(Mono mono) {
@@ -293,49 +312,75 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override
public Mono releaseSnapshot(LLSnapshot snapshot) {
- return snapshotsManager.releaseSnapshot(snapshot);
+ return snapshotsManager
+ .releaseSnapshot(snapshot)
+ .elapsed()
+ .doOnNext(elapsed -> snapshotTime.record(elapsed.getT1(), TimeUnit.MILLISECONDS))
+ .then();
}
@Override
public Mono addDocument(LLTerm key, LLUpdateDocument doc) {
- return this.runSafe(() -> {
- indexWriter.addDocument(toDocument(doc));
+ return this.runSafe(() -> docIndexingTime.recordCallable(() -> {
+ startedDocIndexings.increment();
+ try {
+ indexWriter.addDocument(toDocument(doc));
+ } finally {
+ endeddDocIndexings.increment();
+ }
return null;
- }).transform(this::ensureOpen);
+ })).transform(this::ensureOpen);
}
@Override
public Mono addDocuments(Flux> documents) {
- return documents.collectList().flatMap(documentsList -> this.runSafe(() -> {
- indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList));
+ return documents.collectList().flatMap(documentsList -> this.runSafe(() -> docIndexingTime.recordCallable(() -> {
+ double count = documentsList.size();
+ startedDocIndexings.increment(count);
+ try {
+ indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList));
+ } finally {
+ endeddDocIndexings.increment(count);
+ }
return null;
- })).transform(this::ensureOpen);
+ }))).transform(this::ensureOpen);
}
@Override
public Mono deleteDocument(LLTerm id) {
- return this.runSafe(() -> {
- indexWriter.deleteDocuments(LLUtils.toTerm(id));
+ return this.runSafe(() -> docIndexingTime.recordCallable(() -> {
+ startedDocIndexings.increment();
+ try {
+ indexWriter.deleteDocuments(LLUtils.toTerm(id));
+ } finally {
+ endeddDocIndexings.increment();
+ }
return null;
- }).transform(this::ensureOpen);
+ })).transform(this::ensureOpen);
}
@Override
public Mono update(LLTerm id, LLIndexRequest request) {
return this
- .runSafe(() -> {
- switch (request) {
- case LLUpdateDocument updateDocument ->
- indexWriter.updateDocument(LLUtils.toTerm(id), toDocument(updateDocument));
- case LLSoftUpdateDocument softUpdateDocument -> indexWriter.softUpdateDocument(LLUtils.toTerm(id),
- toDocument(softUpdateDocument.items()), toFields(softUpdateDocument.softDeleteItems()));
- case LLUpdateFields updateFields ->
- indexWriter.updateDocValues(LLUtils.toTerm(id), toFields(updateFields.items()));
- case null, default -> throw new UnsupportedOperationException("Unexpected request type: " + request);
+ .runSafe(() -> docIndexingTime.recordCallable(() -> {
+ startedDocIndexings.increment();
+ try {
+ switch (request) {
+ case LLUpdateDocument updateDocument ->
+ indexWriter.updateDocument(LLUtils.toTerm(id), toDocument(updateDocument));
+ case LLSoftUpdateDocument softUpdateDocument ->
+ indexWriter.softUpdateDocument(LLUtils.toTerm(id), toDocument(softUpdateDocument.items()),
+ toFields(softUpdateDocument.softDeleteItems()));
+ case LLUpdateFields updateFields -> indexWriter.updateDocValues(LLUtils.toTerm(id),
+ toFields(updateFields.items()));
+ case null, default -> throw new UnsupportedOperationException("Unexpected request type: " + request);
+ }
+ } finally {
+ endeddDocIndexings.increment();
}
return null;
- })
+ }))
.transform(this::ensureOpen);
}
@@ -349,7 +394,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
for (Entry entry : documentsMap.entrySet()) {
LLTerm key = entry.getKey();
LLUpdateDocument value = entry.getValue();
- indexWriter.updateDocument(LLUtils.toTerm(key), toDocument(value));
+ startedDocIndexings.increment();
+ try {
+ docIndexingTime.recordCallable(() -> {
+ indexWriter.updateDocument(LLUtils.toTerm(key), toDocument(value));
+ return null;
+ });
+ } finally {
+ endeddDocIndexings.increment();
+ }
}
return null;
}).transform(this::ensureOpen);
@@ -469,7 +522,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
if (activeTasks.isTerminated()) return null;
shutdownLock.lock();
try {
- indexWriter.flush();
+ flushTime.recordCallable(() -> {
+ indexWriter.flush();
+ return null;
+ });
} finally {
shutdownLock.unlock();
}
@@ -488,11 +544,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
if (activeTasks.isTerminated()) return null;
shutdownLock.lock();
try {
- if (force) {
- searcherManager.maybeRefreshBlocking();
- } else {
- searcherManager.maybeRefresh();
- }
+ refreshTime.recordCallable(() -> {
+ if (force) {
+ searcherManager.maybeRefreshBlocking();
+ } else {
+ searcherManager.maybeRefresh();
+ }
+ return null;
+ });
} finally {
shutdownLock.unlock();
}
@@ -507,8 +566,11 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private void scheduledCommit() {
shutdownLock.lock();
try {
- indexWriter.commit();
- } catch (IOException ex) {
+ commitTime.recordCallable(() -> {
+ indexWriter.commit();
+ return null;
+ });
+ } catch (Exception ex) {
logger.error(MARKER_LUCENE, "Failed to execute a scheduled commit", ex);
} finally {
shutdownLock.unlock();
@@ -518,8 +580,11 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private void scheduledMerge() {
shutdownLock.lock();
try {
- indexWriter.maybeMerge();
- } catch (IOException ex) {
+ mergeTime.recordCallable(() -> {
+ indexWriter.maybeMerge();
+ return null;
+ });
+ } catch (Exception ex) {
logger.error(MARKER_LUCENE, "Failed to execute a scheduled merge", ex);
} finally {
shutdownLock.unlock();
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java
index 42c6ea1..b1c9cdc 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java
@@ -65,7 +65,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
public LLLocalMultiLuceneIndex(LLTempLMDBEnv env,
Path lucene,
MeterRegistry meterRegistry,
- String name,
+ String clusterName,
int instancesCount,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
@@ -79,16 +79,17 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
this.meterRegistry = meterRegistry;
LLLocalLuceneIndex[] luceneIndices = new LLLocalLuceneIndex[instancesCount];
for (int i = 0; i < instancesCount; i++) {
- String instanceName;
+ String shardName;
if (i == 0) {
- instanceName = name;
+ shardName = clusterName;
} else {
- instanceName = name + "_" + String.format("%03d", i);
+ shardName = clusterName + "_" + String.format("%03d", i);
}
luceneIndices[i] = new LLLocalLuceneIndex(env,
lucene,
meterRegistry,
- instanceName,
+ clusterName,
+ shardName,
indicizerAnalyzers,
indicizerSimilarities,
luceneOptions,
diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java
index dbac82d..e5ff4cf 100644
--- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java
+++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java
@@ -79,7 +79,8 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection {
}
@Override
- public Mono getLuceneIndex(String name,
+ public Mono getLuceneIndex(@Nullable String clusterName,
+ @Nullable String shardName,
int instancesCount,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
@@ -91,7 +92,8 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection {
return new LLLocalLuceneIndex(env,
null,
meterRegistry,
- name,
+ clusterName,
+ shardName,
indicizerAnalyzers,
indicizerSimilarities,
luceneOptions,
diff --git a/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java b/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java
index 1caff2d..69a024b 100644
--- a/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java
+++ b/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java
@@ -80,7 +80,8 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator {
new DatabaseOptions(List.of(), Map.of(), true, false, true, false,
true, canUseNettyDirect, false, -1, null)
),
- conn.getLuceneIndex("testluceneindex1",
+ conn.getLuceneIndex(null,
+ "testluceneindex1",
1,
IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple),
IndicizerSimilarities.of(TextFieldsSimilarity.Boolean),
@@ -88,6 +89,7 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator {
luceneHacks
),
conn.getLuceneIndex("testluceneindex16",
+ null,
3,
IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple),
IndicizerSimilarities.of(TextFieldsSimilarity.Boolean),
diff --git a/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java b/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java
index cb23614..8c1ef7d 100644
--- a/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java
+++ b/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java
@@ -11,8 +11,8 @@ import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneOptions;
import it.cavallium.dbengine.client.NRTCachingOptions;
import it.cavallium.dbengine.database.Column;
-import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.database.memory.LLMemoryDatabaseConnection;
+import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import java.time.Duration;
@@ -51,7 +51,8 @@ public class MemoryTemporaryDbGenerator implements TemporaryDbGenerator {
List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")),
new DatabaseOptions(List.of(), Map.of(), true, false, true, false, true, canUseNettyDirect, true, -1, null)
),
- conn.getLuceneIndex("testluceneindex1",
+ conn.getLuceneIndex(null,
+ "testluceneindex1",
1,
IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple),
IndicizerSimilarities.of(TextFieldsSimilarity.Boolean),
@@ -59,6 +60,7 @@ public class MemoryTemporaryDbGenerator implements TemporaryDbGenerator {
luceneHacks
),
conn.getLuceneIndex("testluceneindex16",
+ null,
3,
IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple),
IndicizerSimilarities.of(TextFieldsSimilarity.Boolean),