Improve direct buffer support

This commit is contained in:
Andrea Cavalli 2022-03-16 19:19:26 +01:00
parent 0a6a0657a3
commit ba3765eece
7 changed files with 173 additions and 156 deletions

View File

@ -9,6 +9,8 @@ import io.netty5.buffer.api.AllocatorControl;
import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.CompositeBuffer; import io.netty5.buffer.api.CompositeBuffer;
import io.netty5.buffer.api.DefaultBufferAllocators;
import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.MemoryManager; import io.netty5.buffer.api.MemoryManager;
import io.netty5.buffer.api.ReadableComponent; import io.netty5.buffer.api.ReadableComponent;
import io.netty5.buffer.api.Resource; import io.netty5.buffer.api.Resource;
@ -80,8 +82,6 @@ public class LLUtils {
public static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096; public static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096;
public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocateDirect(0).asReadOnlyBuffer(); public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocateDirect(0).asReadOnlyBuffer();
@Nullable
private static final MemoryManager UNSAFE_MEMORY_MANAGER;
private static final AllocatorControl NO_OP_ALLOCATION_CONTROL = (AllocatorControl) BufferAllocator.offHeapUnpooled(); private static final AllocatorControl NO_OP_ALLOCATION_CONTROL = (AllocatorControl) BufferAllocator.offHeapUnpooled();
private static final byte[] RESPONSE_TRUE = new byte[]{1}; private static final byte[] RESPONSE_TRUE = new byte[]{1};
private static final byte[] RESPONSE_FALSE = new byte[]{0}; private static final byte[] RESPONSE_FALSE = new byte[]{0};
@ -91,13 +91,6 @@ public class LLUtils {
public static final AtomicBoolean hookRegistered = new AtomicBoolean(); public static final AtomicBoolean hookRegistered = new AtomicBoolean();
static { static {
MemoryManager unsafeMemoryManager;
try {
unsafeMemoryManager = new UnsafeMemoryManager();
} catch (UnsupportedOperationException ignored) {
unsafeMemoryManager = new ByteBufferMemoryManager();
}
UNSAFE_MEMORY_MANAGER = unsafeMemoryManager;
for (int i1 = 0; i1 < 256; i1++) { for (int i1 = 0; i1 < 256; i1++) {
var b = LEXICONOGRAPHIC_ITERATION_SEEKS[i1]; var b = LEXICONOGRAPHIC_ITERATION_SEEKS[i1];
b[0] = (byte) i1; b[0] = (byte) i1;
@ -493,13 +486,17 @@ public class LLUtils {
*/ */
@Nullable @Nullable
public static Buffer readNullableDirectNioBuffer(BufferAllocator alloc, ToIntFunction<ByteBuffer> reader) { public static Buffer readNullableDirectNioBuffer(BufferAllocator alloc, ToIntFunction<ByteBuffer> reader) {
var directBuffer = allocateShared(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES); if (alloc.getAllocationType() != OFF_HEAP) {
throw new UnsupportedOperationException("Allocator type is not direct: " + alloc);
}
var directBuffer = alloc.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES);
try {
assert directBuffer.readerOffset() == 0; assert directBuffer.readerOffset() == 0;
assert directBuffer.writerOffset() == 0; assert directBuffer.writerOffset() == 0;
var directBufferWriter = ((WritableComponent) directBuffer).writableBuffer(); var directBufferWriter = ((WritableComponent) directBuffer).writableBuffer();
assert directBufferWriter.position() == 0; assert directBufferWriter.position() == 0;
assert directBufferWriter.capacity() >= directBuffer.capacity();
assert directBufferWriter.isDirect(); assert directBufferWriter.isDirect();
try {
int trueSize = reader.applyAsInt(directBufferWriter); int trueSize = reader.applyAsInt(directBufferWriter);
if (trueSize == RocksDB.NOT_FOUND) { if (trueSize == RocksDB.NOT_FOUND) {
directBuffer.close(); directBuffer.close();
@ -728,58 +725,18 @@ public class LLUtils {
return ByteBuffer.allocateDirect(size); return ByteBuffer.allocateDirect(size);
} }
/** private static Drop<Buffer> drop() {
* The returned object will be also of type {@link WritableComponent} {@link ReadableComponent} // We cannot reliably drop unsafe memory. We have to rely on the cleaner to do that.
*/ return Statics.NO_OP_DROP;
public static Buffer allocateShared(int size) {
return LLUtils.UNSAFE_MEMORY_MANAGER.allocateShared(NO_OP_ALLOCATION_CONTROL, size, Statics.NO_OP_DROP, OFF_HEAP);
} }
/** public static boolean isReadOnlyDirect(Buffer inputBuffer) {
* Get the internal byte buffer, if present return inputBuffer.isDirect() && inputBuffer instanceof ReadableComponent;
*/
@Nullable
public static ByteBuffer asReadOnlyDirect(Buffer inputBuffer) {
var bytes = inputBuffer.readableBytes();
if (bytes == 0) {
return EMPTY_BYTE_BUFFER;
}
if (inputBuffer instanceof ReadableComponent rc) {
var componentBuffer = rc.readableBuffer();
if (componentBuffer != null && componentBuffer.isDirect()) {
assert componentBuffer.isReadOnly();
assert componentBuffer.isDirect();
return componentBuffer;
}
} else if (inputBuffer.countReadableComponents() == 1) {
AtomicReference<ByteBuffer> bufferRef = new AtomicReference<>();
inputBuffer.forEachReadable(0, (index, comp) -> {
var compBuffer = comp.readableBuffer();
if (compBuffer != null && compBuffer.isDirect()) {
bufferRef.setPlain(compBuffer);
}
return false;
});
var buffer = bufferRef.getPlain();
if (buffer != null) {
assert buffer.isReadOnly();
assert buffer.isDirect();
return buffer;
}
} }
return null; public static ByteBuffer getReadOnlyDirect(Buffer inputBuffer) {
} assert isReadOnlyDirect(inputBuffer);
return ((ReadableComponent) inputBuffer).readableBuffer();
/**
* Copy the buffer into a newly allocated direct buffer
*/
@NotNull
public static ByteBuffer copyToNewDirectBuffer(Buffer inputBuffer) {
int bytes = inputBuffer.readableBytes();
var directBuffer = ByteBuffer.allocateDirect(bytes);
inputBuffer.copyInto(inputBuffer.readerOffset(), directBuffer, 0, bytes);
return directBuffer.asReadOnlyBuffer();
} }
public static Buffer fromByteArray(BufferAllocator alloc, byte[] array) { public static Buffer fromByteArray(BufferAllocator alloc, byte[] array) {

View File

@ -9,10 +9,14 @@ import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.MeterRegistry;
import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.DefaultBufferAllocators;
import io.netty5.buffer.api.MemoryManager;
import io.netty5.buffer.api.ReadableComponent;
import io.netty5.buffer.api.WritableComponent; import io.netty5.buffer.api.WritableComponent;
import io.netty5.util.internal.PlatformDependent; import io.netty5.util.internal.PlatformDependent;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.RepeatedElementList; import it.cavallium.dbengine.database.RepeatedElementList;
import it.cavallium.dbengine.lucene.DirectNIOFSDirectory;
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
@ -99,20 +103,26 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} }
if (nettyDirect) { if (nettyDirect) {
// Get the key nio buffer to pass to RocksDB // Get the key nio buffer to pass to RocksDB
ByteBuffer keyNioBuffer = LLUtils.asReadOnlyDirect(key); ByteBuffer keyNioBuffer;
boolean mustCloseKey; boolean mustCloseKey;
if (keyNioBuffer == null) { {
mustCloseKey = true; if (!LLUtils.isReadOnlyDirect(key)) {
// If the nio buffer is not available, copy the netty buffer into a new direct buffer // If the nio buffer is not available, copy the netty buffer into a new direct buffer
keyNioBuffer = LLUtils.copyToNewDirectBuffer(key); mustCloseKey = true;
var directKey = DefaultBufferAllocators.offHeapAllocator().allocate(key.readableBytes());
key.copyInto(key.readerOffset(), directKey, 0, key.readableBytes());
key = directKey;
} else { } else {
assert keyNioBuffer.isDirect();
mustCloseKey = false; mustCloseKey = false;
} }
keyNioBuffer = ((ReadableComponent) key).readableBuffer();
assert keyNioBuffer.isDirect();
assert keyNioBuffer.limit() == key.readableBytes(); assert keyNioBuffer.limit() == key.readableBytes();
}
try { try {
// Create a direct result buffer because RocksDB works only with direct buffers // Create a direct result buffer because RocksDB works only with direct buffers
var resultBuffer = LLUtils.allocateShared(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES); var resultBuffer = alloc.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES);
try { try {
assert resultBuffer.readerOffset() == 0; assert resultBuffer.readerOffset() == 0;
assert resultBuffer.writerOffset() == 0; assert resultBuffer.writerOffset() == 0;
@ -170,7 +180,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} }
} finally { } finally {
if (mustCloseKey) { if (mustCloseKey) {
PlatformDependent.freeDirectBuffer(keyNioBuffer); key.close();
} }
} }
} else { } else {
@ -219,36 +229,51 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
assert value.isAccessible(); assert value.isAccessible();
if (nettyDirect) { if (nettyDirect) {
// Get the key nio buffer to pass to RocksDB // Get the key nio buffer to pass to RocksDB
ByteBuffer keyNioBuffer = LLUtils.asReadOnlyDirect(key); ByteBuffer keyNioBuffer;
boolean mustCloseKey; boolean mustCloseKey;
if (keyNioBuffer == null) { {
mustCloseKey = true; if (!LLUtils.isReadOnlyDirect(key)) {
// If the nio buffer is not available, copy the netty buffer into a new direct buffer // If the nio buffer is not available, copy the netty buffer into a new direct buffer
keyNioBuffer = LLUtils.copyToNewDirectBuffer(key); mustCloseKey = true;
var directKey = DefaultBufferAllocators.offHeapAllocator().allocate(key.readableBytes());
key.copyInto(key.readerOffset(), directKey, 0, key.readableBytes());
key = directKey;
} else { } else {
mustCloseKey = false; mustCloseKey = false;
} }
keyNioBuffer = ((ReadableComponent) key).readableBuffer();
assert keyNioBuffer.isDirect();
assert keyNioBuffer.limit() == key.readableBytes();
}
try { try {
// Get the value nio buffer to pass to RocksDB // Get the value nio buffer to pass to RocksDB
ByteBuffer valueNioBuffer = LLUtils.asReadOnlyDirect(value); ByteBuffer valueNioBuffer;
boolean mustCloseValue; boolean mustCloseValue;
if (valueNioBuffer == null) { {
mustCloseValue = true; if (!LLUtils.isReadOnlyDirect(value)) {
// If the nio buffer is not available, copy the netty buffer into a new direct buffer // If the nio buffer is not available, copy the netty buffer into a new direct buffer
valueNioBuffer = LLUtils.copyToNewDirectBuffer(value); mustCloseValue = true;
var directValue = DefaultBufferAllocators.offHeapAllocator().allocate(value.readableBytes());
value.copyInto(value.readerOffset(), directValue, 0, value.readableBytes());
value = directValue;
} else { } else {
mustCloseValue = false; mustCloseValue = false;
} }
valueNioBuffer = ((ReadableComponent) value).readableBuffer();
assert valueNioBuffer.isDirect();
assert valueNioBuffer.limit() == value.readableBytes();
}
try { try {
db.put(cfh, writeOptions, keyNioBuffer, valueNioBuffer); db.put(cfh, writeOptions, keyNioBuffer, valueNioBuffer);
} finally { } finally {
if (mustCloseValue) { if (mustCloseValue) {
PlatformDependent.freeDirectBuffer(valueNioBuffer); value.close();
} }
} }
} finally { } finally {
if (mustCloseKey) { if (mustCloseKey) {
PlatformDependent.freeDirectBuffer(keyNioBuffer); key.close();
} }
} }
} else { } else {
@ -277,15 +302,22 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} }
if (nettyDirect) { if (nettyDirect) {
// Get the key nio buffer to pass to RocksDB // Get the key nio buffer to pass to RocksDB
ByteBuffer keyNioBuffer = LLUtils.asReadOnlyDirect(key); ByteBuffer keyNioBuffer;
boolean mustCloseKey; boolean mustCloseKey;
if (keyNioBuffer == null) { {
mustCloseKey = true; if (!LLUtils.isReadOnlyDirect(key)) {
// If the nio buffer is not available, copy the netty buffer into a new direct buffer // If the nio buffer is not available, copy the netty buffer into a new direct buffer
keyNioBuffer = LLUtils.copyToNewDirectBuffer(key); mustCloseKey = true;
var directKey = DefaultBufferAllocators.offHeapAllocator().allocate(key.readableBytes());
key.copyInto(key.readerOffset(), directKey, 0, key.readableBytes());
key = directKey;
} else { } else {
mustCloseKey = false; mustCloseKey = false;
} }
keyNioBuffer = ((ReadableComponent) key).readableBuffer();
assert keyNioBuffer.isDirect();
assert keyNioBuffer.limit() == key.readableBytes();
}
try { try {
if (db.keyMayExist(cfh, keyNioBuffer)) { if (db.keyMayExist(cfh, keyNioBuffer)) {
int size = db.get(cfh, readOptions, keyNioBuffer.position(0), LLUtils.EMPTY_BYTE_BUFFER); int size = db.get(cfh, readOptions, keyNioBuffer.position(0), LLUtils.EMPTY_BYTE_BUFFER);
@ -295,7 +327,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} }
} finally { } finally {
if (mustCloseKey) { if (mustCloseKey) {
PlatformDependent.freeDirectBuffer(keyNioBuffer); key.close();
} }
} }
} else { } else {
@ -332,20 +364,27 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} }
if (nettyDirect) { if (nettyDirect) {
// Get the key nio buffer to pass to RocksDB // Get the key nio buffer to pass to RocksDB
ByteBuffer keyNioBuffer = LLUtils.asReadOnlyDirect(key); ByteBuffer keyNioBuffer;
boolean mustCloseKey; boolean mustCloseKey;
if (keyNioBuffer == null) { {
mustCloseKey = true; if (!LLUtils.isReadOnlyDirect(key)) {
// If the nio buffer is not available, copy the netty buffer into a new direct buffer // If the nio buffer is not available, copy the netty buffer into a new direct buffer
keyNioBuffer = LLUtils.copyToNewDirectBuffer(key); mustCloseKey = true;
var directKey = DefaultBufferAllocators.offHeapAllocator().allocate(key.readableBytes());
key.copyInto(key.readerOffset(), directKey, 0, key.readableBytes());
key = directKey;
} else { } else {
mustCloseKey = false; mustCloseKey = false;
} }
keyNioBuffer = ((ReadableComponent) key).readableBuffer();
assert keyNioBuffer.isDirect();
assert keyNioBuffer.limit() == key.readableBytes();
}
try { try {
db.delete(cfh, writeOptions, keyNioBuffer); db.delete(cfh, writeOptions, keyNioBuffer);
} finally { } finally {
if (mustCloseKey) { if (mustCloseKey) {
PlatformDependent.freeDirectBuffer(keyNioBuffer); key.close();
} }
} }
} else { } else {

View File

@ -2,8 +2,8 @@ package it.cavallium.dbengine.database.disk;
import static io.netty5.buffer.api.StandardAllocationTypes.OFF_HEAP; import static io.netty5.buffer.api.StandardAllocationTypes.OFF_HEAP;
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import static it.cavallium.dbengine.database.LLUtils.asReadOnlyDirect;
import static it.cavallium.dbengine.database.LLUtils.fromByteArray; import static it.cavallium.dbengine.database.LLUtils.fromByteArray;
import static it.cavallium.dbengine.database.LLUtils.isReadOnlyDirect;
import static it.cavallium.dbengine.database.LLUtils.toStringSafe; import static it.cavallium.dbengine.database.LLUtils.toStringSafe;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNullElse; import static java.util.Objects.requireNonNullElse;
@ -12,6 +12,7 @@ import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.Timer;
import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.ReadableComponent;
import io.netty5.buffer.api.Send; import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.database.ColumnUtils; import it.cavallium.dbengine.database.ColumnUtils;
@ -299,9 +300,9 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
readOpts.setFillCache(fillCache); readOpts.setFillCache(fillCache);
if (range.hasMin()) { if (range.hasMin()) {
var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe()); if (nettyDirect && isReadOnlyDirect(range.getMinUnsafe())) {
if (nettyDirect && rangeMinInternalByteBuffer != null) { readOpts.setIterateLowerBound(slice1 = new DirectSlice(
readOpts.setIterateLowerBound(slice1 = new DirectSlice(rangeMinInternalByteBuffer, ((ReadableComponent) range.getMinUnsafe()).readableBuffer(),
range.getMinUnsafe().readableBytes() range.getMinUnsafe().readableBytes()
)); ));
} else { } else {
@ -309,9 +310,9 @@ public class LLLocalDictionary implements LLDictionary {
} }
} }
if (range.hasMax()) { if (range.hasMax()) {
var rangeMaxInternalByteBuffer = asReadOnlyDirect(range.getMaxUnsafe()); if (nettyDirect && isReadOnlyDirect(range.getMaxUnsafe())) {
if (nettyDirect && rangeMaxInternalByteBuffer != null) { readOpts.setIterateUpperBound(slice2 = new DirectSlice(
readOpts.setIterateUpperBound(slice2 = new DirectSlice(rangeMaxInternalByteBuffer, ((ReadableComponent) range.getMaxUnsafe()).readableBuffer(),
range.getMaxUnsafe().readableBytes() range.getMaxUnsafe().readableBytes()
)); ));
} else { } else {
@ -320,9 +321,8 @@ public class LLLocalDictionary implements LLDictionary {
} }
try (RocksIterator rocksIterator = db.newIterator(readOpts)) { try (RocksIterator rocksIterator = db.newIterator(readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe()); if (nettyDirect && isReadOnlyDirect(range.getMinUnsafe())) {
if (nettyDirect && rangeMinInternalByteBuffer != null) { rocksIterator.seek(((ReadableComponent) range.getMinUnsafe()).readableBuffer());
rocksIterator.seek(rangeMinInternalByteBuffer);
} else { } else {
rocksIterator.seek(LLUtils.toArray(range.getMinUnsafe())); rocksIterator.seek(LLUtils.toArray(range.getMinUnsafe()));
} }
@ -1226,8 +1226,8 @@ public class LLLocalDictionary implements LLDictionary {
@Nullable @Nullable
private static SafeCloseable rocksIterSeekTo(boolean allowNettyDirect, private static SafeCloseable rocksIterSeekTo(boolean allowNettyDirect,
RocksIterator rocksIterator, Buffer key) { RocksIterator rocksIterator, Buffer key) {
ByteBuffer keyInternalByteBuffer; if (allowNettyDirect && isReadOnlyDirect(key)) {
if (allowNettyDirect && (keyInternalByteBuffer = asReadOnlyDirect(key)) != null) { ByteBuffer keyInternalByteBuffer = ((ReadableComponent) key).readableBuffer();
assert keyInternalByteBuffer.position() == 0; assert keyInternalByteBuffer.position() == 0;
rocksIterator.seek(keyInternalByteBuffer); rocksIterator.seek(keyInternalByteBuffer);
// This is useful to retain the key buffer in memory and avoid deallocations // This is useful to retain the key buffer in memory and avoid deallocations
@ -1245,9 +1245,9 @@ public class LLLocalDictionary implements LLDictionary {
ReadOptions readOpts, IterateBound boundType, Buffer key) { ReadOptions readOpts, IterateBound boundType, Buffer key) {
requireNonNull(key); requireNonNull(key);
AbstractSlice<?> slice; AbstractSlice<?> slice;
ByteBuffer keyInternalByteBuffer;
if (allowNettyDirect && LLLocalDictionary.USE_DIRECT_BUFFER_BOUNDS if (allowNettyDirect && LLLocalDictionary.USE_DIRECT_BUFFER_BOUNDS
&& (keyInternalByteBuffer = asReadOnlyDirect(key)) != null) { && (isReadOnlyDirect(key))) {
ByteBuffer keyInternalByteBuffer = ((ReadableComponent) key).readableBuffer();
assert keyInternalByteBuffer.position() == 0; assert keyInternalByteBuffer.position() == 0;
slice = new DirectSlice(keyInternalByteBuffer, key.readableBytes()); slice = new DirectSlice(keyInternalByteBuffer, key.readableBytes());
assert slice.size() == key.readableBytes(); assert slice.size() == key.readableBytes();

View File

@ -28,13 +28,14 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
var allocator = getAllocator(); var allocator = getAllocator();
try (var keyBuf = allocator.allocate(key.length)) { try (var keyBuf = allocator.allocate(key.length)) {
keyBuf.writeBytes(key); keyBuf.writeBytes(key);
var result = this.get(readOptions, keyBuf); try (var result = this.get(readOptions, keyBuf)) {
if (result == null) { if (result == null) {
return null; return null;
} }
return LLUtils.toArray(result); return LLUtils.toArray(result);
} }
} }
}
@Nullable @Nullable
Buffer get(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException; Buffer get(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException;

View File

@ -1,10 +1,11 @@
package org.rocksdb; package org.rocksdb;
import static it.cavallium.dbengine.database.LLUtils.asReadOnlyDirect;
import static it.cavallium.dbengine.database.LLUtils.isDirect; import static it.cavallium.dbengine.database.LLUtils.isDirect;
import static it.cavallium.dbengine.database.LLUtils.isReadOnlyDirect;
import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.ReadableComponent;
import io.netty5.buffer.api.Send; import io.netty5.buffer.api.Send;
import io.netty5.util.internal.PlatformDependent; import io.netty5.util.internal.PlatformDependent;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
@ -107,11 +108,11 @@ public class CappedWriteBatch extends WriteBatch {
Send<Buffer> valueToReceive) throws RocksDBException { Send<Buffer> valueToReceive) throws RocksDBException {
var key = keyToReceive.receive(); var key = keyToReceive.receive();
var value = valueToReceive.receive(); var value = valueToReceive.receive();
ByteBuffer keyNioBuffer;
ByteBuffer valueNioBuffer;
if (USE_FAST_DIRECT_BUFFERS if (USE_FAST_DIRECT_BUFFERS
&& (keyNioBuffer = asReadOnlyDirect(key)) != null && (isReadOnlyDirect(key))
&& (valueNioBuffer = asReadOnlyDirect(value)) != null) { && (isReadOnlyDirect(value))) {
ByteBuffer keyNioBuffer = ((ReadableComponent) key).readableBuffer();
ByteBuffer valueNioBuffer = ((ReadableComponent) value).readableBuffer();
buffersToRelease.add(value); buffersToRelease.add(value);
buffersToRelease.add(key); buffersToRelease.add(key);
@ -169,8 +170,8 @@ public class CappedWriteBatch extends WriteBatch {
public synchronized void delete(ColumnFamilyHandle columnFamilyHandle, Send<Buffer> keyToReceive) throws RocksDBException { public synchronized void delete(ColumnFamilyHandle columnFamilyHandle, Send<Buffer> keyToReceive) throws RocksDBException {
var key = keyToReceive.receive(); var key = keyToReceive.receive();
ByteBuffer keyNioBuffer; if (USE_FAST_DIRECT_BUFFERS && isReadOnlyDirect(key)) {
if (USE_FAST_DIRECT_BUFFERS && (keyNioBuffer = asReadOnlyDirect(key)) != null) { ByteBuffer keyNioBuffer = ((ReadableComponent) key).readableBuffer();
buffersToRelease.add(key); buffersToRelease.add(key);
remove(columnFamilyHandle, keyNioBuffer); remove(columnFamilyHandle, keyNioBuffer);
} else { } else {

View File

@ -66,6 +66,15 @@ public abstract class TestSingletons {
.verifyComplete(); .verifyComplete();
} }
@Test
public void testCreateIntegerNoop() {
StepVerifier
.create(tempDb(getTempDbGenerator(), allocator, db -> tempInt(db, "test", 0)
.then()
))
.verifyComplete();
}
@Test @Test
public void testCreateLong() { public void testCreateLong() {
StepVerifier StepVerifier

View File

@ -11,10 +11,8 @@ import io.netty.incubator.codec.quic.InsecureQuicTokenHandler;
import io.netty.incubator.codec.quic.QuicConnectionIdGenerator; import io.netty.incubator.codec.quic.QuicConnectionIdGenerator;
import io.netty.incubator.codec.quic.QuicSslContext; import io.netty.incubator.codec.quic.QuicSslContext;
import io.netty.incubator.codec.quic.QuicSslContextBuilder; import io.netty.incubator.codec.quic.QuicSslContextBuilder;
import it.cavallium.data.generator.nativedata.NullableString;
import it.cavallium.dbengine.database.remote.RPCCodecs.RPCEventCodec; import it.cavallium.dbengine.database.remote.RPCCodecs.RPCEventCodec;
import it.cavallium.dbengine.rpc.current.data.Empty; import it.cavallium.dbengine.rpc.current.data.Empty;
import it.cavallium.dbengine.rpc.current.data.RPCCrash;
import it.cavallium.dbengine.rpc.current.data.RPCEvent; import it.cavallium.dbengine.rpc.current.data.RPCEvent;
import it.cavallium.dbengine.rpc.current.data.SingletonGet; import it.cavallium.dbengine.rpc.current.data.SingletonGet;
import it.cavallium.dbengine.rpc.current.data.nullables.NullableLLSnapshot; import it.cavallium.dbengine.rpc.current.data.nullables.NullableLLSnapshot;
@ -222,28 +220,36 @@ class QuicUtilsTest {
@Test @Test
void sendUpdateServerFail1() { void sendUpdateServerFail1() {
RPCEvent results = QuicUtils.<RPCEvent>sendUpdate(clientConn, assertThrows(RPCException.class,
() -> QuicUtils
.<RPCEvent>sendUpdate(clientConn,
RPCEventCodec::new, RPCEventCodec::new,
new SingletonGet(FAIL_IMMEDIATELY, NullableLLSnapshot.empty()), new SingletonGet(FAIL_IMMEDIATELY, NullableLLSnapshot.empty()),
serverData -> Mono.fromCallable(() -> { serverData -> Mono.fromCallable(() -> {
fail("Called update"); fail("Called update");
return new SingletonGet(NORMAL, NullableLLSnapshot.empty()); return new SingletonGet(NORMAL, NullableLLSnapshot.empty());
}) })
).blockOptional().orElseThrow(); )
assertEquals(RPCCrash.of(500, NullableString.of("Expected error")), results); .blockOptional()
.orElseThrow()
);
} }
@Test @Test
void sendUpdateServerFail2() { void sendUpdateServerFail2() {
RPCEvent results = QuicUtils.<RPCEvent>sendUpdate(clientConn, assertThrows(RPCException.class,
() -> QuicUtils
.<RPCEvent>sendUpdate(clientConn,
RPCEventCodec::new, RPCEventCodec::new,
new SingletonGet(NORMAL, NullableLLSnapshot.empty()), new SingletonGet(NORMAL, NullableLLSnapshot.empty()),
serverData -> Mono.fromCallable(() -> { serverData -> Mono.fromCallable(() -> {
assertEquals(Empty.of(), serverData); assertEquals(Empty.of(), serverData);
return new SingletonGet(FAIL_IMMEDIATELY, NullableLLSnapshot.empty()); return new SingletonGet(FAIL_IMMEDIATELY, NullableLLSnapshot.empty());
}) })
).blockOptional().orElseThrow(); )
assertEquals(RPCCrash.of(500, NullableString.of("Expected error")), results); .blockOptional()
.orElseThrow()
);
} }
@Test @Test
@ -265,12 +271,16 @@ class QuicUtilsTest {
@Test @Test
void sendFailedRequest() { void sendFailedRequest() {
RPCEvent response = QuicUtils.<RPCEvent, RPCEvent>sendSimpleRequest(clientConn, assertThrows(RPCException.class,
() -> QuicUtils
.<RPCEvent, RPCEvent>sendSimpleRequest(clientConn,
RPCEventCodec::new, RPCEventCodec::new,
RPCEventCodec::new, RPCEventCodec::new,
new SingletonGet(FAIL_IMMEDIATELY, NullableLLSnapshot.empty()) new SingletonGet(FAIL_IMMEDIATELY, NullableLLSnapshot.empty())
).blockOptional().orElseThrow(); )
assertEquals(RPCCrash.of(500, NullableString.of("Expected error")), response); .blockOptional()
.orElseThrow()
);
} }
@Test @Test