From f739f4f9f4d252436ee7dc1f5a97e0e34626cef6 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 12 Sep 2022 20:14:56 +0200 Subject: [PATCH] UpdateMode is now a "blocking" method --- .../dbengine/database/LLDictionary.java | 2 +- .../collections/DatabaseMapDictionary.java | 2 +- .../DatabaseMapDictionaryDeep.java | 2 +- .../DatabaseMapDictionaryHashed.java | 6 +- .../collections/DatabaseStageMap.java | 84 +++++++++---------- .../database/disk/LLLocalDictionary.java | 4 +- .../database/memory/LLMemoryDictionary.java | 4 +- .../cavallium/dbengine/TestLLDictionary.java | 2 +- .../dbengine/TestLLDictionaryLeaks.java | 2 +- 9 files changed, 52 insertions(+), 56 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java index 17fa22f..1ea7704 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java @@ -25,7 +25,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { Mono put(Mono key, Mono value, LLDictionaryResultType resultType); - Mono getUpdateMode(); + UpdateMode getUpdateMode(); default Mono update(Mono key, BinarySerializationFunction updater, diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 12e3e85..af18c29 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -285,7 +285,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep getUpdateMode() { + public UpdateMode getUpdateMode() { return dictionary.getUpdateMode(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index 5b9bbf7..f3c3a10 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -303,7 +303,7 @@ public class DatabaseMapDictionaryDeep> extend } @Override - public Mono getUpdateMode() { + public UpdateMode getUpdateMode() { return dictionary.getUpdateMode(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java index 9828790..c8d3803 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java @@ -48,7 +48,8 @@ public class DatabaseMapDictionaryHashed extends SimpleResource implem Serializer valueSerializer, Function keySuffixHashFunction, SerializerFixedBinaryLength keySuffixHashSerializer) { - if (dictionary.getUpdateMode().transform(LLUtils::handleDiscard).block() != UpdateMode.ALLOW) { + var updateMode = dictionary.getUpdateMode(); + if (updateMode != UpdateMode.ALLOW) { throw new IllegalArgumentException("Hashed maps only works when UpdateMode is ALLOW"); } this.alloc = dictionary.getAllocator(); @@ -61,7 +62,6 @@ public class DatabaseMapDictionaryHashed extends SimpleResource implem this.keySuffixHashFunction = keySuffixHashFunction; } - @SuppressWarnings({"unchecked", "rawtypes"}) private DatabaseMapDictionaryHashed(BufferAllocator alloc, Function keySuffixHashFunction, DatabaseStage>>> subDictionary, @@ -177,7 +177,7 @@ public class DatabaseMapDictionaryHashed extends SimpleResource implem } @Override - public Mono getUpdateMode() { + public UpdateMode getUpdateMode() { return subDictionary.getUpdateMode(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index a4d9cb1..c236c8e 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -51,7 +51,7 @@ public interface DatabaseStageMap> extends return Mono.usingWhen(at(null, key).single(), stage -> stage.set(value), LLUtils::finalizeResource); } - Mono getUpdateMode(); + UpdateMode getUpdateMode(); default Mono updateValue(T key, UpdateReturnMode updateReturnMode, @@ -192,49 +192,45 @@ public interface DatabaseStageMap> extends @Override default Mono>> updateAndGetDelta(SerializationFunction<@Nullable Object2ObjectSortedMap, @Nullable Object2ObjectSortedMap> updater) { - return this - .getUpdateMode() - .single() - .flatMap(updateMode -> { - if (updateMode == UpdateMode.ALLOW_UNSAFE) { - return this - .getAllValues(null, true) - .collectMap(Entry::getKey, Entry::getValue, Object2ObjectLinkedOpenHashMap::new) - .map(map -> (Object2ObjectSortedMap) map) - .single() - .>, Optional>>>handle((v, sink) -> { - if (v.isEmpty()) { - v = null; - } - try { - var result = updater.apply(v); - if (result != null && result.isEmpty()) { - result = null; - } - sink.next(Tuples.of(Optional.ofNullable(v), Optional.ofNullable(result))); - } catch (SerializationException ex) { - sink.error(ex); - } - }) - .flatMap(result -> Mono - .justOrEmpty(result.getT2()) - .flatMap(values -> this.setAllValues(Flux.fromIterable(values.entrySet()))) - .thenReturn(new Delta<>(result.getT1().orElse(null), result.getT2().orElse(null))) - ); - } else if (updateMode == UpdateMode.ALLOW) { - return Mono.fromCallable(() -> { - throw new UnsupportedOperationException("Maps can't be updated atomically"); - }); - } else if (updateMode == UpdateMode.DISALLOW) { - return Mono.fromCallable(() -> { - throw new UnsupportedOperationException("Map can't be updated because updates are disabled"); - }); - } else { - return Mono.fromCallable(() -> { - throw new UnsupportedOperationException("Unknown update mode: " + updateMode); - }); - } - }); + var updateMode = this.getUpdateMode(); + if (updateMode == UpdateMode.ALLOW_UNSAFE) { + return this + .getAllValues(null, true) + .collectMap(Entry::getKey, Entry::getValue, Object2ObjectLinkedOpenHashMap::new) + .map(map -> (Object2ObjectSortedMap) map) + .single() + .>, Optional>>>handle((v, sink) -> { + if (v.isEmpty()) { + v = null; + } + try { + var result = updater.apply(v); + if (result != null && result.isEmpty()) { + result = null; + } + sink.next(Tuples.of(Optional.ofNullable(v), Optional.ofNullable(result))); + } catch (SerializationException ex) { + sink.error(ex); + } + }) + .flatMap(result -> Mono + .justOrEmpty(result.getT2()) + .flatMap(values -> this.setAllValues(Flux.fromIterable(values.entrySet()))) + .thenReturn(new Delta<>(result.getT1().orElse(null), result.getT2().orElse(null))) + ); + } else if (updateMode == UpdateMode.ALLOW) { + return Mono.fromCallable(() -> { + throw new UnsupportedOperationException("Maps can't be updated atomically"); + }); + } else if (updateMode == UpdateMode.DISALLOW) { + return Mono.fromCallable(() -> { + throw new UnsupportedOperationException("Map can't be updated because updates are disabled"); + }); + } else { + return Mono.fromCallable(() -> { + throw new UnsupportedOperationException("Unknown update mode: " + updateMode); + }); + } } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index c1a6239..9335ffc 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -409,8 +409,8 @@ public class LLLocalDictionary implements LLDictionary { } @Override - public Mono getUpdateMode() { - return Mono.just(updateMode); + public UpdateMode getUpdateMode() { + return updateMode; } @SuppressWarnings("DuplicatedCode") diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java index 0fbd613..688dacd 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java @@ -201,8 +201,8 @@ public class LLMemoryDictionary implements LLDictionary { } @Override - public Mono getUpdateMode() { - return Mono.just(updateMode); + public UpdateMode getUpdateMode() { + return updateMode; } @Override diff --git a/src/test/java/it/cavallium/dbengine/TestLLDictionary.java b/src/test/java/it/cavallium/dbengine/TestLLDictionary.java index 46569a4..a38d77c 100644 --- a/src/test/java/it/cavallium/dbengine/TestLLDictionary.java +++ b/src/test/java/it/cavallium/dbengine/TestLLDictionary.java @@ -201,7 +201,7 @@ public abstract class TestLLDictionary { @MethodSource("provideArguments") public void testGetUpdateMode(UpdateMode updateMode) { var dict = getDict(updateMode); - assertEquals(updateMode, run(dict.getUpdateMode())); + assertEquals(updateMode, dict.getUpdateMode()); } @ParameterizedTest diff --git a/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java b/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java index c019007..7f0b8e0 100644 --- a/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java +++ b/src/test/java/it/cavallium/dbengine/TestLLDictionaryLeaks.java @@ -154,7 +154,7 @@ public abstract class TestLLDictionaryLeaks { @MethodSource("provideArguments") public void testGetUpdateMode(UpdateMode updateMode) { var dict = getDict(updateMode); - assertEquals(updateMode, run(dict.getUpdateMode())); + assertEquals(updateMode, dict.getUpdateMode()); } @ParameterizedTest