diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index e908fe1..e9c7156 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -22,6 +22,7 @@ import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; +import it.cavallium.dbengine.utils.InternalMonoUtils; import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps; @@ -228,9 +229,12 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> setAndGetPrevious(Object2ObjectSortedMap value) { return this .get(null) - .concatWith(dictionary.setRange(rangeMono, Flux - .fromIterable(Collections.unmodifiableMap(value).entrySet()) - .map(entry -> serializeEntry(entry)), true).then(Mono.empty())) + .concatWith(dictionary + .setRange(rangeMono, + Flux.fromIterable(Collections.unmodifiableMap(value).entrySet()).map(entry -> serializeEntry(entry)), + true + ) + .as(InternalMonoUtils::toAny)) .singleOrEmpty(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index b6894ca..cc71a39 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -22,6 +22,7 @@ import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; +import it.cavallium.dbengine.utils.InternalMonoUtils; import it.cavallium.dbengine.utils.SimpleResource; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap; import java.util.List; @@ -362,7 +363,7 @@ public class DatabaseMapDictionaryDeep> extend .concatWith(this .clear() .then(this.putMulti(entries)) - .then(Mono.empty()) + .as(InternalMonoUtils::toAny) ); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java index 1f2a00e..1eda5cc 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java @@ -14,6 +14,7 @@ import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.database.serialization.Serializer; +import it.cavallium.dbengine.utils.InternalMonoUtils; import it.cavallium.dbengine.utils.SimpleResource; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -92,7 +93,9 @@ public class DatabaseSingleton extends SimpleResource implements DatabaseStag @Override public Mono setAndGetPrevious(U value) { var resultMono = Flux - .concat(singleton.get(null), singleton.set(Mono.fromCallable(() -> serializeValue(value))).then(Mono.empty())) + .concat(singleton.get(null), + singleton.set(Mono.fromCallable(() -> serializeValue(value))).as(InternalMonoUtils::toAny) + ) .last(); return Mono.usingWhen(resultMono, result -> Mono.fromSupplier(() -> this.deserializeValue(result)), @@ -154,7 +157,7 @@ public class DatabaseSingleton extends SimpleResource implements DatabaseStag @Override public Mono clearAndGetPrevious() { - var resultMono = Flux.concat(singleton.get(null), singleton.set(Mono.empty()).then(Mono.empty())).last(); + var resultMono = Flux.concat(singleton.get(null), singleton.set(Mono.empty()).as(InternalMonoUtils::toAny)).last(); return Mono.usingWhen(resultMono, result -> Mono.fromSupplier(() -> this.deserializeValue(result)), LLUtils::finalizeResource diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java index f4b5b34..521996a 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java @@ -18,6 +18,7 @@ import it.cavallium.dbengine.database.disk.BinarySerializationFunction; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; +import it.cavallium.dbengine.utils.InternalMonoUtils; import it.unimi.dsi.fastutil.bytes.ByteList; import java.io.IOException; import java.util.List; @@ -91,7 +92,7 @@ public class LLMemoryDictionary implements LLDictionary { .defaultIfEmpty(false) .map((Boolean bool) -> LLUtils.booleanToResponseByteBuffer(allocator, bool)); } else { - return result.then(Mono.empty()); + return result.as(InternalMonoUtils::ignoreElements); } } diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java index 22f5ae4..26afcc7 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java @@ -15,6 +15,7 @@ import it.cavallium.dbengine.database.RocksDBStringProperty; import it.cavallium.dbengine.database.TableWithProperties; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.rpc.current.data.Column; +import it.cavallium.dbengine.utils.InternalMonoUtils; import it.unimi.dsi.fastutil.bytes.ByteList; import java.nio.charset.StandardCharsets; import java.nio.file.Path; @@ -76,7 +77,7 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase { if (defaultValue != null) { return mono.switchIfEmpty(singleton .set(Mono.fromSupplier(() -> allocator.copyOf(defaultValue))) - .then(Mono.empty())); + .as(InternalMonoUtils::toAny)); } else { return mono; } diff --git a/src/main/java/it/cavallium/dbengine/database/remote/QuicUtils.java b/src/main/java/it/cavallium/dbengine/database/remote/QuicUtils.java index 94b7b6c..964a85e 100644 --- a/src/main/java/it/cavallium/dbengine/database/remote/QuicUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/remote/QuicUtils.java @@ -8,6 +8,7 @@ import it.cavallium.dbengine.database.OptionalBuf; import it.cavallium.dbengine.rpc.current.data.RPCCrash; import it.cavallium.dbengine.rpc.current.data.RPCEvent; import it.cavallium.dbengine.rpc.current.data.nullables.NullableBytes; +import it.cavallium.dbengine.utils.InternalMonoUtils; import it.unimi.dsi.fastutil.bytes.ByteArrayList; import it.unimi.dsi.fastutil.bytes.ByteList; import java.nio.charset.StandardCharsets; @@ -241,7 +242,7 @@ public class QuicUtils { .flatMap(updater) .flatMap(stream::send); return Flux - .merge(firstRequest, firstResponse.then(Mono.empty()), secondRequest, secondResponse) + .merge(firstRequest, firstResponse.as(InternalMonoUtils::ignoreElements), secondRequest, secondResponse) .doFinally(s -> stream.close()); }) .map(QuicUtils::mapErrors) diff --git a/src/main/java/it/cavallium/dbengine/utils/InternalMonoUtils.java b/src/main/java/it/cavallium/dbengine/utils/InternalMonoUtils.java new file mode 100644 index 0000000..a828afb --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/utils/InternalMonoUtils.java @@ -0,0 +1,33 @@ +package it.cavallium.dbengine.utils; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class InternalMonoUtils { + + + @SuppressWarnings("unchecked") + public static Mono toAny(Mono request) { + return (Mono) request; + } + + @SuppressWarnings("unchecked") + public static Mono toAny(Flux request) { + return (Mono) Mono.ignoreElements(request); + } + + @SuppressWarnings("unchecked") + public static Mono toAny(Publisher request) { + if (request instanceof Mono mono) { + return (Mono) mono; + } else { + return (Mono) Mono.ignoreElements(request); + } + } + + @SuppressWarnings("unchecked") + public static Mono ignoreElements(Publisher flux) { + return (Mono) Mono.ignoreElements(flux); + } +}