Optimistic transactions, inline codecs

This commit is contained in:
Andrea Cavalli 2021-10-19 00:22:05 +02:00
parent aad5f8c96c
commit 80d0ced888
22 changed files with 1143 additions and 722 deletions

View File

@ -19,15 +19,17 @@ public class MappedSerializer<A, B> implements Serializer<B> {
}
@Override
public @NotNull DeserializationResult<B> deserialize(@NotNull Send<Buffer> serialized) throws SerializationException {
try (serialized) {
var deserialized = serializer.deserialize(serialized);
return new DeserializationResult<>(keyMapper.map(deserialized.deserializedData()), deserialized.bytesRead());
}
public @NotNull B deserialize(@NotNull Buffer serialized) throws SerializationException {
return keyMapper.map(serializer.deserialize(serialized));
}
@Override
public @NotNull Send<Buffer> serialize(@NotNull B deserialized) throws SerializationException {
return serializer.serialize(keyMapper.unmap(deserialized));
public void serialize(@NotNull B deserialized, Buffer output) throws SerializationException {
serializer.serialize(keyMapper.unmap(deserialized), output);
}
@Override
public int getSerializedSizeHint() {
return serializer.getSerializedSizeHint();
}
}

View File

@ -19,16 +19,13 @@ public class MappedSerializerFixedLength<A, B> implements SerializerFixedBinaryL
}
@Override
public @NotNull DeserializationResult<B> deserialize(@NotNull Send<Buffer> serialized) throws SerializationException {
try (serialized) {
var deserialized = fixedLengthSerializer.deserialize(serialized);
return new DeserializationResult<>(keyMapper.map(deserialized.deserializedData()), deserialized.bytesRead());
}
public @NotNull B deserialize(@NotNull Buffer serialized) throws SerializationException {
return keyMapper.map(fixedLengthSerializer.deserialize(serialized));
}
@Override
public @NotNull Send<Buffer> serialize(@NotNull B deserialized) throws SerializationException {
return fixedLengthSerializer.serialize(keyMapper.unmap(deserialized));
public void serialize(@NotNull B deserialized, Buffer output) throws SerializationException {
fixedLengthSerializer.serialize(keyMapper.unmap(deserialized), output);
}
@Override

View File

@ -63,7 +63,6 @@ import reactor.util.function.Tuple3;
public class LLUtils {
private static final Logger logger = LoggerFactory.getLogger(LLUtils.class);
public static final Marker MARKER_DB_BUFFER = MarkerFactory.getMarker("DB_BUFFER");
public static final Marker MARKER_ROCKSDB = MarkerFactory.getMarker("ROCKSDB");
public static final Marker MARKER_LUCENE = MarkerFactory.getMarker("LUCENE");
@ -193,6 +192,18 @@ public class LLUtils {
}
}
public static String toStringSafe(byte @Nullable[] key) {
try {
if (key == null) {
return toString(key);
} else {
return "(released)";
}
} catch (IllegalReferenceCountException ex) {
return "(released)";
}
}
public static String toStringSafe(@Nullable LLRange range) {
try {
if (range == null || range.isAccessible()) {
@ -268,6 +279,53 @@ public class LLUtils {
}
}
public static String toString(byte @Nullable[] key) {
if (key == null) {
return "null";
} else {
int startIndex = 0;
int iMax = key.length - 1;
int iLimit = 128;
if (iMax <= -1) {
return "[]";
} else {
StringBuilder arraySB = new StringBuilder();
StringBuilder asciiSB = new StringBuilder();
boolean isAscii = true;
arraySB.append('[');
int i = 0;
while (true) {
var byteVal = (int) key[startIndex + i];
arraySB.append(byteVal);
if (isAscii) {
if (byteVal >= 32 && byteVal < 127) {
asciiSB.append((char) byteVal);
} else if (byteVal == 0) {
asciiSB.append('␀');
} else {
isAscii = false;
asciiSB = null;
}
}
if (i == iLimit) {
arraySB.append("");
}
if (i == iMax || i == iLimit) {
if (isAscii) {
return asciiSB.insert(0, "\"").append("\"").toString();
} else {
return arraySB.append(']').toString();
}
}
arraySB.append(", ");
++i;
}
}
}
}
public static boolean equals(Buffer a, Buffer b) {
if (a == null && b == null) {
return true;
@ -1005,6 +1063,12 @@ public class LLUtils {
}
}
public static String deserializeString(@NotNull Buffer buffer, int readerOffset, int length, Charset charset) {
byte[] bytes = new byte[Math.min(length, buffer.readableBytes())];
buffer.copyInto(readerOffset, bytes, 0, length);
return new String(bytes, charset);
}
public static int utf8MaxBytes(String deserialized) {
return deserialized.length() * 3;
}

View File

@ -6,8 +6,8 @@ import io.net5.buffer.api.Drop;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.Serializer.DeserializationResult;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -15,20 +15,23 @@ public class DatabaseEmpty {
@SuppressWarnings({"unused", "InstantiationOfUtilityClass"})
public static final Nothing NOTHING = new Nothing();
public static final DeserializationResult<Nothing> NOTHING_RESULT = new DeserializationResult<>(NOTHING, 0);
public static Serializer<Nothing> nothingSerializer(BufferAllocator bufferAllocator) {
return new Serializer<>() {
@Override
public @NotNull DeserializationResult<Nothing> deserialize(@Nullable Send<Buffer> serialized) {
try (serialized) {
return NOTHING_RESULT;
}
public @NotNull Nothing deserialize(@NotNull Buffer serialized) {
return NOTHING;
}
@Override
public @NotNull Send<Buffer> serialize(@NotNull Nothing deserialized) {
return LLUtils.empty(bufferAllocator);
public void serialize(@NotNull Nothing deserialized, Buffer output) {
}
@Override
public int getSerializedSizeHint() {
return 0;
}
};
}

View File

@ -1,7 +1,6 @@
package it.cavallium.dbengine.database.collections;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.Drop;
import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.CompositeSnapshot;
@ -66,9 +65,36 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
return new DatabaseMapDictionary<>(dictionary, prefixKey, keySuffixSerializer, valueSerializer, onClose);
}
private void deserializeValue(Send<Buffer> valueToReceive, SynchronousSink<U> sink) {
try (var value = valueToReceive.receive()) {
sink.next(valueSerializer.deserialize(value));
} catch (Throwable ex) {
sink.error(ex);
}
}
private Send<Buffer> serializeValue(U value) throws SerializationException {
var valSizeHint = valueSerializer.getSerializedSizeHint();
if (valSizeHint == -1) valSizeHint = 128;
try (var valBuf = dictionary.getAllocator().allocate(valSizeHint)) {
valueSerializer.serialize(value, valBuf);
return valBuf.send();
}
}
private Send<Buffer> serializeKeySuffixToKey(T keySuffix) throws SerializationException {
try (var keyBuf = keyPrefix.copy()) {
assert keyBuf.readableBytes() == keyPrefixLength;
keyBuf.ensureWritable(keySuffixLength + keyExtLength);
serializeSuffix(keySuffix, keyBuf);
assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
return keyBuf.send();
}
}
private Send<Buffer> toKey(Send<Buffer> suffixKeyToSend) {
try (var suffixKey = suffixKeyToSend.receive()) {
assert suffixKeyConsistency(suffixKey.readableBytes());
assert suffixKeyLengthConsistency(suffixKey.readableBytes());
if (keyPrefix.readableBytes() > 0) {
try (var result = LLUtils.compositeBuffer(dictionary.getAllocator(),
LLUtils.copy(dictionary.getAllocator(), keyPrefix),
@ -84,29 +110,28 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
}
}
private void deserializeValue(Send<Buffer> value, SynchronousSink<U> sink) {
try {
sink.next(valueSerializer.deserialize(value).deserializedData());
} catch (SerializationException ex) {
sink.error(ex);
}
}
@Override
public Mono<Map<T, U>> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) {
return dictionary
.getRange(resolveSnapshot(snapshot), rangeMono, existsAlmostCertainly)
.<Entry<T, U>>handle((entrySend, sink) -> {
try (var entry = entrySend.receive()) {
T key;
try (var serializedKey = entry.getKey().receive()) {
removePrefix(serializedKey);
suffixKeyConsistency(serializedKey.readableBytes());
key = deserializeSuffix(serializedKey.send());
Entry<T, U> deserializedEntry;
try {
try (var entry = entrySend.receive()) {
T key;
try (var serializedKey = entry.getKey().receive()) {
splitPrefix(serializedKey).close();
suffixKeyLengthConsistency(serializedKey.readableBytes());
key = deserializeSuffix(serializedKey);
}
U value;
try (var valueBuf = entry.getValue().receive()) {
value = valueSerializer.deserialize(valueBuf);
}
deserializedEntry = Map.entry(key, value);
}
var value = valueSerializer.deserialize(entry.getValue()).deserializedData();
sink.next(Map.entry(key, value));
} catch (SerializationException ex) {
sink.next(deserializedEntry);
} catch (Throwable ex) {
sink.error(ex);
}
})
@ -120,14 +145,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.get(null, false)
.concatWith(dictionary.setRange(rangeMono, Flux
.fromIterable(Collections.unmodifiableMap(value).entrySet())
.handle((entry, sink) -> {
try {
sink.next(LLEntry.of(this.toKey(serializeSuffix(entry.getKey())),
valueSerializer.serialize(entry.getValue())).send());
} catch (SerializationException e) {
sink.error(e);
}
})
.handle(this::serializeEntrySink)
).then(Mono.empty()))
.singleOrEmpty()
.transform(LLUtils::handleDiscard);
@ -152,14 +170,14 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<DatabaseStageEntry<U>> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
return Mono.fromCallable(() ->
new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), valueSerializer, null));
new DatabaseSingle<>(dictionary, serializeKeySuffixToKey(keySuffix), valueSerializer, null));
}
@Override
public Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) {
return dictionary
.get(resolveSnapshot(snapshot),
Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))),
Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)),
existsAlmostCertainly
)
.handle(this::deserializeValue);
@ -167,8 +185,8 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<Void> putValue(T keySuffix, U value) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))).single();
var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value)).single();
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)).single();
var valueMono = Mono.fromCallable(() -> serializeValue(value)).single();
return dictionary
.put(keyMono, valueMono, LLDictionaryResultType.VOID)
.doOnNext(Send::close)
@ -185,7 +203,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
UpdateReturnMode updateReturnMode,
boolean existsAlmostCertainly,
SerializationFunction<@Nullable U, @Nullable U> updater) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
return dictionary
.update(keyMono, getSerializedUpdater(updater), updateReturnMode, existsAlmostCertainly)
.handle(this::deserializeValue);
@ -195,12 +213,14 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Mono<Delta<U>> updateValueAndGetDelta(T keySuffix,
boolean existsAlmostCertainly,
SerializationFunction<@Nullable U, @Nullable U> updater) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
return dictionary
.updateAndGetDelta(keyMono, getSerializedUpdater(updater), existsAlmostCertainly)
.transform(mono -> LLUtils.mapLLDelta(mono,
serialized -> valueSerializer.deserialize(serialized).deserializedData()
));
.transform(mono -> LLUtils.mapLLDelta(mono, serializedToReceive -> {
try (var serialized = serializedToReceive.receive()) {
return valueSerializer.deserialize(serialized);
}
}));
}
public SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> getSerializedUpdater(
@ -211,12 +231,14 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
if (oldSerialized == null) {
result = updater.apply(null);
} else {
result = updater.apply(valueSerializer.deserialize(oldSerialized).deserializedData());
try (var oldSerializedReceived = oldSerialized.receive()) {
result = updater.apply(valueSerializer.deserialize(oldSerializedReceived));
}
}
if (result == null) {
return null;
} else {
return valueSerializer.serialize(result);
return serializeValue(result);
}
}
};
@ -230,12 +252,14 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
if (oldSerialized == null) {
result = updater.apply(null, extra);
} else {
result = updater.apply(valueSerializer.deserialize(oldSerialized).deserializedData(), extra);
try (var oldSerializedReceived = oldSerialized.receive()) {
result = updater.apply(valueSerializer.deserialize(oldSerializedReceived), extra);
}
}
if (result == null) {
return null;
} else {
return valueSerializer.serialize(result);
return serializeValue(result);
}
}
};
@ -243,19 +267,15 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<U> putValueAndGetPrevious(T keySuffix, U value) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value));
return dictionary
.put(keyMono,
valueMono,
LLDictionaryResultType.PREVIOUS_VALUE)
.handle(this::deserializeValue);
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
var valueMono = Mono.fromCallable(() -> serializeValue(value));
return dictionary.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE).handle(this::deserializeValue);
}
@Override
public Mono<Boolean> putValueAndGetChanged(T keySuffix, U value) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value));
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
var valueMono = Mono.fromCallable(() -> serializeValue(value));
return dictionary
.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE)
.handle(this::deserializeValue)
@ -265,7 +285,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<Void> remove(T keySuffix) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
return dictionary
.remove(keyMono, LLDictionaryResultType.VOID)
.doOnNext(Send::close)
@ -274,15 +294,13 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<U> removeAndGetPrevious(T keySuffix) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
return dictionary
.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE)
.handle(this::deserializeValue);
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
return dictionary.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE).handle(this::deserializeValue);
}
@Override
public Mono<Boolean> removeAndGetStatus(T keySuffix) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
return dictionary
.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE)
.map(LLUtils::responseToBoolean);
@ -293,8 +311,9 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
var mappedKeys = keys
.<Tuple2<T, Send<Buffer>>>handle((keySuffix, sink) -> {
try {
sink.next(Tuples.of(keySuffix, toKey(serializeSuffix(keySuffix))));
} catch (SerializationException ex) {
Tuple2<T, Send<Buffer>> tuple = Tuples.of(keySuffix, serializeKeySuffixToKey(keySuffix));
sink.next(tuple);
} catch (Throwable ex) {
sink.error(ex);
}
});
@ -304,14 +323,14 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
try {
Optional<U> valueOpt;
if (entry.getT3().isPresent()) {
try (var buf = entry.getT3().get()) {
valueOpt = Optional.of(valueSerializer.deserialize(buf).deserializedData());
try (var buf = entry.getT3().get().receive()) {
valueOpt = Optional.of(valueSerializer.deserialize(buf));
}
} else {
valueOpt = Optional.empty();
}
sink.next(Map.entry(entry.getT1(), valueOpt));
} catch (SerializationException ex) {
} catch (Throwable ex) {
sink.error(ex);
} finally {
entry.getT2().close();
@ -321,22 +340,29 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.transform(LLUtils::handleDiscard);
}
private Send<LLEntry> serializeEntry(T key, U value) throws SerializationException {
try (var serializedKey = toKey(serializeSuffix(key))) {
var serializedValueToReceive = valueSerializer.serialize(value);
try (var serializedValue = serializedValueToReceive.receive()) {
return LLEntry.of(serializedKey, serializedValue.send()).send();
private Send<LLEntry> serializeEntry(T keySuffix, U value) throws SerializationException {
try (var key = serializeKeySuffixToKey(keySuffix)) {
try (var serializedValue = serializeValue(value)) {
return LLEntry.of(key, serializedValue).send();
}
}
}
private void serializeEntrySink(Entry<T,U> entry, SynchronousSink<Send<LLEntry>> sink) {
try {
sink.next(serializeEntry(entry.getKey(), entry.getValue()));
} catch (Throwable e) {
sink.error(e);
}
}
@Override
public Mono<Void> putMulti(Flux<Entry<T, U>> entries) {
var serializedEntries = entries
.<Send<LLEntry>>handle((entry, sink) -> {
try {
sink.next(serializeEntry(entry.getKey(), entry.getValue()));
} catch (SerializationException e) {
} catch (Throwable e) {
sink.error(e);
}
})
@ -363,8 +389,9 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
var serializedEntries = entries
.<Tuple2<Send<Buffer>, X>>handle((entry, sink) -> {
try {
sink.next(Tuples.of(serializeSuffix(entry.getT1()), entry.getT2()));
} catch (SerializationException ex) {
Send<Buffer> serializedKey = serializeKeySuffixToKey(entry.getT1());
sink.next(Tuples.of(serializedKey, entry.getT2()));
} catch (Throwable ex) {
sink.error(ex);
}
})
@ -377,17 +404,17 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
}
});
var serializedUpdater = getSerializedUpdater(updater);
return dictionary.updateMulti(serializedEntries, serializedUpdater)
.handle((result, sink) -> {
try {
sink.next(new ExtraKeyOperationResult<>(deserializeSuffix(result.key()),
result.extra(),
result.changed()
));
} catch (SerializationException ex) {
sink.error(ex);
}
});
return dictionary.updateMulti(serializedEntries, serializedUpdater).handle((result, sink) -> {
try {
T keySuffix;
try (var keySuffixBuf = result.key().receive()) {
keySuffix = deserializeSuffix(keySuffixBuf);
}
sink.next(new ExtraKeyOperationResult<>(keySuffix, result.extra(), result.changed()));
} catch (Throwable ex) {
sink.error(ex);
}
});
}
@Override
@ -398,12 +425,15 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
try (var keyBuf = keyBufToReceive.receive()) {
assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
// Remove prefix. Keep only the suffix and the ext
removePrefix(keyBuf);
suffixKeyConsistency(keyBuf.readableBytes());
sink.next(Map.entry(deserializeSuffix(keyBuf.copy().send()),
new DatabaseSingle<>(dictionary, toKey(keyBuf.send()), valueSerializer, null)
));
} catch (SerializationException ex) {
splitPrefix(keyBuf).close();
suffixKeyLengthConsistency(keyBuf.readableBytes());
T keySuffix;
try (var keyBufCopy = keyBuf.copy()) {
keySuffix = deserializeSuffix(keyBufCopy);
}
var subStage = new DatabaseSingle<>(dictionary, toKey(keyBuf.send()), valueSerializer, null);
sink.next(Map.entry(keySuffix, subStage));
} catch (Throwable ex) {
sink.error(ex);
}
});
@ -414,16 +444,25 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
return dictionary
.getRange(resolveSnapshot(snapshot), rangeMono)
.<Entry<T, U>>handle((serializedEntryToReceive, sink) -> {
try (var serializedEntry = serializedEntryToReceive.receive()) {
try (var keyBuf = serializedEntry.getKey().receive()) {
assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
// Remove prefix. Keep only the suffix and the ext
removePrefix(keyBuf);
suffixKeyConsistency(keyBuf.readableBytes());
sink.next(Map.entry(deserializeSuffix(keyBuf.send()),
valueSerializer.deserialize(serializedEntry.getValue()).deserializedData()));
try {
Entry<T, U> entry;
try (var serializedEntry = serializedEntryToReceive.receive()) {
try (var keyBuf = serializedEntry.getKey().receive()) {
assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
// Remove prefix. Keep only the suffix and the ext
splitPrefix(keyBuf).close();
suffixKeyLengthConsistency(keyBuf.readableBytes());
T keySuffix = deserializeSuffix(keyBuf);
U value;
try (var valueBuf = serializedEntry.getValue().receive()) {
value = valueSerializer.deserialize(valueBuf);
}
entry = Map.entry(keySuffix, value);
}
}
} catch (SerializationException e) {
sink.next(entry);
} catch (Throwable e) {
sink.error(e);
}
})
@ -441,14 +480,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
return Flux.concat(
this.getAllValues(null),
dictionary.setRange(rangeMono, entries.handle((entry, sink) -> {
try {
sink.next(LLEntry.of(toKey(serializeSuffix(entry.getKey())),
valueSerializer.serialize(entry.getValue())).send());
} catch (SerializationException e) {
sink.error(e);
}
})).then(Mono.empty())
dictionary.setRange(rangeMono, entries.handle(this::serializeEntrySink)).then(Mono.empty())
);
}

View File

@ -269,7 +269,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
}
@SuppressWarnings("unused")
protected boolean suffixKeyConsistency(int keySuffixLength) {
protected boolean suffixKeyLengthConsistency(int keySuffixLength) {
return this.keySuffixLength == keySuffixLength;
}
@ -285,13 +285,15 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
/**
* Removes the prefix from the key
* @return the prefix
*/
protected void removePrefix(Buffer key) {
protected Buffer splitPrefix(Buffer key) {
assert key.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength
|| key.readableBytes() == keyPrefixLength + keySuffixLength;
key.readerOffset(key.readerOffset() + this.keyPrefixLength);
var prefix = key.readSplit(this.keyPrefixLength);
assert key.readableBytes() == keySuffixLength + keyExtLength
|| key.readableBytes() == keySuffixLength;
return prefix;
}
/**
@ -334,7 +336,13 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
@Override
public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
var suffixKeyWithoutExt = Mono.fromCallable(() -> toKeyWithoutExt(serializeSuffix(keySuffix)));
var suffixKeyWithoutExt = Mono.fromCallable(() -> {
try (var keyWithoutExtBuf = keyPrefix.copy()) {
keyWithoutExtBuf.ensureWritable(keySuffixLength + keyExtLength);
serializeSuffix(keySuffix, keyWithoutExtBuf);
return keyWithoutExtBuf.send();
}
});
return this.subStageGetter
.subStage(dictionary, snapshot, suffixKeyWithoutExt)
.transform(LLUtils::handleDiscard)
@ -360,8 +368,10 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
groupKeyWithoutExtSend -> this.subStageGetter
.subStage(dictionary, snapshot, Mono.fromCallable(() -> groupKeyWithoutExtSend.copy().send()))
.<Entry<T, US>>handle((us, sink) -> {
T deserializedSuffix;
try {
sink.next(Map.entry(this.deserializeSuffix(getGroupSuffix(groupKeyWithoutExtSend.send())), us));
deserializedSuffix = this.deserializeSuffix(splitGroupSuffix(groupKeyWithoutExtSend));
sink.next(Map.entry(deserializedSuffix, us));
} catch (SerializationException ex) {
sink.error(ex);
}
@ -371,13 +381,18 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
.transform(LLUtils::handleDiscard);
}
private Send<Buffer> getGroupSuffix(Send<Buffer> groupKeyWithoutExt) {
try (var buffer = groupKeyWithoutExt.receive()) {
assert subStageKeysConsistency(buffer.readableBytes() + keyExtLength);
this.removePrefix(buffer);
assert subStageKeysConsistency(keyPrefixLength + buffer.readableBytes() + keyExtLength);
return buffer.send();
}
/**
* Split the input. The input will become the ext, the returned data will be the group suffix
* @param groupKey group key, will become ext
* @return group suffix
*/
private Buffer splitGroupSuffix(@NotNull Buffer groupKey) {
assert subStageKeysConsistency(groupKey.readableBytes())
|| subStageKeysConsistency(groupKey.readableBytes() + keyExtLength);
this.splitPrefix(groupKey).close();
assert subStageKeysConsistency(keyPrefixLength + groupKey.readableBytes())
|| subStageKeysConsistency(keyPrefixLength + groupKey.readableBytes() + keyExtLength);
return groupKey.readSplit(keySuffixLength);
}
private Send<Buffer> getGroupWithoutExt(Send<Buffer> groupKeyWithExtSend) {
@ -430,25 +445,21 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
}
//todo: temporary wrapper. convert the whole class to buffers
protected T deserializeSuffix(@NotNull Send<Buffer> keySuffixToReceive) throws SerializationException {
try (var keySuffix = keySuffixToReceive.receive()) {
assert suffixKeyConsistency(keySuffix.readableBytes());
var result = keySuffixSerializer.deserialize(keySuffix.send());
assert keyPrefix.isAccessible();
return result.deserializedData();
}
protected T deserializeSuffix(@NotNull Buffer keySuffix) throws SerializationException {
assert suffixKeyLengthConsistency(keySuffix.readableBytes());
var result = keySuffixSerializer.deserialize(keySuffix);
assert keyPrefix.isAccessible();
return result;
}
//todo: temporary wrapper. convert the whole class to buffers
@NotNull
protected Send<Buffer> serializeSuffix(T keySuffix) throws SerializationException {
try (var suffixDataToReceive = keySuffixSerializer.serialize(keySuffix)) {
try (Buffer suffixData = suffixDataToReceive.receive()) {
assert suffixKeyConsistency(suffixData.readableBytes());
assert keyPrefix.isAccessible();
return suffixData.send();
}
}
protected void serializeSuffix(T keySuffix, Buffer output) throws SerializationException {
output.ensureWritable(keySuffixLength);
var beforeWriterOffset = output.writerOffset();
keySuffixSerializer.serialize(keySuffix, output);
var afterWriterOffset = output.writerOffset();
assert suffixKeyLengthConsistency(afterWriterOffset - beforeWriterOffset);
assert keyPrefix.isAccessible();
}
@Override

View File

@ -77,9 +77,9 @@ public class DatabaseMapDictionaryHashed<T, U, TH> extends ResourceSupport<Datab
}
this.alloc = dictionary.getAllocator();
ValueWithHashSerializer<T, U> valueWithHashSerializer
= new ValueWithHashSerializer<>(alloc, keySuffixSerializer, valueSerializer);
= new ValueWithHashSerializer<>(keySuffixSerializer, valueSerializer);
ValuesSetSerializer<Entry<T, U>> valuesSetSerializer
= new ValuesSetSerializer<>(alloc, valueWithHashSerializer);
= new ValuesSetSerializer<>(valueWithHashSerializer);
this.subDictionary = DatabaseMapDictionary.tail(dictionary, prefixKey, keySuffixHashSerializer,
valuesSetSerializer, onClose);
this.keySuffixHashFunction = keySuffixHashFunction;

View File

@ -84,13 +84,26 @@ public class DatabaseSingle<U> extends ResourceSupport<DatabaseStage<U>, Databas
}
private void deserializeValue(Send<Buffer> value, SynchronousSink<U> sink) {
try (value) {
sink.next(serializer.deserialize(value).deserializedData());
try {
U deserializedValue;
try (var valueBuf = value.receive()) {
deserializedValue = serializer.deserialize(valueBuf);
}
sink.next(deserializedValue);
} catch (SerializationException ex) {
sink.error(ex);
}
}
private Send<Buffer> serializeValue(U value) throws SerializationException {
var valSizeHint = serializer.getSerializedSizeHint();
if (valSizeHint == -1) valSizeHint = 128;
try (var valBuf = dictionary.getAllocator().allocate(valSizeHint)) {
serializer.serialize(value, valBuf);
return valBuf.send();
}
}
@Override
public Mono<U> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) {
return dictionary
@ -101,7 +114,7 @@ public class DatabaseSingle<U> extends ResourceSupport<DatabaseStage<U>, Databas
@Override
public Mono<U> setAndGetPrevious(U value) {
return dictionary
.put(keyMono, Mono.fromCallable(() -> serializer.serialize(value)), LLDictionaryResultType.PREVIOUS_VALUE)
.put(keyMono, Mono.fromCallable(() -> serializeValue(value)), LLDictionaryResultType.PREVIOUS_VALUE)
.handle(this::deserializeValue);
}
@ -112,12 +125,20 @@ public class DatabaseSingle<U> extends ResourceSupport<DatabaseStage<U>, Databas
return dictionary
.update(keyMono, (oldValueSer) -> {
try (oldValueSer) {
var result = updater.apply(oldValueSer == null ? null
: serializer.deserialize(oldValueSer).deserializedData());
U result;
if (oldValueSer == null) {
result = updater.apply(null);
} else {
U deserializedValue;
try (var valueBuf = oldValueSer.receive()) {
deserializedValue = serializer.deserialize(valueBuf);
}
result = updater.apply(deserializedValue);
}
if (result == null) {
return null;
} else {
return serializer.serialize(result);
return serializeValue(result);
}
}
}, updateReturnMode, existsAlmostCertainly)
@ -130,17 +151,27 @@ public class DatabaseSingle<U> extends ResourceSupport<DatabaseStage<U>, Databas
return dictionary
.updateAndGetDelta(keyMono, (oldValueSer) -> {
try (oldValueSer) {
var result = updater.apply(oldValueSer == null ? null
: serializer.deserialize(oldValueSer).deserializedData());
U result;
if (oldValueSer == null) {
result = updater.apply(null);
} else {
U deserializedValue;
try (var valueBuf = oldValueSer.receive()) {
deserializedValue = serializer.deserialize(valueBuf);
}
result = updater.apply(deserializedValue);
}
if (result == null) {
return null;
} else {
return serializer.serialize(result);
return serializeValue(result);
}
}
}, existsAlmostCertainly).transform(mono -> LLUtils.mapLLDelta(mono,
serialized -> serializer.deserialize(serialized).deserializedData()
));
}, existsAlmostCertainly).transform(mono -> LLUtils.mapLLDelta(mono, serialized -> {
try (var valueBuf = serialized.receive()) {
return serializer.deserialize(valueBuf);
}
}));
}
@Override

View File

@ -7,6 +7,6 @@ import it.cavallium.dbengine.database.serialization.Serializer;
public class SubStageGetterSingleBytes extends SubStageGetterSingle<Send<Buffer>> {
public SubStageGetterSingleBytes() {
super(Serializer.noop());
super(Serializer.NOOP_SEND_SERIALIZER);
}
}

View File

@ -15,38 +15,40 @@ import org.jetbrains.annotations.Nullable;
class ValueWithHashSerializer<X, Y> implements Serializer<Entry<X, Y>> {
private final BufferAllocator allocator;
private final Serializer<X> keySuffixSerializer;
private final Serializer<Y> valueSerializer;
ValueWithHashSerializer(BufferAllocator allocator,
ValueWithHashSerializer(
Serializer<X> keySuffixSerializer,
Serializer<Y> valueSerializer) {
this.allocator = allocator;
this.keySuffixSerializer = keySuffixSerializer;
this.valueSerializer = valueSerializer;
}
@Override
public @NotNull DeserializationResult<Entry<X, Y>> deserialize(@Nullable Send<Buffer> serializedToReceive)
throws SerializationException {
Objects.requireNonNull(serializedToReceive);
try (var serialized = serializedToReceive.receive()) {
DeserializationResult<X> deserializedKey = keySuffixSerializer.deserialize(serialized.copy().send());
DeserializationResult<Y> deserializedValue = valueSerializer.deserialize(serialized
.copy(serialized.readerOffset() + deserializedKey.bytesRead(),
serialized.readableBytes() - deserializedKey.bytesRead()
)
.send());
return new DeserializationResult<>(Map.entry(deserializedKey.deserializedData(),
deserializedValue.deserializedData()), deserializedKey.bytesRead() + deserializedValue.bytesRead());
}
public @NotNull Entry<X, Y> deserialize(@NotNull Buffer serialized) throws SerializationException {
Objects.requireNonNull(serialized);
X deserializedKey = keySuffixSerializer.deserialize(serialized);
Y deserializedValue = valueSerializer.deserialize(serialized);
return Map.entry(deserializedKey, deserializedValue);
}
@Override
public @NotNull Send<Buffer> serialize(@NotNull Entry<X, Y> deserialized) throws SerializationException {
var keySuffix = keySuffixSerializer.serialize(deserialized.getKey());
var value = valueSerializer.serialize(deserialized.getValue());
return LLUtils.compositeBuffer(allocator, keySuffix, value).send();
public void serialize(@NotNull Entry<X, Y> deserialized, Buffer output) throws SerializationException {
keySuffixSerializer.serialize(deserialized.getKey(), output);
valueSerializer.serialize(deserialized.getValue(), output);
}
@Override
public int getSerializedSizeHint() {
var hint1 = keySuffixSerializer.getSerializedSizeHint();
var hint2 = valueSerializer.getSerializedSizeHint();
if (hint1 == -1 && hint2 == -1) {
return -1;
} else if (hint1 == -1) {
return hint2;
} else {
return hint1;
}
}
}

View File

@ -13,46 +13,34 @@ import org.jetbrains.annotations.Nullable;
class ValuesSetSerializer<X> implements Serializer<ObjectArraySet<X>> {
private final BufferAllocator allocator;
private final Serializer<X> entrySerializer;
ValuesSetSerializer(BufferAllocator allocator, Serializer<X> entrySerializer) {
this.allocator = allocator;
ValuesSetSerializer(Serializer<X> entrySerializer) {
this.entrySerializer = entrySerializer;
}
@Override
public @NotNull DeserializationResult<ObjectArraySet<X>> deserialize(@Nullable Send<Buffer> serializedToReceive) throws SerializationException {
Objects.requireNonNull(serializedToReceive);
try (var serialized = serializedToReceive.receive()) {
int initialReaderOffset = serialized.readerOffset();
int entriesLength = serialized.readInt();
ArrayList<X> deserializedElements = new ArrayList<>(entriesLength);
for (int i = 0; i < entriesLength; i++) {
var deserializationResult = entrySerializer.deserialize(serialized
.copy(serialized.readerOffset(), serialized.readableBytes())
.send());
deserializedElements.add(deserializationResult.deserializedData());
serialized.readerOffset(serialized.readerOffset() + deserializationResult.bytesRead());
}
return new DeserializationResult<>(new ObjectArraySet<>(deserializedElements), serialized.readerOffset() - initialReaderOffset);
public @NotNull ObjectArraySet<X> deserialize(@NotNull Buffer serialized) throws SerializationException {
Objects.requireNonNull(serialized);
int entriesLength = serialized.readInt();
ArrayList<X> deserializedElements = new ArrayList<>(entriesLength);
for (int i = 0; i < entriesLength; i++) {
var deserializationResult = entrySerializer.deserialize(serialized);
deserializedElements.add(deserializationResult);
}
return new ObjectArraySet<>(deserializedElements);
}
@Override
public void serialize(@NotNull ObjectArraySet<X> deserialized, Buffer output) throws SerializationException {
output.writeInt(deserialized.size());
for (X entry : deserialized) {
entrySerializer.serialize(entry, output);
}
}
@Override
public @NotNull Send<Buffer> serialize(@NotNull ObjectArraySet<X> deserialized) throws SerializationException {
try (Buffer output = allocator.allocate(64)) {
output.writeInt(deserialized.size());
for (X entry : deserialized) {
var serializedToReceive = entrySerializer.serialize(entry);
try (Buffer serialized = serializedToReceive.receive()) {
if (serialized.readableBytes() > 0) {
output.ensureWritable(serialized.readableBytes());
output.writeBytes(serialized);
}
}
}
return output.send();
}
public int getSerializedSizeHint() {
return -1;
}
}

View File

@ -92,7 +92,6 @@ public class LLLocalDictionary implements LLDictionary {
static final long MAX_WRITE_BATCH_SIZE = 1024L * 1024L * 1024L; // 1GiB
static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations
static final int MULTI_GET_WINDOW = 16;
static final Duration MULTI_GET_WINDOW_TIMEOUT = Duration.ofSeconds(1);
static final ReadOptions EMPTY_READ_OPTIONS = new UnreleasableReadOptions(new UnmodifiableReadOptions());
static final WriteOptions EMPTY_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions());
static final WriteOptions BATCH_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions());
@ -125,7 +124,6 @@ public class LLLocalDictionary implements LLDictionary {
static final boolean USE_WRITE_BATCH_IN_SET_RANGE_DELETE = false;
static final boolean PARALLEL_EXACT_SIZE = true;
private static final int STRIPES = 512;
private static final byte[] FIRST_KEY = new byte[]{};
private static final byte[] NO_DATA = new byte[0];
@ -158,10 +156,13 @@ public class LLLocalDictionary implements LLDictionary {
private final Function<LLSnapshot, Snapshot> snapshotResolver;
private final UpdateMode updateMode;
private final BufferAllocator alloc;
private final String getRangeMultiDebugName;
private final String getRangeKeysMultiDebugName;
private final DatabaseOptions databaseOptions;
private final String getRangeMultiDebugName;
private final String getRangeMultiGroupedDebugName;
private final String getRangeKeysDebugName;
private final String getRangeKeysGroupedDebugName;
public LLLocalDictionary(
BufferAllocator allocator,
@NotNull OptimisticTransactionDB db,
@ -182,7 +183,9 @@ public class LLLocalDictionary implements LLDictionary {
this.snapshotResolver = snapshotResolver;
this.updateMode = updateMode;
this.getRangeMultiDebugName = databaseName + "(" + columnName + ")" + "::getRangeMulti";
this.getRangeKeysMultiDebugName = databaseName + "(" + columnName + ")" + "::getRangeKeysMulti";
this.getRangeMultiGroupedDebugName = databaseName + "(" + columnName + ")" + "::getRangeMultiGrouped";
this.getRangeKeysDebugName = databaseName + "(" + columnName + ")" + "::getRangeKeys";
this.getRangeKeysGroupedDebugName = databaseName + "(" + columnName + ")" + "::getRangeKeysGrouped";
this.databaseOptions = databaseOptions;
alloc = allocator;
}
@ -220,34 +223,6 @@ public class LLLocalDictionary implements LLDictionary {
}
}
private int getLockIndex(Buffer key) {
return Math.abs(LLUtils.hashCode(key) % STRIPES);
}
private IntArrayList getLockIndices(List<Buffer> keys) {
var list = new IntArrayList(keys.size());
for (Buffer key : keys) {
list.add(getLockIndex(key));
}
return list;
}
private IntArrayList getLockIndicesEntries(List<LLEntry> keys) {
var list = new IntArrayList(keys.size());
for (LLEntry key : keys) {
list.add(getLockIndex(key.getKeyUnsafe()));
}
return list;
}
private <X> IntArrayList getLockIndicesWithExtra(List<Tuple2<Buffer, X>> entries) {
var list = new IntArrayList(entries.size());
for (Tuple2<Buffer, X> key : entries) {
list.add(getLockIndex(key.getT1()));
}
return list;
}
@Override
public BufferAllocator getAllocator() {
return alloc;
@ -275,7 +250,8 @@ public class LLLocalDictionary implements LLDictionary {
var result = dbGet(cfh, resolveSnapshot(snapshot), key.send(), existsAlmostCertainly);
if (logger.isTraceEnabled(MARKER_ROCKSDB)) {
try (var result2 = result == null ? null : result.receive()) {
logger.trace(MARKER_ROCKSDB, "Reading {}: {}", LLUtils.toStringSafe(logKey), LLUtils.toString(result2));
logger.trace(MARKER_ROCKSDB, "Reading {}: {}", LLUtils.toStringSafe(logKey),
LLUtils.toString(result2));
return result2 == null ? null : result2.send();
}
} else {
@ -607,6 +583,23 @@ public class LLLocalDictionary implements LLDictionary {
return Mono.fromSupplier(() -> updateMode);
}
/**
*
* @return true if not committed successfully
*/
private boolean commitOptimistically(Transaction tx) throws RocksDBException {
try {
tx.commit();
return true;
} catch (RocksDBException ex) {
var status = ex.getStatus() != null ? ex.getStatus().getCode() : Code.Ok;
if (status == Code.Busy || status == Code.TryAgain) {
return false;
}
throw ex;
}
}
// Remember to change also updateAndGetDelta() if you are modifying this function
@SuppressWarnings("DuplicatedCode")
@Override
@ -624,85 +617,121 @@ public class LLLocalDictionary implements LLDictionary {
throw new UnsupportedOperationException("update() is disallowed");
}
try (var tx = db.beginTransaction(EMPTY_WRITE_OPTIONS, DEFAULT_OPTIMISTIC_TX_OPTIONS)) {
var prevDataArray = tx.getForUpdate(EMPTY_READ_OPTIONS, cfh, keyArray, true);
Buffer prevData;
if (prevDataArray != null) {
prevData = BYTE_ARRAY_BUFFERS.allocate(prevDataArray.length);
prevData.writeBytes(prevDataArray);
} else {
prevData = null;
}
try (prevData) {
Buffer prevDataToSendToUpdater;
if (prevData != null) {
prevDataToSendToUpdater = prevData.copy();
} else {
prevDataToSendToUpdater = null;
boolean committedSuccessfully;
Send<Buffer> sentPrevData;
Send<Buffer> sentCurData;
do {
var prevDataArray = tx.getForUpdate(EMPTY_READ_OPTIONS, cfh, keyArray, true);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray)
);
}
Buffer prevData;
if (prevDataArray != null) {
prevData = BYTE_ARRAY_BUFFERS.allocate(prevDataArray.length);
prevData.writeBytes(prevDataArray);
} else {
prevData = null;
}
try (prevData) {
Buffer prevDataToSendToUpdater;
if (prevData != null) {
prevDataToSendToUpdater = prevData.copy();
} else {
prevDataToSendToUpdater = null;
}
@Nullable Buffer newData;
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
try (var newDataToReceive = updater.apply(sentData)) {
if (newDataToReceive != null) {
newData = newDataToReceive.receive();
} else {
newData = null;
@Nullable Buffer newData;
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
try (var newDataToReceive = updater.apply(sentData)) {
if (newDataToReceive != null) {
newData = newDataToReceive.receive();
} else {
newData = null;
}
}
}
}
try (newData) {
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
tx.delete(cfh, keyArray, true);
commitOptimistically(tx);
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
try (newData) {
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Writing {}: {} (after update)",
"Updating {}. previous data: {}, updated data: {}",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(newData)
LLUtils.toStringSafe(prevDataArray),
LLUtils.toStringSafe(newDataArray)
);
}
tx.put(cfh, keyArray, newDataArray);
commitOptimistically(tx);
} else {
tx.rollback();
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
tx.delete(cfh, keyArray, true);
committedSuccessfully = commitOptimistically(tx);
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Writing {}: {} (after update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(newData)
);
}
tx.put(cfh, keyArray, newDataArray);
committedSuccessfully = commitOptimistically(tx);
} else {
committedSuccessfully = true;
tx.rollback();
}
sentPrevData = prevData == null ? null : prevData.send();
sentCurData = newData == null ? null : newData.send();
if (!committedSuccessfully) {
if (sentPrevData != null) {
sentPrevData.close();
}
if (sentCurData != null) {
sentCurData.close();
}
logger.debug(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
+ " waiting 5ms before retrying", LLUtils.toStringSafe(key));
// Wait for 5ms
LockSupport.parkNanos(5000000);
}
}
return switch (updateReturnMode) {
case GET_NEW_VALUE -> newData != null ? newData.send() : null;
case GET_OLD_VALUE -> prevData != null ? prevData.send() : null;
case NOTHING -> null;
//noinspection UnnecessaryDefault
default -> throw new IllegalArgumentException();
};
}
}
} while (!committedSuccessfully);
return switch (updateReturnMode) {
case GET_NEW_VALUE -> {
if (sentPrevData != null) {
sentPrevData.close();
}
yield sentCurData;
}
case GET_OLD_VALUE -> {
if (sentCurData != null) {
sentCurData.close();
}
yield sentPrevData;
}
case NOTHING -> {
if (sentPrevData != null) {
sentPrevData.close();
}
if (sentCurData != null) {
sentCurData.close();
}
yield null;
}
//noinspection UnnecessaryDefault
default -> throw new IllegalArgumentException();
};
}
}
}).onErrorMap(cause -> new IOException("Failed to read or write", cause)),
keySend -> Mono.fromRunnable(keySend::close));
}
private void commitOptimistically(Transaction tx) throws RocksDBException {
Code commitStatusCode = null;
do {
try {
tx.commit();
} catch (RocksDBException ex) {
if (ex.getStatus() != null && ex.getStatus().getCode() == Code.TryAgain) {
commitStatusCode = Code.TryAgain;
// Park for maximum 5ms
LockSupport.parkNanos(5000000);
} else {
throw ex;
}
}
} while (commitStatusCode == Code.TryAgain);
}
// Remember to change also update() if you are modifying this function
@Override
public Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> keyMono,
@ -718,58 +747,91 @@ public class LLLocalDictionary implements LLDictionary {
throw new UnsupportedOperationException("update() is disallowed");
}
try (var tx = db.beginTransaction(EMPTY_WRITE_OPTIONS, DEFAULT_OPTIMISTIC_TX_OPTIONS)) {
var prevDataArray = tx.getForUpdate(EMPTY_READ_OPTIONS, cfh, keyArray, true);
Buffer prevData;
if (prevDataArray != null) {
prevData = BYTE_ARRAY_BUFFERS.allocate(prevDataArray.length);
prevData.writeBytes(prevDataArray);
} else {
prevData = null;
}
try (prevData) {
Buffer prevDataToSendToUpdater;
if (prevData != null) {
prevDataToSendToUpdater = prevData.copy();
} else {
prevDataToSendToUpdater = null;
boolean committedSuccessfully;
Send<Buffer> sentPrevData;
Send<Buffer> sentCurData;
do {
var prevDataArray = tx.getForUpdate(EMPTY_READ_OPTIONS, cfh, keyArray, true);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray)
);
}
Buffer prevData;
if (prevDataArray != null) {
prevData = BYTE_ARRAY_BUFFERS.allocate(prevDataArray.length);
prevData.writeBytes(prevDataArray);
} else {
prevData = null;
}
try (prevData) {
Buffer prevDataToSendToUpdater;
if (prevData != null) {
prevDataToSendToUpdater = prevData.copy();
} else {
prevDataToSendToUpdater = null;
}
@Nullable Buffer newData;
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
try (var newDataToReceive = updater.apply(sentData)) {
if (newDataToReceive != null) {
newData = newDataToReceive.receive();
} else {
newData = null;
@Nullable Buffer newData;
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
try (var newDataToReceive = updater.apply(sentData)) {
if (newDataToReceive != null) {
newData = newDataToReceive.receive();
} else {
newData = null;
}
}
}
}
try (newData) {
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
tx.delete(cfh, keyArray, true);
commitOptimistically(tx);
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
try (newData) {
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Writing {}: {} (after update)",
"Updating {}. previous data: {}, updated data: {}",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(newData)
LLUtils.toStringSafe(prevDataArray),
LLUtils.toStringSafe(newDataArray)
);
}
tx.put(cfh, keyArray, newDataArray);
commitOptimistically(tx);
} else {
tx.rollback();
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
tx.delete(cfh, keyArray, true);
committedSuccessfully = commitOptimistically(tx);
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Writing {}: {} (after update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(newData)
);
}
tx.put(cfh, keyArray, newDataArray);
committedSuccessfully = commitOptimistically(tx);
} else {
tx.rollback();
committedSuccessfully = true;
}
sentPrevData = prevData == null ? null : prevData.send();
sentCurData = newData == null ? null : newData.send();
if (!committedSuccessfully) {
if (sentPrevData != null) {
sentPrevData.close();
}
if (sentCurData != null) {
sentCurData.close();
}
logger.debug(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
+ " waiting 5ms before retrying", LLUtils.toStringSafe(key));
// Wait for 5ms
LockSupport.parkNanos(5000000);
}
}
return LLDelta
.of(prevData != null ? prevData.send() : null, newData != null ? newData.send() : null)
.send();
}
}
} while (!committedSuccessfully);
return LLDelta.of(sentPrevData, sentCurData).send();
}
}
}).onErrorMap(cause -> new IOException("Failed to read or write", cause)),
@ -1215,11 +1277,12 @@ public class LLLocalDictionary implements LLDictionary {
);
}
private Flux<List<Send<LLEntry>>> getRangeMultiGrouped(LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, int prefixLength) {
private Flux<List<Send<LLEntry>>> getRangeMultiGrouped(LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono,
int prefixLength) {
return Flux.usingWhen(rangeMono,
rangeSend -> Flux.using(
() -> new LLLocalGroupedEntryReactiveRocksIterator(db, alloc, cfh, prefixLength, rangeSend,
databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), "getRangeMultiGrouped"),
databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), getRangeMultiGroupedDebugName),
reactiveRocksIterator -> reactiveRocksIterator.flux().subscribeOn(dbScheduler),
LLLocalGroupedReactiveRocksIterator::release
).transform(LLUtils::handleDiscard),
@ -1250,7 +1313,7 @@ public class LLLocalDictionary implements LLDictionary {
return Flux.usingWhen(rangeMono,
rangeSend -> Flux.using(
() -> new LLLocalGroupedKeyReactiveRocksIterator(db, alloc, cfh, prefixLength, rangeSend,
databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), "getRangeKeysGrouped"),
databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), getRangeKeysDebugName),
reactiveRocksIterator -> reactiveRocksIterator.flux().subscribeOn(dbScheduler),
LLLocalGroupedReactiveRocksIterator::release
).transform(LLUtils::handleDiscard),
@ -1306,7 +1369,8 @@ public class LLLocalDictionary implements LLDictionary {
}
@Override
public Flux<Send<Buffer>> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, int prefixLength) {
public Flux<Send<Buffer>> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono,
int prefixLength) {
return Flux.usingWhen(rangeMono,
rangeSend -> Flux
.using(
@ -1318,7 +1382,7 @@ public class LLLocalDictionary implements LLDictionary {
databaseOptions.allowNettyDirect(),
resolveSnapshot(snapshot),
true,
"getRangeKeysGrouped"
getRangeKeysGroupedDebugName
),
LLLocalKeyPrefixReactiveRocksIterator::flux,
LLLocalKeyPrefixReactiveRocksIterator::release
@ -1525,7 +1589,8 @@ public class LLLocalDictionary implements LLDictionary {
} else {
if (USE_WRITE_BATCHES_IN_SET_RANGE) {
return Mono.fromCallable(() -> {
throw new UnsupportedOperationException("Can't use write batches in setRange without window. Please fix params");
throw new UnsupportedOperationException("Can't use write batches in setRange without window."
+ " Please fix the parameters");
});
}
return this
@ -1562,14 +1627,16 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setFillCache(false);
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER, range.getMin());
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
IterateBound.LOWER, range.getMin());
} else {
minBound = emptyReleasableSlice();
}
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER, range.getMax());
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
IterateBound.UPPER, range.getMax());
} else {
maxBound = emptyReleasableSlice();
}
@ -1612,7 +1679,8 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setFillCache(false);
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER, range.getMin());
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
IterateBound.LOWER, range.getMin());
} else {
minBound = emptyReleasableSlice();
}
@ -1655,8 +1723,8 @@ public class LLLocalDictionary implements LLDictionary {
}
@Nullable
private static SafeCloseable rocksIterSeekTo(BufferAllocator alloc, boolean allowNettyDirect, RocksIterator rocksIterator,
Send<Buffer> bufferToReceive) {
private static SafeCloseable rocksIterSeekTo(BufferAllocator alloc, boolean allowNettyDirect,
RocksIterator rocksIterator, Send<Buffer> bufferToReceive) {
try (var buffer = bufferToReceive.receive()) {
if (allowNettyDirect) {
var direct = LLUtils.convertToReadableDirect(alloc, buffer.send());
@ -1673,8 +1741,8 @@ public class LLLocalDictionary implements LLDictionary {
}
}
private static ReleasableSlice setIterateBound(BufferAllocator alloc, boolean allowNettyDirect, ReadOptions readOpts,
IterateBound boundType, Send<Buffer> bufferToReceive) {
private static ReleasableSlice setIterateBound(BufferAllocator alloc, boolean allowNettyDirect,
ReadOptions readOpts, IterateBound boundType, Send<Buffer> bufferToReceive) {
var buffer = bufferToReceive.receive();
try {
requireNonNull(buffer);
@ -1785,7 +1853,7 @@ public class LLLocalDictionary implements LLDictionary {
writeBatch.writeToDbAndClose();
//noinspection ConstantConditions
if (shouldCompactLater) {
// Compact range
db.suggestCompactRange(cfh);
@ -1814,76 +1882,72 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, boolean fast) {
return Mono.usingWhen(rangeMono,
rangeSend -> {
return runOnDb(() -> {
try (var range = rangeSend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called sizeRange in a nonblocking thread");
}
if (range.isAll()) {
return fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot);
return Mono.usingWhen(rangeMono, rangeSend -> runOnDb(() -> {
try (var range = rangeSend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called sizeRange in a nonblocking thread");
}
if (range.isAll()) {
return fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot);
} else {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setFillCache(false);
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
IterateBound.LOWER, range.getMin());
} else {
minBound = emptyReleasableSlice();
}
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
IterateBound.UPPER, range.getMax());
} else {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setFillCache(false);
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER,
range.getMin());
maxBound = emptyReleasableSlice();
}
try {
if (fast) {
readOpts.setIgnoreRangeDeletions(true);
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(),
rocksIterator, range.getMin());
} else {
minBound = emptyReleasableSlice();
seekTo = null;
rocksIterator.seekToFirst();
}
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER,
range.getMax());
} else {
maxBound = emptyReleasableSlice();
}
try {
if (fast) {
readOpts.setIgnoreRangeDeletions(true);
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator,
range.getMin());
} else {
seekTo = null;
rocksIterator.seekToFirst();
}
try {
long i = 0;
rocksIterator.status();
while (rocksIterator.isValid()) {
rocksIterator.next();
rocksIterator.status();
i++;
}
return i;
} finally {
if (seekTo != null) {
seekTo.close();
}
}
}
} finally {
maxBound.close();
long i = 0;
rocksIterator.status();
while (rocksIterator.isValid()) {
rocksIterator.next();
rocksIterator.status();
i++;
}
return i;
} finally {
minBound.close();
if (seekTo != null) {
seekTo.close();
}
}
}
} finally {
maxBound.close();
}
} finally {
minBound.close();
}
}).onErrorMap(cause -> new IOException("Failed to get size of range", cause));
},
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
}
}
}
}).onErrorMap(cause -> new IOException("Failed to get size of range", cause)),
rangeSend -> Mono.fromRunnable(rangeSend::close));
}
@Override
@ -1897,23 +1961,24 @@ public class LLLocalDictionary implements LLDictionary {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER,
range.getMin());
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
IterateBound.LOWER, range.getMin());
} else {
minBound = emptyReleasableSlice();
}
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER,
range.getMax());
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
IterateBound.UPPER, range.getMax());
} else {
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator, range.getMin());
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(),
rocksIterator, range.getMin());
} else {
seekTo = null;
rocksIterator.seekToFirst();
@ -1958,23 +2023,24 @@ public class LLLocalDictionary implements LLDictionary {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER,
range.getMin());
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
IterateBound.LOWER, range.getMin());
} else {
minBound = emptyReleasableSlice();
}
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER,
range.getMax());
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
IterateBound.UPPER, range.getMax());
} else {
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator, range.getMin());
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(),
rocksIterator, range.getMin());
} else {
seekTo = null;
rocksIterator.seekToFirst();
@ -2126,23 +2192,24 @@ public class LLLocalDictionary implements LLDictionary {
try (var readOpts = new ReadOptions(getReadOptions(null))) {
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER,
range.getMin());
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
IterateBound.LOWER, range.getMin());
} else {
minBound = emptyReleasableSlice();
}
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER,
range.getMax());
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
IterateBound.UPPER, range.getMax());
} else {
maxBound = emptyReleasableSlice();
}
try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) {
SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator, range.getMin());
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(),
rocksIterator, range.getMin());
} else {
seekTo = null;
rocksIterator.seekToFirst();
@ -2177,7 +2244,8 @@ public class LLLocalDictionary implements LLDictionary {
}
@NotNull
public static Tuple4<RocksIterator, ReleasableSlice, ReleasableSlice, SafeCloseable> getRocksIterator(BufferAllocator alloc,
public static Tuple4<RocksIterator, ReleasableSlice, ReleasableSlice, SafeCloseable> getRocksIterator(
BufferAllocator alloc,
boolean allowNettyDirect,
ReadOptions readOptions,
Send<LLRange> rangeToReceive,
@ -2202,8 +2270,8 @@ public class LLLocalDictionary implements LLDictionary {
var rocksIterator = db.newIterator(cfh, readOptions);
SafeCloseable seekTo;
if (!PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = Objects.requireNonNullElseGet(rocksIterSeekTo(alloc, allowNettyDirect, rocksIterator, range.getMin()),
() -> ((SafeCloseable) () -> {})
seekTo = Objects.requireNonNullElseGet(rocksIterSeekTo(alloc, allowNettyDirect,
rocksIterator, range.getMin()), () -> ((SafeCloseable) () -> {})
);
} else {
seekTo = () -> {};

View File

@ -89,6 +89,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
}
if (firstGroupKey != null) {
assert firstGroupKey.isAccessible();
var groupKeyPrefix = firstGroupKey.copy(firstGroupKey.readerOffset(), prefixLength);
assert groupKeyPrefix.isAccessible();
@ -126,7 +127,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
tuple.getT3().close();
tuple.getT4().close();
}),
Send::close
resource -> resource.close()
);
}

View File

@ -1,144 +1,55 @@
package it.cavallium.dbengine.database.serialization;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.SafeCloseable;
import java.io.DataInput;
import java.nio.charset.StandardCharsets;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public class BufferDataInput implements DataInput, SafeCloseable {
@Nullable
private final Buffer buf;
private final int initialReaderOffset;
public BufferDataInput(@Nullable Send<Buffer> bufferSend) {
this.buf = bufferSend == null ? null : bufferSend.receive().makeReadOnly();
this.initialReaderOffset = buf == null ? 0 : buf.readerOffset();
}
public interface BufferDataInput extends DataInput {
@Override
public void readFully(byte @NotNull [] b) {
this.readFully(b, 0, b.length);
}
void readFully(byte @NotNull [] b);
@Override
public void readFully(byte @NotNull [] b, int off, int len) {
if (buf == null) {
if (len != 0) {
throw new IndexOutOfBoundsException();
}
} else {
buf.copyInto(buf.readerOffset(), b, off, len);
buf.readerOffset(buf.readerOffset() + len);
}
}
void readFully(byte @NotNull [] b, int off, int len);
@Override
public int skipBytes(int n) {
if (buf == null) {
if (n != 0) {
throw new IndexOutOfBoundsException();
}
return 0;
} else {
n = Math.min(n, buf.readerOffset() - buf.writerOffset());
buf.readerOffset(buf.readerOffset() + n);
return n;
}
}
int skipBytes(int n);
@Override
public boolean readBoolean() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readUnsignedByte() != 0;
}
boolean readBoolean();
@Override
public byte readByte() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readByte();
}
byte readByte();
@Override
public int readUnsignedByte() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readUnsignedByte();
}
int readUnsignedByte();
@Override
public short readShort() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readShort();
}
short readShort();
@Override
public int readUnsignedShort() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readUnsignedShort();
}
int readUnsignedShort();
@Override
public char readChar() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readChar();
}
char readChar();
@Override
public int readInt() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readInt();
}
int readInt();
@Override
public long readLong() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readLong();
}
long readLong();
@Override
public float readFloat() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readFloat();
}
float readFloat();
@Override
public double readDouble() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readDouble();
}
double readDouble();
@Override
public String readLine() {
if (buf == null) throw new IndexOutOfBoundsException();
throw new UnsupportedOperationException();
}
String readLine();
@NotNull
@Override
public String readUTF() {
if (buf == null) throw new IndexOutOfBoundsException();
var len = buf.readUnsignedShort();
byte[] bytes = new byte[len];
buf.copyInto(buf.readerOffset(), bytes, 0, len);
buf.readerOffset(buf.readerOffset() + len);
return new String(bytes, StandardCharsets.UTF_8);
}
String readUTF();
@Override
public void close() {
if (buf != null) {
buf.close();
}
}
public int getReadBytesCount() {
if (buf == null) {
return 0;
} else {
return buf.readerOffset() - initialReaderOffset;
}
}
int getReadBytesCount();
}

View File

@ -0,0 +1,145 @@
package it.cavallium.dbengine.database.serialization;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.SafeCloseable;
import java.io.DataInput;
import java.nio.charset.StandardCharsets;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public class BufferDataInputOwned implements SafeCloseable, BufferDataInput {
@Nullable
private final Buffer buf;
private final int initialReaderOffset;
public BufferDataInputOwned(@Nullable Send<Buffer> bufferSend) {
this.buf = bufferSend == null ? null : bufferSend.receive().makeReadOnly();
this.initialReaderOffset = buf == null ? 0 : buf.readerOffset();
}
@Override
public void readFully(byte @NotNull [] b) {
this.readFully(b, 0, b.length);
}
@Override
public void readFully(byte @NotNull [] b, int off, int len) {
if (buf == null) {
if (len != 0) {
throw new IndexOutOfBoundsException();
}
} else {
buf.copyInto(buf.readerOffset(), b, off, len);
buf.readerOffset(buf.readerOffset() + len);
}
}
@Override
public int skipBytes(int n) {
if (buf == null) {
if (n != 0) {
throw new IndexOutOfBoundsException();
}
return 0;
} else {
n = Math.min(n, buf.readerOffset() - buf.writerOffset());
buf.readerOffset(buf.readerOffset() + n);
return n;
}
}
@Override
public boolean readBoolean() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readUnsignedByte() != 0;
}
@Override
public byte readByte() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readByte();
}
@Override
public int readUnsignedByte() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readUnsignedByte();
}
@Override
public short readShort() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readShort();
}
@Override
public int readUnsignedShort() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readUnsignedShort();
}
@Override
public char readChar() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readChar();
}
@Override
public int readInt() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readInt();
}
@Override
public long readLong() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readLong();
}
@Override
public float readFloat() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readFloat();
}
@Override
public double readDouble() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readDouble();
}
@Override
public String readLine() {
if (buf == null) throw new IndexOutOfBoundsException();
throw new UnsupportedOperationException();
}
@NotNull
@Override
public String readUTF() {
if (buf == null) throw new IndexOutOfBoundsException();
var len = buf.readUnsignedShort();
byte[] bytes = new byte[len];
buf.copyInto(buf.readerOffset(), bytes, 0, len);
buf.readerOffset(buf.readerOffset() + len);
return new String(bytes, StandardCharsets.UTF_8);
}
@Override
public void close() {
if (buf != null) {
buf.close();
}
}
@Override
public int getReadBytesCount() {
if (buf == null) {
return 0;
} else {
return buf.readerOffset() - initialReaderOffset;
}
}
}

View File

@ -0,0 +1,137 @@
package it.cavallium.dbengine.database.serialization;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.SafeCloseable;
import java.io.DataInput;
import java.nio.charset.StandardCharsets;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public class BufferDataInputShared implements BufferDataInput {
@Nullable
private final Buffer buf;
private final int initialReaderOffset;
public BufferDataInputShared(@Nullable Buffer buffer) {
this.buf = buffer;
this.initialReaderOffset = buf == null ? 0 : buf.readerOffset();
}
@Override
public void readFully(byte @NotNull [] b) {
this.readFully(b, 0, b.length);
}
@Override
public void readFully(byte @NotNull [] b, int off, int len) {
if (buf == null) {
if (len != 0) {
throw new IndexOutOfBoundsException();
}
} else {
buf.copyInto(buf.readerOffset(), b, off, len);
buf.readerOffset(buf.readerOffset() + len);
}
}
@Override
public int skipBytes(int n) {
if (buf == null) {
if (n != 0) {
throw new IndexOutOfBoundsException();
}
return 0;
} else {
n = Math.min(n, buf.readerOffset() - buf.writerOffset());
buf.readerOffset(buf.readerOffset() + n);
return n;
}
}
@Override
public boolean readBoolean() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readUnsignedByte() != 0;
}
@Override
public byte readByte() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readByte();
}
@Override
public int readUnsignedByte() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readUnsignedByte();
}
@Override
public short readShort() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readShort();
}
@Override
public int readUnsignedShort() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readUnsignedShort();
}
@Override
public char readChar() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readChar();
}
@Override
public int readInt() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readInt();
}
@Override
public long readLong() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readLong();
}
@Override
public float readFloat() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readFloat();
}
@Override
public double readDouble() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readDouble();
}
@Override
public String readLine() {
if (buf == null) throw new IndexOutOfBoundsException();
throw new UnsupportedOperationException();
}
@NotNull
@Override
public String readUTF() {
if (buf == null) throw new IndexOutOfBoundsException();
var len = buf.readUnsignedShort();
byte[] bytes = new byte[len];
buf.copyInto(buf.readerOffset(), bytes, 0, len);
buf.readerOffset(buf.readerOffset() + len);
return new String(bytes, StandardCharsets.UTF_8);
}
public int getReadBytesCount() {
if (buf == null) {
return 0;
} else {
return buf.readerOffset() - initialReaderOffset;
}
}
}

View File

@ -1,7 +1,5 @@
package it.cavallium.dbengine.database.serialization;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.jetbrains.annotations.NotNull;

View File

@ -11,23 +11,23 @@ import org.warp.commonutils.error.IndexOutOfBoundsException;
public class CodecSerializer<A> implements Serializer<A> {
private final BufferAllocator allocator;
private final Codecs<A> deserializationCodecs;
private final Codec<A> serializationCodec;
private final int serializationCodecId;
private final boolean microCodecs;
private final int serializedSizeHint;
/**
*
* @param microCodecs if true, allow only codecs with a value from 0 to 255 to save disk space
* @param serializedSizeHint suggested default buffer size, -1 if unknown
*/
public CodecSerializer(
BufferAllocator allocator,
Codecs<A> deserializationCodecs,
Codec<A> serializationCodec,
int serializationCodecId,
boolean microCodecs) {
this.allocator = allocator;
boolean microCodecs,
int serializedSizeHint) {
this.deserializationCodecs = deserializationCodecs;
this.serializationCodec = serializationCodec;
this.serializationCodecId = serializationCodecId;
@ -35,11 +35,22 @@ public class CodecSerializer<A> implements Serializer<A> {
if (microCodecs && (serializationCodecId > 255 || serializationCodecId < 0)) {
throw new IndexOutOfBoundsException(serializationCodecId, 0, 255);
}
if (serializedSizeHint != -1) {
this.serializedSizeHint = (microCodecs ? Byte.BYTES : Integer.BYTES) + serializedSizeHint;
} else {
this.serializedSizeHint = -1;
}
}
@Override
public @NotNull DeserializationResult<A> deserialize(@Nullable Send<Buffer> serializedToReceive) {
try (var is = new BufferDataInput(serializedToReceive)) {
public int getSerializedSizeHint() {
return serializedSizeHint;
}
@Override
public @NotNull A deserialize(@NotNull Buffer serializedBuf) throws SerializationException {
try {
var is = new BufferDataInputShared(serializedBuf);
int codecId;
if (microCodecs) {
codecId = is.readUnsignedByte();
@ -47,7 +58,7 @@ public class CodecSerializer<A> implements Serializer<A> {
codecId = is.readInt();
}
var serializer = deserializationCodecs.getCodec(codecId);
return new DeserializationResult<>(serializer.deserialize(is), is.getReadBytesCount());
return serializer.deserialize(is);
} catch (IOException ex) {
// This shouldn't happen
throw new IOError(ex);
@ -55,16 +66,15 @@ public class CodecSerializer<A> implements Serializer<A> {
}
@Override
public @NotNull Send<Buffer> serialize(@NotNull A deserialized) {
try (Buffer buf = allocator.allocate(64)) {
var os = new BufferDataOutput(buf);
public void serialize(@NotNull A deserialized, Buffer output) throws SerializationException {
try {
var os = new BufferDataOutput(output);
if (microCodecs) {
os.writeByte(serializationCodecId);
} else {
os.writeInt(serializationCodecId);
}
serializationCodec.serialize(os, deserialized);
return buf.send();
} catch (IOException ex) {
// This shouldn't happen
throw new IOError(ex);

View File

@ -1,8 +1,10 @@
package it.cavallium.dbengine.database.serialization;
import io.net5.buffer.ByteBufUtil;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Send;
import io.net5.util.internal.StringUtil;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.netty.NullableBuffer;
import java.nio.charset.StandardCharsets;
@ -12,56 +14,82 @@ import org.jetbrains.annotations.Nullable;
public interface Serializer<A> {
record DeserializationResult<T>(T deserializedData, int bytesRead) {}
/**
*
* @param serialized the serialized data should be split!
*/
@NotNull A deserialize(@NotNull Buffer serialized) throws SerializationException;
@NotNull DeserializationResult<A> deserialize(@NotNull Send<Buffer> serialized) throws SerializationException;
/**
* @param output its writable size will be at least equal to the size hint
*/
void serialize(@NotNull A deserialized, Buffer output) throws SerializationException;
@NotNull Send<Buffer> serialize(@NotNull A deserialized) throws SerializationException;
/**
* @return suggested default buffer size, -1 if unknown
*/
int getSerializedSizeHint();
Serializer<Send<Buffer>> NOOP_SERIALIZER = new Serializer<>() {
Serializer<Buffer> NOOP_SERIALIZER = new Serializer<>() {
@Override
public @NotNull DeserializationResult<Send<Buffer>> deserialize(@NotNull Send<Buffer> serialized) {
try (var serializedBuf = serialized.receive()) {
var readableBytes = serializedBuf.readableBytes();
return new DeserializationResult<>(serializedBuf.send(), readableBytes);
}
public @NotNull Buffer deserialize(@NotNull Buffer serialized) {
return serialized.split();
}
@Override
public @NotNull Send<Buffer> serialize(@NotNull Send<Buffer> deserialized) {
return deserialized;
public void serialize(@NotNull Buffer deserialized, @NotNull Buffer deserializedToReceive) {
deserializedToReceive.ensureWritable(deserialized.readableBytes());
deserializedToReceive.writeBytes(deserialized);
}
@Override
public int getSerializedSizeHint() {
return -1;
}
};
static Serializer<Send<Buffer>> noop() {
return NOOP_SERIALIZER;
}
Serializer<Send<Buffer>> NOOP_SEND_SERIALIZER = new Serializer<>() {
@Override
public @NotNull Send<Buffer> deserialize(@NotNull Buffer serialized) {
return serialized.split().send();
}
static Serializer<String> utf8(BufferAllocator allocator) {
return new Serializer<>() {
@Override
public @NotNull DeserializationResult<String> deserialize(@Nullable Send<Buffer> serializedToReceive) {
Objects.requireNonNull(serializedToReceive);
try (Buffer serialized = serializedToReceive.receive()) {
assert serialized.isAccessible();
int length = serialized.readInt();
var readerOffset = serialized.readerOffset();
return new DeserializationResult<>(LLUtils.deserializeString(serialized.send(),
readerOffset, length, StandardCharsets.UTF_8), Integer.BYTES + length);
}
@Override
public void serialize(@NotNull Send<Buffer> deserialized, @NotNull Buffer deserializedToReceive) {
try (var received = deserialized.receive()) {
deserializedToReceive.ensureWritable(received.readableBytes());
deserializedToReceive.writeBytes(received);
}
}
@Override
public @NotNull Send<Buffer> serialize(@NotNull String deserialized) {
var bytes = deserialized.getBytes(StandardCharsets.UTF_8);
try (Buffer buf = allocator.allocate(Integer.BYTES + bytes.length)) {
assert buf.isAccessible();
buf.writeInt(bytes.length);
buf.writeBytes(bytes);
assert buf.isAccessible();
return buf.send();
}
@Override
public int getSerializedSizeHint() {
return -1;
}
};
Serializer<String> UTF8_SERIALIZER = new Serializer<>() {
@Override
public @NotNull String deserialize(@NotNull Buffer serialized) {
assert serialized.isAccessible();
int length = serialized.readInt();
try (var strBuf = serialized.readSplit(length)) {
return LLUtils.deserializeString(strBuf, strBuf.readerOffset(), length, StandardCharsets.UTF_8);
}
};
}
}
@Override
public void serialize(@NotNull String deserialized, Buffer output) {
var bytes = deserialized.getBytes(StandardCharsets.UTF_8);
output.ensureWritable(Integer.BYTES + bytes.length);
output.writeInt(bytes.length);
output.writeBytes(bytes);
}
@Override
public int getSerializedSizeHint() {
return -1;
}
};
}

View File

@ -14,31 +14,33 @@ public interface SerializerFixedBinaryLength<A> extends Serializer<A> {
int getSerializedBinaryLength();
static SerializerFixedBinaryLength<Send<Buffer>> noop(int length) {
@Override
default int getSerializedSizeHint() {
return getSerializedBinaryLength();
}
static SerializerFixedBinaryLength<Buffer> noop(int length) {
return new SerializerFixedBinaryLength<>() {
@Override
public @NotNull DeserializationResult<Send<Buffer>> deserialize(@NotNull Send<Buffer> serialized) {
public @NotNull Buffer deserialize(@NotNull Buffer serialized) {
Objects.requireNonNull(serialized);
try (var buf = serialized.receive()) {
if (buf.readableBytes() != getSerializedBinaryLength()) {
throw new IllegalArgumentException(
"Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with "
+ buf.readableBytes() + " bytes instead");
}
var readableBytes = buf.readableBytes();
return new DeserializationResult<>(buf.send(), readableBytes);
if (serialized.readableBytes() < getSerializedBinaryLength()) {
throw new IllegalArgumentException(
"Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with "
+ serialized.readableBytes() + " bytes instead");
}
return serialized.readSplit(getSerializedBinaryLength());
}
@Override
public @NotNull Send<Buffer> serialize(@NotNull Send<Buffer> deserialized) {
try (Buffer buf = deserialized.receive()) {
if (buf.readableBytes() != getSerializedBinaryLength()) {
public void serialize(@NotNull Buffer deserialized, Buffer output) {
try (deserialized) {
if (deserialized.readableBytes() != getSerializedBinaryLength()) {
throw new IllegalArgumentException(
"Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to serialize an element with "
+ buf.readableBytes() + " bytes instead");
+ deserialized.readableBytes() + " bytes instead");
}
return buf.send();
output.writeBytes(deserialized);
}
}
@ -49,39 +51,32 @@ public interface SerializerFixedBinaryLength<A> extends Serializer<A> {
};
}
static SerializerFixedBinaryLength<String> utf8(BufferAllocator allocator, int length) {
static SerializerFixedBinaryLength<String> utf8(int length) {
return new SerializerFixedBinaryLength<>() {
@Override
public @NotNull DeserializationResult<String> deserialize(@NotNull Send<Buffer> serializedToReceive)
throws SerializationException {
try (var serialized = serializedToReceive.receive()) {
if (serialized.readableBytes() != getSerializedBinaryLength()) {
throw new SerializationException(
"Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with "
+ serialized.readableBytes() + " bytes instead");
}
var readerOffset = serialized.readerOffset();
return new DeserializationResult<>(LLUtils.deserializeString(serialized.send(),
readerOffset, length, StandardCharsets.UTF_8), length);
public @NotNull String deserialize(@NotNull Buffer serialized) throws SerializationException {
if (serialized.readableBytes() < getSerializedBinaryLength()) {
throw new SerializationException(
"Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with "
+ serialized.readableBytes() + " bytes instead");
}
var readerOffset = serialized.readerOffset();
return LLUtils.deserializeString(serialized.send(), readerOffset, length, StandardCharsets.UTF_8);
}
@Override
public @NotNull Send<Buffer> serialize(@NotNull String deserialized) throws SerializationException {
// UTF-8 uses max. 3 bytes per char, so calculate the worst case.
try (Buffer buf = allocator.allocate(LLUtils.utf8MaxBytes(deserialized))) {
assert buf.isAccessible();
var bytes = deserialized.getBytes(StandardCharsets.UTF_8);
buf.ensureWritable(bytes.length);
buf.writeBytes(bytes);
if (buf.readableBytes() != getSerializedBinaryLength()) {
throw new SerializationException("Fixed serializer with " + getSerializedBinaryLength()
+ " bytes has tried to serialize an element with "
+ buf.readableBytes() + " bytes instead");
}
assert buf.isAccessible();
return buf.send();
public void serialize(@NotNull String deserialized, Buffer output) throws SerializationException {
assert output.isAccessible();
var bytes = deserialized.getBytes(StandardCharsets.UTF_8);
output.ensureWritable(bytes.length);
output.writeBytes(bytes);
if (output.readableBytes() < getSerializedBinaryLength()) {
throw new SerializationException("Fixed serializer with " + getSerializedBinaryLength()
+ " bytes has tried to serialize an element with "
+ output.readableBytes() + " bytes instead");
}
assert output.isAccessible();
}
@Override
@ -93,24 +88,21 @@ public interface SerializerFixedBinaryLength<A> extends Serializer<A> {
static SerializerFixedBinaryLength<Integer> intSerializer(BufferAllocator allocator) {
return new SerializerFixedBinaryLength<>() {
@Override
public @NotNull DeserializationResult<Integer> deserialize(@NotNull Send<Buffer> serializedToReceive) {
Objects.requireNonNull(serializedToReceive);
try (var serialized = serializedToReceive.receive()) {
if (serialized.readableBytes() != getSerializedBinaryLength()) {
throw new IllegalArgumentException(
"Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with "
+ serialized.readableBytes() + " bytes instead");
}
return new DeserializationResult<>(serialized.readInt(), Integer.BYTES);
public @NotNull Integer deserialize(@NotNull Buffer serialized) {
Objects.requireNonNull(serialized);
if (serialized.readableBytes() < getSerializedBinaryLength()) {
throw new IllegalArgumentException(
"Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with "
+ serialized.readableBytes() + " bytes instead");
}
return serialized.readInt();
}
@Override
public @NotNull Send<Buffer> serialize(@NotNull Integer deserialized) {
try (Buffer buf = allocator.allocate(Integer.BYTES)) {
return buf.writeInt(deserialized).send();
}
public void serialize(@NotNull Integer deserialized, Buffer output) {
output.writeInt(deserialized);
}
@Override
@ -122,25 +114,21 @@ public interface SerializerFixedBinaryLength<A> extends Serializer<A> {
static SerializerFixedBinaryLength<Long> longSerializer(BufferAllocator allocator) {
return new SerializerFixedBinaryLength<>() {
@Override
public @NotNull DeserializationResult<Long> deserialize(@NotNull Send<Buffer> serializedToReceive) {
Objects.requireNonNull(serializedToReceive);
try (var serialized = serializedToReceive.receive()) {
if (serialized.readableBytes() != getSerializedBinaryLength()) {
throw new IllegalArgumentException(
"Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with "
+ serialized.readableBytes() + " bytes instead");
}
var readableBytes = serialized.readableBytes();
return new DeserializationResult<>(serialized.readLong(), Long.BYTES);
public @NotNull Long deserialize(@NotNull Buffer serialized) {
Objects.requireNonNull(serialized);
if (serialized.readableBytes() < getSerializedBinaryLength()) {
throw new IllegalArgumentException(
"Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with "
+ serialized.readableBytes() + " bytes instead");
}
return serialized.readLong();
}
@Override
public @NotNull Send<Buffer> serialize(@NotNull Long deserialized) {
try (Buffer buf = allocator.allocate(Long.BYTES)) {
return buf.writeLong(deserialized).send();
}
public void serialize(@NotNull Long deserialized, Buffer output) {
output.writeLong(deserialized);
}
@Override

View File

@ -24,6 +24,7 @@ import it.cavallium.dbengine.database.collections.DatabaseStageMap;
import it.cavallium.dbengine.database.collections.SubStageGetterHashMap;
import it.cavallium.dbengine.database.collections.SubStageGetterMap;
import it.cavallium.dbengine.database.disk.MemorySegmentUtils;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.nio.file.Path;
@ -154,14 +155,14 @@ public class DbTestUtils {
int keyBytes) {
if (mapType == MapType.MAP) {
return DatabaseMapDictionary.simple(dictionary,
SerializerFixedBinaryLength.utf8(dictionary.getAllocator(), keyBytes),
Serializer.utf8(dictionary.getAllocator()),
SerializerFixedBinaryLength.utf8(keyBytes),
Serializer.UTF8_SERIALIZER,
null
);
} else {
return DatabaseMapDictionaryHashed.simple(dictionary,
Serializer.utf8(dictionary.getAllocator()),
Serializer.utf8(dictionary.getAllocator()),
Serializer.UTF8_SERIALIZER,
Serializer.UTF8_SERIALIZER,
s -> (short) s.hashCode(),
new SerializerFixedBinaryLength<>() {
@Override
@ -170,21 +171,15 @@ public class DbTestUtils {
}
@Override
public @NotNull DeserializationResult<Short> deserialize(@Nullable Send<Buffer> serializedToReceive) {
Objects.requireNonNull(serializedToReceive);
try (var serialized = serializedToReceive.receive()) {
var val = serialized.readShort();
return new DeserializationResult<>(val, Short.BYTES);
}
public @NotNull Short deserialize(@NotNull Buffer serialized) throws SerializationException {
Objects.requireNonNull(serialized);
var val = serialized.readShort();
return val;
}
@Override
public @NotNull Send<Buffer> serialize(@NotNull Short deserialized) {
try (var out = dictionary.getAllocator().allocate(Short.BYTES)) {
out.writeShort(deserialized);
out.writerOffset(Short.BYTES);
return out.send();
}
public void serialize(@NotNull Short deserialized, Buffer output) throws SerializationException {
output.writeShort(deserialized);
}
},
null
@ -198,10 +193,10 @@ public class DbTestUtils {
int key1Bytes,
int key2Bytes) {
return DatabaseMapDictionaryDeep.deepTail(dictionary,
SerializerFixedBinaryLength.utf8(dictionary.getAllocator(), key1Bytes),
SerializerFixedBinaryLength.utf8(key1Bytes),
key2Bytes,
new SubStageGetterMap<>(SerializerFixedBinaryLength.utf8(dictionary.getAllocator(), key2Bytes),
Serializer.utf8(dictionary.getAllocator())
new SubStageGetterMap<>(SerializerFixedBinaryLength.utf8(key2Bytes),
Serializer.UTF8_SERIALIZER
),
null
);
@ -212,10 +207,10 @@ public class DbTestUtils {
LLDictionary dictionary,
int key1Bytes) {
return DatabaseMapDictionaryDeep.deepTail(dictionary,
SerializerFixedBinaryLength.utf8(dictionary.getAllocator(), key1Bytes),
SerializerFixedBinaryLength.utf8(key1Bytes),
Integer.BYTES,
new SubStageGetterHashMap<>(Serializer.utf8(dictionary.getAllocator()),
Serializer.utf8(dictionary.getAllocator()),
new SubStageGetterHashMap<>(Serializer.UTF8_SERIALIZER,
Serializer.UTF8_SERIALIZER,
String::hashCode,
SerializerFixedBinaryLength.intSerializer(dictionary.getAllocator())
),
@ -226,8 +221,8 @@ public class DbTestUtils {
public static <T, U> DatabaseMapDictionaryHashed<String, String, Integer> tempDatabaseMapDictionaryHashMap(
LLDictionary dictionary) {
return DatabaseMapDictionaryHashed.simple(dictionary,
Serializer.utf8(dictionary.getAllocator()),
Serializer.utf8(dictionary.getAllocator()),
Serializer.UTF8_SERIALIZER,
Serializer.UTF8_SERIALIZER,
String::hashCode,
SerializerFixedBinaryLength.intSerializer(dictionary.getAllocator()),
null

View File

@ -224,7 +224,7 @@ public abstract class TestDictionaryMap {
.flatMapMany(map -> Flux
.concat(
map.updateValue(key, old -> {
assert old == null;
Assertions.assertNull(old);
return "error?";
}),
map.updateValue(key, false, old -> {
@ -624,8 +624,8 @@ public abstract class TestDictionaryMap {
.concat(
map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()),
map.get(null)
.map(Map::entrySet)
.flatMapIterable(list -> list)
.map(Map::entrySet)
.flatMapIterable(list -> list)
)
.doFinally(s -> map.close())
)
@ -707,27 +707,37 @@ public abstract class TestDictionaryMap {
@ParameterizedTest
@MethodSource("provideArgumentsPutMulti")
public void testPutMultiClear(MapType mapType, UpdateMode updateMode, Map<String, String> entries, boolean shouldFail) {
Step<Boolean> stpVer = StepVerifier
.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5))
.flatMapMany(map -> Flux
.concat(
map.isEmpty(null),
map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()),
map.isEmpty(null),
map.clear().then(Mono.empty()),
map.isEmpty(null)
)
.doFinally(s -> map.close())
)
.flatMap(val -> shouldFail ? Mono.empty() : Mono.just(val))
.transform(LLUtils::handleDiscard)
));
if (shouldFail) {
this.checkLeaks = false;
stpVer.verifyError();
} else {
stpVer.expectNext(true, entries.isEmpty(), true).verifyComplete();
List<Boolean> result;
try {
result = SyncUtils.run(DbTestUtils.tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5))
.flatMapMany(map -> Flux
.concat(
map.isEmpty(null),
map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()),
map.isEmpty(null),
map.clear().then(Mono.empty()),
map.isEmpty(null)
)
.doFinally(s -> map.close())
)
.flatMap(val -> shouldFail ? Mono.empty() : Mono.just(val))
.transform(LLUtils::handleDiscard)
.collectList()
).singleOrEmpty());
} catch (Exception ex) {
if (shouldFail) {
this.checkLeaks = false;
} else {
throw ex;
}
return;
}
Assertions.assertEquals(true, result.get(0));
Assertions.assertEquals(entries.isEmpty(), result.get(1));
Assertions.assertEquals(true, result.get(2));
}
}