UpdateMode is now a "blocking" method

This commit is contained in:
Andrea Cavalli 2022-09-12 20:14:56 +02:00
parent f9c2f7ca31
commit f739f4f9f4
9 changed files with 52 additions and 56 deletions

View File

@ -25,7 +25,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
Mono<Buffer> put(Mono<Buffer> key, Mono<Buffer> value, LLDictionaryResultType resultType); Mono<Buffer> put(Mono<Buffer> key, Mono<Buffer> value, LLDictionaryResultType resultType);
Mono<UpdateMode> getUpdateMode(); UpdateMode getUpdateMode();
default Mono<Buffer> update(Mono<Buffer> key, default Mono<Buffer> update(Mono<Buffer> key,
BinarySerializationFunction updater, BinarySerializationFunction updater,

View File

@ -285,7 +285,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
} }
@Override @Override
public Mono<UpdateMode> getUpdateMode() { public UpdateMode getUpdateMode() {
return dictionary.getUpdateMode(); return dictionary.getUpdateMode();
} }

View File

@ -303,7 +303,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
} }
@Override @Override
public Mono<UpdateMode> getUpdateMode() { public UpdateMode getUpdateMode() {
return dictionary.getUpdateMode(); return dictionary.getUpdateMode();
} }

View File

@ -48,7 +48,8 @@ public class DatabaseMapDictionaryHashed<T, U, TH> extends SimpleResource implem
Serializer<U> valueSerializer, Serializer<U> valueSerializer,
Function<T, TH> keySuffixHashFunction, Function<T, TH> keySuffixHashFunction,
SerializerFixedBinaryLength<TH> keySuffixHashSerializer) { SerializerFixedBinaryLength<TH> keySuffixHashSerializer) {
if (dictionary.getUpdateMode().transform(LLUtils::handleDiscard).block() != UpdateMode.ALLOW) { var updateMode = dictionary.getUpdateMode();
if (updateMode != UpdateMode.ALLOW) {
throw new IllegalArgumentException("Hashed maps only works when UpdateMode is ALLOW"); throw new IllegalArgumentException("Hashed maps only works when UpdateMode is ALLOW");
} }
this.alloc = dictionary.getAllocator(); this.alloc = dictionary.getAllocator();
@ -61,7 +62,6 @@ public class DatabaseMapDictionaryHashed<T, U, TH> extends SimpleResource implem
this.keySuffixHashFunction = keySuffixHashFunction; this.keySuffixHashFunction = keySuffixHashFunction;
} }
@SuppressWarnings({"unchecked", "rawtypes"})
private DatabaseMapDictionaryHashed(BufferAllocator alloc, private DatabaseMapDictionaryHashed(BufferAllocator alloc,
Function<T, TH> keySuffixHashFunction, Function<T, TH> keySuffixHashFunction,
DatabaseStage<Object2ObjectSortedMap<TH, ObjectArraySet<Entry<T, U>>>> subDictionary, DatabaseStage<Object2ObjectSortedMap<TH, ObjectArraySet<Entry<T, U>>>> subDictionary,
@ -177,7 +177,7 @@ public class DatabaseMapDictionaryHashed<T, U, TH> extends SimpleResource implem
} }
@Override @Override
public Mono<UpdateMode> getUpdateMode() { public UpdateMode getUpdateMode() {
return subDictionary.getUpdateMode(); return subDictionary.getUpdateMode();
} }

View File

@ -51,7 +51,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
return Mono.usingWhen(at(null, key).single(), stage -> stage.set(value), LLUtils::finalizeResource); return Mono.usingWhen(at(null, key).single(), stage -> stage.set(value), LLUtils::finalizeResource);
} }
Mono<UpdateMode> getUpdateMode(); UpdateMode getUpdateMode();
default Mono<U> updateValue(T key, default Mono<U> updateValue(T key,
UpdateReturnMode updateReturnMode, UpdateReturnMode updateReturnMode,
@ -192,49 +192,45 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
@Override @Override
default Mono<Delta<Object2ObjectSortedMap<T, U>>> updateAndGetDelta(SerializationFunction<@Nullable Object2ObjectSortedMap<T, U>, @Nullable Object2ObjectSortedMap<T, U>> updater) { default Mono<Delta<Object2ObjectSortedMap<T, U>>> updateAndGetDelta(SerializationFunction<@Nullable Object2ObjectSortedMap<T, U>, @Nullable Object2ObjectSortedMap<T, U>> updater) {
return this var updateMode = this.getUpdateMode();
.getUpdateMode() if (updateMode == UpdateMode.ALLOW_UNSAFE) {
.single() return this
.flatMap(updateMode -> { .getAllValues(null, true)
if (updateMode == UpdateMode.ALLOW_UNSAFE) { .collectMap(Entry::getKey, Entry::getValue, Object2ObjectLinkedOpenHashMap::new)
return this .map(map -> (Object2ObjectSortedMap<T, U>) map)
.getAllValues(null, true) .single()
.collectMap(Entry::getKey, Entry::getValue, Object2ObjectLinkedOpenHashMap::new) .<Tuple2<Optional<Object2ObjectSortedMap<T, U>>, Optional<Object2ObjectSortedMap<T, U>>>>handle((v, sink) -> {
.map(map -> (Object2ObjectSortedMap<T, U>) map) if (v.isEmpty()) {
.single() v = null;
.<Tuple2<Optional<Object2ObjectSortedMap<T, U>>, Optional<Object2ObjectSortedMap<T, U>>>>handle((v, sink) -> { }
if (v.isEmpty()) { try {
v = null; var result = updater.apply(v);
} if (result != null && result.isEmpty()) {
try { result = null;
var result = updater.apply(v); }
if (result != null && result.isEmpty()) { sink.next(Tuples.of(Optional.ofNullable(v), Optional.ofNullable(result)));
result = null; } catch (SerializationException ex) {
} sink.error(ex);
sink.next(Tuples.of(Optional.ofNullable(v), Optional.ofNullable(result))); }
} catch (SerializationException ex) { })
sink.error(ex); .flatMap(result -> Mono
} .justOrEmpty(result.getT2())
}) .flatMap(values -> this.setAllValues(Flux.fromIterable(values.entrySet())))
.flatMap(result -> Mono .thenReturn(new Delta<>(result.getT1().orElse(null), result.getT2().orElse(null)))
.justOrEmpty(result.getT2()) );
.flatMap(values -> this.setAllValues(Flux.fromIterable(values.entrySet()))) } else if (updateMode == UpdateMode.ALLOW) {
.thenReturn(new Delta<>(result.getT1().orElse(null), result.getT2().orElse(null))) return Mono.fromCallable(() -> {
); throw new UnsupportedOperationException("Maps can't be updated atomically");
} else if (updateMode == UpdateMode.ALLOW) { });
return Mono.fromCallable(() -> { } else if (updateMode == UpdateMode.DISALLOW) {
throw new UnsupportedOperationException("Maps can't be updated atomically"); return Mono.fromCallable(() -> {
}); throw new UnsupportedOperationException("Map can't be updated because updates are disabled");
} else if (updateMode == UpdateMode.DISALLOW) { });
return Mono.fromCallable(() -> { } else {
throw new UnsupportedOperationException("Map can't be updated because updates are disabled"); return Mono.fromCallable(() -> {
}); throw new UnsupportedOperationException("Unknown update mode: " + updateMode);
} else { });
return Mono.fromCallable(() -> { }
throw new UnsupportedOperationException("Unknown update mode: " + updateMode);
});
}
});
} }
@Override @Override

View File

@ -409,8 +409,8 @@ public class LLLocalDictionary implements LLDictionary {
} }
@Override @Override
public Mono<UpdateMode> getUpdateMode() { public UpdateMode getUpdateMode() {
return Mono.just(updateMode); return updateMode;
} }
@SuppressWarnings("DuplicatedCode") @SuppressWarnings("DuplicatedCode")

View File

@ -201,8 +201,8 @@ public class LLMemoryDictionary implements LLDictionary {
} }
@Override @Override
public Mono<UpdateMode> getUpdateMode() { public UpdateMode getUpdateMode() {
return Mono.just(updateMode); return updateMode;
} }
@Override @Override

View File

@ -201,7 +201,7 @@ public abstract class TestLLDictionary {
@MethodSource("provideArguments") @MethodSource("provideArguments")
public void testGetUpdateMode(UpdateMode updateMode) { public void testGetUpdateMode(UpdateMode updateMode) {
var dict = getDict(updateMode); var dict = getDict(updateMode);
assertEquals(updateMode, run(dict.getUpdateMode())); assertEquals(updateMode, dict.getUpdateMode());
} }
@ParameterizedTest @ParameterizedTest

View File

@ -154,7 +154,7 @@ public abstract class TestLLDictionaryLeaks {
@MethodSource("provideArguments") @MethodSource("provideArguments")
public void testGetUpdateMode(UpdateMode updateMode) { public void testGetUpdateMode(UpdateMode updateMode) {
var dict = getDict(updateMode); var dict = getDict(updateMode);
assertEquals(updateMode, run(dict.getUpdateMode())); assertEquals(updateMode, dict.getUpdateMode());
} }
@ParameterizedTest @ParameterizedTest