diff --git a/pom.xml b/pom.xml
index ec68272..8aabc35 100644
--- a/pom.xml
+++ b/pom.xml
@@ -172,6 +172,11 @@
log4j-slf4j-impl
test
+
+ com.lmax
+ disruptor
+ 3.3.4
+
org.rocksdb
rocksdbjni
@@ -352,6 +357,11 @@
log4j-slf4j-impl
2.14.1
+
+ com.lmax
+ disruptor
+ test
+
org.rocksdb
rocksdbjni
@@ -542,7 +552,7 @@
- --enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --enable-native-access
+ --enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --enable-native-access=ALL-UNNAMED
ci
diff --git a/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java b/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java
index f29c5ff..5fcf3a4 100644
--- a/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java
+++ b/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java
@@ -19,7 +19,7 @@ public class MappedSerializer implements Serializer {
}
@Override
- public @NotNull DeserializationResult deserialize(@Nullable Send serialized) throws SerializationException {
+ public @NotNull DeserializationResult deserialize(@NotNull Send serialized) throws SerializationException {
try (serialized) {
var deserialized = serializer.deserialize(serialized);
return new DeserializationResult<>(keyMapper.map(deserialized.deserializedData()), deserialized.bytesRead());
@@ -27,7 +27,7 @@ public class MappedSerializer implements Serializer {
}
@Override
- public @Nullable Send serialize(@NotNull B deserialized) throws SerializationException {
+ public @NotNull Send serialize(@NotNull B deserialized) throws SerializationException {
return serializer.serialize(keyMapper.unmap(deserialized));
}
}
diff --git a/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java b/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java
index 2d970f8..e1caacb 100644
--- a/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java
+++ b/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java
@@ -19,7 +19,7 @@ public class MappedSerializerFixedLength implements SerializerFixedBinaryL
}
@Override
- public @NotNull DeserializationResult deserialize(@Nullable Send serialized) throws SerializationException {
+ public @NotNull DeserializationResult deserialize(@NotNull Send serialized) throws SerializationException {
try (serialized) {
var deserialized = fixedLengthSerializer.deserialize(serialized);
return new DeserializationResult<>(keyMapper.map(deserialized.deserializedData()), deserialized.bytesRead());
@@ -27,7 +27,7 @@ public class MappedSerializerFixedLength implements SerializerFixedBinaryL
}
@Override
- public @Nullable Send serialize(@NotNull B deserialized) throws SerializationException {
+ public @NotNull Send serialize(@NotNull B deserialized) throws SerializationException {
return fixedLengthSerializer.serialize(keyMapper.unmap(deserialized));
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLEntry.java b/src/main/java/it/cavallium/dbengine/database/LLEntry.java
index 71c0318..e93552a 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLEntry.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLEntry.java
@@ -7,6 +7,7 @@ import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
import java.util.StringJoiner;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
public class LLEntry extends ResourceSupport {
@NotNull
@@ -14,7 +15,7 @@ public class LLEntry extends ResourceSupport {
@NotNull
private final Buffer value;
- private LLEntry(Send key, Send value, Drop drop) {
+ private LLEntry(@NotNull Send key, @NotNull Send value, Drop drop) {
super(new LLEntry.CloseOnDrop(drop));
this.key = key.receive().makeReadOnly();
this.value = value.receive().makeReadOnly();
@@ -29,7 +30,7 @@ public class LLEntry extends ResourceSupport {
return true;
}
- public static LLEntry of(Send key, Send value) {
+ public static LLEntry of(@NotNull Send key, @NotNull Send value) {
return new LLEntry(key, value, d -> {});
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
index 1cd7699..c251e19 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
@@ -1,5 +1,7 @@
package it.cavallium.dbengine.database;
+import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;
+
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import io.net5.buffer.api.Buffer;
@@ -10,30 +12,25 @@ import io.net5.buffer.api.Send;
import io.net5.util.IllegalReferenceCountException;
import io.net5.util.internal.PlatformDependent;
import it.cavallium.dbengine.database.collections.DatabaseStage;
-import it.cavallium.dbengine.database.disk.LLIndexSearcher;
-import it.cavallium.dbengine.database.disk.LLLocalLuceneIndex;
import it.cavallium.dbengine.database.disk.MemorySegmentUtils;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.RandomSortField;
-import it.cavallium.dbengine.lucene.analyzer.WordAnalyzer;
-import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.ToIntFunction;
-import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FloatPoint;
@@ -42,24 +39,13 @@ import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
-import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
-import it.cavallium.dbengine.lucene.mlt.MultiMoreLikeThis;
-import org.apache.lucene.search.BooleanClause.Occur;
-import org.apache.lucene.search.BooleanQuery;
-import org.apache.lucene.search.ConstantScoreQuery;
-import org.apache.lucene.search.MatchAllDocsQuery;
-import org.apache.lucene.search.MatchNoDocsQuery;
-import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
-import org.apache.lucene.search.similarities.ClassicSimilarity;
-import org.apache.lucene.search.similarities.Similarity;
-import org.apache.lucene.search.similarities.TFIDFSimilarity;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.RocksDB;
@@ -195,9 +181,9 @@ public class LLUtils {
return new it.cavallium.dbengine.database.LLKeyScore(hit.docId(), hit.score(), hit.key());
}
- public static String toStringSafe(Buffer key) {
+ public static String toStringSafe(@Nullable Buffer key) {
try {
- if (key.isAccessible()) {
+ if (key == null || key.isAccessible()) {
return toString(key);
} else {
return "(released)";
@@ -207,7 +193,7 @@ public class LLUtils {
}
}
- public static String toString(Buffer key) {
+ public static String toString(@Nullable Buffer key) {
if (key == null) {
return "null";
} else {
@@ -217,20 +203,35 @@ public class LLUtils {
if (iMax <= -1) {
return "[]";
} else {
- StringBuilder b = new StringBuilder();
- b.append('[');
+ StringBuilder arraySB = new StringBuilder();
+ StringBuilder asciiSB = new StringBuilder();
+ boolean isAscii = true;
+ arraySB.append('[');
int i = 0;
while (true) {
- b.append(key.getByte(startIndex + i));
+ var byteVal = key.getUnsignedByte(startIndex + i);
+ arraySB.append(byteVal);
+ if (isAscii) {
+ if (byteVal >= 32 && byteVal < 127) {
+ asciiSB.append((char) byteVal);
+ } else {
+ isAscii = false;
+ asciiSB = null;
+ }
+ }
if (i == iLimit) {
- b.append("…");
+ arraySB.append("…");
}
if (i == iMax || i == iLimit) {
- return b.append(']').toString();
+ if (isAscii) {
+ return asciiSB.insert(0, "\"").append("\"").toString();
+ } else {
+ return arraySB.append(']').toString();
+ }
}
- b.append(", ");
+ arraySB.append(", ");
++i;
}
}
@@ -279,7 +280,10 @@ public class LLUtils {
return true;
}
- public static byte[] toArray(Buffer key) {
+ public static byte[] toArray(@Nullable Buffer key) {
+ if (key == null) {
+ return EMPTY_BYTE_ARRAY;
+ }
byte[] array = new byte[key.readableBytes()];
key.copyInto(key.readerOffset(), array, 0, key.readableBytes());
return array;
@@ -355,7 +359,6 @@ public class LLUtils {
PlatformDependent.freeDirectBuffer(directBuffer);
directBuffer = null;
}
- buffer.close();
}
}
@@ -445,6 +448,22 @@ public class LLUtils {
return true;
}
+ public static Send empty(BufferAllocator allocator) {
+ try (var empty = CompositeBuffer.compose(allocator)) {
+ assert empty.readableBytes() == 0;
+ assert empty.capacity() == 0;
+ return empty.send();
+ }
+ }
+
+ public static Send copy(BufferAllocator allocator, Buffer buf) {
+ if (CompositeBuffer.isComposite(buf) && buf.capacity() == 0) {
+ return empty(allocator);
+ } else {
+ return buf.copy().send();
+ }
+ }
+
public static record DirectBuffer(@NotNull Send buffer, @NotNull ByteBuffer byteBuffer) {}
@NotNull
@@ -485,18 +504,23 @@ public class LLUtils {
);
}
assert buffer.isAccessible();
+ buffer.compact();
+ assert buffer.readerOffset() == 0;
AtomicLong nativeAddress = new AtomicLong(0);
if (buffer.countComponents() == 1) {
if (writable) {
if (buffer.countWritableComponents() == 1) {
buffer.forEachWritable(0, (i, c) -> {
+ assert c.writableNativeAddress() != 0;
nativeAddress.setPlain(c.writableNativeAddress());
return false;
});
}
} else {
- if (buffer.countReadableComponents() == 1) {
+ var readableComponents = buffer.countReadableComponents();
+ if (readableComponents == 1) {
buffer.forEachReadable(0, (i, c) -> {
+ assert c.readableNativeAddress() != 0;
nativeAddress.setPlain(c.readableNativeAddress());
return false;
});
@@ -512,7 +536,7 @@ public class LLUtils {
}
throw new IllegalStateException("Buffer is not direct");
}
- return MemorySegmentUtils.directBuffer(nativeAddress.getPlain(), buffer.capacity());
+ return MemorySegmentUtils.directBuffer(nativeAddress.getPlain(), writable ? buffer.capacity() : buffer.writerOffset());
}
public static Buffer fromByteArray(BufferAllocator alloc, byte[] array) {
@@ -534,22 +558,21 @@ public class LLUtils {
return buffer.receive();
}
- public static Buffer compositeBuffer(BufferAllocator alloc, Send buffer1, Send buffer2) {
+ @NotNull
+ public static Buffer compositeBuffer(BufferAllocator alloc,
+ @NotNull Send buffer1,
+ @NotNull Send buffer2) {
return CompositeBuffer.compose(alloc, buffer1, buffer2);
}
+ @NotNull
public static Buffer compositeBuffer(BufferAllocator alloc,
- Send buffer1,
- Send buffer2,
- Send buffer3) {
+ @NotNull Send buffer1,
+ @NotNull Send buffer2,
+ @NotNull Send buffer3) {
return CompositeBuffer.compose(alloc, buffer1, buffer2, buffer3);
}
- @SafeVarargs
- public static Buffer compositeBuffer(BufferAllocator alloc, Send... buffers) {
- return CompositeBuffer.compose(alloc, buffers);
- }
-
public static Mono resolveDelta(Mono> prev, UpdateReturnMode updateReturnMode) {
return prev.handle((delta, sink) -> {
switch (updateReturnMode) {
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java
index 0f11eb0..6769af6 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java
@@ -2,8 +2,10 @@ package it.cavallium.dbengine.database.collections;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
+import io.net5.buffer.api.CompositeBuffer;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLDictionary;
+import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.Serializer.DeserializationResult;
import org.jetbrains.annotations.NotNull;
@@ -25,8 +27,8 @@ public class DatabaseEmpty {
}
@Override
- public @Nullable Send serialize(@NotNull Nothing deserialized) {
- return null;
+ public @NotNull Send serialize(@NotNull Nothing deserialized) {
+ return LLUtils.empty(bufferAllocator);
}
};
}
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java
index 386326a..fd931c7 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -39,7 +40,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep valueSerializer;
protected DatabaseMapDictionary(LLDictionary dictionary,
- @Nullable Send prefixKey,
+ @NotNull Send prefixKey,
SerializerFixedBinaryLength keySuffixSerializer,
Serializer valueSerializer) {
// Do not retain or release or use the prefixKey here
@@ -50,7 +51,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep DatabaseMapDictionary simple(LLDictionary dictionary,
SerializerFixedBinaryLength keySerializer,
Serializer valueSerializer) {
- return new DatabaseMapDictionary<>(dictionary, null, keySerializer, valueSerializer);
+ return new DatabaseMapDictionary<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer, valueSerializer);
}
public static DatabaseMapDictionary tail(LLDictionary dictionary,
@@ -60,13 +61,21 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep(dictionary, prefixKey, keySuffixSerializer, valueSerializer);
}
- private Buffer toKey(Send suffixKeyToSend) {
- var suffixKey = suffixKeyToSend.receive();
- assert suffixKeyConsistency(suffixKey.readableBytes());
- if (keyPrefix != null) {
- return LLUtils.compositeBuffer(dictionary.getAllocator(), keyPrefix.copy().send(), suffixKey.send());
- } else {
- return suffixKey;
+ private Send toKey(Send suffixKeyToSend) {
+ try (var suffixKey = suffixKeyToSend.receive()) {
+ assert suffixKeyConsistency(suffixKey.readableBytes());
+ if (keyPrefix.readableBytes() > 0) {
+ try (var result = LLUtils.compositeBuffer(dictionary.getAllocator(),
+ LLUtils.copy(dictionary.getAllocator(), keyPrefix),
+ suffixKey.send()
+ )) {
+ assert result.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
+ return result.send();
+ }
+ } else {
+ assert suffixKey.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
+ return suffixKey.send();
+ }
}
}
@@ -84,7 +93,12 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>handle((entrySend, sink) -> {
try (var entry = entrySend.receive()) {
- var key = deserializeSuffix(stripPrefix(entry.getKey()));
+ T key;
+ try (var serializedKey = entry.getKey().receive()) {
+ removePrefix(serializedKey);
+ suffixKeyConsistency(serializedKey.readableBytes());
+ key = deserializeSuffix(serializedKey.send());
+ }
var value = valueSerializer.deserialize(entry.getValue()).deserializedData();
sink.next(Map.entry(key, value));
} catch (SerializationException ex) {
@@ -103,7 +117,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep {
try {
- sink.next(LLEntry.of(this.toKey(serializeSuffix(entry.getKey()).send()).send(),
+ sink.next(LLEntry.of(this.toKey(serializeSuffix(entry.getKey())),
valueSerializer.serialize(entry.getValue())).send());
} catch (SerializationException e) {
sink.error(e);
@@ -133,14 +147,14 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
return Mono.fromCallable(() ->
- new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix).send()).send(), valueSerializer));
+ new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), valueSerializer));
}
@Override
public Mono getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) {
return dictionary
.get(resolveSnapshot(snapshot),
- Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send()),
+ Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))),
existsAlmostCertainly
)
.handle((value, sink) -> deserializeValue(value, sink));
@@ -148,8 +162,8 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep putValue(T keySuffix, U value) {
- var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
- var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value));
+ var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))).single();
+ var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value)).single();
return dictionary
.put(keyMono, valueMono, LLDictionaryResultType.VOID)
.doOnNext(Send::close)
@@ -166,7 +180,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep updater) {
- var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
+ var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
return dictionary
.update(keyMono, getSerializedUpdater(updater), updateReturnMode, existsAlmostCertainly)
.handle((value, sink) -> deserializeValue(value, sink));
@@ -176,7 +190,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> updateValueAndGetDelta(T keySuffix,
boolean existsAlmostCertainly,
SerializationFunction<@Nullable U, @Nullable U> updater) {
- var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
+ var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
return dictionary
.updateAndGetDelta(keyMono, getSerializedUpdater(updater), existsAlmostCertainly)
.transform(mono -> LLUtils.mapLLDelta(mono,
@@ -224,7 +238,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep putValueAndGetPrevious(T keySuffix, U value) {
- var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
+ var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value));
return dictionary
.put(keyMono,
@@ -235,7 +249,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep putValueAndGetChanged(T keySuffix, U value) {
- var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
+ var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value));
return dictionary
.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE)
@@ -246,7 +260,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep remove(T keySuffix) {
- var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
+ var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
return dictionary
.remove(keyMono, LLDictionaryResultType.VOID)
.doOnNext(Send::close)
@@ -255,7 +269,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep removeAndGetPrevious(T keySuffix) {
- var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
+ var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
return dictionary
.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE)
.handle((value, sink) -> deserializeValue(value, sink));
@@ -263,7 +277,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep removeAndGetStatus(T keySuffix) {
- var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
+ var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
return dictionary
.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE)
.map(LLUtils::responseToBoolean);
@@ -274,7 +288,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>>handle((keySuffix, sink) -> {
try {
- sink.next(Tuples.of(keySuffix, toKey(serializeSuffix(keySuffix).send()).send()));
+ sink.next(Tuples.of(keySuffix, toKey(serializeSuffix(keySuffix))));
} catch (SerializationException ex) {
sink.error(ex);
}
@@ -303,9 +317,10 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep serializeEntry(T key, U value) throws SerializationException {
- try (var serializedKey = toKey(serializeSuffix(key).send())) {
- try (var serializedValue = valueSerializer.serialize(value).receive()) {
- return LLEntry.of(serializedKey.send(), serializedValue.send()).send();
+ try (var serializedKey = toKey(serializeSuffix(key))) {
+ var serializedValueToReceive = valueSerializer.serialize(value);
+ try (var serializedValue = serializedValueToReceive.receive()) {
+ return LLEntry.of(serializedKey, serializedValue.send()).send();
}
}
}
@@ -343,7 +358,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep, X>>handle((entry, sink) -> {
try {
- sink.next(Tuples.of(serializeSuffix(entry.getT1()).send(), entry.getT2()));
+ sink.next(Tuples.of(serializeSuffix(entry.getT1()), entry.getT2()));
} catch (SerializationException ex) {
sink.error(ex);
}
@@ -374,16 +389,18 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>> getAllStages(@Nullable CompositeSnapshot snapshot) {
return dictionary
.getRangeKeys(resolveSnapshot(snapshot), rangeMono)
- .handle((key, sink) -> {
- try (key) {
- try (var keySuffixWithExt = stripPrefix(key).receive()) {
- sink.next(Map.entry(deserializeSuffix(keySuffixWithExt.copy().send()),
- new DatabaseSingle<>(dictionary,
- toKey(keySuffixWithExt.send()).send(),
- valueSerializer
- )
- ));
- }
+ .handle((keyBufToReceive, sink) -> {
+ try (var keyBuf = keyBufToReceive.receive()) {
+ assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
+ // Remove prefix. Keep only the suffix and the ext
+ removePrefix(keyBuf);
+ suffixKeyConsistency(keyBuf.readableBytes());
+ sink.next(Map.entry(deserializeSuffix(keyBuf.copy().send()),
+ new DatabaseSingle<>(dictionary,
+ toKey(keyBuf.send()),
+ valueSerializer
+ )
+ ));
} catch (SerializationException ex) {
sink.error(ex);
}
@@ -396,8 +413,14 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>handle((serializedEntryToReceive, sink) -> {
try (var serializedEntry = serializedEntryToReceive.receive()) {
- sink.next(Map.entry(deserializeSuffix(stripPrefix(serializedEntry.getKey())),
- valueSerializer.deserialize(serializedEntry.getValue()).deserializedData()));
+ try (var keyBuf = serializedEntry.getKey().receive()) {
+ assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
+ // Remove prefix. Keep only the suffix and the ext
+ removePrefix(keyBuf);
+ suffixKeyConsistency(keyBuf.readableBytes());
+ sink.next(Map.entry(deserializeSuffix(keyBuf.send()),
+ valueSerializer.deserialize(serializedEntry.getValue()).deserializedData()));
+ }
} catch (SerializationException e) {
sink.error(e);
}
@@ -418,7 +441,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep {
try {
- sink.next(LLEntry.of(toKey(serializeSuffix(entry.getKey()).send()).send(),
+ sink.next(LLEntry.of(toKey(serializeSuffix(entry.getKey())),
valueSerializer.serialize(entry.getValue())).send());
} catch (SerializationException e) {
sink.error(e);
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java
index 7dc589a..2dff876 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java
@@ -2,7 +2,6 @@ package it.cavallium.dbengine.database.collections;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
-import io.net5.buffer.api.CompositeBuffer;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import io.net5.util.IllegalReferenceCountException;
@@ -18,6 +17,8 @@ import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -29,6 +30,7 @@ public class DatabaseMapDictionaryDeep> implem
private final BufferAllocator alloc;
protected final SubStageGetter subStageGetter;
protected final SerializerFixedBinaryLength keySuffixSerializer;
+ @NotNull
protected final Buffer keyPrefix;
protected final int keyPrefixLength;
protected final int keySuffixLength;
@@ -90,27 +92,26 @@ public class DatabaseMapDictionaryDeep> implem
}
protected static Buffer zeroFillKeySuffixAndExt(BufferAllocator alloc,
- @Nullable Send prefixKeySend,
+ @NotNull Send prefixKeySend,
int prefixLength,
int suffixLength,
int extLength) {
- try (var result = prefixKeySend == null ? null : prefixKeySend.receive()) {
- if (result == null) {
- assert prefixLength == 0;
- var buf = alloc.allocate(prefixLength + suffixLength + extLength);
- buf.writerOffset(prefixLength + suffixLength + extLength);
- buf.fill((byte) 0);
- return buf;
- } else {
- assert result.readableBytes() == prefixLength;
- assert suffixLength > 0;
- assert extLength >= 0;
- result.ensureWritable(suffixLength + extLength, suffixLength + extLength, true);
- for (int i = 0; i < suffixLength + extLength; i++) {
- result.writeByte((byte) 0x0);
- }
- return result;
+ var result = prefixKeySend.receive();
+ if (result == null) {
+ assert prefixLength == 0;
+ var buf = alloc.allocate(prefixLength + suffixLength + extLength);
+ buf.writerOffset(prefixLength + suffixLength + extLength);
+ buf.fill((byte) 0);
+ return buf;
+ } else {
+ assert result.readableBytes() == prefixLength;
+ assert suffixLength > 0;
+ assert extLength >= 0;
+ result.ensureWritable(suffixLength + extLength, suffixLength + extLength, true);
+ for (int i = 0; i < suffixLength + extLength; i++) {
+ result.writeByte((byte) 0x0);
}
+ return result;
}
}
@@ -175,7 +176,7 @@ public class DatabaseMapDictionaryDeep> implem
public static DatabaseMapDictionaryDeep> simple(LLDictionary dictionary,
SerializerFixedBinaryLength keySerializer,
SubStageGetterSingle subStageGetter) {
- return new DatabaseMapDictionaryDeep<>(dictionary, null, keySerializer, subStageGetter, 0);
+ return new DatabaseMapDictionaryDeep<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer, subStageGetter, 0);
}
public static > DatabaseMapDictionaryDeep deepTail(LLDictionary dictionary,
@@ -183,7 +184,7 @@ public class DatabaseMapDictionaryDeep> implem
int keyExtLength,
SubStageGetter subStageGetter) {
return new DatabaseMapDictionaryDeep<>(dictionary,
- null,
+ LLUtils.empty(dictionary.getAllocator()),
keySerializer,
subStageGetter,
keyExtLength
@@ -199,26 +200,26 @@ public class DatabaseMapDictionaryDeep> implem
}
protected DatabaseMapDictionaryDeep(LLDictionary dictionary,
- @Nullable Send prefixKeyToReceive,
+ @NotNull Send prefixKeyToReceive,
SerializerFixedBinaryLength keySuffixSerializer,
SubStageGetter subStageGetter,
int keyExtLength) {
- try (var prefixKey = prefixKeyToReceive == null ? null : prefixKeyToReceive.receive()) {
+ try (var prefixKey = prefixKeyToReceive.receive()) {
this.dictionary = dictionary;
this.alloc = dictionary.getAllocator();
this.subStageGetter = subStageGetter;
this.keySuffixSerializer = keySuffixSerializer;
- assert prefixKey == null || prefixKey.isAccessible();
- this.keyPrefixLength = prefixKey == null ? 0 : prefixKey.readableBytes();
+ assert prefixKey.isAccessible();
+ this.keyPrefixLength = prefixKey.readableBytes();
this.keySuffixLength = keySuffixSerializer.getSerializedBinaryLength();
this.keyExtLength = keyExtLength;
- Buffer firstKey = firstRangeKey(alloc, prefixKey == null ? null : prefixKey.copy().send(), keyPrefixLength,
+ Buffer firstKey = firstRangeKey(alloc, LLUtils.copy(alloc, prefixKey), keyPrefixLength,
keySuffixLength, keyExtLength);
try (firstKey) {
- var nextRangeKey = nextRangeKey(alloc, prefixKey == null ? null : prefixKey.copy().send(),
+ var nextRangeKey = nextRangeKey(alloc, LLUtils.copy(alloc, prefixKey),
keyPrefixLength, keySuffixLength, keyExtLength);
try (nextRangeKey) {
- assert prefixKey == null || prefixKey.isAccessible();
+ assert prefixKey.isAccessible();
assert keyPrefixLength == 0 || !LLUtils.equals(firstKey, nextRangeKey);
this.range = keyPrefixLength == 0 ? LLRange.all() : LLRange.of(firstKey.send(), nextRangeKey.send());
this.rangeMono = LLUtils.lazyRetainRange(this.range);
@@ -226,7 +227,7 @@ public class DatabaseMapDictionaryDeep> implem
}
}
- this.keyPrefix = prefixKey == null ? null : prefixKey.send().receive();
+ this.keyPrefix = prefixKey.send().receive();
}
}
@@ -246,21 +247,28 @@ public class DatabaseMapDictionaryDeep> implem
}
/**
- * Keep only suffix and ext
+ * Removes the prefix from the key
*/
- protected Send stripPrefix(Send keyToReceive) {
- try (var key = keyToReceive.receive()) {
- return key.copy(this.keyPrefixLength, key.readableBytes() - this.keyPrefixLength).send();
- }
+ protected void removePrefix(Buffer key) {
+ assert key.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
+ key.readerOffset(key.readerOffset() + this.keyPrefixLength).compact();
+ assert key.readableBytes() == keySuffixLength + keyExtLength;
}
/**
- * Add prefix to suffix
+ * Removes the ext from the key
*/
+ protected void removeExt(Buffer key) {
+ assert key.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
+ key.writerOffset(keyPrefixLength + keySuffixLength).compact();
+ assert key.readableBytes() == keyPrefixLength + keySuffixLength;
+ }
+
protected Send toKeyWithoutExt(Send suffixKeyToReceive) {
try (var suffixKey = suffixKeyToReceive.receive()) {
assert suffixKey.readableBytes() == keySuffixLength;
- try (Buffer result = LLUtils.compositeBuffer(alloc, keyPrefix.copy().send(), suffixKey.send())) {
+ try (var result = Objects.requireNonNull(LLUtils.compositeBuffer(alloc,
+ LLUtils.copy(alloc, keyPrefix), suffixKey.send()))) {
assert result.readableBytes() == keyPrefixLength + keySuffixLength;
return result.send();
}
@@ -287,8 +295,9 @@ public class DatabaseMapDictionaryDeep> implem
@Override
public Mono at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
+ var suffixKeyWithoutExt = Mono.fromCallable(() -> toKeyWithoutExt(serializeSuffix(keySuffix)));
return this.subStageGetter
- .subStage(dictionary, snapshot, Mono.fromCallable(() -> toKeyWithoutExt(serializeSuffix(keySuffix).send())))
+ .subStage(dictionary, snapshot, suffixKeyWithoutExt)
.transform(LLUtils::handleDiscard)
.doOnDiscard(DatabaseStage.class, DatabaseStage::release);
}
@@ -310,11 +319,10 @@ public class DatabaseMapDictionaryDeep> implem
.flatMapSequential(groupKeyWithoutExtSend_ -> Mono.using(
groupKeyWithoutExtSend_::receive,
groupKeyWithoutExtSend -> this.subStageGetter
- .subStage(dictionary, snapshot, getGroupKeyWithoutExt(groupKeyWithoutExtSend.copy().send()))
+ .subStage(dictionary, snapshot, Mono.fromCallable(() -> groupKeyWithoutExtSend.copy().send()))
.>handle((us, sink) -> {
try {
- sink.next(Map.entry(this.deserializeSuffix(getGroupSuffix(groupKeyWithoutExtSend.send())),
- us));
+ sink.next(Map.entry(this.deserializeSuffix(getGroupSuffix(groupKeyWithoutExtSend.send())), us));
} catch (SerializationException ex) {
sink.error(ex);
}
@@ -324,22 +332,22 @@ public class DatabaseMapDictionaryDeep> implem
.transform(LLUtils::handleDiscard);
}
- private Send getGroupSuffix(Send groupKeyWithoutExtSend) {
- try (var groupKeyWithoutExt = groupKeyWithoutExtSend.receive()) {
- try (var groupSuffix = this.stripPrefix(groupKeyWithoutExt.copy().send()).receive()) {
- assert subStageKeysConsistency(groupKeyWithoutExt.readableBytes() + keyExtLength);
- return groupSuffix.send();
- }
+ private Send getGroupSuffix(Send groupKeyWithoutExt) {
+ try (var buffer = groupKeyWithoutExt.receive()) {
+ assert subStageKeysConsistency(buffer.readableBytes() + keyExtLength);
+ this.removePrefix(buffer);
+ assert subStageKeysConsistency(keyPrefixLength + buffer.readableBytes() + keyExtLength);
+ return buffer.send();
}
}
- private Mono> getGroupKeyWithoutExt(Send groupKeyWithoutExtSend) {
- return Mono.fromCallable(() -> {
- try (var groupKeyWithoutExt = groupKeyWithoutExtSend.receive()) {
- assert subStageKeysConsistency(groupKeyWithoutExt.readableBytes() + keyExtLength);
- return groupKeyWithoutExt.send();
- }
- });
+ private Send getGroupWithoutExt(Send groupKeyWithExtSend) {
+ try (var buffer = groupKeyWithExtSend.receive()) {
+ assert subStageKeysConsistency(buffer.readableBytes());
+ this.removeExt(buffer);
+ assert subStageKeysConsistency(buffer.readableBytes() + keyExtLength);
+ return buffer.send();
+ }
}
private boolean subStageKeysConsistency(int totalKeyLength) {
@@ -383,7 +391,7 @@ public class DatabaseMapDictionaryDeep> implem
}
//todo: temporary wrapper. convert the whole class to buffers
- protected T deserializeSuffix(Send keySuffixToReceive) throws SerializationException {
+ protected T deserializeSuffix(@NotNull Send keySuffixToReceive) throws SerializationException {
try (var keySuffix = keySuffixToReceive.receive()) {
assert suffixKeyConsistency(keySuffix.readableBytes());
var result = keySuffixSerializer.deserialize(keySuffix.send());
@@ -393,11 +401,15 @@ public class DatabaseMapDictionaryDeep> implem
}
//todo: temporary wrapper. convert the whole class to buffers
- protected Buffer serializeSuffix(T keySuffix) throws SerializationException {
- Buffer suffixData = keySuffixSerializer.serialize(keySuffix).receive();
- assert suffixKeyConsistency(suffixData.readableBytes());
- assert keyPrefix.isAccessible();
- return suffixData;
+ @NotNull
+ protected Send serializeSuffix(T keySuffix) throws SerializationException {
+ try (var suffixDataToReceive = keySuffixSerializer.serialize(keySuffix)) {
+ try (Buffer suffixData = suffixDataToReceive.receive()) {
+ assert suffixKeyConsistency(suffixData.readableBytes());
+ assert keyPrefix.isAccessible();
+ return suffixData.send();
+ }
+ }
}
@Override
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java
index 90f6e34..653cc35 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java
@@ -6,6 +6,7 @@ import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
+import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
@@ -58,7 +59,7 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap keyHashSerializer) {
return new DatabaseMapDictionaryHashed<>(
dictionary,
- null,
+ LLUtils.empty(dictionary.getAllocator()),
keySerializer,
valueSerializer,
keyHashFunction,
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java b/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java
index f380b73..549852e 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java
@@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.collections;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
+import io.net5.buffer.api.CompositeBuffer;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.serialization.SerializationException;
@@ -43,17 +44,9 @@ class ValueWithHashSerializer implements Serializer> {
}
@Override
- public @Nullable Send serialize(@NotNull Entry deserialized) throws SerializationException {
+ public @NotNull Send serialize(@NotNull Entry deserialized) throws SerializationException {
var keySuffix = keySuffixSerializer.serialize(deserialized.getKey());
var value = valueSerializer.serialize(deserialized.getValue());
- if (value == null && keySuffix == null) {
- return null;
- } else if (value == null) {
- return keySuffix;
- } else if (keySuffix == null) {
- return value;
- } else {
- return LLUtils.compositeBuffer(allocator, keySuffix, value).send();
- }
+ return LLUtils.compositeBuffer(allocator, keySuffix, value).send();
}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java b/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java
index 749ba0f..330ba68 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java
@@ -45,8 +45,8 @@ class ValuesSetSerializer implements Serializer> {
output.writeInt(deserialized.size());
for (X entry : deserialized) {
var serializedToReceive = entrySerializer.serialize(entry);
- if (serializedToReceive != null) {
- try (Buffer serialized = serializedToReceive.receive()) {
+ try (Buffer serialized = serializedToReceive.receive()) {
+ if (serialized.readableBytes() > 0) {
output.ensureWritable(serialized.readableBytes());
output.writeBytes(serialized);
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
index f4165da..8c53e41 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
@@ -267,10 +267,23 @@ public class LLLocalDictionary implements LLDictionary {
stamp = 0;
}
try {
+ Buffer logKey;
if (logger.isTraceEnabled(MARKER_ROCKSDB)) {
- logger.trace(MARKER_ROCKSDB, "Reading {}", LLUtils.toStringSafe(key));
+ logKey = key.copy();
+ } else {
+ logKey = null;
+ }
+ try (logKey) {
+ var result = dbGet(cfh, resolveSnapshot(snapshot), key.send(), existsAlmostCertainly);
+ if (logger.isTraceEnabled(MARKER_ROCKSDB)) {
+ try (var result2 = result == null ? null : result.receive()) {
+ logger.trace(MARKER_ROCKSDB, "Reading {}: {}", LLUtils.toStringSafe(logKey), LLUtils.toString(result2));
+ return result2 == null ? null : result2.send();
+ }
+ } else {
+ return result;
+ }
}
- return dbGet(cfh, resolveSnapshot(snapshot), key.send(), existsAlmostCertainly);
} finally {
if (updateMode == UpdateMode.ALLOW) {
lock.unlockRead(stamp);
@@ -414,6 +427,8 @@ public class LLLocalDictionary implements LLDictionary {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called dbPut in a nonblocking thread");
}
+ assert key.isAccessible();
+ assert value.isAccessible();
if (databaseOptions.allowNettyDirect()) {
var keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send());
try (var ignored1 = keyNioBuffer.buffer().receive()) {
@@ -592,6 +607,8 @@ public class LLLocalDictionary implements LLDictionary {
valueSend -> this.>runOnDb(() -> {
try (var key = keySend.receive()) {
try (var value = valueSend.receive()) {
+ assert key.isAccessible();
+ assert value.isAccessible();
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
@@ -656,9 +673,6 @@ public class LLLocalDictionary implements LLDictionary {
stamp = 0;
}
try {
- if (logger.isTraceEnabled()) {
- logger.trace(MARKER_ROCKSDB, "Reading {}", LLUtils.toStringSafe(key));
- }
while (true) {
@Nullable Buffer prevData;
var prevDataHolder = existsAlmostCertainly ? null : new Holder();
@@ -682,19 +696,37 @@ public class LLLocalDictionary implements LLDictionary {
} else {
prevData = null;
}
+ if (logger.isTraceEnabled()) {
+ logger.trace(MARKER_ROCKSDB,
+ "Reading {}: {} (before update)",
+ LLUtils.toStringSafe(key),
+ LLUtils.toStringSafe(prevData)
+ );
+ }
try {
@Nullable Buffer newData;
try (Buffer prevDataToSendToUpdater = prevData == null ? null : prevData.copy()) {
- try (var newDataToReceive = updater.apply(
- prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send())) {
- if (newDataToReceive != null) {
- newData = newDataToReceive.receive();
- } else {
- newData = null;
+ try (var sentData = prevDataToSendToUpdater == null ? null
+ : prevDataToSendToUpdater.send()) {
+ try (var newDataToReceive = updater.apply(sentData)) {
+ if (newDataToReceive != null) {
+ newData = newDataToReceive.receive();
+ } else {
+ newData = null;
+ }
}
}
}
+ assert newData == null || newData.isAccessible();
try {
+ if (logger.isTraceEnabled()) {
+ logger.trace(MARKER_ROCKSDB,
+ "Updating {}. previous data: {}, updated data: {}",
+ LLUtils.toStringSafe(key),
+ LLUtils.toStringSafe(prevData),
+ LLUtils.toStringSafe(newData)
+ );
+ }
if (prevData != null && newData == null) {
//noinspection DuplicatedCode
if (updateMode == UpdateMode.ALLOW) {
@@ -709,7 +741,7 @@ public class LLLocalDictionary implements LLDictionary {
}
}
if (logger.isTraceEnabled()) {
- logger.trace(MARKER_ROCKSDB, "Deleting {}", LLUtils.toStringSafe(key));
+ logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
dbDelete(cfh, null, key.send());
} else if (newData != null
@@ -727,7 +759,11 @@ public class LLLocalDictionary implements LLDictionary {
}
}
if (logger.isTraceEnabled()) {
- logger.trace(MARKER_ROCKSDB, "Writing {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData));
+ logger.trace(MARKER_ROCKSDB,
+ "Writing {}: {} (after update)",
+ LLUtils.toStringSafe(key),
+ LLUtils.toStringSafe(newData)
+ );
}
Buffer dataToPut;
if (updateReturnMode == UpdateReturnMode.GET_NEW_VALUE) {
@@ -779,7 +815,7 @@ public class LLLocalDictionary implements LLDictionary {
SerializationFunction<@Nullable Send, @Nullable Send> updater,
boolean existsAlmostCertainly) {
return Mono.usingWhen(keyMono,
- keySend -> this.runOnDb(() -> {
+ keySend -> runOnDb(() -> {
try (var key = keySend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
@@ -799,7 +835,7 @@ public class LLLocalDictionary implements LLDictionary {
}
try {
if (logger.isTraceEnabled()) {
- logger.trace(MARKER_ROCKSDB, "Reading {}", LLUtils.toStringSafe(key));
+ logger.trace(MARKER_ROCKSDB, "Reading {} (before update)", LLUtils.toStringSafe(key));
}
while (true) {
@Nullable Buffer prevData;
@@ -824,19 +860,37 @@ public class LLLocalDictionary implements LLDictionary {
} else {
prevData = null;
}
+ if (logger.isTraceEnabled()) {
+ logger.trace(MARKER_ROCKSDB,
+ "Read {}: {} (before update)",
+ LLUtils.toStringSafe(key),
+ LLUtils.toStringSafe(prevData)
+ );
+ }
try {
@Nullable Buffer newData;
try (Buffer prevDataToSendToUpdater = prevData == null ? null : prevData.copy()) {
- try (var newDataToReceive = updater.apply(
- prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send())) {
- if (newDataToReceive != null) {
- newData = newDataToReceive.receive();
- } else {
- newData = null;
+ try (var sentData = prevDataToSendToUpdater == null ? null
+ : prevDataToSendToUpdater.send()) {
+ try (var newDataToReceive = updater.apply(sentData)) {
+ if (newDataToReceive != null) {
+ newData = newDataToReceive.receive();
+ } else {
+ newData = null;
+ }
}
}
}
+ assert newData == null || newData.isAccessible();
try {
+ if (logger.isTraceEnabled()) {
+ logger.trace(MARKER_ROCKSDB,
+ "Updating {}. previous data: {}, updated data: {}",
+ LLUtils.toStringSafe(key),
+ LLUtils.toStringSafe(prevData),
+ LLUtils.toStringSafe(newData)
+ );
+ }
if (prevData != null && newData == null) {
//noinspection DuplicatedCode
if (updateMode == UpdateMode.ALLOW) {
@@ -851,7 +905,7 @@ public class LLLocalDictionary implements LLDictionary {
}
}
if (logger.isTraceEnabled()) {
- logger.trace(MARKER_ROCKSDB, "Deleting {}", LLUtils.toStringSafe(key));
+ logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
dbDelete(cfh, null, key.send());
} else if (newData != null
@@ -869,8 +923,11 @@ public class LLLocalDictionary implements LLDictionary {
}
}
if (logger.isTraceEnabled()) {
- logger.trace(MARKER_ROCKSDB, "Writing {}: {}",
- LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData));
+ logger.trace(MARKER_ROCKSDB,
+ "Writing {}: {} (after update)",
+ LLUtils.toStringSafe(key),
+ LLUtils.toStringSafe(newData)
+ );
}
assert key.isAccessible();
assert newData.isAccessible();
@@ -986,18 +1043,24 @@ public class LLLocalDictionary implements LLDictionary {
stamp = 0;
}
try {
- if (logger.isTraceEnabled()) {
- logger.trace(MARKER_ROCKSDB, "Reading {}", LLUtils.toArray(key));
- }
var data = new Holder();
+ Buffer bufferResult;
if (db.keyMayExist(cfh, LLUtils.toArray(key), data)) {
if (data.getValue() != null) {
- return LLUtils.fromByteArray(alloc, data.getValue()).send();
+ bufferResult = LLUtils.fromByteArray(alloc, data.getValue());
} else {
- return dbGet(cfh, null, key.send(), true);
+ try (var bufferResultToReceive = dbGet(cfh, null, key.send(), true)) {
+ bufferResult = bufferResultToReceive == null ? null : bufferResultToReceive.receive();
+ }
}
} else {
- return null;
+ bufferResult = null;
+ }
+ try (bufferResult) {
+ if (logger.isTraceEnabled()) {
+ logger.trace(MARKER_ROCKSDB, "Reading {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(bufferResult));
+ }
+ return bufferResult == null ? null : bufferResult.send();
}
} finally {
if (updateMode == UpdateMode.ALLOW) {
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/MemorySegmentUtils.java b/src/main/java/it/cavallium/dbengine/database/disk/MemorySegmentUtils.java
index 608404e..c283149 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/MemorySegmentUtils.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/MemorySegmentUtils.java
@@ -124,6 +124,6 @@ public class MemorySegmentUtils {
}
public static String getSuggestedArgs() {
- return "--enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --enable-native-access";
+ return "--enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --enable-native-access=ALL-UNNAMED";
}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java b/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java
index cd31dc2..4ffb437 100644
--- a/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java
+++ b/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java
@@ -4,6 +4,7 @@ import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLUtils;
+import it.cavallium.dbengine.netty.NullableBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.jetbrains.annotations.NotNull;
@@ -13,16 +14,16 @@ public interface Serializer {
record DeserializationResult(T deserializedData, int bytesRead) {}
- @NotNull DeserializationResult deserialize(@Nullable Send serialized) throws SerializationException;
+ @NotNull DeserializationResult deserialize(@NotNull Send serialized) throws SerializationException;
- @Nullable Send serialize(@NotNull A deserialized) throws SerializationException;
+ @NotNull Send serialize(@NotNull A deserialized) throws SerializationException;
Serializer> NOOP_SERIALIZER = new Serializer<>() {
@Override
- public @NotNull DeserializationResult> deserialize(@Nullable Send serialized) {
- try (var serializedBuf = serialized == null ? null : serialized.receive()) {
- var readableBytes = serializedBuf == null ? 0 : serializedBuf.readableBytes();
- return new DeserializationResult<>(serializedBuf == null ? null : serializedBuf.send(), readableBytes);
+ public @NotNull DeserializationResult> deserialize(@NotNull Send serialized) {
+ try (var serializedBuf = serialized.receive()) {
+ var readableBytes = serializedBuf.readableBytes();
+ return new DeserializationResult<>(serializedBuf.send(), readableBytes);
}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java b/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java
index d744aad..78fe656 100644
--- a/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java
+++ b/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java
@@ -17,10 +17,7 @@ public interface SerializerFixedBinaryLength extends Serializer {
static SerializerFixedBinaryLength> noop(int length) {
return new SerializerFixedBinaryLength<>() {
@Override
- public @NotNull DeserializationResult> deserialize(@Nullable Send serialized) {
- if (length == 0 && serialized == null) {
- return new DeserializationResult<>(null, 0);
- }
+ public @NotNull DeserializationResult> deserialize(@NotNull Send serialized) {
Objects.requireNonNull(serialized);
try (var buf = serialized.receive()) {
if (buf.readableBytes() != getSerializedBinaryLength()) {
@@ -55,12 +52,8 @@ public interface SerializerFixedBinaryLength extends Serializer {
static SerializerFixedBinaryLength utf8(BufferAllocator allocator, int length) {
return new SerializerFixedBinaryLength<>() {
@Override
- public @NotNull DeserializationResult deserialize(@Nullable Send serializedToReceive)
+ public @NotNull DeserializationResult deserialize(@NotNull Send serializedToReceive)
throws SerializationException {
- if (length == 0 && serializedToReceive == null) {
- return new DeserializationResult<>(null, 0);
- }
- Objects.requireNonNull(serializedToReceive);
try (var serialized = serializedToReceive.receive()) {
if (serialized.readableBytes() != getSerializedBinaryLength()) {
throw new SerializationException(
@@ -78,7 +71,9 @@ public interface SerializerFixedBinaryLength extends Serializer {
// UTF-8 uses max. 3 bytes per char, so calculate the worst case.
try (Buffer buf = allocator.allocate(LLUtils.utf8MaxBytes(deserialized))) {
assert buf.isAccessible();
- buf.writeBytes(deserialized.getBytes(StandardCharsets.UTF_8));
+ var bytes = deserialized.getBytes(StandardCharsets.UTF_8);
+ buf.ensureWritable(bytes.length);
+ buf.writeBytes(bytes);
if (buf.readableBytes() != getSerializedBinaryLength()) {
throw new SerializationException("Fixed serializer with " + getSerializedBinaryLength()
+ " bytes has tried to serialize an element with "
@@ -99,10 +94,7 @@ public interface SerializerFixedBinaryLength extends Serializer {
static SerializerFixedBinaryLength intSerializer(BufferAllocator allocator) {
return new SerializerFixedBinaryLength<>() {
@Override
- public @NotNull DeserializationResult deserialize(@Nullable Send serializedToReceive) {
- if (getSerializedBinaryLength() == 0 && serializedToReceive == null) {
- return new DeserializationResult<>(null, 0);
- }
+ public @NotNull DeserializationResult deserialize(@NotNull Send serializedToReceive) {
Objects.requireNonNull(serializedToReceive);
try (var serialized = serializedToReceive.receive()) {
if (serialized.readableBytes() != getSerializedBinaryLength()) {
@@ -131,10 +123,7 @@ public interface SerializerFixedBinaryLength extends Serializer {
static SerializerFixedBinaryLength longSerializer(BufferAllocator allocator) {
return new SerializerFixedBinaryLength<>() {
@Override
- public @NotNull DeserializationResult deserialize(@Nullable Send serializedToReceive) {
- if (getSerializedBinaryLength() == 0 && serializedToReceive == null) {
- return new DeserializationResult<>(null, 0);
- }
+ public @NotNull DeserializationResult deserialize(@NotNull Send serializedToReceive) {
Objects.requireNonNull(serializedToReceive);
try (var serialized = serializedToReceive.receive()) {
if (serialized.readableBytes() != getSerializedBinaryLength()) {
diff --git a/src/main/java/it/cavallium/dbengine/netty/NullableBuffer.java b/src/main/java/it/cavallium/dbengine/netty/NullableBuffer.java
new file mode 100644
index 0000000..c1e3eef
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/netty/NullableBuffer.java
@@ -0,0 +1,74 @@
+package it.cavallium.dbengine.netty;
+
+import io.net5.buffer.api.Buffer;
+import io.net5.buffer.api.Drop;
+import io.net5.buffer.api.Owned;
+import io.net5.buffer.api.Send;
+import io.net5.buffer.api.internal.ResourceSupport;
+import it.cavallium.dbengine.client.SearchResult;
+import org.jetbrains.annotations.Nullable;
+
+public class NullableBuffer extends ResourceSupport {
+
+ @Nullable
+ private Buffer buffer;
+
+ public NullableBuffer(@Nullable Buffer buffer, Drop drop) {
+ super(new CloseOnDrop(drop));
+ this.buffer = buffer == null ? null : buffer.send().receive();
+ }
+
+ public NullableBuffer(@Nullable Send buffer, Drop drop) {
+ super(new CloseOnDrop(drop));
+ this.buffer = buffer == null ? null : buffer.receive();
+ }
+
+ @Nullable
+ public Buffer buf() {
+ return buffer;
+ }
+
+ @Nullable
+ public Send sendBuf() {
+ return buffer == null ? null : buffer.send();
+ }
+
+ @Override
+ protected RuntimeException createResourceClosedException() {
+ return new IllegalStateException("Closed");
+ }
+
+ @Override
+ protected Owned prepareSend() {
+ var buffer = this.buffer == null ? null : this.buffer.send();
+ makeInaccessible();
+ return drop -> new NullableBuffer(buffer, drop);
+ }
+
+ private void makeInaccessible() {
+ this.buffer = null;
+ }
+
+ private static class CloseOnDrop implements Drop {
+
+ private final Drop delegate;
+
+ public CloseOnDrop(Drop drop) {
+ this.delegate = drop;
+ }
+
+ @Override
+ public void drop(NullableBuffer obj) {
+ try {
+ if (obj.buffer != null) {
+ if (obj.buffer.isAccessible()) {
+ obj.buffer.close();
+ }
+ }
+ delegate.drop(obj);
+ } finally {
+ obj.makeInaccessible();
+ }
+ }
+ }
+}
diff --git a/src/test/java/it/cavallium/dbengine/DbTestUtils.java b/src/test/java/it/cavallium/dbengine/DbTestUtils.java
index afff40e..c16b139 100644
--- a/src/test/java/it/cavallium/dbengine/DbTestUtils.java
+++ b/src/test/java/it/cavallium/dbengine/DbTestUtils.java
@@ -32,6 +32,7 @@ import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
public class DbTestUtils {
@@ -42,6 +43,38 @@ public class DbTestUtils {
return "0123456789".repeat(1024);
}
+ public static void run(Flux> publisher) {
+ publisher.subscribeOn(Schedulers.immediate()).blockLast();
+ }
+
+ public static void runVoid(Mono publisher) {
+ publisher.then().subscribeOn(Schedulers.immediate()).block();
+ }
+
+ public static T run(Mono publisher) {
+ return publisher.subscribeOn(Schedulers.immediate()).block();
+ }
+
+ public static T run(boolean shouldFail, Mono publisher) {
+ return publisher.subscribeOn(Schedulers.immediate()).transform(mono -> {
+ if (shouldFail) {
+ return mono.onErrorResume(ex -> Mono.empty());
+ } else {
+ return mono;
+ }
+ }).block();
+ }
+
+ public static void runVoid(boolean shouldFail, Mono publisher) {
+ publisher.then().subscribeOn(Schedulers.immediate()).transform(mono -> {
+ if (shouldFail) {
+ return mono.onErrorResume(ex -> Mono.empty());
+ } else {
+ return mono;
+ }
+ }).block();
+ }
+
public static record TestAllocator(PooledBufferAllocator allocator) {}
public static TestAllocator newAllocator() {
diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java
index f8ef0d1..f56ce1d 100644
--- a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java
+++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java
@@ -19,15 +19,20 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.warp.commonutils.log.Logger;
+import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.StepVerifier.Step;
+import reactor.test.util.TestLogger;
+import reactor.util.Loggers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public abstract class TestDictionaryMap {
+ private static final Logger log = LoggerFactory.getLogger(TestDictionaryMap.class);
private TestAllocator allocator;
private boolean checkLeaks = true;
@@ -97,21 +102,25 @@ public abstract class TestDictionaryMap {
@ParameterizedTest
@MethodSource("provideArgumentsPut")
public void testPut(MapType mapType, UpdateMode updateMode, String key, String value, boolean shouldFail) {
- var stpVer = StepVerifier
- .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode)
- .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5))
- .flatMap(map -> map
- .putValue(key, value)
- .then(map.getValue(null, key))
- .doAfterTerminate(map::release)
- )
- ));
- if (shouldFail) {
- this.checkLeaks = false;
- stpVer.verifyError();
- } else {
- stpVer.expectNext(value).verifyComplete();
- }
+ var gen = getTempDbGenerator();
+ var db = run(gen.openTempDb(allocator));
+ var dict = run(tempDictionary(db.db(), updateMode));
+ var map = tempDatabaseMapDictionaryMap(dict, mapType, 5);
+
+ runVoid(shouldFail, map.putValue(key, value));
+
+ var resultingMapSize = run(map.leavesCount(null, false));
+ Assertions.assertEquals(shouldFail ? 0 : 1, resultingMapSize);
+
+ var resultingMap = run(map.get(null));
+ Assertions.assertEquals(shouldFail ? null : Map.of(key, value), resultingMap);
+
+ runVoid(map.close());
+ map.release();
+
+ //if (shouldFail) this.checkLeaks = false;
+
+ gen.closeTempDb(db);
}
@ParameterizedTest
@@ -257,26 +266,50 @@ public abstract class TestDictionaryMap {
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5))
.flatMapMany(map -> Flux
.concat(
- map.updateValue(key, old -> {
- assert old == null;
- return "error?";
- }).then(map.getValue(null, key)),
- map.updateValue(key, false, old -> {
- assert Objects.equals(old, "error?");
- return "error?";
- }).then(map.getValue(null, key)),
- map.updateValue(key, true, old -> {
- assert Objects.equals(old, "error?");
- return "error?";
- }).then(map.getValue(null, key)),
- map.updateValue(key, true, old -> {
- assert Objects.equals(old, "error?");
- return value;
- }).then(map.getValue(null, key)),
- map.updateValue(key, true, old -> {
- assert Objects.equals(old, value);
- return value;
- }).then(map.getValue(null, key))
+ Mono
+ .fromRunnable(() -> log.debug("1. Updating value: {}", key))
+ .then(map.updateValue(key, old -> {
+ assert old == null;
+ return "error?";
+ }))
+ .doOnSuccess(s -> log.debug("1. Getting value: {}", key))
+ .then(map.getValue(null, key)),
+
+ Mono
+ .fromRunnable(() -> log.debug("2. Updating value: {}", key))
+ .then(map.updateValue(key, false, old -> {
+ assert Objects.equals(old, "error?");
+ return "error?";
+ }))
+ .doOnSuccess(s -> log.debug("2. Getting value: {}", key))
+ .then(map.getValue(null, key)),
+
+ Mono
+ .fromRunnable(() -> log.debug("3. Updating value: {}", key))
+ .then(map.updateValue(key, true, old -> {
+ assert Objects.equals(old, "error?");
+ return "error?";
+ }))
+ .doOnSuccess(s -> log.debug("3. Getting value: {}", key))
+ .then(map.getValue(null, key)),
+
+ Mono
+ .fromRunnable(() -> log.debug("4. Updating value: {}", key))
+ .then(map.updateValue(key, true, old -> {
+ assert Objects.equals(old, "error?");
+ return value;
+ }))
+ .doOnSuccess(s -> log.debug("4. Getting value: {}", key))
+ .then(map.getValue(null, key)),
+
+ Mono
+ .fromRunnable(() -> log.debug("5. Updating value: {}", key))
+ .then(map.updateValue(key, true, old -> {
+ assert Objects.equals(old, value);
+ return value;
+ }))
+ .doOnSuccess(s -> log.debug("5. Getting value: {}", key))
+ .then(map.getValue(null, key))
)
.doAfterTerminate(map::release)
)
diff --git a/src/test/java/it/cavallium/dbengine/TestLLDictionary.java b/src/test/java/it/cavallium/dbengine/TestLLDictionary.java
new file mode 100644
index 0000000..05fded5
--- /dev/null
+++ b/src/test/java/it/cavallium/dbengine/TestLLDictionary.java
@@ -0,0 +1,311 @@
+package it.cavallium.dbengine;
+
+import static it.cavallium.dbengine.DbTestUtils.destroyAllocator;
+import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks;
+import static it.cavallium.dbengine.DbTestUtils.newAllocator;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import io.net5.buffer.api.Buffer;
+import io.net5.buffer.api.Send;
+import it.cavallium.dbengine.DbTestUtils.TempDb;
+import it.cavallium.dbengine.DbTestUtils.TestAllocator;
+import it.cavallium.dbengine.database.LLDictionary;
+import it.cavallium.dbengine.database.LLDictionaryResultType;
+import it.cavallium.dbengine.database.LLKeyValueDatabase;
+import it.cavallium.dbengine.database.LLRange;
+import it.cavallium.dbengine.database.LLUtils;
+import it.cavallium.dbengine.database.UpdateMode;
+import it.cavallium.dbengine.database.UpdateReturnMode;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.stream.Stream;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.warp.commonutils.log.Logger;
+import org.warp.commonutils.log.LoggerFactory;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+public abstract class TestLLDictionary {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+ private static final Mono> RANGE_ALL = Mono.fromCallable(() -> LLRange.all().send());
+ private TestAllocator allocator;
+ private TempDb tempDb;
+ private LLKeyValueDatabase db;
+
+ protected abstract TemporaryDbGenerator getTempDbGenerator();
+
+ @BeforeEach
+ public void beforeEach() {
+ this.allocator = newAllocator();
+ ensureNoLeaks(allocator.allocator(), false, false);
+ tempDb = Objects.requireNonNull(getTempDbGenerator().openTempDb(allocator).block(), "TempDB");
+ db = tempDb.db();
+ }
+
+ @AfterEach
+ public void afterEach() {
+ getTempDbGenerator().closeTempDb(tempDb).block();
+ ensureNoLeaks(allocator.allocator(), true, false);
+ destroyAllocator(allocator);
+ }
+
+ public static Stream provideArguments() {
+ return Arrays.stream(UpdateMode.values()).map(Arguments::of);
+ }
+
+ public static Stream providePutArguments() {
+ var updateModes = Arrays.stream(UpdateMode.values());
+ return updateModes.flatMap(updateMode -> {
+ var resultTypes = Arrays.stream(LLDictionaryResultType.values());
+ return resultTypes.map(resultType -> Arguments.of(updateMode, resultType));
+ });
+ }
+
+ public static Stream provideUpdateArguments() {
+ var updateModes = Arrays.stream(UpdateMode.values());
+ return updateModes.flatMap(updateMode -> {
+ var resultTypes = Arrays.stream(UpdateReturnMode.values());
+ return resultTypes.map(resultType -> Arguments.of(updateMode, resultType));
+ });
+ }
+
+ private LLDictionary getDict(UpdateMode updateMode) {
+ var dict = DbTestUtils.tempDictionary(db, updateMode).blockOptional().orElseThrow();
+ var key1 = Mono.fromCallable(() -> fromString("test-key-1"));
+ var key2 = Mono.fromCallable(() -> fromString("test-key-2"));
+ var key3 = Mono.fromCallable(() -> fromString("test-key-3"));
+ var key4 = Mono.fromCallable(() -> fromString("test-key-4"));
+ var value = Mono.fromCallable(() -> fromString("test-value"));
+ dict.put(key1, value, LLDictionaryResultType.VOID).block();
+ dict.put(key2, value, LLDictionaryResultType.VOID).block();
+ dict.put(key3, value, LLDictionaryResultType.VOID).block();
+ dict.put(key4, value, LLDictionaryResultType.VOID).block();
+ return dict;
+ }
+
+ private Send fromString(String s) {
+ var sb = s.getBytes(StandardCharsets.UTF_8);
+ try (var b = db.getAllocator().allocate(sb.length + 3 + 13)) {
+ assert b.writerOffset() == 0;
+ assert b.readerOffset() == 0;
+ b.writerOffset(3).writeBytes(sb);
+ b.readerOffset(3);
+ assert b.readableBytes() == sb.length;
+
+ var part1 = b.split();
+
+ return LLUtils.compositeBuffer(db.getAllocator(), part1.send(), b.send()).send();
+ }
+ }
+
+ private String toString(Send b) {
+ try (var bb = b.receive()) {
+ byte[] data = new byte[bb.readableBytes()];
+ bb.copyInto(bb.readerOffset(), data, 0, data.length);
+ return new String(data, StandardCharsets.UTF_8);
+ }
+ }
+
+ private void run(Flux> publisher) {
+ publisher.subscribeOn(Schedulers.immediate()).blockLast();
+ }
+
+ private void runVoid(Mono publisher) {
+ publisher.then().subscribeOn(Schedulers.immediate()).block();
+ }
+
+ private T run(Mono publisher) {
+ return publisher.subscribeOn(Schedulers.immediate()).block();
+ }
+
+ private T run(boolean shouldFail, Mono publisher) {
+ return publisher.subscribeOn(Schedulers.immediate()).transform(mono -> {
+ if (shouldFail) {
+ return mono.onErrorResume(ex -> Mono.empty());
+ } else {
+ return mono;
+ }
+ }).block();
+ }
+
+ private void runVoid(boolean shouldFail, Mono publisher) {
+ publisher.then().subscribeOn(Schedulers.immediate()).transform(mono -> {
+ if (shouldFail) {
+ return mono.onErrorResume(ex -> Mono.empty());
+ } else {
+ return mono;
+ }
+ }).block();
+ }
+
+ @Test
+ public void testNoOp() {
+ }
+
+ @Test
+ public void testNoOpAllocation() {
+ for (int i = 0; i < 10; i++) {
+ var a = allocator.allocator().allocate(i * 512);
+ a.send().receive().close();
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideArguments")
+ public void testGetDict(UpdateMode updateMode) {
+ var dict = getDict(updateMode);
+ Assertions.assertNotNull(dict);
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideArguments")
+ public void testGetColumnName(UpdateMode updateMode) {
+ var dict = getDict(updateMode);
+ Assertions.assertEquals("hash_map_testmap", dict.getColumnName());
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideArguments")
+ public void testGetAllocator(UpdateMode updateMode) {
+ var dict = getDict(updateMode);
+ var alloc = dict.getAllocator();
+ Assertions.assertEquals(alloc, alloc);
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideArguments")
+ public void testGet(UpdateMode updateMode) {
+ var dict = getDict(updateMode);
+ var keyEx = Mono.fromCallable(() -> fromString("test-key-1"));
+ var keyNonEx = Mono.fromCallable(() -> fromString("test-nonexistent"));
+ Assertions.assertEquals("test-value", run(dict.get(null, keyEx).map(this::toString).transform(LLUtils::handleDiscard)));
+ Assertions.assertEquals("test-value", run(dict.get(null, keyEx, true).map(this::toString).transform(LLUtils::handleDiscard)));
+ Assertions.assertEquals("test-value", run(dict.get(null, keyEx, false).map(this::toString).transform(LLUtils::handleDiscard)));
+ Assertions.assertEquals((String) null, run(dict.get(null, keyNonEx).map(this::toString).transform(LLUtils::handleDiscard)));
+ Assertions.assertEquals((String) null, run(dict.get(null, keyNonEx, true).map(this::toString).transform(LLUtils::handleDiscard)));
+ Assertions.assertEquals((String) null, run(dict.get(null, keyNonEx, false).map(this::toString).transform(LLUtils::handleDiscard)));
+ }
+
+ @ParameterizedTest
+ @MethodSource("providePutArguments")
+ public void testPutExisting(UpdateMode updateMode, LLDictionaryResultType resultType) {
+ var dict = getDict(updateMode);
+ var keyEx = Mono.fromCallable(() -> fromString("test-key-1"));
+ var value = Mono.fromCallable(() -> fromString("test-value"));
+
+ var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false));
+
+ runVoid(dict.put(keyEx, value, resultType).then().doOnDiscard(Send.class, Send::close));
+
+ var afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
+ Assertions.assertEquals(0, afterSize - beforeSize);
+ }
+
+ @ParameterizedTest
+ @MethodSource("providePutArguments")
+ public void testPutNew(UpdateMode updateMode, LLDictionaryResultType resultType) {
+ var dict = getDict(updateMode);
+ var keyNonEx = Mono.fromCallable(() -> fromString("test-nonexistent"));
+ var value = Mono.fromCallable(() -> fromString("test-value"));
+
+ var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false));
+
+ runVoid(dict.put(keyNonEx, value, resultType).then().doOnDiscard(Send.class, Send::close));
+
+ var afterSize = run(dict.sizeRange(null, Mono.fromCallable(() -> LLRange.all().send()), false));
+ Assertions.assertEquals(1, afterSize - beforeSize);
+
+ Assertions.assertTrue(run(dict.getRangeKeys(null, RANGE_ALL).map(this::toString).collectList()).contains("test-nonexistent"));
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideArguments")
+ public void testGetUpdateMode(UpdateMode updateMode) {
+ var dict = getDict(updateMode);
+ assertEquals(updateMode, run(dict.getUpdateMode()));
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideUpdateArguments")
+ public void testUpdateExisting(UpdateMode updateMode, UpdateReturnMode updateReturnMode) {
+ var dict = getDict(updateMode);
+ var keyEx = Mono.fromCallable(() -> fromString("test-key-1"));
+ var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false));
+ long afterSize;
+ runVoid(updateMode == UpdateMode.DISALLOW,
+ dict.update(keyEx, old -> fromString("test-value"), updateReturnMode, true).then().transform(LLUtils::handleDiscard)
+ );
+ afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
+ assertEquals(0, afterSize - beforeSize);
+ runVoid(updateMode == UpdateMode.DISALLOW,
+ dict.update(keyEx, old -> fromString("test-value"), updateReturnMode, false).then().transform(LLUtils::handleDiscard)
+ );
+ afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
+ assertEquals(0, afterSize - beforeSize);
+ runVoid(updateMode == UpdateMode.DISALLOW,
+ dict.update(keyEx, old -> fromString("test-value"), updateReturnMode).then().transform(LLUtils::handleDiscard)
+ );
+ afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
+ assertEquals(0, afterSize - beforeSize);
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideUpdateArguments")
+ public void testUpdateNew(UpdateMode updateMode, UpdateReturnMode updateReturnMode) {
+ int expected = updateMode == UpdateMode.DISALLOW ? 0 : 1;
+ var dict = getDict(updateMode);
+ var keyNonEx = Mono.fromCallable(() -> fromString("test-nonexistent"));
+ var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false));
+ long afterSize;
+ runVoid(updateMode == UpdateMode.DISALLOW,
+ dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode, true).then().transform(LLUtils::handleDiscard)
+ );
+ afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
+ assertEquals(expected, afterSize - beforeSize);
+ runVoid(updateMode == UpdateMode.DISALLOW,
+ dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode, false).then().transform(LLUtils::handleDiscard)
+ );
+ afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
+ assertEquals(expected, afterSize - beforeSize);
+ runVoid(updateMode == UpdateMode.DISALLOW,
+ dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode).then().transform(LLUtils::handleDiscard)
+ );
+ afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
+ assertEquals(expected, afterSize - beforeSize);
+
+ if (updateMode != UpdateMode.DISALLOW) {
+ Assertions.assertTrue(run(dict.getRangeKeys(null, RANGE_ALL).map(this::toString).collectList()).contains(
+ "test-nonexistent"));
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideArguments")
+ public void testUpdateAndGetDelta(UpdateMode updateMode) {
+ log.warn("Test not implemented");
+ //todo: implement
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideArguments")
+ public void testClear(UpdateMode updateMode) {
+ log.warn("Test not implemented");
+ //todo: implement
+ }
+
+ @ParameterizedTest
+ @MethodSource("providePutArguments")
+ public void testRemove(UpdateMode updateMode, LLDictionaryResultType resultType) {
+ log.warn("Test not implemented");
+ //todo: implement
+ }
+}
diff --git a/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java b/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java
index 3a15e47..026107d 100644
--- a/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java
+++ b/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java
@@ -26,9 +26,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
public abstract class TestLLDictionaryLeaks {
@@ -90,43 +88,12 @@ public abstract class TestLLDictionaryLeaks {
private Send fromString(String s) {
var sb = s.getBytes(StandardCharsets.UTF_8);
try (var b = db.getAllocator().allocate(sb.length)) {
- b.writeBytes(b);
+ b.writeBytes(sb);
+ assert b.readableBytes() == sb.length;
return b.send();
}
}
- private void run(Flux> publisher) {
- publisher.subscribeOn(Schedulers.immediate()).blockLast();
- }
-
- private void runVoid(Mono publisher) {
- publisher.then().subscribeOn(Schedulers.immediate()).block();
- }
-
- private T run(Mono publisher) {
- return publisher.subscribeOn(Schedulers.immediate()).block();
- }
-
- private T run(boolean shouldFail, Mono publisher) {
- return publisher.subscribeOn(Schedulers.immediate()).transform(mono -> {
- if (shouldFail) {
- return mono.onErrorResume(ex -> Mono.empty());
- } else {
- return mono;
- }
- }).block();
- }
-
- private void runVoid(boolean shouldFail, Mono publisher) {
- publisher.then().subscribeOn(Schedulers.immediate()).transform(mono -> {
- if (shouldFail) {
- return mono.onErrorResume(ex -> Mono.empty());
- } else {
- return mono;
- }
- }).block();
- }
-
@Test
public void testNoOp() {
}
@@ -164,9 +131,9 @@ public abstract class TestLLDictionaryLeaks {
public void testGet(UpdateMode updateMode) {
var dict = getDict(updateMode);
var key = Mono.fromCallable(() -> fromString("test"));
- runVoid(dict.get(null, key).then().transform(LLUtils::handleDiscard));
- runVoid(dict.get(null, key, true).then().transform(LLUtils::handleDiscard));
- runVoid(dict.get(null, key, false).then().transform(LLUtils::handleDiscard));
+ DbTestUtils.runVoid(dict.get(null, key).then().transform(LLUtils::handleDiscard));
+ DbTestUtils.runVoid(dict.get(null, key, true).then().transform(LLUtils::handleDiscard));
+ DbTestUtils.runVoid(dict.get(null, key, false).then().transform(LLUtils::handleDiscard));
}
@ParameterizedTest
@@ -175,14 +142,14 @@ public abstract class TestLLDictionaryLeaks {
var dict = getDict(updateMode);
var key = Mono.fromCallable(() -> fromString("test-key"));
var value = Mono.fromCallable(() -> fromString("test-value"));
- runVoid(dict.put(key, value, resultType).then().doOnDiscard(Send.class, Send::close));
+ DbTestUtils.runVoid(dict.put(key, value, resultType).then().doOnDiscard(Send.class, Send::close));
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testGetUpdateMode(UpdateMode updateMode) {
var dict = getDict(updateMode);
- assertEquals(updateMode, run(dict.getUpdateMode()));
+ assertEquals(updateMode, DbTestUtils.run(dict.getUpdateMode()));
}
@ParameterizedTest
@@ -190,13 +157,13 @@ public abstract class TestLLDictionaryLeaks {
public void testUpdate(UpdateMode updateMode, UpdateReturnMode updateReturnMode) {
var dict = getDict(updateMode);
var key = Mono.fromCallable(() -> fromString("test-key"));
- runVoid(updateMode == UpdateMode.DISALLOW,
+ DbTestUtils.runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(key, old -> old, updateReturnMode, true).then().transform(LLUtils::handleDiscard)
);
- runVoid(updateMode == UpdateMode.DISALLOW,
+ DbTestUtils.runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(key, old -> old, updateReturnMode, false).then().transform(LLUtils::handleDiscard)
);
- runVoid(updateMode == UpdateMode.DISALLOW,
+ DbTestUtils.runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(key, old -> old, updateReturnMode).then().transform(LLUtils::handleDiscard)
);
}
@@ -206,13 +173,13 @@ public abstract class TestLLDictionaryLeaks {
public void testUpdateAndGetDelta(UpdateMode updateMode) {
var dict = getDict(updateMode);
var key = Mono.fromCallable(() -> fromString("test-key"));
- runVoid(updateMode == UpdateMode.DISALLOW,
+ DbTestUtils.runVoid(updateMode == UpdateMode.DISALLOW,
dict.updateAndGetDelta(key, old -> old, true).then().transform(LLUtils::handleDiscard)
);
- runVoid(updateMode == UpdateMode.DISALLOW,
+ DbTestUtils.runVoid(updateMode == UpdateMode.DISALLOW,
dict.updateAndGetDelta(key, old -> old, false).then().transform(LLUtils::handleDiscard)
);
- runVoid(updateMode == UpdateMode.DISALLOW,
+ DbTestUtils.runVoid(updateMode == UpdateMode.DISALLOW,
dict.updateAndGetDelta(key, old -> old).then().transform(LLUtils::handleDiscard)
);
}
@@ -221,7 +188,7 @@ public abstract class TestLLDictionaryLeaks {
@MethodSource("provideArguments")
public void testClear(UpdateMode updateMode) {
var dict = getDict(updateMode);
- runVoid(dict.clear());
+ DbTestUtils.runVoid(dict.clear());
}
@ParameterizedTest
@@ -229,6 +196,6 @@ public abstract class TestLLDictionaryLeaks {
public void testRemove(UpdateMode updateMode, LLDictionaryResultType resultType) {
var dict = getDict(updateMode);
var key = Mono.fromCallable(() -> fromString("test-key"));
- runVoid(dict.remove(key, resultType).then().doOnDiscard(Send.class, Send::close));
+ DbTestUtils.runVoid(dict.remove(key, resultType).then().doOnDiscard(Send.class, Send::close));
}
}
diff --git a/src/test/java/it/cavallium/dbengine/TestLocalLLDictionary.java b/src/test/java/it/cavallium/dbengine/TestLocalLLDictionary.java
new file mode 100644
index 0000000..abd01de
--- /dev/null
+++ b/src/test/java/it/cavallium/dbengine/TestLocalLLDictionary.java
@@ -0,0 +1,11 @@
+package it.cavallium.dbengine;
+
+public class TestLocalLLDictionary extends TestLLDictionary {
+
+ private static final TemporaryDbGenerator GENERATOR = new LocalTemporaryDbGenerator();
+
+ @Override
+ protected TemporaryDbGenerator getTempDbGenerator() {
+ return GENERATOR;
+ }
+}
diff --git a/src/test/java/it/cavallium/dbengine/TestMemoryLLDictionary.java b/src/test/java/it/cavallium/dbengine/TestMemoryLLDictionary.java
new file mode 100644
index 0000000..ce032a6
--- /dev/null
+++ b/src/test/java/it/cavallium/dbengine/TestMemoryLLDictionary.java
@@ -0,0 +1,11 @@
+package it.cavallium.dbengine;
+
+public class TestMemoryLLDictionary extends TestLLDictionary {
+
+ private static final TemporaryDbGenerator GENERATOR = new MemoryTemporaryDbGenerator();
+
+ @Override
+ protected TemporaryDbGenerator getTempDbGenerator() {
+ return GENERATOR;
+ }
+}
diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..9731f37
--- /dev/null
+++ b/src/test/resources/log4j2.xml
@@ -0,0 +1,20 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file