CavalliumDBEngine/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java

487 lines
16 KiB
Java
Raw Normal View History

package it.cavallium.dbengine.database.collections;
2021-09-17 16:56:28 +02:00
import io.net5.buffer.api.Buffer;
2021-11-08 12:06:32 +01:00
import io.net5.buffer.api.Resource;
2021-09-17 16:56:28 +02:00
import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.CompositeSnapshot;
2021-05-08 03:09:00 +02:00
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLDictionary;
2021-01-31 21:23:43 +01:00
import it.cavallium.dbengine.database.LLDictionaryResultType;
2021-08-28 22:42:51 +02:00
import it.cavallium.dbengine.database.LLEntry;
2022-01-22 23:21:40 +01:00
import it.cavallium.dbengine.database.LLRange;
2021-01-31 21:23:43 +01:00
import it.cavallium.dbengine.database.LLUtils;
2021-05-02 19:18:15 +02:00
import it.cavallium.dbengine.database.UpdateMode;
2021-05-08 03:09:00 +02:00
import it.cavallium.dbengine.database.UpdateReturnMode;
2021-11-08 10:49:59 +01:00
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
2021-08-22 21:23:22 +02:00
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
2021-02-02 19:40:37 +01:00
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
2021-12-18 18:16:56 +01:00
import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps;
2021-06-06 02:23:51 +02:00
import java.util.Collections;
2021-09-08 00:22:39 +02:00
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
2021-05-02 19:18:15 +02:00
import java.util.Objects;
2021-07-23 15:20:33 +02:00
import java.util.Optional;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
2021-08-22 21:23:22 +02:00
import reactor.core.publisher.SynchronousSink;
2021-07-17 11:52:08 +02:00
import reactor.util.function.Tuple2;
2021-01-31 21:23:43 +01:00
/**
* Optimized implementation of "DatabaseMapDictionary with SubStageGetterSingle"
*/
public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>> {
2021-09-02 17:15:40 +02:00
private final Serializer<U> valueSerializer;
/**
 * Builds a map dictionary whose sub-stages are produced by a {@link SubStageGetterSingle}
 * wrapping {@code valueSerializer}.
 *
 * @param dictionary          backing dictionary, forwarded to the superclass
 * @param prefixKey           optional key prefix, forwarded untouched to the superclass
 * @param keySuffixSerializer fixed-length serializer for the key suffix
 * @param valueSerializer     serializer used for values; also kept in a field of this class
 * @param onClose             callback forwarded to the superclass
 */
protected DatabaseMapDictionary(
		LLDictionary dictionary,
		@Nullable Buffer prefixKey,
		SerializerFixedBinaryLength<T> keySuffixSerializer,
		Serializer<U> valueSerializer,
		Runnable onClose) {
	// Do not retain or release or use the prefixKey here
	super(dictionary,
			prefixKey,
			keySuffixSerializer,
			new SubStageGetterSingle<>(valueSerializer),
			0, // NOTE(review): presumably the key extension length - confirm against the superclass constructor
			onClose
	);
	this.valueSerializer = valueSerializer;
}
2021-01-31 21:23:43 +01:00
public static <T, U> DatabaseMapDictionary<T, U> simple(LLDictionary dictionary,
2021-09-02 17:15:40 +02:00
SerializerFixedBinaryLength<T> keySerializer,
2021-09-23 20:57:28 +02:00
Serializer<U> valueSerializer,
2021-10-01 19:17:33 +02:00
Runnable onClose) {
2021-11-08 16:33:41 +01:00
return new DatabaseMapDictionary<>(dictionary, null, keySerializer,
2021-10-01 19:17:33 +02:00
valueSerializer, onClose);
}
2021-01-31 21:23:43 +01:00
public static <T, U> DatabaseMapDictionary<T, U> tail(LLDictionary dictionary,
2021-11-08 16:33:41 +01:00
@Nullable Buffer prefixKey,
2021-09-02 17:15:40 +02:00
SerializerFixedBinaryLength<T> keySuffixSerializer,
2021-09-23 20:57:28 +02:00
Serializer<U> valueSerializer,
2021-10-01 19:17:33 +02:00
Runnable onClose) {
return new DatabaseMapDictionary<>(dictionary, prefixKey, keySuffixSerializer, valueSerializer, onClose);
}
2021-10-19 00:22:05 +02:00
/**
 * Sink handler: receives a sent value buffer, deserializes it and emits the result.
 * The received buffer is closed when the try-with-resources block exits; any throwable
 * is reported through {@code sink.error}.
 *
 * @param valueToReceive the sent buffer holding the serialized value
 * @param sink           the sink that gets the deserialized value or the error
 */
private void deserializeValue(Send<Buffer> valueToReceive, SynchronousSink<U> sink) {
	try (Buffer received = valueToReceive.receive()) {
		U deserialized = valueSerializer.deserialize(received);
		sink.next(deserialized);
	} catch (Throwable failure) {
		sink.error(failure);
	}
}
2021-11-08 16:33:41 +01:00
private Buffer serializeValue(U value) throws SerializationException {
2021-10-19 00:22:05 +02:00
var valSizeHint = valueSerializer.getSerializedSizeHint();
if (valSizeHint == -1) valSizeHint = 128;
2021-11-08 16:33:41 +01:00
var valBuf = dictionary.getAllocator().allocate(valSizeHint);
try {
2021-10-19 00:22:05 +02:00
valueSerializer.serialize(value, valBuf);
2021-11-08 16:33:41 +01:00
return valBuf;
} catch (Throwable t) {
valBuf.close();
throw t;
2021-10-19 00:22:05 +02:00
}
}
2021-11-08 16:33:41 +01:00
private Buffer serializeKeySuffixToKey(T keySuffix) throws SerializationException {
Buffer keyBuf;
if (keyPrefix != null) {
keyBuf = keyPrefix.copy();
} else {
keyBuf = this.dictionary.getAllocator().allocate(keyPrefixLength + keySuffixLength + keyExtLength);
}
try {
2021-10-19 00:22:05 +02:00
assert keyBuf.readableBytes() == keyPrefixLength;
keyBuf.ensureWritable(keySuffixLength + keyExtLength);
serializeSuffix(keySuffix, keyBuf);
assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
2021-11-08 16:33:41 +01:00
return keyBuf;
} catch (Throwable t) {
keyBuf.close();
throw t;
2021-10-19 00:22:05 +02:00
}
}
2021-11-08 16:33:41 +01:00
private Buffer toKey(Buffer suffixKey) {
assert suffixKeyLengthConsistency(suffixKey.readableBytes());
if (keyPrefix != null && keyPrefix.readableBytes() > 0) {
var result = LLUtils.compositeBuffer(dictionary.getAllocator(),
LLUtils.copy(dictionary.getAllocator(), keyPrefix),
suffixKey.send()
);
try {
assert result.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
return result;
} catch (Throwable t) {
result.close();
throw t;
}
2021-11-08 16:33:41 +01:00
} else {
assert suffixKey.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
return suffixKey;
}
}
2021-01-31 21:23:43 +01:00
@Override
2021-12-18 18:16:56 +01:00
public Mono<Object2ObjectSortedMap<T, U>> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) {
return dictionary
.getRange(resolveSnapshot(snapshot), rangeMono, existsAlmostCertainly)
2021-08-31 09:14:46 +02:00
.<Entry<T, U>>handle((entrySend, sink) -> {
2021-10-19 00:22:05 +02:00
Entry<T, U> deserializedEntry;
try {
try (var entry = entrySend.receive()) {
T key;
try (var serializedKey = entry.getKey().receive()) {
splitPrefix(serializedKey).close();
suffixKeyLengthConsistency(serializedKey.readableBytes());
key = deserializeSuffix(serializedKey);
}
U value;
try (var valueBuf = entry.getValue().receive()) {
value = valueSerializer.deserialize(valueBuf);
}
deserializedEntry = Map.entry(key, value);
}
2021-10-19 00:22:05 +02:00
sink.next(deserializedEntry);
} catch (Throwable ex) {
2021-08-22 21:23:22 +02:00
sink.error(ex);
}
})
2021-12-18 18:16:56 +01:00
.collectMap(Entry::getKey, Entry::getValue, Object2ObjectLinkedOpenHashMap::new)
.map(map -> (Object2ObjectSortedMap<T, U>) map)
.filter(map -> !map.isEmpty());
2021-01-31 19:52:47 +01:00
}
2021-01-31 21:23:43 +01:00
@Override
2021-12-18 18:16:56 +01:00
public Mono<Object2ObjectSortedMap<T, U>> setAndGetPrevious(Object2ObjectSortedMap<T, U> value) {
2021-08-22 21:23:22 +02:00
return this
.get(null, false)
.concatWith(dictionary.setRange(rangeMono, Flux
2021-08-22 19:54:23 +02:00
.fromIterable(Collections.unmodifiableMap(value).entrySet())
2021-10-19 00:22:05 +02:00
.handle(this::serializeEntrySink)
2021-08-22 21:23:22 +02:00
).then(Mono.empty()))
2022-01-26 14:22:54 +01:00
.singleOrEmpty();
2021-01-31 19:52:47 +01:00
}
2021-01-31 21:23:43 +01:00
@Override
2021-12-18 18:16:56 +01:00
public Mono<Object2ObjectSortedMap<T, U>> clearAndGetPrevious() {
2021-05-02 19:18:15 +02:00
return this
2021-12-18 18:16:56 +01:00
.setAndGetPrevious(Object2ObjectSortedMaps.emptyMap());
}
2021-01-31 21:23:43 +01:00
@Override
2021-02-24 16:43:07 +01:00
public Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
return dictionary.sizeRange(resolveSnapshot(snapshot), rangeMono, fast);
}
2021-03-14 03:13:19 +01:00
@Override
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
return dictionary.isRangeEmpty(resolveSnapshot(snapshot), rangeMono, false);
2021-03-14 03:13:19 +01:00
}
2021-01-31 21:23:43 +01:00
@Override
public Mono<DatabaseStageEntry<U>> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
2021-09-02 17:15:40 +02:00
return Mono.fromCallable(() ->
2021-10-19 00:22:05 +02:00
new DatabaseSingle<>(dictionary, serializeKeySuffixToKey(keySuffix), valueSerializer, null));
}
2022-01-22 23:21:40 +01:00
@Override
public Mono<Boolean> containsKey(@Nullable CompositeSnapshot snapshot, T keySuffix) {
return dictionary
.isRangeEmpty(resolveSnapshot(snapshot),
Mono.fromCallable(() -> LLRange.singleUnsafe(serializeKeySuffixToKey(keySuffix)).send()),
true
2022-01-22 23:21:40 +01:00
)
.map(empty -> !empty);
}
2021-01-31 21:23:43 +01:00
@Override
2021-03-18 16:19:41 +01:00
public Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) {
2021-08-31 09:14:46 +02:00
return dictionary
2021-09-22 18:33:28 +02:00
.get(resolveSnapshot(snapshot),
2021-11-08 16:33:41 +01:00
Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send()),
2021-09-22 18:33:28 +02:00
existsAlmostCertainly
)
2021-09-30 18:25:36 +02:00
.handle(this::deserializeValue);
}
2021-01-31 21:23:43 +01:00
@Override
public Mono<Void> putValue(T keySuffix, U value) {
2021-11-08 16:33:41 +01:00
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send()).single();
var valueMono = Mono.fromCallable(() -> serializeValue(value).send()).single();
2021-08-31 09:14:46 +02:00
return dictionary
.put(keyMono, valueMono, LLDictionaryResultType.VOID)
.doOnNext(Send::close)
.then();
}
2021-05-02 19:18:15 +02:00
/**
 * Returns the update mode reported by the backing dictionary.
 *
 * @return the result of {@code dictionary.getUpdateMode()}
 */
@Override
public Mono<UpdateMode> getUpdateMode() {
	final Mono<UpdateMode> updateMode = dictionary.getUpdateMode();
	return updateMode;
}
2021-02-06 19:21:31 +01:00
@Override
2021-05-08 03:09:00 +02:00
public Mono<U> updateValue(T keySuffix,
UpdateReturnMode updateReturnMode,
2021-03-18 16:19:41 +01:00
boolean existsAlmostCertainly,
2021-08-22 21:23:22 +02:00
SerializationFunction<@Nullable U, @Nullable U> updater) {
2021-11-08 16:33:41 +01:00
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send());
2021-08-31 09:14:46 +02:00
return dictionary
.update(keyMono, getSerializedUpdater(updater), updateReturnMode, existsAlmostCertainly)
2021-09-30 18:25:36 +02:00
.handle(this::deserializeValue);
2021-05-08 03:09:00 +02:00
}
@Override
public Mono<Delta<U>> updateValueAndGetDelta(T keySuffix,
boolean existsAlmostCertainly,
2021-08-22 21:23:22 +02:00
SerializationFunction<@Nullable U, @Nullable U> updater) {
2021-11-08 16:33:41 +01:00
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send());
2021-08-31 09:14:46 +02:00
return dictionary
.updateAndGetDelta(keyMono, getSerializedUpdater(updater), existsAlmostCertainly)
2021-10-19 00:22:05 +02:00
.transform(mono -> LLUtils.mapLLDelta(mono, serializedToReceive -> {
try (var serialized = serializedToReceive.receive()) {
return valueSerializer.deserialize(serialized);
}
}));
2021-02-06 19:21:31 +01:00
}
2021-11-08 16:33:41 +01:00
public SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> getSerializedUpdater(
2021-08-31 09:14:46 +02:00
SerializationFunction<@Nullable U, @Nullable U> updater) {
2021-07-17 11:52:08 +02:00
return oldSerialized -> {
2021-08-31 09:14:46 +02:00
try (oldSerialized) {
2021-08-22 21:23:22 +02:00
U result;
if (oldSerialized == null) {
result = updater.apply(null);
} else {
2021-10-19 00:22:05 +02:00
try (var oldSerializedReceived = oldSerialized.receive()) {
result = updater.apply(valueSerializer.deserialize(oldSerializedReceived));
}
2021-08-22 21:23:22 +02:00
}
2021-07-17 11:52:08 +02:00
if (result == null) {
return null;
} else {
2021-10-19 00:22:05 +02:00
return serializeValue(result);
2021-07-17 11:52:08 +02:00
}
}
};
}
2021-11-08 16:33:41 +01:00
public KVSerializationFunction<@NotNull T, @Nullable Send<Buffer>, @Nullable Buffer> getSerializedUpdater(
2021-11-08 10:49:59 +01:00
KVSerializationFunction<@NotNull T, @Nullable U, @Nullable U> updater) {
return (key, oldSerialized) -> {
2021-08-31 09:14:46 +02:00
try (oldSerialized) {
2021-08-22 21:23:22 +02:00
U result;
if (oldSerialized == null) {
2021-11-08 10:49:59 +01:00
result = updater.apply(key, null);
2021-08-22 21:23:22 +02:00
} else {
2021-10-19 00:22:05 +02:00
try (var oldSerializedReceived = oldSerialized.receive()) {
2021-11-08 10:49:59 +01:00
result = updater.apply(key, valueSerializer.deserialize(oldSerializedReceived));
2021-10-19 00:22:05 +02:00
}
2021-08-22 21:23:22 +02:00
}
2021-07-17 11:52:08 +02:00
if (result == null) {
return null;
} else {
2021-10-19 00:22:05 +02:00
return serializeValue(result);
2021-07-17 11:52:08 +02:00
}
}
};
}
2021-01-31 21:23:43 +01:00
@Override
public Mono<U> putValueAndGetPrevious(T keySuffix, U value) {
2021-11-08 16:33:41 +01:00
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send());
var valueMono = Mono.fromCallable(() -> serializeValue(value).send());
2021-10-19 00:22:05 +02:00
return dictionary.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE).handle(this::deserializeValue);
}
2021-01-31 21:23:43 +01:00
@Override
2021-05-02 19:18:15 +02:00
public Mono<Boolean> putValueAndGetChanged(T keySuffix, U value) {
2021-11-08 16:33:41 +01:00
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send());
var valueMono = Mono.fromCallable(() -> serializeValue(value).send());
2021-08-31 09:14:46 +02:00
return dictionary
.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE)
2021-09-30 18:25:36 +02:00
.handle(this::deserializeValue)
2021-08-31 09:14:46 +02:00
.map(oldValue -> !Objects.equals(oldValue, value))
.defaultIfEmpty(value != null);
}
2021-01-31 21:23:43 +01:00
@Override
public Mono<Void> remove(T keySuffix) {
2021-11-08 16:33:41 +01:00
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send());
2021-08-31 09:14:46 +02:00
return dictionary
.remove(keyMono, LLDictionaryResultType.VOID)
.doOnNext(Send::close)
.then();
}
2021-01-31 21:23:43 +01:00
@Override
public Mono<U> removeAndGetPrevious(T keySuffix) {
2021-11-08 16:33:41 +01:00
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send());
2021-10-19 00:22:05 +02:00
return dictionary.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE).handle(this::deserializeValue);
}
2021-01-31 21:23:43 +01:00
@Override
public Mono<Boolean> removeAndGetStatus(T keySuffix) {
2021-11-08 16:33:41 +01:00
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send());
2021-08-31 09:14:46 +02:00
return dictionary
.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE)
.map(LLUtils::responseToBoolean);
}
@Override
2021-11-08 10:49:59 +01:00
public Flux<Optional<U>> getMulti(@Nullable CompositeSnapshot snapshot, Flux<T> keys, boolean existsAlmostCertainly) {
2021-08-31 09:14:46 +02:00
var mappedKeys = keys
2021-11-08 10:49:59 +01:00
.<Send<Buffer>>handle((keySuffix, sink) -> {
2021-08-28 22:42:51 +02:00
try {
2021-11-08 16:33:41 +01:00
sink.next(serializeKeySuffixToKey(keySuffix).send());
2021-10-19 00:22:05 +02:00
} catch (Throwable ex) {
2021-08-31 09:14:46 +02:00
sink.error(ex);
2021-08-22 21:23:22 +02:00
}
2021-08-31 09:14:46 +02:00
});
return dictionary
.getMulti(resolveSnapshot(snapshot), mappedKeys, existsAlmostCertainly)
2021-11-08 10:49:59 +01:00
.<Optional<U>>handle((valueBufOpt, sink) -> {
2021-08-31 09:14:46 +02:00
try {
Optional<U> valueOpt;
2021-11-08 10:49:59 +01:00
if (valueBufOpt.isPresent()) {
2021-11-08 16:33:41 +01:00
valueOpt = Optional.of(valueSerializer.deserialize(valueBufOpt.get()));
2021-08-31 09:14:46 +02:00
} else {
valueOpt = Optional.empty();
}
2021-11-08 10:49:59 +01:00
sink.next(valueOpt);
2021-10-19 00:22:05 +02:00
} catch (Throwable ex) {
2021-08-31 09:14:46 +02:00
sink.error(ex);
2021-09-08 00:22:39 +02:00
} finally {
2021-11-08 16:33:41 +01:00
valueBufOpt.ifPresent(Resource::close);
2021-08-31 09:14:46 +02:00
}
2022-01-26 14:22:54 +01:00
});
2021-05-02 19:18:15 +02:00
}
2021-11-08 16:33:41 +01:00
private LLEntry serializeEntry(T keySuffix, U value) throws SerializationException {
var key = serializeKeySuffixToKey(keySuffix);
try {
var serializedValue = serializeValue(value);
return LLEntry.of(key, serializedValue);
} catch (Throwable t) {
key.close();
throw t;
2021-05-02 19:18:15 +02:00
}
}
2021-10-19 00:22:05 +02:00
private void serializeEntrySink(Entry<T,U> entry, SynchronousSink<Send<LLEntry>> sink) {
try {
2021-11-08 16:33:41 +01:00
sink.next(serializeEntry(entry.getKey(), entry.getValue()).send());
2021-10-19 00:22:05 +02:00
} catch (Throwable e) {
sink.error(e);
}
}
@Override
public Mono<Void> putMulti(Flux<Entry<T, U>> entries) {
2021-05-02 19:18:15 +02:00
var serializedEntries = entries
2021-08-31 09:14:46 +02:00
.<Send<LLEntry>>handle((entry, sink) -> {
2021-08-28 22:42:51 +02:00
try {
2021-11-08 16:33:41 +01:00
sink.next(serializeEntry(entry.getKey(), entry.getValue()).send());
2021-10-19 00:22:05 +02:00
} catch (Throwable e) {
2021-08-28 22:42:51 +02:00
sink.error(e);
}
2021-09-08 00:22:39 +02:00
});
2022-01-26 19:03:51 +01:00
return dictionary.putMulti(serializedEntries);
}
2021-07-17 11:52:08 +02:00
@Override
2021-11-08 10:49:59 +01:00
public Flux<Boolean> updateMulti(Flux<T> keys,
KVSerializationFunction<T, @Nullable U, @Nullable U> updater) {
var sharedKeys = keys.publish().refCount(2);
2022-01-26 14:22:54 +01:00
var serializedKeys = sharedKeys.<Send<Buffer>>handle((key, sink) -> {
try {
Send<Buffer> serializedKey = serializeKeySuffixToKey(key).send();
sink.next(serializedKey);
} catch (Throwable ex) {
sink.error(ex);
}
});
2021-07-17 11:52:08 +02:00
var serializedUpdater = getSerializedUpdater(updater);
2021-11-08 10:49:59 +01:00
return dictionary.updateMulti(sharedKeys, serializedKeys, serializedUpdater);
2021-07-17 11:52:08 +02:00
}
@Override
2021-01-31 21:23:43 +01:00
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot) {
return dictionary
.getRangeKeys(resolveSnapshot(snapshot), rangeMono)
.handle((keyBufToReceive, sink) -> {
2021-11-08 16:33:41 +01:00
var keyBuf = keyBufToReceive.receive();
try {
assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
// Remove prefix. Keep only the suffix and the ext
2021-10-19 00:22:05 +02:00
splitPrefix(keyBuf).close();
suffixKeyLengthConsistency(keyBuf.readableBytes());
T keySuffix;
try (var keyBufCopy = keyBuf.copy()) {
keySuffix = deserializeSuffix(keyBufCopy);
}
2021-11-08 16:33:41 +01:00
var subStage = new DatabaseSingle<>(dictionary, toKey(keyBuf), valueSerializer, null);
2021-10-19 00:22:05 +02:00
sink.next(Map.entry(keySuffix, subStage));
} catch (Throwable ex) {
2021-11-08 16:33:41 +01:00
keyBuf.close();
2021-08-31 09:14:46 +02:00
sink.error(ex);
2021-05-02 19:18:15 +02:00
}
});
}
2021-03-22 20:02:19 +01:00
@Override
public Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot) {
return dictionary
.getRange(resolveSnapshot(snapshot), rangeMono)
2021-08-31 09:14:46 +02:00
.<Entry<T, U>>handle((serializedEntryToReceive, sink) -> {
2021-10-19 00:22:05 +02:00
try {
Entry<T, U> entry;
try (var serializedEntry = serializedEntryToReceive.receive()) {
2021-11-08 16:33:41 +01:00
var keyBuf = serializedEntry.getKeyUnsafe();
assert keyBuf != null;
assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
// Remove prefix. Keep only the suffix and the ext
splitPrefix(keyBuf).close();
suffixKeyLengthConsistency(keyBuf.readableBytes());
T keySuffix = deserializeSuffix(keyBuf);
assert serializedEntry.getValueUnsafe() != null;
U value = valueSerializer.deserialize(serializedEntry.getValueUnsafe());
entry = Map.entry(keySuffix, value);
}
2021-10-19 00:22:05 +02:00
sink.next(entry);
} catch (Throwable e) {
2021-08-22 21:23:22 +02:00
sink.error(e);
}
});
2021-03-22 20:02:19 +01:00
}
2021-01-31 15:47:48 +01:00
@Override
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
2021-08-29 01:15:51 +02:00
return Flux.concat(
this.getAllValues(null),
2021-10-19 00:22:05 +02:00
dictionary.setRange(rangeMono, entries.handle(this::serializeEntrySink)).then(Mono.empty())
2021-08-22 21:23:22 +02:00
);
2021-01-31 15:47:48 +01:00
}
2021-03-14 03:13:19 +01:00
@Override
public Mono<Void> clear() {
2021-08-31 09:14:46 +02:00
if (range.isAll()) {
return dictionary.clear();
} else if (range.isSingle()) {
return dictionary
.remove(Mono.fromCallable(range::getSingle), LLDictionaryResultType.VOID)
.doOnNext(Send::close)
.then();
} else {
return dictionary.setRange(rangeMono, Flux.empty());
}
}
}