package it.cavallium.dbengine.database.collections; import io.netty.buffer.ByteBuf; import io.netty.util.ReferenceCounted; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.ExtraKeyOperationResult; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.serialization.BiSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.function.BiFunction; import java.util.function.Function; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.SynchronousSink; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; /** * Optimized implementation of "DatabaseMapDictionary with SubStageGetterSingle" */ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> { private final Serializer valueSerializer; protected DatabaseMapDictionary(LLDictionary dictionary, ByteBuf prefixKey, SerializerFixedBinaryLength keySuffixSerializer, Serializer valueSerializer) { // Do not retain or release or use the prefixKey here super(dictionary, prefixKey, keySuffixSerializer, new SubStageGetterSingle<>(valueSerializer), 0); this.valueSerializer = valueSerializer; } public static DatabaseMapDictionary simple(LLDictionary dictionary, SerializerFixedBinaryLength keySerializer, Serializer valueSerializer) { return new DatabaseMapDictionary<>(dictionary, dictionary.getAllocator().buffer(0), keySerializer, valueSerializer); } public static DatabaseMapDictionary tail(LLDictionary dictionary, ByteBuf prefixKey, SerializerFixedBinaryLength keySuffixSerializer, Serializer valueSerializer) { return new DatabaseMapDictionary<>(dictionary, prefixKey, keySuffixSerializer, valueSerializer); } private ByteBuf toKey(ByteBuf suffixKey) { try { assert suffixKeyConsistency(suffixKey.readableBytes()); return LLUtils.compositeBuffer(dictionary.getAllocator(), keyPrefix.retain(), suffixKey.retain()); } finally { suffixKey.release(); } } private void deserializeValue(ByteBuf value, SynchronousSink sink) { try { sink.next(valueSerializer.deserialize(value)); } catch (SerializationException ex) { sink.error(ex); } } @Override public Mono> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { return dictionary .getRange(resolveSnapshot(snapshot), rangeMono, existsAlmostCertainly) .>handle((entry, sink) -> { try { var key = deserializeSuffix(stripPrefix(entry.getKey(), false)); var value = valueSerializer.deserialize(entry.getValue()); sink.next(Map.entry(key, value)); } catch (SerializationException ex) { sink.error(ex); } }) .collectMap(Entry::getKey, Entry::getValue, HashMap::new) .filter(map -> !map.isEmpty()); } @Override public Mono> setAndGetPrevious(Map value) { return this .get(null, false) .concatWith(dictionary.setRange(rangeMono, Flux .fromIterable(Collections.unmodifiableMap(value).entrySet()) .handle((entry, sink) -> { try { sink.next(Map.entry(this.toKey(serializeSuffix(entry.getKey())), valueSerializer.serialize(entry.getValue()))); } catch (SerializationException e) { sink.error(e); } }) ).then(Mono.empty())) .singleOrEmpty() .transform(LLUtils::handleDiscard); } @Override public Mono> clearAndGetPrevious() { return this .setAndGetPrevious(Map.of()); } @Override public Mono leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) { return dictionary.sizeRange(resolveSnapshot(snapshot), rangeMono, fast); } @Override public Mono isEmpty(@Nullable CompositeSnapshot snapshot) { return dictionary.isRangeEmpty(resolveSnapshot(snapshot), rangeMono); } @Override public Mono> at(@Nullable CompositeSnapshot snapshot, T keySuffix) { return Mono .fromCallable(() -> new DatabaseSingleMapped<>( new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), Serializer.noop()) , valueSerializer) ); } @Override public Mono getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) { return Mono .using( () -> toKey(serializeSuffix(keySuffix)), keyBuf -> dictionary .get(resolveSnapshot(snapshot), LLUtils.lazyRetain(keyBuf), existsAlmostCertainly) .handle(this::deserializeValue), ReferenceCounted::release ); } @Override public Mono putValue(T keySuffix, U value) { return Mono .using( () -> serializeSuffix(keySuffix), keySuffixBuf -> Mono .using( () -> toKey(keySuffixBuf.retain()), keyBuf -> Mono .using(() -> valueSerializer.serialize(value), valueBuf -> dictionary .put(LLUtils.lazyRetain(keyBuf), LLUtils.lazyRetain(valueBuf), LLDictionaryResultType.VOID) .doOnNext(ReferenceCounted::release), ReferenceCounted::release ), ReferenceCounted::release ), ReferenceCounted::release ) .then(); } @Override public Mono getUpdateMode() { return dictionary.getUpdateMode(); } @Override public Mono updateValue(T keySuffix, UpdateReturnMode updateReturnMode, boolean existsAlmostCertainly, SerializationFunction<@Nullable U, @Nullable U> updater) { return Mono .using( () -> toKey(serializeSuffix(keySuffix)), keyBuf -> dictionary .update(LLUtils.lazyRetain(keyBuf), getSerializedUpdater(updater), updateReturnMode, existsAlmostCertainly) .handle(this::deserializeValue), ReferenceCounted::release ); } @Override public Mono> updateValueAndGetDelta(T keySuffix, boolean existsAlmostCertainly, SerializationFunction<@Nullable U, @Nullable U> updater) { return Mono .using( () -> toKey(serializeSuffix(keySuffix)), keyBuf -> dictionary .updateAndGetDelta(LLUtils.lazyRetain(keyBuf), getSerializedUpdater(updater), existsAlmostCertainly) .transform(mono -> LLUtils.mapDelta(mono, valueSerializer::deserialize)), ReferenceCounted::release ); } public SerializationFunction<@Nullable ByteBuf, @Nullable ByteBuf> getSerializedUpdater(SerializationFunction<@Nullable U, @Nullable U> updater) { return oldSerialized -> { try { U result; if (oldSerialized == null) { result = updater.apply(null); } else { result = updater.apply(valueSerializer.deserialize(oldSerialized.retain())); } if (result == null) { return null; } else { return valueSerializer.serialize(result); } } finally { if (oldSerialized != null) { oldSerialized.release(); } } }; } public BiSerializationFunction<@Nullable ByteBuf, X, @Nullable ByteBuf> getSerializedUpdater( BiSerializationFunction<@Nullable U, X, @Nullable U> updater) { return (oldSerialized, extra) -> { try { U result; if (oldSerialized == null) { result = updater.apply(null, extra); } else { result = updater.apply(valueSerializer.deserialize(oldSerialized.retain()), extra); } if (result == null) { return null; } else { return valueSerializer.serialize(result); } } finally { if (oldSerialized != null) { oldSerialized.release(); } } }; } @Override public Mono putValueAndGetPrevious(T keySuffix, U value) { return Mono .using( () -> serializeSuffix(keySuffix), keySuffixBuf -> Mono .using( () -> toKey(keySuffixBuf.retain()), keyBuf -> Mono .using(() -> valueSerializer.serialize(value), valueBuf -> dictionary .put(LLUtils.lazyRetain(keyBuf), LLUtils.lazyRetain(valueBuf), LLDictionaryResultType.PREVIOUS_VALUE) .handle(this::deserializeValue), ReferenceCounted::release ), ReferenceCounted::release ), ReferenceCounted::release ); } @Override public Mono putValueAndGetChanged(T keySuffix, U value) { return Mono .using( () -> serializeSuffix(keySuffix), keySuffixBuf -> Mono .using( () -> toKey(keySuffixBuf.retain()), keyBuf -> Mono .using(() -> valueSerializer.serialize(value), valueBuf -> dictionary .put(LLUtils.lazyRetain(keyBuf), LLUtils.lazyRetain(valueBuf), LLDictionaryResultType.PREVIOUS_VALUE ) .handle(this::deserializeValue) .map(oldValue -> !Objects.equals(oldValue, value)) .defaultIfEmpty(value != null), ReferenceCounted::release ), ReferenceCounted::release ), ReferenceCounted::release ); } @Override public Mono remove(T keySuffix) { return Mono .using( () -> toKey(serializeSuffix(keySuffix)), keyBuf -> dictionary .remove(LLUtils.lazyRetain(keyBuf), LLDictionaryResultType.VOID) .doOnNext(ReferenceCounted::release) .then(), ReferenceCounted::release ); } @Override public Mono removeAndGetPrevious(T keySuffix) { return Mono .using( () -> toKey(serializeSuffix(keySuffix)), keyBuf -> dictionary .remove(LLUtils.lazyRetain(keyBuf), LLDictionaryResultType.PREVIOUS_VALUE) .handle(this::deserializeValue), ReferenceCounted::release ); } @Override public Mono removeAndGetStatus(T keySuffix) { return Mono .using( () -> toKey(serializeSuffix(keySuffix)), keyBuf -> dictionary .remove(LLUtils.lazyRetain(keyBuf), LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE) .map(LLUtils::responseToBoolean), ReferenceCounted::release ); } @Override public Flux>> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys, boolean existsAlmostCertainly) { return dictionary .getMulti(resolveSnapshot(snapshot), keys.flatMap(keySuffix -> Mono.fromCallable(() -> { ByteBuf keySuffixBuf = serializeSuffix(keySuffix); try { return Tuples.of(keySuffix, toKey(keySuffixBuf.retain())); } finally { keySuffixBuf.release(); } })), existsAlmostCertainly) .flatMapSequential(entry -> { entry.getT2().release(); return Mono.fromCallable(() -> { Optional valueOpt; if (entry.getT3().isPresent()) { valueOpt = Optional.of(valueSerializer.deserialize(entry.getT3().get())); } else { valueOpt = Optional.empty(); } return Map.entry(entry.getT1(), valueOpt); }); }); } private Entry serializeEntry(T key, U value) throws SerializationException { ByteBuf serializedKey = toKey(serializeSuffix(key)); try { ByteBuf serializedValue = valueSerializer.serialize(value); try { return Map.entry(serializedKey.retain(), serializedValue.retain()); } finally { serializedValue.release(); } } finally { serializedKey.release(); } } @Override public Mono putMulti(Flux> entries) { var serializedEntries = entries .flatMap(entry -> Mono .fromCallable(() -> serializeEntry(entry.getKey(), entry.getValue())) .doOnDiscard(Entry.class, uncastedEntry -> { if (uncastedEntry.getKey() instanceof ByteBuf byteBuf) { byteBuf.release(); } if (uncastedEntry.getValue() instanceof ByteBuf byteBuf) { byteBuf.release(); } }) ); return dictionary .putMulti(serializedEntries, false) .then(); } @Override public Flux> updateMulti(Flux> entries, BiSerializationFunction<@Nullable U, X, @Nullable U> updater) { Flux> serializedEntries = entries .flatMap(entry -> Mono .fromCallable(() -> Tuples.of(serializeSuffix(entry.getT1()), entry.getT2())) ) .doOnDiscard(Tuple2.class, uncastedEntry -> { if (uncastedEntry.getT1() instanceof ByteBuf byteBuf) { byteBuf.release(); } if (uncastedEntry.getT2() instanceof ByteBuf byteBuf) { byteBuf.release(); } }); var serializedUpdater = getSerializedUpdater(updater); return dictionary.updateMulti(serializedEntries, serializedUpdater) .handle((result, sink) -> { try { sink.next(new ExtraKeyOperationResult<>(deserializeSuffix(result.key()), result.extra(), result.changed() )); } catch (SerializationException ex) { sink.error(ex); } }); } @Override public Flux>> getAllStages(@Nullable CompositeSnapshot snapshot) { return dictionary .getRangeKeys(resolveSnapshot(snapshot), rangeMono) .handle((key, sink) -> { try { ByteBuf keySuffixWithExt = stripPrefix(key.retain(), false); try { sink.next(Map.entry(deserializeSuffix(keySuffixWithExt.retainedSlice()), new DatabaseSingleMapped<>(new DatabaseSingle<>(dictionary, toKey(keySuffixWithExt.retainedSlice()), Serializer.noop() ), valueSerializer) )); } catch (SerializationException ex) { sink.error(ex); } finally { keySuffixWithExt.release(); } } finally { key.release(); } }); } @Override public Flux> getAllValues(@Nullable CompositeSnapshot snapshot) { return dictionary .getRange(resolveSnapshot(snapshot), rangeMono) .>handle((serializedEntry, sink) -> { try { sink.next(Map.entry( deserializeSuffix(stripPrefix(serializedEntry.getKey(), false)), valueSerializer.deserialize(serializedEntry.getValue()) )); } catch (SerializationException e) { sink.error(e); } }) .doOnDiscard(Entry.class, uncastedEntry -> { if (uncastedEntry.getKey() instanceof ByteBuf byteBuf) { byteBuf.release(); } if (uncastedEntry.getValue() instanceof ByteBuf byteBuf) { byteBuf.release(); } }); } @Override public Flux> setAllValuesAndGetPrevious(Flux> entries) { return Flux.usingWhen( Mono.just(true), b -> getAllValues(null), b -> dictionary.setRange(rangeMono, entries.handle((entry, sink) -> { try { ByteBuf serializedValue = valueSerializer.serialize(entry.getValue()); sink.next(Map.entry(toKey(serializeSuffix(entry.getKey())), serializedValue)); } catch (SerializationException e) { sink.error(e); } })) ); } @Override public Mono clear() { return Mono.defer(() -> { if (range.isAll()) { return dictionary.clear(); } else if (range.isSingle()) { return dictionary .remove(LLUtils.lazyRetain(range.getSingle()), LLDictionaryResultType.VOID) .doOnNext(ReferenceCounted::release) .then(); } else { return dictionary.setRange(rangeMono, Flux.empty()); } }); } }