Trace leaks

This commit is contained in:
Andrea Cavalli 2022-05-12 19:14:27 +02:00
parent 6a5a9a3e94
commit c9a12760bc
36 changed files with 1155 additions and 799 deletions

View File

@ -16,10 +16,11 @@ import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.WritableComponent; import io.netty5.buffer.api.WritableComponent;
import io.netty5.buffer.api.internal.Statics; import io.netty5.buffer.api.internal.Statics;
import io.netty5.util.IllegalReferenceCountException; import io.netty5.util.IllegalReferenceCountException;
import it.cavallium.dbengine.database.disk.RocksIteratorTuple;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultCurrent; import it.cavallium.dbengine.database.disk.UpdateAtomicResultCurrent;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultDelta; import it.cavallium.dbengine.database.disk.UpdateAtomicResultDelta;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultPrevious; import it.cavallium.dbengine.database.disk.UpdateAtomicResultPrevious;
import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.RandomSortField; import it.cavallium.dbengine.lucene.RandomSortField;
@ -86,6 +87,7 @@ public class LLUtils {
public static final byte[][] LEXICONOGRAPHIC_ITERATION_SEEKS = new byte[256][1]; public static final byte[][] LEXICONOGRAPHIC_ITERATION_SEEKS = new byte[256][1];
public static final AtomicBoolean hookRegistered = new AtomicBoolean(); public static final AtomicBoolean hookRegistered = new AtomicBoolean();
public static final boolean MANUAL_READAHEAD = false; public static final boolean MANUAL_READAHEAD = false;
public static final boolean ALLOW_STATIC_OPTIONS = false;
public static final boolean FORCE_DISABLE_CHECKSUM_VERIFICATION public static final boolean FORCE_DISABLE_CHECKSUM_VERIFICATION
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checksum.disable.force", "false")); = Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checksum.disable.force", "false"));
@ -729,26 +731,26 @@ public class LLUtils {
* @param smallRange true if the range is small * @param smallRange true if the range is small
* @return the passed instance of ReadOptions, or a new one if the passed readOptions is null * @return the passed instance of ReadOptions, or a new one if the passed readOptions is null
*/ */
public static ReadOptions generateCustomReadOptions(@Nullable ReadOptions readOptions, public static RocksObj<ReadOptions> generateCustomReadOptions(@Nullable RocksObj<ReadOptions> readOptions,
boolean canFillCache, boolean canFillCache,
boolean boundedRange, boolean boundedRange,
boolean smallRange) { boolean smallRange) {
if (readOptions == null) { if (readOptions == null) {
//noinspection resource //noinspection resource
readOptions = new ReadOptions(); readOptions = new RocksObj<>(new ReadOptions());
} }
if (boundedRange || smallRange) { if (boundedRange || smallRange) {
readOptions.setFillCache(canFillCache); readOptions.v().setFillCache(canFillCache);
} else { } else {
if (readOptions.readaheadSize() <= 0) { if (readOptions.v().readaheadSize() <= 0) {
readOptions.setReadaheadSize(4 * 1024 * 1024); // 4MiB readOptions.v().setReadaheadSize(4 * 1024 * 1024); // 4MiB
} }
readOptions.setFillCache(false); readOptions.v().setFillCache(false);
readOptions.setVerifyChecksums(false); readOptions.v().setVerifyChecksums(false);
} }
if (FORCE_DISABLE_CHECKSUM_VERIFICATION) { if (FORCE_DISABLE_CHECKSUM_VERIFICATION) {
readOptions.setVerifyChecksums(false); readOptions.v().setVerifyChecksums(false);
} }
return readOptions; return readOptions;
@ -1012,8 +1014,10 @@ public class LLUtils {
iterable.forEach(LLUtils::onNextDropped); iterable.forEach(LLUtils::onNextDropped);
} else if (next instanceof SafeCloseable safeCloseable) { } else if (next instanceof SafeCloseable safeCloseable) {
safeCloseable.close(); safeCloseable.close();
} else if (next instanceof RocksIteratorTuple iteratorTuple) { } else if (next instanceof RocksIteratorObj rocksIteratorObj) {
iteratorTuple.close(); rocksIteratorObj.close();
} else if (next instanceof RocksObj<?> rocksObj) {
rocksObj.close();
} else if (next instanceof UpdateAtomicResultDelta delta) { } else if (next instanceof UpdateAtomicResultDelta delta) {
delta.delta().close(); delta.delta().close();
} else if (next instanceof UpdateAtomicResultCurrent cur) { } else if (next instanceof UpdateAtomicResultCurrent cur) {

View File

@ -1,6 +1,6 @@
package it.cavallium.dbengine.database; package it.cavallium.dbengine.database;
public interface SafeCloseable extends AutoCloseable { public interface SafeCloseable extends io.netty5.util.SafeCloseable {
@Override @Override
void close(); void close();

View File

@ -19,21 +19,14 @@ import io.netty5.buffer.api.WritableComponent;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.RepeatedElementList; import it.cavallium.dbengine.database.RepeatedElementList;
import it.cavallium.dbengine.database.SafeCloseable; import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj;
import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithRelease; import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithoutRelease;
import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.rpc.current.data.Column;
import it.cavallium.dbengine.rpc.current.data.NamedColumnOptions;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.StampedLock; import java.util.concurrent.locks.StampedLock;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -42,18 +35,17 @@ import org.jetbrains.annotations.Nullable;
import org.rocksdb.AbstractSlice; import org.rocksdb.AbstractSlice;
import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.CompactRangeOptions; import org.rocksdb.CompactRangeOptions;
import org.rocksdb.CompactionOptions;
import org.rocksdb.DirectSlice; import org.rocksdb.DirectSlice;
import org.rocksdb.FlushOptions; import org.rocksdb.FlushOptions;
import org.rocksdb.Holder; import org.rocksdb.Holder;
import org.rocksdb.KeyMayExist.KeyMayExistEnum; import org.rocksdb.KeyMayExist.KeyMayExistEnum;
import org.rocksdb.LevelMetaData;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import org.rocksdb.RocksObject;
import org.rocksdb.Slice; import org.rocksdb.Slice;
import org.rocksdb.SstFileMetaData;
import org.rocksdb.Transaction; import org.rocksdb.Transaction;
import org.rocksdb.TransactionOptions;
import org.rocksdb.WriteBatch; import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions; import org.rocksdb.WriteOptions;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
@ -73,7 +65,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
private final T db; private final T db;
private final boolean nettyDirect; private final boolean nettyDirect;
private final BufferAllocator alloc; private final BufferAllocator alloc;
private final ColumnFamilyHandle cfh; private final RocksObj<ColumnFamilyHandle> cfh;
protected final MeterRegistry meterRegistry; protected final MeterRegistry meterRegistry;
protected final StampedLock closeLock; protected final StampedLock closeLock;
@ -108,7 +100,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
boolean nettyDirect, boolean nettyDirect,
BufferAllocator alloc, BufferAllocator alloc,
String databaseName, String databaseName,
ColumnFamilyHandle cfh, RocksObj<ColumnFamilyHandle> cfh,
MeterRegistry meterRegistry, MeterRegistry meterRegistry,
StampedLock closeLock) { StampedLock closeLock) {
this.db = db; this.db = db;
@ -117,7 +109,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
this.cfh = cfh; this.cfh = cfh;
String columnName; String columnName;
try { try {
columnName = new String(cfh.getName(), StandardCharsets.UTF_8); columnName = new String(cfh.v().getName(), StandardCharsets.UTF_8);
} catch (RocksDBException e) { } catch (RocksDBException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
@ -254,81 +246,59 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
/** /**
* This method should not modify or move the writerIndex/readerIndex of the key * This method should not modify or move the writerIndex/readerIndex of the key
*/ */
static ReleasableSlice setIterateBound(boolean allowNettyDirect, static RocksObj<? extends AbstractSlice<?>> setIterateBound(boolean allowNettyDirect,
ReadOptions readOpts, IterateBound boundType, Buffer key) { RocksObj<ReadOptions> readOpts, IterateBound boundType, Buffer key) {
requireNonNull(key); requireNonNull(key);
AbstractSlice<?> slice; RocksObj<? extends AbstractSlice<?>> slice;
if (allowNettyDirect && USE_DIRECT_BUFFER_BOUNDS && isReadOnlyDirect(key)) { if (allowNettyDirect && USE_DIRECT_BUFFER_BOUNDS && isReadOnlyDirect(key)) {
ByteBuffer keyInternalByteBuffer = ((ReadableComponent) key).readableBuffer(); ByteBuffer keyInternalByteBuffer = ((ReadableComponent) key).readableBuffer();
assert keyInternalByteBuffer.position() == 0; assert keyInternalByteBuffer.position() == 0;
slice = new DirectSlice(keyInternalByteBuffer, key.readableBytes()); slice = new RocksObj<>(new DirectSlice(keyInternalByteBuffer, key.readableBytes()));
assert slice.size() == key.readableBytes(); assert slice.v().size() == key.readableBytes();
} else { } else {
slice = new Slice(requireNonNull(LLUtils.toArray(key))); slice = new RocksObj<>(new Slice(requireNonNull(LLUtils.toArray(key))));
} }
if (boundType == IterateBound.LOWER) { if (boundType == IterateBound.LOWER) {
readOpts.setIterateLowerBound(slice); readOpts.v().setIterateLowerBound(slice.v());
} else { } else {
readOpts.setIterateUpperBound(slice); readOpts.v().setIterateUpperBound(slice.v());
} }
return new ReleasableSliceImplWithRelease(slice); return slice;
} }
static ReleasableSlice emptyReleasableSlice() { static RocksObj<Slice> newEmptyReleasableSlice() {
var arr = new byte[0]; var arr = new byte[0];
return new ReleasableSliceImplWithRelease(new Slice(arr)); return new RocksObj<>(new Slice(arr));
} }
/** /**
* This method should not modify or move the writerIndex/readerIndex of the buffers inside the range * This method should not modify or move the writerIndex/readerIndex of the buffers inside the range
*/ */
@NotNull @NotNull
public RocksIteratorTuple newRocksIterator(boolean allowNettyDirect, public RocksIteratorObj newRocksIterator(boolean allowNettyDirect,
ReadOptions readOptions, RocksObj<ReadOptions> readOptions,
LLRange range, LLRange range,
boolean reverse) throws RocksDBException { boolean reverse) throws RocksDBException {
assert !Schedulers.isInNonBlockingThread() : "Called getRocksIterator in a nonblocking thread"; assert !Schedulers.isInNonBlockingThread() : "Called getRocksIterator in a nonblocking thread";
ReleasableSlice sliceMin; var rocksIterator = this.newIterator(readOptions, range.getMinUnsafe(), range.getMaxUnsafe());
ReleasableSlice sliceMax;
if (range.hasMin()) {
sliceMin = setIterateBound(allowNettyDirect, readOptions, IterateBound.LOWER, range.getMinUnsafe());
} else {
sliceMin = emptyReleasableSlice();
}
if (range.hasMax()) {
sliceMax = setIterateBound(allowNettyDirect, readOptions, IterateBound.UPPER, range.getMaxUnsafe());
} else {
sliceMax = emptyReleasableSlice();
}
SafeCloseable seekFromOrTo = null;
var rocksIterator = this.newIterator(readOptions);
try { try {
if (reverse) { if (reverse) {
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMax()) { if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMax()) {
seekFromOrTo = Objects.requireNonNullElseGet(rocksIterator.seekFrom(range.getMaxUnsafe()), rocksIterator.seekFrom(range.getMaxUnsafe());
() -> ((SafeCloseable) () -> {}));
} else { } else {
seekFromOrTo = () -> {};
rocksIterator.seekToLast(); rocksIterator.seekToLast();
} }
} else { } else {
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
seekFromOrTo = Objects.requireNonNullElseGet(rocksIterator.seekTo(range.getMinUnsafe()), rocksIterator.seekTo(range.getMinUnsafe());
() -> ((SafeCloseable) () -> {}));
} else { } else {
seekFromOrTo = () -> {};
rocksIterator.seekToFirst(); rocksIterator.seekToFirst();
} }
} }
return new RocksIteratorTuple(rocksIterator, sliceMin, sliceMax, seekFromOrTo); return rocksIterator;
} catch (Throwable ex) { } catch (Throwable ex) {
rocksIterator.close(); rocksIterator.close();
sliceMax.close();
sliceMax.close();
if (seekFromOrTo != null) {
seekFromOrTo.close();
}
throw ex; throw ex;
} }
} }
@ -337,7 +307,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
return db; return db;
} }
protected ColumnFamilyHandle getCfh() { protected RocksObj<ColumnFamilyHandle> getCfh() {
return cfh; return cfh;
} }
@ -345,12 +315,23 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
RocksDBUtils.ensureOpen(db, cfh); RocksDBUtils.ensureOpen(db, cfh);
} }
protected void ensureOwned(org.rocksdb.RocksObject rocksObject) { protected void ensureOwned(RocksObj<?> rocksObject) {
RocksDBUtils.ensureOwned(rocksObject); RocksDBUtils.ensureOwned(rocksObject);
} }
protected void ensureOwned(RocksObject rocksObject) {
RocksDBUtils.ensureOwned(rocksObject);
}
protected void ensureOwned(Buffer buffer) {
if (buffer != null && !buffer.isAccessible()) {
throw new IllegalStateException("Buffer is not accessible");
}
}
@Override @Override
public @Nullable Buffer get(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException { public @Nullable Buffer get(@NotNull RocksObj<ReadOptions> readOptions, Buffer key) throws RocksDBException {
var closeReadLock = closeLock.readLock(); var closeReadLock = closeLock.readLock();
try { try {
ensureOpen(); ensureOpen();
@ -385,7 +366,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
assert resultBuffer.writerOffset() == 0; assert resultBuffer.writerOffset() == 0;
var resultWritable = ((WritableComponent) resultBuffer).writableBuffer(); var resultWritable = ((WritableComponent) resultBuffer).writableBuffer();
var keyMayExist = db.keyMayExist(cfh, readOptions, keyNioBuffer.rewind(), var keyMayExist = db.keyMayExist(cfh.v(), readOptions.v(), keyNioBuffer.rewind(),
resultWritable.clear()); resultWritable.clear());
KeyMayExistEnum keyMayExistState = keyMayExist.exists; KeyMayExistEnum keyMayExistState = keyMayExist.exists;
int keyMayExistValueLength = keyMayExist.valueLength; int keyMayExistValueLength = keyMayExist.valueLength;
@ -416,7 +397,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
resultWritable.clear(); resultWritable.clear();
readAttemptsCount++; readAttemptsCount++;
// real data size // real data size
size = db.get(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear()); size = db.get(cfh.v(), readOptions.v(), keyNioBuffer.rewind(), resultWritable.clear());
if (size == RocksDB.NOT_FOUND) { if (size == RocksDB.NOT_FOUND) {
resultBuffer.close(); resultBuffer.close();
readValueNotFoundWithMayExistBloomBufferSize.record(0); readValueNotFoundWithMayExistBloomBufferSize.record(0);
@ -442,7 +423,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
assert resultBuffer.writerOffset() == 0; assert resultBuffer.writerOffset() == 0;
readAttemptsCount++; readAttemptsCount++;
size = db.get(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear()); size = db.get(cfh.v(), readOptions.v(), keyNioBuffer.rewind(), resultWritable.clear());
if (size == RocksDB.NOT_FOUND) { if (size == RocksDB.NOT_FOUND) {
readValueNotFoundWithMayExistBloomBufferSize.record(0); readValueNotFoundWithMayExistBloomBufferSize.record(0);
resultBuffer.close(); resultBuffer.close();
@ -471,7 +452,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
byte[] keyArray = LLUtils.toArray(key); byte[] keyArray = LLUtils.toArray(key);
requireNonNull(keyArray); requireNonNull(keyArray);
Holder<byte[]> data = new Holder<>(); Holder<byte[]> data = new Holder<>();
if (db.keyMayExist(cfh, readOptions, keyArray, data)) { if (db.keyMayExist(cfh.v(), readOptions.v(), keyArray, data)) {
// todo: "data.getValue().length > 0" is checked because keyMayExist is broken, and sometimes it // todo: "data.getValue().length > 0" is checked because keyMayExist is broken, and sometimes it
// returns an empty array, as if it exists // returns an empty array, as if it exists
if (data.getValue() != null && data.getValue().length > 0) { if (data.getValue() != null && data.getValue().length > 0) {
@ -479,7 +460,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
return LLUtils.fromByteArray(alloc, data.getValue()); return LLUtils.fromByteArray(alloc, data.getValue());
} else { } else {
readAttemptsCount++; readAttemptsCount++;
byte[] result = db.get(cfh, readOptions, keyArray); byte[] result = db.get(cfh.v(), readOptions.v(), keyArray);
if (result == null) { if (result == null) {
if (data.getValue() != null) { if (data.getValue() != null) {
readValueNotFoundWithBloomBufferSize.record(0); readValueNotFoundWithBloomBufferSize.record(0);
@ -506,7 +487,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} }
@Override @Override
public void put(@NotNull WriteOptions writeOptions, Buffer key, Buffer value) throws RocksDBException { public void put(@NotNull RocksObj<WriteOptions> writeOptions, Buffer key, Buffer value) throws RocksDBException {
var closeReadLock = closeLock.readLock(); var closeReadLock = closeLock.readLock();
try { try {
ensureOpen(); ensureOpen();
@ -553,7 +534,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} }
try { try {
db.put(cfh, writeOptions, keyNioBuffer, valueNioBuffer); db.put(cfh.v(), writeOptions.v(), keyNioBuffer, valueNioBuffer);
} finally { } finally {
if (mustCloseValue) { if (mustCloseValue) {
value.close(); value.close();
@ -565,7 +546,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} }
} }
} else { } else {
db.put(cfh, writeOptions, LLUtils.toArray(key), LLUtils.toArray(value)); db.put(cfh.v(), writeOptions.v(), LLUtils.toArray(key), LLUtils.toArray(value));
} }
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
@ -573,7 +554,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} }
@Override @Override
public boolean exists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException { public boolean exists(@NotNull RocksObj<ReadOptions> readOptions, Buffer key) throws RocksDBException {
var closeReadLock = closeLock.readLock(); var closeReadLock = closeLock.readLock();
try { try {
ensureOpen(); ensureOpen();
@ -597,8 +578,8 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
assert keyNioBuffer.limit() == key.readableBytes(); assert keyNioBuffer.limit() == key.readableBytes();
} }
try { try {
if (db.keyMayExist(cfh, keyNioBuffer)) { if (db.keyMayExist(cfh.v(), keyNioBuffer)) {
int size = db.get(cfh, readOptions, keyNioBuffer.position(0), LLUtils.EMPTY_BYTE_BUFFER); int size = db.get(cfh.v(), readOptions.v(), keyNioBuffer.position(0), LLUtils.EMPTY_BYTE_BUFFER);
boolean found = size != RocksDB.NOT_FOUND; boolean found = size != RocksDB.NOT_FOUND;
if (found) { if (found) {
readValueFoundWithBloomSimpleBufferSize.record(size); readValueFoundWithBloomSimpleBufferSize.record(size);
@ -621,12 +602,12 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
byte[] keyBytes = LLUtils.toArray(key); byte[] keyBytes = LLUtils.toArray(key);
Holder<byte[]> data = new Holder<>(); Holder<byte[]> data = new Holder<>();
boolean mayExistHit = false; boolean mayExistHit = false;
if (db.keyMayExist(cfh, readOptions, keyBytes, data)) { if (db.keyMayExist(cfh.v(), readOptions.v(), keyBytes, data)) {
mayExistHit = true; mayExistHit = true;
if (data.getValue() != null) { if (data.getValue() != null) {
size = data.getValue().length; size = data.getValue().length;
} else { } else {
size = db.get(cfh, readOptions, keyBytes, NO_DATA); size = db.get(cfh.v(), readOptions.v(), keyBytes, NO_DATA);
} }
} }
boolean found = size != RocksDB.NOT_FOUND; boolean found = size != RocksDB.NOT_FOUND;
@ -647,7 +628,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} }
@Override @Override
public boolean mayExists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException { public boolean mayExists(@NotNull RocksObj<ReadOptions> readOptions, Buffer key) throws RocksDBException {
var closeReadLock = closeLock.readLock(); var closeReadLock = closeLock.readLock();
try { try {
ensureOpen(); ensureOpen();
@ -671,7 +652,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
assert keyNioBuffer.limit() == key.readableBytes(); assert keyNioBuffer.limit() == key.readableBytes();
} }
try { try {
return db.keyMayExist(cfh, keyNioBuffer); return db.keyMayExist(cfh.v(), readOptions.v(), keyNioBuffer);
} finally { } finally {
if (mustCloseKey) { if (mustCloseKey) {
key.close(); key.close();
@ -679,7 +660,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} }
} else { } else {
byte[] keyBytes = LLUtils.toArray(key); byte[] keyBytes = LLUtils.toArray(key);
return db.keyMayExist(cfh, readOptions, keyBytes, null); return db.keyMayExist(cfh.v(), readOptions.v(), keyBytes, null);
} }
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
@ -687,7 +668,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} }
@Override @Override
public void delete(WriteOptions writeOptions, Buffer key) throws RocksDBException { public void delete(RocksObj<WriteOptions> writeOptions, Buffer key) throws RocksDBException {
var closeReadLock = closeLock.readLock(); var closeReadLock = closeLock.readLock();
try { try {
ensureOpen(); ensureOpen();
@ -712,14 +693,14 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
assert keyNioBuffer.limit() == key.readableBytes(); assert keyNioBuffer.limit() == key.readableBytes();
} }
try { try {
db.delete(cfh, writeOptions, keyNioBuffer); db.delete(cfh.v(), writeOptions.v(), keyNioBuffer);
} finally { } finally {
if (mustCloseKey) { if (mustCloseKey) {
key.close(); key.close();
} }
} }
} else { } else {
db.delete(cfh, writeOptions, LLUtils.toArray(key)); db.delete(cfh.v(), writeOptions.v(), LLUtils.toArray(key));
} }
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
@ -727,20 +708,20 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} }
@Override @Override
public void delete(WriteOptions writeOptions, byte[] key) throws RocksDBException { public void delete(RocksObj<WriteOptions> writeOptions, byte[] key) throws RocksDBException {
var closeReadLock = closeLock.readLock(); var closeReadLock = closeLock.readLock();
try { try {
ensureOpen(); ensureOpen();
ensureOwned(writeOptions); ensureOwned(writeOptions);
keyBufferSize.record(key.length); keyBufferSize.record(key.length);
db.delete(cfh, writeOptions, key); db.delete(cfh.v(), writeOptions.v(), key);
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
} }
} }
@Override @Override
public List<byte[]> multiGetAsList(ReadOptions readOptions, List<byte[]> keys) throws RocksDBException { public List<byte[]> multiGetAsList(RocksObj<ReadOptions> readOptions, List<byte[]> keys) throws RocksDBException {
var closeReadLock = closeLock.readLock(); var closeReadLock = closeLock.readLock();
try { try {
ensureOpen(); ensureOpen();
@ -748,8 +729,8 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
for (byte[] key : keys) { for (byte[] key : keys) {
keyBufferSize.record(key.length); keyBufferSize.record(key.length);
} }
var columnFamilyHandles = new RepeatedElementList<>(cfh, keys.size()); var columnFamilyHandles = new RepeatedElementList<>(cfh.v(), keys.size());
return db.multiGetAsList(readOptions, columnFamilyHandles, keys); return db.multiGetAsList(readOptions.v(), columnFamilyHandles, keys);
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
} }
@ -760,31 +741,31 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
var closeReadLock = closeLock.readLock(); var closeReadLock = closeLock.readLock();
try { try {
ensureOpen(); ensureOpen();
db.suggestCompactRange(cfh); db.suggestCompactRange(cfh.v());
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
} }
} }
@Override @Override
public void compactRange(byte[] begin, byte[] end, CompactRangeOptions options) throws RocksDBException { public void compactRange(byte[] begin, byte[] end, RocksObj<CompactRangeOptions> options) throws RocksDBException {
var closeReadLock = closeLock.readLock(); var closeReadLock = closeLock.readLock();
try { try {
ensureOpen(); ensureOpen();
ensureOwned(options); ensureOwned(options);
db.compactRange(cfh, begin, end, options); db.compactRange(cfh.v(), begin, end, options.v());
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
} }
} }
@Override @Override
public void flush(FlushOptions options) throws RocksDBException { public void flush(RocksObj<FlushOptions> options) throws RocksDBException {
var closeReadLock = closeLock.readLock(); var closeReadLock = closeLock.readLock();
try { try {
ensureOpen(); ensureOpen();
ensureOwned(options); ensureOwned(options);
db.flush(options, cfh); db.flush(options.v(), cfh.v());
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
} }
@ -806,20 +787,20 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
var closeReadLock = closeLock.readLock(); var closeReadLock = closeLock.readLock();
try { try {
ensureOpen(); ensureOpen();
return db.getLongProperty(cfh, property); return db.getLongProperty(cfh.v(), property);
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
} }
} }
@Override @Override
public void write(WriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException { public void write(RocksObj<WriteOptions> writeOptions, WriteBatch writeBatch) throws RocksDBException {
var closeReadLock = closeLock.readLock(); var closeReadLock = closeLock.readLock();
try { try {
ensureOpen(); ensureOpen();
ensureOwned(writeOptions); ensureOwned(writeOptions);
ensureOwned(writeBatch); ensureOwned(writeBatch);
db.write(writeOptions, writeBatch); db.write(writeOptions.v(), writeBatch);
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
} }
@ -828,13 +809,14 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
/** /**
* @return true if committed successfully * @return true if committed successfully
*/ */
protected abstract boolean commitOptimistically(Transaction tx) throws RocksDBException; protected abstract boolean commitOptimistically(RocksObj<Transaction> tx) throws RocksDBException;
protected abstract Transaction beginTransaction(@NotNull WriteOptions writeOptions); protected abstract RocksObj<Transaction> beginTransaction(@NotNull RocksObj<WriteOptions> writeOptions,
RocksObj<TransactionOptions> txOpts);
@Override @Override
public final @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions, public final @NotNull UpdateAtomicResult updateAtomic(@NotNull RocksObj<ReadOptions> readOptions,
@NotNull WriteOptions writeOptions, @NotNull RocksObj<WriteOptions> writeOptions,
Buffer key, Buffer key,
BinarySerializationFunction updater, BinarySerializationFunction updater,
UpdateAtomicResultMode returnMode) throws IOException { UpdateAtomicResultMode returnMode) throws IOException {
@ -876,32 +858,66 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
timer.record(duration, TimeUnit.NANOSECONDS); timer.record(duration, TimeUnit.NANOSECONDS);
} }
protected abstract @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions, protected abstract @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull RocksObj<ReadOptions> readOptions,
@NotNull WriteOptions writeOptions, @NotNull RocksObj<WriteOptions> writeOptions,
Buffer key, Buffer key,
BinarySerializationFunction updater, BinarySerializationFunction updater,
UpdateAtomicResultMode returnMode) throws IOException; UpdateAtomicResultMode returnMode) throws IOException;
@Override @Override
@NotNull @NotNull
public RocksDBIterator newIterator(@NotNull ReadOptions readOptions) { public RocksIteratorObj newIterator(@NotNull RocksObj<ReadOptions> readOptions,
@Nullable Buffer min,
@Nullable Buffer max) {
var closeReadLock = closeLock.readLock(); var closeReadLock = closeLock.readLock();
try { try {
ensureOpen(); ensureOpen();
ensureOwned(readOptions); ensureOwned(readOptions);
var it = db.newIterator(cfh, readOptions); ensureOwned(min);
ensureOwned(max);
RocksObj<? extends AbstractSlice<?>> sliceMin;
RocksObj<? extends AbstractSlice<?>> sliceMax;
if (min != null) {
sliceMin = setIterateBound(nettyDirect, readOptions, IterateBound.LOWER, min);
} else {
sliceMin = null;
}
try { try {
return new RocksDBIterator(it, if (max != null) {
nettyDirect, sliceMax = setIterateBound(nettyDirect, readOptions, IterateBound.UPPER, max);
this.startedIterSeek, } else {
this.endedIterSeek, sliceMax = null;
this.iterSeekTime, }
this.startedIterNext, try {
this.endedIterNext, var it = db.newIterator(cfh.v(), readOptions.v());
this.iterNextTime try {
); return new RocksIteratorObj(it,
sliceMin,
sliceMax,
min,
max,
nettyDirect,
this.startedIterSeek,
this.endedIterSeek,
this.iterSeekTime,
this.startedIterNext,
this.endedIterNext,
this.iterNextTime
);
} catch (Throwable ex) {
it.close();
throw ex;
}
} catch (Throwable ex) {
if (sliceMax != null) {
sliceMax.close();
}
throw ex;
}
} catch (Throwable ex) { } catch (Throwable ex) {
it.close(); if (sliceMin != null) {
sliceMin.close();
}
throw ex; throw ex;
} }
} finally { } finally {
@ -927,7 +943,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
var closeReadLock = closeLock.readLock(); var closeReadLock = closeLock.readLock();
try { try {
ensureOpen(); ensureOpen();
return RocksDBUtils.getLevels(db, cfh); return RocksDBUtils.getLevels(db, cfh.v());
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
} }
@ -938,14 +954,14 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
var closeReadLock = closeLock.readLock(); var closeReadLock = closeLock.readLock();
try { try {
ensureOpen(); ensureOpen();
RocksDBUtils.forceCompaction(db, db.getName(), cfh, volumeId, logger); RocksDBUtils.forceCompaction(db, db.getName(), cfh.v(), volumeId, logger);
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
} }
} }
@Override @Override
public ColumnFamilyHandle getColumnFamilyHandle() { public RocksObj<ColumnFamilyHandle> getColumnFamilyHandle() {
return cfh; return cfh;
} }

View File

@ -10,6 +10,7 @@ import io.netty5.buffer.api.Send;
import io.netty5.util.internal.PlatformDependent; import io.netty5.util.internal.PlatformDependent;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.RocksDBColumn; import it.cavallium.dbengine.database.disk.RocksDBColumn;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -27,7 +28,7 @@ public class CappedWriteBatch extends WriteBatch {
private final RocksDBColumn db; private final RocksDBColumn db;
private final BufferAllocator alloc; private final BufferAllocator alloc;
private final int cap; private final int cap;
private final WriteOptions writeOptions; private final RocksObj<WriteOptions> writeOptions;
private final List<Buffer> buffersToRelease; private final List<Buffer> buffersToRelease;
private final List<ByteBuffer> byteBuffersToRelease; private final List<ByteBuffer> byteBuffersToRelease;
@ -41,7 +42,7 @@ public class CappedWriteBatch extends WriteBatch {
int cap, int cap,
int reservedWriteBatchSize, int reservedWriteBatchSize,
long maxWriteBatchSize, long maxWriteBatchSize,
WriteOptions writeOptions) { RocksObj<WriteOptions> writeOptions) {
super(reservedWriteBatchSize); super(reservedWriteBatchSize);
this.db = db; this.db = db;
this.alloc = alloc; this.alloc = alloc;

View File

@ -5,6 +5,7 @@ import static it.cavallium.dbengine.database.disk.LLTempHugePqEnv.getColumnOptio
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry; import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.BufferAllocator;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.io.Closeable; import java.io.Closeable;
@ -23,18 +24,18 @@ import org.rocksdb.RocksDBException;
public class HugePqEnv implements Closeable { public class HugePqEnv implements Closeable {
private final RocksDB db; private final RocksDB db;
private final ArrayList<ColumnFamilyHandle> defaultCfh; private final ArrayList<RocksObj<ColumnFamilyHandle>> defaultCfh;
private final Int2ObjectMap<ColumnFamilyHandle> cfhs = new Int2ObjectOpenHashMap<>(); private final Int2ObjectMap<RocksObj<ColumnFamilyHandle>> cfhs = new Int2ObjectOpenHashMap<>();
public HugePqEnv(RocksDB db, ArrayList<ColumnFamilyHandle> defaultCfh) { public HugePqEnv(RocksDB db, ArrayList<RocksObj<ColumnFamilyHandle>> defaultCfh) {
this.db = db; this.db = db;
this.defaultCfh = defaultCfh; this.defaultCfh = defaultCfh;
} }
@Override @Override
public void close() throws IOException { public void close() throws IOException {
for (ColumnFamilyHandle cfh : defaultCfh) { for (var cfh : defaultCfh) {
db.destroyColumnFamilyHandle(cfh); db.destroyColumnFamilyHandle(cfh.v());
cfh.close(); cfh.close();
} }
try { try {
@ -45,7 +46,7 @@ public class HugePqEnv implements Closeable {
} }
public int createColumnFamily(int name, AbstractComparator comparator) throws RocksDBException { public int createColumnFamily(int name, AbstractComparator comparator) throws RocksDBException {
var cfh = db.createColumnFamily(new ColumnFamilyDescriptor(Ints.toByteArray(name), getColumnOptions(comparator))); var cfh = new RocksObj<>(db.createColumnFamily(new ColumnFamilyDescriptor(Ints.toByteArray(name), getColumnOptions(comparator))));
synchronized (cfhs) { synchronized (cfhs) {
var prev = cfhs.put(name, cfh); var prev = cfhs.put(name, cfh);
if (prev != null) { if (prev != null) {
@ -56,19 +57,19 @@ public class HugePqEnv implements Closeable {
} }
public void deleteColumnFamily(int db) throws RocksDBException { public void deleteColumnFamily(int db) throws RocksDBException {
ColumnFamilyHandle cfh; RocksObj<ColumnFamilyHandle> cfh;
synchronized (cfhs) { synchronized (cfhs) {
cfh = cfhs.remove(db); cfh = cfhs.remove(db);
} }
if (cfh != null) { if (cfh != null) {
this.db.dropColumnFamily(cfh); this.db.dropColumnFamily(cfh.v());
this.db.destroyColumnFamilyHandle(cfh); this.db.destroyColumnFamilyHandle(cfh.v());
cfh.close(); cfh.close();
} }
} }
public StandardRocksDBColumn openDb(int hugePqId) { public StandardRocksDBColumn openDb(int hugePqId) {
ColumnFamilyHandle cfh; RocksObj<ColumnFamilyHandle> cfh;
synchronized (cfhs) { synchronized (cfhs) {
cfh = Objects.requireNonNull(cfhs.get(hugePqId), () -> "column " + hugePqId + " does not exist"); cfh = Objects.requireNonNull(cfhs.get(hugePqId), () -> "column " + hugePqId + " does not exist");
} }

View File

@ -4,6 +4,7 @@ import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.Send; import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLEntry; import it.cavallium.dbengine.database.LLEntry;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
public class LLLocalEntryReactiveRocksIterator extends LLLocalReactiveRocksIterator<Send<LLEntry>> { public class LLLocalEntryReactiveRocksIterator extends LLLocalReactiveRocksIterator<Send<LLEntry>> {
@ -11,7 +12,7 @@ public class LLLocalEntryReactiveRocksIterator extends LLLocalReactiveRocksItera
public LLLocalEntryReactiveRocksIterator(RocksDBColumn db, public LLLocalEntryReactiveRocksIterator(RocksDBColumn db,
Send<LLRange> range, Send<LLRange> range,
boolean allowNettyDirect, boolean allowNettyDirect,
ReadOptions readOptions, RocksObj<ReadOptions> readOptions,
boolean reverse, boolean reverse,
boolean smallRange) { boolean smallRange) {
super(db, range, allowNettyDirect, readOptions, true, reverse, smallRange); super(db, range, allowNettyDirect, readOptions, true, reverse, smallRange);

View File

@ -4,6 +4,7 @@ import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.Send; import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLEntry; import it.cavallium.dbengine.database.LLEntry;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
public class LLLocalGroupedEntryReactiveRocksIterator extends public class LLLocalGroupedEntryReactiveRocksIterator extends
@ -13,7 +14,7 @@ public class LLLocalGroupedEntryReactiveRocksIterator extends
int prefixLength, int prefixLength,
Send<LLRange> range, Send<LLRange> range,
boolean allowNettyDirect, boolean allowNettyDirect,
ReadOptions readOptions, RocksObj<ReadOptions> readOptions,
boolean smallRange) { boolean smallRange) {
super(db, prefixLength, range, allowNettyDirect, readOptions, false, true, smallRange); super(db, prefixLength, range, allowNettyDirect, readOptions, false, true, smallRange);
} }

View File

@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk;
import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.Send; import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
public class LLLocalGroupedKeyReactiveRocksIterator extends LLLocalGroupedReactiveRocksIterator<Send<Buffer>> { public class LLLocalGroupedKeyReactiveRocksIterator extends LLLocalGroupedReactiveRocksIterator<Send<Buffer>> {
@ -11,7 +12,7 @@ public class LLLocalGroupedKeyReactiveRocksIterator extends LLLocalGroupedReacti
int prefixLength, int prefixLength,
Send<LLRange> range, Send<LLRange> range,
boolean allowNettyDirect, boolean allowNettyDirect,
ReadOptions readOptions, RocksObj<ReadOptions> readOptions,
boolean smallRange) { boolean smallRange) {
super(db, prefixLength, range, allowNettyDirect, readOptions, true, false, smallRange); super(db, prefixLength, range, allowNettyDirect, readOptions, true, false, smallRange);
} }

View File

@ -11,6 +11,7 @@ import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.internal.ResourceSupport; import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import it.unimi.dsi.fastutil.objects.ObjectArrayList; import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.util.List; import java.util.List;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
@ -58,7 +59,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> extends
private final int prefixLength; private final int prefixLength;
private LLRange range; private LLRange range;
private final boolean allowNettyDirect; private final boolean allowNettyDirect;
private ReadOptions readOptions; private RocksObj<ReadOptions> readOptions;
private final boolean canFillCache; private final boolean canFillCache;
private final boolean readValues; private final boolean readValues;
private final boolean smallRange; private final boolean smallRange;
@ -68,7 +69,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> extends
int prefixLength, int prefixLength,
Send<LLRange> range, Send<LLRange> range,
boolean allowNettyDirect, boolean allowNettyDirect,
ReadOptions readOptions, RocksObj<ReadOptions> readOptions,
boolean canFillCache, boolean canFillCache,
boolean readValues, boolean readValues,
boolean smallRange) { boolean smallRange) {
@ -78,7 +79,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> extends
this.prefixLength = prefixLength; this.prefixLength = prefixLength;
this.range = range.receive(); this.range = range.receive();
this.allowNettyDirect = allowNettyDirect; this.allowNettyDirect = allowNettyDirect;
this.readOptions = readOptions != null ? readOptions : new ReadOptions(); this.readOptions = readOptions != null ? readOptions : new RocksObj<>(new ReadOptions());
this.canFillCache = canFillCache; this.canFillCache = canFillCache;
this.readValues = readValues; this.readValues = readValues;
this.smallRange = smallRange; this.smallRange = smallRange;
@ -94,7 +95,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> extends
return new RocksIterWithReadOpts(readOptions, db.newRocksIterator(allowNettyDirect, readOptions, range, false)); return new RocksIterWithReadOpts(readOptions, db.newRocksIterator(allowNettyDirect, readOptions, range, false));
}, (tuple, sink) -> { }, (tuple, sink) -> {
try { try {
var rocksIterator = tuple.iter().iterator(); var rocksIterator = tuple.iter();
ObjectArrayList<T> values = new ObjectArrayList<>(); ObjectArrayList<T> values = new ObjectArrayList<>();
Buffer firstGroupKey = null; Buffer firstGroupKey = null;
try { try {

View File

@ -11,6 +11,7 @@ import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.internal.ResourceSupport; import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
@ -56,7 +57,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends
private final int prefixLength; private final int prefixLength;
private LLRange rangeShared; private LLRange rangeShared;
private final boolean allowNettyDirect; private final boolean allowNettyDirect;
private ReadOptions readOptions; private RocksObj<ReadOptions> readOptions;
private final boolean canFillCache; private final boolean canFillCache;
private final boolean smallRange; private final boolean smallRange;
@ -64,7 +65,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends
int prefixLength, int prefixLength,
Send<LLRange> range, Send<LLRange> range,
boolean allowNettyDirect, boolean allowNettyDirect,
ReadOptions readOptions, RocksObj<ReadOptions> readOptions,
boolean canFillCache, boolean canFillCache,
boolean smallRange) { boolean smallRange) {
super(DROP); super(DROP);
@ -73,7 +74,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends
this.prefixLength = prefixLength; this.prefixLength = prefixLength;
this.rangeShared = range.receive(); this.rangeShared = range.receive();
this.allowNettyDirect = allowNettyDirect; this.allowNettyDirect = allowNettyDirect;
this.readOptions = readOptions != null ? readOptions : new ReadOptions(); this.readOptions = readOptions != null ? readOptions : new RocksObj<>(new ReadOptions());
this.canFillCache = canFillCache; this.canFillCache = canFillCache;
this.smallRange = smallRange; this.smallRange = smallRange;
} }
@ -93,7 +94,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends
return new RocksIterWithReadOpts(readOptions, db.newRocksIterator(allowNettyDirect, readOptions, rangeShared, false)); return new RocksIterWithReadOpts(readOptions, db.newRocksIterator(allowNettyDirect, readOptions, rangeShared, false));
}, (tuple, sink) -> { }, (tuple, sink) -> {
try { try {
var rocksIterator = tuple.iter().iterator(); var rocksIterator = tuple.iter();
Buffer firstGroupKey = null; Buffer firstGroupKey = null;
try { try {
while (rocksIterator.isValid()) { while (rocksIterator.isValid()) {

View File

@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk;
import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.Send; import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
public class LLLocalKeyReactiveRocksIterator extends LLLocalReactiveRocksIterator<Send<Buffer>> { public class LLLocalKeyReactiveRocksIterator extends LLLocalReactiveRocksIterator<Send<Buffer>> {
@ -10,7 +11,7 @@ public class LLLocalKeyReactiveRocksIterator extends LLLocalReactiveRocksIterato
public LLLocalKeyReactiveRocksIterator(RocksDBColumn db, public LLLocalKeyReactiveRocksIterator(RocksDBColumn db,
Send<LLRange> range, Send<LLRange> range,
boolean allowNettyDirect, boolean allowNettyDirect,
ReadOptions readOptions, RocksObj<ReadOptions> readOptions,
boolean reverse, boolean reverse,
boolean smallRange) { boolean smallRange) {
super(db, range, allowNettyDirect, readOptions, false, reverse, smallRange); super(db, range, allowNettyDirect, readOptions, false, reverse, smallRange);

View File

@ -9,6 +9,7 @@ import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.Timer;
import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.internal.ResourceSupport;
import io.netty5.util.internal.PlatformDependent; import io.netty5.util.internal.PlatformDependent;
import it.cavallium.data.generator.nativedata.NullableString; import it.cavallium.data.generator.nativedata.NullableString;
import it.cavallium.dbengine.client.MemoryStats; import it.cavallium.dbengine.client.MemoryStats;
@ -22,6 +23,7 @@ import it.cavallium.dbengine.database.RocksDBMapProperty;
import it.cavallium.dbengine.database.RocksDBStringProperty; import it.cavallium.dbengine.database.RocksDBStringProperty;
import it.cavallium.dbengine.database.TableWithProperties; import it.cavallium.dbengine.database.TableWithProperties;
import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import it.cavallium.dbengine.rpc.current.data.Column; import it.cavallium.dbengine.rpc.current.data.Column;
import it.cavallium.dbengine.rpc.current.data.ColumnOptions; import it.cavallium.dbengine.rpc.current.data.ColumnOptions;
import it.cavallium.dbengine.rpc.current.data.DatabaseLevel; import it.cavallium.dbengine.rpc.current.data.DatabaseLevel;
@ -123,10 +125,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
private Statistics statistics; private Statistics statistics;
private Cache standardCache; private Cache standardCache;
private Cache compressedCache; private Cache compressedCache;
private final Map<Column, ColumnFamilyHandle> handles; private final Map<Column, RocksObj<ColumnFamilyHandle>> handles;
private final HashMap<String, PersistentCache> persistentCaches; private final HashMap<String, PersistentCache> persistentCaches;
private final ConcurrentHashMap<Long, Snapshot> snapshotsHandles = new ConcurrentHashMap<>(); private final ConcurrentHashMap<Long, RocksObj<Snapshot>> snapshotsHandles = new ConcurrentHashMap<>();
private final AtomicLong nextSnapshotNumbers = new AtomicLong(1); private final AtomicLong nextSnapshotNumbers = new AtomicLong(1);
private final StampedLock closeLock = new StampedLock(); private final StampedLock closeLock = new StampedLock();
private volatile boolean closed = false; private volatile boolean closed = false;
@ -465,13 +467,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
this.handles = new HashMap<>(); this.handles = new HashMap<>();
if (enableColumnsBug && !inMemory) { if (enableColumnsBug && !inMemory) {
for (int i = 0; i < columns.size(); i++) { for (int i = 0; i < columns.size(); i++) {
this.handles.put(columns.get(i), handles.get(i)); this.handles.put(columns.get(i), new RocksObj<>(handles.get(i)));
} }
} else { } else {
handles: for (ColumnFamilyHandle handle : handles) { handles: for (ColumnFamilyHandle handle : handles) {
for (Column column : columns) { for (Column column : columns) {
if (Arrays.equals(column.name().getBytes(StandardCharsets.US_ASCII), handle.getName())) { if (Arrays.equals(column.name().getBytes(StandardCharsets.US_ASCII), handle.getName())) {
this.handles.put(column, handle); this.handles.put(column, new RocksObj<>(handle));
continue handles; continue handles;
} }
} }
@ -529,6 +531,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
RocksDBUtils.ensureOwned(rocksObject); RocksDBUtils.ensureOwned(rocksObject);
} }
protected void ensureOwned(RocksObj<?> rocksObject) {
RocksDBUtils.ensureOwned(rocksObject);
}
private synchronized PersistentCache resolvePersistentCache(HashMap<String, PersistentCache> caches, private synchronized PersistentCache resolvePersistentCache(HashMap<String, PersistentCache> caches,
DBOptions rocksdbOptions, DBOptions rocksdbOptions,
List<it.cavallium.dbengine.rpc.current.data.PersistentCache> persistentCaches, List<it.cavallium.dbengine.rpc.current.data.PersistentCache> persistentCaches,
@ -565,7 +571,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
throw new IllegalArgumentException("Persistent cache " + persistentCacheId.get() + " is not defined"); throw new IllegalArgumentException("Persistent cache " + persistentCacheId.get() + " is not defined");
} }
public Map<Column, ColumnFamilyHandle> getAllColumnFamilyHandles() { public Map<Column, RocksObj<ColumnFamilyHandle>> getAllColumnFamilyHandles() {
return this.handles; return this.handles;
} }
@ -580,7 +586,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
ensureOpen(); ensureOpen();
var cfh = handles.get(column); var cfh = handles.get(column);
ensureOwned(cfh); ensureOwned(cfh);
return RocksDBUtils.getLevels(db, cfh); return RocksDBUtils.getLevels(db, cfh.v());
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
} }
@ -592,7 +598,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
ensureOpen(); ensureOpen();
var cfh = handles.get(column); var cfh = handles.get(column);
ensureOwned(cfh); ensureOwned(cfh);
return RocksDBUtils.getColumnFiles(db, cfh, excludeLastLevel); return RocksDBUtils.getColumnFiles(db, cfh.v(), excludeLastLevel);
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
} }
@ -604,7 +610,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
ensureOpen(); ensureOpen();
for (var cfh : this.handles.values()) { for (var cfh : this.handles.values()) {
ensureOwned(cfh); ensureOwned(cfh);
RocksDBUtils.forceCompaction(db, name, cfh, volumeId, logger); RocksDBUtils.forceCompaction(db, name, cfh.v(), volumeId, logger);
} }
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
@ -637,7 +643,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
private void registerGauge(MeterRegistry meterRegistry, String name, String propertyName, boolean divideByAllColumns) { private void registerGauge(MeterRegistry meterRegistry, String name, String propertyName, boolean divideByAllColumns) {
if (divideByAllColumns) { if (divideByAllColumns) {
for (Entry<Column, ColumnFamilyHandle> cfhEntry : handles.entrySet()) { for (var cfhEntry : handles.entrySet()) {
var columnName = cfhEntry.getKey().name(); var columnName = cfhEntry.getKey().name();
var cfh = cfhEntry.getValue(); var cfh = cfhEntry.getValue();
meterRegistry.gauge("rocksdb.property.value", meterRegistry.gauge("rocksdb.property.value",
@ -652,7 +658,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
if (closed) { if (closed) {
return 0d; return 0d;
} }
return database.getLongProperty(cfh, propertyName); return database.getLongProperty(cfh.v(), propertyName);
} catch (RocksDBException e) { } catch (RocksDBException e) {
if ("NotFound".equals(e.getMessage())) { if ("NotFound".equals(e.getMessage())) {
return 0d; return 0d;
@ -715,7 +721,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
snapshotsHandles.forEach((id, snapshot) -> { snapshotsHandles.forEach((id, snapshot) -> {
try { try {
if (db.isOwningHandle()) { if (db.isOwningHandle()) {
db.releaseSnapshot(snapshot); db.releaseSnapshot(snapshot.v());
snapshot.close();
} }
} catch (Exception ex2) { } catch (Exception ex2) {
// ignore exception // ignore exception
@ -1026,7 +1033,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return stats; return stats;
} }
private Snapshot getSnapshotLambda(LLSnapshot snapshot) { private RocksObj<Snapshot> getSnapshotLambda(LLSnapshot snapshot) {
var closeReadSnapLock = closeLock.readLock(); var closeReadSnapLock = closeLock.readLock();
try { try {
ensureOpen(); ensureOpen();
@ -1078,8 +1085,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
name, name,
ColumnUtils.toString(columnName), ColumnUtils.toString(columnName),
dbWScheduler, dbWScheduler,
dbRScheduler, dbRScheduler, snapshot -> getSnapshotLambda(snapshot),
this::getSnapshotLambda,
updateMode, updateMode,
databaseOptions databaseOptions
); );
@ -1093,7 +1099,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
var closeReadLock = closeLock.readLock(); var closeReadLock = closeLock.readLock();
try { try {
ensureOpen(); ensureOpen();
ColumnFamilyHandle cfh; RocksObj<ColumnFamilyHandle> cfh;
try { try {
cfh = getCfh(columnName); cfh = getCfh(columnName);
ensureOwned(cfh); ensureOwned(cfh);
@ -1106,7 +1112,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
} }
} }
private RocksDBColumn getRocksDBColumn(RocksDB db, ColumnFamilyHandle cfh) { private RocksDBColumn getRocksDBColumn(RocksDB db, RocksObj<ColumnFamilyHandle> cfh) {
var nettyDirect = databaseOptions.allowNettyDirect(); var nettyDirect = databaseOptions.allowNettyDirect();
var closeLock = getCloseLock(); var closeLock = getCloseLock();
if (db instanceof OptimisticTransactionDB optimisticTransactionDB) { if (db instanceof OptimisticTransactionDB optimisticTransactionDB) {
@ -1132,9 +1138,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
} }
} }
private ColumnFamilyHandle getCfh(byte[] columnName) throws RocksDBException { private RocksObj<ColumnFamilyHandle> getCfh(byte[] columnName) throws RocksDBException {
ColumnFamilyHandle cfh = handles.get(ColumnUtils.special(ColumnUtils.toString(columnName))); var cfh = handles.get(ColumnUtils.special(ColumnUtils.toString(columnName)));
assert enableColumnsBug || Arrays.equals(cfh.getName(), columnName); assert enableColumnsBug || Arrays.equals(cfh.v().getName(), columnName);
return cfh; return cfh;
} }
@ -1233,7 +1239,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return db.getMapProperty(property.getName()); return db.getMapProperty(property.getName());
} else { } else {
var cfh = requireNonNull(handles.get(column)); var cfh = requireNonNull(handles.get(column));
return db.getMapProperty(cfh, property.getName()); return db.getMapProperty(cfh.v(), property.getName());
} }
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
@ -1264,7 +1270,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return db.getProperty(property.getName()); return db.getProperty(property.getName());
} else { } else {
var cfh = requireNonNull(handles.get(column)); var cfh = requireNonNull(handles.get(column));
return db.getProperty(cfh, property.getName()); return db.getProperty(cfh.v(), property.getName());
} }
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
@ -1295,7 +1301,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return db.getLongProperty(property.getName()); return db.getLongProperty(property.getName());
} else { } else {
var cfh = requireNonNull(handles.get(column)); var cfh = requireNonNull(handles.get(column));
return db.getLongProperty(cfh, property.getName()); return db.getLongProperty(cfh.v(), property.getName());
} }
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
@ -1356,7 +1362,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
aggregatedStats aggregatedStats
.append(entry.getKey().name()) .append(entry.getKey().name())
.append("\n") .append("\n")
.append(db.getProperty(entry.getValue(), "rocksdb.stats")) .append(db.getProperty(entry.getValue().v(), "rocksdb.stats"))
.append("\n"); .append("\n");
} }
return aggregatedStats.toString(); return aggregatedStats.toString();
@ -1382,7 +1388,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
try { try {
if (closed) return null; if (closed) return null;
ensureOpen(); ensureOpen();
return db.getPropertiesOfAllTables(handle.getValue()); return db.getPropertiesOfAllTables(handle.getValue().v());
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
} }
@ -1447,7 +1453,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
try { try {
ensureOpen(); ensureOpen();
return snapshotTime.recordCallable(() -> { return snapshotTime.recordCallable(() -> {
var snapshot = db.getSnapshot(); var snapshot = new RocksObj<>(db.getSnapshot());
long currentSnapshotSequenceNumber = nextSnapshotNumbers.getAndIncrement(); long currentSnapshotSequenceNumber = nextSnapshotNumbers.getAndIncrement();
this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot); this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot);
return new LLSnapshot(currentSnapshotSequenceNumber); return new LLSnapshot(currentSnapshotSequenceNumber);
@ -1463,15 +1469,14 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return Mono return Mono
.<Void>fromCallable(() -> { .<Void>fromCallable(() -> {
var closeReadLock = closeLock.readLock(); var closeReadLock = closeLock.readLock();
try { try (var dbSnapshot = this.snapshotsHandles.remove(snapshot.getSequenceNumber())) {
Snapshot dbSnapshot = this.snapshotsHandles.remove(snapshot.getSequenceNumber());
if (dbSnapshot == null) { if (dbSnapshot == null) {
throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!"); throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!");
} }
if (!db.isOwningHandle()) { if (!db.isOwningHandle()) {
return null; return null;
} }
db.releaseSnapshot(dbSnapshot); db.releaseSnapshot(dbSnapshot.v());
return null; return null;
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
@ -1489,7 +1494,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
statistics = null; statistics = null;
} }
try { try {
flushAndCloseDb(db, standardCache, compressedCache, new ArrayList<>(handles.values())); flushAndCloseDb(db,
standardCache,
compressedCache,
new ArrayList<>(handles.values().stream().map(RocksObj::v).toList())
);
handles.values().forEach(ResourceSupport::close);
handles.clear();
deleteUnusedOldLogFiles(); deleteUnusedOldLogFiles();
} catch (RocksDBException e) { } catch (RocksDBException e) {
throw new IOException(e); throw new IOException(e);

View File

@ -7,6 +7,7 @@ import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.Send; import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.internal.ResourceSupport; import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
@ -17,7 +18,7 @@ import reactor.util.function.Tuples;
public final class LLLocalMigrationReactiveRocksIterator extends public final class LLLocalMigrationReactiveRocksIterator extends
ResourceSupport<LLLocalMigrationReactiveRocksIterator, LLLocalMigrationReactiveRocksIterator> { ResourceSupport<LLLocalMigrationReactiveRocksIterator, LLLocalMigrationReactiveRocksIterator> {
protected static final Logger logger = LogManager.getLogger(LLLocalMigrationReactiveRocksIterator.class); private static final Logger logger = LogManager.getLogger(LLLocalMigrationReactiveRocksIterator.class);
private static final Drop<LLLocalMigrationReactiveRocksIterator> DROP = new Drop<>() { private static final Drop<LLLocalMigrationReactiveRocksIterator> DROP = new Drop<>() {
@Override @Override
public void drop(LLLocalMigrationReactiveRocksIterator obj) { public void drop(LLLocalMigrationReactiveRocksIterator obj) {
@ -50,17 +51,17 @@ public final class LLLocalMigrationReactiveRocksIterator extends
private final RocksDBColumn db; private final RocksDBColumn db;
private LLRange rangeShared; private LLRange rangeShared;
private ReadOptions readOptions; private RocksObj<ReadOptions> readOptions;
@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
public LLLocalMigrationReactiveRocksIterator(RocksDBColumn db, public LLLocalMigrationReactiveRocksIterator(RocksDBColumn db,
Send<LLRange> range, Send<LLRange> range,
ReadOptions readOptions) { Send<RocksObj<ReadOptions>> readOptions) {
super((Drop<LLLocalMigrationReactiveRocksIterator>) (Drop) DROP); super((Drop<LLLocalMigrationReactiveRocksIterator>) (Drop) DROP);
try (range) { try (range) {
this.db = db; this.db = db;
this.rangeShared = range.receive(); this.rangeShared = range.receive();
this.readOptions = readOptions; this.readOptions = readOptions.receive();
} }
} }
@ -72,8 +73,7 @@ public final class LLLocalMigrationReactiveRocksIterator extends
return new RocksIterWithReadOpts(readOptions, db.newRocksIterator(false, readOptions, rangeShared, false)); return new RocksIterWithReadOpts(readOptions, db.newRocksIterator(false, readOptions, rangeShared, false));
}, (tuple, sink) -> { }, (tuple, sink) -> {
try { try {
//noinspection resource var rocksIterator = tuple.iter();
var rocksIterator = tuple.iter().iterator();
if (rocksIterator.isValid()) { if (rocksIterator.isValid()) {
byte[] key = rocksIterator.key(); byte[] key = rocksIterator.key();
byte[] value = rocksIterator.value(); byte[] value = rocksIterator.value();
@ -97,7 +97,7 @@ public final class LLLocalMigrationReactiveRocksIterator extends
@Override @Override
protected Owned<LLLocalMigrationReactiveRocksIterator> prepareSend() { protected Owned<LLLocalMigrationReactiveRocksIterator> prepareSend() {
var range = this.rangeShared.send(); var range = this.rangeShared.send();
var readOptions = this.readOptions; var readOptions = this.readOptions.send();
return drop -> new LLLocalMigrationReactiveRocksIterator(db, return drop -> new LLLocalMigrationReactiveRocksIterator(db,
range, range,
readOptions readOptions

View File

@ -11,6 +11,7 @@ import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.internal.ResourceSupport; import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
@ -56,7 +57,7 @@ public abstract class LLLocalReactiveRocksIterator<T> extends
private final RocksDBColumn db; private final RocksDBColumn db;
private LLRange rangeShared; private LLRange rangeShared;
private final boolean allowNettyDirect; private final boolean allowNettyDirect;
private ReadOptions readOptions; private RocksObj<ReadOptions> readOptions;
private final boolean readValues; private final boolean readValues;
private final boolean reverse; private final boolean reverse;
private final boolean smallRange; private final boolean smallRange;
@ -65,7 +66,7 @@ public abstract class LLLocalReactiveRocksIterator<T> extends
public LLLocalReactiveRocksIterator(RocksDBColumn db, public LLLocalReactiveRocksIterator(RocksDBColumn db,
Send<LLRange> range, Send<LLRange> range,
boolean allowNettyDirect, boolean allowNettyDirect,
ReadOptions readOptions, RocksObj<ReadOptions> readOptions,
boolean readValues, boolean readValues,
boolean reverse, boolean reverse,
boolean smallRange) { boolean smallRange) {
@ -90,7 +91,7 @@ public abstract class LLLocalReactiveRocksIterator<T> extends
return new RocksIterWithReadOpts(readOptions, db.newRocksIterator(allowNettyDirect, readOptions, rangeShared, reverse)); return new RocksIterWithReadOpts(readOptions, db.newRocksIterator(allowNettyDirect, readOptions, rangeShared, reverse));
}, (tuple, sink) -> { }, (tuple, sink) -> {
try { try {
var rocksIterator = tuple.iter().iterator(); var rocksIterator = tuple.iter();
if (rocksIterator.isValid()) { if (rocksIterator.isValid()) {
Buffer key; Buffer key;
if (allowNettyDirect) { if (allowNettyDirect) {

View File

@ -10,6 +10,7 @@ import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -25,11 +26,8 @@ import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
public class LLLocalSingleton implements LLSingleton { public class LLLocalSingleton implements LLSingleton {
private static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions();
private static final WriteOptions EMPTY_WRITE_OPTIONS = new WriteOptions();
private final RocksDBColumn db; private final RocksDBColumn db;
private final Function<LLSnapshot, Snapshot> snapshotResolver; private final Function<LLSnapshot, RocksObj<Snapshot>> snapshotResolver;
private final byte[] name; private final byte[] name;
private final String columnName; private final String columnName;
private final Mono<Send<Buffer>> nameMono; private final Mono<Send<Buffer>> nameMono;
@ -38,7 +36,7 @@ public class LLLocalSingleton implements LLSingleton {
private final Scheduler dbRScheduler; private final Scheduler dbRScheduler;
public LLLocalSingleton(RocksDBColumn db, public LLLocalSingleton(RocksDBColumn db,
Function<LLSnapshot, Snapshot> snapshotResolver, Function<LLSnapshot, RocksObj<Snapshot>> snapshotResolver,
String databaseName, String databaseName,
byte[] name, byte[] name,
String columnName, String columnName,
@ -62,8 +60,11 @@ public class LLLocalSingleton implements LLSingleton {
if (Schedulers.isInNonBlockingThread()) { if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Initialized in a nonblocking thread"); throw new UnsupportedOperationException("Initialized in a nonblocking thread");
} }
if (defaultValue != null && db.get(EMPTY_READ_OPTIONS, this.name, true) == null) { try (var readOptions = new RocksObj<>(new ReadOptions());
db.put(EMPTY_WRITE_OPTIONS, this.name, defaultValue); var writeOptions = new RocksObj<>(new WriteOptions())) {
if (defaultValue != null && db.get(readOptions, this.name, true) == null) {
db.put(writeOptions, this.name, defaultValue);
}
} }
} }
@ -71,13 +72,11 @@ public class LLLocalSingleton implements LLSingleton {
return Mono.fromCallable(callable).subscribeOn(write ? dbWScheduler : dbRScheduler); return Mono.fromCallable(callable).subscribeOn(write ? dbWScheduler : dbRScheduler);
} }
private ReadOptions generateReadOptions(LLSnapshot snapshot, boolean orStaticOpts) { private RocksObj<ReadOptions> generateReadOptions(LLSnapshot snapshot) {
if (snapshot != null) { if (snapshot != null) {
return new ReadOptions().setSnapshot(snapshotResolver.apply(snapshot)); return new RocksObj<>(new ReadOptions().setSnapshot(snapshotResolver.apply(snapshot).v()));
} else if (orStaticOpts) {
return EMPTY_READ_OPTIONS;
} else { } else {
return null; return new RocksObj<>(new ReadOptions());
} }
} }
@ -91,13 +90,8 @@ public class LLLocalSingleton implements LLSingleton {
return nameMono.publishOn(dbRScheduler).handle((nameSend, sink) -> { return nameMono.publishOn(dbRScheduler).handle((nameSend, sink) -> {
try (Buffer name = nameSend.receive()) { try (Buffer name = nameSend.receive()) {
Buffer result; Buffer result;
var readOptions = generateReadOptions(snapshot, true); try (var readOptions = generateReadOptions(snapshot)) {
try {
result = db.get(readOptions, name); result = db.get(readOptions, name);
} finally {
if (readOptions != EMPTY_READ_OPTIONS) {
readOptions.close();
}
} }
if (result != null) { if (result != null) {
sink.next(result.send()); sink.next(result.send());
@ -115,11 +109,11 @@ public class LLLocalSingleton implements LLSingleton {
return Mono.zip(nameMono, valueMono).publishOn(dbWScheduler).handle((tuple, sink) -> { return Mono.zip(nameMono, valueMono).publishOn(dbWScheduler).handle((tuple, sink) -> {
var nameSend = tuple.getT1(); var nameSend = tuple.getT1();
var valueSend = tuple.getT2(); var valueSend = tuple.getT2();
try (Buffer name = nameSend.receive()) { try (Buffer name = nameSend.receive();
try (Buffer value = valueSend.receive()) { Buffer value = valueSend.receive();
db.put(EMPTY_WRITE_OPTIONS, name, value); var writeOptions = new RocksObj<>(new WriteOptions())) {
sink.next(true); db.put(writeOptions, name, value);
} sink.next(true);
} catch (RocksDBException ex) { } catch (RocksDBException ex) {
sink.error(new IOException("Failed to write " + Arrays.toString(name), ex)); sink.error(new IOException("Failed to write " + Arrays.toString(name), ex));
} }
@ -128,8 +122,8 @@ public class LLLocalSingleton implements LLSingleton {
private Mono<Void> unset() { private Mono<Void> unset() {
return nameMono.publishOn(dbWScheduler).handle((nameSend, sink) -> { return nameMono.publishOn(dbWScheduler).handle((nameSend, sink) -> {
try (Buffer name = nameSend.receive()) { try (Buffer name = nameSend.receive(); var writeOptions = new RocksObj<>(new WriteOptions())) {
db.delete(EMPTY_WRITE_OPTIONS, name); db.delete(writeOptions, name);
} catch (RocksDBException ex) { } catch (RocksDBException ex) {
sink.error(new IOException("Failed to read " + Arrays.toString(name), ex)); sink.error(new IOException("Failed to read " + Arrays.toString(name), ex));
} }
@ -149,8 +143,10 @@ public class LLLocalSingleton implements LLSingleton {
case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS; case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS;
}; };
UpdateAtomicResult result; UpdateAtomicResult result;
try (var key = keySend.receive()) { try (var key = keySend.receive();
result = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, key, updater, returnMode); var readOptions = new RocksObj<>(new ReadOptions());
var writeOptions = new RocksObj<>(new WriteOptions())) {
result = db.updateAtomic(readOptions, writeOptions, key, updater, returnMode);
} }
return switch (updateReturnMode) { return switch (updateReturnMode) {
case NOTHING -> null; case NOTHING -> null;
@ -168,8 +164,10 @@ public class LLLocalSingleton implements LLSingleton {
throw new UnsupportedOperationException("Called update in a nonblocking thread"); throw new UnsupportedOperationException("Called update in a nonblocking thread");
} }
UpdateAtomicResult result; UpdateAtomicResult result;
try (var key = keySend.receive()) { try (var key = keySend.receive();
result = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, key, updater, DELTA); var readOptions = new RocksObj<>(new ReadOptions());
var writeOptions = new RocksObj<>(new WriteOptions())) {
result = db.updateAtomic(readOptions, writeOptions, key, updater, DELTA);
} }
return ((UpdateAtomicResultDelta) result).delta(); return ((UpdateAtomicResultDelta) result).delta();
}).onErrorMap(cause -> new IOException("Failed to read or write", cause)), }).onErrorMap(cause -> new IOException("Failed to read or write", cause)),

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine.database.disk; package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import java.io.Closeable; import java.io.Closeable;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -58,11 +59,16 @@ public class LLTempHugePqEnv implements Closeable {
var cfh = new ArrayList<ColumnFamilyHandle>(); var cfh = new ArrayList<ColumnFamilyHandle>();
nextColumnName = new AtomicInteger(0); nextColumnName = new AtomicInteger(0);
env = new HugePqEnv(RocksDB.open(opts, var db = RocksDB.open(opts,
tempDirectory.toString(), tempDirectory.toString(),
List.of(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, getColumnOptions(null))), List.of(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, getColumnOptions(null))),
cfh cfh
), cfh); );
var cfhObjs = new ArrayList<RocksObj<ColumnFamilyHandle>>(cfh.size());
for (ColumnFamilyHandle columnFamilyHandle : cfh) {
cfhObjs.add(new RocksObj<>(columnFamilyHandle));
}
env = new HugePqEnv(db, cfhObjs);
initialized = true; initialized = true;
} catch (RocksDBException | IOException e) { } catch (RocksDBException | IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@ -10,6 +10,7 @@ import io.netty5.buffer.api.MemoryManager;
import io.netty5.buffer.api.Send; import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import it.cavallium.dbengine.lucene.ExponentialPageLimits; import it.cavallium.dbengine.lucene.ExponentialPageLimits;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
@ -24,6 +25,7 @@ import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import org.rocksdb.Status.Code; import org.rocksdb.Status.Code;
import org.rocksdb.Transaction; import org.rocksdb.Transaction;
import org.rocksdb.TransactionOptions;
import org.rocksdb.WriteBatch; import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions; import org.rocksdb.WriteOptions;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
@ -38,7 +40,7 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
boolean nettyDirect, boolean nettyDirect,
BufferAllocator alloc, BufferAllocator alloc,
String databaseName, String databaseName,
ColumnFamilyHandle cfh, RocksObj<ColumnFamilyHandle> cfh,
MeterRegistry meterRegistry, MeterRegistry meterRegistry,
StampedLock closeLock) { StampedLock closeLock) {
super(db, nettyDirect, alloc, databaseName, cfh, meterRegistry, closeLock); super(db, nettyDirect, alloc, databaseName, cfh, meterRegistry, closeLock);
@ -53,9 +55,9 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
} }
@Override @Override
protected boolean commitOptimistically(Transaction tx) throws RocksDBException { protected boolean commitOptimistically(RocksObj<Transaction> tx) throws RocksDBException {
try { try {
tx.commit(); tx.v().commit();
return true; return true;
} catch (RocksDBException ex) { } catch (RocksDBException ex) {
var status = ex.getStatus() != null ? ex.getStatus().getCode() : Code.Ok; var status = ex.getStatus() != null ? ex.getStatus().getCode() : Code.Ok;
@ -67,18 +69,19 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
} }
@Override @Override
protected Transaction beginTransaction(@NotNull WriteOptions writeOptions) { protected RocksObj<Transaction> beginTransaction(@NotNull RocksObj<WriteOptions> writeOptions,
return getDb().beginTransaction(writeOptions); RocksObj<TransactionOptions> txOpts) {
return new RocksObj<>(getDb().beginTransaction(writeOptions.v()));
} }
@Override @Override
public void write(WriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException { public void write(RocksObj<WriteOptions> writeOptions, WriteBatch writeBatch) throws RocksDBException {
getDb().write(writeOptions, writeBatch); getDb().write(writeOptions.v(), writeBatch);
} }
@Override @Override
public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions, public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull RocksObj<ReadOptions> readOptions,
@NotNull WriteOptions writeOptions, @NotNull RocksObj<WriteOptions> writeOptions,
Buffer key, Buffer key,
BinarySerializationFunction updater, BinarySerializationFunction updater,
UpdateAtomicResultMode returnMode) throws IOException { UpdateAtomicResultMode returnMode) throws IOException {
@ -89,7 +92,8 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
if (Schedulers.isInNonBlockingThread()) { if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread"); throw new UnsupportedOperationException("Called update in a nonblocking thread");
} }
try (var tx = beginTransaction(writeOptions)) { try (var txOpts = new RocksObj<>(new TransactionOptions());
var tx = beginTransaction(writeOptions, txOpts)) {
boolean committedSuccessfully; boolean committedSuccessfully;
int retries = 0; int retries = 0;
ExponentialPageLimits retryTime = null; ExponentialPageLimits retryTime = null;
@ -97,7 +101,7 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
Send<Buffer> sentCurData; Send<Buffer> sentCurData;
boolean changed; boolean changed;
do { do {
var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true); var prevDataArray = tx.v().getForUpdate(readOptions.v(), cfh.v(), keyArray, true);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, logger.trace(MARKER_ROCKSDB,
"Reading {}: {} (before update)", "Reading {}: {} (before update)",
@ -135,7 +139,7 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key)); logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
} }
tx.delete(cfh, keyArray, true); tx.v().delete(cfh.v(), keyArray, true);
changed = true; changed = true;
committedSuccessfully = commitOptimistically(tx); committedSuccessfully = commitOptimistically(tx);
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) { } else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
@ -146,19 +150,19 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
LLUtils.toStringSafe(newData) LLUtils.toStringSafe(newData)
); );
} }
tx.put(cfh, keyArray, newDataArray); tx.v().put(cfh.v(), keyArray, newDataArray);
changed = true; changed = true;
committedSuccessfully = commitOptimistically(tx); committedSuccessfully = commitOptimistically(tx);
} else { } else {
changed = false; changed = false;
committedSuccessfully = true; committedSuccessfully = true;
tx.rollback(); tx.v().rollback();
} }
sentPrevData = prevData == null ? null : prevData.send(); sentPrevData = prevData == null ? null : prevData.send();
sentCurData = newData == null ? null : newData.send(); sentCurData = newData == null ? null : newData.send();
if (!committedSuccessfully) { if (!committedSuccessfully) {
tx.undoGetForUpdate(cfh, keyArray); tx.v().undoGetForUpdate(cfh.v(), keyArray);
tx.rollback(); tx.v().rollback();
if (sentPrevData != null) { if (sentPrevData != null) {
sentPrevData.close(); sentPrevData.close();
} }

View File

@ -9,6 +9,7 @@ import io.netty5.buffer.api.MemoryManager;
import io.netty5.buffer.api.Send; import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.StampedLock; import java.util.concurrent.locks.StampedLock;
@ -31,26 +32,27 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
boolean nettyDirect, boolean nettyDirect,
BufferAllocator alloc, BufferAllocator alloc,
String dbName, String dbName,
ColumnFamilyHandle cfh, RocksObj<ColumnFamilyHandle> cfh,
MeterRegistry meterRegistry, MeterRegistry meterRegistry,
StampedLock closeLock) { StampedLock closeLock) {
super(db, nettyDirect, alloc, dbName, cfh, meterRegistry, closeLock); super(db, nettyDirect, alloc, dbName, cfh, meterRegistry, closeLock);
} }
@Override @Override
protected boolean commitOptimistically(Transaction tx) throws RocksDBException { protected boolean commitOptimistically(RocksObj<Transaction> tx) throws RocksDBException {
tx.commit(); tx.v().commit();
return true; return true;
} }
@Override @Override
protected Transaction beginTransaction(@NotNull WriteOptions writeOptions) { protected RocksObj<Transaction> beginTransaction(@NotNull RocksObj<WriteOptions> writeOptions,
return getDb().beginTransaction(writeOptions, DEFAULT_TX_OPTIONS); RocksObj<TransactionOptions> txOpts) {
return new RocksObj<>(getDb().beginTransaction(writeOptions.v(), txOpts.v()));
} }
@Override @Override
public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions, public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull RocksObj<ReadOptions> readOptions,
@NotNull WriteOptions writeOptions, @NotNull RocksObj<WriteOptions> writeOptions,
Buffer key, Buffer key,
BinarySerializationFunction updater, BinarySerializationFunction updater,
UpdateAtomicResultMode returnMode) throws IOException { UpdateAtomicResultMode returnMode) throws IOException {
@ -61,14 +63,15 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
if (Schedulers.isInNonBlockingThread()) { if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread"); throw new UnsupportedOperationException("Called update in a nonblocking thread");
} }
try (var tx = beginTransaction(writeOptions)) { try (var txOpts = new RocksObj<>(new TransactionOptions());
var tx = beginTransaction(writeOptions, txOpts)) {
Send<Buffer> sentPrevData; Send<Buffer> sentPrevData;
Send<Buffer> sentCurData; Send<Buffer> sentCurData;
boolean changed; boolean changed;
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Reading {} (before update lock)", LLUtils.toStringSafe(key)); logger.trace(MARKER_ROCKSDB, "Reading {} (before update lock)", LLUtils.toStringSafe(key));
} }
var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true); var prevDataArray = tx.v().getForUpdate(readOptions.v(), cfh.v(), keyArray, true);
try { try {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, logger.trace(MARKER_ROCKSDB,
@ -109,9 +112,9 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key)); logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
} }
writeValueBufferSize.record(0); writeValueBufferSize.record(0);
tx.delete(cfh, keyArray, true); tx.v().delete(cfh.v(), keyArray, true);
changed = true; changed = true;
tx.commit(); tx.v().commit();
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) { } else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, logger.trace(MARKER_ROCKSDB,
@ -121,19 +124,19 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
); );
} }
writeValueBufferSize.record(newDataArray.length); writeValueBufferSize.record(newDataArray.length);
tx.put(cfh, keyArray, newDataArray); tx.v().put(cfh.v(), keyArray, newDataArray);
changed = true; changed = true;
tx.commit(); tx.v().commit();
} else { } else {
changed = false; changed = false;
tx.rollback(); tx.v().rollback();
} }
sentPrevData = prevData == null ? null : prevData.send(); sentPrevData = prevData == null ? null : prevData.send();
sentCurData = newData == null ? null : newData.send(); sentCurData = newData == null ? null : newData.send();
} }
} }
} finally { } finally {
tx.undoGetForUpdate(cfh, keyArray); tx.v().undoGetForUpdate(cfh.v(), keyArray);
} }
recordAtomicUpdateTime(changed, sentPrevData != null, sentCurData != null, initNanoTime); recordAtomicUpdateTime(changed, sentPrevData != null, sentCurData != null, initNanoTime);
return switch (returnMode) { return switch (returnMode) {

View File

@ -1,13 +0,0 @@
package it.cavallium.dbengine.database.disk;
import io.netty5.buffer.api.Buffer;
import it.cavallium.dbengine.database.SafeCloseable;
import org.rocksdb.AbstractSlice;
public interface ReleasableSlice extends SafeCloseable {
@Override
default void close() {
}
}

View File

@ -5,6 +5,8 @@ import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.BufferAllocator;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
@ -22,13 +24,12 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
/** /**
* This method should not modify or move the writerIndex/readerIndex of the buffers inside the range * This method should not modify or move the writerIndex/readerIndex of the buffers inside the range
*/ */
@NotNull @NotNull RocksIteratorObj newRocksIterator(boolean allowNettyDirect,
RocksIteratorTuple newRocksIterator(boolean allowNettyDirect, RocksObj<ReadOptions> readOptions,
ReadOptions readOptions,
LLRange range, LLRange range,
boolean reverse) throws RocksDBException; boolean reverse) throws RocksDBException;
default byte @Nullable [] get(@NotNull ReadOptions readOptions, default byte @Nullable [] get(@NotNull RocksObj<ReadOptions> readOptions,
byte[] key, byte[] key,
boolean existsAlmostCertainly) boolean existsAlmostCertainly)
throws RocksDBException { throws RocksDBException {
@ -45,15 +46,15 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
} }
@Nullable @Nullable
Buffer get(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException; Buffer get(@NotNull RocksObj<ReadOptions> readOptions, Buffer key) throws RocksDBException;
boolean exists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException; boolean exists(@NotNull RocksObj<ReadOptions> readOptions, Buffer key) throws RocksDBException;
boolean mayExists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException; boolean mayExists(@NotNull RocksObj<ReadOptions> readOptions, Buffer key) throws RocksDBException;
void put(@NotNull WriteOptions writeOptions, Buffer key, Buffer value) throws RocksDBException; void put(@NotNull RocksObj<WriteOptions> writeOptions, Buffer key, Buffer value) throws RocksDBException;
default void put(@NotNull WriteOptions writeOptions, byte[] key, byte[] value) throws RocksDBException { default void put(@NotNull RocksObj<WriteOptions> writeOptions, byte[] key, byte[] value) throws RocksDBException {
var allocator = getAllocator(); var allocator = getAllocator();
try (var keyBuf = allocator.allocate(key.length)) { try (var keyBuf = allocator.allocate(key.length)) {
keyBuf.writeBytes(key); keyBuf.writeBytes(key);
@ -65,31 +66,33 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
} }
} }
@NotNull RocksDBIterator newIterator(@NotNull ReadOptions readOptions); @NotNull RocksIteratorObj newIterator(@NotNull RocksObj<ReadOptions> readOptions, @Nullable Buffer min, @Nullable Buffer max);
@NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions, @NotNull WriteOptions writeOptions, @NotNull UpdateAtomicResult updateAtomic(@NotNull RocksObj<ReadOptions> readOptions,
Buffer key, BinarySerializationFunction updater, @NotNull RocksObj<WriteOptions> writeOptions,
Buffer key,
BinarySerializationFunction updater,
UpdateAtomicResultMode returnMode) throws RocksDBException, IOException; UpdateAtomicResultMode returnMode) throws RocksDBException, IOException;
void delete(WriteOptions writeOptions, Buffer key) throws RocksDBException; void delete(RocksObj<WriteOptions> writeOptions, Buffer key) throws RocksDBException;
void delete(WriteOptions writeOptions, byte[] key) throws RocksDBException; void delete(RocksObj<WriteOptions> writeOptions, byte[] key) throws RocksDBException;
List<byte[]> multiGetAsList(ReadOptions readOptions, List<byte[]> keys) throws RocksDBException; List<byte[]> multiGetAsList(RocksObj<ReadOptions> readOptions, List<byte[]> keys) throws RocksDBException;
void write(WriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException; void write(RocksObj<WriteOptions> writeOptions, WriteBatch writeBatch) throws RocksDBException;
void suggestCompactRange() throws RocksDBException; void suggestCompactRange() throws RocksDBException;
void compactRange(byte[] begin, byte[] end, CompactRangeOptions options) throws RocksDBException; void compactRange(byte[] begin, byte[] end, RocksObj<CompactRangeOptions> options) throws RocksDBException;
void flush(FlushOptions options) throws RocksDBException; void flush(RocksObj<FlushOptions> options) throws RocksDBException;
void flushWal(boolean sync) throws RocksDBException; void flushWal(boolean sync) throws RocksDBException;
long getLongProperty(String property) throws RocksDBException; long getLongProperty(String property) throws RocksDBException;
ColumnFamilyHandle getColumnFamilyHandle(); RocksObj<ColumnFamilyHandle> getColumnFamilyHandle();
BufferAllocator getAllocator(); BufferAllocator getAllocator();

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.disk;
import static com.google.common.collect.Lists.partition; import static com.google.common.collect.Lists.partition;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import it.cavallium.dbengine.rpc.current.data.Column; import it.cavallium.dbengine.rpc.current.data.Column;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
@ -96,7 +97,7 @@ public class RocksDBUtils {
} }
} }
public static void ensureOpen(RocksDB db, @Nullable ColumnFamilyHandle cfh) { public static void ensureOpen(RocksDB db, @Nullable RocksObj<ColumnFamilyHandle> cfh) {
if (Schedulers.isInNonBlockingThread()) { if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called in a nonblocking thread"); throw new UnsupportedOperationException("Called in a nonblocking thread");
} }
@ -109,4 +110,10 @@ public class RocksDBUtils {
throw new IllegalStateException("Not owning handle"); throw new IllegalStateException("Not owning handle");
} }
} }
public static void ensureOwned(@Nullable RocksObj<?> rocksObject) {
if (rocksObject != null && !rocksObject.isAccessible()) {
throw new IllegalStateException("Not owning handle");
}
}
} }

View File

@ -1,9 +1,11 @@
package it.cavallium.dbengine.database.disk; package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.SafeCloseable; import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
record RocksIterWithReadOpts(ReadOptions readOptions, RocksIteratorTuple iter) implements SafeCloseable { public record RocksIterWithReadOpts(RocksObj<ReadOptions> readOptions, RocksIteratorObj iter) implements SafeCloseable {
@Override @Override
public void close() { public void close() {

View File

@ -1,23 +0,0 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.SafeCloseable;
import java.util.List;
import org.jetbrains.annotations.NotNull;
import org.rocksdb.AbstractImmutableNativeReference;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksIterator;
import org.rocksdb.RocksObject;
public record RocksIteratorTuple(@NotNull RocksDBIterator iterator,
@NotNull ReleasableSlice sliceMin,
@NotNull ReleasableSlice sliceMax,
@NotNull SafeCloseable seekTo) implements SafeCloseable {
@Override
public void close() {
iterator.close();
sliceMin.close();
sliceMax.close();
seekTo.close();
}
}

View File

@ -7,6 +7,7 @@ import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.BufferAllocator;
import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.StampedLock; import java.util.concurrent.locks.StampedLock;
@ -16,6 +17,7 @@ import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
import org.rocksdb.Transaction; import org.rocksdb.Transaction;
import org.rocksdb.TransactionOptions;
import org.rocksdb.WriteOptions; import org.rocksdb.WriteOptions;
public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB> { public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB> {
@ -24,23 +26,26 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB>
boolean nettyDirect, boolean nettyDirect,
BufferAllocator alloc, BufferAllocator alloc,
String dbName, String dbName,
ColumnFamilyHandle cfh, MeterRegistry meterRegistry, StampedLock closeLock) { RocksObj<ColumnFamilyHandle> cfh,
MeterRegistry meterRegistry,
StampedLock closeLock) {
super(db, nettyDirect, alloc, dbName, cfh, meterRegistry, closeLock); super(db, nettyDirect, alloc, dbName, cfh, meterRegistry, closeLock);
} }
@Override @Override
protected boolean commitOptimistically(Transaction tx) { protected boolean commitOptimistically(RocksObj<Transaction> tx) {
throw new UnsupportedOperationException("Transactions not supported"); throw new UnsupportedOperationException("Transactions not supported");
} }
@Override @Override
protected Transaction beginTransaction(@NotNull WriteOptions writeOptions) { protected RocksObj<Transaction> beginTransaction(@NotNull RocksObj<WriteOptions> writeOptions,
RocksObj<TransactionOptions> txOpts) {
throw new UnsupportedOperationException("Transactions not supported"); throw new UnsupportedOperationException("Transactions not supported");
} }
@Override @Override
public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions, public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull RocksObj<ReadOptions> readOptions,
@NotNull WriteOptions writeOptions, @NotNull RocksObj<WriteOptions> writeOptions,
Buffer key, Buffer key,
BinarySerializationFunction updater, BinarySerializationFunction updater,
UpdateAtomicResultMode returnMode) throws IOException { UpdateAtomicResultMode returnMode) throws IOException {

View File

@ -0,0 +1,58 @@
package it.cavallium.dbengine.database.disk.rocksdb;
import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.internal.ResourceSupport;
import org.rocksdb.AbstractSlice;
import org.rocksdb.DirectSlice;
public abstract class LLAbstractSlice<T extends AbstractSlice<U>, U> extends ResourceSupport<LLAbstractSlice<T, U>, LLAbstractSlice<T, U>> {
protected static final Drop<LLAbstractSlice<?, ?>> DROP = new Drop<>() {
@Override
public void drop(LLAbstractSlice obj) {
if (obj.val != null) {
obj.val.close();
}
}
@Override
public Drop<LLAbstractSlice<?, ?>> fork() {
return this;
}
@Override
public void attach(LLAbstractSlice obj) {
}
};
private T val;
public LLAbstractSlice(T val) {
//noinspection unchecked
super((Drop<LLAbstractSlice<T, U>>) (Drop<?>) DROP);
this.val = val;
}
public T getNative() {
return val;
}
@Override
protected final void makeInaccessible() {
this.val = null;
}
@Override
protected final Owned<LLAbstractSlice<T, U>> prepareSend() {
var val = this.val;
return drop -> {
var instance = createInstance(val);
drop.attach(instance);
return instance;
};
}
protected abstract LLAbstractSlice<T, U> createInstance(T val);
}

View File

@ -0,0 +1,60 @@
package it.cavallium.dbengine.database.disk.rocksdb;
import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.internal.ResourceSupport;
import org.rocksdb.AbstractSlice;
import org.rocksdb.ColumnFamilyHandle;
public final class LLColumnFamilyHandle extends ResourceSupport<LLColumnFamilyHandle, LLColumnFamilyHandle> {
private static final Drop<LLColumnFamilyHandle> DROP = new Drop<>() {
@Override
public void drop(LLColumnFamilyHandle obj) {
if (obj.val != null) {
obj.val.close();
}
}
@Override
public Drop<LLColumnFamilyHandle> fork() {
return this;
}
@Override
public void attach(LLColumnFamilyHandle obj) {
}
};
private ColumnFamilyHandle val;
public LLColumnFamilyHandle(ColumnFamilyHandle val) {
super(DROP);
this.val = val;
}
public ColumnFamilyHandle getNative() {
return val;
}
@Override
protected RuntimeException createResourceClosedException() {
return new IllegalStateException("Closed");
}
@Override
protected void makeInaccessible() {
this.val = null;
}
@Override
protected Owned<LLColumnFamilyHandle> prepareSend() {
var val = this.val;
return drop -> {
var instance = new LLColumnFamilyHandle(val);
drop.attach(instance);
return instance;
};
}
}

View File

@ -0,0 +1,59 @@
package it.cavallium.dbengine.database.disk.rocksdb;
import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.internal.ResourceSupport;
import org.rocksdb.CompactionOptions;
public final class LLCompactionOptions extends ResourceSupport<LLCompactionOptions, LLCompactionOptions> {
private static final Drop<LLCompactionOptions> DROP = new Drop<>() {
@Override
public void drop(LLCompactionOptions obj) {
if (obj.val != null) {
obj.val.close();
}
}
@Override
public Drop<LLCompactionOptions> fork() {
return this;
}
@Override
public void attach(LLCompactionOptions obj) {
}
};
private CompactionOptions val;
public LLCompactionOptions(CompactionOptions val) {
super(DROP);
this.val = val;
}
public CompactionOptions getNative() {
return val;
}
@Override
protected RuntimeException createResourceClosedException() {
return new IllegalStateException("Closed");
}
@Override
protected void makeInaccessible() {
this.val = null;
}
@Override
protected Owned<LLCompactionOptions> prepareSend() {
var val = this.val;
return drop -> {
var instance = new LLCompactionOptions(val);
drop.attach(instance);
return instance;
};
}
}

View File

@ -0,0 +1,29 @@
package it.cavallium.dbengine.database.disk.rocksdb;
import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.internal.ResourceSupport;
import java.nio.ByteBuffer;
import org.rocksdb.AbstractSlice;
import org.rocksdb.DirectSlice;
public final class LLDirectSlice extends LLAbstractSlice<DirectSlice, ByteBuffer> {
public LLDirectSlice(DirectSlice val) {
super(val);
}
public DirectSlice getNative() {
return super.getNative();
}
@Override
protected LLAbstractSlice<DirectSlice, ByteBuffer> createInstance(DirectSlice val) {
return new LLDirectSlice(val);
}
@Override
protected RuntimeException createResourceClosedException() {
return new IllegalStateException("Closed");
}
}

View File

@ -0,0 +1,60 @@
package it.cavallium.dbengine.database.disk.rocksdb;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLLocalGroupedReactiveRocksIterator;
import org.rocksdb.ReadOptions;
public final class LLReadOptions extends ResourceSupport<LLReadOptions, LLReadOptions> {
private static final Drop<LLReadOptions> DROP = new Drop<>() {
@Override
public void drop(LLReadOptions obj) {
if (obj.val != null) {
obj.val.close();
}
}
@Override
public Drop<LLReadOptions> fork() {
return this;
}
@Override
public void attach(LLReadOptions obj) {
}
};
private ReadOptions val;
public LLReadOptions(ReadOptions val) {
super(DROP);
this.val = val;
}
@Override
protected RuntimeException createResourceClosedException() {
return new IllegalStateException("Closed");
}
@Override
protected void makeInaccessible() {
this.val = null;
}
@Override
protected Owned<LLReadOptions> prepareSend() {
var val = this.val;
return drop -> {
var instance = new LLReadOptions(val);
drop.attach(instance);
return instance;
};
}
}

View File

@ -0,0 +1,55 @@
package it.cavallium.dbengine.database.disk.rocksdb;
import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.internal.ResourceSupport;
import org.rocksdb.WriteOptions;
public final class LLWriteOptions extends ResourceSupport<LLWriteOptions, LLWriteOptions> {
private static final Drop<LLWriteOptions> DROP = new Drop<>() {
@Override
public void drop(LLWriteOptions obj) {
if (obj.val != null) {
obj.val.close();
}
}
@Override
public Drop<LLWriteOptions> fork() {
return this;
}
@Override
public void attach(LLWriteOptions obj) {
}
};
private WriteOptions val;
public LLWriteOptions(WriteOptions val) {
super(DROP);
this.val = val;
}
@Override
protected RuntimeException createResourceClosedException() {
return new IllegalStateException("Closed");
}
@Override
protected void makeInaccessible() {
this.val = null;
}
@Override
protected Owned<LLWriteOptions> prepareSend() {
var val = this.val;
return drop -> {
var instance = new LLWriteOptions(val);
drop.attach(instance);
return instance;
};
}
}

View File

@ -1,21 +1,52 @@
package it.cavallium.dbengine.database.disk; package it.cavallium.dbengine.database.disk.rocksdb;
import static it.cavallium.dbengine.database.LLUtils.isReadOnlyDirect; import static it.cavallium.dbengine.database.LLUtils.isReadOnlyDirect;
import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.Timer;
import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.ReadableComponent; import io.netty5.buffer.api.ReadableComponent;
import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SafeCloseable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.jetbrains.annotations.Nullable; import org.rocksdb.AbstractSlice;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator; import org.rocksdb.RocksIterator;
public class RocksDBIterator implements SafeCloseable { public class RocksIteratorObj extends ResourceSupport<RocksIteratorObj, RocksIteratorObj> {
private final RocksIterator rocksIterator; protected static final Drop<RocksIteratorObj> DROP = new Drop<>() {
@Override
public void drop(RocksIteratorObj obj) {
if (obj.rocksIterator != null) {
obj.rocksIterator.close();
}
if (obj.sliceMin != null) {
obj.sliceMin.close();
}
if (obj.sliceMax != null) {
obj.sliceMax.close();
}
}
@Override
public Drop<RocksIteratorObj> fork() {
return this;
}
@Override
public void attach(RocksIteratorObj obj) {
}
};
private RocksIterator rocksIterator;
private RocksObj<? extends AbstractSlice<?>> sliceMin;
private RocksObj<? extends AbstractSlice<?>> sliceMax;
private Buffer min;
private Buffer max;
private final boolean allowNettyDirect; private final boolean allowNettyDirect;
private final Counter startedIterSeek; private final Counter startedIterSeek;
private final Counter endedIterSeek; private final Counter endedIterSeek;
@ -23,8 +54,14 @@ public class RocksDBIterator implements SafeCloseable {
private final Counter startedIterNext; private final Counter startedIterNext;
private final Counter endedIterNext; private final Counter endedIterNext;
private final Timer iterNextTime; private final Timer iterNextTime;
private Object seekingFrom;
private Object seekingTo;
public RocksDBIterator(RocksIterator rocksIterator, public RocksIteratorObj(RocksIterator rocksIterator,
RocksObj<? extends AbstractSlice<?>> sliceMin,
RocksObj<? extends AbstractSlice<?>> sliceMax,
Buffer min,
Buffer max,
boolean allowNettyDirect, boolean allowNettyDirect,
Counter startedIterSeek, Counter startedIterSeek,
Counter endedIterSeek, Counter endedIterSeek,
@ -32,6 +69,42 @@ public class RocksDBIterator implements SafeCloseable {
Counter startedIterNext, Counter startedIterNext,
Counter endedIterNext, Counter endedIterNext,
Timer iterNextTime) { Timer iterNextTime) {
this(rocksIterator,
sliceMin,
sliceMax,
min,
max,
allowNettyDirect,
startedIterSeek,
endedIterSeek,
iterSeekTime,
startedIterNext,
endedIterNext,
iterNextTime,
null,
null
);
}
private RocksIteratorObj(RocksIterator rocksIterator,
RocksObj<? extends AbstractSlice<?>> sliceMin,
RocksObj<? extends AbstractSlice<?>> sliceMax,
Buffer min,
Buffer max,
boolean allowNettyDirect,
Counter startedIterSeek,
Counter endedIterSeek,
Timer iterSeekTime,
Counter startedIterNext,
Counter endedIterNext,
Timer iterNextTime,
Object seekingFrom,
Object seekingTo) {
super(DROP);
this.sliceMin = sliceMin;
this.sliceMax = sliceMax;
this.min = min;
this.max = max;
this.rocksIterator = rocksIterator; this.rocksIterator = rocksIterator;
this.allowNettyDirect = allowNettyDirect; this.allowNettyDirect = allowNettyDirect;
this.startedIterSeek = startedIterSeek; this.startedIterSeek = startedIterSeek;
@ -40,11 +113,8 @@ public class RocksDBIterator implements SafeCloseable {
this.startedIterNext = startedIterNext; this.startedIterNext = startedIterNext;
this.endedIterNext = endedIterNext; this.endedIterNext = endedIterNext;
this.iterNextTime = iterNextTime; this.iterNextTime = iterNextTime;
} this.seekingFrom = seekingFrom;
this.seekingTo = seekingTo;
@Override
public void close() {
rocksIterator.close();
} }
public void seek(ByteBuffer seekBuf) throws RocksDBException { public void seek(ByteBuffer seekBuf) throws RocksDBException {
@ -90,25 +160,25 @@ public class RocksDBIterator implements SafeCloseable {
/** /**
* Useful for reverse iterations * Useful for reverse iterations
*/ */
@Nullable public void seekFrom(Buffer key) {
public SafeCloseable seekFrom(Buffer key) {
if (allowNettyDirect && isReadOnlyDirect(key)) { if (allowNettyDirect && isReadOnlyDirect(key)) {
ByteBuffer keyInternalByteBuffer = ((ReadableComponent) key).readableBuffer(); ByteBuffer keyInternalByteBuffer = ((ReadableComponent) key).readableBuffer();
assert keyInternalByteBuffer.position() == 0; assert keyInternalByteBuffer.position() == 0;
rocksIterator.seekForPrev(keyInternalByteBuffer); rocksIterator.seekForPrev(keyInternalByteBuffer);
// This is useful to retain the key buffer in memory and avoid deallocations // This is useful to retain the key buffer in memory and avoid deallocations
return key::isAccessible; this.seekingFrom = key;
} else { } else {
rocksIterator.seekForPrev(LLUtils.toArray(key)); var keyArray = LLUtils.toArray(key);
return null; rocksIterator.seekForPrev(keyArray);
// This is useful to retain the key buffer in memory and avoid deallocations
this.seekingFrom = keyArray;
} }
} }
/** /**
* Useful for forward iterations * Useful for forward iterations
*/ */
@Nullable public void seekTo(Buffer key) {
public SafeCloseable seekTo(Buffer key) {
if (allowNettyDirect && isReadOnlyDirect(key)) { if (allowNettyDirect && isReadOnlyDirect(key)) {
ByteBuffer keyInternalByteBuffer = ((ReadableComponent) key).readableBuffer(); ByteBuffer keyInternalByteBuffer = ((ReadableComponent) key).readableBuffer();
assert keyInternalByteBuffer.position() == 0; assert keyInternalByteBuffer.position() == 0;
@ -116,13 +186,14 @@ public class RocksDBIterator implements SafeCloseable {
iterSeekTime.record(() -> rocksIterator.seek(keyInternalByteBuffer)); iterSeekTime.record(() -> rocksIterator.seek(keyInternalByteBuffer));
endedIterSeek.increment(); endedIterSeek.increment();
// This is useful to retain the key buffer in memory and avoid deallocations // This is useful to retain the key buffer in memory and avoid deallocations
return key::isAccessible; this.seekingTo = key;
} else { } else {
var array = LLUtils.toArray(key); var keyArray = LLUtils.toArray(key);
startedIterSeek.increment(); startedIterSeek.increment();
iterSeekTime.record(() -> rocksIterator.seek(array)); iterSeekTime.record(() -> rocksIterator.seek(keyArray));
endedIterSeek.increment(); endedIterSeek.increment();
return null; // This is useful to retain the key buffer in memory and avoid deallocations
this.seekingTo = keyArray;
} }
} }
@ -173,4 +244,50 @@ public class RocksDBIterator implements SafeCloseable {
rocksIterator.prev(); rocksIterator.prev();
} }
} }
@Override
protected void makeInaccessible() {
this.rocksIterator = null;
this.sliceMin = null;
this.sliceMax = null;
this.min = null;
this.max = null;
this.seekingFrom = null;
this.seekingTo = null;
}
@Override
protected RuntimeException createResourceClosedException() {
return new IllegalStateException("Closed");
}
@Override
protected Owned<RocksIteratorObj> prepareSend() {
var rocksIterator = this.rocksIterator;
var sliceMin = this.sliceMin;
var sliceMax = this.sliceMax;
var minSend = this.min != null ? this.min.send() : null;
var maxSend = this.max != null ? this.max.send() : null;
var seekingFrom = this.seekingFrom;
var seekingTo = this.seekingTo;
return drop -> {
var instance = new RocksIteratorObj(rocksIterator,
sliceMin,
sliceMax,
minSend != null ? minSend.receive() : null,
maxSend != null ? maxSend.receive() : null,
allowNettyDirect,
startedIterSeek,
endedIterSeek,
iterSeekTime,
startedIterNext,
endedIterNext,
iterNextTime,
seekingFrom,
seekingTo
);
drop.attach(instance);
return instance;
};
}
} }

View File

@ -0,0 +1,62 @@
package it.cavallium.dbengine.database.disk.rocksdb;
import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.internal.ResourceSupport;
import org.jetbrains.annotations.NotNull;
import org.rocksdb.AbstractNativeReference;
public class RocksObj<T extends AbstractNativeReference> extends ResourceSupport<RocksObj<T>, RocksObj<T>> {
private static final Drop<RocksObj<?>> DROP = new Drop<>() {
@Override
public void drop(RocksObj obj) {
if (obj.val != null) {
obj.val.close();
}
}
@Override
public Drop<RocksObj<?>> fork() {
return this;
}
@Override
public void attach(RocksObj obj) {
}
};
private T val;
public RocksObj(T val) {
//noinspection unchecked
super((Drop<RocksObj<T>>) (Drop<?>) DROP);
this.val = val;
}
@NotNull
public T v() {
return val;
}
@Override
protected void makeInaccessible() {
this.val = null;
}
@Override
protected RuntimeException createResourceClosedException() {
return new IllegalStateException("Closed");
}
@Override
protected Owned<RocksObj<T>> prepareSend() {
var val = this.val;
return drop -> {
var instance = new RocksObj<>(val);
drop.attach(instance);
return instance;
};
}
}

View File

@ -6,6 +6,7 @@ import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv; import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.database.disk.HugePqEnv; import it.cavallium.dbengine.database.disk.HugePqEnv;
import it.cavallium.dbengine.database.disk.StandardRocksDBColumn; import it.cavallium.dbengine.database.disk.StandardRocksDBColumn;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
@ -25,8 +26,6 @@ public class HugePqArray<V> implements IArray<V>, SafeCloseable {
private final HugePqEnv env; private final HugePqEnv env;
private final int hugePqId; private final int hugePqId;
private final StandardRocksDBColumn rocksDB; private final StandardRocksDBColumn rocksDB;
private static final WriteOptions WRITE_OPTIONS = new WriteOptions().setDisableWAL(true).setSync(false);
private static final ReadOptions READ_OPTIONS = new ReadOptions().setVerifyChecksums(false);
private final V defaultValue; private final V defaultValue;
private final long virtualSize; private final long virtualSize;
@ -42,6 +41,14 @@ public class HugePqArray<V> implements IArray<V>, SafeCloseable {
this.virtualSize = size; this.virtualSize = size;
} }
private static RocksObj<ReadOptions> newReadOptions() {
return new RocksObj<>(new ReadOptions().setVerifyChecksums(false));
}
private static RocksObj<WriteOptions> newWriteOptions() {
return new RocksObj<>(new WriteOptions().setDisableWAL(true).setSync(false));
}
public HugePqCodec<V> getValueCodec() { public HugePqCodec<V> getValueCodec() {
return valueCodec; return valueCodec;
} }
@ -59,9 +66,10 @@ public class HugePqArray<V> implements IArray<V>, SafeCloseable {
ensureBounds(index); ensureBounds(index);
ensureThread(); ensureThread();
var keyBuf = allocate(Long.BYTES); var keyBuf = allocate(Long.BYTES);
try (var valueBuf = valueCodec.serialize(this::allocate, value); keyBuf) { try (var writeOptions = newWriteOptions();
var valueBuf = valueCodec.serialize(this::allocate, value); keyBuf) {
keyBuf.writeLong(index); keyBuf.writeLong(index);
rocksDB.put(WRITE_OPTIONS, keyBuf, valueBuf); rocksDB.put(writeOptions, keyBuf, valueBuf);
} catch (RocksDBException e) { } catch (RocksDBException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
@ -71,10 +79,10 @@ public class HugePqArray<V> implements IArray<V>, SafeCloseable {
public void reset(long index) { public void reset(long index) {
ensureBounds(index); ensureBounds(index);
ensureThread(); ensureThread();
var keyBuf = allocate(Long.BYTES); try (var writeOptions = newWriteOptions();
try (keyBuf) { var keyBuf = allocate(Long.BYTES)) {
keyBuf.writeLong(index); keyBuf.writeLong(index);
rocksDB.delete(WRITE_OPTIONS, keyBuf); rocksDB.delete(writeOptions, keyBuf);
} catch (RocksDBException e) { } catch (RocksDBException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
@ -88,7 +96,8 @@ public class HugePqArray<V> implements IArray<V>, SafeCloseable {
var keyBuf = allocate(Long.BYTES); var keyBuf = allocate(Long.BYTES);
try (keyBuf) { try (keyBuf) {
keyBuf.writeLong(index); keyBuf.writeLong(index);
try (var value = rocksDB.get(READ_OPTIONS, keyBuf)) { try (var readOptions = newReadOptions();
var value = rocksDB.get(readOptions, keyBuf)) {
if (value == null) { if (value == null) {
return null; return null;
} }

View File

@ -6,10 +6,12 @@ import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SafeCloseable; import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.disk.LLTempHugePqEnv; import it.cavallium.dbengine.database.disk.LLTempHugePqEnv;
import it.cavallium.dbengine.database.disk.HugePqEnv; import it.cavallium.dbengine.database.disk.HugePqEnv;
import it.cavallium.dbengine.database.disk.RocksIteratorTuple; import it.cavallium.dbengine.database.disk.RocksIterWithReadOpts;
import it.cavallium.dbengine.database.disk.StandardRocksDBColumn; import it.cavallium.dbengine.database.disk.StandardRocksDBColumn;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultMode; import it.cavallium.dbengine.database.disk.UpdateAtomicResultMode;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultPrevious; import it.cavallium.dbengine.database.disk.UpdateAtomicResultPrevious;
import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj;
import it.cavallium.dbengine.database.disk.rocksdb.RocksObj;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -37,8 +39,6 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
private final HugePqEnv env; private final HugePqEnv env;
private final int hugePqId; private final int hugePqId;
private final StandardRocksDBColumn rocksDB; private final StandardRocksDBColumn rocksDB;
private static final WriteOptions WRITE_OPTIONS = new WriteOptions().setDisableWAL(true).setSync(false);
private static final ReadOptions READ_OPTIONS = new ReadOptions().setVerifyChecksums(false);
private final HugePqCodec<T> codec; private final HugePqCodec<T> codec;
private long size = 0; private long size = 0;
@ -51,6 +51,14 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
this.codec = codec; this.codec = codec;
} }
private static RocksObj<ReadOptions> newReadOptions() {
return new RocksObj<>(new ReadOptions().setVerifyChecksums(false));
}
private static RocksObj<WriteOptions> newWriteOptions() {
return new RocksObj<>(new WriteOptions().setDisableWAL(true).setSync(false));
}
private Buffer allocate(int size) { private Buffer allocate(int size) {
return rocksDB.getAllocator().allocate(size); return rocksDB.getAllocator().allocate(size);
} }
@ -69,7 +77,9 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
var keyBuf = serializeKey(element); var keyBuf = serializeKey(element);
try (keyBuf) { try (keyBuf) {
rocksDB.updateAtomic(READ_OPTIONS, WRITE_OPTIONS, keyBuf, this::incrementOrAdd, UpdateAtomicResultMode.NOTHING); try (var readOptions = newReadOptions(); var writeOptions = newWriteOptions()) {
rocksDB.updateAtomic(readOptions, writeOptions, keyBuf, this::incrementOrAdd, UpdateAtomicResultMode.NOTHING);
}
++size; ++size;
} catch (IOException e) { } catch (IOException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
@ -101,17 +111,16 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
} }
private T databaseTop() { private T databaseTop() {
try (var it = rocksDB.newRocksIterator(true, READ_OPTIONS, LLRange.all(), false)) { try (var readOptions = newReadOptions();
try (var rocksIterator = it.iterator()) { var it = rocksDB.newRocksIterator(true, readOptions, LLRange.all(), false)) {
rocksIterator.seekToFirst(); it.seekToFirst();
if (rocksIterator.isValid()) { if (it.isValid()) {
var key = rocksIterator.key(); var key = it.key();
try (var keyBuf = rocksDB.getAllocator().copyOf(key)) { try (var keyBuf = rocksDB.getAllocator().copyOf(key)) {
return deserializeKey(keyBuf); return deserializeKey(keyBuf);
}
} else {
return null;
} }
} else {
return null;
} }
} catch (RocksDBException e) { } catch (RocksDBException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
@ -121,19 +130,19 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
@Override @Override
public T pop() { public T pop() {
ensureThread(); ensureThread();
try (var it = rocksDB.newRocksIterator(true, READ_OPTIONS, LLRange.all(), false)) { try (var readOptions = newReadOptions();
try (var rocksIterator = it.iterator()) { var writeOptions = newWriteOptions();
rocksIterator.seekToFirst(); var it = rocksDB.newRocksIterator(true, readOptions, LLRange.all(), false)) {
if (rocksIterator.isValid()) { it.seekToFirst();
var key = rocksIterator.key(); if (it.isValid()) {
try (var keyBuf = rocksDB.getAllocator().copyOf(key)) { var key = it.key();
rocksDB.updateAtomic(READ_OPTIONS, WRITE_OPTIONS, keyBuf, this::reduceOrRemove, UpdateAtomicResultMode.NOTHING); try (var keyBuf = rocksDB.getAllocator().copyOf(key)) {
--size; rocksDB.updateAtomic(readOptions, writeOptions, keyBuf, this::reduceOrRemove, UpdateAtomicResultMode.NOTHING);
return deserializeKey(keyBuf); --size;
} return deserializeKey(keyBuf);
} else {
return null;
} }
} else {
return null;
} }
} catch (RocksDBException | IOException e) { } catch (RocksDBException | IOException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
@ -180,10 +189,10 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
@Override @Override
public void clear() { public void clear() {
ensureThread(); ensureThread();
try (var wb = new WriteBatch()) { try (var wb = new WriteBatch(); var wo = newWriteOptions()) {
wb.deleteRange(rocksDB.getColumnFamilyHandle(), new byte[0], getBiggestKey()); wb.deleteRange(rocksDB.getColumnFamilyHandle().v(), new byte[0], getBiggestKey());
size = 0; size = 0;
rocksDB.write(WRITE_OPTIONS, wb); rocksDB.write(wo, wb);
} catch (RocksDBException e) { } catch (RocksDBException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
@ -199,8 +208,10 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
public boolean remove(@NotNull T element) { public boolean remove(@NotNull T element) {
ensureThread(); ensureThread();
Objects.requireNonNull(element); Objects.requireNonNull(element);
try (var keyBuf = serializeKey(element)) { try (var readOptions = newReadOptions();
UpdateAtomicResultPrevious prev = (UpdateAtomicResultPrevious) rocksDB.updateAtomic(READ_OPTIONS, WRITE_OPTIONS, var writeOptions = newWriteOptions();
var keyBuf = serializeKey(element)) {
UpdateAtomicResultPrevious prev = (UpdateAtomicResultPrevious) rocksDB.updateAtomic(readOptions, writeOptions,
keyBuf, keyBuf,
this::reduceOrRemove, this::reduceOrRemove,
UpdateAtomicResultMode.PREVIOUS UpdateAtomicResultMode.PREVIOUS
@ -232,9 +243,9 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
} }
private Flux<T> iterate(long skips, boolean reverse) { private Flux<T> iterate(long skips, boolean reverse) {
return Flux.<List<T>, RocksIteratorTuple>generate(() -> { return Flux.<List<T>, RocksIterWithReadOpts>generate(() -> {
var it = rocksDB.newRocksIterator(true, READ_OPTIONS, LLRange.all(), reverse); var readOptions = newReadOptions();
var rocksIterator = it.iterator(); var rocksIterator = rocksDB.newRocksIterator(true, readOptions, LLRange.all(), reverse);
if (reverse) { if (reverse) {
rocksIterator.seekToLast(); rocksIterator.seekToLast();
} else { } else {
@ -249,9 +260,9 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
} }
skipsDone++; skipsDone++;
} }
return it; return new RocksIterWithReadOpts(readOptions, rocksIterator);
}, (itT, sink) -> { }, (t, sink) -> {
var rocksIterator = itT.iterator(); var rocksIterator = t.iter();
if (rocksIterator.isValid()) { if (rocksIterator.isValid()) {
try (var keyBuf = rocksDB.getAllocator().copyOf(rocksIterator.key()); try (var keyBuf = rocksDB.getAllocator().copyOf(rocksIterator.key());
var valBuf = rocksDB.getAllocator().copyOf(rocksIterator.value())) { var valBuf = rocksDB.getAllocator().copyOf(rocksIterator.value())) {
@ -284,8 +295,8 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
sink.complete(); sink.complete();
} }
return itT; return t;
}, RocksIteratorTuple::close).concatMapIterable(item -> item); }, RocksIterWithReadOpts::close).concatMapIterable(item -> item);
} }
@Override @Override

View File

@ -18,6 +18,7 @@ module dbengine {
opens it.cavallium.dbengine.database.remote; opens it.cavallium.dbengine.database.remote;
exports it.cavallium.dbengine; exports it.cavallium.dbengine;
exports it.cavallium.dbengine.utils; exports it.cavallium.dbengine.utils;
exports it.cavallium.dbengine.database.disk.rocksdb;
requires org.jetbrains.annotations; requires org.jetbrains.annotations;
requires reactor.core; requires reactor.core;
requires com.google.common; requires com.google.common;