Implement more micrometer metrics

Andrea Cavalli 2021-12-30 17:28:06 +01:00
parent 5769bc7076
commit 68d8b5240c
9 changed files with 316 additions and 121 deletions
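Every metric added by this commit follows the same three-part pattern: a "started" counter, an "ended" counter, and a Timer that publishes the 20th/50th/95th percentiles plus a percentile histogram. A minimal sketch of the pattern in isolation (the registry and metric names below are illustrative, not taken from this commit):

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.concurrent.Callable;

public class InstrumentedOperation {
  private final SimpleMeterRegistry registry = new SimpleMeterRegistry();
  private final Counter started = registry.counter("op.started.counter", "db.name", "example");
  private final Counter ended = registry.counter("op.ended.counter", "db.name", "example");
  private final Timer timer = Timer
      .builder("op.timer")
      .publishPercentiles(0.2, 0.5, 0.95)
      .publishPercentileHistogram()
      .tags("db.name", "example")
      .register(registry);

  public <T> T run(Callable<T> op) throws Exception {
    started.increment();
    try {
      // recordCallable times the call and rethrows whatever it throws
      return timer.recordCallable(op);
    } finally {
      // the finally block keeps started/ended balanced even on failure
      ended.increment();
    }
  }
}

The started/ended pair makes in-flight operations observable as started.count() - ended.count(), which a plain Timer alone does not expose.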

View File

@ -521,7 +521,6 @@
<annotationProcessors>
<annotationProcessor>io.soabase.recordbuilder.processor.RecordBuilderProcessor</annotationProcessor>
</annotationProcessors>
<useIncrementalCompilation>false</useIncrementalCompilation>
<compilerArgs>--enable-preview</compilerArgs>
<source>17</source>
<target>17</target>

View File

@ -24,7 +24,8 @@ public interface LLDatabaseConnection {
List<Column> columns,
DatabaseOptions databaseOptions);
Mono<? extends LLLuceneIndex> getLuceneIndex(String name,
Mono<? extends LLLuceneIndex> getLuceneIndex(@Nullable String clusterName,
@Nullable String shardName,
int instancesCount,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,

View File

@ -2,24 +2,22 @@ package it.cavallium.dbengine.database.disk;
import io.micrometer.core.instrument.MeterRegistry;
import io.net5.buffer.api.BufferAllocator;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneOptions;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.netty.JMXNettyMonitoringManager;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@ -88,7 +86,8 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
}
@Override
public Mono<LLLuceneIndex> getLuceneIndex(String name,
public Mono<LLLuceneIndex> getLuceneIndex(@Nullable String clusterName,
@Nullable String shardName,
int instancesCount,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
@ -97,12 +96,18 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
return Mono
.fromCallable(() -> {
var env = this.env.get();
if (clusterName == null && shardName == null) {
throw new IllegalArgumentException("Shard name and/or cluster name must be set");
}
if (instancesCount != 1) {
if (shardName != null && !shardName.equals(clusterName)) {
throw new IllegalArgumentException("You shouldn't use a shard name for clustered instances");
}
Objects.requireNonNull(env, "Environment not set");
return new LLLocalMultiLuceneIndex(env,
luceneOptions.inMemory() ? null : basePath.resolve("lucene"),
meterRegistry,
name,
clusterName,
instancesCount,
indicizerAnalyzers,
indicizerSimilarities,
@ -112,7 +117,8 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
} else {
return new LLLocalLuceneIndex(env, luceneOptions.inMemory() ? null : basePath.resolve("lucene"),
meterRegistry,
name,
clusterName,
shardName,
indicizerAnalyzers,
indicizerSimilarities,
luceneOptions,
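The resolution rules above mirror the call sites changed later in this commit: a standalone index passes only a shard name, while a sharded cluster passes only a cluster name. A hedged usage sketch (the surrounding variables are assumed to be in scope):

// Standalone index: shard name only, one instance
var single = connection.getLuceneIndex(null, "myindex", 1,
    indicizerAnalyzers, indicizerSimilarities, luceneOptions, null);

// Sharded cluster: cluster name only, shard names are derived per instance
var clustered = connection.getLuceneIndex("myindex", null, 3,
    indicizerAnalyzers, indicizerSimilarities, luceneOptions, null);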

View File

@ -8,6 +8,8 @@ import static it.cavallium.dbengine.database.LLUtils.toStringSafe;
import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNullElse;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Resource;
@ -40,6 +42,7 @@ import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.commons.lang3.tuple.Pair;
@ -131,6 +134,22 @@ public class LLLocalDictionary implements LLDictionary {
private final boolean nettyDirect;
private final BufferAllocator alloc;
private final Counter startedUpdates;
private final Counter endedUpdates;
private final Timer updateTime;
private final Counter startedGet;
private final Counter endedGet;
private final Timer getTime;
private final Counter startedContains;
private final Counter endedContains;
private final Timer containsTime;
private final Counter startedPut;
private final Counter endedPut;
private final Timer putTime;
private final Counter startedRemove;
private final Counter endedRemove;
private final Timer removeTime;
public LLLocalDictionary(
BufferAllocator allocator,
@NotNull RocksDBColumn db,
@ -151,6 +170,48 @@ public class LLLocalDictionary implements LLDictionary {
this.databaseOptions = databaseOptions;
alloc = allocator;
this.nettyDirect = databaseOptions.allowNettyDirect() && alloc.getAllocationType() == OFF_HEAP;
var meterRegistry = db.getMeterRegistry();
this.startedGet = meterRegistry.counter("db.read.map.get.started.counter", "db.name", databaseName, "db.column", columnName);
this.endedGet = meterRegistry.counter("db.read.map.get.ended.counter", "db.name", databaseName, "db.column", columnName);
this.getTime = Timer
.builder("db.read.map.get.timer")
.publishPercentiles(0.2, 0.5, 0.95)
.publishPercentileHistogram()
.tags("db.name", databaseName, "db.column", columnName)
.register(meterRegistry);
this.startedContains = meterRegistry.counter("db.read.map.contains.started.counter", "db.name", databaseName, "db.column", columnName);
this.endedContains = meterRegistry.counter("db.read.map.contains.ended.counter", "db.name", databaseName, "db.column", columnName);
this.containsTime = Timer
.builder("db.read.map.contains.timer")
.publishPercentiles(0.2, 0.5, 0.95)
.publishPercentileHistogram()
.tags("db.name", databaseName, "db.column", columnName)
.register(meterRegistry);
this.startedUpdates = meterRegistry.counter("db.write.map.update.started.counter", "db.name", databaseName, "db.column", columnName);
this.endedUpdates = meterRegistry.counter("db.write.map.update.ended.counter", "db.name", databaseName, "db.column", columnName);
this.updateTime = Timer
.builder("db.write.map.update.timer")
.publishPercentiles(0.2, 0.5, 0.95)
.publishPercentileHistogram()
.tags("db.name", databaseName, "db.column", columnName)
.register(meterRegistry);
this.startedPut = meterRegistry.counter("db.write.map.put.started.counter", "db.name", databaseName, "db.column", columnName);
this.endedPut = meterRegistry.counter("db.write.map.put.ended.counter", "db.name", databaseName, "db.column", columnName);
this.putTime = Timer
.builder("db.write.map.put.timer")
.publishPercentiles(0.2, 0.5, 0.95)
.publishPercentileHistogram()
.tags("db.name", databaseName, "db.column", columnName)
.register(meterRegistry);
this.startedRemove = meterRegistry.counter("db.write.map.remove.started.counter", "db.name", databaseName, "db.column", columnName);
this.endedRemove = meterRegistry.counter("db.write.map.remove.ended.counter", "db.name", databaseName, "db.column", columnName);
this.removeTime = Timer
.builder("db.write.map.remove.timer")
.publishPercentiles(0.2, 0.5, 0.95)
.publishPercentileHistogram()
.tags("db.name", databaseName, "db.column", columnName)
.register(meterRegistry);
}
@Override
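publishPercentiles(0.2, 0.5, 0.95) precomputes those quantiles in the client, while publishPercentileHistogram() additionally ships histogram buckets so a backend such as Prometheus can aggregate quantiles across instances. A sketch of reading one of the timers back, assuming access to the same meterRegistry (the lookup mirrors the registration above):

Timer putTimer = meterRegistry.timer("db.write.map.put.timer",
    "db.name", databaseName, "db.column", columnName);
for (var pv : putTimer.takeSnapshot().percentileValues()) {
  // pv.percentile() is 0.2, 0.5 or 0.95; pv.value(...) converts the recorded quantile
  System.out.printf("p%.0f = %.2f ms%n", pv.percentile() * 100, pv.value(TimeUnit.MILLISECONDS));
}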
@ -205,7 +266,13 @@ public class LLLocalDictionary implements LLDictionary {
try (var key = keySend.receive()) {
try {
var readOptions = requireNonNullElse(resolveSnapshot(snapshot), EMPTY_READ_OPTIONS);
var result = db.get(readOptions, key, existsAlmostCertainly);
Buffer result;
startedGet.increment();
try {
result = getTime.recordCallable(() -> db.get(readOptions, key, existsAlmostCertainly));
} finally {
endedGet.increment();
}
logger.trace(MARKER_ROCKSDB, "Read {}: {}", () -> toStringSafe(key), () -> toStringSafe(result));
if (result != null) {
sink.next(result.send());
@ -236,52 +303,65 @@ public class LLLocalDictionary implements LLDictionary {
public boolean containsRange(@Nullable LLSnapshot snapshot, LLRange range) throws RocksDBException {
assert !Schedulers.isInNonBlockingThread() : "Called containsRange in a nonblocking thread";
if (range.isSingle()) {
var unmodifiableReadOpts = resolveSnapshot(snapshot);
return db.exists(unmodifiableReadOpts, range.getSingleUnsafe());
} else {
// Temporary resources to release after finished
AbstractSlice<?> slice1 = null;
AbstractSlice<?> slice2 = null;
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
readOpts.setFillCache(false);
if (range.hasMin()) {
var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe());
if (nettyDirect && rangeMinInternalByteBuffer != null) {
readOpts.setIterateLowerBound(slice1 = new DirectSlice(rangeMinInternalByteBuffer,
range.getMinUnsafe().readableBytes()));
} else {
readOpts.setIterateLowerBound(slice1 = new Slice(LLUtils.toArray(range.getMinUnsafe())));
}
}
if (range.hasMax()) {
var rangeMaxInternalByteBuffer = asReadOnlyDirect(range.getMaxUnsafe());
if (nettyDirect && rangeMaxInternalByteBuffer != null) {
readOpts.setIterateUpperBound(slice2 = new DirectSlice(rangeMaxInternalByteBuffer,
range.getMaxUnsafe().readableBytes()));
} else {
readOpts.setIterateUpperBound(slice2 = new Slice(LLUtils.toArray(range.getMaxUnsafe())));
}
}
try (RocksIterator rocksIterator = db.newIterator(readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe());
if (nettyDirect && rangeMinInternalByteBuffer != null) {
rocksIterator.seek(rangeMinInternalByteBuffer);
} else {
rocksIterator.seek(LLUtils.toArray(range.getMinUnsafe()));
startedContains.increment();
try {
var result = containsTime.recordCallable(() -> {
if (range.isSingle()) {
var unmodifiableReadOpts = resolveSnapshot(snapshot);
return db.exists(unmodifiableReadOpts, range.getSingleUnsafe());
} else {
// Temporary resources to release after finished
AbstractSlice<?> slice1 = null;
AbstractSlice<?> slice2 = null;
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
readOpts.setFillCache(false);
if (range.hasMin()) {
var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe());
if (nettyDirect && rangeMinInternalByteBuffer != null) {
readOpts.setIterateLowerBound(slice1 = new DirectSlice(rangeMinInternalByteBuffer,
range.getMinUnsafe().readableBytes()));
} else {
readOpts.setIterateLowerBound(slice1 = new Slice(LLUtils.toArray(range.getMinUnsafe())));
}
}
} else {
rocksIterator.seekToFirst();
if (range.hasMax()) {
var rangeMaxInternalByteBuffer = asReadOnlyDirect(range.getMaxUnsafe());
if (nettyDirect && rangeMaxInternalByteBuffer != null) {
readOpts.setIterateUpperBound(slice2 = new DirectSlice(rangeMaxInternalByteBuffer,
range.getMaxUnsafe().readableBytes()));
} else {
readOpts.setIterateUpperBound(slice2 = new Slice(LLUtils.toArray(range.getMaxUnsafe())));
}
}
try (RocksIterator rocksIterator = db.newIterator(readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe());
if (nettyDirect && rangeMinInternalByteBuffer != null) {
rocksIterator.seek(rangeMinInternalByteBuffer);
} else {
rocksIterator.seek(LLUtils.toArray(range.getMinUnsafe()));
}
} else {
rocksIterator.seekToFirst();
}
rocksIterator.status();
return rocksIterator.isValid();
}
} finally {
if (slice1 != null) slice1.close();
if (slice2 != null) slice2.close();
}
rocksIterator.status();
return rocksIterator.isValid();
}
} finally {
if (slice1 != null) slice1.close();
if (slice2 != null) slice2.close();
}
});
assert result != null;
return result;
} catch (RocksDBException | RuntimeException e) {
throw e;
} catch (Exception ex) {
throw new RuntimeException(ex);
} finally {
endedContains.increment();
}
}
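Timer.recordCallable is declared as throwing Exception, which is why the code above rethrows RocksDBException and RuntimeException unchanged and wraps anything else; without that, every instrumented method would have to widen its throws clause. A hedged sketch of a helper that would centralize the dance (hypothetical, not part of this commit):

private static <T> T timed(Counter started, Counter ended, Timer timer, Callable<T> op) throws RocksDBException {
  started.increment();
  try {
    return timer.recordCallable(op);
  } catch (RocksDBException | RuntimeException e) {
    throw e; // expected failure types pass through unchanged
  } catch (Exception ex) {
    throw new RuntimeException(ex); // recordCallable's checked Exception, wrapped
  } finally {
    ended.increment();
  }
}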
@ -298,8 +378,21 @@ public class LLLocalDictionary implements LLDictionary {
}
private boolean containsKey(@Nullable LLSnapshot snapshot, Buffer key) throws RocksDBException {
var unmodifiableReadOpts = resolveSnapshot(snapshot);
return db.exists(unmodifiableReadOpts, key);
startedContains.increment();
try {
var result = containsTime.recordCallable(() -> {
var unmodifiableReadOpts = resolveSnapshot(snapshot);
return db.exists(unmodifiableReadOpts, key);
});
assert result != null;
return result;
} catch (RocksDBException | RuntimeException e) {
throw e;
} catch (Exception ex) {
throw new RuntimeException(ex);
} finally {
endedContains.increment();
}
}
@Override
@ -340,7 +433,14 @@ public class LLLocalDictionary implements LLDictionary {
safeCloseable.close();
}
})
.onErrorMap(cause -> new IOException("Failed to write", cause));
.onErrorMap(cause -> new IOException("Failed to write", cause))
.elapsed()
.map(tuple -> {
putTime.record(tuple.getT1(), TimeUnit.MILLISECONDS);
return tuple.getT2();
})
.doFirst(startedPut::increment)
.doFinally(s -> endedPut.increment());
}
@Override
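On the reactive write paths the commit times the whole Mono instead of a callable: elapsed() pairs the emitted value with the milliseconds since subscription, doFirst marks the start, and doFinally counts the end for every termination signal. The same pattern in isolation (the Mono body is illustrative):

Mono<String> instrumented = Mono.fromCallable(() -> "value")
    .elapsed() // emits Tuple2<elapsed millis, value>
    .map(tuple -> {
      putTime.record(tuple.getT1(), TimeUnit.MILLISECONDS);
      return tuple.getT2();
    })
    .doFirst(startedPut::increment)
    .doFinally(signal -> endedPut.increment());

One caveat of this approach: elapsed() only emits when the source emits a value, so an empty Mono completes without recording a timer sample, while the started/ended counters still balance via doFirst/doFinally.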
@ -366,8 +466,15 @@ public class LLLocalDictionary implements LLDictionary {
case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT;
case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS;
};
UpdateAtomicResult result = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, keySend, updater,
existsAlmostCertainly, returnMode);
UpdateAtomicResult result;
startedUpdates.increment();
try {
result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS,
EMPTY_WRITE_OPTIONS, keySend, updater, existsAlmostCertainly, returnMode));
} finally {
endedUpdates.increment();
}
assert result != null;
return switch (updateReturnMode) {
case NOTHING -> null;
case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current();
@ -395,8 +502,15 @@ public class LLLocalDictionary implements LLDictionary {
throw new UnsupportedOperationException("update() is disallowed because the database doesn't support"
+ "safe atomic operations");
}
UpdateAtomicResult result = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, keySend, updater,
existsAlmostCertainly, UpdateAtomicResultMode.DELTA);
UpdateAtomicResult result;
startedUpdates.increment();
try {
result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS,
EMPTY_WRITE_OPTIONS, keySend, updater, existsAlmostCertainly, UpdateAtomicResultMode.DELTA));
} finally {
endedUpdates.increment();
}
assert result != null;
return ((UpdateAtomicResultDelta) result).delta();
}).onErrorMap(cause -> new IOException("Failed to read or write", cause)),
keySend -> Mono.fromRunnable(keySend::close)).doOnDiscard(UpdateAtomicResult.class, uar -> {
@ -431,7 +545,10 @@ public class LLLocalDictionary implements LLDictionary {
)
.singleOrEmpty(),
keySend -> Mono.fromRunnable(keySend::close)
);
).elapsed().map(tuple -> {
removeTime.record(tuple.getT1(), TimeUnit.MILLISECONDS);
return tuple.getT2();
}).doFirst(startedRemove::increment).doFinally(s -> endedRemove.increment());
}
private Mono<Send<Buffer>> getPreviousData(Mono<Send<Buffer>> keyMono, LLDictionaryResultType resultType,

View File

@ -6,10 +6,11 @@ import static it.cavallium.dbengine.database.LLUtils.toDocument;
import static it.cavallium.dbengine.database.LLUtils.toFields;
import static it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.NO_TRANSFORMATION;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.DirectIOOptions;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
@ -19,36 +20,33 @@ import it.cavallium.dbengine.client.query.QueryParser;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.database.LLIndexRequest;
import it.cavallium.dbengine.database.LLSoftUpdateDocument;
import it.cavallium.dbengine.database.LLUpdateDocument;
import it.cavallium.dbengine.database.LLItem;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSearchResultShard;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLSoftUpdateDocument;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.LLUpdateDocument;
import it.cavallium.dbengine.database.LLUpdateFields;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory;
import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.collector.Buckets;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.BucketParams;
import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer;
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
import it.cavallium.dbengine.lucene.searcher.LocalSearcher;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer;
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -60,11 +58,9 @@ import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.misc.store.DirectIODirectory;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.ByteBuffersDirectory;
@ -76,8 +72,6 @@ import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.util.Constants;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.functional.IORunnable;
import org.warp.commonutils.type.ShortNamedThreadFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
@ -97,8 +91,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private static final ReentrantLock shutdownLock = new ReentrantLock();
private static final Scheduler luceneHeavyTasksScheduler = uninterruptibleScheduler(Schedulers.single(Schedulers.boundedElastic()));
private final Counter startedDocIndexings;
private final Counter endedDocIndexings;
private final Timer docIndexingTime;
private final Timer snapshotTime;
private final Timer flushTime;
private final Timer commitTime;
private final Timer mergeTime;
private final Timer refreshTime;
private final MeterRegistry meterRegistry;
private final String luceneIndexName;
private final IndexWriter indexWriter;
private final SnapshotsManager snapshotsManager;
@ -114,21 +115,27 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
public LLLocalLuceneIndex(LLTempLMDBEnv env,
@Nullable Path luceneBasePath,
MeterRegistry meterRegistry,
String name,
@Nullable String clusterName,
@Nullable String shardName,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
LuceneOptions luceneOptions,
@Nullable LuceneHacks luceneHacks) throws IOException {
this.meterRegistry = meterRegistry;
if (clusterName == null && shardName == null) {
throw new IllegalArgumentException("Clustern name and/or shard name must be set");
}
String logName = Objects.requireNonNullElse(clusterName, shardName);
String luceneIndexName = Objects.requireNonNullElse(shardName, clusterName);
Path directoryPath;
if (luceneOptions.inMemory() != (luceneBasePath == null)) {
throw new IllegalArgumentException();
} else if (luceneBasePath != null) {
directoryPath = luceneBasePath.resolve(name + ".lucene.db");
directoryPath = luceneBasePath.resolve(luceneIndexName + ".lucene.db");
} else {
directoryPath = null;
}
if (name.length() == 0) {
if (luceneIndexName.length() == 0) {
throw new IOException("Empty lucene database name");
}
if (!MMapDirectory.UNMAP_SUPPORTED) {
@ -194,7 +201,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
this.directory = directory;
}
this.luceneIndexName = name;
this.luceneIndexName = luceneIndexName;
var snapshotter = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
this.lowMemory = lowMemory;
this.luceneAnalyzer = LuceneUtils.toPerFieldAnalyzerWrapper(indicizerAnalyzers);
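The two derived names serve different purposes: logName tags the metrics, so all shards of a cluster report into one series, while luceneIndexName stays unique per shard and names the on-disk directory. For example (values mirror the multi-index loop and test generators below):

String clusterName = "testluceneindex16";
String shardName = clusterName + "_" + String.format("%03d", 2);
String logName = Objects.requireNonNullElse(clusterName, shardName);         // "testluceneindex16"
String luceneIndexName = Objects.requireNonNullElse(shardName, clusterName); // "testluceneindex16_002"
// directory: testluceneindex16_002.lucene.db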
@ -251,6 +258,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
luceneOptions.queryRefreshDebounceTime()
);
this.startedDocIndexings = meterRegistry.counter("index.write.doc.started.counter", "index.name", logName);
this.endedDocIndexings = meterRegistry.counter("index.write.doc.ended.counter", "index.name", logName);
this.docIndexingTime = Timer.builder("index.write.doc.timer").publishPercentiles(0.2, 0.5, 0.95)
.publishPercentileHistogram().tag("index.name", logName).register(meterRegistry);
this.snapshotTime = Timer.builder("index.write.snapshot.timer").publishPercentiles(0.2, 0.5, 0.95)
.publishPercentileHistogram().tag("index.name", logName).register(meterRegistry);
this.flushTime = Timer.builder("index.write.flush.timer").publishPercentiles(0.2, 0.5, 0.95)
.publishPercentileHistogram().tag("index.name", logName).register(meterRegistry);
this.commitTime = Timer.builder("index.write.commit.timer").publishPercentiles(0.2, 0.5, 0.95)
.publishPercentileHistogram().tag("index.name", logName).register(meterRegistry);
this.mergeTime = Timer.builder("index.write.merge.timer").publishPercentiles(0.2, 0.5, 0.95)
.publishPercentileHistogram().tag("index.name", logName).register(meterRegistry);
this.refreshTime = Timer.builder("index.search.refresh.timer").publishPercentiles(0.2, 0.5, 0.95)
.publishPercentileHistogram().tag("index.name", logName).register(meterRegistry);
// Start scheduled tasks
var commitMillis = luceneOptions.commitDebounceTime().toMillis();
luceneHeavyTasksScheduler.schedulePeriodically(this::scheduledCommit, commitMillis, commitMillis,
@ -271,7 +287,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override
public Mono<LLSnapshot> takeSnapshot() {
return snapshotsManager.takeSnapshot().transform(this::ensureOpen);
return snapshotsManager.takeSnapshot().elapsed().map(elapsed -> {
snapshotTime.record(elapsed.getT1(), TimeUnit.MILLISECONDS);
return elapsed.getT2();
}).transform(this::ensureOpen);
}
private <V> Mono<V> ensureOpen(Mono<V> mono) {
@ -293,49 +312,75 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
return snapshotsManager.releaseSnapshot(snapshot);
return snapshotsManager
.releaseSnapshot(snapshot)
.elapsed()
.doOnNext(elapsed -> snapshotTime.record(elapsed.getT1(), TimeUnit.MILLISECONDS))
.then();
}
@Override
public Mono<Void> addDocument(LLTerm key, LLUpdateDocument doc) {
return this.<Void>runSafe(() -> {
indexWriter.addDocument(toDocument(doc));
return this.<Void>runSafe(() -> docIndexingTime.recordCallable(() -> {
startedDocIndexings.increment();
try {
indexWriter.addDocument(toDocument(doc));
} finally {
endedDocIndexings.increment();
}
return null;
}).transform(this::ensureOpen);
})).transform(this::ensureOpen);
}
@Override
public Mono<Void> addDocuments(Flux<Entry<LLTerm, LLUpdateDocument>> documents) {
return documents.collectList().flatMap(documentsList -> this.<Void>runSafe(() -> {
indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList));
return documents.collectList().flatMap(documentsList -> this.<Void>runSafe(() -> docIndexingTime.recordCallable(() -> {
double count = documentsList.size();
startedDocIndexings.increment(count);
try {
indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList));
} finally {
endedDocIndexings.increment(count);
}
return null;
})).transform(this::ensureOpen);
}))).transform(this::ensureOpen);
}
@Override
public Mono<Void> deleteDocument(LLTerm id) {
return this.<Void>runSafe(() -> {
indexWriter.deleteDocuments(LLUtils.toTerm(id));
return this.<Void>runSafe(() -> docIndexingTime.recordCallable(() -> {
startedDocIndexings.increment();
try {
indexWriter.deleteDocuments(LLUtils.toTerm(id));
} finally {
endedDocIndexings.increment();
}
return null;
}).transform(this::ensureOpen);
})).transform(this::ensureOpen);
}
@Override
public Mono<Void> update(LLTerm id, LLIndexRequest request) {
return this
.<Void>runSafe(() -> {
switch (request) {
case LLUpdateDocument updateDocument ->
indexWriter.updateDocument(LLUtils.toTerm(id), toDocument(updateDocument));
case LLSoftUpdateDocument softUpdateDocument -> indexWriter.softUpdateDocument(LLUtils.toTerm(id),
toDocument(softUpdateDocument.items()), toFields(softUpdateDocument.softDeleteItems()));
case LLUpdateFields updateFields ->
indexWriter.updateDocValues(LLUtils.toTerm(id), toFields(updateFields.items()));
case null, default -> throw new UnsupportedOperationException("Unexpected request type: " + request);
.<Void>runSafe(() -> docIndexingTime.recordCallable(() -> {
startedDocIndexings.increment();
try {
switch (request) {
case LLUpdateDocument updateDocument ->
indexWriter.updateDocument(LLUtils.toTerm(id), toDocument(updateDocument));
case LLSoftUpdateDocument softUpdateDocument ->
indexWriter.softUpdateDocument(LLUtils.toTerm(id), toDocument(softUpdateDocument.items()),
toFields(softUpdateDocument.softDeleteItems()));
case LLUpdateFields updateFields -> indexWriter.updateDocValues(LLUtils.toTerm(id),
toFields(updateFields.items()));
case null, default -> throw new UnsupportedOperationException("Unexpected request type: " + request);
}
} finally {
endedDocIndexings.increment();
}
return null;
})
}))
.transform(this::ensureOpen);
}
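addDocument, addDocuments, deleteDocument and update all repeat the same started/timer/ended wrapper, with addDocuments incrementing both counters by the batch size. A hedged sketch of how the duplication could be factored out (hypothetical helper, not part of this commit):

private <T> T trackIndexing(double docCount, Callable<T> op) throws Exception {
  startedDocIndexings.increment(docCount);
  try {
    // one timer sample per call, even when the call indexes a whole batch
    return docIndexingTime.recordCallable(op);
  } finally {
    endedDocIndexings.increment(docCount);
  }
}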
@ -349,7 +394,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
for (Entry<LLTerm, LLUpdateDocument> entry : documentsMap.entrySet()) {
LLTerm key = entry.getKey();
LLUpdateDocument value = entry.getValue();
indexWriter.updateDocument(LLUtils.toTerm(key), toDocument(value));
startedDocIndexings.increment();
try {
docIndexingTime.recordCallable(() -> {
indexWriter.updateDocument(LLUtils.toTerm(key), toDocument(value));
return null;
});
} finally {
endeddDocIndexings.increment();
}
}
return null;
}).transform(this::ensureOpen);
@ -469,7 +522,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
if (activeTasks.isTerminated()) return null;
shutdownLock.lock();
try {
indexWriter.flush();
flushTime.recordCallable(() -> {
indexWriter.flush();
return null;
});
} finally {
shutdownLock.unlock();
}
@ -488,11 +544,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
if (activeTasks.isTerminated()) return null;
shutdownLock.lock();
try {
if (force) {
searcherManager.maybeRefreshBlocking();
} else {
searcherManager.maybeRefresh();
}
refreshTime.recordCallable(() -> {
if (force) {
searcherManager.maybeRefreshBlocking();
} else {
searcherManager.maybeRefresh();
}
return null;
});
} finally {
shutdownLock.unlock();
}
@ -507,8 +566,11 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private void scheduledCommit() {
shutdownLock.lock();
try {
indexWriter.commit();
} catch (IOException ex) {
commitTime.recordCallable(() -> {
indexWriter.commit();
return null;
});
} catch (Exception ex) {
logger.error(MARKER_LUCENE, "Failed to execute a scheduled commit", ex);
} finally {
shutdownLock.unlock();
@ -518,8 +580,11 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private void scheduledMerge() {
shutdownLock.lock();
try {
indexWriter.maybeMerge();
} catch (IOException ex) {
mergeTime.recordCallable(() -> {
indexWriter.maybeMerge();
return null;
});
} catch (Exception ex) {
logger.error(MARKER_LUCENE, "Failed to execute a scheduled merge", ex);
} finally {
shutdownLock.unlock();

View File

@ -65,7 +65,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
public LLLocalMultiLuceneIndex(LLTempLMDBEnv env,
Path lucene,
MeterRegistry meterRegistry,
String name,
String clusterName,
int instancesCount,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
@ -79,16 +79,17 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
this.meterRegistry = meterRegistry;
LLLocalLuceneIndex[] luceneIndices = new LLLocalLuceneIndex[instancesCount];
for (int i = 0; i < instancesCount; i++) {
String instanceName;
String shardName;
if (i == 0) {
instanceName = name;
shardName = clusterName;
} else {
instanceName = name + "_" + String.format("%03d", i);
shardName = clusterName + "_" + String.format("%03d", i);
}
luceneIndices[i] = new LLLocalLuceneIndex(env,
lucene,
meterRegistry,
instanceName,
clusterName,
shardName,
indicizerAnalyzers,
indicizerSimilarities,
luceneOptions,
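With the arguments used in the test generators below (clusterName "testluceneindex16", instancesCount 3), the loop above derives these shard names:

// i == 0 -> "testluceneindex16"      (first shard reuses the cluster name)
// i == 1 -> "testluceneindex16_001"
// i == 2 -> "testluceneindex16_002"

All three shards share the metrics tag index.name=testluceneindex16, so per-cluster dashboards aggregate them automatically.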

View File

@ -79,7 +79,8 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection {
}
@Override
public Mono<LLLuceneIndex> getLuceneIndex(String name,
public Mono<LLLuceneIndex> getLuceneIndex(@Nullable String clusterName,
@Nullable String shardName,
int instancesCount,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
@ -91,7 +92,8 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection {
return new LLLocalLuceneIndex(env,
null,
meterRegistry,
name,
clusterName,
shardName,
indicizerAnalyzers,
indicizerSimilarities,
luceneOptions,

View File

@ -80,7 +80,8 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator {
new DatabaseOptions(List.of(), Map.of(), true, false, true, false,
true, canUseNettyDirect, false, -1, null)
),
conn.getLuceneIndex("testluceneindex1",
conn.getLuceneIndex(null,
"testluceneindex1",
1,
IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple),
IndicizerSimilarities.of(TextFieldsSimilarity.Boolean),
@ -88,6 +89,7 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator {
luceneHacks
),
conn.getLuceneIndex("testluceneindex16",
null,
3,
IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple),
IndicizerSimilarities.of(TextFieldsSimilarity.Boolean),

View File

@ -11,8 +11,8 @@ import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneOptions;
import it.cavallium.dbengine.client.NRTCachingOptions;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.database.memory.LLMemoryDatabaseConnection;
import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import java.time.Duration;
@ -51,7 +51,8 @@ public class MemoryTemporaryDbGenerator implements TemporaryDbGenerator {
List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")),
new DatabaseOptions(List.of(), Map.of(), true, false, true, false, true, canUseNettyDirect, true, -1, null)
),
conn.getLuceneIndex("testluceneindex1",
conn.getLuceneIndex(null,
"testluceneindex1",
1,
IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple),
IndicizerSimilarities.of(TextFieldsSimilarity.Boolean),
@ -59,6 +60,7 @@ public class MemoryTemporaryDbGenerator implements TemporaryDbGenerator {
luceneHacks
),
conn.getLuceneIndex("testluceneindex16",
null,
3,
IndicizerAnalyzers.of(TextFieldsAnalyzer.WordSimple),
IndicizerSimilarities.of(TextFieldsSimilarity.Boolean),
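Taken together, the new meters can be asserted in tests like the generators above. A hedged sketch, assuming a SimpleMeterRegistry was passed into the connection (the database name and variables are illustrative):

SimpleMeterRegistry registry = new SimpleMeterRegistry();
// ... create the connection with this registry and run some writes ...
double startedPuts = registry
    .counter("db.write.map.put.started.counter", "db.name", "testdb", "db.column", "testmap")
    .count();
long indexedDocs = registry
    .timer("index.write.doc.timer", "index.name", "testluceneindex1")
    .count();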