From 80d0ced8880b74410459caaab0dd5019f6010a20 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 19 Oct 2021 00:22:05 +0200 Subject: [PATCH] Optimistic transactions, inline codecs --- .../dbengine/client/MappedSerializer.java | 16 +- .../client/MappedSerializerFixedLength.java | 11 +- .../cavallium/dbengine/database/LLUtils.java | 66 ++- .../database/collections/DatabaseEmpty.java | 19 +- .../collections/DatabaseMapDictionary.java | 236 ++++---- .../DatabaseMapDictionaryDeep.java | 67 ++- .../DatabaseMapDictionaryHashed.java | 4 +- .../database/collections/DatabaseSingle.java | 55 +- .../SubStageGetterSingleBytes.java | 2 +- .../collections/ValueWithHashSerializer.java | 42 +- .../collections/ValuesSetSerializer.java | 50 +- .../database/disk/LLLocalDictionary.java | 526 ++++++++++-------- ...LLLocalKeyPrefixReactiveRocksIterator.java | 3 +- .../serialization/BufferDataInput.java | 123 +--- .../serialization/BufferDataInputOwned.java | 145 +++++ .../serialization/BufferDataInputShared.java | 137 +++++ .../database/serialization/Codec.java | 2 - .../serialization/CodecSerializer.java | 32 +- .../database/serialization/Serializer.java | 104 ++-- .../SerializerFixedBinaryLength.java | 124 ++--- .../it/cavallium/dbengine/DbTestUtils.java | 43 +- .../cavallium/dbengine/TestDictionaryMap.java | 58 +- 22 files changed, 1143 insertions(+), 722 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/database/serialization/BufferDataInputOwned.java create mode 100644 src/main/java/it/cavallium/dbengine/database/serialization/BufferDataInputShared.java diff --git a/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java b/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java index 5fcf3a4..880f90f 100644 --- a/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java +++ b/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java @@ -19,15 +19,17 @@ public class MappedSerializer implements Serializer { } @Override - public @NotNull DeserializationResult deserialize(@NotNull Send serialized) throws SerializationException { - try (serialized) { - var deserialized = serializer.deserialize(serialized); - return new DeserializationResult<>(keyMapper.map(deserialized.deserializedData()), deserialized.bytesRead()); - } + public @NotNull B deserialize(@NotNull Buffer serialized) throws SerializationException { + return keyMapper.map(serializer.deserialize(serialized)); } @Override - public @NotNull Send serialize(@NotNull B deserialized) throws SerializationException { - return serializer.serialize(keyMapper.unmap(deserialized)); + public void serialize(@NotNull B deserialized, Buffer output) throws SerializationException { + serializer.serialize(keyMapper.unmap(deserialized), output); + } + + @Override + public int getSerializedSizeHint() { + return serializer.getSerializedSizeHint(); } } diff --git a/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java b/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java index e1caacb..d4fe711 100644 --- a/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java +++ b/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java @@ -19,16 +19,13 @@ public class MappedSerializerFixedLength implements SerializerFixedBinaryL } @Override - public @NotNull DeserializationResult deserialize(@NotNull Send serialized) throws SerializationException { - try (serialized) { - var deserialized = fixedLengthSerializer.deserialize(serialized); - return new DeserializationResult<>(keyMapper.map(deserialized.deserializedData()), deserialized.bytesRead()); - } + public @NotNull B deserialize(@NotNull Buffer serialized) throws SerializationException { + return keyMapper.map(fixedLengthSerializer.deserialize(serialized)); } @Override - public @NotNull Send serialize(@NotNull B deserialized) throws SerializationException { - return fixedLengthSerializer.serialize(keyMapper.unmap(deserialized)); + public void serialize(@NotNull B deserialized, Buffer output) throws SerializationException { + fixedLengthSerializer.serialize(keyMapper.unmap(deserialized), output); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 00e97f1..1ba0c1a 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -63,7 +63,6 @@ import reactor.util.function.Tuple3; public class LLUtils { private static final Logger logger = LoggerFactory.getLogger(LLUtils.class); - public static final Marker MARKER_DB_BUFFER = MarkerFactory.getMarker("DB_BUFFER"); public static final Marker MARKER_ROCKSDB = MarkerFactory.getMarker("ROCKSDB"); public static final Marker MARKER_LUCENE = MarkerFactory.getMarker("LUCENE"); @@ -193,6 +192,18 @@ public class LLUtils { } } + public static String toStringSafe(byte @Nullable[] key) { + try { + if (key == null) { + return toString(key); + } else { + return "(released)"; + } + } catch (IllegalReferenceCountException ex) { + return "(released)"; + } + } + public static String toStringSafe(@Nullable LLRange range) { try { if (range == null || range.isAccessible()) { @@ -268,6 +279,53 @@ public class LLUtils { } } + public static String toString(byte @Nullable[] key) { + if (key == null) { + return "null"; + } else { + int startIndex = 0; + int iMax = key.length - 1; + int iLimit = 128; + if (iMax <= -1) { + return "[]"; + } else { + StringBuilder arraySB = new StringBuilder(); + StringBuilder asciiSB = new StringBuilder(); + boolean isAscii = true; + arraySB.append('['); + int i = 0; + + while (true) { + var byteVal = (int) key[startIndex + i]; + arraySB.append(byteVal); + if (isAscii) { + if (byteVal >= 32 && byteVal < 127) { + asciiSB.append((char) byteVal); + } else if (byteVal == 0) { + asciiSB.append('␀'); + } else { + isAscii = false; + asciiSB = null; + } + } + if (i == iLimit) { + arraySB.append("…"); + } + if (i == iMax || i == iLimit) { + if (isAscii) { + return asciiSB.insert(0, "\"").append("\"").toString(); + } else { + return arraySB.append(']').toString(); + } + } + + arraySB.append(", "); + ++i; + } + } + } + } + public static boolean equals(Buffer a, Buffer b) { if (a == null && b == null) { return true; @@ -1005,6 +1063,12 @@ public class LLUtils { } } + public static String deserializeString(@NotNull Buffer buffer, int readerOffset, int length, Charset charset) { + byte[] bytes = new byte[Math.min(length, buffer.readableBytes())]; + buffer.copyInto(readerOffset, bytes, 0, length); + return new String(bytes, charset); + } + public static int utf8MaxBytes(String deserialized) { return deserialized.length() * 3; } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java index 1d9b9ea..ed610a0 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java @@ -6,8 +6,8 @@ import io.net5.buffer.api.Drop; import io.net5.buffer.api.Send; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.Serializer; -import it.cavallium.dbengine.database.serialization.Serializer.DeserializationResult; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -15,20 +15,23 @@ public class DatabaseEmpty { @SuppressWarnings({"unused", "InstantiationOfUtilityClass"}) public static final Nothing NOTHING = new Nothing(); - public static final DeserializationResult NOTHING_RESULT = new DeserializationResult<>(NOTHING, 0); public static Serializer nothingSerializer(BufferAllocator bufferAllocator) { return new Serializer<>() { + @Override - public @NotNull DeserializationResult deserialize(@Nullable Send serialized) { - try (serialized) { - return NOTHING_RESULT; - } + public @NotNull Nothing deserialize(@NotNull Buffer serialized) { + return NOTHING; } @Override - public @NotNull Send serialize(@NotNull Nothing deserialized) { - return LLUtils.empty(bufferAllocator); + public void serialize(@NotNull Nothing deserialized, Buffer output) { + + } + + @Override + public int getSerializedSizeHint() { + return 0; } }; } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index c69537c..6829e8f 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -1,7 +1,6 @@ package it.cavallium.dbengine.database.collections; import io.net5.buffer.api.Buffer; -import io.net5.buffer.api.Drop; import io.net5.buffer.api.Send; import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.CompositeSnapshot; @@ -66,9 +65,36 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep(dictionary, prefixKey, keySuffixSerializer, valueSerializer, onClose); } + private void deserializeValue(Send valueToReceive, SynchronousSink sink) { + try (var value = valueToReceive.receive()) { + sink.next(valueSerializer.deserialize(value)); + } catch (Throwable ex) { + sink.error(ex); + } + } + + private Send serializeValue(U value) throws SerializationException { + var valSizeHint = valueSerializer.getSerializedSizeHint(); + if (valSizeHint == -1) valSizeHint = 128; + try (var valBuf = dictionary.getAllocator().allocate(valSizeHint)) { + valueSerializer.serialize(value, valBuf); + return valBuf.send(); + } + } + + private Send serializeKeySuffixToKey(T keySuffix) throws SerializationException { + try (var keyBuf = keyPrefix.copy()) { + assert keyBuf.readableBytes() == keyPrefixLength; + keyBuf.ensureWritable(keySuffixLength + keyExtLength); + serializeSuffix(keySuffix, keyBuf); + assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength; + return keyBuf.send(); + } + } + private Send toKey(Send suffixKeyToSend) { try (var suffixKey = suffixKeyToSend.receive()) { - assert suffixKeyConsistency(suffixKey.readableBytes()); + assert suffixKeyLengthConsistency(suffixKey.readableBytes()); if (keyPrefix.readableBytes() > 0) { try (var result = LLUtils.compositeBuffer(dictionary.getAllocator(), LLUtils.copy(dictionary.getAllocator(), keyPrefix), @@ -84,29 +110,28 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep value, SynchronousSink sink) { - try { - sink.next(valueSerializer.deserialize(value).deserializedData()); - } catch (SerializationException ex) { - sink.error(ex); - } - } - @Override public Mono> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { return dictionary .getRange(resolveSnapshot(snapshot), rangeMono, existsAlmostCertainly) .>handle((entrySend, sink) -> { - try (var entry = entrySend.receive()) { - T key; - try (var serializedKey = entry.getKey().receive()) { - removePrefix(serializedKey); - suffixKeyConsistency(serializedKey.readableBytes()); - key = deserializeSuffix(serializedKey.send()); + Entry deserializedEntry; + try { + try (var entry = entrySend.receive()) { + T key; + try (var serializedKey = entry.getKey().receive()) { + splitPrefix(serializedKey).close(); + suffixKeyLengthConsistency(serializedKey.readableBytes()); + key = deserializeSuffix(serializedKey); + } + U value; + try (var valueBuf = entry.getValue().receive()) { + value = valueSerializer.deserialize(valueBuf); + } + deserializedEntry = Map.entry(key, value); } - var value = valueSerializer.deserialize(entry.getValue()).deserializedData(); - sink.next(Map.entry(key, value)); - } catch (SerializationException ex) { + sink.next(deserializedEntry); + } catch (Throwable ex) { sink.error(ex); } }) @@ -120,14 +145,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep { - try { - sink.next(LLEntry.of(this.toKey(serializeSuffix(entry.getKey())), - valueSerializer.serialize(entry.getValue())).send()); - } catch (SerializationException e) { - sink.error(e); - } - }) + .handle(this::serializeEntrySink) ).then(Mono.empty())) .singleOrEmpty() .transform(LLUtils::handleDiscard); @@ -152,14 +170,14 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> at(@Nullable CompositeSnapshot snapshot, T keySuffix) { return Mono.fromCallable(() -> - new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), valueSerializer, null)); + new DatabaseSingle<>(dictionary, serializeKeySuffixToKey(keySuffix), valueSerializer, null)); } @Override public Mono getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) { return dictionary .get(resolveSnapshot(snapshot), - Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))), + Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)), existsAlmostCertainly ) .handle(this::deserializeValue); @@ -167,8 +185,8 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep putValue(T keySuffix, U value) { - var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))).single(); - var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value)).single(); + var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)).single(); + var valueMono = Mono.fromCallable(() -> serializeValue(value)).single(); return dictionary .put(keyMono, valueMono, LLDictionaryResultType.VOID) .doOnNext(Send::close) @@ -185,7 +203,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep updater) { - var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))); + var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)); return dictionary .update(keyMono, getSerializedUpdater(updater), updateReturnMode, existsAlmostCertainly) .handle(this::deserializeValue); @@ -195,12 +213,14 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> updateValueAndGetDelta(T keySuffix, boolean existsAlmostCertainly, SerializationFunction<@Nullable U, @Nullable U> updater) { - var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))); + var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)); return dictionary .updateAndGetDelta(keyMono, getSerializedUpdater(updater), existsAlmostCertainly) - .transform(mono -> LLUtils.mapLLDelta(mono, - serialized -> valueSerializer.deserialize(serialized).deserializedData() - )); + .transform(mono -> LLUtils.mapLLDelta(mono, serializedToReceive -> { + try (var serialized = serializedToReceive.receive()) { + return valueSerializer.deserialize(serialized); + } + })); } public SerializationFunction<@Nullable Send, @Nullable Send> getSerializedUpdater( @@ -211,12 +231,14 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep extends DatabaseMapDictionaryDeep extends DatabaseMapDictionaryDeep putValueAndGetPrevious(T keySuffix, U value) { - var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))); - var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value)); - return dictionary - .put(keyMono, - valueMono, - LLDictionaryResultType.PREVIOUS_VALUE) - .handle(this::deserializeValue); + var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)); + var valueMono = Mono.fromCallable(() -> serializeValue(value)); + return dictionary.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE).handle(this::deserializeValue); } @Override public Mono putValueAndGetChanged(T keySuffix, U value) { - var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))); - var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value)); + var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)); + var valueMono = Mono.fromCallable(() -> serializeValue(value)); return dictionary .put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE) .handle(this::deserializeValue) @@ -265,7 +285,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep remove(T keySuffix) { - var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))); + var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)); return dictionary .remove(keyMono, LLDictionaryResultType.VOID) .doOnNext(Send::close) @@ -274,15 +294,13 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep removeAndGetPrevious(T keySuffix) { - var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))); - return dictionary - .remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE) - .handle(this::deserializeValue); + var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)); + return dictionary.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE).handle(this::deserializeValue); } @Override public Mono removeAndGetStatus(T keySuffix) { - var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))); + var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)); return dictionary .remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE) .map(LLUtils::responseToBoolean); @@ -293,8 +311,9 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>>handle((keySuffix, sink) -> { try { - sink.next(Tuples.of(keySuffix, toKey(serializeSuffix(keySuffix)))); - } catch (SerializationException ex) { + Tuple2> tuple = Tuples.of(keySuffix, serializeKeySuffixToKey(keySuffix)); + sink.next(tuple); + } catch (Throwable ex) { sink.error(ex); } }); @@ -304,14 +323,14 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep valueOpt; if (entry.getT3().isPresent()) { - try (var buf = entry.getT3().get()) { - valueOpt = Optional.of(valueSerializer.deserialize(buf).deserializedData()); + try (var buf = entry.getT3().get().receive()) { + valueOpt = Optional.of(valueSerializer.deserialize(buf)); } } else { valueOpt = Optional.empty(); } sink.next(Map.entry(entry.getT1(), valueOpt)); - } catch (SerializationException ex) { + } catch (Throwable ex) { sink.error(ex); } finally { entry.getT2().close(); @@ -321,22 +340,29 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep serializeEntry(T key, U value) throws SerializationException { - try (var serializedKey = toKey(serializeSuffix(key))) { - var serializedValueToReceive = valueSerializer.serialize(value); - try (var serializedValue = serializedValueToReceive.receive()) { - return LLEntry.of(serializedKey, serializedValue.send()).send(); + private Send serializeEntry(T keySuffix, U value) throws SerializationException { + try (var key = serializeKeySuffixToKey(keySuffix)) { + try (var serializedValue = serializeValue(value)) { + return LLEntry.of(key, serializedValue).send(); } } } + private void serializeEntrySink(Entry entry, SynchronousSink> sink) { + try { + sink.next(serializeEntry(entry.getKey(), entry.getValue())); + } catch (Throwable e) { + sink.error(e); + } + } + @Override public Mono putMulti(Flux> entries) { var serializedEntries = entries .>handle((entry, sink) -> { try { sink.next(serializeEntry(entry.getKey(), entry.getValue())); - } catch (SerializationException e) { + } catch (Throwable e) { sink.error(e); } }) @@ -363,8 +389,9 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep, X>>handle((entry, sink) -> { try { - sink.next(Tuples.of(serializeSuffix(entry.getT1()), entry.getT2())); - } catch (SerializationException ex) { + Send serializedKey = serializeKeySuffixToKey(entry.getT1()); + sink.next(Tuples.of(serializedKey, entry.getT2())); + } catch (Throwable ex) { sink.error(ex); } }) @@ -377,17 +404,17 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep { - try { - sink.next(new ExtraKeyOperationResult<>(deserializeSuffix(result.key()), - result.extra(), - result.changed() - )); - } catch (SerializationException ex) { - sink.error(ex); - } - }); + return dictionary.updateMulti(serializedEntries, serializedUpdater).handle((result, sink) -> { + try { + T keySuffix; + try (var keySuffixBuf = result.key().receive()) { + keySuffix = deserializeSuffix(keySuffixBuf); + } + sink.next(new ExtraKeyOperationResult<>(keySuffix, result.extra(), result.changed())); + } catch (Throwable ex) { + sink.error(ex); + } + }); } @Override @@ -398,12 +425,15 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep(dictionary, toKey(keyBuf.send()), valueSerializer, null) - )); - } catch (SerializationException ex) { + splitPrefix(keyBuf).close(); + suffixKeyLengthConsistency(keyBuf.readableBytes()); + T keySuffix; + try (var keyBufCopy = keyBuf.copy()) { + keySuffix = deserializeSuffix(keyBufCopy); + } + var subStage = new DatabaseSingle<>(dictionary, toKey(keyBuf.send()), valueSerializer, null); + sink.next(Map.entry(keySuffix, subStage)); + } catch (Throwable ex) { sink.error(ex); } }); @@ -414,16 +444,25 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>handle((serializedEntryToReceive, sink) -> { - try (var serializedEntry = serializedEntryToReceive.receive()) { - try (var keyBuf = serializedEntry.getKey().receive()) { - assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength; - // Remove prefix. Keep only the suffix and the ext - removePrefix(keyBuf); - suffixKeyConsistency(keyBuf.readableBytes()); - sink.next(Map.entry(deserializeSuffix(keyBuf.send()), - valueSerializer.deserialize(serializedEntry.getValue()).deserializedData())); + try { + Entry entry; + try (var serializedEntry = serializedEntryToReceive.receive()) { + try (var keyBuf = serializedEntry.getKey().receive()) { + assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength; + // Remove prefix. Keep only the suffix and the ext + splitPrefix(keyBuf).close(); + suffixKeyLengthConsistency(keyBuf.readableBytes()); + T keySuffix = deserializeSuffix(keyBuf); + + U value; + try (var valueBuf = serializedEntry.getValue().receive()) { + value = valueSerializer.deserialize(valueBuf); + } + entry = Map.entry(keySuffix, value); + } } - } catch (SerializationException e) { + sink.next(entry); + } catch (Throwable e) { sink.error(e); } }) @@ -441,14 +480,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> setAllValuesAndGetPrevious(Flux> entries) { return Flux.concat( this.getAllValues(null), - dictionary.setRange(rangeMono, entries.handle((entry, sink) -> { - try { - sink.next(LLEntry.of(toKey(serializeSuffix(entry.getKey())), - valueSerializer.serialize(entry.getValue())).send()); - } catch (SerializationException e) { - sink.error(e); - } - })).then(Mono.empty()) + dictionary.setRange(rangeMono, entries.handle(this::serializeEntrySink)).then(Mono.empty()) ); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index 1205f45..3e86296 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -269,7 +269,7 @@ public class DatabaseMapDictionaryDeep> extend } @SuppressWarnings("unused") - protected boolean suffixKeyConsistency(int keySuffixLength) { + protected boolean suffixKeyLengthConsistency(int keySuffixLength) { return this.keySuffixLength == keySuffixLength; } @@ -285,13 +285,15 @@ public class DatabaseMapDictionaryDeep> extend /** * Removes the prefix from the key + * @return the prefix */ - protected void removePrefix(Buffer key) { + protected Buffer splitPrefix(Buffer key) { assert key.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength || key.readableBytes() == keyPrefixLength + keySuffixLength; - key.readerOffset(key.readerOffset() + this.keyPrefixLength); + var prefix = key.readSplit(this.keyPrefixLength); assert key.readableBytes() == keySuffixLength + keyExtLength || key.readableBytes() == keySuffixLength; + return prefix; } /** @@ -334,7 +336,13 @@ public class DatabaseMapDictionaryDeep> extend @Override public Mono at(@Nullable CompositeSnapshot snapshot, T keySuffix) { - var suffixKeyWithoutExt = Mono.fromCallable(() -> toKeyWithoutExt(serializeSuffix(keySuffix))); + var suffixKeyWithoutExt = Mono.fromCallable(() -> { + try (var keyWithoutExtBuf = keyPrefix.copy()) { + keyWithoutExtBuf.ensureWritable(keySuffixLength + keyExtLength); + serializeSuffix(keySuffix, keyWithoutExtBuf); + return keyWithoutExtBuf.send(); + } + }); return this.subStageGetter .subStage(dictionary, snapshot, suffixKeyWithoutExt) .transform(LLUtils::handleDiscard) @@ -360,8 +368,10 @@ public class DatabaseMapDictionaryDeep> extend groupKeyWithoutExtSend -> this.subStageGetter .subStage(dictionary, snapshot, Mono.fromCallable(() -> groupKeyWithoutExtSend.copy().send())) .>handle((us, sink) -> { + T deserializedSuffix; try { - sink.next(Map.entry(this.deserializeSuffix(getGroupSuffix(groupKeyWithoutExtSend.send())), us)); + deserializedSuffix = this.deserializeSuffix(splitGroupSuffix(groupKeyWithoutExtSend)); + sink.next(Map.entry(deserializedSuffix, us)); } catch (SerializationException ex) { sink.error(ex); } @@ -371,13 +381,18 @@ public class DatabaseMapDictionaryDeep> extend .transform(LLUtils::handleDiscard); } - private Send getGroupSuffix(Send groupKeyWithoutExt) { - try (var buffer = groupKeyWithoutExt.receive()) { - assert subStageKeysConsistency(buffer.readableBytes() + keyExtLength); - this.removePrefix(buffer); - assert subStageKeysConsistency(keyPrefixLength + buffer.readableBytes() + keyExtLength); - return buffer.send(); - } + /** + * Split the input. The input will become the ext, the returned data will be the group suffix + * @param groupKey group key, will become ext + * @return group suffix + */ + private Buffer splitGroupSuffix(@NotNull Buffer groupKey) { + assert subStageKeysConsistency(groupKey.readableBytes()) + || subStageKeysConsistency(groupKey.readableBytes() + keyExtLength); + this.splitPrefix(groupKey).close(); + assert subStageKeysConsistency(keyPrefixLength + groupKey.readableBytes()) + || subStageKeysConsistency(keyPrefixLength + groupKey.readableBytes() + keyExtLength); + return groupKey.readSplit(keySuffixLength); } private Send getGroupWithoutExt(Send groupKeyWithExtSend) { @@ -430,25 +445,21 @@ public class DatabaseMapDictionaryDeep> extend } //todo: temporary wrapper. convert the whole class to buffers - protected T deserializeSuffix(@NotNull Send keySuffixToReceive) throws SerializationException { - try (var keySuffix = keySuffixToReceive.receive()) { - assert suffixKeyConsistency(keySuffix.readableBytes()); - var result = keySuffixSerializer.deserialize(keySuffix.send()); - assert keyPrefix.isAccessible(); - return result.deserializedData(); - } + protected T deserializeSuffix(@NotNull Buffer keySuffix) throws SerializationException { + assert suffixKeyLengthConsistency(keySuffix.readableBytes()); + var result = keySuffixSerializer.deserialize(keySuffix); + assert keyPrefix.isAccessible(); + return result; } //todo: temporary wrapper. convert the whole class to buffers - @NotNull - protected Send serializeSuffix(T keySuffix) throws SerializationException { - try (var suffixDataToReceive = keySuffixSerializer.serialize(keySuffix)) { - try (Buffer suffixData = suffixDataToReceive.receive()) { - assert suffixKeyConsistency(suffixData.readableBytes()); - assert keyPrefix.isAccessible(); - return suffixData.send(); - } - } + protected void serializeSuffix(T keySuffix, Buffer output) throws SerializationException { + output.ensureWritable(keySuffixLength); + var beforeWriterOffset = output.writerOffset(); + keySuffixSerializer.serialize(keySuffix, output); + var afterWriterOffset = output.writerOffset(); + assert suffixKeyLengthConsistency(afterWriterOffset - beforeWriterOffset); + assert keyPrefix.isAccessible(); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java index 319e993..5922599 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java @@ -77,9 +77,9 @@ public class DatabaseMapDictionaryHashed extends ResourceSupport valueWithHashSerializer - = new ValueWithHashSerializer<>(alloc, keySuffixSerializer, valueSerializer); + = new ValueWithHashSerializer<>(keySuffixSerializer, valueSerializer); ValuesSetSerializer> valuesSetSerializer - = new ValuesSetSerializer<>(alloc, valueWithHashSerializer); + = new ValuesSetSerializer<>(valueWithHashSerializer); this.subDictionary = DatabaseMapDictionary.tail(dictionary, prefixKey, keySuffixHashSerializer, valuesSetSerializer, onClose); this.keySuffixHashFunction = keySuffixHashFunction; diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java index 40efd77..5103ec7 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java @@ -84,13 +84,26 @@ public class DatabaseSingle extends ResourceSupport, Databas } private void deserializeValue(Send value, SynchronousSink sink) { - try (value) { - sink.next(serializer.deserialize(value).deserializedData()); + try { + U deserializedValue; + try (var valueBuf = value.receive()) { + deserializedValue = serializer.deserialize(valueBuf); + } + sink.next(deserializedValue); } catch (SerializationException ex) { sink.error(ex); } } + private Send serializeValue(U value) throws SerializationException { + var valSizeHint = serializer.getSerializedSizeHint(); + if (valSizeHint == -1) valSizeHint = 128; + try (var valBuf = dictionary.getAllocator().allocate(valSizeHint)) { + serializer.serialize(value, valBuf); + return valBuf.send(); + } + } + @Override public Mono get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { return dictionary @@ -101,7 +114,7 @@ public class DatabaseSingle extends ResourceSupport, Databas @Override public Mono setAndGetPrevious(U value) { return dictionary - .put(keyMono, Mono.fromCallable(() -> serializer.serialize(value)), LLDictionaryResultType.PREVIOUS_VALUE) + .put(keyMono, Mono.fromCallable(() -> serializeValue(value)), LLDictionaryResultType.PREVIOUS_VALUE) .handle(this::deserializeValue); } @@ -112,12 +125,20 @@ public class DatabaseSingle extends ResourceSupport, Databas return dictionary .update(keyMono, (oldValueSer) -> { try (oldValueSer) { - var result = updater.apply(oldValueSer == null ? null - : serializer.deserialize(oldValueSer).deserializedData()); + U result; + if (oldValueSer == null) { + result = updater.apply(null); + } else { + U deserializedValue; + try (var valueBuf = oldValueSer.receive()) { + deserializedValue = serializer.deserialize(valueBuf); + } + result = updater.apply(deserializedValue); + } if (result == null) { return null; } else { - return serializer.serialize(result); + return serializeValue(result); } } }, updateReturnMode, existsAlmostCertainly) @@ -130,17 +151,27 @@ public class DatabaseSingle extends ResourceSupport, Databas return dictionary .updateAndGetDelta(keyMono, (oldValueSer) -> { try (oldValueSer) { - var result = updater.apply(oldValueSer == null ? null - : serializer.deserialize(oldValueSer).deserializedData()); + U result; + if (oldValueSer == null) { + result = updater.apply(null); + } else { + U deserializedValue; + try (var valueBuf = oldValueSer.receive()) { + deserializedValue = serializer.deserialize(valueBuf); + } + result = updater.apply(deserializedValue); + } if (result == null) { return null; } else { - return serializer.serialize(result); + return serializeValue(result); } } - }, existsAlmostCertainly).transform(mono -> LLUtils.mapLLDelta(mono, - serialized -> serializer.deserialize(serialized).deserializedData() - )); + }, existsAlmostCertainly).transform(mono -> LLUtils.mapLLDelta(mono, serialized -> { + try (var valueBuf = serialized.receive()) { + return serializer.deserialize(valueBuf); + } + })); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingleBytes.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingleBytes.java index 06efe00..162082f 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingleBytes.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingleBytes.java @@ -7,6 +7,6 @@ import it.cavallium.dbengine.database.serialization.Serializer; public class SubStageGetterSingleBytes extends SubStageGetterSingle> { public SubStageGetterSingleBytes() { - super(Serializer.noop()); + super(Serializer.NOOP_SEND_SERIALIZER); } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java b/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java index 549852e..adef940 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java @@ -15,38 +15,40 @@ import org.jetbrains.annotations.Nullable; class ValueWithHashSerializer implements Serializer> { - private final BufferAllocator allocator; private final Serializer keySuffixSerializer; private final Serializer valueSerializer; - ValueWithHashSerializer(BufferAllocator allocator, + ValueWithHashSerializer( Serializer keySuffixSerializer, Serializer valueSerializer) { - this.allocator = allocator; this.keySuffixSerializer = keySuffixSerializer; this.valueSerializer = valueSerializer; } @Override - public @NotNull DeserializationResult> deserialize(@Nullable Send serializedToReceive) - throws SerializationException { - Objects.requireNonNull(serializedToReceive); - try (var serialized = serializedToReceive.receive()) { - DeserializationResult deserializedKey = keySuffixSerializer.deserialize(serialized.copy().send()); - DeserializationResult deserializedValue = valueSerializer.deserialize(serialized - .copy(serialized.readerOffset() + deserializedKey.bytesRead(), - serialized.readableBytes() - deserializedKey.bytesRead() - ) - .send()); - return new DeserializationResult<>(Map.entry(deserializedKey.deserializedData(), - deserializedValue.deserializedData()), deserializedKey.bytesRead() + deserializedValue.bytesRead()); - } + public @NotNull Entry deserialize(@NotNull Buffer serialized) throws SerializationException { + Objects.requireNonNull(serialized); + X deserializedKey = keySuffixSerializer.deserialize(serialized); + Y deserializedValue = valueSerializer.deserialize(serialized); + return Map.entry(deserializedKey, deserializedValue); } @Override - public @NotNull Send serialize(@NotNull Entry deserialized) throws SerializationException { - var keySuffix = keySuffixSerializer.serialize(deserialized.getKey()); - var value = valueSerializer.serialize(deserialized.getValue()); - return LLUtils.compositeBuffer(allocator, keySuffix, value).send(); + public void serialize(@NotNull Entry deserialized, Buffer output) throws SerializationException { + keySuffixSerializer.serialize(deserialized.getKey(), output); + valueSerializer.serialize(deserialized.getValue(), output); + } + + @Override + public int getSerializedSizeHint() { + var hint1 = keySuffixSerializer.getSerializedSizeHint(); + var hint2 = valueSerializer.getSerializedSizeHint(); + if (hint1 == -1 && hint2 == -1) { + return -1; + } else if (hint1 == -1) { + return hint2; + } else { + return hint1; + } } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java b/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java index 330ba68..37bd7db 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java @@ -13,46 +13,34 @@ import org.jetbrains.annotations.Nullable; class ValuesSetSerializer implements Serializer> { - private final BufferAllocator allocator; private final Serializer entrySerializer; - ValuesSetSerializer(BufferAllocator allocator, Serializer entrySerializer) { - this.allocator = allocator; + ValuesSetSerializer(Serializer entrySerializer) { this.entrySerializer = entrySerializer; } @Override - public @NotNull DeserializationResult> deserialize(@Nullable Send serializedToReceive) throws SerializationException { - Objects.requireNonNull(serializedToReceive); - try (var serialized = serializedToReceive.receive()) { - int initialReaderOffset = serialized.readerOffset(); - int entriesLength = serialized.readInt(); - ArrayList deserializedElements = new ArrayList<>(entriesLength); - for (int i = 0; i < entriesLength; i++) { - var deserializationResult = entrySerializer.deserialize(serialized - .copy(serialized.readerOffset(), serialized.readableBytes()) - .send()); - deserializedElements.add(deserializationResult.deserializedData()); - serialized.readerOffset(serialized.readerOffset() + deserializationResult.bytesRead()); - } - return new DeserializationResult<>(new ObjectArraySet<>(deserializedElements), serialized.readerOffset() - initialReaderOffset); + public @NotNull ObjectArraySet deserialize(@NotNull Buffer serialized) throws SerializationException { + Objects.requireNonNull(serialized); + int entriesLength = serialized.readInt(); + ArrayList deserializedElements = new ArrayList<>(entriesLength); + for (int i = 0; i < entriesLength; i++) { + var deserializationResult = entrySerializer.deserialize(serialized); + deserializedElements.add(deserializationResult); + } + return new ObjectArraySet<>(deserializedElements); + } + + @Override + public void serialize(@NotNull ObjectArraySet deserialized, Buffer output) throws SerializationException { + output.writeInt(deserialized.size()); + for (X entry : deserialized) { + entrySerializer.serialize(entry, output); } } @Override - public @NotNull Send serialize(@NotNull ObjectArraySet deserialized) throws SerializationException { - try (Buffer output = allocator.allocate(64)) { - output.writeInt(deserialized.size()); - for (X entry : deserialized) { - var serializedToReceive = entrySerializer.serialize(entry); - try (Buffer serialized = serializedToReceive.receive()) { - if (serialized.readableBytes() > 0) { - output.ensureWritable(serialized.readableBytes()); - output.writeBytes(serialized); - } - } - } - return output.send(); - } + public int getSerializedSizeHint() { + return -1; } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index a700276..f610636 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -92,7 +92,6 @@ public class LLLocalDictionary implements LLDictionary { static final long MAX_WRITE_BATCH_SIZE = 1024L * 1024L * 1024L; // 1GiB static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations static final int MULTI_GET_WINDOW = 16; - static final Duration MULTI_GET_WINDOW_TIMEOUT = Duration.ofSeconds(1); static final ReadOptions EMPTY_READ_OPTIONS = new UnreleasableReadOptions(new UnmodifiableReadOptions()); static final WriteOptions EMPTY_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions()); static final WriteOptions BATCH_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions()); @@ -125,7 +124,6 @@ public class LLLocalDictionary implements LLDictionary { static final boolean USE_WRITE_BATCH_IN_SET_RANGE_DELETE = false; static final boolean PARALLEL_EXACT_SIZE = true; - private static final int STRIPES = 512; private static final byte[] FIRST_KEY = new byte[]{}; private static final byte[] NO_DATA = new byte[0]; @@ -158,10 +156,13 @@ public class LLLocalDictionary implements LLDictionary { private final Function snapshotResolver; private final UpdateMode updateMode; private final BufferAllocator alloc; - private final String getRangeMultiDebugName; - private final String getRangeKeysMultiDebugName; private final DatabaseOptions databaseOptions; + private final String getRangeMultiDebugName; + private final String getRangeMultiGroupedDebugName; + private final String getRangeKeysDebugName; + private final String getRangeKeysGroupedDebugName; + public LLLocalDictionary( BufferAllocator allocator, @NotNull OptimisticTransactionDB db, @@ -182,7 +183,9 @@ public class LLLocalDictionary implements LLDictionary { this.snapshotResolver = snapshotResolver; this.updateMode = updateMode; this.getRangeMultiDebugName = databaseName + "(" + columnName + ")" + "::getRangeMulti"; - this.getRangeKeysMultiDebugName = databaseName + "(" + columnName + ")" + "::getRangeKeysMulti"; + this.getRangeMultiGroupedDebugName = databaseName + "(" + columnName + ")" + "::getRangeMultiGrouped"; + this.getRangeKeysDebugName = databaseName + "(" + columnName + ")" + "::getRangeKeys"; + this.getRangeKeysGroupedDebugName = databaseName + "(" + columnName + ")" + "::getRangeKeysGrouped"; this.databaseOptions = databaseOptions; alloc = allocator; } @@ -220,34 +223,6 @@ public class LLLocalDictionary implements LLDictionary { } } - private int getLockIndex(Buffer key) { - return Math.abs(LLUtils.hashCode(key) % STRIPES); - } - - private IntArrayList getLockIndices(List keys) { - var list = new IntArrayList(keys.size()); - for (Buffer key : keys) { - list.add(getLockIndex(key)); - } - return list; - } - - private IntArrayList getLockIndicesEntries(List keys) { - var list = new IntArrayList(keys.size()); - for (LLEntry key : keys) { - list.add(getLockIndex(key.getKeyUnsafe())); - } - return list; - } - - private IntArrayList getLockIndicesWithExtra(List> entries) { - var list = new IntArrayList(entries.size()); - for (Tuple2 key : entries) { - list.add(getLockIndex(key.getT1())); - } - return list; - } - @Override public BufferAllocator getAllocator() { return alloc; @@ -275,7 +250,8 @@ public class LLLocalDictionary implements LLDictionary { var result = dbGet(cfh, resolveSnapshot(snapshot), key.send(), existsAlmostCertainly); if (logger.isTraceEnabled(MARKER_ROCKSDB)) { try (var result2 = result == null ? null : result.receive()) { - logger.trace(MARKER_ROCKSDB, "Reading {}: {}", LLUtils.toStringSafe(logKey), LLUtils.toString(result2)); + logger.trace(MARKER_ROCKSDB, "Reading {}: {}", LLUtils.toStringSafe(logKey), + LLUtils.toString(result2)); return result2 == null ? null : result2.send(); } } else { @@ -607,6 +583,23 @@ public class LLLocalDictionary implements LLDictionary { return Mono.fromSupplier(() -> updateMode); } + /** + * + * @return true if not committed successfully + */ + private boolean commitOptimistically(Transaction tx) throws RocksDBException { + try { + tx.commit(); + return true; + } catch (RocksDBException ex) { + var status = ex.getStatus() != null ? ex.getStatus().getCode() : Code.Ok; + if (status == Code.Busy || status == Code.TryAgain) { + return false; + } + throw ex; + } + } + // Remember to change also updateAndGetDelta() if you are modifying this function @SuppressWarnings("DuplicatedCode") @Override @@ -624,85 +617,121 @@ public class LLLocalDictionary implements LLDictionary { throw new UnsupportedOperationException("update() is disallowed"); } try (var tx = db.beginTransaction(EMPTY_WRITE_OPTIONS, DEFAULT_OPTIMISTIC_TX_OPTIONS)) { - var prevDataArray = tx.getForUpdate(EMPTY_READ_OPTIONS, cfh, keyArray, true); - Buffer prevData; - if (prevDataArray != null) { - prevData = BYTE_ARRAY_BUFFERS.allocate(prevDataArray.length); - prevData.writeBytes(prevDataArray); - } else { - prevData = null; - } - try (prevData) { - Buffer prevDataToSendToUpdater; - if (prevData != null) { - prevDataToSendToUpdater = prevData.copy(); - } else { - prevDataToSendToUpdater = null; + boolean committedSuccessfully; + Send sentPrevData; + Send sentCurData; + do { + var prevDataArray = tx.getForUpdate(EMPTY_READ_OPTIONS, cfh, keyArray, true); + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Reading {}: {} (before update)", + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(prevDataArray) + ); } + Buffer prevData; + if (prevDataArray != null) { + prevData = BYTE_ARRAY_BUFFERS.allocate(prevDataArray.length); + prevData.writeBytes(prevDataArray); + } else { + prevData = null; + } + try (prevData) { + Buffer prevDataToSendToUpdater; + if (prevData != null) { + prevDataToSendToUpdater = prevData.copy(); + } else { + prevDataToSendToUpdater = null; + } - @Nullable Buffer newData; - try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) { - try (var newDataToReceive = updater.apply(sentData)) { - if (newDataToReceive != null) { - newData = newDataToReceive.receive(); - } else { - newData = null; + @Nullable Buffer newData; + try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) { + try (var newDataToReceive = updater.apply(sentData)) { + if (newDataToReceive != null) { + newData = newDataToReceive.receive(); + } else { + newData = null; + } } } - } - try (newData) { - var newDataArray = newData == null ? null : LLUtils.toArray(newData); - if (prevData != null && newData == null) { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key)); - } - tx.delete(cfh, keyArray, true); - commitOptimistically(tx); - } else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) { + try (newData) { + var newDataArray = newData == null ? null : LLUtils.toArray(newData); if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, - "Writing {}: {} (after update)", + "Updating {}. previous data: {}, updated data: {}", LLUtils.toStringSafe(key), - LLUtils.toStringSafe(newData) + LLUtils.toStringSafe(prevDataArray), + LLUtils.toStringSafe(newDataArray) ); } - tx.put(cfh, keyArray, newDataArray); - commitOptimistically(tx); - } else { - tx.rollback(); + if (prevData != null && newData == null) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key)); + } + tx.delete(cfh, keyArray, true); + committedSuccessfully = commitOptimistically(tx); + } else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Writing {}: {} (after update)", + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(newData) + ); + } + tx.put(cfh, keyArray, newDataArray); + committedSuccessfully = commitOptimistically(tx); + } else { + committedSuccessfully = true; + tx.rollback(); + } + sentPrevData = prevData == null ? null : prevData.send(); + sentCurData = newData == null ? null : newData.send(); + if (!committedSuccessfully) { + if (sentPrevData != null) { + sentPrevData.close(); + } + if (sentCurData != null) { + sentCurData.close(); + } + logger.debug(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):" + + " waiting 5ms before retrying", LLUtils.toStringSafe(key)); + // Wait for 5ms + LockSupport.parkNanos(5000000); + } } - return switch (updateReturnMode) { - case GET_NEW_VALUE -> newData != null ? newData.send() : null; - case GET_OLD_VALUE -> prevData != null ? prevData.send() : null; - case NOTHING -> null; - //noinspection UnnecessaryDefault - default -> throw new IllegalArgumentException(); - }; } - } + } while (!committedSuccessfully); + return switch (updateReturnMode) { + case GET_NEW_VALUE -> { + if (sentPrevData != null) { + sentPrevData.close(); + } + yield sentCurData; + } + case GET_OLD_VALUE -> { + if (sentCurData != null) { + sentCurData.close(); + } + yield sentPrevData; + } + case NOTHING -> { + if (sentPrevData != null) { + sentPrevData.close(); + } + if (sentCurData != null) { + sentCurData.close(); + } + yield null; + } + //noinspection UnnecessaryDefault + default -> throw new IllegalArgumentException(); + }; } } }).onErrorMap(cause -> new IOException("Failed to read or write", cause)), keySend -> Mono.fromRunnable(keySend::close)); } - private void commitOptimistically(Transaction tx) throws RocksDBException { - Code commitStatusCode = null; - do { - try { - tx.commit(); - } catch (RocksDBException ex) { - if (ex.getStatus() != null && ex.getStatus().getCode() == Code.TryAgain) { - commitStatusCode = Code.TryAgain; - // Park for maximum 5ms - LockSupport.parkNanos(5000000); - } else { - throw ex; - } - } - } while (commitStatusCode == Code.TryAgain); - } - // Remember to change also update() if you are modifying this function @Override public Mono> updateAndGetDelta(Mono> keyMono, @@ -718,58 +747,91 @@ public class LLLocalDictionary implements LLDictionary { throw new UnsupportedOperationException("update() is disallowed"); } try (var tx = db.beginTransaction(EMPTY_WRITE_OPTIONS, DEFAULT_OPTIMISTIC_TX_OPTIONS)) { - var prevDataArray = tx.getForUpdate(EMPTY_READ_OPTIONS, cfh, keyArray, true); - Buffer prevData; - if (prevDataArray != null) { - prevData = BYTE_ARRAY_BUFFERS.allocate(prevDataArray.length); - prevData.writeBytes(prevDataArray); - } else { - prevData = null; - } - try (prevData) { - Buffer prevDataToSendToUpdater; - if (prevData != null) { - prevDataToSendToUpdater = prevData.copy(); - } else { - prevDataToSendToUpdater = null; + boolean committedSuccessfully; + Send sentPrevData; + Send sentCurData; + do { + var prevDataArray = tx.getForUpdate(EMPTY_READ_OPTIONS, cfh, keyArray, true); + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Reading {}: {} (before update)", + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(prevDataArray) + ); } + Buffer prevData; + if (prevDataArray != null) { + prevData = BYTE_ARRAY_BUFFERS.allocate(prevDataArray.length); + prevData.writeBytes(prevDataArray); + } else { + prevData = null; + } + try (prevData) { + Buffer prevDataToSendToUpdater; + if (prevData != null) { + prevDataToSendToUpdater = prevData.copy(); + } else { + prevDataToSendToUpdater = null; + } - @Nullable Buffer newData; - try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) { - try (var newDataToReceive = updater.apply(sentData)) { - if (newDataToReceive != null) { - newData = newDataToReceive.receive(); - } else { - newData = null; + @Nullable Buffer newData; + try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) { + try (var newDataToReceive = updater.apply(sentData)) { + if (newDataToReceive != null) { + newData = newDataToReceive.receive(); + } else { + newData = null; + } } } - } - try (newData) { - var newDataArray = newData == null ? null : LLUtils.toArray(newData); - if (prevData != null && newData == null) { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key)); - } - tx.delete(cfh, keyArray, true); - commitOptimistically(tx); - } else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) { + try (newData) { + var newDataArray = newData == null ? null : LLUtils.toArray(newData); if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, - "Writing {}: {} (after update)", + "Updating {}. previous data: {}, updated data: {}", LLUtils.toStringSafe(key), - LLUtils.toStringSafe(newData) + LLUtils.toStringSafe(prevDataArray), + LLUtils.toStringSafe(newDataArray) ); } - tx.put(cfh, keyArray, newDataArray); - commitOptimistically(tx); - } else { - tx.rollback(); + if (prevData != null && newData == null) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key)); + } + tx.delete(cfh, keyArray, true); + committedSuccessfully = commitOptimistically(tx); + } else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Writing {}: {} (after update)", + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(newData) + ); + } + tx.put(cfh, keyArray, newDataArray); + committedSuccessfully = commitOptimistically(tx); + } else { + tx.rollback(); + committedSuccessfully = true; + } + sentPrevData = prevData == null ? null : prevData.send(); + sentCurData = newData == null ? null : newData.send(); + if (!committedSuccessfully) { + if (sentPrevData != null) { + sentPrevData.close(); + } + if (sentCurData != null) { + sentCurData.close(); + } + logger.debug(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):" + + " waiting 5ms before retrying", LLUtils.toStringSafe(key)); + // Wait for 5ms + LockSupport.parkNanos(5000000); + } } - return LLDelta - .of(prevData != null ? prevData.send() : null, newData != null ? newData.send() : null) - .send(); } - } + } while (!committedSuccessfully); + return LLDelta.of(sentPrevData, sentCurData).send(); } } }).onErrorMap(cause -> new IOException("Failed to read or write", cause)), @@ -1215,11 +1277,12 @@ public class LLLocalDictionary implements LLDictionary { ); } - private Flux>> getRangeMultiGrouped(LLSnapshot snapshot, Mono> rangeMono, int prefixLength) { + private Flux>> getRangeMultiGrouped(LLSnapshot snapshot, Mono> rangeMono, + int prefixLength) { return Flux.usingWhen(rangeMono, rangeSend -> Flux.using( () -> new LLLocalGroupedEntryReactiveRocksIterator(db, alloc, cfh, prefixLength, rangeSend, - databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), "getRangeMultiGrouped"), + databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), getRangeMultiGroupedDebugName), reactiveRocksIterator -> reactiveRocksIterator.flux().subscribeOn(dbScheduler), LLLocalGroupedReactiveRocksIterator::release ).transform(LLUtils::handleDiscard), @@ -1250,7 +1313,7 @@ public class LLLocalDictionary implements LLDictionary { return Flux.usingWhen(rangeMono, rangeSend -> Flux.using( () -> new LLLocalGroupedKeyReactiveRocksIterator(db, alloc, cfh, prefixLength, rangeSend, - databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), "getRangeKeysGrouped"), + databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), getRangeKeysDebugName), reactiveRocksIterator -> reactiveRocksIterator.flux().subscribeOn(dbScheduler), LLLocalGroupedReactiveRocksIterator::release ).transform(LLUtils::handleDiscard), @@ -1306,7 +1369,8 @@ public class LLLocalDictionary implements LLDictionary { } @Override - public Flux> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono> rangeMono, int prefixLength) { + public Flux> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono> rangeMono, + int prefixLength) { return Flux.usingWhen(rangeMono, rangeSend -> Flux .using( @@ -1318,7 +1382,7 @@ public class LLLocalDictionary implements LLDictionary { databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), true, - "getRangeKeysGrouped" + getRangeKeysGroupedDebugName ), LLLocalKeyPrefixReactiveRocksIterator::flux, LLLocalKeyPrefixReactiveRocksIterator::release @@ -1525,7 +1589,8 @@ public class LLLocalDictionary implements LLDictionary { } else { if (USE_WRITE_BATCHES_IN_SET_RANGE) { return Mono.fromCallable(() -> { - throw new UnsupportedOperationException("Can't use write batches in setRange without window. Please fix params"); + throw new UnsupportedOperationException("Can't use write batches in setRange without window." + + " Please fix the parameters"); }); } return this @@ -1562,14 +1627,16 @@ public class LLLocalDictionary implements LLDictionary { readOpts.setFillCache(false); ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER, range.getMin()); + minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + IterateBound.LOWER, range.getMin()); } else { minBound = emptyReleasableSlice(); } try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER, range.getMax()); + maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + IterateBound.UPPER, range.getMax()); } else { maxBound = emptyReleasableSlice(); } @@ -1612,7 +1679,8 @@ public class LLLocalDictionary implements LLDictionary { readOpts.setFillCache(false); ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER, range.getMin()); + minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + IterateBound.LOWER, range.getMin()); } else { minBound = emptyReleasableSlice(); } @@ -1655,8 +1723,8 @@ public class LLLocalDictionary implements LLDictionary { } @Nullable - private static SafeCloseable rocksIterSeekTo(BufferAllocator alloc, boolean allowNettyDirect, RocksIterator rocksIterator, - Send bufferToReceive) { + private static SafeCloseable rocksIterSeekTo(BufferAllocator alloc, boolean allowNettyDirect, + RocksIterator rocksIterator, Send bufferToReceive) { try (var buffer = bufferToReceive.receive()) { if (allowNettyDirect) { var direct = LLUtils.convertToReadableDirect(alloc, buffer.send()); @@ -1673,8 +1741,8 @@ public class LLLocalDictionary implements LLDictionary { } } - private static ReleasableSlice setIterateBound(BufferAllocator alloc, boolean allowNettyDirect, ReadOptions readOpts, - IterateBound boundType, Send bufferToReceive) { + private static ReleasableSlice setIterateBound(BufferAllocator alloc, boolean allowNettyDirect, + ReadOptions readOpts, IterateBound boundType, Send bufferToReceive) { var buffer = bufferToReceive.receive(); try { requireNonNull(buffer); @@ -1785,7 +1853,7 @@ public class LLLocalDictionary implements LLDictionary { writeBatch.writeToDbAndClose(); - + //noinspection ConstantConditions if (shouldCompactLater) { // Compact range db.suggestCompactRange(cfh); @@ -1814,76 +1882,72 @@ public class LLLocalDictionary implements LLDictionary { @Override public Mono sizeRange(@Nullable LLSnapshot snapshot, Mono> rangeMono, boolean fast) { - return Mono.usingWhen(rangeMono, - rangeSend -> { - return runOnDb(() -> { - try (var range = rangeSend.receive()) { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called sizeRange in a nonblocking thread"); - } - if (range.isAll()) { - return fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot); + return Mono.usingWhen(rangeMono, rangeSend -> runOnDb(() -> { + try (var range = rangeSend.receive()) { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called sizeRange in a nonblocking thread"); + } + if (range.isAll()) { + return fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot); + } else { + try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { + readOpts.setFillCache(false); + readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); + ReleasableSlice minBound; + if (range.hasMin()) { + minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + IterateBound.LOWER, range.getMin()); + } else { + minBound = emptyReleasableSlice(); + } + try { + ReleasableSlice maxBound; + if (range.hasMax()) { + maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + IterateBound.UPPER, range.getMax()); } else { - try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { - readOpts.setFillCache(false); - readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER, - range.getMin()); + maxBound = emptyReleasableSlice(); + } + try { + if (fast) { + readOpts.setIgnoreRangeDeletions(true); + + } + try (var rocksIterator = db.newIterator(cfh, readOpts)) { + SafeCloseable seekTo; + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), + rocksIterator, range.getMin()); } else { - minBound = emptyReleasableSlice(); + seekTo = null; + rocksIterator.seekToFirst(); } try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER, - range.getMax()); - } else { - maxBound = emptyReleasableSlice(); - } - try { - if (fast) { - readOpts.setIgnoreRangeDeletions(true); - - } - try (var rocksIterator = db.newIterator(cfh, readOpts)) { - SafeCloseable seekTo; - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator, - range.getMin()); - } else { - seekTo = null; - rocksIterator.seekToFirst(); - } - try { - long i = 0; - rocksIterator.status(); - while (rocksIterator.isValid()) { - rocksIterator.next(); - rocksIterator.status(); - i++; - } - return i; - } finally { - if (seekTo != null) { - seekTo.close(); - } - } - } - } finally { - maxBound.close(); + long i = 0; + rocksIterator.status(); + while (rocksIterator.isValid()) { + rocksIterator.next(); + rocksIterator.status(); + i++; } + return i; } finally { - minBound.close(); + if (seekTo != null) { + seekTo.close(); + } } } + } finally { + maxBound.close(); } + } finally { + minBound.close(); } - }).onErrorMap(cause -> new IOException("Failed to get size of range", cause)); - }, - rangeSend -> Mono.fromRunnable(rangeSend::close) - ); + } + } + } + }).onErrorMap(cause -> new IOException("Failed to get size of range", cause)), + rangeSend -> Mono.fromRunnable(rangeSend::close)); } @Override @@ -1897,23 +1961,24 @@ public class LLLocalDictionary implements LLDictionary { try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER, - range.getMin()); + minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + IterateBound.LOWER, range.getMin()); } else { minBound = emptyReleasableSlice(); } try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER, - range.getMax()); + maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + IterateBound.UPPER, range.getMax()); } else { maxBound = emptyReleasableSlice(); } try (var rocksIterator = db.newIterator(cfh, readOpts)) { SafeCloseable seekTo; if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator, range.getMin()); + seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), + rocksIterator, range.getMin()); } else { seekTo = null; rocksIterator.seekToFirst(); @@ -1958,23 +2023,24 @@ public class LLLocalDictionary implements LLDictionary { try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER, - range.getMin()); + minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + IterateBound.LOWER, range.getMin()); } else { minBound = emptyReleasableSlice(); } try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER, - range.getMax()); + maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + IterateBound.UPPER, range.getMax()); } else { maxBound = emptyReleasableSlice(); } try (var rocksIterator = db.newIterator(cfh, readOpts)) { SafeCloseable seekTo; if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator, range.getMin()); + seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), + rocksIterator, range.getMin()); } else { seekTo = null; rocksIterator.seekToFirst(); @@ -2126,23 +2192,24 @@ public class LLLocalDictionary implements LLDictionary { try (var readOpts = new ReadOptions(getReadOptions(null))) { ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER, - range.getMin()); + minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + IterateBound.LOWER, range.getMin()); } else { minBound = emptyReleasableSlice(); } try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER, - range.getMax()); + maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + IterateBound.UPPER, range.getMax()); } else { maxBound = emptyReleasableSlice(); } try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) { SafeCloseable seekTo; if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator, range.getMin()); + seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), + rocksIterator, range.getMin()); } else { seekTo = null; rocksIterator.seekToFirst(); @@ -2177,7 +2244,8 @@ public class LLLocalDictionary implements LLDictionary { } @NotNull - public static Tuple4 getRocksIterator(BufferAllocator alloc, + public static Tuple4 getRocksIterator( + BufferAllocator alloc, boolean allowNettyDirect, ReadOptions readOptions, Send rangeToReceive, @@ -2202,8 +2270,8 @@ public class LLLocalDictionary implements LLDictionary { var rocksIterator = db.newIterator(cfh, readOptions); SafeCloseable seekTo; if (!PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = Objects.requireNonNullElseGet(rocksIterSeekTo(alloc, allowNettyDirect, rocksIterator, range.getMin()), - () -> ((SafeCloseable) () -> {}) + seekTo = Objects.requireNonNullElseGet(rocksIterSeekTo(alloc, allowNettyDirect, + rocksIterator, range.getMin()), () -> ((SafeCloseable) () -> {}) ); } else { seekTo = () -> {}; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java index d82e570..e60bd84 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java @@ -89,6 +89,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator { } if (firstGroupKey != null) { + assert firstGroupKey.isAccessible(); var groupKeyPrefix = firstGroupKey.copy(firstGroupKey.readerOffset(), prefixLength); assert groupKeyPrefix.isAccessible(); @@ -126,7 +127,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator { tuple.getT3().close(); tuple.getT4().close(); }), - Send::close + resource -> resource.close() ); } diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/BufferDataInput.java b/src/main/java/it/cavallium/dbengine/database/serialization/BufferDataInput.java index 7c65b6d..03d8b6e 100644 --- a/src/main/java/it/cavallium/dbengine/database/serialization/BufferDataInput.java +++ b/src/main/java/it/cavallium/dbengine/database/serialization/BufferDataInput.java @@ -1,144 +1,55 @@ package it.cavallium.dbengine.database.serialization; -import io.net5.buffer.api.Buffer; -import io.net5.buffer.api.Send; -import it.cavallium.dbengine.database.SafeCloseable; import java.io.DataInput; -import java.nio.charset.StandardCharsets; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -public class BufferDataInput implements DataInput, SafeCloseable { - - @Nullable - private final Buffer buf; - private final int initialReaderOffset; - - public BufferDataInput(@Nullable Send bufferSend) { - this.buf = bufferSend == null ? null : bufferSend.receive().makeReadOnly(); - this.initialReaderOffset = buf == null ? 0 : buf.readerOffset(); - } +public interface BufferDataInput extends DataInput { @Override - public void readFully(byte @NotNull [] b) { - this.readFully(b, 0, b.length); - } + void readFully(byte @NotNull [] b); @Override - public void readFully(byte @NotNull [] b, int off, int len) { - if (buf == null) { - if (len != 0) { - throw new IndexOutOfBoundsException(); - } - } else { - buf.copyInto(buf.readerOffset(), b, off, len); - buf.readerOffset(buf.readerOffset() + len); - } - } + void readFully(byte @NotNull [] b, int off, int len); @Override - public int skipBytes(int n) { - if (buf == null) { - if (n != 0) { - throw new IndexOutOfBoundsException(); - } - return 0; - } else { - n = Math.min(n, buf.readerOffset() - buf.writerOffset()); - buf.readerOffset(buf.readerOffset() + n); - return n; - } - } + int skipBytes(int n); @Override - public boolean readBoolean() { - if (buf == null) throw new IndexOutOfBoundsException(); - return buf.readUnsignedByte() != 0; - } + boolean readBoolean(); @Override - public byte readByte() { - if (buf == null) throw new IndexOutOfBoundsException(); - return buf.readByte(); - } + byte readByte(); @Override - public int readUnsignedByte() { - if (buf == null) throw new IndexOutOfBoundsException(); - return buf.readUnsignedByte(); - } + int readUnsignedByte(); @Override - public short readShort() { - if (buf == null) throw new IndexOutOfBoundsException(); - return buf.readShort(); - } + short readShort(); @Override - public int readUnsignedShort() { - if (buf == null) throw new IndexOutOfBoundsException(); - return buf.readUnsignedShort(); - } + int readUnsignedShort(); @Override - public char readChar() { - if (buf == null) throw new IndexOutOfBoundsException(); - return buf.readChar(); - } + char readChar(); @Override - public int readInt() { - if (buf == null) throw new IndexOutOfBoundsException(); - return buf.readInt(); - } + int readInt(); @Override - public long readLong() { - if (buf == null) throw new IndexOutOfBoundsException(); - return buf.readLong(); - } + long readLong(); @Override - public float readFloat() { - if (buf == null) throw new IndexOutOfBoundsException(); - return buf.readFloat(); - } + float readFloat(); @Override - public double readDouble() { - if (buf == null) throw new IndexOutOfBoundsException(); - return buf.readDouble(); - } + double readDouble(); @Override - public String readLine() { - if (buf == null) throw new IndexOutOfBoundsException(); - throw new UnsupportedOperationException(); - } + String readLine(); @NotNull @Override - public String readUTF() { - if (buf == null) throw new IndexOutOfBoundsException(); - var len = buf.readUnsignedShort(); - byte[] bytes = new byte[len]; - buf.copyInto(buf.readerOffset(), bytes, 0, len); - buf.readerOffset(buf.readerOffset() + len); - return new String(bytes, StandardCharsets.UTF_8); - } + String readUTF(); - @Override - public void close() { - if (buf != null) { - buf.close(); - } - } - - public int getReadBytesCount() { - if (buf == null) { - return 0; - } else { - return buf.readerOffset() - initialReaderOffset; - } - } + int getReadBytesCount(); } diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/BufferDataInputOwned.java b/src/main/java/it/cavallium/dbengine/database/serialization/BufferDataInputOwned.java new file mode 100644 index 0000000..eebd09d --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/serialization/BufferDataInputOwned.java @@ -0,0 +1,145 @@ +package it.cavallium.dbengine.database.serialization; + +import io.net5.buffer.api.Buffer; +import io.net5.buffer.api.Send; +import it.cavallium.dbengine.database.SafeCloseable; +import java.io.DataInput; +import java.nio.charset.StandardCharsets; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public class BufferDataInputOwned implements SafeCloseable, BufferDataInput { + + @Nullable + private final Buffer buf; + private final int initialReaderOffset; + + public BufferDataInputOwned(@Nullable Send bufferSend) { + this.buf = bufferSend == null ? null : bufferSend.receive().makeReadOnly(); + this.initialReaderOffset = buf == null ? 0 : buf.readerOffset(); + } + + @Override + public void readFully(byte @NotNull [] b) { + this.readFully(b, 0, b.length); + } + + @Override + public void readFully(byte @NotNull [] b, int off, int len) { + if (buf == null) { + if (len != 0) { + throw new IndexOutOfBoundsException(); + } + } else { + buf.copyInto(buf.readerOffset(), b, off, len); + buf.readerOffset(buf.readerOffset() + len); + } + } + + @Override + public int skipBytes(int n) { + if (buf == null) { + if (n != 0) { + throw new IndexOutOfBoundsException(); + } + return 0; + } else { + n = Math.min(n, buf.readerOffset() - buf.writerOffset()); + buf.readerOffset(buf.readerOffset() + n); + return n; + } + } + + @Override + public boolean readBoolean() { + if (buf == null) throw new IndexOutOfBoundsException(); + return buf.readUnsignedByte() != 0; + } + + @Override + public byte readByte() { + if (buf == null) throw new IndexOutOfBoundsException(); + return buf.readByte(); + } + + @Override + public int readUnsignedByte() { + if (buf == null) throw new IndexOutOfBoundsException(); + return buf.readUnsignedByte(); + } + + @Override + public short readShort() { + if (buf == null) throw new IndexOutOfBoundsException(); + return buf.readShort(); + } + + @Override + public int readUnsignedShort() { + if (buf == null) throw new IndexOutOfBoundsException(); + return buf.readUnsignedShort(); + } + + @Override + public char readChar() { + if (buf == null) throw new IndexOutOfBoundsException(); + return buf.readChar(); + } + + @Override + public int readInt() { + if (buf == null) throw new IndexOutOfBoundsException(); + return buf.readInt(); + } + + @Override + public long readLong() { + if (buf == null) throw new IndexOutOfBoundsException(); + return buf.readLong(); + } + + @Override + public float readFloat() { + if (buf == null) throw new IndexOutOfBoundsException(); + return buf.readFloat(); + } + + @Override + public double readDouble() { + if (buf == null) throw new IndexOutOfBoundsException(); + return buf.readDouble(); + } + + @Override + public String readLine() { + if (buf == null) throw new IndexOutOfBoundsException(); + throw new UnsupportedOperationException(); + } + + @NotNull + @Override + public String readUTF() { + if (buf == null) throw new IndexOutOfBoundsException(); + var len = buf.readUnsignedShort(); + byte[] bytes = new byte[len]; + buf.copyInto(buf.readerOffset(), bytes, 0, len); + buf.readerOffset(buf.readerOffset() + len); + return new String(bytes, StandardCharsets.UTF_8); + } + + @Override + public void close() { + if (buf != null) { + buf.close(); + } + } + + @Override + public int getReadBytesCount() { + if (buf == null) { + return 0; + } else { + return buf.readerOffset() - initialReaderOffset; + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/BufferDataInputShared.java b/src/main/java/it/cavallium/dbengine/database/serialization/BufferDataInputShared.java new file mode 100644 index 0000000..33279ee --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/serialization/BufferDataInputShared.java @@ -0,0 +1,137 @@ +package it.cavallium.dbengine.database.serialization; + +import io.net5.buffer.api.Buffer; +import io.net5.buffer.api.Send; +import it.cavallium.dbengine.database.SafeCloseable; +import java.io.DataInput; +import java.nio.charset.StandardCharsets; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public class BufferDataInputShared implements BufferDataInput { + + @Nullable + private final Buffer buf; + private final int initialReaderOffset; + + public BufferDataInputShared(@Nullable Buffer buffer) { + this.buf = buffer; + this.initialReaderOffset = buf == null ? 0 : buf.readerOffset(); + } + + @Override + public void readFully(byte @NotNull [] b) { + this.readFully(b, 0, b.length); + } + + @Override + public void readFully(byte @NotNull [] b, int off, int len) { + if (buf == null) { + if (len != 0) { + throw new IndexOutOfBoundsException(); + } + } else { + buf.copyInto(buf.readerOffset(), b, off, len); + buf.readerOffset(buf.readerOffset() + len); + } + } + + @Override + public int skipBytes(int n) { + if (buf == null) { + if (n != 0) { + throw new IndexOutOfBoundsException(); + } + return 0; + } else { + n = Math.min(n, buf.readerOffset() - buf.writerOffset()); + buf.readerOffset(buf.readerOffset() + n); + return n; + } + } + + @Override + public boolean readBoolean() { + if (buf == null) throw new IndexOutOfBoundsException(); + return buf.readUnsignedByte() != 0; + } + + @Override + public byte readByte() { + if (buf == null) throw new IndexOutOfBoundsException(); + return buf.readByte(); + } + + @Override + public int readUnsignedByte() { + if (buf == null) throw new IndexOutOfBoundsException(); + return buf.readUnsignedByte(); + } + + @Override + public short readShort() { + if (buf == null) throw new IndexOutOfBoundsException(); + return buf.readShort(); + } + + @Override + public int readUnsignedShort() { + if (buf == null) throw new IndexOutOfBoundsException(); + return buf.readUnsignedShort(); + } + + @Override + public char readChar() { + if (buf == null) throw new IndexOutOfBoundsException(); + return buf.readChar(); + } + + @Override + public int readInt() { + if (buf == null) throw new IndexOutOfBoundsException(); + return buf.readInt(); + } + + @Override + public long readLong() { + if (buf == null) throw new IndexOutOfBoundsException(); + return buf.readLong(); + } + + @Override + public float readFloat() { + if (buf == null) throw new IndexOutOfBoundsException(); + return buf.readFloat(); + } + + @Override + public double readDouble() { + if (buf == null) throw new IndexOutOfBoundsException(); + return buf.readDouble(); + } + + @Override + public String readLine() { + if (buf == null) throw new IndexOutOfBoundsException(); + throw new UnsupportedOperationException(); + } + + @NotNull + @Override + public String readUTF() { + if (buf == null) throw new IndexOutOfBoundsException(); + var len = buf.readUnsignedShort(); + byte[] bytes = new byte[len]; + buf.copyInto(buf.readerOffset(), bytes, 0, len); + buf.readerOffset(buf.readerOffset() + len); + return new String(bytes, StandardCharsets.UTF_8); + } + + public int getReadBytesCount() { + if (buf == null) { + return 0; + } else { + return buf.readerOffset() - initialReaderOffset; + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/Codec.java b/src/main/java/it/cavallium/dbengine/database/serialization/Codec.java index 6e8e474..0b51c48 100644 --- a/src/main/java/it/cavallium/dbengine/database/serialization/Codec.java +++ b/src/main/java/it/cavallium/dbengine/database/serialization/Codec.java @@ -1,7 +1,5 @@ package it.cavallium.dbengine.database.serialization; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import org.jetbrains.annotations.NotNull; diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/CodecSerializer.java b/src/main/java/it/cavallium/dbengine/database/serialization/CodecSerializer.java index a6b86bb..351749e 100644 --- a/src/main/java/it/cavallium/dbengine/database/serialization/CodecSerializer.java +++ b/src/main/java/it/cavallium/dbengine/database/serialization/CodecSerializer.java @@ -11,23 +11,23 @@ import org.warp.commonutils.error.IndexOutOfBoundsException; public class CodecSerializer implements Serializer { - private final BufferAllocator allocator; private final Codecs deserializationCodecs; private final Codec serializationCodec; private final int serializationCodecId; private final boolean microCodecs; + private final int serializedSizeHint; /** * * @param microCodecs if true, allow only codecs with a value from 0 to 255 to save disk space + * @param serializedSizeHint suggested default buffer size, -1 if unknown */ public CodecSerializer( - BufferAllocator allocator, Codecs deserializationCodecs, Codec serializationCodec, int serializationCodecId, - boolean microCodecs) { - this.allocator = allocator; + boolean microCodecs, + int serializedSizeHint) { this.deserializationCodecs = deserializationCodecs; this.serializationCodec = serializationCodec; this.serializationCodecId = serializationCodecId; @@ -35,11 +35,22 @@ public class CodecSerializer implements Serializer { if (microCodecs && (serializationCodecId > 255 || serializationCodecId < 0)) { throw new IndexOutOfBoundsException(serializationCodecId, 0, 255); } + if (serializedSizeHint != -1) { + this.serializedSizeHint = (microCodecs ? Byte.BYTES : Integer.BYTES) + serializedSizeHint; + } else { + this.serializedSizeHint = -1; + } } @Override - public @NotNull DeserializationResult deserialize(@Nullable Send serializedToReceive) { - try (var is = new BufferDataInput(serializedToReceive)) { + public int getSerializedSizeHint() { + return serializedSizeHint; + } + + @Override + public @NotNull A deserialize(@NotNull Buffer serializedBuf) throws SerializationException { + try { + var is = new BufferDataInputShared(serializedBuf); int codecId; if (microCodecs) { codecId = is.readUnsignedByte(); @@ -47,7 +58,7 @@ public class CodecSerializer implements Serializer { codecId = is.readInt(); } var serializer = deserializationCodecs.getCodec(codecId); - return new DeserializationResult<>(serializer.deserialize(is), is.getReadBytesCount()); + return serializer.deserialize(is); } catch (IOException ex) { // This shouldn't happen throw new IOError(ex); @@ -55,16 +66,15 @@ public class CodecSerializer implements Serializer { } @Override - public @NotNull Send serialize(@NotNull A deserialized) { - try (Buffer buf = allocator.allocate(64)) { - var os = new BufferDataOutput(buf); + public void serialize(@NotNull A deserialized, Buffer output) throws SerializationException { + try { + var os = new BufferDataOutput(output); if (microCodecs) { os.writeByte(serializationCodecId); } else { os.writeInt(serializationCodecId); } serializationCodec.serialize(os, deserialized); - return buf.send(); } catch (IOException ex) { // This shouldn't happen throw new IOError(ex); diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java b/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java index 4ffb437..b8da051 100644 --- a/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java +++ b/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java @@ -1,8 +1,10 @@ package it.cavallium.dbengine.database.serialization; +import io.net5.buffer.ByteBufUtil; import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.Send; +import io.net5.util.internal.StringUtil; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.netty.NullableBuffer; import java.nio.charset.StandardCharsets; @@ -12,56 +14,82 @@ import org.jetbrains.annotations.Nullable; public interface Serializer { - record DeserializationResult(T deserializedData, int bytesRead) {} + /** + * + * @param serialized the serialized data should be split! + */ + @NotNull A deserialize(@NotNull Buffer serialized) throws SerializationException; - @NotNull DeserializationResult deserialize(@NotNull Send serialized) throws SerializationException; + /** + * @param output its writable size will be at least equal to the size hint + */ + void serialize(@NotNull A deserialized, Buffer output) throws SerializationException; - @NotNull Send serialize(@NotNull A deserialized) throws SerializationException; + /** + * @return suggested default buffer size, -1 if unknown + */ + int getSerializedSizeHint(); - Serializer> NOOP_SERIALIZER = new Serializer<>() { + Serializer NOOP_SERIALIZER = new Serializer<>() { @Override - public @NotNull DeserializationResult> deserialize(@NotNull Send serialized) { - try (var serializedBuf = serialized.receive()) { - var readableBytes = serializedBuf.readableBytes(); - return new DeserializationResult<>(serializedBuf.send(), readableBytes); - } + public @NotNull Buffer deserialize(@NotNull Buffer serialized) { + return serialized.split(); } @Override - public @NotNull Send serialize(@NotNull Send deserialized) { - return deserialized; + public void serialize(@NotNull Buffer deserialized, @NotNull Buffer deserializedToReceive) { + deserializedToReceive.ensureWritable(deserialized.readableBytes()); + deserializedToReceive.writeBytes(deserialized); + } + + @Override + public int getSerializedSizeHint() { + return -1; } }; - static Serializer> noop() { - return NOOP_SERIALIZER; - } + Serializer> NOOP_SEND_SERIALIZER = new Serializer<>() { + @Override + public @NotNull Send deserialize(@NotNull Buffer serialized) { + return serialized.split().send(); + } - static Serializer utf8(BufferAllocator allocator) { - return new Serializer<>() { - @Override - public @NotNull DeserializationResult deserialize(@Nullable Send serializedToReceive) { - Objects.requireNonNull(serializedToReceive); - try (Buffer serialized = serializedToReceive.receive()) { - assert serialized.isAccessible(); - int length = serialized.readInt(); - var readerOffset = serialized.readerOffset(); - return new DeserializationResult<>(LLUtils.deserializeString(serialized.send(), - readerOffset, length, StandardCharsets.UTF_8), Integer.BYTES + length); - } + @Override + public void serialize(@NotNull Send deserialized, @NotNull Buffer deserializedToReceive) { + try (var received = deserialized.receive()) { + deserializedToReceive.ensureWritable(received.readableBytes()); + deserializedToReceive.writeBytes(received); } + } - @Override - public @NotNull Send serialize(@NotNull String deserialized) { - var bytes = deserialized.getBytes(StandardCharsets.UTF_8); - try (Buffer buf = allocator.allocate(Integer.BYTES + bytes.length)) { - assert buf.isAccessible(); - buf.writeInt(bytes.length); - buf.writeBytes(bytes); - assert buf.isAccessible(); - return buf.send(); - } + @Override + public int getSerializedSizeHint() { + return -1; + } + }; + + + Serializer UTF8_SERIALIZER = new Serializer<>() { + @Override + public @NotNull String deserialize(@NotNull Buffer serialized) { + assert serialized.isAccessible(); + int length = serialized.readInt(); + try (var strBuf = serialized.readSplit(length)) { + return LLUtils.deserializeString(strBuf, strBuf.readerOffset(), length, StandardCharsets.UTF_8); } - }; - } + } + + @Override + public void serialize(@NotNull String deserialized, Buffer output) { + var bytes = deserialized.getBytes(StandardCharsets.UTF_8); + output.ensureWritable(Integer.BYTES + bytes.length); + output.writeInt(bytes.length); + output.writeBytes(bytes); + } + + @Override + public int getSerializedSizeHint() { + return -1; + } + }; } diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java b/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java index 78fe656..002ac28 100644 --- a/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java +++ b/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java @@ -14,31 +14,33 @@ public interface SerializerFixedBinaryLength extends Serializer { int getSerializedBinaryLength(); - static SerializerFixedBinaryLength> noop(int length) { + @Override + default int getSerializedSizeHint() { + return getSerializedBinaryLength(); + } + + static SerializerFixedBinaryLength noop(int length) { return new SerializerFixedBinaryLength<>() { @Override - public @NotNull DeserializationResult> deserialize(@NotNull Send serialized) { + public @NotNull Buffer deserialize(@NotNull Buffer serialized) { Objects.requireNonNull(serialized); - try (var buf = serialized.receive()) { - if (buf.readableBytes() != getSerializedBinaryLength()) { - throw new IllegalArgumentException( - "Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with " - + buf.readableBytes() + " bytes instead"); - } - var readableBytes = buf.readableBytes(); - return new DeserializationResult<>(buf.send(), readableBytes); + if (serialized.readableBytes() < getSerializedBinaryLength()) { + throw new IllegalArgumentException( + "Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with " + + serialized.readableBytes() + " bytes instead"); } + return serialized.readSplit(getSerializedBinaryLength()); } @Override - public @NotNull Send serialize(@NotNull Send deserialized) { - try (Buffer buf = deserialized.receive()) { - if (buf.readableBytes() != getSerializedBinaryLength()) { + public void serialize(@NotNull Buffer deserialized, Buffer output) { + try (deserialized) { + if (deserialized.readableBytes() != getSerializedBinaryLength()) { throw new IllegalArgumentException( "Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to serialize an element with " - + buf.readableBytes() + " bytes instead"); + + deserialized.readableBytes() + " bytes instead"); } - return buf.send(); + output.writeBytes(deserialized); } } @@ -49,39 +51,32 @@ public interface SerializerFixedBinaryLength extends Serializer { }; } - static SerializerFixedBinaryLength utf8(BufferAllocator allocator, int length) { + static SerializerFixedBinaryLength utf8(int length) { return new SerializerFixedBinaryLength<>() { + @Override - public @NotNull DeserializationResult deserialize(@NotNull Send serializedToReceive) - throws SerializationException { - try (var serialized = serializedToReceive.receive()) { - if (serialized.readableBytes() != getSerializedBinaryLength()) { - throw new SerializationException( - "Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with " - + serialized.readableBytes() + " bytes instead"); - } - var readerOffset = serialized.readerOffset(); - return new DeserializationResult<>(LLUtils.deserializeString(serialized.send(), - readerOffset, length, StandardCharsets.UTF_8), length); + public @NotNull String deserialize(@NotNull Buffer serialized) throws SerializationException { + if (serialized.readableBytes() < getSerializedBinaryLength()) { + throw new SerializationException( + "Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with " + + serialized.readableBytes() + " bytes instead"); } + var readerOffset = serialized.readerOffset(); + return LLUtils.deserializeString(serialized.send(), readerOffset, length, StandardCharsets.UTF_8); } @Override - public @NotNull Send serialize(@NotNull String deserialized) throws SerializationException { - // UTF-8 uses max. 3 bytes per char, so calculate the worst case. - try (Buffer buf = allocator.allocate(LLUtils.utf8MaxBytes(deserialized))) { - assert buf.isAccessible(); - var bytes = deserialized.getBytes(StandardCharsets.UTF_8); - buf.ensureWritable(bytes.length); - buf.writeBytes(bytes); - if (buf.readableBytes() != getSerializedBinaryLength()) { - throw new SerializationException("Fixed serializer with " + getSerializedBinaryLength() - + " bytes has tried to serialize an element with " - + buf.readableBytes() + " bytes instead"); - } - assert buf.isAccessible(); - return buf.send(); + public void serialize(@NotNull String deserialized, Buffer output) throws SerializationException { + assert output.isAccessible(); + var bytes = deserialized.getBytes(StandardCharsets.UTF_8); + output.ensureWritable(bytes.length); + output.writeBytes(bytes); + if (output.readableBytes() < getSerializedBinaryLength()) { + throw new SerializationException("Fixed serializer with " + getSerializedBinaryLength() + + " bytes has tried to serialize an element with " + + output.readableBytes() + " bytes instead"); } + assert output.isAccessible(); } @Override @@ -93,24 +88,21 @@ public interface SerializerFixedBinaryLength extends Serializer { static SerializerFixedBinaryLength intSerializer(BufferAllocator allocator) { return new SerializerFixedBinaryLength<>() { + @Override - public @NotNull DeserializationResult deserialize(@NotNull Send serializedToReceive) { - Objects.requireNonNull(serializedToReceive); - try (var serialized = serializedToReceive.receive()) { - if (serialized.readableBytes() != getSerializedBinaryLength()) { - throw new IllegalArgumentException( - "Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with " - + serialized.readableBytes() + " bytes instead"); - } - return new DeserializationResult<>(serialized.readInt(), Integer.BYTES); + public @NotNull Integer deserialize(@NotNull Buffer serialized) { + Objects.requireNonNull(serialized); + if (serialized.readableBytes() < getSerializedBinaryLength()) { + throw new IllegalArgumentException( + "Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with " + + serialized.readableBytes() + " bytes instead"); } + return serialized.readInt(); } @Override - public @NotNull Send serialize(@NotNull Integer deserialized) { - try (Buffer buf = allocator.allocate(Integer.BYTES)) { - return buf.writeInt(deserialized).send(); - } + public void serialize(@NotNull Integer deserialized, Buffer output) { + output.writeInt(deserialized); } @Override @@ -122,25 +114,21 @@ public interface SerializerFixedBinaryLength extends Serializer { static SerializerFixedBinaryLength longSerializer(BufferAllocator allocator) { return new SerializerFixedBinaryLength<>() { + @Override - public @NotNull DeserializationResult deserialize(@NotNull Send serializedToReceive) { - Objects.requireNonNull(serializedToReceive); - try (var serialized = serializedToReceive.receive()) { - if (serialized.readableBytes() != getSerializedBinaryLength()) { - throw new IllegalArgumentException( - "Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with " - + serialized.readableBytes() + " bytes instead"); - } - var readableBytes = serialized.readableBytes(); - return new DeserializationResult<>(serialized.readLong(), Long.BYTES); + public @NotNull Long deserialize(@NotNull Buffer serialized) { + Objects.requireNonNull(serialized); + if (serialized.readableBytes() < getSerializedBinaryLength()) { + throw new IllegalArgumentException( + "Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with " + + serialized.readableBytes() + " bytes instead"); } + return serialized.readLong(); } @Override - public @NotNull Send serialize(@NotNull Long deserialized) { - try (Buffer buf = allocator.allocate(Long.BYTES)) { - return buf.writeLong(deserialized).send(); - } + public void serialize(@NotNull Long deserialized, Buffer output) { + output.writeLong(deserialized); } @Override diff --git a/src/test/java/it/cavallium/dbengine/DbTestUtils.java b/src/test/java/it/cavallium/dbengine/DbTestUtils.java index 2d99dd2..840bc6d 100644 --- a/src/test/java/it/cavallium/dbengine/DbTestUtils.java +++ b/src/test/java/it/cavallium/dbengine/DbTestUtils.java @@ -24,6 +24,7 @@ import it.cavallium.dbengine.database.collections.DatabaseStageMap; import it.cavallium.dbengine.database.collections.SubStageGetterHashMap; import it.cavallium.dbengine.database.collections.SubStageGetterMap; import it.cavallium.dbengine.database.disk.MemorySegmentUtils; +import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import java.nio.file.Path; @@ -154,14 +155,14 @@ public class DbTestUtils { int keyBytes) { if (mapType == MapType.MAP) { return DatabaseMapDictionary.simple(dictionary, - SerializerFixedBinaryLength.utf8(dictionary.getAllocator(), keyBytes), - Serializer.utf8(dictionary.getAllocator()), + SerializerFixedBinaryLength.utf8(keyBytes), + Serializer.UTF8_SERIALIZER, null ); } else { return DatabaseMapDictionaryHashed.simple(dictionary, - Serializer.utf8(dictionary.getAllocator()), - Serializer.utf8(dictionary.getAllocator()), + Serializer.UTF8_SERIALIZER, + Serializer.UTF8_SERIALIZER, s -> (short) s.hashCode(), new SerializerFixedBinaryLength<>() { @Override @@ -170,21 +171,15 @@ public class DbTestUtils { } @Override - public @NotNull DeserializationResult deserialize(@Nullable Send serializedToReceive) { - Objects.requireNonNull(serializedToReceive); - try (var serialized = serializedToReceive.receive()) { - var val = serialized.readShort(); - return new DeserializationResult<>(val, Short.BYTES); - } + public @NotNull Short deserialize(@NotNull Buffer serialized) throws SerializationException { + Objects.requireNonNull(serialized); + var val = serialized.readShort(); + return val; } @Override - public @NotNull Send serialize(@NotNull Short deserialized) { - try (var out = dictionary.getAllocator().allocate(Short.BYTES)) { - out.writeShort(deserialized); - out.writerOffset(Short.BYTES); - return out.send(); - } + public void serialize(@NotNull Short deserialized, Buffer output) throws SerializationException { + output.writeShort(deserialized); } }, null @@ -198,10 +193,10 @@ public class DbTestUtils { int key1Bytes, int key2Bytes) { return DatabaseMapDictionaryDeep.deepTail(dictionary, - SerializerFixedBinaryLength.utf8(dictionary.getAllocator(), key1Bytes), + SerializerFixedBinaryLength.utf8(key1Bytes), key2Bytes, - new SubStageGetterMap<>(SerializerFixedBinaryLength.utf8(dictionary.getAllocator(), key2Bytes), - Serializer.utf8(dictionary.getAllocator()) + new SubStageGetterMap<>(SerializerFixedBinaryLength.utf8(key2Bytes), + Serializer.UTF8_SERIALIZER ), null ); @@ -212,10 +207,10 @@ public class DbTestUtils { LLDictionary dictionary, int key1Bytes) { return DatabaseMapDictionaryDeep.deepTail(dictionary, - SerializerFixedBinaryLength.utf8(dictionary.getAllocator(), key1Bytes), + SerializerFixedBinaryLength.utf8(key1Bytes), Integer.BYTES, - new SubStageGetterHashMap<>(Serializer.utf8(dictionary.getAllocator()), - Serializer.utf8(dictionary.getAllocator()), + new SubStageGetterHashMap<>(Serializer.UTF8_SERIALIZER, + Serializer.UTF8_SERIALIZER, String::hashCode, SerializerFixedBinaryLength.intSerializer(dictionary.getAllocator()) ), @@ -226,8 +221,8 @@ public class DbTestUtils { public static DatabaseMapDictionaryHashed tempDatabaseMapDictionaryHashMap( LLDictionary dictionary) { return DatabaseMapDictionaryHashed.simple(dictionary, - Serializer.utf8(dictionary.getAllocator()), - Serializer.utf8(dictionary.getAllocator()), + Serializer.UTF8_SERIALIZER, + Serializer.UTF8_SERIALIZER, String::hashCode, SerializerFixedBinaryLength.intSerializer(dictionary.getAllocator()), null diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java index ae4d638..b944555 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java @@ -224,7 +224,7 @@ public abstract class TestDictionaryMap { .flatMapMany(map -> Flux .concat( map.updateValue(key, old -> { - assert old == null; + Assertions.assertNull(old); return "error?"; }), map.updateValue(key, false, old -> { @@ -624,8 +624,8 @@ public abstract class TestDictionaryMap { .concat( map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), map.get(null) - .map(Map::entrySet) - .flatMapIterable(list -> list) + .map(Map::entrySet) + .flatMapIterable(list -> list) ) .doFinally(s -> map.close()) ) @@ -707,27 +707,37 @@ public abstract class TestDictionaryMap { @ParameterizedTest @MethodSource("provideArgumentsPutMulti") public void testPutMultiClear(MapType mapType, UpdateMode updateMode, Map entries, boolean shouldFail) { - Step stpVer = StepVerifier - .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) - .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMapMany(map -> Flux - .concat( - map.isEmpty(null), - map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), - map.isEmpty(null), - map.clear().then(Mono.empty()), - map.isEmpty(null) - ) - .doFinally(s -> map.close()) - ) - .flatMap(val -> shouldFail ? Mono.empty() : Mono.just(val)) - .transform(LLUtils::handleDiscard) - )); - if (shouldFail) { - this.checkLeaks = false; - stpVer.verifyError(); - } else { - stpVer.expectNext(true, entries.isEmpty(), true).verifyComplete(); + List result; + try { + result = SyncUtils.run(DbTestUtils.tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) + .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) + .flatMapMany(map -> Flux + .concat( + map.isEmpty(null), + map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), + map.isEmpty(null), + map.clear().then(Mono.empty()), + map.isEmpty(null) + ) + .doFinally(s -> map.close()) + ) + .flatMap(val -> shouldFail ? Mono.empty() : Mono.just(val)) + .transform(LLUtils::handleDiscard) + .collectList() + ).singleOrEmpty()); + } catch (Exception ex) { + if (shouldFail) { + this.checkLeaks = false; + } else { + throw ex; + } + return; } + + Assertions.assertEquals(true, result.get(0)); + + Assertions.assertEquals(entries.isEmpty(), result.get(1)); + + Assertions.assertEquals(true, result.get(2)); } }