Clean unused iterators

This commit is contained in:
Andrea Cavalli 2023-05-22 19:12:05 +02:00
parent 8499dcf89c
commit 2f5c8b618f
28 changed files with 501 additions and 411 deletions

View File

@ -7,6 +7,7 @@ import com.google.common.primitives.Longs;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.client.HitEntry;
import it.cavallium.dbengine.client.HitKey;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.LuceneCloseable;
import it.cavallium.dbengine.lucene.LuceneUtils;
@ -472,13 +473,13 @@ public class LLUtils {
* @param smallRange true if the range is small
* @return the passed instance of ReadOptions, or a new one if the passed readOptions is null
*/
public static ReadOptions generateCustomReadOptions(@Nullable ReadOptions readOptions,
public static LLReadOptions generateCustomReadOptions(@Nullable LLReadOptions readOptions,
boolean canFillCache,
boolean boundedRange,
boolean smallRange) {
if (readOptions == null) {
//noinspection resource
readOptions = new ReadOptions();
readOptions = new LLReadOptions();
}
var hugeRange = !boundedRange && !smallRange;
if (hugeRange) {

View File

@ -10,9 +10,12 @@ import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.RepeatedElementList;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import it.cavallium.dbengine.database.disk.rocksdb.LLSlice;
import it.cavallium.dbengine.database.disk.rocksdb.LLWriteOptions;
import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import java.io.IOException;
import it.cavallium.dbengine.utils.SimpleResource;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
@ -24,22 +27,18 @@ import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.AbstractImmutableNativeReference;
import org.rocksdb.AbstractSlice;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.CompactRangeOptions;
import org.rocksdb.FlushOptions;
import org.rocksdb.Holder;
import org.rocksdb.KeyMayExist;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksObject;
import org.rocksdb.Slice;
import org.rocksdb.TableProperties;
import org.rocksdb.Transaction;
import org.rocksdb.TransactionOptions;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements RocksDBColumn
permits StandardRocksDBColumn, OptimisticRocksDBColumn, PessimisticRocksDBColumn {
@ -80,6 +79,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
private final Timer updateRemovedTime;
private final Timer updateUnchangedTime;
private final DBColumnKeyMayExistGetter keyMayExistGetter;
private final IteratorMetrics iteratorMetrics;
public AbstractRocksDBColumn(T db,
String databaseName,
@ -222,28 +222,27 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
.publishPercentileHistogram()
.tags("db.name", databaseName, "db.column", columnName, "update.type", "unchanged")
.register(meterRegistry);
this.iteratorMetrics = new IteratorMetrics(this.startedIterSeek,
this.endedIterSeek,
this.iterSeekTime,
this.startedIterNext,
this.endedIterNext,
this.iterNextTime);
this.keyMayExistGetter = new DBColumnKeyMayExistGetter();
}
/**
* This method should not modify or move the writerIndex/readerIndex of the key
*/
static AbstractSlice<?> setIterateBound(ReadOptions readOpts, IterateBound boundType, Buf key) {
static void setIterateBound(LLReadOptions readOpts, IterateBound boundType, Buf key) {
requireNonNull(key);
AbstractSlice<?> slice;
slice = new Slice(requireNonNull(LLUtils.asArray(key)));
LLSlice slice;
slice = LLSlice.of(requireNonNull(LLUtils.asArray(key)));
if (boundType == IterateBound.LOWER) {
readOpts.setIterateLowerBound(slice);
} else {
readOpts.setIterateUpperBound(slice);
}
return slice;
}
/**
 * Builds a fresh zero-length {@link Slice}.
 * Each call returns a distinct instance, so the caller owns it and may close it independently.
 */
static Slice newEmptyReleasableSlice() {
return new Slice(new byte[0]);
}
/**
@ -251,7 +250,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
*/
@Override
@NotNull
public RocksIteratorObj newRocksIterator(ReadOptions readOptions, LLRange range, boolean reverse)
public RocksIteratorObj newRocksIterator(LLReadOptions readOptions, LLRange range, boolean reverse)
throws RocksDBException {
assert !LLUtils.isInNonBlockingThread() : "Called getRocksIterator in a nonblocking thread";
var rocksIterator = this.newIterator(readOptions, range.getMin(), range.getMax());
@ -297,8 +296,13 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
RocksDBUtils.ensureOwned(rocksObject);
}
protected void ensureOwned(SimpleResource simpleResource) {
RocksDBUtils.ensureOwned(simpleResource);
}
@Override
public @Nullable Buf get(@NotNull ReadOptions readOptions, Buf key) throws RocksDBException {
public @Nullable Buf get(@NotNull LLReadOptions readOptions, Buf key) throws RocksDBException {
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
@ -310,21 +314,21 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
}
@Override
public void put(@NotNull WriteOptions writeOptions, Buf key, Buf value) throws RocksDBException {
public void put(@NotNull LLWriteOptions writeOptions, Buf key, Buf value) throws RocksDBException {
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
ensureOwned(writeOptions);
this.keyBufferSize.record(key.size());
this.writeValueBufferSize.record(value.size());
db.put(cfh, writeOptions, LLUtils.asArray(key), LLUtils.asArray(value));
db.put(cfh, writeOptions.getUnsafe(), LLUtils.asArray(key), LLUtils.asArray(value));
} finally {
closeLock.unlockRead(closeReadLock);
}
}
@Override
public boolean exists(@NotNull ReadOptions readOptions, Buf key) throws RocksDBException {
public boolean exists(@NotNull LLReadOptions readOptions, Buf key) throws RocksDBException {
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
@ -333,12 +337,12 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
byte[] keyBytes = LLUtils.asArray(key);
Holder<byte[]> data = new Holder<>();
boolean mayExistHit = false;
if (db.keyMayExist(cfh, readOptions, keyBytes, data)) {
if (db.keyMayExist(cfh, readOptions.getUnsafe(), keyBytes, data)) {
mayExistHit = true;
if (data.getValue() != null) {
size = data.getValue().length;
} else {
size = db.get(cfh, readOptions, keyBytes, NO_DATA);
size = db.get(cfh, readOptions.getUnsafe(), keyBytes, NO_DATA);
}
}
boolean found = size != RocksDB.NOT_FOUND;
@ -358,46 +362,46 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
}
@Override
public boolean mayExists(@NotNull ReadOptions readOptions, Buf key) {
public boolean mayExists(@NotNull LLReadOptions readOptions, Buf key) {
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
ensureOwned(readOptions);
byte[] keyBytes = LLUtils.asArray(key);
return db.keyMayExist(cfh, readOptions, keyBytes, null);
return db.keyMayExist(cfh, readOptions.getUnsafe(), keyBytes, null);
} finally {
closeLock.unlockRead(closeReadLock);
}
}
@Override
public void delete(WriteOptions writeOptions, Buf key) throws RocksDBException {
public void delete(LLWriteOptions writeOptions, Buf key) throws RocksDBException {
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
ensureOwned(writeOptions);
keyBufferSize.record(key.size());
db.delete(cfh, writeOptions, LLUtils.asArray(key));
db.delete(cfh, writeOptions.getUnsafe(), LLUtils.asArray(key));
} finally {
closeLock.unlockRead(closeReadLock);
}
}
@Override
public void delete(WriteOptions writeOptions, byte[] key) throws RocksDBException {
public void delete(LLWriteOptions writeOptions, byte[] key) throws RocksDBException {
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
ensureOwned(writeOptions);
keyBufferSize.record(key.length);
db.delete(cfh, writeOptions, key);
db.delete(cfh, writeOptions.getUnsafe(), key);
} finally {
closeLock.unlockRead(closeReadLock);
}
}
@Override
public List<byte[]> multiGetAsList(ReadOptions readOptions, List<byte[]> keys) throws RocksDBException {
public List<byte[]> multiGetAsList(LLReadOptions readOptions, List<byte[]> keys) throws RocksDBException {
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
@ -406,7 +410,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
keyBufferSize.record(key.length);
}
var columnFamilyHandles = new RepeatedElementList<>(cfh, keys.size());
return db.multiGetAsList(readOptions, columnFamilyHandles, keys);
return db.multiGetAsList(readOptions.getUnsafe(), columnFamilyHandles, keys);
} finally {
closeLock.unlockRead(closeReadLock);
}
@ -480,13 +484,13 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
}
@Override
public void write(WriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException {
public void write(LLWriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException {
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
ensureOwned(writeOptions);
ensureOwned(writeBatch);
db.write(writeOptions, writeBatch);
db.write(writeOptions.getUnsafe(), writeBatch);
} finally {
closeLock.unlockRead(closeReadLock);
}
@ -497,12 +501,12 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
*/
protected abstract boolean commitOptimistically(Transaction tx) throws RocksDBException;
protected abstract Transaction beginTransaction(@NotNull WriteOptions writeOptions,
protected abstract Transaction beginTransaction(@NotNull LLWriteOptions writeOptions,
TransactionOptions txOpts);
@Override
public final @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions,
@NotNull WriteOptions writeOptions,
public final @NotNull UpdateAtomicResult updateAtomic(@NotNull LLReadOptions readOptions,
@NotNull LLWriteOptions writeOptions,
Buf key,
SerializationFunction<@Nullable Buf, @Nullable Buf> updater,
UpdateAtomicResultMode returnMode) {
@ -540,65 +544,24 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
timer.record(duration, TimeUnit.NANOSECONDS);
}
protected abstract @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions,
@NotNull WriteOptions writeOptions,
protected abstract @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull LLReadOptions readOptions,
@NotNull LLWriteOptions writeOptions,
Buf key,
SerializationFunction<@Nullable Buf, @Nullable Buf> updater,
UpdateAtomicResultMode returnMode);
@Override
@NotNull
public RocksIteratorObj newIterator(@NotNull ReadOptions readOptions,
public RocksIteratorObj newIterator(@NotNull LLReadOptions readOptions,
@Nullable Buf min,
@Nullable Buf max) {
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
ensureOwned(readOptions);
AbstractSlice<?> sliceMin;
AbstractSlice<?> sliceMax;
if (min != null) {
sliceMin = setIterateBound(readOptions, IterateBound.LOWER, min);
} else {
sliceMin = null;
}
try {
if (max != null) {
sliceMax = setIterateBound(readOptions, IterateBound.UPPER, max);
} else {
sliceMax = null;
}
try {
var it = db.newIterator(cfh, readOptions);
try {
return new RocksIteratorObj(it,
sliceMin,
sliceMax,
min,
max,
this.startedIterSeek,
this.endedIterSeek,
this.iterSeekTime,
this.startedIterNext,
this.endedIterNext,
this.iterNextTime
);
} catch (Throwable ex) {
it.close();
throw ex;
}
} catch (Throwable ex) {
if (sliceMax != null) {
sliceMax.close();
}
throw ex;
}
} catch (Throwable ex) {
if (sliceMin != null) {
sliceMin.close();
}
throw ex;
}
setIterateBound(readOptions, IterateBound.LOWER, min);
setIterateBound(readOptions, IterateBound.UPPER, max);
return readOptions.newIterator(db, cfh, iteratorMetrics);
} finally {
closeLock.unlockRead(closeReadLock);
}
@ -651,23 +614,23 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
private class DBColumnKeyMayExistGetter extends KeyMayExistGetter {
@Override
protected KeyMayExist keyMayExist(ReadOptions readOptions, ByteBuffer key, ByteBuffer value) {
return db.keyMayExist(cfh, readOptions, key, value);
protected KeyMayExist keyMayExist(LLReadOptions readOptions, ByteBuffer key, ByteBuffer value) {
return db.keyMayExist(cfh, readOptions.getUnsafe(), key, value);
}
@Override
protected boolean keyMayExist(ReadOptions readOptions, byte[] key, @Nullable Holder<byte[]> valueHolder) {
return db.keyMayExist(cfh, readOptions, key, valueHolder);
protected boolean keyMayExist(LLReadOptions readOptions, byte[] key, @Nullable Holder<byte[]> valueHolder) {
return db.keyMayExist(cfh, readOptions.getUnsafe(), key, valueHolder);
}
@Override
protected int get(ReadOptions readOptions, ByteBuffer key, ByteBuffer value) throws RocksDBException {
return db.get(cfh, readOptions, key, value);
protected int get(LLReadOptions readOptions, ByteBuffer key, ByteBuffer value) throws RocksDBException {
return db.get(cfh, readOptions.getUnsafe(), key, value);
}
@Override
protected byte[] get(ReadOptions readOptions, byte[] key) throws RocksDBException, IllegalArgumentException {
return db.get(cfh, readOptions, key);
protected byte[] get(LLReadOptions readOptions, byte[] key) throws RocksDBException, IllegalArgumentException {
return db.get(cfh, readOptions.getUnsafe(), key);
}
@Override

View File

@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.database.LLUtils.asArray;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.database.disk.rocksdb.LLWriteOptions;
import java.nio.ByteBuffer;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
@ -13,7 +14,7 @@ public class CappedWriteBatch extends WriteBatch {
private final RocksDBColumn db;
private final int cap;
private final WriteOptions writeOptions;
private final LLWriteOptions writeOptions;
/**
* @param db
@ -23,7 +24,7 @@ public class CappedWriteBatch extends WriteBatch {
int cap,
int reservedWriteBatchSize,
long maxWriteBatchSize,
WriteOptions writeOptions) {
LLWriteOptions writeOptions) {
super(reservedWriteBatchSize);
this.db = db;
this.cap = cap;

View File

@ -0,0 +1,7 @@
package it.cavallium.dbengine.database.disk;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
/**
 * Bundles the Micrometer instruments used to measure RocksDB iterator activity,
 * so they can be passed around as one value instead of six separate parameters.
 *
 * @param startedIterSeek counter incremented when an iterator seek starts
 * @param endedIterSeek   counter incremented when an iterator seek ends
 * @param iterSeekTime    timer recording the duration of iterator seeks
 * @param startedIterNext counter incremented when an iterator next() starts
 * @param endedIterNext   counter incremented when an iterator next() ends
 * @param iterNextTime    timer recording the duration of iterator next() calls
 */
public record IteratorMetrics(Counter startedIterSeek, Counter endedIterSeek, Timer iterSeekTime,
Counter startedIterNext, Counter endedIterNext, Timer iterNextTime) {}

View File

@ -6,6 +6,7 @@ import static java.util.Objects.requireNonNull;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import java.nio.ByteBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -32,12 +33,12 @@ public abstract class KeyMayExistGetter {
/** No-argument constructor: this getter performs no initialization of its own. */
public KeyMayExistGetter() {
}
public final @Nullable Buf get(@NotNull ReadOptions readOptions, Buf key) throws RocksDBException {
public final @Nullable Buf get(@NotNull LLReadOptions readOptions, Buf key) throws RocksDBException {
recordKeyBufferSize(key.size());
return getHeap(readOptions, key);
}
private Buf getHeap(ReadOptions readOptions, Buf key) throws RocksDBException {
private Buf getHeap(LLReadOptions readOptions, Buf key) throws RocksDBException {
int readAttemptsCount = 0;
try {
byte[] keyArray = LLUtils.asArray(key);
@ -73,15 +74,15 @@ public abstract class KeyMayExistGetter {
}
}
protected abstract KeyMayExist keyMayExist(final ReadOptions readOptions, final ByteBuffer key, final ByteBuffer value);
protected abstract KeyMayExist keyMayExist(final LLReadOptions readOptions, final ByteBuffer key, final ByteBuffer value);
protected abstract boolean keyMayExist(final ReadOptions readOptions,
protected abstract boolean keyMayExist(final LLReadOptions readOptions,
final byte[] key,
@Nullable final Holder<byte[]> valueHolder);
protected abstract int get(final ReadOptions opt, final ByteBuffer key, final ByteBuffer value) throws RocksDBException;
protected abstract int get(final LLReadOptions opt, final ByteBuffer key, final ByteBuffer value) throws RocksDBException;
protected abstract byte[] get(final ReadOptions opt, final byte[] key) throws RocksDBException, IllegalArgumentException;
protected abstract byte[] get(final LLReadOptions opt, final byte[] key) throws RocksDBException, IllegalArgumentException;
protected abstract void recordReadValueNotFoundWithMayExistBloomBufferSize(int value);

View File

@ -30,6 +30,9 @@ import it.cavallium.dbengine.database.OptionalBuf;
import it.cavallium.dbengine.database.SerializedKey;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import it.cavallium.dbengine.database.disk.rocksdb.LLSlice;
import it.cavallium.dbengine.database.disk.rocksdb.LLWriteOptions;
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
@ -68,7 +71,7 @@ public class LLLocalDictionary implements LLDictionary {
static final long MAX_WRITE_BATCH_SIZE = 1024L * 1024L * 1024L; // 1GiB
static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations
static final int MULTI_GET_WINDOW = 16;
private static final ReadOptions EMPTY_READ_OPTIONS = LLUtils.ALLOW_STATIC_OPTIONS ? new ReadOptions() : null;
private static final LLReadOptions EMPTY_READ_OPTIONS = LLUtils.ALLOW_STATIC_OPTIONS ? new LLReadOptions() : null;
static final boolean PREFER_AUTO_SEEK_BOUND = false;
/**
* It used to be false,
@ -195,33 +198,33 @@ public class LLLocalDictionary implements LLDictionary {
}
@NotNull
private ReadOptions generateReadOptionsOrStatic(LLSnapshot snapshot) {
private LLReadOptions generateReadOptionsOrStatic(LLSnapshot snapshot) {
var resolved = generateReadOptions(snapshot != null ? snapshotResolver.apply(snapshot) : null, true);
if (resolved != null) {
return resolved;
} else {
return new ReadOptions();
return new LLReadOptions();
}
}
@Nullable
private ReadOptions generateReadOptionsOrNull(LLSnapshot snapshot) {
private LLReadOptions generateReadOptionsOrNull(LLSnapshot snapshot) {
return generateReadOptions(snapshot != null ? snapshotResolver.apply(snapshot) : null, false);
}
@NotNull
private ReadOptions generateReadOptionsOrNew(LLSnapshot snapshot) {
private LLReadOptions generateReadOptionsOrNew(LLSnapshot snapshot) {
var result = generateReadOptions(snapshot != null ? snapshotResolver.apply(snapshot) : null, false);
if (result != null) {
return result;
} else {
return new ReadOptions();
return new LLReadOptions();
}
}
private ReadOptions generateReadOptions(Snapshot snapshot, boolean orStaticOpts) {
private LLReadOptions generateReadOptions(Snapshot snapshot, boolean orStaticOpts) {
if (snapshot != null) {
return new ReadOptions().setSnapshot(snapshot);
return new LLReadOptions().setSnapshot(snapshot);
} else if (ALLOW_STATIC_OPTIONS && orStaticOpts) {
return EMPTY_READ_OPTIONS;
} else {
@ -337,7 +340,7 @@ public class LLLocalDictionary implements LLDictionary {
logger.trace(MARKER_ROCKSDB, "Writing {}: {}", varargs);
}
startedPut.increment();
try (var writeOptions = new WriteOptions()) {
try (var writeOptions = new LLWriteOptions()) {
putTime.recordCallable(() -> {
db.put(writeOptions, key, value);
return null;
@ -376,7 +379,7 @@ public class LLLocalDictionary implements LLDictionary {
try {
var readOptions = generateReadOptionsOrStatic(null);
startedUpdates.increment();
try (var writeOptions = new WriteOptions()) {
try (var writeOptions = new LLWriteOptions()) {
result = updateTime.recordCallable(() -> db.updateAtomic(readOptions, writeOptions, key, updater, returnMode));
} finally {
endedUpdates.increment();
@ -413,7 +416,7 @@ public class LLLocalDictionary implements LLDictionary {
try {
var readOptions = generateReadOptionsOrStatic(null);
startedUpdates.increment();
try (var writeOptions = new WriteOptions()) {
try (var writeOptions = new LLWriteOptions()) {
result = updateTime.recordCallable(() ->
(UpdateAtomicResultDelta) db.updateAtomic(readOptions, writeOptions, key, updater, DELTA));
} finally {
@ -437,7 +440,7 @@ public class LLLocalDictionary implements LLDictionary {
try {
logger.trace(MARKER_ROCKSDB, "Deleting {}", () -> toStringSafe(key));
startedRemove.increment();
try (var writeOptions = new WriteOptions()) {
try (var writeOptions = new LLWriteOptions()) {
removeTime.recordCallable(() -> {
db.delete(writeOptions, key);
return null;
@ -489,7 +492,7 @@ public class LLLocalDictionary implements LLDictionary {
collectOn(ROCKSDB_POOL,
batches(entries, Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP)),
executing(entriesWindow -> {
try (var writeOptions = new WriteOptions()) {
try (var writeOptions = new LLWriteOptions()) {
assert !LLUtils.isInNonBlockingThread() : "Called putMulti in a nonblocking thread";
if (USE_WRITE_BATCHES_IN_PUT_MULTI) {
try (var batch = new CappedWriteBatch(db,
@ -521,7 +524,7 @@ public class LLLocalDictionary implements LLDictionary {
record MappedInput<K>(K key, Buf serializedKey, OptionalBuf mapped) {}
return batches(keys, Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
.flatMap(entriesWindow -> {
try (var writeOptions = new WriteOptions()) {
try (var writeOptions = new LLWriteOptions()) {
if (LLUtils.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called updateMulti in a nonblocking thread");
}
@ -761,7 +764,7 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public void setRange(LLRange range, Stream<LLEntry> entries, boolean smallRange) {
if (USE_WINDOW_IN_SET_RANGE) {
try (var writeOptions = new WriteOptions()) {
try (var writeOptions = new LLWriteOptions()) {
assert !LLUtils.isInNonBlockingThread() : "Called setRange in a nonblocking thread";
if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) {
try (var opts = LLUtils.generateCustomReadOptions(null, true, isBoundedRange(range), smallRange)) {
@ -807,7 +810,7 @@ public class LLLocalDictionary implements LLDictionary {
}
collectOn(ROCKSDB_POOL, batches(entries, MULTI_GET_WINDOW), executing(entriesList -> {
try (var writeOptions = new WriteOptions()) {
try (var writeOptions = new LLWriteOptions()) {
if (!USE_WRITE_BATCHES_IN_SET_RANGE) {
for (LLEntry entry : entriesList) {
db.put(writeOptions, entry.getKey(), entry.getValue());
@ -843,7 +846,7 @@ public class LLLocalDictionary implements LLDictionary {
throw new UnsupportedOperationException("Can't use write batches in setRange without window. Please fix the parameters");
}
collectOn(ROCKSDB_POOL, this.getRange(null, range, false, smallRange), executing(oldValue -> {
try (var writeOptions = new WriteOptions()) {
try (var writeOptions = new LLWriteOptions()) {
db.delete(writeOptions, oldValue.getKey());
} catch (RocksDBException ex) {
throw new CompletionException(new DBException("Failed to write range", ex));
@ -878,7 +881,7 @@ public class LLLocalDictionary implements LLDictionary {
public void clear() {
assert !LLUtils.isInNonBlockingThread() : "Called clear in a nonblocking thread";
boolean shouldCompactLater = false;
try (var writeOptions = new WriteOptions();
try (var writeOptions = new LLWriteOptions();
var readOpts = LLUtils.generateCustomReadOptions(null, false, false, false)) {
if (VERIFY_CHECKSUMS_WHEN_NOT_NEEDED) {
readOpts.setVerifyChecksums(true);
@ -1088,16 +1091,16 @@ public class LLLocalDictionary implements LLDictionary {
: LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS[idx + 1]
)).map(range -> {
long partialCount = 0;
try (var rangeReadOpts = new ReadOptions(readOpts)) {
Slice sliceBegin;
try (var rangeReadOpts = readOpts.copy()) {
LLSlice sliceBegin;
if (range.getKey() != null) {
sliceBegin = new Slice(range.getKey());
sliceBegin = LLSlice.of(range.getKey());
} else {
sliceBegin = null;
}
Slice sliceEnd;
LLSlice sliceEnd;
if (range.getValue() != null) {
sliceEnd = new Slice(range.getValue());
sliceEnd = LLSlice.of(range.getValue());
} else {
sliceEnd = null;
}
@ -1147,8 +1150,8 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public LLEntry removeOne(LLRange range) {
assert !LLUtils.isInNonBlockingThread() : "Called removeOne in a nonblocking thread";
try (var readOpts = new ReadOptions();
var writeOpts = new WriteOptions()) {
try (var readOpts = new LLReadOptions();
var writeOpts = new LLWriteOptions()) {
try (var rocksIterator = db.newIterator(readOpts, range.getMin(), range.getMax())) {
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
rocksIterator.seekTo(range.getMin());

View File

@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.database.LLEntry;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import java.util.function.Supplier;
import org.rocksdb.ReadOptions;
@ -10,7 +11,7 @@ public class LLLocalEntryReactiveRocksIterator extends LLLocalReactiveRocksItera
public LLLocalEntryReactiveRocksIterator(RocksDBColumn db,
LLRange range,
Supplier<ReadOptions> readOptions,
Supplier<LLReadOptions> readOptions,
boolean reverse,
boolean smallRange) {
super(db, range, readOptions, true, reverse, smallRange);

View File

@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.database.LLEntry;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import java.util.function.Supplier;
import org.rocksdb.ReadOptions;
@ -11,7 +12,7 @@ public class LLLocalGroupedEntryReactiveRocksIterator extends LLLocalGroupedReac
public LLLocalGroupedEntryReactiveRocksIterator(RocksDBColumn db,
int prefixLength,
LLRange range,
Supplier<ReadOptions> readOptions,
Supplier<LLReadOptions> readOptions,
boolean smallRange) {
super(db, prefixLength, range, readOptions, true, true, smallRange);
}

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.disk;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import java.util.function.Supplier;
import org.rocksdb.ReadOptions;
@ -10,7 +11,7 @@ public class LLLocalGroupedKeyReactiveRocksIterator extends LLLocalGroupedReacti
public LLLocalGroupedKeyReactiveRocksIterator(RocksDBColumn db,
int prefixLength,
LLRange range,
Supplier<ReadOptions> readOptions,
Supplier<LLReadOptions> readOptions,
boolean smallRange) {
super(db, prefixLength, range, readOptions, true, false, smallRange);
}

View File

@ -7,6 +7,7 @@ import static it.cavallium.dbengine.database.LLUtils.isBoundedRange;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj;
import it.cavallium.dbengine.utils.DBException;
import it.cavallium.dbengine.utils.StreamUtils;
@ -30,7 +31,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
private final RocksDBColumn db;
private final int prefixLength;
private final LLRange range;
private final Supplier<ReadOptions> readOptions;
private final Supplier<LLReadOptions> readOptions;
private final boolean canFillCache;
private final boolean readValues;
private final boolean smallRange;
@ -38,14 +39,14 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
public LLLocalGroupedReactiveRocksIterator(RocksDBColumn db,
int prefixLength,
LLRange range,
Supplier<ReadOptions> readOptions,
Supplier<LLReadOptions> readOptions,
boolean canFillCache,
boolean readValues,
boolean smallRange) {
this.db = db;
this.prefixLength = prefixLength;
this.range = range;
this.readOptions = readOptions != null ? readOptions : ReadOptions::new;
this.readOptions = readOptions != null ? readOptions : LLReadOptions::new;
this.canFillCache = canFillCache;
this.readValues = readValues;
this.smallRange = smallRange;

View File

@ -10,6 +10,7 @@ import com.google.common.collect.Streams;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import it.cavallium.dbengine.utils.DBException;
import it.cavallium.dbengine.utils.StreamUtils;
import java.io.IOException;
@ -29,20 +30,20 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
private final RocksDBColumn db;
private final int prefixLength;
private final LLRange range;
private final Supplier<ReadOptions> readOptions;
private final Supplier<LLReadOptions> readOptions;
private final boolean canFillCache;
private final boolean smallRange;
public LLLocalKeyPrefixReactiveRocksIterator(RocksDBColumn db,
int prefixLength,
LLRange range,
Supplier<ReadOptions> readOptions,
Supplier<LLReadOptions> readOptions,
boolean canFillCache,
boolean smallRange) {
this.db = db;
this.prefixLength = prefixLength;
this.range = range;
this.readOptions = readOptions != null ? readOptions : ReadOptions::new;
this.readOptions = readOptions != null ? readOptions : LLReadOptions::new;
this.canFillCache = canFillCache;
this.smallRange = smallRange;
}

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.disk;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import java.util.function.Supplier;
import org.rocksdb.ReadOptions;
@ -9,7 +10,7 @@ public class LLLocalKeyReactiveRocksIterator extends LLLocalReactiveRocksIterato
public LLLocalKeyReactiveRocksIterator(RocksDBColumn db,
LLRange rangeMono,
Supplier<ReadOptions> readOptions,
Supplier<LLReadOptions> readOptions,
boolean reverse,
boolean smallRange) {
super(db, rangeMono, readOptions, false, reverse, smallRange);

View File

@ -98,7 +98,7 @@ import org.rocksdb.util.SizeUnit;
public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDatabase {
private static final boolean DELETE_LOG_FILES = false;
private static final boolean FOLLOW_ROCKSDB_OPTIMIZATIONS = true;
private static final boolean FOLLOW_ROCKSDB_OPTIMIZATIONS = false;
private static final boolean USE_CLOCK_CACHE
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.clockcache.enable", "false"));

View File

@ -7,6 +7,7 @@ import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.database.LLEntry;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj;
import it.cavallium.dbengine.utils.DBException;
import java.io.IOException;
@ -22,11 +23,11 @@ public final class LLLocalMigrationReactiveRocksIterator {
private final RocksDBColumn db;
private final LLRange range;
private final Supplier<ReadOptions> readOptions;
private final Supplier<LLReadOptions> readOptions;
public LLLocalMigrationReactiveRocksIterator(RocksDBColumn db,
LLRange range,
Supplier<ReadOptions> readOptions) {
Supplier<LLReadOptions> readOptions) {
this.db = db;
this.range = range;
this.readOptions = readOptions;

View File

@ -8,6 +8,7 @@ import static it.cavallium.dbengine.utils.StreamUtils.streamWhileNonNull;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj;
import it.cavallium.dbengine.utils.DBException;
import java.io.IOException;
@ -27,20 +28,20 @@ public abstract class LLLocalReactiveRocksIterator<T> {
private final RocksDBColumn db;
private final LLRange range;
private final Supplier<ReadOptions> readOptions;
private final Supplier<LLReadOptions> readOptions;
private final boolean readValues;
private final boolean reverse;
private final boolean smallRange;
public LLLocalReactiveRocksIterator(RocksDBColumn db,
LLRange range,
Supplier<ReadOptions> readOptions,
Supplier<LLReadOptions> readOptions,
boolean readValues,
boolean reverse,
boolean smallRange) {
this.db = db;
this.range = range;
this.readOptions = readOptions != null ? readOptions : ReadOptions::new;
this.readOptions = readOptions != null ? readOptions : LLReadOptions::new;
this.readValues = readValues;
this.reverse = reverse;
this.smallRange = smallRange;

View File

@ -9,6 +9,8 @@ import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import it.cavallium.dbengine.database.disk.rocksdb.LLWriteOptions;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.utils.DBException;
import java.io.IOException;
@ -43,19 +45,20 @@ public class LLLocalSingleton implements LLSingleton {
if (LLUtils.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Initialized in a nonblocking thread");
}
try (var readOptions = new ReadOptions();
var writeOptions = new WriteOptions()) {
if (defaultValue != null && db.get(readOptions, this.name.asArray(), true) == null) {
db.put(writeOptions, this.name.asArray(), defaultValue);
try (var readOptions = new LLReadOptions()) {
try (var writeOptions = new LLWriteOptions()) {
if (defaultValue != null && db.get(readOptions, this.name.asArray(), true) == null) {
db.put(writeOptions, this.name.asArray(), defaultValue);
}
}
}
}
private ReadOptions generateReadOptions(LLSnapshot snapshot) {
private LLReadOptions generateReadOptions(LLSnapshot snapshot) {
if (snapshot != null) {
return new ReadOptions().setSnapshot(snapshotResolver.apply(snapshot));
return new LLReadOptions().setSnapshot(snapshotResolver.apply(snapshot));
} else {
return new ReadOptions();
return new LLReadOptions();
}
}
@ -74,7 +77,7 @@ public class LLLocalSingleton implements LLSingleton {
@Override
public void set(Buf value) {
try (var writeOptions = new WriteOptions()) {
try (var writeOptions = new LLWriteOptions()) {
if (value == null) {
db.delete(writeOptions, name);
} else {
@ -101,8 +104,10 @@ public class LLLocalSingleton implements LLSingleton {
case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS;
};
UpdateAtomicResult result;
try (var readOptions = new ReadOptions(); var writeOptions = new WriteOptions()) {
result = db.updateAtomic(readOptions, writeOptions, name, updater, returnMode);
try (var readOptions = new LLReadOptions()) {
try (var writeOptions = new LLWriteOptions()) {
result = db.updateAtomic(readOptions, writeOptions, name, updater, returnMode);
}
}
return switch (updateReturnMode) {
case NOTHING -> null;
@ -117,8 +122,10 @@ public class LLLocalSingleton implements LLSingleton {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
UpdateAtomicResult result;
try (var readOptions = new ReadOptions(); var writeOptions = new WriteOptions()) {
result = db.updateAtomic(readOptions, writeOptions, name, updater, DELTA);
try (var readOptions = new LLReadOptions()) {
try (var writeOptions = new LLWriteOptions()) {
result = db.updateAtomic(readOptions, writeOptions, name, updater, DELTA);
}
}
return ((UpdateAtomicResultDelta) result).delta();
}

View File

@ -7,6 +7,8 @@ import io.micrometer.core.instrument.MeterRegistry;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import it.cavallium.dbengine.database.disk.rocksdb.LLWriteOptions;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.ExponentialPageLimits;
import it.cavallium.dbengine.utils.DBException;
@ -63,19 +65,19 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
}
@Override
protected Transaction beginTransaction(@NotNull WriteOptions writeOptions,
protected Transaction beginTransaction(@NotNull LLWriteOptions writeOptions,
TransactionOptions txOpts) {
return getDb().beginTransaction(writeOptions);
return getDb().beginTransaction(writeOptions.getUnsafe());
}
@Override
public void write(WriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException {
getDb().write(writeOptions, writeBatch);
public void write(LLWriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException {
getDb().write(writeOptions.getUnsafe(), writeBatch);
}
@Override
public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions,
@NotNull WriteOptions writeOptions,
public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull LLReadOptions readOptions,
@NotNull LLWriteOptions writeOptions,
Buf key,
SerializationFunction<@Nullable Buf, @Nullable Buf> updater,
UpdateAtomicResultMode returnMode) {
@ -86,107 +88,108 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
if (LLUtils.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
try (var txOpts = new TransactionOptions();
var tx = beginTransaction(writeOptions, txOpts)) {
boolean committedSuccessfully;
int retries = 0;
ExponentialPageLimits retryTime = null;
Buf prevData;
Buf newData;
boolean changed;
do {
var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray)
);
}
if (prevDataArray != null) {
prevData = Buf.wrap(prevDataArray);
prevDataArray = null;
} else {
prevData = null;
}
if (prevData != null) {
prevData.freeze();
}
try {
newData = updater.apply(prevData);
} catch (Exception ex) {
throw new DBException("Failed to update key " + LLUtils.toStringSafe(key) + ". The previous value was:\n" + LLUtils.toStringSafe(prevData, 8192), ex);
}
var newDataArray = newData == null ? null : LLUtils.asArray(newData);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Updating {}. previous data: {}, updated data: {}",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray),
LLUtils.toStringSafe(newDataArray)
);
}
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
tx.delete(cfh, keyArray, true);
changed = true;
committedSuccessfully = commitOptimistically(tx);
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
try (var txOpts = new TransactionOptions()) {
try (var tx = beginTransaction(writeOptions, txOpts)) {
boolean committedSuccessfully;
int retries = 0;
ExponentialPageLimits retryTime = null;
Buf prevData;
Buf newData;
boolean changed;
do {
var prevDataArray = tx.getForUpdate(readOptions.getUnsafe(), cfh, keyArray, true);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Writing {}: {} (after update)",
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(newData)
LLUtils.toStringSafe(prevDataArray)
);
}
tx.put(cfh, keyArray, newDataArray);
changed = true;
committedSuccessfully = commitOptimistically(tx);
} else {
changed = false;
committedSuccessfully = true;
tx.rollback();
if (prevDataArray != null) {
prevData = Buf.wrap(prevDataArray);
prevDataArray = null;
} else {
prevData = null;
}
if (prevData != null) {
prevData.freeze();
}
try {
newData = updater.apply(prevData);
} catch (Exception ex) {
throw new DBException("Failed to update key " + LLUtils.toStringSafe(key) + ". The previous value was:\n" + LLUtils.toStringSafe(prevData, 8192), ex);
}
var newDataArray = newData == null ? null : LLUtils.asArray(newData);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Updating {}. previous data: {}, updated data: {}",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray),
LLUtils.toStringSafe(newDataArray)
);
}
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
tx.delete(cfh, keyArray, true);
changed = true;
committedSuccessfully = commitOptimistically(tx);
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Writing {}: {} (after update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(newData)
);
}
tx.put(cfh, keyArray, newDataArray);
changed = true;
committedSuccessfully = commitOptimistically(tx);
} else {
changed = false;
committedSuccessfully = true;
tx.rollback();
}
if (!committedSuccessfully) {
tx.undoGetForUpdate(cfh, keyArray);
tx.rollback();
retries++;
if (retries == 1) {
retryTime = new ExponentialPageLimits(0, 2, 2000);
}
long retryNs = 1000000L * retryTime.getPageLimit(retries);
// +- 30%
retryNs = retryNs + ThreadLocalRandom.current().nextLong(-retryNs * 30L / 100L, retryNs * 30L / 100L);
if (retries >= 5 && retries % 5 == 0 || ALWAYS_PRINT_OPTIMISTIC_RETRIES) {
logger.warn(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
+ " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries);
} else if (logger.isDebugEnabled(MARKER_ROCKSDB)) {
logger.debug(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
+ " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries);
}
// Wait for n milliseconds
if (retryNs > 0) {
LockSupport.parkNanos(retryNs);
}
}
} while (!committedSuccessfully);
if (retries > 5) {
logger.warn(MARKER_ROCKSDB, "Took {} retries to update key {}", retries, LLUtils.toStringSafe(key));
}
if (!committedSuccessfully) {
tx.undoGetForUpdate(cfh, keyArray);
tx.rollback();
retries++;
if (retries == 1) {
retryTime = new ExponentialPageLimits(0, 2, 2000);
}
long retryNs = 1000000L * retryTime.getPageLimit(retries);
// +- 30%
retryNs = retryNs + ThreadLocalRandom.current().nextLong(-retryNs * 30L / 100L, retryNs * 30L / 100L);
if (retries >= 5 && retries % 5 == 0 || ALWAYS_PRINT_OPTIMISTIC_RETRIES) {
logger.warn(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
+ " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries);
} else if (logger.isDebugEnabled(MARKER_ROCKSDB)) {
logger.debug(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
+ " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries);
}
// Wait for n milliseconds
if (retryNs > 0) {
LockSupport.parkNanos(retryNs);
}
}
} while (!committedSuccessfully);
if (retries > 5) {
logger.warn(MARKER_ROCKSDB, "Took {} retries to update key {}", retries, LLUtils.toStringSafe(key));
recordAtomicUpdateTime(changed, prevData != null, newData != null, initNanoTime);
optimisticAttempts.record(retries);
return switch (returnMode) {
case NOTHING -> RESULT_NOTHING;
case CURRENT -> new UpdateAtomicResultCurrent(newData);
case PREVIOUS -> new UpdateAtomicResultPrevious(prevData);
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(prevData, newData));
};
}
recordAtomicUpdateTime(changed, prevData != null, newData != null, initNanoTime);
optimisticAttempts.record(retries);
return switch (returnMode) {
case NOTHING -> RESULT_NOTHING;
case CURRENT -> new UpdateAtomicResultCurrent(newData);
case PREVIOUS -> new UpdateAtomicResultPrevious(prevData);
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(prevData, newData));
};
}
} catch (Exception ex) {
throw new DBException("Failed to update key " + LLUtils.toStringSafe(key), ex);

View File

@ -6,6 +6,8 @@ import io.micrometer.core.instrument.MeterRegistry;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import it.cavallium.dbengine.database.disk.rocksdb.LLWriteOptions;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.utils.DBException;
import java.io.IOException;
@ -39,14 +41,14 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
}
@Override
protected Transaction beginTransaction(@NotNull WriteOptions writeOptions,
protected Transaction beginTransaction(@NotNull LLWriteOptions writeOptions,
TransactionOptions txOpts) {
return getDb().beginTransaction(writeOptions, txOpts);
return getDb().beginTransaction(writeOptions.getUnsafe(), txOpts);
}
@Override
public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions,
@NotNull WriteOptions writeOptions,
public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull LLReadOptions readOptions,
@NotNull LLWriteOptions writeOptions,
Buf key,
SerializationFunction<@Nullable Buf, @Nullable Buf> updater,
UpdateAtomicResultMode returnMode) {
@ -57,82 +59,83 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
if (LLUtils.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
try (var txOpts = new TransactionOptions();
var tx = beginTransaction(writeOptions, txOpts)) {
Buf prevData;
Buf newData;
boolean changed;
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Reading {} (before update lock)", LLUtils.toStringSafe(key));
}
var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true);
try {
try (var txOpts = new TransactionOptions()) {
try (var tx = beginTransaction(writeOptions, txOpts)) {
Buf prevData;
Buf newData;
boolean changed;
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray)
);
}
if (prevDataArray != null) {
readValueFoundWithoutBloomBufferSize.record(prevDataArray.length);
prevData = Buf.wrap(prevDataArray);
} else {
readValueNotFoundWithoutBloomBufferSize.record(0);
prevData = null;
}
if (prevData != null) {
prevData.freeze();
logger.trace(MARKER_ROCKSDB, "Reading {} (before update lock)", LLUtils.toStringSafe(key));
}
var prevDataArray = tx.getForUpdate(readOptions.getUnsafe(), cfh, keyArray, true);
try {
newData = updater.apply(prevData);
} catch (Exception ex) {
throw new DBException("Failed to update key " + LLUtils.toStringSafe(key) + ". The previous value was:\n" + LLUtils.toStringSafe(prevData, 8192), ex);
}
var newDataArray = newData == null ? null : LLUtils.asArray(newData);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Updating {}. previous data: {}, updated data: {}",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray),
LLUtils.toStringSafe(newDataArray)
);
}
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
writeValueBufferSize.record(0);
tx.delete(cfh, keyArray, true);
changed = true;
tx.commit();
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Writing {}: {} (after update)",
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(newData)
LLUtils.toStringSafe(prevDataArray)
);
}
writeValueBufferSize.record(newDataArray.length);
tx.put(cfh, keyArray, newDataArray);
changed = true;
tx.commit();
} else {
changed = false;
tx.rollback();
if (prevDataArray != null) {
readValueFoundWithoutBloomBufferSize.record(prevDataArray.length);
prevData = Buf.wrap(prevDataArray);
} else {
readValueNotFoundWithoutBloomBufferSize.record(0);
prevData = null;
}
if (prevData != null) {
prevData.freeze();
}
try {
newData = updater.apply(prevData);
} catch (Exception ex) {
throw new DBException("Failed to update key " + LLUtils.toStringSafe(key) + ". The previous value was:\n" + LLUtils.toStringSafe(prevData, 8192), ex);
}
var newDataArray = newData == null ? null : LLUtils.asArray(newData);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Updating {}. previous data: {}, updated data: {}",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray),
LLUtils.toStringSafe(newDataArray)
);
}
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
writeValueBufferSize.record(0);
tx.delete(cfh, keyArray, true);
changed = true;
tx.commit();
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Writing {}: {} (after update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(newData)
);
}
writeValueBufferSize.record(newDataArray.length);
tx.put(cfh, keyArray, newDataArray);
changed = true;
tx.commit();
} else {
changed = false;
tx.rollback();
}
} finally {
tx.undoGetForUpdate(cfh, keyArray);
}
} finally {
tx.undoGetForUpdate(cfh, keyArray);
recordAtomicUpdateTime(changed, prevData != null, newData != null, initNanoTime);
return switch (returnMode) {
case NOTHING -> RESULT_NOTHING;
case CURRENT -> new UpdateAtomicResultCurrent(newData);
case PREVIOUS -> new UpdateAtomicResultPrevious(prevData);
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(prevData, newData));
};
}
recordAtomicUpdateTime(changed, prevData != null, newData != null, initNanoTime);
return switch (returnMode) {
case NOTHING -> RESULT_NOTHING;
case CURRENT -> new UpdateAtomicResultCurrent(newData);
case PREVIOUS -> new UpdateAtomicResultPrevious(prevData);
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(prevData, newData));
};
}
} catch (Exception ex) {
throw new DBException("Failed to update key " + LLUtils.toStringSafe(key), ex);

View File

@ -4,6 +4,8 @@ import io.micrometer.core.instrument.MeterRegistry;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import it.cavallium.dbengine.database.disk.rocksdb.LLWriteOptions;
import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import java.io.IOException;
@ -13,21 +15,19 @@ import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.CompactRangeOptions;
import org.rocksdb.FlushOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
/**
* This method should not modify or move the writerIndex/readerIndex of the buffers inside the range
*/
@NotNull RocksIteratorObj newRocksIterator(ReadOptions readOptions,
@NotNull RocksIteratorObj newRocksIterator(LLReadOptions readOptions,
LLRange range,
boolean reverse) throws RocksDBException;
default byte @Nullable [] get(@NotNull ReadOptions readOptions,
default byte @Nullable [] get(@NotNull LLReadOptions readOptions,
byte[] key,
boolean existsAlmostCertainly)
throws RocksDBException {
@ -39,33 +39,33 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
}
@Nullable
Buf get(@NotNull ReadOptions readOptions, Buf key) throws RocksDBException;
Buf get(@NotNull LLReadOptions readOptions, Buf key) throws RocksDBException;
boolean exists(@NotNull ReadOptions readOptions, Buf key) throws RocksDBException;
boolean exists(@NotNull LLReadOptions readOptions, Buf key) throws RocksDBException;
boolean mayExists(@NotNull ReadOptions readOptions, Buf key);
boolean mayExists(@NotNull LLReadOptions readOptions, Buf key);
void put(@NotNull WriteOptions writeOptions, Buf key, Buf value) throws RocksDBException;
void put(@NotNull LLWriteOptions writeOptions, Buf key, Buf value) throws RocksDBException;
default void put(@NotNull WriteOptions writeOptions, byte[] key, byte[] value) throws RocksDBException {
default void put(@NotNull LLWriteOptions writeOptions, byte[] key, byte[] value) throws RocksDBException {
this.put(writeOptions, Buf.wrap(key), Buf.wrap(value));
}
@NotNull RocksIteratorObj newIterator(@NotNull ReadOptions readOptions, @Nullable Buf min, @Nullable Buf max);
@NotNull RocksIteratorObj newIterator(@NotNull LLReadOptions readOptions, @Nullable Buf min, @Nullable Buf max);
@NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions,
@NotNull WriteOptions writeOptions,
@NotNull UpdateAtomicResult updateAtomic(@NotNull LLReadOptions readOptions,
@NotNull LLWriteOptions writeOptions,
Buf key,
SerializationFunction<@Nullable Buf, @Nullable Buf> updater,
UpdateAtomicResultMode returnMode);
void delete(WriteOptions writeOptions, Buf key) throws RocksDBException;
void delete(LLWriteOptions writeOptions, Buf key) throws RocksDBException;
void delete(WriteOptions writeOptions, byte[] key) throws RocksDBException;
void delete(LLWriteOptions writeOptions, byte[] key) throws RocksDBException;
List<byte[]> multiGetAsList(ReadOptions readOptions, List<byte[]> keys) throws RocksDBException;
List<byte[]> multiGetAsList(LLReadOptions readOptions, List<byte[]> keys) throws RocksDBException;
void write(WriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException;
void write(LLWriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException;
void suggestCompactRange() throws RocksDBException;

View File

@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk;
import static com.google.common.collect.Lists.partition;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.utils.SimpleResource;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@ -107,4 +108,10 @@ public class RocksDBUtils {
throw new IllegalStateException("Not owning handle");
}
}
/**
 * Verifies that the given resource, if present, has not been closed yet.
 *
 * @param simpleResource the resource to check; {@code null} is accepted and ignored
 * @throws IllegalStateException if the resource exists but is already closed
 */
public static void ensureOwned(@Nullable SimpleResource simpleResource) {
	if (simpleResource == null) {
		return;
	}
	if (simpleResource.isClosed()) {
		throw new IllegalStateException("Resource is closed");
	}
}
}

View File

@ -6,6 +6,8 @@ import io.micrometer.core.instrument.MeterRegistry;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import it.cavallium.dbengine.database.disk.rocksdb.LLWriteOptions;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.utils.DBException;
import java.io.IOException;
@ -35,14 +37,14 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB>
}
@Override
protected Transaction beginTransaction(@NotNull WriteOptions writeOptions,
protected Transaction beginTransaction(@NotNull LLWriteOptions writeOptions,
TransactionOptions txOpts) {
throw new UnsupportedOperationException("Transactions not supported");
}
@Override
public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions,
@NotNull WriteOptions writeOptions,
public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull LLReadOptions readOptions,
@NotNull LLWriteOptions writeOptions,
Buf key,
SerializationFunction<@Nullable Buf, @Nullable Buf> updater,
UpdateAtomicResultMode returnMode) {

View File

@ -8,6 +8,7 @@ public final class LLColumnFamilyHandle extends SimpleResource {
private final ColumnFamilyHandle val;
/**
 * Wraps a native {@link ColumnFamilyHandle}; the handle is closed when this resource is closed.
 */
public LLColumnFamilyHandle(ColumnFamilyHandle val) {
	super(val::close);
	this.val = val;
}

View File

@ -8,6 +8,7 @@ public final class LLCompactionOptions extends SimpleResource {
private final CompactionOptions val;
/**
 * Wraps a native {@link CompactionOptions}; the options object is closed when this resource is closed.
 */
public LLCompactionOptions(CompactionOptions val) {
	super(val::close);
	this.val = val;
}

View File

@ -1,18 +1,89 @@
package it.cavallium.dbengine.database.disk.rocksdb;
import it.cavallium.dbengine.database.disk.IteratorMetrics;
import it.cavallium.dbengine.utils.SimpleResource;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.Snapshot;
/**
 * Owning wrapper around a RocksDB {@link ReadOptions} instance.
 * <p>
 * Besides closing the native options object on close, this wrapper keeps strong Java references
 * to the iterate-bound slices and the snapshot that were set on it, so they stay reachable
 * while the native ReadOptions still points at them.
 */
public final class LLReadOptions extends SimpleResource {

	// The wrapped native options object.
	private final ReadOptions val;
	// Strong reference to the slice passed as the iterate lower bound, if any.
	private LLSlice itLowerBoundRef;
	// Strong reference to the slice passed as the iterate upper bound, if any.
	private LLSlice itUpperBoundRef;
	// Strong reference to the snapshot set on the native options, if any.
	private Snapshot snapshot;

	public LLReadOptions(ReadOptions val) {
		// NOTE(review): val::close is handed to SimpleResource and onClose() below also calls
		// val.close() — presumably SimpleResource runs only one of them; confirm its contract.
		super(val::close);
		this.val = val;
	}

	/** Creates a wrapper around a fresh, default-configured native ReadOptions. */
	public LLReadOptions() {
		this(new ReadOptions());
	}

	/**
	 * Returns a new LLReadOptions wrapping a native copy of these options.
	 * The bound-slice and snapshot references are shared with the copy, not duplicated.
	 */
	public LLReadOptions copy() {
		var ro = new LLReadOptions(new ReadOptions(this.val));
		// Keep the same slices/snapshot reachable from the copy as well.
		ro.itUpperBoundRef = this.itUpperBoundRef;
		ro.itLowerBoundRef = this.itLowerBoundRef;
		ro.snapshot = this.snapshot;
		return ro;
	}

	@Override
	protected void onClose() {
		val.close();
		// Drop the references so the slices/snapshot can be collected once unused elsewhere.
		itLowerBoundRef = null;
		itUpperBoundRef = null;
		snapshot = null;
	}

	/** Sets the iterate lower bound on the native options and retains the slice. */
	public void setIterateLowerBound(LLSlice slice) {
		val.setIterateLowerBound(slice.getSliceUnsafe());
		itLowerBoundRef = slice;
	}

	/** Sets the iterate upper bound on the native options and retains the slice. */
	public void setIterateUpperBound(LLSlice slice) {
		val.setIterateUpperBound(slice.getSliceUnsafe());
		itUpperBoundRef = slice;
	}

	public long readaheadSize() {
		return val.readaheadSize();
	}

	public void setReadaheadSize(long readaheadSize) {
		val.setReadaheadSize(readaheadSize);
	}

	/** Creates a metrics-wrapped iterator over {@code cfh} using these options. */
	public RocksIteratorObj newIterator(RocksDB db, ColumnFamilyHandle cfh, IteratorMetrics iteratorMetrics) {
		return new RocksIteratorObj(db.newIterator(cfh, val), this, iteratorMetrics);
	}

	public void setFillCache(boolean fillCache) {
		val.setFillCache(fillCache);
	}

	public void setVerifyChecksums(boolean verifyChecksums) {
		val.setVerifyChecksums(verifyChecksums);
	}

	/** Exposes the wrapped native object; its lifetime is managed by this wrapper. */
	public ReadOptions getUnsafe() {
		return val;
	}

	/** Sets the snapshot on the native options, retains it, and returns {@code this} for chaining. */
	public LLReadOptions setSnapshot(Snapshot snapshot) {
		val.setSnapshot(snapshot);
		this.snapshot = snapshot;
		return this;
	}

	public void setIgnoreRangeDeletions(boolean ignoreRangeDeletions) {
		val.setIgnoreRangeDeletions(ignoreRangeDeletions);
	}

	/** Returns the snapshot currently set on the native options. */
	public Snapshot snapshot() {
		return val.snapshot();
	}
}

View File

@ -0,0 +1,28 @@
package it.cavallium.dbengine.database.disk.rocksdb;
import it.cavallium.dbengine.utils.SimpleResource;
import org.rocksdb.AbstractSlice;
import org.rocksdb.Slice;
/**
 * Owning wrapper around a RocksDB {@link AbstractSlice}; the native slice is
 * released when this resource is closed.
 */
public final class LLSlice extends SimpleResource {

	// Wrapped native slice, released on close.
	private final AbstractSlice<?> slice;

	public LLSlice(AbstractSlice<?> val) {
		super(val::close);
		this.slice = val;
	}

	/** Creates a slice backed by a copy of {@code data}. */
	public static LLSlice of(byte[] data) {
		return new LLSlice(new Slice(data));
	}

	/** Exposes the wrapped native slice; its lifetime is managed by this wrapper. */
	public AbstractSlice<?> getSliceUnsafe() {
		return slice;
	}

	@Override
	protected void onClose() {
		slice.close();
	}
}

View File

@ -8,11 +8,40 @@ public final class LLWriteOptions extends SimpleResource {
private final WriteOptions val;
/**
 * Wraps an existing native {@link WriteOptions}; it is closed when this resource is closed.
 */
public LLWriteOptions(WriteOptions val) {
	super(val::close);
	this.val = val;
}
/** Creates a wrapper around a fresh, default-configured native WriteOptions. */
public LLWriteOptions() {
	this(new WriteOptions());
}
@Override
protected void onClose() {
	// Release the native WriteOptions handle.
	val.close();
}
/** Exposes the wrapped native object; its lifetime is managed by this wrapper. */
public WriteOptions getUnsafe() {
	return val;
}
/** Delegates to {@link WriteOptions#setDisableWAL(boolean)}; returns {@code this} for chaining. */
public LLWriteOptions setDisableWAL(boolean disableWAL) {
	val.setDisableWAL(disableWAL);
	return this;
}
/** Delegates to {@link WriteOptions#setNoSlowdown(boolean)}; returns {@code this} for chaining. */
public LLWriteOptions setNoSlowdown(boolean noSlowdown) {
	val.setNoSlowdown(noSlowdown);
	return this;
}
/** Delegates to {@link WriteOptions#setLowPri(boolean)}; returns {@code this} for chaining. */
public LLWriteOptions setLowPri(boolean lowPri) {
	val.setLowPri(lowPri);
	return this;
}
/** Delegates to {@link WriteOptions#setMemtableInsertHintPerBatch(boolean)}; returns {@code this} for chaining. */
public LLWriteOptions setMemtableInsertHintPerBatch(boolean memtableInsertHintPerBatch) {
	val.setMemtableInsertHintPerBatch(memtableInsertHintPerBatch);
	return this;
}
}

View File

@ -4,6 +4,7 @@ import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.IteratorMetrics;
import it.cavallium.dbengine.utils.SimpleResource;
import java.nio.ByteBuffer;
import org.rocksdb.AbstractSlice;
@ -12,73 +13,28 @@ import org.rocksdb.RocksIterator;
public class RocksIteratorObj extends SimpleResource {
private RocksIterator rocksIterator;
private AbstractSlice<?> sliceMin;
private AbstractSlice<?> sliceMax;
private Buf min;
private Buf max;
private LLReadOptions readOptions;
private final RocksIterator rocksIterator;
private final Counter startedIterSeek;
private final Counter endedIterSeek;
private final Timer iterSeekTime;
private final Counter startedIterNext;
private final Counter endedIterNext;
private final Timer iterNextTime;
private Object seekingFrom;
private Object seekingTo;
private byte[] seekingFrom;
private byte[] seekingTo;
public RocksIteratorObj(RocksIterator rocksIterator,
AbstractSlice<?> sliceMin,
AbstractSlice<?> sliceMax,
Buf min,
Buf max,
Counter startedIterSeek,
Counter endedIterSeek,
Timer iterSeekTime,
Counter startedIterNext,
Counter endedIterNext,
Timer iterNextTime) {
this(rocksIterator,
sliceMin,
sliceMax,
min,
max,
startedIterSeek,
endedIterSeek,
iterSeekTime,
startedIterNext,
endedIterNext,
iterNextTime,
null,
null
);
}
private RocksIteratorObj(RocksIterator rocksIterator,
AbstractSlice<?> sliceMin,
AbstractSlice<?> sliceMax,
Buf min,
Buf max,
Counter startedIterSeek,
Counter endedIterSeek,
Timer iterSeekTime,
Counter startedIterNext,
Counter endedIterNext,
Timer iterNextTime,
Object seekingFrom,
Object seekingTo) {
this.sliceMin = sliceMin;
this.sliceMax = sliceMax;
this.min = min;
this.max = max;
RocksIteratorObj(RocksIterator rocksIterator,
LLReadOptions readOptions, IteratorMetrics iteratorMetrics) {
super(rocksIterator::close);
this.readOptions = readOptions;
this.rocksIterator = rocksIterator;
this.startedIterSeek = startedIterSeek;
this.endedIterSeek = endedIterSeek;
this.iterSeekTime = iterSeekTime;
this.startedIterNext = startedIterNext;
this.endedIterNext = endedIterNext;
this.iterNextTime = iterNextTime;
this.seekingFrom = seekingFrom;
this.seekingTo = seekingTo;
this.startedIterSeek = iteratorMetrics.startedIterSeek();
this.startedIterNext = iteratorMetrics.startedIterNext();
this.iterSeekTime = iteratorMetrics.iterSeekTime();
this.endedIterNext = iteratorMetrics.endedIterNext();
this.endedIterSeek = iteratorMetrics.endedIterSeek();
this.iterNextTime = iteratorMetrics.iterNextTime();
}
public void seek(ByteBuffer seekBuf) throws RocksDBException {
@ -233,11 +189,8 @@ public class RocksIteratorObj extends SimpleResource {
if (rocksIterator != null) {
rocksIterator.close();
}
if (sliceMin != null) {
sliceMin.close();
}
if (sliceMax != null) {
sliceMax.close();
}
seekingFrom = null;
seekingTo = null;
readOptions = null;
}
}

View File

@ -3,6 +3,7 @@ package it.cavallium.dbengine.tests;
import static java.util.Map.entry;
import it.cavallium.dbengine.database.disk.KeyMayExistGetter;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import it.unimi.dsi.fastutil.bytes.ByteList;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@ -42,22 +43,22 @@ public class TestGetter {
public KeyMayExistGetter getter = new KeyMayExistGetter() {
@Override
protected KeyMayExist keyMayExist(ReadOptions readOptions, ByteBuffer key, ByteBuffer value) {
protected KeyMayExist keyMayExist(LLReadOptions readOptions, ByteBuffer key, ByteBuffer value) {
return null;
}
@Override
protected boolean keyMayExist(ReadOptions readOptions, byte[] key, @Nullable Holder<byte[]> valueHolder) {
protected boolean keyMayExist(LLReadOptions readOptions, byte[] key, @Nullable Holder<byte[]> valueHolder) {
throw new UnsupportedOperationException();
}
@Override
protected int get(ReadOptions opt, ByteBuffer key, ByteBuffer value) throws RocksDBException {
protected int get(LLReadOptions opt, ByteBuffer key, ByteBuffer value) throws RocksDBException {
return 0;
}
@Override
protected byte[] get(ReadOptions opt, byte[] key) throws RocksDBException, IllegalArgumentException {
protected byte[] get(LLReadOptions opt, byte[] key) throws RocksDBException, IllegalArgumentException {
throw new UnsupportedOperationException();
}