From a1c0e19adcd45887f21ea6f7894ace21d304a067 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 4 May 2022 01:21:56 +0200 Subject: [PATCH] Add readable rocksdb stats --- .../dbengine/client/CompositeDatabase.java | 3 +- .../dbengine/database/ColumnProperty.java | 3 + .../dbengine/database/DatabaseProperties.java | 31 +++ .../dbengine/database/LLKeyValueDatabase.java | 14 +- .../database/RocksDBLongProperty.java | 106 ++++++++ .../dbengine/database/RocksDBMapProperty.java | 40 +++ .../dbengine/database/RocksDBProperty.java | 12 + .../database/RocksDBStringProperty.java | 53 ++++ .../disk/LLLocalKeyValueDatabase.java | 245 +++++++++++++----- .../memory/LLMemoryKeyValueDatabase.java | 45 +++- .../database/remote/LLQuicConnection.java | 48 +++- 11 files changed, 516 insertions(+), 84 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/database/ColumnProperty.java create mode 100644 src/main/java/it/cavallium/dbengine/database/DatabaseProperties.java create mode 100644 src/main/java/it/cavallium/dbengine/database/RocksDBLongProperty.java create mode 100644 src/main/java/it/cavallium/dbengine/database/RocksDBMapProperty.java create mode 100644 src/main/java/it/cavallium/dbengine/database/RocksDBProperty.java create mode 100644 src/main/java/it/cavallium/dbengine/database/RocksDBStringProperty.java diff --git a/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java b/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java index 1013223..105d548 100644 --- a/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java +++ b/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java @@ -2,10 +2,11 @@ package it.cavallium.dbengine.client; import io.micrometer.core.instrument.MeterRegistry; import io.netty5.buffer.api.BufferAllocator; +import it.cavallium.dbengine.database.DatabaseProperties; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public interface CompositeDatabase { +public interface CompositeDatabase extends DatabaseProperties { Mono close(); diff --git a/src/main/java/it/cavallium/dbengine/database/ColumnProperty.java b/src/main/java/it/cavallium/dbengine/database/ColumnProperty.java new file mode 100644 index 0000000..0f3f779 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/ColumnProperty.java @@ -0,0 +1,3 @@ +package it.cavallium.dbengine.database; + +public record ColumnProperty(String columnName, String propertyName, T value) {} diff --git a/src/main/java/it/cavallium/dbengine/database/DatabaseProperties.java b/src/main/java/it/cavallium/dbengine/database/DatabaseProperties.java new file mode 100644 index 0000000..8c79293 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/DatabaseProperties.java @@ -0,0 +1,31 @@ +package it.cavallium.dbengine.database; + +import it.cavallium.dbengine.client.MemoryStats; +import it.cavallium.dbengine.rpc.current.data.Column; +import java.util.Map; +import org.jetbrains.annotations.Nullable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface DatabaseProperties { + + Mono getMemoryStats(); + + Mono getRocksDBStats(); + + Mono> getMapProperty(@Nullable Column column, RocksDBMapProperty property); + + Flux>> getMapColumnProperties(RocksDBMapProperty property); + + Mono getStringProperty(@Nullable Column column, RocksDBStringProperty property); + + Flux> getStringColumnProperties(RocksDBStringProperty property); + + Mono getLongProperty(@Nullable Column column, RocksDBLongProperty property); + + Flux> getLongColumnProperties(RocksDBLongProperty property); + + Mono getAggregatedLongProperty(RocksDBLongProperty property); + + Flux getTableProperties(); +} diff --git a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java index 5c82088..fd3e720 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java @@ -7,14 +7,14 @@ import io.netty5.buffer.api.BufferAllocator; import it.cavallium.dbengine.client.MemoryStats; import it.cavallium.dbengine.database.collections.DatabaseInt; import it.cavallium.dbengine.database.collections.DatabaseLong; +import it.cavallium.dbengine.rpc.current.data.Column; import java.nio.charset.StandardCharsets; -import java.util.Map.Entry; +import java.util.Map; import org.jetbrains.annotations.Nullable; -import org.rocksdb.TableProperties; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseStructure { +public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseStructure, DatabaseProperties { Mono getSingleton(byte[] singletonListColumnName, byte[] name, byte @Nullable[] defaultValue); @@ -54,14 +54,6 @@ public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseS .map(DatabaseLong::new); } - Mono getProperty(String propertyName); - - Mono getMemoryStats(); - - Mono getRocksDBStats(); - - Flux getTableProperties(); - Mono verifyChecksum(); Mono compact(); diff --git a/src/main/java/it/cavallium/dbengine/database/RocksDBLongProperty.java b/src/main/java/it/cavallium/dbengine/database/RocksDBLongProperty.java new file mode 100644 index 0000000..d1c1c5d --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/RocksDBLongProperty.java @@ -0,0 +1,106 @@ +package it.cavallium.dbengine.database; + +public enum RocksDBLongProperty implements RocksDBProperty { + NUM_FILES_AT_LEVEL_0("num-files-at-level0"), + NUM_FILES_AT_LEVEL_1("num-files-at-level1"), + NUM_FILES_AT_LEVEL_2("num-files-at-level2"), + NUM_FILES_AT_LEVEL_3("num-files-at-level3"), + NUM_FILES_AT_LEVEL_4("num-files-at-level4"), + NUM_FILES_AT_LEVEL_5("num-files-at-level5"), + NUM_FILES_AT_LEVEL_6("num-files-at-level6"), + NUM_FILES_AT_LEVEL_7("num-files-at-level7"), + NUM_FILES_AT_LEVEL_8("num-files-at-level8"), + NUM_FILES_AT_LEVEL_9("num-files-at-level9"), + COMPRESSION_RATIO_AT_LEVEL_0("compression-ratio-at-level0"), + COMPRESSION_RATIO_AT_LEVEL_1("compression-ratio-at-level1"), + COMPRESSION_RATIO_AT_LEVEL_2("compression-ratio-at-level2"), + COMPRESSION_RATIO_AT_LEVEL_3("compression-ratio-at-level3"), + COMPRESSION_RATIO_AT_LEVEL_4("compression-ratio-at-level4"), + COMPRESSION_RATIO_AT_LEVEL_5("compression-ratio-at-level5"), + COMPRESSION_RATIO_AT_LEVEL_6("compression-ratio-at-level6"), + COMPRESSION_RATIO_AT_LEVEL_7("compression-ratio-at-level7"), + COMPRESSION_RATIO_AT_LEVEL_8("compression-ratio-at-level8"), + COMPRESSION_RATIO_AT_LEVEL_9("compression-ratio-at-level9"), + NUM_IMMUTABLE_MEM_TABLE("num-immutable-mem-table"), + NUM_IMMUTABLE_MEM_TABLE_FLUSHED("num-immutable-mem-table-flushed"), + MEM_TABLE_FLUSH_PENDING("mem-table-flush-pending"), + NUM_RUNNING_FLUSHES("num-running-flushes"), + COMPACTION_PENDING("compaction-pending"), + NUM_RUNNING_COMPACTIONS("num-running-compactions"), + BACKGROUND_ERRORS("background-errors"), + CUR_SIZE_ACTIVE_MEM_TABLE("cur-size-active-mem-table"), + CUR_SIZE_ALL_MEM_TABLES("cur-size-all-mem-tables"), + SIZE_ALL_MEM_TABLES("size-all-mem-tables"), + NUM_ENTRIES_ACTIVE_MEM_TABLE("num-entries-active-mem-table"), + NUM_ENTRIES_IMMUTABLE_MEM_TABLES("num-entries-imm-mem-tables"), + NUM_DELETES_ACTIVE_MEM_TABLE("num-deletes-active-mem-table"), + NUM_DELETES_IMMUTABLE_MEM_TABLES("num-deletes-imm-mem-tables"), + ESTIMATE_NUM_KEYS("estimate-num-keys"), + ESTIMATE_TABLE_READERS_MEM("estimate-table-readers-mem"), + IS_FILE_DELETIONS_ENABLED("is-file-deletions-enabled"), + NUM_SNAPSHOTS("num-snapshots"), + OLDEST_SNAPSHOT_TIME("oldest-snapshot-time"), + OLDEST_SNAPSHOT_SEQUENCE("oldest-snapshot-sequence"), + NUM_LIVE_VERSIONS("num-live-versions"), + CURRENT_SUPER_VERSION_NUMBER("current-super-version-number"), + ESTIMATE_LIVE_DATA_SIZE("estimate-live-data-size"), + MIN_LOG_NUMBER_TO_KEEP("min-log-number-to-keep"), + MIN_OBSOLETE_SST_NUMBER_TO_KEEP("min-obsolete-sst-number-to-keep"), + TOTAL_SST_FILES_SIZE("total-sst-files-size"), + LIVE_SST_FILES_SIZE("live-sst-files-size"), + LIVE_SST_FILES_SIZE_AT_TEMPERATURE("live-sst-files-size-at-temperature"), + BASE_LEVEL("base-level"), + ESTIMATE_PENDING_COMPACTION_BYTES("estimate-pending-compaction-bytes"), + ACTUAL_DELAYED_WRITE_RATE("actual-delayed-write-rate"), + IS_WRITE_STOPPED("is-write-stopped"), + ESTIMATE_OLDEST_KEY_TIME("estimate-oldest-key-time"), + BLOCK_CACHE_CAPACITY("block-cache-capacity", false), + BLOCK_CACHE_USAGE("block-cache-usage", false), + BLOCK_CACHE_PINNED_USAGE("block-cache-pinned-usage", false), + NUM_BLOB_FILES("num-blob-files"), + TOTAL_BLOB_FILE_SIZE("total-blob-file-size"), + LIVE_BLOB_FILE_SIZE("live-blob-file-size"), + LIVE_BLOB_FILE_GARBAGE_SIZE("live-blob-file-garbage-size") + ; + + private final String name; + private final boolean dividedByColumnFamily; + + RocksDBLongProperty(String name) { + this(name, true); + } + + RocksDBLongProperty(String name, boolean dividedByColumnFamily) { + this.name = name; + this.dividedByColumnFamily = dividedByColumnFamily; + } + + @Override + public String toString() { + return "rocksdb." + name; + } + + @Override + public String getName() { + return "rocksdb." + name; + } + + @Override + public boolean isNumeric() { + return true; + } + + @Override + public boolean isMap() { + return false; + } + + @Override + public boolean isString() { + return false; + } + + public boolean isDividedByColumnFamily() { + return dividedByColumnFamily; + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/RocksDBMapProperty.java b/src/main/java/it/cavallium/dbengine/database/RocksDBMapProperty.java new file mode 100644 index 0000000..14793f6 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/RocksDBMapProperty.java @@ -0,0 +1,40 @@ +package it.cavallium.dbengine.database; + +public enum RocksDBMapProperty implements RocksDBProperty { + CFSTATS("cfstats"), + DBSTATS("dbstats"), + BLOCK_CACHE_ENTRY_STATS("block-cache-entry-stats"), + AGGREGATED_TABLE_PROPERTIES("aggregated-table-properties"), + ; + + private final String name; + + RocksDBMapProperty(String name) { + this.name = name; + } + + @Override + public String toString() { + return "rocksdb." + name; + } + + @Override + public String getName() { + return "rocksdb." + name; + } + + @Override + public boolean isNumeric() { + return false; + } + + @Override + public boolean isMap() { + return true; + } + + @Override + public boolean isString() { + return false; + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/RocksDBProperty.java b/src/main/java/it/cavallium/dbengine/database/RocksDBProperty.java new file mode 100644 index 0000000..3b7e93d --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/RocksDBProperty.java @@ -0,0 +1,12 @@ +package it.cavallium.dbengine.database; + +public interface RocksDBProperty { + + String getName(); + + boolean isNumeric(); + + boolean isMap(); + + boolean isString(); +} diff --git a/src/main/java/it/cavallium/dbengine/database/RocksDBStringProperty.java b/src/main/java/it/cavallium/dbengine/database/RocksDBStringProperty.java new file mode 100644 index 0000000..7a6fc2b --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/RocksDBStringProperty.java @@ -0,0 +1,53 @@ +package it.cavallium.dbengine.database; + +public enum RocksDBStringProperty implements RocksDBProperty { + STATS("stats"), + SSTABLES("sstables"), + CFSTATS_NO_FILE_HISTOGRAM("cfstats-no-file-histogram"), + CF_FILE_HISTOGRAM("cf-file-histogram"), + LEVELSTATS("levelstats"), + AGGREGATED_TABLE_PROPERTIES_AT_LEVEL_0("aggregated-table-properties-at-level0"), + AGGREGATED_TABLE_PROPERTIES_AT_LEVEL_1("aggregated-table-properties-at-level1"), + AGGREGATED_TABLE_PROPERTIES_AT_LEVEL_2("aggregated-table-properties-at-level2"), + AGGREGATED_TABLE_PROPERTIES_AT_LEVEL_3("aggregated-table-properties-at-level3"), + AGGREGATED_TABLE_PROPERTIES_AT_LEVEL_4("aggregated-table-properties-at-level4"), + AGGREGATED_TABLE_PROPERTIES_AT_LEVEL_5("aggregated-table-properties-at-level5"), + AGGREGATED_TABLE_PROPERTIES_AT_LEVEL_6("aggregated-table-properties-at-level6"), + AGGREGATED_TABLE_PROPERTIES_AT_LEVEL_7("aggregated-table-properties-at-level7"), + AGGREGATED_TABLE_PROPERTIES_AT_LEVEL_8("aggregated-table-properties-at-level8"), + AGGREGATED_TABLE_PROPERTIES_AT_LEVEL_9("aggregated-table-properties-at-level9"), + OPTIONS_STATISTICS("options-statistics"), + BLOB_STATS("blob-stats") + ; + + private final String name; + + RocksDBStringProperty(String name) { + this.name = name; + } + + @Override + public String toString() { + return "rocksdb." + name; + } + + @Override + public String getName() { + return "rocksdb." + name; + } + + @Override + public boolean isNumeric() { + return false; + } + + @Override + public boolean isMap() { + return false; + } + + @Override + public boolean isString() { + return true; + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 4b63459..eb438a6 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -1,8 +1,8 @@ package it.cavallium.dbengine.database.disk; -import static com.google.common.collect.Lists.partition; import static io.netty5.buffer.api.StandardAllocationTypes.OFF_HEAP; import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; +import static java.util.Objects.requireNonNull; import static org.rocksdb.ColumnFamilyOptionsInterface.DEFAULT_COMPACTION_MEMTABLE_MEMORY_BUDGET; import io.micrometer.core.instrument.MeterRegistry; @@ -12,10 +12,14 @@ import io.netty5.buffer.api.BufferAllocator; import io.netty5.util.internal.PlatformDependent; import it.cavallium.data.generator.nativedata.NullableString; import it.cavallium.dbengine.client.MemoryStats; +import it.cavallium.dbengine.database.ColumnProperty; import it.cavallium.dbengine.database.ColumnUtils; import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.RocksDBLongProperty; +import it.cavallium.dbengine.database.RocksDBMapProperty; +import it.cavallium.dbengine.database.RocksDBStringProperty; import it.cavallium.dbengine.database.TableWithProperties; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.rpc.current.data.Column; @@ -35,20 +39,13 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.StampedLock; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -56,6 +53,7 @@ import org.apache.commons.lang3.time.StopWatch; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.Nullable; +import org.reactivestreams.Publisher; import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.BloomFilter; import org.rocksdb.Cache; @@ -64,8 +62,6 @@ import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.CompactRangeOptions; -import org.rocksdb.CompactionJobInfo; -import org.rocksdb.CompactionOptions; import org.rocksdb.CompactionPriority; import org.rocksdb.CompressionOptions; import org.rocksdb.CompressionType; @@ -78,13 +74,11 @@ import org.rocksdb.IndexType; import org.rocksdb.InfoLogLevel; import org.rocksdb.IngestExternalFileOptions; import org.rocksdb.LRUCache; -import org.rocksdb.LevelMetaData; import org.rocksdb.OptimisticTransactionDB; import org.rocksdb.PersistentCache; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.Snapshot; -import org.rocksdb.SstFileMetaData; import org.rocksdb.TransactionDB; import org.rocksdb.TransactionDBOptions; import org.rocksdb.TxnDBWritePolicy; @@ -373,7 +367,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } // Get databases directory path - Objects.requireNonNull(path); + requireNonNull(path); Path databasesDirPath = path.toAbsolutePath().getParent(); String dbPathString = databasesDirPath.toString() + File.separatorChar + path.getFileName(); Path dbPath = Paths.get(dbPathString); @@ -493,19 +487,16 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { logger.debug("Failed to obtain stats", ex); } - registerGauge(meterRegistry, name, "rocksdb.estimate-table-readers-mem", false); - registerGauge(meterRegistry, name, "rocksdb.size-all-mem-tables", false); - registerGauge(meterRegistry, name, "rocksdb.cur-size-all-mem-tables", false); - registerGauge(meterRegistry, name, "rocksdb.estimate-num-keys", false); - registerGauge(meterRegistry, name, "rocksdb.block-cache-usage", true); - registerGauge(meterRegistry, name, "rocksdb.block-cache-pinned-usage", true); + for (RocksDBLongProperty property : RocksDBLongProperty.values()) { + registerGauge(meterRegistry, name, property.getName(), property.isDividedByColumnFamily()); + } // Bloom seek stats - registerGauge(meterRegistry, name, "rocksdb.bloom.filter.prefix.useful", false); - registerGauge(meterRegistry, name, "rocksdb.bloom.filter.prefix.checked", false); + registerGauge(meterRegistry, name, "rocksdb.bloom.filter.prefix.useful", true); + registerGauge(meterRegistry, name, "rocksdb.bloom.filter.prefix.checked", true); // Bloom point lookup stats - registerGauge(meterRegistry, name, "rocksdb.bloom.filter.useful", false); - registerGauge(meterRegistry, name, "rocksdb.bloom.filter.full.positive", false); - registerGauge(meterRegistry, name, "rocksdb.bloom.filter.full.true.positive", false); + registerGauge(meterRegistry, name, "rocksdb.bloom.filter.useful", true); + registerGauge(meterRegistry, name, "rocksdb.bloom.filter.full.positive", true); + registerGauge(meterRegistry, name, "rocksdb.bloom.filter.full.true.positive", true); } public static boolean isDisableAutoCompactions() { @@ -635,30 +626,59 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } private void registerGauge(MeterRegistry meterRegistry, String name, String propertyName, boolean divideByAllColumns) { - meterRegistry.gauge("rocksdb.property.value", - List.of(Tag.of("db.name", name), Tag.of("db.property.name", propertyName)), - db, - database -> { - if (closed) { - return 0d; - } - var closeReadLock = closeLock.readLock(); - try { + if (divideByAllColumns) { + for (Entry cfhEntry : handles.entrySet()) { + var columnName = cfhEntry.getKey().name(); + var cfh = cfhEntry.getValue(); + meterRegistry.gauge("rocksdb.property.value", + List.of(Tag.of("db.name", name), Tag.of("db.column.name", columnName), Tag.of("db.property.name", propertyName)), + db, + database -> { + if (closed) { + return 0d; + } + var closeReadLock = closeLock.readLock(); + try { + if (closed) { + return 0d; + } + return database.getLongProperty(cfh, propertyName); + } catch (RocksDBException e) { + if ("NotFound".equals(e.getMessage())) { + return 0d; + } + throw new RuntimeException(e); + } finally { + closeLock.unlockRead(closeReadLock); + } + } + ); + } + } else { + meterRegistry.gauge("rocksdb.property.value", + List.of(Tag.of("db.name", name), Tag.of("db.property.name", propertyName)), + db, + database -> { if (closed) { return 0d; } - return database.getAggregatedLongProperty(propertyName) - / (divideByAllColumns ? getAllColumnFamilyHandles().size() : 1d); - } catch (RocksDBException e) { - if ("NotFound".equals(e.getMessage())) { - return 0d; + var closeReadLock = closeLock.readLock(); + try { + if (closed) { + return 0d; + } + return database.getLongProperty(propertyName); + } catch (RocksDBException e) { + if ("NotFound".equals(e.getMessage())) { + return 0d; + } + throw new RuntimeException(e); + } finally { + closeLock.unlockRead(closeReadLock); } - throw new RuntimeException(e); - } finally { - closeLock.unlockRead(closeReadLock); } - } - ); + ); + } } @Override @@ -817,8 +837,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { options.setDeleteObsoleteFilesPeriodMicros(20 * 1000000); // 20 seconds options.setKeepLogFileNum(10); - Objects.requireNonNull(databasesDirPath); - Objects.requireNonNull(path.getFileName()); + requireNonNull(databasesDirPath); + requireNonNull(path.getFileName()); List paths = convertPaths(databasesDirPath, path.getFileName(), databaseOptions.volumes()) .stream() .map(p -> new DbPath(p.path, p.targetSize)) @@ -1079,19 +1099,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return databaseOptions; } - @Override - public Mono getProperty(String propertyName) { - return Mono.fromCallable(() -> { - var closeReadLock = closeLock.readLock(); - try { - ensureOpen(); - return db.getAggregatedLongProperty(propertyName); - } finally { - closeLock.unlockRead(closeReadLock); - } - }).onErrorMap(cause -> new IOException("Failed to read " + propertyName, cause)).subscribeOn(dbRScheduler); - } - public Flux getSSTS() { var paths = convertPaths(dbPath.toAbsolutePath().getParent(), dbPath.getFileName(), databaseOptions.volumes()); return Mono @@ -1172,6 +1179,126 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .subscribeOn(dbRScheduler); } + @Override + public Mono> getMapProperty(@Nullable Column column, RocksDBMapProperty property) { + return Mono + .fromCallable(() -> { + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + if (column == null) { + return db.getMapProperty(property.getName()); + } else { + var cfh = requireNonNull(handles.get(column)); + return db.getMapProperty(cfh, property.getName()); + } + } finally { + closeLock.unlockRead(closeReadLock); + } + }) + .transform(this::convertNotFoundToEmpty) + .onErrorMap(cause -> new IOException("Failed to read property " + property.name(), cause)) + .subscribeOn(dbRScheduler); + } + + @Override + public Flux>> getMapColumnProperties(RocksDBMapProperty property) { + return Flux + .fromIterable(getAllColumnFamilyHandles().keySet()) + .flatMapSequential(c -> this + .getMapProperty(c, property) + .map(result -> new ColumnProperty<>(c.name(), property.getName(), result))); + } + + @Override + public Mono getStringProperty(@Nullable Column column, RocksDBStringProperty property) { + return Mono + .fromCallable(() -> { + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + if (column == null) { + return db.getProperty(property.getName()); + } else { + var cfh = requireNonNull(handles.get(column)); + return db.getProperty(cfh, property.getName()); + } + } finally { + closeLock.unlockRead(closeReadLock); + } + }) + .transform(this::convertNotFoundToEmpty) + .onErrorMap(cause -> new IOException("Failed to read property " + property.name(), cause)) + .subscribeOn(dbRScheduler); + } + + @Override + public Flux> getStringColumnProperties(RocksDBStringProperty property) { + return Flux + .fromIterable(getAllColumnFamilyHandles().keySet()) + .flatMapSequential(c -> this + .getStringProperty(c, property) + .map(result -> new ColumnProperty<>(c.name(), property.getName(), result))); + } + + @Override + public Mono getLongProperty(@Nullable Column column, RocksDBLongProperty property) { + return Mono + .fromCallable(() -> { + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + if (column == null) { + return db.getLongProperty(property.getName()); + } else { + var cfh = requireNonNull(handles.get(column)); + return db.getLongProperty(cfh, property.getName()); + } + } finally { + closeLock.unlockRead(closeReadLock); + } + }) + .transform(this::convertNotFoundToEmpty) + .onErrorMap(cause -> new IOException("Failed to read property " + property.name(), cause)) + .subscribeOn(dbRScheduler); + } + + @Override + public Flux> getLongColumnProperties(RocksDBLongProperty property) { + return Flux + .fromIterable(getAllColumnFamilyHandles().keySet()) + .flatMapSequential(c -> this + .getLongProperty(c, property) + .map(result -> new ColumnProperty<>(c.name(), property.getName(), result))); + } + + @Override + public Mono getAggregatedLongProperty(RocksDBLongProperty property) { + return Mono + .fromCallable(() -> { + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + return db.getAggregatedLongProperty(property.getName()); + } finally { + closeLock.unlockRead(closeReadLock); + } + }) + .transform(this::convertNotFoundToEmpty) + .onErrorMap(cause -> new IOException("Failed to read property " + property.name(), cause)) + .subscribeOn(dbRScheduler); + } + + private Mono convertNotFoundToEmpty(Mono mono) { + return mono.onErrorResume(RocksDBException.class, ex -> { + if (ex.getMessage().equals("NotFound")) { + return Mono.empty(); + } else { + return Mono.error(ex); + } + }); + } + @Override public Mono getRocksDBStats() { return Mono diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java index e0482bf..234fefa 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java @@ -3,17 +3,22 @@ package it.cavallium.dbengine.database.memory; import io.micrometer.core.instrument.MeterRegistry; import io.netty5.buffer.api.BufferAllocator; import it.cavallium.dbengine.client.MemoryStats; +import it.cavallium.dbengine.database.ColumnProperty; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.RocksDBLongProperty; +import it.cavallium.dbengine.database.RocksDBMapProperty; +import it.cavallium.dbengine.database.RocksDBStringProperty; import it.cavallium.dbengine.database.TableWithProperties; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.rpc.current.data.Column; import it.unimi.dsi.fastutil.bytes.ByteList; import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicLong; @@ -90,11 +95,6 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase { )); } - @Override - public Mono getProperty(String propertyName) { - return Mono.empty(); - } - @Override public Mono getMemoryStats() { return Mono.just(new MemoryStats(0, 0, 0, 0, 0, 0)); @@ -105,6 +105,41 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase { return Mono.empty(); } + @Override + public Mono> getMapProperty(@Nullable Column column, RocksDBMapProperty property) { + return Mono.empty(); + } + + @Override + public Flux>> getMapColumnProperties(RocksDBMapProperty property) { + return Flux.empty(); + } + + @Override + public Mono getStringProperty(@Nullable Column column, RocksDBStringProperty property) { + return Mono.empty(); + } + + @Override + public Flux> getStringColumnProperties(RocksDBStringProperty property) { + return Flux.empty(); + } + + @Override + public Mono getLongProperty(@Nullable Column column, RocksDBLongProperty property) { + return Mono.empty(); + } + + @Override + public Flux> getLongColumnProperties(RocksDBLongProperty property) { + return Flux.empty(); + } + + @Override + public Mono getAggregatedLongProperty(RocksDBLongProperty property) { + return Mono.empty(); + } + @Override public Flux getTableProperties() { return Flux.empty(); diff --git a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java index 851f0e0..7aa93c5 100644 --- a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java @@ -10,6 +10,7 @@ import io.netty5.buffer.api.Send; import it.cavallium.dbengine.client.MemoryStats; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; +import it.cavallium.dbengine.database.ColumnProperty; import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLDictionary; @@ -21,13 +22,15 @@ import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.LLUpdateDocument; +import it.cavallium.dbengine.database.RocksDBLongProperty; +import it.cavallium.dbengine.database.RocksDBMapProperty; +import it.cavallium.dbengine.database.RocksDBStringProperty; import it.cavallium.dbengine.database.TableWithProperties; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.disk.BinarySerializationFunction; import it.cavallium.dbengine.database.remote.RPCCodecs.RPCEventCodec; import it.cavallium.dbengine.database.serialization.SerializationException; -import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.lucene.LuceneHacks; import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.searcher.BucketParams; @@ -61,13 +64,12 @@ import java.io.File; import java.net.SocketAddress; import java.time.Duration; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.rocksdb.DBOptions; -import org.rocksdb.TableProperties; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.incubator.quic.QuicClient; @@ -321,11 +323,6 @@ public class LLQuicConnection implements LLDatabaseConnection { return null; } - @Override - public Mono getProperty(String propertyName) { - return null; - } - @Override public Mono getMemoryStats() { return null; @@ -336,6 +333,41 @@ public class LLQuicConnection implements LLDatabaseConnection { return null; } + @Override + public Mono getAggregatedLongProperty(RocksDBLongProperty property) { + return null; + } + + @Override + public Mono getStringProperty(@Nullable Column column, RocksDBStringProperty property) { + return null; + } + + @Override + public Flux> getStringColumnProperties(RocksDBStringProperty property) { + return null; + } + + @Override + public Mono getLongProperty(@Nullable Column column, RocksDBLongProperty property) { + return null; + } + + @Override + public Flux> getLongColumnProperties(RocksDBLongProperty property) { + return null; + } + + @Override + public Mono> getMapProperty(@Nullable Column column, RocksDBMapProperty property) { + return null; + } + + @Override + public Flux>> getMapColumnProperties(RocksDBMapProperty property) { + return null; + } + @Override public Flux getTableProperties() { return null;