From 63cd178988bf9e82abdc76a7e7c8700fbb9f8627 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 8 Nov 2021 10:49:59 +0100 Subject: [PATCH] Use merge operator when possible --- .../it/cavallium/dbengine/client/Hits.java | 26 ++- .../database/ExtraKeyOperationResult.java | 3 - .../dbengine/database/KeyOperationResult.java | 3 - .../dbengine/database/LLDictionary.java | 16 +- .../dbengine/database/LLLuceneIndex.java | 5 + .../collections/DatabaseMapDictionary.java | 57 +++--- .../collections/DatabaseStageMap.java | 48 ++--- .../collections/ValueTransformer.java | 2 +- .../database/disk/LLLocalDictionary.java | 171 +++++++----------- .../database/memory/LLMemoryDictionary.java | 32 ++-- .../BiSerializationFunction.java | 7 - .../KVSerializationFunction.java | 7 + .../cavallium/dbengine/TestDictionaryMap.java | 50 +++-- .../dbengine/TestDictionaryMapDeep.java | 47 +++-- 14 files changed, 215 insertions(+), 259 deletions(-) delete mode 100644 src/main/java/it/cavallium/dbengine/database/ExtraKeyOperationResult.java delete mode 100644 src/main/java/it/cavallium/dbengine/database/KeyOperationResult.java delete mode 100644 src/main/java/it/cavallium/dbengine/database/serialization/BiSerializationFunction.java create mode 100644 src/main/java/it/cavallium/dbengine/database/serialization/KVSerializationFunction.java diff --git a/src/main/java/it/cavallium/dbengine/client/Hits.java b/src/main/java/it/cavallium/dbengine/client/Hits.java index e8b3918..10ab45a 100644 --- a/src/main/java/it/cavallium/dbengine/client/Hits.java +++ b/src/main/java/it/cavallium/dbengine/client/Hits.java @@ -76,15 +76,23 @@ public final class Hits extends ResourceSupport, Hits> { ValueTransformer valueTransformer) { return resultToReceive -> { var result = resultToReceive.receive(); - var hitsToTransform = result.results().map(hit -> Tuples.of(hit.score(), hit.key())); - var transformed = valueTransformer - .transform(hitsToTransform) - .filter(tuple3 -> tuple3.getT3().isPresent()) - .map(tuple3 -> new LazyHitEntry<>(Mono.just(tuple3.getT2()), - Mono.just(tuple3.getT3().orElseThrow()), - tuple3.getT1() - )); - return new Hits<>(transformed, result.totalHitsCount(), result::close).send(); + + var sharedHitsFlux = result.results().publish().refCount(3); + var scoresFlux = sharedHitsFlux.map(HitKey::score); + var keysFlux = sharedHitsFlux.map(HitKey::key); + + var valuesFlux = valueTransformer.transform(keysFlux); + + var transformedFlux = Flux.zip((Object[] data) -> { + //noinspection unchecked + var keyMono = Mono.just((T) data[0]); + //noinspection unchecked + var valMono = Mono.just((U) data[1]); + var score = (Float) data[2]; + return new LazyHitEntry<>(keyMono, valMono, score); + }, keysFlux, valuesFlux, scoresFlux); + + return new Hits<>(transformedFlux, result.totalHitsCount(), result::close).send(); }; } diff --git a/src/main/java/it/cavallium/dbengine/database/ExtraKeyOperationResult.java b/src/main/java/it/cavallium/dbengine/database/ExtraKeyOperationResult.java deleted file mode 100644 index 8bea532..0000000 --- a/src/main/java/it/cavallium/dbengine/database/ExtraKeyOperationResult.java +++ /dev/null @@ -1,3 +0,0 @@ -package it.cavallium.dbengine.database; - -public record ExtraKeyOperationResult(T key, X extra, boolean changed) {} diff --git a/src/main/java/it/cavallium/dbengine/database/KeyOperationResult.java b/src/main/java/it/cavallium/dbengine/database/KeyOperationResult.java deleted file mode 100644 index 697b21c..0000000 --- a/src/main/java/it/cavallium/dbengine/database/KeyOperationResult.java +++ /dev/null @@ -1,3 +0,0 @@ -package it.cavallium.dbengine.database; - -public record KeyOperationResult(T key, boolean changed) {} diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java index 69d9403..b8d2df7 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java @@ -4,7 +4,7 @@ import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.BadBlock; -import it.cavallium.dbengine.database.serialization.BiSerializationFunction; +import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.util.List; import java.util.Optional; @@ -13,8 +13,6 @@ import org.jetbrains.annotations.Nullable; import org.warp.commonutils.concurrency.atomicity.NotAtomic; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuple3; @SuppressWarnings("unused") @NotAtomic @@ -62,19 +60,19 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { Mono> remove(Mono> key, LLDictionaryResultType resultType); - Flux, Optional>>> getMulti(@Nullable LLSnapshot snapshot, - Flux>> keys, + Flux>> getMulti(@Nullable LLSnapshot snapshot, + Flux> keys, boolean existsAlmostCertainly); - default Flux, Optional>>> getMulti(@Nullable LLSnapshot snapshot, - Flux>> keys) { + default Flux>> getMulti(@Nullable LLSnapshot snapshot, + Flux> keys) { return getMulti(snapshot, keys, false); } Flux> putMulti(Flux> entries, boolean getOldValues); - Flux, X>> updateMulti(Flux, X>> entries, - BiSerializationFunction, X, Send> updateFunction); + Flux updateMulti(Flux keys, Flux> serializedKeys, + KVSerializationFunction, @Nullable Send> updateFunction); Flux> getRange(@Nullable LLSnapshot snapshot, Mono> range, boolean existsAlmostCertainly); diff --git a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java index e2dc91c..1ec559d 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java @@ -20,6 +20,11 @@ public interface LLLuceneIndex extends LLSnapshottable { Mono addDocument(LLTerm id, LLUpdateDocument doc); + /** + * WARNING! This operation is atomic! + * Please don't send infinite or huge documents fluxes, because they will + * be kept in ram all at once. + */ Mono addDocuments(Flux> documents); Mono deleteDocument(LLTerm id); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 6829e8f..c39782e 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -5,14 +5,13 @@ import io.net5.buffer.api.Send; import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; -import it.cavallium.dbengine.database.ExtraKeyOperationResult; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLEntry; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; -import it.cavallium.dbengine.database.serialization.BiSerializationFunction; +import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.database.serialization.Serializer; @@ -244,16 +243,16 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep BiSerializationFunction<@Nullable Send, X, @Nullable Send> getSerializedUpdater( - BiSerializationFunction<@Nullable U, X, @Nullable U> updater) { - return (oldSerialized, extra) -> { + public KVSerializationFunction<@NotNull T, @Nullable Send, @Nullable Send> getSerializedUpdater( + KVSerializationFunction<@NotNull T, @Nullable U, @Nullable U> updater) { + return (key, oldSerialized) -> { try (oldSerialized) { U result; if (oldSerialized == null) { - result = updater.apply(null, extra); + result = updater.apply(key, null); } else { try (var oldSerializedReceived = oldSerialized.receive()) { - result = updater.apply(valueSerializer.deserialize(oldSerializedReceived), extra); + result = updater.apply(key, valueSerializer.deserialize(oldSerializedReceived)); } } if (result == null) { @@ -307,34 +306,33 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys, boolean existsAlmostCertainly) { + public Flux> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys, boolean existsAlmostCertainly) { var mappedKeys = keys - .>>handle((keySuffix, sink) -> { + .>handle((keySuffix, sink) -> { try { - Tuple2> tuple = Tuples.of(keySuffix, serializeKeySuffixToKey(keySuffix)); - sink.next(tuple); + var buf = serializeKeySuffixToKey(keySuffix); + sink.next(buf); } catch (Throwable ex) { sink.error(ex); } }); return dictionary .getMulti(resolveSnapshot(snapshot), mappedKeys, existsAlmostCertainly) - .>>handle((entry, sink) -> { + .>handle((valueBufOpt, sink) -> { try { Optional valueOpt; - if (entry.getT3().isPresent()) { - try (var buf = entry.getT3().get().receive()) { + if (valueBufOpt.isPresent()) { + try (var buf = valueBufOpt.get().receive()) { valueOpt = Optional.of(valueSerializer.deserialize(buf)); } } else { valueOpt = Optional.empty(); } - sink.next(Map.entry(entry.getT1(), valueOpt)); + sink.next(valueOpt); } catch (Throwable ex) { sink.error(ex); } finally { - entry.getT2().close(); - entry.getT3().ifPresent(Send::close); + valueBufOpt.ifPresent(Send::close); } }) .transform(LLUtils::handleDiscard); @@ -384,13 +382,14 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep Flux> updateMulti(Flux> entries, - BiSerializationFunction<@Nullable U, X, @Nullable U> updater) { - var serializedEntries = entries - ., X>>handle((entry, sink) -> { + public Flux updateMulti(Flux keys, + KVSerializationFunction updater) { + var sharedKeys = keys.publish().refCount(2); + var serializedKeys = sharedKeys + .>handle((key, sink) -> { try { - Send serializedKey = serializeKeySuffixToKey(entry.getT1()); - sink.next(Tuples.of(serializedKey, entry.getT2())); + Send serializedKey = serializeKeySuffixToKey(key); + sink.next(serializedKey); } catch (Throwable ex) { sink.error(ex); } @@ -404,17 +403,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep { - try { - T keySuffix; - try (var keySuffixBuf = result.key().receive()) { - keySuffix = deserializeSuffix(keySuffixBuf); - } - sink.next(new ExtraKeyOperationResult<>(keySuffix, result.extra(), result.changed())); - } catch (Throwable ex) { - sink.error(ex); - } - }); + return dictionary.updateMulti(sharedKeys, serializedKeys, serializedUpdater); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index 6588646..cc4c8e0 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -3,12 +3,10 @@ package it.cavallium.dbengine.database.collections; import io.net5.buffer.api.Buffer; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; -import it.cavallium.dbengine.database.ExtraKeyOperationResult; -import it.cavallium.dbengine.database.KeyOperationResult; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; -import it.cavallium.dbengine.database.serialization.BiSerializationFunction; +import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.util.HashMap; @@ -16,16 +14,11 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.function.BiFunction; import java.util.function.Function; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; -import reactor.util.function.Tuple3; import reactor.util.function.Tuples; @SuppressWarnings("unused") @@ -61,13 +54,8 @@ public interface DatabaseStageMap> extends Dat stage -> stage.update(updater, updateReturnMode, existsAlmostCertainly), true); } - default Flux> updateMulti(Flux> entries, - BiSerializationFunction<@Nullable U, X, @Nullable U> updater) { - return entries - .flatMapSequential(entry -> this - .updateValue(entry.getT1(), prevValue -> updater.apply(prevValue, entry.getT2())) - .map(changed -> new ExtraKeyOperationResult<>(entry.getT1(), entry.getT2(), changed)) - ); + default Flux updateMulti(Flux keys, KVSerializationFunction updater) { + return keys.flatMapSequential(key -> this.updateValue(key, prevValue -> updater.apply(key, prevValue))); } default Mono updateValue(T key, UpdateReturnMode updateReturnMode, SerializationFunction<@Nullable U, @Nullable U> updater) { @@ -119,13 +107,11 @@ public interface DatabaseStageMap> extends Dat /** * GetMulti must return the elements in sequence! */ - default Flux>> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys, boolean existsAlmostCertainly) { + default Flux> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys, boolean existsAlmostCertainly) { return keys - .flatMapSequential(key -> this - .getValue(snapshot, key, existsAlmostCertainly) - .map(value -> Map.entry(key, Optional.of(value))) - .switchIfEmpty(Mono.fromSupplier(() -> Map.entry(key, Optional.empty()))) - ) + .flatMapSequential(key -> this.getValue(snapshot, key, existsAlmostCertainly)) + .map(Optional::of) + .defaultIfEmpty(Optional.empty()) .doOnDiscard(Entry.class, unknownEntry -> { if (unknownEntry.getValue() instanceof Optional optionalBuffer && optionalBuffer.isPresent() @@ -138,7 +124,7 @@ public interface DatabaseStageMap> extends Dat /** * GetMulti must return the elements in sequence! */ - default Flux>> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys) { + default Flux> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys) { return getMulti(snapshot, keys, false); } @@ -293,20 +279,10 @@ public interface DatabaseStageMap> extends Dat } default ValueTransformer getAsyncDbValueTransformer(@Nullable CompositeSnapshot snapshot) { - return new ValueTransformer<>() { - @Override - public Flux>> transform(Flux> keys) { - return Flux.defer(() -> { - ConcurrentLinkedQueue extraValues = new ConcurrentLinkedQueue<>(); - return getMulti(snapshot, keys.map(key -> { - extraValues.add(key.getT1()); - return key.getT2(); - })).map(result -> { - var extraValue = extraValues.remove(); - return Tuples.of(extraValue, result.getKey(), result.getValue()); - }); - }); - } + return keys -> { + var sharedKeys = keys.publish().refCount(2); + var values = getMulti(snapshot, sharedKeys); + return Flux.zip(sharedKeys, values, Map::entry); }; } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/ValueTransformer.java b/src/main/java/it/cavallium/dbengine/database/collections/ValueTransformer.java index 6049fe6..ea2a074 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/ValueTransformer.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/ValueTransformer.java @@ -12,5 +12,5 @@ public interface ValueTransformer { /** * Can return Flux error IOException */ - Flux>> transform(Flux> keys); + Flux>> transform(Flux keys); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index fd99551..5c8233c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -9,16 +9,13 @@ import static java.util.Objects.requireNonNullElse; import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; -import io.net5.buffer.api.MemoryManager; import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; -import io.net5.buffer.api.StandardAllocationTypes; import io.net5.buffer.api.internal.ResourceSupport; import io.net5.util.internal.PlatformDependent; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.database.Column; -import it.cavallium.dbengine.database.ExtraKeyOperationResult; import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; @@ -29,7 +26,7 @@ import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.SafeCloseable; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; -import it.cavallium.dbengine.database.serialization.BiSerializationFunction; +import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.io.IOException; import java.nio.ByteBuffer; @@ -57,8 +54,6 @@ import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; import org.rocksdb.Slice; import org.rocksdb.Snapshot; -import org.rocksdb.TransactionDB; -import org.rocksdb.TransactionOptions; import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; import org.warp.commonutils.concurrency.atomicity.NotAtomic; @@ -491,8 +486,8 @@ public class LLLocalDictionary implements LLDictionary { } @Override - public Flux, Optional>>> getMulti(@Nullable LLSnapshot snapshot, - Flux>> keys, + public Flux>> getMulti(@Nullable LLSnapshot snapshot, + Flux> keys, boolean existsAlmostCertainly) { return keys .buffer(MULTI_GET_WINDOW) @@ -509,58 +504,46 @@ public class LLLocalDictionary implements LLDictionary { resource.close(); } }) - .flatMapSequential(keysWindow -> { - List> keyBufsWindowSend = new ArrayList<>(keysWindow.size()); - for (Tuple2> objects : keysWindow) { - keyBufsWindowSend.add(objects.getT2()); + .flatMapSequential(keysWindow -> runOnDb(() -> { + List keyBufsWindow = new ArrayList<>(keysWindow.size()); + for (Send bufferSend : keysWindow) { + keyBufsWindow.add(bufferSend.receive()); } - return runOnDb(() -> { - List keyBufsWindow = new ArrayList<>(keyBufsWindowSend.size()); - for (Send bufferSend : keyBufsWindowSend) { - keyBufsWindow.add(bufferSend.receive()); + try { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called getMulti in a nonblocking thread"); } - try { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called getMulti in a nonblocking thread"); - } - var readOptions = Objects.requireNonNullElse(resolveSnapshot(snapshot), EMPTY_READ_OPTIONS); - List results = db.multiGetAsList(readOptions, LLUtils.toArray(keyBufsWindow)); - var mappedResults = new ArrayList, Optional>>>(results.size()); - for (int i = 0; i < results.size(); i++) { - byte[] val = results.get(i); - Optional valueOpt; - if (val != null) { - results.set(i, null); - valueOpt = Optional.of(LLUtils.fromByteArray(alloc, val)); - } else { - valueOpt = Optional.empty(); - } - mappedResults.add(Tuples.of(keysWindow.get(i).getT1(), - keyBufsWindow.get(i).send(), - valueOpt.map(Resource::send) - )); - } - return mappedResults; - } finally { - for (Buffer buffer : keyBufsWindow) { - buffer.close(); + var readOptions = Objects.requireNonNullElse(resolveSnapshot(snapshot), EMPTY_READ_OPTIONS); + List results = db.multiGetAsList(readOptions, LLUtils.toArray(keyBufsWindow)); + var mappedResults = new ArrayList>>(results.size()); + for (int i = 0; i < results.size(); i++) { + byte[] val = results.get(i); + Optional valueOpt; + if (val != null) { + // free memory + results.set(i, null); + + valueOpt = Optional.of(LLUtils.fromByteArray(alloc, val)); + } else { + valueOpt = Optional.empty(); } + mappedResults.add(valueOpt.map(Resource::send)); } - }) - .flatMapIterable(list -> list) - .onErrorMap(cause -> new IOException("Failed to read keys", cause)) - .doAfterTerminate(() -> keyBufsWindowSend.forEach(Send::close)); - }, 2) // Max concurrency is 2 to read data while preparing the next segment + return mappedResults; + } finally { + for (Buffer buffer : keyBufsWindow) { + buffer.close(); + } + } + }) + .flatMapIterable(list -> list) + .onErrorMap(cause -> new IOException("Failed to read keys", cause)) + .doAfterTerminate(() -> keysWindow.forEach(Send::close)), 2) // Max concurrency is 2 to read data while preparing the next segment .doOnDiscard(LLEntry.class, ResourceSupport::close) - .doOnDiscard(Tuple3.class, discardedEntry -> { - if (discardedEntry.getT2() instanceof Buffer bb) { + .doOnDiscard(Optional.class, opt -> { + if (opt.isPresent() && opt.get() instanceof Buffer bb) { bb.close(); } - if (discardedEntry.getT2() instanceof Optional opt) { - if (opt.isPresent() && opt.get() instanceof Buffer bb) { - bb.close(); - } - } }); } @@ -634,24 +617,24 @@ public class LLLocalDictionary implements LLDictionary { } @Override - public Flux, X>> updateMulti(Flux, X>> entries, - BiSerializationFunction, X, Send> updateFunction) { - return entries + public Flux updateMulti(Flux keys, Flux> serializedKeys, + KVSerializationFunction, @Nullable Send> updateFunction) { + return Flux.zip(keys, serializedKeys) .buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP)) - .flatMapSequential(ew -> this., X>>>runOnDb(() -> { - List> entriesWindow = new ArrayList<>(ew.size()); - for (Tuple2, X> tuple : ew) { - entriesWindow.add(tuple.mapT1(Send::receive)); + .flatMapSequential(ew -> this.>runOnDb(() -> { + List> entriesWindow = new ArrayList<>(ew.size()); + for (Tuple2> tuple : ew) { + entriesWindow.add(tuple.mapT2(Send::receive)); } try { if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called updateMulti in a nonblocking thread"); } List keyBufsWindow = new ArrayList<>(entriesWindow.size()); - for (Tuple2 objects : entriesWindow) { - keyBufsWindow.add(objects.getT1()); + for (Tuple2 objects : entriesWindow) { + keyBufsWindow.add(objects.getT2()); } - ArrayList, X, Optional>>> mappedInputs; + ArrayList, Optional>>> mappedInputs; { var readOptions = Objects.requireNonNullElse(resolveSnapshot(null), EMPTY_READ_OPTIONS); var inputs = db.multiGetAsList(readOptions, LLUtils.toArray(keyBufsWindow)); @@ -661,30 +644,38 @@ public class LLLocalDictionary implements LLDictionary { if (val != null) { inputs.set(i, null); mappedInputs.add(Tuples.of( + entriesWindow.get(i).getT1(), keyBufsWindow.get(i).send(), - entriesWindow.get(i).getT2(), Optional.of(fromByteArray(alloc, val).send()) )); } else { mappedInputs.add(Tuples.of( + entriesWindow.get(i).getT1(), keyBufsWindow.get(i).send(), - entriesWindow.get(i).getT2(), Optional.empty() )); } } } var updatedValuesToWrite = new ArrayList>(mappedInputs.size()); - var valueChangedResult = new ArrayList, X>>(mappedInputs.size()); + var valueChangedResult = new ArrayList(mappedInputs.size()); try { for (var mappedInput : mappedInputs) { - try (var updatedValue = updateFunction - .apply(mappedInput.getT1(), mappedInput.getT2()).receive()) { - try (var t3 = mappedInput.getT3().map(Send::receive).orElse(null)) { - valueChangedResult.add(new ExtraKeyOperationResult<>(mappedInput.getT1(), - mappedInput.getT2(), !LLUtils.equals(t3, updatedValue))); + try (var updatedValueToReceive = updateFunction + .apply(mappedInput.getT1(), mappedInput.getT2())) { + if (updatedValueToReceive != null) { + try (var updatedValue = updatedValueToReceive.receive()) { + try (var t3 = mappedInput.getT3().map(Send::receive).orElse(null)) { + valueChangedResult.add(!LLUtils.equals(t3, updatedValue)); + } + updatedValuesToWrite.add(updatedValue.send()); + } + } else { + try (var t3 = mappedInput.getT3().map(Send::receive).orElse(null)) { + valueChangedResult.add(!LLUtils.equals(t3, null)); + } + updatedValuesToWrite.add(null); } - updatedValuesToWrite.add(updatedValue.send()); } } } finally { @@ -702,12 +693,12 @@ public class LLLocalDictionary implements LLDictionary { BATCH_WRITE_OPTIONS ); int i = 0; - for (Tuple2 entry : entriesWindow) { + for (Tuple2 entry : entriesWindow) { var valueToWrite = updatedValuesToWrite.get(i); if (valueToWrite == null) { - batch.delete(cfh, entry.getT1().send()); + batch.delete(cfh, entry.getT2().send()); } else { - batch.put(cfh, entry.getT1().send(), valueToWrite); + batch.put(cfh, entry.getT2().send(), valueToWrite); } i++; } @@ -715,15 +706,15 @@ public class LLLocalDictionary implements LLDictionary { batch.close(); } else { int i = 0; - for (Tuple2 entry : entriesWindow) { - db.put(EMPTY_WRITE_OPTIONS, entry.getT1().send(), updatedValuesToWrite.get(i)); + for (Tuple2 entry : entriesWindow) { + db.put(EMPTY_WRITE_OPTIONS, entry.getT2().send(), updatedValuesToWrite.get(i)); i++; } } return valueChangedResult; } finally { - for (Tuple2 tuple : entriesWindow) { - tuple.getT1().close(); + for (Tuple2 tuple : entriesWindow) { + tuple.getT2().close(); } } }).flatMapIterable(list -> list), /* Max concurrency is 2 to update data while preparing the next segment */ 2) @@ -734,28 +725,6 @@ public class LLLocalDictionary implements LLDictionary { if (entry.getT2() instanceof Buffer bb) { bb.close(); } - }) - .doOnDiscard(ExtraKeyOperationResult.class, entry -> { - if (entry.key() instanceof Buffer bb) { - bb.close(); - } - if (entry.extra() instanceof Buffer bb) { - bb.close(); - } - }) - .doOnDiscard(List.class, obj -> { - if (!obj.isEmpty() && obj.get(0) instanceof ExtraKeyOperationResult) { - //noinspection unchecked - var castedEntries = (List>) obj; - for (ExtraKeyOperationResult entry : castedEntries) { - if (entry.key() instanceof Resource bb) { - bb.close(); - } - if (entry.extra() instanceof Resource bb) { - bb.close(); - } - } - } }); } diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java index 0de957d..d46d59f 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java @@ -4,7 +4,6 @@ import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.BadBlock; -import it.cavallium.dbengine.database.ExtraKeyOperationResult; import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; @@ -13,7 +12,7 @@ import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; -import it.cavallium.dbengine.database.serialization.BiSerializationFunction; +import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.unimi.dsi.fastutil.bytes.ByteList; @@ -254,20 +253,18 @@ public class LLMemoryDictionary implements LLDictionary { } @Override - public Flux, Optional>>> getMulti(@Nullable LLSnapshot snapshot, - Flux>> keys, + public Flux>> getMulti(@Nullable LLSnapshot snapshot, Flux> keys, boolean existsAlmostCertainly) { - return keys - .map(key -> { - try (var t2 = key.getT2().receive()) { - ByteList v = snapshots.get(resolveSnapshot(snapshot)).get(k(t2.copy().send())); - if (v != null) { - return Tuples.of(key.getT1(), t2.send(), Optional.of(kk(v))); - } else { - return Tuples.of(key.getT1(), t2.send(), Optional.empty()); - } - } - }); + return keys.map(key -> { + try (var t2 = key.receive()) { + ByteList v = snapshots.get(resolveSnapshot(snapshot)).get(k(t2.copy().send())); + if (v != null) { + return Optional.of(kk(v)); + } else { + return Optional.empty(); + } + } + }); } @Override @@ -287,8 +284,9 @@ public class LLMemoryDictionary implements LLDictionary { } @Override - public Flux, X>> updateMulti(Flux, X>> entries, - BiSerializationFunction, X, Send> updateFunction) { + public Flux updateMulti(Flux keys, + Flux> serializedKeys, + KVSerializationFunction, @Nullable Send> updateFunction) { return Flux.error(new UnsupportedOperationException("Not implemented")); } diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/BiSerializationFunction.java b/src/main/java/it/cavallium/dbengine/database/serialization/BiSerializationFunction.java deleted file mode 100644 index 9f22762..0000000 --- a/src/main/java/it/cavallium/dbengine/database/serialization/BiSerializationFunction.java +++ /dev/null @@ -1,7 +0,0 @@ -package it.cavallium.dbengine.database.serialization; - -@FunctionalInterface -public interface BiSerializationFunction { - - U apply(T1 argument1, T2 argument2) throws SerializationException; -} diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/KVSerializationFunction.java b/src/main/java/it/cavallium/dbengine/database/serialization/KVSerializationFunction.java new file mode 100644 index 0000000..a50f061 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/serialization/KVSerializationFunction.java @@ -0,0 +1,7 @@ +package it.cavallium.dbengine.database.serialization; + +@FunctionalInterface +public interface KVSerializationFunction { + + UV apply(K key, V value) throws SerializationException; +} diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java index b944555..e39dd70 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java @@ -390,14 +390,18 @@ public abstract class TestDictionaryMap { Step> stpVer = StepVerifier .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMapMany(map -> Flux - .concat( - map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), - map.getMulti(null, Flux.fromIterable(entries.keySet())) - ) - .doFinally(s -> map.close()) - ) - .filter(k -> k.getValue().isPresent()) + .flatMapMany(map -> { + var entriesFlux = Flux.fromIterable(entries.entrySet()); + var keysFlux = entriesFlux.map(Entry::getKey); + var resultsFlux = Flux + .concat( + map.putMulti(entriesFlux).then(Mono.empty()), + map.getMulti(null, keysFlux) + ); + return Flux.zip(keysFlux, resultsFlux, Map::entry).doFinally(s -> map.close()); + }) + + .filter(entry -> entry.getValue().isPresent()) .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) .transform(LLUtils::handleDiscard) )); @@ -420,11 +424,14 @@ public abstract class TestDictionaryMap { Step> stpVer = StepVerifier .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMapMany(map -> map - .setAllValues(Flux.fromIterable(entries.entrySet())) - .thenMany(map.getMulti(null, Flux.fromIterable(entries.keySet()))) - .doFinally(s -> map.close()) - ) + .flatMapMany(map -> { + var entriesFlux = Flux.fromIterable(entries.entrySet()); + var keysFlux = entriesFlux.map(Entry::getKey); + var resultsFlux = map + .setAllValues(entriesFlux) + .thenMany(map.getMulti(null, keysFlux)); + return Flux.zip(keysFlux, resultsFlux, Map::entry).doFinally(s -> map.close()); + }) .filter(k -> k.getValue().isPresent()) .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) .transform(LLUtils::handleDiscard) @@ -476,13 +483,16 @@ public abstract class TestDictionaryMap { Step> stpVer = StepVerifier .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5)) - .flatMapMany(map -> Flux - .concat( - map.set(entries).then(Mono.empty()), - map.getMulti(null, Flux.fromIterable(entries.keySet())) - ) - .doFinally(s -> map.close()) - ) + .flatMapMany(map -> { + var entriesFlux = Flux.fromIterable(entries.entrySet()); + var keysFlux = entriesFlux.map(Entry::getKey); + var resultsFlux = Flux + .concat( + map.set(entries).then(Mono.empty()), + map.getMulti(null, Flux.fromIterable(entries.keySet())) + ); + return Flux.zip(keysFlux, resultsFlux, Map::entry).doFinally(s -> map.close()); + }) .filter(k -> k.getValue().isPresent()) .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) .transform(LLUtils::handleDiscard) diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java index 0083429..0150cdd 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java @@ -769,13 +769,16 @@ public abstract class TestDictionaryMapDeep { Step>> stpVer = StepVerifier .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> Flux - .concat( - map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), - map.getMulti(null, Flux.fromIterable(entries.keySet())) - ) - .doFinally(s -> map.close()) - ) + .flatMapMany(map -> { + var entriesFlux = Flux.fromIterable(entries.entrySet()); + var keysFlux = entriesFlux.map(Entry::getKey); + var resultsFlux = Flux + .concat( + map.putMulti(entriesFlux).then(Mono.empty()), + map.getMulti(null, keysFlux) + ); + return Flux.zip(keysFlux, resultsFlux, Map::entry).doFinally(s -> map.close()); + }) .filter(k -> k.getValue().isPresent()) .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) .transform(LLUtils::handleDiscard) @@ -800,11 +803,14 @@ public abstract class TestDictionaryMapDeep { Step>> stpVer = StepVerifier .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> map - .setAllValues(Flux.fromIterable(entries.entrySet())) - .thenMany(map.getMulti(null, Flux.fromIterable(entries.keySet()))) - .doFinally(s -> map.close()) - ) + .flatMapMany(map -> { + var entriesFlux = Flux.fromIterable(entries.entrySet()); + var keysFlux = entriesFlux.map(Entry::getKey); + var resultsFlux = map + .setAllValues(Flux.fromIterable(entries.entrySet())) + .thenMany(map.getMulti(null, Flux.fromIterable(entries.keySet()))); + return Flux.zip(keysFlux, resultsFlux, Map::entry).doFinally(s -> map.close()); + }) .filter(k -> k.getValue().isPresent()) .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) )); @@ -856,13 +862,16 @@ public abstract class TestDictionaryMapDeep { Step>> stpVer = StepVerifier .create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> Flux - .concat( - map.set(entries).then(Mono.empty()), - map.getMulti(null, Flux.fromIterable(entries.keySet())) - ) - .doFinally(s -> map.close()) - ) + .flatMapMany(map -> { + var entriesFlux = Flux.fromIterable(entries.entrySet()); + var keysFlux = entriesFlux.map(Entry::getKey); + var resultsFlux = Flux + .concat( + map.set(entries).then(Mono.empty()), + map.getMulti(null, Flux.fromIterable(entries.keySet())) + ); + return Flux.zip(keysFlux, resultsFlux, Map::entry).doFinally(s -> map.close()); + }) .filter(k -> k.getValue().isPresent()) .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) .transform(LLUtils::handleDiscard)