Ignore mono elements faster

This commit is contained in:
Andrea Cavalli 2023-01-27 15:33:32 +01:00
parent b9b420afba
commit 694c2d811d
7 changed files with 53 additions and 9 deletions

View File

@ -22,6 +22,7 @@ import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import it.cavallium.dbengine.utils.InternalMonoUtils;
import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps;
@ -228,9 +229,12 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Mono<Object2ObjectSortedMap<T, U>> setAndGetPrevious(Object2ObjectSortedMap<T, U> value) {
return this
.get(null)
.concatWith(dictionary.setRange(rangeMono, Flux
.fromIterable(Collections.unmodifiableMap(value).entrySet())
.map(entry -> serializeEntry(entry)), true).then(Mono.empty()))
.concatWith(dictionary
.setRange(rangeMono,
Flux.fromIterable(Collections.unmodifiableMap(value).entrySet()).map(entry -> serializeEntry(entry)),
true
)
.as(InternalMonoUtils::toAny))
.singleOrEmpty();
}

View File

@ -22,6 +22,7 @@ import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import it.cavallium.dbengine.utils.InternalMonoUtils;
import it.cavallium.dbengine.utils.SimpleResource;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
import java.util.List;
@ -362,7 +363,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
.concatWith(this
.clear()
.then(this.putMulti(entries))
.then(Mono.empty())
.as(InternalMonoUtils::toAny)
);
}

View File

@ -14,6 +14,7 @@ import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.utils.InternalMonoUtils;
import it.cavallium.dbengine.utils.SimpleResource;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -92,7 +93,9 @@ public class DatabaseSingleton<U> extends SimpleResource implements DatabaseStag
@Override
public Mono<U> setAndGetPrevious(U value) {
var resultMono = Flux
.concat(singleton.get(null), singleton.set(Mono.fromCallable(() -> serializeValue(value))).then(Mono.empty()))
.concat(singleton.get(null),
singleton.set(Mono.fromCallable(() -> serializeValue(value))).as(InternalMonoUtils::toAny)
)
.last();
return Mono.usingWhen(resultMono,
result -> Mono.fromSupplier(() -> this.deserializeValue(result)),
@ -154,7 +157,7 @@ public class DatabaseSingleton<U> extends SimpleResource implements DatabaseStag
@Override
public Mono<U> clearAndGetPrevious() {
var resultMono = Flux.concat(singleton.get(null), singleton.set(Mono.empty()).then(Mono.empty())).last();
var resultMono = Flux.concat(singleton.get(null), singleton.set(Mono.empty()).as(InternalMonoUtils::toAny)).last();
return Mono.usingWhen(resultMono,
result -> Mono.fromSupplier(() -> this.deserializeValue(result)),
LLUtils::finalizeResource

View File

@ -18,6 +18,7 @@ import it.cavallium.dbengine.database.disk.BinarySerializationFunction;
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.utils.InternalMonoUtils;
import it.unimi.dsi.fastutil.bytes.ByteList;
import java.io.IOException;
import java.util.List;
@ -91,7 +92,7 @@ public class LLMemoryDictionary implements LLDictionary {
.defaultIfEmpty(false)
.map((Boolean bool) -> LLUtils.booleanToResponseByteBuffer(allocator, bool));
} else {
return result.then(Mono.empty());
return result.as(InternalMonoUtils::ignoreElements);
}
}

View File

@ -15,6 +15,7 @@ import it.cavallium.dbengine.database.RocksDBStringProperty;
import it.cavallium.dbengine.database.TableWithProperties;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.rpc.current.data.Column;
import it.cavallium.dbengine.utils.InternalMonoUtils;
import it.unimi.dsi.fastutil.bytes.ByteList;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
@ -76,7 +77,7 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase {
if (defaultValue != null) {
return mono.switchIfEmpty(singleton
.set(Mono.fromSupplier(() -> allocator.copyOf(defaultValue)))
.then(Mono.empty()));
.as(InternalMonoUtils::toAny));
} else {
return mono;
}

View File

@ -8,6 +8,7 @@ import it.cavallium.dbengine.database.OptionalBuf;
import it.cavallium.dbengine.rpc.current.data.RPCCrash;
import it.cavallium.dbengine.rpc.current.data.RPCEvent;
import it.cavallium.dbengine.rpc.current.data.nullables.NullableBytes;
import it.cavallium.dbengine.utils.InternalMonoUtils;
import it.unimi.dsi.fastutil.bytes.ByteArrayList;
import it.unimi.dsi.fastutil.bytes.ByteList;
import java.nio.charset.StandardCharsets;
@ -241,7 +242,7 @@ public class QuicUtils {
.flatMap(updater)
.flatMap(stream::send);
return Flux
.merge(firstRequest, firstResponse.then(Mono.empty()), secondRequest, secondResponse)
.merge(firstRequest, firstResponse.as(InternalMonoUtils::ignoreElements), secondRequest, secondResponse)
.doFinally(s -> stream.close());
})
.map(QuicUtils::mapErrors)

View File

@ -0,0 +1,33 @@
package it.cavallium.dbengine.utils;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class InternalMonoUtils {
@SuppressWarnings("unchecked")
public static <ANY> Mono<ANY> toAny(Mono<Void> request) {
return (Mono<ANY>) request;
}
@SuppressWarnings("unchecked")
public static <ANY> Mono<ANY> toAny(Flux<Void> request) {
return (Mono<ANY>) Mono.ignoreElements(request);
}
@SuppressWarnings("unchecked")
public static <ANY> Mono<ANY> toAny(Publisher<Void> request) {
if (request instanceof Mono<Void> mono) {
return (Mono<ANY>) mono;
} else {
return (Mono<ANY>) Mono.ignoreElements(request);
}
}
@SuppressWarnings("unchecked")
public static <ANY_IN, ANY_OUT> Mono<ANY_OUT> ignoreElements(Publisher<ANY_IN> flux) {
return (Mono<ANY_OUT>) Mono.ignoreElements(flux);
}
}