diff --git a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java index cf2f055..5c82088 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java @@ -66,6 +66,8 @@ public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseS Mono compact(); + Mono flush(); + BufferAllocator getAllocator(); MeterRegistry getMeterRegistry(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index 1b9b295..5c006aa 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; @@ -71,6 +72,7 @@ public sealed abstract class AbstractRocksDBColumn implements private final ColumnFamilyHandle cfh; protected final MeterRegistry meterRegistry; + protected final Lock accessibilityLock; protected final String columnName; protected final DistributionSummary keyBufferSize; @@ -103,7 +105,8 @@ public sealed abstract class AbstractRocksDBColumn implements BufferAllocator alloc, String databaseName, ColumnFamilyHandle cfh, - MeterRegistry meterRegistry) { + MeterRegistry meterRegistry, + Lock accessibilityLock) { this.db = db; this.nettyDirect = nettyDirect && alloc.getAllocationType() == OFF_HEAP; this.alloc = alloc; @@ -116,6 +119,7 @@ public sealed abstract class AbstractRocksDBColumn implements } this.columnName = columnName; this.meterRegistry = meterRegistry; + this.accessibilityLock = accessibilityLock; this.keyBufferSize = DistributionSummary .builder("buffer.size.distribution") @@ -880,12 +884,14 @@ public sealed abstract class AbstractRocksDBColumn implements try { keyBufferSize.record(key.readableBytes()); startedUpdate.increment(); + accessibilityLock.lock(); return updateAtomicImpl(readOptions, writeOptions, key, updater, returnMode); } catch (IOException e) { throw e; } catch (Exception e) { throw new IOException(e); } finally { + accessibilityLock.unlock(); endedUpdate.increment(); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/HugePqEnv.java b/src/main/java/it/cavallium/dbengine/database/disk/HugePqEnv.java index 3f74a0a..8498852 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/HugePqEnv.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/HugePqEnv.java @@ -11,6 +11,8 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Objects; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.rocksdb.AbstractComparator; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; @@ -74,7 +76,8 @@ public class HugePqEnv implements Closeable { BufferAllocator.offHeapPooled(), db.getName(), cfh, - new CompositeMeterRegistry() + new CompositeMeterRegistry(), + new ReentrantReadWriteLock().readLock() ); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 091fb9f..9d6687c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.database.disk; +import static com.google.common.collect.Lists.partition; import static io.netty5.buffer.api.StandardAllocationTypes.OFF_HEAP; import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; import static org.rocksdb.ColumnFamilyOptionsInterface.DEFAULT_COMPACTION_MEMTABLE_MEMORY_BUDGET; @@ -43,9 +44,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.commons.lang3.time.StopWatch; import org.apache.logging.log4j.LogManager; @@ -60,6 +63,7 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.CompactRangeOptions; import org.rocksdb.CompactRangeOptions.BottommostLevelCompaction; +import org.rocksdb.CompactionJobInfo; import org.rocksdb.CompactionOptions; import org.rocksdb.CompactionPriority; import org.rocksdb.CompressionOptions; @@ -73,11 +77,13 @@ import org.rocksdb.IndexType; import org.rocksdb.InfoLogLevel; import org.rocksdb.IngestExternalFileOptions; import org.rocksdb.LRUCache; +import org.rocksdb.LevelMetaData; import org.rocksdb.OptimisticTransactionDB; import org.rocksdb.PersistentCache; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.Snapshot; +import org.rocksdb.SstFileMetaData; import org.rocksdb.TransactionDB; import org.rocksdb.TransactionDBOptions; import org.rocksdb.TxnDBWritePolicy; @@ -208,8 +214,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { if (isDisableAutoCompactions()) { columnFamilyOptions.setDisableAutoCompactions(true); - columnFamilyOptions.setHardPendingCompactionBytesLimit(Long.MAX_VALUE); - columnFamilyOptions.setSoftPendingCompactionBytesLimit(Long.MAX_VALUE); } boolean dynamicLevelBytes; // This option is not supported with multiple db paths @@ -236,9 +240,11 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { // Higher values speed up writes, but slow down reads columnFamilyOptions.setLevel0FileNumCompactionTrigger(2); } - if (Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.disableslowdown", "false"))) { + if (isDisableSlowdown()) { columnFamilyOptions.setLevel0SlowdownWritesTrigger(-1); columnFamilyOptions.setLevel0StopWritesTrigger(Integer.MAX_VALUE); + columnFamilyOptions.setHardPendingCompactionBytesLimit(Long.MAX_VALUE); + columnFamilyOptions.setSoftPendingCompactionBytesLimit(Long.MAX_VALUE); } else { // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html columnFamilyOptions.setLevel0SlowdownWritesTrigger(20); @@ -500,10 +506,15 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { registerGauge(meterRegistry, name, "rocksdb.bloom.filter.full.true.positive", false); } - private boolean isDisableAutoCompactions() { + public static boolean isDisableAutoCompactions() { return Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.compactions.auto.disable", "false")); } + public static boolean isDisableSlowdown() { + return isDisableAutoCompactions() + || Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.disableslowdown", "false")); + } + private synchronized PersistentCache resolvePersistentCache(HashMap caches, DBOptions rocksdbOptions, List persistentCaches, @@ -551,38 +562,56 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { public void forceCompaction(int volumeId) throws RocksDBException { try (var co = new CompactionOptions() - .setCompression(CompressionType.LZ4HC_COMPRESSION) - .setMaxSubcompactions(Runtime.getRuntime().availableProcessors()) + .setCompression(CompressionType.LZ4_COMPRESSION) + .setMaxSubcompactions(0) .setOutputFileSizeLimit(2 * SizeUnit.GB)) { for (ColumnFamilyHandle cfh : this.handles.values()) { List files = new ArrayList<>(); - var meta = db.getLiveFilesMetaData(); + var meta = db.getColumnFamilyMetaData(cfh); int bottommostLevel = -1; - for (var level : meta) { + for (LevelMetaData level : meta.levels()) { bottommostLevel = Math.max(bottommostLevel, level.level()); } - int count = 0; - for (var file : meta) { - if (file.fileName().endsWith(".sst") && Arrays.equals(cfh.getName(), file.columnFamilyName())) { - files.add(file.fileName()); - count++; - if (count >= 2) { - break; + for (LevelMetaData level : meta.levels()) { + if (level.level() < bottommostLevel) { + for (SstFileMetaData file : level.files()) { + if (file.fileName().endsWith(".sst")) { + files.add(file.fileName()); + } } } } bottommostLevel = Math.max(bottommostLevel, databaseOptions.defaultColumnOptions().levels().size() - 1); - /*if (!files.isEmpty() && bottommostLevel != -1) { - db.compactFiles(co, cfh, files, bottommostLevel, volumeId, null); - }*/ - try (var co2 = new CompactRangeOptions() - .setAllowWriteStall(true) - .setChangeLevel(true) - .setTargetLevel(bottommostLevel) - .setMaxSubcompactions(Runtime.getRuntime().availableProcessors()) - .setExclusiveManualCompaction(true) - .setBottommostLevelCompaction(BottommostLevelCompaction.kSkip)) { - db.compactRange(cfh, null, null, co2); + + if (!files.isEmpty() && bottommostLevel != -1) { + var partitionSize = files.size() / Runtime.getRuntime().availableProcessors(); + List> partitions; + if (partitionSize > 0) { + partitions = partition(files, files.size() / Runtime.getRuntime().availableProcessors()); + } else { + partitions = List.of(files); + } + int finalBottommostLevel = bottommostLevel; + Mono.when(partitions.stream().map(partition -> Mono.fromCallable(() -> { + logger.info("Compacting {} files in database {} in column family {} to level {}", + partition.size(), + name, + new String(cfh.getName(), StandardCharsets.UTF_8), + finalBottommostLevel + ); + if (!partition.isEmpty()) { + var coi = new CompactionJobInfo(); + db.compactFiles(co, cfh, partition, finalBottommostLevel, volumeId, coi); + logger.info("Compacted {} files in database {} in column family {} to level {}: {}", + partition.size(), + name, + new String(cfh.getName(), StandardCharsets.UTF_8), + finalBottommostLevel, + coi.status().getCodeString() + ); + } + return null; + }).subscribeOn(Schedulers.boundedElastic())).toList()).block(); } } } @@ -631,6 +660,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return name; } + public Lock getAccessibilityLock() { + return shutdownLock.readLock(); + } + private void flushAndCloseDb(RocksDB db, Cache standardCache, Cache compressedCache, List handles) throws RocksDBException { var shutdownWriteLock = shutdownLock.writeLock(); @@ -792,6 +825,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { writeBufferManagerSize = 0; } + if (isDisableAutoCompactions()) { + options.setMaxBackgroundCompactions(0); + options.setMaxBackgroundJobs(0); + } else { + options.setMaxBackgroundJobs(Integer.parseInt(System.getProperty("it.cavallium.dbengine.jobs.background.num", "2"))); + } + Cache blockCache; Cache compressedCache; if (databaseOptions.lowMemory()) { @@ -997,12 +1037,27 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { private RocksDBColumn getRocksDBColumn(RocksDB db, ColumnFamilyHandle cfh) { var nettyDirect = databaseOptions.allowNettyDirect(); + var accessibilityLock = getAccessibilityLock(); if (db instanceof OptimisticTransactionDB optimisticTransactionDB) { - return new OptimisticRocksDBColumn(optimisticTransactionDB, nettyDirect, allocator, name, cfh, meterRegistry); + return new OptimisticRocksDBColumn(optimisticTransactionDB, + nettyDirect, + allocator, + name, + cfh, + meterRegistry, + accessibilityLock + ); } else if (db instanceof TransactionDB transactionDB) { - return new PessimisticRocksDBColumn(transactionDB, nettyDirect, allocator, name, cfh, meterRegistry); + return new PessimisticRocksDBColumn(transactionDB, + nettyDirect, + allocator, + name, + cfh, + meterRegistry, + accessibilityLock + ); } else { - return new StandardRocksDBColumn(db, nettyDirect, allocator, name, cfh, meterRegistry); + return new StandardRocksDBColumn(db, nettyDirect, allocator, name, cfh, meterRegistry, accessibilityLock); } } @@ -1146,6 +1201,16 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { }).subscribeOn(dbWScheduler); } + @Override + public Mono flush() { + return Mono.fromCallable(() -> { + try (var fo = new FlushOptions().setWaitForFlush(true)) { + this.flush(fo); + return null; + } + }).subscribeOn(dbWScheduler); + } + @Override public BufferAllocator getAllocator() { return allocator; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java index ad1eda6..a3679f9 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java @@ -10,11 +10,10 @@ import io.netty5.buffer.api.MemoryManager; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLUtils; -import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.lucene.ExponentialPageLimits; -import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; import java.io.IOException; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.LockSupport; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -39,8 +38,9 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn boolean nettyDirect, BufferAllocator alloc, String dbName, - ColumnFamilyHandle cfh, MeterRegistry meterRegistry) { - super(db, nettyDirect, alloc, dbName, cfh, meterRegistry); + ColumnFamilyHandle cfh, MeterRegistry meterRegistry, Lock accessibilityLock) { + super(db, nettyDirect, alloc, dbName, cfh, meterRegistry, accessibilityLock); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java index bf790cf..e0482bf 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java @@ -120,6 +120,11 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase { return Mono.empty(); } + @Override + public Mono flush() { + return Mono.empty(); + } + @Override public BufferAllocator getAllocator() { return allocator; diff --git a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java index a1bdd9a..851f0e0 100644 --- a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java @@ -351,6 +351,11 @@ public class LLQuicConnection implements LLDatabaseConnection { return null; } + @Override + public Mono flush() { + return null; + } + @Override public BufferAllocator getAllocator() { return allocator;