Add some tests for low level dictionary, log tests, pass more tests

This commit is contained in:
Andrea Cavalli 2021-09-23 02:15:58 +02:00
parent e034f3b778
commit 29086b1939
23 changed files with 876 additions and 298 deletions

12
pom.xml
View File

@ -172,6 +172,11 @@
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
@ -352,6 +357,11 @@
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.14.1</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
@ -542,7 +552,7 @@
</dependency>
</dependencies>
<configuration>
<argLine>--enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --enable-native-access</argLine>
<argLine>--enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --enable-native-access=ALL-UNNAMED</argLine>
<systemProperties>
<property>
<name>ci</name>

View File

@ -19,7 +19,7 @@ public class MappedSerializer<A, B> implements Serializer<B> {
}
@Override
public @NotNull DeserializationResult<B> deserialize(@Nullable Send<Buffer> serialized) throws SerializationException {
public @NotNull DeserializationResult<B> deserialize(@NotNull Send<Buffer> serialized) throws SerializationException {
try (serialized) {
var deserialized = serializer.deserialize(serialized);
return new DeserializationResult<>(keyMapper.map(deserialized.deserializedData()), deserialized.bytesRead());
@ -27,7 +27,7 @@ public class MappedSerializer<A, B> implements Serializer<B> {
}
@Override
public @Nullable Send<Buffer> serialize(@NotNull B deserialized) throws SerializationException {
public @NotNull Send<Buffer> serialize(@NotNull B deserialized) throws SerializationException {
return serializer.serialize(keyMapper.unmap(deserialized));
}
}

View File

@ -19,7 +19,7 @@ public class MappedSerializerFixedLength<A, B> implements SerializerFixedBinaryL
}
@Override
public @NotNull DeserializationResult<B> deserialize(@Nullable Send<Buffer> serialized) throws SerializationException {
public @NotNull DeserializationResult<B> deserialize(@NotNull Send<Buffer> serialized) throws SerializationException {
try (serialized) {
var deserialized = fixedLengthSerializer.deserialize(serialized);
return new DeserializationResult<>(keyMapper.map(deserialized.deserializedData()), deserialized.bytesRead());
@ -27,7 +27,7 @@ public class MappedSerializerFixedLength<A, B> implements SerializerFixedBinaryL
}
@Override
public @Nullable Send<Buffer> serialize(@NotNull B deserialized) throws SerializationException {
public @NotNull Send<Buffer> serialize(@NotNull B deserialized) throws SerializationException {
return fixedLengthSerializer.serialize(keyMapper.unmap(deserialized));
}

View File

@ -7,6 +7,7 @@ import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
import java.util.StringJoiner;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public class LLEntry extends ResourceSupport<LLEntry, LLEntry> {
@NotNull
@ -14,7 +15,7 @@ public class LLEntry extends ResourceSupport<LLEntry, LLEntry> {
@NotNull
private final Buffer value;
private LLEntry(Send<Buffer> key, Send<Buffer> value, Drop<LLEntry> drop) {
private LLEntry(@NotNull Send<Buffer> key, @NotNull Send<Buffer> value, Drop<LLEntry> drop) {
super(new LLEntry.CloseOnDrop(drop));
this.key = key.receive().makeReadOnly();
this.value = value.receive().makeReadOnly();
@ -29,7 +30,7 @@ public class LLEntry extends ResourceSupport<LLEntry, LLEntry> {
return true;
}
public static LLEntry of(Send<Buffer> key, Send<Buffer> value) {
public static LLEntry of(@NotNull Send<Buffer> key, @NotNull Send<Buffer> value) {
return new LLEntry(key, value, d -> {});
}

View File

@ -1,5 +1,7 @@
package it.cavallium.dbengine.database;
import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import io.net5.buffer.api.Buffer;
@ -10,30 +12,25 @@ import io.net5.buffer.api.Send;
import io.net5.util.IllegalReferenceCountException;
import io.net5.util.internal.PlatformDependent;
import it.cavallium.dbengine.database.collections.DatabaseStage;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLLocalLuceneIndex;
import it.cavallium.dbengine.database.disk.MemorySegmentUtils;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.RandomSortField;
import it.cavallium.dbengine.lucene.analyzer.WordAnalyzer;
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.ToIntFunction;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FloatPoint;
@ -42,24 +39,13 @@ import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import it.cavallium.dbengine.lucene.mlt.MultiMoreLikeThis;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.search.similarities.ClassicSimilarity;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.search.similarities.TFIDFSimilarity;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.RocksDB;
@ -195,9 +181,9 @@ public class LLUtils {
return new it.cavallium.dbengine.database.LLKeyScore(hit.docId(), hit.score(), hit.key());
}
public static String toStringSafe(Buffer key) {
public static String toStringSafe(@Nullable Buffer key) {
try {
if (key.isAccessible()) {
if (key == null || key.isAccessible()) {
return toString(key);
} else {
return "(released)";
@ -207,7 +193,7 @@ public class LLUtils {
}
}
public static String toString(Buffer key) {
public static String toString(@Nullable Buffer key) {
if (key == null) {
return "null";
} else {
@ -217,20 +203,35 @@ public class LLUtils {
if (iMax <= -1) {
return "[]";
} else {
StringBuilder b = new StringBuilder();
b.append('[');
StringBuilder arraySB = new StringBuilder();
StringBuilder asciiSB = new StringBuilder();
boolean isAscii = true;
arraySB.append('[');
int i = 0;
while (true) {
b.append(key.getByte(startIndex + i));
var byteVal = key.getUnsignedByte(startIndex + i);
arraySB.append(byteVal);
if (isAscii) {
if (byteVal >= 32 && byteVal < 127) {
asciiSB.append((char) byteVal);
} else {
isAscii = false;
asciiSB = null;
}
}
if (i == iLimit) {
b.append("");
arraySB.append("");
}
if (i == iMax || i == iLimit) {
return b.append(']').toString();
if (isAscii) {
return asciiSB.insert(0, "\"").append("\"").toString();
} else {
return arraySB.append(']').toString();
}
}
b.append(", ");
arraySB.append(", ");
++i;
}
}
@ -279,7 +280,10 @@ public class LLUtils {
return true;
}
public static byte[] toArray(Buffer key) {
public static byte[] toArray(@Nullable Buffer key) {
if (key == null) {
return EMPTY_BYTE_ARRAY;
}
byte[] array = new byte[key.readableBytes()];
key.copyInto(key.readerOffset(), array, 0, key.readableBytes());
return array;
@ -355,7 +359,6 @@ public class LLUtils {
PlatformDependent.freeDirectBuffer(directBuffer);
directBuffer = null;
}
buffer.close();
}
}
@ -445,6 +448,22 @@ public class LLUtils {
return true;
}
public static Send<Buffer> empty(BufferAllocator allocator) {
try (var empty = CompositeBuffer.compose(allocator)) {
assert empty.readableBytes() == 0;
assert empty.capacity() == 0;
return empty.send();
}
}
public static Send<Buffer> copy(BufferAllocator allocator, Buffer buf) {
if (CompositeBuffer.isComposite(buf) && buf.capacity() == 0) {
return empty(allocator);
} else {
return buf.copy().send();
}
}
public static record DirectBuffer(@NotNull Send<Buffer> buffer, @NotNull ByteBuffer byteBuffer) {}
@NotNull
@ -485,18 +504,23 @@ public class LLUtils {
);
}
assert buffer.isAccessible();
buffer.compact();
assert buffer.readerOffset() == 0;
AtomicLong nativeAddress = new AtomicLong(0);
if (buffer.countComponents() == 1) {
if (writable) {
if (buffer.countWritableComponents() == 1) {
buffer.forEachWritable(0, (i, c) -> {
assert c.writableNativeAddress() != 0;
nativeAddress.setPlain(c.writableNativeAddress());
return false;
});
}
} else {
if (buffer.countReadableComponents() == 1) {
var readableComponents = buffer.countReadableComponents();
if (readableComponents == 1) {
buffer.forEachReadable(0, (i, c) -> {
assert c.readableNativeAddress() != 0;
nativeAddress.setPlain(c.readableNativeAddress());
return false;
});
@ -512,7 +536,7 @@ public class LLUtils {
}
throw new IllegalStateException("Buffer is not direct");
}
return MemorySegmentUtils.directBuffer(nativeAddress.getPlain(), buffer.capacity());
return MemorySegmentUtils.directBuffer(nativeAddress.getPlain(), writable ? buffer.capacity() : buffer.writerOffset());
}
public static Buffer fromByteArray(BufferAllocator alloc, byte[] array) {
@ -534,22 +558,21 @@ public class LLUtils {
return buffer.receive();
}
public static Buffer compositeBuffer(BufferAllocator alloc, Send<Buffer> buffer1, Send<Buffer> buffer2) {
@NotNull
public static Buffer compositeBuffer(BufferAllocator alloc,
@NotNull Send<Buffer> buffer1,
@NotNull Send<Buffer> buffer2) {
return CompositeBuffer.compose(alloc, buffer1, buffer2);
}
@NotNull
public static Buffer compositeBuffer(BufferAllocator alloc,
Send<Buffer> buffer1,
Send<Buffer> buffer2,
Send<Buffer> buffer3) {
@NotNull Send<Buffer> buffer1,
@NotNull Send<Buffer> buffer2,
@NotNull Send<Buffer> buffer3) {
return CompositeBuffer.compose(alloc, buffer1, buffer2, buffer3);
}
@SafeVarargs
public static Buffer compositeBuffer(BufferAllocator alloc, Send<Buffer>... buffers) {
return CompositeBuffer.compose(alloc, buffers);
}
public static <T> Mono<T> resolveDelta(Mono<Delta<T>> prev, UpdateReturnMode updateReturnMode) {
return prev.handle((delta, sink) -> {
switch (updateReturnMode) {

View File

@ -2,8 +2,10 @@ package it.cavallium.dbengine.database.collections;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.CompositeBuffer;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.Serializer.DeserializationResult;
import org.jetbrains.annotations.NotNull;
@ -25,8 +27,8 @@ public class DatabaseEmpty {
}
@Override
public @Nullable Send<Buffer> serialize(@NotNull Nothing deserialized) {
return null;
public @NotNull Send<Buffer> serialize(@NotNull Nothing deserialized) {
return LLUtils.empty(bufferAllocator);
}
};
}

View File

@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -39,7 +40,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
private final Serializer<U> valueSerializer;
protected DatabaseMapDictionary(LLDictionary dictionary,
@Nullable Send<Buffer> prefixKey,
@NotNull Send<Buffer> prefixKey,
SerializerFixedBinaryLength<T> keySuffixSerializer,
Serializer<U> valueSerializer) {
// Do not retain or release or use the prefixKey here
@ -50,7 +51,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public static <T, U> DatabaseMapDictionary<T, U> simple(LLDictionary dictionary,
SerializerFixedBinaryLength<T> keySerializer,
Serializer<U> valueSerializer) {
return new DatabaseMapDictionary<>(dictionary, null, keySerializer, valueSerializer);
return new DatabaseMapDictionary<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer, valueSerializer);
}
public static <T, U> DatabaseMapDictionary<T, U> tail(LLDictionary dictionary,
@ -60,13 +61,21 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
return new DatabaseMapDictionary<>(dictionary, prefixKey, keySuffixSerializer, valueSerializer);
}
private Buffer toKey(Send<Buffer> suffixKeyToSend) {
var suffixKey = suffixKeyToSend.receive();
assert suffixKeyConsistency(suffixKey.readableBytes());
if (keyPrefix != null) {
return LLUtils.compositeBuffer(dictionary.getAllocator(), keyPrefix.copy().send(), suffixKey.send());
} else {
return suffixKey;
private Send<Buffer> toKey(Send<Buffer> suffixKeyToSend) {
try (var suffixKey = suffixKeyToSend.receive()) {
assert suffixKeyConsistency(suffixKey.readableBytes());
if (keyPrefix.readableBytes() > 0) {
try (var result = LLUtils.compositeBuffer(dictionary.getAllocator(),
LLUtils.copy(dictionary.getAllocator(), keyPrefix),
suffixKey.send()
)) {
assert result.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
return result.send();
}
} else {
assert suffixKey.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
return suffixKey.send();
}
}
}
@ -84,7 +93,12 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.getRange(resolveSnapshot(snapshot), rangeMono, existsAlmostCertainly)
.<Entry<T, U>>handle((entrySend, sink) -> {
try (var entry = entrySend.receive()) {
var key = deserializeSuffix(stripPrefix(entry.getKey()));
T key;
try (var serializedKey = entry.getKey().receive()) {
removePrefix(serializedKey);
suffixKeyConsistency(serializedKey.readableBytes());
key = deserializeSuffix(serializedKey.send());
}
var value = valueSerializer.deserialize(entry.getValue()).deserializedData();
sink.next(Map.entry(key, value));
} catch (SerializationException ex) {
@ -103,7 +117,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.fromIterable(Collections.unmodifiableMap(value).entrySet())
.handle((entry, sink) -> {
try {
sink.next(LLEntry.of(this.toKey(serializeSuffix(entry.getKey()).send()).send(),
sink.next(LLEntry.of(this.toKey(serializeSuffix(entry.getKey())),
valueSerializer.serialize(entry.getValue())).send());
} catch (SerializationException e) {
sink.error(e);
@ -133,14 +147,14 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<DatabaseStageEntry<U>> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
return Mono.fromCallable(() ->
new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix).send()).send(), valueSerializer));
new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), valueSerializer));
}
@Override
public Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) {
return dictionary
.get(resolveSnapshot(snapshot),
Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send()),
Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))),
existsAlmostCertainly
)
.handle((value, sink) -> deserializeValue(value, sink));
@ -148,8 +162,8 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<Void> putValue(T keySuffix, U value) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value));
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))).single();
var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value)).single();
return dictionary
.put(keyMono, valueMono, LLDictionaryResultType.VOID)
.doOnNext(Send::close)
@ -166,7 +180,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
UpdateReturnMode updateReturnMode,
boolean existsAlmostCertainly,
SerializationFunction<@Nullable U, @Nullable U> updater) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
return dictionary
.update(keyMono, getSerializedUpdater(updater), updateReturnMode, existsAlmostCertainly)
.handle((value, sink) -> deserializeValue(value, sink));
@ -176,7 +190,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Mono<Delta<U>> updateValueAndGetDelta(T keySuffix,
boolean existsAlmostCertainly,
SerializationFunction<@Nullable U, @Nullable U> updater) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
return dictionary
.updateAndGetDelta(keyMono, getSerializedUpdater(updater), existsAlmostCertainly)
.transform(mono -> LLUtils.mapLLDelta(mono,
@ -224,7 +238,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<U> putValueAndGetPrevious(T keySuffix, U value) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value));
return dictionary
.put(keyMono,
@ -235,7 +249,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<Boolean> putValueAndGetChanged(T keySuffix, U value) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value));
return dictionary
.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE)
@ -246,7 +260,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<Void> remove(T keySuffix) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
return dictionary
.remove(keyMono, LLDictionaryResultType.VOID)
.doOnNext(Send::close)
@ -255,7 +269,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<U> removeAndGetPrevious(T keySuffix) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
return dictionary
.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE)
.handle((value, sink) -> deserializeValue(value, sink));
@ -263,7 +277,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<Boolean> removeAndGetStatus(T keySuffix) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
return dictionary
.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE)
.map(LLUtils::responseToBoolean);
@ -274,7 +288,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
var mappedKeys = keys
.<Tuple2<T, Send<Buffer>>>handle((keySuffix, sink) -> {
try {
sink.next(Tuples.of(keySuffix, toKey(serializeSuffix(keySuffix).send()).send()));
sink.next(Tuples.of(keySuffix, toKey(serializeSuffix(keySuffix))));
} catch (SerializationException ex) {
sink.error(ex);
}
@ -303,9 +317,10 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
}
private Send<LLEntry> serializeEntry(T key, U value) throws SerializationException {
try (var serializedKey = toKey(serializeSuffix(key).send())) {
try (var serializedValue = valueSerializer.serialize(value).receive()) {
return LLEntry.of(serializedKey.send(), serializedValue.send()).send();
try (var serializedKey = toKey(serializeSuffix(key))) {
var serializedValueToReceive = valueSerializer.serialize(value);
try (var serializedValue = serializedValueToReceive.receive()) {
return LLEntry.of(serializedKey, serializedValue.send()).send();
}
}
}
@ -343,7 +358,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
var serializedEntries = entries
.<Tuple2<Send<Buffer>, X>>handle((entry, sink) -> {
try {
sink.next(Tuples.of(serializeSuffix(entry.getT1()).send(), entry.getT2()));
sink.next(Tuples.of(serializeSuffix(entry.getT1()), entry.getT2()));
} catch (SerializationException ex) {
sink.error(ex);
}
@ -374,16 +389,18 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot) {
return dictionary
.getRangeKeys(resolveSnapshot(snapshot), rangeMono)
.handle((key, sink) -> {
try (key) {
try (var keySuffixWithExt = stripPrefix(key).receive()) {
sink.next(Map.entry(deserializeSuffix(keySuffixWithExt.copy().send()),
new DatabaseSingle<>(dictionary,
toKey(keySuffixWithExt.send()).send(),
valueSerializer
)
));
}
.handle((keyBufToReceive, sink) -> {
try (var keyBuf = keyBufToReceive.receive()) {
assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
// Remove prefix. Keep only the suffix and the ext
removePrefix(keyBuf);
suffixKeyConsistency(keyBuf.readableBytes());
sink.next(Map.entry(deserializeSuffix(keyBuf.copy().send()),
new DatabaseSingle<>(dictionary,
toKey(keyBuf.send()),
valueSerializer
)
));
} catch (SerializationException ex) {
sink.error(ex);
}
@ -396,8 +413,14 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.getRange(resolveSnapshot(snapshot), rangeMono)
.<Entry<T, U>>handle((serializedEntryToReceive, sink) -> {
try (var serializedEntry = serializedEntryToReceive.receive()) {
sink.next(Map.entry(deserializeSuffix(stripPrefix(serializedEntry.getKey())),
valueSerializer.deserialize(serializedEntry.getValue()).deserializedData()));
try (var keyBuf = serializedEntry.getKey().receive()) {
assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
// Remove prefix. Keep only the suffix and the ext
removePrefix(keyBuf);
suffixKeyConsistency(keyBuf.readableBytes());
sink.next(Map.entry(deserializeSuffix(keyBuf.send()),
valueSerializer.deserialize(serializedEntry.getValue()).deserializedData()));
}
} catch (SerializationException e) {
sink.error(e);
}
@ -418,7 +441,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
this.getAllValues(null),
dictionary.setRange(rangeMono, entries.handle((entry, sink) -> {
try {
sink.next(LLEntry.of(toKey(serializeSuffix(entry.getKey()).send()).send(),
sink.next(LLEntry.of(toKey(serializeSuffix(entry.getKey())),
valueSerializer.serialize(entry.getValue())).send());
} catch (SerializationException e) {
sink.error(e);

View File

@ -2,7 +2,6 @@ package it.cavallium.dbengine.database.collections;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.CompositeBuffer;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import io.net5.util.IllegalReferenceCountException;
@ -18,6 +17,8 @@ import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -29,6 +30,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
private final BufferAllocator alloc;
protected final SubStageGetter<U, US> subStageGetter;
protected final SerializerFixedBinaryLength<T> keySuffixSerializer;
@NotNull
protected final Buffer keyPrefix;
protected final int keyPrefixLength;
protected final int keySuffixLength;
@ -90,27 +92,26 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
}
protected static Buffer zeroFillKeySuffixAndExt(BufferAllocator alloc,
@Nullable Send<Buffer> prefixKeySend,
@NotNull Send<Buffer> prefixKeySend,
int prefixLength,
int suffixLength,
int extLength) {
try (var result = prefixKeySend == null ? null : prefixKeySend.receive()) {
if (result == null) {
assert prefixLength == 0;
var buf = alloc.allocate(prefixLength + suffixLength + extLength);
buf.writerOffset(prefixLength + suffixLength + extLength);
buf.fill((byte) 0);
return buf;
} else {
assert result.readableBytes() == prefixLength;
assert suffixLength > 0;
assert extLength >= 0;
result.ensureWritable(suffixLength + extLength, suffixLength + extLength, true);
for (int i = 0; i < suffixLength + extLength; i++) {
result.writeByte((byte) 0x0);
}
return result;
var result = prefixKeySend.receive();
if (result == null) {
assert prefixLength == 0;
var buf = alloc.allocate(prefixLength + suffixLength + extLength);
buf.writerOffset(prefixLength + suffixLength + extLength);
buf.fill((byte) 0);
return buf;
} else {
assert result.readableBytes() == prefixLength;
assert suffixLength > 0;
assert extLength >= 0;
result.ensureWritable(suffixLength + extLength, suffixLength + extLength, true);
for (int i = 0; i < suffixLength + extLength; i++) {
result.writeByte((byte) 0x0);
}
return result;
}
}
@ -175,7 +176,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
public static <T, U> DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>> simple(LLDictionary dictionary,
SerializerFixedBinaryLength<T> keySerializer,
SubStageGetterSingle<U> subStageGetter) {
return new DatabaseMapDictionaryDeep<>(dictionary, null, keySerializer, subStageGetter, 0);
return new DatabaseMapDictionaryDeep<>(dictionary, LLUtils.empty(dictionary.getAllocator()), keySerializer, subStageGetter, 0);
}
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepTail(LLDictionary dictionary,
@ -183,7 +184,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
int keyExtLength,
SubStageGetter<U, US> subStageGetter) {
return new DatabaseMapDictionaryDeep<>(dictionary,
null,
LLUtils.empty(dictionary.getAllocator()),
keySerializer,
subStageGetter,
keyExtLength
@ -199,26 +200,26 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
}
protected DatabaseMapDictionaryDeep(LLDictionary dictionary,
@Nullable Send<Buffer> prefixKeyToReceive,
@NotNull Send<Buffer> prefixKeyToReceive,
SerializerFixedBinaryLength<T> keySuffixSerializer,
SubStageGetter<U, US> subStageGetter,
int keyExtLength) {
try (var prefixKey = prefixKeyToReceive == null ? null : prefixKeyToReceive.receive()) {
try (var prefixKey = prefixKeyToReceive.receive()) {
this.dictionary = dictionary;
this.alloc = dictionary.getAllocator();
this.subStageGetter = subStageGetter;
this.keySuffixSerializer = keySuffixSerializer;
assert prefixKey == null || prefixKey.isAccessible();
this.keyPrefixLength = prefixKey == null ? 0 : prefixKey.readableBytes();
assert prefixKey.isAccessible();
this.keyPrefixLength = prefixKey.readableBytes();
this.keySuffixLength = keySuffixSerializer.getSerializedBinaryLength();
this.keyExtLength = keyExtLength;
Buffer firstKey = firstRangeKey(alloc, prefixKey == null ? null : prefixKey.copy().send(), keyPrefixLength,
Buffer firstKey = firstRangeKey(alloc, LLUtils.copy(alloc, prefixKey), keyPrefixLength,
keySuffixLength, keyExtLength);
try (firstKey) {
var nextRangeKey = nextRangeKey(alloc, prefixKey == null ? null : prefixKey.copy().send(),
var nextRangeKey = nextRangeKey(alloc, LLUtils.copy(alloc, prefixKey),
keyPrefixLength, keySuffixLength, keyExtLength);
try (nextRangeKey) {
assert prefixKey == null || prefixKey.isAccessible();
assert prefixKey.isAccessible();
assert keyPrefixLength == 0 || !LLUtils.equals(firstKey, nextRangeKey);
this.range = keyPrefixLength == 0 ? LLRange.all() : LLRange.of(firstKey.send(), nextRangeKey.send());
this.rangeMono = LLUtils.lazyRetainRange(this.range);
@ -226,7 +227,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
}
}
this.keyPrefix = prefixKey == null ? null : prefixKey.send().receive();
this.keyPrefix = prefixKey.send().receive();
}
}
@ -246,21 +247,28 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
}
/**
* Keep only suffix and ext
* Removes the prefix from the key
*/
protected Send<Buffer> stripPrefix(Send<Buffer> keyToReceive) {
try (var key = keyToReceive.receive()) {
return key.copy(this.keyPrefixLength, key.readableBytes() - this.keyPrefixLength).send();
}
protected void removePrefix(Buffer key) {
assert key.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
key.readerOffset(key.readerOffset() + this.keyPrefixLength).compact();
assert key.readableBytes() == keySuffixLength + keyExtLength;
}
/**
* Add prefix to suffix
* Removes the ext from the key
*/
protected void removeExt(Buffer key) {
assert key.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
key.writerOffset(keyPrefixLength + keySuffixLength).compact();
assert key.readableBytes() == keyPrefixLength + keySuffixLength;
}
protected Send<Buffer> toKeyWithoutExt(Send<Buffer> suffixKeyToReceive) {
try (var suffixKey = suffixKeyToReceive.receive()) {
assert suffixKey.readableBytes() == keySuffixLength;
try (Buffer result = LLUtils.compositeBuffer(alloc, keyPrefix.copy().send(), suffixKey.send())) {
try (var result = Objects.requireNonNull(LLUtils.compositeBuffer(alloc,
LLUtils.copy(alloc, keyPrefix), suffixKey.send()))) {
assert result.readableBytes() == keyPrefixLength + keySuffixLength;
return result.send();
}
@ -287,8 +295,9 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
@Override
public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
var suffixKeyWithoutExt = Mono.fromCallable(() -> toKeyWithoutExt(serializeSuffix(keySuffix)));
return this.subStageGetter
.subStage(dictionary, snapshot, Mono.fromCallable(() -> toKeyWithoutExt(serializeSuffix(keySuffix).send())))
.subStage(dictionary, snapshot, suffixKeyWithoutExt)
.transform(LLUtils::handleDiscard)
.doOnDiscard(DatabaseStage.class, DatabaseStage::release);
}
@ -310,11 +319,10 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
.flatMapSequential(groupKeyWithoutExtSend_ -> Mono.using(
groupKeyWithoutExtSend_::receive,
groupKeyWithoutExtSend -> this.subStageGetter
.subStage(dictionary, snapshot, getGroupKeyWithoutExt(groupKeyWithoutExtSend.copy().send()))
.subStage(dictionary, snapshot, Mono.fromCallable(() -> groupKeyWithoutExtSend.copy().send()))
.<Entry<T, US>>handle((us, sink) -> {
try {
sink.next(Map.entry(this.deserializeSuffix(getGroupSuffix(groupKeyWithoutExtSend.send())),
us));
sink.next(Map.entry(this.deserializeSuffix(getGroupSuffix(groupKeyWithoutExtSend.send())), us));
} catch (SerializationException ex) {
sink.error(ex);
}
@ -324,22 +332,22 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
.transform(LLUtils::handleDiscard);
}
private Send<Buffer> getGroupSuffix(Send<Buffer> groupKeyWithoutExtSend) {
try (var groupKeyWithoutExt = groupKeyWithoutExtSend.receive()) {
try (var groupSuffix = this.stripPrefix(groupKeyWithoutExt.copy().send()).receive()) {
assert subStageKeysConsistency(groupKeyWithoutExt.readableBytes() + keyExtLength);
return groupSuffix.send();
}
private Send<Buffer> getGroupSuffix(Send<Buffer> groupKeyWithoutExt) {
try (var buffer = groupKeyWithoutExt.receive()) {
assert subStageKeysConsistency(buffer.readableBytes() + keyExtLength);
this.removePrefix(buffer);
assert subStageKeysConsistency(keyPrefixLength + buffer.readableBytes() + keyExtLength);
return buffer.send();
}
}
private Mono<Send<Buffer>> getGroupKeyWithoutExt(Send<Buffer> groupKeyWithoutExtSend) {
return Mono.fromCallable(() -> {
try (var groupKeyWithoutExt = groupKeyWithoutExtSend.receive()) {
assert subStageKeysConsistency(groupKeyWithoutExt.readableBytes() + keyExtLength);
return groupKeyWithoutExt.send();
}
});
private Send<Buffer> getGroupWithoutExt(Send<Buffer> groupKeyWithExtSend) {
try (var buffer = groupKeyWithExtSend.receive()) {
assert subStageKeysConsistency(buffer.readableBytes());
this.removeExt(buffer);
assert subStageKeysConsistency(buffer.readableBytes() + keyExtLength);
return buffer.send();
}
}
private boolean subStageKeysConsistency(int totalKeyLength) {
@ -383,7 +391,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
}
//todo: temporary wrapper. convert the whole class to buffers
protected T deserializeSuffix(Send<Buffer> keySuffixToReceive) throws SerializationException {
protected T deserializeSuffix(@NotNull Send<Buffer> keySuffixToReceive) throws SerializationException {
try (var keySuffix = keySuffixToReceive.receive()) {
assert suffixKeyConsistency(keySuffix.readableBytes());
var result = keySuffixSerializer.deserialize(keySuffix.send());
@ -393,11 +401,15 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
}
//todo: temporary wrapper. convert the whole class to buffers
protected Buffer serializeSuffix(T keySuffix) throws SerializationException {
Buffer suffixData = keySuffixSerializer.serialize(keySuffix).receive();
assert suffixKeyConsistency(suffixData.readableBytes());
assert keyPrefix.isAccessible();
return suffixData;
@NotNull
protected Send<Buffer> serializeSuffix(T keySuffix) throws SerializationException {
try (var suffixDataToReceive = keySuffixSerializer.serialize(keySuffix)) {
try (Buffer suffixData = suffixDataToReceive.receive()) {
assert suffixKeyConsistency(suffixData.readableBytes());
assert keyPrefix.isAccessible();
return suffixData.send();
}
}
}
@Override

View File

@ -6,6 +6,7 @@ import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
@ -58,7 +59,7 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
SerializerFixedBinaryLength<UH> keyHashSerializer) {
return new DatabaseMapDictionaryHashed<>(
dictionary,
null,
LLUtils.empty(dictionary.getAllocator()),
keySerializer,
valueSerializer,
keyHashFunction,

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.collections;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.CompositeBuffer;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.serialization.SerializationException;
@ -43,17 +44,9 @@ class ValueWithHashSerializer<X, Y> implements Serializer<Entry<X, Y>> {
}
@Override
public @Nullable Send<Buffer> serialize(@NotNull Entry<X, Y> deserialized) throws SerializationException {
public @NotNull Send<Buffer> serialize(@NotNull Entry<X, Y> deserialized) throws SerializationException {
var keySuffix = keySuffixSerializer.serialize(deserialized.getKey());
var value = valueSerializer.serialize(deserialized.getValue());
if (value == null && keySuffix == null) {
return null;
} else if (value == null) {
return keySuffix;
} else if (keySuffix == null) {
return value;
} else {
return LLUtils.compositeBuffer(allocator, keySuffix, value).send();
}
return LLUtils.compositeBuffer(allocator, keySuffix, value).send();
}
}

View File

@ -45,8 +45,8 @@ class ValuesSetSerializer<X> implements Serializer<ObjectArraySet<X>> {
output.writeInt(deserialized.size());
for (X entry : deserialized) {
var serializedToReceive = entrySerializer.serialize(entry);
if (serializedToReceive != null) {
try (Buffer serialized = serializedToReceive.receive()) {
try (Buffer serialized = serializedToReceive.receive()) {
if (serialized.readableBytes() > 0) {
output.ensureWritable(serialized.readableBytes());
output.writeBytes(serialized);
}

View File

@ -267,10 +267,23 @@ public class LLLocalDictionary implements LLDictionary {
stamp = 0;
}
try {
Buffer logKey;
if (logger.isTraceEnabled(MARKER_ROCKSDB)) {
logger.trace(MARKER_ROCKSDB, "Reading {}", LLUtils.toStringSafe(key));
logKey = key.copy();
} else {
logKey = null;
}
try (logKey) {
var result = dbGet(cfh, resolveSnapshot(snapshot), key.send(), existsAlmostCertainly);
if (logger.isTraceEnabled(MARKER_ROCKSDB)) {
try (var result2 = result == null ? null : result.receive()) {
logger.trace(MARKER_ROCKSDB, "Reading {}: {}", LLUtils.toStringSafe(logKey), LLUtils.toString(result2));
return result2 == null ? null : result2.send();
}
} else {
return result;
}
}
return dbGet(cfh, resolveSnapshot(snapshot), key.send(), existsAlmostCertainly);
} finally {
if (updateMode == UpdateMode.ALLOW) {
lock.unlockRead(stamp);
@ -414,6 +427,8 @@ public class LLLocalDictionary implements LLDictionary {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called dbPut in a nonblocking thread");
}
assert key.isAccessible();
assert value.isAccessible();
if (databaseOptions.allowNettyDirect()) {
var keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send());
try (var ignored1 = keyNioBuffer.buffer().receive()) {
@ -592,6 +607,8 @@ public class LLLocalDictionary implements LLDictionary {
valueSend -> this.<Send<Buffer>>runOnDb(() -> {
try (var key = keySend.receive()) {
try (var value = valueSend.receive()) {
assert key.isAccessible();
assert value.isAccessible();
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
@ -656,9 +673,6 @@ public class LLLocalDictionary implements LLDictionary {
stamp = 0;
}
try {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Reading {}", LLUtils.toStringSafe(key));
}
while (true) {
@Nullable Buffer prevData;
var prevDataHolder = existsAlmostCertainly ? null : new Holder<byte[]>();
@ -682,19 +696,37 @@ public class LLLocalDictionary implements LLDictionary {
} else {
prevData = null;
}
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevData)
);
}
try {
@Nullable Buffer newData;
try (Buffer prevDataToSendToUpdater = prevData == null ? null : prevData.copy()) {
try (var newDataToReceive = updater.apply(
prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send())) {
if (newDataToReceive != null) {
newData = newDataToReceive.receive();
} else {
newData = null;
try (var sentData = prevDataToSendToUpdater == null ? null
: prevDataToSendToUpdater.send()) {
try (var newDataToReceive = updater.apply(sentData)) {
if (newDataToReceive != null) {
newData = newDataToReceive.receive();
} else {
newData = null;
}
}
}
}
assert newData == null || newData.isAccessible();
try {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Updating {}. previous data: {}, updated data: {}",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevData),
LLUtils.toStringSafe(newData)
);
}
if (prevData != null && newData == null) {
//noinspection DuplicatedCode
if (updateMode == UpdateMode.ALLOW) {
@ -709,7 +741,7 @@ public class LLLocalDictionary implements LLDictionary {
}
}
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {}", LLUtils.toStringSafe(key));
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
dbDelete(cfh, null, key.send());
} else if (newData != null
@ -727,7 +759,11 @@ public class LLLocalDictionary implements LLDictionary {
}
}
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Writing {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData));
logger.trace(MARKER_ROCKSDB,
"Writing {}: {} (after update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(newData)
);
}
Buffer dataToPut;
if (updateReturnMode == UpdateReturnMode.GET_NEW_VALUE) {
@ -779,7 +815,7 @@ public class LLLocalDictionary implements LLDictionary {
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
boolean existsAlmostCertainly) {
return Mono.usingWhen(keyMono,
keySend -> this.runOnDb(() -> {
keySend -> runOnDb(() -> {
try (var key = keySend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
@ -799,7 +835,7 @@ public class LLLocalDictionary implements LLDictionary {
}
try {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Reading {}", LLUtils.toStringSafe(key));
logger.trace(MARKER_ROCKSDB, "Reading {} (before update)", LLUtils.toStringSafe(key));
}
while (true) {
@Nullable Buffer prevData;
@ -824,19 +860,37 @@ public class LLLocalDictionary implements LLDictionary {
} else {
prevData = null;
}
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Read {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevData)
);
}
try {
@Nullable Buffer newData;
try (Buffer prevDataToSendToUpdater = prevData == null ? null : prevData.copy()) {
try (var newDataToReceive = updater.apply(
prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send())) {
if (newDataToReceive != null) {
newData = newDataToReceive.receive();
} else {
newData = null;
try (var sentData = prevDataToSendToUpdater == null ? null
: prevDataToSendToUpdater.send()) {
try (var newDataToReceive = updater.apply(sentData)) {
if (newDataToReceive != null) {
newData = newDataToReceive.receive();
} else {
newData = null;
}
}
}
}
assert newData == null || newData.isAccessible();
try {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Updating {}. previous data: {}, updated data: {}",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevData),
LLUtils.toStringSafe(newData)
);
}
if (prevData != null && newData == null) {
//noinspection DuplicatedCode
if (updateMode == UpdateMode.ALLOW) {
@ -851,7 +905,7 @@ public class LLLocalDictionary implements LLDictionary {
}
}
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {}", LLUtils.toStringSafe(key));
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
dbDelete(cfh, null, key.send());
} else if (newData != null
@ -869,8 +923,11 @@ public class LLLocalDictionary implements LLDictionary {
}
}
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Writing {}: {}",
LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData));
logger.trace(MARKER_ROCKSDB,
"Writing {}: {} (after update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(newData)
);
}
assert key.isAccessible();
assert newData.isAccessible();
@ -986,18 +1043,24 @@ public class LLLocalDictionary implements LLDictionary {
stamp = 0;
}
try {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Reading {}", LLUtils.toArray(key));
}
var data = new Holder<byte[]>();
Buffer bufferResult;
if (db.keyMayExist(cfh, LLUtils.toArray(key), data)) {
if (data.getValue() != null) {
return LLUtils.fromByteArray(alloc, data.getValue()).send();
bufferResult = LLUtils.fromByteArray(alloc, data.getValue());
} else {
return dbGet(cfh, null, key.send(), true);
try (var bufferResultToReceive = dbGet(cfh, null, key.send(), true)) {
bufferResult = bufferResultToReceive == null ? null : bufferResultToReceive.receive();
}
}
} else {
return null;
bufferResult = null;
}
try (bufferResult) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Reading {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(bufferResult));
}
return bufferResult == null ? null : bufferResult.send();
}
} finally {
if (updateMode == UpdateMode.ALLOW) {

View File

@ -124,6 +124,6 @@ public class MemorySegmentUtils {
}
public static String getSuggestedArgs() {
return "--enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --enable-native-access";
return "--enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --enable-native-access=ALL-UNNAMED";
}
}

View File

@ -4,6 +4,7 @@ import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.netty.NullableBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.jetbrains.annotations.NotNull;
@ -13,16 +14,16 @@ public interface Serializer<A> {
record DeserializationResult<T>(T deserializedData, int bytesRead) {}
@NotNull DeserializationResult<A> deserialize(@Nullable Send<Buffer> serialized) throws SerializationException;
@NotNull DeserializationResult<A> deserialize(@NotNull Send<Buffer> serialized) throws SerializationException;
@Nullable Send<Buffer> serialize(@NotNull A deserialized) throws SerializationException;
@NotNull Send<Buffer> serialize(@NotNull A deserialized) throws SerializationException;
Serializer<Send<Buffer>> NOOP_SERIALIZER = new Serializer<>() {
@Override
public @NotNull DeserializationResult<Send<Buffer>> deserialize(@Nullable Send<Buffer> serialized) {
try (var serializedBuf = serialized == null ? null : serialized.receive()) {
var readableBytes = serializedBuf == null ? 0 : serializedBuf.readableBytes();
return new DeserializationResult<>(serializedBuf == null ? null : serializedBuf.send(), readableBytes);
public @NotNull DeserializationResult<Send<Buffer>> deserialize(@NotNull Send<Buffer> serialized) {
try (var serializedBuf = serialized.receive()) {
var readableBytes = serializedBuf.readableBytes();
return new DeserializationResult<>(serializedBuf.send(), readableBytes);
}
}

View File

@ -17,10 +17,7 @@ public interface SerializerFixedBinaryLength<A> extends Serializer<A> {
static SerializerFixedBinaryLength<Send<Buffer>> noop(int length) {
return new SerializerFixedBinaryLength<>() {
@Override
public @NotNull DeserializationResult<Send<Buffer>> deserialize(@Nullable Send<Buffer> serialized) {
if (length == 0 && serialized == null) {
return new DeserializationResult<>(null, 0);
}
public @NotNull DeserializationResult<Send<Buffer>> deserialize(@NotNull Send<Buffer> serialized) {
Objects.requireNonNull(serialized);
try (var buf = serialized.receive()) {
if (buf.readableBytes() != getSerializedBinaryLength()) {
@ -55,12 +52,8 @@ public interface SerializerFixedBinaryLength<A> extends Serializer<A> {
static SerializerFixedBinaryLength<String> utf8(BufferAllocator allocator, int length) {
return new SerializerFixedBinaryLength<>() {
@Override
public @NotNull DeserializationResult<String> deserialize(@Nullable Send<Buffer> serializedToReceive)
public @NotNull DeserializationResult<String> deserialize(@NotNull Send<Buffer> serializedToReceive)
throws SerializationException {
if (length == 0 && serializedToReceive == null) {
return new DeserializationResult<>(null, 0);
}
Objects.requireNonNull(serializedToReceive);
try (var serialized = serializedToReceive.receive()) {
if (serialized.readableBytes() != getSerializedBinaryLength()) {
throw new SerializationException(
@ -78,7 +71,9 @@ public interface SerializerFixedBinaryLength<A> extends Serializer<A> {
// UTF-8 uses max. 3 bytes per char, so calculate the worst case.
try (Buffer buf = allocator.allocate(LLUtils.utf8MaxBytes(deserialized))) {
assert buf.isAccessible();
buf.writeBytes(deserialized.getBytes(StandardCharsets.UTF_8));
var bytes = deserialized.getBytes(StandardCharsets.UTF_8);
buf.ensureWritable(bytes.length);
buf.writeBytes(bytes);
if (buf.readableBytes() != getSerializedBinaryLength()) {
throw new SerializationException("Fixed serializer with " + getSerializedBinaryLength()
+ " bytes has tried to serialize an element with "
@ -99,10 +94,7 @@ public interface SerializerFixedBinaryLength<A> extends Serializer<A> {
static SerializerFixedBinaryLength<Integer> intSerializer(BufferAllocator allocator) {
return new SerializerFixedBinaryLength<>() {
@Override
public @NotNull DeserializationResult<Integer> deserialize(@Nullable Send<Buffer> serializedToReceive) {
if (getSerializedBinaryLength() == 0 && serializedToReceive == null) {
return new DeserializationResult<>(null, 0);
}
public @NotNull DeserializationResult<Integer> deserialize(@NotNull Send<Buffer> serializedToReceive) {
Objects.requireNonNull(serializedToReceive);
try (var serialized = serializedToReceive.receive()) {
if (serialized.readableBytes() != getSerializedBinaryLength()) {
@ -131,10 +123,7 @@ public interface SerializerFixedBinaryLength<A> extends Serializer<A> {
static SerializerFixedBinaryLength<Long> longSerializer(BufferAllocator allocator) {
return new SerializerFixedBinaryLength<>() {
@Override
public @NotNull DeserializationResult<Long> deserialize(@Nullable Send<Buffer> serializedToReceive) {
if (getSerializedBinaryLength() == 0 && serializedToReceive == null) {
return new DeserializationResult<>(null, 0);
}
public @NotNull DeserializationResult<Long> deserialize(@NotNull Send<Buffer> serializedToReceive) {
Objects.requireNonNull(serializedToReceive);
try (var serialized = serializedToReceive.receive()) {
if (serialized.readableBytes() != getSerializedBinaryLength()) {

View File

@ -0,0 +1,74 @@
package it.cavallium.dbengine.netty;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.Drop;
import io.net5.buffer.api.Owned;
import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.SearchResult;
import org.jetbrains.annotations.Nullable;
public class NullableBuffer extends ResourceSupport<NullableBuffer, NullableBuffer> {
@Nullable
private Buffer buffer;
public NullableBuffer(@Nullable Buffer buffer, Drop<NullableBuffer> drop) {
super(new CloseOnDrop(drop));
this.buffer = buffer == null ? null : buffer.send().receive();
}
public NullableBuffer(@Nullable Send<Buffer> buffer, Drop<NullableBuffer> drop) {
super(new CloseOnDrop(drop));
this.buffer = buffer == null ? null : buffer.receive();
}
@Nullable
public Buffer buf() {
return buffer;
}
@Nullable
public Send<Buffer> sendBuf() {
return buffer == null ? null : buffer.send();
}
@Override
protected RuntimeException createResourceClosedException() {
return new IllegalStateException("Closed");
}
@Override
protected Owned<NullableBuffer> prepareSend() {
var buffer = this.buffer == null ? null : this.buffer.send();
makeInaccessible();
return drop -> new NullableBuffer(buffer, drop);
}
private void makeInaccessible() {
this.buffer = null;
}
private static class CloseOnDrop implements Drop<NullableBuffer> {
private final Drop<NullableBuffer> delegate;
public CloseOnDrop(Drop<NullableBuffer> drop) {
this.delegate = drop;
}
@Override
public void drop(NullableBuffer obj) {
try {
if (obj.buffer != null) {
if (obj.buffer.isAccessible()) {
obj.buffer.close();
}
}
delegate.drop(obj);
} finally {
obj.makeInaccessible();
}
}
}
}

View File

@ -32,6 +32,7 @@ import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class DbTestUtils {
@ -42,6 +43,38 @@ public class DbTestUtils {
return "0123456789".repeat(1024);
}
public static void run(Flux<?> publisher) {
publisher.subscribeOn(Schedulers.immediate()).blockLast();
}
public static void runVoid(Mono<Void> publisher) {
publisher.then().subscribeOn(Schedulers.immediate()).block();
}
public static <T> T run(Mono<T> publisher) {
return publisher.subscribeOn(Schedulers.immediate()).block();
}
public static <T> T run(boolean shouldFail, Mono<T> publisher) {
return publisher.subscribeOn(Schedulers.immediate()).transform(mono -> {
if (shouldFail) {
return mono.onErrorResume(ex -> Mono.empty());
} else {
return mono;
}
}).block();
}
public static void runVoid(boolean shouldFail, Mono<Void> publisher) {
publisher.then().subscribeOn(Schedulers.immediate()).transform(mono -> {
if (shouldFail) {
return mono.onErrorResume(ex -> Mono.empty());
} else {
return mono;
}
}).block();
}
public static record TestAllocator(PooledBufferAllocator allocator) {}
public static TestAllocator newAllocator() {

View File

@ -19,15 +19,20 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.StepVerifier.Step;
import reactor.test.util.TestLogger;
import reactor.util.Loggers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public abstract class TestDictionaryMap {
private static final Logger log = LoggerFactory.getLogger(TestDictionaryMap.class);
private TestAllocator allocator;
private boolean checkLeaks = true;
@ -97,21 +102,25 @@ public abstract class TestDictionaryMap {
@ParameterizedTest
@MethodSource("provideArgumentsPut")
public void testPut(MapType mapType, UpdateMode updateMode, String key, String value, boolean shouldFail) {
var stpVer = StepVerifier
.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5))
.flatMap(map -> map
.putValue(key, value)
.then(map.getValue(null, key))
.doAfterTerminate(map::release)
)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext(value).verifyComplete();
}
var gen = getTempDbGenerator();
var db = run(gen.openTempDb(allocator));
var dict = run(tempDictionary(db.db(), updateMode));
var map = tempDatabaseMapDictionaryMap(dict, mapType, 5);
runVoid(shouldFail, map.putValue(key, value));
var resultingMapSize = run(map.leavesCount(null, false));
Assertions.assertEquals(shouldFail ? 0 : 1, resultingMapSize);
var resultingMap = run(map.get(null));
Assertions.assertEquals(shouldFail ? null : Map.of(key, value), resultingMap);
runVoid(map.close());
map.release();
//if (shouldFail) this.checkLeaks = false;
gen.closeTempDb(db);
}
@ParameterizedTest
@ -257,26 +266,50 @@ public abstract class TestDictionaryMap {
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5))
.flatMapMany(map -> Flux
.concat(
map.updateValue(key, old -> {
assert old == null;
return "error?";
}).then(map.getValue(null, key)),
map.updateValue(key, false, old -> {
assert Objects.equals(old, "error?");
return "error?";
}).then(map.getValue(null, key)),
map.updateValue(key, true, old -> {
assert Objects.equals(old, "error?");
return "error?";
}).then(map.getValue(null, key)),
map.updateValue(key, true, old -> {
assert Objects.equals(old, "error?");
return value;
}).then(map.getValue(null, key)),
map.updateValue(key, true, old -> {
assert Objects.equals(old, value);
return value;
}).then(map.getValue(null, key))
Mono
.fromRunnable(() -> log.debug("1. Updating value: {}", key))
.then(map.updateValue(key, old -> {
assert old == null;
return "error?";
}))
.doOnSuccess(s -> log.debug("1. Getting value: {}", key))
.then(map.getValue(null, key)),
Mono
.fromRunnable(() -> log.debug("2. Updating value: {}", key))
.then(map.updateValue(key, false, old -> {
assert Objects.equals(old, "error?");
return "error?";
}))
.doOnSuccess(s -> log.debug("2. Getting value: {}", key))
.then(map.getValue(null, key)),
Mono
.fromRunnable(() -> log.debug("3. Updating value: {}", key))
.then(map.updateValue(key, true, old -> {
assert Objects.equals(old, "error?");
return "error?";
}))
.doOnSuccess(s -> log.debug("3. Getting value: {}", key))
.then(map.getValue(null, key)),
Mono
.fromRunnable(() -> log.debug("4. Updating value: {}", key))
.then(map.updateValue(key, true, old -> {
assert Objects.equals(old, "error?");
return value;
}))
.doOnSuccess(s -> log.debug("4. Getting value: {}", key))
.then(map.getValue(null, key)),
Mono
.fromRunnable(() -> log.debug("5. Updating value: {}", key))
.then(map.updateValue(key, true, old -> {
assert Objects.equals(old, value);
return value;
}))
.doOnSuccess(s -> log.debug("5. Getting value: {}", key))
.then(map.getValue(null, key))
)
.doAfterTerminate(map::release)
)

View File

@ -0,0 +1,311 @@
package it.cavallium.dbengine;
import static it.cavallium.dbengine.DbTestUtils.destroyAllocator;
import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks;
import static it.cavallium.dbengine.DbTestUtils.newAllocator;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.DbTestUtils.TempDb;
import it.cavallium.dbengine.DbTestUtils.TestAllocator;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public abstract class TestLLDictionary {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private static final Mono<Send<LLRange>> RANGE_ALL = Mono.fromCallable(() -> LLRange.all().send());
private TestAllocator allocator;
private TempDb tempDb;
private LLKeyValueDatabase db;
protected abstract TemporaryDbGenerator getTempDbGenerator();
@BeforeEach
public void beforeEach() {
this.allocator = newAllocator();
ensureNoLeaks(allocator.allocator(), false, false);
tempDb = Objects.requireNonNull(getTempDbGenerator().openTempDb(allocator).block(), "TempDB");
db = tempDb.db();
}
@AfterEach
public void afterEach() {
getTempDbGenerator().closeTempDb(tempDb).block();
ensureNoLeaks(allocator.allocator(), true, false);
destroyAllocator(allocator);
}
public static Stream<Arguments> provideArguments() {
return Arrays.stream(UpdateMode.values()).map(Arguments::of);
}
public static Stream<Arguments> providePutArguments() {
var updateModes = Arrays.stream(UpdateMode.values());
return updateModes.flatMap(updateMode -> {
var resultTypes = Arrays.stream(LLDictionaryResultType.values());
return resultTypes.map(resultType -> Arguments.of(updateMode, resultType));
});
}
public static Stream<Arguments> provideUpdateArguments() {
var updateModes = Arrays.stream(UpdateMode.values());
return updateModes.flatMap(updateMode -> {
var resultTypes = Arrays.stream(UpdateReturnMode.values());
return resultTypes.map(resultType -> Arguments.of(updateMode, resultType));
});
}
private LLDictionary getDict(UpdateMode updateMode) {
var dict = DbTestUtils.tempDictionary(db, updateMode).blockOptional().orElseThrow();
var key1 = Mono.fromCallable(() -> fromString("test-key-1"));
var key2 = Mono.fromCallable(() -> fromString("test-key-2"));
var key3 = Mono.fromCallable(() -> fromString("test-key-3"));
var key4 = Mono.fromCallable(() -> fromString("test-key-4"));
var value = Mono.fromCallable(() -> fromString("test-value"));
dict.put(key1, value, LLDictionaryResultType.VOID).block();
dict.put(key2, value, LLDictionaryResultType.VOID).block();
dict.put(key3, value, LLDictionaryResultType.VOID).block();
dict.put(key4, value, LLDictionaryResultType.VOID).block();
return dict;
}
private Send<Buffer> fromString(String s) {
var sb = s.getBytes(StandardCharsets.UTF_8);
try (var b = db.getAllocator().allocate(sb.length + 3 + 13)) {
assert b.writerOffset() == 0;
assert b.readerOffset() == 0;
b.writerOffset(3).writeBytes(sb);
b.readerOffset(3);
assert b.readableBytes() == sb.length;
var part1 = b.split();
return LLUtils.compositeBuffer(db.getAllocator(), part1.send(), b.send()).send();
}
}
private String toString(Send<Buffer> b) {
try (var bb = b.receive()) {
byte[] data = new byte[bb.readableBytes()];
bb.copyInto(bb.readerOffset(), data, 0, data.length);
return new String(data, StandardCharsets.UTF_8);
}
}
private void run(Flux<?> publisher) {
publisher.subscribeOn(Schedulers.immediate()).blockLast();
}
private void runVoid(Mono<Void> publisher) {
publisher.then().subscribeOn(Schedulers.immediate()).block();
}
private <T> T run(Mono<T> publisher) {
return publisher.subscribeOn(Schedulers.immediate()).block();
}
private <T> T run(boolean shouldFail, Mono<T> publisher) {
return publisher.subscribeOn(Schedulers.immediate()).transform(mono -> {
if (shouldFail) {
return mono.onErrorResume(ex -> Mono.empty());
} else {
return mono;
}
}).block();
}
private void runVoid(boolean shouldFail, Mono<Void> publisher) {
publisher.then().subscribeOn(Schedulers.immediate()).transform(mono -> {
if (shouldFail) {
return mono.onErrorResume(ex -> Mono.empty());
} else {
return mono;
}
}).block();
}
@Test
public void testNoOp() {
}
@Test
public void testNoOpAllocation() {
for (int i = 0; i < 10; i++) {
var a = allocator.allocator().allocate(i * 512);
a.send().receive().close();
}
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testGetDict(UpdateMode updateMode) {
var dict = getDict(updateMode);
Assertions.assertNotNull(dict);
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testGetColumnName(UpdateMode updateMode) {
var dict = getDict(updateMode);
Assertions.assertEquals("hash_map_testmap", dict.getColumnName());
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testGetAllocator(UpdateMode updateMode) {
var dict = getDict(updateMode);
var alloc = dict.getAllocator();
Assertions.assertEquals(alloc, alloc);
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testGet(UpdateMode updateMode) {
var dict = getDict(updateMode);
var keyEx = Mono.fromCallable(() -> fromString("test-key-1"));
var keyNonEx = Mono.fromCallable(() -> fromString("test-nonexistent"));
Assertions.assertEquals("test-value", run(dict.get(null, keyEx).map(this::toString).transform(LLUtils::handleDiscard)));
Assertions.assertEquals("test-value", run(dict.get(null, keyEx, true).map(this::toString).transform(LLUtils::handleDiscard)));
Assertions.assertEquals("test-value", run(dict.get(null, keyEx, false).map(this::toString).transform(LLUtils::handleDiscard)));
Assertions.assertEquals((String) null, run(dict.get(null, keyNonEx).map(this::toString).transform(LLUtils::handleDiscard)));
Assertions.assertEquals((String) null, run(dict.get(null, keyNonEx, true).map(this::toString).transform(LLUtils::handleDiscard)));
Assertions.assertEquals((String) null, run(dict.get(null, keyNonEx, false).map(this::toString).transform(LLUtils::handleDiscard)));
}
@ParameterizedTest
@MethodSource("providePutArguments")
public void testPutExisting(UpdateMode updateMode, LLDictionaryResultType resultType) {
var dict = getDict(updateMode);
var keyEx = Mono.fromCallable(() -> fromString("test-key-1"));
var value = Mono.fromCallable(() -> fromString("test-value"));
var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false));
runVoid(dict.put(keyEx, value, resultType).then().doOnDiscard(Send.class, Send::close));
var afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
Assertions.assertEquals(0, afterSize - beforeSize);
}
@ParameterizedTest
@MethodSource("providePutArguments")
public void testPutNew(UpdateMode updateMode, LLDictionaryResultType resultType) {
var dict = getDict(updateMode);
var keyNonEx = Mono.fromCallable(() -> fromString("test-nonexistent"));
var value = Mono.fromCallable(() -> fromString("test-value"));
var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false));
runVoid(dict.put(keyNonEx, value, resultType).then().doOnDiscard(Send.class, Send::close));
var afterSize = run(dict.sizeRange(null, Mono.fromCallable(() -> LLRange.all().send()), false));
Assertions.assertEquals(1, afterSize - beforeSize);
Assertions.assertTrue(run(dict.getRangeKeys(null, RANGE_ALL).map(this::toString).collectList()).contains("test-nonexistent"));
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testGetUpdateMode(UpdateMode updateMode) {
var dict = getDict(updateMode);
assertEquals(updateMode, run(dict.getUpdateMode()));
}
@ParameterizedTest
@MethodSource("provideUpdateArguments")
public void testUpdateExisting(UpdateMode updateMode, UpdateReturnMode updateReturnMode) {
var dict = getDict(updateMode);
var keyEx = Mono.fromCallable(() -> fromString("test-key-1"));
var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false));
long afterSize;
runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(keyEx, old -> fromString("test-value"), updateReturnMode, true).then().transform(LLUtils::handleDiscard)
);
afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
assertEquals(0, afterSize - beforeSize);
runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(keyEx, old -> fromString("test-value"), updateReturnMode, false).then().transform(LLUtils::handleDiscard)
);
afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
assertEquals(0, afterSize - beforeSize);
runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(keyEx, old -> fromString("test-value"), updateReturnMode).then().transform(LLUtils::handleDiscard)
);
afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
assertEquals(0, afterSize - beforeSize);
}
@ParameterizedTest
@MethodSource("provideUpdateArguments")
public void testUpdateNew(UpdateMode updateMode, UpdateReturnMode updateReturnMode) {
int expected = updateMode == UpdateMode.DISALLOW ? 0 : 1;
var dict = getDict(updateMode);
var keyNonEx = Mono.fromCallable(() -> fromString("test-nonexistent"));
var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false));
long afterSize;
runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode, true).then().transform(LLUtils::handleDiscard)
);
afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
assertEquals(expected, afterSize - beforeSize);
runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode, false).then().transform(LLUtils::handleDiscard)
);
afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
assertEquals(expected, afterSize - beforeSize);
runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode).then().transform(LLUtils::handleDiscard)
);
afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
assertEquals(expected, afterSize - beforeSize);
if (updateMode != UpdateMode.DISALLOW) {
Assertions.assertTrue(run(dict.getRangeKeys(null, RANGE_ALL).map(this::toString).collectList()).contains(
"test-nonexistent"));
}
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testUpdateAndGetDelta(UpdateMode updateMode) {
log.warn("Test not implemented");
//todo: implement
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testClear(UpdateMode updateMode) {
log.warn("Test not implemented");
//todo: implement
}
@ParameterizedTest
@MethodSource("providePutArguments")
public void testRemove(UpdateMode updateMode, LLDictionaryResultType resultType) {
log.warn("Test not implemented");
//todo: implement
}
}

View File

@ -26,9 +26,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public abstract class TestLLDictionaryLeaks {
@ -90,43 +88,12 @@ public abstract class TestLLDictionaryLeaks {
private Send<Buffer> fromString(String s) {
var sb = s.getBytes(StandardCharsets.UTF_8);
try (var b = db.getAllocator().allocate(sb.length)) {
b.writeBytes(b);
b.writeBytes(sb);
assert b.readableBytes() == sb.length;
return b.send();
}
}
private void run(Flux<?> publisher) {
publisher.subscribeOn(Schedulers.immediate()).blockLast();
}
private void runVoid(Mono<Void> publisher) {
publisher.then().subscribeOn(Schedulers.immediate()).block();
}
private <T> T run(Mono<T> publisher) {
return publisher.subscribeOn(Schedulers.immediate()).block();
}
private <T> T run(boolean shouldFail, Mono<T> publisher) {
return publisher.subscribeOn(Schedulers.immediate()).transform(mono -> {
if (shouldFail) {
return mono.onErrorResume(ex -> Mono.empty());
} else {
return mono;
}
}).block();
}
private void runVoid(boolean shouldFail, Mono<Void> publisher) {
publisher.then().subscribeOn(Schedulers.immediate()).transform(mono -> {
if (shouldFail) {
return mono.onErrorResume(ex -> Mono.empty());
} else {
return mono;
}
}).block();
}
@Test
public void testNoOp() {
}
@ -164,9 +131,9 @@ public abstract class TestLLDictionaryLeaks {
public void testGet(UpdateMode updateMode) {
var dict = getDict(updateMode);
var key = Mono.fromCallable(() -> fromString("test"));
runVoid(dict.get(null, key).then().transform(LLUtils::handleDiscard));
runVoid(dict.get(null, key, true).then().transform(LLUtils::handleDiscard));
runVoid(dict.get(null, key, false).then().transform(LLUtils::handleDiscard));
DbTestUtils.runVoid(dict.get(null, key).then().transform(LLUtils::handleDiscard));
DbTestUtils.runVoid(dict.get(null, key, true).then().transform(LLUtils::handleDiscard));
DbTestUtils.runVoid(dict.get(null, key, false).then().transform(LLUtils::handleDiscard));
}
@ParameterizedTest
@ -175,14 +142,14 @@ public abstract class TestLLDictionaryLeaks {
var dict = getDict(updateMode);
var key = Mono.fromCallable(() -> fromString("test-key"));
var value = Mono.fromCallable(() -> fromString("test-value"));
runVoid(dict.put(key, value, resultType).then().doOnDiscard(Send.class, Send::close));
DbTestUtils.runVoid(dict.put(key, value, resultType).then().doOnDiscard(Send.class, Send::close));
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testGetUpdateMode(UpdateMode updateMode) {
var dict = getDict(updateMode);
assertEquals(updateMode, run(dict.getUpdateMode()));
assertEquals(updateMode, DbTestUtils.run(dict.getUpdateMode()));
}
@ParameterizedTest
@ -190,13 +157,13 @@ public abstract class TestLLDictionaryLeaks {
public void testUpdate(UpdateMode updateMode, UpdateReturnMode updateReturnMode) {
var dict = getDict(updateMode);
var key = Mono.fromCallable(() -> fromString("test-key"));
runVoid(updateMode == UpdateMode.DISALLOW,
DbTestUtils.runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(key, old -> old, updateReturnMode, true).then().transform(LLUtils::handleDiscard)
);
runVoid(updateMode == UpdateMode.DISALLOW,
DbTestUtils.runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(key, old -> old, updateReturnMode, false).then().transform(LLUtils::handleDiscard)
);
runVoid(updateMode == UpdateMode.DISALLOW,
DbTestUtils.runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(key, old -> old, updateReturnMode).then().transform(LLUtils::handleDiscard)
);
}
@ -206,13 +173,13 @@ public abstract class TestLLDictionaryLeaks {
public void testUpdateAndGetDelta(UpdateMode updateMode) {
var dict = getDict(updateMode);
var key = Mono.fromCallable(() -> fromString("test-key"));
runVoid(updateMode == UpdateMode.DISALLOW,
DbTestUtils.runVoid(updateMode == UpdateMode.DISALLOW,
dict.updateAndGetDelta(key, old -> old, true).then().transform(LLUtils::handleDiscard)
);
runVoid(updateMode == UpdateMode.DISALLOW,
DbTestUtils.runVoid(updateMode == UpdateMode.DISALLOW,
dict.updateAndGetDelta(key, old -> old, false).then().transform(LLUtils::handleDiscard)
);
runVoid(updateMode == UpdateMode.DISALLOW,
DbTestUtils.runVoid(updateMode == UpdateMode.DISALLOW,
dict.updateAndGetDelta(key, old -> old).then().transform(LLUtils::handleDiscard)
);
}
@ -221,7 +188,7 @@ public abstract class TestLLDictionaryLeaks {
@MethodSource("provideArguments")
public void testClear(UpdateMode updateMode) {
var dict = getDict(updateMode);
runVoid(dict.clear());
DbTestUtils.runVoid(dict.clear());
}
@ParameterizedTest
@ -229,6 +196,6 @@ public abstract class TestLLDictionaryLeaks {
public void testRemove(UpdateMode updateMode, LLDictionaryResultType resultType) {
var dict = getDict(updateMode);
var key = Mono.fromCallable(() -> fromString("test-key"));
runVoid(dict.remove(key, resultType).then().doOnDiscard(Send.class, Send::close));
DbTestUtils.runVoid(dict.remove(key, resultType).then().doOnDiscard(Send.class, Send::close));
}
}

View File

@ -0,0 +1,11 @@
package it.cavallium.dbengine;
public class TestLocalLLDictionary extends TestLLDictionary {
private static final TemporaryDbGenerator GENERATOR = new LocalTemporaryDbGenerator();
@Override
protected TemporaryDbGenerator getTempDbGenerator() {
return GENERATOR;
}
}

View File

@ -0,0 +1,11 @@
package it.cavallium.dbengine;
public class TestMemoryLLDictionary extends TestLLDictionary {
private static final TemporaryDbGenerator GENERATOR = new MemoryTemporaryDbGenerator();
@Override
protected TemporaryDbGenerator getTempDbGenerator() {
return GENERATOR;
}
}

View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration strict="true"
xmlns="http://logging.apache.org/log4j/2.0/config"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://logging.apache.org/log4j/2.0/config
https://raw.githubusercontent.com/apache/logging-log4j2/log4j-2.14.1/log4j-core/src/main/resources/Log4j-config.xsd"
status="ALL">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout
pattern="%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}} %highlight{${LOG_LEVEL_PATTERN:-%5p}}{FATAL=red blink, ERROR=red, WARN=yellow bold, INFO=green, DEBUG=green bold, TRACE=blue} %style{%processId}{magenta} %style{%-20.20c{1}}{cyan} : %m%n%ex"/>
</Console>
</Appenders>
<Loggers>
<Root level="ALL">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>