Refactor iterations

This commit is contained in:
Andrea Cavalli 2021-03-14 13:08:03 +01:00
parent 5f3bf768ad
commit 08eb457235
9 changed files with 149 additions and 148 deletions

View File

@ -213,13 +213,17 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
@Override @Override
public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) { public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
byte[] keySuffixData = serializeSuffix(keySuffix); byte[] keySuffixData = serializeSuffix(keySuffix);
Flux<byte[]> keyFlux;
if (this.subStageGetter.needsKeyFlux()) {
keyFlux = this.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData));
} else {
keyFlux = Flux.empty();
}
return this.subStageGetter return this.subStageGetter
.subStage(dictionary, .subStage(dictionary,
snapshot, snapshot,
toKeyWithoutExt(keySuffixData), toKeyWithoutExt(keySuffixData),
this.subStageGetter.needsKeyFlux() keyFlux
? this.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData))
: Flux.empty()
); );
} }

View File

@ -84,19 +84,17 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
} }
default Mono<Void> replaceAllValues(boolean canKeysChange, Function<Entry<T, U>, Mono<Entry<T, U>>> entriesReplacer) { default Mono<Void> replaceAllValues(boolean canKeysChange, Function<Entry<T, U>, Mono<Entry<T, U>>> entriesReplacer) {
return Mono.defer(() -> { if (canKeysChange) {
if (canKeysChange) { return this.setAllValues(this.getAllValues(null).flatMap(entriesReplacer)).then();
return this.setAllValues(this.getAllValues(null).flatMap(entriesReplacer)).then(); } else {
} else { return this
return this .getAllValues(null)
.getAllValues(null) .flatMap(entriesReplacer)
.flatMap(entriesReplacer) .flatMap(replacedEntry -> this
.flatMap(replacedEntry -> this .at(null, replacedEntry.getKey())
.at(null, replacedEntry.getKey()) .map(entry -> entry.set(replacedEntry.getValue())))
.map(entry -> entry.set(replacedEntry.getValue()))) .then();
.then(); }
}
});
} }
default Mono<Void> replaceAll(Function<Entry<T, US>, Mono<Void>> entriesReplacer) { default Mono<Void> replaceAll(Function<Entry<T, US>, Mono<Void>> entriesReplacer) {

View File

@ -46,7 +46,7 @@ public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, Databa
@Override @Override
public boolean needsKeyFlux() { public boolean needsKeyFlux() {
return assertsEnabled; return true;
} }
private Mono<Void> checkKeyFluxConsistency(byte[] prefixKey, Flux<byte[]> keyFlux) { private Mono<Void> checkKeyFluxConsistency(byte[] prefixKey, Flux<byte[]> keyFlux) {

View File

@ -63,7 +63,7 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
@Override @Override
public boolean needsKeyFlux() { public boolean needsKeyFlux() {
return assertsEnabled; return true;
} }
private Mono<Void> checkKeyFluxConsistency(byte[] prefixKey, Flux<byte[]> keyFlux) { private Mono<Void> checkKeyFluxConsistency(byte[] prefixKey, Flux<byte[]> keyFlux) {

View File

@ -168,6 +168,7 @@ public class LLLocalDictionary implements LLDictionary {
.fromCallable(() -> { .fromCallable(() -> {
var readOpts = resolveSnapshot(snapshot); var readOpts = resolveSnapshot(snapshot);
readOpts.setVerifyChecksums(false); readOpts.setVerifyChecksums(false);
readOpts.setFillCache(false);
if (range.hasMin()) { if (range.hasMin()) {
readOpts.setIterateLowerBound(new Slice(range.getMin())); readOpts.setIterateLowerBound(new Slice(range.getMin()));
} }
@ -355,49 +356,47 @@ public class LLLocalDictionary implements LLDictionary {
} }
private Mono<byte[]> getPrevValue(byte[] key, LLDictionaryResultType resultType) { private Mono<byte[]> getPrevValue(byte[] key, LLDictionaryResultType resultType) {
return Mono.defer(() -> { switch (resultType) {
switch (resultType) { case VALUE_CHANGED:
case VALUE_CHANGED: return containsKey(null, key).single().map(LLUtils::booleanToResponse);
return containsKey(null, key).single().map(LLUtils::booleanToResponse); case PREVIOUS_VALUE:
case PREVIOUS_VALUE: return Mono
return Mono .fromCallable(() -> {
.fromCallable(() -> { StampedLock lock;
StampedLock lock; long stamp;
long stamp; if (updateMode == UpdateMode.ALLOW) {
if (updateMode == UpdateMode.ALLOW) { lock = itemsLock.getAt(getLockIndex(key));
lock = itemsLock.getAt(getLockIndex(key));
stamp = lock.readLock();
stamp = lock.readLock(); } else {
} else { lock = null;
lock = null; stamp = 0;
stamp = 0; }
} try {
try { logger.trace("Reading {}", key);
logger.trace("Reading {}", key); var data = new Holder<byte[]>();
var data = new Holder<byte[]>(); if (db.keyMayExist(cfh, key, data)) {
if (db.keyMayExist(cfh, key, data)) { if (data.getValue() != null) {
if (data.getValue() != null) { return data.getValue();
return data.getValue();
} else {
return db.get(cfh, key);
}
} else { } else {
return null; return db.get(cfh, key);
}
} finally {
if (updateMode == UpdateMode.ALLOW) {
lock.unlockRead(stamp);
} }
} else {
return null;
} }
}) } finally {
.onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(key), cause)) if (updateMode == UpdateMode.ALLOW) {
.subscribeOn(dbScheduler); lock.unlockRead(stamp);
case VOID: }
return Mono.empty(); }
default: })
return Mono.error(new IllegalStateException("Unexpected value: " + resultType)); .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(key), cause))
} .subscribeOn(dbScheduler);
}); case VOID:
return Mono.empty();
default:
return Mono.error(new IllegalStateException("Unexpected value: " + resultType));
}
} }
@Override @Override
@ -522,26 +521,22 @@ public class LLLocalDictionary implements LLDictionary {
@Override @Override
public Flux<Entry<byte[], byte[]>> getRange(@Nullable LLSnapshot snapshot, LLRange range) { public Flux<Entry<byte[], byte[]>> getRange(@Nullable LLSnapshot snapshot, LLRange range) {
return Flux.defer(() -> { if (range.isSingle()) {
if (range.isSingle()) { return getRangeSingle(snapshot, range.getMin());
return getRangeSingle(snapshot, range.getMin()); } else {
} else { return getRangeMulti(snapshot, range);
return getRangeMulti(snapshot, range); }
}
});
} }
@Override @Override
public Flux<List<Entry<byte[], byte[]>>> getRangeGrouped(@Nullable LLSnapshot snapshot, public Flux<List<Entry<byte[], byte[]>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
LLRange range, LLRange range,
int prefixLength) { int prefixLength) {
return Flux.defer(() -> { if (range.isSingle()) {
if (range.isSingle()) { return getRangeSingle(snapshot, range.getMin()).map(List::of);
return getRangeSingle(snapshot, range.getMin()).map(List::of); } else {
} else { return getRangeMultiGrouped(snapshot, range, prefixLength);
return getRangeMultiGrouped(snapshot, range, prefixLength); }
}
});
} }
private Flux<Entry<byte[],byte[]>> getRangeSingle(LLSnapshot snapshot, byte[] key) { private Flux<Entry<byte[],byte[]>> getRangeSingle(LLSnapshot snapshot, byte[] key) {
@ -552,22 +547,30 @@ public class LLLocalDictionary implements LLDictionary {
} }
private Flux<Entry<byte[],byte[]>> getRangeMulti(LLSnapshot snapshot, LLRange range) { private Flux<Entry<byte[],byte[]>> getRangeMulti(LLSnapshot snapshot, LLRange range) {
return new LLLocalLuceneEntryReactiveIterator(db, cfh, range, resolveSnapshot(snapshot)).subscribeOn(dbScheduler); return new LLLocalLuceneEntryReactiveIterator(db, cfh, range, resolveSnapshot(snapshot))
.flux()
.subscribeOn(dbScheduler);
} }
private Flux<List<Entry<byte[],byte[]>>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) { private Flux<List<Entry<byte[],byte[]>>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) {
return new LLLocalLuceneGroupedEntryReactiveIterator(db, cfh, prefixLength, range, resolveSnapshot(snapshot)).subscribeOn(dbScheduler); return new LLLocalLuceneGroupedEntryReactiveIterator(db,
cfh,
prefixLength,
range,
resolveSnapshot(snapshot),
"getRangeMultiGrouped"
)
.flux()
.subscribeOn(dbScheduler);
} }
@Override @Override
public Flux<byte[]> getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range) { public Flux<byte[]> getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range) {
return Flux.defer(() -> { if (range.isSingle()) {
if (range.isSingle()) { return getRangeKeysSingle(snapshot, range.getMin());
return getRangeKeysSingle(snapshot, range.getMin()); } else {
} else { return getRangeKeysMulti(snapshot, range);
return getRangeKeysMulti(snapshot, range); }
}
});
} }
@Override @Override
@ -576,8 +579,9 @@ public class LLLocalDictionary implements LLDictionary {
cfh, cfh,
prefixLength, prefixLength,
range, range,
resolveSnapshot(snapshot) resolveSnapshot(snapshot),
).subscribeOn(dbScheduler); "getRangeKeysGrouped"
).flux().subscribeOn(dbScheduler);
} }
private Flux<byte[]> getRangeKeysSingle(LLSnapshot snapshot, byte[] key) { private Flux<byte[]> getRangeKeysSingle(LLSnapshot snapshot, byte[] key) {
@ -589,14 +593,14 @@ public class LLLocalDictionary implements LLDictionary {
} }
private Flux<byte[]> getRangeKeysMulti(LLSnapshot snapshot, LLRange range) { private Flux<byte[]> getRangeKeysMulti(LLSnapshot snapshot, LLRange range) {
return new LLLocalLuceneKeysReactiveIterator(db, cfh, range, resolveSnapshot(snapshot)).subscribeOn(dbScheduler); return new LLLocalLuceneKeysReactiveIterator(db, cfh, range, resolveSnapshot(snapshot)).flux().subscribeOn(dbScheduler);
} }
@Override @Override
public Flux<Entry<byte[], byte[]>> setRange(LLRange range, public Flux<Entry<byte[], byte[]>> setRange(LLRange range,
Flux<Entry<byte[], byte[]>> entries, Flux<Entry<byte[], byte[]>> entries,
boolean getOldValues) { boolean getOldValues) {
return Flux.defer(() -> Flux return Flux
.usingWhen( .usingWhen(
Mono Mono
.fromCallable(() -> new CappedWriteBatch(db, .fromCallable(() -> new CappedWriteBatch(db,
@ -653,8 +657,7 @@ public class LLLocalDictionary implements LLDictionary {
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
) )
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.onErrorMap(cause -> new IOException("Failed to write range", cause)) .onErrorMap(cause -> new IOException("Failed to write range", cause));
);
} }
private static byte[] incrementLexicographically(byte[] key) { private static byte[] incrementLexicographically(byte[] key) {
@ -685,7 +688,7 @@ public class LLLocalDictionary implements LLDictionary {
.<Void>fromCallable(() -> { .<Void>fromCallable(() -> {
var readOpts = getReadOptions(null); var readOpts = getReadOptions(null);
readOpts.setVerifyChecksums(false); readOpts.setVerifyChecksums(false);
// readOpts.setIgnoreRangeDeletions(true); // readOpts.setIgnoreRangeDeletions(true);
readOpts.setFillCache(false); readOpts.setFillCache(false);
try (CappedWriteBatch writeBatch = new CappedWriteBatch(db, try (CappedWriteBatch writeBatch = new CappedWriteBatch(db,
@ -728,44 +731,41 @@ public class LLLocalDictionary implements LLDictionary {
@Override @Override
public Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, LLRange range, boolean fast) { public Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, LLRange range, boolean fast) {
return Mono if (range.isAll()) {
.defer(() -> { return Mono
if (range.isAll()) { .fromCallable(() -> fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot))
return Mono .onErrorMap(IOException::new)
.fromCallable(() -> fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot)) .subscribeOn(dbScheduler);
.onErrorMap(IOException::new) } else {
.subscribeOn(dbScheduler); return Mono
} else { .fromCallable(() -> {
return Mono var readOpts = resolveSnapshot(snapshot);
.fromCallable(() -> { readOpts.setFillCache(false);
var readOpts = resolveSnapshot(snapshot); readOpts.setVerifyChecksums(false);
readOpts.setFillCache(false); if (range.hasMin()) {
readOpts.setVerifyChecksums(false); readOpts.setIterateLowerBound(new Slice(range.getMin()));
if (range.hasMin()) { }
readOpts.setIterateLowerBound(new Slice(range.getMin())); if (range.hasMax()) {
} readOpts.setIterateUpperBound(new Slice(range.getMax()));
if (range.hasMax()) { }
readOpts.setIterateUpperBound(new Slice(range.getMax())); if (fast) {
} readOpts.setIgnoreRangeDeletions(true);
if (fast) {
readOpts.setIgnoreRangeDeletions(true); }
try (var iter = db.newIterator(cfh, readOpts)) {
} iter.seekToFirst();
try (var iter = db.newIterator(cfh, readOpts)) { long i = 0;
iter.seekToFirst(); while (iter.isValid()) {
long i = 0; iter.next();
while (iter.isValid()) { i++;
iter.next(); }
i++; return i;
} }
return i; })
} .onErrorMap(cause -> new IOException("Failed to get size of range "
}) + range.toString(), cause))
.onErrorMap(cause -> new IOException("Failed to get size of range " .subscribeOn(dbScheduler);
+ range.toString(), cause)) }
.subscribeOn(dbScheduler);
}
});
} }
@Override @Override

View File

@ -13,8 +13,9 @@ public class LLLocalLuceneGroupedEntryReactiveIterator extends LLLocalLuceneGrou
ColumnFamilyHandle cfh, ColumnFamilyHandle cfh,
int prefixLength, int prefixLength,
LLRange range, LLRange range,
ReadOptions readOptions) { ReadOptions readOptions,
super(db, cfh, prefixLength, range, readOptions, true); String debugName) {
super(db, cfh, prefixLength, range, readOptions, true, debugName);
} }
@Override @Override

View File

@ -11,8 +11,9 @@ public class LLLocalLuceneGroupedKeysReactiveIterator extends LLLocalLuceneGroup
ColumnFamilyHandle cfh, ColumnFamilyHandle cfh,
int prefixLength, int prefixLength,
LLRange range, LLRange range,
ReadOptions readOptions) { ReadOptions readOptions,
super(db, cfh, prefixLength, range, readOptions, false); String debugName) {
super(db, cfh, prefixLength, range, readOptions, false, debugName);
} }
@Override @Override

View File

@ -4,15 +4,13 @@ import it.cavallium.dbengine.database.LLRange;
import it.unimi.dsi.fastutil.objects.ObjectArrayList; import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.jetbrains.annotations.NotNull;
import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
import org.rocksdb.Slice; import org.rocksdb.Slice;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
public abstract class LLLocalLuceneGroupedReactiveIterator<T> extends Flux<List<T>> { public abstract class LLLocalLuceneGroupedReactiveIterator<T> {
private static final byte[] EMPTY = new byte[0]; private static final byte[] EMPTY = new byte[0];
@ -22,24 +20,28 @@ public abstract class LLLocalLuceneGroupedReactiveIterator<T> extends Flux<List<
private final LLRange range; private final LLRange range;
private final ReadOptions readOptions; private final ReadOptions readOptions;
private final boolean readValues; private final boolean readValues;
private final String debugName;
public LLLocalLuceneGroupedReactiveIterator(RocksDB db, public LLLocalLuceneGroupedReactiveIterator(RocksDB db,
ColumnFamilyHandle cfh, ColumnFamilyHandle cfh,
int prefixLength, int prefixLength,
LLRange range, LLRange range,
ReadOptions readOptions, ReadOptions readOptions,
boolean readValues) { boolean readValues,
String debugName) {
this.db = db; this.db = db;
this.cfh = cfh; this.cfh = cfh;
this.prefixLength = prefixLength; this.prefixLength = prefixLength;
this.range = range; this.range = range;
this.readOptions = readOptions; this.readOptions = readOptions;
this.readValues = readValues; this.readValues = readValues;
this.debugName = debugName;
} }
@Override
public void subscribe(@NotNull CoreSubscriber<? super List<T>> actual) { @SuppressWarnings("Convert2MethodRef")
Flux<List<T>> flux = Flux public Flux<List<T>> flux() {
return Flux
.generate(() -> { .generate(() -> {
var readOptions = new ReadOptions(this.readOptions); var readOptions = new ReadOptions(this.readOptions);
readOptions.setFillCache(range.hasMin() && range.hasMax()); readOptions.setFillCache(range.hasMin() && range.hasMax());
@ -78,8 +80,7 @@ public abstract class LLLocalLuceneGroupedReactiveIterator<T> extends Flux<List<
sink.complete(); sink.complete();
} }
return rocksIterator; return rocksIterator;
}, tuple -> {}); }, rocksIterator1 -> rocksIterator1.close());
flux.subscribe(actual);
} }
public abstract T getEntry(byte[] key, byte[] value); public abstract T getEntry(byte[] key, byte[] value);

View File

@ -1,16 +1,13 @@
package it.cavallium.dbengine.database.disk; package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import org.jetbrains.annotations.NotNull;
import org.rocksdb.AbstractImmutableNativeReference;
import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
import org.rocksdb.Slice; import org.rocksdb.Slice;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
public abstract class LLLocalLuceneReactiveIterator<T> extends Flux<T> { public abstract class LLLocalLuceneReactiveIterator<T> {
private static final byte[] EMPTY = new byte[0]; private static final byte[] EMPTY = new byte[0];
@ -32,9 +29,9 @@ public abstract class LLLocalLuceneReactiveIterator<T> extends Flux<T> {
this.readValues = readValues; this.readValues = readValues;
} }
@Override @SuppressWarnings("Convert2MethodRef")
public void subscribe(@NotNull CoreSubscriber<? super T> actual) { public Flux<T> flux() {
Flux<T> flux = Flux return Flux
.generate(() -> { .generate(() -> {
var readOptions = new ReadOptions(this.readOptions); var readOptions = new ReadOptions(this.readOptions);
readOptions.setFillCache(range.hasMin() && range.hasMax()); readOptions.setFillCache(range.hasMin() && range.hasMax());
@ -61,8 +58,7 @@ public abstract class LLLocalLuceneReactiveIterator<T> extends Flux<T> {
sink.complete(); sink.complete();
} }
return rocksIterator; return rocksIterator;
}, AbstractImmutableNativeReference::close); }, rocksIterator1 -> rocksIterator1.close());
flux.subscribe(actual);
} }
public abstract T getEntry(byte[] key, byte[] value); public abstract T getEntry(byte[] key, byte[] value);