From d9187b70a98c299cae1cd652c8b51d7c2217046f Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 2 Feb 2021 15:36:11 +0100 Subject: [PATCH] Update pom.xml, Example.java, and 9 more files... --- pom.xml | 9 +- .../it.cavallium.dbengine.client/Example.java | 176 +++++++++++++++++- .../dbengine/database/LLDictionary.java | 4 + .../DatabaseMapDictionaryDeep.java | 71 +++++-- .../collections/DatabaseStageMap.java | 9 +- .../collections/QueryableBuilder.java | 22 +++ .../database/collections/SubStageGetter.java | 2 + .../collections/SubStageGetterMap.java | 35 +++- .../collections/SubStageGetterMapDeep.java | 43 ++++- .../collections/SubStageGetterSingle.java | 5 + .../database/disk/LLLocalDictionary.java | 79 +++++--- 11 files changed, 402 insertions(+), 53 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/database/collections/QueryableBuilder.java diff --git a/pom.xml b/pom.xml index 2af6edd..2b424a7 100644 --- a/pom.xml +++ b/pom.xml @@ -26,6 +26,11 @@ + + org.slf4j + slf4j-api + 1.7.30 + org.warp common-utils @@ -138,12 +143,12 @@ io.projectreactor reactor-core - 3.4.1 + 3.4.2 io.projectreactor reactor-tools - 3.4.1 + 3.4.2 diff --git a/src/example/java/it.cavallium.dbengine.client/Example.java b/src/example/java/it.cavallium.dbengine.client/Example.java index 2393deb..88d83c0 100644 --- a/src/example/java/it.cavallium.dbengine.client/Example.java +++ b/src/example/java/it.cavallium.dbengine.client/Example.java @@ -1,12 +1,18 @@ package it.cavallium.dbengine.client; import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.collections.DatabaseMapDictionary; import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; +import it.cavallium.dbengine.database.collections.DatabaseStageEntry; +import it.cavallium.dbengine.database.collections.DatabaseStageMap; +import it.cavallium.dbengine.database.collections.QueryableBuilder; import it.cavallium.dbengine.database.collections.Serializer; import it.cavallium.dbengine.database.collections.SerializerFixedBinaryLength; +import it.cavallium.dbengine.database.collections.SubStageGetterMap; +import it.cavallium.dbengine.database.collections.SubStageGetterMapDeep; import it.cavallium.dbengine.database.collections.SubStageGetterSingleBytes; import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; import java.io.IOException; @@ -20,6 +26,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -32,9 +39,9 @@ import reactor.util.function.Tuples; public class Example { - private static final boolean printPreviousValue = false; + public static final boolean printPreviousValue = false; private static final int numRepeats = 1000; - private static final int batchSize = 10000; + private static final int batchSize = 1000; public static void main(String[] args) throws InterruptedException { /* @@ -53,12 +60,169 @@ public class Example { */ - rangeTestPutMultiProgressive() - .then(rangeTestPutMultiSame()) + rangeTestPutMultiSame() + .then(rangeTestPutMultiProgressive()) + .then(testPutMulti()) + .then(testPutValue()) + .then(testAtPut()) + .then(test2LevelPut()) + .then(test3LevelPut()) + .then(test4LevelPut()) .subscribeOn(Schedulers.parallel()) .blockOptional(); } + private static Mono testCreateQueryable() { + var ssg = new SubStageGetterSingleBytes(); + var ser = SerializerFixedBinaryLength.noop(4); + var itemKey = new byte[]{0, 1, 2, 3}; + var newValue = new byte[]{4, 5, 6, 7}; + return test("Create Queryable", + tempDb() + .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) + .map(tuple -> tuple.mapT2(dict -> { + var builder = new QueryableBuilder(2); + return builder.wrap(DatabaseMapDictionaryDeep.simple(dict, builder.tail(ssg, ser), builder.serializer())); + })), + tuple -> Flux.range(0, batchSize).flatMap(n -> Mono + .defer(() -> Mono + .fromRunnable(() -> { + if (printPreviousValue) + System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue)); + }) + .then(tuple.getT2().at(null, itemKey)) + .flatMap(handle -> handle.setAndGetPrevious(newValue)) + .doOnSuccess(oldValue -> { + if (printPreviousValue) + System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue))); + }) + )) + .then(), + numRepeats, + tuple -> tuple.getT1().close()); + } + + private static Mono test2LevelPut() { + var k1ser = SerializerFixedBinaryLength.noop(4); + var k2ser = SerializerFixedBinaryLength.noop(4); + var vser = SerializerFixedBinaryLength.noop(4); + var ssg = new SubStageGetterMap(k2ser, vser); + return test("2 level put", + tempDb() + .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.deepTail(dict, ssg, k1ser, ssg.getKeyBinaryLength()))), + tuple -> Flux.range(0, batchSize).flatMap(n -> { + var itemKey1 = Ints.toByteArray(n / 4); + var itemKey2 = Ints.toByteArray(n); + var newValue = Ints.toByteArray(n); + return Mono + .defer(() -> Mono + .fromRunnable(() -> { + if (printPreviousValue) + System.out.println("Setting new value at key " + Arrays.toString(itemKey1) + "+" + Arrays.toString(itemKey2) + ": " + Arrays.toString(newValue)); + }) + .then(tuple.getT2().at(null, itemKey1)) + .map(handle -> (DatabaseStageMap>) handle) + .flatMap(handleK1 -> handleK1.at(null, itemKey2)) + .flatMap(handleK2 -> handleK2.setAndGetPrevious(newValue)) + .doOnSuccess(oldValue -> { + if (printPreviousValue) + System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue))); + }) + ); + }) + .then(), + numRepeats, + tuple -> tuple.getT1().close()); + } + + private static Mono test3LevelPut() { + var k1ser = SerializerFixedBinaryLength.noop(4); + var k2ser = SerializerFixedBinaryLength.noop(8); + var k3ser = SerializerFixedBinaryLength.noop(4); + var vser = SerializerFixedBinaryLength.noop(4); + var ssg3 = new SubStageGetterMap(k3ser, vser); + var ssg2 = new SubStageGetterMapDeep<>(ssg3, k2ser, ssg3.getKeyBinaryLength()); + return test("3 level put", + tempDb() + .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) + .map(tuple -> tuple.mapT2(dict -> { + return DatabaseMapDictionaryDeep.deepTail(dict, ssg2, k1ser, ssg2.getKeyBinaryLength()); + })), + tuple -> Flux.range(0, batchSize).flatMap(n -> { + var itemKey1 = Ints.toByteArray(n / 4); + var itemKey2 = Longs.toByteArray(n); + var itemKey3 = Ints.toByteArray(n); + var newValue = Ints.toByteArray(n); + return Mono + .defer(() -> Mono + .fromRunnable(() -> { + if (printPreviousValue) + System.out.println("Setting new value at key " + Arrays.toString(itemKey1) + "+" + Arrays.toString(itemKey2) + "+" + Arrays.toString(itemKey3) + ": " + Arrays.toString(newValue)); + }) + .then(tuple.getT2().at(null, itemKey1)) + .map(handle -> (DatabaseStageMap, DatabaseStageEntry>>) handle) + .flatMap(handleK1 -> handleK1.at(null, itemKey2)) + .map(handle -> (DatabaseStageMap>) handle) + .flatMap(handleK2 -> handleK2.at(null, itemKey3)) + .flatMap(handleK3 -> handleK3.setAndGetPrevious(newValue)) + .doOnSuccess(oldValue -> { + if (printPreviousValue) + System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue))); + }) + ); + }) + .then(), + numRepeats, + tuple -> tuple.getT1().close()); + } + + private static Mono test4LevelPut() { + var k1ser = SerializerFixedBinaryLength.noop(4); + var k2ser = SerializerFixedBinaryLength.noop(8); + var k3ser = SerializerFixedBinaryLength.noop(4); + var k4ser = SerializerFixedBinaryLength.noop(8); + var vser = SerializerFixedBinaryLength.noop(4); + var ssg4 = new SubStageGetterMap(k4ser, vser); + var ssg3 = new SubStageGetterMapDeep<>(ssg4, k3ser, ssg4.getKeyBinaryLength()); + var ssg2 = new SubStageGetterMapDeep<>(ssg3, k2ser, ssg3.getKeyBinaryLength()); + return test("4 level put", + tempDb() + .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) + .map(tuple -> tuple.mapT2(dict -> { + return DatabaseMapDictionaryDeep.deepTail(dict, ssg2, k1ser, ssg2.getKeyBinaryLength()); + })), + tuple -> Flux.range(0, batchSize).flatMap(n -> { + var itemKey1 = Ints.toByteArray(n / 4); + var itemKey2 = Longs.toByteArray(n); + var itemKey3 = Ints.toByteArray(n * 2); + var itemKey4 = Longs.toByteArray(n * 3L); + var newValue = Ints.toByteArray(n * 4); + return Mono + .defer(() -> Mono + .fromRunnable(() -> { + if (printPreviousValue) + System.out.println("Setting new value at key " + Arrays.toString(itemKey1) + "+" + Arrays.toString(itemKey2) + "+" + Arrays.toString(itemKey3) + "+" + Arrays.toString(itemKey4) + ": " + Arrays.toString(newValue)); + }) + .then(tuple.getT2().at(null, itemKey1)) + .map(handle -> (DatabaseStageMap>, DatabaseStageEntry>>>) handle) + .flatMap(handleK1 -> handleK1.at(null, itemKey2)) + .map(handle -> (DatabaseStageMap, DatabaseStageEntry>>) handle) + .flatMap(handleK2 -> handleK2.at(null, itemKey3)) + .map(handle -> (DatabaseStageMap>) handle) + .flatMap(handleK3 -> handleK3.at(null, itemKey4)) + .flatMap(handleK4 -> handleK4.setAndGetPrevious(newValue)) + .doOnSuccess(oldValue -> { + if (printPreviousValue) + System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue))); + }) + ); + }) + .then(), + numRepeats, + tuple -> tuple.getT1().close()); + } + private static Mono testAtPut() { var ssg = new SubStageGetterSingleBytes(); var ser = SerializerFixedBinaryLength.noop(4); @@ -238,7 +402,7 @@ public class Example { for (int i = 0; i < batchSize; i++) { keysToPut.put(Ints.toByteArray(i * 3), Ints.toByteArray(i * 11)); } - return test("MapDictionary::putMulti (batch of " + batchSize + " entries)", + return test("MapDictionary::putMulti (same keys, batch of " + batchSize + " entries)", tempDb() .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))), @@ -258,7 +422,7 @@ public class Example { var ser = SerializerFixedBinaryLength.noop(4); var vser = Serializer.noop(); AtomicInteger ai = new AtomicInteger(0); - return test("MapDictionary::putMulti (batch of " + batchSize + " entries)", + return test("MapDictionary::putMulti (progressive keys, batch of " + batchSize + " entries)", tempDb() .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, vser))), diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java index 2a5c0b4..e2fc98f 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java @@ -53,5 +53,9 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { Mono sizeRange(@Nullable LLSnapshot snapshot, LLRange range, boolean fast); + Mono> getOne(@Nullable LLSnapshot snapshot, LLRange range); + + Mono getOneKey(@Nullable LLSnapshot snapshot, LLRange range); + Mono> removeOne(LLRange range); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index 8bb8d98..4cab9e0 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -40,7 +40,7 @@ public class DatabaseMapDictionaryDeep> implem byte fillValue) { assert prefixKey.length == prefixLength; assert suffixLength > 0; - assert extLength > 0; + assert extLength >= 0; byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength); Arrays.fill(result, prefixLength, result.length, fillValue); return result; @@ -71,7 +71,7 @@ public class DatabaseMapDictionaryDeep> implem assert prefixKey.length == prefixLength; assert suffixKey.length == suffixLength; assert suffixLength > 0; - assert extLength > 0; + assert extLength >= 0; byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength); System.arraycopy(suffixKey, 0, result, prefixLength, suffixLength); Arrays.fill(result, prefixLength + suffixLength, result.length, fillValue); @@ -120,6 +120,7 @@ public class DatabaseMapDictionaryDeep> implem byte[] firstKey = firstKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength); byte[] lastKey = lastKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength); this.range = keyPrefix.length == 0 ? LLRange.all() : LLRange.of(firstKey, lastKey); + assert subStageKeysConsistency(keyPrefix.length + keySuffixLength + keyExtLength); } @SuppressWarnings("unused") @@ -167,6 +168,7 @@ public class DatabaseMapDictionaryDeep> implem assert suffixKey.length == keySuffixLength; byte[] result = Arrays.copyOf(keyPrefix, keyPrefix.length + keySuffixLength); System.arraycopy(suffixKey, 0, result, keyPrefix.length, keySuffixLength); + assert result.length == keyPrefix.length + keySuffixLength; return result; } @@ -193,6 +195,7 @@ public class DatabaseMapDictionaryDeep> implem return LLRange.of(first, end); } + @SuppressWarnings("ReactiveStreamsUnusedPublisher") @Override public Mono at(@Nullable CompositeSnapshot snapshot, T keySuffix) { byte[] keySuffixData = serializeSuffix(keySuffix); @@ -200,26 +203,57 @@ public class DatabaseMapDictionaryDeep> implem .subStage(dictionary, snapshot, toKeyWithoutExt(keySuffixData), - this.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData)) + this.subStageGetter.needsKeyFlux() + ? this.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData)) + : Flux.empty() ); } @Override public Flux> getAllStages(@Nullable CompositeSnapshot snapshot) { - return dictionary - .getRangeKeysGrouped(resolveSnapshot(snapshot), range, keyPrefix.length + keySuffixLength) - .flatMap(rangeKeys -> { - //System.out.println(Thread.currentThread() + "\tkReceived range key flux"); - byte[] groupKeyWithoutExt = removeExtFromFullKey(rangeKeys.get(0)); - byte[] groupSuffix = this.stripPrefix(groupKeyWithoutExt); - return this.subStageGetter - .subStage(dictionary, snapshot, groupKeyWithoutExt, Flux.fromIterable(rangeKeys)) - //.doOnSuccess(s -> System.out.println(Thread.currentThread() + "\tObtained stage for a key")) + return Flux.defer(() -> { + if (this.subStageGetter.needsKeyFlux()) { + return dictionary + .getRangeKeysGrouped(resolveSnapshot(snapshot), range, keyPrefix.length + keySuffixLength) + .flatMap(rangeKeys -> { + byte[] groupKeyWithExt = rangeKeys.get(0); + byte[] groupKeyWithoutExt = removeExtFromFullKey(groupKeyWithExt); + byte[] groupSuffix = this.stripPrefix(groupKeyWithoutExt); + assert subStageKeysConsistency(groupKeyWithExt.length); + return this.subStageGetter + .subStage(dictionary, + snapshot, + groupKeyWithoutExt, + this.subStageGetter.needsKeyFlux() ? Flux.defer(() -> Flux.fromIterable(rangeKeys)) : Flux.empty() + ) .map(us -> Map.entry(this.deserializeSuffix(groupSuffix), us)); - //.doOnSuccess(s -> System.out.println(Thread.currentThread() + "\tMapped stage for a key")); - } - ); - //.doOnNext(s -> System.out.println(Thread.currentThread() + "\tNext stage")) + }); + } else { + return dictionary + .getOneKey(resolveSnapshot(snapshot), range) + .flatMap(randomKeyWithExt -> { + byte[] keyWithoutExt = removeExtFromFullKey(randomKeyWithExt); + byte[] keySuffix = this.stripPrefix(keyWithoutExt); + assert subStageKeysConsistency(keyWithoutExt.length); + return this.subStageGetter + .subStage(dictionary, snapshot, keyWithoutExt, Mono.just(randomKeyWithExt).flux()) + .map(us -> Map.entry(this.deserializeSuffix(keySuffix), us)); + }); + } + }); + + } + + private boolean subStageKeysConsistency(int totalKeyLength) { + if (subStageGetter instanceof SubStageGetterMapDeep) { + return totalKeyLength + == keyPrefix.length + keySuffixLength + ((SubStageGetterMapDeep) subStageGetter).getKeyBinaryLength(); + } else if (subStageGetter instanceof SubStageGetterMap) { + return totalKeyLength + == keyPrefix.length + keySuffixLength + ((SubStageGetterMap) subStageGetter).getKeyBinaryLength(); + } else { + return true; + } } @Override @@ -234,11 +268,14 @@ public class DatabaseMapDictionaryDeep> implem //todo: temporary wrapper. convert the whole class to buffers protected T deserializeSuffix(byte[] keySuffix) { + assert suffixKeyConsistency(keySuffix.length); return keySuffixSerializer.deserialize(keySuffix); } //todo: temporary wrapper. convert the whole class to buffers protected byte[] serializeSuffix(T keySuffix) { - return keySuffixSerializer.serialize(keySuffix); + byte[] suffixData = keySuffixSerializer.serialize(keySuffix); + assert suffixKeyConsistency(suffixData.length); + return suffixData; } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index 8229dc3..6a23d10 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -1,14 +1,13 @@ package it.cavallium.dbengine.database.collections; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.database.collections.Joiner.ValueGetter; +import it.cavallium.dbengine.database.collections.JoinerBlocking.ValueGetterBlocking; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; -import java.util.Objects; import java.util.function.Function; import org.jetbrains.annotations.Nullable; -import it.cavallium.dbengine.database.collections.Joiner.ValueGetter; -import it.cavallium.dbengine.database.collections.JoinerBlocking.ValueGetterBlocking; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -26,7 +25,7 @@ public interface DatabaseStageMap> extends Dat } default Mono putValue(T key, U value) { - return putValueAndGetStatus(key, value).then(); + return at(null, key).single().flatMap(v -> v.set(value)); } default Mono putValueAndGetPrevious(T key, U value) { @@ -34,7 +33,7 @@ public interface DatabaseStageMap> extends Dat } default Mono putValueAndGetStatus(T key, U value) { - return putValueAndGetPrevious(key, value).map(oldValue -> !Objects.equals(oldValue, value)).defaultIfEmpty(false); + return at(null, key).single().flatMap(v -> v.setAndGetStatus(value)); } default Mono remove(T key) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/QueryableBuilder.java b/src/main/java/it/cavallium/dbengine/database/collections/QueryableBuilder.java new file mode 100644 index 0000000..7f57683 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/collections/QueryableBuilder.java @@ -0,0 +1,22 @@ +package it.cavallium.dbengine.database.collections; + +public class QueryableBuilder { + + public QueryableBuilder(int stagesNumber) { + + } + + public SerializerFixedBinaryLength serializer() { + return null; + } + + public > SubStageGetterSingleBytes tail(U ssg, + SerializerFixedBinaryLength ser) { + return null; + + } + + public , M extends DatabaseStageMap> M wrap(M map) { + return null; + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java index 5b47ef8..c130a84 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java @@ -12,4 +12,6 @@ public interface SubStageGetter> { @Nullable CompositeSnapshot snapshot, byte[] prefixKey, Flux keyFlux); + + boolean needsKeyFlux(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java index da920b3..3919bf1 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java @@ -9,6 +9,15 @@ import reactor.core.publisher.Mono; public class SubStageGetterMap implements SubStageGetter, DatabaseStageEntry>> { + private static final boolean assertsEnabled; + static { + boolean assertsEnabledTmp = false; + //noinspection AssertWithSideEffects + assert assertsEnabledTmp = true; + //noinspection ConstantConditions + assertsEnabled = assertsEnabledTmp; + } + private final SerializerFixedBinaryLength keySerializer; private final Serializer valueSerializer; @@ -23,6 +32,30 @@ public class SubStageGetterMap implements SubStageGetter, Databa @Nullable CompositeSnapshot snapshot, byte[] prefixKey, Flux keyFlux) { - return Mono.just(DatabaseMapDictionary.tail(dictionary, keySerializer, valueSerializer, prefixKey)); + Mono>> result = Mono.just(DatabaseMapDictionary.tail(dictionary, + keySerializer, + valueSerializer, + prefixKey + )); + if (assertsEnabled) { + return checkKeyFluxConsistency(prefixKey, keyFlux).then(result); + } else { + return result; + } + } + + @Override + public boolean needsKeyFlux() { + return assertsEnabled; + } + + private Mono checkKeyFluxConsistency(byte[] prefixKey, Flux keyFlux) { + return keyFlux.doOnNext(key -> { + assert key.length == prefixKey.length + getKeyBinaryLength(); + }).then(); + } + + public int getKeyBinaryLength() { + return keySerializer.getSerializedBinaryLength(); } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java index 8974ca2..04aecc5 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java @@ -10,6 +10,15 @@ import reactor.core.publisher.Mono; public class SubStageGetterMapDeep> implements SubStageGetter, DatabaseStageEntry>> { + private static final boolean assertsEnabled; + static { + boolean assertsEnabledTmp = false; + //noinspection AssertWithSideEffects + assert assertsEnabledTmp = true; + //noinspection ConstantConditions + assertsEnabled = assertsEnabledTmp; + } + private final SubStageGetter subStageGetter; private final SerializerFixedBinaryLength keySerializer; private final int keyExtLength; @@ -20,6 +29,18 @@ public class SubStageGetterMapDeep> implements this.subStageGetter = subStageGetter; this.keySerializer = keySerializer; this.keyExtLength = keyExtLength; + assert keyExtConsistency(); + } + + + private boolean keyExtConsistency() { + if (subStageGetter instanceof SubStageGetterMapDeep) { + return keyExtLength == ((SubStageGetterMapDeep) subStageGetter).getKeyBinaryLength(); + } else if (subStageGetter instanceof SubStageGetterMap) { + return keyExtLength == ((SubStageGetterMap) subStageGetter).getKeyBinaryLength(); + } else { + return true; + } } @Override @@ -27,11 +48,31 @@ public class SubStageGetterMapDeep> implements @Nullable CompositeSnapshot snapshot, byte[] prefixKey, Flux keyFlux) { - return Mono.just(DatabaseMapDictionaryDeep.deepIntermediate(dictionary, + Mono>> result = Mono.just(DatabaseMapDictionaryDeep.deepIntermediate(dictionary, subStageGetter, keySerializer, prefixKey, keyExtLength )); + if (assertsEnabled) { + return checkKeyFluxConsistency(prefixKey, keyFlux).then(result); + } else { + return result; + } + } + + @Override + public boolean needsKeyFlux() { + return assertsEnabled; + } + + private Mono checkKeyFluxConsistency(byte[] prefixKey, Flux keyFlux) { + return keyFlux.doOnNext(key -> { + assert key.length == prefixKey.length + getKeyBinaryLength(); + }).then(); + } + + public int getKeyBinaryLength() { + return keySerializer.getSerializedBinaryLength() + keyExtLength; } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java index 989042c..6a90686 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java @@ -41,6 +41,11 @@ public class SubStageGetterSingle implements SubStageGetter get(@Nullable LLSnapshot snapshot, byte[] key) { return Mono .fromCallable(() -> { + logger.trace("Reading {}", key); Holder data = new Holder<>(); if (db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) { if (data.getValue() != null) { @@ -160,6 +164,7 @@ public class LLLocalDictionary implements LLDictionary { return getPrevValue(key, resultType) .concatWith(Mono .fromCallable(() -> { + logger.trace("Writing {}: {}", key, value); db.put(cfh, key, value); return null; }) @@ -191,6 +196,7 @@ public class LLLocalDictionary implements LLDictionary { case PREVIOUS_VALUE: return Mono .fromCallable(() -> { + logger.trace("Reading {}", key); var data = new Holder(); if (db.keyMayExist(cfh, key, data)) { if (data.getValue() != null) { @@ -249,7 +255,6 @@ public class LLLocalDictionary implements LLDictionary { .getMulti(null, Flux.fromIterable(entriesWindow).map(Entry::getKey)) .publishOn(dbScheduler) .concatWith(Mono.fromCallable(() -> { - //System.out.println(Thread.currentThread()+"\tTest"); var batch = new CappedWriteBatch(db, CAPPED_WRITE_BATCH_CAP, RESERVED_WRITE_BATCH_SIZE, @@ -346,7 +351,6 @@ public class LLLocalDictionary implements LLDictionary { private Flux> getRangeMulti(LLSnapshot snapshot, LLRange range) { return Flux .>push(sink -> { - //System.out.println(Thread.currentThread() + "\tPreparing Read rande item"); try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) { if (range.hasMin()) { rocksIterator.seek(range.getMin()); @@ -359,12 +363,10 @@ public class LLLocalDictionary implements LLDictionary { if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) { break; } - //System.out.println(Thread.currentThread() + "\tRead rande item"); sink.next(Map.entry(key, rocksIterator.value())); rocksIterator.next(); } } finally { - //System.out.println(Thread.currentThread() + "\tFinish Read rande item"); sink.complete(); } }) @@ -374,7 +376,6 @@ public class LLLocalDictionary implements LLDictionary { private Flux>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) { return Flux .>>push(sink -> { - //System.out.println(Thread.currentThread() + "\tPreparing Read rande item"); try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) { if (range.hasMin()) { rocksIterator.seek(range.getMin()); @@ -397,7 +398,6 @@ public class LLLocalDictionary implements LLDictionary { currentGroupValues.add(Map.entry(key, rocksIterator.value())); } else { if (!currentGroupValues.isEmpty()) { - //System.out.println(Thread.currentThread() + "\tRead rande item"); sink.next(currentGroupValues); } firstGroupKey = key; @@ -406,11 +406,9 @@ public class LLLocalDictionary implements LLDictionary { rocksIterator.next(); } if (!currentGroupValues.isEmpty()) { - //System.out.println(Thread.currentThread() + "\tRead rande item"); sink.next(currentGroupValues); } - } finally { - //System.out.println(Thread.currentThread() + "\tFinish Read rande item"); + } finally {; sink.complete(); } }) @@ -421,10 +419,8 @@ public class LLLocalDictionary implements LLDictionary { public Flux getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range) { return Flux.defer(() -> { if (range.isSingle()) { - //System.out.println(Thread.currentThread() + "getRangeKeys single"); - return getRangeKeysSingle(snapshot, range.getMin()).doOnTerminate(() -> {}/*System.out.println(Thread.currentThread() + "getRangeKeys single end")*/); + return getRangeKeysSingle(snapshot, range.getMin()); } else { - //System.out.println(Thread.currentThread() + "getRangeKeys multi"); return getRangeKeysMulti(snapshot, range); } }); @@ -434,7 +430,6 @@ public class LLLocalDictionary implements LLDictionary { public Flux> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) { return Flux .>push(sink -> { - //System.out.println(Thread.currentThread() + "\tPreparing Read rande item"); try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) { if (range.hasMin()) { rocksIterator.seek(range.getMin()); @@ -457,7 +452,6 @@ public class LLLocalDictionary implements LLDictionary { currentGroupValues.add(key); } else { if (!currentGroupValues.isEmpty()) { - //System.out.println(Thread.currentThread() + "\tRead rande item"); sink.next(currentGroupValues); } firstGroupKey = key; @@ -467,11 +461,9 @@ public class LLLocalDictionary implements LLDictionary { rocksIterator.next(); } if (!currentGroupValues.isEmpty()) { - //System.out.println(Thread.currentThread() + "\tRead rande item"); sink.next(currentGroupValues); } } finally { - //System.out.println(Thread.currentThread() + "\tFinish Read rande item"); sink.complete(); } }) @@ -489,7 +481,6 @@ public class LLLocalDictionary implements LLDictionary { private Flux getRangeKeysMulti(LLSnapshot snapshot, LLRange range) { return Flux .push(sink -> { - //System.out.println(Thread.currentThread() + "\tkPreparing Read rande item"); try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) { if (range.hasMin()) { rocksIterator.seek(range.getMin()); @@ -497,21 +488,17 @@ public class LLLocalDictionary implements LLDictionary { rocksIterator.seekToFirst(); } byte[] key; - sink.onRequest(l -> {}/*System.out.println(Thread.currentThread() + "\tkRequested " + l)*/); while (rocksIterator.isValid()) { key = rocksIterator.key(); if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) { break; } - //System.out.println(Thread.currentThread() + "\tkRead rande item"); sink.next(key); rocksIterator.next(); } } finally { - //System.out.println(Thread.currentThread() + "\tkFinish Read rande item"); sink.complete(); } - //System.out.println(Thread.currentThread() + "\tkFinish end Read rande item"); }) .subscribeOn(dbScheduler); } @@ -645,6 +632,56 @@ public class LLLocalDictionary implements LLDictionary { }); } + @Override + public Mono> getOne(@Nullable LLSnapshot snapshot, LLRange range) { + return Mono + .fromCallable(() -> { + try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) { + if (range.hasMin()) { + rocksIterator.seek(range.getMin()); + } else { + rocksIterator.seekToFirst(); + } + byte[] key; + if (rocksIterator.isValid()) { + key = rocksIterator.key(); + if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) { + return null; + } + return Map.entry(key, rocksIterator.value()); + } else { + return null; + } + } + }) + .subscribeOn(dbScheduler); + } + + @Override + public Mono getOneKey(@Nullable LLSnapshot snapshot, LLRange range) { + return Mono + .fromCallable(() -> { + try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) { + if (range.hasMin()) { + rocksIterator.seek(range.getMin()); + } else { + rocksIterator.seekToFirst(); + } + byte[] key; + if (rocksIterator.isValid()) { + key = rocksIterator.key(); + if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) { + return null; + } + return key; + } else { + return null; + } + } + }) + .subscribeOn(dbScheduler); + } + private long fastSizeAll(@Nullable LLSnapshot snapshot) { var rocksdbSnapshot = resolveSnapshot(snapshot); if (USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS || rocksdbSnapshot.snapshot() == null) {