Fix options

This commit is contained in:
Andrea Cavalli 2022-03-10 02:38:57 +01:00
parent 325457dd44
commit 16f6025b30

View File

@ -41,12 +41,14 @@ import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.rocksdb.AccessHint;
import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter; import org.rocksdb.BloomFilter;
import org.rocksdb.Cache; import org.rocksdb.Cache;
import org.rocksdb.ClockCache; import org.rocksdb.ClockCache;
import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactRangeOptions; import org.rocksdb.CompactRangeOptions;
import org.rocksdb.CompactionPriority; import org.rocksdb.CompactionPriority;
import org.rocksdb.CompactionStyle; import org.rocksdb.CompactionStyle;
@ -56,6 +58,7 @@ import org.rocksdb.DBOptions;
import org.rocksdb.DbPath; import org.rocksdb.DbPath;
import org.rocksdb.FlushOptions; import org.rocksdb.FlushOptions;
import org.rocksdb.IndexType; import org.rocksdb.IndexType;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.OptimisticTransactionDB; import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.Options; import org.rocksdb.Options;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
@ -137,14 +140,71 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
} }
} }
Options rocksdbOptions = openRocksDb(path, databaseOptions); var optionsWithCache = openRocksDb(path, databaseOptions);
var rocksdbOptions = optionsWithCache.options();
try { try {
List<ColumnFamilyDescriptor> descriptors = new LinkedList<>(); List<ColumnFamilyDescriptor> descriptors = new LinkedList<>();
descriptors descriptors
.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY)); .add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
for (Column column : columns) { for (Column column : columns) {
var columnOptions = new ColumnFamilyOptions();
//noinspection ConstantConditions
if (databaseOptions.memtableMemoryBudgetBytes() != null) {
// 16MiB/256MiB of ram will be used for level style compaction
columnOptions.optimizeLevelStyleCompaction(databaseOptions
.memtableMemoryBudgetBytes()
.orElse(databaseOptions.lowMemory() ? 16L * SizeUnit.MB : 128L * SizeUnit.MB));
}
if (!databaseOptions.volumes().isEmpty()) {
int btmIndex = databaseOptions.volumes().size() - 1;
columnOptions.setCompressionType(databaseOptions.volumes().get(0).compression().getType());
columnOptions.setCompressionOptions(new CompressionOptions().setMaxDictBytes((int) (512L * SizeUnit.MB)));
columnOptions.setBottommostCompressionType(databaseOptions.volumes().get(btmIndex).compression().getType());
columnOptions.setBottommostCompressionOptions(new CompressionOptions().setMaxDictBytes((int) (512L * SizeUnit.MB)));
columnOptions.setCompressionPerLevel(databaseOptions.volumes().stream().map(v -> v.compression().getType()).toList());
} else {
columnOptions.setCompressionType(CompressionType.LZ4_COMPRESSION);
columnOptions.setCompressionOptions(new CompressionOptions().setMaxDictBytes((int) (512L * SizeUnit.MB)));
columnOptions.setBottommostCompressionType(CompressionType.LZ4HC_COMPRESSION);
columnOptions.setBottommostCompressionOptions(new CompressionOptions().setMaxDictBytes((int) (512L * SizeUnit.MB)));
}
final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
if (databaseOptions.lowMemory()) {
tableOptions
.setIndexType(IndexType.kTwoLevelIndexSearch)
.setPartitionFilters(true)
.setBlockCache(optionsWithCache.cache())
.setCacheIndexAndFilterBlocks(databaseOptions.setCacheIndexAndFilterBlocks().orElse(true))
.setCacheIndexAndFilterBlocksWithHighPriority(true)
.setPinTopLevelIndexAndFilter(true)
.setPinL0FilterAndIndexBlocksInCache(true)
;
} else {
tableOptions
.setIndexType(IndexType.kTwoLevelIndexSearch)
.setPartitionFilters(true)
.setBlockCache(optionsWithCache.cache())
.setCacheIndexAndFilterBlocks(databaseOptions.setCacheIndexAndFilterBlocks().orElse( true))
.setCacheIndexAndFilterBlocksWithHighPriority(true)
.setPinTopLevelIndexAndFilter(true)
.setPinL0FilterAndIndexBlocksInCache(true)
;
final BloomFilter bloomFilter = new BloomFilter(3, false);
tableOptions.setFilterPolicy(bloomFilter);
tableOptions.setOptimizeFiltersForMemory(true);
}
tableOptions.setBlockSize(16 * 1024); // 16KiB
columnOptions.setTableFormatConfig(tableOptions);
columnOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio);
descriptors descriptors
.add(new ColumnFamilyDescriptor(column.name().getBytes(StandardCharsets.US_ASCII))); .add(new ColumnFamilyDescriptor(column.name().getBytes(StandardCharsets.US_ASCII), columnOptions));
} }
// Get databases directory path // Get databases directory path
@ -178,9 +238,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
try { try {
// a factory method that returns a RocksDB instance // a factory method that returns a RocksDB instance
if (databaseOptions.optimistic()) { if (databaseOptions.optimistic()) {
this.db = OptimisticTransactionDB.open(new DBOptions(rocksdbOptions), dbPathString, descriptors, handles); this.db = OptimisticTransactionDB.open(rocksdbOptions, dbPathString, descriptors, handles);
} else { } else {
this.db = TransactionDB.open(new DBOptions(rocksdbOptions), this.db = TransactionDB.open(rocksdbOptions,
new TransactionDBOptions() new TransactionDBOptions()
.setWritePolicy(TxnDBWritePolicy.WRITE_COMMITTED) .setWritePolicy(TxnDBWritePolicy.WRITE_COMMITTED)
.setTransactionLockTimeout(5000) .setTransactionLockTimeout(5000)
@ -334,7 +394,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
// end force compact // end force compact
} }
private static Options openRocksDb(@Nullable Path path, DatabaseOptions databaseOptions) throws IOException { record OptionsWithCache(DBOptions options, Cache cache) {}
private static OptionsWithCache openRocksDb(@Nullable Path path, DatabaseOptions databaseOptions) throws IOException {
// Get databases directory path // Get databases directory path
Path databasesDirPath; Path databasesDirPath;
if (path != null) { if (path != null) {
@ -349,13 +411,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
// the Options class contains a set of configurable DB options // the Options class contains a set of configurable DB options
// that determines the behaviour of the database. // that determines the behaviour of the database.
var options = new Options(); var options = new DBOptions();
options.setCreateIfMissing(true); options.setCreateIfMissing(true);
options.setCreateMissingColumnFamilies(true); options.setCreateMissingColumnFamilies(true);
options.setCompactionStyle(CompactionStyle.LEVEL); options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
options.setTargetFileSizeMultiplier(2); // Each level is 2 times the previous level
options.setMinWriteBufferNumberToMerge(2);
options.setMaxWriteBufferNumber(4);
options.setAvoidFlushDuringShutdown(false); // Flush all WALs during shutdown options.setAvoidFlushDuringShutdown(false); // Flush all WALs during shutdown
options.setAvoidFlushDuringRecovery(false); // Flush all WALs during startup options.setAvoidFlushDuringRecovery(false); // Flush all WALs during startup
options.setWalRecoveryMode(databaseOptions.absoluteConsistency() options.setWalRecoveryMode(databaseOptions.absoluteConsistency()
@ -369,7 +428,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
List<DbPath> paths = convertPaths(databasesDirPath, path.getFileName(), databaseOptions.volumes()); List<DbPath> paths = convertPaths(databasesDirPath, path.getFileName(), databaseOptions.volumes());
options.setDbPaths(paths); options.setDbPaths(paths);
options.setMaxOpenFiles(databaseOptions.maxOpenFiles().orElse(-1)); options.setMaxOpenFiles(databaseOptions.maxOpenFiles().orElse(-1));
final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
Cache blockCache; Cache blockCache;
if (databaseOptions.lowMemory()) { if (databaseOptions.lowMemory()) {
// LOW MEMORY // LOW MEMORY
@ -377,21 +436,12 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.setBytesPerSync(0) // default .setBytesPerSync(0) // default
.setWalBytesPerSync(0) // default .setWalBytesPerSync(0) // default
.setIncreaseParallelism(1) .setIncreaseParallelism(1)
.setWriteBufferSize(8 * SizeUnit.MB) .setDbWriteBufferSize(8 * SizeUnit.MB)
.setWalTtlSeconds(0) .setWalTtlSeconds(0)
.setWalSizeLimitMB(0) // 16MB .setWalSizeLimitMB(0) // 16MB
.setMaxTotalWalSize(0) // automatic .setMaxTotalWalSize(0) // automatic
; ;
blockCache = new ClockCache(databaseOptions.blockCache().orElse( 8L * SizeUnit.MB), -1, true); blockCache = new ClockCache(databaseOptions.blockCache().orElse( 8L * SizeUnit.MB), -1, true);
tableOptions
.setIndexType(IndexType.kTwoLevelIndexSearch)
.setPartitionFilters(true)
.setBlockCache(blockCache)
.setCacheIndexAndFilterBlocks(databaseOptions.setCacheIndexAndFilterBlocks().orElse(true))
.setCacheIndexAndFilterBlocksWithHighPriority(true)
.setPinTopLevelIndexAndFilter(true)
.setPinL0FilterAndIndexBlocksInCache(true)
;
if (databaseOptions.useDirectIO()) { if (databaseOptions.useDirectIO()) {
options options
@ -414,18 +464,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.setMaxTotalWalSize(2L * SizeUnit.GB) // 2GiB max wal directory size .setMaxTotalWalSize(2L * SizeUnit.GB) // 2GiB max wal directory size
; ;
blockCache = new ClockCache(databaseOptions.blockCache().orElse( 512 * SizeUnit.MB), -1, true); blockCache = new ClockCache(databaseOptions.blockCache().orElse( 512 * SizeUnit.MB), -1, true);
tableOptions
.setIndexType(IndexType.kTwoLevelIndexSearch)
.setPartitionFilters(true)
.setBlockCache(blockCache)
.setCacheIndexAndFilterBlocks(databaseOptions.setCacheIndexAndFilterBlocks().orElse( true))
.setCacheIndexAndFilterBlocksWithHighPriority(true)
.setPinTopLevelIndexAndFilter(true)
.setPinL0FilterAndIndexBlocksInCache(true)
;
final BloomFilter bloomFilter = new BloomFilter(3, false);
tableOptions.setFilterPolicy(bloomFilter);
tableOptions.setOptimizeFiltersForMemory(true);
if (databaseOptions.useDirectIO()) { if (databaseOptions.useDirectIO()) {
options options
@ -440,29 +478,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
options.setWriteBufferManager(new WriteBufferManager(256L * 1024L * 1024L, blockCache)); options.setWriteBufferManager(new WriteBufferManager(256L * 1024L * 1024L, blockCache));
//noinspection ConstantConditions
if (databaseOptions.memtableMemoryBudgetBytes() != null) {
// 16MiB/256MiB of ram will be used for level style compaction
options.optimizeLevelStyleCompaction(databaseOptions
.memtableMemoryBudgetBytes()
.orElse(databaseOptions.lowMemory() ? 16L * SizeUnit.MB : 128L * SizeUnit.MB));
}
if (!databaseOptions.volumes().isEmpty()) {
int btmIndex = databaseOptions.volumes().size() - 1;
options.setCompressionType(databaseOptions.volumes().get(0).compression().getType());
options.setCompressionOptions(new CompressionOptions().setMaxDictBytes((int) (512L * SizeUnit.MB)));
options.setBottommostCompressionType(databaseOptions.volumes().get(btmIndex).compression().getType());
options.setBottommostCompressionOptions(new CompressionOptions().setMaxDictBytes((int) (512L * SizeUnit.MB)));
options.setCompressionPerLevel(databaseOptions.volumes().stream().map(v -> v.compression().getType()).toList());
} else {
options.setCompressionType(CompressionType.LZ4_COMPRESSION);
options.setCompressionOptions(new CompressionOptions().setMaxDictBytes((int) (512L * SizeUnit.MB)));
options.setBottommostCompressionType(CompressionType.LZ4HC_COMPRESSION);
options.setBottommostCompressionOptions(new CompressionOptions().setMaxDictBytes((int) (512L * SizeUnit.MB)));
}
if (databaseOptions.useDirectIO()) { if (databaseOptions.useDirectIO()) {
options options
.setAllowMmapReads(false) .setAllowMmapReads(false)
@ -479,11 +494,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
options.setUseDirectIoForFlushAndCompaction(true); options.setUseDirectIoForFlushAndCompaction(true);
} }
tableOptions.setBlockSize(16 * 1024); // 16KiB return new OptionsWithCache(options, blockCache);
options.setTableFormatConfig(tableOptions);
options.setCompactionPriority(CompactionPriority.MinOverlappingRatio);
return options;
} }
private static List<DbPath> convertPaths(Path databasesDirPath, Path path, List<DatabaseVolume> volumes) { private static List<DbPath> convertPaths(Path databasesDirPath, Path path, List<DatabaseVolume> volumes) {
@ -509,7 +520,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
} }
private void createIfNotExists(List<ColumnFamilyDescriptor> descriptors, private void createIfNotExists(List<ColumnFamilyDescriptor> descriptors,
Options options, DBOptions options,
boolean inMemory, boolean inMemory,
Path dbPath, Path dbPath,
String dbPathString) throws RocksDBException { String dbPathString) throws RocksDBException {
@ -534,7 +545,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
LinkedList<ColumnFamilyHandle> handles = new LinkedList<>(); LinkedList<ColumnFamilyHandle> handles = new LinkedList<>();
this.db = RocksDB.open(options, dbPathString); this.db = RocksDB.open(options, dbPathString, descriptors, handles);
for (ColumnFamilyDescriptor columnFamilyDescriptor : descriptorsToCreate) { for (ColumnFamilyDescriptor columnFamilyDescriptor : descriptorsToCreate) {
handles.add(db.createColumnFamily(columnFamilyDescriptor)); handles.add(db.createColumnFamily(columnFamilyDescriptor));
} }