From 2a2457051296270b1414928a9ece0411f16caff4 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sun, 22 Aug 2021 21:23:22 +0200 Subject: [PATCH] Add checked serialization exception --- .../dbengine/client/MappedSerializer.java | 5 +- .../client/MappedSerializerFixedLength.java | 5 +- .../dbengine/database/LLDictionary.java | 12 +- .../cavallium/dbengine/database/LLUtils.java | 148 +++++++++++------ .../collections/DatabaseMapDictionary.java | 156 ++++++++++++------ .../DatabaseMapDictionaryDeep.java | 22 ++- .../database/collections/DatabaseSingle.java | 23 ++- .../collections/DatabaseSingleBucket.java | 5 +- .../collections/DatabaseSingleMapped.java | 42 +++-- .../database/collections/DatabaseStage.java | 9 +- .../collections/DatabaseStageMap.java | 32 ++-- .../collections/ValueWithHashSerializer.java | 5 +- .../collections/ValuesSetSerializer.java | 9 +- .../database/disk/LLLocalDictionary.java | 8 +- .../database/memory/LLMemoryDictionary.java | 14 +- .../BiSerializationFunction.java | 7 + .../serialization/SerializationException.java | 24 +++ .../serialization/SerializationFunction.java | 9 + .../database/serialization/Serializer.java | 4 +- .../SerializerFixedBinaryLength.java | 5 +- 20 files changed, 375 insertions(+), 169 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/database/serialization/BiSerializationFunction.java create mode 100644 src/main/java/it/cavallium/dbengine/database/serialization/SerializationException.java create mode 100644 src/main/java/it/cavallium/dbengine/database/serialization/SerializationFunction.java diff --git a/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java b/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java index c84b1a4..de2ff04 100644 --- a/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java +++ b/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine.client; import io.netty.buffer.ByteBuf; +import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.Serializer; import org.jetbrains.annotations.NotNull; @@ -16,7 +17,7 @@ public class MappedSerializer implements Serializer { } @Override - public @NotNull B deserialize(@NotNull ByteBuf serialized) { + public @NotNull B deserialize(@NotNull ByteBuf serialized) throws SerializationException { try { return keyMapper.map(serializer.deserialize(serialized.retain())); } finally { @@ -25,7 +26,7 @@ public class MappedSerializer implements Serializer { } @Override - public @NotNull ByteBuf serialize(@NotNull B deserialized) { + public @NotNull ByteBuf serialize(@NotNull B deserialized) throws SerializationException { return serializer.serialize(keyMapper.unmap(deserialized)); } } diff --git a/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java b/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java index 2c9f9f4..f2d2508 100644 --- a/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java +++ b/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine.client; import io.netty.buffer.ByteBuf; +import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import org.jetbrains.annotations.NotNull; @@ -16,7 +17,7 @@ public class MappedSerializerFixedLength implements SerializerFixedBinaryL } @Override - public @NotNull B deserialize(@NotNull ByteBuf serialized) { + public @NotNull B deserialize(@NotNull ByteBuf serialized) throws SerializationException { try { return keyMapper.map(fixedLengthSerializer.deserialize(serialized.retain())); } finally { @@ -25,7 +26,7 @@ public class MappedSerializerFixedLength implements SerializerFixedBinaryL } @Override - public @NotNull ByteBuf serialize(@NotNull B deserialized) { + public @NotNull ByteBuf serialize(@NotNull B deserialized) throws SerializationException { return fixedLengthSerializer.serialize(keyMapper.unmap(deserialized)); } diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java index 5a15d8c..1caf698 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java @@ -3,6 +3,8 @@ package it.cavallium.dbengine.database; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import it.cavallium.dbengine.client.BadBlock; +import it.cavallium.dbengine.database.serialization.BiSerializationFunction; +import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.util.List; import java.util.Map.Entry; import java.util.Optional; @@ -34,7 +36,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { Mono getUpdateMode(); default Mono update(Mono key, - Function<@Nullable ByteBuf, @Nullable ByteBuf> updater, + SerializationFunction<@Nullable ByteBuf, @Nullable ByteBuf> updater, UpdateReturnMode updateReturnMode, boolean existsAlmostCertainly) { return this @@ -43,17 +45,17 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { } default Mono update(Mono key, - Function<@Nullable ByteBuf, @Nullable ByteBuf> updater, + SerializationFunction<@Nullable ByteBuf, @Nullable ByteBuf> updater, UpdateReturnMode returnMode) { return update(key, updater, returnMode, false); } Mono> updateAndGetDelta(Mono key, - Function<@Nullable ByteBuf, @Nullable ByteBuf> updater, + SerializationFunction<@Nullable ByteBuf, @Nullable ByteBuf> updater, boolean existsAlmostCertainly); default Mono> updateAndGetDelta(Mono key, - Function<@Nullable ByteBuf, @Nullable ByteBuf> updater) { + SerializationFunction<@Nullable ByteBuf, @Nullable ByteBuf> updater) { return updateAndGetDelta(key, updater, false); } @@ -72,7 +74,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { Flux> putMulti(Flux> entries, boolean getOldValues); Flux> updateMulti(Flux> entries, - BiFunction updateFunction); + BiSerializationFunction updateFunction); Flux> getRange(@Nullable LLSnapshot snapshot, Mono range, boolean existsAlmostCertainly); diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 266bcd0..78478fb 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -7,17 +7,22 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; +import io.netty.util.AbstractReferenceCounted; import io.netty.util.IllegalReferenceCountException; +import io.netty.util.ReferenceCounted; +import it.cavallium.dbengine.database.disk.ReleasableSlice; +import it.cavallium.dbengine.database.serialization.SerializationException; +import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.lucene.RandomSortField; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.ToIntFunction; import org.apache.lucene.document.Document; @@ -37,6 +42,8 @@ import org.apache.lucene.search.SortedNumericSortField; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.rocksdb.RocksDB; +import org.warp.commonutils.functional.IOFunction; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @SuppressWarnings("unused") @@ -94,16 +101,12 @@ public class LLUtils { } public static ScoreMode toScoreMode(LLScoreMode scoreMode) { - switch (scoreMode) { - case COMPLETE: - return ScoreMode.COMPLETE; - case TOP_SCORES: - return ScoreMode.TOP_SCORES; - case COMPLETE_NO_SCORES: - return ScoreMode.COMPLETE_NO_SCORES; - default: - throw new IllegalStateException("Unexpected value: " + scoreMode); - } + return switch (scoreMode) { + case COMPLETE -> ScoreMode.COMPLETE; + case TOP_SCORES -> ScoreMode.TOP_SCORES; + case COMPLETE_NO_SCORES -> ScoreMode.COMPLETE_NO_SCORES; + default -> throw new IllegalStateException("Unexpected value: " + scoreMode); + }; } public static Term toTerm(LLTerm term) { @@ -143,25 +146,18 @@ public class LLUtils { } private static IndexableField toField(LLItem item) { - switch (item.getType()) { - case IntPoint: - return new IntPoint(item.getName(), Ints.fromByteArray(item.getData())); - case LongPoint: - return new LongPoint(item.getName(), Longs.fromByteArray(item.getData())); - case FloatPoint: - return new FloatPoint(item.getName(), ByteBuffer.wrap(item.getData()).getFloat()); - case TextField: - return new TextField(item.getName(), item.stringValue(), Field.Store.NO); - case TextFieldStored: - return new TextField(item.getName(), item.stringValue(), Field.Store.YES); - case SortedNumericDocValuesField: - return new SortedNumericDocValuesField(item.getName(), Longs.fromByteArray(item.getData())); - case StringField: - return new StringField(item.getName(), item.stringValue(), Field.Store.NO); - case StringFieldStored: - return new StringField(item.getName(), item.stringValue(), Field.Store.YES); - } - throw new UnsupportedOperationException("Unsupported field type"); + return switch (item.getType()) { + case IntPoint -> new IntPoint(item.getName(), Ints.fromByteArray(item.getData())); + case LongPoint -> new LongPoint(item.getName(), Longs.fromByteArray(item.getData())); + case FloatPoint -> new FloatPoint(item.getName(), ByteBuffer.wrap(item.getData()).getFloat()); + case TextField -> new TextField(item.getName(), item.stringValue(), Field.Store.NO); + case TextFieldStored -> new TextField(item.getName(), item.stringValue(), Field.Store.YES); + case SortedNumericDocValuesField -> new SortedNumericDocValuesField(item.getName(), + Longs.fromByteArray(item.getData()) + ); + case StringField -> new StringField(item.getName(), item.stringValue(), Field.Store.NO); + case StringFieldStored -> new StringField(item.getName(), item.stringValue(), Field.Store.YES); + }; } public static it.cavallium.dbengine.database.LLKeyScore toKeyScore(LLKeyScore hit) { @@ -442,48 +438,50 @@ public class LLUtils { public static Mono resolveDelta(Mono> prev, UpdateReturnMode updateReturnMode) { return prev.handle((delta, sink) -> { switch (updateReturnMode) { - case GET_NEW_VALUE: + case GET_NEW_VALUE -> { var current = delta.current(); if (current != null) { sink.next(current); } else { sink.complete(); } - break; - case GET_OLD_VALUE: + } + case GET_OLD_VALUE -> { var previous = delta.previous(); if (previous != null) { sink.next(previous); } else { sink.complete(); } - break; - case NOTHING: - sink.complete(); - break; - default: - sink.error(new IllegalStateException()); + } + case NOTHING -> sink.complete(); + default -> sink.error(new IllegalStateException()); } }); } - public static Mono> mapDelta(Mono> mono, Function<@NotNull T, @Nullable U> mapper) { - return mono.map(delta -> { - T prev = delta.previous(); - T curr = delta.current(); - U newPrev; - U newCurr; - if (prev != null) { - newPrev = mapper.apply(prev); - } else { - newPrev = null; + public static Mono> mapDelta(Mono> mono, + SerializationFunction<@NotNull T, @Nullable U> mapper) { + return mono.handle((delta, sink) -> { + try { + T prev = delta.previous(); + T curr = delta.current(); + U newPrev; + U newCurr; + if (prev != null) { + newPrev = mapper.apply(prev); + } else { + newPrev = null; + } + if (curr != null) { + newCurr = mapper.apply(curr); + } else { + newCurr = null; + } + sink.next(new Delta<>(newPrev, newCurr)); + } catch (SerializationException ex) { + sink.error(ex); } - if (curr != null) { - newCurr = mapper.apply(curr); - } else { - newCurr = null; - } - return new Delta<>(newPrev, newCurr); }); } @@ -514,4 +512,44 @@ public class LLUtils { return false; }); } + + public static Mono handleDiscard(Mono mono) { + return mono.doOnDiscard(Map.Entry.class, e -> { + if (e.getKey() instanceof ByteBuf bb) { + if (bb.refCnt() > 0) { + bb.release(); + } + } + if (e.getValue() instanceof ByteBuf bb) { + if (bb.refCnt() > 0) { + bb.release(); + } + } + }); + } + + public static Flux handleDiscard(Flux mono) { + return mono + .doOnDiscard(ReferenceCounted.class, LLUtils::discardRefCounted) + .doOnDiscard(Map.Entry.class, LLUtils::discardEntry); + } + + private static void discardEntry(Map.Entry e) { + if (e.getKey() instanceof ByteBuf bb) { + if (bb.refCnt() > 0) { + bb.release(); + } + } + if (e.getValue() instanceof ByteBuf bb) { + if (bb.refCnt() > 0) { + bb.release(); + } + } + } + + private static void discardRefCounted(ReferenceCounted referenceCounted) { + if (referenceCounted.refCnt() > 0) { + referenceCounted.release(); + } + } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 72b2988..5e7c821 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -10,6 +10,9 @@ import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.serialization.BiSerializationFunction; +import it.cavallium.dbengine.database.serialization.SerializationException; +import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import java.util.Collections; @@ -23,6 +26,7 @@ import java.util.function.Function; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.SynchronousSink; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; @@ -64,28 +68,48 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep sink) { + try { + sink.next(valueSerializer.deserialize(value)); + } catch (SerializationException ex) { + sink.error(ex); + } + } + @Override public Mono> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { return dictionary .getRange(resolveSnapshot(snapshot), rangeMono, existsAlmostCertainly) - .collectMap( - entry -> deserializeSuffix(stripPrefix(entry.getKey(), false)), - entry -> valueSerializer.deserialize(entry.getValue()), - HashMap::new) + .>handle((entry, sink) -> { + try { + var key = deserializeSuffix(stripPrefix(entry.getKey(), false)); + var value = valueSerializer.deserialize(entry.getValue()); + sink.next(Map.entry(key, value)); + } catch (SerializationException ex) { + sink.error(ex); + } + }) + .collectMap(Entry::getKey, Entry::getValue, HashMap::new) .filter(map -> !map.isEmpty()); } @Override public Mono> setAndGetPrevious(Map value) { - return Mono.usingWhen( - Mono.just(true), - b -> get(null, false), - b -> dictionary.setRange(rangeMono, Flux + return this + .get(null, false) + .concatWith(dictionary.setRange(rangeMono, Flux .fromIterable(Collections.unmodifiableMap(value).entrySet()) - .map(entry -> Map.entry(this.toKey(serializeSuffix(entry.getKey())), - valueSerializer.serialize(entry.getValue()))) - ) - ); + .handle((entry, sink) -> { + try { + sink.next(Map.entry(this.toKey(serializeSuffix(entry.getKey())), + valueSerializer.serialize(entry.getValue()))); + } catch (SerializationException e) { + sink.error(e); + } + }) + ).then(Mono.empty())) + .singleOrEmpty() + .transform(LLUtils::handleDiscard); } @Override @@ -107,7 +131,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> at(@Nullable CompositeSnapshot snapshot, T keySuffix) { return Mono - .fromSupplier(() -> new DatabaseSingleMapped<>( + .fromCallable(() -> new DatabaseSingleMapped<>( new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), Serializer.noop()) , valueSerializer) ); @@ -120,7 +144,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep toKey(serializeSuffix(keySuffix)), keyBuf -> dictionary .get(resolveSnapshot(snapshot), LLUtils.lazyRetain(keyBuf), existsAlmostCertainly) - .map(valueSerializer::deserialize), + .handle(this::deserializeValue), ReferenceCounted::release ); } @@ -158,13 +182,13 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep updateValue(T keySuffix, UpdateReturnMode updateReturnMode, boolean existsAlmostCertainly, - Function<@Nullable U, @Nullable U> updater) { + SerializationFunction<@Nullable U, @Nullable U> updater) { return Mono .using( () -> toKey(serializeSuffix(keySuffix)), keyBuf -> dictionary .update(LLUtils.lazyRetain(keyBuf), getSerializedUpdater(updater), updateReturnMode, existsAlmostCertainly) - .map(valueSerializer::deserialize), + .handle(this::deserializeValue), ReferenceCounted::release ); } @@ -172,7 +196,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> updateValueAndGetDelta(T keySuffix, boolean existsAlmostCertainly, - Function<@Nullable U, @Nullable U> updater) { + SerializationFunction<@Nullable U, @Nullable U> updater) { return Mono .using( () -> toKey(serializeSuffix(keySuffix)), @@ -183,10 +207,15 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep getSerializedUpdater(Function<@Nullable U, @Nullable U> updater) { + public SerializationFunction<@Nullable ByteBuf, @Nullable ByteBuf> getSerializedUpdater(SerializationFunction<@Nullable U, @Nullable U> updater) { return oldSerialized -> { try { - var result = updater.apply(oldSerialized == null ? null : valueSerializer.deserialize(oldSerialized.retain())); + U result; + if (oldSerialized == null) { + result = updater.apply(null); + } else { + result = updater.apply(valueSerializer.deserialize(oldSerialized.retain())); + } if (result == null) { return null; } else { @@ -200,10 +229,16 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep BiFunction<@Nullable ByteBuf, X, @Nullable ByteBuf> getSerializedUpdater(BiFunction<@Nullable U, X, @Nullable U> updater) { + public BiSerializationFunction<@Nullable ByteBuf, X, @Nullable ByteBuf> getSerializedUpdater( + BiSerializationFunction<@Nullable U, X, @Nullable U> updater) { return (oldSerialized, extra) -> { try { - var result = updater.apply(oldSerialized == null ? null : valueSerializer.deserialize(oldSerialized.retain()), extra); + U result; + if (oldSerialized == null) { + result = updater.apply(null, extra); + } else { + result = updater.apply(valueSerializer.deserialize(oldSerialized.retain()), extra); + } if (result == null) { return null; } else { @@ -231,7 +266,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep extends DatabaseMapDictionaryDeep !Objects.equals(oldValue, value)) .defaultIfEmpty(value != null), ReferenceCounted::release @@ -286,7 +321,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep toKey(serializeSuffix(keySuffix)), keyBuf -> dictionary .remove(LLUtils.lazyRetain(keyBuf), LLDictionaryResultType.PREVIOUS_VALUE) - .map(valueSerializer::deserialize), + .handle(this::deserializeValue), ReferenceCounted::release ); } @@ -316,11 +351,19 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep { entry.getT2().release(); - return Mono.fromCallable(() -> Map.entry(entry.getT1(), entry.getT3().map(valueSerializer::deserialize))); + return Mono.fromCallable(() -> { + Optional valueOpt; + if (entry.getT3().isPresent()) { + valueOpt = Optional.of(valueSerializer.deserialize(entry.getT3().get())); + } else { + valueOpt = Optional.empty(); + } + return Map.entry(entry.getT1(), valueOpt); + }); }); } - private Entry serializeEntry(T key, U value) { + private Entry serializeEntry(T key, U value) throws SerializationException { ByteBuf serializedKey = toKey(serializeSuffix(key)); try { ByteBuf serializedValue = valueSerializer.serialize(value); @@ -355,7 +398,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep Flux> updateMulti(Flux> entries, - BiFunction<@Nullable U, X, @Nullable U> updater) { + BiSerializationFunction<@Nullable U, X, @Nullable U> updater) { Flux> serializedEntries = entries .flatMap(entry -> Mono .fromCallable(() -> Tuples.of(serializeSuffix(entry.getT1()), entry.getT2())) @@ -370,26 +413,34 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep new ExtraKeyOperationResult<>(deserializeSuffix(result.key()), - result.extra(), - result.changed() - )); + .handle((result, sink) -> { + try { + sink.next(new ExtraKeyOperationResult<>(deserializeSuffix(result.key()), + result.extra(), + result.changed() + )); + } catch (SerializationException ex) { + sink.error(ex); + } + }); } @Override public Flux>> getAllStages(@Nullable CompositeSnapshot snapshot) { return dictionary .getRangeKeys(resolveSnapshot(snapshot), rangeMono) - .map(key -> { + .handle((key, sink) -> { ByteBuf keySuffixWithExt = stripPrefix(key.retain(), false); try { try { - return Map.entry(deserializeSuffix(keySuffixWithExt.retainedSlice()), + sink.next(Map.entry(deserializeSuffix(keySuffixWithExt.retainedSlice()), new DatabaseSingleMapped<>(new DatabaseSingle<>(dictionary, toKey(keySuffixWithExt.retainedSlice()), Serializer.noop() ), valueSerializer) - ); + )); + } catch (SerializationException ex) { + sink.error(ex); } finally { keySuffixWithExt.release(); } @@ -403,10 +454,16 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> getAllValues(@Nullable CompositeSnapshot snapshot) { return dictionary .getRange(resolveSnapshot(snapshot), rangeMono) - .map(serializedEntry -> Map.entry( - deserializeSuffix(stripPrefix(serializedEntry.getKey(), false)), - valueSerializer.deserialize(serializedEntry.getValue()) - )) + .>handle((serializedEntry, sink) -> { + try { + sink.next(Map.entry( + deserializeSuffix(stripPrefix(serializedEntry.getKey(), false)), + valueSerializer.deserialize(serializedEntry.getValue()) + )); + } catch (SerializationException e) { + sink.error(e); + } + }) .doOnDiscard(Entry.class, uncastedEntry -> { if (uncastedEntry.getKey() instanceof ByteBuf byteBuf) { byteBuf.release(); @@ -419,17 +476,18 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> setAllValuesAndGetPrevious(Flux> entries) { - return Flux - .usingWhen( - Mono.just(true), - b -> getAllValues(null), - b -> dictionary - .setRange(rangeMono, - entries.map(entry -> Map.entry(toKey(serializeSuffix(entry.getKey())), - valueSerializer.serialize(entry.getValue()) - )) - ) - ); + return Flux.usingWhen( + Mono.just(true), + b -> getAllValues(null), + b -> dictionary.setRange(rangeMono, entries.handle((entry, sink) -> { + try { + ByteBuf serializedValue = valueSerializer.serialize(entry.getValue()); + sink.next(Map.entry(toKey(serializeSuffix(entry.getKey())), serializedValue)); + } catch (SerializationException e) { + sink.error(e); + } + })) + ); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index 0691eae..9831b7d 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -13,6 +13,7 @@ import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.disk.LLLocalDictionary; +import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import java.util.Collection; import java.util.List; @@ -457,7 +458,14 @@ public class DatabaseMapDictionaryDeep> implem LLUtils.lazyRetain(buffers.groupKeyWithoutExt), Flux.fromIterable(rangeKeys).map(ByteBuf::retain) ) - .map(us -> Map.entry(this.deserializeSuffix(buffers.groupSuffix.retain()), us)) + .>handle((us, sink) -> { + try { + var deserializedSuffix = this.deserializeSuffix(buffers.groupSuffix.retain()); + sink.next(Map.entry(deserializedSuffix, us)); + } catch (SerializationException ex) { + sink.error(ex); + } + }) ), buffers -> { buffers.groupSuffix.release(); @@ -494,7 +502,13 @@ public class DatabaseMapDictionaryDeep> implem LLUtils.lazyRetain(groupKeyWithoutExt), Flux.empty() ) - .map(us -> Map.entry(this.deserializeSuffix(groupSuffix.retain()), us)), + .>handle((us, sink) -> { + try { + sink.next(Map.entry(this.deserializeSuffix(groupSuffix.retain()), us)); + } catch (SerializationException ex) { + sink.error(ex); + } + }), ReferenceCounted::release ) ); @@ -543,7 +557,7 @@ public class DatabaseMapDictionaryDeep> implem } //todo: temporary wrapper. convert the whole class to buffers - protected T deserializeSuffix(ByteBuf keySuffix) { + protected T deserializeSuffix(ByteBuf keySuffix) throws SerializationException { try { assert suffixKeyConsistency(keySuffix.readableBytes()); var result = keySuffixSerializer.deserialize(keySuffix.retain()); @@ -555,7 +569,7 @@ public class DatabaseMapDictionaryDeep> implem } //todo: temporary wrapper. convert the whole class to buffers - protected ByteBuf serializeSuffix(T keySuffix) { + protected ByteBuf serializeSuffix(T keySuffix) throws SerializationException { ByteBuf suffixData = keySuffixSerializer.serialize(keySuffix); assert suffixKeyConsistency(suffixData.readableBytes()); assert keyPrefix.refCnt() > 0; diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java index 628e186..f0e56f5 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java @@ -11,11 +11,14 @@ import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.serialization.SerializationException; +import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.database.serialization.Serializer; import java.util.function.Function; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.SynchronousSink; public class DatabaseSingle implements DatabaseStageEntry { @@ -43,11 +46,19 @@ public class DatabaseSingle implements DatabaseStageEntry { } } + private void deserializeValue(ByteBuf value, SynchronousSink sink) { + try { + sink.next(serializer.deserialize(value)); + } catch (SerializationException ex) { + sink.error(ex); + } + } + @Override public Mono get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { return dictionary .get(resolveSnapshot(snapshot), keyMono, existsAlmostCertainly) - .map(serializer::deserialize); + .handle(this::deserializeValue); } @Override @@ -56,13 +67,13 @@ public class DatabaseSingle implements DatabaseStageEntry { .using(() -> serializer.serialize(value), valueByteBuf -> dictionary .put(keyMono, LLUtils.lazyRetain(valueByteBuf), LLDictionaryResultType.PREVIOUS_VALUE) - .map(serializer::deserialize), + .handle(this::deserializeValue), ReferenceCounted::release ); } @Override - public Mono update(Function<@Nullable U, @Nullable U> updater, + public Mono update(SerializationFunction<@Nullable U, @Nullable U> updater, UpdateReturnMode updateReturnMode, boolean existsAlmostCertainly) { return dictionary @@ -74,11 +85,11 @@ public class DatabaseSingle implements DatabaseStageEntry { return serializer.serialize(result); } }, updateReturnMode, existsAlmostCertainly) - .map(serializer::deserialize); + .handle(this::deserializeValue); } @Override - public Mono> updateAndGetDelta(Function<@Nullable U, @Nullable U> updater, + public Mono> updateAndGetDelta(SerializationFunction<@Nullable U, @Nullable U> updater, boolean existsAlmostCertainly) { return dictionary .updateAndGetDelta(keyMono, (oldValueSer) -> { @@ -95,7 +106,7 @@ public class DatabaseSingle implements DatabaseStageEntry { public Mono clearAndGetPrevious() { return dictionary .remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE) - .map(serializer::deserialize); + .handle(this::deserializeValue); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java index daef325..a60934d 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java @@ -6,6 +6,7 @@ import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.unimi.dsi.fastutil.objects.ObjectArraySet; import it.unimi.dsi.fastutil.objects.ObjectSets; import java.util.HashSet; @@ -58,7 +59,7 @@ public class DatabaseSingleBucket implements DatabaseStageEntry { } @Override - public Mono update(Function<@Nullable V, @Nullable V> updater, + public Mono update(SerializationFunction<@Nullable V, @Nullable V> updater, UpdateReturnMode updateReturnMode, boolean existsAlmostCertainly) { return bucketStage @@ -76,7 +77,7 @@ public class DatabaseSingleBucket implements DatabaseStageEntry { } @Override - public Mono> updateAndGetDelta(Function<@Nullable V, @Nullable V> updater, boolean existsAlmostCertainly) { + public Mono> updateAndGetDelta(SerializationFunction<@Nullable V, @Nullable V> updater, boolean existsAlmostCertainly) { return bucketStage .updateAndGetDelta(oldBucket -> { V oldValue = extractValue(oldBucket); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java index 9b72d44..fb38a13 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java @@ -1,15 +1,19 @@ package it.cavallium.dbengine.database.collections; +import io.netty.buffer.ByteBuf; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.serialization.SerializationException; +import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.database.serialization.Serializer; import java.util.function.Function; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.SynchronousSink; @SuppressWarnings("unused") public class DatabaseSingleMapped implements DatabaseStageEntry { @@ -22,33 +26,49 @@ public class DatabaseSingleMapped implements DatabaseStageEntry { this.serializer = serializer; } + private void deserializeSink(B value, SynchronousSink sink) { + try { + sink.next(this.deserialize(value)); + } catch (SerializationException ex) { + sink.error(ex); + } + } + @Override public Mono get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { - return serializedSingle.get(snapshot, existsAlmostCertainly).map(this::deserialize); + return serializedSingle.get(snapshot, existsAlmostCertainly).handle(this::deserializeSink); } @Override public Mono getOrDefault(@Nullable CompositeSnapshot snapshot, Mono defaultValue) { - return serializedSingle.get(snapshot).map(this::deserialize).switchIfEmpty(defaultValue); + return serializedSingle.get(snapshot).handle(this::deserializeSink).switchIfEmpty(defaultValue); } @Override public Mono set(A value) { - return serializedSingle.set(serialize(value)); + return Mono + .fromCallable(() -> serialize(value)) + .flatMap(serializedSingle::set); } @Override public Mono setAndGetPrevious(A value) { - return serializedSingle.setAndGetPrevious(serialize(value)).map(this::deserialize); + return Mono + .fromCallable(() -> serialize(value)) + .flatMap(serializedSingle::setAndGetPrevious) + .handle(this::deserializeSink); } @Override public Mono setAndGetChanged(A value) { - return serializedSingle.setAndGetChanged(serialize(value)).single(); + return Mono + .fromCallable(() -> serialize(value)) + .flatMap(serializedSingle::setAndGetChanged) + .single(); } @Override - public Mono update(Function<@Nullable A, @Nullable A> updater, + public Mono update(SerializationFunction<@Nullable A, @Nullable A> updater, UpdateReturnMode updateReturnMode, boolean existsAlmostCertainly) { return serializedSingle.update(oldValue -> { @@ -58,11 +78,11 @@ public class DatabaseSingleMapped implements DatabaseStageEntry { } else { return this.serialize(result); } - }, updateReturnMode, existsAlmostCertainly).map(this::deserialize); + }, updateReturnMode, existsAlmostCertainly).handle(this::deserializeSink); } @Override - public Mono> updateAndGetDelta(Function<@Nullable A, @Nullable A> updater, + public Mono> updateAndGetDelta(SerializationFunction<@Nullable A, @Nullable A> updater, boolean existsAlmostCertainly) { return serializedSingle.updateAndGetDelta(oldValue -> { var result = updater.apply(oldValue == null ? null : this.deserialize(oldValue)); @@ -81,7 +101,7 @@ public class DatabaseSingleMapped implements DatabaseStageEntry { @Override public Mono clearAndGetPrevious() { - return serializedSingle.clearAndGetPrevious().map(this::deserialize); + return serializedSingle.clearAndGetPrevious().handle(this::deserializeSink); } @Override @@ -120,12 +140,12 @@ public class DatabaseSingleMapped implements DatabaseStageEntry { } //todo: temporary wrapper. convert the whole class to buffers - private A deserialize(B bytes) { + private A deserialize(B bytes) throws SerializationException { return serializer.deserialize(bytes); } //todo: temporary wrapper. convert the whole class to buffers - private B serialize(A bytes) { + private B serialize(A bytes) throws SerializationException { return serializer.serialize(bytes); } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java index a1cae09..acd160b 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java @@ -5,6 +5,7 @@ import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.util.Objects; import java.util.function.Function; import org.jetbrains.annotations.Nullable; @@ -44,7 +45,7 @@ public interface DatabaseStage extends DatabaseStageWithEntry { .switchIfEmpty(Mono.fromSupplier(() -> value != null)); } - default Mono update(Function<@Nullable T, @Nullable T> updater, + default Mono update(SerializationFunction<@Nullable T, @Nullable T> updater, UpdateReturnMode updateReturnMode, boolean existsAlmostCertainly) { return this @@ -52,14 +53,14 @@ public interface DatabaseStage extends DatabaseStageWithEntry { .transform(prev -> LLUtils.resolveDelta(prev, updateReturnMode)); } - default Mono update(Function<@Nullable T, @Nullable T> updater, UpdateReturnMode updateReturnMode) { + default Mono update(SerializationFunction<@Nullable T, @Nullable T> updater, UpdateReturnMode updateReturnMode) { return update(updater, updateReturnMode, false); } - Mono> updateAndGetDelta(Function<@Nullable T, @Nullable T> updater, + Mono> updateAndGetDelta(SerializationFunction<@Nullable T, @Nullable T> updater, boolean existsAlmostCertainly); - default Mono> updateAndGetDelta(Function<@Nullable T, @Nullable T> updater) { + default Mono> updateAndGetDelta(SerializationFunction<@Nullable T, @Nullable T> updater) { return updateAndGetDelta(updater, false); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index 3c2f726..6e3bed9 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -7,6 +7,9 @@ import it.cavallium.dbengine.database.KeyOperationResult; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.serialization.BiSerializationFunction; +import it.cavallium.dbengine.database.serialization.SerializationException; +import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -58,7 +61,7 @@ public interface DatabaseStageMap> extends Dat default Mono updateValue(T key, UpdateReturnMode updateReturnMode, boolean existsAlmostCertainly, - Function<@Nullable U, @Nullable U> updater) { + SerializationFunction<@Nullable U, @Nullable U> updater) { return Mono.usingWhen( this.at(null, key).single(), stage -> stage.update(updater, updateReturnMode, existsAlmostCertainly), @@ -66,7 +69,8 @@ public interface DatabaseStageMap> extends Dat ); } - default Flux> updateMulti(Flux> entries, BiFunction<@Nullable U, X, @Nullable U> updater) { + default Flux> updateMulti(Flux> entries, + BiSerializationFunction<@Nullable U, X, @Nullable U> updater) { return entries .flatMapSequential(entry -> this .updateValue(entry.getT1(), prevValue -> updater.apply(prevValue, entry.getT2())) @@ -74,21 +78,21 @@ public interface DatabaseStageMap> extends Dat ); } - default Mono updateValue(T key, UpdateReturnMode updateReturnMode, Function<@Nullable U, @Nullable U> updater) { + default Mono updateValue(T key, UpdateReturnMode updateReturnMode, SerializationFunction<@Nullable U, @Nullable U> updater) { return updateValue(key, updateReturnMode, false, updater); } - default Mono updateValue(T key, Function<@Nullable U, @Nullable U> updater) { + default Mono updateValue(T key, SerializationFunction<@Nullable U, @Nullable U> updater) { return updateValueAndGetDelta(key, false, updater).map(LLUtils::isDeltaChanged).single(); } - default Mono updateValue(T key, boolean existsAlmostCertainly, Function<@Nullable U, @Nullable U> updater) { + default Mono updateValue(T key, boolean existsAlmostCertainly, SerializationFunction<@Nullable U, @Nullable U> updater) { return updateValueAndGetDelta(key, existsAlmostCertainly, updater).map(LLUtils::isDeltaChanged).single(); } default Mono> updateValueAndGetDelta(T key, boolean existsAlmostCertainly, - Function<@Nullable U, @Nullable U> updater) { + SerializationFunction<@Nullable U, @Nullable U> updater) { return Mono.usingWhen( this.at(null, key).single(), stage -> stage.updateAndGetDelta(updater, existsAlmostCertainly), @@ -96,7 +100,7 @@ public interface DatabaseStageMap> extends Dat ); } - default Mono> updateValueAndGetDelta(T key, Function<@Nullable U, @Nullable U> updater) { + default Mono> updateValueAndGetDelta(T key, SerializationFunction<@Nullable U, @Nullable U> updater) { return updateValueAndGetDelta(key, false, updater); } @@ -221,7 +225,7 @@ public interface DatabaseStageMap> extends Dat } @Override - default Mono>> updateAndGetDelta(Function<@Nullable Map, @Nullable Map> updater, + default Mono>> updateAndGetDelta(SerializationFunction<@Nullable Map, @Nullable Map> updater, boolean existsAlmostCertainly) { return this .getUpdateMode() @@ -236,11 +240,15 @@ public interface DatabaseStageMap> extends Dat if (v.isEmpty()) { v = null; } - var result = updater.apply(v); - if (result != null && result.isEmpty()) { - result = null; + try { + var result = updater.apply(v); + if (result != null && result.isEmpty()) { + result = null; + } + sink.next(Tuples.of(Optional.ofNullable(v), Optional.ofNullable(result))); + } catch (SerializationException ex) { + sink.error(ex); } - sink.next(Tuples.of(Optional.ofNullable(v), Optional.ofNullable(result))); }) .flatMap(result -> Mono .justOrEmpty(result.getT2()) diff --git a/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java b/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java index bb09c0c..5d29c8c 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java @@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.collections; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.Serializer; import java.util.Map; import java.util.Map.Entry; @@ -23,7 +24,7 @@ class ValueWithHashSerializer implements Serializer, ByteBuf> } @Override - public @NotNull Entry deserialize(@NotNull ByteBuf serialized) { + public @NotNull Entry deserialize(@NotNull ByteBuf serialized) throws SerializationException { try { X deserializedKey = keySuffixSerializer.deserialize(serialized.retain()); Y deserializedValue = valueSerializer.deserialize(serialized.retain()); @@ -34,7 +35,7 @@ class ValueWithHashSerializer implements Serializer, ByteBuf> } @Override - public @NotNull ByteBuf serialize(@NotNull Entry deserialized) { + public @NotNull ByteBuf serialize(@NotNull Entry deserialized) throws SerializationException { ByteBuf keySuffix = keySuffixSerializer.serialize(deserialized.getKey()); try { ByteBuf value = valueSerializer.serialize(deserialized.getValue()); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java b/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java index d6d10cb..df71bc5 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.collections; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.Serializer; import it.unimi.dsi.fastutil.objects.ObjectArraySet; import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; @@ -23,7 +24,7 @@ class ValuesSetSerializer implements Serializer, ByteBuf> { } @Override - public @NotNull ObjectArraySet deserialize(@NotNull ByteBuf serialized) { + public @NotNull ObjectArraySet deserialize(@NotNull ByteBuf serialized) throws SerializationException { try { int entriesLength = serialized.readInt(); ArrayList deserializedElements = new ArrayList<>(entriesLength); @@ -38,18 +39,18 @@ class ValuesSetSerializer implements Serializer, ByteBuf> { } @Override - public @NotNull ByteBuf serialize(@NotNull ObjectArraySet deserialized) { + public @NotNull ByteBuf serialize(@NotNull ObjectArraySet deserialized) throws SerializationException { ByteBuf output = allocator.buffer(); try { output.writeInt(deserialized.size()); - deserialized.forEach((entry) -> { + for (X entry : deserialized) { ByteBuf serialized = entrySerializer.serialize(entry); try { output.writeBytes(serialized); } finally { serialized.release(); } - }); + } return output.retain(); } finally { output.release(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index e5dfb0c..7e45c65 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -18,6 +18,8 @@ import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.RepeatedElementList; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.serialization.BiSerializationFunction; +import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.unimi.dsi.fastutil.ints.IntArrayList; import java.io.IOException; import java.nio.ByteBuffer; @@ -567,7 +569,7 @@ public class LLLocalDictionary implements LLDictionary { @SuppressWarnings("DuplicatedCode") @Override public Mono update(Mono keyMono, - Function<@Nullable ByteBuf, @Nullable ByteBuf> updater, + SerializationFunction<@Nullable ByteBuf, @Nullable ByteBuf> updater, UpdateReturnMode updateReturnMode, boolean existsAlmostCertainly) { return Mono.usingWhen(keyMono, @@ -700,7 +702,7 @@ public class LLLocalDictionary implements LLDictionary { @SuppressWarnings("DuplicatedCode") @Override public Mono> updateAndGetDelta(Mono keyMono, - Function<@Nullable ByteBuf, @Nullable ByteBuf> updater, + SerializationFunction<@Nullable ByteBuf, @Nullable ByteBuf> updater, boolean existsAlmostCertainly) { return Mono.usingWhen(keyMono, key -> this.runOnDb(() -> { @@ -1111,7 +1113,7 @@ public class LLLocalDictionary implements LLDictionary { @Override public Flux> updateMulti(Flux> entries, - BiFunction updateFunction) { + BiSerializationFunction updateFunction) { return entries .buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP)) .flatMapSequential(ew -> Flux diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java index 8010fed..e1c2d9c 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java @@ -11,6 +11,9 @@ import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; +import it.cavallium.dbengine.database.serialization.BiSerializationFunction; +import it.cavallium.dbengine.database.serialization.SerializationException; +import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.unimi.dsi.fastutil.bytes.ByteList; import java.io.IOException; import java.util.List; @@ -165,7 +168,7 @@ public class LLMemoryDictionary implements LLDictionary { @Override public Mono> updateAndGetDelta(Mono keyMono, - Function<@Nullable ByteBuf, @Nullable ByteBuf> updater, + SerializationFunction<@Nullable ByteBuf, @Nullable ByteBuf> updater, boolean existsAlmostCertainly) { return Mono.usingWhen(keyMono, key -> Mono.fromCallable(() -> { @@ -174,7 +177,12 @@ public class LLMemoryDictionary implements LLDictionary { if (old != null) { oldRef.set(kk(old)); } - var v = updater.apply(old != null ? kk(old) : null); + ByteBuf v = null; + try { + v = updater.apply(old != null ? kk(old) : null); + } catch (SerializationException e) { + throw new IllegalStateException(e); + } try { return k(v); } finally { @@ -258,7 +266,7 @@ public class LLMemoryDictionary implements LLDictionary { @Override public Flux> updateMulti(Flux> entries, - BiFunction updateFunction) { + BiSerializationFunction updateFunction) { return Flux.error(new UnsupportedOperationException("Not implemented")); } diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/BiSerializationFunction.java b/src/main/java/it/cavallium/dbengine/database/serialization/BiSerializationFunction.java new file mode 100644 index 0000000..9f22762 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/serialization/BiSerializationFunction.java @@ -0,0 +1,7 @@ +package it.cavallium.dbengine.database.serialization; + +@FunctionalInterface +public interface BiSerializationFunction { + + U apply(T1 argument1, T2 argument2) throws SerializationException; +} diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/SerializationException.java b/src/main/java/it/cavallium/dbengine/database/serialization/SerializationException.java new file mode 100644 index 0000000..6330782 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/serialization/SerializationException.java @@ -0,0 +1,24 @@ +package it.cavallium.dbengine.database.serialization; + +import java.io.IOException; +import java.io.UncheckedIOException; + +public class SerializationException extends Exception { + + public SerializationException() { + super(); + } + + public SerializationException(String message) { + super(message); + } + + public SerializationException(String message, Throwable cause) { + super(message, cause); + } + + public SerializationException(Throwable cause) { + super(cause); + } + +} diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/SerializationFunction.java b/src/main/java/it/cavallium/dbengine/database/serialization/SerializationFunction.java new file mode 100644 index 0000000..30f258f --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/serialization/SerializationFunction.java @@ -0,0 +1,9 @@ +package it.cavallium.dbengine.database.serialization; + +import org.jetbrains.annotations.NotNull; + +@FunctionalInterface +public interface SerializationFunction { + + U apply(T argument) throws SerializationException; +} diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java b/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java index ba5b758..c6323a4 100644 --- a/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java +++ b/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java @@ -9,9 +9,9 @@ import org.jetbrains.annotations.NotNull; public interface Serializer { - @NotNull A deserialize(@NotNull B serialized); + @NotNull A deserialize(@NotNull B serialized) throws SerializationException; - @NotNull B serialize(@NotNull A deserialized); + @NotNull B serialize(@NotNull A deserialized) throws SerializationException; Serializer NOOP_SERIALIZER = new Serializer<>() { @Override diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java b/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java index 426fc41..f1db643 100644 --- a/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java +++ b/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java @@ -8,7 +8,6 @@ import io.netty.buffer.ByteBufUtil; import io.netty.buffer.PooledByteBufAllocator; import java.io.NotSerializableException; import java.nio.charset.StandardCharsets; -import org.apache.commons.lang3.SerializationException; import org.jetbrains.annotations.NotNull; @SuppressWarnings("unused") @@ -53,7 +52,7 @@ public interface SerializerFixedBinaryLength extends Serializer { static SerializerFixedBinaryLength utf8(ByteBufAllocator allocator, int length) { return new SerializerFixedBinaryLength<>() { @Override - public @NotNull String deserialize(@NotNull ByteBuf serialized) { + public @NotNull String deserialize(@NotNull ByteBuf serialized) throws SerializationException { try { if (serialized.readableBytes() != getSerializedBinaryLength()) { throw new SerializationException( @@ -69,7 +68,7 @@ public interface SerializerFixedBinaryLength extends Serializer { } @Override - public @NotNull ByteBuf serialize(@NotNull String deserialized) { + public @NotNull ByteBuf serialize(@NotNull String deserialized) throws SerializationException { // UTF-8 uses max. 3 bytes per char, so calculate the worst case. ByteBuf buf = allocator.buffer(ByteBufUtil.utf8MaxBytes(deserialized)); try {