From 68d8b5240c53cac92f5cea53b996f275f1b9a997 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 30 Dec 2021 17:28:06 +0100 Subject: [PATCH] Implement more micrometer metrics --- pom.xml | 1 - .../database/LLDatabaseConnection.java | 3 +- .../disk/LLLocalDatabaseConnection.java | 18 +- .../database/disk/LLLocalDictionary.java | 221 +++++++++++++----- .../database/disk/LLLocalLuceneIndex.java | 167 +++++++++---- .../disk/LLLocalMultiLuceneIndex.java | 11 +- .../memory/LLMemoryDatabaseConnection.java | 6 +- .../dbengine/LocalTemporaryDbGenerator.java | 4 +- .../dbengine/MemoryTemporaryDbGenerator.java | 6 +- 9 files changed, 316 insertions(+), 121 deletions(-) diff --git a/pom.xml b/pom.xml index 96812a9..c01dcc5 100644 --- a/pom.xml +++ b/pom.xml @@ -521,7 +521,6 @@ io.soabase.recordbuilder.processor.RecordBuilderProcessor - false --enable-preview 17 17 diff --git a/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java index f33407e..6fb29c1 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java @@ -24,7 +24,8 @@ public interface LLDatabaseConnection { List columns, DatabaseOptions databaseOptions); - Mono getLuceneIndex(String name, + Mono getLuceneIndex(@Nullable String clusterName, + @Nullable String shardName, int instancesCount, IndicizerAnalyzers indicizerAnalyzers, IndicizerSimilarities indicizerSimilarities, diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java index 647579e..a7d7541 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java @@ -2,24 +2,22 @@ package it.cavallium.dbengine.database.disk; import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.BufferAllocator; +import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; import it.cavallium.dbengine.client.LuceneOptions; import it.cavallium.dbengine.database.Column; -import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.lucene.LuceneHacks; import it.cavallium.dbengine.netty.JMXNettyMonitoringManager; import java.nio.file.Files; import java.nio.file.Path; -import java.time.Duration; import java.util.LinkedList; import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import org.checkerframework.checker.units.qual.A; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -88,7 +86,8 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection { } @Override - public Mono getLuceneIndex(String name, + public Mono getLuceneIndex(@Nullable String clusterName, + @Nullable String shardName, int instancesCount, IndicizerAnalyzers indicizerAnalyzers, IndicizerSimilarities indicizerSimilarities, @@ -97,12 +96,18 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection { return Mono .fromCallable(() -> { var env = this.env.get(); + if (clusterName == null && shardName == null) { + throw new IllegalArgumentException("Shard name and/or cluster name must be set"); + } if (instancesCount != 1) { + if (shardName != null && !shardName.equals(clusterName)) { + throw new IllegalArgumentException("You shouldn't use a shard name for clustered instances"); + } Objects.requireNonNull(env, "Environment not set"); return new LLLocalMultiLuceneIndex(env, luceneOptions.inMemory() ? null : basePath.resolve("lucene"), meterRegistry, - name, + clusterName, instancesCount, indicizerAnalyzers, indicizerSimilarities, @@ -112,7 +117,8 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection { } else { return new LLLocalLuceneIndex(env, luceneOptions.inMemory() ? null : basePath.resolve("lucene"), meterRegistry, - name, + clusterName, + shardName, indicizerAnalyzers, indicizerSimilarities, luceneOptions, diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 37e30e6..090d725 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -8,6 +8,8 @@ import static it.cavallium.dbengine.database.LLUtils.toStringSafe; import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNullElse; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.Resource; @@ -40,6 +42,7 @@ import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.IntStream; import org.apache.commons.lang3.tuple.Pair; @@ -131,6 +134,22 @@ public class LLLocalDictionary implements LLDictionary { private final boolean nettyDirect; private final BufferAllocator alloc; + private final Counter startedUpdates; + private final Counter endedUpdates; + private final Timer updateTime; + private final Counter startedGet; + private final Counter endedGet; + private final Timer getTime; + private final Counter startedContains; + private final Counter endedContains; + private final Timer containsTime; + private final Counter startedPut; + private final Counter endedPut; + private final Timer putTime; + private final Counter startedRemove; + private final Counter endedRemove; + private final Timer removeTime; + public LLLocalDictionary( BufferAllocator allocator, @NotNull RocksDBColumn db, @@ -151,6 +170,48 @@ public class LLLocalDictionary implements LLDictionary { this.databaseOptions = databaseOptions; alloc = allocator; this.nettyDirect = databaseOptions.allowNettyDirect() && alloc.getAllocationType() == OFF_HEAP; + var meterRegistry = db.getMeterRegistry(); + + this.startedGet = meterRegistry.counter("db.read.map.get.started.counter", "db.name", databaseName, "db.column", columnName); + this.endedGet = meterRegistry.counter("db.read.map.get.ended.counter", "db.name", databaseName, "db.column", columnName); + this.getTime = Timer + .builder("db.read.get.timer") + .publishPercentiles(0.2, 0.5, 0.95) + .publishPercentileHistogram() + .tags("db.name", databaseName, "db.column", columnName) + .register(meterRegistry); + this.startedContains = meterRegistry.counter("db.read.map.get.started.counter", "db.name", databaseName, "db.column", columnName); + this.endedContains = meterRegistry.counter("db.read.map.get.ended.counter", "db.name", databaseName, "db.column", columnName); + this.containsTime = Timer + .builder("db.read.get.timer") + .publishPercentiles(0.2, 0.5, 0.95) + .publishPercentileHistogram() + .tags("db.name", databaseName, "db.column", columnName) + .register(meterRegistry); + this.startedUpdates = meterRegistry.counter("db.write.map.update.started.counter", "db.name", databaseName, "db.column", columnName); + this.endedUpdates = meterRegistry.counter("db.write.map.update.ended.counter", "db.name", databaseName, "db.column", columnName); + this.updateTime = Timer + .builder("db.write.map.update.timer") + .publishPercentiles(0.2, 0.5, 0.95) + .publishPercentileHistogram() + .tags("db.name", databaseName, "db.column", columnName) + .register(meterRegistry); + this.startedPut = meterRegistry.counter("db.write.map.put.started.counter", "db.name", databaseName, "db.column", columnName); + this.endedPut = meterRegistry.counter("db.write.map.put.ended.counter", "db.name", databaseName, "db.column", columnName); + this.putTime = Timer + .builder("db.write.map.put.timer") + .publishPercentiles(0.2, 0.5, 0.95) + .publishPercentileHistogram() + .tags("db.name", databaseName, "db.column", columnName) + .register(meterRegistry); + this.startedRemove = meterRegistry.counter("db.write.map.remove.started.counter", "db.name", databaseName, "db.column", columnName); + this.endedRemove = meterRegistry.counter("db.write.map.remove.ended.counter", "db.name", databaseName, "db.column", columnName); + this.removeTime = Timer + .builder("db.write.map.remove.timer") + .publishPercentiles(0.2, 0.5, 0.95) + .publishPercentileHistogram() + .tags("db.name", databaseName, "db.column", columnName) + .register(meterRegistry); } @Override @@ -205,7 +266,13 @@ public class LLLocalDictionary implements LLDictionary { try (var key = keySend.receive()) { try { var readOptions = requireNonNullElse(resolveSnapshot(snapshot), EMPTY_READ_OPTIONS); - var result = db.get(readOptions, key, existsAlmostCertainly); + Buffer result; + startedGet.increment(); + try { + result = getTime.recordCallable(() -> db.get(readOptions, key, existsAlmostCertainly)); + } finally { + endedGet.increment(); + } logger.trace(MARKER_ROCKSDB, "Read {}: {}", () -> toStringSafe(key), () -> toStringSafe(result)); if (result != null) { sink.next(result.send()); @@ -236,52 +303,65 @@ public class LLLocalDictionary implements LLDictionary { public boolean containsRange(@Nullable LLSnapshot snapshot, LLRange range) throws RocksDBException { assert !Schedulers.isInNonBlockingThread() : "Called containsRange in a nonblocking thread"; - if (range.isSingle()) { - var unmodifiableReadOpts = resolveSnapshot(snapshot); - return db.exists(unmodifiableReadOpts, range.getSingleUnsafe()); - } else { - // Temporary resources to release after finished - AbstractSlice slice1 = null; - AbstractSlice slice2 = null; - try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { - readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); - readOpts.setFillCache(false); - if (range.hasMin()) { - var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe()); - if (nettyDirect && rangeMinInternalByteBuffer != null) { - readOpts.setIterateLowerBound(slice1 = new DirectSlice(rangeMinInternalByteBuffer, - range.getMinUnsafe().readableBytes())); - } else { - readOpts.setIterateLowerBound(slice1 = new Slice(LLUtils.toArray(range.getMinUnsafe()))); - } - } - if (range.hasMax()) { - var rangeMaxInternalByteBuffer = asReadOnlyDirect(range.getMaxUnsafe()); - if (nettyDirect && rangeMaxInternalByteBuffer != null) { - readOpts.setIterateUpperBound(slice2 = new DirectSlice(rangeMaxInternalByteBuffer, - range.getMaxUnsafe().readableBytes())); - } else { - readOpts.setIterateUpperBound(slice2 = new Slice(LLUtils.toArray(range.getMaxUnsafe()))); - } - } - try (RocksIterator rocksIterator = db.newIterator(readOpts)) { - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe()); - if (nettyDirect && rangeMinInternalByteBuffer != null) { - rocksIterator.seek(rangeMinInternalByteBuffer); - } else { - rocksIterator.seek(LLUtils.toArray(range.getMinUnsafe())); + startedContains.increment(); + try { + var result = containsTime.recordCallable(() -> { + if (range.isSingle()) { + var unmodifiableReadOpts = resolveSnapshot(snapshot); + return db.exists(unmodifiableReadOpts, range.getSingleUnsafe()); + } else { + // Temporary resources to release after finished + AbstractSlice slice1 = null; + AbstractSlice slice2 = null; + try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { + readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); + readOpts.setFillCache(false); + if (range.hasMin()) { + var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe()); + if (nettyDirect && rangeMinInternalByteBuffer != null) { + readOpts.setIterateLowerBound(slice1 = new DirectSlice(rangeMinInternalByteBuffer, + range.getMinUnsafe().readableBytes())); + } else { + readOpts.setIterateLowerBound(slice1 = new Slice(LLUtils.toArray(range.getMinUnsafe()))); + } } - } else { - rocksIterator.seekToFirst(); + if (range.hasMax()) { + var rangeMaxInternalByteBuffer = asReadOnlyDirect(range.getMaxUnsafe()); + if (nettyDirect && rangeMaxInternalByteBuffer != null) { + readOpts.setIterateUpperBound(slice2 = new DirectSlice(rangeMaxInternalByteBuffer, + range.getMaxUnsafe().readableBytes())); + } else { + readOpts.setIterateUpperBound(slice2 = new Slice(LLUtils.toArray(range.getMaxUnsafe()))); + } + } + try (RocksIterator rocksIterator = db.newIterator(readOpts)) { + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe()); + if (nettyDirect && rangeMinInternalByteBuffer != null) { + rocksIterator.seek(rangeMinInternalByteBuffer); + } else { + rocksIterator.seek(LLUtils.toArray(range.getMinUnsafe())); + } + } else { + rocksIterator.seekToFirst(); + } + rocksIterator.status(); + return rocksIterator.isValid(); + } + } finally { + if (slice1 != null) slice1.close(); + if (slice2 != null) slice2.close(); } - rocksIterator.status(); - return rocksIterator.isValid(); } - } finally { - if (slice1 != null) slice1.close(); - if (slice2 != null) slice2.close(); - } + }); + assert result != null; + return result; + } catch (RocksDBException | RuntimeException e) { + throw e; + } catch (Exception ex) { + throw new RuntimeException(ex); + } finally { + endedContains.increment(); } } @@ -298,8 +378,21 @@ public class LLLocalDictionary implements LLDictionary { } private boolean containsKey(@Nullable LLSnapshot snapshot, Buffer key) throws RocksDBException { - var unmodifiableReadOpts = resolveSnapshot(snapshot); - return db.exists(unmodifiableReadOpts, key); + startedContains.increment(); + try { + var result = containsTime.recordCallable(() -> { + var unmodifiableReadOpts = resolveSnapshot(snapshot); + return db.exists(unmodifiableReadOpts, key); + }); + assert result != null; + return result; + } catch (RocksDBException | RuntimeException e) { + throw e; + } catch (Exception ex) { + throw new RuntimeException(ex); + } finally { + endedContains.increment(); + } } @Override @@ -340,7 +433,14 @@ public class LLLocalDictionary implements LLDictionary { safeCloseable.close(); } }) - .onErrorMap(cause -> new IOException("Failed to write", cause)); + .onErrorMap(cause -> new IOException("Failed to write", cause)) + .elapsed() + .map(tuple -> { + putTime.record(tuple.getT1(), TimeUnit.MILLISECONDS); + return tuple.getT2(); + }) + .doFirst(startedPut::increment) + .doFinally(s -> endedPut.increment()); } @Override @@ -366,8 +466,15 @@ public class LLLocalDictionary implements LLDictionary { case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT; case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS; }; - UpdateAtomicResult result = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, keySend, updater, - existsAlmostCertainly, returnMode); + UpdateAtomicResult result; + startedUpdates.increment(); + try { + result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS, + EMPTY_WRITE_OPTIONS, keySend, updater, existsAlmostCertainly, returnMode)); + } finally { + endedUpdates.increment(); + } + assert result != null; return switch (updateReturnMode) { case NOTHING -> null; case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current(); @@ -395,8 +502,15 @@ public class LLLocalDictionary implements LLDictionary { throw new UnsupportedOperationException("update() is disallowed because the database doesn't support" + "safe atomic operations"); } - UpdateAtomicResult result = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, keySend, updater, - existsAlmostCertainly, UpdateAtomicResultMode.DELTA); + UpdateAtomicResult result; + startedUpdates.increment(); + try { + result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS, + EMPTY_WRITE_OPTIONS, keySend, updater, existsAlmostCertainly, UpdateAtomicResultMode.DELTA)); + } finally { + endedUpdates.increment(); + } + assert result != null; return ((UpdateAtomicResultDelta) result).delta(); }).onErrorMap(cause -> new IOException("Failed to read or write", cause)), keySend -> Mono.fromRunnable(keySend::close)).doOnDiscard(UpdateAtomicResult.class, uar -> { @@ -431,7 +545,10 @@ public class LLLocalDictionary implements LLDictionary { ) .singleOrEmpty(), keySend -> Mono.fromRunnable(keySend::close) - ); + ).elapsed().map(tuple -> { + removeTime.record(tuple.getT1(), TimeUnit.MILLISECONDS); + return tuple.getT2(); + }).doFirst(startedRemove::increment).doFinally(s -> endedRemove.increment()); } private Mono> getPreviousData(Mono> keyMono, LLDictionaryResultType resultType, diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 1760e41..baf1b95 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -6,10 +6,11 @@ import static it.cavallium.dbengine.database.LLUtils.toDocument; import static it.cavallium.dbengine.database.LLUtils.toFields; import static it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.NO_TRANSFORMATION; +import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Timer; import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; -import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.DirectIOOptions; import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; @@ -19,36 +20,33 @@ import it.cavallium.dbengine.client.query.QueryParser; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.database.LLIndexRequest; -import it.cavallium.dbengine.database.LLSoftUpdateDocument; -import it.cavallium.dbengine.database.LLUpdateDocument; -import it.cavallium.dbengine.database.LLItem; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLSearchResultShard; import it.cavallium.dbengine.database.LLSnapshot; +import it.cavallium.dbengine.database.LLSoftUpdateDocument; import it.cavallium.dbengine.database.LLTerm; +import it.cavallium.dbengine.database.LLUpdateDocument; import it.cavallium.dbengine.database.LLUpdateFields; import it.cavallium.dbengine.database.LLUtils; -import it.cavallium.dbengine.lucene.LuceneHacks; import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory; +import it.cavallium.dbengine.lucene.LuceneHacks; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher; import it.cavallium.dbengine.lucene.searcher.BucketParams; import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher; +import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer; import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; import it.cavallium.dbengine.lucene.searcher.LocalSearcher; -import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer; -import it.unimi.dsi.fastutil.doubles.DoubleArrayList; import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -60,11 +58,9 @@ import org.apache.lucene.index.ConcurrentMergeScheduler; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; -import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.MergeScheduler; import org.apache.lucene.index.SerialMergeScheduler; import org.apache.lucene.index.SnapshotDeletionPolicy; -import org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.misc.store.DirectIODirectory; import org.apache.lucene.search.similarities.Similarity; import org.apache.lucene.store.ByteBuffersDirectory; @@ -76,8 +72,6 @@ import org.apache.lucene.store.NRTCachingDirectory; import org.apache.lucene.util.Constants; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.warp.commonutils.functional.IORunnable; -import org.warp.commonutils.type.ShortNamedThreadFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -97,8 +91,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { private static final ReentrantLock shutdownLock = new ReentrantLock(); private static final Scheduler luceneHeavyTasksScheduler = uninterruptibleScheduler(Schedulers.single(Schedulers.boundedElastic())); + private final Counter startedDocIndexings; + private final Counter endeddDocIndexings; + private final Timer docIndexingTime; + private final Timer snapshotTime; + private final Timer flushTime; + private final Timer commitTime; + private final Timer mergeTime; + private final Timer refreshTime; - private final MeterRegistry meterRegistry; private final String luceneIndexName; private final IndexWriter indexWriter; private final SnapshotsManager snapshotsManager; @@ -114,21 +115,27 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { public LLLocalLuceneIndex(LLTempLMDBEnv env, @Nullable Path luceneBasePath, MeterRegistry meterRegistry, - String name, + @Nullable String clusterName, + @Nullable String shardName, IndicizerAnalyzers indicizerAnalyzers, IndicizerSimilarities indicizerSimilarities, LuceneOptions luceneOptions, @Nullable LuceneHacks luceneHacks) throws IOException { - this.meterRegistry = meterRegistry; + if (clusterName == null && shardName == null) { + throw new IllegalArgumentException("Clustern name and/or shard name must be set"); + } + String logName = Objects.requireNonNullElse(clusterName, shardName); + String luceneIndexName = Objects.requireNonNullElse(shardName, clusterName); + Path directoryPath; if (luceneOptions.inMemory() != (luceneBasePath == null)) { throw new IllegalArgumentException(); } else if (luceneBasePath != null) { - directoryPath = luceneBasePath.resolve(name + ".lucene.db"); + directoryPath = luceneBasePath.resolve(shardName + ".lucene.db"); } else { directoryPath = null; } - if (name.length() == 0) { + if (luceneIndexName.length() == 0) { throw new IOException("Empty lucene database name"); } if (!MMapDirectory.UNMAP_SUPPORTED) { @@ -194,7 +201,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { this.directory = directory; } - this.luceneIndexName = name; + this.luceneIndexName = luceneIndexName; var snapshotter = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); this.lowMemory = lowMemory; this.luceneAnalyzer = LuceneUtils.toPerFieldAnalyzerWrapper(indicizerAnalyzers); @@ -251,6 +258,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { luceneOptions.queryRefreshDebounceTime() ); + this.startedDocIndexings = meterRegistry.counter("index.write.doc.started.counter", "index.name", logName); + this.endeddDocIndexings = meterRegistry.counter("index.write.doc.ended.counter", "index.name", logName); + this.docIndexingTime = Timer.builder("index.write.doc.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry); + this.snapshotTime = Timer.builder("index.write.snapshot.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry); + this.flushTime = Timer.builder("index.write.flush.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry); + this.commitTime = Timer.builder("index.write.commit.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry); + this.mergeTime = Timer.builder("index.write.merge.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry); + this.refreshTime = Timer.builder("index.search.refresh.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", logName).register(meterRegistry); + // Start scheduled tasks var commitMillis = luceneOptions.commitDebounceTime().toMillis(); luceneHeavyTasksScheduler.schedulePeriodically(this::scheduledCommit, commitMillis, commitMillis, @@ -271,7 +287,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { @Override public Mono takeSnapshot() { - return snapshotsManager.takeSnapshot().transform(this::ensureOpen); + return snapshotsManager.takeSnapshot().elapsed().map(elapsed -> { + snapshotTime.record(elapsed.getT1(), TimeUnit.MILLISECONDS); + return elapsed.getT2(); + }).transform(this::ensureOpen); } private Mono ensureOpen(Mono mono) { @@ -293,49 +312,75 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { @Override public Mono releaseSnapshot(LLSnapshot snapshot) { - return snapshotsManager.releaseSnapshot(snapshot); + return snapshotsManager + .releaseSnapshot(snapshot) + .elapsed() + .doOnNext(elapsed -> snapshotTime.record(elapsed.getT1(), TimeUnit.MILLISECONDS)) + .then(); } @Override public Mono addDocument(LLTerm key, LLUpdateDocument doc) { - return this.runSafe(() -> { - indexWriter.addDocument(toDocument(doc)); + return this.runSafe(() -> docIndexingTime.recordCallable(() -> { + startedDocIndexings.increment(); + try { + indexWriter.addDocument(toDocument(doc)); + } finally { + endeddDocIndexings.increment(); + } return null; - }).transform(this::ensureOpen); + })).transform(this::ensureOpen); } @Override public Mono addDocuments(Flux> documents) { - return documents.collectList().flatMap(documentsList -> this.runSafe(() -> { - indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList)); + return documents.collectList().flatMap(documentsList -> this.runSafe(() -> docIndexingTime.recordCallable(() -> { + double count = documentsList.size(); + startedDocIndexings.increment(count); + try { + indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList)); + } finally { + endeddDocIndexings.increment(count); + } return null; - })).transform(this::ensureOpen); + }))).transform(this::ensureOpen); } @Override public Mono deleteDocument(LLTerm id) { - return this.runSafe(() -> { - indexWriter.deleteDocuments(LLUtils.toTerm(id)); + return this.runSafe(() -> docIndexingTime.recordCallable(() -> { + startedDocIndexings.increment(); + try { + indexWriter.deleteDocuments(LLUtils.toTerm(id)); + } finally { + endeddDocIndexings.increment(); + } return null; - }).transform(this::ensureOpen); + })).transform(this::ensureOpen); } @Override public Mono update(LLTerm id, LLIndexRequest request) { return this - .runSafe(() -> { - switch (request) { - case LLUpdateDocument updateDocument -> - indexWriter.updateDocument(LLUtils.toTerm(id), toDocument(updateDocument)); - case LLSoftUpdateDocument softUpdateDocument -> indexWriter.softUpdateDocument(LLUtils.toTerm(id), - toDocument(softUpdateDocument.items()), toFields(softUpdateDocument.softDeleteItems())); - case LLUpdateFields updateFields -> - indexWriter.updateDocValues(LLUtils.toTerm(id), toFields(updateFields.items())); - case null, default -> throw new UnsupportedOperationException("Unexpected request type: " + request); + .runSafe(() -> docIndexingTime.recordCallable(() -> { + startedDocIndexings.increment(); + try { + switch (request) { + case LLUpdateDocument updateDocument -> + indexWriter.updateDocument(LLUtils.toTerm(id), toDocument(updateDocument)); + case LLSoftUpdateDocument softUpdateDocument -> + indexWriter.softUpdateDocument(LLUtils.toTerm(id), toDocument(softUpdateDocument.items()), + toFields(softUpdateDocument.softDeleteItems())); + case LLUpdateFields updateFields -> indexWriter.updateDocValues(LLUtils.toTerm(id), + toFields(updateFields.items())); + case null, default -> throw new UnsupportedOperationException("Unexpected request type: " + request); + } + } finally { + endeddDocIndexings.increment(); } return null; - }) + })) .transform(this::ensureOpen); } @@ -349,7 +394,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { for (Entry entry : documentsMap.entrySet()) { LLTerm key = entry.getKey(); LLUpdateDocument value = entry.getValue(); - indexWriter.updateDocument(LLUtils.toTerm(key), toDocument(value)); + startedDocIndexings.increment(); + try { + docIndexingTime.recordCallable(() -> { + indexWriter.updateDocument(LLUtils.toTerm(key), toDocument(value)); + return null; + }); + } finally { + endeddDocIndexings.increment(); + } } return null; }).transform(this::ensureOpen); @@ -469,7 +522,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { if (activeTasks.isTerminated()) return null; shutdownLock.lock(); try { - indexWriter.flush(); + flushTime.recordCallable(() -> { + indexWriter.flush(); + return null; + }); } finally { shutdownLock.unlock(); } @@ -488,11 +544,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { if (activeTasks.isTerminated()) return null; shutdownLock.lock(); try { - if (force) { - searcherManager.maybeRefreshBlocking(); - } else { - searcherManager.maybeRefresh(); - } + refreshTime.recordCallable(() -> { + if (force) { + searcherManager.maybeRefreshBlocking(); + } else { + searcherManager.maybeRefresh(); + } + return null; + }); } finally { shutdownLock.unlock(); } @@ -507,8 +566,11 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { private void scheduledCommit() { shutdownLock.lock(); try { - indexWriter.commit(); - } catch (IOException ex) { + commitTime.recordCallable(() -> { + indexWriter.commit(); + return null; + }); + } catch (Exception ex) { logger.error(MARKER_LUCENE, "Failed to execute a scheduled commit", ex); } finally { shutdownLock.unlock(); @@ -518,8 +580,11 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { private void scheduledMerge() { shutdownLock.lock(); try { - indexWriter.maybeMerge(); - } catch (IOException ex) { + mergeTime.recordCallable(() -> { + indexWriter.maybeMerge(); + return null; + }); + } catch (Exception ex) { logger.error(MARKER_LUCENE, "Failed to execute a scheduled merge", ex); } finally { shutdownLock.unlock(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 42c6ea1..b1c9cdc 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -65,7 +65,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { public LLLocalMultiLuceneIndex(LLTempLMDBEnv env, Path lucene, MeterRegistry meterRegistry, - String name, + String clusterName, int instancesCount, IndicizerAnalyzers indicizerAnalyzers, IndicizerSimilarities indicizerSimilarities, @@ -79,16 +79,17 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { this.meterRegistry = meterRegistry; LLLocalLuceneIndex[] luceneIndices = new LLLocalLuceneIndex[instancesCount]; for (int i = 0; i < instancesCount; i++) { - String instanceName; + String shardName; if (i == 0) { - instanceName = name; + shardName = clusterName; } else { - instanceName = name + "_" + String.format("%03d", i); + shardName = clusterName + "_" + String.format("%03d", i); } luceneIndices[i] = new LLLocalLuceneIndex(env, lucene, meterRegistry, - instanceName, + clusterName, + shardName, indicizerAnalyzers, indicizerSimilarities, luceneOptions, diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java index dbac82d..e5ff4cf 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java @@ -79,7 +79,8 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection { } @Override - public Mono getLuceneIndex(String name, + public Mono getLuceneIndex(@Nullable String clusterName, + @Nullable String shardName, int instancesCount, IndicizerAnalyzers indicizerAnalyzers, IndicizerSimilarities indicizerSimilarities, @@ -91,7 +92,8 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection { return new LLLocalLuceneIndex(env, null, meterRegistry, - name, + clusterName, + shardName, indicizerAnalyzers, indicizerSimilarities, luceneOptions, diff --git a/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java b/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java index 1caff2d..69a024b 100644 --- a/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java +++ b/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java @@ -80,7 +80,8 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator { new DatabaseOptions(List.of(), Map.of(), true, false, true, false, true, canUseNettyDirect, false, -1, null) ), - conn.getLuceneIndex("testluceneindex1", + conn.getLuceneIndex(null, + "testluceneindex1", 1, IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple), IndicizerSimilarities.of(TextFieldsSimilarity.Boolean), @@ -88,6 +89,7 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator { luceneHacks ), conn.getLuceneIndex("testluceneindex16", + null, 3, IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple), IndicizerSimilarities.of(TextFieldsSimilarity.Boolean), diff --git a/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java b/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java index cb23614..8c1ef7d 100644 --- a/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java +++ b/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java @@ -11,8 +11,8 @@ import it.cavallium.dbengine.client.IndicizerSimilarities; import it.cavallium.dbengine.client.LuceneOptions; import it.cavallium.dbengine.client.NRTCachingOptions; import it.cavallium.dbengine.database.Column; -import it.cavallium.dbengine.lucene.LuceneHacks; import it.cavallium.dbengine.database.memory.LLMemoryDatabaseConnection; +import it.cavallium.dbengine.lucene.LuceneHacks; import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer; import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity; import java.time.Duration; @@ -51,7 +51,8 @@ public class MemoryTemporaryDbGenerator implements TemporaryDbGenerator { List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")), new DatabaseOptions(List.of(), Map.of(), true, false, true, false, true, canUseNettyDirect, true, -1, null) ), - conn.getLuceneIndex("testluceneindex1", + conn.getLuceneIndex(null, + "testluceneindex1", 1, IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple), IndicizerSimilarities.of(TextFieldsSimilarity.Boolean), @@ -59,6 +60,7 @@ public class MemoryTemporaryDbGenerator implements TemporaryDbGenerator { luceneHacks ), conn.getLuceneIndex("testluceneindex16", + null, 3, IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple), IndicizerSimilarities.of(TextFieldsSimilarity.Boolean),