From 2e58189015d7161c77c9dda5179f201347ddc498 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 21 May 2022 23:49:06 +0200 Subject: [PATCH] Code cleanup --- .../collections/DatabaseMapDictionary.java | 51 +++++++++---------- .../DatabaseMapDictionaryHashed.java | 7 --- .../collections/DatabaseMapSingle.java | 5 +- .../collections/DatabaseSingleBucket.java | 4 +- .../collections/DatabaseSingleMapped.java | 5 +- .../collections/DatabaseSingleton.java | 5 +- .../database/collections/DatabaseStage.java | 9 +--- .../collections/DatabaseStageMap.java | 28 ++++------ .../database/disk/LLLocalDictionary.java | 18 +++++-- 9 files changed, 56 insertions(+), 76 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 755ceb9..ab645d6 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -37,7 +37,6 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.SynchronousSink; /** * Optimized implementation of "DatabaseMapDictionary with SubStageGetterSingle" @@ -206,7 +205,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { + public Mono> get(@Nullable CompositeSnapshot snapshot) { return dictionary .getRange(resolveSnapshot(snapshot), rangeMono, false, true) .map(entry -> { @@ -231,7 +230,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> setAndGetPrevious(Object2ObjectSortedMap value) { return this - .get(null, false) + .get(null) .concatWith(dictionary.setRange(rangeMono, Flux .fromIterable(Collections.unmodifiableMap(value).entrySet()) .map(this::serializeEntry), true).then(Mono.empty())) @@ -271,7 +270,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) { + public Mono getValue(@Nullable CompositeSnapshot snapshot, T keySuffix) { return Mono.usingWhen(dictionary .get(resolveSnapshot(snapshot), Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix))), value -> Mono.fromCallable(() -> deserializeValue(keySuffix, value)), @@ -398,13 +397,17 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys, boolean existsAlmostCertainly) { + public Flux> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys) { var mappedKeys = keys.map(this::serializeKeySuffixToKey); return dictionary .getMulti(resolveSnapshot(snapshot), mappedKeys) .map(valueBufOpt -> { try (valueBufOpt) { - return valueBufOpt.map(valueSerializer::deserialize); + if (valueBufOpt.isPresent()) { + return Optional.of(valueSerializer.deserialize(valueBufOpt.get())); + } else { + return Optional.empty(); + } } }); } @@ -491,29 +494,23 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep sliceRangeMono, boolean reverse, boolean smallRange) { return dictionary .getRangeKeys(resolveSnapshot(snapshot), sliceRangeMono, reverse, smallRange) - .flatMapSequential(keyBuf -> Mono - .>>fromCallable(() -> { - assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength; - // Remove prefix. Keep only the suffix and the ext - splitPrefix(keyBuf).close(); - suffixKeyLengthConsistency(keyBuf.readableBytes()); - T keySuffix; - try (var keyBufCopy = keyBuf.copy()) { - keySuffix = deserializeSuffix(keyBufCopy); - } - var bufSupplier = BufSupplier.ofOwned(toKey(keyBuf)); + .map(keyBuf -> { + try (keyBuf) { + assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength; + // Remove prefix. Keep only the suffix and the ext + splitPrefix(keyBuf).close(); + suffixKeyLengthConsistency(keyBuf.readableBytes()); + var bufSupplier = BufSupplier.ofOwned(toKey(keyBuf.copy())); + try { + T keySuffix = deserializeSuffix(keyBuf); var subStage = new DatabaseMapSingle<>(dictionary, bufSupplier, valueSerializer, null); return new SubStageEntry<>(keySuffix, subStage); - }).doOnCancel(() -> { - if (keyBuf.isAccessible()) { - keyBuf.close(); - } - }).doOnError(ex -> { - if (keyBuf.isAccessible()) { - keyBuf.close(); - } - }) - ); + } catch (Throwable ex) { + bufSupplier.close(); + throw ex; + } + } + }); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java index 8701ea7..336d815 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java @@ -260,13 +260,6 @@ public class DatabaseMapDictionaryHashed extends .map(this::deserializeMap); } - @Override - public Mono> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { - return subDictionary - .get(snapshot, existsAlmostCertainly) - .map(this::deserializeMap); - } - @Override public Mono leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) { return subDictionary.leavesCount(snapshot, fast); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java index 90bc21d..b0c61a3 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java @@ -3,7 +3,6 @@ package it.cavallium.dbengine.database.collections; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Drop; import io.netty5.buffer.api.Owned; -import io.netty5.buffer.api.Send; import io.netty5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.CompositeSnapshot; @@ -19,13 +18,11 @@ import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.database.serialization.Serializer; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Supplier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.SynchronousSink; public class DatabaseMapSingle extends ResourceSupport, DatabaseMapSingle> implements DatabaseStageEntry { @@ -113,7 +110,7 @@ public class DatabaseMapSingle extends ResourceSupport, Data } @Override - public Mono get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { + public Mono get(@Nullable CompositeSnapshot snapshot) { return Mono.usingWhen(dictionary.get(resolveSnapshot(snapshot), keyMono), buf -> Mono.fromSupplier(() -> deserializeValue(buf)), buf -> Mono.fromRunnable(buf::close) diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java index 3cf222d..83b1222 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java @@ -82,8 +82,8 @@ public class DatabaseSingleBucket } @Override - public Mono get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { - return bucketStage.get(snapshot, existsAlmostCertainly).flatMap(this::extractValueTransformation); + public Mono get(@Nullable CompositeSnapshot snapshot) { + return bucketStage.get(snapshot).flatMap(this::extractValueTransformation); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java index 229c7e0..07b5659 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java @@ -8,7 +8,6 @@ import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.client.Mapper; import it.cavallium.dbengine.database.Delta; -import it.cavallium.dbengine.database.LLEntry; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.serialization.SerializationException; @@ -79,8 +78,8 @@ public class DatabaseSingleMapped extends ResourceSupport } @Override - public Mono get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { - return serializedSingle.get(snapshot, existsAlmostCertainly).handle(this::deserializeSink); + public Mono get(@Nullable CompositeSnapshot snapshot) { + return serializedSingle.get(snapshot).handle(this::deserializeSink); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java index 8b12a3b..2211db4 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java @@ -3,12 +3,10 @@ package it.cavallium.dbengine.database.collections; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Drop; import io.netty5.buffer.api.Owned; -import io.netty5.buffer.api.Send; import io.netty5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; -import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; @@ -16,7 +14,6 @@ import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.database.serialization.Serializer; -import java.util.concurrent.atomic.AtomicLong; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.Nullable; @@ -105,7 +102,7 @@ public class DatabaseSingleton extends ResourceSupport, Data } @Override - public Mono get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { + public Mono get(@Nullable CompositeSnapshot snapshot) { return singleton.get(resolveSnapshot(snapshot)) .handle(this::deserializeValue); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java index a4f1275..2c05590 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java @@ -8,23 +8,18 @@ import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.util.Objects; -import java.util.function.Function; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public interface DatabaseStage extends DatabaseStageWithEntry, Resource> { - default Mono get(@Nullable CompositeSnapshot snapshot) { - return get(snapshot, false); - } - - Mono get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly); + Mono get(@Nullable CompositeSnapshot snapshot); default Mono getOrDefault(@Nullable CompositeSnapshot snapshot, Mono defaultValue, boolean existsAlmostCertainly) { - return get(snapshot, existsAlmostCertainly).switchIfEmpty(defaultValue).single(); + return get(snapshot).switchIfEmpty(defaultValue).single(); } default Mono getOrDefault(@Nullable CompositeSnapshot snapshot, Mono defaultValue) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index c691dee..32720ab 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -36,17 +36,13 @@ public interface DatabaseStageMap> extends ); } - default Mono getValue(@Nullable CompositeSnapshot snapshot, T key, boolean existsAlmostCertainly) { + default Mono getValue(@Nullable CompositeSnapshot snapshot, T key) { return Mono.usingWhen(this.at(snapshot, key), - stage -> stage.get(snapshot, existsAlmostCertainly), + stage -> stage.get(snapshot), LLUtils::finalizeResource ); } - default Mono getValue(@Nullable CompositeSnapshot snapshot, T key) { - return getValue(snapshot, key, false); - } - default Mono getValueOrDefault(@Nullable CompositeSnapshot snapshot, T key, Mono defaultValue) { return getValue(snapshot, key).switchIfEmpty(defaultValue).single(); } @@ -110,21 +106,15 @@ public interface DatabaseStageMap> extends return removeAndGetPrevious(key).map(o -> true).defaultIfEmpty(false); } - /** - * GetMulti must return the elements in sequence! - */ - default Flux> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys, boolean existsAlmostCertainly) { - return keys.flatMapSequential(key -> this - .getValue(snapshot, key, existsAlmostCertainly) - .map(Optional::of) - .defaultIfEmpty(Optional.empty())); - } - /** * GetMulti must return the elements in sequence! */ default Flux> getMulti(@Nullable CompositeSnapshot snapshot, Flux keys) { - return getMulti(snapshot, keys, false); + return keys.flatMapSequential(key -> this + .getValue(snapshot, key) + .map(Optional::of) + .defaultIfEmpty(Optional.empty()) + ); } default Mono putMulti(Flux> entries) { @@ -138,7 +128,7 @@ public interface DatabaseStageMap> extends .getAllStages(snapshot, smallRange) .flatMapSequential(stage -> stage .getValue() - .get(snapshot, true) + .get(snapshot) .map(value -> Map.entry(stage.getKey(), value)) .doFinally(s -> stage.getValue().close()) ); @@ -253,7 +243,7 @@ public interface DatabaseStageMap> extends } @Override - default Mono> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { + default Mono> get(@Nullable CompositeSnapshot snapshot) { return this .getAllValues(snapshot, true) .collectMap(Entry::getKey, Entry::getValue, Object2ObjectLinkedOpenHashMap::new) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index bd87d7a..d68e633 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -33,6 +33,8 @@ import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; import java.io.IOException; import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -260,14 +262,16 @@ public class LLLocalDictionary implements LLDictionary { ); } - private Buffer getSync(LLSnapshot snapshot, Buffer key) throws Exception { + private Buffer getSync(LLSnapshot snapshot, Buffer key) throws IOException { logger.trace(MARKER_ROCKSDB, "Reading {}", () -> toStringSafe(key)); try { var readOptions = generateReadOptionsOrStatic(snapshot); Buffer result; startedGet.increment(); try { - result = getTime.recordCallable(() -> db.get(readOptions, key)); + var initTime = System.nanoTime(); + result = db.get(readOptions, key); + getTime.record(Duration.ofNanos(System.nanoTime() - initTime)); } finally { endedGet.increment(); if (readOptions != EMPTY_READ_OPTIONS) { @@ -543,7 +547,15 @@ public class LLLocalDictionary implements LLDictionary { @Override public Flux getMulti(@Nullable LLSnapshot snapshot, Flux keys) { - return keys.flatMapSequential(key -> runOnDb(false, () -> OptionalBuf.ofNullable(getSync(snapshot, key)))); + return keys + .publishOn(dbRScheduler) + .handle((key, sink) -> { + try (key) { + sink.next(OptionalBuf.ofNullable(getSync(snapshot, key))); + } catch (IOException ex) { + sink.error(ex); + } + }); } @Override