From ba3765eece812b651743a3cacb77000cc21b7229 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 16 Mar 2022 19:19:26 +0100 Subject: [PATCH] Improve direct buffer support --- .../cavallium/dbengine/database/LLUtils.java | 83 +++--------- .../database/disk/AbstractRocksDBColumn.java | 123 ++++++++++++------ .../database/disk/LLLocalDictionary.java | 28 ++-- .../dbengine/database/disk/RocksDBColumn.java | 9 +- .../java/org/rocksdb/CappedWriteBatch.java | 15 ++- .../it/cavallium/dbengine/TestSingletons.java | 9 ++ .../database/remote/QuicUtilsTest.java | 62 +++++---- 7 files changed, 173 insertions(+), 156 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 3c78125..d4dc729 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -9,6 +9,8 @@ import io.netty5.buffer.api.AllocatorControl; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.CompositeBuffer; +import io.netty5.buffer.api.DefaultBufferAllocators; +import io.netty5.buffer.api.Drop; import io.netty5.buffer.api.MemoryManager; import io.netty5.buffer.api.ReadableComponent; import io.netty5.buffer.api.Resource; @@ -80,8 +82,6 @@ public class LLUtils { public static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096; public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocateDirect(0).asReadOnlyBuffer(); - @Nullable - private static final MemoryManager UNSAFE_MEMORY_MANAGER; private static final AllocatorControl NO_OP_ALLOCATION_CONTROL = (AllocatorControl) BufferAllocator.offHeapUnpooled(); private static final byte[] RESPONSE_TRUE = new byte[]{1}; private static final byte[] RESPONSE_FALSE = new byte[]{0}; @@ -91,13 +91,6 @@ public class LLUtils { public static final AtomicBoolean hookRegistered = new AtomicBoolean(); static { - MemoryManager unsafeMemoryManager; - try { - unsafeMemoryManager = new UnsafeMemoryManager(); - } catch (UnsupportedOperationException ignored) { - unsafeMemoryManager = new ByteBufferMemoryManager(); - } - UNSAFE_MEMORY_MANAGER = unsafeMemoryManager; for (int i1 = 0; i1 < 256; i1++) { var b = LEXICONOGRAPHIC_ITERATION_SEEKS[i1]; b[0] = (byte) i1; @@ -493,13 +486,17 @@ public class LLUtils { */ @Nullable public static Buffer readNullableDirectNioBuffer(BufferAllocator alloc, ToIntFunction reader) { - var directBuffer = allocateShared(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES); - assert directBuffer.readerOffset() == 0; - assert directBuffer.writerOffset() == 0; - var directBufferWriter = ((WritableComponent) directBuffer).writableBuffer(); - assert directBufferWriter.position() == 0; - assert directBufferWriter.isDirect(); + if (alloc.getAllocationType() != OFF_HEAP) { + throw new UnsupportedOperationException("Allocator type is not direct: " + alloc); + } + var directBuffer = alloc.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES); try { + assert directBuffer.readerOffset() == 0; + assert directBuffer.writerOffset() == 0; + var directBufferWriter = ((WritableComponent) directBuffer).writableBuffer(); + assert directBufferWriter.position() == 0; + assert directBufferWriter.capacity() >= directBuffer.capacity(); + assert directBufferWriter.isDirect(); int trueSize = reader.applyAsInt(directBufferWriter); if (trueSize == RocksDB.NOT_FOUND) { directBuffer.close(); @@ -728,58 +725,18 @@ public class LLUtils { return ByteBuffer.allocateDirect(size); } - /** - * The returned object will be also of type {@link WritableComponent} {@link ReadableComponent} - */ - public static Buffer allocateShared(int size) { - return LLUtils.UNSAFE_MEMORY_MANAGER.allocateShared(NO_OP_ALLOCATION_CONTROL, size, Statics.NO_OP_DROP, OFF_HEAP); + private static Drop drop() { + // We cannot reliably drop unsafe memory. We have to rely on the cleaner to do that. + return Statics.NO_OP_DROP; } - /** - * Get the internal byte buffer, if present - */ - @Nullable - public static ByteBuffer asReadOnlyDirect(Buffer inputBuffer) { - var bytes = inputBuffer.readableBytes(); - if (bytes == 0) { - return EMPTY_BYTE_BUFFER; - } - if (inputBuffer instanceof ReadableComponent rc) { - var componentBuffer = rc.readableBuffer(); - if (componentBuffer != null && componentBuffer.isDirect()) { - assert componentBuffer.isReadOnly(); - assert componentBuffer.isDirect(); - return componentBuffer; - } - } else if (inputBuffer.countReadableComponents() == 1) { - AtomicReference bufferRef = new AtomicReference<>(); - inputBuffer.forEachReadable(0, (index, comp) -> { - var compBuffer = comp.readableBuffer(); - if (compBuffer != null && compBuffer.isDirect()) { - bufferRef.setPlain(compBuffer); - } - return false; - }); - var buffer = bufferRef.getPlain(); - if (buffer != null) { - assert buffer.isReadOnly(); - assert buffer.isDirect(); - return buffer; - } - } - - return null; + public static boolean isReadOnlyDirect(Buffer inputBuffer) { + return inputBuffer.isDirect() && inputBuffer instanceof ReadableComponent; } - /** - * Copy the buffer into a newly allocated direct buffer - */ - @NotNull - public static ByteBuffer copyToNewDirectBuffer(Buffer inputBuffer) { - int bytes = inputBuffer.readableBytes(); - var directBuffer = ByteBuffer.allocateDirect(bytes); - inputBuffer.copyInto(inputBuffer.readerOffset(), directBuffer, 0, bytes); - return directBuffer.asReadOnlyBuffer(); + public static ByteBuffer getReadOnlyDirect(Buffer inputBuffer) { + assert isReadOnlyDirect(inputBuffer); + return ((ReadableComponent) inputBuffer).readableBuffer(); } public static Buffer fromByteArray(BufferAllocator alloc, byte[] array) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index 34d551f..6cf3a5d 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -9,10 +9,14 @@ import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.BufferAllocator; +import io.netty5.buffer.api.DefaultBufferAllocators; +import io.netty5.buffer.api.MemoryManager; +import io.netty5.buffer.api.ReadableComponent; import io.netty5.buffer.api.WritableComponent; import io.netty5.util.internal.PlatformDependent; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.RepeatedElementList; +import it.cavallium.dbengine.lucene.DirectNIOFSDirectory; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; import java.nio.ByteBuffer; import java.util.List; @@ -99,20 +103,26 @@ public sealed abstract class AbstractRocksDBColumn implements } if (nettyDirect) { // Get the key nio buffer to pass to RocksDB - ByteBuffer keyNioBuffer = LLUtils.asReadOnlyDirect(key); + ByteBuffer keyNioBuffer; boolean mustCloseKey; - if (keyNioBuffer == null) { - mustCloseKey = true; - // If the nio buffer is not available, copy the netty buffer into a new direct buffer - keyNioBuffer = LLUtils.copyToNewDirectBuffer(key); - } else { + { + if (!LLUtils.isReadOnlyDirect(key)) { + // If the nio buffer is not available, copy the netty buffer into a new direct buffer + mustCloseKey = true; + var directKey = DefaultBufferAllocators.offHeapAllocator().allocate(key.readableBytes()); + key.copyInto(key.readerOffset(), directKey, 0, key.readableBytes()); + key = directKey; + } else { + mustCloseKey = false; + } + keyNioBuffer = ((ReadableComponent) key).readableBuffer(); assert keyNioBuffer.isDirect(); - mustCloseKey = false; + assert keyNioBuffer.limit() == key.readableBytes(); } - assert keyNioBuffer.limit() == key.readableBytes(); + try { // Create a direct result buffer because RocksDB works only with direct buffers - var resultBuffer = LLUtils.allocateShared(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES); + var resultBuffer = alloc.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES); try { assert resultBuffer.readerOffset() == 0; assert resultBuffer.writerOffset() == 0; @@ -170,7 +180,7 @@ public sealed abstract class AbstractRocksDBColumn implements } } finally { if (mustCloseKey) { - PlatformDependent.freeDirectBuffer(keyNioBuffer); + key.close(); } } } else { @@ -219,36 +229,51 @@ public sealed abstract class AbstractRocksDBColumn implements assert value.isAccessible(); if (nettyDirect) { // Get the key nio buffer to pass to RocksDB - ByteBuffer keyNioBuffer = LLUtils.asReadOnlyDirect(key); + ByteBuffer keyNioBuffer; boolean mustCloseKey; - if (keyNioBuffer == null) { - mustCloseKey = true; - // If the nio buffer is not available, copy the netty buffer into a new direct buffer - keyNioBuffer = LLUtils.copyToNewDirectBuffer(key); - } else { - mustCloseKey = false; + { + if (!LLUtils.isReadOnlyDirect(key)) { + // If the nio buffer is not available, copy the netty buffer into a new direct buffer + mustCloseKey = true; + var directKey = DefaultBufferAllocators.offHeapAllocator().allocate(key.readableBytes()); + key.copyInto(key.readerOffset(), directKey, 0, key.readableBytes()); + key = directKey; + } else { + mustCloseKey = false; + } + keyNioBuffer = ((ReadableComponent) key).readableBuffer(); + assert keyNioBuffer.isDirect(); + assert keyNioBuffer.limit() == key.readableBytes(); } try { // Get the value nio buffer to pass to RocksDB - ByteBuffer valueNioBuffer = LLUtils.asReadOnlyDirect(value); + ByteBuffer valueNioBuffer; boolean mustCloseValue; - if (valueNioBuffer == null) { - mustCloseValue = true; - // If the nio buffer is not available, copy the netty buffer into a new direct buffer - valueNioBuffer = LLUtils.copyToNewDirectBuffer(value); - } else { - mustCloseValue = false; + { + if (!LLUtils.isReadOnlyDirect(value)) { + // If the nio buffer is not available, copy the netty buffer into a new direct buffer + mustCloseValue = true; + var directValue = DefaultBufferAllocators.offHeapAllocator().allocate(value.readableBytes()); + value.copyInto(value.readerOffset(), directValue, 0, value.readableBytes()); + value = directValue; + } else { + mustCloseValue = false; + } + valueNioBuffer = ((ReadableComponent) value).readableBuffer(); + assert valueNioBuffer.isDirect(); + assert valueNioBuffer.limit() == value.readableBytes(); } + try { db.put(cfh, writeOptions, keyNioBuffer, valueNioBuffer); } finally { if (mustCloseValue) { - PlatformDependent.freeDirectBuffer(valueNioBuffer); + value.close(); } } } finally { if (mustCloseKey) { - PlatformDependent.freeDirectBuffer(keyNioBuffer); + key.close(); } } } else { @@ -277,14 +302,21 @@ public sealed abstract class AbstractRocksDBColumn implements } if (nettyDirect) { // Get the key nio buffer to pass to RocksDB - ByteBuffer keyNioBuffer = LLUtils.asReadOnlyDirect(key); + ByteBuffer keyNioBuffer; boolean mustCloseKey; - if (keyNioBuffer == null) { - mustCloseKey = true; - // If the nio buffer is not available, copy the netty buffer into a new direct buffer - keyNioBuffer = LLUtils.copyToNewDirectBuffer(key); - } else { - mustCloseKey = false; + { + if (!LLUtils.isReadOnlyDirect(key)) { + // If the nio buffer is not available, copy the netty buffer into a new direct buffer + mustCloseKey = true; + var directKey = DefaultBufferAllocators.offHeapAllocator().allocate(key.readableBytes()); + key.copyInto(key.readerOffset(), directKey, 0, key.readableBytes()); + key = directKey; + } else { + mustCloseKey = false; + } + keyNioBuffer = ((ReadableComponent) key).readableBuffer(); + assert keyNioBuffer.isDirect(); + assert keyNioBuffer.limit() == key.readableBytes(); } try { if (db.keyMayExist(cfh, keyNioBuffer)) { @@ -295,7 +327,7 @@ public sealed abstract class AbstractRocksDBColumn implements } } finally { if (mustCloseKey) { - PlatformDependent.freeDirectBuffer(keyNioBuffer); + key.close(); } } } else { @@ -332,20 +364,27 @@ public sealed abstract class AbstractRocksDBColumn implements } if (nettyDirect) { // Get the key nio buffer to pass to RocksDB - ByteBuffer keyNioBuffer = LLUtils.asReadOnlyDirect(key); + ByteBuffer keyNioBuffer; boolean mustCloseKey; - if (keyNioBuffer == null) { - mustCloseKey = true; - // If the nio buffer is not available, copy the netty buffer into a new direct buffer - keyNioBuffer = LLUtils.copyToNewDirectBuffer(key); - } else { - mustCloseKey = false; + { + if (!LLUtils.isReadOnlyDirect(key)) { + // If the nio buffer is not available, copy the netty buffer into a new direct buffer + mustCloseKey = true; + var directKey = DefaultBufferAllocators.offHeapAllocator().allocate(key.readableBytes()); + key.copyInto(key.readerOffset(), directKey, 0, key.readableBytes()); + key = directKey; + } else { + mustCloseKey = false; + } + keyNioBuffer = ((ReadableComponent) key).readableBuffer(); + assert keyNioBuffer.isDirect(); + assert keyNioBuffer.limit() == key.readableBytes(); } try { db.delete(cfh, writeOptions, keyNioBuffer); } finally { if (mustCloseKey) { - PlatformDependent.freeDirectBuffer(keyNioBuffer); + key.close(); } } } else { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index f4bb958..80efce5 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -2,8 +2,8 @@ package it.cavallium.dbengine.database.disk; import static io.netty5.buffer.api.StandardAllocationTypes.OFF_HEAP; import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; -import static it.cavallium.dbengine.database.LLUtils.asReadOnlyDirect; import static it.cavallium.dbengine.database.LLUtils.fromByteArray; +import static it.cavallium.dbengine.database.LLUtils.isReadOnlyDirect; import static it.cavallium.dbengine.database.LLUtils.toStringSafe; import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNullElse; @@ -12,6 +12,7 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Timer; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.BufferAllocator; +import io.netty5.buffer.api.ReadableComponent; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.database.ColumnUtils; @@ -299,9 +300,9 @@ public class LLLocalDictionary implements LLDictionary { readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); readOpts.setFillCache(fillCache); if (range.hasMin()) { - var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe()); - if (nettyDirect && rangeMinInternalByteBuffer != null) { - readOpts.setIterateLowerBound(slice1 = new DirectSlice(rangeMinInternalByteBuffer, + if (nettyDirect && isReadOnlyDirect(range.getMinUnsafe())) { + readOpts.setIterateLowerBound(slice1 = new DirectSlice( + ((ReadableComponent) range.getMinUnsafe()).readableBuffer(), range.getMinUnsafe().readableBytes() )); } else { @@ -309,9 +310,9 @@ public class LLLocalDictionary implements LLDictionary { } } if (range.hasMax()) { - var rangeMaxInternalByteBuffer = asReadOnlyDirect(range.getMaxUnsafe()); - if (nettyDirect && rangeMaxInternalByteBuffer != null) { - readOpts.setIterateUpperBound(slice2 = new DirectSlice(rangeMaxInternalByteBuffer, + if (nettyDirect && isReadOnlyDirect(range.getMaxUnsafe())) { + readOpts.setIterateUpperBound(slice2 = new DirectSlice( + ((ReadableComponent) range.getMaxUnsafe()).readableBuffer(), range.getMaxUnsafe().readableBytes() )); } else { @@ -320,9 +321,8 @@ public class LLLocalDictionary implements LLDictionary { } try (RocksIterator rocksIterator = db.newIterator(readOpts)) { if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe()); - if (nettyDirect && rangeMinInternalByteBuffer != null) { - rocksIterator.seek(rangeMinInternalByteBuffer); + if (nettyDirect && isReadOnlyDirect(range.getMinUnsafe())) { + rocksIterator.seek(((ReadableComponent) range.getMinUnsafe()).readableBuffer()); } else { rocksIterator.seek(LLUtils.toArray(range.getMinUnsafe())); } @@ -1226,8 +1226,8 @@ public class LLLocalDictionary implements LLDictionary { @Nullable private static SafeCloseable rocksIterSeekTo(boolean allowNettyDirect, RocksIterator rocksIterator, Buffer key) { - ByteBuffer keyInternalByteBuffer; - if (allowNettyDirect && (keyInternalByteBuffer = asReadOnlyDirect(key)) != null) { + if (allowNettyDirect && isReadOnlyDirect(key)) { + ByteBuffer keyInternalByteBuffer = ((ReadableComponent) key).readableBuffer(); assert keyInternalByteBuffer.position() == 0; rocksIterator.seek(keyInternalByteBuffer); // This is useful to retain the key buffer in memory and avoid deallocations @@ -1245,9 +1245,9 @@ public class LLLocalDictionary implements LLDictionary { ReadOptions readOpts, IterateBound boundType, Buffer key) { requireNonNull(key); AbstractSlice slice; - ByteBuffer keyInternalByteBuffer; if (allowNettyDirect && LLLocalDictionary.USE_DIRECT_BUFFER_BOUNDS - && (keyInternalByteBuffer = asReadOnlyDirect(key)) != null) { + && (isReadOnlyDirect(key))) { + ByteBuffer keyInternalByteBuffer = ((ReadableComponent) key).readableBuffer(); assert keyInternalByteBuffer.position() == 0; slice = new DirectSlice(keyInternalByteBuffer, key.readableBytes()); assert slice.size() == key.readableBytes(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java index 8e1072f..59c453a 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java @@ -28,11 +28,12 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn { var allocator = getAllocator(); try (var keyBuf = allocator.allocate(key.length)) { keyBuf.writeBytes(key); - var result = this.get(readOptions, keyBuf); - if (result == null) { - return null; + try (var result = this.get(readOptions, keyBuf)) { + if (result == null) { + return null; + } + return LLUtils.toArray(result); } - return LLUtils.toArray(result); } } diff --git a/src/main/java/org/rocksdb/CappedWriteBatch.java b/src/main/java/org/rocksdb/CappedWriteBatch.java index 70affa5..fbd4474 100644 --- a/src/main/java/org/rocksdb/CappedWriteBatch.java +++ b/src/main/java/org/rocksdb/CappedWriteBatch.java @@ -1,10 +1,11 @@ package org.rocksdb; -import static it.cavallium.dbengine.database.LLUtils.asReadOnlyDirect; import static it.cavallium.dbengine.database.LLUtils.isDirect; +import static it.cavallium.dbengine.database.LLUtils.isReadOnlyDirect; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.BufferAllocator; +import io.netty5.buffer.api.ReadableComponent; import io.netty5.buffer.api.Send; import io.netty5.util.internal.PlatformDependent; import it.cavallium.dbengine.database.LLUtils; @@ -107,11 +108,11 @@ public class CappedWriteBatch extends WriteBatch { Send valueToReceive) throws RocksDBException { var key = keyToReceive.receive(); var value = valueToReceive.receive(); - ByteBuffer keyNioBuffer; - ByteBuffer valueNioBuffer; if (USE_FAST_DIRECT_BUFFERS - && (keyNioBuffer = asReadOnlyDirect(key)) != null - && (valueNioBuffer = asReadOnlyDirect(value)) != null) { + && (isReadOnlyDirect(key)) + && (isReadOnlyDirect(value))) { + ByteBuffer keyNioBuffer = ((ReadableComponent) key).readableBuffer(); + ByteBuffer valueNioBuffer = ((ReadableComponent) value).readableBuffer(); buffersToRelease.add(value); buffersToRelease.add(key); @@ -169,8 +170,8 @@ public class CappedWriteBatch extends WriteBatch { public synchronized void delete(ColumnFamilyHandle columnFamilyHandle, Send keyToReceive) throws RocksDBException { var key = keyToReceive.receive(); - ByteBuffer keyNioBuffer; - if (USE_FAST_DIRECT_BUFFERS && (keyNioBuffer = asReadOnlyDirect(key)) != null) { + if (USE_FAST_DIRECT_BUFFERS && isReadOnlyDirect(key)) { + ByteBuffer keyNioBuffer = ((ReadableComponent) key).readableBuffer(); buffersToRelease.add(key); remove(columnFamilyHandle, keyNioBuffer); } else { diff --git a/src/test/java/it/cavallium/dbengine/TestSingletons.java b/src/test/java/it/cavallium/dbengine/TestSingletons.java index 90a406a..2f91e1f 100644 --- a/src/test/java/it/cavallium/dbengine/TestSingletons.java +++ b/src/test/java/it/cavallium/dbengine/TestSingletons.java @@ -66,6 +66,15 @@ public abstract class TestSingletons { .verifyComplete(); } + @Test + public void testCreateIntegerNoop() { + StepVerifier + .create(tempDb(getTempDbGenerator(), allocator, db -> tempInt(db, "test", 0) + .then() + )) + .verifyComplete(); + } + @Test public void testCreateLong() { StepVerifier diff --git a/src/test/java/it/cavallium/dbengine/database/remote/QuicUtilsTest.java b/src/test/java/it/cavallium/dbengine/database/remote/QuicUtilsTest.java index 36ce716..2c2c59b 100644 --- a/src/test/java/it/cavallium/dbengine/database/remote/QuicUtilsTest.java +++ b/src/test/java/it/cavallium/dbengine/database/remote/QuicUtilsTest.java @@ -11,10 +11,8 @@ import io.netty.incubator.codec.quic.InsecureQuicTokenHandler; import io.netty.incubator.codec.quic.QuicConnectionIdGenerator; import io.netty.incubator.codec.quic.QuicSslContext; import io.netty.incubator.codec.quic.QuicSslContextBuilder; -import it.cavallium.data.generator.nativedata.NullableString; import it.cavallium.dbengine.database.remote.RPCCodecs.RPCEventCodec; import it.cavallium.dbengine.rpc.current.data.Empty; -import it.cavallium.dbengine.rpc.current.data.RPCCrash; import it.cavallium.dbengine.rpc.current.data.RPCEvent; import it.cavallium.dbengine.rpc.current.data.SingletonGet; import it.cavallium.dbengine.rpc.current.data.nullables.NullableLLSnapshot; @@ -222,28 +220,36 @@ class QuicUtilsTest { @Test void sendUpdateServerFail1() { - RPCEvent results = QuicUtils.sendUpdate(clientConn, - RPCEventCodec::new, - new SingletonGet(FAIL_IMMEDIATELY, NullableLLSnapshot.empty()), - serverData -> Mono.fromCallable(() -> { - fail("Called update"); - return new SingletonGet(NORMAL, NullableLLSnapshot.empty()); - }) - ).blockOptional().orElseThrow(); - assertEquals(RPCCrash.of(500, NullableString.of("Expected error")), results); + assertThrows(RPCException.class, + () -> QuicUtils + .sendUpdate(clientConn, + RPCEventCodec::new, + new SingletonGet(FAIL_IMMEDIATELY, NullableLLSnapshot.empty()), + serverData -> Mono.fromCallable(() -> { + fail("Called update"); + return new SingletonGet(NORMAL, NullableLLSnapshot.empty()); + }) + ) + .blockOptional() + .orElseThrow() + ); } @Test void sendUpdateServerFail2() { - RPCEvent results = QuicUtils.sendUpdate(clientConn, - RPCEventCodec::new, - new SingletonGet(NORMAL, NullableLLSnapshot.empty()), - serverData -> Mono.fromCallable(() -> { - assertEquals(Empty.of(), serverData); - return new SingletonGet(FAIL_IMMEDIATELY, NullableLLSnapshot.empty()); - }) - ).blockOptional().orElseThrow(); - assertEquals(RPCCrash.of(500, NullableString.of("Expected error")), results); + assertThrows(RPCException.class, + () -> QuicUtils + .sendUpdate(clientConn, + RPCEventCodec::new, + new SingletonGet(NORMAL, NullableLLSnapshot.empty()), + serverData -> Mono.fromCallable(() -> { + assertEquals(Empty.of(), serverData); + return new SingletonGet(FAIL_IMMEDIATELY, NullableLLSnapshot.empty()); + }) + ) + .blockOptional() + .orElseThrow() + ); } @Test @@ -265,12 +271,16 @@ class QuicUtilsTest { @Test void sendFailedRequest() { - RPCEvent response = QuicUtils.sendSimpleRequest(clientConn, - RPCEventCodec::new, - RPCEventCodec::new, - new SingletonGet(FAIL_IMMEDIATELY, NullableLLSnapshot.empty()) - ).blockOptional().orElseThrow(); - assertEquals(RPCCrash.of(500, NullableString.of("Expected error")), response); + assertThrows(RPCException.class, + () -> QuicUtils + .sendSimpleRequest(clientConn, + RPCEventCodec::new, + RPCEventCodec::new, + new SingletonGet(FAIL_IMMEDIATELY, NullableLLSnapshot.empty()) + ) + .blockOptional() + .orElseThrow() + ); } @Test