diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java index 79f52be..96b51a8 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java @@ -35,6 +35,8 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { Flux> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength); + Flux getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength); + Flux> setRange(LLRange range, Flux> entries, boolean getOldValues); default Mono replaceRange(LLRange range, boolean canKeysChange, Function, Mono>> entriesReplacer) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index 4f24665..6966910 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -214,7 +214,7 @@ public class DatabaseMapDictionaryDeep> implem public Mono at(@Nullable CompositeSnapshot snapshot, T keySuffix) { byte[] keySuffixData = serializeSuffix(keySuffix); Flux keyFlux; - if (this.subStageGetter.needsKeyFlux()) { + if (this.subStageGetter.needsDebuggingKeyFlux()) { keyFlux = this.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData)); } else { keyFlux = Flux.empty(); @@ -229,10 +229,11 @@ public class DatabaseMapDictionaryDeep> implem @Override public Flux> getAllStages(@Nullable CompositeSnapshot snapshot) { - if (this.subStageGetter.needsKeyFlux()) { + if (this.subStageGetter.needsDebuggingKeyFlux()) { return dictionary .getRangeKeysGrouped(resolveSnapshot(snapshot), range, keyPrefix.length + keySuffixLength) .flatMapSequential(rangeKeys -> { + assert this.subStageGetter.isMultiKey() || rangeKeys.size() == 1; byte[] groupKeyWithExt = rangeKeys.get(0); byte[] groupKeyWithoutExt = removeExtFromFullKey(groupKeyWithExt); byte[] groupSuffix = this.stripPrefix(groupKeyWithoutExt); @@ -241,22 +242,25 @@ public class DatabaseMapDictionaryDeep> implem .subStage(dictionary, snapshot, groupKeyWithoutExt, - this.subStageGetter.needsKeyFlux() ? Flux.defer(() -> Flux.fromIterable(rangeKeys)) : Flux.empty() + Flux.fromIterable(rangeKeys) ) .map(us -> Map.entry(this.deserializeSuffix(groupSuffix), us)); }); } else { return dictionary - .getOneKey(resolveSnapshot(snapshot), range) - .flatMap(randomKeyWithExt -> { - byte[] keyWithoutExt = removeExtFromFullKey(randomKeyWithExt); - byte[] keySuffix = this.stripPrefix(keyWithoutExt); - assert subStageKeysConsistency(keyWithoutExt.length); + .getRangeKeyPrefixes(resolveSnapshot(snapshot), range, keyPrefix.length + keySuffixLength) + .flatMapSequential(groupKeyWithExt -> { + byte[] groupKeyWithoutExt = removeExtFromFullKey(groupKeyWithExt); + byte[] groupSuffix = this.stripPrefix(groupKeyWithoutExt); + assert subStageKeysConsistency(groupKeyWithExt.length); return this.subStageGetter - .subStage(dictionary, snapshot, keyWithoutExt, Mono.just(randomKeyWithExt).flux()) - .map(us -> Map.entry(this.deserializeSuffix(keySuffix), us)); - }) - .flux(); + .subStage(dictionary, + snapshot, + groupKeyWithoutExt, + Flux.empty() + ) + .map(us -> Map.entry(this.deserializeSuffix(groupSuffix), us)); + }); } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java index c130a84..18471d0 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java @@ -11,7 +11,9 @@ public interface SubStageGetter> { Mono subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, byte[] prefixKey, - Flux keyFlux); + Flux debuggingKeyFlux); - boolean needsKeyFlux(); + boolean isMultiKey(); + + boolean needsDebuggingKeyFlux(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java index ac35f6f..f8458e8 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java @@ -33,22 +33,27 @@ public class SubStageGetterMap implements SubStageGetter, Databa public Mono> subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, byte[] prefixKey, - Flux keyFlux) { + Flux debuggingKeyFlux) { Mono> result = Mono.just(DatabaseMapDictionary.tail(dictionary, prefixKey, keySerializer, valueSerializer )); if (assertsEnabled) { - return checkKeyFluxConsistency(prefixKey, keyFlux).then(result); + return checkKeyFluxConsistency(prefixKey, debuggingKeyFlux).then(result); } else { return result; } } @Override - public boolean needsKeyFlux() { + public boolean isMultiKey() { return true; } + @Override + public boolean needsDebuggingKeyFlux() { + return assertsEnabled; + } + private Mono checkKeyFluxConsistency(byte[] prefixKey, Flux keyFlux) { return keyFlux.doOnNext(key -> { assert key.length == prefixKey.length + getKeyBinaryLength(); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java index 27302c1..0c161f1 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java @@ -47,7 +47,7 @@ public class SubStageGetterMapDeep> implements public Mono> subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, byte[] prefixKey, - Flux keyFlux) { + Flux debuggingKeyFlux) { Mono> result = Mono.just(DatabaseMapDictionaryDeep.deepIntermediate(dictionary, prefixKey, keySerializer, @@ -55,17 +55,22 @@ public class SubStageGetterMapDeep> implements keyExtLength )); if (assertsEnabled) { - return checkKeyFluxConsistency(prefixKey, keyFlux).then(result); + return checkKeyFluxConsistency(prefixKey, debuggingKeyFlux).then(result); } else { return result; } } @Override - public boolean needsKeyFlux() { + public boolean isMultiKey() { return true; } + @Override + public boolean needsDebuggingKeyFlux() { + return assertsEnabled; + } + private Mono checkKeyFluxConsistency(byte[] prefixKey, Flux keyFlux) { return keyFlux.doOnNext(key -> { assert key.length == prefixKey.length + getKeyBinaryLength(); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java index 6a3abd0..0c2bd0f 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java @@ -30,17 +30,22 @@ public class SubStageGetterSet implements SubStageGetter, Dat public Mono> subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, byte[] prefixKey, - Flux keyFlux) { + Flux debuggingKeyFlux) { Mono> result = Mono.just(DatabaseSetDictionary.tail(dictionary, prefixKey, keySerializer)); if (assertsEnabled) { - return checkKeyFluxConsistency(prefixKey, keyFlux).then(result); + return checkKeyFluxConsistency(prefixKey, debuggingKeyFlux).then(result); } else { return result; } } @Override - public boolean needsKeyFlux() { + public boolean isMultiKey() { + return true; + } + + @Override + public boolean needsDebuggingKeyFlux() { return assertsEnabled; } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java index a7acef3..2f68053 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java @@ -10,6 +10,15 @@ import reactor.core.publisher.Mono; public class SubStageGetterSingle implements SubStageGetter> { + private static final boolean assertsEnabled; + static { + boolean assertsEnabledTmp = false; + //noinspection AssertWithSideEffects + assert assertsEnabledTmp = true; + //noinspection ConstantConditions + assertsEnabled = assertsEnabledTmp; + } + private final Serializer serializer; public SubStageGetterSingle(Serializer serializer) { @@ -20,8 +29,8 @@ public class SubStageGetterSingle implements SubStageGetter> subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, byte[] keyPrefix, - Flux keyFlux) { - return keyFlux + Flux debuggingKeyFlux) { + return debuggingKeyFlux .singleOrEmpty() .flatMap(key -> Mono .>fromCallable(() -> { @@ -40,8 +49,13 @@ public class SubStageGetterSingle implements SubStageGetter getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) { + return new LLLocalLuceneKeyPrefixesReactiveIterator(db, + cfh, + prefixLength, + range, + resolveSnapshot(snapshot), + "getRangeKeysGrouped" + ).flux().subscribeOn(dbScheduler); + } + private Flux getRangeKeysSingle(LLSnapshot snapshot, byte[] key) { return this .containsKey(snapshot, key) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneKeyPrefixesReactiveIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneKeyPrefixesReactiveIterator.java new file mode 100644 index 0000000..07961a8 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneKeyPrefixesReactiveIterator.java @@ -0,0 +1,78 @@ +package it.cavallium.dbengine.database.disk; + +import it.cavallium.dbengine.database.LLRange; +import java.util.Arrays; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.Slice; +import reactor.core.publisher.Flux; + +public class LLLocalLuceneKeyPrefixesReactiveIterator { + + private static final byte[] EMPTY = new byte[0]; + + private final RocksDB db; + private final ColumnFamilyHandle cfh; + private final int prefixLength; + private final LLRange range; + private final ReadOptions readOptions; + private final String debugName; + + public LLLocalLuceneKeyPrefixesReactiveIterator(RocksDB db, + ColumnFamilyHandle cfh, + int prefixLength, + LLRange range, + ReadOptions readOptions, + String debugName) { + this.db = db; + this.cfh = cfh; + this.prefixLength = prefixLength; + this.range = range; + this.readOptions = readOptions; + this.debugName = debugName; + } + + + @SuppressWarnings("Convert2MethodRef") + public Flux flux() { + return Flux + .generate(() -> { + var readOptions = new ReadOptions(this.readOptions); + readOptions.setFillCache(range.hasMin() && range.hasMax()); + if (range.hasMin()) { + readOptions.setIterateLowerBound(new Slice(range.getMin())); + } + if (range.hasMax()) { + readOptions.setIterateUpperBound(new Slice(range.getMax())); + } + readOptions.setPrefixSameAsStart(true); + var rocksIterator = db.newIterator(cfh, readOptions); + if (range.hasMin()) { + rocksIterator.seek(range.getMin()); + } else { + rocksIterator.seekToFirst(); + } + return rocksIterator; + }, (rocksIterator, sink) -> { + byte[] firstGroupKey = null; + + while (rocksIterator.isValid()) { + byte[] key = rocksIterator.key(); + if (firstGroupKey == null) { + firstGroupKey = key; + } else if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) { + break; + } + rocksIterator.next(); + } + if (firstGroupKey != null) { + var groupKeyPrefix = Arrays.copyOf(firstGroupKey, prefixLength); + sink.next(groupKeyPrefix); + } else { + sink.complete(); + } + return rocksIterator; + }, rocksIterator1 -> rocksIterator1.close()); + } +}