CavalliumDBEngine/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java

1320 lines
47 KiB
Java
Raw Normal View History

2020-12-07 22:15:18 +01:00
package it.cavallium.dbengine.database.disk;
import static com.google.common.collect.Lists.partition;
2022-03-16 13:47:56 +01:00
import static io.netty5.buffer.api.StandardAllocationTypes.OFF_HEAP;
2021-09-10 12:13:52 +02:00
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
2022-04-04 22:55:28 +02:00
import static org.rocksdb.ColumnFamilyOptionsInterface.DEFAULT_COMPACTION_MEMTABLE_MEMORY_BUDGET;
2021-09-10 12:13:52 +02:00
import io.micrometer.core.instrument.MeterRegistry;
2022-01-15 20:00:10 +01:00
import io.micrometer.core.instrument.Tag;
2021-12-30 18:20:56 +01:00
import io.micrometer.core.instrument.Timer;
2022-03-16 13:47:56 +01:00
import io.netty5.buffer.api.BufferAllocator;
import io.netty5.util.internal.PlatformDependent;
import it.cavallium.data.generator.nativedata.NullableString;
2022-01-15 20:00:10 +01:00
import it.cavallium.dbengine.client.MemoryStats;
2022-03-02 12:34:30 +01:00
import it.cavallium.dbengine.database.ColumnUtils;
2021-01-30 00:24:55 +01:00
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLSnapshot;
2022-01-26 14:22:54 +01:00
import it.cavallium.dbengine.database.LLUtils;
2022-04-09 02:45:42 +02:00
import it.cavallium.dbengine.database.TableWithProperties;
2021-02-13 01:31:24 +01:00
import it.cavallium.dbengine.database.UpdateMode;
2022-03-02 12:34:30 +01:00
import it.cavallium.dbengine.rpc.current.data.Column;
2022-04-08 14:32:47 +02:00
import it.cavallium.dbengine.rpc.current.data.ColumnOptions;
2022-03-22 11:50:30 +01:00
import it.cavallium.dbengine.rpc.current.data.DatabaseLevel;
2022-03-02 12:34:30 +01:00
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
import it.cavallium.dbengine.rpc.current.data.DatabaseVolume;
2022-04-08 14:32:47 +02:00
import it.cavallium.dbengine.rpc.current.data.NamedColumnOptions;
2020-12-07 22:15:18 +01:00
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
2020-12-07 22:15:18 +01:00
import java.util.concurrent.ConcurrentHashMap;
2021-03-21 13:06:54 +01:00
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
2020-12-07 22:15:18 +01:00
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.StampedLock;
2022-04-08 14:32:47 +02:00
import java.util.stream.Collectors;
2022-04-15 16:49:01 +02:00
import java.util.stream.Stream;
2021-03-21 13:06:54 +01:00
import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
2021-07-10 20:52:01 +02:00
import org.jetbrains.annotations.Nullable;
2020-12-07 22:15:18 +01:00
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
2022-01-12 16:18:31 +01:00
import org.rocksdb.Cache;
2022-03-19 00:08:23 +01:00
import org.rocksdb.ChecksumType;
2020-12-07 22:15:18 +01:00
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
2022-03-10 02:38:57 +01:00
import org.rocksdb.ColumnFamilyOptions;
2021-03-19 20:55:38 +01:00
import org.rocksdb.CompactRangeOptions;
import org.rocksdb.CompactionJobInfo;
import org.rocksdb.CompactionOptions;
2021-05-04 01:21:29 +02:00
import org.rocksdb.CompactionPriority;
2022-01-12 16:18:31 +01:00
import org.rocksdb.CompressionOptions;
2020-12-07 22:15:18 +01:00
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
2022-04-19 23:23:32 +02:00
import org.rocksdb.DataBlockIndexType;
2020-12-07 22:15:18 +01:00
import org.rocksdb.DbPath;
import org.rocksdb.Env;
2020-12-07 22:15:18 +01:00
import org.rocksdb.FlushOptions;
2021-07-17 11:52:08 +02:00
import org.rocksdb.IndexType;
2022-03-10 02:38:57 +01:00
import org.rocksdb.InfoLogLevel;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.LRUCache;
import org.rocksdb.LevelMetaData;
2021-10-17 19:52:43 +02:00
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.PersistentCache;
2020-12-07 22:15:18 +01:00
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;
import org.rocksdb.SstFileMetaData;
2021-10-17 19:52:43 +02:00
import org.rocksdb.TransactionDB;
2021-12-27 18:44:54 +01:00
import org.rocksdb.TransactionDBOptions;
import org.rocksdb.TxnDBWritePolicy;
2020-12-07 22:15:18 +01:00
import org.rocksdb.WALRecoveryMode;
2021-03-20 12:41:11 +01:00
import org.rocksdb.WriteBufferManager;
2022-01-12 16:18:31 +01:00
import org.rocksdb.util.SizeUnit;
2022-04-04 17:52:49 +02:00
import org.warp.commonutils.type.ShortNamedThreadFactory;
2022-04-09 02:45:42 +02:00
import reactor.core.publisher.Flux;
2021-01-30 01:42:37 +01:00
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
2021-01-30 01:42:37 +01:00
import reactor.core.scheduler.Schedulers;
2020-12-07 22:15:18 +01:00
public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
private static final boolean DELETE_LOG_FILES = false;
2020-12-07 22:15:18 +01:00
static {
RocksDB.loadLibrary();
2022-01-26 14:22:54 +01:00
LLUtils.initHooks();
2020-12-07 22:15:18 +01:00
}
protected static final Logger logger = LogManager.getLogger(LLLocalKeyValueDatabase.class);
2022-01-15 20:00:10 +01:00
private static final ColumnFamilyDescriptor DEFAULT_COLUMN_FAMILY
= new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY);
2020-12-07 22:15:18 +01:00
2021-08-29 23:18:03 +02:00
private final BufferAllocator allocator;
private final MeterRegistry meterRegistry;
2022-04-05 13:58:12 +02:00
private final Scheduler dbWScheduler;
private final Scheduler dbRScheduler;
2021-06-27 15:40:56 +02:00
2021-12-30 18:20:56 +01:00
private final Timer snapshotTime;
2021-06-27 15:40:56 +02:00
// Configurations
2020-12-07 22:15:18 +01:00
private final Path dbPath;
private final String name;
2021-06-27 15:40:56 +02:00
private final DatabaseOptions databaseOptions;
2021-10-30 12:39:56 +02:00
private final boolean nettyDirect;
2021-06-27 15:40:56 +02:00
2021-06-19 16:26:54 +02:00
private final boolean enableColumnsBug;
2021-10-20 01:51:34 +02:00
private RocksDB db;
private Cache standardCache;
private Cache compressedCache;
2020-12-07 22:15:18 +01:00
private final Map<Column, ColumnFamilyHandle> handles;
private final HashMap<String, PersistentCache> persistentCaches;
2020-12-07 22:15:18 +01:00
private final ConcurrentHashMap<Long, Snapshot> snapshotsHandles = new ConcurrentHashMap<>();
private final AtomicLong nextSnapshotNumbers = new AtomicLong(1);
private final StampedLock closeLock = new StampedLock();
2022-01-15 20:00:10 +01:00
private volatile boolean closed = false;
2020-12-07 22:15:18 +01:00
2021-06-27 15:40:56 +02:00
@SuppressWarnings("SwitchStatementWithTooFewBranches")
2021-08-29 23:18:03 +02:00
public LLLocalKeyValueDatabase(BufferAllocator allocator,
MeterRegistry meterRegistry,
2021-05-03 21:41:51 +02:00
String name,
2022-03-02 12:34:30 +01:00
boolean inMemory,
2021-07-10 20:52:01 +02:00
@Nullable Path path,
2021-05-03 21:41:51 +02:00
List<Column> columns,
List<ColumnFamilyHandle> handles,
2021-06-27 15:40:56 +02:00
DatabaseOptions databaseOptions) throws IOException {
this.name = name;
2021-05-03 21:41:51 +02:00
this.allocator = allocator;
2021-10-30 12:39:56 +02:00
this.nettyDirect = databaseOptions.allowNettyDirect() && allocator.getAllocationType() == OFF_HEAP;
this.meterRegistry = meterRegistry;
2021-09-02 17:15:40 +02:00
2021-12-30 18:20:56 +01:00
this.snapshotTime = Timer
.builder("db.snapshot.timer")
.publishPercentiles(0.2, 0.5, 0.95)
.publishPercentileHistogram()
.tags("db.name", name)
.register(meterRegistry);
2021-10-30 12:39:56 +02:00
if (nettyDirect) {
2021-09-02 17:15:40 +02:00
if (!PlatformDependent.hasUnsafe()) {
throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers",
PlatformDependent.getUnsafeUnavailabilityCause()
);
}
}
2022-04-15 16:49:01 +02:00
this.enableColumnsBug = "true".equals(databaseOptions.extraFlags().getOrDefault("enableColumnBug", "false"));
if (!enableColumnsBug) {
if (columns.stream().noneMatch(column -> column.name().equals("default"))) {
columns = Stream.concat(Stream.of(Column.of("default")), columns.stream()).toList();
}
}
2022-03-12 02:55:18 +01:00
OptionsWithCache optionsWithCache = openRocksDb(path, databaseOptions);
2022-03-10 02:38:57 +01:00
var rocksdbOptions = optionsWithCache.options();
2020-12-07 22:15:18 +01:00
try {
2022-03-22 11:50:30 +01:00
List<ColumnFamilyDescriptor> descriptors = new ArrayList<>();
2022-03-10 02:38:57 +01:00
2022-03-22 11:50:30 +01:00
descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
2022-04-08 14:32:47 +02:00
// Check column names validity
for (NamedColumnOptions columnOption : databaseOptions.columnOptions()) {
if (columns.stream().map(Column::name).noneMatch(columnName -> columnName.equals(columnOption.columnName()))) {
throw new IllegalArgumentException(
"Column " + columnOption.columnName() + " does not exist. Available columns: " + columns
.stream()
.map(Column::name)
.collect(Collectors.joining(", ", "[", "]")));
}
}
this.persistentCaches = new HashMap<>();
2020-12-07 22:15:18 +01:00
for (Column column : columns) {
2022-04-08 14:32:47 +02:00
var columnFamilyOptions = new ColumnFamilyOptions();
var columnOptions = databaseOptions
.columnOptions()
.stream()
.filter(opts -> opts.columnName().equals(column.name()))
.findFirst()
.map(opts -> (ColumnOptions) opts)
.orElse(databaseOptions.defaultColumnOptions());
2022-03-10 02:38:57 +01:00
//noinspection ConstantConditions
2022-04-08 14:32:47 +02:00
if (columnOptions.memtableMemoryBudgetBytes() != null) {
2022-04-04 22:55:28 +02:00
// about 512MB of ram will be used for level style compaction
2022-04-08 14:32:47 +02:00
columnFamilyOptions.optimizeLevelStyleCompaction(columnOptions.memtableMemoryBudgetBytes().orElse(
2022-04-04 22:55:28 +02:00
databaseOptions.lowMemory()
? (DEFAULT_COMPACTION_MEMTABLE_MEMORY_BUDGET / 4)
: DEFAULT_COMPACTION_MEMTABLE_MEMORY_BUDGET));
2022-03-10 02:38:57 +01:00
}
2022-04-28 11:35:01 +02:00
if (isDisableAutoCompactions()) {
columnFamilyOptions.setDisableAutoCompactions(true);
}
boolean dynamicLevelBytes;
// This option is not supported with multiple db paths
if (databaseOptions.volumes().size() <= 1) {
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
dynamicLevelBytes = true;
} else {
dynamicLevelBytes = false;
}
if (dynamicLevelBytes) {
columnFamilyOptions.setLevelCompactionDynamicLevelBytes(dynamicLevelBytes);
} else {
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
columnFamilyOptions.setMaxBytesForLevelBase(256 * SizeUnit.MB);
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
columnFamilyOptions.setMaxBytesForLevelMultiplier(10);
}
2022-04-28 11:35:01 +02:00
if (isDisableAutoCompactions()) {
columnFamilyOptions.setLevel0FileNumCompactionTrigger(-1);
} else {
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
// Higher values speed up writes, but slow down reads
columnFamilyOptions.setLevel0FileNumCompactionTrigger(2);
}
if (isDisableSlowdown()) {
columnFamilyOptions.setLevel0SlowdownWritesTrigger(-1);
columnFamilyOptions.setLevel0StopWritesTrigger(Integer.MAX_VALUE);
columnFamilyOptions.setHardPendingCompactionBytesLimit(Long.MAX_VALUE);
columnFamilyOptions.setSoftPendingCompactionBytesLimit(Long.MAX_VALUE);
} else {
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
columnFamilyOptions.setLevel0SlowdownWritesTrigger(20);
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
columnFamilyOptions.setLevel0StopWritesTrigger(36);
}
2022-04-08 14:32:47 +02:00
if (!columnOptions.levels().isEmpty()) {
columnFamilyOptions.setNumLevels(columnOptions.levels().size());
2022-04-08 14:32:47 +02:00
var firstLevelOptions = getRocksLevelOptions(columnOptions.levels().get(0));
columnFamilyOptions.setCompressionType(firstLevelOptions.compressionType);
columnFamilyOptions.setCompressionOptions(firstLevelOptions.compressionOptions);
2022-03-22 11:50:30 +01:00
2022-04-08 14:32:47 +02:00
var lastLevelOptions = getRocksLevelOptions(columnOptions
2022-03-22 11:50:30 +01:00
.levels()
2022-04-08 14:32:47 +02:00
.get(columnOptions.levels().size() - 1));
columnFamilyOptions.setBottommostCompressionType(lastLevelOptions.compressionType);
columnFamilyOptions.setBottommostCompressionOptions(lastLevelOptions.compressionOptions);
2022-03-22 11:50:30 +01:00
2022-04-08 14:32:47 +02:00
columnFamilyOptions.setCompressionPerLevel(columnOptions
2022-03-22 11:50:30 +01:00
.levels()
2022-03-21 15:19:17 +01:00
.stream()
.map(v -> v.compression().getType())
.toList());
2022-03-10 02:38:57 +01:00
} else {
2022-04-30 02:14:44 +02:00
columnFamilyOptions.setNumLevels(7);
List<CompressionType> compressionTypes = new ArrayList<>(7);
for (int i = 0; i < 7; i++) {
2022-03-22 11:50:30 +01:00
if (i < 2) {
compressionTypes.add(CompressionType.NO_COMPRESSION);
} else {
compressionTypes.add(CompressionType.LZ4_COMPRESSION);
}
}
columnFamilyOptions.setBottommostCompressionType(CompressionType.LZ4HC_COMPRESSION);
2022-04-08 14:32:47 +02:00
columnFamilyOptions.setBottommostCompressionOptions(new CompressionOptions()
2022-03-21 15:19:17 +01:00
.setEnabled(true)
.setMaxDictBytes(32768));
2022-04-08 14:32:47 +02:00
columnFamilyOptions.setCompressionPerLevel(compressionTypes);
2022-03-10 02:38:57 +01:00
}
final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
2022-03-22 11:50:30 +01:00
if (!databaseOptions.lowMemory()) {
2022-04-15 16:49:01 +02:00
// tableOptions.setOptimizeFiltersForMemory(true);
columnFamilyOptions.setWriteBufferSize(256 * SizeUnit.MB);
}
if (columnOptions.writeBufferSize().isPresent()) {
columnFamilyOptions.setWriteBufferSize(columnOptions.writeBufferSize().get());
2022-03-10 02:38:57 +01:00
}
2022-04-08 14:32:47 +02:00
tableOptions.setVerifyCompression(false);
if (columnOptions.filter().isPresent()) {
var filterOptions = columnOptions.filter().get();
2022-04-06 14:53:08 +02:00
if (filterOptions instanceof it.cavallium.dbengine.rpc.current.data.BloomFilter bloomFilterOptions) {
// If OptimizeFiltersForHits == true: memory size = bitsPerKey * (totalKeys * 0.1)
// If OptimizeFiltersForHits == false: memory size = bitsPerKey * totalKeys
final BloomFilter bloomFilter = new BloomFilter(bloomFilterOptions.bitsPerKey());
tableOptions.setFilterPolicy(bloomFilter);
}
}
2022-04-08 14:32:47 +02:00
boolean cacheIndexAndFilterBlocks = columnOptions.cacheIndexAndFilterBlocks()
2022-04-05 13:58:12 +02:00
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.orElse(true);
if (databaseOptions.spinning()) {
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
2022-04-15 16:49:01 +02:00
// cacheIndexAndFilterBlocks = true;
2022-04-09 02:45:42 +02:00
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
columnFamilyOptions.setMinWriteBufferNumberToMerge(3);
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
columnFamilyOptions.setMaxWriteBufferNumber(4);
}
2022-03-12 02:55:18 +01:00
tableOptions
2022-04-19 23:23:32 +02:00
// http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html
.setDataBlockIndexType(DataBlockIndexType.kDataBlockBinaryAndHash)
// http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html
.setDataBlockHashTableUtilRatio(0.75)
2022-04-05 13:58:12 +02:00
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
2022-03-22 11:50:30 +01:00
.setPinTopLevelIndexAndFilter(true)
2022-04-05 13:58:12 +02:00
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
2022-03-22 11:50:30 +01:00
.setPinL0FilterAndIndexBlocksInCache(true)
2022-04-05 13:58:12 +02:00
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
2022-03-22 11:50:30 +01:00
.setCacheIndexAndFilterBlocksWithHighPriority(true)
.setCacheIndexAndFilterBlocks(cacheIndexAndFilterBlocks)
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
// Enabling partition filters increase the reads by 2x
2022-04-11 20:04:27 +02:00
.setPartitionFilters(columnOptions.partitionFilters().orElse(false))
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
2022-04-15 16:49:01 +02:00
.setIndexType(columnOptions.partitionFilters().orElse(false) ? IndexType.kTwoLevelIndexSearch : IndexType.kBinarySearch)
2022-03-29 21:27:56 +02:00
//todo: replace with kxxhash3
.setChecksumType(ChecksumType.kCRC32c)
2022-04-04 10:27:38 +02:00
// Spinning disks: 64KiB to 256KiB (also 512KiB). SSDs: 16KiB
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
2022-04-09 02:45:42 +02:00
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
.setBlockSize(columnOptions.blockSize().orElse((databaseOptions.spinning() ? 128 : 16) * 1024))
.setBlockCacheCompressed(optionsWithCache.compressedCache())
.setBlockCache(optionsWithCache.standardCache())
.setPersistentCache(resolvePersistentCache(persistentCaches, rocksdbOptions, databaseOptions.persistentCaches(), columnOptions.persistentCacheId()));
2022-03-10 02:38:57 +01:00
2022-04-08 14:32:47 +02:00
columnFamilyOptions.setTableFormatConfig(tableOptions);
columnFamilyOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio);
if (columnOptions.filter().isPresent()) {
var filterOptions = columnOptions.filter().get();
2022-04-06 14:53:08 +02:00
if (filterOptions instanceof it.cavallium.dbengine.rpc.current.data.BloomFilter bloomFilterOptions) {
boolean optimizeForHits = bloomFilterOptions.optimizeForHits()
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
// https://github.com/EighteenZi/rocksdb_wiki/blob/master/RocksDB-Tuning-Guide.md#throughput-gap-between-random-read-vs-sequential-read-is-much-higher-in-spinning-disks-suggestions=
.orElse(databaseOptions.spinning());
2022-04-08 14:32:47 +02:00
columnFamilyOptions.setOptimizeFiltersForHits(optimizeForHits);
2022-04-06 14:53:08 +02:00
}
}
2022-04-06 14:53:08 +02:00
// // Increasing this value can reduce the frequency of compaction and reduce write amplification,
// // but it will also cause old data to be unable to be cleaned up in time, thus increasing read amplification.
// // This parameter is not easy to adjust. It is generally not recommended to set it above 256MB.
2022-04-09 02:45:42 +02:00
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
columnFamilyOptions.setTargetFileSizeBase(64 * SizeUnit.MB);
2022-04-06 14:53:08 +02:00
// // For each level up, the threshold is multiplied by the factor target_file_size_multiplier
// // (but the default value is 1, which means that the maximum sstable of each level is the same).
2022-04-09 02:45:42 +02:00
columnFamilyOptions.setTargetFileSizeMultiplier(1);
2022-03-10 02:38:57 +01:00
2022-04-08 14:32:47 +02:00
descriptors.add(new ColumnFamilyDescriptor(column.name().getBytes(StandardCharsets.US_ASCII), columnFamilyOptions));
2020-12-07 22:15:18 +01:00
}
// Get databases directory path
Objects.requireNonNull(path);
2020-12-07 22:15:18 +01:00
Path databasesDirPath = path.toAbsolutePath().getParent();
String dbPathString = databasesDirPath.toString() + File.separatorChar + path.getFileName();
Path dbPath = Paths.get(dbPathString);
this.dbPath = dbPath;
2021-06-27 15:40:56 +02:00
// Set options
this.databaseOptions = databaseOptions;
int threadCap;
if (databaseOptions.lowMemory()) {
2022-03-20 16:14:31 +01:00
threadCap = Math.max(1, Runtime.getRuntime().availableProcessors());
2022-04-05 13:58:12 +02:00
this.dbWScheduler = Schedulers.boundedElastic();
this.dbRScheduler = Schedulers.boundedElastic();
2021-06-27 16:33:23 +02:00
} else {
// 8 or more
2022-04-07 20:03:29 +02:00
threadCap = Math.max(8, Runtime.getRuntime().availableProcessors());
{
var threadCapProperty = Integer.parseInt(System.getProperty("it.cavallium.dbengine.scheduler.write.threads", "0"));
if (threadCapProperty > 1) {
threadCap = threadCapProperty;
}
}
if (Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.scheduler.write.shared", "true"))) {
2022-04-05 13:58:12 +02:00
this.dbWScheduler = Schedulers.boundedElastic();
2022-03-21 15:19:17 +01:00
} else {
2022-04-05 13:58:12 +02:00
this.dbWScheduler = Schedulers.newBoundedElastic(threadCap,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
new ShortNamedThreadFactory("db-write-" + name).setDaemon(true).withGroup(new ThreadGroup("database-write")),
60
);
2022-04-07 20:03:29 +02:00
}
// 8 or more
threadCap = Math.max(8, Runtime.getRuntime().availableProcessors());
{
var threadCapProperty = Integer.parseInt(System.getProperty("it.cavallium.dbengine.scheduler.read.threads", "0"));
if (threadCapProperty > 1) {
threadCap = threadCapProperty;
}
}
if (Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.scheduler.read.shared", "true"))) {
this.dbRScheduler = Schedulers.boundedElastic();
} else {
2022-04-05 13:58:12 +02:00
this.dbRScheduler = Schedulers.newBoundedElastic(threadCap,
2022-03-21 15:19:17 +01:00
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
2022-04-05 13:58:12 +02:00
new ShortNamedThreadFactory("db-write-" + name).setDaemon(true).withGroup(new ThreadGroup("database-read")),
2022-04-04 17:52:49 +02:00
60
2022-03-21 15:19:17 +01:00
);
}
2021-06-27 15:40:56 +02:00
}
2020-12-07 22:15:18 +01:00
createIfNotExists(descriptors, rocksdbOptions, standardCache, compressedCache, inMemory, dbPath, dbPathString);
2021-06-09 02:56:53 +02:00
2021-06-25 23:47:53 +02:00
while (true) {
try {
// a factory method that returns a RocksDB instance
2021-12-27 17:45:52 +01:00
if (databaseOptions.optimistic()) {
2022-03-10 02:38:57 +01:00
this.db = OptimisticTransactionDB.open(rocksdbOptions, dbPathString, descriptors, handles);
2021-12-27 17:45:52 +01:00
} else {
2022-03-10 02:38:57 +01:00
this.db = TransactionDB.open(rocksdbOptions,
2021-12-27 23:07:11 +01:00
new TransactionDBOptions()
.setWritePolicy(TxnDBWritePolicy.WRITE_COMMITTED)
.setTransactionLockTimeout(5000)
.setDefaultLockTimeout(5000),
2021-12-27 18:44:54 +01:00
dbPathString,
descriptors,
handles
);
2021-12-27 17:45:52 +01:00
}
this.standardCache = optionsWithCache.standardCache;
this.compressedCache = optionsWithCache.compressedCache;
2021-06-25 23:47:53 +02:00
break;
} catch (RocksDBException ex) {
switch (ex.getMessage()) {
case "Direct I/O is not supported by the specified DB." -> {
logger.warn(ex.getLocalizedMessage());
2021-06-27 15:40:56 +02:00
rocksdbOptions
2021-06-25 23:47:53 +02:00
.setUseDirectReads(false)
2021-06-27 15:06:48 +02:00
.setUseDirectIoForFlushAndCompaction(false)
2021-06-27 15:40:56 +02:00
.setAllowMmapReads(databaseOptions.allowMemoryMapping())
.setAllowMmapWrites(databaseOptions.allowMemoryMapping());
2021-06-25 23:47:53 +02:00
}
default -> throw ex;
}
}
}
2020-12-07 22:15:18 +01:00
this.handles = new HashMap<>();
2022-03-02 12:34:30 +01:00
if (enableColumnsBug && !inMemory) {
2021-06-19 16:26:54 +02:00
for (int i = 0; i < columns.size(); i++) {
this.handles.put(columns.get(i), handles.get(i));
}
} else {
handles: for (ColumnFamilyHandle handle : handles) {
for (Column column : columns) {
if (Arrays.equals(column.name().getBytes(StandardCharsets.US_ASCII), handle.getName())) {
this.handles.put(column, handle);
continue handles;
}
}
}
2020-12-07 22:15:18 +01:00
}
2021-03-21 13:06:54 +01:00
// compactDb(db, handles);
2020-12-07 22:15:18 +01:00
flushDb(db, handles);
} catch (RocksDBException ex) {
throw new IOException(ex);
}
2022-01-15 20:00:10 +01:00
try {
for (ColumnFamilyHandle cfh : handles) {
var props = db.getProperty(cfh, "rocksdb.stats");
logger.trace("Stats for database {}, column {}: {}",
name,
new String(cfh.getName(), StandardCharsets.UTF_8),
props
);
}
} catch (RocksDBException ex) {
logger.debug("Failed to obtain stats", ex);
}
registerGauge(meterRegistry, name, "rocksdb.estimate-table-readers-mem", false);
registerGauge(meterRegistry, name, "rocksdb.size-all-mem-tables", false);
registerGauge(meterRegistry, name, "rocksdb.cur-size-all-mem-tables", false);
registerGauge(meterRegistry, name, "rocksdb.estimate-num-keys", false);
registerGauge(meterRegistry, name, "rocksdb.block-cache-usage", true);
registerGauge(meterRegistry, name, "rocksdb.block-cache-pinned-usage", true);
2022-04-07 20:03:29 +02:00
// Bloom seek stats
registerGauge(meterRegistry, name, "rocksdb.bloom.filter.prefix.useful", false);
registerGauge(meterRegistry, name, "rocksdb.bloom.filter.prefix.checked", false);
2022-04-07 20:03:29 +02:00
// Bloom point lookup stats
registerGauge(meterRegistry, name, "rocksdb.bloom.filter.useful", false);
registerGauge(meterRegistry, name, "rocksdb.bloom.filter.full.positive", false);
registerGauge(meterRegistry, name, "rocksdb.bloom.filter.full.true.positive", false);
2022-01-15 20:00:10 +01:00
}
public static boolean isDisableAutoCompactions() {
2022-04-28 11:35:01 +02:00
return Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.compactions.auto.disable", "false"));
}
public static boolean isDisableSlowdown() {
return isDisableAutoCompactions()
|| Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.disableslowdown", "false"));
}
private synchronized PersistentCache resolvePersistentCache(HashMap<String, PersistentCache> caches,
DBOptions rocksdbOptions,
List<it.cavallium.dbengine.rpc.current.data.PersistentCache> persistentCaches,
NullableString persistentCacheId) throws RocksDBException {
if (persistentCacheId.isEmpty()) {
return null;
}
var existingPersistentCache = caches.get(persistentCacheId.get());
if (existingPersistentCache != null) {
return existingPersistentCache;
}
var foundCaches = persistentCaches
.stream()
.filter(cache -> cache.id().equals(persistentCacheId.get()))
.toList();
if (foundCaches.size() > 1) {
throw new IllegalArgumentException("There are " + foundCaches.size()
+ " defined persistent caches with the id \"" + persistentCacheId.get() + "\"");
}
for (it.cavallium.dbengine.rpc.current.data.PersistentCache foundCache : foundCaches) {
var persistentCache = new PersistentCache(Env.getDefault(),
foundCache.path(),
foundCache.size(),
new RocksLog4jLogger(rocksdbOptions, logger),
foundCache.optimizeForNvm()
);
var prev = caches.put(persistentCacheId.get(), persistentCache);
if (prev != null) {
throw new IllegalStateException();
}
return persistentCache;
}
throw new IllegalArgumentException("Persistent cache " + persistentCacheId.get() + " is not defined");
}
2022-03-22 12:59:22 +01:00
public Map<Column, ColumnFamilyHandle> getAllColumnFamilyHandles() {
return this.handles;
}
public int getLastVolumeId() {
var paths = convertPaths(dbPath.toAbsolutePath().getParent(), dbPath.getFileName(), databaseOptions.volumes());
return paths.size() - 1;
}
2022-04-30 02:14:44 +02:00
public int getLevels(Column column) {
return RocksDBUtils.getLevels(db, handles.get(column));
}
public List<String> getColumnFiles(Column column, boolean excludeLastLevel) {
var cfh = handles.get(column);
return RocksDBUtils.getColumnFiles(db, cfh, excludeLastLevel);
}
public void forceCompaction(int volumeId) throws RocksDBException {
for (var cfh : this.handles.values()) {
RocksDBUtils.forceCompaction(db, name, cfh, volumeId, logger);
}
}
public void flush(FlushOptions flushOptions) throws RocksDBException {
db.flush(flushOptions);
}
2022-03-22 11:50:30 +01:00
private record RocksLevelOptions(CompressionType compressionType, CompressionOptions compressionOptions) {}
private RocksLevelOptions getRocksLevelOptions(DatabaseLevel levelOptions) {
var compressionType = levelOptions.compression().getType();
var compressionOptions = new CompressionOptions();
if (compressionType != CompressionType.NO_COMPRESSION) {
compressionOptions.setEnabled(true);
compressionOptions.setMaxDictBytes(levelOptions.maxDictBytes());
} else {
compressionOptions.setEnabled(false);
}
return new RocksLevelOptions(compressionType, compressionOptions);
}
private void registerGauge(MeterRegistry meterRegistry, String name, String propertyName, boolean divideByAllColumns) {
2022-01-15 20:00:10 +01:00
meterRegistry.gauge("rocksdb.property.value",
List.of(Tag.of("db.name", name), Tag.of("db.property.name", propertyName)),
db,
database -> {
if (closed) {
return 0d;
}
try {
return database.getAggregatedLongProperty(propertyName)
/ (divideByAllColumns ? getAllColumnFamilyHandles().size() : 1d);
2022-01-15 20:00:10 +01:00
} catch (RocksDBException e) {
2022-04-07 22:19:11 +02:00
if ("NotFound".equals(e.getMessage())) {
return 0d;
}
2022-01-15 20:00:10 +01:00
throw new RuntimeException(e);
}
}
);
2020-12-07 22:15:18 +01:00
}
@Override
public String getDatabaseName() {
return name;
}
public StampedLock getCloseLock() {
return closeLock;
}
private void flushAndCloseDb(RocksDB db, Cache standardCache, Cache compressedCache, List<ColumnFamilyHandle> handles)
2020-12-07 22:15:18 +01:00
throws RocksDBException {
var closeWriteLock = closeLock.writeLock();
try {
if (closed) {
return;
}
closed = true;
if (db.isOwningHandle()) {
//flushDb(db, handles);
}
2020-12-07 22:15:18 +01:00
for (ColumnFamilyHandle handle : handles) {
try {
handle.close();
} catch (Exception ex) {
logger.error("Can't close column family", ex);
}
}
snapshotsHandles.forEach((id, snapshot) -> {
try {
2022-04-20 23:29:39 +02:00
if (db.isOwningHandle()) {
db.releaseSnapshot(snapshot);
}
} catch (Exception ex2) {
// ignore exception
logger.debug("Failed to release snapshot " + id, ex2);
}
});
snapshotsHandles.clear();
2021-07-06 22:27:03 +02:00
try {
db.closeE();
2021-07-06 22:27:03 +02:00
} catch (Exception ex) {
logger.error("Can't close database " + name + " at " + dbPath, ex);
2021-07-06 22:27:03 +02:00
}
if (compressedCache != null) {
compressedCache.close();
}
if (standardCache != null) {
standardCache.close();
}
for (PersistentCache persistentCache : persistentCaches.values()) {
try {
persistentCache.close();
} catch (Exception ex) {
logger.error("Can't close persistent cache", ex);
}
2021-07-01 21:19:52 +02:00
}
} finally {
closeLock.unlockWrite(closeWriteLock);
2022-04-11 20:04:27 +02:00
}
2020-12-07 22:15:18 +01:00
}
2021-10-20 01:51:34 +02:00
private void flushDb(RocksDB db, List<ColumnFamilyHandle> handles) throws RocksDBException {
if (Schedulers.isInNonBlockingThread()) {
logger.error("Called flushDb in a nonblocking thread");
}
2020-12-07 22:15:18 +01:00
// force flush the database
2021-07-06 22:27:03 +02:00
try (var flushOptions = new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true)) {
db.flush(flushOptions);
2020-12-07 22:15:18 +01:00
}
2021-07-06 22:27:03 +02:00
try (var flushOptions = new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true)) {
db.flush(flushOptions, handles);
}
db.flushWal(true);
db.syncWal();
2020-12-07 22:15:18 +01:00
// end force flush
}
2021-04-03 19:09:06 +02:00
@SuppressWarnings("unused")
2021-10-20 01:51:34 +02:00
private void compactDb(TransactionDB db, List<ColumnFamilyHandle> handles) {
if (Schedulers.isInNonBlockingThread()) {
logger.error("Called compactDb in a nonblocking thread");
}
2021-03-19 20:55:38 +01:00
// force compact the database
for (ColumnFamilyHandle cfh : handles) {
var t = new Thread(() -> {
2021-03-21 13:06:54 +01:00
int r = ThreadLocalRandom.current().nextInt();
var s = StopWatch.createStarted();
2021-03-19 20:55:38 +01:00
try {
// Range rangeToCompact = db.suggestCompactRange(cfh);
2021-03-21 13:06:54 +01:00
logger.info("Compacting range {}", r);
db.compactRange(cfh, null, null, new CompactRangeOptions()
.setAllowWriteStall(true)
.setExclusiveManualCompaction(true)
.setChangeLevel(false));
2021-03-19 20:55:38 +01:00
} catch (RocksDBException e) {
if ("Database shutdown".equalsIgnoreCase(e.getMessage())) {
logger.warn("Compaction cancelled: database shutdown");
} else {
logger.warn("Failed to compact range", e);
}
}
2021-03-21 13:06:54 +01:00
logger.info("Compacted range {} in {} milliseconds", r, s.getTime(TimeUnit.MILLISECONDS));
2021-03-19 20:55:38 +01:00
}, "Compaction");
t.setDaemon(true);
t.start();
}
// end force compact
}
2022-03-22 11:50:30 +01:00
record OptionsWithCache(DBOptions options, @Nullable Cache standardCache, @Nullable Cache compressedCache) {}
2022-03-10 02:38:57 +01:00
private static OptionsWithCache openRocksDb(@Nullable Path path, DatabaseOptions databaseOptions) throws IOException {
2020-12-07 22:15:18 +01:00
// Get databases directory path
2021-07-10 20:52:01 +02:00
Path databasesDirPath;
if (path != null) {
databasesDirPath = path.toAbsolutePath().getParent();
// Create base directories
if (Files.notExists(databasesDirPath)) {
Files.createDirectories(databasesDirPath);
}
} else {
databasesDirPath = null;
2020-12-07 22:15:18 +01:00
}
// the Options class contains a set of configurable DB options
// that determines the behaviour of the database.
2022-03-10 02:38:57 +01:00
var options = new DBOptions();
2022-03-22 11:50:30 +01:00
options.setEnablePipelinedWrite(true);
options.setMaxSubcompactions(Integer.parseInt(System.getProperty("it.cavallium.dbengine.compactions.max.sub", "2")));
2022-03-22 11:50:30 +01:00
var customWriteRate = Long.parseLong(System.getProperty("it.cavallium.dbengine.write.delayedrate", "-1"));
if (customWriteRate >= 0) {
options.setDelayedWriteRate(customWriteRate);
} else {
options.setDelayedWriteRate(64 * SizeUnit.MB);
}
2020-12-07 22:15:18 +01:00
options.setCreateIfMissing(true);
options.setSkipStatsUpdateOnDbOpen(true);
2021-07-06 22:27:03 +02:00
options.setCreateMissingColumnFamilies(true);
options.setInfoLogLevel(InfoLogLevel.WARN_LEVEL);
options.setAvoidFlushDuringShutdown(false); // Flush all WALs during shutdown
2022-03-21 15:19:17 +01:00
options.setAvoidFlushDuringRecovery(true); // Flush all WALs during startup
2021-06-27 15:40:56 +02:00
options.setWalRecoveryMode(databaseOptions.absoluteConsistency()
? WALRecoveryMode.AbsoluteConsistency
: WALRecoveryMode.PointInTimeRecovery); // Crash if the WALs are corrupted.Default: TolerateCorruptedTailRecords
2020-12-07 22:15:18 +01:00
options.setDeleteObsoleteFilesPeriodMicros(20 * 1000000); // 20 seconds
options.setKeepLogFileNum(10);
2021-07-10 20:52:01 +02:00
Objects.requireNonNull(databasesDirPath);
Objects.requireNonNull(path.getFileName());
List<DbPath> paths = convertPaths(databasesDirPath, path.getFileName(), databaseOptions.volumes())
.stream()
.map(p -> new DbPath(p.path, p.targetSize))
.toList();
2021-06-09 02:56:53 +02:00
options.setDbPaths(paths);
2022-03-02 12:34:30 +01:00
options.setMaxOpenFiles(databaseOptions.maxOpenFiles().orElse(-1));
2022-04-09 02:45:42 +02:00
if (databaseOptions.spinning()) {
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
options.setUseFsync(false);
}
2022-03-10 02:38:57 +01:00
2022-04-11 16:53:17 +02:00
long writeBufferManagerSize;
if (databaseOptions.writeBufferManager().isPresent()) {
writeBufferManagerSize = databaseOptions.writeBufferManager().get();
} else {
writeBufferManagerSize = 0;
}
if (isDisableAutoCompactions()) {
options.setMaxBackgroundCompactions(0);
options.setMaxBackgroundJobs(0);
} else {
options.setMaxBackgroundJobs(Integer.parseInt(System.getProperty("it.cavallium.dbengine.jobs.background.num", "2")));
}
2022-01-12 16:18:31 +01:00
Cache blockCache;
2022-03-12 02:55:18 +01:00
Cache compressedCache;
2021-06-27 15:40:56 +02:00
if (databaseOptions.lowMemory()) {
2020-12-07 22:15:18 +01:00
// LOW MEMORY
options
2021-05-18 01:10:30 +02:00
.setBytesPerSync(0) // default
.setWalBytesPerSync(0) // default
2020-12-07 22:15:18 +01:00
.setIncreaseParallelism(1)
2022-03-10 02:38:57 +01:00
.setDbWriteBufferSize(8 * SizeUnit.MB)
2021-05-18 01:10:30 +02:00
.setWalTtlSeconds(0)
2022-03-30 23:44:55 +02:00
.setWalSizeLimitMB(0)
2021-05-18 01:10:30 +02:00
.setMaxTotalWalSize(0) // automatic
;
// DO NOT USE ClockCache! IT'S BROKEN!
2022-04-11 16:53:17 +02:00
blockCache = new LRUCache(writeBufferManagerSize + databaseOptions.blockCache().orElse( 8L * SizeUnit.MB));
if (databaseOptions.compressedBlockCache().isPresent()) {
compressedCache = new LRUCache(databaseOptions.compressedBlockCache().get());
} else {
compressedCache = null;
}
2021-07-17 11:52:08 +02:00
2022-04-05 13:58:12 +02:00
if (databaseOptions.spinning()) {
options
// method documentation
.setCompactionReadaheadSize(16 * SizeUnit.MB)
// guessed
.setWritableFileMaxBufferSize(16 * SizeUnit.MB);
}
2021-07-17 11:52:08 +02:00
if (databaseOptions.useDirectIO()) {
options
// Option to enable readahead in compaction
// If not set, it will be set to 2MB internally
2022-04-05 13:58:12 +02:00
.setCompactionReadaheadSize(2 * SizeUnit.MB) // recommend at least 2MB
2021-07-17 11:52:08 +02:00
// Option to tune write buffer for direct writes
2022-04-05 13:58:12 +02:00
.setWritableFileMaxBufferSize(2 * SizeUnit.MB)
2021-07-17 11:52:08 +02:00
;
}
2020-12-07 22:15:18 +01:00
} else {
// HIGH MEMORY
options
//.setDbWriteBufferSize(64 * SizeUnit.MB)
2022-03-22 11:50:30 +01:00
.setBytesPerSync(64 * SizeUnit.KB)
.setWalBytesPerSync(64 * SizeUnit.KB)
2022-01-12 16:18:31 +01:00
2022-04-11 20:04:27 +02:00
.setWalTtlSeconds(30) // flush wal after 30 seconds
.setWalSizeLimitMB(1024) // 1024MB
.setMaxTotalWalSize(2L * SizeUnit.GB) // 2GiB max wal directory size
2020-12-07 22:15:18 +01:00
;
// DO NOT USE ClockCache! IT'S BROKEN!
2022-04-11 16:53:17 +02:00
blockCache = new LRUCache(writeBufferManagerSize + databaseOptions.blockCache().orElse( 512 * SizeUnit.MB));
if (databaseOptions.compressedBlockCache().isPresent()) {
compressedCache = new LRUCache(databaseOptions.compressedBlockCache().get());
} else {
compressedCache = null;
}
2021-06-25 23:47:53 +02:00
2021-06-27 15:40:56 +02:00
if (databaseOptions.useDirectIO()) {
2021-06-25 23:47:53 +02:00
options
// Option to enable readahead in compaction
// If not set, it will be set to 2MB internally
2021-07-17 11:52:08 +02:00
.setCompactionReadaheadSize(4 * 1024 * 1024) // recommend at least 2MB
2021-06-25 23:47:53 +02:00
// Option to tune write buffer for direct writes
2021-07-17 11:52:08 +02:00
.setWritableFileMaxBufferSize(4 * 1024 * 1024)
2021-06-25 23:47:53 +02:00
;
}
2022-03-30 23:44:55 +02:00
options.setIncreaseParallelism(Runtime.getRuntime().availableProcessors());
2020-12-07 22:15:18 +01:00
}
2021-12-27 16:33:31 +01:00
2022-04-11 16:53:17 +02:00
if (databaseOptions.writeBufferManager().isPresent()) {
options.setWriteBufferManager(new WriteBufferManager(writeBufferManagerSize, blockCache));
}
2022-01-12 16:18:31 +01:00
2021-07-17 11:52:08 +02:00
if (databaseOptions.useDirectIO()) {
options
.setAllowMmapReads(false)
.setAllowMmapWrites(false)
.setUseDirectReads(true)
;
} else {
options
.setAllowMmapReads(databaseOptions.allowMemoryMapping())
.setAllowMmapWrites(databaseOptions.allowMemoryMapping());
}
if (!databaseOptions.allowMemoryMapping()) {
options.setUseDirectIoForFlushAndCompaction(true);
}
2020-12-07 22:15:18 +01:00
2022-03-12 02:55:18 +01:00
return new OptionsWithCache(options, blockCache, compressedCache);
2020-12-07 22:15:18 +01:00
}
record DbPathRecord(Path path, long targetSize) {}
private static List<DbPathRecord> convertPaths(Path databasesDirPath, Path path, List<DatabaseVolume> volumes) {
var paths = new ArrayList<DbPathRecord>(volumes.size());
2021-12-27 16:33:31 +01:00
if (volumes.isEmpty()) {
return List.of(new DbPathRecord(databasesDirPath.resolve(path.getFileName() + "_hot"),
2022-04-15 16:49:01 +02:00
0), // Legacy
new DbPathRecord(databasesDirPath.resolve(path.getFileName() + "_cold"),
2022-04-07 20:03:29 +02:00
0), // Legacy
new DbPathRecord(databasesDirPath.resolve(path.getFileName() + "_colder"),
2022-04-15 16:49:01 +02:00
1000L * 1024L * 1024L * 1024L) // 1000GiB
2022-04-07 20:03:29 +02:00
); // Legacy
2021-12-27 16:33:31 +01:00
}
for (DatabaseVolume volume : volumes) {
Path volumePath;
if (volume.volumePath().isAbsolute()) {
volumePath = volume.volumePath();
} else {
volumePath = databasesDirPath.resolve(volume.volumePath());
}
paths.add(new DbPathRecord(volumePath, volume.targetSizeBytes()));
2021-12-27 16:33:31 +01:00
}
return paths;
}
private void createIfNotExists(List<ColumnFamilyDescriptor> descriptors,
2022-03-10 02:38:57 +01:00
DBOptions options,
Cache standardCache,
Cache compressedCache,
2022-03-02 12:34:30 +01:00
boolean inMemory,
Path dbPath,
String dbPathString) throws RocksDBException {
2022-03-02 12:34:30 +01:00
if (inMemory) {
return;
}
2020-12-07 22:15:18 +01:00
if (Files.notExists(dbPath)) {
// Check if handles are all different
var descriptorsSet = new HashSet<>(descriptors);
if (descriptorsSet.size() != descriptors.size()) {
throw new IllegalArgumentException("Descriptors must be unique!");
}
List<ColumnFamilyDescriptor> descriptorsToCreate = new LinkedList<>(descriptors);
descriptorsToCreate
.removeIf((cf) -> Arrays.equals(cf.getName(), DEFAULT_COLUMN_FAMILY.getName()));
2021-01-30 00:24:55 +01:00
/*
SkipStatsUpdateOnDbOpen = true because this RocksDB.open session is used only to add just some columns
2020-12-07 22:15:18 +01:00
*/
//var dbOptionsFastLoadSlowEdit = options.setSkipStatsUpdateOnDbOpen(true);
LinkedList<ColumnFamilyHandle> handles = new LinkedList<>();
this.db = RocksDB.open(options.setCreateMissingColumnFamilies(true),
2022-03-11 17:59:46 +01:00
dbPathString,
descriptors,
handles
);
this.standardCache = standardCache;
this.compressedCache = compressedCache;
2020-12-07 22:15:18 +01:00
flushAndCloseDb(db, standardCache, compressedCache, handles);
2021-10-21 10:00:39 +02:00
this.db = null;
this.standardCache = null;
this.compressedCache = null;
2020-12-07 22:15:18 +01:00
}
}
@Override
2022-03-20 14:33:27 +01:00
public Mono<LLLocalSingleton> getSingleton(byte[] singletonListColumnName,
byte[] name,
byte @Nullable[] defaultValue) {
2021-01-31 15:47:48 +01:00
return Mono
2021-11-12 02:05:44 +01:00
.fromCallable(() -> new LLLocalSingleton(
getRocksDBColumn(db, getCfh(singletonListColumnName)),
2021-01-31 15:47:48 +01:00
(snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()),
LLLocalKeyValueDatabase.this.name,
name,
2022-03-20 14:33:27 +01:00
ColumnUtils.toString(singletonListColumnName),
2022-04-05 13:58:12 +02:00
dbWScheduler, dbRScheduler, defaultValue
2021-01-31 15:47:48 +01:00
))
.onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause))
2022-04-05 13:58:12 +02:00
.subscribeOn(dbRScheduler);
2020-12-07 22:15:18 +01:00
}
@Override
2021-02-13 01:31:24 +01:00
public Mono<LLLocalDictionary> getDictionary(byte[] columnName, UpdateMode updateMode) {
2021-01-31 15:47:48 +01:00
return Mono
2021-06-26 02:35:33 +02:00
.fromCallable(() -> new LLLocalDictionary(
allocator,
2021-10-20 01:51:34 +02:00
getRocksDBColumn(db, getCfh(columnName)),
2021-06-26 02:35:33 +02:00
name,
2022-03-02 12:34:30 +01:00
ColumnUtils.toString(columnName),
2022-04-05 13:58:12 +02:00
dbWScheduler,
dbRScheduler,
2021-06-26 02:35:33 +02:00
(snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()),
2021-06-29 23:31:02 +02:00
updateMode,
databaseOptions
))
2022-04-05 13:58:12 +02:00
.subscribeOn(dbRScheduler);
2020-12-07 22:15:18 +01:00
}
2022-03-22 12:59:22 +01:00
public RocksDBColumn getRocksDBColumn(byte[] columnName) {
ColumnFamilyHandle cfh;
try {
cfh = getCfh(columnName);
} catch (RocksDBException e) {
throw new UnsupportedOperationException("Column family doesn't exist: " + Arrays.toString(columnName), e);
}
return getRocksDBColumn(db, cfh);
}
2021-10-20 01:51:34 +02:00
private RocksDBColumn getRocksDBColumn(RocksDB db, ColumnFamilyHandle cfh) {
var nettyDirect = databaseOptions.allowNettyDirect();
var closeLock = getCloseLock();
2021-10-20 01:51:34 +02:00
if (db instanceof OptimisticTransactionDB optimisticTransactionDB) {
return new OptimisticRocksDBColumn(optimisticTransactionDB,
nettyDirect,
allocator,
name,
cfh,
meterRegistry,
closeLock
);
2021-12-27 18:44:54 +01:00
} else if (db instanceof TransactionDB transactionDB) {
return new PessimisticRocksDBColumn(transactionDB,
nettyDirect,
allocator,
name,
cfh,
meterRegistry,
closeLock
);
2021-10-20 01:51:34 +02:00
} else {
return new StandardRocksDBColumn(db, nettyDirect, allocator, name, cfh, meterRegistry, closeLock);
2021-10-20 01:51:34 +02:00
}
}
2021-06-19 16:26:54 +02:00
private ColumnFamilyHandle getCfh(byte[] columnName) throws RocksDBException {
2022-03-02 12:34:30 +01:00
ColumnFamilyHandle cfh = handles.get(ColumnUtils.special(ColumnUtils.toString(columnName)));
2021-08-28 22:42:51 +02:00
assert enableColumnsBug || Arrays.equals(cfh.getName(), columnName);
2021-06-19 16:26:54 +02:00
return cfh;
}
2021-06-27 15:40:56 +02:00
public DatabaseOptions getDatabaseOptions() {
return databaseOptions;
}
2020-12-07 22:15:18 +01:00
@Override
2021-01-31 15:47:48 +01:00
public Mono<Long> getProperty(String propertyName) {
return Mono.fromCallable(() -> db.getAggregatedLongProperty(propertyName))
.onErrorMap(cause -> new IOException("Failed to read " + propertyName, cause))
2022-04-05 13:58:12 +02:00
.subscribeOn(dbRScheduler);
2020-12-07 22:15:18 +01:00
}
public Flux<Path> getSSTS() {
var paths = convertPaths(dbPath.toAbsolutePath().getParent(), dbPath.getFileName(), databaseOptions.volumes());
return Mono
.fromCallable(() -> db.getLiveFiles())
.flatMapIterable(liveFiles -> liveFiles.files)
.filter(file -> file.endsWith(".sst"))
.map(file -> file.substring(1))
.flatMapSequential(file -> Mono.fromCallable(() -> {
{
var path = dbPath.resolve(file);
if (Files.exists(path)) {
return path;
}
}
for (var volumePath : paths) {
var path = volumePath.path().resolve(file);
if (Files.exists(path)) {
return path;
}
}
return null;
}).subscribeOn(Schedulers.boundedElastic()));
}
public Mono<Void> ingestSSTS(Flux<Path> sstsFlux) {
return sstsFlux
.map(path -> path.toAbsolutePath().toString())
.flatMap(sst -> Mono.fromCallable(() -> {
try (var opts = new IngestExternalFileOptions()) {
try {
logger.info("Ingesting SST \"{}\"...", sst);
db.ingestExternalFile(List.of(sst), opts);
logger.info("Ingested SST \"{}\" successfully", sst);
} catch (RocksDBException e) {
logger.error("Can't ingest SST \"{}\"", sst, e);
}
}
return null;
}).subscribeOn(Schedulers.boundedElastic()))
.then();
}
2022-01-15 20:00:10 +01:00
@Override
public Mono<MemoryStats> getMemoryStats() {
return Mono
2022-04-09 02:45:42 +02:00
.fromCallable(() -> {
if (!closed) {
return new MemoryStats(db.getAggregatedLongProperty("rocksdb.estimate-table-readers-mem"),
db.getAggregatedLongProperty("rocksdb.size-all-mem-tables"),
db.getAggregatedLongProperty("rocksdb.cur-size-all-mem-tables"),
db.getAggregatedLongProperty("rocksdb.estimate-num-keys"),
2022-04-11 16:41:13 +02:00
db.getAggregatedLongProperty("rocksdb.block-cache-usage") / this.handles.size(),
db.getAggregatedLongProperty("rocksdb.block-cache-pinned-usage") / this.handles.size()
2022-04-09 02:45:42 +02:00
);
} else {
return null;
}
})
2022-01-15 20:00:10 +01:00
.onErrorMap(cause -> new IOException("Failed to read memory stats", cause))
2022-04-05 13:58:12 +02:00
.subscribeOn(dbRScheduler);
2022-04-09 02:45:42 +02:00
}
@Override
public Mono<String> getRocksDBStats() {
return Mono
.fromCallable(() -> {
if (!closed) {
2022-04-10 20:15:05 +02:00
StringBuilder aggregatedStats = new StringBuilder();
for (var entry : this.handles.entrySet()) {
aggregatedStats
.append(entry.getKey().name())
.append("\n")
.append(db.getProperty(entry.getValue(), "rocksdb.stats"))
.append("\n");
}
return aggregatedStats.toString();
2022-04-09 02:45:42 +02:00
} else {
return null;
}
})
.onErrorMap(cause -> new IOException("Failed to read stats", cause))
.subscribeOn(dbRScheduler);
}
@Override
public Flux<TableWithProperties> getTableProperties() {
2022-04-10 20:15:05 +02:00
return Flux
.fromIterable(handles.entrySet())
.flatMapSequential(handle -> Mono
.fromCallable(() -> {
if (!closed) {
return db.getPropertiesOfAllTables(handle.getValue());
} else {
return null;
}
})
.subscribeOn(dbRScheduler)
.flatMapIterable(Map::entrySet)
.map(entry -> new TableWithProperties(handle.getKey().name(), entry.getKey(), entry.getValue()))
)
.onErrorMap(cause -> new IOException("Failed to read stats", cause));
2022-01-15 20:00:10 +01:00
}
2021-06-27 15:06:48 +02:00
@Override
public Mono<Void> verifyChecksum() {
return Mono
.<Void>fromCallable(() -> {
db.verifyChecksum();
return null;
})
.onErrorMap(cause -> new IOException("Failed to verify checksum of database \""
+ getDatabaseName() + "\"", cause))
2022-04-05 13:58:12 +02:00
.subscribeOn(dbRScheduler);
2021-06-27 15:06:48 +02:00
}
@Override
public Mono<Void> compact() {
return Mono.<Void>fromCallable(() -> {
this.forceCompaction(getLastVolumeId());
return null;
}).subscribeOn(dbWScheduler);
}
@Override
public Mono<Void> flush() {
return Mono.<Void>fromCallable(() -> {
try (var fo = new FlushOptions().setWaitForFlush(true)) {
this.flush(fo);
return null;
}
}).subscribeOn(dbWScheduler);
}
2021-05-03 21:41:51 +02:00
@Override
2021-08-29 23:18:03 +02:00
public BufferAllocator getAllocator() {
2021-05-03 21:41:51 +02:00
return allocator;
}
@Override
public MeterRegistry getMeterRegistry() {
return meterRegistry;
}
2020-12-07 22:15:18 +01:00
@Override
2021-01-30 01:42:37 +01:00
public Mono<LLSnapshot> takeSnapshot() {
return Mono
2021-12-30 18:20:56 +01:00
.fromCallable(() -> snapshotTime.recordCallable(() -> {
var closeReadLock = closeLock.readLock();
try {
if (closed) {
throw new IllegalStateException("Database closed");
}
var snapshot = db.getSnapshot();
long currentSnapshotSequenceNumber = nextSnapshotNumbers.getAndIncrement();
this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot);
return new LLSnapshot(currentSnapshotSequenceNumber);
} finally {
closeLock.unlockRead(closeReadLock);
}
2021-12-30 18:20:56 +01:00
}))
2022-04-05 13:58:12 +02:00
.subscribeOn(dbRScheduler);
2020-12-07 22:15:18 +01:00
}
@Override
2021-01-30 01:42:37 +01:00
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
return Mono
.<Void>fromCallable(() -> {
var closeReadLock = closeLock.readLock();
try {
if (closed) {
throw new IllegalStateException("Database closed");
}
Snapshot dbSnapshot = this.snapshotsHandles.remove(snapshot.getSequenceNumber());
if (dbSnapshot == null) {
throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!");
}
if (!db.isOwningHandle()) {
return null;
}
db.releaseSnapshot(dbSnapshot);
return null;
} finally {
closeLock.unlockRead(closeReadLock);
}
})
2022-04-05 13:58:12 +02:00
.subscribeOn(dbRScheduler);
2020-12-07 22:15:18 +01:00
}
@Override
2021-01-31 19:52:47 +01:00
public Mono<Void> close() {
return Mono
.<Void>fromCallable(() -> {
try {
flushAndCloseDb(db, standardCache, compressedCache, new ArrayList<>(handles.values()));
2021-01-31 19:52:47 +01:00
deleteUnusedOldLogFiles();
} catch (RocksDBException e) {
throw new IOException(e);
}
return null;
})
.onErrorMap(cause -> new IOException("Failed to close", cause))
2022-04-05 13:58:12 +02:00
.subscribeOn(dbWScheduler);
2020-12-07 22:15:18 +01:00
}
/**
* Call this method ONLY AFTER flushing completely a db and closing it!
*/
2021-01-30 00:24:55 +01:00
@SuppressWarnings("unused")
2020-12-07 22:15:18 +01:00
private void deleteUnusedOldLogFiles() {
if (!DELETE_LOG_FILES) {
return;
}
2020-12-07 22:15:18 +01:00
Path basePath = dbPath;
try {
Files
.walk(basePath, 1)
.filter(p -> !p.equals(basePath))
.filter(p -> {
var fileName = p.getFileName().toString();
if (fileName.startsWith("LOG.old.")) {
var parts = fileName.split("\\.");
if (parts.length == 3) {
try {
long nameSuffix = Long.parseUnsignedLong(parts[2]);
return true;
} catch (NumberFormatException ex) {
return false;
}
}
}
if (fileName.endsWith(".log")) {
var parts = fileName.split("\\.");
if (parts.length == 2) {
try {
int name = Integer.parseUnsignedInt(parts[0]);
return true;
} catch (NumberFormatException ex) {
return false;
}
}
}
return false;
})
.filter(p -> {
try {
BasicFileAttributes attrs = Files.readAttributes(p, BasicFileAttributes.class);
if (attrs.isRegularFile() && !attrs.isSymbolicLink() && !attrs.isDirectory()) {
long ctime = attrs.creationTime().toMillis();
long atime = attrs.lastAccessTime().toMillis();
long mtime = attrs.lastModifiedTime().toMillis();
long lastTime = Math.max(Math.max(ctime, atime), mtime);
long safeTime;
if (p.getFileName().toString().startsWith("LOG.old.")) {
safeTime = System.currentTimeMillis() - Duration.ofHours(24).toMillis();
} else {
safeTime = System.currentTimeMillis() - Duration.ofHours(12).toMillis();
}
if (lastTime < safeTime) {
return true;
}
}
} catch (IOException ex) {
2021-03-19 20:55:38 +01:00
logger.error("Error when deleting unused log files", ex);
2020-12-07 22:15:18 +01:00
return false;
}
return false;
})
.forEach(path -> {
try {
Files.deleteIfExists(path);
System.out.println("Deleted log file \"" + path + "\"");
} catch (IOException e) {
2021-09-10 12:13:52 +02:00
logger.error(MARKER_ROCKSDB, "Failed to delete log file \"" + path + "\"", e);
2020-12-07 22:15:18 +01:00
}
});
} catch (IOException ex) {
2021-09-10 12:13:52 +02:00
logger.error(MARKER_ROCKSDB, "Failed to delete unused log files", ex);
2020-12-07 22:15:18 +01:00
}
}
2020-12-07 22:15:18 +01:00
}