diff --git a/src/main/data-generator/quic-rpc.yaml b/src/main/data-generator/quic-rpc.yaml index 99ed2d1..9e943cd 100644 --- a/src/main/data-generator/quic-rpc.yaml +++ b/src/main/data-generator/quic-rpc.yaml @@ -19,6 +19,7 @@ interfacesData: memtableMemoryBudgetBytes: -long cacheIndexAndFilterBlocks: -boolean filter: -Filter + blockSize: -int # versions must have only numbers, lowercase letters, dots, dashes. Maximum: 99.999.9999 versions: 0.0.0: @@ -256,6 +257,7 @@ versions: memtableMemoryBudgetBytes: -long cacheIndexAndFilterBlocks: -boolean filter: -Filter + blockSize: -int # Remember to update ColumnOptions common getters NamedColumnOptions: data: @@ -264,6 +266,7 @@ versions: memtableMemoryBudgetBytes: -long cacheIndexAndFilterBlocks: -boolean filter: -Filter + blockSize: -int BloomFilter: data: bitsPerKey: int diff --git a/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java b/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java index 554b28d..02e3492 100644 --- a/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java +++ b/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java @@ -22,7 +22,8 @@ public class DefaultDatabaseOptions { Collections.emptyList(), Nullablelong.empty(), Nullableboolean.empty(), - NullableFilter.empty() + NullableFilter.empty(), + Nullableint.empty() ); public static NamedColumnOptions DEFAULT_NAMED_COLUMN_OPTIONS = new NamedColumnOptions( @@ -30,7 +31,8 @@ public class DefaultDatabaseOptions { Collections.emptyList(), Nullablelong.empty(), Nullableboolean.empty(), - NullableFilter.empty() + NullableFilter.empty(), + Nullableint.empty() ); public static DatabaseOptions DEFAULT_DATABASE_OPTIONS = new DatabaseOptions(List.of(), diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 1b08799..fa995f6 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -51,7 +51,6 @@ import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.BloomFilter; import org.rocksdb.Cache; import org.rocksdb.ChecksumType; -import org.rocksdb.ClockCache; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; @@ -64,6 +63,7 @@ import org.rocksdb.DbPath; import org.rocksdb.FlushOptions; import org.rocksdb.IndexType; import org.rocksdb.InfoLogLevel; +import org.rocksdb.LRUCache; import org.rocksdb.OptimisticTransactionDB; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -109,6 +109,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { private final boolean enableColumnsBug; private RocksDB db; + private Cache standardCache; + private Cache compressedCache; private final Map handles; private final ConcurrentHashMap snapshotsHandles = new ConcurrentHashMap<>(); private final AtomicLong nextSnapshotNumbers = new AtomicLong(1); @@ -186,9 +188,12 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { columnFamilyOptions.setMaxBytesForLevelBase((databaseOptions.spinning() ? 1024 : 256) * SizeUnit.MB); // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html columnFamilyOptions.setMaxBytesForLevelMultiplier(10); - // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html - // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks - columnFamilyOptions.setLevelCompactionDynamicLevelBytes(true); + // This option is not supported with multiple db paths + if (databaseOptions.volumes().size() <= 1) { + // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html + // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks + columnFamilyOptions.setLevelCompactionDynamicLevelBytes(true); + } // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html columnFamilyOptions.setLevel0FileNumCompactionTrigger(2); // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html @@ -263,16 +268,18 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters .setCacheIndexAndFilterBlocksWithHighPriority(true) .setCacheIndexAndFilterBlocks(cacheIndexAndFilterBlocks) - //.setPartitionFilters(true) + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters + .setPartitionFilters(true) + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters .setIndexType(IndexType.kTwoLevelIndexSearch) //todo: replace with kxxhash3 .setChecksumType(ChecksumType.kxxHash) - .setBlockCacheCompressed(optionsWithCache.compressedCache()) - .setBlockCache(optionsWithCache.standardCache()) // Spinning disks: 64KiB to 256KiB (also 512KiB). SSDs: 16KiB // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html - .setBlockSize((databaseOptions.spinning() ? 128 : 16) * SizeUnit.KB); + .setBlockSize(columnOptions.blockSize().orElse((databaseOptions.spinning() ? 128 : 16) * 1024)) + .setBlockCacheCompressed(optionsWithCache.compressedCache()) + .setBlockCache(optionsWithCache.standardCache()); columnFamilyOptions.setTableFormatConfig(tableOptions); columnFamilyOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio); @@ -354,7 +361,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } this.enableColumnsBug = "true".equals(databaseOptions.extraFlags().getOrDefault("enableColumnBug", "false")); - createIfNotExists(descriptors, rocksdbOptions, inMemory, dbPath, dbPathString); + createIfNotExists(descriptors, rocksdbOptions, standardCache, compressedCache, inMemory, dbPath, dbPathString); while (true) { try { @@ -372,6 +379,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { handles ); } + this.standardCache = optionsWithCache.standardCache; + this.compressedCache = optionsWithCache.compressedCache; break; } catch (RocksDBException ex) { switch (ex.getMessage()) { @@ -409,19 +418,32 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { throw new IOException(ex); } - registerGauge(meterRegistry, name, "rocksdb.estimate-table-readers-mem"); - registerGauge(meterRegistry, name, "rocksdb.size-all-mem-tables"); - registerGauge(meterRegistry, name, "rocksdb.cur-size-all-mem-tables"); - registerGauge(meterRegistry, name, "rocksdb.estimate-num-keys"); - registerGauge(meterRegistry, name, "rocksdb.block-cache-usage"); - registerGauge(meterRegistry, name, "rocksdb.block-cache-pinned-usage"); + try { + for (ColumnFamilyHandle cfh : handles) { + var props = db.getProperty(cfh, "rocksdb.stats"); + logger.trace("Stats for database {}, column {}: {}", + name, + new String(cfh.getName(), StandardCharsets.UTF_8), + props + ); + } + } catch (RocksDBException ex) { + logger.debug("Failed to obtain stats", ex); + } + + registerGauge(meterRegistry, name, "rocksdb.estimate-table-readers-mem", false); + registerGauge(meterRegistry, name, "rocksdb.size-all-mem-tables", false); + registerGauge(meterRegistry, name, "rocksdb.cur-size-all-mem-tables", false); + registerGauge(meterRegistry, name, "rocksdb.estimate-num-keys", false); + registerGauge(meterRegistry, name, "rocksdb.block-cache-usage", true); + registerGauge(meterRegistry, name, "rocksdb.block-cache-pinned-usage", true); // Bloom seek stats - registerGauge(meterRegistry, name, "rocksdb.bloom.filter.prefix.useful"); - registerGauge(meterRegistry, name, "rocksdb.bloom.filter.prefix.checked"); + registerGauge(meterRegistry, name, "rocksdb.bloom.filter.prefix.useful", false); + registerGauge(meterRegistry, name, "rocksdb.bloom.filter.prefix.checked", false); // Bloom point lookup stats - registerGauge(meterRegistry, name, "rocksdb.bloom.filter.useful"); - registerGauge(meterRegistry, name, "rocksdb.bloom.filter.full.positive"); - registerGauge(meterRegistry, name, "rocksdb.bloom.filter.full.true.positive"); + registerGauge(meterRegistry, name, "rocksdb.bloom.filter.useful", false); + registerGauge(meterRegistry, name, "rocksdb.bloom.filter.full.positive", false); + registerGauge(meterRegistry, name, "rocksdb.bloom.filter.full.true.positive", false); } public Map getAllColumnFamilyHandles() { @@ -441,7 +463,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return new RocksLevelOptions(compressionType, compressionOptions); } - private void registerGauge(MeterRegistry meterRegistry, String name, String propertyName) { + private void registerGauge(MeterRegistry meterRegistry, String name, String propertyName, boolean divideByAllColumns) { meterRegistry.gauge("rocksdb.property.value", List.of(Tag.of("db.name", name), Tag.of("db.property.name", propertyName)), db, @@ -450,7 +472,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return 0d; } try { - return database.getAggregatedLongProperty(propertyName); + return database.getAggregatedLongProperty(propertyName) + / (divideByAllColumns ? getAllColumnFamilyHandles().size() : 1d); } catch (RocksDBException e) { if ("NotFound".equals(e.getMessage())) { return 0d; @@ -466,53 +489,32 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return name; } - private void flushAndCloseDb(RocksDB db, List handles) + private void flushAndCloseDb(RocksDB db, Cache standardCache, Cache compressedCache, List handles) throws RocksDBException { - if (!db.isOwningHandle()) { - return; - } - flushDb(db, handles); - if (!db.isOwningHandle()) { - return; + if (db.isOwningHandle()) { + flushDb(db, handles); } for (ColumnFamilyHandle handle : handles) { try { - if (handle.isOwningHandle()) { - handle.close(); - } + handle.close(); } catch (Exception ex) { logger.error("Can't close column family", ex); } } - if (!db.isOwningHandle()) { - return; - } - try { - db.closeE(); - } catch (RocksDBException ex) { - if ("Cannot close DB with unreleased snapshot.".equals(ex.getMessage())) { - snapshotsHandles.forEach((id, snapshot) -> { - try { - if (!db.isOwningHandle()) { - return; - } - if (!snapshot.isOwningHandle()) { - return; - } - db.releaseSnapshot(snapshot); - } catch (Exception ex2) { - // ignore exception - logger.debug("Failed to release snapshot " + id, ex2); - } - }); - if (!db.isOwningHandle()) { - return; + snapshotsHandles.forEach((id, snapshot) -> { + try { + if (db.isOwningHandle() && snapshot.isOwningHandle()) { + db.releaseSnapshot(snapshot); } - db.closeE(); + } catch (Exception ex2) { + // ignore exception + logger.debug("Failed to release snapshot " + id, ex2); } - throw ex; - } + }); + db.closeE(); + compressedCache.close(); + standardCache.close(); } private void flushDb(RocksDB db, List handles) throws RocksDBException { @@ -624,8 +626,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .setWalSizeLimitMB(0) .setMaxTotalWalSize(0) // automatic ; - blockCache = new ClockCache(databaseOptions.blockCache().orElse( 8L * SizeUnit.MB), 6, false); - compressedCache = new ClockCache(databaseOptions.compressedBlockCache().orElse( 8L * SizeUnit.MB), 6, false); + // DO NOT USE ClockCache! IT'S BROKEN! + blockCache = new LRUCache(databaseOptions.blockCache().orElse( 8L * SizeUnit.MB)); + compressedCache = new LRUCache(databaseOptions.compressedBlockCache().orElse( 8L * SizeUnit.MB)); if (databaseOptions.spinning()) { options @@ -654,8 +657,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .setWalSizeLimitMB(0) .setMaxTotalWalSize(80 * SizeUnit.MB) // 80MiB max wal directory size ; - blockCache = new ClockCache(databaseOptions.blockCache().orElse( 512 * SizeUnit.MB), 6, false); - compressedCache = new ClockCache(databaseOptions.compressedBlockCache().orElse( 512 * SizeUnit.MB), 6, false); + // DO NOT USE ClockCache! IT'S BROKEN! + blockCache = new LRUCache(databaseOptions.blockCache().orElse( 512 * SizeUnit.MB)); + compressedCache = new LRUCache(databaseOptions.compressedBlockCache().orElse( 512 * SizeUnit.MB)); if (databaseOptions.useDirectIO()) { options @@ -669,6 +673,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { options.setIncreaseParallelism(Runtime.getRuntime().availableProcessors()); } + options.setRowCache(blockCache); options.setWriteBufferManager(new WriteBufferManager(256L * 1024L * 1024L, blockCache)); if (databaseOptions.useDirectIO()) { @@ -715,6 +720,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { private void createIfNotExists(List descriptors, DBOptions options, + Cache standardCache, + Cache compressedCache, boolean inMemory, Path dbPath, String dbPathString) throws RocksDBException { @@ -739,14 +746,18 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { LinkedList handles = new LinkedList<>(); - this.db = RocksDB.open(new DBOptions(options).setCreateMissingColumnFamilies(true), + this.db = RocksDB.open(options.setCreateMissingColumnFamilies(true), dbPathString, descriptors, handles ); + this.standardCache = standardCache; + this.compressedCache = compressedCache; - flushAndCloseDb(db, handles); + flushAndCloseDb(db, standardCache, compressedCache, handles); this.db = null; + this.standardCache = null; + this.compressedCache = null; } } @@ -943,7 +954,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .fromCallable(() -> { try { closed = true; - flushAndCloseDb(db, new ArrayList<>(handles.values())); + flushAndCloseDb(db, standardCache, compressedCache, new ArrayList<>(handles.values())); deleteUnusedOldLogFiles(); } catch (RocksDBException e) { throw new IOException(e);