Custom block size, fix cache stats, fix db close, disable clock cache

This commit is contained in:
Andrea Cavalli 2022-04-11 01:27:09 +02:00
parent eb5792bbe0
commit 1cac7cb0c9
3 changed files with 83 additions and 67 deletions

View File

@ -19,6 +19,7 @@ interfacesData:
memtableMemoryBudgetBytes: -long
cacheIndexAndFilterBlocks: -boolean
filter: -Filter
blockSize: -int
# versions must have only numbers, lowercase letters, dots, dashes. Maximum: 99.999.9999
versions:
0.0.0:
@ -256,6 +257,7 @@ versions:
memtableMemoryBudgetBytes: -long
cacheIndexAndFilterBlocks: -boolean
filter: -Filter
blockSize: -int
# Remember to update ColumnOptions common getters
NamedColumnOptions:
data:
@ -264,6 +266,7 @@ versions:
memtableMemoryBudgetBytes: -long
cacheIndexAndFilterBlocks: -boolean
filter: -Filter
blockSize: -int
BloomFilter:
data:
bitsPerKey: int

View File

@ -22,7 +22,8 @@ public class DefaultDatabaseOptions {
Collections.emptyList(),
Nullablelong.empty(),
Nullableboolean.empty(),
NullableFilter.empty()
NullableFilter.empty(),
Nullableint.empty()
);
public static NamedColumnOptions DEFAULT_NAMED_COLUMN_OPTIONS = new NamedColumnOptions(
@ -30,7 +31,8 @@ public class DefaultDatabaseOptions {
Collections.emptyList(),
Nullablelong.empty(),
Nullableboolean.empty(),
NullableFilter.empty()
NullableFilter.empty(),
Nullableint.empty()
);
public static DatabaseOptions DEFAULT_DATABASE_OPTIONS = new DatabaseOptions(List.of(),

View File

@ -51,7 +51,6 @@ import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Cache;
import org.rocksdb.ChecksumType;
import org.rocksdb.ClockCache;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
@ -64,6 +63,7 @@ import org.rocksdb.DbPath;
import org.rocksdb.FlushOptions;
import org.rocksdb.IndexType;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.LRUCache;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
@ -109,6 +109,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
private final boolean enableColumnsBug;
private RocksDB db;
private Cache standardCache;
private Cache compressedCache;
private final Map<Column, ColumnFamilyHandle> handles;
private final ConcurrentHashMap<Long, Snapshot> snapshotsHandles = new ConcurrentHashMap<>();
private final AtomicLong nextSnapshotNumbers = new AtomicLong(1);
@ -186,9 +188,12 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
columnFamilyOptions.setMaxBytesForLevelBase((databaseOptions.spinning() ? 1024 : 256) * SizeUnit.MB);
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
columnFamilyOptions.setMaxBytesForLevelMultiplier(10);
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
columnFamilyOptions.setLevelCompactionDynamicLevelBytes(true);
// This option is not supported with multiple db paths
if (databaseOptions.volumes().size() <= 1) {
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
columnFamilyOptions.setLevelCompactionDynamicLevelBytes(true);
}
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
columnFamilyOptions.setLevel0FileNumCompactionTrigger(2);
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
@ -263,16 +268,18 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setCacheIndexAndFilterBlocksWithHighPriority(true)
.setCacheIndexAndFilterBlocks(cacheIndexAndFilterBlocks)
//.setPartitionFilters(true)
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setPartitionFilters(true)
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setIndexType(IndexType.kTwoLevelIndexSearch)
//todo: replace with kxxhash3
.setChecksumType(ChecksumType.kxxHash)
.setBlockCacheCompressed(optionsWithCache.compressedCache())
.setBlockCache(optionsWithCache.standardCache())
// Spinning disks: 64KiB to 256KiB (also 512KiB). SSDs: 16KiB
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
.setBlockSize((databaseOptions.spinning() ? 128 : 16) * SizeUnit.KB);
.setBlockSize(columnOptions.blockSize().orElse((databaseOptions.spinning() ? 128 : 16) * 1024))
.setBlockCacheCompressed(optionsWithCache.compressedCache())
.setBlockCache(optionsWithCache.standardCache());
columnFamilyOptions.setTableFormatConfig(tableOptions);
columnFamilyOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio);
@ -354,7 +361,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
}
this.enableColumnsBug = "true".equals(databaseOptions.extraFlags().getOrDefault("enableColumnBug", "false"));
createIfNotExists(descriptors, rocksdbOptions, inMemory, dbPath, dbPathString);
createIfNotExists(descriptors, rocksdbOptions, standardCache, compressedCache, inMemory, dbPath, dbPathString);
while (true) {
try {
@ -372,6 +379,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
handles
);
}
this.standardCache = optionsWithCache.standardCache;
this.compressedCache = optionsWithCache.compressedCache;
break;
} catch (RocksDBException ex) {
switch (ex.getMessage()) {
@ -409,19 +418,32 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
throw new IOException(ex);
}
registerGauge(meterRegistry, name, "rocksdb.estimate-table-readers-mem");
registerGauge(meterRegistry, name, "rocksdb.size-all-mem-tables");
registerGauge(meterRegistry, name, "rocksdb.cur-size-all-mem-tables");
registerGauge(meterRegistry, name, "rocksdb.estimate-num-keys");
registerGauge(meterRegistry, name, "rocksdb.block-cache-usage");
registerGauge(meterRegistry, name, "rocksdb.block-cache-pinned-usage");
try {
for (ColumnFamilyHandle cfh : handles) {
var props = db.getProperty(cfh, "rocksdb.stats");
logger.trace("Stats for database {}, column {}: {}",
name,
new String(cfh.getName(), StandardCharsets.UTF_8),
props
);
}
} catch (RocksDBException ex) {
logger.debug("Failed to obtain stats", ex);
}
registerGauge(meterRegistry, name, "rocksdb.estimate-table-readers-mem", false);
registerGauge(meterRegistry, name, "rocksdb.size-all-mem-tables", false);
registerGauge(meterRegistry, name, "rocksdb.cur-size-all-mem-tables", false);
registerGauge(meterRegistry, name, "rocksdb.estimate-num-keys", false);
registerGauge(meterRegistry, name, "rocksdb.block-cache-usage", true);
registerGauge(meterRegistry, name, "rocksdb.block-cache-pinned-usage", true);
// Bloom seek stats
registerGauge(meterRegistry, name, "rocksdb.bloom.filter.prefix.useful");
registerGauge(meterRegistry, name, "rocksdb.bloom.filter.prefix.checked");
registerGauge(meterRegistry, name, "rocksdb.bloom.filter.prefix.useful", false);
registerGauge(meterRegistry, name, "rocksdb.bloom.filter.prefix.checked", false);
// Bloom point lookup stats
registerGauge(meterRegistry, name, "rocksdb.bloom.filter.useful");
registerGauge(meterRegistry, name, "rocksdb.bloom.filter.full.positive");
registerGauge(meterRegistry, name, "rocksdb.bloom.filter.full.true.positive");
registerGauge(meterRegistry, name, "rocksdb.bloom.filter.useful", false);
registerGauge(meterRegistry, name, "rocksdb.bloom.filter.full.positive", false);
registerGauge(meterRegistry, name, "rocksdb.bloom.filter.full.true.positive", false);
}
public Map<Column, ColumnFamilyHandle> getAllColumnFamilyHandles() {
@ -441,7 +463,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return new RocksLevelOptions(compressionType, compressionOptions);
}
private void registerGauge(MeterRegistry meterRegistry, String name, String propertyName) {
private void registerGauge(MeterRegistry meterRegistry, String name, String propertyName, boolean divideByAllColumns) {
meterRegistry.gauge("rocksdb.property.value",
List.of(Tag.of("db.name", name), Tag.of("db.property.name", propertyName)),
db,
@ -450,7 +472,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return 0d;
}
try {
return database.getAggregatedLongProperty(propertyName);
return database.getAggregatedLongProperty(propertyName)
/ (divideByAllColumns ? getAllColumnFamilyHandles().size() : 1d);
} catch (RocksDBException e) {
if ("NotFound".equals(e.getMessage())) {
return 0d;
@ -466,53 +489,32 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return name;
}
private void flushAndCloseDb(RocksDB db, List<ColumnFamilyHandle> handles)
private void flushAndCloseDb(RocksDB db, Cache standardCache, Cache compressedCache, List<ColumnFamilyHandle> handles)
throws RocksDBException {
if (!db.isOwningHandle()) {
return;
}
flushDb(db, handles);
if (!db.isOwningHandle()) {
return;
if (db.isOwningHandle()) {
flushDb(db, handles);
}
for (ColumnFamilyHandle handle : handles) {
try {
if (handle.isOwningHandle()) {
handle.close();
}
handle.close();
} catch (Exception ex) {
logger.error("Can't close column family", ex);
}
}
if (!db.isOwningHandle()) {
return;
}
try {
db.closeE();
} catch (RocksDBException ex) {
if ("Cannot close DB with unreleased snapshot.".equals(ex.getMessage())) {
snapshotsHandles.forEach((id, snapshot) -> {
try {
if (!db.isOwningHandle()) {
return;
}
if (!snapshot.isOwningHandle()) {
return;
}
db.releaseSnapshot(snapshot);
} catch (Exception ex2) {
// ignore exception
logger.debug("Failed to release snapshot " + id, ex2);
}
});
if (!db.isOwningHandle()) {
return;
snapshotsHandles.forEach((id, snapshot) -> {
try {
if (db.isOwningHandle() && snapshot.isOwningHandle()) {
db.releaseSnapshot(snapshot);
}
db.closeE();
} catch (Exception ex2) {
// ignore exception
logger.debug("Failed to release snapshot " + id, ex2);
}
throw ex;
}
});
db.closeE();
compressedCache.close();
standardCache.close();
}
private void flushDb(RocksDB db, List<ColumnFamilyHandle> handles) throws RocksDBException {
@ -624,8 +626,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.setWalSizeLimitMB(0)
.setMaxTotalWalSize(0) // automatic
;
blockCache = new ClockCache(databaseOptions.blockCache().orElse( 8L * SizeUnit.MB), 6, false);
compressedCache = new ClockCache(databaseOptions.compressedBlockCache().orElse( 8L * SizeUnit.MB), 6, false);
// DO NOT USE ClockCache! IT'S BROKEN!
blockCache = new LRUCache(databaseOptions.blockCache().orElse( 8L * SizeUnit.MB));
compressedCache = new LRUCache(databaseOptions.compressedBlockCache().orElse( 8L * SizeUnit.MB));
if (databaseOptions.spinning()) {
options
@ -654,8 +657,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.setWalSizeLimitMB(0)
.setMaxTotalWalSize(80 * SizeUnit.MB) // 80MiB max wal directory size
;
blockCache = new ClockCache(databaseOptions.blockCache().orElse( 512 * SizeUnit.MB), 6, false);
compressedCache = new ClockCache(databaseOptions.compressedBlockCache().orElse( 512 * SizeUnit.MB), 6, false);
// DO NOT USE ClockCache! IT'S BROKEN!
blockCache = new LRUCache(databaseOptions.blockCache().orElse( 512 * SizeUnit.MB));
compressedCache = new LRUCache(databaseOptions.compressedBlockCache().orElse( 512 * SizeUnit.MB));
if (databaseOptions.useDirectIO()) {
options
@ -669,6 +673,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
options.setIncreaseParallelism(Runtime.getRuntime().availableProcessors());
}
options.setRowCache(blockCache);
options.setWriteBufferManager(new WriteBufferManager(256L * 1024L * 1024L, blockCache));
if (databaseOptions.useDirectIO()) {
@ -715,6 +720,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
private void createIfNotExists(List<ColumnFamilyDescriptor> descriptors,
DBOptions options,
Cache standardCache,
Cache compressedCache,
boolean inMemory,
Path dbPath,
String dbPathString) throws RocksDBException {
@ -739,14 +746,18 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
LinkedList<ColumnFamilyHandle> handles = new LinkedList<>();
this.db = RocksDB.open(new DBOptions(options).setCreateMissingColumnFamilies(true),
this.db = RocksDB.open(options.setCreateMissingColumnFamilies(true),
dbPathString,
descriptors,
handles
);
this.standardCache = standardCache;
this.compressedCache = compressedCache;
flushAndCloseDb(db, handles);
flushAndCloseDb(db, standardCache, compressedCache, handles);
this.db = null;
this.standardCache = null;
this.compressedCache = null;
}
}
@ -943,7 +954,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.<Void>fromCallable(() -> {
try {
closed = true;
flushAndCloseDb(db, new ArrayList<>(handles.values()));
flushAndCloseDb(db, standardCache, compressedCache, new ArrayList<>(handles.values()));
deleteUnusedOldLogFiles();
} catch (RocksDBException e) {
throw new IOException(e);