Flush API, accessibility lock, better manual compaction

This commit is contained in:
Andrea Cavalli 2022-04-28 23:23:26 +02:00
parent e7718a8370
commit 9d16ccdd9e
9 changed files with 129 additions and 45 deletions

View File

@ -66,6 +66,8 @@ public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseS
Mono<Void> compact();
Mono<Void> flush();
BufferAllocator getAllocator();
MeterRegistry getMeterRegistry();

View File

@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
@ -71,6 +72,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
private final ColumnFamilyHandle cfh;
protected final MeterRegistry meterRegistry;
protected final Lock accessibilityLock;
protected final String columnName;
protected final DistributionSummary keyBufferSize;
@ -103,7 +105,8 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
BufferAllocator alloc,
String databaseName,
ColumnFamilyHandle cfh,
MeterRegistry meterRegistry) {
MeterRegistry meterRegistry,
Lock accessibilityLock) {
this.db = db;
this.nettyDirect = nettyDirect && alloc.getAllocationType() == OFF_HEAP;
this.alloc = alloc;
@ -116,6 +119,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
}
this.columnName = columnName;
this.meterRegistry = meterRegistry;
this.accessibilityLock = accessibilityLock;
this.keyBufferSize = DistributionSummary
.builder("buffer.size.distribution")
@ -880,12 +884,14 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
try {
keyBufferSize.record(key.readableBytes());
startedUpdate.increment();
accessibilityLock.lock();
return updateAtomicImpl(readOptions, writeOptions, key, updater, returnMode);
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IOException(e);
} finally {
accessibilityLock.unlock();
endedUpdate.increment();
}
}

View File

@ -11,6 +11,8 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.rocksdb.AbstractComparator;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
@ -74,7 +76,8 @@ public class HugePqEnv implements Closeable {
BufferAllocator.offHeapPooled(),
db.getName(),
cfh,
new CompositeMeterRegistry()
new CompositeMeterRegistry(),
new ReentrantReadWriteLock().readLock()
);
}
}

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine.database.disk;
import static com.google.common.collect.Lists.partition;
import static io.netty5.buffer.api.StandardAllocationTypes.OFF_HEAP;
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import static org.rocksdb.ColumnFamilyOptionsInterface.DEFAULT_COMPACTION_MEMTABLE_MEMORY_BUDGET;
@ -43,9 +44,11 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
@ -60,6 +63,7 @@ import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactRangeOptions;
import org.rocksdb.CompactRangeOptions.BottommostLevelCompaction;
import org.rocksdb.CompactionJobInfo;
import org.rocksdb.CompactionOptions;
import org.rocksdb.CompactionPriority;
import org.rocksdb.CompressionOptions;
@ -73,11 +77,13 @@ import org.rocksdb.IndexType;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.LRUCache;
import org.rocksdb.LevelMetaData;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.PersistentCache;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;
import org.rocksdb.SstFileMetaData;
import org.rocksdb.TransactionDB;
import org.rocksdb.TransactionDBOptions;
import org.rocksdb.TxnDBWritePolicy;
@ -208,8 +214,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
if (isDisableAutoCompactions()) {
columnFamilyOptions.setDisableAutoCompactions(true);
columnFamilyOptions.setHardPendingCompactionBytesLimit(Long.MAX_VALUE);
columnFamilyOptions.setSoftPendingCompactionBytesLimit(Long.MAX_VALUE);
}
boolean dynamicLevelBytes;
// This option is not supported with multiple db paths
@ -236,9 +240,11 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
// Higher values speed up writes, but slow down reads
columnFamilyOptions.setLevel0FileNumCompactionTrigger(2);
}
if (Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.disableslowdown", "false"))) {
if (isDisableSlowdown()) {
columnFamilyOptions.setLevel0SlowdownWritesTrigger(-1);
columnFamilyOptions.setLevel0StopWritesTrigger(Integer.MAX_VALUE);
columnFamilyOptions.setHardPendingCompactionBytesLimit(Long.MAX_VALUE);
columnFamilyOptions.setSoftPendingCompactionBytesLimit(Long.MAX_VALUE);
} else {
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
columnFamilyOptions.setLevel0SlowdownWritesTrigger(20);
@ -500,10 +506,15 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
registerGauge(meterRegistry, name, "rocksdb.bloom.filter.full.true.positive", false);
}
private boolean isDisableAutoCompactions() {
public static boolean isDisableAutoCompactions() {
return Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.compactions.auto.disable", "false"));
}
public static boolean isDisableSlowdown() {
return isDisableAutoCompactions()
|| Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.disableslowdown", "false"));
}
private synchronized PersistentCache resolvePersistentCache(HashMap<String, PersistentCache> caches,
DBOptions rocksdbOptions,
List<it.cavallium.dbengine.rpc.current.data.PersistentCache> persistentCaches,
@ -551,38 +562,56 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
public void forceCompaction(int volumeId) throws RocksDBException {
try (var co = new CompactionOptions()
.setCompression(CompressionType.LZ4HC_COMPRESSION)
.setMaxSubcompactions(Runtime.getRuntime().availableProcessors())
.setCompression(CompressionType.LZ4_COMPRESSION)
.setMaxSubcompactions(0)
.setOutputFileSizeLimit(2 * SizeUnit.GB)) {
for (ColumnFamilyHandle cfh : this.handles.values()) {
List<String> files = new ArrayList<>();
var meta = db.getLiveFilesMetaData();
var meta = db.getColumnFamilyMetaData(cfh);
int bottommostLevel = -1;
for (var level : meta) {
for (LevelMetaData level : meta.levels()) {
bottommostLevel = Math.max(bottommostLevel, level.level());
}
int count = 0;
for (var file : meta) {
if (file.fileName().endsWith(".sst") && Arrays.equals(cfh.getName(), file.columnFamilyName())) {
files.add(file.fileName());
count++;
if (count >= 2) {
break;
for (LevelMetaData level : meta.levels()) {
if (level.level() < bottommostLevel) {
for (SstFileMetaData file : level.files()) {
if (file.fileName().endsWith(".sst")) {
files.add(file.fileName());
}
}
}
}
bottommostLevel = Math.max(bottommostLevel, databaseOptions.defaultColumnOptions().levels().size() - 1);
/*if (!files.isEmpty() && bottommostLevel != -1) {
db.compactFiles(co, cfh, files, bottommostLevel, volumeId, null);
}*/
try (var co2 = new CompactRangeOptions()
.setAllowWriteStall(true)
.setChangeLevel(true)
.setTargetLevel(bottommostLevel)
.setMaxSubcompactions(Runtime.getRuntime().availableProcessors())
.setExclusiveManualCompaction(true)
.setBottommostLevelCompaction(BottommostLevelCompaction.kSkip)) {
db.compactRange(cfh, null, null, co2);
if (!files.isEmpty() && bottommostLevel != -1) {
var partitionSize = files.size() / Runtime.getRuntime().availableProcessors();
List<List<String>> partitions;
if (partitionSize > 0) {
partitions = partition(files, files.size() / Runtime.getRuntime().availableProcessors());
} else {
partitions = List.of(files);
}
int finalBottommostLevel = bottommostLevel;
Mono.when(partitions.stream().map(partition -> Mono.<Void>fromCallable(() -> {
logger.info("Compacting {} files in database {} in column family {} to level {}",
partition.size(),
name,
new String(cfh.getName(), StandardCharsets.UTF_8),
finalBottommostLevel
);
if (!partition.isEmpty()) {
var coi = new CompactionJobInfo();
db.compactFiles(co, cfh, partition, finalBottommostLevel, volumeId, coi);
logger.info("Compacted {} files in database {} in column family {} to level {}: {}",
partition.size(),
name,
new String(cfh.getName(), StandardCharsets.UTF_8),
finalBottommostLevel,
coi.status().getCodeString()
);
}
return null;
}).subscribeOn(Schedulers.boundedElastic())).toList()).block();
}
}
}
@ -631,6 +660,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return name;
}
public Lock getAccessibilityLock() {
return shutdownLock.readLock();
}
private void flushAndCloseDb(RocksDB db, Cache standardCache, Cache compressedCache, List<ColumnFamilyHandle> handles)
throws RocksDBException {
var shutdownWriteLock = shutdownLock.writeLock();
@ -792,6 +825,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
writeBufferManagerSize = 0;
}
if (isDisableAutoCompactions()) {
options.setMaxBackgroundCompactions(0);
options.setMaxBackgroundJobs(0);
} else {
options.setMaxBackgroundJobs(Integer.parseInt(System.getProperty("it.cavallium.dbengine.jobs.background.num", "2")));
}
Cache blockCache;
Cache compressedCache;
if (databaseOptions.lowMemory()) {
@ -997,12 +1037,27 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
private RocksDBColumn getRocksDBColumn(RocksDB db, ColumnFamilyHandle cfh) {
var nettyDirect = databaseOptions.allowNettyDirect();
var accessibilityLock = getAccessibilityLock();
if (db instanceof OptimisticTransactionDB optimisticTransactionDB) {
return new OptimisticRocksDBColumn(optimisticTransactionDB, nettyDirect, allocator, name, cfh, meterRegistry);
return new OptimisticRocksDBColumn(optimisticTransactionDB,
nettyDirect,
allocator,
name,
cfh,
meterRegistry,
accessibilityLock
);
} else if (db instanceof TransactionDB transactionDB) {
return new PessimisticRocksDBColumn(transactionDB, nettyDirect, allocator, name, cfh, meterRegistry);
return new PessimisticRocksDBColumn(transactionDB,
nettyDirect,
allocator,
name,
cfh,
meterRegistry,
accessibilityLock
);
} else {
return new StandardRocksDBColumn(db, nettyDirect, allocator, name, cfh, meterRegistry);
return new StandardRocksDBColumn(db, nettyDirect, allocator, name, cfh, meterRegistry, accessibilityLock);
}
}
@ -1146,6 +1201,16 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
}).subscribeOn(dbWScheduler);
}
@Override
public Mono<Void> flush() {
return Mono.<Void>fromCallable(() -> {
try (var fo = new FlushOptions().setWaitForFlush(true)) {
this.flush(fo);
return null;
}
}).subscribeOn(dbWScheduler);
}
@Override
public BufferAllocator getAllocator() {
return allocator;

View File

@ -10,11 +10,10 @@ import io.netty5.buffer.api.MemoryManager;
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.lucene.ExponentialPageLimits;
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -39,8 +38,9 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
BufferAllocator alloc,
String databaseName,
ColumnFamilyHandle cfh,
MeterRegistry meterRegistry) {
super(db, nettyDirect, alloc, databaseName, cfh, meterRegistry);
MeterRegistry meterRegistry,
Lock accessibilityLock) {
super(db, nettyDirect, alloc, databaseName, cfh, meterRegistry, accessibilityLock);
this.optimisticAttempts = DistributionSummary
.builder("db.optimistic.attempts.distribution")
.publishPercentiles(0.2, 0.5, 0.95)

View File

@ -9,9 +9,8 @@ import io.netty5.buffer.api.MemoryManager;
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
import java.io.IOException;
import java.util.concurrent.locks.Lock;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
@ -31,8 +30,10 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
boolean nettyDirect,
BufferAllocator alloc,
String dbName,
ColumnFamilyHandle cfh, MeterRegistry meterRegistry) {
super(db, nettyDirect, alloc, dbName, cfh, meterRegistry);
ColumnFamilyHandle cfh,
MeterRegistry meterRegistry,
Lock accessibilityLock) {
super(db, nettyDirect, alloc, dbName, cfh, meterRegistry, accessibilityLock);
}
@Override

View File

@ -5,18 +5,15 @@ import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import io.micrometer.core.instrument.MeterRegistry;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
import java.io.IOException;
import java.util.concurrent.locks.Lock;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Transaction;
import org.rocksdb.WriteOptions;
@ -26,8 +23,8 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB>
boolean nettyDirect,
BufferAllocator alloc,
String dbName,
ColumnFamilyHandle cfh, MeterRegistry meterRegistry) {
super(db, nettyDirect, alloc, dbName, cfh, meterRegistry);
ColumnFamilyHandle cfh, MeterRegistry meterRegistry, Lock accessibilityLock) {
super(db, nettyDirect, alloc, dbName, cfh, meterRegistry, accessibilityLock);
}
@Override

View File

@ -120,6 +120,11 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase {
return Mono.empty();
}
@Override
public Mono<Void> flush() {
return Mono.empty();
}
@Override
public BufferAllocator getAllocator() {
return allocator;

View File

@ -351,6 +351,11 @@ public class LLQuicConnection implements LLDatabaseConnection {
return null;
}
@Override
public Mono<Void> flush() {
return null;
}
@Override
public BufferAllocator getAllocator() {
return allocator;