package it.cavallium.dbengine.database.collections; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.collections.Joiner.ValueGetter; import it.cavallium.dbengine.database.collections.JoinerBlocking.ValueGetterBlocking; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.function.Function; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @SuppressWarnings("unused") public interface DatabaseStageMap> extends DatabaseStageEntry> { Mono at(@Nullable CompositeSnapshot snapshot, T key); default Mono getValue(@Nullable CompositeSnapshot snapshot, T key) { return this.at(snapshot, key).flatMap(v -> v.get(snapshot)); } default Mono getValueOrDefault(@Nullable CompositeSnapshot snapshot, T key, Mono defaultValue) { return getValue(snapshot, key).switchIfEmpty(defaultValue).single(); } default Mono putValue(T key, U value) { return at(null, key).single().flatMap(v -> v.set(value)); } default Mono putValueAndGetPrevious(T key, U value) { return at(null, key).single().flatMap(v -> v.setAndGetPrevious(value)); } default Mono putValueAndGetStatus(T key, U value) { return at(null, key).single().flatMap(v -> v.setAndGetStatus(value)); } default Mono remove(T key) { return removeAndGetStatus(key).then(); } default Mono removeAndGetPrevious(T key) { return at(null, key).flatMap(DatabaseStage::clearAndGetPrevious); } default Mono removeAndGetStatus(T key) { return removeAndGetPrevious(key).map(o -> true).defaultIfEmpty(false); } default Flux> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys) { return keys.flatMap(key -> this.getValue(snapshot, key).map(value -> Map.entry(key, value))); } default Mono putMulti(Flux> entries) { return entries.flatMap(entry -> this.putValue(entry.getKey(), entry.getValue())).then(); } Flux> getAllStages(@Nullable CompositeSnapshot snapshot); default Flux> getAllValues(@Nullable CompositeSnapshot snapshot) { return this .getAllStages(null) .flatMap(entry -> entry.getValue().get(null).map(value -> Map.entry(entry.getKey(), value))); } default Mono setAllValues(Flux> entries) { return setAllValuesAndGetPrevious(entries).then(); } Flux> setAllValuesAndGetPrevious(Flux> entries); default Mono clear() { return setAllValues(Flux.empty()); } default Mono replaceAllValues(boolean canKeysChange, Function, Mono>> entriesReplacer) { return Mono.defer(() -> { if (canKeysChange) { return this.setAllValues(this.getAllValues(null).flatMap(entriesReplacer)).then(); } else { return this .getAllValues(null) .flatMap(entriesReplacer) .flatMap(replacedEntry -> this .at(null, replacedEntry.getKey()) .map(entry -> entry.set(replacedEntry.getValue()))) .then(); } }); } default Mono replaceAll(Function, Mono> entriesReplacer) { return this .getAllStages(null) .flatMap(entriesReplacer) .then(); } @Override default Mono> setAndGetPrevious(Map value) { return this .setAllValuesAndGetPrevious(Flux.fromIterable(value.entrySet())) .collectMap(Entry::getKey, Entry::getValue, HashMap::new); } @Override default Mono> clearAndGetPrevious() { return this.setAndGetPrevious(Map.of()); } @Override default Mono> get(@Nullable CompositeSnapshot snapshot) { return getAllValues(snapshot) .collectMap(Entry::getKey, Entry::getValue, HashMap::new); } @Override default Mono size(@Nullable CompositeSnapshot snapshot, boolean fast) { return getAllStages(snapshot).count(); } /** * Value getter doesn't lock data. Please make sure to lock before getting data. */ default ValueGetterBlocking getDbValueGetter() { return k -> getValue(null, k).block(); } /** * Value getter doesn't lock data. Please make sure to lock before getting data. */ default ValueGetter getAsyncDbValueGetter() { return k -> getValue(null, k); } }