CavalliumDBEngine/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java

674 lines
23 KiB
Java
Raw Normal View History

2020-12-07 22:15:18 +01:00
package it.cavallium.dbengine.database.disk;
2021-09-10 12:13:52 +02:00
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import io.micrometer.core.instrument.MeterRegistry;
2021-09-17 16:56:28 +02:00
import io.net5.buffer.api.BufferAllocator;
import io.net5.util.internal.PlatformDependent;
2021-01-30 00:24:55 +01:00
import it.cavallium.dbengine.database.Column;
2021-07-01 21:19:52 +02:00
import it.cavallium.dbengine.client.DatabaseOptions;
2021-01-30 00:24:55 +01:00
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLSnapshot;
2021-02-13 01:31:24 +01:00
import it.cavallium.dbengine.database.UpdateMode;
2020-12-07 22:15:18 +01:00
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
2020-12-07 22:15:18 +01:00
import java.util.concurrent.ConcurrentHashMap;
2021-03-21 13:06:54 +01:00
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
2020-12-07 22:15:18 +01:00
import java.util.concurrent.atomic.AtomicLong;
2021-03-21 13:06:54 +01:00
import org.apache.commons.lang3.time.StopWatch;
2021-07-10 20:52:01 +02:00
import org.jetbrains.annotations.Nullable;
2020-12-07 22:15:18 +01:00
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
2021-07-17 11:52:08 +02:00
import org.rocksdb.ClockCache;
2020-12-07 22:15:18 +01:00
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
2021-03-19 20:55:38 +01:00
import org.rocksdb.CompactRangeOptions;
2021-05-04 01:21:29 +02:00
import org.rocksdb.CompactionPriority;
2020-12-07 22:15:18 +01:00
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.DbPath;
import org.rocksdb.FlushOptions;
2021-07-17 11:52:08 +02:00
import org.rocksdb.IndexType;
2021-10-17 19:52:43 +02:00
import org.rocksdb.OptimisticTransactionDB;
2021-10-21 10:00:39 +02:00
import org.rocksdb.OptimisticTransactionOptions;
2020-12-07 22:15:18 +01:00
import org.rocksdb.Options;
import org.rocksdb.RateLimiter;
2020-12-07 22:15:18 +01:00
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;
2021-10-20 01:51:34 +02:00
import org.rocksdb.Transaction;
2021-10-17 19:52:43 +02:00
import org.rocksdb.TransactionDB;
2021-10-20 01:51:34 +02:00
import org.rocksdb.TransactionDBOptions;
2020-12-07 22:15:18 +01:00
import org.rocksdb.WALRecoveryMode;
2021-03-20 12:41:11 +01:00
import org.rocksdb.WriteBufferManager;
2021-03-19 20:55:38 +01:00
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
2021-01-30 01:42:37 +01:00
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
2021-01-30 01:42:37 +01:00
import reactor.core.scheduler.Schedulers;
2020-12-07 22:15:18 +01:00
public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
static {
RocksDB.loadLibrary();
}
2021-03-19 20:55:38 +01:00
protected static final Logger logger = LoggerFactory.getLogger(LLLocalKeyValueDatabase.class);
2020-12-07 22:15:18 +01:00
private static final ColumnFamilyDescriptor DEFAULT_COLUMN_FAMILY = new ColumnFamilyDescriptor(
RocksDB.DEFAULT_COLUMN_FAMILY);
2021-08-29 23:18:03 +02:00
private final BufferAllocator allocator;
private final MeterRegistry meterRegistry;
private final Scheduler dbScheduler;
2021-06-27 15:40:56 +02:00
// Configurations
2020-12-07 22:15:18 +01:00
private final Path dbPath;
private final String name;
2021-06-27 15:40:56 +02:00
private final DatabaseOptions databaseOptions;
2021-06-19 16:26:54 +02:00
private final boolean enableColumnsBug;
2021-10-20 01:51:34 +02:00
private RocksDB db;
2020-12-07 22:15:18 +01:00
private final Map<Column, ColumnFamilyHandle> handles;
private final ConcurrentHashMap<Long, Snapshot> snapshotsHandles = new ConcurrentHashMap<>();
private final AtomicLong nextSnapshotNumbers = new AtomicLong(1);
2021-06-27 15:40:56 +02:00
/**
 * Opens the RocksDB database located at {@code path}, creating it (and its column families) first
 * when the database directory does not exist yet, and flushes it once opened.
 *
 * @param allocator       buffer allocator handed to the columns created by this database
 * @param meterRegistry   metrics registry handed to the columns created by this database
 * @param name            logical name of this database
 * @param path            database location; must not be {@code null} despite the annotation
 * @param columns         columns to open; their names are encoded as US-ASCII column family names
 * @param handles         output list that RocksDB fills with the opened column family handles
 * @param databaseOptions user supplied configuration
 * @throws IOException                   if RocksDB fails to create, open or flush the database
 * @throws UnsupportedOperationException if netty direct buffers are requested but the required
 *                                       unsafe / foreign memory support is not available
 */
public LLLocalKeyValueDatabase(BufferAllocator allocator,
		MeterRegistry meterRegistry,
		String name,
		@Nullable Path path,
		List<Column> columns,
		List<ColumnFamilyHandle> handles,
		DatabaseOptions databaseOptions) throws IOException {
	this.name = name;
	this.allocator = allocator;
	this.meterRegistry = meterRegistry;

	if (databaseOptions.allowNettyDirect()) {
		if (!PlatformDependent.hasUnsafe()) {
			throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers",
					PlatformDependent.getUnsafeUnavailabilityCause()
			);
		}
		if (!MemorySegmentUtils.isSupported()) {
			throw new UnsupportedOperationException("Foreign Memory Access API support is disabled."
					+ " Please set \"" + MemorySegmentUtils.getSuggestedArgs() + "\"",
					MemorySegmentUtils.getUnsupportedCause()
			);
		}
	}

	Options rocksdbOptions = openRocksDb(path, databaseOptions);
	try {
		// The default column family is always opened first, followed by the requested columns
		List<ColumnFamilyDescriptor> descriptors = new LinkedList<>();
		descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
		for (Column column : columns) {
			descriptors.add(new ColumnFamilyDescriptor(column.name().getBytes(StandardCharsets.US_ASCII)));
		}

		// Get databases directory path
		Objects.requireNonNull(path);
		Path databasesDirPath = path.toAbsolutePath().getParent();
		String dbPathString = databasesDirPath.toString() + File.separatorChar + path.getFileName();
		Path dbPath = Paths.get(dbPathString);
		this.dbPath = dbPath;

		// Set options
		this.databaseOptions = databaseOptions;
		// The shared bounded-elastic scheduler is used; a dedicated per-database scheduler
		// (Schedulers.newBoundedElastic) is currently disabled.
		this.dbScheduler = Schedulers.boundedElastic();
		this.enableColumnsBug = "true".equals(databaseOptions.extraFlags().getOrDefault("enableColumnBug", "false"));

		createIfNotExists(descriptors, rocksdbOptions, databaseOptions, dbPath, dbPathString);

		// Open the database. If RocksDB reports that direct I/O is unsupported, disable it and retry
		// once; any other failure (or a repeated direct I/O failure) is propagated.
		boolean directIoFallbackApplied = false;
		while (true) {
			try {
				// a factory method that returns a RocksDB instance
				this.db = OptimisticTransactionDB.open(new DBOptions(rocksdbOptions), dbPathString, descriptors, handles);
				break;
			} catch (RocksDBException ex) {
				// String.equals is null-safe here, unlike a switch on a possibly null message
				boolean directIoUnsupported = "Direct I/O is not supported by the specified DB.".equals(ex.getMessage());
				if (!directIoUnsupported || directIoFallbackApplied) {
					throw ex;
				}
				logger.warn(ex.getLocalizedMessage());
				rocksdbOptions
						.setUseDirectReads(false)
						.setUseDirectIoForFlushAndCompaction(false)
						.setAllowMmapReads(databaseOptions.allowMemoryMapping())
						.setAllowMmapWrites(databaseOptions.allowMemoryMapping());
				directIoFallbackApplied = true;
			}
		}

		this.handles = new HashMap<>();
		if (enableColumnsBug && !databaseOptions.inMemory()) {
			// Legacy positional mapping, kept behind the "enableColumnBug" flag:
			// the i-th column is associated with the i-th opened handle.
			for (int i = 0; i < columns.size(); i++) {
				this.handles.put(columns.get(i), handles.get(i));
			}
		} else {
			// Associate each handle with the column that has the same (US-ASCII encoded) name
			nextHandle: for (ColumnFamilyHandle handle : handles) {
				for (Column column : columns) {
					if (Arrays.equals(column.name().getBytes(StandardCharsets.US_ASCII), handle.getName())) {
						this.handles.put(column, handle);
						continue nextHandle;
					}
				}
			}
		}

		// compactDb(db, handles);
		flushDb(db, handles);
	} catch (RocksDBException ex) {
		throw new IOException(ex);
	}
}
/**
 * Returns the name given to this database at construction time.
 */
@Override
public String getDatabaseName() {
	return this.name;
}
2021-10-20 01:51:34 +02:00
private void flushAndCloseDb(RocksDB db, List<ColumnFamilyHandle> handles)
2020-12-07 22:15:18 +01:00
throws RocksDBException {
flushDb(db, handles);
for (ColumnFamilyHandle handle : handles) {
2021-07-06 22:27:03 +02:00
try {
handle.close();
} catch (Exception ex) {
logger.error("Can't close column family", ex);
}
2020-12-07 22:15:18 +01:00
}
2021-07-01 21:19:52 +02:00
try {
db.closeE();
} catch (RocksDBException ex) {
if ("Cannot close DB with unreleased snapshot.".equals(ex.getMessage())) {
2021-07-10 20:52:01 +02:00
snapshotsHandles.forEach((id, snapshot) -> {
2021-07-01 21:19:52 +02:00
try {
db.releaseSnapshot(snapshot);
} catch (Exception ex2) {
// ignore exception
logger.debug("Failed to release snapshot " + id, ex2);
}
});
db.closeE();
}
throw ex;
}
2020-12-07 22:15:18 +01:00
}
2021-10-20 01:51:34 +02:00
private void flushDb(RocksDB db, List<ColumnFamilyHandle> handles) throws RocksDBException {
if (Schedulers.isInNonBlockingThread()) {
logger.error("Called flushDb in a nonblocking thread");
}
2020-12-07 22:15:18 +01:00
// force flush the database
2021-07-06 22:27:03 +02:00
try (var flushOptions = new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true)) {
db.flush(flushOptions);
2020-12-07 22:15:18 +01:00
}
2021-07-06 22:27:03 +02:00
try (var flushOptions = new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true)) {
db.flush(flushOptions, handles);
}
db.flushWal(true);
db.syncWal();
2020-12-07 22:15:18 +01:00
// end force flush
}
2021-04-03 19:09:06 +02:00
@SuppressWarnings("unused")
2021-10-20 01:51:34 +02:00
private void compactDb(TransactionDB db, List<ColumnFamilyHandle> handles) {
if (Schedulers.isInNonBlockingThread()) {
logger.error("Called compactDb in a nonblocking thread");
}
2021-03-19 20:55:38 +01:00
// force compact the database
for (ColumnFamilyHandle cfh : handles) {
var t = new Thread(() -> {
2021-03-21 13:06:54 +01:00
int r = ThreadLocalRandom.current().nextInt();
var s = StopWatch.createStarted();
2021-03-19 20:55:38 +01:00
try {
// Range rangeToCompact = db.suggestCompactRange(cfh);
2021-03-21 13:06:54 +01:00
logger.info("Compacting range {}", r);
db.compactRange(cfh, null, null, new CompactRangeOptions()
.setAllowWriteStall(true)
.setExclusiveManualCompaction(true)
.setChangeLevel(false));
2021-03-19 20:55:38 +01:00
} catch (RocksDBException e) {
if ("Database shutdown".equalsIgnoreCase(e.getMessage())) {
logger.warn("Compaction cancelled: database shutdown");
} else {
logger.warn("Failed to compact range", e);
}
}
2021-03-21 13:06:54 +01:00
logger.info("Compacted range {} in {} milliseconds", r, s.getTime(TimeUnit.MILLISECONDS));
2021-03-19 20:55:38 +01:00
}, "Compaction");
t.setDaemon(true);
t.start();
}
// end force compact
}
2021-05-04 01:21:29 +02:00
@SuppressWarnings({"CommentedOutCode", "PointlessArithmeticExpression"})
2021-07-10 20:52:01 +02:00
private static Options openRocksDb(@Nullable Path path, DatabaseOptions databaseOptions) throws IOException {
2020-12-07 22:15:18 +01:00
// Get databases directory path
2021-07-10 20:52:01 +02:00
Path databasesDirPath;
if (path != null) {
databasesDirPath = path.toAbsolutePath().getParent();
// Create base directories
if (Files.notExists(databasesDirPath)) {
Files.createDirectories(databasesDirPath);
}
} else {
databasesDirPath = null;
2020-12-07 22:15:18 +01:00
}
// the Options class contains a set of configurable DB options
// that determines the behaviour of the database.
var options = new Options();
options.setCreateIfMissing(true);
2021-07-06 22:27:03 +02:00
options.setCreateMissingColumnFamilies(true);
2020-12-07 22:15:18 +01:00
options.setCompactionStyle(CompactionStyle.LEVEL);
options.setTargetFileSizeBase(64 * 1024 * 1024); // 64MiB sst file
options.setTargetFileSizeMultiplier(2); // Each level is 2 times the previous level
options.setCompressionPerLevel(List.of(CompressionType.NO_COMPRESSION,
CompressionType.SNAPPY_COMPRESSION,
CompressionType.SNAPPY_COMPRESSION
));
//options.setMaxBytesForLevelBase(4 * 256 * 1024 * 1024); // 4 times the sst file
2020-12-07 22:15:18 +01:00
options.setManualWalFlush(false);
options.setMinWriteBufferNumberToMerge(3);
options.setMaxWriteBufferNumber(4);
options.setAvoidFlushDuringShutdown(false); // Flush all WALs during shutdown
options.setAvoidFlushDuringRecovery(false); // Flush all WALs during startup
2021-06-27 15:40:56 +02:00
options.setWalRecoveryMode(databaseOptions.absoluteConsistency()
? WALRecoveryMode.AbsoluteConsistency
: WALRecoveryMode.PointInTimeRecovery); // Crash if the WALs are corrupted.Default: TolerateCorruptedTailRecords
2020-12-07 22:15:18 +01:00
options.setDeleteObsoleteFilesPeriodMicros(20 * 1000000); // 20 seconds
options.setPreserveDeletes(false);
options.setKeepLogFileNum(10);
2021-03-20 12:41:11 +01:00
options.setAllowFAllocate(true);
options.setRateLimiter(new RateLimiter(10L * 1024L * 1024L)); // 10MiB/s max compaction write speed
2021-07-10 20:52:01 +02:00
Objects.requireNonNull(databasesDirPath);
Objects.requireNonNull(path.getFileName());
2021-07-10 20:52:01 +02:00
List<DbPath> paths = List.of(new DbPath(databasesDirPath.resolve(path.getFileName() + "_hot"),
2021-06-09 02:56:53 +02:00
10L * 1024L * 1024L * 1024L), // 10GiB
new DbPath(databasesDirPath.resolve(path.getFileName() + "_cold"),
2021-05-12 19:02:51 +02:00
100L * 1024L * 1024L * 1024L), // 100GiB
new DbPath(databasesDirPath.resolve(path.getFileName() + "_colder"),
2021-06-09 02:56:53 +02:00
600L * 1024L * 1024L * 1024L)); // 600GiB
options.setDbPaths(paths);
options.setCfPaths(paths);
2021-07-24 23:47:51 +02:00
options.setMaxOpenFiles(databaseOptions.maxOpenFiles());
2020-12-07 22:15:18 +01:00
// Direct I/O parameters. Removed because they use too much disk.
//options.setUseDirectReads(true);
//options.setUseDirectIoForFlushAndCompaction(true);
//options.setWritableFileMaxBufferSize(1024 * 1024); // 1MB by default
2021-03-22 20:02:19 +01:00
//options.setCompactionReadaheadSize(2 * 1024 * 1024); // recommend at least 2MB
2021-03-20 12:41:11 +01:00
final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
2021-06-27 15:40:56 +02:00
if (databaseOptions.lowMemory()) {
2020-12-07 22:15:18 +01:00
// LOW MEMORY
options
2021-05-18 01:10:30 +02:00
.setLevelCompactionDynamicLevelBytes(false)
.setBytesPerSync(0) // default
.setWalBytesPerSync(0) // default
2020-12-07 22:15:18 +01:00
.setIncreaseParallelism(1)
.optimizeLevelStyleCompaction(1024 * 1024) // 1MiB of ram will be used for level style compaction
.setWriteBufferSize(1024 * 1024) // 1MB
2021-05-18 01:10:30 +02:00
.setWalTtlSeconds(0)
.setWalSizeLimitMB(0) // 16MB
.setMaxTotalWalSize(0) // automatic
;
tableOptions
2021-07-17 11:52:08 +02:00
.setIndexType(IndexType.kTwoLevelIndexSearch)
.setPartitionFilters(true)
.setMetadataBlockSize(4096)
.setBlockCache(new ClockCache(8L * 1024L * 1024L)) // 8MiB
.setCacheIndexAndFilterBlocks(true)
.setCacheIndexAndFilterBlocksWithHighPriority(true)
.setPinL0FilterAndIndexBlocksInCache(true)
2020-12-07 22:15:18 +01:00
;
2021-07-17 11:52:08 +02:00
options.setWriteBufferManager(new WriteBufferManager(8L * 1024L * 1024L, new ClockCache(8L * 1024L * 1024L))); // 8MiB
if (databaseOptions.useDirectIO()) {
options
// Option to enable readahead in compaction
// If not set, it will be set to 2MB internally
.setCompactionReadaheadSize(2 * 1024 * 1024) // recommend at least 2MB
// Option to tune write buffer for direct writes
.setWritableFileMaxBufferSize(1024 * 1024)
;
}
2020-12-07 22:15:18 +01:00
} else {
// HIGH MEMORY
options
2021-05-18 01:10:30 +02:00
.setLevelCompactionDynamicLevelBytes(true)
2020-12-07 22:15:18 +01:00
.setAllowConcurrentMemtableWrite(true)
.setEnableWriteThreadAdaptiveYield(true)
.setIncreaseParallelism(Runtime.getRuntime().availableProcessors())
2021-05-04 01:21:29 +02:00
.setBytesPerSync(1 * 1024 * 1024) // 1MiB
2020-12-07 22:15:18 +01:00
.setWalBytesPerSync(10 * 1024 * 1024)
.optimizeLevelStyleCompaction(
128 * 1024 * 1024) // 128MiB of ram will be used for level style compaction
2021-03-20 12:41:11 +01:00
.setWriteBufferSize(64 * 1024 * 1024) // 64MB
2021-05-18 01:10:30 +02:00
.setWalTtlSeconds(30) // flush wal after 30 seconds
2020-12-07 22:15:18 +01:00
.setWalSizeLimitMB(1024) // 1024MB
2021-03-20 12:41:11 +01:00
.setMaxTotalWalSize(2L * 1024L * 1024L * 1024L) // 2GiB max wal directory size
2020-12-07 22:15:18 +01:00
;
2021-05-18 01:10:30 +02:00
tableOptions
2021-07-17 11:52:08 +02:00
.setIndexType(IndexType.kTwoLevelIndexSearch)
.setPartitionFilters(true)
.setMetadataBlockSize(4096)
.setBlockCache(new ClockCache(512L * 1024L * 1024L)) // 512MiB
2021-05-18 01:10:30 +02:00
.setCacheIndexAndFilterBlocks(true)
2021-07-17 11:52:08 +02:00
.setCacheIndexAndFilterBlocksWithHighPriority(true)
2021-05-18 01:10:30 +02:00
.setPinL0FilterAndIndexBlocksInCache(true)
;
final BloomFilter bloomFilter = new BloomFilter(10, false);
2021-07-23 22:43:24 +02:00
// Disabled this option because it can cause memory corruption
//tableOptions.setOptimizeFiltersForMemory(true);
2021-05-18 01:10:30 +02:00
tableOptions.setFilterPolicy(bloomFilter);
2021-07-17 11:52:08 +02:00
options.setWriteBufferManager(new WriteBufferManager(256L * 1024L * 1024L, new ClockCache(128L * 1024L * 1024L))); // 128MiB
2021-06-25 23:47:53 +02:00
2021-06-27 15:40:56 +02:00
if (databaseOptions.useDirectIO()) {
2021-06-25 23:47:53 +02:00
options
// Option to enable readahead in compaction
// If not set, it will be set to 2MB internally
2021-07-17 11:52:08 +02:00
.setCompactionReadaheadSize(4 * 1024 * 1024) // recommend at least 2MB
2021-06-25 23:47:53 +02:00
// Option to tune write buffer for direct writes
2021-07-17 11:52:08 +02:00
.setWritableFileMaxBufferSize(4 * 1024 * 1024)
2021-06-25 23:47:53 +02:00
;
}
2020-12-07 22:15:18 +01:00
}
2021-07-17 11:52:08 +02:00
if (databaseOptions.useDirectIO()) {
options
.setAllowMmapReads(false)
.setAllowMmapWrites(false)
.setUseDirectReads(true)
;
} else {
options
.setAllowMmapReads(databaseOptions.allowMemoryMapping())
.setAllowMmapWrites(databaseOptions.allowMemoryMapping());
}
if (!databaseOptions.allowMemoryMapping()) {
options.setUseDirectIoForFlushAndCompaction(true);
}
2020-12-07 22:15:18 +01:00
2021-05-04 01:21:29 +02:00
tableOptions.setBlockSize(16 * 1024); // 16MiB
2020-12-07 22:15:18 +01:00
options.setTableFormatConfig(tableOptions);
2021-05-04 01:21:29 +02:00
options.setCompactionPriority(CompactionPriority.MinOverlappingRatio);
2020-12-07 22:15:18 +01:00
return options;
}
private void createIfNotExists(List<ColumnFamilyDescriptor> descriptors,
Options options,
2021-06-27 15:40:56 +02:00
DatabaseOptions databaseOptions,
Path dbPath,
String dbPathString) throws RocksDBException {
2021-06-27 15:40:56 +02:00
if (databaseOptions.inMemory()) {
return;
}
2020-12-07 22:15:18 +01:00
if (Files.notExists(dbPath)) {
// Check if handles are all different
var descriptorsSet = new HashSet<>(descriptors);
if (descriptorsSet.size() != descriptors.size()) {
throw new IllegalArgumentException("Descriptors must be unique!");
}
List<ColumnFamilyDescriptor> descriptorsToCreate = new LinkedList<>(descriptors);
descriptorsToCreate
.removeIf((cf) -> Arrays.equals(cf.getName(), DEFAULT_COLUMN_FAMILY.getName()));
2021-01-30 00:24:55 +01:00
/*
SkipStatsUpdateOnDbOpen = true because this RocksDB.open session is used only to add just some columns
2020-12-07 22:15:18 +01:00
*/
//var dbOptionsFastLoadSlowEdit = options.setSkipStatsUpdateOnDbOpen(true);
LinkedList<ColumnFamilyHandle> handles = new LinkedList<>();
2021-10-21 10:00:39 +02:00
this.db = RocksDB.open(options, dbPathString);
2020-12-07 22:15:18 +01:00
for (ColumnFamilyDescriptor columnFamilyDescriptor : descriptorsToCreate) {
handles.add(db.createColumnFamily(columnFamilyDescriptor));
}
2021-06-27 15:40:56 +02:00
flushAndCloseDb(db, handles);
2021-10-21 10:00:39 +02:00
this.db = null;
2020-12-07 22:15:18 +01:00
}
}
@Override
2021-01-31 15:47:48 +01:00
public Mono<LLLocalSingleton> getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue) {
return Mono
.fromCallable(() -> new LLLocalSingleton(db,
2021-06-19 16:26:54 +02:00
getCfh(singletonListColumnName),
2021-01-31 15:47:48 +01:00
(snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()),
LLLocalKeyValueDatabase.this.name,
name,
dbScheduler,
2021-01-31 15:47:48 +01:00
defaultValue
))
.onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause))
.subscribeOn(dbScheduler);
2020-12-07 22:15:18 +01:00
}
@Override
2021-02-13 01:31:24 +01:00
public Mono<LLLocalDictionary> getDictionary(byte[] columnName, UpdateMode updateMode) {
2021-01-31 15:47:48 +01:00
return Mono
2021-06-26 02:35:33 +02:00
.fromCallable(() -> new LLLocalDictionary(
allocator,
2021-10-20 01:51:34 +02:00
getRocksDBColumn(db, getCfh(columnName)),
2021-06-26 02:35:33 +02:00
name,
Column.toString(columnName),
dbScheduler,
2021-06-26 02:35:33 +02:00
(snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()),
2021-06-29 23:31:02 +02:00
updateMode,
databaseOptions
))
.subscribeOn(dbScheduler);
2020-12-07 22:15:18 +01:00
}
2021-10-20 01:51:34 +02:00
private RocksDBColumn getRocksDBColumn(RocksDB db, ColumnFamilyHandle cfh) {
if (db instanceof OptimisticTransactionDB optimisticTransactionDB) {
return new OptimisticRocksDBColumn(optimisticTransactionDB, databaseOptions, allocator, cfh, meterRegistry);
2021-10-20 01:51:34 +02:00
} else if (db instanceof TransactionDB) {
return new PessimisticRocksDBColumn((TransactionDB) db, databaseOptions, allocator, cfh, meterRegistry);
2021-10-20 01:51:34 +02:00
} else {
return new StandardRocksDBColumn(db, databaseOptions, allocator, cfh, meterRegistry);
2021-10-20 01:51:34 +02:00
}
}
2021-06-19 16:26:54 +02:00
private ColumnFamilyHandle getCfh(byte[] columnName) throws RocksDBException {
ColumnFamilyHandle cfh = handles.get(Column.special(Column.toString(columnName)));
2021-08-28 22:42:51 +02:00
assert enableColumnsBug || Arrays.equals(cfh.getName(), columnName);
2021-06-19 16:26:54 +02:00
return cfh;
}
2021-06-27 15:40:56 +02:00
/**
 * Returns the options this database was configured with.
 */
public DatabaseOptions getDatabaseOptions() {
	return this.databaseOptions;
}
2020-12-07 22:15:18 +01:00
@Override
2021-01-31 15:47:48 +01:00
public Mono<Long> getProperty(String propertyName) {
return Mono.fromCallable(() -> db.getAggregatedLongProperty(propertyName))
2021-09-10 12:13:46 +02:00
.map(val -> {
// Sometimes this two properties return a negative value, I don't know why.
if (Objects.equals(propertyName, "rocksdb.cur-size-all-mem-tables")
|| Objects.equals(propertyName, "rocksdb.size-all-mem-tables")) {
return Math.abs(val);
} else {
return val;
}
})
.onErrorMap(cause -> new IOException("Failed to read " + propertyName, cause))
.subscribeOn(dbScheduler);
2020-12-07 22:15:18 +01:00
}
2021-06-27 15:06:48 +02:00
@Override
public Mono<Void> verifyChecksum() {
return Mono
.<Void>fromCallable(() -> {
db.verifyChecksum();
return null;
})
.onErrorMap(cause -> new IOException("Failed to verify checksum of database \""
+ getDatabaseName() + "\"", cause))
.subscribeOn(dbScheduler);
2021-06-27 15:06:48 +02:00
}
2021-05-03 21:41:51 +02:00
@Override
2021-08-29 23:18:03 +02:00
public BufferAllocator getAllocator() {
2021-05-03 21:41:51 +02:00
return allocator;
}
/**
 * Returns the meter registry given to this database at construction time.
 */
@Override
public MeterRegistry getMeterRegistry() {
	return this.meterRegistry;
}
2020-12-07 22:15:18 +01:00
@Override
2021-01-30 01:42:37 +01:00
public Mono<LLSnapshot> takeSnapshot() {
return Mono
.fromCallable(() -> {
var snapshot = db.getSnapshot();
long currentSnapshotSequenceNumber = nextSnapshotNumbers.getAndIncrement();
this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot);
return new LLSnapshot(currentSnapshotSequenceNumber);
})
.subscribeOn(dbScheduler);
2020-12-07 22:15:18 +01:00
}
@Override
2021-01-30 01:42:37 +01:00
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
return Mono
.<Void>fromCallable(() -> {
2021-01-30 01:42:37 +01:00
Snapshot dbSnapshot = this.snapshotsHandles.remove(snapshot.getSequenceNumber());
if (dbSnapshot == null) {
throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!");
}
db.releaseSnapshot(dbSnapshot);
return null;
})
.subscribeOn(dbScheduler);
2020-12-07 22:15:18 +01:00
}
@Override
2021-01-31 19:52:47 +01:00
public Mono<Void> close() {
return Mono
.<Void>fromCallable(() -> {
try {
flushAndCloseDb(db, new ArrayList<>(handles.values()));
deleteUnusedOldLogFiles();
} catch (RocksDBException e) {
throw new IOException(e);
}
return null;
})
.onErrorMap(cause -> new IOException("Failed to close", cause))
.subscribeOn(dbScheduler);
2020-12-07 22:15:18 +01:00
}
/**
* Call this method ONLY AFTER flushing completely a db and closing it!
*/
2021-01-30 00:24:55 +01:00
@SuppressWarnings("unused")
2020-12-07 22:15:18 +01:00
private void deleteUnusedOldLogFiles() {
Path basePath = dbPath;
try {
Files
.walk(basePath, 1)
.filter(p -> !p.equals(basePath))
.filter(p -> {
var fileName = p.getFileName().toString();
if (fileName.startsWith("LOG.old.")) {
var parts = fileName.split("\\.");
if (parts.length == 3) {
try {
long nameSuffix = Long.parseUnsignedLong(parts[2]);
return true;
} catch (NumberFormatException ex) {
return false;
}
}
}
if (fileName.endsWith(".log")) {
var parts = fileName.split("\\.");
if (parts.length == 2) {
try {
int name = Integer.parseUnsignedInt(parts[0]);
return true;
} catch (NumberFormatException ex) {
return false;
}
}
}
return false;
})
.filter(p -> {
try {
BasicFileAttributes attrs = Files.readAttributes(p, BasicFileAttributes.class);
if (attrs.isRegularFile() && !attrs.isSymbolicLink() && !attrs.isDirectory()) {
long ctime = attrs.creationTime().toMillis();
long atime = attrs.lastAccessTime().toMillis();
long mtime = attrs.lastModifiedTime().toMillis();
long lastTime = Math.max(Math.max(ctime, atime), mtime);
long safeTime;
if (p.getFileName().toString().startsWith("LOG.old.")) {
safeTime = System.currentTimeMillis() - Duration.ofHours(24).toMillis();
} else {
safeTime = System.currentTimeMillis() - Duration.ofHours(12).toMillis();
}
if (lastTime < safeTime) {
return true;
}
}
} catch (IOException ex) {
2021-03-19 20:55:38 +01:00
logger.error("Error when deleting unused log files", ex);
2020-12-07 22:15:18 +01:00
return false;
}
return false;
})
.forEach(path -> {
try {
Files.deleteIfExists(path);
System.out.println("Deleted log file \"" + path + "\"");
} catch (IOException e) {
2021-09-10 12:13:52 +02:00
logger.error(MARKER_ROCKSDB, "Failed to delete log file \"" + path + "\"", e);
2020-12-07 22:15:18 +01:00
}
});
} catch (IOException ex) {
2021-09-10 12:13:52 +02:00
logger.error(MARKER_ROCKSDB, "Failed to delete unused log files", ex);
2020-12-07 22:15:18 +01:00
}
}
}