Update rocksdb to 6.26.1, Update netty

This commit is contained in:
Andrea Cavalli 2021-12-12 02:17:36 +01:00
parent 18b242d746
commit 2a5e90d667
8 changed files with 502 additions and 506 deletions

View File

@ -377,7 +377,7 @@
<dependency> <dependency>
<groupId>org.rocksdb</groupId> <groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId> <artifactId>rocksdbjni</artifactId>
<version>6.25.3</version> <version>6.26.1</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.lucene</groupId> <groupId>org.apache.lucene</groupId>
@ -538,8 +538,11 @@
<annotationProcessor>io.soabase.recordbuilder.processor.RecordBuilderProcessor</annotationProcessor> <annotationProcessor>io.soabase.recordbuilder.processor.RecordBuilderProcessor</annotationProcessor>
</annotationProcessors> </annotationProcessors>
<useIncrementalCompilation>false</useIncrementalCompilation> <useIncrementalCompilation>false</useIncrementalCompilation>
<compilerArgs>--enable-preview <compilerArgs>
<arg>--enable-preview</arg>
<arg>--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED</arg> <arg>--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED</arg>
<arg>--add-opens=java.base/java.lang=ALL-UNNAMED</arg>
<arg>--add-opens=java.base/java.nio=ALL-UNNAMED</arg>
</compilerArgs> </compilerArgs>
<source>17</source> <source>17</source>
<target>17</target> <target>17</target>
@ -575,7 +578,7 @@
</dependency> </dependency>
</dependencies> </dependencies>
<configuration> <configuration>
<argLine>--enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --enable-native-access=ALL-UNNAMED</argLine> <argLine>--enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --enable-native-access=ALL-UNNAMED</argLine>
<systemProperties> <systemProperties>
<property> <property>
<name>ci</name> <name>ci</name>

View File

@ -1,14 +1,22 @@
package it.cavallium.dbengine.database; package it.cavallium.dbengine.database;
import static io.net5.buffer.api.StandardAllocationTypes.OFF_HEAP;
import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY; import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
import io.net5.buffer.api.AllocatorControl;
import io.net5.buffer.api.Buffer; import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.CompositeBuffer; import io.net5.buffer.api.CompositeBuffer;
import io.net5.buffer.api.MemoryManager;
import io.net5.buffer.api.ReadableComponent;
import io.net5.buffer.api.Resource; import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import io.net5.buffer.api.WritableComponent;
import io.net5.buffer.api.bytebuffer.ByteBufferMemoryManager;
import io.net5.buffer.api.internal.Statics;
import io.net5.buffer.api.unsafe.UnsafeMemoryManager;
import io.net5.util.IllegalReferenceCountException; import io.net5.util.IllegalReferenceCountException;
import io.net5.util.internal.PlatformDependent; import io.net5.util.internal.PlatformDependent;
import it.cavallium.dbengine.database.collections.DatabaseStage; import it.cavallium.dbengine.database.collections.DatabaseStage;
@ -27,6 +35,7 @@ import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.ToIntFunction; import java.util.function.ToIntFunction;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
@ -65,7 +74,11 @@ public class LLUtils {
public static final Marker MARKER_ROCKSDB = MarkerFactory.getMarker("ROCKSDB"); public static final Marker MARKER_ROCKSDB = MarkerFactory.getMarker("ROCKSDB");
public static final Marker MARKER_LUCENE = MarkerFactory.getMarker("LUCENE"); public static final Marker MARKER_LUCENE = MarkerFactory.getMarker("LUCENE");
private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocateDirect(0); public static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096;
public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocateDirect(0).asReadOnlyBuffer();
@Nullable
private static final MemoryManager UNSAFE_MEMORY_MANAGER;
private static final AllocatorControl NO_OP_ALLOCATION_CONTROL = (AllocatorControl) BufferAllocator.offHeapUnpooled();
private static final byte[] RESPONSE_TRUE = new byte[]{1}; private static final byte[] RESPONSE_TRUE = new byte[]{1};
private static final byte[] RESPONSE_FALSE = new byte[]{0}; private static final byte[] RESPONSE_FALSE = new byte[]{0};
private static final byte[] RESPONSE_TRUE_BUF = new byte[]{1}; private static final byte[] RESPONSE_TRUE_BUF = new byte[]{1};
@ -73,6 +86,13 @@ public class LLUtils {
public static final byte[][] LEXICONOGRAPHIC_ITERATION_SEEKS = new byte[256][1]; public static final byte[][] LEXICONOGRAPHIC_ITERATION_SEEKS = new byte[256][1];
static { static {
MemoryManager unsafeMemoryManager;
try {
unsafeMemoryManager = new UnsafeMemoryManager();
} catch (UnsupportedOperationException ignored) {
unsafeMemoryManager = new ByteBufferMemoryManager();
}
UNSAFE_MEMORY_MANAGER = unsafeMemoryManager;
for (int i1 = 0; i1 < 256; i1++) { for (int i1 = 0; i1 < 256; i1++) {
var b = LEXICONOGRAPHIC_ITERATION_SEEKS[i1]; var b = LEXICONOGRAPHIC_ITERATION_SEEKS[i1];
b[0] = (byte) i1; b[0] = (byte) i1;
@ -420,47 +440,36 @@ public class LLUtils {
@SuppressWarnings("ConstantConditions") @SuppressWarnings("ConstantConditions")
@Nullable @Nullable
public static Buffer readNullableDirectNioBuffer(BufferAllocator alloc, ToIntFunction<ByteBuffer> reader) { public static Buffer readNullableDirectNioBuffer(BufferAllocator alloc, ToIntFunction<ByteBuffer> reader) {
ByteBuffer directBuffer; var directBuffer = LLUtils.allocateShared(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES);
Buffer buffer; assert directBuffer.readerOffset() == 0;
{ assert directBuffer.writerOffset() == 0;
var direct = LLUtils.newDirect(alloc, 4096); var directBufferWriter = ((WritableComponent) directBuffer).writableBuffer();
directBuffer = direct.byteBuffer(); assert directBufferWriter.position() == 0;
buffer = direct.buffer().receive(); assert directBufferWriter.isDirect();
}
try { try {
int size; int trueSize = reader.applyAsInt(directBufferWriter);
do { if (trueSize == RocksDB.NOT_FOUND) {
directBuffer.limit(directBuffer.capacity()); directBuffer.close();
assert directBuffer.isDirect(); return null;
size = reader.applyAsInt(directBuffer);
if (size != RocksDB.NOT_FOUND) {
if (size == directBuffer.limit()) {
buffer.readerOffset(0).writerOffset(size);
return buffer;
} else {
assert size > directBuffer.limit();
assert directBuffer.limit() > 0;
// Free the buffer
if (directBuffer != null) {
// todo: check if free is needed
PlatformDependent.freeDirectBuffer(directBuffer);
directBuffer = null;
}
directBuffer = LLUtils.obtainDirect(buffer, true);
buffer.ensureWritable(size);
}
}
} while (size != RocksDB.NOT_FOUND);
// Return null if size is equal to RocksDB.NOT_FOUND
return null;
} finally {
// Free the buffer
if (directBuffer != null) {
// todo: check if free is needed
PlatformDependent.freeDirectBuffer(directBuffer);
directBuffer = null;
} }
int readSize = directBufferWriter.limit();
if (trueSize < readSize) {
throw new IllegalStateException();
} else if (trueSize == readSize) {
return directBuffer.writerOffset(directBufferWriter.limit());
} else {
assert directBuffer.readerOffset() == 0;
directBuffer.ensureWritable(trueSize);
assert directBuffer.writerOffset() == 0;
directBufferWriter = ((WritableComponent) directBuffer).writableBuffer();
assert directBufferWriter.position() == 0;
assert directBufferWriter.isDirect();
reader.applyAsInt(directBufferWriter);
return directBuffer.writerOffset(trueSize);
}
} catch (Throwable t) {
directBuffer.close();
throw t;
} }
} }
@ -614,86 +623,63 @@ public class LLUtils {
} }
} }
public static record DirectBuffer(@NotNull Send<Buffer> buffer, @NotNull ByteBuffer byteBuffer) {} @Deprecated
public record DirectBuffer(@NotNull Buffer buffer, @NotNull ByteBuffer byteBuffer) {}
@NotNull @NotNull
public static DirectBuffer newDirect(BufferAllocator allocator, int size) { public static ByteBuffer newDirect(int size) {
try (var buf = allocator.allocate(size)) { return ByteBuffer.allocateDirect(size);
var direct = obtainDirect(buf, true);
return new DirectBuffer(buf.send(), direct);
}
} }
@NotNull /**
public static DirectBuffer convertToReadableDirect(BufferAllocator allocator, Send<Buffer> content) { * The returned object will be also of type {@link WritableComponent} {@link ReadableComponent}
try (var buf = content.receive()) { */
DirectBuffer result; public static Buffer allocateShared(int size) {
if (buf.countComponents() == 1) { return LLUtils.UNSAFE_MEMORY_MANAGER.allocateShared(NO_OP_ALLOCATION_CONTROL, size, Statics.NO_OP_DROP, OFF_HEAP);
var direct = obtainDirect(buf, false);
result = new DirectBuffer(buf.send(), direct);
} else {
var direct = newDirect(allocator, buf.readableBytes());
try (var buf2 = direct.buffer().receive()) {
buf.copyInto(buf.readerOffset(), buf2, buf2.writerOffset(), buf.readableBytes());
buf2.writerOffset(buf2.writerOffset() + buf.readableBytes());
assert buf2.readableBytes() == buf.readableBytes();
result = new DirectBuffer(buf2.send(), direct.byteBuffer());
}
}
return result;
}
} }
/**
* Get the internal byte buffer, if present
*/
@Nullable
public static ByteBuffer asReadOnlyDirect(Buffer inputBuffer) {
var bytes = inputBuffer.readableBytes();
if (inputBuffer instanceof ReadableComponent rc) {
var componentBuffer = rc.readableBuffer();
if (componentBuffer != null && componentBuffer.isDirect()) {
assert componentBuffer.isReadOnly();
assert componentBuffer.isDirect();
return componentBuffer;
}
} else if (inputBuffer.countReadableComponents() == 1) {
AtomicReference<ByteBuffer> bufferRef = new AtomicReference<>();
inputBuffer.forEachReadable(0, (index, comp) -> {
var compBuffer = comp.readableBuffer();
if (compBuffer != null && compBuffer.isDirect()) {
bufferRef.setPlain(compBuffer);
}
return false;
});
var buffer = bufferRef.getPlain();
if (buffer != null) {
assert buffer.isReadOnly();
assert buffer.isDirect();
return buffer;
}
}
return null;
}
/**
* Copy the buffer into a newly allocated direct buffer
*/
@NotNull @NotNull
public static ByteBuffer obtainDirect(Buffer buffer, boolean writable) { public static ByteBuffer copyToNewDirectBuffer(Buffer inputBuffer) {
if (!PlatformDependent.hasUnsafe()) { int bytes = inputBuffer.readableBytes();
throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers", var directBuffer = ByteBuffer.allocateDirect(bytes);
PlatformDependent.getUnsafeUnavailabilityCause() inputBuffer.copyInto(inputBuffer.readerOffset(), directBuffer, 0, bytes);
); return directBuffer.asReadOnlyBuffer();
}
if (!MemorySegmentUtils.isSupported()) {
throw new UnsupportedOperationException("Foreign Memory Access API support is disabled."
+ " Please set \"" + MemorySegmentUtils.getSuggestedArgs() + "\"",
MemorySegmentUtils.getUnsupportedCause()
);
}
assert buffer.isAccessible();
if (buffer.readOnly()) {
throw new IllegalStateException("Buffer is read only");
}
buffer.compact();
assert buffer.readerOffset() == 0;
AtomicLong nativeAddress = new AtomicLong(0);
if (buffer.countComponents() == 1) {
if (writable) {
if (buffer.countWritableComponents() == 1) {
buffer.forEachWritable(0, (i, c) -> {
assert c.writableNativeAddress() != 0;
nativeAddress.setPlain(c.writableNativeAddress());
return false;
});
}
} else {
var readableComponents = buffer.countReadableComponents();
if (readableComponents == 1) {
buffer.forEachReadable(0, (i, c) -> {
assert c.readableNativeAddress() != 0;
nativeAddress.setPlain(c.readableNativeAddress());
return false;
});
}
}
}
if (nativeAddress.getPlain() == 0) {
if (buffer.capacity() == 0) {
return EMPTY_BYTE_BUFFER;
}
if (!buffer.isAccessible()) {
throw new IllegalStateException("Buffer is not accessible");
}
throw new IllegalStateException("Buffer is not direct");
}
return MemorySegmentUtils.directBuffer(nativeAddress.getPlain(), writable ? buffer.capacity() : buffer.writerOffset());
} }
public static Buffer fromByteArray(BufferAllocator alloc, byte[] array) { public static Buffer fromByteArray(BufferAllocator alloc, byte[] array) {

View File

@ -1,13 +1,22 @@
package it.cavallium.dbengine.database.disk; package it.cavallium.dbengine.database.disk;
import static io.net5.buffer.api.StandardAllocationTypes.OFF_HEAP;
import static it.cavallium.dbengine.database.LLUtils.INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.MeterRegistry;
import io.net5.buffer.api.AllocationType;
import io.net5.buffer.api.AllocatorControl;
import io.net5.buffer.api.Buffer; import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.CompositeBuffer;
import io.net5.buffer.api.MemoryManager;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import io.net5.buffer.api.StandardAllocationTypes; import io.net5.buffer.api.StandardAllocationTypes;
import io.net5.buffer.api.WritableComponent;
import io.net5.buffer.api.internal.Statics;
import io.net5.buffer.api.unsafe.UnsafeMemoryManager;
import io.net5.util.internal.PlatformDependent; import io.net5.util.internal.PlatformDependent;
import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
@ -20,8 +29,10 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.CompactRangeOptions; import org.rocksdb.CompactRangeOptions;
import org.rocksdb.FileOperationInfo;
import org.rocksdb.FlushOptions; import org.rocksdb.FlushOptions;
import org.rocksdb.Holder; import org.rocksdb.Holder;
import org.rocksdb.KeyMayExistWorkaround;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
@ -32,11 +43,11 @@ import org.rocksdb.WriteOptions;
import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory; import org.warp.commonutils.log.LoggerFactory;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
import sun.misc.Unsafe;
public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements RocksDBColumn public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements RocksDBColumn
permits StandardRocksDBColumn, OptimisticRocksDBColumn, PessimisticRocksDBColumn { permits StandardRocksDBColumn, OptimisticRocksDBColumn, PessimisticRocksDBColumn {
private static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096;
private static final byte[] NO_DATA = new byte[0]; private static final byte[] NO_DATA = new byte[0];
protected static final UpdateAtomicResult RESULT_NOTHING = new UpdateAtomicResultNothing(); protected static final UpdateAtomicResult RESULT_NOTHING = new UpdateAtomicResultNothing();
@ -58,7 +69,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
MeterRegistry meterRegistry) { MeterRegistry meterRegistry) {
this.db = db; this.db = db;
this.opts = databaseOptions; this.opts = databaseOptions;
this.nettyDirect = opts.allowNettyDirect() && alloc.getAllocationType() == StandardAllocationTypes.OFF_HEAP; this.nettyDirect = opts.allowNettyDirect() && alloc.getAllocationType() == OFF_HEAP;
this.alloc = alloc; this.alloc = alloc;
this.cfh = cfh; this.cfh = cfh;
@ -82,147 +93,179 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} }
@Override @Override
public @Nullable Send<Buffer> get(@NotNull ReadOptions readOptions, public @Nullable Buffer get(@NotNull ReadOptions readOptions, Buffer key, boolean existsAlmostCertainly)
Send<Buffer> keySend, throws RocksDBException {
boolean existsAlmostCertainly) throws RocksDBException { if (Schedulers.isInNonBlockingThread()) {
try (var key = keySend.receive()) { throw new UnsupportedOperationException("Called dbGet in a nonblocking thread");
if (Schedulers.isInNonBlockingThread()) { }
throw new UnsupportedOperationException("Called dbGet in a nonblocking thread"); if (!db.isOwningHandle()) {
} throw new IllegalStateException("Database is closed");
if (!db.isOwningHandle()) { }
throw new IllegalStateException("Database is closed"); if (!readOptions.isOwningHandle()) {
} throw new IllegalStateException("ReadOptions is closed");
if (!readOptions.isOwningHandle()) { }
throw new IllegalStateException("ReadOptions is closed"); if (!cfh.isOwningHandle()) {
} throw new IllegalStateException("Column family is closed");
if (!cfh.isOwningHandle()) { }
throw new IllegalStateException("Column family is closed"); if (nettyDirect) {
} // Get the key nio buffer to pass to RocksDB
if (nettyDirect) { ByteBuffer keyNioBuffer = LLUtils.asReadOnlyDirect(key);
assert keyNioBuffer.isDirect();
//todo: implement keyMayExist if existsAlmostCertainly is false. boolean mustCloseKey;
// Unfortunately it's not feasible until RocksDB implements keyMayExist with buffers if (keyNioBuffer == null) {
mustCloseKey = true;
// Create the key nio buffer to pass to RocksDB // If the nio buffer is not available, copy the netty buffer into a new direct buffer
var keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send()); keyNioBuffer = LLUtils.copyToNewDirectBuffer(key);
// Create a direct result buffer because RocksDB works only with direct buffers
try (Buffer resultBuf = alloc.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES)) {
int valueSize;
ByteBuffer resultNioBuf;
do {
// Create the result nio buffer to pass to RocksDB
resultNioBuf = LLUtils.obtainDirect(resultBuf, true);
assert keyNioBuffer.byteBuffer().isDirect();
assert resultNioBuf.isDirect();
// todo: use keyMayExist when rocksdb will implement keyMayExist with buffers
valueSize = db.get(cfh,
readOptions,
keyNioBuffer.byteBuffer().position(0),
resultNioBuf
);
if (valueSize != RocksDB.NOT_FOUND) {
// todo: check if position is equal to data that have been read
// todo: check if limit is equal to value size or data that have been read
assert valueSize <= 0 || resultNioBuf.limit() > 0;
// Check if read data is not bigger than the total value size.
// If it's bigger it means that RocksDB is writing the start
// of the result into the result buffer more than once.
assert resultNioBuf.limit() <= valueSize;
// Update data size metrics
this.lastDataSizeMetric.set(valueSize);
if (valueSize <= resultNioBuf.limit()) {
// Return the result ready to be read
return resultBuf.readerOffset(0).writerOffset(valueSize).send();
} else {
//noinspection UnusedAssignment
resultNioBuf = null;
}
// Rewind the keyNioBuf position, making it readable again for the next loop iteration
keyNioBuffer.byteBuffer().rewind();
if (resultBuf.capacity() < valueSize) {
// Expand the resultBuf size if the result is bigger than the current result
// buffer size
resultBuf.ensureWritable(valueSize);
}
}
// Repeat if the result has been found but it's still not finished
} while (valueSize != RocksDB.NOT_FOUND);
// If the value is not found return null
return null;
} finally {
keyNioBuffer.buffer().close();
PlatformDependent.freeDirectBuffer(keyNioBuffer.byteBuffer());
}
} else { } else {
mustCloseKey = false;
}
assert keyNioBuffer.limit() == key.readableBytes();
try {
// Create a direct result buffer because RocksDB works only with direct buffers
var resultBuffer = LLUtils.allocateShared(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES);
try { try {
byte[] keyArray = LLUtils.toArray(key); assert resultBuffer.readerOffset() == 0;
requireNonNull(keyArray); assert resultBuffer.writerOffset() == 0;
Holder<byte[]> data = existsAlmostCertainly ? null : new Holder<>(); var resultWritable = ((WritableComponent) resultBuffer).writableBuffer();
if (existsAlmostCertainly || db.keyMayExist(cfh, readOptions, keyArray, data)) {
if (!existsAlmostCertainly && data.getValue() != null) { var keyMayExist = db.keyMayExist(cfh, keyNioBuffer, resultWritable);
return LLUtils.fromByteArray(alloc, data.getValue()).send(); var keyMayExistState = KeyMayExistWorkaround.getExistenceState(keyMayExist);
} else { int keyMayExistValueLength = KeyMayExistWorkaround.getValueLength(keyMayExist);
byte[] result = db.get(cfh, readOptions, keyArray); // At the beginning, size reflects the expected size, then it becomes the real data size
if (result == null) { int size = keyMayExistState == 2 ? keyMayExistValueLength : -1;
switch (keyMayExistState) {
// kNotExist
case 0: {
resultBuffer.close();
return null;
}
// kExistsWithoutValue
case 1: {
assert keyMayExistValueLength == 0;
resultWritable.clear();
// real data size
size = db.get(cfh, readOptions, keyNioBuffer, resultWritable);
if (size == RocksDB.NOT_FOUND) {
resultBuffer.close();
return null; return null;
} else {
return LLUtils.fromByteArray(alloc, result).send();
} }
} }
// kExistsWithValue
case 2: {
// real data size
this.lastDataSizeMetric.set(size);
assert size >= 0;
if (size <= resultWritable.limit()) {
assert size == resultWritable.limit();
return resultBuffer.writerOffset(resultWritable.limit());
} else {
resultBuffer.ensureWritable(size);
resultWritable = ((WritableComponent) resultBuffer).writableBuffer();
assert resultBuffer.readerOffset() == 0;
assert resultBuffer.writerOffset() == 0;
size = db.get(cfh, readOptions, keyNioBuffer, resultWritable);
if (size == RocksDB.NOT_FOUND) {
resultBuffer.close();
return null;
}
assert size == resultWritable.limit();
return resultBuffer.writerOffset(resultWritable.limit());
}
}
default: {
throw new IllegalStateException();
}
}
} catch (Throwable t) {
resultBuffer.close();
throw t;
}
} finally {
if (mustCloseKey) {
PlatformDependent.freeDirectBuffer(keyNioBuffer);
}
}
} else {
try {
byte[] keyArray = LLUtils.toArray(key);
requireNonNull(keyArray);
Holder<byte[]> data = existsAlmostCertainly ? null : new Holder<>();
if (existsAlmostCertainly || db.keyMayExist(cfh, readOptions, keyArray, data)) {
if (!existsAlmostCertainly && data.getValue() != null) {
return LLUtils.fromByteArray(alloc, data.getValue());
} else { } else {
return null; byte[] result = db.get(cfh, readOptions, keyArray);
} if (result == null) {
} finally { return null;
if (!(readOptions instanceof UnreleasableReadOptions)) { } else {
readOptions.close(); return LLUtils.fromByteArray(alloc, result);
}
} }
} else {
return null;
}
} finally {
if (!(readOptions instanceof UnreleasableReadOptions)) {
readOptions.close();
} }
} }
} }
} }
@Override @Override
public void put(@NotNull WriteOptions writeOptions, Send<Buffer> keyToReceive, public void put(@NotNull WriteOptions writeOptions, Buffer key, Buffer value) throws RocksDBException {
Send<Buffer> valueToReceive) throws RocksDBException {
try { try {
try (var key = keyToReceive.receive()) { if (Schedulers.isInNonBlockingThread()) {
try (var value = valueToReceive.receive()) { throw new UnsupportedOperationException("Called dbPut in a nonblocking thread");
if (Schedulers.isInNonBlockingThread()) { }
throw new UnsupportedOperationException("Called dbPut in a nonblocking thread"); if (!db.isOwningHandle()) {
} throw new IllegalStateException("Database is closed");
if (!db.isOwningHandle()) { }
throw new IllegalStateException("Database is closed"); if (!writeOptions.isOwningHandle()) {
} throw new IllegalStateException("WriteOptions is closed");
if (!writeOptions.isOwningHandle()) { }
throw new IllegalStateException("WriteOptions is closed"); if (!cfh.isOwningHandle()) {
} throw new IllegalStateException("Column family is closed");
if (!cfh.isOwningHandle()) { }
throw new IllegalStateException("Column family is closed"); assert key.isAccessible();
} assert value.isAccessible();
assert key.isAccessible(); if (nettyDirect) {
assert value.isAccessible(); // Get the key nio buffer to pass to RocksDB
if (nettyDirect) { ByteBuffer keyNioBuffer = LLUtils.asReadOnlyDirect(key);
var keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send()); boolean mustCloseKey;
try (var ignored1 = keyNioBuffer.buffer().receive()) { if (keyNioBuffer == null) {
assert keyNioBuffer.byteBuffer().isDirect(); mustCloseKey = true;
var valueNioBuffer = LLUtils.convertToReadableDirect(alloc, value.send()); // If the nio buffer is not available, copy the netty buffer into a new direct buffer
try (var ignored2 = valueNioBuffer.buffer().receive()) { keyNioBuffer = LLUtils.copyToNewDirectBuffer(key);
assert valueNioBuffer.byteBuffer().isDirect(); } else {
db.put(cfh, writeOptions, keyNioBuffer.byteBuffer(), valueNioBuffer.byteBuffer()); mustCloseKey = false;
} finally { }
PlatformDependent.freeDirectBuffer(valueNioBuffer.byteBuffer()); try {
} // Get the value nio buffer to pass to RocksDB
} finally { ByteBuffer valueNioBuffer = LLUtils.asReadOnlyDirect(value);
PlatformDependent.freeDirectBuffer(keyNioBuffer.byteBuffer()); boolean mustCloseValue;
} if (valueNioBuffer == null) {
mustCloseValue = true;
// If the nio buffer is not available, copy the netty buffer into a new direct buffer
valueNioBuffer = LLUtils.copyToNewDirectBuffer(value);
} else { } else {
db.put(cfh, writeOptions, LLUtils.toArray(key), LLUtils.toArray(value)); mustCloseValue = false;
}
try {
db.put(cfh, writeOptions, keyNioBuffer, valueNioBuffer);
} finally {
if (mustCloseValue) {
PlatformDependent.freeDirectBuffer(valueNioBuffer);
}
}
} finally {
if (mustCloseKey) {
PlatformDependent.freeDirectBuffer(keyNioBuffer);
} }
} }
} else {
db.put(cfh, writeOptions, LLUtils.toArray(key), LLUtils.toArray(value));
} }
} finally { } finally {
if (!(writeOptions instanceof UnreleasableWriteOptions)) { if (!(writeOptions instanceof UnreleasableWriteOptions)) {
@ -232,20 +275,43 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} }
@Override @Override
public boolean exists(@NotNull ReadOptions readOptions, Send<Buffer> keySend) throws RocksDBException { public boolean exists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException {
try (var key = keySend.receive()) { if (Schedulers.isInNonBlockingThread()) {
if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called containsKey in a nonblocking thread");
throw new UnsupportedOperationException("Called containsKey in a nonblocking thread"); }
if (!db.isOwningHandle()) {
throw new IllegalStateException("Database is closed");
}
if (!readOptions.isOwningHandle()) {
throw new IllegalStateException("ReadOptions is closed");
}
if (!cfh.isOwningHandle()) {
throw new IllegalStateException("Column family is closed");
}
if (nettyDirect) {
// Get the key nio buffer to pass to RocksDB
ByteBuffer keyNioBuffer = LLUtils.asReadOnlyDirect(key);
boolean mustCloseKey;
if (keyNioBuffer == null) {
mustCloseKey = true;
// If the nio buffer is not available, copy the netty buffer into a new direct buffer
keyNioBuffer = LLUtils.copyToNewDirectBuffer(key);
} else {
mustCloseKey = false;
} }
if (!db.isOwningHandle()) { try {
throw new IllegalStateException("Database is closed"); if (db.keyMayExist(cfh, keyNioBuffer)) {
} int size = db.get(cfh, readOptions, keyNioBuffer, LLUtils.EMPTY_BYTE_BUFFER);
if (!readOptions.isOwningHandle()) { return size != RocksDB.NOT_FOUND;
throw new IllegalStateException("ReadOptions is closed"); } else {
} return false;
if (!cfh.isOwningHandle()) { }
throw new IllegalStateException("Column family is closed"); } finally {
if (mustCloseKey) {
PlatformDependent.freeDirectBuffer(keyNioBuffer);
}
} }
} else {
int size = RocksDB.NOT_FOUND; int size = RocksDB.NOT_FOUND;
byte[] keyBytes = LLUtils.toArray(key); byte[] keyBytes = LLUtils.toArray(key);
Holder<byte[]> data = new Holder<>(); Holder<byte[]> data = new Holder<>();
@ -267,28 +333,36 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} }
@Override @Override
public void delete(WriteOptions writeOptions, Send<Buffer> keySend) throws RocksDBException { public void delete(WriteOptions writeOptions, Buffer key) throws RocksDBException {
try (var key = keySend.receive()) { if (!db.isOwningHandle()) {
if (!db.isOwningHandle()) { throw new IllegalStateException("Database is closed");
throw new IllegalStateException("Database is closed"); }
} if (!writeOptions.isOwningHandle()) {
if (!writeOptions.isOwningHandle()) { throw new IllegalStateException("WriteOptions is closed");
throw new IllegalStateException("WriteOptions is closed"); }
} if (!cfh.isOwningHandle()) {
if (!cfh.isOwningHandle()) { throw new IllegalStateException("Column family is closed");
throw new IllegalStateException("Column family is closed"); }
} if (nettyDirect) {
if (nettyDirect) { // Get the key nio buffer to pass to RocksDB
DirectBuffer keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send()); ByteBuffer keyNioBuffer = LLUtils.asReadOnlyDirect(key);
try { boolean mustCloseKey;
db.delete(cfh, writeOptions, keyNioBuffer.byteBuffer()); if (keyNioBuffer == null) {
} finally { mustCloseKey = true;
keyNioBuffer.buffer().close(); // If the nio buffer is not available, copy the netty buffer into a new direct buffer
PlatformDependent.freeDirectBuffer(keyNioBuffer.byteBuffer()); keyNioBuffer = LLUtils.copyToNewDirectBuffer(key);
}
} else { } else {
db.delete(cfh, writeOptions, LLUtils.toArray(key)); mustCloseKey = false;
} }
try {
db.delete(cfh, writeOptions, keyNioBuffer);
} finally {
if (mustCloseKey) {
PlatformDependent.freeDirectBuffer(keyNioBuffer);
}
}
} else {
db.delete(cfh, writeOptions, LLUtils.toArray(key));
} }
} }

View File

@ -210,15 +210,13 @@ public class LLLocalDictionary implements LLDictionary {
} }
try (logKey) { try (logKey) {
var readOptions = requireNonNullElse(resolveSnapshot(snapshot), EMPTY_READ_OPTIONS); var readOptions = requireNonNullElse(resolveSnapshot(snapshot), EMPTY_READ_OPTIONS);
var result = db.get(readOptions, key.send(), existsAlmostCertainly); var result = db.get(readOptions, key, existsAlmostCertainly);
if (logger.isTraceEnabled(MARKER_ROCKSDB)) { if (logger.isTraceEnabled(MARKER_ROCKSDB)) {
try (var result2 = result == null ? null : result.receive()) {
logger.trace(MARKER_ROCKSDB, "Reading {}: {}", LLUtils.toStringSafe(logKey), logger.trace(MARKER_ROCKSDB, "Reading {}: {}", LLUtils.toStringSafe(logKey),
LLUtils.toString(result2)); LLUtils.toString(result));
return result2 == null ? null : result2.send(); return result == null ? null : result.send();
}
} else { } else {
return result; return result == null ? null : result.send();
} }
} }
} catch (Exception ex) { } catch (Exception ex) {
@ -250,73 +248,49 @@ public class LLLocalDictionary implements LLDictionary {
return Mono.usingWhen(rangeMono, return Mono.usingWhen(rangeMono,
rangeSend -> runOnDb(() -> { rangeSend -> runOnDb(() -> {
// Temporary resources to release after finished // Temporary resources to release after finished
Buffer cloned1 = null;
Buffer cloned2 = null;
Buffer cloned3 = null;
ByteBuffer direct1 = null;
ByteBuffer direct2 = null;
ByteBuffer direct3 = null;
AbstractSlice<?> slice1 = null; AbstractSlice<?> slice1 = null;
AbstractSlice<?> slice2 = null; AbstractSlice<?> slice2 = null;
try { try (var range = rangeSend.receive()) {
try (var range = rangeSend.receive()) { if (Schedulers.isInNonBlockingThread()) {
if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called containsRange in a nonblocking thread");
throw new UnsupportedOperationException("Called containsRange in a nonblocking thread"); }
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
readOpts.setFillCache(false);
if (range.hasMin()) {
var rangeMinInternalByteBuffer = LLUtils.asReadOnlyDirect(range.getMinUnsafe());
if (nettyDirect && rangeMinInternalByteBuffer != null) {
readOpts.setIterateLowerBound(slice1 = new DirectSlice(rangeMinInternalByteBuffer,
range.getMinUnsafe().readableBytes()));
} else {
readOpts.setIterateLowerBound(slice1 = new Slice(LLUtils.toArray(range.getMinUnsafe())));
}
} }
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { if (range.hasMax()) {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); var rangeMaxInternalByteBuffer = LLUtils.asReadOnlyDirect(range.getMaxUnsafe());
readOpts.setFillCache(false); if (nettyDirect && rangeMaxInternalByteBuffer != null) {
if (range.hasMin()) { readOpts.setIterateUpperBound(slice2 = new DirectSlice(rangeMaxInternalByteBuffer,
try (var rangeMin = range.getMin().receive()) { range.getMaxUnsafe().readableBytes()));
if (nettyDirect) { } else {
var directBuf = LLUtils.convertToReadableDirect(alloc, rangeMin.send()); readOpts.setIterateUpperBound(slice2 = new Slice(LLUtils.toArray(range.getMaxUnsafe())));
cloned1 = directBuf.buffer().receive();
direct1 = directBuf.byteBuffer();
readOpts.setIterateLowerBound(slice1 = new DirectSlice(directBuf.byteBuffer()));
} else {
readOpts.setIterateLowerBound(slice1 = new Slice(LLUtils.toArray(rangeMin)));
}
}
} }
if (range.hasMax()) { }
try (var rangeMax = range.getMax().receive()) { try (RocksIterator rocksIterator = db.newIterator(readOpts)) {
if (nettyDirect) { if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
var directBuf = LLUtils.convertToReadableDirect(alloc, rangeMax.send()); var rangeMinInternalByteBuffer = LLUtils.asReadOnlyDirect(range.getMinUnsafe());
cloned2 = directBuf.buffer().receive(); if (nettyDirect && rangeMinInternalByteBuffer != null) {
direct2 = directBuf.byteBuffer(); rocksIterator.seek(rangeMinInternalByteBuffer);
readOpts.setIterateUpperBound(slice2 = new DirectSlice(directBuf.byteBuffer()));
} else {
readOpts.setIterateUpperBound(slice2 = new Slice(LLUtils.toArray(rangeMax)));
}
}
}
try (RocksIterator rocksIterator = db.newIterator(readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
try (var rangeMin = range.getMin().receive()) {
if (nettyDirect) {
var directBuf = LLUtils.convertToReadableDirect(alloc, rangeMin.send());
cloned3 = directBuf.buffer().receive();
direct3 = directBuf.byteBuffer();
rocksIterator.seek(directBuf.byteBuffer());
} else {
rocksIterator.seek(LLUtils.toArray(rangeMin));
}
}
} else { } else {
rocksIterator.seekToFirst(); rocksIterator.seek(LLUtils.toArray(range.getMinUnsafe()));
} }
rocksIterator.status(); } else {
return rocksIterator.isValid(); rocksIterator.seekToFirst();
} }
rocksIterator.status();
return rocksIterator.isValid();
} }
} }
} finally { } finally {
if (cloned1 != null) cloned1.close();
if (cloned2 != null) cloned2.close();
if (cloned3 != null) cloned3.close();
if (direct1 != null) PlatformDependent.freeDirectBuffer(direct1);
if (direct2 != null) PlatformDependent.freeDirectBuffer(direct2);
if (direct3 != null) PlatformDependent.freeDirectBuffer(direct3);
if (slice1 != null) slice1.close(); if (slice1 != null) slice1.close();
if (slice2 != null) slice2.close(); if (slice2 != null) slice2.close();
} }
@ -328,7 +302,9 @@ public class LLLocalDictionary implements LLDictionary {
return Mono.usingWhen(keyMono, return Mono.usingWhen(keyMono,
keySend -> runOnDb(() -> { keySend -> runOnDb(() -> {
var unmodifiableReadOpts = resolveSnapshot(snapshot); var unmodifiableReadOpts = resolveSnapshot(snapshot);
return db.exists(unmodifiableReadOpts, keySend); try (var key = keySend.receive()) {
return db.exists(unmodifiableReadOpts, key);
}
}).onErrorMap(cause -> new IOException("Failed to read", cause)), }).onErrorMap(cause -> new IOException("Failed to read", cause)),
keySend -> Mono.fromRunnable(keySend::close) keySend -> Mono.fromRunnable(keySend::close)
); );
@ -351,7 +327,7 @@ public class LLLocalDictionary implements LLDictionary {
logger.trace(MARKER_ROCKSDB, "Writing {}: {}", logger.trace(MARKER_ROCKSDB, "Writing {}: {}",
LLUtils.toStringSafe(key), LLUtils.toStringSafe(value)); LLUtils.toStringSafe(key), LLUtils.toStringSafe(value));
} }
db.put(EMPTY_WRITE_OPTIONS, key.send(), value.send()); db.put(EMPTY_WRITE_OPTIONS, key, value);
return null; return null;
} }
} }
@ -437,18 +413,15 @@ public class LLLocalDictionary implements LLDictionary {
.getPreviousData(keyMono, resultType, true) .getPreviousData(keyMono, resultType, true)
.concatWith(this .concatWith(this
.<Send<Buffer>>runOnDb(() -> { .<Send<Buffer>>runOnDb(() -> {
try (keySend) { try (var key = keySend.receive()) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
try (var key = keySend.receive()) { logger.trace(MARKER_ROCKSDB, "Deleting {}", LLUtils.toStringSafe(key));
logger.trace(MARKER_ROCKSDB, "Deleting {}", LLUtils.toStringSafe(key)); db.delete(EMPTY_WRITE_OPTIONS, key);
db.delete(EMPTY_WRITE_OPTIONS, key.send());
}
return null;
} else { } else {
db.delete(EMPTY_WRITE_OPTIONS, keySend); db.delete(EMPTY_WRITE_OPTIONS, key);
return null;
} }
} }
return null;
}) })
.onErrorMap(cause -> new IOException("Failed to delete", cause)) .onErrorMap(cause -> new IOException("Failed to delete", cause))
) )
@ -468,22 +441,19 @@ public class LLLocalDictionary implements LLDictionary {
keyMono, keyMono,
keySend -> this keySend -> this
.runOnDb(() -> { .runOnDb(() -> {
try (keySend) { try (var key = keySend.receive()) {
if (Schedulers.isInNonBlockingThread()) { if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called getPreviousData in a nonblocking thread"); throw new UnsupportedOperationException("Called getPreviousData in a nonblocking thread");
} }
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
try (var key = keySend.receive()) { var keyString = LLUtils.toStringSafe(key);
var keyString = LLUtils.toStringSafe(key); var result = db.get(EMPTY_READ_OPTIONS, key, existsAlmostCertainly);
var result = db.get(EMPTY_READ_OPTIONS, key.send(), existsAlmostCertainly); logger.trace(MARKER_ROCKSDB, "Reading {}: {}", keyString, LLUtils.toStringSafe(result));
try (var bufferResult = result == null ? null : result.receive()) { return result == null ? null : result.send();
logger.trace(MARKER_ROCKSDB, "Reading {}: {}", keyString, LLUtils.toStringSafe(bufferResult));
return bufferResult == null ? null : bufferResult.send();
}
}
} else { } else {
return db.get(EMPTY_READ_OPTIONS, keySend, existsAlmostCertainly); var result = db.get(EMPTY_READ_OPTIONS, key, existsAlmostCertainly);
return result == null ? null : result.send();
} }
} }
}) })
@ -575,9 +545,9 @@ public class LLLocalDictionary implements LLDictionary {
try (var readOptions = resolveSnapshot(null)) { try (var readOptions = resolveSnapshot(null)) {
for (LLEntry entry : entriesWindow) { for (LLEntry entry : entriesWindow) {
try (var key = entry.getKey().receive()) { try (var key = entry.getKey().receive()) {
Send<Buffer> oldValue = db.get(readOptions, key.copy().send(), false); Buffer oldValue = db.get(readOptions, key, false);
if (oldValue != null) { if (oldValue != null) {
oldValues.add(LLEntry.of(key.send(), oldValue).send()); oldValues.add(LLEntry.of(key, oldValue).send());
} }
} }
} }
@ -610,7 +580,7 @@ public class LLLocalDictionary implements LLDictionary {
batch.close(); batch.close();
} else { } else {
for (LLEntry entry : entriesWindow) { for (LLEntry entry : entriesWindow) {
db.put(EMPTY_WRITE_OPTIONS, entry.getKey(), entry.getValue()); db.put(EMPTY_WRITE_OPTIONS, entry.getKeyUnsafe(), entry.getValueUnsafe());
} }
} }
return oldValues; return oldValues;
@ -719,7 +689,7 @@ public class LLLocalDictionary implements LLDictionary {
} else { } else {
int i = 0; int i = 0;
for (Tuple2<K, Buffer> entry : entriesWindow) { for (Tuple2<K, Buffer> entry : entriesWindow) {
db.put(EMPTY_WRITE_OPTIONS, entry.getT2().send(), updatedValuesToWrite.get(i).send()); db.put(EMPTY_WRITE_OPTIONS, entry.getT2(), updatedValuesToWrite.get(i));
i++; i++;
} }
} }
@ -959,22 +929,14 @@ public class LLLocalDictionary implements LLDictionary {
try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) { try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) {
ReleasableSlice minBound; ReleasableSlice minBound;
if (range.hasMin()) { if (range.hasMin()) {
minBound = setIterateBound(alloc, nettyDirect, minBound = setIterateBound(nettyDirect, opts, IterateBound.LOWER, range.getMinUnsafe());
opts,
IterateBound.LOWER,
range.getMin()
);
} else { } else {
minBound = emptyReleasableSlice(); minBound = emptyReleasableSlice();
} }
try { try {
ReleasableSlice maxBound; ReleasableSlice maxBound;
if (range.hasMax()) { if (range.hasMax()) {
maxBound = setIterateBound(alloc, nettyDirect, maxBound = setIterateBound(nettyDirect, opts, IterateBound.UPPER, range.getMaxUnsafe());
opts,
IterateBound.UPPER,
range.getMax()
);
} else { } else {
maxBound = emptyReleasableSlice(); maxBound = emptyReleasableSlice();
} }
@ -983,7 +945,7 @@ public class LLLocalDictionary implements LLDictionary {
SafeCloseable seekTo; SafeCloseable seekTo;
try (RocksIterator it = db.newIterator(opts)) { try (RocksIterator it = db.newIterator(opts)) {
if (!PREFER_SEEK_TO_FIRST && range.hasMin()) { if (!PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = rocksIterSeekTo(alloc, nettyDirect, it, range.getMin()); seekTo = rocksIterSeekTo(nettyDirect, it, range.getMinUnsafe());
} else { } else {
seekTo = null; seekTo = null;
it.seekToFirst(); it.seekToFirst();
@ -1049,7 +1011,7 @@ public class LLLocalDictionary implements LLDictionary {
if (!USE_WRITE_BATCHES_IN_SET_RANGE) { if (!USE_WRITE_BATCHES_IN_SET_RANGE) {
for (LLEntry entry : entriesList) { for (LLEntry entry : entriesList) {
assert entry.isAccessible(); assert entry.isAccessible();
db.put(EMPTY_WRITE_OPTIONS, entry.getKey(), entry.getValue()); db.put(EMPTY_WRITE_OPTIONS, entry.getKeyUnsafe(), entry.getValueUnsafe());
} }
} else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) { } else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) {
try (var batch = new CappedWriteBatch(db, try (var batch = new CappedWriteBatch(db,
@ -1106,7 +1068,7 @@ public class LLLocalDictionary implements LLDictionary {
.getRange(null, rangeMono, false) .getRange(null, rangeMono, false)
.flatMap(oldValueSend -> this.<Void>runOnDb(() -> { .flatMap(oldValueSend -> this.<Void>runOnDb(() -> {
try (var oldValue = oldValueSend.receive()) { try (var oldValue = oldValueSend.receive()) {
db.delete(EMPTY_WRITE_OPTIONS, oldValue.getKey()); db.delete(EMPTY_WRITE_OPTIONS, oldValue.getKeyUnsafe());
return null; return null;
} }
})) }))
@ -1136,23 +1098,21 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setFillCache(false); readOpts.setFillCache(false);
ReleasableSlice minBound; ReleasableSlice minBound;
if (range.hasMin()) { if (range.hasMin()) {
minBound = setIterateBound(alloc, nettyDirect, readOpts, minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
IterateBound.LOWER, range.getMin());
} else { } else {
minBound = emptyReleasableSlice(); minBound = emptyReleasableSlice();
} }
try { try {
ReleasableSlice maxBound; ReleasableSlice maxBound;
if (range.hasMax()) { if (range.hasMax()) {
maxBound = setIterateBound(alloc, nettyDirect, readOpts, maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
IterateBound.UPPER, range.getMax());
} else { } else {
maxBound = emptyReleasableSlice(); maxBound = emptyReleasableSlice();
} }
try (var rocksIterator = db.newIterator(readOpts)) { try (var rocksIterator = db.newIterator(readOpts)) {
SafeCloseable seekTo; SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = rocksIterSeekTo(alloc, nettyDirect, rocksIterator, range.getMin()); seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe());
} else { } else {
seekTo = null; seekTo = null;
rocksIterator.seekToFirst(); rocksIterator.seekToFirst();
@ -1188,23 +1148,21 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setFillCache(false); readOpts.setFillCache(false);
ReleasableSlice minBound; ReleasableSlice minBound;
if (range.hasMin()) { if (range.hasMin()) {
minBound = setIterateBound(alloc, nettyDirect, readOpts, minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
IterateBound.LOWER, range.getMin());
} else { } else {
minBound = emptyReleasableSlice(); minBound = emptyReleasableSlice();
} }
try { try {
ReleasableSlice maxBound; ReleasableSlice maxBound;
if (range.hasMax()) { if (range.hasMax()) {
maxBound = setIterateBound(alloc, nettyDirect, readOpts, IterateBound.UPPER, maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
range.getMax());
} else { } else {
maxBound = emptyReleasableSlice(); maxBound = emptyReleasableSlice();
} }
try (var rocksIterator = db.newIterator(readOpts)) { try (var rocksIterator = db.newIterator(readOpts)) {
SafeCloseable seekTo; SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = rocksIterSeekTo(alloc, nettyDirect, rocksIterator, range.getMin()); seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe());
} else { } else {
seekTo = null; seekTo = null;
rocksIterator.seekToFirst(); rocksIterator.seekToFirst();
@ -1232,59 +1190,42 @@ public class LLLocalDictionary implements LLDictionary {
} }
@Nullable @Nullable
private static SafeCloseable rocksIterSeekTo(BufferAllocator alloc, boolean allowNettyDirect, private static SafeCloseable rocksIterSeekTo(boolean allowNettyDirect,
RocksIterator rocksIterator, Send<Buffer> bufferToReceive) { RocksIterator rocksIterator, Buffer key) {
try (var buffer = bufferToReceive.receive()) { ByteBuffer keyInternalByteBuffer;
if (allowNettyDirect) { if (allowNettyDirect && (keyInternalByteBuffer = LLUtils.asReadOnlyDirect(key)) != null) {
var direct = LLUtils.convertToReadableDirect(alloc, buffer.send()); rocksIterator.seek(keyInternalByteBuffer);
assert direct.byteBuffer().isDirect(); return null;
rocksIterator.seek(direct.byteBuffer()); } else {
return () -> { rocksIterator.seek(LLUtils.toArray(key));
direct.buffer().close(); return null;
PlatformDependent.freeDirectBuffer(direct.byteBuffer());
};
} else {
rocksIterator.seek(LLUtils.toArray(buffer));
return null;
}
} }
} }
private static ReleasableSlice setIterateBound(BufferAllocator alloc, boolean allowNettyDirect, private static ReleasableSlice setIterateBound(boolean allowNettyDirect,
ReadOptions readOpts, IterateBound boundType, Send<Buffer> bufferToReceive) { ReadOptions readOpts, IterateBound boundType, Buffer key) {
var buffer = bufferToReceive.receive(); requireNonNull(key);
try { AbstractSlice<?> slice;
requireNonNull(buffer); ByteBuffer keyInternalByteBuffer;
AbstractSlice<?> slice; if (allowNettyDirect && LLLocalDictionary.USE_DIRECT_BUFFER_BOUNDS
if (allowNettyDirect && LLLocalDictionary.USE_DIRECT_BUFFER_BOUNDS) { && (keyInternalByteBuffer = LLUtils.asReadOnlyDirect(key)) != null) {
var direct = LLUtils.convertToReadableDirect(alloc, buffer.send()); slice = new DirectSlice(keyInternalByteBuffer, key.readableBytes());
buffer = direct.buffer().receive(); assert slice.size() == key.readableBytes();
assert direct.byteBuffer().isDirect(); assert slice.compare(new Slice(LLUtils.toArray(key))) == 0;
slice = new DirectSlice(direct.byteBuffer(), buffer.readableBytes()); if (boundType == IterateBound.LOWER) {
assert slice.size() == buffer.readableBytes(); readOpts.setIterateLowerBound(slice);
assert slice.compare(new Slice(LLUtils.toArray(buffer))) == 0;
if (boundType == IterateBound.LOWER) {
readOpts.setIterateLowerBound(slice);
} else {
readOpts.setIterateUpperBound(slice);
}
return new ReleasableSliceImpl(slice, buffer, direct.byteBuffer());
} else { } else {
try { readOpts.setIterateUpperBound(slice);
slice = new Slice(requireNonNull(LLUtils.toArray(buffer)));
if (boundType == IterateBound.LOWER) {
readOpts.setIterateLowerBound(slice);
} else {
readOpts.setIterateUpperBound(slice);
}
return new ReleasableSliceImpl(slice, null, null);
} finally {
buffer.close();
}
} }
} catch (Throwable e) { return new ReleasableSliceImpl(slice, null, null);
buffer.close(); } else {
throw e; slice = new Slice(requireNonNull(LLUtils.toArray(key)));
if (boundType == IterateBound.LOWER) {
readOpts.setIterateLowerBound(slice);
} else {
readOpts.setIterateUpperBound(slice);
}
return new ReleasableSliceImpl(slice, null, null);
} }
} }
@ -1400,16 +1341,14 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
ReleasableSlice minBound; ReleasableSlice minBound;
if (range.hasMin()) { if (range.hasMin()) {
minBound = setIterateBound(alloc, nettyDirect, readOpts, minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
IterateBound.LOWER, range.getMin());
} else { } else {
minBound = emptyReleasableSlice(); minBound = emptyReleasableSlice();
} }
try { try {
ReleasableSlice maxBound; ReleasableSlice maxBound;
if (range.hasMax()) { if (range.hasMax()) {
maxBound = setIterateBound(alloc, nettyDirect, readOpts, maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
IterateBound.UPPER, range.getMax());
} else { } else {
maxBound = emptyReleasableSlice(); maxBound = emptyReleasableSlice();
} }
@ -1421,8 +1360,7 @@ public class LLLocalDictionary implements LLDictionary {
try (var rocksIterator = db.newIterator(readOpts)) { try (var rocksIterator = db.newIterator(readOpts)) {
SafeCloseable seekTo; SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = rocksIterSeekTo(alloc, nettyDirect, seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe());
rocksIterator, range.getMin());
} else { } else {
seekTo = null; seekTo = null;
rocksIterator.seekToFirst(); rocksIterator.seekToFirst();
@ -1466,24 +1404,21 @@ public class LLLocalDictionary implements LLDictionary {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound; ReleasableSlice minBound;
if (range.hasMin()) { if (range.hasMin()) {
minBound = setIterateBound(alloc, nettyDirect, readOpts, minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
IterateBound.LOWER, range.getMin());
} else { } else {
minBound = emptyReleasableSlice(); minBound = emptyReleasableSlice();
} }
try { try {
ReleasableSlice maxBound; ReleasableSlice maxBound;
if (range.hasMax()) { if (range.hasMax()) {
maxBound = setIterateBound(alloc, nettyDirect, readOpts, maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
IterateBound.UPPER, range.getMax());
} else { } else {
maxBound = emptyReleasableSlice(); maxBound = emptyReleasableSlice();
} }
try (var rocksIterator = db.newIterator(readOpts)) { try (var rocksIterator = db.newIterator(readOpts)) {
SafeCloseable seekTo; SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = rocksIterSeekTo(alloc, nettyDirect, seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe());
rocksIterator, range.getMin());
} else { } else {
seekTo = null; seekTo = null;
rocksIterator.seekToFirst(); rocksIterator.seekToFirst();
@ -1528,24 +1463,21 @@ public class LLLocalDictionary implements LLDictionary {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound; ReleasableSlice minBound;
if (range.hasMin()) { if (range.hasMin()) {
minBound = setIterateBound(alloc, nettyDirect, readOpts, minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
IterateBound.LOWER, range.getMin());
} else { } else {
minBound = emptyReleasableSlice(); minBound = emptyReleasableSlice();
} }
try { try {
ReleasableSlice maxBound; ReleasableSlice maxBound;
if (range.hasMax()) { if (range.hasMax()) {
maxBound = setIterateBound(alloc, nettyDirect, readOpts, maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
IterateBound.UPPER, range.getMax());
} else { } else {
maxBound = emptyReleasableSlice(); maxBound = emptyReleasableSlice();
} }
try (var rocksIterator = db.newIterator(readOpts)) { try (var rocksIterator = db.newIterator(readOpts)) {
SafeCloseable seekTo; SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = rocksIterSeekTo(alloc, nettyDirect, seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe());
rocksIterator, range.getMin());
} else { } else {
seekTo = null; seekTo = null;
rocksIterator.seekToFirst(); rocksIterator.seekToFirst();
@ -1697,24 +1629,21 @@ public class LLLocalDictionary implements LLDictionary {
try (var readOpts = new ReadOptions(getReadOptions(null))) { try (var readOpts = new ReadOptions(getReadOptions(null))) {
ReleasableSlice minBound; ReleasableSlice minBound;
if (range.hasMin()) { if (range.hasMin()) {
minBound = setIterateBound(alloc, nettyDirect, readOpts, minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
IterateBound.LOWER, range.getMin());
} else { } else {
minBound = emptyReleasableSlice(); minBound = emptyReleasableSlice();
} }
try { try {
ReleasableSlice maxBound; ReleasableSlice maxBound;
if (range.hasMax()) { if (range.hasMax()) {
maxBound = setIterateBound(alloc, nettyDirect, readOpts, maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
IterateBound.UPPER, range.getMax());
} else { } else {
maxBound = emptyReleasableSlice(); maxBound = emptyReleasableSlice();
} }
try (RocksIterator rocksIterator = db.newIterator(readOpts)) { try (RocksIterator rocksIterator = db.newIterator(readOpts)) {
SafeCloseable seekTo; SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = rocksIterSeekTo(alloc, nettyDirect, seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe());
rocksIterator, range.getMin());
} else { } else {
seekTo = null; seekTo = null;
rocksIterator.seekToFirst(); rocksIterator.seekToFirst();
@ -1724,12 +1653,10 @@ public class LLLocalDictionary implements LLDictionary {
if (!rocksIterator.isValid()) { if (!rocksIterator.isValid()) {
return null; return null;
} }
try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) { Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
try (Buffer value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value)) { Buffer value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value);
db.delete(EMPTY_WRITE_OPTIONS, key.copy().send()); db.delete(EMPTY_WRITE_OPTIONS, key);
return LLEntry.of(key.send(), value.send()).send(); return LLEntry.of(key, value).send();
}
}
} finally { } finally {
if (seekTo != null) { if (seekTo != null) {
seekTo.close(); seekTo.close();
@ -1761,21 +1688,20 @@ public class LLLocalDictionary implements LLDictionary {
ReleasableSlice sliceMin; ReleasableSlice sliceMin;
ReleasableSlice sliceMax; ReleasableSlice sliceMax;
if (range.hasMin()) { if (range.hasMin()) {
sliceMin = setIterateBound(alloc, allowNettyDirect, readOptions, IterateBound.LOWER, range.getMin()); sliceMin = setIterateBound(allowNettyDirect, readOptions, IterateBound.LOWER, range.getMinUnsafe());
} else { } else {
sliceMin = emptyReleasableSlice(); sliceMin = emptyReleasableSlice();
} }
if (range.hasMax()) { if (range.hasMax()) {
sliceMax = setIterateBound(alloc, allowNettyDirect, readOptions, IterateBound.UPPER, range.getMax()); sliceMax = setIterateBound(allowNettyDirect, readOptions, IterateBound.UPPER, range.getMaxUnsafe());
} else { } else {
sliceMax = emptyReleasableSlice(); sliceMax = emptyReleasableSlice();
} }
var rocksIterator = db.newIterator(readOptions); var rocksIterator = db.newIterator(readOptions);
SafeCloseable seekTo; SafeCloseable seekTo;
if (!PREFER_SEEK_TO_FIRST && range.hasMin()) { if (!PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = Objects.requireNonNullElseGet(rocksIterSeekTo(alloc, allowNettyDirect, seekTo = Objects.requireNonNullElseGet(rocksIterSeekTo(allowNettyDirect, rocksIterator, range.getMinUnsafe()),
rocksIterator, range.getMin()), () -> ((SafeCloseable) () -> {}) () -> ((SafeCloseable) () -> {}));
);
} else { } else {
seekTo = () -> {}; seekTo = () -> {};
rocksIterator.seekToFirst(); rocksIterator.seekToFirst();

View File

@ -34,24 +34,22 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
var allocator = getAllocator(); var allocator = getAllocator();
try (var keyBuf = allocator.allocate(key.length)) { try (var keyBuf = allocator.allocate(key.length)) {
keyBuf.writeBytes(key); keyBuf.writeBytes(key);
var result = this.get(readOptions, keyBuf.send(), existsAlmostCertainly); var result = this.get(readOptions, keyBuf, existsAlmostCertainly);
if (result == null) { if (result == null) {
return null; return null;
} }
try (var resultBuf = result.receive()) { return LLUtils.toArray(result);
return LLUtils.toArray(resultBuf);
}
} }
} }
@Nullable @Nullable
Send<Buffer> get(@NotNull ReadOptions readOptions, Send<Buffer> keySend, Buffer get(@NotNull ReadOptions readOptions, Buffer key,
boolean existsAlmostCertainly) throws RocksDBException; boolean existsAlmostCertainly) throws RocksDBException;
boolean exists(@NotNull ReadOptions readOptions, Send<Buffer> keySend) throws RocksDBException; boolean exists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException;
void put(@NotNull WriteOptions writeOptions, Send<Buffer> keyToReceive, void put(@NotNull WriteOptions writeOptions, Buffer key,
Send<Buffer> valueToReceive) throws RocksDBException; Buffer value) throws RocksDBException;
default void put(@NotNull WriteOptions writeOptions, byte[] key, byte[] value) default void put(@NotNull WriteOptions writeOptions, byte[] key, byte[] value)
throws RocksDBException { throws RocksDBException {
@ -61,7 +59,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
try (var valBuf = allocator.allocate(value.length)) { try (var valBuf = allocator.allocate(value.length)) {
valBuf.writeBytes(value); valBuf.writeBytes(value);
this.put(writeOptions, keyBuf.send(), valBuf.send()); this.put(writeOptions, keyBuf, valBuf);
} }
} }
} }
@ -72,7 +70,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
Send<Buffer> keySend, SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater, Send<Buffer> keySend, SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
boolean existsAlmostCertainly, UpdateAtomicResultMode returnMode) throws RocksDBException, IOException; boolean existsAlmostCertainly, UpdateAtomicResultMode returnMode) throws RocksDBException, IOException;
void delete(WriteOptions writeOptions, Send<Buffer> keySend) throws RocksDBException; void delete(WriteOptions writeOptions, Buffer key) throws RocksDBException;
void delete(WriteOptions writeOptions, byte[] key) throws RocksDBException; void delete(WriteOptions writeOptions, byte[] key) throws RocksDBException;

View File

@ -62,12 +62,7 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB>
prevData = null; prevData = null;
} }
} else { } else {
var obtainedPrevData = this.get(readOptions, key.copy().send(), existsAlmostCertainly); prevData = this.get(readOptions, key, existsAlmostCertainly);
if (obtainedPrevData == null) {
prevData = null;
} else {
prevData = obtainedPrevData.receive();
}
} }
} else { } else {
prevData = null; prevData = null;
@ -101,7 +96,7 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB>
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key)); logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
} }
this.delete(writeOptions, key.send()); this.delete(writeOptions, key);
changed = true; changed = true;
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) { } else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
@ -115,7 +110,7 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB>
dataToPut = newData; dataToPut = newData;
} }
try { try {
this.put(writeOptions, key.send(), dataToPut.send()); this.put(writeOptions, key, dataToPut);
changed = true; changed = true;
} finally { } finally {
if (dataToPut != newData) { if (dataToPut != newData) {

View File

@ -108,21 +108,15 @@ public class CappedWriteBatch extends WriteBatch {
Send<Buffer> valueToReceive) throws RocksDBException { Send<Buffer> valueToReceive) throws RocksDBException {
var key = keyToReceive.receive(); var key = keyToReceive.receive();
var value = valueToReceive.receive(); var value = valueToReceive.receive();
if (USE_FAST_DIRECT_BUFFERS && isDirect(key) && isDirect(value)) { ByteBuffer keyNioBuffer;
ByteBuffer valueNioBuffer;
if (USE_FAST_DIRECT_BUFFERS
&& (keyNioBuffer = LLUtils.asReadOnlyDirect(key)) != null
&& (valueNioBuffer = LLUtils.asReadOnlyDirect(value)) != null) {
buffersToRelease.add(value); buffersToRelease.add(value);
var keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send());
key = keyNioBuffer.buffer().receive();
buffersToRelease.add(key); buffersToRelease.add(key);
byteBuffersToRelease.add(keyNioBuffer.byteBuffer());
assert keyNioBuffer.byteBuffer().isDirect();
var valueNioBuffer = LLUtils.convertToReadableDirect(alloc, value.send()); super.put(columnFamilyHandle, keyNioBuffer, valueNioBuffer);
value = valueNioBuffer.buffer().receive();
buffersToRelease.add(value);
byteBuffersToRelease.add(valueNioBuffer.byteBuffer());
assert valueNioBuffer.byteBuffer().isDirect();
super.put(columnFamilyHandle, keyNioBuffer.byteBuffer(), valueNioBuffer.byteBuffer());
} else { } else {
try { try {
byte[] keyArray = LLUtils.toArray(key); byte[] keyArray = LLUtils.toArray(key);
@ -176,19 +170,16 @@ public class CappedWriteBatch extends WriteBatch {
public synchronized void delete(ColumnFamilyHandle columnFamilyHandle, Send<Buffer> keyToReceive) throws RocksDBException { public synchronized void delete(ColumnFamilyHandle columnFamilyHandle, Send<Buffer> keyToReceive) throws RocksDBException {
var key = keyToReceive.receive(); var key = keyToReceive.receive();
if (USE_FAST_DIRECT_BUFFERS) { ByteBuffer keyNioBuffer;
var keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send()); if (USE_FAST_DIRECT_BUFFERS && (keyNioBuffer = LLUtils.asReadOnlyDirect(key)) != null) {
key = keyNioBuffer.buffer().receive();
buffersToRelease.add(key); buffersToRelease.add(key);
byteBuffersToRelease.add(keyNioBuffer.byteBuffer());
assert keyNioBuffer.byteBuffer().isDirect();
removeDirect(nativeHandle_, removeDirect(nativeHandle_,
keyNioBuffer.byteBuffer(), keyNioBuffer,
keyNioBuffer.byteBuffer().position(), keyNioBuffer.position(),
keyNioBuffer.byteBuffer().remaining(), keyNioBuffer.remaining(),
columnFamilyHandle.nativeHandle_ columnFamilyHandle.nativeHandle_
); );
keyNioBuffer.byteBuffer().position(keyNioBuffer.byteBuffer().limit()); keyNioBuffer.position(keyNioBuffer.limit());
} else { } else {
try { try {
super.delete(columnFamilyHandle, LLUtils.toArray(key)); super.delete(columnFamilyHandle, LLUtils.toArray(key));

View File

@ -0,0 +1,23 @@
package org.rocksdb;
public class KeyMayExistWorkaround {
/**
* @return real value length
*/
public static int getValueLength(KeyMayExist keyMayExist) {
return keyMayExist.valueLength;
}
/**
* 0 = not exists
*
* 1 = exists without value
*
* 2 = exists with value
*
*/
public static int getExistenceState(KeyMayExist keyMayExist) {
return keyMayExist.exists.ordinal();
}
}