2020-12-07 22:15:18 +01:00
|
|
|
package it.cavallium.dbengine.database.disk;
|
|
|
|
|
2022-03-16 13:47:56 +01:00
|
|
|
import static io.netty5.buffer.api.StandardAllocationTypes.OFF_HEAP;
|
2021-09-10 12:13:52 +02:00
|
|
|
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
|
2022-06-09 00:49:08 +02:00
|
|
|
import static java.lang.Boolean.parseBoolean;
|
2022-05-04 01:21:56 +02:00
|
|
|
import static java.util.Objects.requireNonNull;
|
2022-04-04 22:55:28 +02:00
|
|
|
import static org.rocksdb.ColumnFamilyOptionsInterface.DEFAULT_COMPACTION_MEMTABLE_MEMORY_BUDGET;
|
2021-09-10 12:13:52 +02:00
|
|
|
|
2021-10-30 11:13:46 +02:00
|
|
|
import io.micrometer.core.instrument.MeterRegistry;
|
2022-01-15 20:00:10 +01:00
|
|
|
import io.micrometer.core.instrument.Tag;
|
2021-12-30 18:20:56 +01:00
|
|
|
import io.micrometer.core.instrument.Timer;
|
2022-03-16 13:47:56 +01:00
|
|
|
import io.netty5.buffer.api.BufferAllocator;
|
|
|
|
import io.netty5.util.internal.PlatformDependent;
|
2022-04-15 02:41:06 +02:00
|
|
|
import it.cavallium.data.generator.nativedata.NullableString;
|
2022-01-15 20:00:10 +01:00
|
|
|
import it.cavallium.dbengine.client.MemoryStats;
|
2022-05-04 01:21:56 +02:00
|
|
|
import it.cavallium.dbengine.database.ColumnProperty;
|
2022-03-02 12:34:30 +01:00
|
|
|
import it.cavallium.dbengine.database.ColumnUtils;
|
2021-01-30 00:24:55 +01:00
|
|
|
import it.cavallium.dbengine.database.LLKeyValueDatabase;
|
|
|
|
import it.cavallium.dbengine.database.LLSnapshot;
|
2022-01-26 14:22:54 +01:00
|
|
|
import it.cavallium.dbengine.database.LLUtils;
|
2022-05-04 01:21:56 +02:00
|
|
|
import it.cavallium.dbengine.database.RocksDBLongProperty;
|
|
|
|
import it.cavallium.dbengine.database.RocksDBMapProperty;
|
|
|
|
import it.cavallium.dbengine.database.RocksDBStringProperty;
|
2022-04-09 02:45:42 +02:00
|
|
|
import it.cavallium.dbengine.database.TableWithProperties;
|
2021-02-13 01:31:24 +01:00
|
|
|
import it.cavallium.dbengine.database.UpdateMode;
|
2022-03-02 12:34:30 +01:00
|
|
|
import it.cavallium.dbengine.rpc.current.data.Column;
|
2022-04-08 14:32:47 +02:00
|
|
|
import it.cavallium.dbengine.rpc.current.data.ColumnOptions;
|
2022-03-22 11:50:30 +01:00
|
|
|
import it.cavallium.dbengine.rpc.current.data.DatabaseLevel;
|
2022-03-02 12:34:30 +01:00
|
|
|
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
|
|
|
|
import it.cavallium.dbengine.rpc.current.data.DatabaseVolume;
|
2022-04-08 14:32:47 +02:00
|
|
|
import it.cavallium.dbengine.rpc.current.data.NamedColumnOptions;
|
2020-12-07 22:15:18 +01:00
|
|
|
import java.io.File;
|
|
|
|
import java.io.IOException;
|
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
|
import java.nio.file.Files;
|
|
|
|
import java.nio.file.Path;
|
|
|
|
import java.nio.file.Paths;
|
|
|
|
import java.nio.file.attribute.BasicFileAttributes;
|
|
|
|
import java.time.Duration;
|
|
|
|
import java.util.ArrayList;
|
|
|
|
import java.util.Arrays;
|
|
|
|
import java.util.HashMap;
|
|
|
|
import java.util.List;
|
|
|
|
import java.util.Map;
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
2021-03-21 13:06:54 +01:00
|
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
|
|
import java.util.concurrent.TimeUnit;
|
2020-12-07 22:15:18 +01:00
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
2022-04-30 01:49:44 +02:00
|
|
|
import java.util.concurrent.locks.StampedLock;
|
2022-04-08 14:32:47 +02:00
|
|
|
import java.util.stream.Collectors;
|
2022-04-15 16:49:01 +02:00
|
|
|
import java.util.stream.Stream;
|
2021-03-21 13:06:54 +01:00
|
|
|
import org.apache.commons.lang3.time.StopWatch;
|
2021-12-17 01:48:49 +01:00
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
|
import org.apache.logging.log4j.Logger;
|
2021-07-10 20:52:01 +02:00
|
|
|
import org.jetbrains.annotations.Nullable;
|
2022-05-20 10:20:00 +02:00
|
|
|
import org.rocksdb.AbstractImmutableNativeReference;
|
2020-12-07 22:15:18 +01:00
|
|
|
import org.rocksdb.BlockBasedTableConfig;
|
|
|
|
import org.rocksdb.BloomFilter;
|
2022-01-12 16:18:31 +01:00
|
|
|
import org.rocksdb.Cache;
|
2022-03-19 00:08:23 +01:00
|
|
|
import org.rocksdb.ChecksumType;
|
2020-12-07 22:15:18 +01:00
|
|
|
import org.rocksdb.ColumnFamilyDescriptor;
|
|
|
|
import org.rocksdb.ColumnFamilyHandle;
|
2022-03-10 02:38:57 +01:00
|
|
|
import org.rocksdb.ColumnFamilyOptions;
|
2021-03-19 20:55:38 +01:00
|
|
|
import org.rocksdb.CompactRangeOptions;
|
2021-05-04 01:21:29 +02:00
|
|
|
import org.rocksdb.CompactionPriority;
|
2022-01-12 16:18:31 +01:00
|
|
|
import org.rocksdb.CompressionOptions;
|
2020-12-07 22:15:18 +01:00
|
|
|
import org.rocksdb.CompressionType;
|
|
|
|
import org.rocksdb.DBOptions;
|
2022-04-19 23:23:32 +02:00
|
|
|
import org.rocksdb.DataBlockIndexType;
|
2020-12-07 22:15:18 +01:00
|
|
|
import org.rocksdb.DbPath;
|
2022-04-15 02:41:06 +02:00
|
|
|
import org.rocksdb.Env;
|
2020-12-07 22:15:18 +01:00
|
|
|
import org.rocksdb.FlushOptions;
|
2021-07-17 11:52:08 +02:00
|
|
|
import org.rocksdb.IndexType;
|
2022-03-10 02:38:57 +01:00
|
|
|
import org.rocksdb.InfoLogLevel;
|
2022-04-26 17:12:22 +02:00
|
|
|
import org.rocksdb.IngestExternalFileOptions;
|
2022-04-11 01:27:09 +02:00
|
|
|
import org.rocksdb.LRUCache;
|
2021-10-17 19:52:43 +02:00
|
|
|
import org.rocksdb.OptimisticTransactionDB;
|
2022-04-15 02:41:06 +02:00
|
|
|
import org.rocksdb.PersistentCache;
|
2020-12-07 22:15:18 +01:00
|
|
|
import org.rocksdb.RocksDB;
|
|
|
|
import org.rocksdb.RocksDBException;
|
|
|
|
import org.rocksdb.Snapshot;
|
2022-05-09 22:08:54 +02:00
|
|
|
import org.rocksdb.Statistics;
|
|
|
|
import org.rocksdb.StatsLevel;
|
|
|
|
import org.rocksdb.TickerType;
|
2021-10-17 19:52:43 +02:00
|
|
|
import org.rocksdb.TransactionDB;
|
2021-12-27 18:44:54 +01:00
|
|
|
import org.rocksdb.TransactionDBOptions;
|
|
|
|
import org.rocksdb.TxnDBWritePolicy;
|
2020-12-07 22:15:18 +01:00
|
|
|
import org.rocksdb.WALRecoveryMode;
|
2021-03-20 12:41:11 +01:00
|
|
|
import org.rocksdb.WriteBufferManager;
|
2022-01-12 16:18:31 +01:00
|
|
|
import org.rocksdb.util.SizeUnit;
|
2022-05-10 00:31:16 +02:00
|
|
|
import it.cavallium.dbengine.utils.ShortNamedThreadFactory;
|
2022-04-09 02:45:42 +02:00
|
|
|
import reactor.core.publisher.Flux;
|
2021-01-30 01:42:37 +01:00
|
|
|
import reactor.core.publisher.Mono;
|
2021-02-01 02:21:53 +01:00
|
|
|
import reactor.core.scheduler.Scheduler;
|
2021-01-30 01:42:37 +01:00
|
|
|
import reactor.core.scheduler.Schedulers;
|
2020-12-07 22:15:18 +01:00
|
|
|
|
|
|
|
public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
|
|
|
|
2022-03-22 00:23:32 +01:00
|
|
|
private static final boolean DELETE_LOG_FILES = false;

/**
 * Compile-time switch: while {@code true}, every hand-tuned override in the constructor that is guarded by
 * {@code !FOLLOW_ROCKSDB_OPTIMIZATIONS} is skipped.
 */
private static final boolean FOLLOW_ROCKSDB_OPTIMIZATIONS = true;

// Load the native RocksDB library and install the project hooks once, when the class is initialized.
static {
  RocksDB.loadLibrary();
  LLUtils.initHooks();
}

protected static final Logger logger = LogManager.getLogger(LLLocalKeyValueDatabase.class);

// ---- Collaborators supplied at construction time ----
private final BufferAllocator allocator;
private final MeterRegistry meterRegistry;
// Schedulers used for write-side and read-side database work respectively
private final Scheduler dbWScheduler;
private final Scheduler dbRScheduler;

// Timer registered as "db.snapshot.timer" in the constructor
private final Timer snapshotTime;

// ---- Configuration ----
private final Path dbPath;
private final String name;
private final DatabaseOptions databaseOptions;

// Set from the "enableColumnBug" extra flag; changes how column handles are mapped to columns
private final boolean enableColumnsBug;

// ---- Native state ----
// Native objects registered via refs.track(...) while the database is being configured and opened
private final RocksDBRefs refs = new RocksDBRefs();
private RocksDB db;
// null unless the "it.cavallium.dbengine.stats.level" system property is set
private Statistics statistics;
private Cache standardCache;
private Cache compressedCache;
private final Map<Column, ColumnFamilyHandle> handles;
private final HashMap<String, PersistentCache> persistentCaches;

// ---- Snapshot and lifecycle bookkeeping ----
private final ConcurrentHashMap<Long, Snapshot> snapshotsHandles = new ConcurrentHashMap<>();
private final AtomicLong nextSnapshotNumbers = new AtomicLong(1L);
private final StampedLock closeLock = new StampedLock();
private volatile boolean closeRequested;
private volatile boolean closed;
|
2020-12-07 22:15:18 +01:00
|
|
|
|
2021-06-27 15:40:56 +02:00
|
|
|
@SuppressWarnings("SwitchStatementWithTooFewBranches")
|
2021-08-29 23:18:03 +02:00
|
|
|
public LLLocalKeyValueDatabase(BufferAllocator allocator,
|
2021-10-30 11:13:46 +02:00
|
|
|
MeterRegistry meterRegistry,
|
2021-05-03 21:41:51 +02:00
|
|
|
String name,
|
2022-03-02 12:34:30 +01:00
|
|
|
boolean inMemory,
|
2021-07-10 20:52:01 +02:00
|
|
|
@Nullable Path path,
|
2021-05-03 21:41:51 +02:00
|
|
|
List<Column> columns,
|
|
|
|
List<ColumnFamilyHandle> handles,
|
2021-06-27 15:40:56 +02:00
|
|
|
DatabaseOptions databaseOptions) throws IOException {
|
|
|
|
this.name = name;
|
2021-05-03 21:41:51 +02:00
|
|
|
this.allocator = allocator;
|
2022-05-01 15:42:51 +02:00
|
|
|
boolean nettyDirect = databaseOptions.allowNettyDirect() && allocator.getAllocationType() == OFF_HEAP;
|
2021-10-30 11:13:46 +02:00
|
|
|
this.meterRegistry = meterRegistry;
|
2021-09-02 17:15:40 +02:00
|
|
|
|
2021-12-30 18:20:56 +01:00
|
|
|
this.snapshotTime = Timer
|
|
|
|
.builder("db.snapshot.timer")
|
|
|
|
.publishPercentiles(0.2, 0.5, 0.95)
|
|
|
|
.publishPercentileHistogram()
|
|
|
|
.tags("db.name", name)
|
|
|
|
.register(meterRegistry);
|
|
|
|
|
2021-10-30 12:39:56 +02:00
|
|
|
if (nettyDirect) {
|
2021-09-02 17:15:40 +02:00
|
|
|
if (!PlatformDependent.hasUnsafe()) {
|
|
|
|
throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers",
|
|
|
|
PlatformDependent.getUnsafeUnavailabilityCause()
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-04-15 16:49:01 +02:00
|
|
|
this.enableColumnsBug = "true".equals(databaseOptions.extraFlags().getOrDefault("enableColumnBug", "false"));
|
|
|
|
|
|
|
|
if (!enableColumnsBug) {
|
|
|
|
if (columns.stream().noneMatch(column -> column.name().equals("default"))) {
|
|
|
|
columns = Stream.concat(Stream.of(Column.of("default")), columns.stream()).toList();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-05-22 16:48:08 +02:00
|
|
|
OptionsWithCache optionsWithCache = openRocksDb(path, databaseOptions, refs);
|
2022-03-10 02:38:57 +01:00
|
|
|
var rocksdbOptions = optionsWithCache.options();
|
2020-12-07 22:15:18 +01:00
|
|
|
try {
|
2022-03-22 11:50:30 +01:00
|
|
|
List<ColumnFamilyDescriptor> descriptors = new ArrayList<>();
|
2022-03-10 02:38:57 +01:00
|
|
|
|
2022-05-22 16:48:08 +02:00
|
|
|
var defaultColumnOptions = new ColumnFamilyOptions();
|
|
|
|
refs.track(defaultColumnOptions);
|
|
|
|
descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultColumnOptions));
|
2022-04-08 14:32:47 +02:00
|
|
|
|
|
|
|
// Check column names validity
|
|
|
|
for (NamedColumnOptions columnOption : databaseOptions.columnOptions()) {
|
|
|
|
if (columns.stream().map(Column::name).noneMatch(columnName -> columnName.equals(columnOption.columnName()))) {
|
|
|
|
throw new IllegalArgumentException(
|
|
|
|
"Column " + columnOption.columnName() + " does not exist. Available columns: " + columns
|
|
|
|
.stream()
|
|
|
|
.map(Column::name)
|
|
|
|
.collect(Collectors.joining(", ", "[", "]")));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-06-08 18:52:15 +02:00
|
|
|
var rocksLogger = new RocksLog4jLogger(rocksdbOptions, logger);
|
2022-04-15 02:41:06 +02:00
|
|
|
this.persistentCaches = new HashMap<>();
|
|
|
|
|
2020-12-07 22:15:18 +01:00
|
|
|
for (Column column : columns) {
|
2022-04-08 14:32:47 +02:00
|
|
|
var columnFamilyOptions = new ColumnFamilyOptions();
|
2022-05-22 16:48:08 +02:00
|
|
|
refs.track(columnFamilyOptions);
|
2022-04-08 14:32:47 +02:00
|
|
|
|
|
|
|
var columnOptions = databaseOptions
|
|
|
|
.columnOptions()
|
|
|
|
.stream()
|
|
|
|
.filter(opts -> opts.columnName().equals(column.name()))
|
|
|
|
.findFirst()
|
|
|
|
.map(opts -> (ColumnOptions) opts)
|
|
|
|
.orElse(databaseOptions.defaultColumnOptions());
|
2022-03-10 02:38:57 +01:00
|
|
|
|
|
|
|
//noinspection ConstantConditions
|
2022-04-08 14:32:47 +02:00
|
|
|
if (columnOptions.memtableMemoryBudgetBytes() != null) {
|
2022-04-04 22:55:28 +02:00
|
|
|
// about 512MB of ram will be used for level style compaction
|
2022-04-08 14:32:47 +02:00
|
|
|
columnFamilyOptions.optimizeLevelStyleCompaction(columnOptions.memtableMemoryBudgetBytes().orElse(
|
2022-04-04 22:55:28 +02:00
|
|
|
databaseOptions.lowMemory()
|
|
|
|
? (DEFAULT_COMPACTION_MEMTABLE_MEMORY_BUDGET / 4)
|
|
|
|
: DEFAULT_COMPACTION_MEMTABLE_MEMORY_BUDGET));
|
2022-03-10 02:38:57 +01:00
|
|
|
}
|
|
|
|
|
2022-04-28 11:35:01 +02:00
|
|
|
if (isDisableAutoCompactions()) {
|
2022-04-26 17:12:22 +02:00
|
|
|
columnFamilyOptions.setDisableAutoCompactions(true);
|
|
|
|
}
|
2022-06-09 00:13:44 +02:00
|
|
|
var blobFiles = columnOptions.blobFiles();
|
|
|
|
columnFamilyOptions.setEnableBlobFiles(blobFiles);
|
|
|
|
if (blobFiles) {
|
|
|
|
if (columnOptions.blobFileSize().isPresent()) {
|
|
|
|
columnFamilyOptions.setBlobFileSize(columnOptions.blobFileSize().get());
|
|
|
|
}
|
|
|
|
if (columnOptions.minBlobSize().isPresent()) {
|
|
|
|
columnFamilyOptions.setMinBlobSize(columnOptions.minBlobSize().get());
|
|
|
|
}
|
|
|
|
if (columnOptions.blobCompressionType().isPresent()) {
|
|
|
|
columnFamilyOptions.setCompressionType(columnOptions.blobCompressionType().get().getType());
|
|
|
|
} else {
|
|
|
|
columnFamilyOptions.setCompressionType(CompressionType.LZ4_COMPRESSION);
|
|
|
|
}
|
|
|
|
columnFamilyOptions.setBlobCompactionReadaheadSize(4 * SizeUnit.MB);
|
|
|
|
columnFamilyOptions.setEnableBlobGarbageCollection(true);
|
2022-06-01 17:36:21 +02:00
|
|
|
}
|
2022-05-01 15:42:51 +02:00
|
|
|
|
2022-04-11 01:27:09 +02:00
|
|
|
// This option is not supported with multiple db paths
|
2022-05-01 15:42:51 +02:00
|
|
|
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
|
|
|
|
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
|
|
|
|
boolean dynamicLevelBytes = databaseOptions.volumes().size() <= 1;
|
2022-04-26 17:12:22 +02:00
|
|
|
if (dynamicLevelBytes) {
|
2022-05-01 15:42:51 +02:00
|
|
|
columnFamilyOptions.setLevelCompactionDynamicLevelBytes(true);
|
2022-04-26 17:12:22 +02:00
|
|
|
} else {
|
|
|
|
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
|
|
|
|
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
|
|
|
|
columnFamilyOptions.setMaxBytesForLevelBase(256 * SizeUnit.MB);
|
|
|
|
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
|
|
|
|
columnFamilyOptions.setMaxBytesForLevelMultiplier(10);
|
2022-04-11 01:27:09 +02:00
|
|
|
}
|
2022-04-28 11:35:01 +02:00
|
|
|
if (isDisableAutoCompactions()) {
|
|
|
|
columnFamilyOptions.setLevel0FileNumCompactionTrigger(-1);
|
|
|
|
} else {
|
2022-05-02 18:48:44 +02:00
|
|
|
if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) {
|
|
|
|
// ArangoDB uses a value of 2: https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
|
|
|
|
// Higher values speed up writes, but slow down reads
|
|
|
|
columnFamilyOptions.setLevel0FileNumCompactionTrigger(2);
|
|
|
|
}
|
2022-04-28 11:35:01 +02:00
|
|
|
}
|
2022-04-28 23:23:26 +02:00
|
|
|
if (isDisableSlowdown()) {
|
2022-04-26 17:12:22 +02:00
|
|
|
columnFamilyOptions.setLevel0SlowdownWritesTrigger(-1);
|
|
|
|
columnFamilyOptions.setLevel0StopWritesTrigger(Integer.MAX_VALUE);
|
2022-04-28 23:23:26 +02:00
|
|
|
columnFamilyOptions.setHardPendingCompactionBytesLimit(Long.MAX_VALUE);
|
|
|
|
columnFamilyOptions.setSoftPendingCompactionBytesLimit(Long.MAX_VALUE);
|
2022-04-26 17:12:22 +02:00
|
|
|
} else {
|
|
|
|
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
|
|
|
|
columnFamilyOptions.setLevel0SlowdownWritesTrigger(20);
|
|
|
|
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
|
|
|
|
columnFamilyOptions.setLevel0StopWritesTrigger(36);
|
|
|
|
}
|
2022-04-04 11:16:20 +02:00
|
|
|
|
2022-04-08 14:32:47 +02:00
|
|
|
if (!columnOptions.levels().isEmpty()) {
|
2022-04-30 01:49:44 +02:00
|
|
|
columnFamilyOptions.setNumLevels(columnOptions.levels().size());
|
2022-05-22 16:48:08 +02:00
|
|
|
var firstLevelOptions = getRocksLevelOptions(columnOptions.levels().get(0), refs);
|
2022-04-08 14:32:47 +02:00
|
|
|
columnFamilyOptions.setCompressionType(firstLevelOptions.compressionType);
|
|
|
|
columnFamilyOptions.setCompressionOptions(firstLevelOptions.compressionOptions);
|
2022-03-22 11:50:30 +01:00
|
|
|
|
2022-04-08 14:32:47 +02:00
|
|
|
var lastLevelOptions = getRocksLevelOptions(columnOptions
|
2022-03-22 11:50:30 +01:00
|
|
|
.levels()
|
2022-05-22 16:48:08 +02:00
|
|
|
.get(columnOptions.levels().size() - 1), refs);
|
2022-04-08 14:32:47 +02:00
|
|
|
columnFamilyOptions.setBottommostCompressionType(lastLevelOptions.compressionType);
|
|
|
|
columnFamilyOptions.setBottommostCompressionOptions(lastLevelOptions.compressionOptions);
|
2022-03-22 11:50:30 +01:00
|
|
|
|
2022-04-08 14:32:47 +02:00
|
|
|
columnFamilyOptions.setCompressionPerLevel(columnOptions
|
2022-03-22 11:50:30 +01:00
|
|
|
.levels()
|
2022-03-21 15:19:17 +01:00
|
|
|
.stream()
|
|
|
|
.map(v -> v.compression().getType())
|
|
|
|
.toList());
|
2022-03-10 02:38:57 +01:00
|
|
|
} else {
|
2022-04-30 02:14:44 +02:00
|
|
|
columnFamilyOptions.setNumLevels(7);
|
|
|
|
List<CompressionType> compressionTypes = new ArrayList<>(7);
|
|
|
|
for (int i = 0; i < 7; i++) {
|
2022-03-22 11:50:30 +01:00
|
|
|
if (i < 2) {
|
|
|
|
compressionTypes.add(CompressionType.NO_COMPRESSION);
|
|
|
|
} else {
|
|
|
|
compressionTypes.add(CompressionType.LZ4_COMPRESSION);
|
|
|
|
}
|
|
|
|
}
|
2022-04-26 17:12:22 +02:00
|
|
|
columnFamilyOptions.setBottommostCompressionType(CompressionType.LZ4HC_COMPRESSION);
|
2022-05-22 16:48:08 +02:00
|
|
|
var compressionOptions = new CompressionOptions()
|
2022-03-21 15:19:17 +01:00
|
|
|
.setEnabled(true)
|
2022-05-22 16:48:08 +02:00
|
|
|
.setMaxDictBytes(32768);
|
|
|
|
refs.track(compressionOptions);
|
|
|
|
columnFamilyOptions.setBottommostCompressionOptions(compressionOptions);
|
2022-04-08 14:32:47 +02:00
|
|
|
columnFamilyOptions.setCompressionPerLevel(compressionTypes);
|
2022-03-10 02:38:57 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
|
2022-05-02 18:48:44 +02:00
|
|
|
if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) {
|
|
|
|
if (!databaseOptions.lowMemory()) {
|
|
|
|
// tableOptions.setOptimizeFiltersForMemory(true);
|
|
|
|
columnFamilyOptions.setWriteBufferSize(256 * SizeUnit.MB);
|
|
|
|
}
|
|
|
|
if (columnOptions.writeBufferSize().isPresent()) {
|
|
|
|
columnFamilyOptions.setWriteBufferSize(columnOptions.writeBufferSize().get());
|
|
|
|
}
|
2022-03-10 02:38:57 +01:00
|
|
|
}
|
2022-04-08 14:32:47 +02:00
|
|
|
tableOptions.setVerifyCompression(false);
|
|
|
|
if (columnOptions.filter().isPresent()) {
|
|
|
|
var filterOptions = columnOptions.filter().get();
|
2022-04-06 14:53:08 +02:00
|
|
|
|
|
|
|
if (filterOptions instanceof it.cavallium.dbengine.rpc.current.data.BloomFilter bloomFilterOptions) {
|
|
|
|
// If OptimizeFiltersForHits == true: memory size = bitsPerKey * (totalKeys * 0.1)
|
|
|
|
// If OptimizeFiltersForHits == false: memory size = bitsPerKey * totalKeys
|
|
|
|
final BloomFilter bloomFilter = new BloomFilter(bloomFilterOptions.bitsPerKey());
|
2022-05-22 16:48:08 +02:00
|
|
|
refs.track(bloomFilter);
|
2022-04-06 14:53:08 +02:00
|
|
|
tableOptions.setFilterPolicy(bloomFilter);
|
|
|
|
}
|
|
|
|
}
|
2022-04-08 14:32:47 +02:00
|
|
|
boolean cacheIndexAndFilterBlocks = columnOptions.cacheIndexAndFilterBlocks()
|
2022-04-05 13:58:12 +02:00
|
|
|
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
|
|
|
|
.orElse(true);
|
2022-04-04 11:16:20 +02:00
|
|
|
if (databaseOptions.spinning()) {
|
2022-05-02 18:48:44 +02:00
|
|
|
if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) {
|
|
|
|
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
|
|
|
|
// cacheIndexAndFilterBlocks = true;
|
|
|
|
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
|
|
|
|
columnFamilyOptions.setMinWriteBufferNumberToMerge(3);
|
|
|
|
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
|
|
|
|
columnFamilyOptions.setMaxWriteBufferNumber(4);
|
|
|
|
}
|
2022-04-04 11:16:20 +02:00
|
|
|
}
|
2022-03-12 02:55:18 +01:00
|
|
|
tableOptions
|
2022-04-19 23:23:32 +02:00
|
|
|
// http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html
|
|
|
|
.setDataBlockIndexType(DataBlockIndexType.kDataBlockBinaryAndHash)
|
|
|
|
// http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html
|
|
|
|
.setDataBlockHashTableUtilRatio(0.75)
|
2022-04-05 13:58:12 +02:00
|
|
|
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
|
2022-03-22 11:50:30 +01:00
|
|
|
.setPinTopLevelIndexAndFilter(true)
|
2022-04-05 13:58:12 +02:00
|
|
|
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
|
2022-03-22 11:50:30 +01:00
|
|
|
.setPinL0FilterAndIndexBlocksInCache(true)
|
2022-04-05 13:58:12 +02:00
|
|
|
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
|
2022-03-22 11:50:30 +01:00
|
|
|
.setCacheIndexAndFilterBlocksWithHighPriority(true)
|
2022-04-04 11:16:20 +02:00
|
|
|
.setCacheIndexAndFilterBlocks(cacheIndexAndFilterBlocks)
|
2022-04-11 01:27:09 +02:00
|
|
|
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
|
2022-04-15 02:41:06 +02:00
|
|
|
// Enabling partition filters increase the reads by 2x
|
2022-04-11 20:04:27 +02:00
|
|
|
.setPartitionFilters(columnOptions.partitionFilters().orElse(false))
|
2022-04-11 01:27:09 +02:00
|
|
|
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
|
2022-04-15 16:49:01 +02:00
|
|
|
.setIndexType(columnOptions.partitionFilters().orElse(false) ? IndexType.kTwoLevelIndexSearch : IndexType.kBinarySearch)
|
2022-05-28 14:34:35 +02:00
|
|
|
.setChecksumType(ChecksumType.kXXH3)
|
2022-04-04 10:27:38 +02:00
|
|
|
// Spinning disks: 64KiB to 256KiB (also 512KiB). SSDs: 16KiB
|
2022-04-04 11:16:20 +02:00
|
|
|
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
|
2022-04-09 02:45:42 +02:00
|
|
|
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
|
2022-04-11 01:27:09 +02:00
|
|
|
.setBlockSize(columnOptions.blockSize().orElse((databaseOptions.spinning() ? 128 : 16) * 1024))
|
|
|
|
.setBlockCacheCompressed(optionsWithCache.compressedCache())
|
2022-04-15 02:41:06 +02:00
|
|
|
.setBlockCache(optionsWithCache.standardCache())
|
2022-05-22 16:48:08 +02:00
|
|
|
.setPersistentCache(resolvePersistentCache(persistentCaches,
|
|
|
|
rocksdbOptions,
|
|
|
|
databaseOptions.persistentCaches(),
|
|
|
|
columnOptions.persistentCacheId(),
|
2022-06-08 18:52:15 +02:00
|
|
|
refs,
|
|
|
|
rocksLogger
|
2022-05-22 16:48:08 +02:00
|
|
|
));
|
2022-03-10 02:38:57 +01:00
|
|
|
|
2022-04-08 14:32:47 +02:00
|
|
|
columnFamilyOptions.setTableFormatConfig(tableOptions);
|
|
|
|
columnFamilyOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio);
|
|
|
|
if (columnOptions.filter().isPresent()) {
|
|
|
|
var filterOptions = columnOptions.filter().get();
|
2022-04-06 14:53:08 +02:00
|
|
|
|
|
|
|
if (filterOptions instanceof it.cavallium.dbengine.rpc.current.data.BloomFilter bloomFilterOptions) {
|
|
|
|
boolean optimizeForHits = bloomFilterOptions.optimizeForHits()
|
|
|
|
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
|
|
|
|
// https://github.com/EighteenZi/rocksdb_wiki/blob/master/RocksDB-Tuning-Guide.md#throughput-gap-between-random-read-vs-sequential-read-is-much-higher-in-spinning-disks-suggestions=
|
|
|
|
.orElse(databaseOptions.spinning());
|
2022-04-08 14:32:47 +02:00
|
|
|
columnFamilyOptions.setOptimizeFiltersForHits(optimizeForHits);
|
2022-04-06 14:53:08 +02:00
|
|
|
}
|
2022-04-06 02:41:32 +02:00
|
|
|
}
|
2022-04-06 14:53:08 +02:00
|
|
|
|
2022-05-02 18:48:44 +02:00
|
|
|
if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) {
|
|
|
|
// // Increasing this value can reduce the frequency of compaction and reduce write amplification,
|
|
|
|
// // but it will also cause old data to be unable to be cleaned up in time, thus increasing read amplification.
|
|
|
|
// // This parameter is not easy to adjust. It is generally not recommended to set it above 256MB.
|
|
|
|
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
|
|
|
|
columnFamilyOptions.setTargetFileSizeBase(64 * SizeUnit.MB);
|
|
|
|
// // For each level up, the threshold is multiplied by the factor target_file_size_multiplier
|
|
|
|
// // (but the default value is 1, which means that the maximum sstable of each level is the same).
|
|
|
|
columnFamilyOptions.setTargetFileSizeMultiplier(2);
|
|
|
|
}
|
2022-03-10 02:38:57 +01:00
|
|
|
|
2022-04-08 14:32:47 +02:00
|
|
|
descriptors.add(new ColumnFamilyDescriptor(column.name().getBytes(StandardCharsets.US_ASCII), columnFamilyOptions));
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// Get databases directory path
|
2022-05-04 01:21:56 +02:00
|
|
|
requireNonNull(path);
|
2020-12-07 22:15:18 +01:00
|
|
|
Path databasesDirPath = path.toAbsolutePath().getParent();
|
|
|
|
String dbPathString = databasesDirPath.toString() + File.separatorChar + path.getFileName();
|
|
|
|
Path dbPath = Paths.get(dbPathString);
|
|
|
|
this.dbPath = dbPath;
|
2021-06-27 15:40:56 +02:00
|
|
|
|
|
|
|
// Set options
|
|
|
|
this.databaseOptions = databaseOptions;
|
|
|
|
|
|
|
|
int threadCap;
|
|
|
|
if (databaseOptions.lowMemory()) {
|
2022-03-20 16:14:31 +01:00
|
|
|
threadCap = Math.max(1, Runtime.getRuntime().availableProcessors());
|
|
|
|
|
2022-04-05 13:58:12 +02:00
|
|
|
this.dbWScheduler = Schedulers.boundedElastic();
|
|
|
|
this.dbRScheduler = Schedulers.boundedElastic();
|
2021-06-27 16:33:23 +02:00
|
|
|
} else {
|
|
|
|
// 8 or more
|
2022-04-07 20:03:29 +02:00
|
|
|
threadCap = Math.max(8, Runtime.getRuntime().availableProcessors());
|
|
|
|
{
|
|
|
|
var threadCapProperty = Integer.parseInt(System.getProperty("it.cavallium.dbengine.scheduler.write.threads", "0"));
|
|
|
|
if (threadCapProperty > 1) {
|
|
|
|
threadCap = threadCapProperty;
|
|
|
|
}
|
|
|
|
}
|
2022-06-09 00:49:08 +02:00
|
|
|
if (parseBoolean(System.getProperty("it.cavallium.dbengine.scheduler.write.shared", "true"))) {
|
2022-04-05 13:58:12 +02:00
|
|
|
this.dbWScheduler = Schedulers.boundedElastic();
|
2022-03-21 15:19:17 +01:00
|
|
|
} else {
|
2022-04-05 13:58:12 +02:00
|
|
|
this.dbWScheduler = Schedulers.newBoundedElastic(threadCap,
|
|
|
|
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
|
|
|
|
new ShortNamedThreadFactory("db-write-" + name).setDaemon(true).withGroup(new ThreadGroup("database-write")),
|
|
|
|
60
|
|
|
|
);
|
2022-04-07 20:03:29 +02:00
|
|
|
}
|
|
|
|
// 8 or more
|
|
|
|
threadCap = Math.max(8, Runtime.getRuntime().availableProcessors());
|
|
|
|
{
|
|
|
|
var threadCapProperty = Integer.parseInt(System.getProperty("it.cavallium.dbengine.scheduler.read.threads", "0"));
|
|
|
|
if (threadCapProperty > 1) {
|
|
|
|
threadCap = threadCapProperty;
|
|
|
|
}
|
|
|
|
}
|
2022-06-09 00:49:08 +02:00
|
|
|
if (parseBoolean(System.getProperty("it.cavallium.dbengine.scheduler.read.shared", "true"))) {
|
2022-04-07 20:03:29 +02:00
|
|
|
this.dbRScheduler = Schedulers.boundedElastic();
|
|
|
|
} else {
|
2022-04-05 13:58:12 +02:00
|
|
|
this.dbRScheduler = Schedulers.newBoundedElastic(threadCap,
|
2022-03-21 15:19:17 +01:00
|
|
|
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
|
2022-05-28 14:34:35 +02:00
|
|
|
new ShortNamedThreadFactory("db-read-" + name).setDaemon(true).withGroup(new ThreadGroup("database-read")),
|
2022-04-04 17:52:49 +02:00
|
|
|
60
|
2022-03-21 15:19:17 +01:00
|
|
|
);
|
|
|
|
}
|
2021-06-27 15:40:56 +02:00
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
|
2022-05-09 22:08:54 +02:00
|
|
|
var statsLevel = System.getProperty("it.cavallium.dbengine.stats.level");
|
|
|
|
if (statsLevel != null) {
|
|
|
|
this.statistics = registerStatistics(name, rocksdbOptions, meterRegistry, StatsLevel.valueOf(statsLevel));
|
|
|
|
} else {
|
|
|
|
this.statistics = null;
|
|
|
|
}
|
|
|
|
|
2021-06-25 23:47:53 +02:00
|
|
|
while (true) {
|
|
|
|
try {
|
|
|
|
// a factory method that returns a RocksDB instance
|
2021-12-27 17:45:52 +01:00
|
|
|
if (databaseOptions.optimistic()) {
|
2022-03-10 02:38:57 +01:00
|
|
|
this.db = OptimisticTransactionDB.open(rocksdbOptions, dbPathString, descriptors, handles);
|
2021-12-27 17:45:52 +01:00
|
|
|
} else {
|
2022-05-22 16:48:08 +02:00
|
|
|
var transactionOptions = new TransactionDBOptions()
|
|
|
|
.setWritePolicy(TxnDBWritePolicy.WRITE_COMMITTED)
|
|
|
|
.setTransactionLockTimeout(5000)
|
|
|
|
.setDefaultLockTimeout(5000);
|
|
|
|
refs.track(transactionOptions);
|
2022-03-10 02:38:57 +01:00
|
|
|
this.db = TransactionDB.open(rocksdbOptions,
|
2022-05-22 16:48:08 +02:00
|
|
|
transactionOptions,
|
2021-12-27 18:44:54 +01:00
|
|
|
dbPathString,
|
|
|
|
descriptors,
|
|
|
|
handles
|
|
|
|
);
|
2021-12-27 17:45:52 +01:00
|
|
|
}
|
2022-04-11 01:27:09 +02:00
|
|
|
this.standardCache = optionsWithCache.standardCache;
|
|
|
|
this.compressedCache = optionsWithCache.compressedCache;
|
2021-06-25 23:47:53 +02:00
|
|
|
break;
|
|
|
|
} catch (RocksDBException ex) {
|
|
|
|
switch (ex.getMessage()) {
|
|
|
|
case "Direct I/O is not supported by the specified DB." -> {
|
|
|
|
logger.warn(ex.getLocalizedMessage());
|
2021-06-27 15:40:56 +02:00
|
|
|
rocksdbOptions
|
2021-06-25 23:47:53 +02:00
|
|
|
.setUseDirectReads(false)
|
2021-06-27 15:06:48 +02:00
|
|
|
.setUseDirectIoForFlushAndCompaction(false)
|
2021-06-27 15:40:56 +02:00
|
|
|
.setAllowMmapReads(databaseOptions.allowMemoryMapping())
|
|
|
|
.setAllowMmapWrites(databaseOptions.allowMemoryMapping());
|
2021-06-25 23:47:53 +02:00
|
|
|
}
|
|
|
|
default -> throw ex;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
this.handles = new HashMap<>();
|
2022-03-02 12:34:30 +01:00
|
|
|
if (enableColumnsBug && !inMemory) {
|
2021-06-19 16:26:54 +02:00
|
|
|
for (int i = 0; i < columns.size(); i++) {
|
2022-05-20 10:20:00 +02:00
|
|
|
this.handles.put(columns.get(i), handles.get(i));
|
2021-06-19 16:26:54 +02:00
|
|
|
}
|
|
|
|
} else {
|
|
|
|
handles: for (ColumnFamilyHandle handle : handles) {
|
|
|
|
for (Column column : columns) {
|
|
|
|
if (Arrays.equals(column.name().getBytes(StandardCharsets.US_ASCII), handle.getName())) {
|
2022-05-20 10:20:00 +02:00
|
|
|
this.handles.put(column, handle);
|
2021-06-19 16:26:54 +02:00
|
|
|
continue handles;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
2022-05-22 16:48:08 +02:00
|
|
|
handles.forEach(refs::track);
|
|
|
|
|
2021-03-21 13:06:54 +01:00
|
|
|
// compactDb(db, handles);
|
2020-12-07 22:15:18 +01:00
|
|
|
flushDb(db, handles);
|
|
|
|
} catch (RocksDBException ex) {
|
|
|
|
throw new IOException(ex);
|
|
|
|
}
|
2022-01-15 20:00:10 +01:00
|
|
|
|
2022-04-11 01:27:09 +02:00
|
|
|
try {
|
|
|
|
for (ColumnFamilyHandle cfh : handles) {
|
|
|
|
var props = db.getProperty(cfh, "rocksdb.stats");
|
|
|
|
logger.trace("Stats for database {}, column {}: {}",
|
|
|
|
name,
|
|
|
|
new String(cfh.getName(), StandardCharsets.UTF_8),
|
|
|
|
props
|
|
|
|
);
|
|
|
|
}
|
|
|
|
} catch (RocksDBException ex) {
|
|
|
|
logger.debug("Failed to obtain stats", ex);
|
|
|
|
}
|
|
|
|
|
2022-05-04 01:21:56 +02:00
|
|
|
for (RocksDBLongProperty property : RocksDBLongProperty.values()) {
|
|
|
|
registerGauge(meterRegistry, name, property.getName(), property.isDividedByColumnFamily());
|
|
|
|
}
|
2022-04-07 20:03:29 +02:00
|
|
|
// Bloom seek stats
|
2022-05-04 01:21:56 +02:00
|
|
|
registerGauge(meterRegistry, name, "rocksdb.bloom.filter.prefix.useful", true);
|
|
|
|
registerGauge(meterRegistry, name, "rocksdb.bloom.filter.prefix.checked", true);
|
2022-04-07 20:03:29 +02:00
|
|
|
// Bloom point lookup stats
|
2022-05-04 01:21:56 +02:00
|
|
|
registerGauge(meterRegistry, name, "rocksdb.bloom.filter.useful", true);
|
|
|
|
registerGauge(meterRegistry, name, "rocksdb.bloom.filter.full.positive", true);
|
|
|
|
registerGauge(meterRegistry, name, "rocksdb.bloom.filter.full.true.positive", true);
|
2022-01-15 20:00:10 +01:00
|
|
|
}
|
|
|
|
|
2022-04-28 23:23:26 +02:00
|
|
|
public static boolean isDisableAutoCompactions() {
|
2022-06-09 00:49:08 +02:00
|
|
|
return parseBoolean(System.getProperty("it.cavallium.dbengine.compactions.auto.disable", "false"));
|
2022-04-28 11:35:01 +02:00
|
|
|
}
|
|
|
|
|
2022-04-28 23:23:26 +02:00
|
|
|
public static boolean isDisableSlowdown() {
|
|
|
|
return isDisableAutoCompactions()
|
2022-06-09 00:49:08 +02:00
|
|
|
|| parseBoolean(System.getProperty("it.cavallium.dbengine.disableslowdown", "false"));
|
2022-04-28 23:23:26 +02:00
|
|
|
}
|
|
|
|
|
2022-04-30 14:21:20 +02:00
|
|
|
protected void ensureOpen() {
|
|
|
|
if (closed) {
|
|
|
|
throw new IllegalStateException("Database closed");
|
|
|
|
}
|
|
|
|
RocksDBUtils.ensureOpen(db, null);
|
|
|
|
}
|
|
|
|
|
2022-05-20 10:20:00 +02:00
|
|
|
protected void ensureOwned(AbstractImmutableNativeReference rocksObject) {
|
2022-05-12 19:14:27 +02:00
|
|
|
RocksDBUtils.ensureOwned(rocksObject);
|
|
|
|
}
|
|
|
|
|
2022-04-15 02:41:06 +02:00
|
|
|
private synchronized PersistentCache resolvePersistentCache(HashMap<String, PersistentCache> caches,
|
|
|
|
DBOptions rocksdbOptions,
|
|
|
|
List<it.cavallium.dbengine.rpc.current.data.PersistentCache> persistentCaches,
|
2022-05-22 16:48:08 +02:00
|
|
|
NullableString persistentCacheId,
|
2022-06-08 18:52:15 +02:00
|
|
|
RocksDBRefs refs,
|
|
|
|
RocksLog4jLogger rocksLogger) throws RocksDBException {
|
2022-04-15 02:41:06 +02:00
|
|
|
if (persistentCacheId.isEmpty()) {
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
var existingPersistentCache = caches.get(persistentCacheId.get());
|
|
|
|
if (existingPersistentCache != null) {
|
|
|
|
return existingPersistentCache;
|
|
|
|
}
|
|
|
|
|
|
|
|
var foundCaches = persistentCaches
|
|
|
|
.stream()
|
|
|
|
.filter(cache -> cache.id().equals(persistentCacheId.get()))
|
|
|
|
.toList();
|
|
|
|
if (foundCaches.size() > 1) {
|
|
|
|
throw new IllegalArgumentException("There are " + foundCaches.size()
|
|
|
|
+ " defined persistent caches with the id \"" + persistentCacheId.get() + "\"");
|
|
|
|
}
|
|
|
|
for (it.cavallium.dbengine.rpc.current.data.PersistentCache foundCache : foundCaches) {
|
|
|
|
var persistentCache = new PersistentCache(Env.getDefault(),
|
|
|
|
foundCache.path(),
|
|
|
|
foundCache.size(),
|
2022-06-08 18:52:15 +02:00
|
|
|
rocksLogger,
|
2022-04-15 02:41:06 +02:00
|
|
|
foundCache.optimizeForNvm()
|
|
|
|
);
|
2022-05-22 16:48:08 +02:00
|
|
|
refs.track(persistentCache);
|
2022-04-15 02:41:06 +02:00
|
|
|
var prev = caches.put(persistentCacheId.get(), persistentCache);
|
|
|
|
if (prev != null) {
|
|
|
|
throw new IllegalStateException();
|
|
|
|
}
|
|
|
|
return persistentCache;
|
|
|
|
}
|
|
|
|
throw new IllegalArgumentException("Persistent cache " + persistentCacheId.get() + " is not defined");
|
|
|
|
}
|
|
|
|
|
2022-05-20 10:20:00 +02:00
|
|
|
public Map<Column, ColumnFamilyHandle> getAllColumnFamilyHandles() {
|
2022-03-22 12:59:22 +01:00
|
|
|
return this.handles;
|
|
|
|
}
|
|
|
|
|
2022-04-26 17:12:22 +02:00
|
|
|
public int getLastVolumeId() {
|
|
|
|
var paths = convertPaths(dbPath.toAbsolutePath().getParent(), dbPath.getFileName(), databaseOptions.volumes());
|
|
|
|
return paths.size() - 1;
|
|
|
|
}
|
|
|
|
|
2022-04-30 02:14:44 +02:00
|
|
|
public int getLevels(Column column) {
|
2022-04-30 14:21:20 +02:00
|
|
|
var closeReadLock = closeLock.readLock();
|
|
|
|
try {
|
|
|
|
ensureOpen();
|
|
|
|
var cfh = handles.get(column);
|
|
|
|
ensureOwned(cfh);
|
2022-05-20 10:20:00 +02:00
|
|
|
return RocksDBUtils.getLevels(db, cfh);
|
2022-04-30 14:21:20 +02:00
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadLock);
|
|
|
|
}
|
2022-04-30 01:49:44 +02:00
|
|
|
}
|
2022-04-28 23:23:26 +02:00
|
|
|
|
2022-04-30 01:49:44 +02:00
|
|
|
public List<String> getColumnFiles(Column column, boolean excludeLastLevel) {
|
2022-04-30 14:21:20 +02:00
|
|
|
var closeReadLock = closeLock.readLock();
|
|
|
|
try {
|
|
|
|
ensureOpen();
|
|
|
|
var cfh = handles.get(column);
|
|
|
|
ensureOwned(cfh);
|
2022-05-20 10:20:00 +02:00
|
|
|
return RocksDBUtils.getColumnFiles(db, cfh, excludeLastLevel);
|
2022-04-30 14:21:20 +02:00
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadLock);
|
|
|
|
}
|
2022-04-30 01:49:44 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
public void forceCompaction(int volumeId) throws RocksDBException {
|
2022-04-30 14:21:20 +02:00
|
|
|
var closeReadLock = closeLock.readLock();
|
|
|
|
try {
|
|
|
|
ensureOpen();
|
|
|
|
for (var cfh : this.handles.values()) {
|
|
|
|
ensureOwned(cfh);
|
2022-05-20 10:20:00 +02:00
|
|
|
RocksDBUtils.forceCompaction(db, name, cfh, volumeId, logger);
|
2022-04-30 14:21:20 +02:00
|
|
|
}
|
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadLock);
|
2022-04-26 17:12:22 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
public void flush(FlushOptions flushOptions) throws RocksDBException {
|
2022-04-30 14:21:20 +02:00
|
|
|
var closeReadLock = closeLock.readLock();
|
|
|
|
try {
|
|
|
|
ensureOpen();
|
|
|
|
ensureOwned(flushOptions);
|
|
|
|
db.flush(flushOptions);
|
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadLock);
|
|
|
|
}
|
2022-04-26 17:12:22 +02:00
|
|
|
}
|
|
|
|
|
2022-03-22 11:50:30 +01:00
|
|
|
/**
 * Compression settings computed for a single database level by {@code getRocksLevelOptions}.
 *
 * @param compressionType the compression algorithm for the level
 * @param compressionOptions the native compression options (tracked by the caller's refs)
 */
private record RocksLevelOptions(CompressionType compressionType,
		CompressionOptions compressionOptions) {}
|
2022-05-22 16:48:08 +02:00
|
|
|
private RocksLevelOptions getRocksLevelOptions(DatabaseLevel levelOptions, RocksDBRefs refs) {
|
2022-03-22 11:50:30 +01:00
|
|
|
var compressionType = levelOptions.compression().getType();
|
|
|
|
var compressionOptions = new CompressionOptions();
|
2022-05-22 16:48:08 +02:00
|
|
|
refs.track(compressionOptions);
|
2022-03-22 11:50:30 +01:00
|
|
|
if (compressionType != CompressionType.NO_COMPRESSION) {
|
|
|
|
compressionOptions.setEnabled(true);
|
|
|
|
compressionOptions.setMaxDictBytes(levelOptions.maxDictBytes());
|
|
|
|
} else {
|
|
|
|
compressionOptions.setEnabled(false);
|
|
|
|
}
|
|
|
|
return new RocksLevelOptions(compressionType, compressionOptions);
|
|
|
|
}
|
|
|
|
|
2022-04-11 01:27:09 +02:00
|
|
|
private void registerGauge(MeterRegistry meterRegistry, String name, String propertyName, boolean divideByAllColumns) {
|
2022-05-04 01:21:56 +02:00
|
|
|
if (divideByAllColumns) {
|
2022-05-12 19:14:27 +02:00
|
|
|
for (var cfhEntry : handles.entrySet()) {
|
2022-05-04 01:21:56 +02:00
|
|
|
var columnName = cfhEntry.getKey().name();
|
|
|
|
var cfh = cfhEntry.getValue();
|
|
|
|
meterRegistry.gauge("rocksdb.property.value",
|
|
|
|
List.of(Tag.of("db.name", name), Tag.of("db.column.name", columnName), Tag.of("db.property.name", propertyName)),
|
|
|
|
db,
|
|
|
|
database -> {
|
|
|
|
if (closed) {
|
|
|
|
return 0d;
|
|
|
|
}
|
|
|
|
var closeReadLock = closeLock.readLock();
|
|
|
|
try {
|
|
|
|
if (closed) {
|
|
|
|
return 0d;
|
|
|
|
}
|
2022-05-20 10:20:00 +02:00
|
|
|
return database.getLongProperty(cfh, propertyName);
|
2022-05-04 01:21:56 +02:00
|
|
|
} catch (RocksDBException e) {
|
|
|
|
if ("NotFound".equals(e.getMessage())) {
|
|
|
|
return 0d;
|
|
|
|
}
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadLock);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
);
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
meterRegistry.gauge("rocksdb.property.value",
|
|
|
|
List.of(Tag.of("db.name", name), Tag.of("db.property.name", propertyName)),
|
|
|
|
db,
|
|
|
|
database -> {
|
2022-04-30 14:21:20 +02:00
|
|
|
if (closed) {
|
|
|
|
return 0d;
|
|
|
|
}
|
2022-05-04 01:21:56 +02:00
|
|
|
var closeReadLock = closeLock.readLock();
|
|
|
|
try {
|
|
|
|
if (closed) {
|
|
|
|
return 0d;
|
|
|
|
}
|
2022-05-04 12:36:32 +02:00
|
|
|
return database.getAggregatedLongProperty(propertyName) / (double) handles.size();
|
2022-05-04 01:21:56 +02:00
|
|
|
} catch (RocksDBException e) {
|
|
|
|
if ("NotFound".equals(e.getMessage())) {
|
|
|
|
return 0d;
|
|
|
|
}
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadLock);
|
2022-04-07 22:19:11 +02:00
|
|
|
}
|
2022-01-15 20:00:10 +01:00
|
|
|
}
|
2022-05-04 01:21:56 +02:00
|
|
|
);
|
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public String getDatabaseName() {
|
|
|
|
return name;
|
|
|
|
}
|
|
|
|
|
2022-04-30 01:49:44 +02:00
|
|
|
public StampedLock getCloseLock() {
|
|
|
|
return closeLock;
|
2022-04-28 23:23:26 +02:00
|
|
|
}
|
|
|
|
|
2022-04-11 01:27:09 +02:00
|
|
|
private void flushAndCloseDb(RocksDB db, Cache standardCache, Cache compressedCache, List<ColumnFamilyHandle> handles)
|
2020-12-07 22:15:18 +01:00
|
|
|
throws RocksDBException {
|
2022-04-30 01:49:44 +02:00
|
|
|
var closeWriteLock = closeLock.writeLock();
|
2022-04-15 02:41:06 +02:00
|
|
|
try {
|
|
|
|
if (closed) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
closed = true;
|
|
|
|
if (db.isOwningHandle()) {
|
2022-04-26 17:12:22 +02:00
|
|
|
//flushDb(db, handles);
|
2022-04-15 02:41:06 +02:00
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
|
2022-04-15 02:41:06 +02:00
|
|
|
snapshotsHandles.forEach((id, snapshot) -> {
|
|
|
|
try {
|
2022-04-20 23:29:39 +02:00
|
|
|
if (db.isOwningHandle()) {
|
2022-05-20 10:20:00 +02:00
|
|
|
db.releaseSnapshot(snapshot);
|
2022-05-12 19:14:27 +02:00
|
|
|
snapshot.close();
|
2022-04-15 02:41:06 +02:00
|
|
|
}
|
|
|
|
} catch (Exception ex2) {
|
|
|
|
// ignore exception
|
|
|
|
logger.debug("Failed to release snapshot " + id, ex2);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
snapshotsHandles.clear();
|
2021-07-06 22:27:03 +02:00
|
|
|
try {
|
2022-04-15 02:41:06 +02:00
|
|
|
db.closeE();
|
2021-07-06 22:27:03 +02:00
|
|
|
} catch (Exception ex) {
|
2022-04-15 02:41:06 +02:00
|
|
|
logger.error("Can't close database " + name + " at " + dbPath, ex);
|
2021-07-06 22:27:03 +02:00
|
|
|
}
|
2022-05-01 15:35:12 +02:00
|
|
|
for (ColumnFamilyHandle handle : handles) {
|
|
|
|
try {
|
|
|
|
handle.close();
|
|
|
|
} catch (Exception ex) {
|
|
|
|
logger.error("Can't close column family", ex);
|
|
|
|
}
|
|
|
|
}
|
2022-04-15 02:41:06 +02:00
|
|
|
if (compressedCache != null) {
|
|
|
|
compressedCache.close();
|
|
|
|
}
|
|
|
|
if (standardCache != null) {
|
|
|
|
standardCache.close();
|
|
|
|
}
|
|
|
|
|
|
|
|
for (PersistentCache persistentCache : persistentCaches.values()) {
|
|
|
|
try {
|
|
|
|
persistentCache.close();
|
|
|
|
} catch (Exception ex) {
|
|
|
|
logger.error("Can't close persistent cache", ex);
|
2022-04-09 16:31:32 +02:00
|
|
|
}
|
2021-07-01 21:19:52 +02:00
|
|
|
}
|
2022-05-22 16:48:08 +02:00
|
|
|
refs.close();
|
2022-04-15 02:41:06 +02:00
|
|
|
} finally {
|
2022-04-30 01:49:44 +02:00
|
|
|
closeLock.unlockWrite(closeWriteLock);
|
2022-04-11 20:04:27 +02:00
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
2021-10-20 01:51:34 +02:00
|
|
|
private void flushDb(RocksDB db, List<ColumnFamilyHandle> handles) throws RocksDBException {
|
2021-09-05 14:23:46 +02:00
|
|
|
if (Schedulers.isInNonBlockingThread()) {
|
|
|
|
logger.error("Called flushDb in a nonblocking thread");
|
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
// force flush the database
|
2021-07-06 22:27:03 +02:00
|
|
|
try (var flushOptions = new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true)) {
|
|
|
|
db.flush(flushOptions);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
2021-07-06 22:27:03 +02:00
|
|
|
try (var flushOptions = new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true)) {
|
|
|
|
db.flush(flushOptions, handles);
|
|
|
|
}
|
|
|
|
db.flushWal(true);
|
|
|
|
db.syncWal();
|
2020-12-07 22:15:18 +01:00
|
|
|
// end force flush
|
|
|
|
}
|
|
|
|
|
2021-04-03 19:09:06 +02:00
|
|
|
@SuppressWarnings("unused")
|
2021-10-20 01:51:34 +02:00
|
|
|
private void compactDb(TransactionDB db, List<ColumnFamilyHandle> handles) {
|
2021-09-05 14:23:46 +02:00
|
|
|
if (Schedulers.isInNonBlockingThread()) {
|
|
|
|
logger.error("Called compactDb in a nonblocking thread");
|
|
|
|
}
|
2021-03-19 20:55:38 +01:00
|
|
|
// force compact the database
|
|
|
|
for (ColumnFamilyHandle cfh : handles) {
|
|
|
|
var t = new Thread(() -> {
|
2021-03-21 13:06:54 +01:00
|
|
|
int r = ThreadLocalRandom.current().nextInt();
|
|
|
|
var s = StopWatch.createStarted();
|
2021-03-19 20:55:38 +01:00
|
|
|
try {
|
|
|
|
// Range rangeToCompact = db.suggestCompactRange(cfh);
|
2021-03-21 13:06:54 +01:00
|
|
|
logger.info("Compacting range {}", r);
|
2022-05-11 00:29:42 +02:00
|
|
|
try (var cro = new CompactRangeOptions()
|
2021-03-21 13:06:54 +01:00
|
|
|
.setAllowWriteStall(true)
|
|
|
|
.setExclusiveManualCompaction(true)
|
2022-05-11 00:29:42 +02:00
|
|
|
.setChangeLevel(false)) {
|
|
|
|
db.compactRange(cfh, null, null, cro);
|
|
|
|
}
|
2021-03-19 20:55:38 +01:00
|
|
|
} catch (RocksDBException e) {
|
|
|
|
if ("Database shutdown".equalsIgnoreCase(e.getMessage())) {
|
|
|
|
logger.warn("Compaction cancelled: database shutdown");
|
|
|
|
} else {
|
|
|
|
logger.warn("Failed to compact range", e);
|
|
|
|
}
|
|
|
|
}
|
2021-03-21 13:06:54 +01:00
|
|
|
logger.info("Compacted range {} in {} milliseconds", r, s.getTime(TimeUnit.MILLISECONDS));
|
2021-03-19 20:55:38 +01:00
|
|
|
}, "Compaction");
|
|
|
|
t.setDaemon(true);
|
|
|
|
t.start();
|
|
|
|
}
|
|
|
|
// end force compact
|
|
|
|
}
|
|
|
|
|
2022-03-22 11:50:30 +01:00
|
|
|
/**
 * Result of {@code openRocksDb}.
 *
 * @param options the configured DB options
 * @param standardCache the uncompressed LRU block cache
 * @param compressedCache the compressed block cache, or {@code null} when not configured
 */
record OptionsWithCache(DBOptions options,
		@Nullable Cache standardCache,
		@Nullable Cache compressedCache) {}
|
2022-03-10 02:38:57 +01:00
|
|
|
|
2022-05-22 16:48:08 +02:00
|
|
|
private static OptionsWithCache openRocksDb(@Nullable Path path, DatabaseOptions databaseOptions, RocksDBRefs refs)
|
|
|
|
throws IOException {
|
2020-12-07 22:15:18 +01:00
|
|
|
// Get databases directory path
|
2021-07-10 20:52:01 +02:00
|
|
|
Path databasesDirPath;
|
|
|
|
if (path != null) {
|
|
|
|
databasesDirPath = path.toAbsolutePath().getParent();
|
|
|
|
// Create base directories
|
|
|
|
if (Files.notExists(databasesDirPath)) {
|
|
|
|
Files.createDirectories(databasesDirPath);
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
databasesDirPath = null;
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
2022-06-09 16:47:42 +02:00
|
|
|
//noinspection ConstantConditions
|
|
|
|
if (databaseOptions.persistentCaches() != null) {
|
|
|
|
for (var persistentCache : databaseOptions.persistentCaches()) {
|
|
|
|
var persistentCachePath = Paths.get(persistentCache.path());
|
|
|
|
if (Files.notExists(persistentCachePath)) {
|
|
|
|
Files.createDirectories(persistentCachePath);
|
|
|
|
if (!Files.isDirectory(persistentCachePath)) {
|
|
|
|
throw new IllegalArgumentException(
|
|
|
|
"Persistent cache \"" + persistentCache.id() + "\" path \"" + persistentCachePath
|
|
|
|
+ "\" is not a directory!");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
|
|
|
|
// the Options class contains a set of configurable DB options
|
|
|
|
// that determines the behaviour of the database.
|
2022-03-10 02:38:57 +01:00
|
|
|
var options = new DBOptions();
|
2022-05-22 16:48:08 +02:00
|
|
|
refs.track(options);
|
2022-03-22 11:50:30 +01:00
|
|
|
options.setEnablePipelinedWrite(true);
|
2022-05-03 19:47:32 +02:00
|
|
|
var maxSubCompactions = Integer.parseInt(System.getProperty("it.cavallium.dbengine.compactions.max.sub", "-1"));
|
|
|
|
if (maxSubCompactions >= 0) {
|
|
|
|
options.setMaxSubcompactions(maxSubCompactions);
|
|
|
|
}
|
2022-03-22 11:50:30 +01:00
|
|
|
var customWriteRate = Long.parseLong(System.getProperty("it.cavallium.dbengine.write.delayedrate", "-1"));
|
|
|
|
if (customWriteRate >= 0) {
|
|
|
|
options.setDelayedWriteRate(customWriteRate);
|
|
|
|
}
|
2022-05-02 00:42:38 +02:00
|
|
|
if (databaseOptions.logPath().isPresent()) {
|
|
|
|
options.setDbLogDir(databaseOptions.logPath().get());
|
|
|
|
}
|
|
|
|
if (databaseOptions.walPath().isPresent()) {
|
|
|
|
options.setWalDir(databaseOptions.walPath().get());
|
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
options.setCreateIfMissing(true);
|
2022-03-22 00:23:32 +01:00
|
|
|
options.setSkipStatsUpdateOnDbOpen(true);
|
2021-07-06 22:27:03 +02:00
|
|
|
options.setCreateMissingColumnFamilies(true);
|
2022-05-10 00:31:16 +02:00
|
|
|
options.setInfoLogLevel(InfoLogLevel.WARN_LEVEL);
|
2022-03-22 00:23:32 +01:00
|
|
|
options.setAvoidFlushDuringShutdown(false); // Flush all WALs during shutdown
|
2022-03-21 15:19:17 +01:00
|
|
|
options.setAvoidFlushDuringRecovery(true); // Flush all WALs during startup
|
2021-06-27 15:40:56 +02:00
|
|
|
options.setWalRecoveryMode(databaseOptions.absoluteConsistency()
|
2021-02-01 11:00:27 +01:00
|
|
|
? WALRecoveryMode.AbsoluteConsistency
|
|
|
|
: WALRecoveryMode.PointInTimeRecovery); // Crash if the WALs are corrupted.Default: TolerateCorruptedTailRecords
|
2020-12-07 22:15:18 +01:00
|
|
|
options.setDeleteObsoleteFilesPeriodMicros(20 * 1000000); // 20 seconds
|
|
|
|
options.setKeepLogFileNum(10);
|
2021-07-10 20:52:01 +02:00
|
|
|
|
2022-05-04 01:21:56 +02:00
|
|
|
requireNonNull(databasesDirPath);
|
|
|
|
requireNonNull(path.getFileName());
|
2022-04-26 17:12:22 +02:00
|
|
|
List<DbPath> paths = convertPaths(databasesDirPath, path.getFileName(), databaseOptions.volumes())
|
|
|
|
.stream()
|
|
|
|
.map(p -> new DbPath(p.path, p.targetSize))
|
|
|
|
.toList();
|
2021-06-09 02:56:53 +02:00
|
|
|
options.setDbPaths(paths);
|
2022-03-02 12:34:30 +01:00
|
|
|
options.setMaxOpenFiles(databaseOptions.maxOpenFiles().orElse(-1));
|
2022-04-09 02:45:42 +02:00
|
|
|
if (databaseOptions.spinning()) {
|
|
|
|
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
|
|
|
|
options.setUseFsync(false);
|
|
|
|
}
|
2022-03-10 02:38:57 +01:00
|
|
|
|
2022-04-11 16:53:17 +02:00
|
|
|
long writeBufferManagerSize;
|
|
|
|
if (databaseOptions.writeBufferManager().isPresent()) {
|
|
|
|
writeBufferManagerSize = databaseOptions.writeBufferManager().get();
|
|
|
|
} else {
|
|
|
|
writeBufferManagerSize = 0;
|
|
|
|
}
|
|
|
|
|
2022-04-28 23:23:26 +02:00
|
|
|
if (isDisableAutoCompactions()) {
|
|
|
|
options.setMaxBackgroundCompactions(0);
|
|
|
|
options.setMaxBackgroundJobs(0);
|
|
|
|
} else {
|
2022-05-02 18:48:44 +02:00
|
|
|
var backgroundJobs = Integer.parseInt(System.getProperty("it.cavallium.dbengine.jobs.background.num", "-1"));
|
|
|
|
if (backgroundJobs >= 0) {
|
|
|
|
options.setMaxBackgroundJobs(backgroundJobs);
|
|
|
|
}
|
2022-04-28 23:23:26 +02:00
|
|
|
}
|
|
|
|
|
2022-01-12 16:18:31 +01:00
|
|
|
Cache blockCache;
|
2022-03-12 02:55:18 +01:00
|
|
|
Cache compressedCache;
|
2022-06-09 00:49:08 +02:00
|
|
|
final boolean useDirectIO = databaseOptions.useDirectIO();
|
|
|
|
final boolean allowMmapReads = !useDirectIO && databaseOptions.allowMemoryMapping();
|
|
|
|
final boolean allowMmapWrites = !useDirectIO && (databaseOptions.allowMemoryMapping()
|
|
|
|
|| parseBoolean(System.getProperty("it.cavallium.dbengine.mmapwrites.enable", "false")));
|
2021-06-27 15:40:56 +02:00
|
|
|
if (databaseOptions.lowMemory()) {
|
2020-12-07 22:15:18 +01:00
|
|
|
// LOW MEMORY
|
|
|
|
options
|
2021-05-18 01:10:30 +02:00
|
|
|
.setBytesPerSync(0) // default
|
|
|
|
.setWalBytesPerSync(0) // default
|
2020-12-07 22:15:18 +01:00
|
|
|
.setIncreaseParallelism(1)
|
2022-03-10 02:38:57 +01:00
|
|
|
.setDbWriteBufferSize(8 * SizeUnit.MB)
|
2021-05-18 01:10:30 +02:00
|
|
|
.setWalTtlSeconds(0)
|
2022-03-30 23:44:55 +02:00
|
|
|
.setWalSizeLimitMB(0)
|
2021-05-18 01:10:30 +02:00
|
|
|
.setMaxTotalWalSize(0) // automatic
|
|
|
|
;
|
2022-04-11 01:27:09 +02:00
|
|
|
// DO NOT USE ClockCache! IT'S BROKEN!
|
2022-04-11 16:53:17 +02:00
|
|
|
blockCache = new LRUCache(writeBufferManagerSize + databaseOptions.blockCache().orElse( 8L * SizeUnit.MB));
|
2022-05-22 16:48:08 +02:00
|
|
|
refs.track(blockCache);
|
2022-04-11 16:53:17 +02:00
|
|
|
if (databaseOptions.compressedBlockCache().isPresent()) {
|
|
|
|
compressedCache = new LRUCache(databaseOptions.compressedBlockCache().get());
|
2022-05-22 16:48:08 +02:00
|
|
|
refs.track(compressedCache);
|
2022-04-11 16:53:17 +02:00
|
|
|
} else {
|
|
|
|
compressedCache = null;
|
|
|
|
}
|
2021-07-17 11:52:08 +02:00
|
|
|
|
2022-06-09 00:49:08 +02:00
|
|
|
if (useDirectIO) {
|
2021-07-17 11:52:08 +02:00
|
|
|
options
|
|
|
|
// Option to enable readahead in compaction
|
|
|
|
// If not set, it will be set to 2MB internally
|
2022-04-05 13:58:12 +02:00
|
|
|
.setCompactionReadaheadSize(2 * SizeUnit.MB) // recommend at least 2MB
|
2021-07-17 11:52:08 +02:00
|
|
|
// Option to tune write buffer for direct writes
|
2022-05-29 23:48:40 +02:00
|
|
|
.setWritableFileMaxBufferSize(SizeUnit.MB)
|
2021-07-17 11:52:08 +02:00
|
|
|
;
|
|
|
|
}
|
2022-05-29 23:48:40 +02:00
|
|
|
if (databaseOptions.spinning()) {
|
|
|
|
options
|
|
|
|
// method documentation
|
|
|
|
.setCompactionReadaheadSize(4 * SizeUnit.MB)
|
|
|
|
// guessed
|
|
|
|
.setWritableFileMaxBufferSize(2 * SizeUnit.MB);
|
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
} else {
|
|
|
|
// HIGH MEMORY
|
|
|
|
options
|
2022-04-26 17:12:22 +02:00
|
|
|
//.setDbWriteBufferSize(64 * SizeUnit.MB)
|
2022-05-02 00:42:38 +02:00
|
|
|
.setBytesPerSync(64 * SizeUnit.MB)
|
|
|
|
.setWalBytesPerSync(64 * SizeUnit.MB)
|
2022-01-12 16:18:31 +01:00
|
|
|
|
2022-05-01 17:25:22 +02:00
|
|
|
.setWalTtlSeconds(0) // Auto
|
|
|
|
.setWalSizeLimitMB(0) // Auto
|
|
|
|
.setMaxTotalWalSize(0) // Auto
|
2020-12-07 22:15:18 +01:00
|
|
|
;
|
2022-04-11 01:27:09 +02:00
|
|
|
// DO NOT USE ClockCache! IT'S BROKEN!
|
2022-04-11 16:53:17 +02:00
|
|
|
blockCache = new LRUCache(writeBufferManagerSize + databaseOptions.blockCache().orElse( 512 * SizeUnit.MB));
|
2022-05-22 16:48:08 +02:00
|
|
|
refs.track(blockCache);
|
2022-04-11 16:53:17 +02:00
|
|
|
if (databaseOptions.compressedBlockCache().isPresent()) {
|
|
|
|
compressedCache = new LRUCache(databaseOptions.compressedBlockCache().get());
|
2022-05-22 16:48:08 +02:00
|
|
|
refs.track(compressedCache);
|
2022-04-11 16:53:17 +02:00
|
|
|
} else {
|
|
|
|
compressedCache = null;
|
|
|
|
}
|
2021-06-25 23:47:53 +02:00
|
|
|
|
2022-06-09 00:49:08 +02:00
|
|
|
if (useDirectIO) {
|
2021-06-25 23:47:53 +02:00
|
|
|
options
|
|
|
|
// Option to enable readahead in compaction
|
|
|
|
// If not set, it will be set to 2MB internally
|
2022-05-29 23:48:40 +02:00
|
|
|
.setCompactionReadaheadSize(4 * SizeUnit.MB) // recommend at least 2MB
|
2021-06-25 23:47:53 +02:00
|
|
|
// Option to tune write buffer for direct writes
|
2022-05-29 23:48:40 +02:00
|
|
|
.setWritableFileMaxBufferSize(2 * SizeUnit.MB)
|
2021-06-25 23:47:53 +02:00
|
|
|
;
|
|
|
|
}
|
2022-05-29 23:48:40 +02:00
|
|
|
if (databaseOptions.spinning()) {
|
|
|
|
options
|
|
|
|
// method documentation
|
|
|
|
.setCompactionReadaheadSize(16 * SizeUnit.MB)
|
|
|
|
// guessed
|
|
|
|
.setWritableFileMaxBufferSize(8 * SizeUnit.MB);
|
|
|
|
}
|
2022-03-30 23:44:55 +02:00
|
|
|
options.setIncreaseParallelism(Runtime.getRuntime().availableProcessors());
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
2021-12-27 16:33:31 +01:00
|
|
|
|
2022-04-11 16:53:17 +02:00
|
|
|
if (databaseOptions.writeBufferManager().isPresent()) {
|
2022-05-22 16:48:08 +02:00
|
|
|
var writeBufferManager = new WriteBufferManager(writeBufferManagerSize, blockCache, false);
|
|
|
|
refs.track(writeBufferManager);
|
|
|
|
options.setWriteBufferManager(writeBufferManager);
|
2022-04-11 16:53:17 +02:00
|
|
|
}
|
2022-01-12 16:18:31 +01:00
|
|
|
|
2022-06-09 00:49:08 +02:00
|
|
|
if (useDirectIO) {
|
2021-07-17 11:52:08 +02:00
|
|
|
options
|
|
|
|
.setAllowMmapReads(false)
|
|
|
|
.setAllowMmapWrites(false)
|
|
|
|
.setUseDirectReads(true)
|
|
|
|
;
|
|
|
|
} else {
|
|
|
|
options
|
2022-06-09 00:49:08 +02:00
|
|
|
.setAllowMmapReads(allowMmapReads)
|
|
|
|
.setAllowMmapWrites(allowMmapWrites);
|
2021-07-17 11:52:08 +02:00
|
|
|
}
|
|
|
|
|
2022-06-09 00:49:08 +02:00
|
|
|
if (useDirectIO || !allowMmapWrites) {
|
2021-07-17 11:52:08 +02:00
|
|
|
options.setUseDirectIoForFlushAndCompaction(true);
|
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
|
2022-03-12 02:55:18 +01:00
|
|
|
return new OptionsWithCache(options, blockCache, compressedCache);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
2022-04-26 17:12:22 +02:00
|
|
|
/**
 * A data directory and its target size, later converted to a RocksDB {@code DbPath} by
 * {@code openRocksDb}.
 *
 * @param path the directory
 * @param targetSize target size in bytes
 */
record DbPathRecord(Path path,
		long targetSize) {}
|
|
|
|
|
|
|
|
private static List<DbPathRecord> convertPaths(Path databasesDirPath, Path path, List<DatabaseVolume> volumes) {
|
|
|
|
var paths = new ArrayList<DbPathRecord>(volumes.size());
|
2021-12-27 16:33:31 +01:00
|
|
|
if (volumes.isEmpty()) {
|
2022-04-26 17:12:22 +02:00
|
|
|
return List.of(new DbPathRecord(databasesDirPath.resolve(path.getFileName() + "_hot"),
|
2022-04-15 16:49:01 +02:00
|
|
|
0), // Legacy
|
2022-04-26 17:12:22 +02:00
|
|
|
new DbPathRecord(databasesDirPath.resolve(path.getFileName() + "_cold"),
|
2022-04-07 20:03:29 +02:00
|
|
|
0), // Legacy
|
2022-04-26 17:12:22 +02:00
|
|
|
new DbPathRecord(databasesDirPath.resolve(path.getFileName() + "_colder"),
|
2022-04-15 16:49:01 +02:00
|
|
|
1000L * 1024L * 1024L * 1024L) // 1000GiB
|
2022-04-07 20:03:29 +02:00
|
|
|
); // Legacy
|
2021-12-27 16:33:31 +01:00
|
|
|
}
|
|
|
|
for (DatabaseVolume volume : volumes) {
|
|
|
|
Path volumePath;
|
|
|
|
if (volume.volumePath().isAbsolute()) {
|
|
|
|
volumePath = volume.volumePath();
|
|
|
|
} else {
|
|
|
|
volumePath = databasesDirPath.resolve(volume.volumePath());
|
|
|
|
}
|
2022-04-26 17:12:22 +02:00
|
|
|
paths.add(new DbPathRecord(volumePath, volume.targetSizeBytes()));
|
2021-12-27 16:33:31 +01:00
|
|
|
}
|
|
|
|
return paths;
|
|
|
|
}
|
|
|
|
|
2022-05-09 22:08:54 +02:00
|
|
|
private Statistics registerStatistics(String dbName, DBOptions dbOptions, MeterRegistry meterRegistry,
|
|
|
|
StatsLevel statsLevel) {
|
|
|
|
Statistics stats = new Statistics();
|
|
|
|
stats.setStatsLevel(statsLevel);
|
|
|
|
dbOptions.setStatistics(stats);
|
|
|
|
for (TickerType tickerType : TickerType.values()) {
|
|
|
|
if (tickerType == TickerType.TICKER_ENUM_MAX) {
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
meterRegistry.gauge("rocksdb.statistics.value",
|
|
|
|
List.of(Tag.of("db.name", dbName), Tag.of("db.statistics.name", tickerType.name())),
|
|
|
|
stats,
|
|
|
|
statistics -> {
|
2022-06-08 18:52:15 +02:00
|
|
|
if (closeRequested || closed) return 0d;
|
|
|
|
long closeReadLock = 0;
|
2022-05-09 22:08:54 +02:00
|
|
|
try {
|
2022-06-08 18:52:15 +02:00
|
|
|
closeReadLock = closeLock.tryReadLock(1, TimeUnit.SECONDS);
|
|
|
|
} catch (InterruptedException ignored) {}
|
|
|
|
try {
|
|
|
|
if (closeRequested || closed || closeReadLock == 0) return 0d;
|
2022-05-09 22:08:54 +02:00
|
|
|
return statistics.getTickerCount(tickerType);
|
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadLock);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
);
|
|
|
|
}
|
|
|
|
return stats;
|
|
|
|
}
|
|
|
|
|
2022-05-20 10:20:00 +02:00
|
|
|
private Snapshot getSnapshotLambda(LLSnapshot snapshot) {
|
2022-04-30 14:21:20 +02:00
|
|
|
var closeReadSnapLock = closeLock.readLock();
|
|
|
|
try {
|
|
|
|
ensureOpen();
|
|
|
|
var snapshotHandle = snapshotsHandles.get(snapshot.getSequenceNumber());
|
|
|
|
ensureOwned(snapshotHandle);
|
|
|
|
return snapshotHandle;
|
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadSnapLock);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-12-07 22:15:18 +01:00
|
|
|
@Override
|
2022-03-20 14:33:27 +01:00
|
|
|
public Mono<LLLocalSingleton> getSingleton(byte[] singletonListColumnName,
|
|
|
|
byte[] name,
|
|
|
|
byte @Nullable[] defaultValue) {
|
2021-01-31 15:47:48 +01:00
|
|
|
return Mono
|
2022-04-30 14:21:20 +02:00
|
|
|
.fromCallable(() -> {
|
|
|
|
var closeReadLock = closeLock.readLock();
|
|
|
|
try {
|
|
|
|
ensureOpen();
|
|
|
|
var cfh = getCfh(singletonListColumnName);
|
|
|
|
ensureOwned(cfh);
|
|
|
|
return new LLLocalSingleton(
|
|
|
|
getRocksDBColumn(db, cfh),
|
|
|
|
this::getSnapshotLambda,
|
|
|
|
LLLocalKeyValueDatabase.this.name,
|
|
|
|
name,
|
|
|
|
ColumnUtils.toString(singletonListColumnName),
|
|
|
|
dbWScheduler, dbRScheduler, defaultValue
|
|
|
|
);
|
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadLock);
|
|
|
|
}
|
|
|
|
})
|
2021-09-07 11:26:10 +02:00
|
|
|
.onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause))
|
2022-04-05 13:58:12 +02:00
|
|
|
.subscribeOn(dbRScheduler);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-02-13 01:31:24 +01:00
|
|
|
public Mono<LLLocalDictionary> getDictionary(byte[] columnName, UpdateMode updateMode) {
|
2022-05-02 19:05:40 +02:00
|
|
|
return Mono.fromCallable(() -> {
|
|
|
|
var closeReadLock = closeLock.readLock();
|
|
|
|
try {
|
|
|
|
ensureOpen();
|
|
|
|
var cfh = getCfh(columnName);
|
|
|
|
ensureOwned(cfh);
|
|
|
|
return new LLLocalDictionary(allocator,
|
|
|
|
getRocksDBColumn(db, cfh),
|
|
|
|
name,
|
|
|
|
ColumnUtils.toString(columnName),
|
|
|
|
dbWScheduler,
|
2022-05-12 19:14:27 +02:00
|
|
|
dbRScheduler, snapshot -> getSnapshotLambda(snapshot),
|
2022-05-02 19:05:40 +02:00
|
|
|
updateMode,
|
|
|
|
databaseOptions
|
|
|
|
);
|
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadLock);
|
|
|
|
}
|
|
|
|
}).subscribeOn(dbRScheduler);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
2022-03-22 12:59:22 +01:00
|
|
|
public RocksDBColumn getRocksDBColumn(byte[] columnName) {
|
2022-04-30 14:21:20 +02:00
|
|
|
var closeReadLock = closeLock.readLock();
|
2022-03-22 12:59:22 +01:00
|
|
|
try {
|
2022-04-30 14:21:20 +02:00
|
|
|
ensureOpen();
|
2022-05-20 10:20:00 +02:00
|
|
|
ColumnFamilyHandle cfh;
|
2022-04-30 14:21:20 +02:00
|
|
|
try {
|
|
|
|
cfh = getCfh(columnName);
|
|
|
|
ensureOwned(cfh);
|
|
|
|
} catch (RocksDBException e) {
|
|
|
|
throw new UnsupportedOperationException("Column family doesn't exist: " + Arrays.toString(columnName), e);
|
|
|
|
}
|
|
|
|
return getRocksDBColumn(db, cfh);
|
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadLock);
|
2022-03-22 12:59:22 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-05-20 10:20:00 +02:00
|
|
|
private RocksDBColumn getRocksDBColumn(RocksDB db, ColumnFamilyHandle cfh) {
|
2022-04-06 02:41:32 +02:00
|
|
|
var nettyDirect = databaseOptions.allowNettyDirect();
|
2022-04-30 01:49:44 +02:00
|
|
|
var closeLock = getCloseLock();
|
2021-10-20 01:51:34 +02:00
|
|
|
if (db instanceof OptimisticTransactionDB optimisticTransactionDB) {
|
2022-04-28 23:23:26 +02:00
|
|
|
return new OptimisticRocksDBColumn(optimisticTransactionDB,
|
|
|
|
nettyDirect,
|
|
|
|
allocator,
|
|
|
|
name,
|
|
|
|
cfh,
|
|
|
|
meterRegistry,
|
2022-04-30 01:49:44 +02:00
|
|
|
closeLock
|
2022-04-28 23:23:26 +02:00
|
|
|
);
|
2021-12-27 18:44:54 +01:00
|
|
|
} else if (db instanceof TransactionDB transactionDB) {
|
2022-04-28 23:23:26 +02:00
|
|
|
return new PessimisticRocksDBColumn(transactionDB,
|
|
|
|
nettyDirect,
|
|
|
|
allocator,
|
|
|
|
name,
|
|
|
|
cfh,
|
|
|
|
meterRegistry,
|
2022-04-30 01:49:44 +02:00
|
|
|
closeLock
|
2022-04-28 23:23:26 +02:00
|
|
|
);
|
2021-10-20 01:51:34 +02:00
|
|
|
} else {
|
2022-04-30 01:49:44 +02:00
|
|
|
return new StandardRocksDBColumn(db, nettyDirect, allocator, name, cfh, meterRegistry, closeLock);
|
2021-10-20 01:51:34 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-05-20 10:20:00 +02:00
|
|
|
private ColumnFamilyHandle getCfh(byte[] columnName) throws RocksDBException {
|
2022-05-12 19:14:27 +02:00
|
|
|
var cfh = handles.get(ColumnUtils.special(ColumnUtils.toString(columnName)));
|
2022-05-20 10:20:00 +02:00
|
|
|
assert enableColumnsBug || Arrays.equals(cfh.getName(), columnName);
|
2021-06-19 16:26:54 +02:00
|
|
|
return cfh;
|
|
|
|
}
|
|
|
|
|
2021-06-27 15:40:56 +02:00
|
|
|
public DatabaseOptions getDatabaseOptions() {
|
|
|
|
return databaseOptions;
|
|
|
|
}
|
|
|
|
|
2022-04-26 17:12:22 +02:00
|
|
|
public Flux<Path> getSSTS() {
|
|
|
|
var paths = convertPaths(dbPath.toAbsolutePath().getParent(), dbPath.getFileName(), databaseOptions.volumes());
|
|
|
|
return Mono
|
2022-04-30 14:21:20 +02:00
|
|
|
.fromCallable(() -> {
|
|
|
|
var closeReadLock = closeLock.readLock();
|
|
|
|
try {
|
|
|
|
ensureOpen();
|
|
|
|
return db.getLiveFiles();
|
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadLock);
|
|
|
|
}
|
|
|
|
})
|
2022-04-26 17:12:22 +02:00
|
|
|
.flatMapIterable(liveFiles -> liveFiles.files)
|
|
|
|
.filter(file -> file.endsWith(".sst"))
|
|
|
|
.map(file -> file.substring(1))
|
|
|
|
.flatMapSequential(file -> Mono.fromCallable(() -> {
|
|
|
|
{
|
|
|
|
var path = dbPath.resolve(file);
|
|
|
|
if (Files.exists(path)) {
|
|
|
|
return path;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for (var volumePath : paths) {
|
|
|
|
var path = volumePath.path().resolve(file);
|
|
|
|
if (Files.exists(path)) {
|
|
|
|
return path;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return null;
|
|
|
|
}).subscribeOn(Schedulers.boundedElastic()));
|
|
|
|
}
|
|
|
|
|
|
|
|
public Mono<Void> ingestSSTS(Flux<Path> sstsFlux) {
|
|
|
|
return sstsFlux
|
|
|
|
.map(path -> path.toAbsolutePath().toString())
|
|
|
|
.flatMap(sst -> Mono.fromCallable(() -> {
|
2022-04-30 14:21:20 +02:00
|
|
|
var closeReadLock = closeLock.readLock();
|
2022-04-26 17:12:22 +02:00
|
|
|
try (var opts = new IngestExternalFileOptions()) {
|
|
|
|
try {
|
|
|
|
logger.info("Ingesting SST \"{}\"...", sst);
|
|
|
|
db.ingestExternalFile(List.of(sst), opts);
|
|
|
|
logger.info("Ingested SST \"{}\" successfully", sst);
|
|
|
|
} catch (RocksDBException e) {
|
|
|
|
logger.error("Can't ingest SST \"{}\"", sst, e);
|
|
|
|
}
|
2022-04-30 14:21:20 +02:00
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadLock);
|
2022-04-26 17:12:22 +02:00
|
|
|
}
|
|
|
|
return null;
|
|
|
|
}).subscribeOn(Schedulers.boundedElastic()))
|
|
|
|
.then();
|
|
|
|
}
|
|
|
|
|
2022-01-15 20:00:10 +01:00
|
|
|
@Override
|
|
|
|
public Mono<MemoryStats> getMemoryStats() {
|
|
|
|
return Mono
|
2022-04-09 02:45:42 +02:00
|
|
|
.fromCallable(() -> {
|
2022-06-08 18:52:15 +02:00
|
|
|
if (closeRequested || closed) return null;
|
|
|
|
long closeReadLock = 0;
|
2022-04-30 14:21:20 +02:00
|
|
|
try {
|
2022-06-08 18:52:15 +02:00
|
|
|
//noinspection BlockingMethodInNonBlockingContext
|
|
|
|
closeReadLock = closeLock.tryReadLock(1, TimeUnit.SECONDS);
|
|
|
|
} catch (InterruptedException ignored) {}
|
|
|
|
try {
|
|
|
|
if (closeRequested || closed || closeReadLock == 0) return null;
|
|
|
|
ensureOpen();
|
|
|
|
return new MemoryStats(db.getAggregatedLongProperty("rocksdb.estimate-table-readers-mem"),
|
|
|
|
db.getAggregatedLongProperty("rocksdb.size-all-mem-tables"),
|
|
|
|
db.getAggregatedLongProperty("rocksdb.cur-size-all-mem-tables"),
|
|
|
|
db.getAggregatedLongProperty("rocksdb.estimate-num-keys"),
|
|
|
|
db.getAggregatedLongProperty("rocksdb.block-cache-usage") / this.handles.size(),
|
|
|
|
db.getAggregatedLongProperty("rocksdb.block-cache-pinned-usage") / this.handles.size()
|
|
|
|
);
|
2022-04-30 14:21:20 +02:00
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadLock);
|
2022-04-09 02:45:42 +02:00
|
|
|
}
|
|
|
|
})
|
2022-01-15 20:00:10 +01:00
|
|
|
.onErrorMap(cause -> new IOException("Failed to read memory stats", cause))
|
2022-04-05 13:58:12 +02:00
|
|
|
.subscribeOn(dbRScheduler);
|
2022-04-09 02:45:42 +02:00
|
|
|
}
|
|
|
|
|
2022-05-04 01:21:56 +02:00
|
|
|
@Override
|
|
|
|
public Mono<Map<String, String>> getMapProperty(@Nullable Column column, RocksDBMapProperty property) {
|
|
|
|
return Mono
|
|
|
|
.fromCallable(() -> {
|
|
|
|
var closeReadLock = closeLock.readLock();
|
|
|
|
try {
|
|
|
|
ensureOpen();
|
|
|
|
if (column == null) {
|
|
|
|
return db.getMapProperty(property.getName());
|
|
|
|
} else {
|
|
|
|
var cfh = requireNonNull(handles.get(column));
|
2022-05-20 10:20:00 +02:00
|
|
|
return db.getMapProperty(cfh, property.getName());
|
2022-05-04 01:21:56 +02:00
|
|
|
}
|
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadLock);
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.transform(this::convertNotFoundToEmpty)
|
|
|
|
.onErrorMap(cause -> new IOException("Failed to read property " + property.name(), cause))
|
|
|
|
.subscribeOn(dbRScheduler);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Flux<ColumnProperty<Map<String, String>>> getMapColumnProperties(RocksDBMapProperty property) {
|
|
|
|
return Flux
|
|
|
|
.fromIterable(getAllColumnFamilyHandles().keySet())
|
|
|
|
.flatMapSequential(c -> this
|
|
|
|
.getMapProperty(c, property)
|
|
|
|
.map(result -> new ColumnProperty<>(c.name(), property.getName(), result)));
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Mono<String> getStringProperty(@Nullable Column column, RocksDBStringProperty property) {
|
|
|
|
return Mono
|
|
|
|
.fromCallable(() -> {
|
|
|
|
var closeReadLock = closeLock.readLock();
|
|
|
|
try {
|
|
|
|
ensureOpen();
|
|
|
|
if (column == null) {
|
|
|
|
return db.getProperty(property.getName());
|
|
|
|
} else {
|
|
|
|
var cfh = requireNonNull(handles.get(column));
|
2022-05-20 10:20:00 +02:00
|
|
|
return db.getProperty(cfh, property.getName());
|
2022-05-04 01:21:56 +02:00
|
|
|
}
|
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadLock);
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.transform(this::convertNotFoundToEmpty)
|
|
|
|
.onErrorMap(cause -> new IOException("Failed to read property " + property.name(), cause))
|
|
|
|
.subscribeOn(dbRScheduler);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Flux<ColumnProperty<String>> getStringColumnProperties(RocksDBStringProperty property) {
|
|
|
|
return Flux
|
|
|
|
.fromIterable(getAllColumnFamilyHandles().keySet())
|
|
|
|
.flatMapSequential(c -> this
|
|
|
|
.getStringProperty(c, property)
|
|
|
|
.map(result -> new ColumnProperty<>(c.name(), property.getName(), result)));
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Mono<Long> getLongProperty(@Nullable Column column, RocksDBLongProperty property) {
|
|
|
|
return Mono
|
|
|
|
.fromCallable(() -> {
|
|
|
|
var closeReadLock = closeLock.readLock();
|
|
|
|
try {
|
|
|
|
ensureOpen();
|
|
|
|
if (column == null) {
|
|
|
|
return db.getLongProperty(property.getName());
|
|
|
|
} else {
|
|
|
|
var cfh = requireNonNull(handles.get(column));
|
2022-05-20 10:20:00 +02:00
|
|
|
return db.getLongProperty(cfh, property.getName());
|
2022-05-04 01:21:56 +02:00
|
|
|
}
|
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadLock);
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.transform(this::convertNotFoundToEmpty)
|
|
|
|
.onErrorMap(cause -> new IOException("Failed to read property " + property.name(), cause))
|
|
|
|
.subscribeOn(dbRScheduler);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Flux<ColumnProperty<Long>> getLongColumnProperties(RocksDBLongProperty property) {
|
|
|
|
return Flux
|
|
|
|
.fromIterable(getAllColumnFamilyHandles().keySet())
|
|
|
|
.flatMapSequential(c -> this
|
|
|
|
.getLongProperty(c, property)
|
|
|
|
.map(result -> new ColumnProperty<>(c.name(), property.getName(), result)));
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Mono<Long> getAggregatedLongProperty(RocksDBLongProperty property) {
|
|
|
|
return Mono
|
|
|
|
.fromCallable(() -> {
|
|
|
|
var closeReadLock = closeLock.readLock();
|
|
|
|
try {
|
|
|
|
ensureOpen();
|
|
|
|
return db.getAggregatedLongProperty(property.getName());
|
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadLock);
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.transform(this::convertNotFoundToEmpty)
|
|
|
|
.onErrorMap(cause -> new IOException("Failed to read property " + property.name(), cause))
|
|
|
|
.subscribeOn(dbRScheduler);
|
|
|
|
}
|
|
|
|
|
|
|
|
private <V> Mono<V> convertNotFoundToEmpty(Mono<V> mono) {
|
|
|
|
return mono.onErrorResume(RocksDBException.class, ex -> {
|
|
|
|
if (ex.getMessage().equals("NotFound")) {
|
|
|
|
return Mono.empty();
|
|
|
|
} else {
|
|
|
|
return Mono.error(ex);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2022-04-09 02:45:42 +02:00
|
|
|
@Override
|
|
|
|
public Mono<String> getRocksDBStats() {
|
|
|
|
return Mono
|
|
|
|
.fromCallable(() -> {
|
2022-06-08 18:52:15 +02:00
|
|
|
if (closeRequested || closed) return null;
|
|
|
|
long closeReadLock = 0;
|
2022-04-30 14:21:20 +02:00
|
|
|
try {
|
2022-06-08 18:52:15 +02:00
|
|
|
//noinspection BlockingMethodInNonBlockingContext
|
|
|
|
closeReadLock = closeLock.tryReadLock(1, TimeUnit.SECONDS);
|
|
|
|
} catch (InterruptedException ignored) {}
|
|
|
|
try {
|
|
|
|
if (closeRequested || closed || closeReadLock == 0) return null;
|
|
|
|
ensureOpen();
|
|
|
|
StringBuilder aggregatedStats = new StringBuilder();
|
|
|
|
for (var entry : this.handles.entrySet()) {
|
|
|
|
aggregatedStats
|
|
|
|
.append(entry.getKey().name())
|
|
|
|
.append("\n")
|
|
|
|
.append(db.getProperty(entry.getValue(), "rocksdb.stats"))
|
|
|
|
.append("\n");
|
2022-04-10 20:15:05 +02:00
|
|
|
}
|
2022-06-08 18:52:15 +02:00
|
|
|
return aggregatedStats.toString();
|
2022-04-30 14:21:20 +02:00
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadLock);
|
2022-04-09 02:45:42 +02:00
|
|
|
}
|
|
|
|
})
|
|
|
|
.onErrorMap(cause -> new IOException("Failed to read stats", cause))
|
|
|
|
.subscribeOn(dbRScheduler);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Flux<TableWithProperties> getTableProperties() {
|
2022-04-10 20:15:05 +02:00
|
|
|
return Flux
|
|
|
|
.fromIterable(handles.entrySet())
|
|
|
|
.flatMapSequential(handle -> Mono
|
|
|
|
.fromCallable(() -> {
|
2022-06-08 18:52:15 +02:00
|
|
|
if (closeRequested || closed) return null;
|
|
|
|
long closeReadLock = 0;
|
|
|
|
try {
|
|
|
|
//noinspection BlockingMethodInNonBlockingContext
|
|
|
|
closeReadLock = closeLock.tryReadLock(1, TimeUnit.SECONDS);
|
|
|
|
} catch (InterruptedException ignored) {}
|
2022-04-30 14:21:20 +02:00
|
|
|
try {
|
2022-06-08 18:52:15 +02:00
|
|
|
if (closeRequested || closed || closeReadLock == 0) return null;
|
2022-04-30 14:21:20 +02:00
|
|
|
ensureOpen();
|
2022-05-20 10:20:00 +02:00
|
|
|
return db.getPropertiesOfAllTables(handle.getValue());
|
2022-04-30 14:21:20 +02:00
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadLock);
|
2022-04-10 20:15:05 +02:00
|
|
|
}
|
|
|
|
})
|
|
|
|
.subscribeOn(dbRScheduler)
|
|
|
|
.flatMapIterable(Map::entrySet)
|
|
|
|
.map(entry -> new TableWithProperties(handle.getKey().name(), entry.getKey(), entry.getValue()))
|
|
|
|
)
|
|
|
|
.onErrorMap(cause -> new IOException("Failed to read stats", cause));
|
2022-01-15 20:00:10 +01:00
|
|
|
}
|
|
|
|
|
2021-06-27 15:06:48 +02:00
|
|
|
@Override
|
|
|
|
public Mono<Void> verifyChecksum() {
|
|
|
|
return Mono
|
|
|
|
.<Void>fromCallable(() -> {
|
2022-04-30 14:21:20 +02:00
|
|
|
var closeReadLock = closeLock.readLock();
|
|
|
|
try {
|
|
|
|
ensureOpen();
|
|
|
|
db.verifyChecksum();
|
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadLock);
|
|
|
|
}
|
2021-06-27 15:06:48 +02:00
|
|
|
return null;
|
|
|
|
})
|
|
|
|
.onErrorMap(cause -> new IOException("Failed to verify checksum of database \""
|
2021-09-07 11:26:10 +02:00
|
|
|
+ getDatabaseName() + "\"", cause))
|
2022-04-05 13:58:12 +02:00
|
|
|
.subscribeOn(dbRScheduler);
|
2021-06-27 15:06:48 +02:00
|
|
|
}
|
|
|
|
|
2022-04-26 17:12:22 +02:00
|
|
|
@Override
|
|
|
|
public Mono<Void> compact() {
|
|
|
|
return Mono.<Void>fromCallable(() -> {
|
|
|
|
this.forceCompaction(getLastVolumeId());
|
|
|
|
return null;
|
|
|
|
}).subscribeOn(dbWScheduler);
|
|
|
|
}
|
|
|
|
|
2022-04-28 23:23:26 +02:00
|
|
|
@Override
|
|
|
|
public Mono<Void> flush() {
|
|
|
|
return Mono.<Void>fromCallable(() -> {
|
|
|
|
try (var fo = new FlushOptions().setWaitForFlush(true)) {
|
|
|
|
this.flush(fo);
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
}).subscribeOn(dbWScheduler);
|
|
|
|
}
|
|
|
|
|
2021-05-03 21:41:51 +02:00
|
|
|
@Override
|
2021-08-29 23:18:03 +02:00
|
|
|
public BufferAllocator getAllocator() {
|
2021-05-03 21:41:51 +02:00
|
|
|
return allocator;
|
|
|
|
}
|
|
|
|
|
2021-10-30 11:13:46 +02:00
|
|
|
@Override
|
|
|
|
public MeterRegistry getMeterRegistry() {
|
|
|
|
return meterRegistry;
|
|
|
|
}
|
|
|
|
|
2020-12-07 22:15:18 +01:00
|
|
|
@Override
|
2021-01-30 01:42:37 +01:00
|
|
|
public Mono<LLSnapshot> takeSnapshot() {
|
2022-04-30 14:21:20 +02:00
|
|
|
return Mono.fromCallable(() -> {
|
|
|
|
var closeReadLock = closeLock.readLock();
|
|
|
|
try {
|
|
|
|
ensureOpen();
|
|
|
|
return snapshotTime.recordCallable(() -> {
|
2022-05-20 10:20:00 +02:00
|
|
|
var snapshot = db.getSnapshot();
|
2022-04-30 14:21:20 +02:00
|
|
|
long currentSnapshotSequenceNumber = nextSnapshotNumbers.getAndIncrement();
|
|
|
|
this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot);
|
|
|
|
return new LLSnapshot(currentSnapshotSequenceNumber);
|
|
|
|
});
|
|
|
|
} finally {
|
|
|
|
closeLock.unlockRead(closeReadLock);
|
|
|
|
}
|
|
|
|
}).subscribeOn(dbRScheduler);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-30 01:42:37 +01:00
|
|
|
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
|
|
|
|
return Mono
|
2021-09-07 11:26:10 +02:00
|
|
|
.<Void>fromCallable(() -> {
|
2022-04-30 01:49:44 +02:00
|
|
|
var closeReadLock = closeLock.readLock();
|
2022-05-12 19:14:27 +02:00
|
|
|
try (var dbSnapshot = this.snapshotsHandles.remove(snapshot.getSequenceNumber())) {
|
2022-04-15 02:41:06 +02:00
|
|
|
if (dbSnapshot == null) {
|
|
|
|
throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!");
|
|
|
|
}
|
|
|
|
if (!db.isOwningHandle()) {
|
|
|
|
return null;
|
|
|
|
}
|
2022-05-20 10:20:00 +02:00
|
|
|
db.releaseSnapshot(dbSnapshot);
|
2022-04-09 16:31:32 +02:00
|
|
|
return null;
|
2022-04-15 02:41:06 +02:00
|
|
|
} finally {
|
2022-04-30 01:49:44 +02:00
|
|
|
closeLock.unlockRead(closeReadLock);
|
2022-04-09 16:31:32 +02:00
|
|
|
}
|
2021-09-07 11:26:10 +02:00
|
|
|
})
|
2022-04-05 13:58:12 +02:00
|
|
|
.subscribeOn(dbRScheduler);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-01-31 19:52:47 +01:00
|
|
|
public Mono<Void> close() {
|
|
|
|
return Mono
|
|
|
|
.<Void>fromCallable(() -> {
|
2022-06-08 18:52:15 +02:00
|
|
|
closeRequested = true;
|
2022-05-09 22:08:54 +02:00
|
|
|
if (statistics != null) {
|
|
|
|
statistics.close();
|
|
|
|
statistics = null;
|
|
|
|
}
|
2021-01-31 19:52:47 +01:00
|
|
|
try {
|
2022-05-12 19:14:27 +02:00
|
|
|
flushAndCloseDb(db,
|
|
|
|
standardCache,
|
|
|
|
compressedCache,
|
2022-05-20 10:20:00 +02:00
|
|
|
new ArrayList<>(handles.values())
|
2022-05-12 19:14:27 +02:00
|
|
|
);
|
2022-05-18 00:52:01 +02:00
|
|
|
handles.values().forEach(columnFamilyHandleRocksObj -> {
|
|
|
|
if (columnFamilyHandleRocksObj.isAccessible()) {
|
|
|
|
columnFamilyHandleRocksObj.close();
|
|
|
|
}
|
|
|
|
});
|
2022-05-12 19:14:27 +02:00
|
|
|
handles.clear();
|
2021-01-31 19:52:47 +01:00
|
|
|
deleteUnusedOldLogFiles();
|
|
|
|
} catch (RocksDBException e) {
|
|
|
|
throw new IOException(e);
|
|
|
|
}
|
|
|
|
return null;
|
|
|
|
})
|
2021-09-07 11:26:10 +02:00
|
|
|
.onErrorMap(cause -> new IOException("Failed to close", cause))
|
2022-04-05 13:58:12 +02:00
|
|
|
.subscribeOn(dbWScheduler);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Call this method ONLY AFTER flushing completely a db and closing it!
|
|
|
|
*/
|
2021-01-30 00:24:55 +01:00
|
|
|
@SuppressWarnings("unused")
|
2020-12-07 22:15:18 +01:00
|
|
|
private void deleteUnusedOldLogFiles() {
|
2022-03-22 00:23:32 +01:00
|
|
|
if (!DELETE_LOG_FILES) {
|
|
|
|
return;
|
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
Path basePath = dbPath;
|
|
|
|
try {
|
|
|
|
Files
|
|
|
|
.walk(basePath, 1)
|
|
|
|
.filter(p -> !p.equals(basePath))
|
|
|
|
.filter(p -> {
|
|
|
|
var fileName = p.getFileName().toString();
|
|
|
|
if (fileName.startsWith("LOG.old.")) {
|
|
|
|
var parts = fileName.split("\\.");
|
|
|
|
if (parts.length == 3) {
|
|
|
|
try {
|
|
|
|
long nameSuffix = Long.parseUnsignedLong(parts[2]);
|
|
|
|
return true;
|
|
|
|
} catch (NumberFormatException ex) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (fileName.endsWith(".log")) {
|
|
|
|
var parts = fileName.split("\\.");
|
|
|
|
if (parts.length == 2) {
|
|
|
|
try {
|
|
|
|
int name = Integer.parseUnsignedInt(parts[0]);
|
|
|
|
return true;
|
|
|
|
} catch (NumberFormatException ex) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
})
|
|
|
|
.filter(p -> {
|
|
|
|
try {
|
|
|
|
BasicFileAttributes attrs = Files.readAttributes(p, BasicFileAttributes.class);
|
|
|
|
if (attrs.isRegularFile() && !attrs.isSymbolicLink() && !attrs.isDirectory()) {
|
|
|
|
long ctime = attrs.creationTime().toMillis();
|
|
|
|
long atime = attrs.lastAccessTime().toMillis();
|
|
|
|
long mtime = attrs.lastModifiedTime().toMillis();
|
|
|
|
long lastTime = Math.max(Math.max(ctime, atime), mtime);
|
|
|
|
long safeTime;
|
|
|
|
if (p.getFileName().toString().startsWith("LOG.old.")) {
|
|
|
|
safeTime = System.currentTimeMillis() - Duration.ofHours(24).toMillis();
|
|
|
|
} else {
|
|
|
|
safeTime = System.currentTimeMillis() - Duration.ofHours(12).toMillis();
|
|
|
|
}
|
|
|
|
if (lastTime < safeTime) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} catch (IOException ex) {
|
2021-03-19 20:55:38 +01:00
|
|
|
logger.error("Error when deleting unused log files", ex);
|
2020-12-07 22:15:18 +01:00
|
|
|
return false;
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
})
|
|
|
|
.forEach(path -> {
|
|
|
|
try {
|
|
|
|
Files.deleteIfExists(path);
|
|
|
|
System.out.println("Deleted log file \"" + path + "\"");
|
|
|
|
} catch (IOException e) {
|
2021-09-10 12:13:52 +02:00
|
|
|
logger.error(MARKER_ROCKSDB, "Failed to delete log file \"" + path + "\"", e);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
});
|
|
|
|
} catch (IOException ex) {
|
2021-09-10 12:13:52 +02:00
|
|
|
logger.error(MARKER_ROCKSDB, "Failed to delete unused log files", ex);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
}
|
2022-04-15 02:41:06 +02:00
|
|
|
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|