Refactor iterations

This commit is contained in:
Andrea Cavalli 2021-03-14 13:24:46 +01:00
parent 08eb457235
commit 3d5f987ffd
9 changed files with 153 additions and 27 deletions

View File

@ -35,6 +35,8 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
Flux<List<byte[]>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength); Flux<List<byte[]>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength);
Flux<byte[]> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength);
Flux<Entry<byte[], byte[]>> setRange(LLRange range, Flux<Entry<byte[], byte[]>> entries, boolean getOldValues); Flux<Entry<byte[], byte[]>> setRange(LLRange range, Flux<Entry<byte[], byte[]>> entries, boolean getOldValues);
default Mono<Void> replaceRange(LLRange range, boolean canKeysChange, Function<Entry<byte[], byte[]>, Mono<Entry<byte[], byte[]>>> entriesReplacer) { default Mono<Void> replaceRange(LLRange range, boolean canKeysChange, Function<Entry<byte[], byte[]>, Mono<Entry<byte[], byte[]>>> entriesReplacer) {

View File

@ -214,7 +214,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) { public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
byte[] keySuffixData = serializeSuffix(keySuffix); byte[] keySuffixData = serializeSuffix(keySuffix);
Flux<byte[]> keyFlux; Flux<byte[]> keyFlux;
if (this.subStageGetter.needsKeyFlux()) { if (this.subStageGetter.needsDebuggingKeyFlux()) {
keyFlux = this.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData)); keyFlux = this.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData));
} else { } else {
keyFlux = Flux.empty(); keyFlux = Flux.empty();
@ -229,10 +229,11 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
@Override @Override
public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot) { public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot) {
if (this.subStageGetter.needsKeyFlux()) { if (this.subStageGetter.needsDebuggingKeyFlux()) {
return dictionary return dictionary
.getRangeKeysGrouped(resolveSnapshot(snapshot), range, keyPrefix.length + keySuffixLength) .getRangeKeysGrouped(resolveSnapshot(snapshot), range, keyPrefix.length + keySuffixLength)
.flatMapSequential(rangeKeys -> { .flatMapSequential(rangeKeys -> {
assert this.subStageGetter.isMultiKey() || rangeKeys.size() == 1;
byte[] groupKeyWithExt = rangeKeys.get(0); byte[] groupKeyWithExt = rangeKeys.get(0);
byte[] groupKeyWithoutExt = removeExtFromFullKey(groupKeyWithExt); byte[] groupKeyWithoutExt = removeExtFromFullKey(groupKeyWithExt);
byte[] groupSuffix = this.stripPrefix(groupKeyWithoutExt); byte[] groupSuffix = this.stripPrefix(groupKeyWithoutExt);
@ -241,22 +242,25 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
.subStage(dictionary, .subStage(dictionary,
snapshot, snapshot,
groupKeyWithoutExt, groupKeyWithoutExt,
this.subStageGetter.needsKeyFlux() ? Flux.defer(() -> Flux.fromIterable(rangeKeys)) : Flux.empty() Flux.fromIterable(rangeKeys)
) )
.map(us -> Map.entry(this.deserializeSuffix(groupSuffix), us)); .map(us -> Map.entry(this.deserializeSuffix(groupSuffix), us));
}); });
} else { } else {
return dictionary return dictionary
.getOneKey(resolveSnapshot(snapshot), range) .getRangeKeyPrefixes(resolveSnapshot(snapshot), range, keyPrefix.length + keySuffixLength)
.flatMap(randomKeyWithExt -> { .flatMapSequential(groupKeyWithExt -> {
byte[] keyWithoutExt = removeExtFromFullKey(randomKeyWithExt); byte[] groupKeyWithoutExt = removeExtFromFullKey(groupKeyWithExt);
byte[] keySuffix = this.stripPrefix(keyWithoutExt); byte[] groupSuffix = this.stripPrefix(groupKeyWithoutExt);
assert subStageKeysConsistency(keyWithoutExt.length); assert subStageKeysConsistency(groupKeyWithExt.length);
return this.subStageGetter return this.subStageGetter
.subStage(dictionary, snapshot, keyWithoutExt, Mono.just(randomKeyWithExt).flux()) .subStage(dictionary,
.map(us -> Map.entry(this.deserializeSuffix(keySuffix), us)); snapshot,
}) groupKeyWithoutExt,
.flux(); Flux.empty()
)
.map(us -> Map.entry(this.deserializeSuffix(groupSuffix), us));
});
} }
} }

View File

@ -11,7 +11,9 @@ public interface SubStageGetter<U, US extends DatabaseStage<U>> {
Mono<US> subStage(LLDictionary dictionary, Mono<US> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot, @Nullable CompositeSnapshot snapshot,
byte[] prefixKey, byte[] prefixKey,
Flux<byte[]> keyFlux); Flux<byte[]> debuggingKeyFlux);
boolean needsKeyFlux(); boolean isMultiKey();
boolean needsDebuggingKeyFlux();
} }

View File

@ -33,22 +33,27 @@ public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, Databa
public Mono<DatabaseMapDictionary<T, U>> subStage(LLDictionary dictionary, public Mono<DatabaseMapDictionary<T, U>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot, @Nullable CompositeSnapshot snapshot,
byte[] prefixKey, byte[] prefixKey,
Flux<byte[]> keyFlux) { Flux<byte[]> debuggingKeyFlux) {
Mono<DatabaseMapDictionary<T, U>> result = Mono.just(DatabaseMapDictionary.tail(dictionary, prefixKey, keySerializer, Mono<DatabaseMapDictionary<T, U>> result = Mono.just(DatabaseMapDictionary.tail(dictionary, prefixKey, keySerializer,
valueSerializer valueSerializer
)); ));
if (assertsEnabled) { if (assertsEnabled) {
return checkKeyFluxConsistency(prefixKey, keyFlux).then(result); return checkKeyFluxConsistency(prefixKey, debuggingKeyFlux).then(result);
} else { } else {
return result; return result;
} }
} }
@Override @Override
public boolean needsKeyFlux() { public boolean isMultiKey() {
return true; return true;
} }
@Override
public boolean needsDebuggingKeyFlux() {
return assertsEnabled;
}
private Mono<Void> checkKeyFluxConsistency(byte[] prefixKey, Flux<byte[]> keyFlux) { private Mono<Void> checkKeyFluxConsistency(byte[] prefixKey, Flux<byte[]> keyFlux) {
return keyFlux.doOnNext(key -> { return keyFlux.doOnNext(key -> {
assert key.length == prefixKey.length + getKeyBinaryLength(); assert key.length == prefixKey.length + getKeyBinaryLength();

View File

@ -47,7 +47,7 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
public Mono<DatabaseMapDictionaryDeep<T, U, US>> subStage(LLDictionary dictionary, public Mono<DatabaseMapDictionaryDeep<T, U, US>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot, @Nullable CompositeSnapshot snapshot,
byte[] prefixKey, byte[] prefixKey,
Flux<byte[]> keyFlux) { Flux<byte[]> debuggingKeyFlux) {
Mono<DatabaseMapDictionaryDeep<T, U, US>> result = Mono.just(DatabaseMapDictionaryDeep.deepIntermediate(dictionary, Mono<DatabaseMapDictionaryDeep<T, U, US>> result = Mono.just(DatabaseMapDictionaryDeep.deepIntermediate(dictionary,
prefixKey, prefixKey,
keySerializer, keySerializer,
@ -55,17 +55,22 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
keyExtLength keyExtLength
)); ));
if (assertsEnabled) { if (assertsEnabled) {
return checkKeyFluxConsistency(prefixKey, keyFlux).then(result); return checkKeyFluxConsistency(prefixKey, debuggingKeyFlux).then(result);
} else { } else {
return result; return result;
} }
} }
@Override @Override
public boolean needsKeyFlux() { public boolean isMultiKey() {
return true; return true;
} }
@Override
public boolean needsDebuggingKeyFlux() {
return assertsEnabled;
}
private Mono<Void> checkKeyFluxConsistency(byte[] prefixKey, Flux<byte[]> keyFlux) { private Mono<Void> checkKeyFluxConsistency(byte[] prefixKey, Flux<byte[]> keyFlux) {
return keyFlux.doOnNext(key -> { return keyFlux.doOnNext(key -> {
assert key.length == prefixKey.length + getKeyBinaryLength(); assert key.length == prefixKey.length + getKeyBinaryLength();

View File

@ -30,17 +30,22 @@ public class SubStageGetterSet<T> implements SubStageGetter<Map<T, Nothing>, Dat
public Mono<DatabaseSetDictionary<T>> subStage(LLDictionary dictionary, public Mono<DatabaseSetDictionary<T>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot, @Nullable CompositeSnapshot snapshot,
byte[] prefixKey, byte[] prefixKey,
Flux<byte[]> keyFlux) { Flux<byte[]> debuggingKeyFlux) {
Mono<DatabaseSetDictionary<T>> result = Mono.just(DatabaseSetDictionary.tail(dictionary, prefixKey, keySerializer)); Mono<DatabaseSetDictionary<T>> result = Mono.just(DatabaseSetDictionary.tail(dictionary, prefixKey, keySerializer));
if (assertsEnabled) { if (assertsEnabled) {
return checkKeyFluxConsistency(prefixKey, keyFlux).then(result); return checkKeyFluxConsistency(prefixKey, debuggingKeyFlux).then(result);
} else { } else {
return result; return result;
} }
} }
@Override @Override
public boolean needsKeyFlux() { public boolean isMultiKey() {
return true;
}
@Override
public boolean needsDebuggingKeyFlux() {
return assertsEnabled; return assertsEnabled;
} }

View File

@ -10,6 +10,15 @@ import reactor.core.publisher.Mono;
public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageEntry<T>> { public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageEntry<T>> {
private static final boolean assertsEnabled;
static {
boolean assertsEnabledTmp = false;
//noinspection AssertWithSideEffects
assert assertsEnabledTmp = true;
//noinspection ConstantConditions
assertsEnabled = assertsEnabledTmp;
}
private final Serializer<T, byte[]> serializer; private final Serializer<T, byte[]> serializer;
public SubStageGetterSingle(Serializer<T, byte[]> serializer) { public SubStageGetterSingle(Serializer<T, byte[]> serializer) {
@ -20,8 +29,8 @@ public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageE
public Mono<DatabaseStageEntry<T>> subStage(LLDictionary dictionary, public Mono<DatabaseStageEntry<T>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot, @Nullable CompositeSnapshot snapshot,
byte[] keyPrefix, byte[] keyPrefix,
Flux<byte[]> keyFlux) { Flux<byte[]> debuggingKeyFlux) {
return keyFlux return debuggingKeyFlux
.singleOrEmpty() .singleOrEmpty()
.flatMap(key -> Mono .flatMap(key -> Mono
.<DatabaseStageEntry<T>>fromCallable(() -> { .<DatabaseStageEntry<T>>fromCallable(() -> {
@ -40,8 +49,13 @@ public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageE
} }
@Override @Override
public boolean needsKeyFlux() { public boolean isMultiKey() {
return true; return false;
}
@Override
public boolean needsDebuggingKeyFlux() {
return assertsEnabled;
} }
} }

View File

@ -584,6 +584,17 @@ public class LLLocalDictionary implements LLDictionary {
).flux().subscribeOn(dbScheduler); ).flux().subscribeOn(dbScheduler);
} }
@Override
public Flux<byte[]> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
return new LLLocalLuceneKeyPrefixesReactiveIterator(db,
cfh,
prefixLength,
range,
resolveSnapshot(snapshot),
"getRangeKeysGrouped"
).flux().subscribeOn(dbScheduler);
}
private Flux<byte[]> getRangeKeysSingle(LLSnapshot snapshot, byte[] key) { private Flux<byte[]> getRangeKeysSingle(LLSnapshot snapshot, byte[] key) {
return this return this
.containsKey(snapshot, key) .containsKey(snapshot, key)

View File

@ -0,0 +1,78 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLRange;
import java.util.Arrays;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.Slice;
import reactor.core.publisher.Flux;
public class LLLocalLuceneKeyPrefixesReactiveIterator {
private static final byte[] EMPTY = new byte[0];
private final RocksDB db;
private final ColumnFamilyHandle cfh;
private final int prefixLength;
private final LLRange range;
private final ReadOptions readOptions;
private final String debugName;
public LLLocalLuceneKeyPrefixesReactiveIterator(RocksDB db,
ColumnFamilyHandle cfh,
int prefixLength,
LLRange range,
ReadOptions readOptions,
String debugName) {
this.db = db;
this.cfh = cfh;
this.prefixLength = prefixLength;
this.range = range;
this.readOptions = readOptions;
this.debugName = debugName;
}
@SuppressWarnings("Convert2MethodRef")
public Flux<byte[]> flux() {
return Flux
.generate(() -> {
var readOptions = new ReadOptions(this.readOptions);
readOptions.setFillCache(range.hasMin() && range.hasMax());
if (range.hasMin()) {
readOptions.setIterateLowerBound(new Slice(range.getMin()));
}
if (range.hasMax()) {
readOptions.setIterateUpperBound(new Slice(range.getMax()));
}
readOptions.setPrefixSameAsStart(true);
var rocksIterator = db.newIterator(cfh, readOptions);
if (range.hasMin()) {
rocksIterator.seek(range.getMin());
} else {
rocksIterator.seekToFirst();
}
return rocksIterator;
}, (rocksIterator, sink) -> {
byte[] firstGroupKey = null;
while (rocksIterator.isValid()) {
byte[] key = rocksIterator.key();
if (firstGroupKey == null) {
firstGroupKey = key;
} else if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
break;
}
rocksIterator.next();
}
if (firstGroupKey != null) {
var groupKeyPrefix = Arrays.copyOf(firstGroupKey, prefixLength);
sink.next(groupKeyPrefix);
} else {
sink.complete();
}
return rocksIterator;
}, rocksIterator1 -> rocksIterator1.close());
}
}