Compaction API and configurable write buffer size

This commit is contained in:
Andrea Cavalli 2022-04-26 17:12:22 +02:00
parent 116fc88311
commit ec5bf1c5cc
10 changed files with 356 additions and 31 deletions

View File

@ -22,6 +22,7 @@ interfacesData:
filter: -Filter filter: -Filter
blockSize: -int blockSize: -int
persistentCacheId: -String persistentCacheId: -String
writeBufferSize: -long
# versions must have only numbers, lowercase letters, dots, dashes. Maximum: 99.999.9999 # versions must have only numbers, lowercase letters, dots, dashes. Maximum: 99.999.9999
versions: versions:
0.0.0: 0.0.0:
@ -264,6 +265,7 @@ versions:
filter: -Filter filter: -Filter
blockSize: -int blockSize: -int
persistentCacheId: -String persistentCacheId: -String
writeBufferSize: -long
# Remember to update ColumnOptions common getters # Remember to update ColumnOptions common getters
NamedColumnOptions: NamedColumnOptions:
data: data:
@ -275,6 +277,7 @@ versions:
filter: -Filter filter: -Filter
blockSize: -int blockSize: -int
persistentCacheId: -String persistentCacheId: -String
writeBufferSize: -long
BloomFilter: BloomFilter:
data: data:
bitsPerKey: int bitsPerKey: int

View File

@ -26,7 +26,8 @@ public class DefaultDatabaseOptions {
Nullableboolean.empty(), Nullableboolean.empty(),
NullableFilter.empty(), NullableFilter.empty(),
Nullableint.empty(), Nullableint.empty(),
NullableString.empty() NullableString.empty(),
Nullablelong.empty()
); );
public static NamedColumnOptions DEFAULT_NAMED_COLUMN_OPTIONS = new NamedColumnOptions( public static NamedColumnOptions DEFAULT_NAMED_COLUMN_OPTIONS = new NamedColumnOptions(
@ -37,7 +38,8 @@ public class DefaultDatabaseOptions {
Nullableboolean.empty(), Nullableboolean.empty(),
NullableFilter.empty(), NullableFilter.empty(),
Nullableint.empty(), Nullableint.empty(),
NullableString.empty() NullableString.empty(),
Nullablelong.empty()
); );
public static DatabaseOptions DEFAULT_DATABASE_OPTIONS = new DatabaseOptions(List.of(), public static DatabaseOptions DEFAULT_DATABASE_OPTIONS = new DatabaseOptions(List.of(),

View File

@ -64,6 +64,8 @@ public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseS
Mono<Void> verifyChecksum(); Mono<Void> verifyChecksum();
Mono<Void> compact();
BufferAllocator getAllocator(); BufferAllocator getAllocator();
MeterRegistry getMeterRegistry(); MeterRegistry getMeterRegistry();

View File

@ -740,7 +740,9 @@ public class LLUtils {
if (boundedRange || smallRange) { if (boundedRange || smallRange) {
readOptions.setFillCache(canFillCache); readOptions.setFillCache(canFillCache);
} else { } else {
if (readOptions.readaheadSize() <= 0) {
readOptions.setReadaheadSize(4 * 1024 * 1024); // 4MiB readOptions.setReadaheadSize(4 * 1024 * 1024); // 4MiB
}
readOptions.setFillCache(false); readOptions.setFillCache(false);
readOptions.setVerifyChecksums(false); readOptions.setVerifyChecksums(false);
} }

View File

@ -23,10 +23,10 @@ import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithRelease; import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithRelease;
import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithoutRelease; import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithoutRelease;
import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -37,14 +37,17 @@ import org.jetbrains.annotations.Nullable;
import org.rocksdb.AbstractSlice; import org.rocksdb.AbstractSlice;
import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.CompactRangeOptions; import org.rocksdb.CompactRangeOptions;
import org.rocksdb.CompactionOptions;
import org.rocksdb.DirectSlice; import org.rocksdb.DirectSlice;
import org.rocksdb.FlushOptions; import org.rocksdb.FlushOptions;
import org.rocksdb.Holder; import org.rocksdb.Holder;
import org.rocksdb.KeyMayExist.KeyMayExistEnum; import org.rocksdb.KeyMayExist.KeyMayExistEnum;
import org.rocksdb.LevelMetaData;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import org.rocksdb.Slice; import org.rocksdb.Slice;
import org.rocksdb.SstFileMetaData;
import org.rocksdb.Transaction; import org.rocksdb.Transaction;
import org.rocksdb.WriteBatch; import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions; import org.rocksdb.WriteOptions;
@ -939,6 +942,34 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
return newData; return newData;
} }
@Override
public final void forceCompaction(int volumeId) throws RocksDBException {
List<String> files = new ArrayList<>();
var meta = db.getColumnFamilyMetaData(cfh);
int bottommostLevel = -1;
for (LevelMetaData level : meta.levels()) {
bottommostLevel = Math.max(bottommostLevel, level.level());
}
int count = 0;
x: for (LevelMetaData level : meta.levels()) {
for (SstFileMetaData file : level.files()) {
if (file.fileName().endsWith(".sst")) {
files.add(file.fileName());
count++;
if (count >= 4) {
break x;
}
}
}
}
try (var co = new CompactionOptions()) {
if (!files.isEmpty() && bottommostLevel != -1) {
db.compactFiles(co, cfh, files, bottommostLevel, volumeId, null);
}
db.compactRange(cfh);
}
}
@Override @Override
public ColumnFamilyHandle getColumnFamilyHandle() { public ColumnFamilyHandle getColumnFamilyHandle() {
return cfh; return cfh;
@ -952,4 +983,16 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
public MeterRegistry getMeterRegistry() { public MeterRegistry getMeterRegistry() {
return meterRegistry; return meterRegistry;
} }
public Timer getIterNextTime() {
return iterNextTime;
}
public Counter getStartedIterNext() {
return startedIterNext;
}
public Counter getEndedIterNext() {
return endedIterNext;
}
} }

View File

@ -59,6 +59,8 @@ import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactRangeOptions; import org.rocksdb.CompactRangeOptions;
import org.rocksdb.CompactRangeOptions.BottommostLevelCompaction;
import org.rocksdb.CompactionOptions;
import org.rocksdb.CompactionPriority; import org.rocksdb.CompactionPriority;
import org.rocksdb.CompressionOptions; import org.rocksdb.CompressionOptions;
import org.rocksdb.CompressionType; import org.rocksdb.CompressionType;
@ -66,15 +68,22 @@ import org.rocksdb.DBOptions;
import org.rocksdb.DataBlockIndexType; import org.rocksdb.DataBlockIndexType;
import org.rocksdb.DbPath; import org.rocksdb.DbPath;
import org.rocksdb.Env; import org.rocksdb.Env;
import org.rocksdb.EnvOptions;
import org.rocksdb.FlushOptions; import org.rocksdb.FlushOptions;
import org.rocksdb.IndexType; import org.rocksdb.IndexType;
import org.rocksdb.InfoLogLevel; import org.rocksdb.InfoLogLevel;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.LRUCache; import org.rocksdb.LRUCache;
import org.rocksdb.LevelMetaData;
import org.rocksdb.LiveFileMetaData;
import org.rocksdb.OptimisticTransactionDB; import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.PersistentCache; import org.rocksdb.PersistentCache;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
import org.rocksdb.RocksDB.LiveFiles;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot; import org.rocksdb.Snapshot;
import org.rocksdb.SstFileMetaData;
import org.rocksdb.SstFileWriter;
import org.rocksdb.TransactionDB; import org.rocksdb.TransactionDB;
import org.rocksdb.TransactionDBOptions; import org.rocksdb.TransactionDBOptions;
import org.rocksdb.TxnDBWritePolicy; import org.rocksdb.TxnDBWritePolicy;
@ -203,23 +212,39 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
: DEFAULT_COMPACTION_MEMTABLE_MEMORY_BUDGET)); : DEFAULT_COMPACTION_MEMTABLE_MEMORY_BUDGET));
} }
if (Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.compactions.auto.disable", "false"))) {
columnFamilyOptions.setDisableAutoCompactions(true);
}
boolean dynamicLevelBytes;
// This option is not supported with multiple db paths
if (databaseOptions.volumes().size() <= 1) {
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
dynamicLevelBytes = true;
} else {
dynamicLevelBytes = false;
}
if (dynamicLevelBytes) {
columnFamilyOptions.setLevelCompactionDynamicLevelBytes(dynamicLevelBytes);
} else {
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
columnFamilyOptions.setMaxBytesForLevelBase(256 * SizeUnit.MB); columnFamilyOptions.setMaxBytesForLevelBase(256 * SizeUnit.MB);
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
columnFamilyOptions.setMaxBytesForLevelMultiplier(10); columnFamilyOptions.setMaxBytesForLevelMultiplier(10);
// This option is not supported with multiple db paths
if (databaseOptions.volumes().size() <= 1) {
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
columnFamilyOptions.setLevelCompactionDynamicLevelBytes(true);
} }
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
// Higher values speed up writes, but slow down reads
columnFamilyOptions.setLevel0FileNumCompactionTrigger(2); columnFamilyOptions.setLevel0FileNumCompactionTrigger(2);
if (Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.disableslowdown", "false"))) {
columnFamilyOptions.setLevel0SlowdownWritesTrigger(-1);
columnFamilyOptions.setLevel0StopWritesTrigger(Integer.MAX_VALUE);
} else {
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
columnFamilyOptions.setLevel0SlowdownWritesTrigger(20); columnFamilyOptions.setLevel0SlowdownWritesTrigger(20);
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
columnFamilyOptions.setLevel0StopWritesTrigger(36); columnFamilyOptions.setLevel0StopWritesTrigger(36);
}
if (!columnOptions.levels().isEmpty()) { if (!columnOptions.levels().isEmpty()) {
var firstLevelOptions = getRocksLevelOptions(columnOptions.levels().get(0)); var firstLevelOptions = getRocksLevelOptions(columnOptions.levels().get(0));
@ -247,7 +272,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
compressionTypes.add(CompressionType.LZ4_COMPRESSION); compressionTypes.add(CompressionType.LZ4_COMPRESSION);
} }
} }
columnFamilyOptions.setBottommostCompressionType(CompressionType.LZ4_COMPRESSION); columnFamilyOptions.setBottommostCompressionType(CompressionType.LZ4HC_COMPRESSION);
columnFamilyOptions.setBottommostCompressionOptions(new CompressionOptions() columnFamilyOptions.setBottommostCompressionOptions(new CompressionOptions()
.setEnabled(true) .setEnabled(true)
.setMaxDictBytes(32768)); .setMaxDictBytes(32768));
@ -257,6 +282,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig(); final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
if (!databaseOptions.lowMemory()) { if (!databaseOptions.lowMemory()) {
// tableOptions.setOptimizeFiltersForMemory(true); // tableOptions.setOptimizeFiltersForMemory(true);
columnFamilyOptions.setWriteBufferSize(256 * SizeUnit.MB);
}
if (columnOptions.writeBufferSize().isPresent()) {
columnFamilyOptions.setWriteBufferSize(columnOptions.writeBufferSize().get());
} }
tableOptions.setVerifyCompression(false); tableOptions.setVerifyCompression(false);
if (columnOptions.filter().isPresent()) { if (columnOptions.filter().isPresent()) {
@ -511,6 +540,54 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return this.handles; return this.handles;
} }
public int getLastVolumeId() {
var paths = convertPaths(dbPath.toAbsolutePath().getParent(), dbPath.getFileName(), databaseOptions.volumes());
return paths.size() - 1;
}
public void forceCompaction(int volumeId) throws RocksDBException {
try (var co = new CompactionOptions()
.setCompression(CompressionType.LZ4HC_COMPRESSION)
.setMaxSubcompactions(Runtime.getRuntime().availableProcessors())
.setOutputFileSizeLimit(2 * SizeUnit.GB)) {
for (ColumnFamilyHandle cfh : this.handles.values()) {
List<String> files = new ArrayList<>();
var meta = db.getLiveFilesMetaData();
int bottommostLevel = -1;
for (var level : meta) {
bottommostLevel = Math.max(bottommostLevel, level.level());
}
int count = 0;
for (var file : meta) {
if (file.fileName().endsWith(".sst") && Arrays.equals(cfh.getName(), file.columnFamilyName())) {
files.add(file.fileName());
count++;
if (count >= 2) {
break;
}
}
}
bottommostLevel = Math.max(bottommostLevel, databaseOptions.defaultColumnOptions().levels().size() - 1);
if (!files.isEmpty() && bottommostLevel != -1) {
db.compactFiles(co, cfh, files, bottommostLevel, volumeId, null);
}
try (var co2 = new CompactRangeOptions()
.setAllowWriteStall(true)
.setChangeLevel(true)
.setTargetLevel(bottommostLevel)
.setMaxSubcompactions(Runtime.getRuntime().availableProcessors())
.setExclusiveManualCompaction(true)
.setBottommostLevelCompaction(BottommostLevelCompaction.kSkip)) {
db.compactRange(cfh, null, null, co2);
}
}
}
}
public void flush(FlushOptions flushOptions) throws RocksDBException {
db.flush(flushOptions);
}
private record RocksLevelOptions(CompressionType compressionType, CompressionOptions compressionOptions) {} private record RocksLevelOptions(CompressionType compressionType, CompressionOptions compressionOptions) {}
private RocksLevelOptions getRocksLevelOptions(DatabaseLevel levelOptions) { private RocksLevelOptions getRocksLevelOptions(DatabaseLevel levelOptions) {
var compressionType = levelOptions.compression().getType(); var compressionType = levelOptions.compression().getType();
@ -560,7 +637,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
} }
closed = true; closed = true;
if (db.isOwningHandle()) { if (db.isOwningHandle()) {
flushDb(db, handles); //flushDb(db, handles);
} }
for (ColumnFamilyHandle handle : handles) { for (ColumnFamilyHandle handle : handles) {
@ -672,7 +749,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
// that determines the behaviour of the database. // that determines the behaviour of the database.
var options = new DBOptions(); var options = new DBOptions();
options.setEnablePipelinedWrite(true); options.setEnablePipelinedWrite(true);
options.setMaxSubcompactions(2); options.setMaxSubcompactions(Integer.parseInt(System.getProperty("it.cavallium.dbengine.compactions.max.sub", "2")));
var customWriteRate = Long.parseLong(System.getProperty("it.cavallium.dbengine.write.delayedrate", "-1")); var customWriteRate = Long.parseLong(System.getProperty("it.cavallium.dbengine.write.delayedrate", "-1"));
if (customWriteRate >= 0) { if (customWriteRate >= 0) {
options.setDelayedWriteRate(customWriteRate); options.setDelayedWriteRate(customWriteRate);
@ -693,7 +770,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
Objects.requireNonNull(databasesDirPath); Objects.requireNonNull(databasesDirPath);
Objects.requireNonNull(path.getFileName()); Objects.requireNonNull(path.getFileName());
List<DbPath> paths = convertPaths(databasesDirPath, path.getFileName(), databaseOptions.volumes()); List<DbPath> paths = convertPaths(databasesDirPath, path.getFileName(), databaseOptions.volumes())
.stream()
.map(p -> new DbPath(p.path, p.targetSize))
.toList();
options.setDbPaths(paths); options.setDbPaths(paths);
options.setMaxOpenFiles(databaseOptions.maxOpenFiles().orElse(-1)); options.setMaxOpenFiles(databaseOptions.maxOpenFiles().orElse(-1));
if (databaseOptions.spinning()) { if (databaseOptions.spinning()) {
@ -748,7 +828,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
} else { } else {
// HIGH MEMORY // HIGH MEMORY
options options
.setDbWriteBufferSize(64 * SizeUnit.MB) //.setDbWriteBufferSize(64 * SizeUnit.MB)
.setBytesPerSync(64 * SizeUnit.KB) .setBytesPerSync(64 * SizeUnit.KB)
.setWalBytesPerSync(64 * SizeUnit.KB) .setWalBytesPerSync(64 * SizeUnit.KB)
@ -799,14 +879,16 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return new OptionsWithCache(options, blockCache, compressedCache); return new OptionsWithCache(options, blockCache, compressedCache);
} }
private static List<DbPath> convertPaths(Path databasesDirPath, Path path, List<DatabaseVolume> volumes) { record DbPathRecord(Path path, long targetSize) {}
var paths = new ArrayList<DbPath>(volumes.size());
private static List<DbPathRecord> convertPaths(Path databasesDirPath, Path path, List<DatabaseVolume> volumes) {
var paths = new ArrayList<DbPathRecord>(volumes.size());
if (volumes.isEmpty()) { if (volumes.isEmpty()) {
return List.of(new DbPath(databasesDirPath.resolve(path.getFileName() + "_hot"), return List.of(new DbPathRecord(databasesDirPath.resolve(path.getFileName() + "_hot"),
0), // Legacy 0), // Legacy
new DbPath(databasesDirPath.resolve(path.getFileName() + "_cold"), new DbPathRecord(databasesDirPath.resolve(path.getFileName() + "_cold"),
0), // Legacy 0), // Legacy
new DbPath(databasesDirPath.resolve(path.getFileName() + "_colder"), new DbPathRecord(databasesDirPath.resolve(path.getFileName() + "_colder"),
1000L * 1024L * 1024L * 1024L) // 1000GiB 1000L * 1024L * 1024L * 1024L) // 1000GiB
); // Legacy ); // Legacy
} }
@ -817,7 +899,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
} else { } else {
volumePath = databasesDirPath.resolve(volume.volumePath()); volumePath = databasesDirPath.resolve(volume.volumePath());
} }
paths.add(new DbPath(volumePath, volume.targetSizeBytes())); paths.add(new DbPathRecord(volumePath, volume.targetSizeBytes()));
} }
return paths; return paths;
} }
@ -937,6 +1019,48 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.subscribeOn(dbRScheduler); .subscribeOn(dbRScheduler);
} }
public Flux<Path> getSSTS() {
var paths = convertPaths(dbPath.toAbsolutePath().getParent(), dbPath.getFileName(), databaseOptions.volumes());
return Mono
.fromCallable(() -> db.getLiveFiles())
.flatMapIterable(liveFiles -> liveFiles.files)
.filter(file -> file.endsWith(".sst"))
.map(file -> file.substring(1))
.flatMapSequential(file -> Mono.fromCallable(() -> {
{
var path = dbPath.resolve(file);
if (Files.exists(path)) {
return path;
}
}
for (var volumePath : paths) {
var path = volumePath.path().resolve(file);
if (Files.exists(path)) {
return path;
}
}
return null;
}).subscribeOn(Schedulers.boundedElastic()));
}
public Mono<Void> ingestSSTS(Flux<Path> sstsFlux) {
return sstsFlux
.map(path -> path.toAbsolutePath().toString())
.flatMap(sst -> Mono.fromCallable(() -> {
try (var opts = new IngestExternalFileOptions()) {
try {
logger.info("Ingesting SST \"{}\"...", sst);
db.ingestExternalFile(List.of(sst), opts);
logger.info("Ingested SST \"{}\" successfully", sst);
} catch (RocksDBException e) {
logger.error("Can't ingest SST \"{}\"", sst, e);
}
}
return null;
}).subscribeOn(Schedulers.boundedElastic()))
.then();
}
@Override @Override
public Mono<MemoryStats> getMemoryStats() { public Mono<MemoryStats> getMemoryStats() {
return Mono return Mono
@ -1010,6 +1134,14 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.subscribeOn(dbRScheduler); .subscribeOn(dbRScheduler);
} }
@Override
public Mono<Void> compact() {
return Mono.<Void>fromCallable(() -> {
this.forceCompaction(getLastVolumeId());
return null;
}).subscribeOn(dbWScheduler);
}
@Override @Override
public BufferAllocator getAllocator() { public BufferAllocator getAllocator() {
return allocator; return allocator;

View File

@ -0,0 +1,118 @@
package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import static it.cavallium.dbengine.database.LLUtils.generateCustomReadOptions;
import static it.cavallium.dbengine.database.LLUtils.isBoundedRange;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLLocalMigrationReactiveRocksIterator.ByteEntry;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import reactor.core.publisher.Flux;
public final class LLLocalMigrationReactiveRocksIterator extends
ResourceSupport<LLLocalMigrationReactiveRocksIterator, LLLocalMigrationReactiveRocksIterator> {
protected static final Logger logger = LogManager.getLogger(LLLocalMigrationReactiveRocksIterator.class);
private static final Drop<LLLocalMigrationReactiveRocksIterator> DROP = new Drop<>() {
@Override
public void drop(LLLocalMigrationReactiveRocksIterator obj) {
try {
if (obj.rangeShared != null) {
obj.rangeShared.close();
}
} catch (Throwable ex) {
logger.error("Failed to close range", ex);
}
try {
if (obj.readOptions != null) {
if (!(obj.readOptions instanceof UnreleasableReadOptions)) {
obj.readOptions.close();
}
}
} catch (Throwable ex) {
logger.error("Failed to close readOptions", ex);
}
}
@Override
public Drop<LLLocalMigrationReactiveRocksIterator> fork() {
return this;
}
@Override
public void attach(LLLocalMigrationReactiveRocksIterator obj) {
}
};
private final RocksDBColumn db;
private LLRange rangeShared;
private ReadOptions readOptions;
@SuppressWarnings({"unchecked", "rawtypes"})
public LLLocalMigrationReactiveRocksIterator(RocksDBColumn db,
Send<LLRange> range,
ReadOptions readOptions) {
super((Drop<LLLocalMigrationReactiveRocksIterator>) (Drop) DROP);
try (range) {
this.db = db;
this.rangeShared = range.receive();
this.readOptions = readOptions;
}
}
public record ByteEntry(byte[] key, byte[] value) {}
public Flux<ByteEntry> flux() {
return Flux.generate(() -> {
var readOptions = generateCustomReadOptions(this.readOptions, false, false, false);
return db.getRocksIterator(false, readOptions, rangeShared, false);
}, (tuple, sink) -> {
try {
var rocksIterator = tuple.iterator();
if (rocksIterator.isValid()) {
byte[] key = rocksIterator.key();
byte[] value = rocksIterator.value();
rocksIterator.next(false);
sink.next(new ByteEntry(key, value));
} else {
sink.complete();
}
} catch (RocksDBException ex) {
sink.error(ex);
}
return tuple;
}, RocksIteratorTuple::close);
}
@Override
protected final RuntimeException createResourceClosedException() {
return new IllegalStateException("Closed");
}
@Override
protected Owned<LLLocalMigrationReactiveRocksIterator> prepareSend() {
var range = this.rangeShared.send();
var readOptions = this.readOptions;
return drop -> new LLLocalMigrationReactiveRocksIterator(db,
range,
readOptions
);
}
protected void makeInaccessible() {
this.rangeShared = null;
this.readOptions = null;
}
}

View File

@ -100,4 +100,6 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
MeterRegistry getMeterRegistry(); MeterRegistry getMeterRegistry();
boolean supportsTransactions(); boolean supportsTransactions();
void forceCompaction(int volumeId) throws RocksDBException;
} }

View File

@ -147,14 +147,30 @@ public class RocksDBIterator implements SafeCloseable {
} }
public void next() throws RocksDBException { public void next() throws RocksDBException {
next(true);
}
public void next(boolean traceStats) throws RocksDBException {
if (traceStats) {
startedIterNext.increment(); startedIterNext.increment();
iterNextTime.record(rocksIterator::next); iterNextTime.record(rocksIterator::next);
endedIterNext.increment(); endedIterNext.increment();
} else {
rocksIterator.next();
}
} }
public void prev() throws RocksDBException { public void prev() throws RocksDBException {
prev(true);
}
public void prev(boolean traceStats) throws RocksDBException {
if (traceStats) {
startedIterNext.increment(); startedIterNext.increment();
iterNextTime.record(rocksIterator::prev); iterNextTime.record(rocksIterator::prev);
endedIterNext.increment(); endedIterNext.increment();
} else {
rocksIterator.prev();
}
} }
} }

View File

@ -346,6 +346,11 @@ public class LLQuicConnection implements LLDatabaseConnection {
return null; return null;
} }
@Override
public Mono<Void> compact() {
return null;
}
@Override @Override
public BufferAllocator getAllocator() { public BufferAllocator getAllocator() {
return allocator; return allocator;