From 29086b1939afeb2d8b5c1078007d30f7590abf51 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 23 Sep 2021 02:15:58 +0200 Subject: [PATCH] Add some tests for low level dictionary, log tests, pass more tests --- pom.xml | 12 +- .../dbengine/client/MappedSerializer.java | 4 +- .../client/MappedSerializerFixedLength.java | 4 +- .../cavallium/dbengine/database/LLEntry.java | 5 +- .../cavallium/dbengine/database/LLUtils.java | 103 +++--- .../database/collections/DatabaseEmpty.java | 6 +- .../collections/DatabaseMapDictionary.java | 103 +++--- .../DatabaseMapDictionaryDeep.java | 128 +++---- .../DatabaseMapDictionaryHashed.java | 3 +- .../collections/ValueWithHashSerializer.java | 13 +- .../collections/ValuesSetSerializer.java | 4 +- .../database/disk/LLLocalDictionary.java | 123 +++++-- .../database/disk/MemorySegmentUtils.java | 2 +- .../database/serialization/Serializer.java | 13 +- .../SerializerFixedBinaryLength.java | 25 +- .../dbengine/netty/NullableBuffer.java | 74 +++++ .../it/cavallium/dbengine/DbTestUtils.java | 33 ++ .../cavallium/dbengine/TestDictionaryMap.java | 103 ++++-- .../cavallium/dbengine/TestLLDictionary.java | 311 ++++++++++++++++++ .../dbengine/TestLLDictionaryLeaks.java | 63 +--- .../dbengine/TestLocalLLDictionary.java | 11 + .../dbengine/TestMemoryLLDictionary.java | 11 + src/test/resources/log4j2.xml | 20 ++ 23 files changed, 876 insertions(+), 298 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/netty/NullableBuffer.java create mode 100644 src/test/java/it/cavallium/dbengine/TestLLDictionary.java create mode 100644 src/test/java/it/cavallium/dbengine/TestLocalLLDictionary.java create mode 100644 src/test/java/it/cavallium/dbengine/TestMemoryLLDictionary.java create mode 100644 src/test/resources/log4j2.xml diff --git a/pom.xml b/pom.xml index ec68272..8aabc35 100644 --- a/pom.xml +++ b/pom.xml @@ -172,6 +172,11 @@ log4j-slf4j-impl test + + com.lmax + disruptor + 3.3.4 + org.rocksdb rocksdbjni @@ -352,6 +357,11 @@ log4j-slf4j-impl 2.14.1 + + com.lmax + disruptor + test + org.rocksdb rocksdbjni @@ -542,7 +552,7 @@ - --enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --enable-native-access + --enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --enable-native-access=ALL-UNNAMED ci diff --git a/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java b/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java index f29c5ff..5fcf3a4 100644 --- a/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java +++ b/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java @@ -19,7 +19,7 @@ public class MappedSerializer implements Serializer { } @Override - public @NotNull DeserializationResult deserialize(@Nullable Send serialized) throws SerializationException { + public @NotNull DeserializationResult deserialize(@NotNull Send serialized) throws SerializationException { try (serialized) { var deserialized = serializer.deserialize(serialized); return new DeserializationResult<>(keyMapper.map(deserialized.deserializedData()), deserialized.bytesRead()); @@ -27,7 +27,7 @@ public class MappedSerializer implements Serializer { } @Override - public @Nullable Send serialize(@NotNull B deserialized) throws SerializationException { + public @NotNull Send serialize(@NotNull B deserialized) throws SerializationException { return serializer.serialize(keyMapper.unmap(deserialized)); } } diff --git a/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java b/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java index 2d970f8..e1caacb 100644 --- a/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java +++ b/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java @@ -19,7 +19,7 @@ public class MappedSerializerFixedLength implements SerializerFixedBinaryL } @Override - public @NotNull DeserializationResult deserialize(@Nullable Send serialized) throws SerializationException { + public @NotNull DeserializationResult deserialize(@NotNull Send serialized) throws SerializationException { try (serialized) { var deserialized = fixedLengthSerializer.deserialize(serialized); return new DeserializationResult<>(keyMapper.map(deserialized.deserializedData()), deserialized.bytesRead()); @@ -27,7 +27,7 @@ public class MappedSerializerFixedLength implements SerializerFixedBinaryL } @Override - public @Nullable Send serialize(@NotNull B deserialized) throws SerializationException { + public @NotNull Send serialize(@NotNull B deserialized) throws SerializationException { return fixedLengthSerializer.serialize(keyMapper.unmap(deserialized)); } diff --git a/src/main/java/it/cavallium/dbengine/database/LLEntry.java b/src/main/java/it/cavallium/dbengine/database/LLEntry.java index 71c0318..e93552a 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLEntry.java +++ b/src/main/java/it/cavallium/dbengine/database/LLEntry.java @@ -7,6 +7,7 @@ import io.net5.buffer.api.Send; import io.net5.buffer.api.internal.ResourceSupport; import java.util.StringJoiner; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; public class LLEntry extends ResourceSupport { @NotNull @@ -14,7 +15,7 @@ public class LLEntry extends ResourceSupport { @NotNull private final Buffer value; - private LLEntry(Send key, Send value, Drop drop) { + private LLEntry(@NotNull Send key, @NotNull Send value, Drop drop) { super(new LLEntry.CloseOnDrop(drop)); this.key = key.receive().makeReadOnly(); this.value = value.receive().makeReadOnly(); @@ -29,7 +30,7 @@ public class LLEntry extends ResourceSupport { return true; } - public static LLEntry of(Send key, Send value) { + public static LLEntry of(@NotNull Send key, @NotNull Send value) { return new LLEntry(key, value, d -> {}); } diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 1cd7699..c251e19 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -1,5 +1,7 @@ package it.cavallium.dbengine.database; +import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY; + import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import io.net5.buffer.api.Buffer; @@ -10,30 +12,25 @@ import io.net5.buffer.api.Send; import io.net5.util.IllegalReferenceCountException; import io.net5.util.internal.PlatformDependent; import it.cavallium.dbengine.database.collections.DatabaseStage; -import it.cavallium.dbengine.database.disk.LLIndexSearcher; -import it.cavallium.dbengine.database.disk.LLLocalLuceneIndex; import it.cavallium.dbengine.database.disk.MemorySegmentUtils; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.lucene.RandomSortField; -import it.cavallium.dbengine.lucene.analyzer.WordAnalyzer; -import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; import java.nio.ByteBuffer; import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; -import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.ToIntFunction; -import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.FloatPoint; @@ -42,24 +39,13 @@ import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.document.StringField; import org.apache.lucene.document.TextField; -import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.Term; -import it.cavallium.dbengine.lucene.mlt.MultiMoreLikeThis; -import org.apache.lucene.search.BooleanClause.Occur; -import org.apache.lucene.search.BooleanQuery; -import org.apache.lucene.search.ConstantScoreQuery; -import org.apache.lucene.search.MatchAllDocsQuery; -import org.apache.lucene.search.MatchNoDocsQuery; -import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; import org.apache.lucene.search.SortedNumericSortField; -import org.apache.lucene.search.similarities.ClassicSimilarity; -import org.apache.lucene.search.similarities.Similarity; -import org.apache.lucene.search.similarities.TFIDFSimilarity; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.rocksdb.RocksDB; @@ -195,9 +181,9 @@ public class LLUtils { return new it.cavallium.dbengine.database.LLKeyScore(hit.docId(), hit.score(), hit.key()); } - public static String toStringSafe(Buffer key) { + public static String toStringSafe(@Nullable Buffer key) { try { - if (key.isAccessible()) { + if (key == null || key.isAccessible()) { return toString(key); } else { return "(released)"; @@ -207,7 +193,7 @@ public class LLUtils { } } - public static String toString(Buffer key) { + public static String toString(@Nullable Buffer key) { if (key == null) { return "null"; } else { @@ -217,20 +203,35 @@ public class LLUtils { if (iMax <= -1) { return "[]"; } else { - StringBuilder b = new StringBuilder(); - b.append('['); + StringBuilder arraySB = new StringBuilder(); + StringBuilder asciiSB = new StringBuilder(); + boolean isAscii = true; + arraySB.append('['); int i = 0; while (true) { - b.append(key.getByte(startIndex + i)); + var byteVal = key.getUnsignedByte(startIndex + i); + arraySB.append(byteVal); + if (isAscii) { + if (byteVal >= 32 && byteVal < 127) { + asciiSB.append((char) byteVal); + } else { + isAscii = false; + asciiSB = null; + } + } if (i == iLimit) { - b.append("…"); + arraySB.append("…"); } if (i == iMax || i == iLimit) { - return b.append(']').toString(); + if (isAscii) { + return asciiSB.insert(0, "\"").append("\"").toString(); + } else { + return arraySB.append(']').toString(); + } } - b.append(", "); + arraySB.append(", "); ++i; } } @@ -279,7 +280,10 @@ public class LLUtils { return true; } - public static byte[] toArray(Buffer key) { + public static byte[] toArray(@Nullable Buffer key) { + if (key == null) { + return EMPTY_BYTE_ARRAY; + } byte[] array = new byte[key.readableBytes()]; key.copyInto(key.readerOffset(), array, 0, key.readableBytes()); return array; @@ -355,7 +359,6 @@ public class LLUtils { PlatformDependent.freeDirectBuffer(directBuffer); directBuffer = null; } - buffer.close(); } } @@ -445,6 +448,22 @@ public class LLUtils { return true; } + public static Send empty(BufferAllocator allocator) { + try (var empty = CompositeBuffer.compose(allocator)) { + assert empty.readableBytes() == 0; + assert empty.capacity() == 0; + return empty.send(); + } + } + + public static Send copy(BufferAllocator allocator, Buffer buf) { + if (CompositeBuffer.isComposite(buf) && buf.capacity() == 0) { + return empty(allocator); + } else { + return buf.copy().send(); + } + } + public static record DirectBuffer(@NotNull Send buffer, @NotNull ByteBuffer byteBuffer) {} @NotNull @@ -485,18 +504,23 @@ public class LLUtils { ); } assert buffer.isAccessible(); + buffer.compact(); + assert buffer.readerOffset() == 0; AtomicLong nativeAddress = new AtomicLong(0); if (buffer.countComponents() == 1) { if (writable) { if (buffer.countWritableComponents() == 1) { buffer.forEachWritable(0, (i, c) -> { + assert c.writableNativeAddress() != 0; nativeAddress.setPlain(c.writableNativeAddress()); return false; }); } } else { - if (buffer.countReadableComponents() == 1) { + var readableComponents = buffer.countReadableComponents(); + if (readableComponents == 1) { buffer.forEachReadable(0, (i, c) -> { + assert c.readableNativeAddress() != 0; nativeAddress.setPlain(c.readableNativeAddress()); return false; }); @@ -512,7 +536,7 @@ public class LLUtils { } throw new IllegalStateException("Buffer is not direct"); } - return MemorySegmentUtils.directBuffer(nativeAddress.getPlain(), buffer.capacity()); + return MemorySegmentUtils.directBuffer(nativeAddress.getPlain(), writable ? buffer.capacity() : buffer.writerOffset()); } public static Buffer fromByteArray(BufferAllocator alloc, byte[] array) { @@ -534,22 +558,21 @@ public class LLUtils { return buffer.receive(); } - public static Buffer compositeBuffer(BufferAllocator alloc, Send buffer1, Send buffer2) { + @NotNull + public static Buffer compositeBuffer(BufferAllocator alloc, + @NotNull Send buffer1, + @NotNull Send buffer2) { return CompositeBuffer.compose(alloc, buffer1, buffer2); } + @NotNull public static Buffer compositeBuffer(BufferAllocator alloc, - Send buffer1, - Send buffer2, - Send buffer3) { + @NotNull Send buffer1, + @NotNull Send buffer2, + @NotNull Send buffer3) { return CompositeBuffer.compose(alloc, buffer1, buffer2, buffer3); } - @SafeVarargs - public static Buffer compositeBuffer(BufferAllocator alloc, Send... buffers) { - return CompositeBuffer.compose(alloc, buffers); - } - public static Mono resolveDelta(Mono> prev, UpdateReturnMode updateReturnMode) { return prev.handle((delta, sink) -> { switch (updateReturnMode) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java index 0f11eb0..6769af6 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java @@ -2,8 +2,10 @@ package it.cavallium.dbengine.database.collections; import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; +import io.net5.buffer.api.CompositeBuffer; import io.net5.buffer.api.Send; import it.cavallium.dbengine.database.LLDictionary; +import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.Serializer.DeserializationResult; import org.jetbrains.annotations.NotNull; @@ -25,8 +27,8 @@ public class DatabaseEmpty { } @Override - public @Nullable Send serialize(@NotNull Nothing deserialized) { - return null; + public @NotNull Send serialize(@NotNull Nothing deserialized) { + return LLUtils.empty(bufferAllocator); } }; } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 386326a..fd931c7 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -39,7 +40,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep valueSerializer; protected DatabaseMapDictionary(LLDictionary dictionary, - @Nullable Send prefixKey, + @NotNull Send prefixKey, SerializerFixedBinaryLength keySuffixSerializer, Serializer valueSerializer) { // Do not retain or release or use the prefixKey here @@ -50,7 +51,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep DatabaseMapDictionary simple(LLDictionary dictionary, SerializerFixedBinaryLength keySerializer, Serializer valueSerializer) { - return new DatabaseMapDictionary<>(dictionary, null, keySerializer, valueSerializer); + return new DatabaseMapDictionary<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer, valueSerializer); } public static DatabaseMapDictionary tail(LLDictionary dictionary, @@ -60,13 +61,21 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep(dictionary, prefixKey, keySuffixSerializer, valueSerializer); } - private Buffer toKey(Send suffixKeyToSend) { - var suffixKey = suffixKeyToSend.receive(); - assert suffixKeyConsistency(suffixKey.readableBytes()); - if (keyPrefix != null) { - return LLUtils.compositeBuffer(dictionary.getAllocator(), keyPrefix.copy().send(), suffixKey.send()); - } else { - return suffixKey; + private Send toKey(Send suffixKeyToSend) { + try (var suffixKey = suffixKeyToSend.receive()) { + assert suffixKeyConsistency(suffixKey.readableBytes()); + if (keyPrefix.readableBytes() > 0) { + try (var result = LLUtils.compositeBuffer(dictionary.getAllocator(), + LLUtils.copy(dictionary.getAllocator(), keyPrefix), + suffixKey.send() + )) { + assert result.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength; + return result.send(); + } + } else { + assert suffixKey.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength; + return suffixKey.send(); + } } } @@ -84,7 +93,12 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>handle((entrySend, sink) -> { try (var entry = entrySend.receive()) { - var key = deserializeSuffix(stripPrefix(entry.getKey())); + T key; + try (var serializedKey = entry.getKey().receive()) { + removePrefix(serializedKey); + suffixKeyConsistency(serializedKey.readableBytes()); + key = deserializeSuffix(serializedKey.send()); + } var value = valueSerializer.deserialize(entry.getValue()).deserializedData(); sink.next(Map.entry(key, value)); } catch (SerializationException ex) { @@ -103,7 +117,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep { try { - sink.next(LLEntry.of(this.toKey(serializeSuffix(entry.getKey()).send()).send(), + sink.next(LLEntry.of(this.toKey(serializeSuffix(entry.getKey())), valueSerializer.serialize(entry.getValue())).send()); } catch (SerializationException e) { sink.error(e); @@ -133,14 +147,14 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> at(@Nullable CompositeSnapshot snapshot, T keySuffix) { return Mono.fromCallable(() -> - new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix).send()).send(), valueSerializer)); + new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), valueSerializer)); } @Override public Mono getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) { return dictionary .get(resolveSnapshot(snapshot), - Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send()), + Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))), existsAlmostCertainly ) .handle((value, sink) -> deserializeValue(value, sink)); @@ -148,8 +162,8 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep putValue(T keySuffix, U value) { - var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send()); - var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value)); + var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))).single(); + var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value)).single(); return dictionary .put(keyMono, valueMono, LLDictionaryResultType.VOID) .doOnNext(Send::close) @@ -166,7 +180,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep updater) { - var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send()); + var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))); return dictionary .update(keyMono, getSerializedUpdater(updater), updateReturnMode, existsAlmostCertainly) .handle((value, sink) -> deserializeValue(value, sink)); @@ -176,7 +190,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> updateValueAndGetDelta(T keySuffix, boolean existsAlmostCertainly, SerializationFunction<@Nullable U, @Nullable U> updater) { - var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send()); + var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))); return dictionary .updateAndGetDelta(keyMono, getSerializedUpdater(updater), existsAlmostCertainly) .transform(mono -> LLUtils.mapLLDelta(mono, @@ -224,7 +238,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep putValueAndGetPrevious(T keySuffix, U value) { - var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send()); + var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))); var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value)); return dictionary .put(keyMono, @@ -235,7 +249,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep putValueAndGetChanged(T keySuffix, U value) { - var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send()); + var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))); var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value)); return dictionary .put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE) @@ -246,7 +260,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep remove(T keySuffix) { - var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send()); + var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))); return dictionary .remove(keyMono, LLDictionaryResultType.VOID) .doOnNext(Send::close) @@ -255,7 +269,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep removeAndGetPrevious(T keySuffix) { - var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send()); + var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))); return dictionary .remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE) .handle((value, sink) -> deserializeValue(value, sink)); @@ -263,7 +277,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep removeAndGetStatus(T keySuffix) { - var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send()); + var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))); return dictionary .remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE) .map(LLUtils::responseToBoolean); @@ -274,7 +288,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>>handle((keySuffix, sink) -> { try { - sink.next(Tuples.of(keySuffix, toKey(serializeSuffix(keySuffix).send()).send())); + sink.next(Tuples.of(keySuffix, toKey(serializeSuffix(keySuffix)))); } catch (SerializationException ex) { sink.error(ex); } @@ -303,9 +317,10 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep serializeEntry(T key, U value) throws SerializationException { - try (var serializedKey = toKey(serializeSuffix(key).send())) { - try (var serializedValue = valueSerializer.serialize(value).receive()) { - return LLEntry.of(serializedKey.send(), serializedValue.send()).send(); + try (var serializedKey = toKey(serializeSuffix(key))) { + var serializedValueToReceive = valueSerializer.serialize(value); + try (var serializedValue = serializedValueToReceive.receive()) { + return LLEntry.of(serializedKey, serializedValue.send()).send(); } } } @@ -343,7 +358,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep, X>>handle((entry, sink) -> { try { - sink.next(Tuples.of(serializeSuffix(entry.getT1()).send(), entry.getT2())); + sink.next(Tuples.of(serializeSuffix(entry.getT1()), entry.getT2())); } catch (SerializationException ex) { sink.error(ex); } @@ -374,16 +389,18 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>> getAllStages(@Nullable CompositeSnapshot snapshot) { return dictionary .getRangeKeys(resolveSnapshot(snapshot), rangeMono) - .handle((key, sink) -> { - try (key) { - try (var keySuffixWithExt = stripPrefix(key).receive()) { - sink.next(Map.entry(deserializeSuffix(keySuffixWithExt.copy().send()), - new DatabaseSingle<>(dictionary, - toKey(keySuffixWithExt.send()).send(), - valueSerializer - ) - )); - } + .handle((keyBufToReceive, sink) -> { + try (var keyBuf = keyBufToReceive.receive()) { + assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength; + // Remove prefix. Keep only the suffix and the ext + removePrefix(keyBuf); + suffixKeyConsistency(keyBuf.readableBytes()); + sink.next(Map.entry(deserializeSuffix(keyBuf.copy().send()), + new DatabaseSingle<>(dictionary, + toKey(keyBuf.send()), + valueSerializer + ) + )); } catch (SerializationException ex) { sink.error(ex); } @@ -396,8 +413,14 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>handle((serializedEntryToReceive, sink) -> { try (var serializedEntry = serializedEntryToReceive.receive()) { - sink.next(Map.entry(deserializeSuffix(stripPrefix(serializedEntry.getKey())), - valueSerializer.deserialize(serializedEntry.getValue()).deserializedData())); + try (var keyBuf = serializedEntry.getKey().receive()) { + assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength; + // Remove prefix. Keep only the suffix and the ext + removePrefix(keyBuf); + suffixKeyConsistency(keyBuf.readableBytes()); + sink.next(Map.entry(deserializeSuffix(keyBuf.send()), + valueSerializer.deserialize(serializedEntry.getValue()).deserializedData())); + } } catch (SerializationException e) { sink.error(e); } @@ -418,7 +441,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep { try { - sink.next(LLEntry.of(toKey(serializeSuffix(entry.getKey()).send()).send(), + sink.next(LLEntry.of(toKey(serializeSuffix(entry.getKey())), valueSerializer.serialize(entry.getValue())).send()); } catch (SerializationException e) { sink.error(e); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index 7dc589a..2dff876 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -2,7 +2,6 @@ package it.cavallium.dbengine.database.collections; import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; -import io.net5.buffer.api.CompositeBuffer; import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; import io.net5.util.IllegalReferenceCountException; @@ -18,6 +17,8 @@ import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -29,6 +30,7 @@ public class DatabaseMapDictionaryDeep> implem private final BufferAllocator alloc; protected final SubStageGetter subStageGetter; protected final SerializerFixedBinaryLength keySuffixSerializer; + @NotNull protected final Buffer keyPrefix; protected final int keyPrefixLength; protected final int keySuffixLength; @@ -90,27 +92,26 @@ public class DatabaseMapDictionaryDeep> implem } protected static Buffer zeroFillKeySuffixAndExt(BufferAllocator alloc, - @Nullable Send prefixKeySend, + @NotNull Send prefixKeySend, int prefixLength, int suffixLength, int extLength) { - try (var result = prefixKeySend == null ? null : prefixKeySend.receive()) { - if (result == null) { - assert prefixLength == 0; - var buf = alloc.allocate(prefixLength + suffixLength + extLength); - buf.writerOffset(prefixLength + suffixLength + extLength); - buf.fill((byte) 0); - return buf; - } else { - assert result.readableBytes() == prefixLength; - assert suffixLength > 0; - assert extLength >= 0; - result.ensureWritable(suffixLength + extLength, suffixLength + extLength, true); - for (int i = 0; i < suffixLength + extLength; i++) { - result.writeByte((byte) 0x0); - } - return result; + var result = prefixKeySend.receive(); + if (result == null) { + assert prefixLength == 0; + var buf = alloc.allocate(prefixLength + suffixLength + extLength); + buf.writerOffset(prefixLength + suffixLength + extLength); + buf.fill((byte) 0); + return buf; + } else { + assert result.readableBytes() == prefixLength; + assert suffixLength > 0; + assert extLength >= 0; + result.ensureWritable(suffixLength + extLength, suffixLength + extLength, true); + for (int i = 0; i < suffixLength + extLength; i++) { + result.writeByte((byte) 0x0); } + return result; } } @@ -175,7 +176,7 @@ public class DatabaseMapDictionaryDeep> implem public static DatabaseMapDictionaryDeep> simple(LLDictionary dictionary, SerializerFixedBinaryLength keySerializer, SubStageGetterSingle subStageGetter) { - return new DatabaseMapDictionaryDeep<>(dictionary, null, keySerializer, subStageGetter, 0); + return new DatabaseMapDictionaryDeep<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer, subStageGetter, 0); } public static > DatabaseMapDictionaryDeep deepTail(LLDictionary dictionary, @@ -183,7 +184,7 @@ public class DatabaseMapDictionaryDeep> implem int keyExtLength, SubStageGetter subStageGetter) { return new DatabaseMapDictionaryDeep<>(dictionary, - null, + LLUtils.empty(dictionary.getAllocator()), keySerializer, subStageGetter, keyExtLength @@ -199,26 +200,26 @@ public class DatabaseMapDictionaryDeep> implem } protected DatabaseMapDictionaryDeep(LLDictionary dictionary, - @Nullable Send prefixKeyToReceive, + @NotNull Send prefixKeyToReceive, SerializerFixedBinaryLength keySuffixSerializer, SubStageGetter subStageGetter, int keyExtLength) { - try (var prefixKey = prefixKeyToReceive == null ? null : prefixKeyToReceive.receive()) { + try (var prefixKey = prefixKeyToReceive.receive()) { this.dictionary = dictionary; this.alloc = dictionary.getAllocator(); this.subStageGetter = subStageGetter; this.keySuffixSerializer = keySuffixSerializer; - assert prefixKey == null || prefixKey.isAccessible(); - this.keyPrefixLength = prefixKey == null ? 0 : prefixKey.readableBytes(); + assert prefixKey.isAccessible(); + this.keyPrefixLength = prefixKey.readableBytes(); this.keySuffixLength = keySuffixSerializer.getSerializedBinaryLength(); this.keyExtLength = keyExtLength; - Buffer firstKey = firstRangeKey(alloc, prefixKey == null ? null : prefixKey.copy().send(), keyPrefixLength, + Buffer firstKey = firstRangeKey(alloc, LLUtils.copy(alloc, prefixKey), keyPrefixLength, keySuffixLength, keyExtLength); try (firstKey) { - var nextRangeKey = nextRangeKey(alloc, prefixKey == null ? null : prefixKey.copy().send(), + var nextRangeKey = nextRangeKey(alloc, LLUtils.copy(alloc, prefixKey), keyPrefixLength, keySuffixLength, keyExtLength); try (nextRangeKey) { - assert prefixKey == null || prefixKey.isAccessible(); + assert prefixKey.isAccessible(); assert keyPrefixLength == 0 || !LLUtils.equals(firstKey, nextRangeKey); this.range = keyPrefixLength == 0 ? LLRange.all() : LLRange.of(firstKey.send(), nextRangeKey.send()); this.rangeMono = LLUtils.lazyRetainRange(this.range); @@ -226,7 +227,7 @@ public class DatabaseMapDictionaryDeep> implem } } - this.keyPrefix = prefixKey == null ? null : prefixKey.send().receive(); + this.keyPrefix = prefixKey.send().receive(); } } @@ -246,21 +247,28 @@ public class DatabaseMapDictionaryDeep> implem } /** - * Keep only suffix and ext + * Removes the prefix from the key */ - protected Send stripPrefix(Send keyToReceive) { - try (var key = keyToReceive.receive()) { - return key.copy(this.keyPrefixLength, key.readableBytes() - this.keyPrefixLength).send(); - } + protected void removePrefix(Buffer key) { + assert key.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength; + key.readerOffset(key.readerOffset() + this.keyPrefixLength).compact(); + assert key.readableBytes() == keySuffixLength + keyExtLength; } /** - * Add prefix to suffix + * Removes the ext from the key */ + protected void removeExt(Buffer key) { + assert key.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength; + key.writerOffset(keyPrefixLength + keySuffixLength).compact(); + assert key.readableBytes() == keyPrefixLength + keySuffixLength; + } + protected Send toKeyWithoutExt(Send suffixKeyToReceive) { try (var suffixKey = suffixKeyToReceive.receive()) { assert suffixKey.readableBytes() == keySuffixLength; - try (Buffer result = LLUtils.compositeBuffer(alloc, keyPrefix.copy().send(), suffixKey.send())) { + try (var result = Objects.requireNonNull(LLUtils.compositeBuffer(alloc, + LLUtils.copy(alloc, keyPrefix), suffixKey.send()))) { assert result.readableBytes() == keyPrefixLength + keySuffixLength; return result.send(); } @@ -287,8 +295,9 @@ public class DatabaseMapDictionaryDeep> implem @Override public Mono at(@Nullable CompositeSnapshot snapshot, T keySuffix) { + var suffixKeyWithoutExt = Mono.fromCallable(() -> toKeyWithoutExt(serializeSuffix(keySuffix))); return this.subStageGetter - .subStage(dictionary, snapshot, Mono.fromCallable(() -> toKeyWithoutExt(serializeSuffix(keySuffix).send()))) + .subStage(dictionary, snapshot, suffixKeyWithoutExt) .transform(LLUtils::handleDiscard) .doOnDiscard(DatabaseStage.class, DatabaseStage::release); } @@ -310,11 +319,10 @@ public class DatabaseMapDictionaryDeep> implem .flatMapSequential(groupKeyWithoutExtSend_ -> Mono.using( groupKeyWithoutExtSend_::receive, groupKeyWithoutExtSend -> this.subStageGetter - .subStage(dictionary, snapshot, getGroupKeyWithoutExt(groupKeyWithoutExtSend.copy().send())) + .subStage(dictionary, snapshot, Mono.fromCallable(() -> groupKeyWithoutExtSend.copy().send())) .>handle((us, sink) -> { try { - sink.next(Map.entry(this.deserializeSuffix(getGroupSuffix(groupKeyWithoutExtSend.send())), - us)); + sink.next(Map.entry(this.deserializeSuffix(getGroupSuffix(groupKeyWithoutExtSend.send())), us)); } catch (SerializationException ex) { sink.error(ex); } @@ -324,22 +332,22 @@ public class DatabaseMapDictionaryDeep> implem .transform(LLUtils::handleDiscard); } - private Send getGroupSuffix(Send groupKeyWithoutExtSend) { - try (var groupKeyWithoutExt = groupKeyWithoutExtSend.receive()) { - try (var groupSuffix = this.stripPrefix(groupKeyWithoutExt.copy().send()).receive()) { - assert subStageKeysConsistency(groupKeyWithoutExt.readableBytes() + keyExtLength); - return groupSuffix.send(); - } + private Send getGroupSuffix(Send groupKeyWithoutExt) { + try (var buffer = groupKeyWithoutExt.receive()) { + assert subStageKeysConsistency(buffer.readableBytes() + keyExtLength); + this.removePrefix(buffer); + assert subStageKeysConsistency(keyPrefixLength + buffer.readableBytes() + keyExtLength); + return buffer.send(); } } - private Mono> getGroupKeyWithoutExt(Send groupKeyWithoutExtSend) { - return Mono.fromCallable(() -> { - try (var groupKeyWithoutExt = groupKeyWithoutExtSend.receive()) { - assert subStageKeysConsistency(groupKeyWithoutExt.readableBytes() + keyExtLength); - return groupKeyWithoutExt.send(); - } - }); + private Send getGroupWithoutExt(Send groupKeyWithExtSend) { + try (var buffer = groupKeyWithExtSend.receive()) { + assert subStageKeysConsistency(buffer.readableBytes()); + this.removeExt(buffer); + assert subStageKeysConsistency(buffer.readableBytes() + keyExtLength); + return buffer.send(); + } } private boolean subStageKeysConsistency(int totalKeyLength) { @@ -383,7 +391,7 @@ public class DatabaseMapDictionaryDeep> implem } //todo: temporary wrapper. convert the whole class to buffers - protected T deserializeSuffix(Send keySuffixToReceive) throws SerializationException { + protected T deserializeSuffix(@NotNull Send keySuffixToReceive) throws SerializationException { try (var keySuffix = keySuffixToReceive.receive()) { assert suffixKeyConsistency(keySuffix.readableBytes()); var result = keySuffixSerializer.deserialize(keySuffix.send()); @@ -393,11 +401,15 @@ public class DatabaseMapDictionaryDeep> implem } //todo: temporary wrapper. convert the whole class to buffers - protected Buffer serializeSuffix(T keySuffix) throws SerializationException { - Buffer suffixData = keySuffixSerializer.serialize(keySuffix).receive(); - assert suffixKeyConsistency(suffixData.readableBytes()); - assert keyPrefix.isAccessible(); - return suffixData; + @NotNull + protected Send serializeSuffix(T keySuffix) throws SerializationException { + try (var suffixDataToReceive = keySuffixSerializer.serialize(keySuffix)) { + try (Buffer suffixData = suffixDataToReceive.receive()) { + assert suffixKeyConsistency(suffixData.readableBytes()); + assert keyPrefix.isAccessible(); + return suffixData.send(); + } + } } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java index 90f6e34..653cc35 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java @@ -6,6 +6,7 @@ import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; +import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; @@ -58,7 +59,7 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap keyHashSerializer) { return new DatabaseMapDictionaryHashed<>( dictionary, - null, + LLUtils.empty(dictionary.getAllocator()), keySerializer, valueSerializer, keyHashFunction, diff --git a/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java b/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java index f380b73..549852e 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.collections; import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; +import io.net5.buffer.api.CompositeBuffer; import io.net5.buffer.api.Send; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.serialization.SerializationException; @@ -43,17 +44,9 @@ class ValueWithHashSerializer implements Serializer> { } @Override - public @Nullable Send serialize(@NotNull Entry deserialized) throws SerializationException { + public @NotNull Send serialize(@NotNull Entry deserialized) throws SerializationException { var keySuffix = keySuffixSerializer.serialize(deserialized.getKey()); var value = valueSerializer.serialize(deserialized.getValue()); - if (value == null && keySuffix == null) { - return null; - } else if (value == null) { - return keySuffix; - } else if (keySuffix == null) { - return value; - } else { - return LLUtils.compositeBuffer(allocator, keySuffix, value).send(); - } + return LLUtils.compositeBuffer(allocator, keySuffix, value).send(); } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java b/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java index 749ba0f..330ba68 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java @@ -45,8 +45,8 @@ class ValuesSetSerializer implements Serializer> { output.writeInt(deserialized.size()); for (X entry : deserialized) { var serializedToReceive = entrySerializer.serialize(entry); - if (serializedToReceive != null) { - try (Buffer serialized = serializedToReceive.receive()) { + try (Buffer serialized = serializedToReceive.receive()) { + if (serialized.readableBytes() > 0) { output.ensureWritable(serialized.readableBytes()); output.writeBytes(serialized); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index f4165da..8c53e41 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -267,10 +267,23 @@ public class LLLocalDictionary implements LLDictionary { stamp = 0; } try { + Buffer logKey; if (logger.isTraceEnabled(MARKER_ROCKSDB)) { - logger.trace(MARKER_ROCKSDB, "Reading {}", LLUtils.toStringSafe(key)); + logKey = key.copy(); + } else { + logKey = null; + } + try (logKey) { + var result = dbGet(cfh, resolveSnapshot(snapshot), key.send(), existsAlmostCertainly); + if (logger.isTraceEnabled(MARKER_ROCKSDB)) { + try (var result2 = result == null ? null : result.receive()) { + logger.trace(MARKER_ROCKSDB, "Reading {}: {}", LLUtils.toStringSafe(logKey), LLUtils.toString(result2)); + return result2 == null ? null : result2.send(); + } + } else { + return result; + } } - return dbGet(cfh, resolveSnapshot(snapshot), key.send(), existsAlmostCertainly); } finally { if (updateMode == UpdateMode.ALLOW) { lock.unlockRead(stamp); @@ -414,6 +427,8 @@ public class LLLocalDictionary implements LLDictionary { if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called dbPut in a nonblocking thread"); } + assert key.isAccessible(); + assert value.isAccessible(); if (databaseOptions.allowNettyDirect()) { var keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send()); try (var ignored1 = keyNioBuffer.buffer().receive()) { @@ -592,6 +607,8 @@ public class LLLocalDictionary implements LLDictionary { valueSend -> this.>runOnDb(() -> { try (var key = keySend.receive()) { try (var value = valueSend.receive()) { + assert key.isAccessible(); + assert value.isAccessible(); StampedLock lock; long stamp; if (updateMode == UpdateMode.ALLOW) { @@ -656,9 +673,6 @@ public class LLLocalDictionary implements LLDictionary { stamp = 0; } try { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Reading {}", LLUtils.toStringSafe(key)); - } while (true) { @Nullable Buffer prevData; var prevDataHolder = existsAlmostCertainly ? null : new Holder(); @@ -682,19 +696,37 @@ public class LLLocalDictionary implements LLDictionary { } else { prevData = null; } + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Reading {}: {} (before update)", + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(prevData) + ); + } try { @Nullable Buffer newData; try (Buffer prevDataToSendToUpdater = prevData == null ? null : prevData.copy()) { - try (var newDataToReceive = updater.apply( - prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send())) { - if (newDataToReceive != null) { - newData = newDataToReceive.receive(); - } else { - newData = null; + try (var sentData = prevDataToSendToUpdater == null ? null + : prevDataToSendToUpdater.send()) { + try (var newDataToReceive = updater.apply(sentData)) { + if (newDataToReceive != null) { + newData = newDataToReceive.receive(); + } else { + newData = null; + } } } } + assert newData == null || newData.isAccessible(); try { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Updating {}. previous data: {}, updated data: {}", + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(prevData), + LLUtils.toStringSafe(newData) + ); + } if (prevData != null && newData == null) { //noinspection DuplicatedCode if (updateMode == UpdateMode.ALLOW) { @@ -709,7 +741,7 @@ public class LLLocalDictionary implements LLDictionary { } } if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Deleting {}", LLUtils.toStringSafe(key)); + logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key)); } dbDelete(cfh, null, key.send()); } else if (newData != null @@ -727,7 +759,11 @@ public class LLLocalDictionary implements LLDictionary { } } if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Writing {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData)); + logger.trace(MARKER_ROCKSDB, + "Writing {}: {} (after update)", + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(newData) + ); } Buffer dataToPut; if (updateReturnMode == UpdateReturnMode.GET_NEW_VALUE) { @@ -779,7 +815,7 @@ public class LLLocalDictionary implements LLDictionary { SerializationFunction<@Nullable Send, @Nullable Send> updater, boolean existsAlmostCertainly) { return Mono.usingWhen(keyMono, - keySend -> this.runOnDb(() -> { + keySend -> runOnDb(() -> { try (var key = keySend.receive()) { if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called update in a nonblocking thread"); @@ -799,7 +835,7 @@ public class LLLocalDictionary implements LLDictionary { } try { if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Reading {}", LLUtils.toStringSafe(key)); + logger.trace(MARKER_ROCKSDB, "Reading {} (before update)", LLUtils.toStringSafe(key)); } while (true) { @Nullable Buffer prevData; @@ -824,19 +860,37 @@ public class LLLocalDictionary implements LLDictionary { } else { prevData = null; } + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Read {}: {} (before update)", + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(prevData) + ); + } try { @Nullable Buffer newData; try (Buffer prevDataToSendToUpdater = prevData == null ? null : prevData.copy()) { - try (var newDataToReceive = updater.apply( - prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send())) { - if (newDataToReceive != null) { - newData = newDataToReceive.receive(); - } else { - newData = null; + try (var sentData = prevDataToSendToUpdater == null ? null + : prevDataToSendToUpdater.send()) { + try (var newDataToReceive = updater.apply(sentData)) { + if (newDataToReceive != null) { + newData = newDataToReceive.receive(); + } else { + newData = null; + } } } } + assert newData == null || newData.isAccessible(); try { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Updating {}. previous data: {}, updated data: {}", + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(prevData), + LLUtils.toStringSafe(newData) + ); + } if (prevData != null && newData == null) { //noinspection DuplicatedCode if (updateMode == UpdateMode.ALLOW) { @@ -851,7 +905,7 @@ public class LLLocalDictionary implements LLDictionary { } } if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Deleting {}", LLUtils.toStringSafe(key)); + logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key)); } dbDelete(cfh, null, key.send()); } else if (newData != null @@ -869,8 +923,11 @@ public class LLLocalDictionary implements LLDictionary { } } if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Writing {}: {}", - LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData)); + logger.trace(MARKER_ROCKSDB, + "Writing {}: {} (after update)", + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(newData) + ); } assert key.isAccessible(); assert newData.isAccessible(); @@ -986,18 +1043,24 @@ public class LLLocalDictionary implements LLDictionary { stamp = 0; } try { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Reading {}", LLUtils.toArray(key)); - } var data = new Holder(); + Buffer bufferResult; if (db.keyMayExist(cfh, LLUtils.toArray(key), data)) { if (data.getValue() != null) { - return LLUtils.fromByteArray(alloc, data.getValue()).send(); + bufferResult = LLUtils.fromByteArray(alloc, data.getValue()); } else { - return dbGet(cfh, null, key.send(), true); + try (var bufferResultToReceive = dbGet(cfh, null, key.send(), true)) { + bufferResult = bufferResultToReceive == null ? null : bufferResultToReceive.receive(); + } } } else { - return null; + bufferResult = null; + } + try (bufferResult) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Reading {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(bufferResult)); + } + return bufferResult == null ? null : bufferResult.send(); } } finally { if (updateMode == UpdateMode.ALLOW) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/MemorySegmentUtils.java b/src/main/java/it/cavallium/dbengine/database/disk/MemorySegmentUtils.java index 608404e..c283149 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/MemorySegmentUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/MemorySegmentUtils.java @@ -124,6 +124,6 @@ public class MemorySegmentUtils { } public static String getSuggestedArgs() { - return "--enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --enable-native-access"; + return "--enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --enable-native-access=ALL-UNNAMED"; } } diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java b/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java index cd31dc2..4ffb437 100644 --- a/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java +++ b/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java @@ -4,6 +4,7 @@ import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.Send; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.netty.NullableBuffer; import java.nio.charset.StandardCharsets; import java.util.Objects; import org.jetbrains.annotations.NotNull; @@ -13,16 +14,16 @@ public interface Serializer { record DeserializationResult(T deserializedData, int bytesRead) {} - @NotNull DeserializationResult deserialize(@Nullable Send serialized) throws SerializationException; + @NotNull DeserializationResult deserialize(@NotNull Send serialized) throws SerializationException; - @Nullable Send serialize(@NotNull A deserialized) throws SerializationException; + @NotNull Send serialize(@NotNull A deserialized) throws SerializationException; Serializer> NOOP_SERIALIZER = new Serializer<>() { @Override - public @NotNull DeserializationResult> deserialize(@Nullable Send serialized) { - try (var serializedBuf = serialized == null ? null : serialized.receive()) { - var readableBytes = serializedBuf == null ? 0 : serializedBuf.readableBytes(); - return new DeserializationResult<>(serializedBuf == null ? null : serializedBuf.send(), readableBytes); + public @NotNull DeserializationResult> deserialize(@NotNull Send serialized) { + try (var serializedBuf = serialized.receive()) { + var readableBytes = serializedBuf.readableBytes(); + return new DeserializationResult<>(serializedBuf.send(), readableBytes); } } diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java b/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java index d744aad..78fe656 100644 --- a/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java +++ b/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java @@ -17,10 +17,7 @@ public interface SerializerFixedBinaryLength extends Serializer { static SerializerFixedBinaryLength> noop(int length) { return new SerializerFixedBinaryLength<>() { @Override - public @NotNull DeserializationResult> deserialize(@Nullable Send serialized) { - if (length == 0 && serialized == null) { - return new DeserializationResult<>(null, 0); - } + public @NotNull DeserializationResult> deserialize(@NotNull Send serialized) { Objects.requireNonNull(serialized); try (var buf = serialized.receive()) { if (buf.readableBytes() != getSerializedBinaryLength()) { @@ -55,12 +52,8 @@ public interface SerializerFixedBinaryLength extends Serializer { static SerializerFixedBinaryLength utf8(BufferAllocator allocator, int length) { return new SerializerFixedBinaryLength<>() { @Override - public @NotNull DeserializationResult deserialize(@Nullable Send serializedToReceive) + public @NotNull DeserializationResult deserialize(@NotNull Send serializedToReceive) throws SerializationException { - if (length == 0 && serializedToReceive == null) { - return new DeserializationResult<>(null, 0); - } - Objects.requireNonNull(serializedToReceive); try (var serialized = serializedToReceive.receive()) { if (serialized.readableBytes() != getSerializedBinaryLength()) { throw new SerializationException( @@ -78,7 +71,9 @@ public interface SerializerFixedBinaryLength extends Serializer { // UTF-8 uses max. 3 bytes per char, so calculate the worst case. try (Buffer buf = allocator.allocate(LLUtils.utf8MaxBytes(deserialized))) { assert buf.isAccessible(); - buf.writeBytes(deserialized.getBytes(StandardCharsets.UTF_8)); + var bytes = deserialized.getBytes(StandardCharsets.UTF_8); + buf.ensureWritable(bytes.length); + buf.writeBytes(bytes); if (buf.readableBytes() != getSerializedBinaryLength()) { throw new SerializationException("Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to serialize an element with " @@ -99,10 +94,7 @@ public interface SerializerFixedBinaryLength extends Serializer { static SerializerFixedBinaryLength intSerializer(BufferAllocator allocator) { return new SerializerFixedBinaryLength<>() { @Override - public @NotNull DeserializationResult deserialize(@Nullable Send serializedToReceive) { - if (getSerializedBinaryLength() == 0 && serializedToReceive == null) { - return new DeserializationResult<>(null, 0); - } + public @NotNull DeserializationResult deserialize(@NotNull Send serializedToReceive) { Objects.requireNonNull(serializedToReceive); try (var serialized = serializedToReceive.receive()) { if (serialized.readableBytes() != getSerializedBinaryLength()) { @@ -131,10 +123,7 @@ public interface SerializerFixedBinaryLength extends Serializer { static SerializerFixedBinaryLength longSerializer(BufferAllocator allocator) { return new SerializerFixedBinaryLength<>() { @Override - public @NotNull DeserializationResult deserialize(@Nullable Send serializedToReceive) { - if (getSerializedBinaryLength() == 0 && serializedToReceive == null) { - return new DeserializationResult<>(null, 0); - } + public @NotNull DeserializationResult deserialize(@NotNull Send serializedToReceive) { Objects.requireNonNull(serializedToReceive); try (var serialized = serializedToReceive.receive()) { if (serialized.readableBytes() != getSerializedBinaryLength()) { diff --git a/src/main/java/it/cavallium/dbengine/netty/NullableBuffer.java b/src/main/java/it/cavallium/dbengine/netty/NullableBuffer.java new file mode 100644 index 0000000..c1e3eef --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/netty/NullableBuffer.java @@ -0,0 +1,74 @@ +package it.cavallium.dbengine.netty; + +import io.net5.buffer.api.Buffer; +import io.net5.buffer.api.Drop; +import io.net5.buffer.api.Owned; +import io.net5.buffer.api.Send; +import io.net5.buffer.api.internal.ResourceSupport; +import it.cavallium.dbengine.client.SearchResult; +import org.jetbrains.annotations.Nullable; + +public class NullableBuffer extends ResourceSupport { + + @Nullable + private Buffer buffer; + + public NullableBuffer(@Nullable Buffer buffer, Drop drop) { + super(new CloseOnDrop(drop)); + this.buffer = buffer == null ? null : buffer.send().receive(); + } + + public NullableBuffer(@Nullable Send buffer, Drop drop) { + super(new CloseOnDrop(drop)); + this.buffer = buffer == null ? null : buffer.receive(); + } + + @Nullable + public Buffer buf() { + return buffer; + } + + @Nullable + public Send sendBuf() { + return buffer == null ? null : buffer.send(); + } + + @Override + protected RuntimeException createResourceClosedException() { + return new IllegalStateException("Closed"); + } + + @Override + protected Owned prepareSend() { + var buffer = this.buffer == null ? null : this.buffer.send(); + makeInaccessible(); + return drop -> new NullableBuffer(buffer, drop); + } + + private void makeInaccessible() { + this.buffer = null; + } + + private static class CloseOnDrop implements Drop { + + private final Drop delegate; + + public CloseOnDrop(Drop drop) { + this.delegate = drop; + } + + @Override + public void drop(NullableBuffer obj) { + try { + if (obj.buffer != null) { + if (obj.buffer.isAccessible()) { + obj.buffer.close(); + } + } + delegate.drop(obj); + } finally { + obj.makeInaccessible(); + } + } + } +} diff --git a/src/test/java/it/cavallium/dbengine/DbTestUtils.java b/src/test/java/it/cavallium/dbengine/DbTestUtils.java index afff40e..c16b139 100644 --- a/src/test/java/it/cavallium/dbengine/DbTestUtils.java +++ b/src/test/java/it/cavallium/dbengine/DbTestUtils.java @@ -32,6 +32,7 @@ import org.jetbrains.annotations.Nullable; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class DbTestUtils { @@ -42,6 +43,38 @@ public class DbTestUtils { return "0123456789".repeat(1024); } + public static void run(Flux publisher) { + publisher.subscribeOn(Schedulers.immediate()).blockLast(); + } + + public static void runVoid(Mono publisher) { + publisher.then().subscribeOn(Schedulers.immediate()).block(); + } + + public static T run(Mono publisher) { + return publisher.subscribeOn(Schedulers.immediate()).block(); + } + + public static T run(boolean shouldFail, Mono publisher) { + return publisher.subscribeOn(Schedulers.immediate()).transform(mono -> { + if (shouldFail) { + return mono.onErrorResume(ex -> Mono.empty()); + } else { + return mono; + } + }).block(); + } + + public static void runVoid(boolean shouldFail, Mono publisher) { + publisher.then().subscribeOn(Schedulers.immediate()).transform(mono -> { + if (shouldFail) { + return mono.onErrorResume(ex -> Mono.empty()); + } else { + return mono; + } + }).block(); + } + public static record TestAllocator(PooledBufferAllocator allocator) {} public static TestAllocator newAllocator() { diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java index f8ef0d1..f56ce1d 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java @@ -19,15 +19,20 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.warp.commonutils.log.Logger; +import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import reactor.test.StepVerifier.Step; +import reactor.test.util.TestLogger; +import reactor.util.Loggers; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; public abstract class TestDictionaryMap { + private static final Logger log = LoggerFactory.getLogger(TestDictionaryMap.class); private TestAllocator allocator; private boolean checkLeaks = true; @@ -97,21 +102,25 @@ public abstract class TestDictionaryMap { @ParameterizedTest @MethodSource("provideArgumentsPut") public void testPut(MapType mapType, UpdateMode updateMode, String key, String value, boolean shouldFail) { - var stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMap(map -> map - .putValue(key, value) - .then(map.getValue(null, key)) - .doAfterTerminate(map::release) - ) - )); - if (shouldFail) { - this.checkLeaks = false; - stpVer.verifyError(); - } else { - stpVer.expectNext(value).verifyComplete(); - } + var gen = getTempDbGenerator(); + var db = run(gen.openTempDb(allocator)); + var dict = run(tempDictionary(db.db(), updateMode)); + var map = tempDatabaseMapDictionaryMap(dict, mapType, 5); + + runVoid(shouldFail, map.putValue(key, value)); + + var resultingMapSize = run(map.leavesCount(null, false)); + Assertions.assertEquals(shouldFail ? 0 : 1, resultingMapSize); + + var resultingMap = run(map.get(null)); + Assertions.assertEquals(shouldFail ? null : Map.of(key, value), resultingMap); + + runVoid(map.close()); + map.release(); + + //if (shouldFail) this.checkLeaks = false; + + gen.closeTempDb(db); } @ParameterizedTest @@ -257,26 +266,50 @@ public abstract class TestDictionaryMap { .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) .flatMapMany(map -> Flux .concat( - map.updateValue(key, old -> { - assert old == null; - return "error?"; - }).then(map.getValue(null, key)), - map.updateValue(key, false, old -> { - assert Objects.equals(old, "error?"); - return "error?"; - }).then(map.getValue(null, key)), - map.updateValue(key, true, old -> { - assert Objects.equals(old, "error?"); - return "error?"; - }).then(map.getValue(null, key)), - map.updateValue(key, true, old -> { - assert Objects.equals(old, "error?"); - return value; - }).then(map.getValue(null, key)), - map.updateValue(key, true, old -> { - assert Objects.equals(old, value); - return value; - }).then(map.getValue(null, key)) + Mono + .fromRunnable(() -> log.debug("1. Updating value: {}", key)) + .then(map.updateValue(key, old -> { + assert old == null; + return "error?"; + })) + .doOnSuccess(s -> log.debug("1. Getting value: {}", key)) + .then(map.getValue(null, key)), + + Mono + .fromRunnable(() -> log.debug("2. Updating value: {}", key)) + .then(map.updateValue(key, false, old -> { + assert Objects.equals(old, "error?"); + return "error?"; + })) + .doOnSuccess(s -> log.debug("2. Getting value: {}", key)) + .then(map.getValue(null, key)), + + Mono + .fromRunnable(() -> log.debug("3. Updating value: {}", key)) + .then(map.updateValue(key, true, old -> { + assert Objects.equals(old, "error?"); + return "error?"; + })) + .doOnSuccess(s -> log.debug("3. Getting value: {}", key)) + .then(map.getValue(null, key)), + + Mono + .fromRunnable(() -> log.debug("4. Updating value: {}", key)) + .then(map.updateValue(key, true, old -> { + assert Objects.equals(old, "error?"); + return value; + })) + .doOnSuccess(s -> log.debug("4. Getting value: {}", key)) + .then(map.getValue(null, key)), + + Mono + .fromRunnable(() -> log.debug("5. Updating value: {}", key)) + .then(map.updateValue(key, true, old -> { + assert Objects.equals(old, value); + return value; + })) + .doOnSuccess(s -> log.debug("5. Getting value: {}", key)) + .then(map.getValue(null, key)) ) .doAfterTerminate(map::release) ) diff --git a/src/test/java/it/cavallium/dbengine/TestLLDictionary.java b/src/test/java/it/cavallium/dbengine/TestLLDictionary.java new file mode 100644 index 0000000..05fded5 --- /dev/null +++ b/src/test/java/it/cavallium/dbengine/TestLLDictionary.java @@ -0,0 +1,311 @@ +package it.cavallium.dbengine; + +import static it.cavallium.dbengine.DbTestUtils.destroyAllocator; +import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks; +import static it.cavallium.dbengine.DbTestUtils.newAllocator; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import io.net5.buffer.api.Buffer; +import io.net5.buffer.api.Send; +import it.cavallium.dbengine.DbTestUtils.TempDb; +import it.cavallium.dbengine.DbTestUtils.TestAllocator; +import it.cavallium.dbengine.database.LLDictionary; +import it.cavallium.dbengine.database.LLDictionaryResultType; +import it.cavallium.dbengine.database.LLKeyValueDatabase; +import it.cavallium.dbengine.database.LLRange; +import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.UpdateMode; +import it.cavallium.dbengine.database.UpdateReturnMode; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Objects; +import java.util.stream.Stream; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.warp.commonutils.log.Logger; +import org.warp.commonutils.log.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public abstract class TestLLDictionary { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + private static final Mono> RANGE_ALL = Mono.fromCallable(() -> LLRange.all().send()); + private TestAllocator allocator; + private TempDb tempDb; + private LLKeyValueDatabase db; + + protected abstract TemporaryDbGenerator getTempDbGenerator(); + + @BeforeEach + public void beforeEach() { + this.allocator = newAllocator(); + ensureNoLeaks(allocator.allocator(), false, false); + tempDb = Objects.requireNonNull(getTempDbGenerator().openTempDb(allocator).block(), "TempDB"); + db = tempDb.db(); + } + + @AfterEach + public void afterEach() { + getTempDbGenerator().closeTempDb(tempDb).block(); + ensureNoLeaks(allocator.allocator(), true, false); + destroyAllocator(allocator); + } + + public static Stream provideArguments() { + return Arrays.stream(UpdateMode.values()).map(Arguments::of); + } + + public static Stream providePutArguments() { + var updateModes = Arrays.stream(UpdateMode.values()); + return updateModes.flatMap(updateMode -> { + var resultTypes = Arrays.stream(LLDictionaryResultType.values()); + return resultTypes.map(resultType -> Arguments.of(updateMode, resultType)); + }); + } + + public static Stream provideUpdateArguments() { + var updateModes = Arrays.stream(UpdateMode.values()); + return updateModes.flatMap(updateMode -> { + var resultTypes = Arrays.stream(UpdateReturnMode.values()); + return resultTypes.map(resultType -> Arguments.of(updateMode, resultType)); + }); + } + + private LLDictionary getDict(UpdateMode updateMode) { + var dict = DbTestUtils.tempDictionary(db, updateMode).blockOptional().orElseThrow(); + var key1 = Mono.fromCallable(() -> fromString("test-key-1")); + var key2 = Mono.fromCallable(() -> fromString("test-key-2")); + var key3 = Mono.fromCallable(() -> fromString("test-key-3")); + var key4 = Mono.fromCallable(() -> fromString("test-key-4")); + var value = Mono.fromCallable(() -> fromString("test-value")); + dict.put(key1, value, LLDictionaryResultType.VOID).block(); + dict.put(key2, value, LLDictionaryResultType.VOID).block(); + dict.put(key3, value, LLDictionaryResultType.VOID).block(); + dict.put(key4, value, LLDictionaryResultType.VOID).block(); + return dict; + } + + private Send fromString(String s) { + var sb = s.getBytes(StandardCharsets.UTF_8); + try (var b = db.getAllocator().allocate(sb.length + 3 + 13)) { + assert b.writerOffset() == 0; + assert b.readerOffset() == 0; + b.writerOffset(3).writeBytes(sb); + b.readerOffset(3); + assert b.readableBytes() == sb.length; + + var part1 = b.split(); + + return LLUtils.compositeBuffer(db.getAllocator(), part1.send(), b.send()).send(); + } + } + + private String toString(Send b) { + try (var bb = b.receive()) { + byte[] data = new byte[bb.readableBytes()]; + bb.copyInto(bb.readerOffset(), data, 0, data.length); + return new String(data, StandardCharsets.UTF_8); + } + } + + private void run(Flux publisher) { + publisher.subscribeOn(Schedulers.immediate()).blockLast(); + } + + private void runVoid(Mono publisher) { + publisher.then().subscribeOn(Schedulers.immediate()).block(); + } + + private T run(Mono publisher) { + return publisher.subscribeOn(Schedulers.immediate()).block(); + } + + private T run(boolean shouldFail, Mono publisher) { + return publisher.subscribeOn(Schedulers.immediate()).transform(mono -> { + if (shouldFail) { + return mono.onErrorResume(ex -> Mono.empty()); + } else { + return mono; + } + }).block(); + } + + private void runVoid(boolean shouldFail, Mono publisher) { + publisher.then().subscribeOn(Schedulers.immediate()).transform(mono -> { + if (shouldFail) { + return mono.onErrorResume(ex -> Mono.empty()); + } else { + return mono; + } + }).block(); + } + + @Test + public void testNoOp() { + } + + @Test + public void testNoOpAllocation() { + for (int i = 0; i < 10; i++) { + var a = allocator.allocator().allocate(i * 512); + a.send().receive().close(); + } + } + + @ParameterizedTest + @MethodSource("provideArguments") + public void testGetDict(UpdateMode updateMode) { + var dict = getDict(updateMode); + Assertions.assertNotNull(dict); + } + + @ParameterizedTest + @MethodSource("provideArguments") + public void testGetColumnName(UpdateMode updateMode) { + var dict = getDict(updateMode); + Assertions.assertEquals("hash_map_testmap", dict.getColumnName()); + } + + @ParameterizedTest + @MethodSource("provideArguments") + public void testGetAllocator(UpdateMode updateMode) { + var dict = getDict(updateMode); + var alloc = dict.getAllocator(); + Assertions.assertEquals(alloc, alloc); + } + + @ParameterizedTest + @MethodSource("provideArguments") + public void testGet(UpdateMode updateMode) { + var dict = getDict(updateMode); + var keyEx = Mono.fromCallable(() -> fromString("test-key-1")); + var keyNonEx = Mono.fromCallable(() -> fromString("test-nonexistent")); + Assertions.assertEquals("test-value", run(dict.get(null, keyEx).map(this::toString).transform(LLUtils::handleDiscard))); + Assertions.assertEquals("test-value", run(dict.get(null, keyEx, true).map(this::toString).transform(LLUtils::handleDiscard))); + Assertions.assertEquals("test-value", run(dict.get(null, keyEx, false).map(this::toString).transform(LLUtils::handleDiscard))); + Assertions.assertEquals((String) null, run(dict.get(null, keyNonEx).map(this::toString).transform(LLUtils::handleDiscard))); + Assertions.assertEquals((String) null, run(dict.get(null, keyNonEx, true).map(this::toString).transform(LLUtils::handleDiscard))); + Assertions.assertEquals((String) null, run(dict.get(null, keyNonEx, false).map(this::toString).transform(LLUtils::handleDiscard))); + } + + @ParameterizedTest + @MethodSource("providePutArguments") + public void testPutExisting(UpdateMode updateMode, LLDictionaryResultType resultType) { + var dict = getDict(updateMode); + var keyEx = Mono.fromCallable(() -> fromString("test-key-1")); + var value = Mono.fromCallable(() -> fromString("test-value")); + + var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false)); + + runVoid(dict.put(keyEx, value, resultType).then().doOnDiscard(Send.class, Send::close)); + + var afterSize = run(dict.sizeRange(null, RANGE_ALL, false)); + Assertions.assertEquals(0, afterSize - beforeSize); + } + + @ParameterizedTest + @MethodSource("providePutArguments") + public void testPutNew(UpdateMode updateMode, LLDictionaryResultType resultType) { + var dict = getDict(updateMode); + var keyNonEx = Mono.fromCallable(() -> fromString("test-nonexistent")); + var value = Mono.fromCallable(() -> fromString("test-value")); + + var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false)); + + runVoid(dict.put(keyNonEx, value, resultType).then().doOnDiscard(Send.class, Send::close)); + + var afterSize = run(dict.sizeRange(null, Mono.fromCallable(() -> LLRange.all().send()), false)); + Assertions.assertEquals(1, afterSize - beforeSize); + + Assertions.assertTrue(run(dict.getRangeKeys(null, RANGE_ALL).map(this::toString).collectList()).contains("test-nonexistent")); + } + + @ParameterizedTest + @MethodSource("provideArguments") + public void testGetUpdateMode(UpdateMode updateMode) { + var dict = getDict(updateMode); + assertEquals(updateMode, run(dict.getUpdateMode())); + } + + @ParameterizedTest + @MethodSource("provideUpdateArguments") + public void testUpdateExisting(UpdateMode updateMode, UpdateReturnMode updateReturnMode) { + var dict = getDict(updateMode); + var keyEx = Mono.fromCallable(() -> fromString("test-key-1")); + var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false)); + long afterSize; + runVoid(updateMode == UpdateMode.DISALLOW, + dict.update(keyEx, old -> fromString("test-value"), updateReturnMode, true).then().transform(LLUtils::handleDiscard) + ); + afterSize = run(dict.sizeRange(null, RANGE_ALL, false)); + assertEquals(0, afterSize - beforeSize); + runVoid(updateMode == UpdateMode.DISALLOW, + dict.update(keyEx, old -> fromString("test-value"), updateReturnMode, false).then().transform(LLUtils::handleDiscard) + ); + afterSize = run(dict.sizeRange(null, RANGE_ALL, false)); + assertEquals(0, afterSize - beforeSize); + runVoid(updateMode == UpdateMode.DISALLOW, + dict.update(keyEx, old -> fromString("test-value"), updateReturnMode).then().transform(LLUtils::handleDiscard) + ); + afterSize = run(dict.sizeRange(null, RANGE_ALL, false)); + assertEquals(0, afterSize - beforeSize); + } + + @ParameterizedTest + @MethodSource("provideUpdateArguments") + public void testUpdateNew(UpdateMode updateMode, UpdateReturnMode updateReturnMode) { + int expected = updateMode == UpdateMode.DISALLOW ? 0 : 1; + var dict = getDict(updateMode); + var keyNonEx = Mono.fromCallable(() -> fromString("test-nonexistent")); + var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false)); + long afterSize; + runVoid(updateMode == UpdateMode.DISALLOW, + dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode, true).then().transform(LLUtils::handleDiscard) + ); + afterSize = run(dict.sizeRange(null, RANGE_ALL, false)); + assertEquals(expected, afterSize - beforeSize); + runVoid(updateMode == UpdateMode.DISALLOW, + dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode, false).then().transform(LLUtils::handleDiscard) + ); + afterSize = run(dict.sizeRange(null, RANGE_ALL, false)); + assertEquals(expected, afterSize - beforeSize); + runVoid(updateMode == UpdateMode.DISALLOW, + dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode).then().transform(LLUtils::handleDiscard) + ); + afterSize = run(dict.sizeRange(null, RANGE_ALL, false)); + assertEquals(expected, afterSize - beforeSize); + + if (updateMode != UpdateMode.DISALLOW) { + Assertions.assertTrue(run(dict.getRangeKeys(null, RANGE_ALL).map(this::toString).collectList()).contains( + "test-nonexistent")); + } + } + + @ParameterizedTest + @MethodSource("provideArguments") + public void testUpdateAndGetDelta(UpdateMode updateMode) { + log.warn("Test not implemented"); + //todo: implement + } + + @ParameterizedTest + @MethodSource("provideArguments") + public void testClear(UpdateMode updateMode) { + log.warn("Test not implemented"); + //todo: implement + } + + @ParameterizedTest + @MethodSource("providePutArguments") + public void testRemove(UpdateMode updateMode, LLDictionaryResultType resultType) { + log.warn("Test not implemented"); + //todo: implement + } +} diff --git a/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java b/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java index 3a15e47..026107d 100644 --- a/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java +++ b/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java @@ -26,9 +26,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; public abstract class TestLLDictionaryLeaks { @@ -90,43 +88,12 @@ public abstract class TestLLDictionaryLeaks { private Send fromString(String s) { var sb = s.getBytes(StandardCharsets.UTF_8); try (var b = db.getAllocator().allocate(sb.length)) { - b.writeBytes(b); + b.writeBytes(sb); + assert b.readableBytes() == sb.length; return b.send(); } } - private void run(Flux publisher) { - publisher.subscribeOn(Schedulers.immediate()).blockLast(); - } - - private void runVoid(Mono publisher) { - publisher.then().subscribeOn(Schedulers.immediate()).block(); - } - - private T run(Mono publisher) { - return publisher.subscribeOn(Schedulers.immediate()).block(); - } - - private T run(boolean shouldFail, Mono publisher) { - return publisher.subscribeOn(Schedulers.immediate()).transform(mono -> { - if (shouldFail) { - return mono.onErrorResume(ex -> Mono.empty()); - } else { - return mono; - } - }).block(); - } - - private void runVoid(boolean shouldFail, Mono publisher) { - publisher.then().subscribeOn(Schedulers.immediate()).transform(mono -> { - if (shouldFail) { - return mono.onErrorResume(ex -> Mono.empty()); - } else { - return mono; - } - }).block(); - } - @Test public void testNoOp() { } @@ -164,9 +131,9 @@ public abstract class TestLLDictionaryLeaks { public void testGet(UpdateMode updateMode) { var dict = getDict(updateMode); var key = Mono.fromCallable(() -> fromString("test")); - runVoid(dict.get(null, key).then().transform(LLUtils::handleDiscard)); - runVoid(dict.get(null, key, true).then().transform(LLUtils::handleDiscard)); - runVoid(dict.get(null, key, false).then().transform(LLUtils::handleDiscard)); + DbTestUtils.runVoid(dict.get(null, key).then().transform(LLUtils::handleDiscard)); + DbTestUtils.runVoid(dict.get(null, key, true).then().transform(LLUtils::handleDiscard)); + DbTestUtils.runVoid(dict.get(null, key, false).then().transform(LLUtils::handleDiscard)); } @ParameterizedTest @@ -175,14 +142,14 @@ public abstract class TestLLDictionaryLeaks { var dict = getDict(updateMode); var key = Mono.fromCallable(() -> fromString("test-key")); var value = Mono.fromCallable(() -> fromString("test-value")); - runVoid(dict.put(key, value, resultType).then().doOnDiscard(Send.class, Send::close)); + DbTestUtils.runVoid(dict.put(key, value, resultType).then().doOnDiscard(Send.class, Send::close)); } @ParameterizedTest @MethodSource("provideArguments") public void testGetUpdateMode(UpdateMode updateMode) { var dict = getDict(updateMode); - assertEquals(updateMode, run(dict.getUpdateMode())); + assertEquals(updateMode, DbTestUtils.run(dict.getUpdateMode())); } @ParameterizedTest @@ -190,13 +157,13 @@ public abstract class TestLLDictionaryLeaks { public void testUpdate(UpdateMode updateMode, UpdateReturnMode updateReturnMode) { var dict = getDict(updateMode); var key = Mono.fromCallable(() -> fromString("test-key")); - runVoid(updateMode == UpdateMode.DISALLOW, + DbTestUtils.runVoid(updateMode == UpdateMode.DISALLOW, dict.update(key, old -> old, updateReturnMode, true).then().transform(LLUtils::handleDiscard) ); - runVoid(updateMode == UpdateMode.DISALLOW, + DbTestUtils.runVoid(updateMode == UpdateMode.DISALLOW, dict.update(key, old -> old, updateReturnMode, false).then().transform(LLUtils::handleDiscard) ); - runVoid(updateMode == UpdateMode.DISALLOW, + DbTestUtils.runVoid(updateMode == UpdateMode.DISALLOW, dict.update(key, old -> old, updateReturnMode).then().transform(LLUtils::handleDiscard) ); } @@ -206,13 +173,13 @@ public abstract class TestLLDictionaryLeaks { public void testUpdateAndGetDelta(UpdateMode updateMode) { var dict = getDict(updateMode); var key = Mono.fromCallable(() -> fromString("test-key")); - runVoid(updateMode == UpdateMode.DISALLOW, + DbTestUtils.runVoid(updateMode == UpdateMode.DISALLOW, dict.updateAndGetDelta(key, old -> old, true).then().transform(LLUtils::handleDiscard) ); - runVoid(updateMode == UpdateMode.DISALLOW, + DbTestUtils.runVoid(updateMode == UpdateMode.DISALLOW, dict.updateAndGetDelta(key, old -> old, false).then().transform(LLUtils::handleDiscard) ); - runVoid(updateMode == UpdateMode.DISALLOW, + DbTestUtils.runVoid(updateMode == UpdateMode.DISALLOW, dict.updateAndGetDelta(key, old -> old).then().transform(LLUtils::handleDiscard) ); } @@ -221,7 +188,7 @@ public abstract class TestLLDictionaryLeaks { @MethodSource("provideArguments") public void testClear(UpdateMode updateMode) { var dict = getDict(updateMode); - runVoid(dict.clear()); + DbTestUtils.runVoid(dict.clear()); } @ParameterizedTest @@ -229,6 +196,6 @@ public abstract class TestLLDictionaryLeaks { public void testRemove(UpdateMode updateMode, LLDictionaryResultType resultType) { var dict = getDict(updateMode); var key = Mono.fromCallable(() -> fromString("test-key")); - runVoid(dict.remove(key, resultType).then().doOnDiscard(Send.class, Send::close)); + DbTestUtils.runVoid(dict.remove(key, resultType).then().doOnDiscard(Send.class, Send::close)); } } diff --git a/src/test/java/it/cavallium/dbengine/TestLocalLLDictionary.java b/src/test/java/it/cavallium/dbengine/TestLocalLLDictionary.java new file mode 100644 index 0000000..abd01de --- /dev/null +++ b/src/test/java/it/cavallium/dbengine/TestLocalLLDictionary.java @@ -0,0 +1,11 @@ +package it.cavallium.dbengine; + +public class TestLocalLLDictionary extends TestLLDictionary { + + private static final TemporaryDbGenerator GENERATOR = new LocalTemporaryDbGenerator(); + + @Override + protected TemporaryDbGenerator getTempDbGenerator() { + return GENERATOR; + } +} diff --git a/src/test/java/it/cavallium/dbengine/TestMemoryLLDictionary.java b/src/test/java/it/cavallium/dbengine/TestMemoryLLDictionary.java new file mode 100644 index 0000000..ce032a6 --- /dev/null +++ b/src/test/java/it/cavallium/dbengine/TestMemoryLLDictionary.java @@ -0,0 +1,11 @@ +package it.cavallium.dbengine; + +public class TestMemoryLLDictionary extends TestLLDictionary { + + private static final TemporaryDbGenerator GENERATOR = new MemoryTemporaryDbGenerator(); + + @Override + protected TemporaryDbGenerator getTempDbGenerator() { + return GENERATOR; + } +} diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml new file mode 100644 index 0000000..9731f37 --- /dev/null +++ b/src/test/resources/log4j2.xml @@ -0,0 +1,20 @@ + + + + + + + + + + + + + + \ No newline at end of file