Secure database shutdown, deduplicate compaction script

This commit is contained in:
Andrea Cavalli 2022-04-30 01:49:44 +02:00
parent 9d16ccdd9e
commit e962ae6336
7 changed files with 596 additions and 559 deletions

View File

@ -23,14 +23,18 @@ import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithRelease; import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithRelease;
import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithoutRelease; import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithoutRelease;
import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.rpc.current.data.Column;
import it.cavallium.dbengine.rpc.current.data.NamedColumnOptions;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.StampedLock;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
@ -72,7 +76,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
private final ColumnFamilyHandle cfh; private final ColumnFamilyHandle cfh;
protected final MeterRegistry meterRegistry; protected final MeterRegistry meterRegistry;
protected final Lock accessibilityLock; protected final StampedLock closeLock;
protected final String columnName; protected final String columnName;
protected final DistributionSummary keyBufferSize; protected final DistributionSummary keyBufferSize;
@ -106,7 +110,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
String databaseName, String databaseName,
ColumnFamilyHandle cfh, ColumnFamilyHandle cfh,
MeterRegistry meterRegistry, MeterRegistry meterRegistry,
Lock accessibilityLock) { StampedLock closeLock) {
this.db = db; this.db = db;
this.nettyDirect = nettyDirect && alloc.getAllocationType() == OFF_HEAP; this.nettyDirect = nettyDirect && alloc.getAllocationType() == OFF_HEAP;
this.alloc = alloc; this.alloc = alloc;
@ -119,7 +123,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} }
this.columnName = columnName; this.columnName = columnName;
this.meterRegistry = meterRegistry; this.meterRegistry = meterRegistry;
this.accessibilityLock = accessibilityLock; this.closeLock = closeLock;
this.keyBufferSize = DistributionSummary this.keyBufferSize = DistributionSummary
.builder("buffer.size.distribution") .builder("buffer.size.distribution")
@ -327,21 +331,26 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
return cfh; return cfh;
} }
@Override protected void ensureOpen() {
public @Nullable Buffer get(@NotNull ReadOptions readOptions, Buffer key)
throws RocksDBException {
if (Schedulers.isInNonBlockingThread()) { if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called dbGet in a nonblocking thread"); throw new UnsupportedOperationException("Called in a nonblocking thread");
} }
if (!db.isOwningHandle()) { ensureOwned(db);
throw new IllegalStateException("Database is closed"); ensureOwned(cfh);
} }
if (!readOptions.isOwningHandle()) {
throw new IllegalStateException("ReadOptions is closed"); protected void ensureOwned(org.rocksdb.RocksObject rocksObject) {
if (!rocksObject.isOwningHandle()) {
throw new IllegalStateException("Not owning handle");
} }
if (!cfh.isOwningHandle()) {
throw new IllegalStateException("Column family is closed");
} }
@Override
public @Nullable Buffer get(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException {
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
ensureOwned(readOptions);
keyBufferSize.record(key.readableBytes()); keyBufferSize.record(key.readableBytes());
int readAttemptsCount = 0; int readAttemptsCount = 0;
try { try {
@ -372,7 +381,8 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
assert resultBuffer.writerOffset() == 0; assert resultBuffer.writerOffset() == 0;
var resultWritable = ((WritableComponent) resultBuffer).writableBuffer(); var resultWritable = ((WritableComponent) resultBuffer).writableBuffer();
var keyMayExist = db.keyMayExist(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear()); var keyMayExist = db.keyMayExist(cfh, readOptions, keyNioBuffer.rewind(),
resultWritable.clear());
KeyMayExistEnum keyMayExistState = keyMayExist.exists; KeyMayExistEnum keyMayExistState = keyMayExist.exists;
int keyMayExistValueLength = keyMayExist.valueLength; int keyMayExistValueLength = keyMayExist.valueLength;
// At the beginning, size reflects the expected size, then it becomes the real data size // At the beginning, size reflects the expected size, then it becomes the real data size
@ -492,23 +502,18 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} finally { } finally {
readAttempts.record(readAttemptsCount); readAttempts.record(readAttemptsCount);
} }
} finally {
closeLock.unlockRead(closeReadLock);
}
} }
@Override @Override
public void put(@NotNull WriteOptions writeOptions, Buffer key, Buffer value) throws RocksDBException { public void put(@NotNull WriteOptions writeOptions, Buffer key, Buffer value) throws RocksDBException {
var closeReadLock = closeLock.readLock();
try { try {
if (Schedulers.isInNonBlockingThread()) { try {
throw new UnsupportedOperationException("Called dbPut in a nonblocking thread"); ensureOpen();
} ensureOwned(writeOptions);
if (!db.isOwningHandle()) {
throw new IllegalStateException("Database is closed");
}
if (!writeOptions.isOwningHandle()) {
throw new IllegalStateException("WriteOptions is closed");
}
if (!cfh.isOwningHandle()) {
throw new IllegalStateException("Column family is closed");
}
assert key.isAccessible(); assert key.isAccessible();
assert value.isAccessible(); assert value.isAccessible();
this.keyBufferSize.record(key.readableBytes()); this.keyBufferSize.record(key.readableBytes());
@ -570,22 +575,17 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
writeOptions.close(); writeOptions.close();
} }
} }
} finally {
closeLock.unlockRead(closeReadLock);
}
} }
@Override @Override
public boolean exists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException { public boolean exists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException {
if (Schedulers.isInNonBlockingThread()) { var closeReadLock = closeLock.readLock();
throw new UnsupportedOperationException("Called containsKey in a nonblocking thread"); try {
} ensureOpen();
if (!db.isOwningHandle()) { ensureOwned(readOptions);
throw new IllegalStateException("Database is closed");
}
if (!readOptions.isOwningHandle()) {
throw new IllegalStateException("ReadOptions is closed");
}
if (!cfh.isOwningHandle()) {
throw new IllegalStateException("Column family is closed");
}
if (nettyDirect) { if (nettyDirect) {
// Get the key nio buffer to pass to RocksDB // Get the key nio buffer to pass to RocksDB
ByteBuffer keyNioBuffer; ByteBuffer keyNioBuffer;
@ -655,22 +655,17 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} }
return found; return found;
} }
} finally {
closeLock.unlockRead(closeReadLock);
}
} }
@Override @Override
public boolean mayExists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException { public boolean mayExists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException {
if (Schedulers.isInNonBlockingThread()) { var closeReadLock = closeLock.readLock();
throw new UnsupportedOperationException("Called containsKey in a nonblocking thread"); try {
} ensureOpen();
if (!db.isOwningHandle()) { ensureOwned(readOptions);
throw new IllegalStateException("Database is closed");
}
if (!readOptions.isOwningHandle()) {
throw new IllegalStateException("ReadOptions is closed");
}
if (!cfh.isOwningHandle()) {
throw new IllegalStateException("Column family is closed");
}
if (nettyDirect) { if (nettyDirect) {
// Get the key nio buffer to pass to RocksDB // Get the key nio buffer to pass to RocksDB
ByteBuffer keyNioBuffer; ByteBuffer keyNioBuffer;
@ -706,19 +701,17 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} }
} }
} }
} finally {
closeLock.unlockRead(closeReadLock);
}
} }
@Override @Override
public void delete(WriteOptions writeOptions, Buffer key) throws RocksDBException { public void delete(WriteOptions writeOptions, Buffer key) throws RocksDBException {
if (!db.isOwningHandle()) { var closeReadLock = closeLock.readLock();
throw new IllegalStateException("Database is closed"); try {
} ensureOpen();
if (!writeOptions.isOwningHandle()) { ensureOwned(writeOptions);
throw new IllegalStateException("WriteOptions is closed");
}
if (!cfh.isOwningHandle()) {
throw new IllegalStateException("Column family is closed");
}
keyBufferSize.record(key.readableBytes()); keyBufferSize.record(key.readableBytes());
if (nettyDirect) { if (nettyDirect) {
// Get the key nio buffer to pass to RocksDB // Get the key nio buffer to pass to RocksDB
@ -748,115 +741,108 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} else { } else {
db.delete(cfh, writeOptions, LLUtils.toArray(key)); db.delete(cfh, writeOptions, LLUtils.toArray(key));
} }
} finally {
closeLock.unlockRead(closeReadLock);
}
} }
@Override @Override
public void delete(WriteOptions writeOptions, byte[] key) throws RocksDBException { public void delete(WriteOptions writeOptions, byte[] key) throws RocksDBException {
if (!db.isOwningHandle()) { var closeReadLock = closeLock.readLock();
throw new IllegalStateException("Database is closed"); try {
} ensureOpen();
if (!writeOptions.isOwningHandle()) { ensureOwned(writeOptions);
throw new IllegalStateException("WriteOptions is closed");
}
if (!cfh.isOwningHandle()) {
throw new IllegalStateException("Column family is closed");
}
keyBufferSize.record(key.length); keyBufferSize.record(key.length);
db.delete(cfh, writeOptions, key); db.delete(cfh, writeOptions, key);
} finally {
closeLock.unlockRead(closeReadLock);
}
} }
@Override @Override
public List<byte[]> multiGetAsList(ReadOptions readOptions, List<byte[]> keys) throws RocksDBException { public List<byte[]> multiGetAsList(ReadOptions readOptions, List<byte[]> keys) throws RocksDBException {
if (!db.isOwningHandle()) { var closeReadLock = closeLock.readLock();
throw new IllegalStateException("Database is closed"); try {
} ensureOpen();
if (!readOptions.isOwningHandle()) { ensureOwned(readOptions);
throw new IllegalStateException("ReadOptions is closed");
}
if (!cfh.isOwningHandle()) {
throw new IllegalStateException("Column family is closed");
}
for (byte[] key : keys) { for (byte[] key : keys) {
keyBufferSize.record(key.length); keyBufferSize.record(key.length);
} }
var columnFamilyHandles = new RepeatedElementList<>(cfh, keys.size()); var columnFamilyHandles = new RepeatedElementList<>(cfh, keys.size());
return db.multiGetAsList(readOptions, columnFamilyHandles, keys); return db.multiGetAsList(readOptions, columnFamilyHandles, keys);
} finally {
closeLock.unlockRead(closeReadLock);
}
} }
@Override @Override
public void suggestCompactRange() throws RocksDBException { public void suggestCompactRange() throws RocksDBException {
if (!db.isOwningHandle()) { var closeReadLock = closeLock.readLock();
throw new IllegalStateException("Database is closed"); try {
} ensureOpen();
if (!cfh.isOwningHandle()) {
throw new IllegalStateException("Column family is closed");
}
db.suggestCompactRange(cfh); db.suggestCompactRange(cfh);
} finally {
closeLock.unlockRead(closeReadLock);
}
} }
@Override @Override
public void compactRange(byte[] begin, byte[] end, CompactRangeOptions options) public void compactRange(byte[] begin, byte[] end, CompactRangeOptions options) throws RocksDBException {
throws RocksDBException { var closeReadLock = closeLock.readLock();
if (!db.isOwningHandle()) { try {
throw new IllegalStateException("Database is closed"); ensureOpen();
} ensureOwned(options);
if (!options.isOwningHandle()) {
throw new IllegalStateException("CompactRangeOptions is closed");
}
if (!cfh.isOwningHandle()) {
throw new IllegalStateException("Column family is closed");
}
db.compactRange(cfh, begin, end, options); db.compactRange(cfh, begin, end, options);
} finally {
closeLock.unlockRead(closeReadLock);
}
} }
@Override @Override
public void flush(FlushOptions options) throws RocksDBException { public void flush(FlushOptions options) throws RocksDBException {
if (!db.isOwningHandle()) { var closeReadLock = closeLock.readLock();
throw new IllegalStateException("Database is closed"); try {
} ensureOpen();
if (!options.isOwningHandle()) { ensureOwned(options);
throw new IllegalStateException("FlushOptions is closed");
}
if (!cfh.isOwningHandle()) {
throw new IllegalStateException("Column family is closed");
}
db.flush(options, cfh); db.flush(options, cfh);
} finally {
closeLock.unlockRead(closeReadLock);
}
} }
@Override @Override
public void flushWal(boolean sync) throws RocksDBException { public void flushWal(boolean sync) throws RocksDBException {
if (!db.isOwningHandle()) { var closeReadLock = closeLock.readLock();
throw new IllegalStateException("Database is closed"); try {
} ensureOpen();
if (!cfh.isOwningHandle()) {
throw new IllegalStateException("Column family is closed");
}
db.flushWal(sync); db.flushWal(sync);
} finally {
closeLock.unlockRead(closeReadLock);
}
} }
@Override @Override
public long getLongProperty(String property) throws RocksDBException { public long getLongProperty(String property) throws RocksDBException {
if (!db.isOwningHandle()) { var closeReadLock = closeLock.readLock();
throw new IllegalStateException("Database is closed"); try {
} ensureOpen();
if (!cfh.isOwningHandle()) {
throw new IllegalStateException("Column family is closed");
}
return db.getLongProperty(cfh, property); return db.getLongProperty(cfh, property);
} finally {
closeLock.unlockRead(closeReadLock);
}
} }
@Override @Override
public void write(WriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException { public void write(WriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException {
if (!db.isOwningHandle()) { var closeReadLock = closeLock.readLock();
throw new IllegalStateException("Database is closed"); try {
} ensureOpen();
if (!writeOptions.isOwningHandle()) { ensureOwned(writeOptions);
throw new IllegalStateException("WriteOptions is closed"); ensureOwned(writeBatch);
}
if (!cfh.isOwningHandle()) {
throw new IllegalStateException("Column family is closed");
}
db.write(writeOptions, writeBatch); db.write(writeOptions, writeBatch);
} finally {
closeLock.unlockRead(closeReadLock);
}
} }
/** /**
@ -872,28 +858,24 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
Buffer key, Buffer key,
BinarySerializationFunction updater, BinarySerializationFunction updater,
UpdateAtomicResultMode returnMode) throws IOException { UpdateAtomicResultMode returnMode) throws IOException {
if (!db.isOwningHandle()) { var closeReadLock = closeLock.readLock();
throw new IllegalStateException("Database is closed"); try {
} ensureOpen();
if (!readOptions.isOwningHandle()) { ensureOwned(readOptions);
throw new IllegalStateException("ReadOptions is closed");
}
if (!cfh.isOwningHandle()) {
throw new IllegalStateException("Column family is closed");
}
try { try {
keyBufferSize.record(key.readableBytes()); keyBufferSize.record(key.readableBytes());
startedUpdate.increment(); startedUpdate.increment();
accessibilityLock.lock();
return updateAtomicImpl(readOptions, writeOptions, key, updater, returnMode); return updateAtomicImpl(readOptions, writeOptions, key, updater, returnMode);
} catch (IOException e) { } catch (IOException e) {
throw e; throw e;
} catch (Exception e) { } catch (Exception e) {
throw new IOException(e); throw new IOException(e);
} finally { } finally {
accessibilityLock.unlock();
endedUpdate.increment(); endedUpdate.increment();
} }
} finally {
closeLock.unlockRead(closeReadLock);
}
} }
protected final void recordAtomicUpdateTime(boolean changed, boolean prevSet, boolean newSet, long initTime) { protected final void recordAtomicUpdateTime(boolean changed, boolean prevSet, boolean newSet, long initTime) {
@ -923,16 +905,12 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
@Override @Override
@NotNull @NotNull
public RocksDBIterator newIterator(@NotNull ReadOptions readOptions) { public RocksDBIterator newIterator(@NotNull ReadOptions readOptions) {
if (!db.isOwningHandle()) { var closeReadLock = closeLock.readLock();
throw new IllegalStateException("Database is closed"); try {
} ensureOpen();
if (!readOptions.isOwningHandle()) { ensureOwned(readOptions);
throw new IllegalStateException("ReadOptions is closed"); var it = db.newIterator(cfh, readOptions);
} return new RocksDBIterator(it,
if (!cfh.isOwningHandle()) {
throw new IllegalStateException("Column family is closed");
}
return new RocksDBIterator(db.newIterator(cfh, readOptions),
nettyDirect, nettyDirect,
this.startedIterSeek, this.startedIterSeek,
this.endedIterSeek, this.endedIterSeek,
@ -941,6 +919,9 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
this.endedIterNext, this.endedIterNext,
this.iterNextTime this.iterNextTime
); );
} finally {
closeLock.unlockRead(closeReadLock);
}
} }
protected final Buffer applyUpdateAndCloseIfNecessary(BinarySerializationFunction updater, protected final Buffer applyUpdateAndCloseIfNecessary(BinarySerializationFunction updater,
@ -957,31 +938,18 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
return newData; return newData;
} }
protected int getLastLevel() {
return RocksDBUtils.getLastLevel(db, cfh);
}
@Override @Override
public final void forceCompaction(int volumeId) throws RocksDBException { public final void forceCompaction(int volumeId) {
List<String> files = new ArrayList<>(); var closeReadLock = closeLock.readLock();
var meta = db.getColumnFamilyMetaData(cfh); try {
int bottommostLevel = -1; ensureOpen();
for (LevelMetaData level : meta.levels()) { RocksDBUtils.forceCompaction(db, db.getName(), cfh, volumeId, logger);
bottommostLevel = Math.max(bottommostLevel, level.level()); } finally {
} closeLock.unlockRead(closeReadLock);
int count = 0;
x: for (LevelMetaData level : meta.levels()) {
for (SstFileMetaData file : level.files()) {
if (file.fileName().endsWith(".sst")) {
files.add(file.fileName());
count++;
if (count >= 4) {
break x;
}
}
}
}
try (var co = new CompactionOptions()) {
if (!files.isEmpty() && bottommostLevel != -1) {
db.compactFiles(co, cfh, files, bottommostLevel, volumeId, null);
}
db.compactRange(cfh);
} }
} }

View File

@ -40,6 +40,7 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -47,8 +48,8 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.StampedLock;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.commons.lang3.time.StopWatch; import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
@ -62,7 +63,6 @@ import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactRangeOptions; import org.rocksdb.CompactRangeOptions;
import org.rocksdb.CompactRangeOptions.BottommostLevelCompaction;
import org.rocksdb.CompactionJobInfo; import org.rocksdb.CompactionJobInfo;
import org.rocksdb.CompactionOptions; import org.rocksdb.CompactionOptions;
import org.rocksdb.CompactionPriority; import org.rocksdb.CompactionPriority;
@ -132,7 +132,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
private final HashMap<String, PersistentCache> persistentCaches; private final HashMap<String, PersistentCache> persistentCaches;
private final ConcurrentHashMap<Long, Snapshot> snapshotsHandles = new ConcurrentHashMap<>(); private final ConcurrentHashMap<Long, Snapshot> snapshotsHandles = new ConcurrentHashMap<>();
private final AtomicLong nextSnapshotNumbers = new AtomicLong(1); private final AtomicLong nextSnapshotNumbers = new AtomicLong(1);
private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock(); private final StampedLock closeLock = new StampedLock();
private volatile boolean closed = false; private volatile boolean closed = false;
@SuppressWarnings("SwitchStatementWithTooFewBranches") @SuppressWarnings("SwitchStatementWithTooFewBranches")
@ -253,6 +253,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
} }
if (!columnOptions.levels().isEmpty()) { if (!columnOptions.levels().isEmpty()) {
columnFamilyOptions.setNumLevels(columnOptions.levels().size());
var firstLevelOptions = getRocksLevelOptions(columnOptions.levels().get(0)); var firstLevelOptions = getRocksLevelOptions(columnOptions.levels().get(0));
columnFamilyOptions.setCompressionType(firstLevelOptions.compressionType); columnFamilyOptions.setCompressionType(firstLevelOptions.compressionType);
columnFamilyOptions.setCompressionOptions(firstLevelOptions.compressionOptions); columnFamilyOptions.setCompressionOptions(firstLevelOptions.compressionOptions);
@ -269,9 +270,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.map(v -> v.compression().getType()) .map(v -> v.compression().getType())
.toList()); .toList());
} else { } else {
columnFamilyOptions.setNumLevels(7); columnFamilyOptions.setNumLevels(6);
List<CompressionType> compressionTypes = new ArrayList<>(7); List<CompressionType> compressionTypes = new ArrayList<>(6);
for (int i = 0; i < 7; i++) { for (int i = 0; i < 6; i++) {
if (i < 2) { if (i < 2) {
compressionTypes.add(CompressionType.NO_COMPRESSION); compressionTypes.add(CompressionType.NO_COMPRESSION);
} else { } else {
@ -560,60 +561,27 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return paths.size() - 1; return paths.size() - 1;
} }
public void forceCompaction(int volumeId) throws RocksDBException { public int getLastLevel(Column column) {
try (var co = new CompactionOptions() return databaseOptions
.setCompression(CompressionType.LZ4_COMPRESSION) .columnOptions()
.setMaxSubcompactions(0) .stream()
.setOutputFileSizeLimit(2 * SizeUnit.GB)) { .filter(namedColumnOptions -> namedColumnOptions.columnName().equals(column.name()))
for (ColumnFamilyHandle cfh : this.handles.values()) { .findFirst()
List<String> files = new ArrayList<>(); .map(NamedColumnOptions::levels)
var meta = db.getColumnFamilyMetaData(cfh); .filter(levels -> !levels.isEmpty())
int bottommostLevel = -1; .or(() -> Optional.of(databaseOptions.defaultColumnOptions().levels()).filter(levels -> !levels.isEmpty()))
for (LevelMetaData level : meta.levels()) { .map(List::size)
bottommostLevel = Math.max(bottommostLevel, level.level()); .orElse(6);
} }
for (LevelMetaData level : meta.levels()) {
if (level.level() < bottommostLevel) {
for (SstFileMetaData file : level.files()) {
if (file.fileName().endsWith(".sst")) {
files.add(file.fileName());
}
}
}
}
bottommostLevel = Math.max(bottommostLevel, databaseOptions.defaultColumnOptions().levels().size() - 1);
if (!files.isEmpty() && bottommostLevel != -1) { public List<String> getColumnFiles(Column column, boolean excludeLastLevel) {
var partitionSize = files.size() / Runtime.getRuntime().availableProcessors(); var cfh = handles.get(column);
List<List<String>> partitions; return RocksDBUtils.getColumnFiles(db, cfh, excludeLastLevel);
if (partitionSize > 0) {
partitions = partition(files, files.size() / Runtime.getRuntime().availableProcessors());
} else {
partitions = List.of(files);
}
int finalBottommostLevel = bottommostLevel;
Mono.when(partitions.stream().map(partition -> Mono.<Void>fromCallable(() -> {
logger.info("Compacting {} files in database {} in column family {} to level {}",
partition.size(),
name,
new String(cfh.getName(), StandardCharsets.UTF_8),
finalBottommostLevel
);
if (!partition.isEmpty()) {
var coi = new CompactionJobInfo();
db.compactFiles(co, cfh, partition, finalBottommostLevel, volumeId, coi);
logger.info("Compacted {} files in database {} in column family {} to level {}: {}",
partition.size(),
name,
new String(cfh.getName(), StandardCharsets.UTF_8),
finalBottommostLevel,
coi.status().getCodeString()
);
}
return null;
}).subscribeOn(Schedulers.boundedElastic())).toList()).block();
}
} }
public void forceCompaction(int volumeId) throws RocksDBException {
for (var cfh : this.handles.values()) {
RocksDBUtils.forceCompaction(db, name, cfh, volumeId, logger);
} }
} }
@ -660,14 +628,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return name; return name;
} }
public Lock getAccessibilityLock() { public StampedLock getCloseLock() {
return shutdownLock.readLock(); return closeLock;
} }
private void flushAndCloseDb(RocksDB db, Cache standardCache, Cache compressedCache, List<ColumnFamilyHandle> handles) private void flushAndCloseDb(RocksDB db, Cache standardCache, Cache compressedCache, List<ColumnFamilyHandle> handles)
throws RocksDBException { throws RocksDBException {
var shutdownWriteLock = shutdownLock.writeLock(); var closeWriteLock = closeLock.writeLock();
shutdownWriteLock.lock();
try { try {
if (closed) { if (closed) {
return; return;
@ -715,7 +682,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
} }
} }
} finally { } finally {
shutdownWriteLock.unlock(); closeLock.unlockWrite(closeWriteLock);
} }
} }
@ -1037,7 +1004,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
private RocksDBColumn getRocksDBColumn(RocksDB db, ColumnFamilyHandle cfh) { private RocksDBColumn getRocksDBColumn(RocksDB db, ColumnFamilyHandle cfh) {
var nettyDirect = databaseOptions.allowNettyDirect(); var nettyDirect = databaseOptions.allowNettyDirect();
var accessibilityLock = getAccessibilityLock(); var closeLock = getCloseLock();
if (db instanceof OptimisticTransactionDB optimisticTransactionDB) { if (db instanceof OptimisticTransactionDB optimisticTransactionDB) {
return new OptimisticRocksDBColumn(optimisticTransactionDB, return new OptimisticRocksDBColumn(optimisticTransactionDB,
nettyDirect, nettyDirect,
@ -1045,7 +1012,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
name, name,
cfh, cfh,
meterRegistry, meterRegistry,
accessibilityLock closeLock
); );
} else if (db instanceof TransactionDB transactionDB) { } else if (db instanceof TransactionDB transactionDB) {
return new PessimisticRocksDBColumn(transactionDB, return new PessimisticRocksDBColumn(transactionDB,
@ -1054,10 +1021,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
name, name,
cfh, cfh,
meterRegistry, meterRegistry,
accessibilityLock closeLock
); );
} else { } else {
return new StandardRocksDBColumn(db, nettyDirect, allocator, name, cfh, meterRegistry, accessibilityLock); return new StandardRocksDBColumn(db, nettyDirect, allocator, name, cfh, meterRegistry, closeLock);
} }
} }
@ -1225,8 +1192,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
public Mono<LLSnapshot> takeSnapshot() { public Mono<LLSnapshot> takeSnapshot() {
return Mono return Mono
.fromCallable(() -> snapshotTime.recordCallable(() -> { .fromCallable(() -> snapshotTime.recordCallable(() -> {
var shutdownReadLock = shutdownLock.readLock(); var closeReadLock = closeLock.readLock();
shutdownReadLock.lock();
try { try {
if (closed) { if (closed) {
throw new IllegalStateException("Database closed"); throw new IllegalStateException("Database closed");
@ -1236,7 +1202,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot); this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot);
return new LLSnapshot(currentSnapshotSequenceNumber); return new LLSnapshot(currentSnapshotSequenceNumber);
} finally { } finally {
shutdownReadLock.unlock(); closeLock.unlockRead(closeReadLock);
} }
})) }))
.subscribeOn(dbRScheduler); .subscribeOn(dbRScheduler);
@ -1246,8 +1212,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) { public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
return Mono return Mono
.<Void>fromCallable(() -> { .<Void>fromCallable(() -> {
var shutdownReadLock = shutdownLock.readLock(); var closeReadLock = closeLock.readLock();
shutdownReadLock.lock();
try { try {
if (closed) { if (closed) {
throw new IllegalStateException("Database closed"); throw new IllegalStateException("Database closed");
@ -1262,7 +1227,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
db.releaseSnapshot(dbSnapshot); db.releaseSnapshot(dbSnapshot);
return null; return null;
} finally { } finally {
shutdownReadLock.unlock(); closeLock.unlockRead(closeReadLock);
} }
}) })
.subscribeOn(dbRScheduler); .subscribeOn(dbRScheduler);

View File

@ -15,6 +15,7 @@ import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.StampedLock;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyHandle;
@ -39,8 +40,8 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
String databaseName, String databaseName,
ColumnFamilyHandle cfh, ColumnFamilyHandle cfh,
MeterRegistry meterRegistry, MeterRegistry meterRegistry,
Lock accessibilityLock) { StampedLock closeLock) {
super(db, nettyDirect, alloc, databaseName, cfh, meterRegistry, accessibilityLock); super(db, nettyDirect, alloc, databaseName, cfh, meterRegistry, closeLock);
this.optimisticAttempts = DistributionSummary this.optimisticAttempts = DistributionSummary
.builder("db.optimistic.attempts.distribution") .builder("db.optimistic.attempts.distribution")
.publishPercentiles(0.2, 0.5, 0.95) .publishPercentiles(0.2, 0.5, 0.95)

View File

@ -11,6 +11,7 @@ import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.StampedLock;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyHandle;
@ -32,8 +33,8 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
String dbName, String dbName,
ColumnFamilyHandle cfh, ColumnFamilyHandle cfh,
MeterRegistry meterRegistry, MeterRegistry meterRegistry,
Lock accessibilityLock) { StampedLock closeLock) {
super(db, nettyDirect, alloc, dbName, cfh, meterRegistry, accessibilityLock); super(db, nettyDirect, alloc, dbName, cfh, meterRegistry, closeLock);
} }
@Override @Override

View File

@ -56,8 +56,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
void put(@NotNull WriteOptions writeOptions, Buffer key, Buffer value) throws RocksDBException; void put(@NotNull WriteOptions writeOptions, Buffer key, Buffer value) throws RocksDBException;
default void put(@NotNull WriteOptions writeOptions, byte[] key, byte[] value) default void put(@NotNull WriteOptions writeOptions, byte[] key, byte[] value) throws RocksDBException {
throws RocksDBException {
var allocator = getAllocator(); var allocator = getAllocator();
try (var keyBuf = allocator.allocate(key.length)) { try (var keyBuf = allocator.allocate(key.length)) {
keyBuf.writeBytes(key); keyBuf.writeBytes(key);

View File

@ -0,0 +1,102 @@
package it.cavallium.dbengine.database.disk;
import static com.google.common.collect.Lists.partition;
import it.cavallium.dbengine.rpc.current.data.Column;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.logging.log4j.Logger;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.CompactionJobInfo;
import org.rocksdb.CompactionOptions;
import org.rocksdb.CompressionType;
import org.rocksdb.LevelMetaData;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileMetaData;
import org.rocksdb.util.SizeUnit;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
/**
 * Static helpers for inspecting and force-compacting RocksDB column families.
 *
 * <p>Stateless utility class; not instantiable.
 */
public class RocksDBUtils {

	private RocksDBUtils() {
		// utility class, no instances
	}

	/**
	 * Returns the index of the bottommost level of the given column family.
	 *
	 * <p>{@code RocksDB#numberLevels} returns the configured level <i>count</i>
	 * (default 7), so the bottommost level <i>index</i> is {@code count - 1}.
	 * When the count is unavailable ({@code <= 0}), fall back to 6, the last
	 * level index of the default 7-level configuration.
	 *
	 * @param db  the database
	 * @param cfh the column family to inspect
	 * @return the 0-based index of the bottommost level
	 */
	public static int getLastLevel(RocksDB db, ColumnFamilyHandle cfh) {
		var numLevels = db.numberLevels(cfh);
		if (numLevels <= 0) {
			return 6;
		}
		return numLevels - 1;
	}

	/**
	 * Lists the SST file names of a column family.
	 *
	 * @param db               the database
	 * @param cfh              the column family to inspect
	 * @param excludeLastLevel when {@code true}, files already sitting on the
	 *                         bottommost level are omitted (they do not need
	 *                         further compaction)
	 * @return the {@code .sst} file names, possibly empty
	 */
	public static List<String> getColumnFiles(RocksDB db, ColumnFamilyHandle cfh, boolean excludeLastLevel) {
		List<String> files = new ArrayList<>();
		var meta = db.getColumnFamilyMetaData(cfh);
		// -1 disables the exclusion check below when excludeLastLevel is false
		var lastLevel = excludeLastLevel ? getLastLevel(db, cfh) : -1;
		for (LevelMetaData level : meta.levels()) {
			if (!excludeLastLevel || level.level() < lastLevel) {
				for (SstFileMetaData file : level.files()) {
					if (file.fileName().endsWith(".sst")) {
						files.add(file.fileName());
					}
				}
			}
		}
		return files;
	}

	/**
	 * Forces a full compaction of the column family down to its bottommost
	 * level, compacting partitions of files in parallel on the bounded-elastic
	 * scheduler and blocking until all partitions finish.
	 *
	 * <p>Best-effort: per-partition failures are logged and do not abort the
	 * remaining partitions.
	 *
	 * @param db        the database
	 * @param logDbName database name used only for log messages
	 * @param cfh       the column family to compact
	 * @param volumeId  output path id passed to {@code compactFiles}
	 * @param logger    destination for progress and error messages
	 */
	public static void forceCompaction(RocksDB db,
			String logDbName,
			ColumnFamilyHandle cfh,
			int volumeId,
			Logger logger) {
		try (var co = new CompactionOptions()
				.setCompression(CompressionType.LZ4_COMPRESSION)
				.setMaxSubcompactions(0)
				.setOutputFileSizeLimit(2 * SizeUnit.GB)) {
			List<String> filesToCompact = getColumnFiles(db, cfh, true);
			if (!filesToCompact.isEmpty()) {
				// Spread the files across the available processors; if there are
				// fewer files than processors, compact them as a single partition.
				var partitionSize = filesToCompact.size() / Runtime.getRuntime().availableProcessors();
				List<List<String>> partitions;
				if (partitionSize > 0) {
					partitions = partition(filesToCompact, partitionSize);
				} else {
					partitions = List.of(filesToCompact);
				}
				int finalBottommostLevel = getLastLevel(db, cfh);
				Mono.whenDelayError(partitions.stream().map(partition -> Mono.<Void>fromCallable(() -> {
					logger.info("Compacting {} files in database {} in column family {} to level {}",
							partition.size(),
							logDbName,
							new String(cfh.getName(), StandardCharsets.UTF_8),
							finalBottommostLevel
					);
					if (!partition.isEmpty()) {
						// CompactionJobInfo is a native RocksObject: close it to
						// release the native handle even when compaction fails
						try (var coi = new CompactionJobInfo()) {
							db.compactFiles(co, cfh, partition, finalBottommostLevel, volumeId, coi);
							logger.info("Compacted {} files in database {} in column family {} to level {}: {}",
									partition.size(),
									logDbName,
									new String(cfh.getName(), StandardCharsets.UTF_8),
									finalBottommostLevel,
									coi.status().getCodeString()
							);
						} catch (Throwable ex) {
							logger.error("Failed to compact {} files in database {} in column family {} to level {}",
									partition.size(),
									logDbName,
									new String(cfh.getName(), StandardCharsets.UTF_8),
									finalBottommostLevel,
									ex
							);
						}
					}
					return null;
				}).subscribeOn(Schedulers.boundedElastic())).toList()).block();
			}
		}
	}
}

View File

@ -9,6 +9,7 @@ import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.StampedLock;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyHandle;
@ -23,8 +24,8 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB>
boolean nettyDirect, boolean nettyDirect,
BufferAllocator alloc, BufferAllocator alloc,
String dbName, String dbName,
ColumnFamilyHandle cfh, MeterRegistry meterRegistry, Lock accessibilityLock) { ColumnFamilyHandle cfh, MeterRegistry meterRegistry, StampedLock closeLock) {
super(db, nettyDirect, alloc, dbName, cfh, meterRegistry, accessibilityLock); super(db, nettyDirect, alloc, dbName, cfh, meterRegistry, closeLock);
} }
@Override @Override