diff --git a/pom.xml b/pom.xml index 58a737c..a84cb16 100644 --- a/pom.xml +++ b/pom.xml @@ -250,7 +250,6 @@ io.micrometer micrometer-core - true io.micrometer diff --git a/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java b/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java index 5b7b23c..4e8160a 100644 --- a/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java +++ b/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.client; +import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.BufferAllocator; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -20,6 +21,8 @@ public interface CompositeDatabase { BufferAllocator getAllocator(); + MeterRegistry getMeterRegistry(); + /** * Find corrupted items */ diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java index 34b65e2..6d15cf4 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.client; +import com.google.common.util.concurrent.Uninterruptibles; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.IndexAction.Add; import it.cavallium.dbengine.client.IndexAction.AddMulti; @@ -24,11 +25,15 @@ import java.lang.ref.Cleaner; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.locks.LockSupport; +import java.util.logging.Level; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; +import org.warp.commonutils.type.ShortNamedThreadFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; @@ -42,108 +47,12 @@ import reactor.util.function.Tuple2; public class LuceneIndexImpl implements LuceneIndex { private static final Logger log = LoggerFactory.getLogger(LuceneIndex.class); - private static final Cleaner cleaner = Cleaner.create(); private final LLLuceneIndex luceneIndex; private final Indicizer indicizer; - private final Many actions; - private final Empty actionsClosed; public LuceneIndexImpl(LLLuceneIndex luceneIndex, Indicizer indicizer) { this.luceneIndex = luceneIndex; this.indicizer = indicizer; - this.actions = Sinks - .many() - .unicast() - .onBackpressureBuffer(Queues.get(1024).get()); - this.actionsClosed = Sinks.empty(); - - subscribeToActions(); - } - - private void subscribeToActions() { - var d = actions - .asFlux() - .doAfterTerminate(actionsClosed::tryEmitEmpty) - .flatMap(this::onParallelAction) - .concatMap(this::onOrderedAction) - .then() - .subscribeOn(Schedulers.boundedElastic()) - .subscribe(); - - cleaner.register(LuceneIndexImpl.this, d::dispose); - } - - /** - * Actions that don't require any kind of order - */ - private Mono onParallelAction(IndexAction action) { - return (switch (action.getType()) { - case TAKE_SNAPSHOT, RELEASE_SNAPSHOT, FLUSH, CLOSE -> Mono.empty(); - - case ADD -> luceneIndex.addDocument(((Add) action).key(), ((Add) action).doc()) - .doOnError(e -> ((Add) action).addedFuture().error(e)) - .onErrorResume(ex -> Mono.empty()) - .doOnSuccess(s -> ((Add) action).addedFuture().success()); - case ADD_MULTI -> 
luceneIndex.addDocuments(((AddMulti) action).docsFlux()) - .doOnError(e -> ((AddMulti) action).addedMultiFuture().error(e)) - .onErrorResume(ex -> Mono.empty()) - .doOnSuccess(s -> ((AddMulti) action).addedMultiFuture().success()); - case UPDATE -> luceneIndex - .updateDocument(((Update) action).key(),((Update) action).doc()) - .doOnError(e -> ((Update) action).updatedFuture().error(e)) - .onErrorResume(ex -> Mono.empty()) - .doOnSuccess(s -> ((Update) action).updatedFuture().success()); - case UPDATE_MULTI -> luceneIndex.updateDocuments(Mono.just(((UpdateMulti) action).docs())) - .doOnError(e -> ((UpdateMulti) action).updatedMultiFuture().error(e)) - .onErrorResume(ex -> Mono.empty()) - .doOnSuccess(s -> ((UpdateMulti) action).updatedMultiFuture().success()); - case DELETE -> luceneIndex.deleteDocument(((Delete) action).key()) - .doOnError(e -> ((Delete) action).deletedFuture().error(e)) - .onErrorResume(ex -> Mono.empty()) - .doOnSuccess(s -> ((Delete) action).deletedFuture().success()); - case DELETE_ALL -> luceneIndex.deleteAll() - .doOnError(e -> ((DeleteAll) action).deletedAllFuture().error(e)) - .onErrorResume(ex -> Mono.empty()) - .doOnSuccess(s -> ((DeleteAll) action).deletedAllFuture().success()); - case REFRESH -> luceneIndex.refresh(((Refresh) action).force()) - .doOnError(e -> ((Refresh) action).refreshFuture().error(e)) - .onErrorResume(ex -> Mono.empty()) - .doOnSuccess(s -> ((Refresh) action).refreshFuture().success()); - }) - .doOnError(ex -> log.error("Uncaught error when handling parallel index action " + action.getType(), ex)) - .onErrorResume(ex -> Mono.empty()) - .thenReturn(action); - } - - /** - * Actions that require absolute order - */ - private Mono onOrderedAction(IndexAction action) { - return (switch (action.getType()) { - case ADD, REFRESH, DELETE_ALL, DELETE, UPDATE_MULTI, UPDATE, ADD_MULTI -> Mono.empty(); - - case TAKE_SNAPSHOT -> luceneIndex.takeSnapshot().single() - .doOnError(e -> ((TakeSnapshot) action).snapshotFuture().error(e)) - .onErrorResume(ex -> Mono.empty()) - .doOnNext(s -> ((TakeSnapshot) action).snapshotFuture().success(s)); - case RELEASE_SNAPSHOT -> luceneIndex.releaseSnapshot(((ReleaseSnapshot) action).snapshot()) - .doOnError(e -> ((ReleaseSnapshot) action).releasedFuture().error(e)) - .onErrorResume(ex -> Mono.empty()) - .doOnSuccess(s -> ((ReleaseSnapshot) action).releasedFuture().success()); - case FLUSH -> luceneIndex.flush() - .doOnError(e -> ((Flush) action).flushFuture().error(e)) - .onErrorResume(ex -> Mono.empty()) - .doOnSuccess(s -> ((Flush) action).flushFuture().success()); - case CLOSE -> luceneIndex.close() - .doOnError(e -> ((Close) action).closeFuture().error(e)) - .onErrorResume(ex -> Mono.empty()) - .doOnSuccess(s -> ((Close) action).closeFuture().success()) - .doAfterTerminate(() -> emitActionOptimistically(null)); - }) - .doOnError(ex -> log.error("Uncaught error when handling ordered index action " + action.getType(), ex)) - .onErrorResume(ex -> Mono.empty()) - .onErrorResume(ex -> Mono.empty()) - .thenReturn(action); } private LLSnapshot resolveSnapshot(CompositeSnapshot snapshot) { @@ -158,45 +67,46 @@ public class LuceneIndexImpl implements LuceneIndex { public Mono addDocument(T key, U value) { return indicizer .toDocument(key, value) - .flatMap(doc -> Mono - .create(sink -> emitActionOptimistically(new IndexAction.Add(indicizer.toIndex(key), doc, sink)))); + .flatMap(doc -> luceneIndex.addDocument(indicizer.toIndex(key), doc)); } @Override public Mono addDocuments(Flux> entries) { - var convertedEntries = 
entries.flatMap(entry -> indicizer - .toDocument(entry.getKey(), entry.getValue()) - .map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc)) - ); - return Mono.create(sink -> emitActionOptimistically(new IndexAction.AddMulti(convertedEntries, sink))); + return luceneIndex + .addDocuments(entries + .flatMap(entry -> indicizer + .toDocument(entry.getKey(), entry.getValue()) + .map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc))) + ); } @Override public Mono deleteDocument(T key) { LLTerm id = indicizer.toIndex(key); - return Mono.create(sink -> emitActionOptimistically(new IndexAction.Delete(id, sink))); + return luceneIndex.deleteDocument(id); } @Override public Mono updateDocument(T key, @NotNull U value) { return indicizer .toDocument(key, value) - .flatMap(doc -> Mono.create(sink -> emitActionOptimistically(new Update(indicizer.toIndex(key), doc, sink)))); + .flatMap(doc -> luceneIndex.updateDocument(indicizer.toIndex(key), doc)); } @Override public Mono updateDocuments(Flux> entries) { - return entries - .flatMap(entry -> indicizer - .toDocument(entry.getKey(), entry.getValue()) - .map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc))) - .collectMap(Entry::getKey, Entry::getValue) - .flatMap(docs -> Mono.create(sink -> emitActionOptimistically(new IndexAction.UpdateMulti(docs, sink)))); + return luceneIndex + .updateDocuments(entries + .flatMap(entry -> indicizer + .toDocument(entry.getKey(), entry.getValue()) + .map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc))) + .collectMap(Entry::getKey, Entry::getValue) + ); } @Override public Mono deleteAll() { - return Mono.create(sink -> emitActionOptimistically(new IndexAction.DeleteAll(sink))); + return luceneIndex.deleteAll(); } @Override @@ -255,19 +165,7 @@ public class LuceneIndexImpl implements LuceneIndex { @Override public Mono close() { - return Mono - .create(sink -> emitActionOptimistically(new Close(sink))) - .then(this.actionsClosed.asMono()); - } - - private void emitActionOptimistically(IndexAction action) { - EmitResult emitResult; - while ((emitResult = (action == null ? 
actions.tryEmitComplete() : actions.tryEmitNext(action))) - == EmitResult.FAIL_NON_SERIALIZED || emitResult == EmitResult.FAIL_OVERFLOW) { - // 10ms - LockSupport.parkNanos(10000000); - } - emitResult.orThrow(); + return luceneIndex.close(); } /** @@ -275,7 +173,7 @@ public class LuceneIndexImpl implements LuceneIndex { */ @Override public Mono flush() { - return Mono.create(sink -> emitActionOptimistically(new IndexAction.Flush(sink))); + return luceneIndex.flush(); } /** @@ -283,16 +181,16 @@ public class LuceneIndexImpl implements LuceneIndex { */ @Override public Mono refresh(boolean force) { - return Mono.create(sink -> emitActionOptimistically(new IndexAction.Refresh(force, sink))); + return luceneIndex.refresh(force); } @Override public Mono takeSnapshot() { - return Mono.create(sink -> emitActionOptimistically(new IndexAction.TakeSnapshot(sink))); + return luceneIndex.takeSnapshot(); } @Override public Mono releaseSnapshot(LLSnapshot snapshot) { - return Mono.create(sink -> emitActionOptimistically(new IndexAction.ReleaseSnapshot(snapshot, sink))); + return luceneIndex.releaseSnapshot(snapshot); } } diff --git a/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java index c519730..f33407e 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.database; +import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.BufferAllocator; import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.client.IndicizerAnalyzers; @@ -15,6 +16,8 @@ public interface LLDatabaseConnection { BufferAllocator getAllocator(); + MeterRegistry getMeterRegistry(); + Mono connect(); Mono getDatabase(String name, diff --git a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java index a62d0cd..cad9d34 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.database; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; +import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.BufferAllocator; import it.cavallium.dbengine.database.collections.DatabaseInt; import it.cavallium.dbengine.database.collections.DatabaseLong; @@ -47,5 +48,7 @@ public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseS BufferAllocator getAllocator(); + MeterRegistry getMeterRegistry(); + Mono close(); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index 866d1e2..6d186a0 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -1,27 +1,20 @@ package it.cavallium.dbengine.database.disk; -import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; import static java.util.Objects.requireNonNull; -import static java.util.Objects.requireNonNullElse; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.Buffer; import 
io.net5.buffer.api.BufferAllocator; -import io.net5.buffer.api.MemoryManager; import io.net5.buffer.api.Send; import io.net5.util.internal.PlatformDependent; import it.cavallium.dbengine.client.DatabaseOptions; -import it.cavallium.dbengine.database.LLDelta; -import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils.DirectBuffer; import it.cavallium.dbengine.database.RepeatedElementList; -import it.cavallium.dbengine.database.serialization.SerializationFunction; -import it.cavallium.dbengine.lucene.ExponentialPageLimits; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.atomic.AtomicInteger; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.rocksdb.ColumnFamilyHandle; @@ -37,7 +30,6 @@ import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; -import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; public sealed abstract class AbstractRocksDBColumn implements RocksDBColumn @@ -54,11 +46,24 @@ public sealed abstract class AbstractRocksDBColumn implements private final BufferAllocator alloc; private final ColumnFamilyHandle cfh; - public AbstractRocksDBColumn(T db, DatabaseOptions databaseOptions, BufferAllocator alloc, ColumnFamilyHandle cfh) { + private final MeterRegistry meterRegistry; + private final AtomicInteger lastDataSizeMetric = new AtomicInteger(0); + + public AbstractRocksDBColumn(T db, + DatabaseOptions databaseOptions, + BufferAllocator alloc, + ColumnFamilyHandle cfh, + MeterRegistry meterRegistry) { this.db = db; this.opts = databaseOptions; this.alloc = alloc; this.cfh = cfh; + + this.meterRegistry = meterRegistry; + Gauge + .builder("it.cavallium.dbengine.database.disk.column.lastdatasize", lastDataSizeMetric::get) + .description("Last data size read using get()") + .register(meterRegistry); } protected T getDb() { @@ -81,6 +86,15 @@ public sealed abstract class AbstractRocksDBColumn implements if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called dbGet in a nonblocking thread"); } + if (!db.isOwningHandle()) { + throw new IllegalStateException("Database is closed"); + } + if (!readOptions.isOwningHandle()) { + throw new IllegalStateException("ReadOptions is closed"); + } + if (!cfh.isOwningHandle()) { + throw new IllegalStateException("Column family is closed"); + } if (opts.allowNettyDirect()) { //todo: implement keyMayExist if existsAlmostCertainly is false. @@ -91,7 +105,6 @@ public sealed abstract class AbstractRocksDBColumn implements // Create a direct result buffer because RocksDB works only with direct buffers try (Buffer resultBuf = alloc.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES)) { int valueSize; - int assertionReadData = -1; ByteBuffer resultNioBuf; do { // Create the result nio buffer to pass to RocksDB @@ -115,6 +128,9 @@ public sealed abstract class AbstractRocksDBColumn implements // of the result into the result buffer more than once. 
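// dbGet() first verifies that db, readOptions and cfh are still owning handles (i.e. not
// yet closed) before touching native memory, and every value read through this
// direct-buffer path has its size recorded in lastDataSizeMetric, which the constructor
// exposed as the "it.cavallium.dbengine.database.disk.column.lastdatasize" gauge, so the
// size of the most recent get() can be observed through the MeterRegistry.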
assert resultNioBuf.limit() <= valueSize; + // Update data size metrics + this.lastDataSizeMetric.set(valueSize); + if (valueSize <= resultNioBuf.limit()) { // Return the result ready to be read return resultBuf.readerOffset(0).writerOffset(valueSize).send(); @@ -175,6 +191,15 @@ public sealed abstract class AbstractRocksDBColumn implements if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called dbPut in a nonblocking thread"); } + if (!db.isOwningHandle()) { + throw new IllegalStateException("Database is closed"); + } + if (!writeOptions.isOwningHandle()) { + throw new IllegalStateException("WriteOptions is closed"); + } + if (!cfh.isOwningHandle()) { + throw new IllegalStateException("Column family is closed"); + } assert key.isAccessible(); assert value.isAccessible(); if (opts.allowNettyDirect()) { @@ -197,7 +222,7 @@ public sealed abstract class AbstractRocksDBColumn implements } } } finally { - if (writeOptions != null && !(writeOptions instanceof UnreleasableWriteOptions)) { + if (!(writeOptions instanceof UnreleasableWriteOptions)) { writeOptions.close(); } } @@ -209,6 +234,15 @@ public sealed abstract class AbstractRocksDBColumn implements if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called containsKey in a nonblocking thread"); } + if (!db.isOwningHandle()) { + throw new IllegalStateException("Database is closed"); + } + if (!readOptions.isOwningHandle()) { + throw new IllegalStateException("ReadOptions is closed"); + } + if (!cfh.isOwningHandle()) { + throw new IllegalStateException("Column family is closed"); + } int size = RocksDB.NOT_FOUND; byte[] keyBytes = LLUtils.toArray(key); Holder data = new Holder<>(); @@ -221,7 +255,7 @@ public sealed abstract class AbstractRocksDBColumn implements } } } finally { - if (readOptions != null && !(readOptions instanceof UnreleasableReadOptions)) { + if (!(readOptions instanceof UnreleasableReadOptions)) { readOptions.close(); } } @@ -232,6 +266,15 @@ public sealed abstract class AbstractRocksDBColumn implements @Override public void delete(WriteOptions writeOptions, Send keySend) throws RocksDBException { try (var key = keySend.receive()) { + if (!db.isOwningHandle()) { + throw new IllegalStateException("Database is closed"); + } + if (!writeOptions.isOwningHandle()) { + throw new IllegalStateException("WriteOptions is closed"); + } + if (!cfh.isOwningHandle()) { + throw new IllegalStateException("Column family is closed"); + } if (opts.allowNettyDirect()) { DirectBuffer keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send()); try { @@ -248,43 +291,106 @@ public sealed abstract class AbstractRocksDBColumn implements @Override public void delete(WriteOptions writeOptions, byte[] key) throws RocksDBException { + if (!db.isOwningHandle()) { + throw new IllegalStateException("Database is closed"); + } + if (!writeOptions.isOwningHandle()) { + throw new IllegalStateException("WriteOptions is closed"); + } + if (!cfh.isOwningHandle()) { + throw new IllegalStateException("Column family is closed"); + } db.delete(cfh, writeOptions, key); } @Override public List multiGetAsList(ReadOptions readOptions, List keys) throws RocksDBException { + if (!db.isOwningHandle()) { + throw new IllegalStateException("Database is closed"); + } + if (!readOptions.isOwningHandle()) { + throw new IllegalStateException("ReadOptions is closed"); + } + if (!cfh.isOwningHandle()) { + throw new IllegalStateException("Column family is closed"); + } var columnFamilyHandles = new 
RepeatedElementList<>(cfh, keys.size()); return db.multiGetAsList(readOptions, columnFamilyHandles, keys); } @Override public void suggestCompactRange() throws RocksDBException { + if (!db.isOwningHandle()) { + throw new IllegalStateException("Database is closed"); + } + if (!cfh.isOwningHandle()) { + throw new IllegalStateException("Column family is closed"); + } db.suggestCompactRange(cfh); } @Override public void compactRange(byte[] begin, byte[] end, CompactRangeOptions options) throws RocksDBException { + if (!db.isOwningHandle()) { + throw new IllegalStateException("Database is closed"); + } + if (!options.isOwningHandle()) { + throw new IllegalStateException("CompactRangeOptions is closed"); + } + if (!cfh.isOwningHandle()) { + throw new IllegalStateException("Column family is closed"); + } db.compactRange(cfh, begin, end, options); } @Override public void flush(FlushOptions options) throws RocksDBException { + if (!db.isOwningHandle()) { + throw new IllegalStateException("Database is closed"); + } + if (!options.isOwningHandle()) { + throw new IllegalStateException("FlushOptions is closed"); + } + if (!cfh.isOwningHandle()) { + throw new IllegalStateException("Column family is closed"); + } db.flush(options, cfh); } @Override public void flushWal(boolean sync) throws RocksDBException { + if (!db.isOwningHandle()) { + throw new IllegalStateException("Database is closed"); + } + if (!cfh.isOwningHandle()) { + throw new IllegalStateException("Column family is closed"); + } db.flushWal(sync); } @Override public long getLongProperty(String property) throws RocksDBException { + if (!db.isOwningHandle()) { + throw new IllegalStateException("Database is closed"); + } + if (!cfh.isOwningHandle()) { + throw new IllegalStateException("Column family is closed"); + } return db.getLongProperty(cfh, property); } @Override public void write(WriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException { + if (!db.isOwningHandle()) { + throw new IllegalStateException("Database is closed"); + } + if (!writeOptions.isOwningHandle()) { + throw new IllegalStateException("WriteOptions is closed"); + } + if (!cfh.isOwningHandle()) { + throw new IllegalStateException("Column family is closed"); + } db.write(writeOptions, writeBatch); } @@ -298,6 +404,15 @@ public sealed abstract class AbstractRocksDBColumn implements @Override @NotNull public RocksIterator newIterator(@NotNull ReadOptions readOptions) { + if (!db.isOwningHandle()) { + throw new IllegalStateException("Database is closed"); + } + if (!readOptions.isOwningHandle()) { + throw new IllegalStateException("ReadOptions is closed"); + } + if (!cfh.isOwningHandle()) { + throw new IllegalStateException("Column family is closed"); + } return db.newIterator(cfh, readOptions); } @@ -310,4 +425,8 @@ public sealed abstract class AbstractRocksDBColumn implements public BufferAllocator getAllocator() { return alloc; } + + public MeterRegistry getMeterRegistry() { + return meterRegistry; + } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java index d623115..86896bd 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.database.disk; +import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.BufferAllocator; import 
it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; @@ -31,11 +32,13 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection { private final AtomicBoolean connected = new AtomicBoolean(); private final BufferAllocator allocator; + private final MeterRegistry meterRegistry; private final Path basePath; private final AtomicReference env = new AtomicReference<>(); - public LLLocalDatabaseConnection(BufferAllocator allocator, Path basePath) { + public LLLocalDatabaseConnection(BufferAllocator allocator, MeterRegistry meterRegistry, Path basePath) { this.allocator = allocator; + this.meterRegistry = meterRegistry; this.basePath = basePath; } @@ -44,6 +47,10 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection { return allocator; } + public MeterRegistry getMeterRegistry() { + return meterRegistry; + } + @Override public Mono connect() { return Mono @@ -70,6 +77,7 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection { return Mono .fromCallable(() -> new LLLocalKeyValueDatabase( allocator, + meterRegistry, name, basePath.resolve("database_" + name), columns, @@ -93,6 +101,7 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection { Objects.requireNonNull(env, "Environment not set"); return new LLLocalMultiLuceneIndex(env, luceneOptions.inMemory() ? null : basePath.resolve("lucene"), + meterRegistry, name, instancesCount, indicizerAnalyzers, @@ -102,6 +111,7 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection { ); } else { return new LLLocalLuceneIndex(luceneOptions.inMemory() ? null : basePath.resolve("lucene"), + meterRegistry, name, indicizerAnalyzers, indicizerSimilarities, diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 2604b0a..5d9f284 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -808,7 +808,7 @@ public class LLLocalDictionary implements LLDictionary { rangeSend -> Flux.using( () -> new LLLocalEntryReactiveRocksIterator(db, rangeSend, databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot)), - llLocalEntryReactiveRocksIterator -> llLocalEntryReactiveRocksIterator.flux().subscribeOn(dbScheduler), + iterator -> iterator.flux().subscribeOn(dbScheduler, false), LLLocalReactiveRocksIterator::close ).transform(LLUtils::handleDiscard), rangeSend -> Mono.fromRunnable(rangeSend::close) @@ -821,7 +821,7 @@ public class LLLocalDictionary implements LLDictionary { rangeSend -> Flux.using( () -> new LLLocalGroupedEntryReactiveRocksIterator(db, prefixLength, rangeSend, databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot)), - reactiveRocksIterator -> reactiveRocksIterator.flux().subscribeOn(dbScheduler), + iterator -> iterator.flux().subscribeOn(dbScheduler, false), LLLocalGroupedReactiveRocksIterator::close ).transform(LLUtils::handleDiscard), rangeSend -> Mono.fromRunnable(rangeSend::close) @@ -852,7 +852,7 @@ public class LLLocalDictionary implements LLDictionary { rangeSend -> Flux.using( () -> new LLLocalGroupedKeyReactiveRocksIterator(db, prefixLength, rangeSend, databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot)), - reactiveRocksIterator -> reactiveRocksIterator.flux().subscribeOn(dbScheduler), + iterator -> iterator.flux().subscribeOn(dbScheduler, false), 
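// Reactor's two-argument subscribeOn overload is used on the line above: the second
// parameter is the requestOnSeparateBoundary flag, and passing false means downstream
// request() signals are not re-scheduled onto dbScheduler; only the subscription itself
// (and therefore the setup of the RocksDB iterator flux) runs on the database scheduler.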
LLLocalGroupedReactiveRocksIterator::close ).transform(LLUtils::handleDiscard), rangeSend -> Mono.fromRunnable(rangeSend::close) @@ -951,7 +951,7 @@ public class LLLocalDictionary implements LLDictionary { () -> new LLLocalKeyReactiveRocksIterator(db, rangeSend, databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot) ), - llLocalKeyReactiveRocksIterator -> llLocalKeyReactiveRocksIterator.flux().subscribeOn(dbScheduler), + iterator -> iterator.flux().subscribeOn(dbScheduler, false), LLLocalReactiveRocksIterator::close ).transform(LLUtils::handleDiscard), rangeSend -> Mono.fromRunnable(rangeSend::close) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 8e3610a..2ef7c65 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.disk; import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; +import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.BufferAllocator; import io.net5.util.internal.PlatformDependent; import it.cavallium.dbengine.database.Column; @@ -73,6 +74,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { RocksDB.DEFAULT_COLUMN_FAMILY); private final BufferAllocator allocator; + private final MeterRegistry meterRegistry; private final Scheduler dbScheduler; // Configurations @@ -89,6 +91,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { @SuppressWarnings("SwitchStatementWithTooFewBranches") public LLLocalKeyValueDatabase(BufferAllocator allocator, + MeterRegistry meterRegistry, String name, @Nullable Path path, List columns, @@ -96,6 +99,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { DatabaseOptions databaseOptions) throws IOException { this.name = name; this.allocator = allocator; + this.meterRegistry = meterRegistry; if (databaseOptions.allowNettyDirect()) { if (!PlatformDependent.hasUnsafe()) { @@ -496,11 +500,11 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { private RocksDBColumn getRocksDBColumn(RocksDB db, ColumnFamilyHandle cfh) { if (db instanceof OptimisticTransactionDB optimisticTransactionDB) { - return new OptimisticRocksDBColumn(optimisticTransactionDB, databaseOptions, allocator, cfh); + return new OptimisticRocksDBColumn(optimisticTransactionDB, databaseOptions, allocator, cfh, meterRegistry); } else if (db instanceof TransactionDB) { - return new PessimisticRocksDBColumn((TransactionDB) db, databaseOptions, allocator, cfh); + return new PessimisticRocksDBColumn((TransactionDB) db, databaseOptions, allocator, cfh, meterRegistry); } else { - return new StandardRocksDBColumn(db, databaseOptions, allocator, cfh); + return new StandardRocksDBColumn(db, databaseOptions, allocator, cfh, meterRegistry); } } @@ -547,6 +551,11 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return allocator; } + @Override + public MeterRegistry getMeterRegistry() { + return meterRegistry; + } + @Override public Mono takeSnapshot() { return Mono diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index bdbf7c3..a246c9a 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ 
b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -3,6 +3,8 @@ package it.cavallium.dbengine.database.disk; import static it.cavallium.dbengine.database.LLUtils.MARKER_LUCENE; import static it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.NO_TRANSFORMATION; +import com.google.common.util.concurrent.Uninterruptibles; +import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.DirectIOOptions; import it.cavallium.dbengine.client.IndicizerAnalyzers; @@ -28,6 +30,9 @@ import java.nio.file.Path; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -49,8 +54,10 @@ import org.apache.lucene.store.NIOFSDirectory; import org.apache.lucene.store.NRTCachingDirectory; import org.apache.lucene.util.Constants; import org.jetbrains.annotations.Nullable; +import org.warp.commonutils.functional.IORunnable; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; +import org.warp.commonutils.type.ShortNamedThreadFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -67,7 +74,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { * concurrent commits or concurrent refreshes. */ private static final Scheduler luceneHeavyTasksScheduler = Schedulers.single(Schedulers.boundedElastic()); + private static final ExecutorService SAFE_EXECUTOR = Executors.newCachedThreadPool(new ShortNamedThreadFactory("lucene-index-impl")); + private final MeterRegistry meterRegistry; private final String luceneIndexName; private final IndexWriter indexWriter; private final SnapshotsManager snapshotsManager; @@ -81,11 +90,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { private final AtomicBoolean closeRequested = new AtomicBoolean(); public LLLocalLuceneIndex(@Nullable Path luceneBasePath, + MeterRegistry meterRegistry, String name, IndicizerAnalyzers indicizerAnalyzers, IndicizerSimilarities indicizerSimilarities, LuceneOptions luceneOptions, @Nullable LuceneHacks luceneHacks) throws IOException { + this.meterRegistry = meterRegistry; Path directoryPath; if (luceneOptions.inMemory() != (luceneBasePath == null)) { throw new IllegalArgumentException(); @@ -234,13 +245,45 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } private Mono ensureOpen(Mono mono) { - return Mono.defer(() -> { + return Mono.fromCallable(() -> { if (closeRequested.get()) { - return Mono.error(new IllegalStateException("Lucene index is closed")); + throw new IllegalStateException("Lucene index is closed"); } else { - return mono; + return null; } - }).doFirst(activeTasks::register).doFinally(s -> activeTasks.arriveAndDeregister()); + }).then(mono).doFirst(activeTasks::register).doFinally(s -> activeTasks.arriveAndDeregister()); + } + + private Mono runSafe(Callable callable) { + return Mono.create(sink -> { + var future = SAFE_EXECUTOR.submit(() -> { + try { + var result = callable.call(); + if (result != null) { + sink.success(result); + } else { + sink.success(); + } + } catch (Throwable e) { + sink.error(e); + } + }); + sink.onDispose(() -> future.cancel(false)); + }); + } + + private Mono runSafe(IORunnable runnable) { + return Mono.create(sink -> { + var 
future = SAFE_EXECUTOR.submit(() -> { + try { + runnable.run(); + sink.success(); + } catch (Throwable e) { + sink.error(e); + } + }); + sink.onDispose(() -> future.cancel(false)); + }); } @Override @@ -250,40 +293,29 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { @Override public Mono addDocument(LLTerm key, LLDocument doc) { - return Mono.fromCallable(() -> { - indexWriter.addDocument(LLUtils.toDocument(doc)); - return null; - }).subscribeOn(Schedulers.boundedElastic()).transform(this::ensureOpen); + return this.runSafe(() -> indexWriter.addDocument(LLUtils.toDocument(doc))).transform(this::ensureOpen); } @Override public Mono addDocuments(Flux> documents) { return documents .collectList() - .flatMap(documentsList -> Mono - .fromCallable(() -> { - indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList)); - return null; - }).subscribeOn(Schedulers.boundedElastic()) - ) + .flatMap(documentsList -> this.runSafe(() -> indexWriter.addDocuments(LLUtils + .toDocumentsFromEntries(documentsList)))) .transform(this::ensureOpen); } @Override public Mono deleteDocument(LLTerm id) { - return Mono.fromCallable(() -> { - indexWriter.deleteDocuments(LLUtils.toTerm(id)); - return null; - }).subscribeOn(Schedulers.boundedElastic()).transform(this::ensureOpen); + return this.runSafe(() -> indexWriter.deleteDocuments(LLUtils.toTerm(id))).transform(this::ensureOpen); } @Override public Mono updateDocument(LLTerm id, LLDocument document) { - return Mono.fromCallable(() -> { - indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document)); - return null; - }).subscribeOn(Schedulers.boundedElastic()).transform(this::ensureOpen); + return this + .runSafe(() -> indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document))) + .transform(this::ensureOpen); } @Override @@ -292,29 +324,21 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } private Mono updateDocuments(Map documentsMap) { - return Mono - .fromCallable(() -> { - for (Entry entry : documentsMap.entrySet()) { - LLTerm key = entry.getKey(); - LLDocument value = entry.getValue(); - indexWriter.updateDocument(LLUtils.toTerm(key), LLUtils.toDocument(value)); - } - return null; - }) - .subscribeOn(Schedulers.boundedElastic()) - .transform(this::ensureOpen); + return this.runSafe(() -> { + for (Entry entry : documentsMap.entrySet()) { + LLTerm key = entry.getKey(); + LLDocument value = entry.getValue(); + indexWriter.updateDocument(LLUtils.toTerm(key), LLUtils.toDocument(value)); + } + }).transform(this::ensureOpen); } @Override public Mono deleteAll() { - return Mono.fromCallable(() -> { - //noinspection BlockingMethodInNonBlockingContext + return this.runSafe(() -> { indexWriter.deleteAll(); - //noinspection BlockingMethodInNonBlockingContext indexWriter.forceMergeDeletes(true); - //noinspection BlockingMethodInNonBlockingContext indexWriter.commit(); - return null; }).subscribeOn(luceneHeavyTasksScheduler).transform(this::ensureOpen); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 3e1e223..a6c6304 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.database.disk; +import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.Send; import 
it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; @@ -39,6 +40,7 @@ import reactor.util.function.Tuple2; public class LLLocalMultiLuceneIndex implements LLLuceneIndex { + private final MeterRegistry meterRegistry; private final ConcurrentHashMap registeredSnapshots = new ConcurrentHashMap<>(); private final AtomicLong nextSnapshotNumber = new AtomicLong(1); private final LLLocalLuceneIndex[] luceneIndices; @@ -49,6 +51,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { public LLLocalMultiLuceneIndex(LLTempLMDBEnv env, Path lucene, + MeterRegistry meterRegistry, String name, int instancesCount, IndicizerAnalyzers indicizerAnalyzers, @@ -60,6 +63,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { throw new IOException("Unsupported instances count: " + instancesCount); } + this.meterRegistry = meterRegistry; LLLocalLuceneIndex[] luceneIndices = new LLLocalLuceneIndex[instancesCount]; for (int i = 0; i < instancesCount; i++) { String instanceName; @@ -69,6 +73,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { instanceName = name + "_" + String.format("%03d", i); } luceneIndices[i] = new LLLocalLuceneIndex(lucene, + meterRegistry, instanceName, indicizerAnalyzers, indicizerSimilarities, diff --git a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java index 84f0a61..a24882a 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.disk; import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; +import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.MemoryManager; @@ -27,11 +28,14 @@ import reactor.core.scheduler.Schedulers; public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn { + private static final boolean ALWAYS_PRINT_OPTIMISTIC_RETRIES = false; + public OptimisticRocksDBColumn(OptimisticTransactionDB db, DatabaseOptions databaseOptions, BufferAllocator alloc, - ColumnFamilyHandle cfh) { - super(db, databaseOptions, alloc, cfh); + ColumnFamilyHandle cfh, + MeterRegistry meterRegistry) { + super(db, databaseOptions, alloc, cfh, meterRegistry); } @Override @@ -156,20 +160,23 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn= 100 && retries % 100 == 0) { - logger.warn(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):" - + " waiting 5ms before retrying for the {} time", LLUtils.toStringSafe(key), retries); - } else if (logger.isDebugEnabled(MARKER_ROCKSDB)) { - logger.debug(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):" - + " waiting 5ms before retrying", LLUtils.toStringSafe(key)); - } + if (retries == 1) { retryTime = new ExponentialPageLimits(0, 5, 2000); } long retryMs = retryTime.getPageLimit(retries); + // +- 20% retryMs = retryMs + (long) (retryMs * 0.2d * ThreadLocalRandom.current().nextDouble(-1.0d, 1.0d)); - // Wait for 5ms + + if (retries >= 5 && retries % 5 == 0 || ALWAYS_PRINT_OPTIMISTIC_RETRIES) { + logger.warn(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):" + + " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryMs, retries); + } else if 
(logger.isDebugEnabled(MARKER_ROCKSDB)) { + logger.debug(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):" + + " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryMs, retries); + } + // Wait for n milliseconds try { Thread.sleep(retryMs); } catch (InterruptedException e) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java index f3483af..b55d7db 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.disk; import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; +import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.MemoryManager; @@ -29,8 +30,8 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn public StandardRocksDBColumn(RocksDB db, DatabaseOptions databaseOptions, BufferAllocator alloc, - ColumnFamilyHandle cfh) { - super(db, databaseOptions, alloc, cfh); + ColumnFamilyHandle cfh, MeterRegistry meterRegistry) { + super(db, databaseOptions, alloc, cfh, meterRegistry); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java index f31f4e2..824ae6a 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.database.memory; +import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.BufferAllocator; import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.client.IndicizerAnalyzers; @@ -24,9 +25,11 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection { } private final BufferAllocator allocator; + private final MeterRegistry meterRegistry; - public LLMemoryDatabaseConnection(BufferAllocator allocator) { + public LLMemoryDatabaseConnection(BufferAllocator allocator, MeterRegistry meterRegistry) { this.allocator = allocator; + this.meterRegistry = meterRegistry; } @Override @@ -34,6 +37,11 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection { return allocator; } + @Override + public MeterRegistry getMeterRegistry() { + return meterRegistry; + } + @Override public Mono connect() { return Mono.empty(); @@ -46,6 +54,7 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection { return Mono .fromCallable(() -> new LLMemoryKeyValueDatabase( allocator, + meterRegistry, name, columns )) @@ -61,6 +70,7 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection { @Nullable LuceneHacks luceneHacks) { return Mono .fromCallable(() -> new LLLocalLuceneIndex(null, + meterRegistry, name, indicizerAnalyzers, indicizerSimilarities, diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java index 8d1b411..cdc3657 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java @@ 
-1,5 +1,6 @@ package it.cavallium.dbengine.database.memory; +import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.BufferAllocator; import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.LLDictionary; @@ -18,6 +19,7 @@ import reactor.core.publisher.Mono; public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase { private final BufferAllocator allocator; + private final MeterRegistry meterRegistry; private final String name; private final AtomicLong nextSnapshotNumber = new AtomicLong(1); @@ -25,8 +27,12 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase { private final ConcurrentHashMap> mainDb; private final ConcurrentHashMap singletons = new ConcurrentHashMap<>(); - public LLMemoryKeyValueDatabase(BufferAllocator allocator, String name, List columns) { + public LLMemoryKeyValueDatabase(BufferAllocator allocator, + MeterRegistry meterRegistry, + String name, + List columns) { this.allocator = allocator; + this.meterRegistry = meterRegistry; this.name = name; this.mainDb = new ConcurrentHashMap<>(); for (Column column : columns) { @@ -80,6 +86,11 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase { return allocator; } + @Override + public MeterRegistry getMeterRegistry() { + return meterRegistry; + } + @Override public Mono close() { return Mono diff --git a/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java b/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java index 41260f2..4bbf55b 100644 --- a/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java +++ b/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine; import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import it.cavallium.dbengine.DbTestUtils.TempDb; import it.cavallium.dbengine.DbTestUtils.TestAllocator; import it.cavallium.dbengine.client.DatabaseOptions; @@ -56,7 +57,7 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator { return null; }) .subscribeOn(Schedulers.boundedElastic()) - .then(new LLLocalDatabaseConnection(allocator.allocator(), wrkspcPath).connect()) + .then(new LLLocalDatabaseConnection(allocator.allocator(), new SimpleMeterRegistry(), wrkspcPath).connect()) .flatMap(conn -> { SwappableLuceneSearcher searcher = new SwappableLuceneSearcher(); var luceneHacks = new LuceneHacks(() -> searcher, () -> searcher); diff --git a/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java b/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java index bcccc41..73aabcd 100644 --- a/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java +++ b/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import it.cavallium.dbengine.DbTestUtils.TempDb; import it.cavallium.dbengine.DbTestUtils.TestAllocator; import it.cavallium.dbengine.client.DatabaseOptions; @@ -28,7 +29,7 @@ public class MemoryTemporaryDbGenerator implements TemporaryDbGenerator { public Mono openTempDb(TestAllocator allocator) { boolean canUseNettyDirect = DbTestUtils.computeCanUseNettyDirect(); return Mono - .fromCallable(() -> new LLMemoryDatabaseConnection(allocator.allocator())) + .fromCallable(() -> new LLMemoryDatabaseConnection(allocator.allocator(), new SimpleMeterRegistry())) .flatMap(conn -> { 
	SwappableLuceneSearcher searcher = new SwappableLuceneSearcher();
	var luceneHacks = new LuceneHacks(() -> searcher, () -> searcher);
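Taken together, these changes thread a Micrometer MeterRegistry from the connection down to every database and RocksDB column, and the tests satisfy the new constructor parameters with a SimpleMeterRegistry. Below is a minimal sketch (not part of the patch) of how a caller could wire a registry in and look up the per-column gauge introduced above; the class name MetricsWiringSketch is invented here, and the allocator and base path are assumed to be supplied by the caller.

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.net5.buffer.api.BufferAllocator;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
import java.nio.file.Path;

class MetricsWiringSketch {

	// Sketch under the assumptions above: allocator and basePath come from the caller.
	static void openWithMetrics(BufferAllocator allocator, Path basePath) {
		MeterRegistry registry = new SimpleMeterRegistry();
		var connection = new LLLocalDatabaseConnection(allocator, registry, basePath);

		// connect() emits the connection itself (as used by LocalTemporaryDbGenerator above);
		// the same registry instance is then reachable from the connection, its databases
		// and its Lucene indexes.
		var connected = connection.connect().block();
		assert connected != null && connected.getMeterRegistry() == registry;

		// AbstractRocksDBColumn registers one "lastdatasize" gauge per column family when a
		// database is opened, so the lookup below only finds a gauge after getDatabase(...)
		// has been called and at least one column exists; find() simply returns null before that.
		var lastDataSize = registry
				.find("it.cavallium.dbengine.database.disk.column.lastdatasize")
				.gauge();
		if (lastDataSize != null) {
			System.out.println("last get() value size: " + lastDataSize.value() + " bytes");
		}
	}
}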