Add more metrics about payload sizes, and iteration times
This commit is contained in:
parent
6bd3fdb677
commit
7d0951956d
@ -2,32 +2,42 @@ package it.cavallium.dbengine.database.disk;
|
||||
|
||||
import static io.netty5.buffer.api.StandardAllocationTypes.OFF_HEAP;
|
||||
import static it.cavallium.dbengine.database.LLUtils.INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES;
|
||||
import static it.cavallium.dbengine.database.LLUtils.isReadOnlyDirect;
|
||||
import static java.util.Objects.requireNonNull;
|
||||
import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithValue;
|
||||
import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue;
|
||||
|
||||
import io.micrometer.core.instrument.Gauge;
|
||||
import io.micrometer.core.instrument.Counter;
|
||||
import io.micrometer.core.instrument.DistributionSummary;
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import io.netty5.buffer.api.Buffer;
|
||||
import io.netty5.buffer.api.BufferAllocator;
|
||||
import io.netty5.buffer.api.DefaultBufferAllocators;
|
||||
import io.netty5.buffer.api.MemoryManager;
|
||||
import io.netty5.buffer.api.ReadableComponent;
|
||||
import io.netty5.buffer.api.Send;
|
||||
import io.netty5.buffer.api.WritableComponent;
|
||||
import io.netty5.util.internal.PlatformDependent;
|
||||
import it.cavallium.dbengine.database.LLRange;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.RepeatedElementList;
|
||||
import it.cavallium.dbengine.lucene.DirectNIOFSDirectory;
|
||||
import it.cavallium.dbengine.database.SafeCloseable;
|
||||
import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithRelease;
|
||||
import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithoutRelease;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.Objects;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.rocksdb.AbstractSlice;
|
||||
import org.rocksdb.ColumnFamilyHandle;
|
||||
import org.rocksdb.CompactRangeOptions;
|
||||
import org.rocksdb.DirectSlice;
|
||||
import org.rocksdb.FlushOptions;
|
||||
import org.rocksdb.Holder;
|
||||
import org.rocksdb.KeyMayExist.KeyMayExistEnum;
|
||||
@ -35,6 +45,7 @@ import org.rocksdb.ReadOptions;
|
||||
import org.rocksdb.RocksDB;
|
||||
import org.rocksdb.RocksDBException;
|
||||
import org.rocksdb.RocksIterator;
|
||||
import org.rocksdb.Slice;
|
||||
import org.rocksdb.Transaction;
|
||||
import org.rocksdb.WriteBatch;
|
||||
import org.rocksdb.WriteOptions;
|
||||
@ -43,6 +54,10 @@ import reactor.core.scheduler.Schedulers;
|
||||
public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements RocksDBColumn
|
||||
permits StandardRocksDBColumn, OptimisticRocksDBColumn, PessimisticRocksDBColumn {
|
||||
|
||||
/**
|
||||
* Default: true
|
||||
*/
|
||||
private static final boolean USE_DIRECT_BUFFER_BOUNDS = true;
|
||||
private static final byte[] NO_DATA = new byte[0];
|
||||
protected static final UpdateAtomicResult RESULT_NOTHING = new UpdateAtomicResultNothing();
|
||||
|
||||
@ -55,11 +70,28 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
|
||||
private final ColumnFamilyHandle cfh;
|
||||
|
||||
private final MeterRegistry meterRegistry;
|
||||
private final AtomicInteger lastDataSizeMetric = new AtomicInteger(0);
|
||||
|
||||
protected final DistributionSummary keyBufferSize;
|
||||
protected final DistributionSummary readValueNotFoundWithoutBloomBufferSize;
|
||||
protected final DistributionSummary readValueNotFoundWithBloomBufferSize;
|
||||
protected final DistributionSummary readValueFoundWithBloomUncachedBufferSize;
|
||||
protected final DistributionSummary readValueFoundWithBloomCacheBufferSize;
|
||||
protected final DistributionSummary readValueFoundWithBloomSimpleBufferSize;
|
||||
protected final DistributionSummary readValueNotFoundWithMayExistBloomBufferSize;
|
||||
protected final DistributionSummary writeValueBufferSize;
|
||||
protected final DistributionSummary readAttempts;
|
||||
|
||||
private final Counter startedIterSeek;
|
||||
private final Counter endedIterSeek;
|
||||
private final Timer iterSeekTime;
|
||||
private final Counter startedIterNext;
|
||||
private final Counter endedIterNext;
|
||||
private final Timer iterNextTime;
|
||||
|
||||
public AbstractRocksDBColumn(T db,
|
||||
DatabaseOptions databaseOptions,
|
||||
BufferAllocator alloc,
|
||||
String databaseName,
|
||||
ColumnFamilyHandle cfh,
|
||||
MeterRegistry meterRegistry) {
|
||||
this.db = db;
|
||||
@ -68,11 +100,176 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
|
||||
this.alloc = alloc;
|
||||
this.cfh = cfh;
|
||||
|
||||
String columnName;
|
||||
try {
|
||||
columnName = new String(cfh.getName(), StandardCharsets.UTF_8);
|
||||
} catch (RocksDBException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
|
||||
this.meterRegistry = meterRegistry;
|
||||
Gauge
|
||||
.builder("it.cavallium.dbengine.database.disk.column.lastdatasize", lastDataSizeMetric::get)
|
||||
.description("Last data size read using get()")
|
||||
|
||||
this.keyBufferSize = DistributionSummary
|
||||
.builder("buffer.size.distribution")
|
||||
.publishPercentiles(0.2, 0.5, 0.95)
|
||||
.baseUnit("bytes")
|
||||
.scale(1)
|
||||
.publishPercentileHistogram()
|
||||
.tags("db.name", databaseName, "db.column", columnName, "buffer.type", "key")
|
||||
.register(meterRegistry);
|
||||
this.readValueNotFoundWithoutBloomBufferSize = DistributionSummary
|
||||
.builder("buffer.size.distribution")
|
||||
.publishPercentiles(0.2, 0.5, 0.95)
|
||||
.baseUnit("bytes")
|
||||
.scale(1)
|
||||
.publishPercentileHistogram()
|
||||
.tags("db.name", databaseName, "db.column", columnName, "buffer.type", "val.read", "found", "false")
|
||||
.register(meterRegistry);
|
||||
this.readValueNotFoundWithBloomBufferSize = DistributionSummary
|
||||
.builder("buffer.size.distribution")
|
||||
.publishPercentiles(0.2, 0.5, 0.95)
|
||||
.baseUnit("bytes")
|
||||
.scale(1)
|
||||
.publishPercentileHistogram()
|
||||
.tags("db.name", databaseName, "db.column", columnName, "buffer.type", "val.read", "found", "false", "bloom", "hit.notfound")
|
||||
.register(meterRegistry);
|
||||
this.readValueFoundWithBloomUncachedBufferSize = DistributionSummary
|
||||
.builder("buffer.size.distribution")
|
||||
.publishPercentiles(0.2, 0.5, 0.95)
|
||||
.baseUnit("bytes")
|
||||
.scale(1)
|
||||
.publishPercentileHistogram()
|
||||
.tags("db.name", databaseName, "db.column", columnName, "buffer.type", "val.read", "found", "true", "bloom", "hit.found")
|
||||
.register(meterRegistry);
|
||||
this.readValueFoundWithBloomCacheBufferSize = DistributionSummary
|
||||
.builder("buffer.size.distribution")
|
||||
.publishPercentiles(0.2, 0.5, 0.95)
|
||||
.baseUnit("bytes")
|
||||
.scale(1)
|
||||
.publishPercentileHistogram()
|
||||
.tags("db.name", databaseName, "db.column", columnName, "buffer.type", "val.read", "found", "true", "bloom", "hit.cachedvalue")
|
||||
.register(meterRegistry);
|
||||
this.readValueFoundWithBloomSimpleBufferSize = DistributionSummary
|
||||
.builder("buffer.size.distribution")
|
||||
.publishPercentiles(0.2, 0.5, 0.95)
|
||||
.baseUnit("bytes")
|
||||
.scale(1)
|
||||
.publishPercentileHistogram()
|
||||
.tags("db.name", databaseName, "db.column", columnName, "buffer.type", "val.read", "found", "true", "bloom", "hit")
|
||||
.register(meterRegistry);
|
||||
this.readValueNotFoundWithMayExistBloomBufferSize = DistributionSummary
|
||||
.builder("buffer.size.distribution")
|
||||
.publishPercentiles(0.2, 0.5, 0.95)
|
||||
.baseUnit("bytes")
|
||||
.scale(1)
|
||||
.publishPercentileHistogram()
|
||||
.tags("db.name", databaseName, "db.column", columnName, "buffer.type", "val.read", "found", "false", "bloom", "hit.wrong")
|
||||
.register(meterRegistry);
|
||||
this.writeValueBufferSize = DistributionSummary
|
||||
.builder("buffer.size.distribution")
|
||||
.publishPercentiles(0.2, 0.5, 0.95)
|
||||
.baseUnit("bytes")
|
||||
.scale(1)
|
||||
.publishPercentileHistogram()
|
||||
.tags("db.name", databaseName, "db.column", columnName, "buffer.type", "val.write")
|
||||
.register(meterRegistry);
|
||||
this.readAttempts = DistributionSummary
|
||||
.builder("db.read.attempts.distribution")
|
||||
.publishPercentiles(0.2, 0.5, 0.95)
|
||||
.baseUnit("times")
|
||||
.scale(1)
|
||||
.publishPercentileHistogram()
|
||||
.tags("db.name", databaseName, "db.column", columnName)
|
||||
.register(meterRegistry);
|
||||
|
||||
this.startedIterSeek = meterRegistry.counter("db.read.iter.seek.started.counter", "db.name", databaseName, "db.column", columnName);
|
||||
this.endedIterSeek = meterRegistry.counter("db.read.iter.seek.ended.counter", "db.name", databaseName, "db.column", columnName);
|
||||
this.iterSeekTime = Timer
|
||||
.builder("db.read.iter.seek.timer")
|
||||
.publishPercentiles(0.2, 0.5, 0.95)
|
||||
.publishPercentileHistogram()
|
||||
.tags("db.name", databaseName, "db.column", columnName)
|
||||
.register(meterRegistry);
|
||||
this.startedIterNext = meterRegistry.counter("db.read.iter.next.started.counter", "db.name", databaseName, "db.column", columnName);
|
||||
this.endedIterNext = meterRegistry.counter("db.read.iter.next.ended.counter", "db.name", databaseName, "db.column", columnName);
|
||||
this.iterNextTime = Timer
|
||||
.builder("db.read.iter.next.timer")
|
||||
.publishPercentiles(0.2, 0.5, 0.95)
|
||||
.publishPercentileHistogram()
|
||||
.tags("db.name", databaseName, "db.column", columnName)
|
||||
.register(meterRegistry);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method should not modify or move the writerIndex/readerIndex of the key
|
||||
*/
|
||||
static ReleasableSlice setIterateBound(boolean allowNettyDirect,
|
||||
ReadOptions readOpts, IterateBound boundType, Buffer key) {
|
||||
requireNonNull(key);
|
||||
AbstractSlice<?> slice;
|
||||
if (allowNettyDirect && USE_DIRECT_BUFFER_BOUNDS && isReadOnlyDirect(key)) {
|
||||
ByteBuffer keyInternalByteBuffer = ((ReadableComponent) key).readableBuffer();
|
||||
assert keyInternalByteBuffer.position() == 0;
|
||||
slice = new DirectSlice(keyInternalByteBuffer, key.readableBytes());
|
||||
assert slice.size() == key.readableBytes();
|
||||
} else {
|
||||
slice = new Slice(requireNonNull(LLUtils.toArray(key)));
|
||||
}
|
||||
if (boundType == IterateBound.LOWER) {
|
||||
readOpts.setIterateLowerBound(slice);
|
||||
} else {
|
||||
readOpts.setIterateUpperBound(slice);
|
||||
}
|
||||
return new ReleasableSliceImplWithRelease(slice);
|
||||
}
|
||||
|
||||
static ReleasableSlice emptyReleasableSlice() {
|
||||
var arr = new byte[0];
|
||||
|
||||
return new ReleasableSliceImplWithoutRelease(new Slice(arr));
|
||||
}
|
||||
|
||||
/**
|
||||
* This method should not modify or move the writerIndex/readerIndex of the buffers inside the range
|
||||
*/
|
||||
@NotNull
|
||||
public RocksIteratorTuple getRocksIterator(boolean allowNettyDirect,
|
||||
ReadOptions readOptions,
|
||||
LLRange range,
|
||||
boolean reverse) throws RocksDBException {
|
||||
assert !Schedulers.isInNonBlockingThread() : "Called getRocksIterator in a nonblocking thread";
|
||||
ReleasableSlice sliceMin;
|
||||
ReleasableSlice sliceMax;
|
||||
if (range.hasMin()) {
|
||||
sliceMin = setIterateBound(allowNettyDirect, readOptions, IterateBound.LOWER, range.getMinUnsafe());
|
||||
} else {
|
||||
sliceMin = emptyReleasableSlice();
|
||||
}
|
||||
if (range.hasMax()) {
|
||||
sliceMax = setIterateBound(allowNettyDirect, readOptions, IterateBound.UPPER, range.getMaxUnsafe());
|
||||
} else {
|
||||
sliceMax = emptyReleasableSlice();
|
||||
}
|
||||
var rocksIterator = this.newIterator(readOptions);
|
||||
SafeCloseable seekFromOrTo;
|
||||
if (reverse) {
|
||||
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMax()) {
|
||||
seekFromOrTo = Objects.requireNonNullElseGet(rocksIterator.seekFrom(range.getMaxUnsafe()),
|
||||
() -> ((SafeCloseable) () -> {}));
|
||||
} else {
|
||||
seekFromOrTo = () -> {};
|
||||
rocksIterator.seekToLast();
|
||||
}
|
||||
} else {
|
||||
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
|
||||
seekFromOrTo = Objects.requireNonNullElseGet(rocksIterator.seekTo(range.getMinUnsafe()),
|
||||
() -> ((SafeCloseable) () -> {}));
|
||||
} else {
|
||||
seekFromOrTo = () -> {};
|
||||
rocksIterator.seekToFirst();
|
||||
}
|
||||
}
|
||||
return new RocksIteratorTuple(List.of(readOptions), rocksIterator, sliceMin, sliceMax, seekFromOrTo);
|
||||
}
|
||||
|
||||
protected T getDb() {
|
||||
@ -102,126 +299,155 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
|
||||
if (!cfh.isOwningHandle()) {
|
||||
throw new IllegalStateException("Column family is closed");
|
||||
}
|
||||
if (nettyDirect) {
|
||||
// Get the key nio buffer to pass to RocksDB
|
||||
ByteBuffer keyNioBuffer;
|
||||
boolean mustCloseKey;
|
||||
{
|
||||
if (!LLUtils.isReadOnlyDirect(key)) {
|
||||
// If the nio buffer is not available, copy the netty buffer into a new direct buffer
|
||||
mustCloseKey = true;
|
||||
var directKey = DefaultBufferAllocators.offHeapAllocator().allocate(key.readableBytes());
|
||||
key.copyInto(key.readerOffset(), directKey, 0, key.readableBytes());
|
||||
key = directKey;
|
||||
} else {
|
||||
mustCloseKey = false;
|
||||
}
|
||||
keyNioBuffer = ((ReadableComponent) key).readableBuffer();
|
||||
assert keyNioBuffer.isDirect();
|
||||
assert keyNioBuffer.limit() == key.readableBytes();
|
||||
}
|
||||
|
||||
try {
|
||||
// Create a direct result buffer because RocksDB works only with direct buffers
|
||||
var resultBuffer = alloc.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES);
|
||||
try {
|
||||
assert resultBuffer.readerOffset() == 0;
|
||||
assert resultBuffer.writerOffset() == 0;
|
||||
var resultWritable = ((WritableComponent) resultBuffer).writableBuffer();
|
||||
|
||||
var keyMayExist = db.keyMayExist(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear());
|
||||
KeyMayExistEnum keyMayExistState = keyMayExist.exists;
|
||||
int keyMayExistValueLength = keyMayExist.valueLength;
|
||||
// At the beginning, size reflects the expected size, then it becomes the real data size
|
||||
int size = keyMayExistState == kExistsWithValue ? keyMayExistValueLength : -1;
|
||||
switch (keyMayExistState) {
|
||||
case kNotExist: {
|
||||
resultBuffer.close();
|
||||
return null;
|
||||
}
|
||||
// todo: kExistsWithValue is not reliable (read below),
|
||||
// in some cases it should be treated as kExistsWithoutValue
|
||||
case kExistsWithValue:
|
||||
case kExistsWithoutValue: {
|
||||
boolean isKExistsWithoutValue = false;
|
||||
if (keyMayExistState == kExistsWithoutValue) {
|
||||
isKExistsWithoutValue = true;
|
||||
} else {
|
||||
// todo: "size == 0 || resultWritable.limit() == 0" is checked because keyMayExist is broken,
|
||||
// and sometimes it returns an empty array, as if it exists
|
||||
if (size == 0 || resultWritable.limit() == 0) {
|
||||
isKExistsWithoutValue = true;
|
||||
}
|
||||
}
|
||||
if (isKExistsWithoutValue) {
|
||||
assert keyMayExistValueLength == 0;
|
||||
resultWritable.clear();
|
||||
// real data size
|
||||
size = db.get(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear());
|
||||
if (size == RocksDB.NOT_FOUND) {
|
||||
resultBuffer.close();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
default: {
|
||||
// real data size
|
||||
this.lastDataSizeMetric.set(size);
|
||||
assert size >= 0;
|
||||
if (size <= resultWritable.limit()) {
|
||||
assert size == resultWritable.limit();
|
||||
return resultBuffer.writerOffset(resultWritable.limit());
|
||||
} else {
|
||||
resultBuffer.ensureWritable(size);
|
||||
resultWritable = ((WritableComponent) resultBuffer).writableBuffer();
|
||||
assert resultBuffer.readerOffset() == 0;
|
||||
assert resultBuffer.writerOffset() == 0;
|
||||
|
||||
size = db.get(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear());
|
||||
if (size == RocksDB.NOT_FOUND) {
|
||||
resultBuffer.close();
|
||||
return null;
|
||||
}
|
||||
assert size == resultWritable.limit();
|
||||
return resultBuffer.writerOffset(resultWritable.limit());
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
resultBuffer.close();
|
||||
throw t;
|
||||
}
|
||||
} finally {
|
||||
if (mustCloseKey) {
|
||||
key.close();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
byte[] keyArray = LLUtils.toArray(key);
|
||||
requireNonNull(keyArray);
|
||||
Holder<byte[]> data = new Holder<>();
|
||||
if (db.keyMayExist(cfh, readOptions, keyArray, data)) {
|
||||
// todo: "data.getValue().length > 0" is checked because keyMayExist is broken, and sometimes it
|
||||
// returns an empty array, as if it exists
|
||||
if (data.getValue() != null && data.getValue().length > 0) {
|
||||
return LLUtils.fromByteArray(alloc, data.getValue());
|
||||
keyBufferSize.record(key.readableBytes());
|
||||
int readAttemptsCount = 0;
|
||||
try {
|
||||
if (nettyDirect) {
|
||||
// Get the key nio buffer to pass to RocksDB
|
||||
ByteBuffer keyNioBuffer;
|
||||
boolean mustCloseKey;
|
||||
{
|
||||
if (!LLUtils.isReadOnlyDirect(key)) {
|
||||
// If the nio buffer is not available, copy the netty buffer into a new direct buffer
|
||||
mustCloseKey = true;
|
||||
var directKey = DefaultBufferAllocators.offHeapAllocator().allocate(key.readableBytes());
|
||||
key.copyInto(key.readerOffset(), directKey, 0, key.readableBytes());
|
||||
key = directKey;
|
||||
} else {
|
||||
byte[] result = db.get(cfh, readOptions, keyArray);
|
||||
if (result == null) {
|
||||
return null;
|
||||
} else {
|
||||
return LLUtils.fromByteArray(alloc, result);
|
||||
}
|
||||
mustCloseKey = false;
|
||||
}
|
||||
} else {
|
||||
return null;
|
||||
keyNioBuffer = ((ReadableComponent) key).readableBuffer();
|
||||
assert keyNioBuffer.isDirect();
|
||||
assert keyNioBuffer.limit() == key.readableBytes();
|
||||
}
|
||||
} finally {
|
||||
if (!(readOptions instanceof UnreleasableReadOptions)) {
|
||||
readOptions.close();
|
||||
|
||||
try {
|
||||
// Create a direct result buffer because RocksDB works only with direct buffers
|
||||
var resultBuffer = alloc.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES);
|
||||
try {
|
||||
assert resultBuffer.readerOffset() == 0;
|
||||
assert resultBuffer.writerOffset() == 0;
|
||||
var resultWritable = ((WritableComponent) resultBuffer).writableBuffer();
|
||||
|
||||
var keyMayExist = db.keyMayExist(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear());
|
||||
KeyMayExistEnum keyMayExistState = keyMayExist.exists;
|
||||
int keyMayExistValueLength = keyMayExist.valueLength;
|
||||
// At the beginning, size reflects the expected size, then it becomes the real data size
|
||||
int size = keyMayExistState == kExistsWithValue ? keyMayExistValueLength : -1;
|
||||
boolean isKExistsWithoutValue = false;
|
||||
switch (keyMayExistState) {
|
||||
case kNotExist: {
|
||||
readValueNotFoundWithBloomBufferSize.record(0);
|
||||
resultBuffer.close();
|
||||
return null;
|
||||
}
|
||||
// todo: kExistsWithValue is not reliable (read below),
|
||||
// in some cases it should be treated as kExistsWithoutValue
|
||||
case kExistsWithValue:
|
||||
case kExistsWithoutValue: {
|
||||
if (keyMayExistState == kExistsWithoutValue) {
|
||||
isKExistsWithoutValue = true;
|
||||
} else {
|
||||
// todo: "size == 0 || resultWritable.limit() == 0" is checked because keyMayExist is broken,
|
||||
// and sometimes it returns an empty array, as if it exists
|
||||
if (size == 0 || resultWritable.limit() == 0) {
|
||||
isKExistsWithoutValue = true;
|
||||
}
|
||||
}
|
||||
if (isKExistsWithoutValue) {
|
||||
assert keyMayExistValueLength == 0;
|
||||
resultWritable.clear();
|
||||
readAttemptsCount++;
|
||||
// real data size
|
||||
size = db.get(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear());
|
||||
if (size == RocksDB.NOT_FOUND) {
|
||||
resultBuffer.close();
|
||||
readValueNotFoundWithMayExistBloomBufferSize.record(0);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
default: {
|
||||
// real data size
|
||||
assert size >= 0;
|
||||
if (size <= resultWritable.limit()) {
|
||||
if (isKExistsWithoutValue) {
|
||||
readValueFoundWithBloomUncachedBufferSize.record(size);
|
||||
} else {
|
||||
readValueFoundWithBloomCacheBufferSize.record(size);
|
||||
}
|
||||
assert size == resultWritable.limit();
|
||||
return resultBuffer.writerOffset(resultWritable.limit());
|
||||
} else {
|
||||
resultBuffer.ensureWritable(size);
|
||||
resultWritable = ((WritableComponent) resultBuffer).writableBuffer();
|
||||
assert resultBuffer.readerOffset() == 0;
|
||||
assert resultBuffer.writerOffset() == 0;
|
||||
|
||||
readAttemptsCount++;
|
||||
size = db.get(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear());
|
||||
if (size == RocksDB.NOT_FOUND) {
|
||||
readValueNotFoundWithMayExistBloomBufferSize.record(0);
|
||||
resultBuffer.close();
|
||||
return null;
|
||||
}
|
||||
assert size == resultWritable.limit();
|
||||
if (isKExistsWithoutValue) {
|
||||
readValueFoundWithBloomUncachedBufferSize.record(size);
|
||||
} else {
|
||||
readValueFoundWithBloomCacheBufferSize.record(size);
|
||||
}
|
||||
return resultBuffer.writerOffset(resultWritable.limit());
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
resultBuffer.close();
|
||||
throw t;
|
||||
}
|
||||
} finally {
|
||||
if (mustCloseKey) {
|
||||
key.close();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
byte[] keyArray = LLUtils.toArray(key);
|
||||
requireNonNull(keyArray);
|
||||
Holder<byte[]> data = new Holder<>();
|
||||
if (db.keyMayExist(cfh, readOptions, keyArray, data)) {
|
||||
// todo: "data.getValue().length > 0" is checked because keyMayExist is broken, and sometimes it
|
||||
// returns an empty array, as if it exists
|
||||
if (data.getValue() != null && data.getValue().length > 0) {
|
||||
readValueFoundWithBloomCacheBufferSize.record(data.getValue().length);
|
||||
return LLUtils.fromByteArray(alloc, data.getValue());
|
||||
} else {
|
||||
readAttemptsCount++;
|
||||
byte[] result = db.get(cfh, readOptions, keyArray);
|
||||
if (result == null) {
|
||||
if (data.getValue() != null) {
|
||||
readValueNotFoundWithBloomBufferSize.record(0);
|
||||
} else {
|
||||
readValueNotFoundWithMayExistBloomBufferSize.record(0);
|
||||
}
|
||||
return null;
|
||||
} else {
|
||||
readValueFoundWithBloomUncachedBufferSize.record(0);
|
||||
return LLUtils.fromByteArray(alloc, result);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
readValueNotFoundWithBloomBufferSize.record(0);
|
||||
return null;
|
||||
}
|
||||
} finally {
|
||||
if (!(readOptions instanceof UnreleasableReadOptions)) {
|
||||
readOptions.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
readAttempts.record(readAttemptsCount);
|
||||
}
|
||||
}
|
||||
|
||||
@ -242,6 +468,8 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
|
||||
}
|
||||
assert key.isAccessible();
|
||||
assert value.isAccessible();
|
||||
this.keyBufferSize.record(key.readableBytes());
|
||||
this.writeValueBufferSize.record(value.readableBytes());
|
||||
if (nettyDirect) {
|
||||
// Get the key nio buffer to pass to RocksDB
|
||||
ByteBuffer keyNioBuffer;
|
||||
@ -336,8 +564,16 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
|
||||
try {
|
||||
if (db.keyMayExist(cfh, keyNioBuffer)) {
|
||||
int size = db.get(cfh, readOptions, keyNioBuffer.position(0), LLUtils.EMPTY_BYTE_BUFFER);
|
||||
return size != RocksDB.NOT_FOUND;
|
||||
boolean found = size != RocksDB.NOT_FOUND;
|
||||
if (found) {
|
||||
readValueFoundWithBloomSimpleBufferSize.record(size);
|
||||
return true;
|
||||
} else {
|
||||
readValueNotFoundWithMayExistBloomBufferSize.record(0);
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
readValueNotFoundWithBloomBufferSize.record(0);
|
||||
return false;
|
||||
}
|
||||
} finally {
|
||||
@ -349,8 +585,10 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
|
||||
int size = RocksDB.NOT_FOUND;
|
||||
byte[] keyBytes = LLUtils.toArray(key);
|
||||
Holder<byte[]> data = new Holder<>();
|
||||
boolean mayExistHit = false;
|
||||
try {
|
||||
if (db.keyMayExist(cfh, readOptions, keyBytes, data)) {
|
||||
mayExistHit = true;
|
||||
if (data.getValue() != null) {
|
||||
size = data.getValue().length;
|
||||
} else {
|
||||
@ -362,7 +600,17 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
|
||||
readOptions.close();
|
||||
}
|
||||
}
|
||||
return size != RocksDB.NOT_FOUND;
|
||||
boolean found = size != RocksDB.NOT_FOUND;
|
||||
if (found) {
|
||||
readValueFoundWithBloomSimpleBufferSize.record(size);
|
||||
} else {
|
||||
if (mayExistHit) {
|
||||
readValueNotFoundWithMayExistBloomBufferSize.record(0);
|
||||
} else {
|
||||
readValueNotFoundWithBloomBufferSize.record(0);
|
||||
}
|
||||
}
|
||||
return found;
|
||||
}
|
||||
}
|
||||
|
||||
@ -377,6 +625,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
|
||||
if (!cfh.isOwningHandle()) {
|
||||
throw new IllegalStateException("Column family is closed");
|
||||
}
|
||||
keyBufferSize.record(key.readableBytes());
|
||||
if (nettyDirect) {
|
||||
// Get the key nio buffer to pass to RocksDB
|
||||
ByteBuffer keyNioBuffer;
|
||||
@ -418,6 +667,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
|
||||
if (!cfh.isOwningHandle()) {
|
||||
throw new IllegalStateException("Column family is closed");
|
||||
}
|
||||
keyBufferSize.record(key.length);
|
||||
db.delete(cfh, writeOptions, key);
|
||||
}
|
||||
|
||||
@ -432,6 +682,9 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
|
||||
if (!cfh.isOwningHandle()) {
|
||||
throw new IllegalStateException("Column family is closed");
|
||||
}
|
||||
for (byte[] key : keys) {
|
||||
keyBufferSize.record(key.length);
|
||||
}
|
||||
var columnFamilyHandles = new RepeatedElementList<>(cfh, keys.size());
|
||||
return db.multiGetAsList(readOptions, columnFamilyHandles, keys);
|
||||
}
|
||||
@ -519,9 +772,24 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
|
||||
|
||||
protected abstract Transaction beginTransaction(@NotNull WriteOptions writeOptions);
|
||||
|
||||
@Override
|
||||
public final @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions,
|
||||
@NotNull WriteOptions writeOptions,
|
||||
Send<Buffer> keySend,
|
||||
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
|
||||
UpdateAtomicResultMode returnMode) throws IOException {
|
||||
return updateAtomicImpl(readOptions, writeOptions, keySend, updater, returnMode);
|
||||
}
|
||||
|
||||
protected abstract @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions,
|
||||
@NotNull WriteOptions writeOptions,
|
||||
Send<Buffer> keySend,
|
||||
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
|
||||
UpdateAtomicResultMode returnMode) throws IOException;
|
||||
|
||||
@Override
|
||||
@NotNull
|
||||
public RocksIterator newIterator(@NotNull ReadOptions readOptions) {
|
||||
public RocksDBIterator newIterator(@NotNull ReadOptions readOptions) {
|
||||
if (!db.isOwningHandle()) {
|
||||
throw new IllegalStateException("Database is closed");
|
||||
}
|
||||
@ -531,7 +799,15 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
|
||||
if (!cfh.isOwningHandle()) {
|
||||
throw new IllegalStateException("Column family is closed");
|
||||
}
|
||||
return db.newIterator(cfh, readOptions);
|
||||
return new RocksDBIterator(db.newIterator(cfh, readOptions),
|
||||
nettyDirect,
|
||||
this.startedIterSeek,
|
||||
this.endedIterSeek,
|
||||
this.iterSeekTime,
|
||||
this.startedIterNext,
|
||||
this.endedIterNext,
|
||||
this.iterNextTime
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -30,6 +30,7 @@ import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@ -108,11 +109,6 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
static final boolean PARALLEL_EXACT_SIZE = true;
|
||||
|
||||
private static final byte[] FIRST_KEY = new byte[]{};
|
||||
|
||||
/**
|
||||
* Default: true
|
||||
*/
|
||||
private static final boolean USE_DIRECT_BUFFER_BOUNDS = true;
|
||||
/**
|
||||
* 1KiB dummy buffer, write only, used for debugging purposes
|
||||
*/
|
||||
@ -318,17 +314,18 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
readOpts.setIterateUpperBound(slice2 = new Slice(LLUtils.toArray(range.getMaxUnsafe())));
|
||||
}
|
||||
}
|
||||
try (RocksIterator rocksIterator = db.newIterator(readOpts)) {
|
||||
try (var rocksIterator = db.newIterator(readOpts)) {
|
||||
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
|
||||
if (nettyDirect && isReadOnlyDirect(range.getMinUnsafe())) {
|
||||
rocksIterator.seek(((ReadableComponent) range.getMinUnsafe()).readableBuffer());
|
||||
var seekBuf = ((ReadableComponent) range.getMinUnsafe()).readableBuffer();
|
||||
rocksIterator.seek(seekBuf);
|
||||
} else {
|
||||
rocksIterator.seek(LLUtils.toArray(range.getMinUnsafe()));
|
||||
var seekArray = LLUtils.toArray(range.getMinUnsafe());
|
||||
rocksIterator.seek(seekArray);
|
||||
}
|
||||
} else {
|
||||
rocksIterator.seekToFirst();
|
||||
}
|
||||
rocksIterator.status();
|
||||
return !rocksIterator.isValid();
|
||||
}
|
||||
} finally {
|
||||
@ -893,21 +890,17 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
}
|
||||
}
|
||||
ro.setVerifyChecksums(true);
|
||||
try (var rocksIteratorTuple = getRocksIterator(nettyDirect, ro, range, db, false)) {
|
||||
try (var rocksIteratorTuple = db.getRocksIterator(nettyDirect, ro, range, false)) {
|
||||
var rocksIterator = rocksIteratorTuple.iterator();
|
||||
rocksIterator.seekToFirst();
|
||||
rocksIterator.status();
|
||||
while (rocksIterator.isValid() && !sink.isCancelled()) {
|
||||
try {
|
||||
rocksIterator.status();
|
||||
rocksIterator.key(DUMMY_WRITE_ONLY_BYTE_BUFFER);
|
||||
rocksIterator.status();
|
||||
rocksIterator.value(DUMMY_WRITE_ONLY_BYTE_BUFFER);
|
||||
rocksIterator.status();
|
||||
rocksIterator.next();
|
||||
} catch (RocksDBException ex) {
|
||||
sink.next(new BadBlock(databaseName, ColumnUtils.special(columnName), null, ex));
|
||||
}
|
||||
rocksIterator.next();
|
||||
}
|
||||
}
|
||||
sink.complete();
|
||||
@ -979,33 +972,31 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) {
|
||||
ReleasableSlice minBound;
|
||||
if (range.hasMin()) {
|
||||
minBound = setIterateBound(nettyDirect, opts, IterateBound.LOWER, range.getMinUnsafe());
|
||||
minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, opts, IterateBound.LOWER, range.getMinUnsafe());
|
||||
} else {
|
||||
minBound = emptyReleasableSlice();
|
||||
minBound = AbstractRocksDBColumn.emptyReleasableSlice();
|
||||
}
|
||||
try {
|
||||
ReleasableSlice maxBound;
|
||||
if (range.hasMax()) {
|
||||
maxBound = setIterateBound(nettyDirect, opts, IterateBound.UPPER, range.getMaxUnsafe());
|
||||
maxBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, opts, IterateBound.UPPER, range.getMaxUnsafe());
|
||||
} else {
|
||||
maxBound = emptyReleasableSlice();
|
||||
maxBound = AbstractRocksDBColumn.emptyReleasableSlice();
|
||||
}
|
||||
assert cfh.isOwningHandle();
|
||||
assert opts.isOwningHandle();
|
||||
SafeCloseable seekTo;
|
||||
try (RocksIterator it = db.newIterator(opts)) {
|
||||
try (var it = db.newIterator(opts)) {
|
||||
if (!PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
|
||||
seekTo = rocksIterSeekTo(nettyDirect, it, range.getMinUnsafe());
|
||||
seekTo = it.seekTo(range.getMinUnsafe());
|
||||
} else {
|
||||
seekTo = null;
|
||||
it.seekToFirst();
|
||||
}
|
||||
try {
|
||||
it.status();
|
||||
while (it.isValid()) {
|
||||
db.delete(EMPTY_WRITE_OPTIONS, it.key());
|
||||
it.next();
|
||||
it.status();
|
||||
}
|
||||
} finally {
|
||||
if (seekTo != null) {
|
||||
@ -1153,31 +1144,29 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
readOpts.setFillCache(false);
|
||||
ReleasableSlice minBound;
|
||||
if (range.hasMin()) {
|
||||
minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
|
||||
minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
|
||||
} else {
|
||||
minBound = emptyReleasableSlice();
|
||||
minBound = AbstractRocksDBColumn.emptyReleasableSlice();
|
||||
}
|
||||
try {
|
||||
ReleasableSlice maxBound;
|
||||
if (range.hasMax()) {
|
||||
maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
|
||||
maxBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
|
||||
} else {
|
||||
maxBound = emptyReleasableSlice();
|
||||
maxBound = AbstractRocksDBColumn.emptyReleasableSlice();
|
||||
}
|
||||
try (var rocksIterator = db.newIterator(readOpts)) {
|
||||
SafeCloseable seekTo;
|
||||
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
|
||||
seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe());
|
||||
seekTo = rocksIterator.seekTo(range.getMinUnsafe());
|
||||
} else {
|
||||
seekTo = null;
|
||||
rocksIterator.seekToFirst();
|
||||
}
|
||||
try {
|
||||
rocksIterator.status();
|
||||
while (rocksIterator.isValid()) {
|
||||
writeBatch.delete(cfh, LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).send());
|
||||
rocksIterator.next();
|
||||
rocksIterator.status();
|
||||
}
|
||||
} finally {
|
||||
if (seekTo != null) {
|
||||
@ -1203,31 +1192,29 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
readOpts.setFillCache(false);
|
||||
ReleasableSlice minBound;
|
||||
if (range.hasMin()) {
|
||||
minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
|
||||
minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
|
||||
} else {
|
||||
minBound = emptyReleasableSlice();
|
||||
minBound = AbstractRocksDBColumn.emptyReleasableSlice();
|
||||
}
|
||||
try {
|
||||
ReleasableSlice maxBound;
|
||||
if (range.hasMax()) {
|
||||
maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
|
||||
maxBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
|
||||
} else {
|
||||
maxBound = emptyReleasableSlice();
|
||||
maxBound = AbstractRocksDBColumn.emptyReleasableSlice();
|
||||
}
|
||||
try (var rocksIterator = db.newIterator(readOpts)) {
|
||||
SafeCloseable seekTo;
|
||||
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
|
||||
seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe());
|
||||
seekTo = rocksIterator.seekTo(range.getMinUnsafe());
|
||||
} else {
|
||||
seekTo = null;
|
||||
rocksIterator.seekToFirst();
|
||||
}
|
||||
try {
|
||||
rocksIterator.status();
|
||||
while (rocksIterator.isValid()) {
|
||||
writeBatch.delete(cfh, rocksIterator.key());
|
||||
rocksIterator.next();
|
||||
rocksIterator.status();
|
||||
}
|
||||
} finally {
|
||||
if (seekTo != null) {
|
||||
@ -1244,72 +1231,6 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Useful for reverse iterations
|
||||
*/
|
||||
@Nullable
|
||||
private static SafeCloseable rocksIterSeekFrom(boolean allowNettyDirect,
|
||||
RocksIterator rocksIterator, Buffer key) {
|
||||
if (allowNettyDirect && isReadOnlyDirect(key)) {
|
||||
ByteBuffer keyInternalByteBuffer = ((ReadableComponent) key).readableBuffer();
|
||||
assert keyInternalByteBuffer.position() == 0;
|
||||
rocksIterator.seekForPrev(keyInternalByteBuffer);
|
||||
// This is useful to retain the key buffer in memory and avoid deallocations
|
||||
return key::isAccessible;
|
||||
} else {
|
||||
rocksIterator.seekForPrev(LLUtils.toArray(key));
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Useful for forward iterations
|
||||
*/
|
||||
@Nullable
|
||||
private static SafeCloseable rocksIterSeekTo(boolean allowNettyDirect,
|
||||
RocksIterator rocksIterator, Buffer key) {
|
||||
if (allowNettyDirect && isReadOnlyDirect(key)) {
|
||||
ByteBuffer keyInternalByteBuffer = ((ReadableComponent) key).readableBuffer();
|
||||
assert keyInternalByteBuffer.position() == 0;
|
||||
rocksIterator.seek(keyInternalByteBuffer);
|
||||
// This is useful to retain the key buffer in memory and avoid deallocations
|
||||
return key::isAccessible;
|
||||
} else {
|
||||
rocksIterator.seek(LLUtils.toArray(key));
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method should not modify or move the writerIndex/readerIndex of the key
|
||||
*/
|
||||
private static ReleasableSlice setIterateBound(boolean allowNettyDirect,
|
||||
ReadOptions readOpts, IterateBound boundType, Buffer key) {
|
||||
requireNonNull(key);
|
||||
AbstractSlice<?> slice;
|
||||
if (allowNettyDirect && LLLocalDictionary.USE_DIRECT_BUFFER_BOUNDS
|
||||
&& (isReadOnlyDirect(key))) {
|
||||
ByteBuffer keyInternalByteBuffer = ((ReadableComponent) key).readableBuffer();
|
||||
assert keyInternalByteBuffer.position() == 0;
|
||||
slice = new DirectSlice(keyInternalByteBuffer, key.readableBytes());
|
||||
assert slice.size() == key.readableBytes();
|
||||
} else {
|
||||
slice = new Slice(requireNonNull(LLUtils.toArray(key)));
|
||||
}
|
||||
if (boundType == IterateBound.LOWER) {
|
||||
readOpts.setIterateLowerBound(slice);
|
||||
} else {
|
||||
readOpts.setIterateUpperBound(slice);
|
||||
}
|
||||
return new ReleasableSliceImplWithRelease(slice);
|
||||
}
|
||||
|
||||
private static ReleasableSlice emptyReleasableSlice() {
|
||||
var arr = new byte[0];
|
||||
|
||||
return new ReleasableSliceImplWithoutRelease(new Slice(arr));
|
||||
}
|
||||
|
||||
/**
|
||||
* This method should not modify or move the writerIndex/readerIndex of the key
|
||||
*/
|
||||
@ -1349,20 +1270,17 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
|
||||
byte[] firstDeletedKey = null;
|
||||
byte[] lastDeletedKey = null;
|
||||
try (RocksIterator rocksIterator = db.newIterator(readOpts)) {
|
||||
try (var rocksIterator = db.newIterator(readOpts)) {
|
||||
// If the database supports transactions, delete each key one by one
|
||||
if (db.supportsTransactions()) {
|
||||
rocksIterator.seekToFirst();
|
||||
rocksIterator.status();
|
||||
while (rocksIterator.isValid()) {
|
||||
writeBatch.delete(cfh, rocksIterator.key());
|
||||
rocksIterator.next();
|
||||
rocksIterator.status();
|
||||
}
|
||||
} else {
|
||||
rocksIterator.seekToLast();
|
||||
|
||||
rocksIterator.status();
|
||||
if (rocksIterator.isValid()) {
|
||||
firstDeletedKey = FIRST_KEY;
|
||||
lastDeletedKey = rocksIterator.key();
|
||||
@ -1411,16 +1329,16 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
|
||||
ReleasableSlice minBound;
|
||||
if (range.hasMin()) {
|
||||
minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
|
||||
minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
|
||||
} else {
|
||||
minBound = emptyReleasableSlice();
|
||||
minBound = AbstractRocksDBColumn.emptyReleasableSlice();
|
||||
}
|
||||
try {
|
||||
ReleasableSlice maxBound;
|
||||
if (range.hasMax()) {
|
||||
maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
|
||||
maxBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
|
||||
} else {
|
||||
maxBound = emptyReleasableSlice();
|
||||
maxBound = AbstractRocksDBColumn.emptyReleasableSlice();
|
||||
}
|
||||
try {
|
||||
if (fast) {
|
||||
@ -1430,17 +1348,15 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
try (var rocksIterator = db.newIterator(readOpts)) {
|
||||
SafeCloseable seekTo;
|
||||
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
|
||||
seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe());
|
||||
seekTo = rocksIterator.seekTo(range.getMinUnsafe());
|
||||
} else {
|
||||
seekTo = null;
|
||||
rocksIterator.seekToFirst();
|
||||
}
|
||||
try {
|
||||
long i = 0;
|
||||
rocksIterator.status();
|
||||
while (rocksIterator.isValid()) {
|
||||
rocksIterator.next();
|
||||
rocksIterator.status();
|
||||
i++;
|
||||
}
|
||||
sink.next(i);
|
||||
@ -1472,27 +1388,26 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
|
||||
ReleasableSlice minBound;
|
||||
if (range.hasMin()) {
|
||||
minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
|
||||
minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
|
||||
} else {
|
||||
minBound = emptyReleasableSlice();
|
||||
minBound = AbstractRocksDBColumn.emptyReleasableSlice();
|
||||
}
|
||||
try {
|
||||
ReleasableSlice maxBound;
|
||||
if (range.hasMax()) {
|
||||
maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
|
||||
maxBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
|
||||
} else {
|
||||
maxBound = emptyReleasableSlice();
|
||||
maxBound = AbstractRocksDBColumn.emptyReleasableSlice();
|
||||
}
|
||||
try (var rocksIterator = db.newIterator(readOpts)) {
|
||||
SafeCloseable seekTo;
|
||||
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
|
||||
seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe());
|
||||
seekTo = rocksIterator.seekTo(range.getMinUnsafe());
|
||||
} else {
|
||||
seekTo = null;
|
||||
rocksIterator.seekToFirst();
|
||||
}
|
||||
try {
|
||||
rocksIterator.status();
|
||||
if (rocksIterator.isValid()) {
|
||||
try (var key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) {
|
||||
try (var value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value)) {
|
||||
@ -1528,27 +1443,26 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
|
||||
ReleasableSlice minBound;
|
||||
if (range.hasMin()) {
|
||||
minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
|
||||
minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
|
||||
} else {
|
||||
minBound = emptyReleasableSlice();
|
||||
minBound = AbstractRocksDBColumn.emptyReleasableSlice();
|
||||
}
|
||||
try {
|
||||
ReleasableSlice maxBound;
|
||||
if (range.hasMax()) {
|
||||
maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
|
||||
maxBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
|
||||
} else {
|
||||
maxBound = emptyReleasableSlice();
|
||||
maxBound = AbstractRocksDBColumn.emptyReleasableSlice();
|
||||
}
|
||||
try (var rocksIterator = db.newIterator(readOpts)) {
|
||||
SafeCloseable seekTo;
|
||||
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
|
||||
seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe());
|
||||
seekTo = rocksIterator.seekTo(range.getMinUnsafe());
|
||||
} else {
|
||||
seekTo = null;
|
||||
rocksIterator.seekToFirst();
|
||||
}
|
||||
try {
|
||||
rocksIterator.status();
|
||||
if (rocksIterator.isValid()) {
|
||||
sink.next(LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).send());
|
||||
} else {
|
||||
@ -1588,14 +1502,12 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
rocksdbSnapshot.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
|
||||
rocksdbSnapshot.setIgnoreRangeDeletions(true);
|
||||
long count = 0;
|
||||
try (RocksIterator rocksIterator = db.newIterator(rocksdbSnapshot)) {
|
||||
try (var rocksIterator = db.newIterator(rocksdbSnapshot)) {
|
||||
rocksIterator.seekToFirst();
|
||||
rocksIterator.status();
|
||||
// If it's a fast size of a snapshot, count only up to 100'000 elements
|
||||
while (rocksIterator.isValid() && count < 100_000) {
|
||||
count++;
|
||||
rocksIterator.next();
|
||||
rocksIterator.status();
|
||||
}
|
||||
return count;
|
||||
}
|
||||
@ -1644,13 +1556,11 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
if (sliceBegin != null) {
|
||||
rangeReadOpts.setIterateUpperBound(sliceEnd);
|
||||
}
|
||||
try (RocksIterator rocksIterator = db.newIterator(rangeReadOpts)) {
|
||||
try (var rocksIterator = db.newIterator(rangeReadOpts)) {
|
||||
rocksIterator.seekToFirst();
|
||||
rocksIterator.status();
|
||||
while (rocksIterator.isValid()) {
|
||||
partialCount++;
|
||||
rocksIterator.next();
|
||||
rocksIterator.status();
|
||||
}
|
||||
return partialCount;
|
||||
}
|
||||
@ -1673,13 +1583,15 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
return count;
|
||||
} else {
|
||||
long count = 0;
|
||||
try (RocksIterator iter = db.newIterator(readOpts)) {
|
||||
iter.seekToFirst();
|
||||
while (iter.isValid()) {
|
||||
try (var rocksIterator = db.newIterator(readOpts)) {
|
||||
rocksIterator.seekToFirst();
|
||||
while (rocksIterator.isValid()) {
|
||||
count++;
|
||||
iter.next();
|
||||
rocksIterator.next();
|
||||
}
|
||||
return count;
|
||||
} catch (RocksDBException ex) {
|
||||
throw new IllegalStateException("Failed to read exact size", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1693,27 +1605,26 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
try (var readOpts = new ReadOptions(getReadOptions(null))) {
|
||||
ReleasableSlice minBound;
|
||||
if (range.hasMin()) {
|
||||
minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
|
||||
minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
|
||||
} else {
|
||||
minBound = emptyReleasableSlice();
|
||||
minBound = AbstractRocksDBColumn.emptyReleasableSlice();
|
||||
}
|
||||
try {
|
||||
ReleasableSlice maxBound;
|
||||
if (range.hasMax()) {
|
||||
maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
|
||||
maxBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
|
||||
} else {
|
||||
maxBound = emptyReleasableSlice();
|
||||
maxBound = AbstractRocksDBColumn.emptyReleasableSlice();
|
||||
}
|
||||
try (RocksIterator rocksIterator = db.newIterator(readOpts)) {
|
||||
try (var rocksIterator = db.newIterator(readOpts)) {
|
||||
SafeCloseable seekTo;
|
||||
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
|
||||
seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe());
|
||||
seekTo = rocksIterator.seekTo(range.getMinUnsafe());
|
||||
} else {
|
||||
seekTo = null;
|
||||
rocksIterator.seekToFirst();
|
||||
}
|
||||
try {
|
||||
rocksIterator.status();
|
||||
if (!rocksIterator.isValid()) {
|
||||
sink.complete();
|
||||
return;
|
||||
@ -1740,47 +1651,4 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* This method should not modify or move the writerIndex/readerIndex of the buffers inside the range
|
||||
*/
|
||||
@NotNull
|
||||
public static RocksIteratorTuple getRocksIterator(boolean allowNettyDirect,
|
||||
ReadOptions readOptions,
|
||||
LLRange range,
|
||||
RocksDBColumn db,
|
||||
boolean reverse) {
|
||||
assert !Schedulers.isInNonBlockingThread() : "Called getRocksIterator in a nonblocking thread";
|
||||
ReleasableSlice sliceMin;
|
||||
ReleasableSlice sliceMax;
|
||||
if (range.hasMin()) {
|
||||
sliceMin = setIterateBound(allowNettyDirect, readOptions, IterateBound.LOWER, range.getMinUnsafe());
|
||||
} else {
|
||||
sliceMin = emptyReleasableSlice();
|
||||
}
|
||||
if (range.hasMax()) {
|
||||
sliceMax = setIterateBound(allowNettyDirect, readOptions, IterateBound.UPPER, range.getMaxUnsafe());
|
||||
} else {
|
||||
sliceMax = emptyReleasableSlice();
|
||||
}
|
||||
var rocksIterator = db.newIterator(readOptions);
|
||||
SafeCloseable seekFromOrTo;
|
||||
if (reverse) {
|
||||
if (!PREFER_AUTO_SEEK_BOUND && range.hasMax()) {
|
||||
seekFromOrTo = Objects.requireNonNullElseGet(rocksIterSeekFrom(allowNettyDirect, rocksIterator, range.getMaxUnsafe()),
|
||||
() -> ((SafeCloseable) () -> {}));
|
||||
} else {
|
||||
seekFromOrTo = () -> {};
|
||||
rocksIterator.seekToLast();
|
||||
}
|
||||
} else {
|
||||
if (!PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
|
||||
seekFromOrTo = Objects.requireNonNullElseGet(rocksIterSeekTo(allowNettyDirect, rocksIterator, range.getMinUnsafe()),
|
||||
() -> ((SafeCloseable) () -> {}));
|
||||
} else {
|
||||
seekFromOrTo = () -> {};
|
||||
rocksIterator.seekToFirst();
|
||||
}
|
||||
}
|
||||
return new RocksIteratorTuple(List.of(readOptions), rocksIterator, sliceMin, sliceMax, seekFromOrTo);
|
||||
}
|
||||
}
|
||||
|
@ -95,14 +95,13 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> extends
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range));
|
||||
}
|
||||
return LLLocalDictionary.getRocksIterator(allowNettyDirect, readOptions, range, db, false);
|
||||
return db.getRocksIterator(allowNettyDirect, readOptions, range, false);
|
||||
}, (tuple, sink) -> {
|
||||
try {
|
||||
var rocksIterator = tuple.iterator();
|
||||
ObjectArrayList<T> values = new ObjectArrayList<>();
|
||||
Buffer firstGroupKey = null;
|
||||
try {
|
||||
rocksIterator.status();
|
||||
while (rocksIterator.isValid()) {
|
||||
try (Buffer key = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::key)) {
|
||||
if (firstGroupKey == null) {
|
||||
@ -129,7 +128,6 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> extends
|
||||
|
||||
try {
|
||||
rocksIterator.next();
|
||||
rocksIterator.status();
|
||||
T entry = getEntry(key.send(), value == null ? null : value.send());
|
||||
values.add(entry);
|
||||
} finally {
|
||||
|
@ -91,11 +91,10 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared));
|
||||
}
|
||||
return LLLocalDictionary.getRocksIterator(allowNettyDirect, readOptions, rangeShared, db, false);
|
||||
return db.getRocksIterator(allowNettyDirect, readOptions, rangeShared, false);
|
||||
}, (tuple, sink) -> {
|
||||
try {
|
||||
var rocksIterator = tuple.iterator();
|
||||
rocksIterator.status();
|
||||
Buffer firstGroupKey = null;
|
||||
try {
|
||||
while (rocksIterator.isValid()) {
|
||||
@ -117,7 +116,6 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends
|
||||
break;
|
||||
}
|
||||
rocksIterator.next();
|
||||
rocksIterator.status();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -650,11 +650,11 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||
|
||||
private RocksDBColumn getRocksDBColumn(RocksDB db, ColumnFamilyHandle cfh) {
|
||||
if (db instanceof OptimisticTransactionDB optimisticTransactionDB) {
|
||||
return new OptimisticRocksDBColumn(optimisticTransactionDB, databaseOptions, allocator, cfh, meterRegistry);
|
||||
return new OptimisticRocksDBColumn(optimisticTransactionDB, databaseOptions, allocator, name, cfh, meterRegistry);
|
||||
} else if (db instanceof TransactionDB transactionDB) {
|
||||
return new PessimisticRocksDBColumn(transactionDB, databaseOptions, allocator, cfh, meterRegistry);
|
||||
return new PessimisticRocksDBColumn(transactionDB, databaseOptions, allocator, name, cfh, meterRegistry);
|
||||
} else {
|
||||
return new StandardRocksDBColumn(db, databaseOptions, allocator, cfh, meterRegistry);
|
||||
return new StandardRocksDBColumn(db, databaseOptions, allocator, name, cfh, meterRegistry);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3,7 +3,6 @@ package it.cavallium.dbengine.database.disk;
|
||||
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
|
||||
import static it.cavallium.dbengine.database.LLUtils.generateCustomReadOptions;
|
||||
import static it.cavallium.dbengine.database.LLUtils.isBoundedRange;
|
||||
import static it.cavallium.dbengine.database.disk.LLLocalDictionary.getRocksIterator;
|
||||
|
||||
import io.netty5.buffer.api.Buffer;
|
||||
import io.netty5.buffer.api.Drop;
|
||||
@ -89,11 +88,10 @@ public abstract class LLLocalReactiveRocksIterator<T> extends
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared));
|
||||
}
|
||||
return getRocksIterator(allowNettyDirect, readOptions, rangeShared, db, reverse);
|
||||
return db.getRocksIterator(allowNettyDirect, readOptions, rangeShared, reverse);
|
||||
}, (tuple, sink) -> {
|
||||
try {
|
||||
var rocksIterator = tuple.iterator();
|
||||
rocksIterator.status();
|
||||
if (rocksIterator.isValid()) {
|
||||
Buffer key;
|
||||
if (allowNettyDirect) {
|
||||
@ -128,7 +126,6 @@ public abstract class LLLocalReactiveRocksIterator<T> extends
|
||||
} else {
|
||||
rocksIterator.next();
|
||||
}
|
||||
rocksIterator.status();
|
||||
sink.next(getEntry(key.send(), value == null ? null : value.send()));
|
||||
} finally {
|
||||
if (value != null) {
|
||||
|
@ -34,9 +34,10 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
|
||||
public OptimisticRocksDBColumn(OptimisticTransactionDB db,
|
||||
DatabaseOptions databaseOptions,
|
||||
BufferAllocator alloc,
|
||||
String dbName,
|
||||
ColumnFamilyHandle cfh,
|
||||
MeterRegistry meterRegistry) {
|
||||
super(db, databaseOptions, alloc, cfh, meterRegistry);
|
||||
super(db, databaseOptions, alloc, dbName, cfh, meterRegistry);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -64,11 +65,11 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions,
|
||||
public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions,
|
||||
@NotNull WriteOptions writeOptions,
|
||||
Send<Buffer> keySend,
|
||||
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
|
||||
UpdateAtomicResultMode returnMode) throws IOException, RocksDBException {
|
||||
UpdateAtomicResultMode returnMode) throws IOException {
|
||||
try (Buffer key = keySend.receive()) {
|
||||
try {
|
||||
var cfh = getCfh();
|
||||
|
@ -30,8 +30,9 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
|
||||
public PessimisticRocksDBColumn(TransactionDB db,
|
||||
DatabaseOptions databaseOptions,
|
||||
BufferAllocator alloc,
|
||||
String dbName,
|
||||
ColumnFamilyHandle cfh, MeterRegistry meterRegistry) {
|
||||
super(db, databaseOptions, alloc, cfh, meterRegistry);
|
||||
super(db, databaseOptions, alloc, dbName, cfh, meterRegistry);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -46,11 +47,11 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions,
|
||||
public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions,
|
||||
@NotNull WriteOptions writeOptions,
|
||||
Send<Buffer> keySend,
|
||||
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
|
||||
UpdateAtomicResultMode returnMode) throws IOException, RocksDBException {
|
||||
UpdateAtomicResultMode returnMode) throws IOException {
|
||||
try (Buffer key = keySend.receive()) {
|
||||
try {
|
||||
var cfh = getCfh();
|
||||
|
@ -4,6 +4,7 @@ import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.netty5.buffer.api.Buffer;
|
||||
import io.netty5.buffer.api.BufferAllocator;
|
||||
import io.netty5.buffer.api.Send;
|
||||
import it.cavallium.dbengine.database.LLRange;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import java.io.IOException;
|
||||
@ -21,6 +22,15 @@ import org.rocksdb.WriteOptions;
|
||||
|
||||
public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
|
||||
|
||||
/**
|
||||
* This method should not modify or move the writerIndex/readerIndex of the buffers inside the range
|
||||
*/
|
||||
@NotNull
|
||||
RocksIteratorTuple getRocksIterator(boolean allowNettyDirect,
|
||||
ReadOptions readOptions,
|
||||
LLRange range,
|
||||
boolean reverse) throws RocksDBException;
|
||||
|
||||
default byte @Nullable [] get(@NotNull ReadOptions readOptions,
|
||||
byte[] key,
|
||||
boolean existsAlmostCertainly)
|
||||
@ -58,7 +68,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
|
||||
}
|
||||
}
|
||||
|
||||
@NotNull RocksIterator newIterator(@NotNull ReadOptions readOptions);
|
||||
@NotNull RocksDBIterator newIterator(@NotNull ReadOptions readOptions);
|
||||
|
||||
@NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions, @NotNull WriteOptions writeOptions,
|
||||
Send<Buffer> keySend, SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
|
||||
|
@ -0,0 +1,160 @@
|
||||
package it.cavallium.dbengine.database.disk;
|
||||
|
||||
import static it.cavallium.dbengine.database.LLUtils.isReadOnlyDirect;
|
||||
|
||||
import io.micrometer.core.instrument.Counter;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import io.netty5.buffer.api.Buffer;
|
||||
import io.netty5.buffer.api.ReadableComponent;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.SafeCloseable;
|
||||
import java.nio.ByteBuffer;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.rocksdb.RocksDBException;
|
||||
import org.rocksdb.RocksIterator;
|
||||
|
||||
public class RocksDBIterator implements SafeCloseable {
|
||||
|
||||
private final RocksIterator rocksIterator;
|
||||
private final boolean allowNettyDirect;
|
||||
private final Counter startedIterSeek;
|
||||
private final Counter endedIterSeek;
|
||||
private final Timer iterSeekTime;
|
||||
private final Counter startedIterNext;
|
||||
private final Counter endedIterNext;
|
||||
private final Timer iterNextTime;
|
||||
|
||||
public RocksDBIterator(RocksIterator rocksIterator,
|
||||
boolean allowNettyDirect,
|
||||
Counter startedIterSeek,
|
||||
Counter endedIterSeek,
|
||||
Timer iterSeekTime,
|
||||
Counter startedIterNext,
|
||||
Counter endedIterNext,
|
||||
Timer iterNextTime) {
|
||||
this.rocksIterator = rocksIterator;
|
||||
this.allowNettyDirect = allowNettyDirect;
|
||||
this.startedIterSeek = startedIterSeek;
|
||||
this.endedIterSeek = endedIterSeek;
|
||||
this.iterSeekTime = iterSeekTime;
|
||||
this.startedIterNext = startedIterNext;
|
||||
this.endedIterNext = endedIterNext;
|
||||
this.iterNextTime = iterNextTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
rocksIterator.close();
|
||||
}
|
||||
|
||||
public void seek(ByteBuffer seekBuf) throws RocksDBException {
|
||||
startedIterSeek.increment();
|
||||
try {
|
||||
iterSeekTime.record(() -> rocksIterator.seek(seekBuf));
|
||||
} finally {
|
||||
endedIterSeek.increment();
|
||||
}
|
||||
rocksIterator.status();
|
||||
}
|
||||
|
||||
public void seek(byte[] seekArray) throws RocksDBException {
|
||||
startedIterSeek.increment();
|
||||
try {
|
||||
iterSeekTime.record(() -> rocksIterator.seek(seekArray));
|
||||
} finally {
|
||||
endedIterSeek.increment();
|
||||
}
|
||||
rocksIterator.status();
|
||||
}
|
||||
|
||||
public void seekToFirst() throws RocksDBException {
|
||||
startedIterSeek.increment();
|
||||
try {
|
||||
iterSeekTime.record(rocksIterator::seekToFirst);
|
||||
} finally {
|
||||
endedIterSeek.increment();
|
||||
}
|
||||
rocksIterator.status();
|
||||
}
|
||||
|
||||
public void seekToLast() throws RocksDBException {
|
||||
startedIterSeek.increment();
|
||||
try {
|
||||
iterSeekTime.record(rocksIterator::seekToLast);
|
||||
} finally {
|
||||
endedIterSeek.increment();
|
||||
}
|
||||
rocksIterator.status();
|
||||
}
|
||||
|
||||
/**
|
||||
* Useful for reverse iterations
|
||||
*/
|
||||
@Nullable
|
||||
public SafeCloseable seekFrom(Buffer key) {
|
||||
if (allowNettyDirect && isReadOnlyDirect(key)) {
|
||||
ByteBuffer keyInternalByteBuffer = ((ReadableComponent) key).readableBuffer();
|
||||
assert keyInternalByteBuffer.position() == 0;
|
||||
rocksIterator.seekForPrev(keyInternalByteBuffer);
|
||||
// This is useful to retain the key buffer in memory and avoid deallocations
|
||||
return key::isAccessible;
|
||||
} else {
|
||||
rocksIterator.seekForPrev(LLUtils.toArray(key));
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Useful for forward iterations
|
||||
*/
|
||||
@Nullable
|
||||
public SafeCloseable seekTo(Buffer key) {
|
||||
if (allowNettyDirect && isReadOnlyDirect(key)) {
|
||||
ByteBuffer keyInternalByteBuffer = ((ReadableComponent) key).readableBuffer();
|
||||
assert keyInternalByteBuffer.position() == 0;
|
||||
startedIterSeek.increment();
|
||||
iterSeekTime.record(() -> rocksIterator.seek(keyInternalByteBuffer));
|
||||
endedIterSeek.increment();
|
||||
// This is useful to retain the key buffer in memory and avoid deallocations
|
||||
return key::isAccessible;
|
||||
} else {
|
||||
var array = LLUtils.toArray(key);
|
||||
startedIterSeek.increment();
|
||||
iterSeekTime.record(() -> rocksIterator.seek(array));
|
||||
endedIterSeek.increment();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isValid() {
|
||||
return rocksIterator.isValid();
|
||||
}
|
||||
|
||||
public int key(ByteBuffer buffer) {
|
||||
return rocksIterator.key(buffer);
|
||||
}
|
||||
|
||||
public int value(ByteBuffer buffer) {
|
||||
return rocksIterator.value(buffer);
|
||||
}
|
||||
|
||||
public byte[] key() {
|
||||
return rocksIterator.key();
|
||||
}
|
||||
|
||||
public byte[] value() {
|
||||
return rocksIterator.value();
|
||||
}
|
||||
|
||||
public void next() throws RocksDBException {
|
||||
startedIterNext.increment();
|
||||
iterNextTime.record(rocksIterator::next);
|
||||
endedIterNext.increment();
|
||||
}
|
||||
|
||||
public void prev() throws RocksDBException {
|
||||
startedIterNext.increment();
|
||||
iterNextTime.record(rocksIterator::prev);
|
||||
endedIterNext.increment();
|
||||
}
|
||||
}
|
@ -8,7 +8,7 @@ import org.rocksdb.ReadOptions;
|
||||
import org.rocksdb.RocksIterator;
|
||||
import org.rocksdb.RocksObject;
|
||||
|
||||
public record RocksIteratorTuple(List<RocksObject> refs, @NotNull RocksIterator iterator,
|
||||
public record RocksIteratorTuple(List<RocksObject> refs, @NotNull RocksDBIterator iterator,
|
||||
@NotNull ReleasableSlice sliceMin, @NotNull ReleasableSlice sliceMax,
|
||||
@NotNull SafeCloseable seekTo) implements
|
||||
SafeCloseable {
|
||||
|
@ -25,8 +25,9 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB>
|
||||
public StandardRocksDBColumn(RocksDB db,
|
||||
DatabaseOptions databaseOptions,
|
||||
BufferAllocator alloc,
|
||||
String dbName,
|
||||
ColumnFamilyHandle cfh, MeterRegistry meterRegistry) {
|
||||
super(db, databaseOptions, alloc, cfh, meterRegistry);
|
||||
super(db, databaseOptions, alloc, dbName, cfh, meterRegistry);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -40,11 +41,11 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB>
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions,
|
||||
public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions,
|
||||
@NotNull WriteOptions writeOptions,
|
||||
Send<Buffer> keySend,
|
||||
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
|
||||
UpdateAtomicResultMode returnMode) throws IOException, RocksDBException {
|
||||
UpdateAtomicResultMode returnMode) throws IOException {
|
||||
try (Buffer key = keySend.receive()) {
|
||||
try {
|
||||
@Nullable Buffer prevData = this.get(readOptions, key);
|
||||
|
Loading…
Reference in New Issue
Block a user