From ec5bf1c5cc59da2ae17af91d905882f9150dad12 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 26 Apr 2022 17:12:22 +0200 Subject: [PATCH] Compaction API and configurable write buffer size --- src/main/data-generator/quic-rpc.yaml | 3 + .../client/DefaultDatabaseOptions.java | 6 +- .../dbengine/database/LLKeyValueDatabase.java | 2 + .../cavallium/dbengine/database/LLUtils.java | 4 +- .../database/disk/AbstractRocksDBColumn.java | 45 ++++- .../disk/LLLocalKeyValueDatabase.java | 174 +++++++++++++++--- ...LLLocalMigrationReactiveRocksIterator.java | 118 ++++++++++++ .../dbengine/database/disk/RocksDBColumn.java | 2 + .../database/disk/RocksDBIterator.java | 28 ++- .../database/remote/LLQuicConnection.java | 5 + 10 files changed, 356 insertions(+), 31 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java diff --git a/src/main/data-generator/quic-rpc.yaml b/src/main/data-generator/quic-rpc.yaml index 1c5fa50..08791bd 100644 --- a/src/main/data-generator/quic-rpc.yaml +++ b/src/main/data-generator/quic-rpc.yaml @@ -22,6 +22,7 @@ interfacesData: filter: -Filter blockSize: -int persistentCacheId: -String + writeBufferSize: -long # versions must have only numbers, lowercase letters, dots, dashes. Maximum: 99.999.9999 versions: 0.0.0: @@ -264,6 +265,7 @@ versions: filter: -Filter blockSize: -int persistentCacheId: -String + writeBufferSize: -long # Remember to update ColumnOptions common getters NamedColumnOptions: data: @@ -275,6 +277,7 @@ versions: filter: -Filter blockSize: -int persistentCacheId: -String + writeBufferSize: -long BloomFilter: data: bitsPerKey: int diff --git a/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java b/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java index aaad1a0..acc9939 100644 --- a/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java +++ b/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java @@ -26,7 +26,8 @@ public class DefaultDatabaseOptions { Nullableboolean.empty(), NullableFilter.empty(), Nullableint.empty(), - NullableString.empty() + NullableString.empty(), + Nullablelong.empty() ); public static NamedColumnOptions DEFAULT_NAMED_COLUMN_OPTIONS = new NamedColumnOptions( @@ -37,7 +38,8 @@ public class DefaultDatabaseOptions { Nullableboolean.empty(), NullableFilter.empty(), Nullableint.empty(), - NullableString.empty() + NullableString.empty(), + Nullablelong.empty() ); public static DatabaseOptions DEFAULT_DATABASE_OPTIONS = new DatabaseOptions(List.of(), diff --git a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java index a717bbb..cf2f055 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java @@ -64,6 +64,8 @@ public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseS Mono verifyChecksum(); + Mono compact(); + BufferAllocator getAllocator(); MeterRegistry getMeterRegistry(); diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 41bf109..d90c5a4 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -740,7 +740,9 @@ public class LLUtils { if (boundedRange || smallRange) { readOptions.setFillCache(canFillCache); } else { - readOptions.setReadaheadSize(4 * 1024 * 1024); // 4MiB + if (readOptions.readaheadSize() <= 0) { + readOptions.setReadaheadSize(4 * 1024 * 1024); // 4MiB + } readOptions.setFillCache(false); readOptions.setVerifyChecksums(false); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index 6020f43..093c31c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -23,10 +23,10 @@ import it.cavallium.dbengine.database.SafeCloseable; import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithRelease; import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithoutRelease; import it.cavallium.dbengine.database.serialization.SerializationException; -import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -37,14 +37,17 @@ import org.jetbrains.annotations.Nullable; import org.rocksdb.AbstractSlice; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.CompactRangeOptions; +import org.rocksdb.CompactionOptions; import org.rocksdb.DirectSlice; import org.rocksdb.FlushOptions; import org.rocksdb.Holder; import org.rocksdb.KeyMayExist.KeyMayExistEnum; +import org.rocksdb.LevelMetaData; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.Slice; +import org.rocksdb.SstFileMetaData; import org.rocksdb.Transaction; import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; @@ -939,6 +942,34 @@ public sealed abstract class AbstractRocksDBColumn implements return newData; } + @Override + public final void forceCompaction(int volumeId) throws RocksDBException { + List files = new ArrayList<>(); + var meta = db.getColumnFamilyMetaData(cfh); + int bottommostLevel = -1; + for (LevelMetaData level : meta.levels()) { + bottommostLevel = Math.max(bottommostLevel, level.level()); + } + int count = 0; + x: for (LevelMetaData level : meta.levels()) { + for (SstFileMetaData file : level.files()) { + if (file.fileName().endsWith(".sst")) { + files.add(file.fileName()); + count++; + if (count >= 4) { + break x; + } + } + } + } + try (var co = new CompactionOptions()) { + if (!files.isEmpty() && bottommostLevel != -1) { + db.compactFiles(co, cfh, files, bottommostLevel, volumeId, null); + } + db.compactRange(cfh); + } + } + @Override public ColumnFamilyHandle getColumnFamilyHandle() { return cfh; @@ -952,4 +983,16 @@ public sealed abstract class AbstractRocksDBColumn implements public MeterRegistry getMeterRegistry() { return meterRegistry; } + + public Timer getIterNextTime() { + return iterNextTime; + } + + public Counter getStartedIterNext() { + return startedIterNext; + } + + public Counter getEndedIterNext() { + return endedIterNext; + } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index ea936b9..4644752 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -59,6 +59,8 @@ import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.CompactRangeOptions; +import org.rocksdb.CompactRangeOptions.BottommostLevelCompaction; +import org.rocksdb.CompactionOptions; import org.rocksdb.CompactionPriority; import org.rocksdb.CompressionOptions; import org.rocksdb.CompressionType; @@ -66,15 +68,22 @@ import org.rocksdb.DBOptions; import org.rocksdb.DataBlockIndexType; import org.rocksdb.DbPath; import org.rocksdb.Env; +import org.rocksdb.EnvOptions; import org.rocksdb.FlushOptions; import org.rocksdb.IndexType; import org.rocksdb.InfoLogLevel; +import org.rocksdb.IngestExternalFileOptions; import org.rocksdb.LRUCache; +import org.rocksdb.LevelMetaData; +import org.rocksdb.LiveFileMetaData; import org.rocksdb.OptimisticTransactionDB; import org.rocksdb.PersistentCache; import org.rocksdb.RocksDB; +import org.rocksdb.RocksDB.LiveFiles; import org.rocksdb.RocksDBException; import org.rocksdb.Snapshot; +import org.rocksdb.SstFileMetaData; +import org.rocksdb.SstFileWriter; import org.rocksdb.TransactionDB; import org.rocksdb.TransactionDBOptions; import org.rocksdb.TxnDBWritePolicy; @@ -203,23 +212,39 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { : DEFAULT_COMPACTION_MEMTABLE_MEMORY_BUDGET)); } - // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html - // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html - columnFamilyOptions.setMaxBytesForLevelBase(256 * SizeUnit.MB); - // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html - columnFamilyOptions.setMaxBytesForLevelMultiplier(10); + if (Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.compactions.auto.disable", "false"))) { + columnFamilyOptions.setDisableAutoCompactions(true); + } + boolean dynamicLevelBytes; // This option is not supported with multiple db paths if (databaseOptions.volumes().size() <= 1) { // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks - columnFamilyOptions.setLevelCompactionDynamicLevelBytes(true); + dynamicLevelBytes = true; + } else { + dynamicLevelBytes = false; + } + if (dynamicLevelBytes) { + columnFamilyOptions.setLevelCompactionDynamicLevelBytes(dynamicLevelBytes); + } else { + // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html + // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html + columnFamilyOptions.setMaxBytesForLevelBase(256 * SizeUnit.MB); + // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html + columnFamilyOptions.setMaxBytesForLevelMultiplier(10); } // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html + // Higher values speed up writes, but slow down reads columnFamilyOptions.setLevel0FileNumCompactionTrigger(2); - // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html - columnFamilyOptions.setLevel0SlowdownWritesTrigger(20); - // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html - columnFamilyOptions.setLevel0StopWritesTrigger(36); + if (Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.disableslowdown", "false"))) { + columnFamilyOptions.setLevel0SlowdownWritesTrigger(-1); + columnFamilyOptions.setLevel0StopWritesTrigger(Integer.MAX_VALUE); + } else { + // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html + columnFamilyOptions.setLevel0SlowdownWritesTrigger(20); + // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html + columnFamilyOptions.setLevel0StopWritesTrigger(36); + } if (!columnOptions.levels().isEmpty()) { var firstLevelOptions = getRocksLevelOptions(columnOptions.levels().get(0)); @@ -247,7 +272,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { compressionTypes.add(CompressionType.LZ4_COMPRESSION); } } - columnFamilyOptions.setBottommostCompressionType(CompressionType.LZ4_COMPRESSION); + columnFamilyOptions.setBottommostCompressionType(CompressionType.LZ4HC_COMPRESSION); columnFamilyOptions.setBottommostCompressionOptions(new CompressionOptions() .setEnabled(true) .setMaxDictBytes(32768)); @@ -257,6 +282,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig(); if (!databaseOptions.lowMemory()) { // tableOptions.setOptimizeFiltersForMemory(true); + columnFamilyOptions.setWriteBufferSize(256 * SizeUnit.MB); + } + if (columnOptions.writeBufferSize().isPresent()) { + columnFamilyOptions.setWriteBufferSize(columnOptions.writeBufferSize().get()); } tableOptions.setVerifyCompression(false); if (columnOptions.filter().isPresent()) { @@ -511,6 +540,54 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return this.handles; } + public int getLastVolumeId() { + var paths = convertPaths(dbPath.toAbsolutePath().getParent(), dbPath.getFileName(), databaseOptions.volumes()); + return paths.size() - 1; + } + + public void forceCompaction(int volumeId) throws RocksDBException { + try (var co = new CompactionOptions() + .setCompression(CompressionType.LZ4HC_COMPRESSION) + .setMaxSubcompactions(Runtime.getRuntime().availableProcessors()) + .setOutputFileSizeLimit(2 * SizeUnit.GB)) { + for (ColumnFamilyHandle cfh : this.handles.values()) { + List files = new ArrayList<>(); + var meta = db.getLiveFilesMetaData(); + int bottommostLevel = -1; + for (var level : meta) { + bottommostLevel = Math.max(bottommostLevel, level.level()); + } + int count = 0; + for (var file : meta) { + if (file.fileName().endsWith(".sst") && Arrays.equals(cfh.getName(), file.columnFamilyName())) { + files.add(file.fileName()); + count++; + if (count >= 2) { + break; + } + } + } + bottommostLevel = Math.max(bottommostLevel, databaseOptions.defaultColumnOptions().levels().size() - 1); + if (!files.isEmpty() && bottommostLevel != -1) { + db.compactFiles(co, cfh, files, bottommostLevel, volumeId, null); + } + try (var co2 = new CompactRangeOptions() + .setAllowWriteStall(true) + .setChangeLevel(true) + .setTargetLevel(bottommostLevel) + .setMaxSubcompactions(Runtime.getRuntime().availableProcessors()) + .setExclusiveManualCompaction(true) + .setBottommostLevelCompaction(BottommostLevelCompaction.kSkip)) { + db.compactRange(cfh, null, null, co2); + } + } + } + } + + public void flush(FlushOptions flushOptions) throws RocksDBException { + db.flush(flushOptions); + } + private record RocksLevelOptions(CompressionType compressionType, CompressionOptions compressionOptions) {} private RocksLevelOptions getRocksLevelOptions(DatabaseLevel levelOptions) { var compressionType = levelOptions.compression().getType(); @@ -560,7 +637,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } closed = true; if (db.isOwningHandle()) { - flushDb(db, handles); + //flushDb(db, handles); } for (ColumnFamilyHandle handle : handles) { @@ -672,7 +749,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { // that determines the behaviour of the database. var options = new DBOptions(); options.setEnablePipelinedWrite(true); - options.setMaxSubcompactions(2); + options.setMaxSubcompactions(Integer.parseInt(System.getProperty("it.cavallium.dbengine.compactions.max.sub", "2"))); var customWriteRate = Long.parseLong(System.getProperty("it.cavallium.dbengine.write.delayedrate", "-1")); if (customWriteRate >= 0) { options.setDelayedWriteRate(customWriteRate); @@ -693,7 +770,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { Objects.requireNonNull(databasesDirPath); Objects.requireNonNull(path.getFileName()); - List paths = convertPaths(databasesDirPath, path.getFileName(), databaseOptions.volumes()); + List paths = convertPaths(databasesDirPath, path.getFileName(), databaseOptions.volumes()) + .stream() + .map(p -> new DbPath(p.path, p.targetSize)) + .toList(); options.setDbPaths(paths); options.setMaxOpenFiles(databaseOptions.maxOpenFiles().orElse(-1)); if (databaseOptions.spinning()) { @@ -748,7 +828,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } else { // HIGH MEMORY options - .setDbWriteBufferSize(64 * SizeUnit.MB) + //.setDbWriteBufferSize(64 * SizeUnit.MB) .setBytesPerSync(64 * SizeUnit.KB) .setWalBytesPerSync(64 * SizeUnit.KB) @@ -799,14 +879,16 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return new OptionsWithCache(options, blockCache, compressedCache); } - private static List convertPaths(Path databasesDirPath, Path path, List volumes) { - var paths = new ArrayList(volumes.size()); + record DbPathRecord(Path path, long targetSize) {} + + private static List convertPaths(Path databasesDirPath, Path path, List volumes) { + var paths = new ArrayList(volumes.size()); if (volumes.isEmpty()) { - return List.of(new DbPath(databasesDirPath.resolve(path.getFileName() + "_hot"), + return List.of(new DbPathRecord(databasesDirPath.resolve(path.getFileName() + "_hot"), 0), // Legacy - new DbPath(databasesDirPath.resolve(path.getFileName() + "_cold"), + new DbPathRecord(databasesDirPath.resolve(path.getFileName() + "_cold"), 0), // Legacy - new DbPath(databasesDirPath.resolve(path.getFileName() + "_colder"), + new DbPathRecord(databasesDirPath.resolve(path.getFileName() + "_colder"), 1000L * 1024L * 1024L * 1024L) // 1000GiB ); // Legacy } @@ -817,7 +899,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } else { volumePath = databasesDirPath.resolve(volume.volumePath()); } - paths.add(new DbPath(volumePath, volume.targetSizeBytes())); + paths.add(new DbPathRecord(volumePath, volume.targetSizeBytes())); } return paths; } @@ -937,6 +1019,48 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .subscribeOn(dbRScheduler); } + public Flux getSSTS() { + var paths = convertPaths(dbPath.toAbsolutePath().getParent(), dbPath.getFileName(), databaseOptions.volumes()); + return Mono + .fromCallable(() -> db.getLiveFiles()) + .flatMapIterable(liveFiles -> liveFiles.files) + .filter(file -> file.endsWith(".sst")) + .map(file -> file.substring(1)) + .flatMapSequential(file -> Mono.fromCallable(() -> { + { + var path = dbPath.resolve(file); + if (Files.exists(path)) { + return path; + } + } + for (var volumePath : paths) { + var path = volumePath.path().resolve(file); + if (Files.exists(path)) { + return path; + } + } + return null; + }).subscribeOn(Schedulers.boundedElastic())); + } + + public Mono ingestSSTS(Flux sstsFlux) { + return sstsFlux + .map(path -> path.toAbsolutePath().toString()) + .flatMap(sst -> Mono.fromCallable(() -> { + try (var opts = new IngestExternalFileOptions()) { + try { + logger.info("Ingesting SST \"{}\"...", sst); + db.ingestExternalFile(List.of(sst), opts); + logger.info("Ingested SST \"{}\" successfully", sst); + } catch (RocksDBException e) { + logger.error("Can't ingest SST \"{}\"", sst, e); + } + } + return null; + }).subscribeOn(Schedulers.boundedElastic())) + .then(); + } + @Override public Mono getMemoryStats() { return Mono @@ -1010,6 +1134,14 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .subscribeOn(dbRScheduler); } + @Override + public Mono compact() { + return Mono.fromCallable(() -> { + this.forceCompaction(getLastVolumeId()); + return null; + }).subscribeOn(dbWScheduler); + } + @Override public BufferAllocator getAllocator() { return allocator; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java new file mode 100644 index 0000000..38e4ec8 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java @@ -0,0 +1,118 @@ +package it.cavallium.dbengine.database.disk; + +import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; +import static it.cavallium.dbengine.database.LLUtils.generateCustomReadOptions; +import static it.cavallium.dbengine.database.LLUtils.isBoundedRange; + +import io.netty5.buffer.api.Buffer; +import io.netty5.buffer.api.Drop; +import io.netty5.buffer.api.Owned; +import io.netty5.buffer.api.Send; +import io.netty5.buffer.api.internal.ResourceSupport; +import it.cavallium.dbengine.database.LLRange; +import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.LLLocalMigrationReactiveRocksIterator.ByteEntry; +import java.util.Map; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.Nullable; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDBException; +import reactor.core.publisher.Flux; + +public final class LLLocalMigrationReactiveRocksIterator extends + ResourceSupport { + + protected static final Logger logger = LogManager.getLogger(LLLocalMigrationReactiveRocksIterator.class); + private static final Drop DROP = new Drop<>() { + @Override + public void drop(LLLocalMigrationReactiveRocksIterator obj) { + try { + if (obj.rangeShared != null) { + obj.rangeShared.close(); + } + } catch (Throwable ex) { + logger.error("Failed to close range", ex); + } + try { + if (obj.readOptions != null) { + if (!(obj.readOptions instanceof UnreleasableReadOptions)) { + obj.readOptions.close(); + } + } + } catch (Throwable ex) { + logger.error("Failed to close readOptions", ex); + } + } + + @Override + public Drop fork() { + return this; + } + + @Override + public void attach(LLLocalMigrationReactiveRocksIterator obj) { + + } + }; + + private final RocksDBColumn db; + private LLRange rangeShared; + private ReadOptions readOptions; + + @SuppressWarnings({"unchecked", "rawtypes"}) + public LLLocalMigrationReactiveRocksIterator(RocksDBColumn db, + Send range, + ReadOptions readOptions) { + super((Drop) (Drop) DROP); + try (range) { + this.db = db; + this.rangeShared = range.receive(); + this.readOptions = readOptions; + } + } + + public record ByteEntry(byte[] key, byte[] value) {} + + public Flux flux() { + return Flux.generate(() -> { + var readOptions = generateCustomReadOptions(this.readOptions, false, false, false); + return db.getRocksIterator(false, readOptions, rangeShared, false); + }, (tuple, sink) -> { + try { + var rocksIterator = tuple.iterator(); + if (rocksIterator.isValid()) { + byte[] key = rocksIterator.key(); + byte[] value = rocksIterator.value(); + rocksIterator.next(false); + sink.next(new ByteEntry(key, value)); + } else { + sink.complete(); + } + } catch (RocksDBException ex) { + sink.error(ex); + } + return tuple; + }, RocksIteratorTuple::close); + } + + @Override + protected final RuntimeException createResourceClosedException() { + return new IllegalStateException("Closed"); + } + + @Override + protected Owned prepareSend() { + var range = this.rangeShared.send(); + var readOptions = this.readOptions; + return drop -> new LLLocalMigrationReactiveRocksIterator(db, + range, + readOptions + ); + } + + protected void makeInaccessible() { + this.rangeShared = null; + this.readOptions = null; + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java index 18188c7..1fc8a6a 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java @@ -100,4 +100,6 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn { MeterRegistry getMeterRegistry(); boolean supportsTransactions(); + + void forceCompaction(int volumeId) throws RocksDBException; } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBIterator.java index 246e44d..0a82ecb 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBIterator.java @@ -147,14 +147,30 @@ public class RocksDBIterator implements SafeCloseable { } public void next() throws RocksDBException { - startedIterNext.increment(); - iterNextTime.record(rocksIterator::next); - endedIterNext.increment(); + next(true); + } + + public void next(boolean traceStats) throws RocksDBException { + if (traceStats) { + startedIterNext.increment(); + iterNextTime.record(rocksIterator::next); + endedIterNext.increment(); + } else { + rocksIterator.next(); + } } public void prev() throws RocksDBException { - startedIterNext.increment(); - iterNextTime.record(rocksIterator::prev); - endedIterNext.increment(); + prev(true); + } + + public void prev(boolean traceStats) throws RocksDBException { + if (traceStats) { + startedIterNext.increment(); + iterNextTime.record(rocksIterator::prev); + endedIterNext.increment(); + } else { + rocksIterator.prev(); + } } } diff --git a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java index 0a47ccf..a1bdd9a 100644 --- a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java @@ -346,6 +346,11 @@ public class LLQuicConnection implements LLDatabaseConnection { return null; } + @Override + public Mono compact() { + return null; + } + @Override public BufferAllocator getAllocator() { return allocator;