From c9a12760bcb40ff23f68e20d0e634b2244c59f61 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 12 May 2022 19:14:27 +0200 Subject: [PATCH] Trace leaks --- .../cavallium/dbengine/database/LLUtils.java | 26 +- .../dbengine/database/SafeCloseable.java | 2 +- .../database/disk/AbstractRocksDBColumn.java | 236 ++++--- .../database/disk/CappedWriteBatch.java | 5 +- .../dbengine/database/disk/HugePqEnv.java | 21 +- .../database/disk/LLLocalDictionary.java | 645 ++++++------------ .../LLLocalEntryReactiveRocksIterator.java | 3 +- ...ocalGroupedEntryReactiveRocksIterator.java | 3 +- ...LLocalGroupedKeyReactiveRocksIterator.java | 3 +- .../LLLocalGroupedReactiveRocksIterator.java | 9 +- ...LLLocalKeyPrefixReactiveRocksIterator.java | 9 +- .../disk/LLLocalKeyReactiveRocksIterator.java | 3 +- .../disk/LLLocalKeyValueDatabase.java | 69 +- ...LLLocalMigrationReactiveRocksIterator.java | 14 +- .../disk/LLLocalReactiveRocksIterator.java | 7 +- .../database/disk/LLLocalSingleton.java | 56 +- .../database/disk/LLTempHugePqEnv.java | 10 +- .../disk/OptimisticRocksDBColumn.java | 36 +- .../disk/PessimisticRocksDBColumn.java | 33 +- .../database/disk/ReleasableSlice.java | 13 - .../dbengine/database/disk/RocksDBColumn.java | 41 +- .../dbengine/database/disk/RocksDBUtils.java | 9 +- .../database/disk/RocksIterWithReadOpts.java | 4 +- .../database/disk/RocksIteratorTuple.java | 23 - .../database/disk/StandardRocksDBColumn.java | 15 +- .../disk/rocksdb/LLAbstractSlice.java | 58 ++ .../disk/rocksdb/LLColumnFamilyHandle.java | 60 ++ .../disk/rocksdb/LLCompactionOptions.java | 59 ++ .../database/disk/rocksdb/LLDirectSlice.java | 29 + .../database/disk/rocksdb/LLReadOptions.java | 60 ++ .../database/disk/rocksdb/LLWriteOptions.java | 55 ++ .../RocksIteratorObj.java} | 161 ++++- .../database/disk/rocksdb/RocksObj.java | 62 ++ .../dbengine/lucene/HugePqArray.java | 25 +- .../dbengine/lucene/HugePqPriorityQueue.java | 89 +-- src/main/java/module-info.java | 1 + 36 files changed, 1155 insertions(+), 799 deletions(-) delete mode 100644 src/main/java/it/cavallium/dbengine/database/disk/ReleasableSlice.java delete mode 100644 src/main/java/it/cavallium/dbengine/database/disk/RocksIteratorTuple.java create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLAbstractSlice.java create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLColumnFamilyHandle.java create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLCompactionOptions.java create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLDirectSlice.java create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLReadOptions.java create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLWriteOptions.java rename src/main/java/it/cavallium/dbengine/database/disk/{RocksDBIterator.java => rocksdb/RocksIteratorObj.java} (51%) create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/rocksdb/RocksObj.java diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 24511ea..b90de45 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -16,10 +16,11 @@ import io.netty5.buffer.api.Send; import io.netty5.buffer.api.WritableComponent; import io.netty5.buffer.api.internal.Statics; import io.netty5.util.IllegalReferenceCountException; -import it.cavallium.dbengine.database.disk.RocksIteratorTuple; import it.cavallium.dbengine.database.disk.UpdateAtomicResultCurrent; import it.cavallium.dbengine.database.disk.UpdateAtomicResultDelta; import it.cavallium.dbengine.database.disk.UpdateAtomicResultPrevious; +import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.lucene.RandomSortField; @@ -86,6 +87,7 @@ public class LLUtils { public static final byte[][] LEXICONOGRAPHIC_ITERATION_SEEKS = new byte[256][1]; public static final AtomicBoolean hookRegistered = new AtomicBoolean(); public static final boolean MANUAL_READAHEAD = false; + public static final boolean ALLOW_STATIC_OPTIONS = false; public static final boolean FORCE_DISABLE_CHECKSUM_VERIFICATION = Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checksum.disable.force", "false")); @@ -729,26 +731,26 @@ public class LLUtils { * @param smallRange true if the range is small * @return the passed instance of ReadOptions, or a new one if the passed readOptions is null */ - public static ReadOptions generateCustomReadOptions(@Nullable ReadOptions readOptions, + public static RocksObj generateCustomReadOptions(@Nullable RocksObj readOptions, boolean canFillCache, boolean boundedRange, boolean smallRange) { if (readOptions == null) { //noinspection resource - readOptions = new ReadOptions(); + readOptions = new RocksObj<>(new ReadOptions()); } if (boundedRange || smallRange) { - readOptions.setFillCache(canFillCache); + readOptions.v().setFillCache(canFillCache); } else { - if (readOptions.readaheadSize() <= 0) { - readOptions.setReadaheadSize(4 * 1024 * 1024); // 4MiB + if (readOptions.v().readaheadSize() <= 0) { + readOptions.v().setReadaheadSize(4 * 1024 * 1024); // 4MiB } - readOptions.setFillCache(false); - readOptions.setVerifyChecksums(false); + readOptions.v().setFillCache(false); + readOptions.v().setVerifyChecksums(false); } if (FORCE_DISABLE_CHECKSUM_VERIFICATION) { - readOptions.setVerifyChecksums(false); + readOptions.v().setVerifyChecksums(false); } return readOptions; @@ -1012,8 +1014,10 @@ public class LLUtils { iterable.forEach(LLUtils::onNextDropped); } else if (next instanceof SafeCloseable safeCloseable) { safeCloseable.close(); - } else if (next instanceof RocksIteratorTuple iteratorTuple) { - iteratorTuple.close(); + } else if (next instanceof RocksIteratorObj rocksIteratorObj) { + rocksIteratorObj.close(); + } else if (next instanceof RocksObj rocksObj) { + rocksObj.close(); } else if (next instanceof UpdateAtomicResultDelta delta) { delta.delta().close(); } else if (next instanceof UpdateAtomicResultCurrent cur) { diff --git a/src/main/java/it/cavallium/dbengine/database/SafeCloseable.java b/src/main/java/it/cavallium/dbengine/database/SafeCloseable.java index 565726c..4372651 100644 --- a/src/main/java/it/cavallium/dbengine/database/SafeCloseable.java +++ b/src/main/java/it/cavallium/dbengine/database/SafeCloseable.java @@ -1,6 +1,6 @@ package it.cavallium.dbengine.database; -public interface SafeCloseable extends AutoCloseable { +public interface SafeCloseable extends io.netty5.util.SafeCloseable { @Override void close(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index 095255e..ce2dc1b 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -19,21 +19,14 @@ import io.netty5.buffer.api.WritableComponent; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.RepeatedElementList; -import it.cavallium.dbengine.database.SafeCloseable; -import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithRelease; -import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithoutRelease; +import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import it.cavallium.dbengine.database.serialization.SerializationException; -import it.cavallium.dbengine.rpc.current.data.Column; -import it.cavallium.dbengine.rpc.current.data.NamedColumnOptions; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.List; -import java.util.Objects; -import java.util.Optional; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.StampedLock; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -42,18 +35,17 @@ import org.jetbrains.annotations.Nullable; import org.rocksdb.AbstractSlice; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.CompactRangeOptions; -import org.rocksdb.CompactionOptions; import org.rocksdb.DirectSlice; import org.rocksdb.FlushOptions; import org.rocksdb.Holder; import org.rocksdb.KeyMayExist.KeyMayExistEnum; -import org.rocksdb.LevelMetaData; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; +import org.rocksdb.RocksObject; import org.rocksdb.Slice; -import org.rocksdb.SstFileMetaData; import org.rocksdb.Transaction; +import org.rocksdb.TransactionOptions; import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; import reactor.core.scheduler.Schedulers; @@ -73,7 +65,7 @@ public sealed abstract class AbstractRocksDBColumn implements private final T db; private final boolean nettyDirect; private final BufferAllocator alloc; - private final ColumnFamilyHandle cfh; + private final RocksObj cfh; protected final MeterRegistry meterRegistry; protected final StampedLock closeLock; @@ -108,7 +100,7 @@ public sealed abstract class AbstractRocksDBColumn implements boolean nettyDirect, BufferAllocator alloc, String databaseName, - ColumnFamilyHandle cfh, + RocksObj cfh, MeterRegistry meterRegistry, StampedLock closeLock) { this.db = db; @@ -117,7 +109,7 @@ public sealed abstract class AbstractRocksDBColumn implements this.cfh = cfh; String columnName; try { - columnName = new String(cfh.getName(), StandardCharsets.UTF_8); + columnName = new String(cfh.v().getName(), StandardCharsets.UTF_8); } catch (RocksDBException e) { throw new IllegalStateException(e); } @@ -254,81 +246,59 @@ public sealed abstract class AbstractRocksDBColumn implements /** * This method should not modify or move the writerIndex/readerIndex of the key */ - static ReleasableSlice setIterateBound(boolean allowNettyDirect, - ReadOptions readOpts, IterateBound boundType, Buffer key) { + static RocksObj> setIterateBound(boolean allowNettyDirect, + RocksObj readOpts, IterateBound boundType, Buffer key) { requireNonNull(key); - AbstractSlice slice; + RocksObj> slice; if (allowNettyDirect && USE_DIRECT_BUFFER_BOUNDS && isReadOnlyDirect(key)) { ByteBuffer keyInternalByteBuffer = ((ReadableComponent) key).readableBuffer(); assert keyInternalByteBuffer.position() == 0; - slice = new DirectSlice(keyInternalByteBuffer, key.readableBytes()); - assert slice.size() == key.readableBytes(); + slice = new RocksObj<>(new DirectSlice(keyInternalByteBuffer, key.readableBytes())); + assert slice.v().size() == key.readableBytes(); } else { - slice = new Slice(requireNonNull(LLUtils.toArray(key))); + slice = new RocksObj<>(new Slice(requireNonNull(LLUtils.toArray(key)))); } if (boundType == IterateBound.LOWER) { - readOpts.setIterateLowerBound(slice); + readOpts.v().setIterateLowerBound(slice.v()); } else { - readOpts.setIterateUpperBound(slice); + readOpts.v().setIterateUpperBound(slice.v()); } - return new ReleasableSliceImplWithRelease(slice); + return slice; } - static ReleasableSlice emptyReleasableSlice() { + static RocksObj newEmptyReleasableSlice() { var arr = new byte[0]; - return new ReleasableSliceImplWithRelease(new Slice(arr)); + return new RocksObj<>(new Slice(arr)); } /** * This method should not modify or move the writerIndex/readerIndex of the buffers inside the range */ @NotNull - public RocksIteratorTuple newRocksIterator(boolean allowNettyDirect, - ReadOptions readOptions, + public RocksIteratorObj newRocksIterator(boolean allowNettyDirect, + RocksObj readOptions, LLRange range, boolean reverse) throws RocksDBException { assert !Schedulers.isInNonBlockingThread() : "Called getRocksIterator in a nonblocking thread"; - ReleasableSlice sliceMin; - ReleasableSlice sliceMax; - if (range.hasMin()) { - sliceMin = setIterateBound(allowNettyDirect, readOptions, IterateBound.LOWER, range.getMinUnsafe()); - } else { - sliceMin = emptyReleasableSlice(); - } - if (range.hasMax()) { - sliceMax = setIterateBound(allowNettyDirect, readOptions, IterateBound.UPPER, range.getMaxUnsafe()); - } else { - sliceMax = emptyReleasableSlice(); - } - SafeCloseable seekFromOrTo = null; - var rocksIterator = this.newIterator(readOptions); + var rocksIterator = this.newIterator(readOptions, range.getMinUnsafe(), range.getMaxUnsafe()); try { if (reverse) { if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMax()) { - seekFromOrTo = Objects.requireNonNullElseGet(rocksIterator.seekFrom(range.getMaxUnsafe()), - () -> ((SafeCloseable) () -> {})); + rocksIterator.seekFrom(range.getMaxUnsafe()); } else { - seekFromOrTo = () -> {}; rocksIterator.seekToLast(); } } else { if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { - seekFromOrTo = Objects.requireNonNullElseGet(rocksIterator.seekTo(range.getMinUnsafe()), - () -> ((SafeCloseable) () -> {})); + rocksIterator.seekTo(range.getMinUnsafe()); } else { - seekFromOrTo = () -> {}; rocksIterator.seekToFirst(); } } - return new RocksIteratorTuple(rocksIterator, sliceMin, sliceMax, seekFromOrTo); + return rocksIterator; } catch (Throwable ex) { rocksIterator.close(); - sliceMax.close(); - sliceMax.close(); - if (seekFromOrTo != null) { - seekFromOrTo.close(); - } throw ex; } } @@ -337,7 +307,7 @@ public sealed abstract class AbstractRocksDBColumn implements return db; } - protected ColumnFamilyHandle getCfh() { + protected RocksObj getCfh() { return cfh; } @@ -345,12 +315,23 @@ public sealed abstract class AbstractRocksDBColumn implements RocksDBUtils.ensureOpen(db, cfh); } - protected void ensureOwned(org.rocksdb.RocksObject rocksObject) { + protected void ensureOwned(RocksObj rocksObject) { RocksDBUtils.ensureOwned(rocksObject); } + + protected void ensureOwned(RocksObject rocksObject) { + RocksDBUtils.ensureOwned(rocksObject); + } + + protected void ensureOwned(Buffer buffer) { + if (buffer != null && !buffer.isAccessible()) { + throw new IllegalStateException("Buffer is not accessible"); + } + } + @Override - public @Nullable Buffer get(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException { + public @Nullable Buffer get(@NotNull RocksObj readOptions, Buffer key) throws RocksDBException { var closeReadLock = closeLock.readLock(); try { ensureOpen(); @@ -385,7 +366,7 @@ public sealed abstract class AbstractRocksDBColumn implements assert resultBuffer.writerOffset() == 0; var resultWritable = ((WritableComponent) resultBuffer).writableBuffer(); - var keyMayExist = db.keyMayExist(cfh, readOptions, keyNioBuffer.rewind(), + var keyMayExist = db.keyMayExist(cfh.v(), readOptions.v(), keyNioBuffer.rewind(), resultWritable.clear()); KeyMayExistEnum keyMayExistState = keyMayExist.exists; int keyMayExistValueLength = keyMayExist.valueLength; @@ -416,7 +397,7 @@ public sealed abstract class AbstractRocksDBColumn implements resultWritable.clear(); readAttemptsCount++; // real data size - size = db.get(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear()); + size = db.get(cfh.v(), readOptions.v(), keyNioBuffer.rewind(), resultWritable.clear()); if (size == RocksDB.NOT_FOUND) { resultBuffer.close(); readValueNotFoundWithMayExistBloomBufferSize.record(0); @@ -442,7 +423,7 @@ public sealed abstract class AbstractRocksDBColumn implements assert resultBuffer.writerOffset() == 0; readAttemptsCount++; - size = db.get(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear()); + size = db.get(cfh.v(), readOptions.v(), keyNioBuffer.rewind(), resultWritable.clear()); if (size == RocksDB.NOT_FOUND) { readValueNotFoundWithMayExistBloomBufferSize.record(0); resultBuffer.close(); @@ -471,7 +452,7 @@ public sealed abstract class AbstractRocksDBColumn implements byte[] keyArray = LLUtils.toArray(key); requireNonNull(keyArray); Holder data = new Holder<>(); - if (db.keyMayExist(cfh, readOptions, keyArray, data)) { + if (db.keyMayExist(cfh.v(), readOptions.v(), keyArray, data)) { // todo: "data.getValue().length > 0" is checked because keyMayExist is broken, and sometimes it // returns an empty array, as if it exists if (data.getValue() != null && data.getValue().length > 0) { @@ -479,7 +460,7 @@ public sealed abstract class AbstractRocksDBColumn implements return LLUtils.fromByteArray(alloc, data.getValue()); } else { readAttemptsCount++; - byte[] result = db.get(cfh, readOptions, keyArray); + byte[] result = db.get(cfh.v(), readOptions.v(), keyArray); if (result == null) { if (data.getValue() != null) { readValueNotFoundWithBloomBufferSize.record(0); @@ -506,7 +487,7 @@ public sealed abstract class AbstractRocksDBColumn implements } @Override - public void put(@NotNull WriteOptions writeOptions, Buffer key, Buffer value) throws RocksDBException { + public void put(@NotNull RocksObj writeOptions, Buffer key, Buffer value) throws RocksDBException { var closeReadLock = closeLock.readLock(); try { ensureOpen(); @@ -553,7 +534,7 @@ public sealed abstract class AbstractRocksDBColumn implements } try { - db.put(cfh, writeOptions, keyNioBuffer, valueNioBuffer); + db.put(cfh.v(), writeOptions.v(), keyNioBuffer, valueNioBuffer); } finally { if (mustCloseValue) { value.close(); @@ -565,7 +546,7 @@ public sealed abstract class AbstractRocksDBColumn implements } } } else { - db.put(cfh, writeOptions, LLUtils.toArray(key), LLUtils.toArray(value)); + db.put(cfh.v(), writeOptions.v(), LLUtils.toArray(key), LLUtils.toArray(value)); } } finally { closeLock.unlockRead(closeReadLock); @@ -573,7 +554,7 @@ public sealed abstract class AbstractRocksDBColumn implements } @Override - public boolean exists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException { + public boolean exists(@NotNull RocksObj readOptions, Buffer key) throws RocksDBException { var closeReadLock = closeLock.readLock(); try { ensureOpen(); @@ -597,8 +578,8 @@ public sealed abstract class AbstractRocksDBColumn implements assert keyNioBuffer.limit() == key.readableBytes(); } try { - if (db.keyMayExist(cfh, keyNioBuffer)) { - int size = db.get(cfh, readOptions, keyNioBuffer.position(0), LLUtils.EMPTY_BYTE_BUFFER); + if (db.keyMayExist(cfh.v(), keyNioBuffer)) { + int size = db.get(cfh.v(), readOptions.v(), keyNioBuffer.position(0), LLUtils.EMPTY_BYTE_BUFFER); boolean found = size != RocksDB.NOT_FOUND; if (found) { readValueFoundWithBloomSimpleBufferSize.record(size); @@ -621,12 +602,12 @@ public sealed abstract class AbstractRocksDBColumn implements byte[] keyBytes = LLUtils.toArray(key); Holder data = new Holder<>(); boolean mayExistHit = false; - if (db.keyMayExist(cfh, readOptions, keyBytes, data)) { + if (db.keyMayExist(cfh.v(), readOptions.v(), keyBytes, data)) { mayExistHit = true; if (data.getValue() != null) { size = data.getValue().length; } else { - size = db.get(cfh, readOptions, keyBytes, NO_DATA); + size = db.get(cfh.v(), readOptions.v(), keyBytes, NO_DATA); } } boolean found = size != RocksDB.NOT_FOUND; @@ -647,7 +628,7 @@ public sealed abstract class AbstractRocksDBColumn implements } @Override - public boolean mayExists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException { + public boolean mayExists(@NotNull RocksObj readOptions, Buffer key) throws RocksDBException { var closeReadLock = closeLock.readLock(); try { ensureOpen(); @@ -671,7 +652,7 @@ public sealed abstract class AbstractRocksDBColumn implements assert keyNioBuffer.limit() == key.readableBytes(); } try { - return db.keyMayExist(cfh, keyNioBuffer); + return db.keyMayExist(cfh.v(), readOptions.v(), keyNioBuffer); } finally { if (mustCloseKey) { key.close(); @@ -679,7 +660,7 @@ public sealed abstract class AbstractRocksDBColumn implements } } else { byte[] keyBytes = LLUtils.toArray(key); - return db.keyMayExist(cfh, readOptions, keyBytes, null); + return db.keyMayExist(cfh.v(), readOptions.v(), keyBytes, null); } } finally { closeLock.unlockRead(closeReadLock); @@ -687,7 +668,7 @@ public sealed abstract class AbstractRocksDBColumn implements } @Override - public void delete(WriteOptions writeOptions, Buffer key) throws RocksDBException { + public void delete(RocksObj writeOptions, Buffer key) throws RocksDBException { var closeReadLock = closeLock.readLock(); try { ensureOpen(); @@ -712,14 +693,14 @@ public sealed abstract class AbstractRocksDBColumn implements assert keyNioBuffer.limit() == key.readableBytes(); } try { - db.delete(cfh, writeOptions, keyNioBuffer); + db.delete(cfh.v(), writeOptions.v(), keyNioBuffer); } finally { if (mustCloseKey) { key.close(); } } } else { - db.delete(cfh, writeOptions, LLUtils.toArray(key)); + db.delete(cfh.v(), writeOptions.v(), LLUtils.toArray(key)); } } finally { closeLock.unlockRead(closeReadLock); @@ -727,20 +708,20 @@ public sealed abstract class AbstractRocksDBColumn implements } @Override - public void delete(WriteOptions writeOptions, byte[] key) throws RocksDBException { + public void delete(RocksObj writeOptions, byte[] key) throws RocksDBException { var closeReadLock = closeLock.readLock(); try { ensureOpen(); ensureOwned(writeOptions); keyBufferSize.record(key.length); - db.delete(cfh, writeOptions, key); + db.delete(cfh.v(), writeOptions.v(), key); } finally { closeLock.unlockRead(closeReadLock); } } @Override - public List multiGetAsList(ReadOptions readOptions, List keys) throws RocksDBException { + public List multiGetAsList(RocksObj readOptions, List keys) throws RocksDBException { var closeReadLock = closeLock.readLock(); try { ensureOpen(); @@ -748,8 +729,8 @@ public sealed abstract class AbstractRocksDBColumn implements for (byte[] key : keys) { keyBufferSize.record(key.length); } - var columnFamilyHandles = new RepeatedElementList<>(cfh, keys.size()); - return db.multiGetAsList(readOptions, columnFamilyHandles, keys); + var columnFamilyHandles = new RepeatedElementList<>(cfh.v(), keys.size()); + return db.multiGetAsList(readOptions.v(), columnFamilyHandles, keys); } finally { closeLock.unlockRead(closeReadLock); } @@ -760,31 +741,31 @@ public sealed abstract class AbstractRocksDBColumn implements var closeReadLock = closeLock.readLock(); try { ensureOpen(); - db.suggestCompactRange(cfh); + db.suggestCompactRange(cfh.v()); } finally { closeLock.unlockRead(closeReadLock); } } @Override - public void compactRange(byte[] begin, byte[] end, CompactRangeOptions options) throws RocksDBException { + public void compactRange(byte[] begin, byte[] end, RocksObj options) throws RocksDBException { var closeReadLock = closeLock.readLock(); try { ensureOpen(); ensureOwned(options); - db.compactRange(cfh, begin, end, options); + db.compactRange(cfh.v(), begin, end, options.v()); } finally { closeLock.unlockRead(closeReadLock); } } @Override - public void flush(FlushOptions options) throws RocksDBException { + public void flush(RocksObj options) throws RocksDBException { var closeReadLock = closeLock.readLock(); try { ensureOpen(); ensureOwned(options); - db.flush(options, cfh); + db.flush(options.v(), cfh.v()); } finally { closeLock.unlockRead(closeReadLock); } @@ -806,20 +787,20 @@ public sealed abstract class AbstractRocksDBColumn implements var closeReadLock = closeLock.readLock(); try { ensureOpen(); - return db.getLongProperty(cfh, property); + return db.getLongProperty(cfh.v(), property); } finally { closeLock.unlockRead(closeReadLock); } } @Override - public void write(WriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException { + public void write(RocksObj writeOptions, WriteBatch writeBatch) throws RocksDBException { var closeReadLock = closeLock.readLock(); try { ensureOpen(); ensureOwned(writeOptions); ensureOwned(writeBatch); - db.write(writeOptions, writeBatch); + db.write(writeOptions.v(), writeBatch); } finally { closeLock.unlockRead(closeReadLock); } @@ -828,13 +809,14 @@ public sealed abstract class AbstractRocksDBColumn implements /** * @return true if committed successfully */ - protected abstract boolean commitOptimistically(Transaction tx) throws RocksDBException; + protected abstract boolean commitOptimistically(RocksObj tx) throws RocksDBException; - protected abstract Transaction beginTransaction(@NotNull WriteOptions writeOptions); + protected abstract RocksObj beginTransaction(@NotNull RocksObj writeOptions, + RocksObj txOpts); @Override - public final @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions, - @NotNull WriteOptions writeOptions, + public final @NotNull UpdateAtomicResult updateAtomic(@NotNull RocksObj readOptions, + @NotNull RocksObj writeOptions, Buffer key, BinarySerializationFunction updater, UpdateAtomicResultMode returnMode) throws IOException { @@ -876,32 +858,66 @@ public sealed abstract class AbstractRocksDBColumn implements timer.record(duration, TimeUnit.NANOSECONDS); } - protected abstract @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions, - @NotNull WriteOptions writeOptions, + protected abstract @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull RocksObj readOptions, + @NotNull RocksObj writeOptions, Buffer key, BinarySerializationFunction updater, UpdateAtomicResultMode returnMode) throws IOException; @Override @NotNull - public RocksDBIterator newIterator(@NotNull ReadOptions readOptions) { + public RocksIteratorObj newIterator(@NotNull RocksObj readOptions, + @Nullable Buffer min, + @Nullable Buffer max) { var closeReadLock = closeLock.readLock(); try { ensureOpen(); ensureOwned(readOptions); - var it = db.newIterator(cfh, readOptions); + ensureOwned(min); + ensureOwned(max); + RocksObj> sliceMin; + RocksObj> sliceMax; + if (min != null) { + sliceMin = setIterateBound(nettyDirect, readOptions, IterateBound.LOWER, min); + } else { + sliceMin = null; + } try { - return new RocksDBIterator(it, - nettyDirect, - this.startedIterSeek, - this.endedIterSeek, - this.iterSeekTime, - this.startedIterNext, - this.endedIterNext, - this.iterNextTime - ); + if (max != null) { + sliceMax = setIterateBound(nettyDirect, readOptions, IterateBound.UPPER, max); + } else { + sliceMax = null; + } + try { + var it = db.newIterator(cfh.v(), readOptions.v()); + try { + return new RocksIteratorObj(it, + sliceMin, + sliceMax, + min, + max, + nettyDirect, + this.startedIterSeek, + this.endedIterSeek, + this.iterSeekTime, + this.startedIterNext, + this.endedIterNext, + this.iterNextTime + ); + } catch (Throwable ex) { + it.close(); + throw ex; + } + } catch (Throwable ex) { + if (sliceMax != null) { + sliceMax.close(); + } + throw ex; + } } catch (Throwable ex) { - it.close(); + if (sliceMin != null) { + sliceMin.close(); + } throw ex; } } finally { @@ -927,7 +943,7 @@ public sealed abstract class AbstractRocksDBColumn implements var closeReadLock = closeLock.readLock(); try { ensureOpen(); - return RocksDBUtils.getLevels(db, cfh); + return RocksDBUtils.getLevels(db, cfh.v()); } finally { closeLock.unlockRead(closeReadLock); } @@ -938,14 +954,14 @@ public sealed abstract class AbstractRocksDBColumn implements var closeReadLock = closeLock.readLock(); try { ensureOpen(); - RocksDBUtils.forceCompaction(db, db.getName(), cfh, volumeId, logger); + RocksDBUtils.forceCompaction(db, db.getName(), cfh.v(), volumeId, logger); } finally { closeLock.unlockRead(closeReadLock); } } @Override - public ColumnFamilyHandle getColumnFamilyHandle() { + public RocksObj getColumnFamilyHandle() { return cfh; } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CappedWriteBatch.java b/src/main/java/it/cavallium/dbengine/database/disk/CappedWriteBatch.java index 40534de..fbe177b 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/CappedWriteBatch.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/CappedWriteBatch.java @@ -10,6 +10,7 @@ import io.netty5.buffer.api.Send; import io.netty5.util.internal.PlatformDependent; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.RocksDBColumn; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -27,7 +28,7 @@ public class CappedWriteBatch extends WriteBatch { private final RocksDBColumn db; private final BufferAllocator alloc; private final int cap; - private final WriteOptions writeOptions; + private final RocksObj writeOptions; private final List buffersToRelease; private final List byteBuffersToRelease; @@ -41,7 +42,7 @@ public class CappedWriteBatch extends WriteBatch { int cap, int reservedWriteBatchSize, long maxWriteBatchSize, - WriteOptions writeOptions) { + RocksObj writeOptions) { super(reservedWriteBatchSize); this.db = db; this.alloc = alloc; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/HugePqEnv.java b/src/main/java/it/cavallium/dbengine/database/disk/HugePqEnv.java index c684ce2..c469bdd 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/HugePqEnv.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/HugePqEnv.java @@ -5,6 +5,7 @@ import static it.cavallium.dbengine.database.disk.LLTempHugePqEnv.getColumnOptio import com.google.common.primitives.Ints; import io.micrometer.core.instrument.composite.CompositeMeterRegistry; import io.netty5.buffer.api.BufferAllocator; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import java.io.Closeable; @@ -23,18 +24,18 @@ import org.rocksdb.RocksDBException; public class HugePqEnv implements Closeable { private final RocksDB db; - private final ArrayList defaultCfh; - private final Int2ObjectMap cfhs = new Int2ObjectOpenHashMap<>(); + private final ArrayList> defaultCfh; + private final Int2ObjectMap> cfhs = new Int2ObjectOpenHashMap<>(); - public HugePqEnv(RocksDB db, ArrayList defaultCfh) { + public HugePqEnv(RocksDB db, ArrayList> defaultCfh) { this.db = db; this.defaultCfh = defaultCfh; } @Override public void close() throws IOException { - for (ColumnFamilyHandle cfh : defaultCfh) { - db.destroyColumnFamilyHandle(cfh); + for (var cfh : defaultCfh) { + db.destroyColumnFamilyHandle(cfh.v()); cfh.close(); } try { @@ -45,7 +46,7 @@ public class HugePqEnv implements Closeable { } public int createColumnFamily(int name, AbstractComparator comparator) throws RocksDBException { - var cfh = db.createColumnFamily(new ColumnFamilyDescriptor(Ints.toByteArray(name), getColumnOptions(comparator))); + var cfh = new RocksObj<>(db.createColumnFamily(new ColumnFamilyDescriptor(Ints.toByteArray(name), getColumnOptions(comparator)))); synchronized (cfhs) { var prev = cfhs.put(name, cfh); if (prev != null) { @@ -56,19 +57,19 @@ public class HugePqEnv implements Closeable { } public void deleteColumnFamily(int db) throws RocksDBException { - ColumnFamilyHandle cfh; + RocksObj cfh; synchronized (cfhs) { cfh = cfhs.remove(db); } if (cfh != null) { - this.db.dropColumnFamily(cfh); - this.db.destroyColumnFamilyHandle(cfh); + this.db.dropColumnFamily(cfh.v()); + this.db.destroyColumnFamilyHandle(cfh.v()); cfh.close(); } } public StandardRocksDBColumn openDb(int hugePqId) { - ColumnFamilyHandle cfh; + RocksObj cfh; synchronized (cfhs) { cfh = Objects.requireNonNull(cfhs.get(hugePqId), () -> "column " + hugePqId + " does not exist"); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index ea5e912..268d9b7 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine.database.disk; import static io.netty5.buffer.api.StandardAllocationTypes.OFF_HEAP; +import static it.cavallium.dbengine.database.LLUtils.ALLOW_STATIC_OPTIONS; import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; import static it.cavallium.dbengine.database.LLUtils.fromByteArray; import static it.cavallium.dbengine.database.LLUtils.isBoundedRange; @@ -14,6 +15,7 @@ import io.micrometer.core.instrument.Timer; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.ReadableComponent; +import io.netty5.buffer.api.Resource; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.database.ColumnUtils; @@ -27,6 +29,7 @@ import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.SafeCloseable; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; import java.io.IOException; @@ -46,6 +49,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.util.Supplier; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.rocksdb.AbstractNativeReference; import org.rocksdb.AbstractSlice; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.CompactRangeOptions; @@ -73,9 +77,7 @@ public class LLLocalDictionary implements LLDictionary { static final long MAX_WRITE_BATCH_SIZE = 1024L * 1024L * 1024L; // 1GiB static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations static final int MULTI_GET_WINDOW = 16; - private static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions(); - private static final WriteOptions EMPTY_WRITE_OPTIONS = new WriteOptions(); - private static final WriteOptions BATCH_WRITE_OPTIONS = new WriteOptions(); + private static final RocksObj EMPTY_READ_OPTIONS = LLUtils.ALLOW_STATIC_OPTIONS ? new RocksObj<>(new ReadOptions()) : null; static final boolean PREFER_AUTO_SEEK_BOUND = false; /** * It used to be false, @@ -111,12 +113,12 @@ public class LLLocalDictionary implements LLDictionary { private static final ByteBuffer DUMMY_WRITE_ONLY_BYTE_BUFFER = ByteBuffer.allocateDirect(1024); private final RocksDBColumn db; - private final ColumnFamilyHandle cfh; + private final RocksObj cfh; private final String databaseName; private final String columnName; private final Scheduler dbWScheduler; private final Scheduler dbRScheduler; - private final Function snapshotResolver; + private final Function> snapshotResolver; private final UpdateMode updateMode; private final boolean nettyDirect; private final BufferAllocator alloc; @@ -143,7 +145,7 @@ public class LLLocalDictionary implements LLDictionary { String columnName, Scheduler dbWScheduler, Scheduler dbRScheduler, - Function snapshotResolver, + Function> snapshotResolver, UpdateMode updateMode, DatabaseOptions databaseOptions) { requireNonNull(db); @@ -211,29 +213,34 @@ public class LLLocalDictionary implements LLDictionary { } @NotNull - private ReadOptions generateReadOptionsOrStatic(LLSnapshot snapshot) { - return generateReadOptions(snapshot != null ? snapshotResolver.apply(snapshot) : null, true); + private RocksObj generateReadOptionsOrStatic(LLSnapshot snapshot) { + var resolved = generateReadOptions(snapshot != null ? snapshotResolver.apply(snapshot) : null, true); + if (resolved != null) { + return resolved; + } else { + return new RocksObj<>(new ReadOptions()); + } } @Nullable - private ReadOptions generateReadOptionsOrNull(LLSnapshot snapshot) { + private RocksObj generateReadOptionsOrNull(LLSnapshot snapshot) { return generateReadOptions(snapshot != null ? snapshotResolver.apply(snapshot) : null, false); } @NotNull - private ReadOptions generateReadOptionsOrNew(LLSnapshot snapshot) { + private RocksObj generateReadOptionsOrNew(LLSnapshot snapshot) { var result = generateReadOptions(snapshot != null ? snapshotResolver.apply(snapshot) : null, false); if (result != null) { return result; } else { - return new ReadOptions(); + return new RocksObj<>(new ReadOptions()); } } - private ReadOptions generateReadOptions(Snapshot snapshot, boolean orStaticOpts) { + private RocksObj generateReadOptions(RocksObj snapshot, boolean orStaticOpts) { if (snapshot != null) { - return new ReadOptions().setSnapshot(snapshot); - } else if (orStaticOpts) { + return new RocksObj<>(new ReadOptions().setSnapshot(snapshot.v())); + } else if (ALLOW_STATIC_OPTIONS && orStaticOpts) { return EMPTY_READ_OPTIONS; } else { return null; @@ -295,37 +302,15 @@ public class LLLocalDictionary implements LLDictionary { return !containsKey(snapshot, range.getSingleUnsafe()); } else { // Temporary resources to release after finished - AbstractSlice slice1 = null; - AbstractSlice slice2 = null; try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), true, isBoundedRange(range), true )) { - readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); - readOpts.setFillCache(fillCache); - if (range.hasMin()) { - if (nettyDirect && isReadOnlyDirect(range.getMinUnsafe())) { - readOpts.setIterateLowerBound(slice1 = new DirectSlice( - ((ReadableComponent) range.getMinUnsafe()).readableBuffer(), - range.getMinUnsafe().readableBytes() - )); - } else { - readOpts.setIterateLowerBound(slice1 = new Slice(LLUtils.toArray(range.getMinUnsafe()))); - } - } - if (range.hasMax()) { - if (nettyDirect && isReadOnlyDirect(range.getMaxUnsafe())) { - readOpts.setIterateUpperBound(slice2 = new DirectSlice( - ((ReadableComponent) range.getMaxUnsafe()).readableBuffer(), - range.getMaxUnsafe().readableBytes() - )); - } else { - readOpts.setIterateUpperBound(slice2 = new Slice(LLUtils.toArray(range.getMaxUnsafe()))); - } - } - try (var rocksIterator = db.newIterator(readOpts)) { + readOpts.v().setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); + readOpts.v().setFillCache(fillCache); + try (var rocksIterator = db.newIterator(readOpts, range.getMinUnsafe(), range.getMaxUnsafe())) { if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { if (nettyDirect && isReadOnlyDirect(range.getMinUnsafe())) { var seekBuf = ((ReadableComponent) range.getMinUnsafe()).readableBuffer(); @@ -339,13 +324,6 @@ public class LLLocalDictionary implements LLDictionary { } return !rocksIterator.isValid(); } - } finally { - if (slice1 != null) { - slice1.close(); - } - if (slice2 != null) { - slice2.close(); - } } } }); @@ -419,9 +397,9 @@ public class LLLocalDictionary implements LLDictionary { logger.trace(MARKER_ROCKSDB, "Writing {}: {}", varargs); } startedPut.increment(); - try { + try (var writeOptions = new RocksObj<>(new WriteOptions())) { putTime.recordCallable(() -> { - db.put(EMPTY_WRITE_OPTIONS, key, value); + db.put(writeOptions, key, value); return null; }); } catch (RocksDBException ex) { @@ -460,12 +438,16 @@ public class LLLocalDictionary implements LLDictionary { case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS; }; UpdateAtomicResult result; + var readOptions = generateReadOptionsOrStatic(null); startedUpdates.increment(); - try { - result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS, - EMPTY_WRITE_OPTIONS, key, updater, returnMode)); + try (var writeOptions = new RocksObj<>(new WriteOptions())) { + result = updateTime.recordCallable(() -> + db.updateAtomic(readOptions, writeOptions, key, updater, returnMode)); } finally { endedUpdates.increment(); + if (readOptions != EMPTY_READ_OPTIONS) { + readOptions.close(); + } } assert result != null; var previous = switch (updateReturnMode) { @@ -504,12 +486,16 @@ public class LLLocalDictionary implements LLDictionary { } UpdateAtomicResult result; + var readOptions = generateReadOptionsOrStatic(null); startedUpdates.increment(); - try { + try (var writeOptions = new RocksObj<>(new WriteOptions())) { result = updateTime.recordCallable(() -> - db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, key, updater, DELTA)); + db.updateAtomic(readOptions, writeOptions, key, updater, DELTA)); } finally { endedUpdates.increment(); + if (readOptions != EMPTY_READ_OPTIONS) { + readOptions.close(); + } } assert result != null; sink.next(((UpdateAtomicResultDelta) result).delta()); @@ -530,9 +516,9 @@ public class LLLocalDictionary implements LLDictionary { try (var key = keySend.receive()) { logger.trace(MARKER_ROCKSDB, "Deleting {}", () -> toStringSafe(key)); startedRemove.increment(); - try { + try (var writeOptions = new RocksObj<>(new WriteOptions())) { removeTime.recordCallable(() -> { - db.delete(EMPTY_WRITE_OPTIONS, key); + db.delete(writeOptions, key); return null; }); } finally { @@ -566,8 +552,16 @@ public class LLLocalDictionary implements LLDictionary { .handle((keySend, sink) -> { try (var key = keySend.receive()) { assert !Schedulers.isInNonBlockingThread() : "Called getPreviousData in a nonblocking thread"; - var result = db.get(EMPTY_READ_OPTIONS, key); - logger.trace(MARKER_ROCKSDB, "Reading {}: {}", () -> toStringSafe(key), () -> toStringSafe(result)); + Buffer result; + var readOptions = generateReadOptionsOrStatic(null); + try { + result = db.get(readOptions, key); + } finally { + if (readOptions != EMPTY_READ_OPTIONS) { + readOptions.close(); + } + } + logger.trace(MARKER_ROCKSDB, "Read {}: {}", () -> toStringSafe(key), () -> toStringSafe(result)); if (result == null) { sink.complete(); } else { @@ -638,34 +632,34 @@ public class LLLocalDictionary implements LLDictionary { for (Send entrySend : entriesWindowList) { entriesWindow.add(entrySend.receive()); } - try { + try (var writeOptions = new RocksObj<>(new WriteOptions())) { assert !Schedulers.isInNonBlockingThread() : "Called putMulti in a nonblocking thread"; if (USE_WRITE_BATCHES_IN_PUT_MULTI) { - var batch = new CappedWriteBatch(db, + try (var batch = new CappedWriteBatch(db, alloc, CAPPED_WRITE_BATCH_CAP, RESERVED_WRITE_BATCH_SIZE, MAX_WRITE_BATCH_SIZE, - BATCH_WRITE_OPTIONS - ); - for (LLEntry entry : entriesWindow) { - var k = entry.getKey(); - var v = entry.getValue(); - if (nettyDirect) { - batch.put(cfh, k, v); - } else { - try (var key = k.receive()) { - try (var value = v.receive()) { - batch.put(cfh, LLUtils.toArray(key), LLUtils.toArray(value)); + writeOptions + )) { + for (LLEntry entry : entriesWindow) { + var k = entry.getKey(); + var v = entry.getValue(); + if (nettyDirect) { + batch.put(cfh.v(), k, v); + } else { + try (var key = k.receive()) { + try (var value = v.receive()) { + batch.put(cfh.v(), LLUtils.toArray(key), LLUtils.toArray(value)); + } } } } + batch.writeToDbAndClose(); } - batch.writeToDbAndClose(); - batch.close(); } else { for (LLEntry entry : entriesWindow) { - db.put(EMPTY_WRITE_OPTIONS, entry.getKeyUnsafe(), entry.getValueUnsafe()); + db.put(writeOptions, entry.getKeyUnsafe(), entry.getValueUnsafe()); } } sink.next(true); @@ -690,7 +684,7 @@ public class LLLocalDictionary implements LLDictionary { for (Tuple2> tuple : ew) { entriesWindow.add(tuple.mapT2(Send::receive)); } - try { + try (var writeOptions = new RocksObj<>(new WriteOptions())) { if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called updateMulti in a nonblocking thread"); } @@ -758,30 +752,30 @@ public class LLLocalDictionary implements LLDictionary { } if (USE_WRITE_BATCHES_IN_PUT_MULTI) { - var batch = new CappedWriteBatch(db, + try (var batch = new CappedWriteBatch(db, alloc, CAPPED_WRITE_BATCH_CAP, RESERVED_WRITE_BATCH_SIZE, MAX_WRITE_BATCH_SIZE, - BATCH_WRITE_OPTIONS - ); - int i = 0; - for (Tuple2 entry : entriesWindow) { - try (var valueToWrite = updatedValuesToWrite.get(i)) { - if (valueToWrite == null) { - batch.delete(cfh, entry.getT2().send()); - } else { - batch.put(cfh, entry.getT2().send(), valueToWrite.send()); + writeOptions + )) { + int i = 0; + for (Tuple2 entry : entriesWindow) { + try (var valueToWrite = updatedValuesToWrite.get(i)) { + if (valueToWrite == null) { + batch.delete(cfh.v(), entry.getT2().send()); + } else { + batch.put(cfh.v(), entry.getT2().send(), valueToWrite.send()); + } } + i++; } - i++; + batch.writeToDbAndClose(); } - batch.writeToDbAndClose(); - batch.close(); } else { int i = 0; for (Tuple2 entry : entriesWindow) { - db.put(EMPTY_WRITE_OPTIONS, entry.getT2(), updatedValuesToWrite.get(i)); + db.put(writeOptions, entry.getT2(), updatedValuesToWrite.get(i)); i++; } } @@ -840,7 +834,7 @@ public class LLLocalDictionary implements LLDictionary { boolean reverse, boolean smallRange) { Mono iteratorMono = rangeMono.map(rangeSend -> { - ReadOptions readOptions = generateReadOptionsOrNull(snapshot); + var readOptions = generateReadOptionsOrNull(snapshot); return new LLLocalEntryReactiveRocksIterator(db, rangeSend, nettyDirect, readOptions, reverse, smallRange); }); return Flux.usingWhen(iteratorMono, @@ -852,7 +846,7 @@ public class LLLocalDictionary implements LLDictionary { private Flux>> getRangeMultiGrouped(LLSnapshot snapshot, Mono> rangeMono, int prefixLength, boolean smallRange) { Mono iteratorMono = rangeMono.map(rangeSend -> { - ReadOptions readOptions = generateReadOptionsOrNull(snapshot); + var readOptions = generateReadOptionsOrNull(snapshot); return new LLLocalGroupedEntryReactiveRocksIterator(db, prefixLength, rangeSend, @@ -890,7 +884,7 @@ public class LLLocalDictionary implements LLDictionary { int prefixLength, boolean smallRange) { Mono iteratorMono = rangeMono.map(rangeSend -> { - ReadOptions readOptions = generateReadOptionsOrNull(snapshot); + var readOptions = generateReadOptionsOrNull(snapshot); return new LLLocalGroupedKeyReactiveRocksIterator(db, prefixLength, rangeSend, @@ -917,24 +911,22 @@ public class LLLocalDictionary implements LLDictionary { isBoundedRange(range), false )) { - ro.setFillCache(false); + ro.v().setFillCache(false); if (!range.isSingle()) { if (LLUtils.MANUAL_READAHEAD) { - ro.setReadaheadSize(32 * 1024); + ro.v().setReadaheadSize(32 * 1024); } } - ro.setVerifyChecksums(true); - try (var rocksIteratorTuple = db.newRocksIterator(nettyDirect, ro, range, false)) { - try (var rocksIterator = rocksIteratorTuple.iterator()) { - rocksIterator.seekToFirst(); - while (rocksIterator.isValid() && !sink.isCancelled()) { - try { - rocksIterator.key(DUMMY_WRITE_ONLY_BYTE_BUFFER); - rocksIterator.value(DUMMY_WRITE_ONLY_BYTE_BUFFER); - rocksIterator.next(); - } catch (RocksDBException ex) { - sink.next(new BadBlock(databaseName, ColumnUtils.special(columnName), null, ex)); - } + ro.v().setVerifyChecksums(true); + try (var rocksIterator = db.newRocksIterator(nettyDirect, ro, range, false)) { + rocksIterator.seekToFirst(); + while (rocksIterator.isValid() && !sink.isCancelled()) { + try { + rocksIterator.key(DUMMY_WRITE_ONLY_BYTE_BUFFER); + rocksIterator.value(DUMMY_WRITE_ONLY_BYTE_BUFFER); + rocksIterator.next(); + } catch (RocksDBException ex) { + sink.next(new BadBlock(databaseName, ColumnUtils.special(columnName), null, ex)); } } } @@ -952,7 +944,7 @@ public class LLLocalDictionary implements LLDictionary { public Flux> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono> rangeMono, int prefixLength, boolean smallRange) { Mono iteratorMono = rangeMono.map(range -> { - ReadOptions readOptions = generateReadOptionsOrNull(snapshot); + var readOptions = generateReadOptionsOrNull(snapshot); return new LLLocalKeyPrefixReactiveRocksIterator(db, prefixLength, range, @@ -985,17 +977,27 @@ public class LLLocalDictionary implements LLDictionary { .flux(); } + private record RocksObjTuple>(RocksObj t1, U t2) implements SafeCloseable { + + @Override + public void close() { + //noinspection EmptyTryBlock + try (t1; t2) {} + } + } + private Flux> getRangeKeysMulti(LLSnapshot snapshot, Mono> rangeMono, boolean reverse, boolean smallRange) { - Mono iteratorMono = rangeMono.map(range -> { - ReadOptions readOptions = generateReadOptionsOrNull(snapshot); - return new LLLocalKeyReactiveRocksIterator(db, range, nettyDirect, readOptions, reverse, smallRange); + Mono> iteratorMono = rangeMono.map(range -> { + var readOptions = generateReadOptionsOrNull(snapshot); + var it = new LLLocalKeyReactiveRocksIterator(db, range, nettyDirect, readOptions, reverse, smallRange); + return new RocksObjTuple<>(readOptions, it); }); return Flux.usingWhen(iteratorMono, - iterator -> iterator.flux().subscribeOn(dbRScheduler, false), - iterator -> Mono.fromRunnable(iterator::close) + t -> t.t2().flux().subscribeOn(dbRScheduler, false), + t -> Mono.fromRunnable(t::close) ); } @@ -1005,7 +1007,8 @@ public class LLLocalDictionary implements LLDictionary { return rangeMono .publishOn(dbWScheduler) .handle((rangeSend, sink) -> { - try (var range = rangeSend.receive()) { + try (var writeOptions = new RocksObj<>(new WriteOptions()); + var range = rangeSend.receive()) { assert !Schedulers.isInNonBlockingThread() : "Called setRange in a nonblocking thread"; if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) { try (var opts = LLUtils.generateCustomReadOptions(null, @@ -1013,45 +1016,19 @@ public class LLLocalDictionary implements LLDictionary { isBoundedRange(range), smallRange )) { - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, opts, IterateBound.LOWER, range.getMinUnsafe()); - } else { - minBound = AbstractRocksDBColumn.emptyReleasableSlice(); - } - try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, opts, IterateBound.UPPER, range.getMaxUnsafe()); - } else { - maxBound = AbstractRocksDBColumn.emptyReleasableSlice(); - } - assert cfh.isOwningHandle(); - assert opts.isOwningHandle(); SafeCloseable seekTo; - try (var it = db.newIterator(opts)) { + try (var it = db.newIterator(opts, range.getMinUnsafe(), range.getMaxUnsafe())) { if (!PREFER_AUTO_SEEK_BOUND && range.hasMin()) { - seekTo = it.seekTo(range.getMinUnsafe()); + it.seekTo(range.getMinUnsafe()); } else { seekTo = null; it.seekToFirst(); } - try { - while (it.isValid()) { - db.delete(EMPTY_WRITE_OPTIONS, it.key()); - it.next(); - } - } finally { - if (seekTo != null) { - seekTo.close(); - } + while (it.isValid()) { + db.delete(writeOptions, it.key()); + it.next(); } - } finally { - maxBound.close(); } - } finally { - minBound.close(); - } } } else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) { try (var batch = new CappedWriteBatch(db, @@ -1059,10 +1036,10 @@ public class LLLocalDictionary implements LLDictionary { CAPPED_WRITE_BATCH_CAP, RESERVED_WRITE_BATCH_SIZE, MAX_WRITE_BATCH_SIZE, - BATCH_WRITE_OPTIONS + writeOptions )) { if (range.isSingle()) { - batch.delete(cfh, range.getSingle()); + batch.delete(cfh.v(), range.getSingle()); } else { deleteSmallRangeWriteBatch(batch, range.copy().send()); } @@ -1071,11 +1048,11 @@ public class LLLocalDictionary implements LLDictionary { } else { try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) { if (range.isSingle()) { - batch.delete(cfh, LLUtils.toArray(range.getSingleUnsafe())); + batch.delete(cfh.v(), LLUtils.toArray(range.getSingleUnsafe())); } else { deleteSmallRangeWriteBatch(batch, range.copy().send()); } - db.write(EMPTY_WRITE_OPTIONS, batch); + db.write(writeOptions, batch); batch.clear(); } } @@ -1093,11 +1070,11 @@ public class LLLocalDictionary implements LLDictionary { for (Send entrySend : entriesListSend) { entriesList.add(entrySend.receive()); } - try { + try (var writeOptions = new RocksObj<>(new WriteOptions())) { if (!USE_WRITE_BATCHES_IN_SET_RANGE) { for (LLEntry entry : entriesList) { assert entry.isAccessible(); - db.put(EMPTY_WRITE_OPTIONS, entry.getKeyUnsafe(), entry.getValueUnsafe()); + db.put(writeOptions, entry.getKeyUnsafe(), entry.getValueUnsafe()); } } else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) { try (var batch = new CappedWriteBatch(db, @@ -1105,14 +1082,14 @@ public class LLLocalDictionary implements LLDictionary { CAPPED_WRITE_BATCH_CAP, RESERVED_WRITE_BATCH_SIZE, MAX_WRITE_BATCH_SIZE, - BATCH_WRITE_OPTIONS + writeOptions )) { for (LLEntry entry : entriesList) { assert entry.isAccessible(); if (nettyDirect) { - batch.put(cfh, entry.getKey(), entry.getValue()); + batch.put(cfh.v(), entry.getKey(), entry.getValue()); } else { - batch.put(cfh, + batch.put(cfh.v(), LLUtils.toArray(entry.getKeyUnsafe()), LLUtils.toArray(entry.getValueUnsafe()) ); @@ -1124,10 +1101,10 @@ public class LLLocalDictionary implements LLDictionary { try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) { for (LLEntry entry : entriesList) { assert entry.isAccessible(); - batch.put(cfh, LLUtils.toArray(entry.getKeyUnsafe()), + batch.put(cfh.v(), LLUtils.toArray(entry.getKeyUnsafe()), LLUtils.toArray(entry.getValueUnsafe())); } - db.write(EMPTY_WRITE_OPTIONS, batch); + db.write(writeOptions, batch); batch.clear(); } } @@ -1152,8 +1129,9 @@ public class LLLocalDictionary implements LLDictionary { .getRange(null, rangeMono, false, smallRange) .publishOn(dbWScheduler) .handle((oldValueSend, sink) -> { - try (var oldValue = oldValueSend.receive()) { - db.delete(EMPTY_WRITE_OPTIONS, oldValue.getKeyUnsafe()); + try (var writeOptions = new RocksObj<>(new WriteOptions()); + var oldValue = oldValueSend.receive()) { + db.delete(writeOptions, oldValue.getKeyUnsafe()); sink.next(true); } catch (RocksDBException ex) { sink.error(new RocksDBException("Failed to write range: " + ex.getMessage())); @@ -1182,49 +1160,20 @@ public class LLLocalDictionary implements LLDictionary { //todo: this is broken, check why. (is this still true?) private void deleteSmallRangeWriteBatch(CappedWriteBatch writeBatch, Send rangeToReceive) throws RocksDBException { - var range = rangeToReceive.receive(); - try (var readOpts = generateReadOptionsOrNew(null)) { - readOpts.setFillCache(false); - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); - } else { - minBound = AbstractRocksDBColumn.emptyReleasableSlice(); - } - try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe()); + try (var range = rangeToReceive.receive(); + var readOpts = generateReadOptionsOrNew(null)) { + readOpts.v().setFillCache(false); + try (var rocksIterator = db.newIterator(readOpts, range.getMinUnsafe(), range.getMaxUnsafe())) { + if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { + rocksIterator.seekTo(range.getMinUnsafe()); } else { - maxBound = AbstractRocksDBColumn.emptyReleasableSlice(); + rocksIterator.seekToFirst(); } - try (var rocksIterator = db.newIterator(readOpts)) { - SafeCloseable seekTo; - if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { - seekTo = rocksIterator.seekTo(range.getMinUnsafe()); - } else { - seekTo = null; - rocksIterator.seekToFirst(); - } - try { - while (rocksIterator.isValid()) { - writeBatch.delete(cfh, LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).send()); - rocksIterator.next(); - } - } finally { - if (seekTo != null) { - seekTo.close(); - } - } - } finally { - maxBound.close(); + while (rocksIterator.isValid()) { + writeBatch.delete(cfh.v(), LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).send()); + rocksIterator.next(); } - } finally { - minBound.close(); } - } catch (Throwable e) { - range.close(); - throw e; } } @@ -1232,93 +1181,52 @@ public class LLLocalDictionary implements LLDictionary { throws RocksDBException { try (var range = rangeToReceive.receive()) { try (var readOpts = LLUtils.generateCustomReadOptions(null, true, isBoundedRange(range), true)) { - readOpts.setFillCache(false); - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); - } else { - minBound = AbstractRocksDBColumn.emptyReleasableSlice(); - } - try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe()); + readOpts.v().setFillCache(false); + try (var rocksIterator = db.newIterator(readOpts, range.getMinUnsafe(), range.getMaxUnsafe())) { + if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { + rocksIterator.seekTo(range.getMinUnsafe()); } else { - maxBound = AbstractRocksDBColumn.emptyReleasableSlice(); + rocksIterator.seekToFirst(); } - try (var rocksIterator = db.newIterator(readOpts)) { - SafeCloseable seekTo; - if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { - seekTo = rocksIterator.seekTo(range.getMinUnsafe()); - } else { - seekTo = null; - rocksIterator.seekToFirst(); - } - try { - while (rocksIterator.isValid()) { - writeBatch.delete(cfh, rocksIterator.key()); - rocksIterator.next(); - } - } finally { - if (seekTo != null) { - seekTo.close(); - } - } - } finally { - maxBound.close(); + while (rocksIterator.isValid()) { + writeBatch.delete(cfh.v(), rocksIterator.key()); + rocksIterator.next(); } - } finally { - minBound.close(); } } } } - /** - * This method should not modify or move the writerIndex/readerIndex of the key - */ - public record ReleasableSliceImplWithoutRelease(AbstractSlice slice) implements ReleasableSlice {} - - /** - * This class should not modify or move the writerIndex/readerIndex of the key - */ - public record ReleasableSliceImplWithRelease(AbstractSlice slice) implements ReleasableSlice { - - @Override - public void close() { - slice.close(); - } - } - public Mono clear() { return Mono .fromCallable(() -> { assert !Schedulers.isInNonBlockingThread() : "Called clear in a nonblocking thread"; boolean shouldCompactLater = false; - try (var readOpts = LLUtils.generateCustomReadOptions(null, false, false, false)) { - readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); + try (var writeOptions = new RocksObj<>(new WriteOptions()); + var readOpts = LLUtils.generateCustomReadOptions(null, false, false, false)) { + readOpts.v().setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); // readOpts.setIgnoreRangeDeletions(true); - readOpts.setFillCache(false); + readOpts.v().setFillCache(false); if (LLUtils.MANUAL_READAHEAD) { - readOpts.setReadaheadSize(32 * 1024); // 32KiB + readOpts.v().setReadaheadSize(32 * 1024); // 32KiB } try (CappedWriteBatch writeBatch = new CappedWriteBatch(db, alloc, CAPPED_WRITE_BATCH_CAP, RESERVED_WRITE_BATCH_SIZE, MAX_WRITE_BATCH_SIZE, - BATCH_WRITE_OPTIONS + writeOptions )) { byte[] firstDeletedKey = null; byte[] lastDeletedKey = null; - try (var rocksIterator = db.newIterator(readOpts)) { + try (var rocksIterator = db.newIterator(readOpts, null, null)) { // If the database supports transactions, delete each key one by one if (db.supportsTransactions()) { rocksIterator.seekToFirst(); while (rocksIterator.isValid()) { - writeBatch.delete(cfh, rocksIterator.key()); + writeBatch.delete(cfh.v(), rocksIterator.key()); rocksIterator.next(); } } else { @@ -1327,8 +1235,8 @@ public class LLLocalDictionary implements LLDictionary { if (rocksIterator.isValid()) { firstDeletedKey = FIRST_KEY; lastDeletedKey = rocksIterator.key(); - writeBatch.deleteRange(cfh, FIRST_KEY, rocksIterator.key()); - writeBatch.delete(cfh, rocksIterator.key()); + writeBatch.deleteRange(cfh.v(), FIRST_KEY, rocksIterator.key()); + writeBatch.delete(cfh.v(), rocksIterator.key()); shouldCompactLater = true; } } @@ -1340,16 +1248,16 @@ public class LLLocalDictionary implements LLDictionary { // Compact range db.suggestCompactRange(); if (lastDeletedKey != null) { - try (var cro = new CompactRangeOptions() + try (var cro = new RocksObj<>(new CompactRangeOptions() .setAllowWriteStall(false) .setExclusiveManualCompaction(false) - .setChangeLevel(false)) { + .setChangeLevel(false))) { db.compactRange(firstDeletedKey, lastDeletedKey, cro); } } } - try (var fo = new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true)) { + try (var fo = new RocksObj<>(new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true))) { db.flush(fo); } db.flushWal(true); @@ -1375,52 +1283,24 @@ public class LLLocalDictionary implements LLDictionary { isBoundedRange(range), false )) { - readOpts.setFillCache(false); - readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); - } else { - minBound = AbstractRocksDBColumn.emptyReleasableSlice(); - } - try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe()); - } else { - maxBound = AbstractRocksDBColumn.emptyReleasableSlice(); - } - try { - if (fast) { - readOpts.setIgnoreRangeDeletions(true); + readOpts.v().setFillCache(false); + readOpts.v().setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); + if (fast) { + readOpts.v().setIgnoreRangeDeletions(true); - } - try (var rocksIterator = db.newIterator(readOpts)) { - SafeCloseable seekTo; - if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { - seekTo = rocksIterator.seekTo(range.getMinUnsafe()); - } else { - seekTo = null; - rocksIterator.seekToFirst(); - } - try { - long i = 0; - while (rocksIterator.isValid()) { - rocksIterator.next(); - i++; - } - sink.next(i); - } finally { - if (seekTo != null) { - seekTo.close(); - } - } - } - } finally { - maxBound.close(); + } + try (var rocksIterator = db.newIterator(readOpts, range.getMinUnsafe(), range.getMaxUnsafe())) { + if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { + rocksIterator.seekTo(range.getMinUnsafe()); + } else { + rocksIterator.seekToFirst(); } - } finally { - minBound.close(); + long i = 0; + while (rocksIterator.isValid()) { + rocksIterator.next(); + i++; + } + sink.next(i); } } } @@ -1436,47 +1316,21 @@ public class LLLocalDictionary implements LLDictionary { try (var range = rangeSend.receive()) { assert !Schedulers.isInNonBlockingThread() : "Called getOne in a nonblocking thread"; try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), true, true, true)) { - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); - } else { - minBound = AbstractRocksDBColumn.emptyReleasableSlice(); - } - try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe()); + try (var rocksIterator = db.newIterator(readOpts, range.getMinUnsafe(), range.getMaxUnsafe())) { + if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { + rocksIterator.seekTo(range.getMinUnsafe()); } else { - maxBound = AbstractRocksDBColumn.emptyReleasableSlice(); + rocksIterator.seekToFirst(); } - try (var rocksIterator = db.newIterator(readOpts)) { - SafeCloseable seekTo; - if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { - seekTo = rocksIterator.seekTo(range.getMinUnsafe()); - } else { - seekTo = null; - rocksIterator.seekToFirst(); - } - try { - if (rocksIterator.isValid()) { - try (var key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) { - try (var value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value)) { - sink.next(LLEntry.of(key.send(), value.send()).send()); - } - } - } else { - sink.complete(); - } - } finally { - if (seekTo != null) { - seekTo.close(); + if (rocksIterator.isValid()) { + try (var key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) { + try (var value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value)) { + sink.next(LLEntry.of(key.send(), value.send()).send()); } } - } finally { - maxBound.close(); + } else { + sink.complete(); } - } finally { - minBound.close(); } } } catch (RocksDBException ex) { @@ -1491,43 +1345,17 @@ public class LLLocalDictionary implements LLDictionary { try (var range = rangeSend.receive()) { assert !Schedulers.isInNonBlockingThread() : "Called getOneKey in a nonblocking thread"; try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), true, true, true)) { - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); - } else { - minBound = AbstractRocksDBColumn.emptyReleasableSlice(); - } - try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe()); + try (var rocksIterator = db.newIterator(readOpts, range.getMinUnsafe(), range.getMaxUnsafe())) { + if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { + rocksIterator.seekTo(range.getMinUnsafe()); } else { - maxBound = AbstractRocksDBColumn.emptyReleasableSlice(); + rocksIterator.seekToFirst(); } - try (var rocksIterator = db.newIterator(readOpts)) { - SafeCloseable seekTo; - if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { - seekTo = rocksIterator.seekTo(range.getMinUnsafe()); - } else { - seekTo = null; - rocksIterator.seekToFirst(); - } - try { - if (rocksIterator.isValid()) { - sink.next(LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).send()); - } else { - sink.complete(); - } - } finally { - if (seekTo != null) { - seekTo.close(); - } - } - } finally { - maxBound.close(); + if (rocksIterator.isValid()) { + sink.next(LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).send()); + } else { + sink.complete(); } - } finally { - minBound.close(); } } } catch (RocksDBException ex) { @@ -1538,7 +1366,7 @@ public class LLLocalDictionary implements LLDictionary { private long fastSizeAll(@Nullable LLSnapshot snapshot) throws RocksDBException { try (var rocksdbSnapshot = generateReadOptionsOrNew(snapshot)) { - if (USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS || rocksdbSnapshot.snapshot() == null) { + if (USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS || rocksdbSnapshot.v().snapshot() == null) { try { return db.getLongProperty("rocksdb.estimate-num-keys"); } catch (RocksDBException e) { @@ -1548,11 +1376,11 @@ public class LLLocalDictionary implements LLDictionary { } else if (PARALLEL_EXACT_SIZE) { return exactSizeAll(snapshot); } else { - rocksdbSnapshot.setFillCache(false); - rocksdbSnapshot.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); - rocksdbSnapshot.setIgnoreRangeDeletions(true); + rocksdbSnapshot.v().setFillCache(false); + rocksdbSnapshot.v().setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); + rocksdbSnapshot.v().setIgnoreRangeDeletions(true); long count = 0; - try (var rocksIterator = db.newIterator(rocksdbSnapshot)) { + try (var rocksIterator = db.newIterator(rocksdbSnapshot, null, null)) { rocksIterator.seekToFirst(); // If it's a fast size of a snapshot, count only up to 100'000 elements while (rocksIterator.isValid() && count < 100_000) { @@ -1571,9 +1399,9 @@ public class LLLocalDictionary implements LLDictionary { } try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), false, false, false)) { if (LLUtils.MANUAL_READAHEAD) { - readOpts.setReadaheadSize(128 * 1024); // 128KiB + readOpts.v().setReadaheadSize(128 * 1024); // 128KiB } - readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); + readOpts.v().setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); if (PARALLEL_EXACT_SIZE) { var commonPool = ForkJoinPool.commonPool(); @@ -1585,7 +1413,7 @@ public class LLLocalDictionary implements LLDictionary { )) .map(range -> (Callable) () -> { long partialCount = 0; - try (var rangeReadOpts = new ReadOptions(readOpts)) { + try (var rangeReadOpts = new RocksObj<>(new ReadOptions(readOpts.v()))) { Slice sliceBegin; if (range.getKey() != null) { sliceBegin = new Slice(range.getKey()); @@ -1600,12 +1428,12 @@ public class LLLocalDictionary implements LLDictionary { } try { if (sliceBegin != null) { - rangeReadOpts.setIterateLowerBound(sliceBegin); + rangeReadOpts.v().setIterateLowerBound(sliceBegin); } if (sliceEnd != null) { - rangeReadOpts.setIterateUpperBound(sliceEnd); + rangeReadOpts.v().setIterateUpperBound(sliceEnd); } - try (var rocksIterator = db.newIterator(rangeReadOpts)) { + try (var rocksIterator = db.newIterator(rangeReadOpts, null, null)) { rocksIterator.seekToFirst(); while (rocksIterator.isValid()) { partialCount++; @@ -1632,7 +1460,7 @@ public class LLLocalDictionary implements LLDictionary { return count; } else { long count = 0; - try (var rocksIterator = db.newIterator(readOpts)) { + try (var rocksIterator = db.newIterator(readOpts, null, null)) { rocksIterator.seekToFirst(); while (rocksIterator.isValid()) { count++; @@ -1651,47 +1479,22 @@ public class LLLocalDictionary implements LLDictionary { return rangeMono.publishOn(dbWScheduler).handle((rangeSend, sink) -> { try (var range = rangeSend.receive()) { assert !Schedulers.isInNonBlockingThread() : "Called removeOne in a nonblocking thread"; - try (var readOpts = new ReadOptions()) { - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); - } else { - minBound = AbstractRocksDBColumn.emptyReleasableSlice(); - } - try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe()); + try (var readOpts = new RocksObj<>(new ReadOptions()); + var writeOpts = new RocksObj<>(new WriteOptions())) { + try (var rocksIterator = db.newIterator(readOpts, range.getMinUnsafe(), range.getMaxUnsafe())) { + if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { + rocksIterator.seekTo(range.getMinUnsafe()); } else { - maxBound = AbstractRocksDBColumn.emptyReleasableSlice(); + rocksIterator.seekToFirst(); } - try (var rocksIterator = db.newIterator(readOpts)) { - SafeCloseable seekTo; - if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { - seekTo = rocksIterator.seekTo(range.getMinUnsafe()); - } else { - seekTo = null; - rocksIterator.seekToFirst(); - } - try { - if (!rocksIterator.isValid()) { - sink.complete(); - return; - } - Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); - Buffer value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value); - db.delete(EMPTY_WRITE_OPTIONS, key); - sink.next(LLEntry.of(key, value).send()); - } finally { - if (seekTo != null) { - seekTo.close(); - } - } - } finally { - maxBound.close(); + if (!rocksIterator.isValid()) { + sink.complete(); + return; } - } finally { - minBound.close(); + Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); + Buffer value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value); + db.delete(writeOpts, key); + sink.next(LLEntry.of(key, value).send()); } } } catch (RocksDBException ex) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalEntryReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalEntryReactiveRocksIterator.java index d6ef3f6..e541e2e 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalEntryReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalEntryReactiveRocksIterator.java @@ -4,6 +4,7 @@ import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.database.LLEntry; import it.cavallium.dbengine.database.LLRange; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import org.rocksdb.ReadOptions; public class LLLocalEntryReactiveRocksIterator extends LLLocalReactiveRocksIterator> { @@ -11,7 +12,7 @@ public class LLLocalEntryReactiveRocksIterator extends LLLocalReactiveRocksItera public LLLocalEntryReactiveRocksIterator(RocksDBColumn db, Send range, boolean allowNettyDirect, - ReadOptions readOptions, + RocksObj readOptions, boolean reverse, boolean smallRange) { super(db, range, allowNettyDirect, readOptions, true, reverse, smallRange); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedEntryReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedEntryReactiveRocksIterator.java index b385c3e..3551b9b 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedEntryReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedEntryReactiveRocksIterator.java @@ -4,6 +4,7 @@ import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.database.LLEntry; import it.cavallium.dbengine.database.LLRange; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import org.rocksdb.ReadOptions; public class LLLocalGroupedEntryReactiveRocksIterator extends @@ -13,7 +14,7 @@ public class LLLocalGroupedEntryReactiveRocksIterator extends int prefixLength, Send range, boolean allowNettyDirect, - ReadOptions readOptions, + RocksObj readOptions, boolean smallRange) { super(db, prefixLength, range, allowNettyDirect, readOptions, false, true, smallRange); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedKeyReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedKeyReactiveRocksIterator.java index 5d4d457..306148c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedKeyReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedKeyReactiveRocksIterator.java @@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.database.LLRange; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import org.rocksdb.ReadOptions; public class LLLocalGroupedKeyReactiveRocksIterator extends LLLocalGroupedReactiveRocksIterator> { @@ -11,7 +12,7 @@ public class LLLocalGroupedKeyReactiveRocksIterator extends LLLocalGroupedReacti int prefixLength, Send range, boolean allowNettyDirect, - ReadOptions readOptions, + RocksObj readOptions, boolean smallRange) { super(db, prefixLength, range, allowNettyDirect, readOptions, true, false, smallRange); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java index 4bf3a4b..db2a0fc 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java @@ -11,6 +11,7 @@ import io.netty5.buffer.api.Send; import io.netty5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.util.List; import org.apache.logging.log4j.LogManager; @@ -58,7 +59,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator extends private final int prefixLength; private LLRange range; private final boolean allowNettyDirect; - private ReadOptions readOptions; + private RocksObj readOptions; private final boolean canFillCache; private final boolean readValues; private final boolean smallRange; @@ -68,7 +69,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator extends int prefixLength, Send range, boolean allowNettyDirect, - ReadOptions readOptions, + RocksObj readOptions, boolean canFillCache, boolean readValues, boolean smallRange) { @@ -78,7 +79,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator extends this.prefixLength = prefixLength; this.range = range.receive(); this.allowNettyDirect = allowNettyDirect; - this.readOptions = readOptions != null ? readOptions : new ReadOptions(); + this.readOptions = readOptions != null ? readOptions : new RocksObj<>(new ReadOptions()); this.canFillCache = canFillCache; this.readValues = readValues; this.smallRange = smallRange; @@ -94,7 +95,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator extends return new RocksIterWithReadOpts(readOptions, db.newRocksIterator(allowNettyDirect, readOptions, range, false)); }, (tuple, sink) -> { try { - var rocksIterator = tuple.iter().iterator(); + var rocksIterator = tuple.iter(); ObjectArrayList values = new ObjectArrayList<>(); Buffer firstGroupKey = null; try { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java index a038312..20815ec 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java @@ -11,6 +11,7 @@ import io.netty5.buffer.api.Send; import io.netty5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.rocksdb.ReadOptions; @@ -56,7 +57,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends private final int prefixLength; private LLRange rangeShared; private final boolean allowNettyDirect; - private ReadOptions readOptions; + private RocksObj readOptions; private final boolean canFillCache; private final boolean smallRange; @@ -64,7 +65,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends int prefixLength, Send range, boolean allowNettyDirect, - ReadOptions readOptions, + RocksObj readOptions, boolean canFillCache, boolean smallRange) { super(DROP); @@ -73,7 +74,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends this.prefixLength = prefixLength; this.rangeShared = range.receive(); this.allowNettyDirect = allowNettyDirect; - this.readOptions = readOptions != null ? readOptions : new ReadOptions(); + this.readOptions = readOptions != null ? readOptions : new RocksObj<>(new ReadOptions()); this.canFillCache = canFillCache; this.smallRange = smallRange; } @@ -93,7 +94,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends return new RocksIterWithReadOpts(readOptions, db.newRocksIterator(allowNettyDirect, readOptions, rangeShared, false)); }, (tuple, sink) -> { try { - var rocksIterator = tuple.iter().iterator(); + var rocksIterator = tuple.iter(); Buffer firstGroupKey = null; try { while (rocksIterator.isValid()) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyReactiveRocksIterator.java index 170aa00..055e59c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyReactiveRocksIterator.java @@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.database.LLRange; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import org.rocksdb.ReadOptions; public class LLLocalKeyReactiveRocksIterator extends LLLocalReactiveRocksIterator> { @@ -10,7 +11,7 @@ public class LLLocalKeyReactiveRocksIterator extends LLLocalReactiveRocksIterato public LLLocalKeyReactiveRocksIterator(RocksDBColumn db, Send range, boolean allowNettyDirect, - ReadOptions readOptions, + RocksObj readOptions, boolean reverse, boolean smallRange) { super(db, range, allowNettyDirect, readOptions, false, reverse, smallRange); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index aed34e5..334f239 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -9,6 +9,7 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Timer; import io.netty5.buffer.api.BufferAllocator; +import io.netty5.buffer.api.internal.ResourceSupport; import io.netty5.util.internal.PlatformDependent; import it.cavallium.data.generator.nativedata.NullableString; import it.cavallium.dbengine.client.MemoryStats; @@ -22,6 +23,7 @@ import it.cavallium.dbengine.database.RocksDBMapProperty; import it.cavallium.dbengine.database.RocksDBStringProperty; import it.cavallium.dbengine.database.TableWithProperties; import it.cavallium.dbengine.database.UpdateMode; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import it.cavallium.dbengine.rpc.current.data.Column; import it.cavallium.dbengine.rpc.current.data.ColumnOptions; import it.cavallium.dbengine.rpc.current.data.DatabaseLevel; @@ -123,10 +125,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { private Statistics statistics; private Cache standardCache; private Cache compressedCache; - private final Map handles; + private final Map> handles; private final HashMap persistentCaches; - private final ConcurrentHashMap snapshotsHandles = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> snapshotsHandles = new ConcurrentHashMap<>(); private final AtomicLong nextSnapshotNumbers = new AtomicLong(1); private final StampedLock closeLock = new StampedLock(); private volatile boolean closed = false; @@ -465,13 +467,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { this.handles = new HashMap<>(); if (enableColumnsBug && !inMemory) { for (int i = 0; i < columns.size(); i++) { - this.handles.put(columns.get(i), handles.get(i)); + this.handles.put(columns.get(i), new RocksObj<>(handles.get(i))); } } else { handles: for (ColumnFamilyHandle handle : handles) { for (Column column : columns) { if (Arrays.equals(column.name().getBytes(StandardCharsets.US_ASCII), handle.getName())) { - this.handles.put(column, handle); + this.handles.put(column, new RocksObj<>(handle)); continue handles; } } @@ -529,6 +531,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { RocksDBUtils.ensureOwned(rocksObject); } + protected void ensureOwned(RocksObj rocksObject) { + RocksDBUtils.ensureOwned(rocksObject); + } + private synchronized PersistentCache resolvePersistentCache(HashMap caches, DBOptions rocksdbOptions, List persistentCaches, @@ -565,7 +571,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { throw new IllegalArgumentException("Persistent cache " + persistentCacheId.get() + " is not defined"); } - public Map getAllColumnFamilyHandles() { + public Map> getAllColumnFamilyHandles() { return this.handles; } @@ -580,7 +586,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { ensureOpen(); var cfh = handles.get(column); ensureOwned(cfh); - return RocksDBUtils.getLevels(db, cfh); + return RocksDBUtils.getLevels(db, cfh.v()); } finally { closeLock.unlockRead(closeReadLock); } @@ -592,7 +598,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { ensureOpen(); var cfh = handles.get(column); ensureOwned(cfh); - return RocksDBUtils.getColumnFiles(db, cfh, excludeLastLevel); + return RocksDBUtils.getColumnFiles(db, cfh.v(), excludeLastLevel); } finally { closeLock.unlockRead(closeReadLock); } @@ -604,7 +610,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { ensureOpen(); for (var cfh : this.handles.values()) { ensureOwned(cfh); - RocksDBUtils.forceCompaction(db, name, cfh, volumeId, logger); + RocksDBUtils.forceCompaction(db, name, cfh.v(), volumeId, logger); } } finally { closeLock.unlockRead(closeReadLock); @@ -637,7 +643,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { private void registerGauge(MeterRegistry meterRegistry, String name, String propertyName, boolean divideByAllColumns) { if (divideByAllColumns) { - for (Entry cfhEntry : handles.entrySet()) { + for (var cfhEntry : handles.entrySet()) { var columnName = cfhEntry.getKey().name(); var cfh = cfhEntry.getValue(); meterRegistry.gauge("rocksdb.property.value", @@ -652,7 +658,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { if (closed) { return 0d; } - return database.getLongProperty(cfh, propertyName); + return database.getLongProperty(cfh.v(), propertyName); } catch (RocksDBException e) { if ("NotFound".equals(e.getMessage())) { return 0d; @@ -715,7 +721,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { snapshotsHandles.forEach((id, snapshot) -> { try { if (db.isOwningHandle()) { - db.releaseSnapshot(snapshot); + db.releaseSnapshot(snapshot.v()); + snapshot.close(); } } catch (Exception ex2) { // ignore exception @@ -1026,7 +1033,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return stats; } - private Snapshot getSnapshotLambda(LLSnapshot snapshot) { + private RocksObj getSnapshotLambda(LLSnapshot snapshot) { var closeReadSnapLock = closeLock.readLock(); try { ensureOpen(); @@ -1078,8 +1085,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { name, ColumnUtils.toString(columnName), dbWScheduler, - dbRScheduler, - this::getSnapshotLambda, + dbRScheduler, snapshot -> getSnapshotLambda(snapshot), updateMode, databaseOptions ); @@ -1093,7 +1099,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { var closeReadLock = closeLock.readLock(); try { ensureOpen(); - ColumnFamilyHandle cfh; + RocksObj cfh; try { cfh = getCfh(columnName); ensureOwned(cfh); @@ -1106,7 +1112,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } } - private RocksDBColumn getRocksDBColumn(RocksDB db, ColumnFamilyHandle cfh) { + private RocksDBColumn getRocksDBColumn(RocksDB db, RocksObj cfh) { var nettyDirect = databaseOptions.allowNettyDirect(); var closeLock = getCloseLock(); if (db instanceof OptimisticTransactionDB optimisticTransactionDB) { @@ -1132,9 +1138,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } } - private ColumnFamilyHandle getCfh(byte[] columnName) throws RocksDBException { - ColumnFamilyHandle cfh = handles.get(ColumnUtils.special(ColumnUtils.toString(columnName))); - assert enableColumnsBug || Arrays.equals(cfh.getName(), columnName); + private RocksObj getCfh(byte[] columnName) throws RocksDBException { + var cfh = handles.get(ColumnUtils.special(ColumnUtils.toString(columnName))); + assert enableColumnsBug || Arrays.equals(cfh.v().getName(), columnName); return cfh; } @@ -1233,7 +1239,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return db.getMapProperty(property.getName()); } else { var cfh = requireNonNull(handles.get(column)); - return db.getMapProperty(cfh, property.getName()); + return db.getMapProperty(cfh.v(), property.getName()); } } finally { closeLock.unlockRead(closeReadLock); @@ -1264,7 +1270,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return db.getProperty(property.getName()); } else { var cfh = requireNonNull(handles.get(column)); - return db.getProperty(cfh, property.getName()); + return db.getProperty(cfh.v(), property.getName()); } } finally { closeLock.unlockRead(closeReadLock); @@ -1295,7 +1301,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return db.getLongProperty(property.getName()); } else { var cfh = requireNonNull(handles.get(column)); - return db.getLongProperty(cfh, property.getName()); + return db.getLongProperty(cfh.v(), property.getName()); } } finally { closeLock.unlockRead(closeReadLock); @@ -1356,7 +1362,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { aggregatedStats .append(entry.getKey().name()) .append("\n") - .append(db.getProperty(entry.getValue(), "rocksdb.stats")) + .append(db.getProperty(entry.getValue().v(), "rocksdb.stats")) .append("\n"); } return aggregatedStats.toString(); @@ -1382,7 +1388,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { try { if (closed) return null; ensureOpen(); - return db.getPropertiesOfAllTables(handle.getValue()); + return db.getPropertiesOfAllTables(handle.getValue().v()); } finally { closeLock.unlockRead(closeReadLock); } @@ -1447,7 +1453,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { try { ensureOpen(); return snapshotTime.recordCallable(() -> { - var snapshot = db.getSnapshot(); + var snapshot = new RocksObj<>(db.getSnapshot()); long currentSnapshotSequenceNumber = nextSnapshotNumbers.getAndIncrement(); this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot); return new LLSnapshot(currentSnapshotSequenceNumber); @@ -1463,15 +1469,14 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return Mono .fromCallable(() -> { var closeReadLock = closeLock.readLock(); - try { - Snapshot dbSnapshot = this.snapshotsHandles.remove(snapshot.getSequenceNumber()); + try (var dbSnapshot = this.snapshotsHandles.remove(snapshot.getSequenceNumber())) { if (dbSnapshot == null) { throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!"); } if (!db.isOwningHandle()) { return null; } - db.releaseSnapshot(dbSnapshot); + db.releaseSnapshot(dbSnapshot.v()); return null; } finally { closeLock.unlockRead(closeReadLock); @@ -1489,7 +1494,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { statistics = null; } try { - flushAndCloseDb(db, standardCache, compressedCache, new ArrayList<>(handles.values())); + flushAndCloseDb(db, + standardCache, + compressedCache, + new ArrayList<>(handles.values().stream().map(RocksObj::v).toList()) + ); + handles.values().forEach(ResourceSupport::close); + handles.clear(); deleteUnusedOldLogFiles(); } catch (RocksDBException e) { throw new IOException(e); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java index 43c9ba3..b3cbd41 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java @@ -7,6 +7,7 @@ import io.netty5.buffer.api.Owned; import io.netty5.buffer.api.Send; import io.netty5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.database.LLRange; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.rocksdb.ReadOptions; @@ -17,7 +18,7 @@ import reactor.util.function.Tuples; public final class LLLocalMigrationReactiveRocksIterator extends ResourceSupport { - protected static final Logger logger = LogManager.getLogger(LLLocalMigrationReactiveRocksIterator.class); + private static final Logger logger = LogManager.getLogger(LLLocalMigrationReactiveRocksIterator.class); private static final Drop DROP = new Drop<>() { @Override public void drop(LLLocalMigrationReactiveRocksIterator obj) { @@ -50,17 +51,17 @@ public final class LLLocalMigrationReactiveRocksIterator extends private final RocksDBColumn db; private LLRange rangeShared; - private ReadOptions readOptions; + private RocksObj readOptions; @SuppressWarnings({"unchecked", "rawtypes"}) public LLLocalMigrationReactiveRocksIterator(RocksDBColumn db, Send range, - ReadOptions readOptions) { + Send> readOptions) { super((Drop) (Drop) DROP); try (range) { this.db = db; this.rangeShared = range.receive(); - this.readOptions = readOptions; + this.readOptions = readOptions.receive(); } } @@ -72,8 +73,7 @@ public final class LLLocalMigrationReactiveRocksIterator extends return new RocksIterWithReadOpts(readOptions, db.newRocksIterator(false, readOptions, rangeShared, false)); }, (tuple, sink) -> { try { - //noinspection resource - var rocksIterator = tuple.iter().iterator(); + var rocksIterator = tuple.iter(); if (rocksIterator.isValid()) { byte[] key = rocksIterator.key(); byte[] value = rocksIterator.value(); @@ -97,7 +97,7 @@ public final class LLLocalMigrationReactiveRocksIterator extends @Override protected Owned prepareSend() { var range = this.rangeShared.send(); - var readOptions = this.readOptions; + var readOptions = this.readOptions.send(); return drop -> new LLLocalMigrationReactiveRocksIterator(db, range, readOptions diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java index f613465..71ddd19 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java @@ -11,6 +11,7 @@ import io.netty5.buffer.api.Send; import io.netty5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.Nullable; @@ -56,7 +57,7 @@ public abstract class LLLocalReactiveRocksIterator extends private final RocksDBColumn db; private LLRange rangeShared; private final boolean allowNettyDirect; - private ReadOptions readOptions; + private RocksObj readOptions; private final boolean readValues; private final boolean reverse; private final boolean smallRange; @@ -65,7 +66,7 @@ public abstract class LLLocalReactiveRocksIterator extends public LLLocalReactiveRocksIterator(RocksDBColumn db, Send range, boolean allowNettyDirect, - ReadOptions readOptions, + RocksObj readOptions, boolean readValues, boolean reverse, boolean smallRange) { @@ -90,7 +91,7 @@ public abstract class LLLocalReactiveRocksIterator extends return new RocksIterWithReadOpts(readOptions, db.newRocksIterator(allowNettyDirect, readOptions, rangeShared, reverse)); }, (tuple, sink) -> { try { - var rocksIterator = tuple.iter().iterator(); + var rocksIterator = tuple.iter(); if (rocksIterator.isValid()) { Buffer key; if (allowNettyDirect) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java index 97a93b7..72dc35f 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java @@ -10,6 +10,7 @@ import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import java.io.IOException; import java.util.Arrays; import java.util.concurrent.Callable; @@ -25,11 +26,8 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; public class LLLocalSingleton implements LLSingleton { - - private static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions(); - private static final WriteOptions EMPTY_WRITE_OPTIONS = new WriteOptions(); private final RocksDBColumn db; - private final Function snapshotResolver; + private final Function> snapshotResolver; private final byte[] name; private final String columnName; private final Mono> nameMono; @@ -38,7 +36,7 @@ public class LLLocalSingleton implements LLSingleton { private final Scheduler dbRScheduler; public LLLocalSingleton(RocksDBColumn db, - Function snapshotResolver, + Function> snapshotResolver, String databaseName, byte[] name, String columnName, @@ -62,8 +60,11 @@ public class LLLocalSingleton implements LLSingleton { if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Initialized in a nonblocking thread"); } - if (defaultValue != null && db.get(EMPTY_READ_OPTIONS, this.name, true) == null) { - db.put(EMPTY_WRITE_OPTIONS, this.name, defaultValue); + try (var readOptions = new RocksObj<>(new ReadOptions()); + var writeOptions = new RocksObj<>(new WriteOptions())) { + if (defaultValue != null && db.get(readOptions, this.name, true) == null) { + db.put(writeOptions, this.name, defaultValue); + } } } @@ -71,13 +72,11 @@ public class LLLocalSingleton implements LLSingleton { return Mono.fromCallable(callable).subscribeOn(write ? dbWScheduler : dbRScheduler); } - private ReadOptions generateReadOptions(LLSnapshot snapshot, boolean orStaticOpts) { + private RocksObj generateReadOptions(LLSnapshot snapshot) { if (snapshot != null) { - return new ReadOptions().setSnapshot(snapshotResolver.apply(snapshot)); - } else if (orStaticOpts) { - return EMPTY_READ_OPTIONS; + return new RocksObj<>(new ReadOptions().setSnapshot(snapshotResolver.apply(snapshot).v())); } else { - return null; + return new RocksObj<>(new ReadOptions()); } } @@ -91,13 +90,8 @@ public class LLLocalSingleton implements LLSingleton { return nameMono.publishOn(dbRScheduler).handle((nameSend, sink) -> { try (Buffer name = nameSend.receive()) { Buffer result; - var readOptions = generateReadOptions(snapshot, true); - try { + try (var readOptions = generateReadOptions(snapshot)) { result = db.get(readOptions, name); - } finally { - if (readOptions != EMPTY_READ_OPTIONS) { - readOptions.close(); - } } if (result != null) { sink.next(result.send()); @@ -115,11 +109,11 @@ public class LLLocalSingleton implements LLSingleton { return Mono.zip(nameMono, valueMono).publishOn(dbWScheduler).handle((tuple, sink) -> { var nameSend = tuple.getT1(); var valueSend = tuple.getT2(); - try (Buffer name = nameSend.receive()) { - try (Buffer value = valueSend.receive()) { - db.put(EMPTY_WRITE_OPTIONS, name, value); - sink.next(true); - } + try (Buffer name = nameSend.receive(); + Buffer value = valueSend.receive(); + var writeOptions = new RocksObj<>(new WriteOptions())) { + db.put(writeOptions, name, value); + sink.next(true); } catch (RocksDBException ex) { sink.error(new IOException("Failed to write " + Arrays.toString(name), ex)); } @@ -128,8 +122,8 @@ public class LLLocalSingleton implements LLSingleton { private Mono unset() { return nameMono.publishOn(dbWScheduler).handle((nameSend, sink) -> { - try (Buffer name = nameSend.receive()) { - db.delete(EMPTY_WRITE_OPTIONS, name); + try (Buffer name = nameSend.receive(); var writeOptions = new RocksObj<>(new WriteOptions())) { + db.delete(writeOptions, name); } catch (RocksDBException ex) { sink.error(new IOException("Failed to read " + Arrays.toString(name), ex)); } @@ -149,8 +143,10 @@ public class LLLocalSingleton implements LLSingleton { case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS; }; UpdateAtomicResult result; - try (var key = keySend.receive()) { - result = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, key, updater, returnMode); + try (var key = keySend.receive(); + var readOptions = new RocksObj<>(new ReadOptions()); + var writeOptions = new RocksObj<>(new WriteOptions())) { + result = db.updateAtomic(readOptions, writeOptions, key, updater, returnMode); } return switch (updateReturnMode) { case NOTHING -> null; @@ -168,8 +164,10 @@ public class LLLocalSingleton implements LLSingleton { throw new UnsupportedOperationException("Called update in a nonblocking thread"); } UpdateAtomicResult result; - try (var key = keySend.receive()) { - result = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, key, updater, DELTA); + try (var key = keySend.receive(); + var readOptions = new RocksObj<>(new ReadOptions()); + var writeOptions = new RocksObj<>(new WriteOptions())) { + result = db.updateAtomic(readOptions, writeOptions, key, updater, DELTA); } return ((UpdateAtomicResultDelta) result).delta(); }).onErrorMap(cause -> new IOException("Failed to read or write", cause)), diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLTempHugePqEnv.java b/src/main/java/it/cavallium/dbengine/database/disk/LLTempHugePqEnv.java index 9a57116..8be13c4 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLTempHugePqEnv.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLTempHugePqEnv.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.database.disk; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import java.io.Closeable; import java.io.File; import java.io.IOException; @@ -58,11 +59,16 @@ public class LLTempHugePqEnv implements Closeable { var cfh = new ArrayList(); nextColumnName = new AtomicInteger(0); - env = new HugePqEnv(RocksDB.open(opts, + var db = RocksDB.open(opts, tempDirectory.toString(), List.of(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, getColumnOptions(null))), cfh - ), cfh); + ); + var cfhObjs = new ArrayList>(cfh.size()); + for (ColumnFamilyHandle columnFamilyHandle : cfh) { + cfhObjs.add(new RocksObj<>(columnFamilyHandle)); + } + env = new HugePqEnv(db, cfhObjs); initialized = true; } catch (RocksDBException | IOException e) { throw new RuntimeException(e); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java index 533f5e1..900f55b 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java @@ -10,6 +10,7 @@ import io.netty5.buffer.api.MemoryManager; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import it.cavallium.dbengine.lucene.ExponentialPageLimits; import java.io.IOException; import java.util.concurrent.ThreadLocalRandom; @@ -24,6 +25,7 @@ import org.rocksdb.ReadOptions; import org.rocksdb.RocksDBException; import org.rocksdb.Status.Code; import org.rocksdb.Transaction; +import org.rocksdb.TransactionOptions; import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; import reactor.core.scheduler.Schedulers; @@ -38,7 +40,7 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn cfh, MeterRegistry meterRegistry, StampedLock closeLock) { super(db, nettyDirect, alloc, databaseName, cfh, meterRegistry, closeLock); @@ -53,9 +55,9 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn tx) throws RocksDBException { try { - tx.commit(); + tx.v().commit(); return true; } catch (RocksDBException ex) { var status = ex.getStatus() != null ? ex.getStatus().getCode() : Code.Ok; @@ -67,18 +69,19 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn beginTransaction(@NotNull RocksObj writeOptions, + RocksObj txOpts) { + return new RocksObj<>(getDb().beginTransaction(writeOptions.v())); } @Override - public void write(WriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException { - getDb().write(writeOptions, writeBatch); + public void write(RocksObj writeOptions, WriteBatch writeBatch) throws RocksDBException { + getDb().write(writeOptions.v(), writeBatch); } @Override - public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions, - @NotNull WriteOptions writeOptions, + public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull RocksObj readOptions, + @NotNull RocksObj writeOptions, Buffer key, BinarySerializationFunction updater, UpdateAtomicResultMode returnMode) throws IOException { @@ -89,7 +92,8 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn(new TransactionOptions()); + var tx = beginTransaction(writeOptions, txOpts)) { boolean committedSuccessfully; int retries = 0; ExponentialPageLimits retryTime = null; @@ -97,7 +101,7 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn sentCurData; boolean changed; do { - var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true); + var prevDataArray = tx.v().getForUpdate(readOptions.v(), cfh.v(), keyArray, true); if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, "Reading {}: {} (before update)", @@ -135,7 +139,7 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn cfh, MeterRegistry meterRegistry, StampedLock closeLock) { super(db, nettyDirect, alloc, dbName, cfh, meterRegistry, closeLock); } @Override - protected boolean commitOptimistically(Transaction tx) throws RocksDBException { - tx.commit(); + protected boolean commitOptimistically(RocksObj tx) throws RocksDBException { + tx.v().commit(); return true; } @Override - protected Transaction beginTransaction(@NotNull WriteOptions writeOptions) { - return getDb().beginTransaction(writeOptions, DEFAULT_TX_OPTIONS); + protected RocksObj beginTransaction(@NotNull RocksObj writeOptions, + RocksObj txOpts) { + return new RocksObj<>(getDb().beginTransaction(writeOptions.v(), txOpts.v())); } @Override - public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions, - @NotNull WriteOptions writeOptions, + public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull RocksObj readOptions, + @NotNull RocksObj writeOptions, Buffer key, BinarySerializationFunction updater, UpdateAtomicResultMode returnMode) throws IOException { @@ -61,14 +63,15 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn(new TransactionOptions()); + var tx = beginTransaction(writeOptions, txOpts)) { Send sentPrevData; Send sentCurData; boolean changed; if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, "Reading {} (before update lock)", LLUtils.toStringSafe(key)); } - var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true); + var prevDataArray = tx.v().getForUpdate(readOptions.v(), cfh.v(), keyArray, true); try { if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, @@ -109,9 +112,9 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn readOptions, LLRange range, boolean reverse) throws RocksDBException; - default byte @Nullable [] get(@NotNull ReadOptions readOptions, + default byte @Nullable [] get(@NotNull RocksObj readOptions, byte[] key, boolean existsAlmostCertainly) throws RocksDBException { @@ -45,15 +46,15 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn { } @Nullable - Buffer get(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException; + Buffer get(@NotNull RocksObj readOptions, Buffer key) throws RocksDBException; - boolean exists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException; + boolean exists(@NotNull RocksObj readOptions, Buffer key) throws RocksDBException; - boolean mayExists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException; + boolean mayExists(@NotNull RocksObj readOptions, Buffer key) throws RocksDBException; - void put(@NotNull WriteOptions writeOptions, Buffer key, Buffer value) throws RocksDBException; + void put(@NotNull RocksObj writeOptions, Buffer key, Buffer value) throws RocksDBException; - default void put(@NotNull WriteOptions writeOptions, byte[] key, byte[] value) throws RocksDBException { + default void put(@NotNull RocksObj writeOptions, byte[] key, byte[] value) throws RocksDBException { var allocator = getAllocator(); try (var keyBuf = allocator.allocate(key.length)) { keyBuf.writeBytes(key); @@ -65,31 +66,33 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn { } } - @NotNull RocksDBIterator newIterator(@NotNull ReadOptions readOptions); + @NotNull RocksIteratorObj newIterator(@NotNull RocksObj readOptions, @Nullable Buffer min, @Nullable Buffer max); - @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions, @NotNull WriteOptions writeOptions, - Buffer key, BinarySerializationFunction updater, + @NotNull UpdateAtomicResult updateAtomic(@NotNull RocksObj readOptions, + @NotNull RocksObj writeOptions, + Buffer key, + BinarySerializationFunction updater, UpdateAtomicResultMode returnMode) throws RocksDBException, IOException; - void delete(WriteOptions writeOptions, Buffer key) throws RocksDBException; + void delete(RocksObj writeOptions, Buffer key) throws RocksDBException; - void delete(WriteOptions writeOptions, byte[] key) throws RocksDBException; + void delete(RocksObj writeOptions, byte[] key) throws RocksDBException; - List multiGetAsList(ReadOptions readOptions, List keys) throws RocksDBException; + List multiGetAsList(RocksObj readOptions, List keys) throws RocksDBException; - void write(WriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException; + void write(RocksObj writeOptions, WriteBatch writeBatch) throws RocksDBException; void suggestCompactRange() throws RocksDBException; - void compactRange(byte[] begin, byte[] end, CompactRangeOptions options) throws RocksDBException; + void compactRange(byte[] begin, byte[] end, RocksObj options) throws RocksDBException; - void flush(FlushOptions options) throws RocksDBException; + void flush(RocksObj options) throws RocksDBException; void flushWal(boolean sync) throws RocksDBException; long getLongProperty(String property) throws RocksDBException; - ColumnFamilyHandle getColumnFamilyHandle(); + RocksObj getColumnFamilyHandle(); BufferAllocator getAllocator(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java index 5b20ae2..6b78e39 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.disk; import static com.google.common.collect.Lists.partition; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import it.cavallium.dbengine.rpc.current.data.Column; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -96,7 +97,7 @@ public class RocksDBUtils { } } - public static void ensureOpen(RocksDB db, @Nullable ColumnFamilyHandle cfh) { + public static void ensureOpen(RocksDB db, @Nullable RocksObj cfh) { if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called in a nonblocking thread"); } @@ -109,4 +110,10 @@ public class RocksDBUtils { throw new IllegalStateException("Not owning handle"); } } + + public static void ensureOwned(@Nullable RocksObj rocksObject) { + if (rocksObject != null && !rocksObject.isAccessible()) { + throw new IllegalStateException("Not owning handle"); + } + } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksIterWithReadOpts.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksIterWithReadOpts.java index b257021..62c5e49 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/RocksIterWithReadOpts.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksIterWithReadOpts.java @@ -1,9 +1,11 @@ package it.cavallium.dbengine.database.disk; import it.cavallium.dbengine.database.SafeCloseable; +import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import org.rocksdb.ReadOptions; -record RocksIterWithReadOpts(ReadOptions readOptions, RocksIteratorTuple iter) implements SafeCloseable { +public record RocksIterWithReadOpts(RocksObj readOptions, RocksIteratorObj iter) implements SafeCloseable { @Override public void close() { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksIteratorTuple.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksIteratorTuple.java deleted file mode 100644 index 600b927..0000000 --- a/src/main/java/it/cavallium/dbengine/database/disk/RocksIteratorTuple.java +++ /dev/null @@ -1,23 +0,0 @@ -package it.cavallium.dbengine.database.disk; - -import it.cavallium.dbengine.database.SafeCloseable; -import java.util.List; -import org.jetbrains.annotations.NotNull; -import org.rocksdb.AbstractImmutableNativeReference; -import org.rocksdb.ReadOptions; -import org.rocksdb.RocksIterator; -import org.rocksdb.RocksObject; - -public record RocksIteratorTuple(@NotNull RocksDBIterator iterator, - @NotNull ReleasableSlice sliceMin, - @NotNull ReleasableSlice sliceMax, - @NotNull SafeCloseable seekTo) implements SafeCloseable { - - @Override - public void close() { - iterator.close(); - sliceMin.close(); - sliceMax.close(); - seekTo.close(); - } -} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java index c455c23..7d5ff71 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java @@ -7,6 +7,7 @@ import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.BufferAllocator; import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import java.io.IOException; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.StampedLock; @@ -16,6 +17,7 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.Transaction; +import org.rocksdb.TransactionOptions; import org.rocksdb.WriteOptions; public final class StandardRocksDBColumn extends AbstractRocksDBColumn { @@ -24,23 +26,26 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn boolean nettyDirect, BufferAllocator alloc, String dbName, - ColumnFamilyHandle cfh, MeterRegistry meterRegistry, StampedLock closeLock) { + RocksObj cfh, + MeterRegistry meterRegistry, + StampedLock closeLock) { super(db, nettyDirect, alloc, dbName, cfh, meterRegistry, closeLock); } @Override - protected boolean commitOptimistically(Transaction tx) { + protected boolean commitOptimistically(RocksObj tx) { throw new UnsupportedOperationException("Transactions not supported"); } @Override - protected Transaction beginTransaction(@NotNull WriteOptions writeOptions) { + protected RocksObj beginTransaction(@NotNull RocksObj writeOptions, + RocksObj txOpts) { throw new UnsupportedOperationException("Transactions not supported"); } @Override - public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull ReadOptions readOptions, - @NotNull WriteOptions writeOptions, + public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull RocksObj readOptions, + @NotNull RocksObj writeOptions, Buffer key, BinarySerializationFunction updater, UpdateAtomicResultMode returnMode) throws IOException { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLAbstractSlice.java b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLAbstractSlice.java new file mode 100644 index 0000000..8e06dd5 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLAbstractSlice.java @@ -0,0 +1,58 @@ +package it.cavallium.dbengine.database.disk.rocksdb; + +import io.netty5.buffer.api.Drop; +import io.netty5.buffer.api.Owned; +import io.netty5.buffer.api.internal.ResourceSupport; +import org.rocksdb.AbstractSlice; +import org.rocksdb.DirectSlice; + +public abstract class LLAbstractSlice, U> extends ResourceSupport, LLAbstractSlice> { + + protected static final Drop> DROP = new Drop<>() { + @Override + public void drop(LLAbstractSlice obj) { + if (obj.val != null) { + obj.val.close(); + } + } + + @Override + public Drop> fork() { + return this; + } + + @Override + public void attach(LLAbstractSlice obj) { + + } + }; + + private T val; + + public LLAbstractSlice(T val) { + //noinspection unchecked + super((Drop>) (Drop) DROP); + this.val = val; + } + + public T getNative() { + return val; + } + + @Override + protected final void makeInaccessible() { + this.val = null; + } + + @Override + protected final Owned> prepareSend() { + var val = this.val; + return drop -> { + var instance = createInstance(val); + drop.attach(instance); + return instance; + }; + } + + protected abstract LLAbstractSlice createInstance(T val); +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLColumnFamilyHandle.java b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLColumnFamilyHandle.java new file mode 100644 index 0000000..c1137d9 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLColumnFamilyHandle.java @@ -0,0 +1,60 @@ +package it.cavallium.dbengine.database.disk.rocksdb; + +import io.netty5.buffer.api.Drop; +import io.netty5.buffer.api.Owned; +import io.netty5.buffer.api.internal.ResourceSupport; +import org.rocksdb.AbstractSlice; +import org.rocksdb.ColumnFamilyHandle; + +public final class LLColumnFamilyHandle extends ResourceSupport { + + private static final Drop DROP = new Drop<>() { + @Override + public void drop(LLColumnFamilyHandle obj) { + if (obj.val != null) { + obj.val.close(); + } + } + + @Override + public Drop fork() { + return this; + } + + @Override + public void attach(LLColumnFamilyHandle obj) { + + } + }; + + private ColumnFamilyHandle val; + + public LLColumnFamilyHandle(ColumnFamilyHandle val) { + super(DROP); + this.val = val; + } + + public ColumnFamilyHandle getNative() { + return val; + } + + @Override + protected RuntimeException createResourceClosedException() { + return new IllegalStateException("Closed"); + } + + @Override + protected void makeInaccessible() { + this.val = null; + } + + @Override + protected Owned prepareSend() { + var val = this.val; + return drop -> { + var instance = new LLColumnFamilyHandle(val); + drop.attach(instance); + return instance; + }; + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLCompactionOptions.java b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLCompactionOptions.java new file mode 100644 index 0000000..223eca2 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLCompactionOptions.java @@ -0,0 +1,59 @@ +package it.cavallium.dbengine.database.disk.rocksdb; + +import io.netty5.buffer.api.Drop; +import io.netty5.buffer.api.Owned; +import io.netty5.buffer.api.internal.ResourceSupport; +import org.rocksdb.CompactionOptions; + +public final class LLCompactionOptions extends ResourceSupport { + + private static final Drop DROP = new Drop<>() { + @Override + public void drop(LLCompactionOptions obj) { + if (obj.val != null) { + obj.val.close(); + } + } + + @Override + public Drop fork() { + return this; + } + + @Override + public void attach(LLCompactionOptions obj) { + + } + }; + + private CompactionOptions val; + + public LLCompactionOptions(CompactionOptions val) { + super(DROP); + this.val = val; + } + + public CompactionOptions getNative() { + return val; + } + + @Override + protected RuntimeException createResourceClosedException() { + return new IllegalStateException("Closed"); + } + + @Override + protected void makeInaccessible() { + this.val = null; + } + + @Override + protected Owned prepareSend() { + var val = this.val; + return drop -> { + var instance = new LLCompactionOptions(val); + drop.attach(instance); + return instance; + }; + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLDirectSlice.java b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLDirectSlice.java new file mode 100644 index 0000000..9b57917 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLDirectSlice.java @@ -0,0 +1,29 @@ +package it.cavallium.dbengine.database.disk.rocksdb; + +import io.netty5.buffer.api.Drop; +import io.netty5.buffer.api.Owned; +import io.netty5.buffer.api.internal.ResourceSupport; +import java.nio.ByteBuffer; +import org.rocksdb.AbstractSlice; +import org.rocksdb.DirectSlice; + +public final class LLDirectSlice extends LLAbstractSlice { + + public LLDirectSlice(DirectSlice val) { + super(val); + } + + public DirectSlice getNative() { + return super.getNative(); + } + + @Override + protected LLAbstractSlice createInstance(DirectSlice val) { + return new LLDirectSlice(val); + } + + @Override + protected RuntimeException createResourceClosedException() { + return new IllegalStateException("Closed"); + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLReadOptions.java b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLReadOptions.java new file mode 100644 index 0000000..bf94d1a --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLReadOptions.java @@ -0,0 +1,60 @@ +package it.cavallium.dbengine.database.disk.rocksdb; + +import io.netty5.buffer.api.Buffer; +import io.netty5.buffer.api.Drop; +import io.netty5.buffer.api.Owned; +import io.netty5.buffer.api.Send; +import io.netty5.buffer.api.internal.ResourceSupport; +import it.cavallium.dbengine.database.LLDelta; +import it.cavallium.dbengine.database.SafeCloseable; +import it.cavallium.dbengine.database.disk.LLLocalGroupedReactiveRocksIterator; +import org.rocksdb.ReadOptions; + +public final class LLReadOptions extends ResourceSupport { + + private static final Drop DROP = new Drop<>() { + @Override + public void drop(LLReadOptions obj) { + if (obj.val != null) { + obj.val.close(); + } + } + + @Override + public Drop fork() { + return this; + } + + @Override + public void attach(LLReadOptions obj) { + + } + }; + + private ReadOptions val; + + public LLReadOptions(ReadOptions val) { + super(DROP); + this.val = val; + } + + @Override + protected RuntimeException createResourceClosedException() { + return new IllegalStateException("Closed"); + } + + @Override + protected void makeInaccessible() { + this.val = null; + } + + @Override + protected Owned prepareSend() { + var val = this.val; + return drop -> { + var instance = new LLReadOptions(val); + drop.attach(instance); + return instance; + }; + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLWriteOptions.java b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLWriteOptions.java new file mode 100644 index 0000000..86585a1 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLWriteOptions.java @@ -0,0 +1,55 @@ +package it.cavallium.dbengine.database.disk.rocksdb; + +import io.netty5.buffer.api.Drop; +import io.netty5.buffer.api.Owned; +import io.netty5.buffer.api.internal.ResourceSupport; +import org.rocksdb.WriteOptions; + +public final class LLWriteOptions extends ResourceSupport { + + private static final Drop DROP = new Drop<>() { + @Override + public void drop(LLWriteOptions obj) { + if (obj.val != null) { + obj.val.close(); + } + } + + @Override + public Drop fork() { + return this; + } + + @Override + public void attach(LLWriteOptions obj) { + + } + }; + + private WriteOptions val; + + public LLWriteOptions(WriteOptions val) { + super(DROP); + this.val = val; + } + + @Override + protected RuntimeException createResourceClosedException() { + return new IllegalStateException("Closed"); + } + + @Override + protected void makeInaccessible() { + this.val = null; + } + + @Override + protected Owned prepareSend() { + var val = this.val; + return drop -> { + var instance = new LLWriteOptions(val); + drop.attach(instance); + return instance; + }; + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/RocksIteratorObj.java similarity index 51% rename from src/main/java/it/cavallium/dbengine/database/disk/RocksDBIterator.java rename to src/main/java/it/cavallium/dbengine/database/disk/rocksdb/RocksIteratorObj.java index 0a82ecb..f012166 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/RocksIteratorObj.java @@ -1,21 +1,52 @@ -package it.cavallium.dbengine.database.disk; +package it.cavallium.dbengine.database.disk.rocksdb; import static it.cavallium.dbengine.database.LLUtils.isReadOnlyDirect; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Timer; import io.netty5.buffer.api.Buffer; +import io.netty5.buffer.api.Drop; +import io.netty5.buffer.api.Owned; import io.netty5.buffer.api.ReadableComponent; +import io.netty5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.database.LLUtils; -import it.cavallium.dbengine.database.SafeCloseable; import java.nio.ByteBuffer; -import org.jetbrains.annotations.Nullable; +import org.rocksdb.AbstractSlice; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; -public class RocksDBIterator implements SafeCloseable { +public class RocksIteratorObj extends ResourceSupport { - private final RocksIterator rocksIterator; + protected static final Drop DROP = new Drop<>() { + @Override + public void drop(RocksIteratorObj obj) { + if (obj.rocksIterator != null) { + obj.rocksIterator.close(); + } + if (obj.sliceMin != null) { + obj.sliceMin.close(); + } + if (obj.sliceMax != null) { + obj.sliceMax.close(); + } + } + + @Override + public Drop fork() { + return this; + } + + @Override + public void attach(RocksIteratorObj obj) { + + } + }; + + private RocksIterator rocksIterator; + private RocksObj> sliceMin; + private RocksObj> sliceMax; + private Buffer min; + private Buffer max; private final boolean allowNettyDirect; private final Counter startedIterSeek; private final Counter endedIterSeek; @@ -23,8 +54,14 @@ public class RocksDBIterator implements SafeCloseable { private final Counter startedIterNext; private final Counter endedIterNext; private final Timer iterNextTime; + private Object seekingFrom; + private Object seekingTo; - public RocksDBIterator(RocksIterator rocksIterator, + public RocksIteratorObj(RocksIterator rocksIterator, + RocksObj> sliceMin, + RocksObj> sliceMax, + Buffer min, + Buffer max, boolean allowNettyDirect, Counter startedIterSeek, Counter endedIterSeek, @@ -32,6 +69,42 @@ public class RocksDBIterator implements SafeCloseable { Counter startedIterNext, Counter endedIterNext, Timer iterNextTime) { + this(rocksIterator, + sliceMin, + sliceMax, + min, + max, + allowNettyDirect, + startedIterSeek, + endedIterSeek, + iterSeekTime, + startedIterNext, + endedIterNext, + iterNextTime, + null, + null + ); + } + + private RocksIteratorObj(RocksIterator rocksIterator, + RocksObj> sliceMin, + RocksObj> sliceMax, + Buffer min, + Buffer max, + boolean allowNettyDirect, + Counter startedIterSeek, + Counter endedIterSeek, + Timer iterSeekTime, + Counter startedIterNext, + Counter endedIterNext, + Timer iterNextTime, + Object seekingFrom, + Object seekingTo) { + super(DROP); + this.sliceMin = sliceMin; + this.sliceMax = sliceMax; + this.min = min; + this.max = max; this.rocksIterator = rocksIterator; this.allowNettyDirect = allowNettyDirect; this.startedIterSeek = startedIterSeek; @@ -40,11 +113,8 @@ public class RocksDBIterator implements SafeCloseable { this.startedIterNext = startedIterNext; this.endedIterNext = endedIterNext; this.iterNextTime = iterNextTime; - } - - @Override - public void close() { - rocksIterator.close(); + this.seekingFrom = seekingFrom; + this.seekingTo = seekingTo; } public void seek(ByteBuffer seekBuf) throws RocksDBException { @@ -90,25 +160,25 @@ public class RocksDBIterator implements SafeCloseable { /** * Useful for reverse iterations */ - @Nullable - public SafeCloseable seekFrom(Buffer key) { + public void seekFrom(Buffer key) { if (allowNettyDirect && isReadOnlyDirect(key)) { ByteBuffer keyInternalByteBuffer = ((ReadableComponent) key).readableBuffer(); assert keyInternalByteBuffer.position() == 0; rocksIterator.seekForPrev(keyInternalByteBuffer); // This is useful to retain the key buffer in memory and avoid deallocations - return key::isAccessible; + this.seekingFrom = key; } else { - rocksIterator.seekForPrev(LLUtils.toArray(key)); - return null; + var keyArray = LLUtils.toArray(key); + rocksIterator.seekForPrev(keyArray); + // This is useful to retain the key buffer in memory and avoid deallocations + this.seekingFrom = keyArray; } } /** * Useful for forward iterations */ - @Nullable - public SafeCloseable seekTo(Buffer key) { + public void seekTo(Buffer key) { if (allowNettyDirect && isReadOnlyDirect(key)) { ByteBuffer keyInternalByteBuffer = ((ReadableComponent) key).readableBuffer(); assert keyInternalByteBuffer.position() == 0; @@ -116,13 +186,14 @@ public class RocksDBIterator implements SafeCloseable { iterSeekTime.record(() -> rocksIterator.seek(keyInternalByteBuffer)); endedIterSeek.increment(); // This is useful to retain the key buffer in memory and avoid deallocations - return key::isAccessible; + this.seekingTo = key; } else { - var array = LLUtils.toArray(key); + var keyArray = LLUtils.toArray(key); startedIterSeek.increment(); - iterSeekTime.record(() -> rocksIterator.seek(array)); + iterSeekTime.record(() -> rocksIterator.seek(keyArray)); endedIterSeek.increment(); - return null; + // This is useful to retain the key buffer in memory and avoid deallocations + this.seekingTo = keyArray; } } @@ -173,4 +244,50 @@ public class RocksDBIterator implements SafeCloseable { rocksIterator.prev(); } } + + @Override + protected void makeInaccessible() { + this.rocksIterator = null; + this.sliceMin = null; + this.sliceMax = null; + this.min = null; + this.max = null; + this.seekingFrom = null; + this.seekingTo = null; + } + + @Override + protected RuntimeException createResourceClosedException() { + return new IllegalStateException("Closed"); + } + + @Override + protected Owned prepareSend() { + var rocksIterator = this.rocksIterator; + var sliceMin = this.sliceMin; + var sliceMax = this.sliceMax; + var minSend = this.min != null ? this.min.send() : null; + var maxSend = this.max != null ? this.max.send() : null; + var seekingFrom = this.seekingFrom; + var seekingTo = this.seekingTo; + return drop -> { + var instance = new RocksIteratorObj(rocksIterator, + sliceMin, + sliceMax, + minSend != null ? minSend.receive() : null, + maxSend != null ? maxSend.receive() : null, + allowNettyDirect, + startedIterSeek, + endedIterSeek, + iterSeekTime, + startedIterNext, + endedIterNext, + iterNextTime, + seekingFrom, + seekingTo + ); + drop.attach(instance); + return instance; + }; + } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/RocksObj.java b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/RocksObj.java new file mode 100644 index 0000000..41a9ddc --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/RocksObj.java @@ -0,0 +1,62 @@ +package it.cavallium.dbengine.database.disk.rocksdb; + +import io.netty5.buffer.api.Drop; +import io.netty5.buffer.api.Owned; +import io.netty5.buffer.api.internal.ResourceSupport; +import org.jetbrains.annotations.NotNull; +import org.rocksdb.AbstractNativeReference; + +public class RocksObj extends ResourceSupport, RocksObj> { + + private static final Drop> DROP = new Drop<>() { + @Override + public void drop(RocksObj obj) { + if (obj.val != null) { + obj.val.close(); + } + } + + @Override + public Drop> fork() { + return this; + } + + @Override + public void attach(RocksObj obj) { + + } + }; + + private T val; + + public RocksObj(T val) { + //noinspection unchecked + super((Drop>) (Drop) DROP); + this.val = val; + } + + @NotNull + public T v() { + return val; + } + + @Override + protected void makeInaccessible() { + this.val = null; + } + + @Override + protected RuntimeException createResourceClosedException() { + return new IllegalStateException("Closed"); + } + + @Override + protected Owned> prepareSend() { + var val = this.val; + return drop -> { + var instance = new RocksObj<>(val); + drop.attach(instance); + return instance; + }; + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/HugePqArray.java b/src/main/java/it/cavallium/dbengine/lucene/HugePqArray.java index dd97940..d1783fb 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/HugePqArray.java +++ b/src/main/java/it/cavallium/dbengine/lucene/HugePqArray.java @@ -6,6 +6,7 @@ import it.cavallium.dbengine.database.SafeCloseable; import it.cavallium.dbengine.database.disk.LLTempHugePqEnv; import it.cavallium.dbengine.database.disk.HugePqEnv; import it.cavallium.dbengine.database.disk.StandardRocksDBColumn; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import java.util.concurrent.atomic.AtomicBoolean; import org.jetbrains.annotations.Nullable; import org.rocksdb.ReadOptions; @@ -25,8 +26,6 @@ public class HugePqArray implements IArray, SafeCloseable { private final HugePqEnv env; private final int hugePqId; private final StandardRocksDBColumn rocksDB; - private static final WriteOptions WRITE_OPTIONS = new WriteOptions().setDisableWAL(true).setSync(false); - private static final ReadOptions READ_OPTIONS = new ReadOptions().setVerifyChecksums(false); private final V defaultValue; private final long virtualSize; @@ -42,6 +41,14 @@ public class HugePqArray implements IArray, SafeCloseable { this.virtualSize = size; } + private static RocksObj newReadOptions() { + return new RocksObj<>(new ReadOptions().setVerifyChecksums(false)); + } + + private static RocksObj newWriteOptions() { + return new RocksObj<>(new WriteOptions().setDisableWAL(true).setSync(false)); + } + public HugePqCodec getValueCodec() { return valueCodec; } @@ -59,9 +66,10 @@ public class HugePqArray implements IArray, SafeCloseable { ensureBounds(index); ensureThread(); var keyBuf = allocate(Long.BYTES); - try (var valueBuf = valueCodec.serialize(this::allocate, value); keyBuf) { + try (var writeOptions = newWriteOptions(); + var valueBuf = valueCodec.serialize(this::allocate, value); keyBuf) { keyBuf.writeLong(index); - rocksDB.put(WRITE_OPTIONS, keyBuf, valueBuf); + rocksDB.put(writeOptions, keyBuf, valueBuf); } catch (RocksDBException e) { throw new IllegalStateException(e); } @@ -71,10 +79,10 @@ public class HugePqArray implements IArray, SafeCloseable { public void reset(long index) { ensureBounds(index); ensureThread(); - var keyBuf = allocate(Long.BYTES); - try (keyBuf) { + try (var writeOptions = newWriteOptions(); + var keyBuf = allocate(Long.BYTES)) { keyBuf.writeLong(index); - rocksDB.delete(WRITE_OPTIONS, keyBuf); + rocksDB.delete(writeOptions, keyBuf); } catch (RocksDBException e) { throw new IllegalStateException(e); } @@ -88,7 +96,8 @@ public class HugePqArray implements IArray, SafeCloseable { var keyBuf = allocate(Long.BYTES); try (keyBuf) { keyBuf.writeLong(index); - try (var value = rocksDB.get(READ_OPTIONS, keyBuf)) { + try (var readOptions = newReadOptions(); + var value = rocksDB.get(readOptions, keyBuf)) { if (value == null) { return null; } diff --git a/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java b/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java index 51aa3d1..47b12b8 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java +++ b/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java @@ -6,10 +6,12 @@ import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.SafeCloseable; import it.cavallium.dbengine.database.disk.LLTempHugePqEnv; import it.cavallium.dbengine.database.disk.HugePqEnv; -import it.cavallium.dbengine.database.disk.RocksIteratorTuple; +import it.cavallium.dbengine.database.disk.RocksIterWithReadOpts; import it.cavallium.dbengine.database.disk.StandardRocksDBColumn; import it.cavallium.dbengine.database.disk.UpdateAtomicResultMode; import it.cavallium.dbengine.database.disk.UpdateAtomicResultPrevious; +import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj; +import it.cavallium.dbengine.database.disk.rocksdb.RocksObj; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -37,8 +39,6 @@ public class HugePqPriorityQueue implements PriorityQueue, Reversable codec; private long size = 0; @@ -51,6 +51,14 @@ public class HugePqPriorityQueue implements PriorityQueue, Reversable newReadOptions() { + return new RocksObj<>(new ReadOptions().setVerifyChecksums(false)); + } + + private static RocksObj newWriteOptions() { + return new RocksObj<>(new WriteOptions().setDisableWAL(true).setSync(false)); + } + private Buffer allocate(int size) { return rocksDB.getAllocator().allocate(size); } @@ -69,7 +77,9 @@ public class HugePqPriorityQueue implements PriorityQueue, Reversable implements PriorityQueue, Reversable implements PriorityQueue, Reversable implements PriorityQueue, Reversable implements PriorityQueue, Reversable implements PriorityQueue, Reversable iterate(long skips, boolean reverse) { - return Flux., RocksIteratorTuple>generate(() -> { - var it = rocksDB.newRocksIterator(true, READ_OPTIONS, LLRange.all(), reverse); - var rocksIterator = it.iterator(); + return Flux., RocksIterWithReadOpts>generate(() -> { + var readOptions = newReadOptions(); + var rocksIterator = rocksDB.newRocksIterator(true, readOptions, LLRange.all(), reverse); if (reverse) { rocksIterator.seekToLast(); } else { @@ -249,9 +260,9 @@ public class HugePqPriorityQueue implements PriorityQueue, Reversable { - var rocksIterator = itT.iterator(); + return new RocksIterWithReadOpts(readOptions, rocksIterator); + }, (t, sink) -> { + var rocksIterator = t.iter(); if (rocksIterator.isValid()) { try (var keyBuf = rocksDB.getAllocator().copyOf(rocksIterator.key()); var valBuf = rocksDB.getAllocator().copyOf(rocksIterator.value())) { @@ -284,8 +295,8 @@ public class HugePqPriorityQueue implements PriorityQueue, Reversable item); + return t; + }, RocksIterWithReadOpts::close).concatMapIterable(item -> item); } @Override diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java index 64174da..1f66aa2 100644 --- a/src/main/java/module-info.java +++ b/src/main/java/module-info.java @@ -18,6 +18,7 @@ module dbengine { opens it.cavallium.dbengine.database.remote; exports it.cavallium.dbengine; exports it.cavallium.dbengine.utils; + exports it.cavallium.dbengine.database.disk.rocksdb; requires org.jetbrains.annotations; requires reactor.core; requires com.google.common;