diff --git a/pom.xml b/pom.xml index e2c3ad5..1886f31 100644 --- a/pom.xml +++ b/pom.xml @@ -377,7 +377,7 @@ org.rocksdb rocksdbjni - 6.25.3 + 6.26.1 org.apache.lucene @@ -538,8 +538,11 @@ io.soabase.recordbuilder.processor.RecordBuilderProcessor false - --enable-preview + + --enable-preview --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED + --add-opens=java.base/java.lang=ALL-UNNAMED + --add-opens=java.base/java.nio=ALL-UNNAMED 17 17 @@ -575,7 +578,7 @@ - --enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --enable-native-access=ALL-UNNAMED + --enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --enable-native-access=ALL-UNNAMED ci diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 29cee39..c793f76 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -1,14 +1,22 @@ package it.cavallium.dbengine.database; +import static io.net5.buffer.api.StandardAllocationTypes.OFF_HEAP; import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; +import io.net5.buffer.api.AllocatorControl; import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.CompositeBuffer; +import io.net5.buffer.api.MemoryManager; +import io.net5.buffer.api.ReadableComponent; import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; +import io.net5.buffer.api.WritableComponent; +import io.net5.buffer.api.bytebuffer.ByteBufferMemoryManager; +import io.net5.buffer.api.internal.Statics; +import io.net5.buffer.api.unsafe.UnsafeMemoryManager; import io.net5.util.IllegalReferenceCountException; import io.net5.util.internal.PlatformDependent; import it.cavallium.dbengine.database.collections.DatabaseStage; @@ -27,6 +35,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.ToIntFunction; import org.apache.lucene.document.Document; @@ -65,7 +74,11 @@ public class LLUtils { public static final Marker MARKER_ROCKSDB = MarkerFactory.getMarker("ROCKSDB"); public static final Marker MARKER_LUCENE = MarkerFactory.getMarker("LUCENE"); - private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocateDirect(0); + public static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096; + public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocateDirect(0).asReadOnlyBuffer(); + @Nullable + private static final MemoryManager UNSAFE_MEMORY_MANAGER; + private static final AllocatorControl NO_OP_ALLOCATION_CONTROL = (AllocatorControl) BufferAllocator.offHeapUnpooled(); private static final byte[] RESPONSE_TRUE = new byte[]{1}; private static final byte[] RESPONSE_FALSE = new byte[]{0}; private static final byte[] RESPONSE_TRUE_BUF = new byte[]{1}; @@ -73,6 +86,13 @@ public class LLUtils { public static final byte[][] LEXICONOGRAPHIC_ITERATION_SEEKS = new byte[256][1]; static { + MemoryManager unsafeMemoryManager; + try { + unsafeMemoryManager = new UnsafeMemoryManager(); + } catch (UnsupportedOperationException ignored) { + unsafeMemoryManager = new ByteBufferMemoryManager(); + } + UNSAFE_MEMORY_MANAGER = unsafeMemoryManager; for (int i1 = 0; i1 < 256; i1++) { var b = LEXICONOGRAPHIC_ITERATION_SEEKS[i1]; b[0] = (byte) i1; @@ -420,47 +440,36 @@ public class LLUtils { @SuppressWarnings("ConstantConditions") @Nullable public static Buffer readNullableDirectNioBuffer(BufferAllocator alloc, ToIntFunction reader) { - ByteBuffer directBuffer; - Buffer buffer; - { - var direct = LLUtils.newDirect(alloc, 4096); - directBuffer = direct.byteBuffer(); - buffer = direct.buffer().receive(); - } + var directBuffer = LLUtils.allocateShared(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES); + assert directBuffer.readerOffset() == 0; + assert directBuffer.writerOffset() == 0; + var directBufferWriter = ((WritableComponent) directBuffer).writableBuffer(); + assert directBufferWriter.position() == 0; + assert directBufferWriter.isDirect(); try { - int size; - do { - directBuffer.limit(directBuffer.capacity()); - assert directBuffer.isDirect(); - size = reader.applyAsInt(directBuffer); - if (size != RocksDB.NOT_FOUND) { - if (size == directBuffer.limit()) { - buffer.readerOffset(0).writerOffset(size); - return buffer; - } else { - assert size > directBuffer.limit(); - assert directBuffer.limit() > 0; - // Free the buffer - if (directBuffer != null) { - // todo: check if free is needed - PlatformDependent.freeDirectBuffer(directBuffer); - directBuffer = null; - } - directBuffer = LLUtils.obtainDirect(buffer, true); - buffer.ensureWritable(size); - } - } - } while (size != RocksDB.NOT_FOUND); - - // Return null if size is equal to RocksDB.NOT_FOUND - return null; - } finally { - // Free the buffer - if (directBuffer != null) { - // todo: check if free is needed - PlatformDependent.freeDirectBuffer(directBuffer); - directBuffer = null; + int trueSize = reader.applyAsInt(directBufferWriter); + if (trueSize == RocksDB.NOT_FOUND) { + directBuffer.close(); + return null; } + int readSize = directBufferWriter.limit(); + if (trueSize < readSize) { + throw new IllegalStateException(); + } else if (trueSize == readSize) { + return directBuffer.writerOffset(directBufferWriter.limit()); + } else { + assert directBuffer.readerOffset() == 0; + directBuffer.ensureWritable(trueSize); + assert directBuffer.writerOffset() == 0; + directBufferWriter = ((WritableComponent) directBuffer).writableBuffer(); + assert directBufferWriter.position() == 0; + assert directBufferWriter.isDirect(); + reader.applyAsInt(directBufferWriter); + return directBuffer.writerOffset(trueSize); + } + } catch (Throwable t) { + directBuffer.close(); + throw t; } } @@ -614,86 +623,63 @@ public class LLUtils { } } - public static record DirectBuffer(@NotNull Send buffer, @NotNull ByteBuffer byteBuffer) {} + @Deprecated + public record DirectBuffer(@NotNull Buffer buffer, @NotNull ByteBuffer byteBuffer) {} @NotNull - public static DirectBuffer newDirect(BufferAllocator allocator, int size) { - try (var buf = allocator.allocate(size)) { - var direct = obtainDirect(buf, true); - return new DirectBuffer(buf.send(), direct); - } + public static ByteBuffer newDirect(int size) { + return ByteBuffer.allocateDirect(size); } - @NotNull - public static DirectBuffer convertToReadableDirect(BufferAllocator allocator, Send content) { - try (var buf = content.receive()) { - DirectBuffer result; - if (buf.countComponents() == 1) { - var direct = obtainDirect(buf, false); - result = new DirectBuffer(buf.send(), direct); - } else { - var direct = newDirect(allocator, buf.readableBytes()); - try (var buf2 = direct.buffer().receive()) { - buf.copyInto(buf.readerOffset(), buf2, buf2.writerOffset(), buf.readableBytes()); - buf2.writerOffset(buf2.writerOffset() + buf.readableBytes()); - assert buf2.readableBytes() == buf.readableBytes(); - result = new DirectBuffer(buf2.send(), direct.byteBuffer()); - } - } - return result; - } + /** + * The returned object will be also of type {@link WritableComponent} {@link ReadableComponent} + */ + public static Buffer allocateShared(int size) { + return LLUtils.UNSAFE_MEMORY_MANAGER.allocateShared(NO_OP_ALLOCATION_CONTROL, size, Statics.NO_OP_DROP, OFF_HEAP); } + /** + * Get the internal byte buffer, if present + */ + @Nullable + public static ByteBuffer asReadOnlyDirect(Buffer inputBuffer) { + var bytes = inputBuffer.readableBytes(); + if (inputBuffer instanceof ReadableComponent rc) { + var componentBuffer = rc.readableBuffer(); + if (componentBuffer != null && componentBuffer.isDirect()) { + assert componentBuffer.isReadOnly(); + assert componentBuffer.isDirect(); + return componentBuffer; + } + } else if (inputBuffer.countReadableComponents() == 1) { + AtomicReference bufferRef = new AtomicReference<>(); + inputBuffer.forEachReadable(0, (index, comp) -> { + var compBuffer = comp.readableBuffer(); + if (compBuffer != null && compBuffer.isDirect()) { + bufferRef.setPlain(compBuffer); + } + return false; + }); + var buffer = bufferRef.getPlain(); + if (buffer != null) { + assert buffer.isReadOnly(); + assert buffer.isDirect(); + return buffer; + } + } + + return null; + } + + /** + * Copy the buffer into a newly allocated direct buffer + */ @NotNull - public static ByteBuffer obtainDirect(Buffer buffer, boolean writable) { - if (!PlatformDependent.hasUnsafe()) { - throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers", - PlatformDependent.getUnsafeUnavailabilityCause() - ); - } - if (!MemorySegmentUtils.isSupported()) { - throw new UnsupportedOperationException("Foreign Memory Access API support is disabled." - + " Please set \"" + MemorySegmentUtils.getSuggestedArgs() + "\"", - MemorySegmentUtils.getUnsupportedCause() - ); - } - assert buffer.isAccessible(); - if (buffer.readOnly()) { - throw new IllegalStateException("Buffer is read only"); - } - buffer.compact(); - assert buffer.readerOffset() == 0; - AtomicLong nativeAddress = new AtomicLong(0); - if (buffer.countComponents() == 1) { - if (writable) { - if (buffer.countWritableComponents() == 1) { - buffer.forEachWritable(0, (i, c) -> { - assert c.writableNativeAddress() != 0; - nativeAddress.setPlain(c.writableNativeAddress()); - return false; - }); - } - } else { - var readableComponents = buffer.countReadableComponents(); - if (readableComponents == 1) { - buffer.forEachReadable(0, (i, c) -> { - assert c.readableNativeAddress() != 0; - nativeAddress.setPlain(c.readableNativeAddress()); - return false; - }); - } - } - } - if (nativeAddress.getPlain() == 0) { - if (buffer.capacity() == 0) { - return EMPTY_BYTE_BUFFER; - } - if (!buffer.isAccessible()) { - throw new IllegalStateException("Buffer is not accessible"); - } - throw new IllegalStateException("Buffer is not direct"); - } - return MemorySegmentUtils.directBuffer(nativeAddress.getPlain(), writable ? buffer.capacity() : buffer.writerOffset()); + public static ByteBuffer copyToNewDirectBuffer(Buffer inputBuffer) { + int bytes = inputBuffer.readableBytes(); + var directBuffer = ByteBuffer.allocateDirect(bytes); + inputBuffer.copyInto(inputBuffer.readerOffset(), directBuffer, 0, bytes); + return directBuffer.asReadOnlyBuffer(); } public static Buffer fromByteArray(BufferAllocator alloc, byte[] array) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index be54afd..19c1682 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -1,13 +1,22 @@ package it.cavallium.dbengine.database.disk; +import static io.net5.buffer.api.StandardAllocationTypes.OFF_HEAP; +import static it.cavallium.dbengine.database.LLUtils.INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES; import static java.util.Objects.requireNonNull; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; +import io.net5.buffer.api.AllocationType; +import io.net5.buffer.api.AllocatorControl; import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; +import io.net5.buffer.api.CompositeBuffer; +import io.net5.buffer.api.MemoryManager; import io.net5.buffer.api.Send; import io.net5.buffer.api.StandardAllocationTypes; +import io.net5.buffer.api.WritableComponent; +import io.net5.buffer.api.internal.Statics; +import io.net5.buffer.api.unsafe.UnsafeMemoryManager; import io.net5.util.internal.PlatformDependent; import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.database.LLUtils; @@ -20,8 +29,10 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.CompactRangeOptions; +import org.rocksdb.FileOperationInfo; import org.rocksdb.FlushOptions; import org.rocksdb.Holder; +import org.rocksdb.KeyMayExistWorkaround; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -32,11 +43,11 @@ import org.rocksdb.WriteOptions; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; import reactor.core.scheduler.Schedulers; +import sun.misc.Unsafe; public sealed abstract class AbstractRocksDBColumn implements RocksDBColumn permits StandardRocksDBColumn, OptimisticRocksDBColumn, PessimisticRocksDBColumn { - private static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096; private static final byte[] NO_DATA = new byte[0]; protected static final UpdateAtomicResult RESULT_NOTHING = new UpdateAtomicResultNothing(); @@ -58,7 +69,7 @@ public sealed abstract class AbstractRocksDBColumn implements MeterRegistry meterRegistry) { this.db = db; this.opts = databaseOptions; - this.nettyDirect = opts.allowNettyDirect() && alloc.getAllocationType() == StandardAllocationTypes.OFF_HEAP; + this.nettyDirect = opts.allowNettyDirect() && alloc.getAllocationType() == OFF_HEAP; this.alloc = alloc; this.cfh = cfh; @@ -82,147 +93,179 @@ public sealed abstract class AbstractRocksDBColumn implements } @Override - public @Nullable Send get(@NotNull ReadOptions readOptions, - Send keySend, - boolean existsAlmostCertainly) throws RocksDBException { - try (var key = keySend.receive()) { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called dbGet in a nonblocking thread"); - } - if (!db.isOwningHandle()) { - throw new IllegalStateException("Database is closed"); - } - if (!readOptions.isOwningHandle()) { - throw new IllegalStateException("ReadOptions is closed"); - } - if (!cfh.isOwningHandle()) { - throw new IllegalStateException("Column family is closed"); - } - if (nettyDirect) { - - //todo: implement keyMayExist if existsAlmostCertainly is false. - // Unfortunately it's not feasible until RocksDB implements keyMayExist with buffers - - // Create the key nio buffer to pass to RocksDB - var keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send()); - // Create a direct result buffer because RocksDB works only with direct buffers - try (Buffer resultBuf = alloc.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES)) { - int valueSize; - ByteBuffer resultNioBuf; - do { - // Create the result nio buffer to pass to RocksDB - resultNioBuf = LLUtils.obtainDirect(resultBuf, true); - assert keyNioBuffer.byteBuffer().isDirect(); - assert resultNioBuf.isDirect(); - // todo: use keyMayExist when rocksdb will implement keyMayExist with buffers - valueSize = db.get(cfh, - readOptions, - keyNioBuffer.byteBuffer().position(0), - resultNioBuf - ); - if (valueSize != RocksDB.NOT_FOUND) { - - // todo: check if position is equal to data that have been read - // todo: check if limit is equal to value size or data that have been read - assert valueSize <= 0 || resultNioBuf.limit() > 0; - - // Check if read data is not bigger than the total value size. - // If it's bigger it means that RocksDB is writing the start - // of the result into the result buffer more than once. - assert resultNioBuf.limit() <= valueSize; - - // Update data size metrics - this.lastDataSizeMetric.set(valueSize); - - if (valueSize <= resultNioBuf.limit()) { - // Return the result ready to be read - return resultBuf.readerOffset(0).writerOffset(valueSize).send(); - } else { - //noinspection UnusedAssignment - resultNioBuf = null; - } - // Rewind the keyNioBuf position, making it readable again for the next loop iteration - keyNioBuffer.byteBuffer().rewind(); - if (resultBuf.capacity() < valueSize) { - // Expand the resultBuf size if the result is bigger than the current result - // buffer size - resultBuf.ensureWritable(valueSize); - } - } - // Repeat if the result has been found but it's still not finished - } while (valueSize != RocksDB.NOT_FOUND); - // If the value is not found return null - return null; - } finally { - keyNioBuffer.buffer().close(); - PlatformDependent.freeDirectBuffer(keyNioBuffer.byteBuffer()); - } + public @Nullable Buffer get(@NotNull ReadOptions readOptions, Buffer key, boolean existsAlmostCertainly) + throws RocksDBException { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called dbGet in a nonblocking thread"); + } + if (!db.isOwningHandle()) { + throw new IllegalStateException("Database is closed"); + } + if (!readOptions.isOwningHandle()) { + throw new IllegalStateException("ReadOptions is closed"); + } + if (!cfh.isOwningHandle()) { + throw new IllegalStateException("Column family is closed"); + } + if (nettyDirect) { + // Get the key nio buffer to pass to RocksDB + ByteBuffer keyNioBuffer = LLUtils.asReadOnlyDirect(key); + assert keyNioBuffer.isDirect(); + boolean mustCloseKey; + if (keyNioBuffer == null) { + mustCloseKey = true; + // If the nio buffer is not available, copy the netty buffer into a new direct buffer + keyNioBuffer = LLUtils.copyToNewDirectBuffer(key); } else { + mustCloseKey = false; + } + assert keyNioBuffer.limit() == key.readableBytes(); + try { + // Create a direct result buffer because RocksDB works only with direct buffers + var resultBuffer = LLUtils.allocateShared(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES); try { - byte[] keyArray = LLUtils.toArray(key); - requireNonNull(keyArray); - Holder data = existsAlmostCertainly ? null : new Holder<>(); - if (existsAlmostCertainly || db.keyMayExist(cfh, readOptions, keyArray, data)) { - if (!existsAlmostCertainly && data.getValue() != null) { - return LLUtils.fromByteArray(alloc, data.getValue()).send(); - } else { - byte[] result = db.get(cfh, readOptions, keyArray); - if (result == null) { + assert resultBuffer.readerOffset() == 0; + assert resultBuffer.writerOffset() == 0; + var resultWritable = ((WritableComponent) resultBuffer).writableBuffer(); + + var keyMayExist = db.keyMayExist(cfh, keyNioBuffer, resultWritable); + var keyMayExistState = KeyMayExistWorkaround.getExistenceState(keyMayExist); + int keyMayExistValueLength = KeyMayExistWorkaround.getValueLength(keyMayExist); + // At the beginning, size reflects the expected size, then it becomes the real data size + int size = keyMayExistState == 2 ? keyMayExistValueLength : -1; + switch (keyMayExistState) { + // kNotExist + case 0: { + resultBuffer.close(); + return null; + } + // kExistsWithoutValue + case 1: { + assert keyMayExistValueLength == 0; + resultWritable.clear(); + // real data size + size = db.get(cfh, readOptions, keyNioBuffer, resultWritable); + if (size == RocksDB.NOT_FOUND) { + resultBuffer.close(); return null; - } else { - return LLUtils.fromByteArray(alloc, result).send(); } } + // kExistsWithValue + case 2: { + // real data size + this.lastDataSizeMetric.set(size); + assert size >= 0; + if (size <= resultWritable.limit()) { + assert size == resultWritable.limit(); + return resultBuffer.writerOffset(resultWritable.limit()); + } else { + resultBuffer.ensureWritable(size); + resultWritable = ((WritableComponent) resultBuffer).writableBuffer(); + assert resultBuffer.readerOffset() == 0; + assert resultBuffer.writerOffset() == 0; + + size = db.get(cfh, readOptions, keyNioBuffer, resultWritable); + if (size == RocksDB.NOT_FOUND) { + resultBuffer.close(); + return null; + } + assert size == resultWritable.limit(); + return resultBuffer.writerOffset(resultWritable.limit()); + } + } + default: { + throw new IllegalStateException(); + } + } + } catch (Throwable t) { + resultBuffer.close(); + throw t; + } + } finally { + if (mustCloseKey) { + PlatformDependent.freeDirectBuffer(keyNioBuffer); + } + } + } else { + try { + byte[] keyArray = LLUtils.toArray(key); + requireNonNull(keyArray); + Holder data = existsAlmostCertainly ? null : new Holder<>(); + if (existsAlmostCertainly || db.keyMayExist(cfh, readOptions, keyArray, data)) { + if (!existsAlmostCertainly && data.getValue() != null) { + return LLUtils.fromByteArray(alloc, data.getValue()); } else { - return null; - } - } finally { - if (!(readOptions instanceof UnreleasableReadOptions)) { - readOptions.close(); + byte[] result = db.get(cfh, readOptions, keyArray); + if (result == null) { + return null; + } else { + return LLUtils.fromByteArray(alloc, result); + } } + } else { + return null; + } + } finally { + if (!(readOptions instanceof UnreleasableReadOptions)) { + readOptions.close(); } } } } @Override - public void put(@NotNull WriteOptions writeOptions, Send keyToReceive, - Send valueToReceive) throws RocksDBException { + public void put(@NotNull WriteOptions writeOptions, Buffer key, Buffer value) throws RocksDBException { try { - try (var key = keyToReceive.receive()) { - try (var value = valueToReceive.receive()) { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called dbPut in a nonblocking thread"); - } - if (!db.isOwningHandle()) { - throw new IllegalStateException("Database is closed"); - } - if (!writeOptions.isOwningHandle()) { - throw new IllegalStateException("WriteOptions is closed"); - } - if (!cfh.isOwningHandle()) { - throw new IllegalStateException("Column family is closed"); - } - assert key.isAccessible(); - assert value.isAccessible(); - if (nettyDirect) { - var keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send()); - try (var ignored1 = keyNioBuffer.buffer().receive()) { - assert keyNioBuffer.byteBuffer().isDirect(); - var valueNioBuffer = LLUtils.convertToReadableDirect(alloc, value.send()); - try (var ignored2 = valueNioBuffer.buffer().receive()) { - assert valueNioBuffer.byteBuffer().isDirect(); - db.put(cfh, writeOptions, keyNioBuffer.byteBuffer(), valueNioBuffer.byteBuffer()); - } finally { - PlatformDependent.freeDirectBuffer(valueNioBuffer.byteBuffer()); - } - } finally { - PlatformDependent.freeDirectBuffer(keyNioBuffer.byteBuffer()); - } + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called dbPut in a nonblocking thread"); + } + if (!db.isOwningHandle()) { + throw new IllegalStateException("Database is closed"); + } + if (!writeOptions.isOwningHandle()) { + throw new IllegalStateException("WriteOptions is closed"); + } + if (!cfh.isOwningHandle()) { + throw new IllegalStateException("Column family is closed"); + } + assert key.isAccessible(); + assert value.isAccessible(); + if (nettyDirect) { + // Get the key nio buffer to pass to RocksDB + ByteBuffer keyNioBuffer = LLUtils.asReadOnlyDirect(key); + boolean mustCloseKey; + if (keyNioBuffer == null) { + mustCloseKey = true; + // If the nio buffer is not available, copy the netty buffer into a new direct buffer + keyNioBuffer = LLUtils.copyToNewDirectBuffer(key); + } else { + mustCloseKey = false; + } + try { + // Get the value nio buffer to pass to RocksDB + ByteBuffer valueNioBuffer = LLUtils.asReadOnlyDirect(value); + boolean mustCloseValue; + if (valueNioBuffer == null) { + mustCloseValue = true; + // If the nio buffer is not available, copy the netty buffer into a new direct buffer + valueNioBuffer = LLUtils.copyToNewDirectBuffer(value); } else { - db.put(cfh, writeOptions, LLUtils.toArray(key), LLUtils.toArray(value)); + mustCloseValue = false; + } + try { + db.put(cfh, writeOptions, keyNioBuffer, valueNioBuffer); + } finally { + if (mustCloseValue) { + PlatformDependent.freeDirectBuffer(valueNioBuffer); + } + } + } finally { + if (mustCloseKey) { + PlatformDependent.freeDirectBuffer(keyNioBuffer); } } + } else { + db.put(cfh, writeOptions, LLUtils.toArray(key), LLUtils.toArray(value)); } } finally { if (!(writeOptions instanceof UnreleasableWriteOptions)) { @@ -232,20 +275,43 @@ public sealed abstract class AbstractRocksDBColumn implements } @Override - public boolean exists(@NotNull ReadOptions readOptions, Send keySend) throws RocksDBException { - try (var key = keySend.receive()) { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called containsKey in a nonblocking thread"); + public boolean exists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called containsKey in a nonblocking thread"); + } + if (!db.isOwningHandle()) { + throw new IllegalStateException("Database is closed"); + } + if (!readOptions.isOwningHandle()) { + throw new IllegalStateException("ReadOptions is closed"); + } + if (!cfh.isOwningHandle()) { + throw new IllegalStateException("Column family is closed"); + } + if (nettyDirect) { + // Get the key nio buffer to pass to RocksDB + ByteBuffer keyNioBuffer = LLUtils.asReadOnlyDirect(key); + boolean mustCloseKey; + if (keyNioBuffer == null) { + mustCloseKey = true; + // If the nio buffer is not available, copy the netty buffer into a new direct buffer + keyNioBuffer = LLUtils.copyToNewDirectBuffer(key); + } else { + mustCloseKey = false; } - if (!db.isOwningHandle()) { - throw new IllegalStateException("Database is closed"); - } - if (!readOptions.isOwningHandle()) { - throw new IllegalStateException("ReadOptions is closed"); - } - if (!cfh.isOwningHandle()) { - throw new IllegalStateException("Column family is closed"); + try { + if (db.keyMayExist(cfh, keyNioBuffer)) { + int size = db.get(cfh, readOptions, keyNioBuffer, LLUtils.EMPTY_BYTE_BUFFER); + return size != RocksDB.NOT_FOUND; + } else { + return false; + } + } finally { + if (mustCloseKey) { + PlatformDependent.freeDirectBuffer(keyNioBuffer); + } } + } else { int size = RocksDB.NOT_FOUND; byte[] keyBytes = LLUtils.toArray(key); Holder data = new Holder<>(); @@ -267,28 +333,36 @@ public sealed abstract class AbstractRocksDBColumn implements } @Override - public void delete(WriteOptions writeOptions, Send keySend) throws RocksDBException { - try (var key = keySend.receive()) { - if (!db.isOwningHandle()) { - throw new IllegalStateException("Database is closed"); - } - if (!writeOptions.isOwningHandle()) { - throw new IllegalStateException("WriteOptions is closed"); - } - if (!cfh.isOwningHandle()) { - throw new IllegalStateException("Column family is closed"); - } - if (nettyDirect) { - DirectBuffer keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send()); - try { - db.delete(cfh, writeOptions, keyNioBuffer.byteBuffer()); - } finally { - keyNioBuffer.buffer().close(); - PlatformDependent.freeDirectBuffer(keyNioBuffer.byteBuffer()); - } + public void delete(WriteOptions writeOptions, Buffer key) throws RocksDBException { + if (!db.isOwningHandle()) { + throw new IllegalStateException("Database is closed"); + } + if (!writeOptions.isOwningHandle()) { + throw new IllegalStateException("WriteOptions is closed"); + } + if (!cfh.isOwningHandle()) { + throw new IllegalStateException("Column family is closed"); + } + if (nettyDirect) { + // Get the key nio buffer to pass to RocksDB + ByteBuffer keyNioBuffer = LLUtils.asReadOnlyDirect(key); + boolean mustCloseKey; + if (keyNioBuffer == null) { + mustCloseKey = true; + // If the nio buffer is not available, copy the netty buffer into a new direct buffer + keyNioBuffer = LLUtils.copyToNewDirectBuffer(key); } else { - db.delete(cfh, writeOptions, LLUtils.toArray(key)); + mustCloseKey = false; } + try { + db.delete(cfh, writeOptions, keyNioBuffer); + } finally { + if (mustCloseKey) { + PlatformDependent.freeDirectBuffer(keyNioBuffer); + } + } + } else { + db.delete(cfh, writeOptions, LLUtils.toArray(key)); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 0215960..a195aaf 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -210,15 +210,13 @@ public class LLLocalDictionary implements LLDictionary { } try (logKey) { var readOptions = requireNonNullElse(resolveSnapshot(snapshot), EMPTY_READ_OPTIONS); - var result = db.get(readOptions, key.send(), existsAlmostCertainly); + var result = db.get(readOptions, key, existsAlmostCertainly); if (logger.isTraceEnabled(MARKER_ROCKSDB)) { - try (var result2 = result == null ? null : result.receive()) { logger.trace(MARKER_ROCKSDB, "Reading {}: {}", LLUtils.toStringSafe(logKey), - LLUtils.toString(result2)); - return result2 == null ? null : result2.send(); - } + LLUtils.toString(result)); + return result == null ? null : result.send(); } else { - return result; + return result == null ? null : result.send(); } } } catch (Exception ex) { @@ -250,73 +248,49 @@ public class LLLocalDictionary implements LLDictionary { return Mono.usingWhen(rangeMono, rangeSend -> runOnDb(() -> { // Temporary resources to release after finished - Buffer cloned1 = null; - Buffer cloned2 = null; - Buffer cloned3 = null; - ByteBuffer direct1 = null; - ByteBuffer direct2 = null; - ByteBuffer direct3 = null; AbstractSlice slice1 = null; AbstractSlice slice2 = null; - try { - try (var range = rangeSend.receive()) { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called containsRange in a nonblocking thread"); + try (var range = rangeSend.receive()) { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called containsRange in a nonblocking thread"); + } + try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { + readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); + readOpts.setFillCache(false); + if (range.hasMin()) { + var rangeMinInternalByteBuffer = LLUtils.asReadOnlyDirect(range.getMinUnsafe()); + if (nettyDirect && rangeMinInternalByteBuffer != null) { + readOpts.setIterateLowerBound(slice1 = new DirectSlice(rangeMinInternalByteBuffer, + range.getMinUnsafe().readableBytes())); + } else { + readOpts.setIterateLowerBound(slice1 = new Slice(LLUtils.toArray(range.getMinUnsafe()))); + } } - try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { - readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); - readOpts.setFillCache(false); - if (range.hasMin()) { - try (var rangeMin = range.getMin().receive()) { - if (nettyDirect) { - var directBuf = LLUtils.convertToReadableDirect(alloc, rangeMin.send()); - cloned1 = directBuf.buffer().receive(); - direct1 = directBuf.byteBuffer(); - readOpts.setIterateLowerBound(slice1 = new DirectSlice(directBuf.byteBuffer())); - } else { - readOpts.setIterateLowerBound(slice1 = new Slice(LLUtils.toArray(rangeMin))); - } - } + if (range.hasMax()) { + var rangeMaxInternalByteBuffer = LLUtils.asReadOnlyDirect(range.getMaxUnsafe()); + if (nettyDirect && rangeMaxInternalByteBuffer != null) { + readOpts.setIterateUpperBound(slice2 = new DirectSlice(rangeMaxInternalByteBuffer, + range.getMaxUnsafe().readableBytes())); + } else { + readOpts.setIterateUpperBound(slice2 = new Slice(LLUtils.toArray(range.getMaxUnsafe()))); } - if (range.hasMax()) { - try (var rangeMax = range.getMax().receive()) { - if (nettyDirect) { - var directBuf = LLUtils.convertToReadableDirect(alloc, rangeMax.send()); - cloned2 = directBuf.buffer().receive(); - direct2 = directBuf.byteBuffer(); - readOpts.setIterateUpperBound(slice2 = new DirectSlice(directBuf.byteBuffer())); - } else { - readOpts.setIterateUpperBound(slice2 = new Slice(LLUtils.toArray(rangeMax))); - } - } - } - try (RocksIterator rocksIterator = db.newIterator(readOpts)) { - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - try (var rangeMin = range.getMin().receive()) { - if (nettyDirect) { - var directBuf = LLUtils.convertToReadableDirect(alloc, rangeMin.send()); - cloned3 = directBuf.buffer().receive(); - direct3 = directBuf.byteBuffer(); - rocksIterator.seek(directBuf.byteBuffer()); - } else { - rocksIterator.seek(LLUtils.toArray(rangeMin)); - } - } + } + try (RocksIterator rocksIterator = db.newIterator(readOpts)) { + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + var rangeMinInternalByteBuffer = LLUtils.asReadOnlyDirect(range.getMinUnsafe()); + if (nettyDirect && rangeMinInternalByteBuffer != null) { + rocksIterator.seek(rangeMinInternalByteBuffer); } else { - rocksIterator.seekToFirst(); + rocksIterator.seek(LLUtils.toArray(range.getMinUnsafe())); } - rocksIterator.status(); - return rocksIterator.isValid(); + } else { + rocksIterator.seekToFirst(); } + rocksIterator.status(); + return rocksIterator.isValid(); } } } finally { - if (cloned1 != null) cloned1.close(); - if (cloned2 != null) cloned2.close(); - if (cloned3 != null) cloned3.close(); - if (direct1 != null) PlatformDependent.freeDirectBuffer(direct1); - if (direct2 != null) PlatformDependent.freeDirectBuffer(direct2); - if (direct3 != null) PlatformDependent.freeDirectBuffer(direct3); if (slice1 != null) slice1.close(); if (slice2 != null) slice2.close(); } @@ -328,7 +302,9 @@ public class LLLocalDictionary implements LLDictionary { return Mono.usingWhen(keyMono, keySend -> runOnDb(() -> { var unmodifiableReadOpts = resolveSnapshot(snapshot); - return db.exists(unmodifiableReadOpts, keySend); + try (var key = keySend.receive()) { + return db.exists(unmodifiableReadOpts, key); + } }).onErrorMap(cause -> new IOException("Failed to read", cause)), keySend -> Mono.fromRunnable(keySend::close) ); @@ -351,7 +327,7 @@ public class LLLocalDictionary implements LLDictionary { logger.trace(MARKER_ROCKSDB, "Writing {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(value)); } - db.put(EMPTY_WRITE_OPTIONS, key.send(), value.send()); + db.put(EMPTY_WRITE_OPTIONS, key, value); return null; } } @@ -437,18 +413,15 @@ public class LLLocalDictionary implements LLDictionary { .getPreviousData(keyMono, resultType, true) .concatWith(this .>runOnDb(() -> { - try (keySend) { + try (var key = keySend.receive()) { if (logger.isTraceEnabled()) { - try (var key = keySend.receive()) { - logger.trace(MARKER_ROCKSDB, "Deleting {}", LLUtils.toStringSafe(key)); - db.delete(EMPTY_WRITE_OPTIONS, key.send()); - } - return null; + logger.trace(MARKER_ROCKSDB, "Deleting {}", LLUtils.toStringSafe(key)); + db.delete(EMPTY_WRITE_OPTIONS, key); } else { - db.delete(EMPTY_WRITE_OPTIONS, keySend); - return null; + db.delete(EMPTY_WRITE_OPTIONS, key); } } + return null; }) .onErrorMap(cause -> new IOException("Failed to delete", cause)) ) @@ -468,22 +441,19 @@ public class LLLocalDictionary implements LLDictionary { keyMono, keySend -> this .runOnDb(() -> { - try (keySend) { + try (var key = keySend.receive()) { if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called getPreviousData in a nonblocking thread"); } if (logger.isTraceEnabled()) { - try (var key = keySend.receive()) { - var keyString = LLUtils.toStringSafe(key); - var result = db.get(EMPTY_READ_OPTIONS, key.send(), existsAlmostCertainly); - try (var bufferResult = result == null ? null : result.receive()) { - logger.trace(MARKER_ROCKSDB, "Reading {}: {}", keyString, LLUtils.toStringSafe(bufferResult)); - return bufferResult == null ? null : bufferResult.send(); - } - } + var keyString = LLUtils.toStringSafe(key); + var result = db.get(EMPTY_READ_OPTIONS, key, existsAlmostCertainly); + logger.trace(MARKER_ROCKSDB, "Reading {}: {}", keyString, LLUtils.toStringSafe(result)); + return result == null ? null : result.send(); } else { - return db.get(EMPTY_READ_OPTIONS, keySend, existsAlmostCertainly); + var result = db.get(EMPTY_READ_OPTIONS, key, existsAlmostCertainly); + return result == null ? null : result.send(); } } }) @@ -575,9 +545,9 @@ public class LLLocalDictionary implements LLDictionary { try (var readOptions = resolveSnapshot(null)) { for (LLEntry entry : entriesWindow) { try (var key = entry.getKey().receive()) { - Send oldValue = db.get(readOptions, key.copy().send(), false); + Buffer oldValue = db.get(readOptions, key, false); if (oldValue != null) { - oldValues.add(LLEntry.of(key.send(), oldValue).send()); + oldValues.add(LLEntry.of(key, oldValue).send()); } } } @@ -610,7 +580,7 @@ public class LLLocalDictionary implements LLDictionary { batch.close(); } else { for (LLEntry entry : entriesWindow) { - db.put(EMPTY_WRITE_OPTIONS, entry.getKey(), entry.getValue()); + db.put(EMPTY_WRITE_OPTIONS, entry.getKeyUnsafe(), entry.getValueUnsafe()); } } return oldValues; @@ -719,7 +689,7 @@ public class LLLocalDictionary implements LLDictionary { } else { int i = 0; for (Tuple2 entry : entriesWindow) { - db.put(EMPTY_WRITE_OPTIONS, entry.getT2().send(), updatedValuesToWrite.get(i).send()); + db.put(EMPTY_WRITE_OPTIONS, entry.getT2(), updatedValuesToWrite.get(i)); i++; } } @@ -959,22 +929,14 @@ public class LLLocalDictionary implements LLDictionary { try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) { ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(alloc, nettyDirect, - opts, - IterateBound.LOWER, - range.getMin() - ); + minBound = setIterateBound(nettyDirect, opts, IterateBound.LOWER, range.getMinUnsafe()); } else { minBound = emptyReleasableSlice(); } try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(alloc, nettyDirect, - opts, - IterateBound.UPPER, - range.getMax() - ); + maxBound = setIterateBound(nettyDirect, opts, IterateBound.UPPER, range.getMaxUnsafe()); } else { maxBound = emptyReleasableSlice(); } @@ -983,7 +945,7 @@ public class LLLocalDictionary implements LLDictionary { SafeCloseable seekTo; try (RocksIterator it = db.newIterator(opts)) { if (!PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(alloc, nettyDirect, it, range.getMin()); + seekTo = rocksIterSeekTo(nettyDirect, it, range.getMinUnsafe()); } else { seekTo = null; it.seekToFirst(); @@ -1049,7 +1011,7 @@ public class LLLocalDictionary implements LLDictionary { if (!USE_WRITE_BATCHES_IN_SET_RANGE) { for (LLEntry entry : entriesList) { assert entry.isAccessible(); - db.put(EMPTY_WRITE_OPTIONS, entry.getKey(), entry.getValue()); + db.put(EMPTY_WRITE_OPTIONS, entry.getKeyUnsafe(), entry.getValueUnsafe()); } } else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) { try (var batch = new CappedWriteBatch(db, @@ -1106,7 +1068,7 @@ public class LLLocalDictionary implements LLDictionary { .getRange(null, rangeMono, false) .flatMap(oldValueSend -> this.runOnDb(() -> { try (var oldValue = oldValueSend.receive()) { - db.delete(EMPTY_WRITE_OPTIONS, oldValue.getKey()); + db.delete(EMPTY_WRITE_OPTIONS, oldValue.getKeyUnsafe()); return null; } })) @@ -1136,23 +1098,21 @@ public class LLLocalDictionary implements LLDictionary { readOpts.setFillCache(false); ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(alloc, nettyDirect, readOpts, - IterateBound.LOWER, range.getMin()); + minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); } else { minBound = emptyReleasableSlice(); } try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(alloc, nettyDirect, readOpts, - IterateBound.UPPER, range.getMax()); + maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe()); } else { maxBound = emptyReleasableSlice(); } try (var rocksIterator = db.newIterator(readOpts)) { SafeCloseable seekTo; if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(alloc, nettyDirect, rocksIterator, range.getMin()); + seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe()); } else { seekTo = null; rocksIterator.seekToFirst(); @@ -1188,23 +1148,21 @@ public class LLLocalDictionary implements LLDictionary { readOpts.setFillCache(false); ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(alloc, nettyDirect, readOpts, - IterateBound.LOWER, range.getMin()); + minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); } else { minBound = emptyReleasableSlice(); } try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(alloc, nettyDirect, readOpts, IterateBound.UPPER, - range.getMax()); + maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe()); } else { maxBound = emptyReleasableSlice(); } try (var rocksIterator = db.newIterator(readOpts)) { SafeCloseable seekTo; if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(alloc, nettyDirect, rocksIterator, range.getMin()); + seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe()); } else { seekTo = null; rocksIterator.seekToFirst(); @@ -1232,59 +1190,42 @@ public class LLLocalDictionary implements LLDictionary { } @Nullable - private static SafeCloseable rocksIterSeekTo(BufferAllocator alloc, boolean allowNettyDirect, - RocksIterator rocksIterator, Send bufferToReceive) { - try (var buffer = bufferToReceive.receive()) { - if (allowNettyDirect) { - var direct = LLUtils.convertToReadableDirect(alloc, buffer.send()); - assert direct.byteBuffer().isDirect(); - rocksIterator.seek(direct.byteBuffer()); - return () -> { - direct.buffer().close(); - PlatformDependent.freeDirectBuffer(direct.byteBuffer()); - }; - } else { - rocksIterator.seek(LLUtils.toArray(buffer)); - return null; - } + private static SafeCloseable rocksIterSeekTo(boolean allowNettyDirect, + RocksIterator rocksIterator, Buffer key) { + ByteBuffer keyInternalByteBuffer; + if (allowNettyDirect && (keyInternalByteBuffer = LLUtils.asReadOnlyDirect(key)) != null) { + rocksIterator.seek(keyInternalByteBuffer); + return null; + } else { + rocksIterator.seek(LLUtils.toArray(key)); + return null; } } - private static ReleasableSlice setIterateBound(BufferAllocator alloc, boolean allowNettyDirect, - ReadOptions readOpts, IterateBound boundType, Send bufferToReceive) { - var buffer = bufferToReceive.receive(); - try { - requireNonNull(buffer); - AbstractSlice slice; - if (allowNettyDirect && LLLocalDictionary.USE_DIRECT_BUFFER_BOUNDS) { - var direct = LLUtils.convertToReadableDirect(alloc, buffer.send()); - buffer = direct.buffer().receive(); - assert direct.byteBuffer().isDirect(); - slice = new DirectSlice(direct.byteBuffer(), buffer.readableBytes()); - assert slice.size() == buffer.readableBytes(); - assert slice.compare(new Slice(LLUtils.toArray(buffer))) == 0; - if (boundType == IterateBound.LOWER) { - readOpts.setIterateLowerBound(slice); - } else { - readOpts.setIterateUpperBound(slice); - } - return new ReleasableSliceImpl(slice, buffer, direct.byteBuffer()); + private static ReleasableSlice setIterateBound(boolean allowNettyDirect, + ReadOptions readOpts, IterateBound boundType, Buffer key) { + requireNonNull(key); + AbstractSlice slice; + ByteBuffer keyInternalByteBuffer; + if (allowNettyDirect && LLLocalDictionary.USE_DIRECT_BUFFER_BOUNDS + && (keyInternalByteBuffer = LLUtils.asReadOnlyDirect(key)) != null) { + slice = new DirectSlice(keyInternalByteBuffer, key.readableBytes()); + assert slice.size() == key.readableBytes(); + assert slice.compare(new Slice(LLUtils.toArray(key))) == 0; + if (boundType == IterateBound.LOWER) { + readOpts.setIterateLowerBound(slice); } else { - try { - slice = new Slice(requireNonNull(LLUtils.toArray(buffer))); - if (boundType == IterateBound.LOWER) { - readOpts.setIterateLowerBound(slice); - } else { - readOpts.setIterateUpperBound(slice); - } - return new ReleasableSliceImpl(slice, null, null); - } finally { - buffer.close(); - } + readOpts.setIterateUpperBound(slice); } - } catch (Throwable e) { - buffer.close(); - throw e; + return new ReleasableSliceImpl(slice, null, null); + } else { + slice = new Slice(requireNonNull(LLUtils.toArray(key))); + if (boundType == IterateBound.LOWER) { + readOpts.setIterateLowerBound(slice); + } else { + readOpts.setIterateUpperBound(slice); + } + return new ReleasableSliceImpl(slice, null, null); } } @@ -1400,16 +1341,14 @@ public class LLLocalDictionary implements LLDictionary { readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(alloc, nettyDirect, readOpts, - IterateBound.LOWER, range.getMin()); + minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); } else { minBound = emptyReleasableSlice(); } try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(alloc, nettyDirect, readOpts, - IterateBound.UPPER, range.getMax()); + maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe()); } else { maxBound = emptyReleasableSlice(); } @@ -1421,8 +1360,7 @@ public class LLLocalDictionary implements LLDictionary { try (var rocksIterator = db.newIterator(readOpts)) { SafeCloseable seekTo; if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(alloc, nettyDirect, - rocksIterator, range.getMin()); + seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe()); } else { seekTo = null; rocksIterator.seekToFirst(); @@ -1466,24 +1404,21 @@ public class LLLocalDictionary implements LLDictionary { try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(alloc, nettyDirect, readOpts, - IterateBound.LOWER, range.getMin()); + minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); } else { minBound = emptyReleasableSlice(); } try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(alloc, nettyDirect, readOpts, - IterateBound.UPPER, range.getMax()); + maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe()); } else { maxBound = emptyReleasableSlice(); } try (var rocksIterator = db.newIterator(readOpts)) { SafeCloseable seekTo; if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(alloc, nettyDirect, - rocksIterator, range.getMin()); + seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe()); } else { seekTo = null; rocksIterator.seekToFirst(); @@ -1528,24 +1463,21 @@ public class LLLocalDictionary implements LLDictionary { try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(alloc, nettyDirect, readOpts, - IterateBound.LOWER, range.getMin()); + minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); } else { minBound = emptyReleasableSlice(); } try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(alloc, nettyDirect, readOpts, - IterateBound.UPPER, range.getMax()); + maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe()); } else { maxBound = emptyReleasableSlice(); } try (var rocksIterator = db.newIterator(readOpts)) { SafeCloseable seekTo; if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(alloc, nettyDirect, - rocksIterator, range.getMin()); + seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe()); } else { seekTo = null; rocksIterator.seekToFirst(); @@ -1697,24 +1629,21 @@ public class LLLocalDictionary implements LLDictionary { try (var readOpts = new ReadOptions(getReadOptions(null))) { ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(alloc, nettyDirect, readOpts, - IterateBound.LOWER, range.getMin()); + minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); } else { minBound = emptyReleasableSlice(); } try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(alloc, nettyDirect, readOpts, - IterateBound.UPPER, range.getMax()); + maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe()); } else { maxBound = emptyReleasableSlice(); } try (RocksIterator rocksIterator = db.newIterator(readOpts)) { SafeCloseable seekTo; if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(alloc, nettyDirect, - rocksIterator, range.getMin()); + seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe()); } else { seekTo = null; rocksIterator.seekToFirst(); @@ -1724,12 +1653,10 @@ public class LLLocalDictionary implements LLDictionary { if (!rocksIterator.isValid()) { return null; } - try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) { - try (Buffer value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value)) { - db.delete(EMPTY_WRITE_OPTIONS, key.copy().send()); - return LLEntry.of(key.send(), value.send()).send(); - } - } + Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); + Buffer value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value); + db.delete(EMPTY_WRITE_OPTIONS, key); + return LLEntry.of(key, value).send(); } finally { if (seekTo != null) { seekTo.close(); @@ -1761,21 +1688,20 @@ public class LLLocalDictionary implements LLDictionary { ReleasableSlice sliceMin; ReleasableSlice sliceMax; if (range.hasMin()) { - sliceMin = setIterateBound(alloc, allowNettyDirect, readOptions, IterateBound.LOWER, range.getMin()); + sliceMin = setIterateBound(allowNettyDirect, readOptions, IterateBound.LOWER, range.getMinUnsafe()); } else { sliceMin = emptyReleasableSlice(); } if (range.hasMax()) { - sliceMax = setIterateBound(alloc, allowNettyDirect, readOptions, IterateBound.UPPER, range.getMax()); + sliceMax = setIterateBound(allowNettyDirect, readOptions, IterateBound.UPPER, range.getMaxUnsafe()); } else { sliceMax = emptyReleasableSlice(); } var rocksIterator = db.newIterator(readOptions); SafeCloseable seekTo; if (!PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = Objects.requireNonNullElseGet(rocksIterSeekTo(alloc, allowNettyDirect, - rocksIterator, range.getMin()), () -> ((SafeCloseable) () -> {}) - ); + seekTo = Objects.requireNonNullElseGet(rocksIterSeekTo(allowNettyDirect, rocksIterator, range.getMinUnsafe()), + () -> ((SafeCloseable) () -> {})); } else { seekTo = () -> {}; rocksIterator.seekToFirst(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java index 51cf683..6ba05ec 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java @@ -34,24 +34,22 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn { var allocator = getAllocator(); try (var keyBuf = allocator.allocate(key.length)) { keyBuf.writeBytes(key); - var result = this.get(readOptions, keyBuf.send(), existsAlmostCertainly); + var result = this.get(readOptions, keyBuf, existsAlmostCertainly); if (result == null) { return null; } - try (var resultBuf = result.receive()) { - return LLUtils.toArray(resultBuf); - } + return LLUtils.toArray(result); } } @Nullable - Send get(@NotNull ReadOptions readOptions, Send keySend, + Buffer get(@NotNull ReadOptions readOptions, Buffer key, boolean existsAlmostCertainly) throws RocksDBException; - boolean exists(@NotNull ReadOptions readOptions, Send keySend) throws RocksDBException; + boolean exists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException; - void put(@NotNull WriteOptions writeOptions, Send keyToReceive, - Send valueToReceive) throws RocksDBException; + void put(@NotNull WriteOptions writeOptions, Buffer key, + Buffer value) throws RocksDBException; default void put(@NotNull WriteOptions writeOptions, byte[] key, byte[] value) throws RocksDBException { @@ -61,7 +59,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn { try (var valBuf = allocator.allocate(value.length)) { valBuf.writeBytes(value); - this.put(writeOptions, keyBuf.send(), valBuf.send()); + this.put(writeOptions, keyBuf, valBuf); } } } @@ -72,7 +70,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn { Send keySend, SerializationFunction<@Nullable Send, @Nullable Buffer> updater, boolean existsAlmostCertainly, UpdateAtomicResultMode returnMode) throws RocksDBException, IOException; - void delete(WriteOptions writeOptions, Send keySend) throws RocksDBException; + void delete(WriteOptions writeOptions, Buffer key) throws RocksDBException; void delete(WriteOptions writeOptions, byte[] key) throws RocksDBException; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java index 818c562..12814a1 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java @@ -62,12 +62,7 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn prevData = null; } } else { - var obtainedPrevData = this.get(readOptions, key.copy().send(), existsAlmostCertainly); - if (obtainedPrevData == null) { - prevData = null; - } else { - prevData = obtainedPrevData.receive(); - } + prevData = this.get(readOptions, key, existsAlmostCertainly); } } else { prevData = null; @@ -101,7 +96,7 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key)); } - this.delete(writeOptions, key.send()); + this.delete(writeOptions, key); changed = true; } else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) { if (logger.isTraceEnabled()) { @@ -115,7 +110,7 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn dataToPut = newData; } try { - this.put(writeOptions, key.send(), dataToPut.send()); + this.put(writeOptions, key, dataToPut); changed = true; } finally { if (dataToPut != newData) { diff --git a/src/main/java/org/rocksdb/CappedWriteBatch.java b/src/main/java/org/rocksdb/CappedWriteBatch.java index d97f3a7..7b762f3 100644 --- a/src/main/java/org/rocksdb/CappedWriteBatch.java +++ b/src/main/java/org/rocksdb/CappedWriteBatch.java @@ -108,21 +108,15 @@ public class CappedWriteBatch extends WriteBatch { Send valueToReceive) throws RocksDBException { var key = keyToReceive.receive(); var value = valueToReceive.receive(); - if (USE_FAST_DIRECT_BUFFERS && isDirect(key) && isDirect(value)) { + ByteBuffer keyNioBuffer; + ByteBuffer valueNioBuffer; + if (USE_FAST_DIRECT_BUFFERS + && (keyNioBuffer = LLUtils.asReadOnlyDirect(key)) != null + && (valueNioBuffer = LLUtils.asReadOnlyDirect(value)) != null) { buffersToRelease.add(value); - var keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send()); - key = keyNioBuffer.buffer().receive(); buffersToRelease.add(key); - byteBuffersToRelease.add(keyNioBuffer.byteBuffer()); - assert keyNioBuffer.byteBuffer().isDirect(); - var valueNioBuffer = LLUtils.convertToReadableDirect(alloc, value.send()); - value = valueNioBuffer.buffer().receive(); - buffersToRelease.add(value); - byteBuffersToRelease.add(valueNioBuffer.byteBuffer()); - assert valueNioBuffer.byteBuffer().isDirect(); - - super.put(columnFamilyHandle, keyNioBuffer.byteBuffer(), valueNioBuffer.byteBuffer()); + super.put(columnFamilyHandle, keyNioBuffer, valueNioBuffer); } else { try { byte[] keyArray = LLUtils.toArray(key); @@ -176,19 +170,16 @@ public class CappedWriteBatch extends WriteBatch { public synchronized void delete(ColumnFamilyHandle columnFamilyHandle, Send keyToReceive) throws RocksDBException { var key = keyToReceive.receive(); - if (USE_FAST_DIRECT_BUFFERS) { - var keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send()); - key = keyNioBuffer.buffer().receive(); + ByteBuffer keyNioBuffer; + if (USE_FAST_DIRECT_BUFFERS && (keyNioBuffer = LLUtils.asReadOnlyDirect(key)) != null) { buffersToRelease.add(key); - byteBuffersToRelease.add(keyNioBuffer.byteBuffer()); - assert keyNioBuffer.byteBuffer().isDirect(); removeDirect(nativeHandle_, - keyNioBuffer.byteBuffer(), - keyNioBuffer.byteBuffer().position(), - keyNioBuffer.byteBuffer().remaining(), + keyNioBuffer, + keyNioBuffer.position(), + keyNioBuffer.remaining(), columnFamilyHandle.nativeHandle_ ); - keyNioBuffer.byteBuffer().position(keyNioBuffer.byteBuffer().limit()); + keyNioBuffer.position(keyNioBuffer.limit()); } else { try { super.delete(columnFamilyHandle, LLUtils.toArray(key)); diff --git a/src/main/java/org/rocksdb/KeyMayExistWorkaround.java b/src/main/java/org/rocksdb/KeyMayExistWorkaround.java new file mode 100644 index 0000000..c832479 --- /dev/null +++ b/src/main/java/org/rocksdb/KeyMayExistWorkaround.java @@ -0,0 +1,23 @@ +package org.rocksdb; + +public class KeyMayExistWorkaround { + + /** + * @return real value length + */ + public static int getValueLength(KeyMayExist keyMayExist) { + return keyMayExist.valueLength; + } + + /** + * 0 = not exists + * + * 1 = exists without value + * + * 2 = exists with value + * + */ + public static int getExistenceState(KeyMayExist keyMayExist) { + return keyMayExist.exists.ordinal(); + } +}