package it.cavallium.dbengine.database.collections;

import io.net5.buffer.api.Buffer;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/**
 * A database stage whose value is a whole {@code Map<T, U>}, and whose single entries can also be reached
 * through a per-key sub-stage obtained with {@link #at(CompositeSnapshot, Object)}.
 *
 * <p>Most default methods open the sub-stage of a key, run one operation on it and hand the stage to
 * {@code LLUtils.usingResource}, which is presumably responsible for releasing it (confirm against
 * {@code LLUtils}).
 *
 * @param <T> key type
 * @param <U> value type
 * @param <US> type of the sub-stage that gives access to the value of a single key
 */
@SuppressWarnings("unused")
public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends DatabaseStageEntry<Map<T, U>> {

	/**
	 * Opens the sub-stage associated with the given key.
	 *
	 * @param snapshot snapshot to read from, or {@code null}
	 * @param key the key of the entry
	 * @return the sub-stage; callers in this interface close it after use
	 */
	Mono<US> at(@Nullable CompositeSnapshot snapshot, T key);

	/**
	 * Reads the value of a key through its sub-stage.
	 *
	 * @return the value, or an empty Mono if the sub-stage yields no value
	 */
	default Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T key, boolean existsAlmostCertainly) {
		return LLUtils.usingResource(this.at(snapshot, key),
				stage -> stage.get(snapshot, existsAlmostCertainly),
				true
		);
	}

	/**
	 * Same as {@link #getValue(CompositeSnapshot, Object, boolean)} with {@code existsAlmostCertainly = false}.
	 */
	default Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T key) {
		return getValue(snapshot, key, false);
	}

	/**
	 * Reads the value of a key, falling back to {@code defaultValue} when there is none.
	 * The result is required to contain exactly one element ({@code single()}), so an empty
	 * {@code defaultValue} results in an error.
	 */
	default Mono<U> getValueOrDefault(@Nullable CompositeSnapshot snapshot, T key, Mono<U> defaultValue) {
		return getValue(snapshot, key).switchIfEmpty(defaultValue).single();
	}

	/**
	 * Sets the value of a key through its sub-stage.
	 */
	default Mono<Void> putValue(T key, U value) {
		return LLUtils.usingResource(at(null, key).single(), stage -> stage.set(value), true);
	}

	/**
	 * @return the update mode of this map; it is consulted by
	 * {@link #updateAndGetDelta(SerializationFunction, boolean)}
	 */
	Mono<UpdateMode> getUpdateMode();

	/**
	 * Updates the value of a key through its sub-stage.
	 *
	 * @param updater receives the previous value (nullable) and returns the new value (nullable)
	 * @return what the sub-stage returns for the requested {@code updateReturnMode}
	 */
	default Mono<U> updateValue(T key,
			UpdateReturnMode updateReturnMode,
			boolean existsAlmostCertainly,
			SerializationFunction<@Nullable U, @Nullable U> updater) {
		return LLUtils.usingResource(this.at(null, key).single(),
				stage -> stage.update(updater, updateReturnMode, existsAlmostCertainly),
				true
		);
	}

	/**
	 * Updates many keys, one {@link #updateValue(Object, SerializationFunction)} call per key.
	 *
	 * @return for each key, in the order of {@code keys}, whether its value changed
	 */
	default Flux<Boolean> updateMulti(Flux<T> keys, KVSerializationFunction<T, @Nullable U, @Nullable U> updater) {
		return keys.flatMapSequential(key -> this.updateValue(key, prevValue -> updater.apply(key, prevValue)));
	}

	/**
	 * Same as {@link #updateValue(Object, UpdateReturnMode, boolean, SerializationFunction)} with
	 * {@code existsAlmostCertainly = false}.
	 */
	default Mono<U> updateValue(T key,
			UpdateReturnMode updateReturnMode,
			SerializationFunction<@Nullable U, @Nullable U> updater) {
		return updateValue(key, updateReturnMode, false, updater);
	}

	/**
	 * Updates the value of a key.
	 *
	 * @return the result of {@code LLUtils.isDeltaChanged} applied to the update delta
	 */
	default Mono<Boolean> updateValue(T key, SerializationFunction<@Nullable U, @Nullable U> updater) {
		return updateValueAndGetDelta(key, false, updater).map(LLUtils::isDeltaChanged).single();
	}

	/**
	 * Updates the value of a key.
	 *
	 * @return the result of {@code LLUtils.isDeltaChanged} applied to the update delta
	 */
	default Mono<Boolean> updateValue(T key,
			boolean existsAlmostCertainly,
			SerializationFunction<@Nullable U, @Nullable U> updater) {
		return updateValueAndGetDelta(key, existsAlmostCertainly, updater).map(LLUtils::isDeltaChanged).single();
	}

	/**
	 * Updates the value of a key through its sub-stage and returns the delta produced by the sub-stage.
	 */
	default Mono<Delta<U>> updateValueAndGetDelta(T key,
			boolean existsAlmostCertainly,
			SerializationFunction<@Nullable U, @Nullable U> updater) {
		return LLUtils.usingResource(this.at(null, key).single(),
				stage -> stage.updateAndGetDelta(updater, existsAlmostCertainly),
				true
		);
	}

	/**
	 * Same as {@link #updateValueAndGetDelta(Object, boolean, SerializationFunction)} with
	 * {@code existsAlmostCertainly = false}.
	 */
	default Mono<Delta<U>> updateValueAndGetDelta(T key, SerializationFunction<@Nullable U, @Nullable U> updater) {
		return updateValueAndGetDelta(key, false, updater);
	}

	/**
	 * Sets the value of a key and returns what the sub-stage reports as the previous value.
	 */
	default Mono<U> putValueAndGetPrevious(T key, U value) {
		return LLUtils.usingResource(at(null, key).single(), stage -> stage.setAndGetPrevious(value), true);
	}

	/**
	 * Sets the value of a key and returns the result of the sub-stage's {@code setAndGetChanged}.
	 *
	 * @return true if the key was associated with any value, false if the key didn't exist.
	 *         NOTE(review): the method name suggests "true if the value changed" instead; confirm against
	 *         the {@code DatabaseStage#setAndGetChanged} contract.
	 */
	default Mono<Boolean> putValueAndGetChanged(T key, U value) {
		return LLUtils.usingResource(at(null, key).single(), stage -> stage.setAndGetChanged(value), true).single();
	}

	/**
	 * Removes a key, ignoring whether it existed.
	 */
	default Mono<Void> remove(T key) {
		return removeAndGetStatus(key).then();
	}

	/**
	 * Clears the sub-stage of a key and returns what it reports as the previous value.
	 */
	default Mono<U> removeAndGetPrevious(T key) {
		return LLUtils.usingResource(at(null, key), DatabaseStage::clearAndGetPrevious, true);
	}

	/**
	 * Removes a key.
	 *
	 * @return true if {@link #removeAndGetPrevious(Object)} emitted a previous value, false otherwise
	 */
	default Mono<Boolean> removeAndGetStatus(T key) {
		return removeAndGetPrevious(key).map(o -> true).defaultIfEmpty(false);
	}

	/**
	 * GetMulti must return the elements in sequence!
	 *
	 * <p>Exactly one element is emitted per key, in the order of {@code keys}: the value wrapped in an
	 * {@link Optional}, or {@link Optional#empty()} when the key has no value.
	 */
	default Flux<Optional<U>> getMulti(@Nullable CompositeSnapshot snapshot, Flux<T> keys, boolean existsAlmostCertainly) {
		return keys
				// The Optional wrapping must happen per key: if it were applied to the whole Flux, keys without
				// a value would simply be skipped and the results would no longer line up with the keys.
				.flatMapSequential(key -> this
						.getValue(snapshot, key, existsAlmostCertainly)
						.map(Optional::of)
						.defaultIfEmpty(Optional.empty())
				)
				.doOnDiscard(Entry.class, unknownEntry -> {
					// Close buffers carried by discarded entries
					if (unknownEntry.getValue() instanceof Optional<?> optionalBuffer
							&& optionalBuffer.isPresent()
							&& optionalBuffer.get() instanceof Buffer buffer) {
						buffer.close();
					}
				});
	}

	/**
	 * GetMulti must return the elements in sequence!
	 *
	 * <p>Same as {@link #getMulti(CompositeSnapshot, Flux, boolean)} with {@code existsAlmostCertainly = false}.
	 */
	default Flux<Optional<U>> getMulti(@Nullable CompositeSnapshot snapshot, Flux<T> keys) {
		return getMulti(snapshot, keys, false);
	}

	/**
	 * Puts every entry with {@link #putValue(Object, Object)}. Entries are written with {@code flatMap},
	 * so the writes are not guaranteed to happen in the order of {@code entries}.
	 */
	default Mono<Void> putMulti(Flux<Entry<T, U>> entries) {
		return entries.flatMap(entry -> this.putValue(entry.getKey(), entry.getValue())).then();
	}

	/**
	 * @return every key together with its sub-stage; callers in this interface close each received stage
	 */
	Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot);

	/**
	 * Reads the value of every stage returned by {@link #getAllStages(CompositeSnapshot)}, closing each stage
	 * once its read has terminated. Stages that yield no value produce no entry.
	 */
	default Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot) {
		return this
				.getAllStages(snapshot)
				.flatMapSequential(stage -> stage
						.getValue()
						.get(snapshot, true)
						.map(value -> Map.entry(stage.getKey(), value))
						.doFinally(s -> stage.getValue().close())
				);
	}

	/**
	 * Same as {@link #setAllValuesAndGetPrevious(Flux)}, discarding the previous entries.
	 */
	default Mono<Void> setAllValues(Flux<Entry<T, U>> entries) {
		return setAllValuesAndGetPrevious(entries).then();
	}

	/**
	 * Sets the content of the map to the given entries.
	 *
	 * @return the previous entries (presumably the whole previous content; confirm with implementations)
	 */
	Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries);

	/**
	 * Clears the map by setting an empty list of entries.
	 */
	default Mono<Void> clear() {
		return setAllValues(Flux.empty());
	}

	/**
	 * Replaces every entry with the result of {@code entriesReplacer}.
	 *
	 * @param canKeysChange if true, the replaced entries are written with {@link #setAllValues(Flux)};
	 *                      if false, each replaced value is written to the sub-stage of the returned key
	 */
	default Mono<Void> replaceAllValues(boolean canKeysChange,
			Function<Entry<T, U>, Mono<Entry<T, U>>> entriesReplacer) {
		if (canKeysChange) {
			return this.setAllValues(this.getAllValues(null).flatMap(entriesReplacer)).then();
		} else {
			return this
					.getAllValues(null)
					.flatMap(entriesReplacer)
					.flatMap(replacedEntry -> this
							.at(null, replacedEntry.getKey())
							.flatMap(stage -> stage
									.set(replacedEntry.getValue())
									.doFinally(s -> stage.close())
							)
					)
					.then();
		}
	}

	/**
	 * Applies {@code entriesReplacer} to every key/sub-stage pair, closing each stage when the replacer's
	 * Mono terminates.
	 */
	default Mono<Void> replaceAll(Function<Entry<T, US>, Mono<Void>> entriesReplacer) {
		return this
				.getAllStages(null)
				.flatMap(stage -> entriesReplacer.apply(stage)
						.doFinally(s -> stage.getValue().close())
				)
				.then();
	}

	/**
	 * Sets the content of the map and returns the previous content.
	 *
	 * @return the previous entries collected in a map, or an empty Mono if there were none
	 */
	@Override
	default Mono<Map<T, U>> setAndGetPrevious(Map<T, U> value) {
		return this
				.setAllValuesAndGetPrevious(Flux.fromIterable(Map.copyOf(value).entrySet()))
				.collectMap(Entry::getKey, Entry::getValue, HashMap::new)
				.filter(map -> !map.isEmpty());
	}

	/**
	 * Sets the content of the map.
	 *
	 * @return true if the new content differs from the previous content; an empty map and an absent map
	 * are treated as the same thing
	 */
	@Override
	default Mono<Boolean> setAndGetChanged(Map<T, U> value) {
		return this
				.setAndGetPrevious(value)
				.map(oldValue -> !Objects.equals(oldValue, value.isEmpty() ? null : value))
				// No previous content: something changed only if the new content is not empty
				.switchIfEmpty(Mono.fromSupplier(() -> !value.isEmpty()));
	}

	/**
	 * Updates the whole map: reads all the values, applies {@code updater} and writes the result back.
	 * The read and the write are separate operations, so this is only done when the update mode is
	 * {@link UpdateMode#ALLOW_UNSAFE}; every other mode fails with {@link UnsupportedOperationException}.
	 *
	 * <p>The updater receives {@code null} instead of an empty map, and an empty result is treated like
	 * {@code null}.
	 *
	 * @return the delta between the previous and the new content ({@code null} meaning "no entries")
	 */
	@Override
	default Mono<Delta<Map<T, U>>> updateAndGetDelta(
			SerializationFunction<@Nullable Map<T, U>, @Nullable Map<T, U>> updater,
			boolean existsAlmostCertainly) {
		return this
				.getUpdateMode()
				.single()
				.flatMap(updateMode -> {
					if (updateMode == UpdateMode.ALLOW_UNSAFE) {
						return this
								.getAllValues(null)
								.collectMap(Entry::getKey, Entry::getValue, HashMap::new)
								.single()
								.<Tuple2<Optional<Map<T, U>>, Optional<Map<T, U>>>>handle((v, sink) -> {
									if (v.isEmpty()) {
										v = null;
									}
									try {
										var result = updater.apply(v);
										if (result != null && result.isEmpty()) {
											result = null;
										}
										sink.next(Tuples.of(Optional.ofNullable(v), Optional.ofNullable(result)));
									} catch (SerializationException ex) {
										sink.error(ex);
									}
								})
								.flatMap(result -> {
									// A null result must still be written (as an empty entry set), otherwise the old
									// entries would stay stored while the returned delta claims they are gone.
									Map<T, U> newValues = result.getT2().orElse(Map.of());
									return this
											.setAllValues(Flux.fromIterable(Map.copyOf(newValues).entrySet()))
											.thenReturn(new Delta<>(result.getT1().orElse(null), result.getT2().orElse(null)));
								});
					} else if (updateMode == UpdateMode.ALLOW) {
						return Mono.fromCallable(() -> {
							throw new UnsupportedOperationException("Maps can't be updated atomically");
						});
					} else if (updateMode == UpdateMode.DISALLOW) {
						return Mono.fromCallable(() -> {
							throw new UnsupportedOperationException("Map can't be updated because updates are disabled");
						});
					} else {
						return Mono.fromCallable(() -> {
							throw new UnsupportedOperationException("Unknown update mode: " + updateMode);
						});
					}
				});
	}

	/**
	 * Clears the map by setting an empty map, and returns the previous content.
	 */
	@Override
	default Mono<Map<T, U>> clearAndGetPrevious() {
		return this.setAndGetPrevious(Map.of());
	}

	/**
	 * Reads the whole map.
	 *
	 * @return all the values collected in a map, or an empty Mono if there are no entries
	 */
	@Override
	default Mono<Map<T, U>> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) {
		return this
				.getAllValues(snapshot)
				.collectMap(Entry::getKey, Entry::getValue, HashMap::new)
				.filter(map -> !map.isEmpty());
	}

	/**
	 * Counts the stages returned by {@link #getAllStages(CompositeSnapshot)}, closing each one as it is
	 * counted. The {@code fast} flag is not used by this default implementation.
	 */
	@Override
	default Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
		return this
				.getAllStages(snapshot)
				.doOnNext(stage -> stage.getValue().close())
				.count();
	}

	/**
	 * Value getter doesn't lock data. Please make sure to lock before getting data.
	 *
	 * <p>The returned getter blocks the calling thread until the value has been read.
	 */
	default ValueGetterBlocking<T, U> getDbValueGetter(@Nullable CompositeSnapshot snapshot) {
		return k -> getValue(snapshot, k).block();
	}

	/**
	 * @return a getter that reads a value with {@link #getValue(CompositeSnapshot, Object)}
	 */
	default ValueGetter<T, U> getAsyncDbValueGetter(@Nullable CompositeSnapshot snapshot) {
		return k -> getValue(snapshot, k);
	}

	/**
	 * @return a transformer that pairs every key with the result of
	 * {@link #getMulti(CompositeSnapshot, Flux)} for that key
	 */
	default ValueTransformer<T, U> getAsyncDbValueTransformer(@Nullable CompositeSnapshot snapshot) {
		return keys -> {
			// The keys are consumed twice (by getMulti and by zip), so they are shared between two subscribers
			var sharedKeys = keys.publish().refCount(2);
			var values = getMulti(snapshot, sharedKeys);
			return Flux.zip(sharedKeys, values, Map::entry);
		};
	}
}