From 08eb457235d283fd267eafdd2bb5c853e3088279 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sun, 14 Mar 2021 13:08:03 +0100 Subject: [PATCH] Refactor iterations --- .../DatabaseMapDictionaryDeep.java | 10 +- .../collections/DatabaseStageMap.java | 24 +- .../collections/SubStageGetterMap.java | 2 +- .../collections/SubStageGetterMapDeep.java | 2 +- .../database/disk/LLLocalDictionary.java | 216 +++++++++--------- ...calLuceneGroupedEntryReactiveIterator.java | 5 +- ...ocalLuceneGroupedKeysReactiveIterator.java | 5 +- .../LLLocalLuceneGroupedReactiveIterator.java | 19 +- .../disk/LLLocalLuceneReactiveIterator.java | 14 +- 9 files changed, 149 insertions(+), 148 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index 5d39d76..4f24665 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -213,13 +213,17 @@ public class DatabaseMapDictionaryDeep> implem @Override public Mono at(@Nullable CompositeSnapshot snapshot, T keySuffix) { byte[] keySuffixData = serializeSuffix(keySuffix); + Flux keyFlux; + if (this.subStageGetter.needsKeyFlux()) { + keyFlux = this.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData)); + } else { + keyFlux = Flux.empty(); + } return this.subStageGetter .subStage(dictionary, snapshot, toKeyWithoutExt(keySuffixData), - this.subStageGetter.needsKeyFlux() - ? this.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData)) - : Flux.empty() + keyFlux ); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index ec25b3a..1885792 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -84,19 +84,17 @@ public interface DatabaseStageMap> extends Dat } default Mono replaceAllValues(boolean canKeysChange, Function, Mono>> entriesReplacer) { - return Mono.defer(() -> { - if (canKeysChange) { - return this.setAllValues(this.getAllValues(null).flatMap(entriesReplacer)).then(); - } else { - return this - .getAllValues(null) - .flatMap(entriesReplacer) - .flatMap(replacedEntry -> this - .at(null, replacedEntry.getKey()) - .map(entry -> entry.set(replacedEntry.getValue()))) - .then(); - } - }); + if (canKeysChange) { + return this.setAllValues(this.getAllValues(null).flatMap(entriesReplacer)).then(); + } else { + return this + .getAllValues(null) + .flatMap(entriesReplacer) + .flatMap(replacedEntry -> this + .at(null, replacedEntry.getKey()) + .map(entry -> entry.set(replacedEntry.getValue()))) + .then(); + } } default Mono replaceAll(Function, Mono> entriesReplacer) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java index bd948a0..ac35f6f 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java @@ -46,7 +46,7 @@ public class SubStageGetterMap implements SubStageGetter, Databa @Override public boolean needsKeyFlux() { - return assertsEnabled; + return true; } private Mono checkKeyFluxConsistency(byte[] prefixKey, Flux keyFlux) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java index 2786e1e..27302c1 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java @@ -63,7 +63,7 @@ public class SubStageGetterMapDeep> implements @Override public boolean needsKeyFlux() { - return assertsEnabled; + return true; } private Mono checkKeyFluxConsistency(byte[] prefixKey, Flux keyFlux) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index e4c36c1..505e58a 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -168,6 +168,7 @@ public class LLLocalDictionary implements LLDictionary { .fromCallable(() -> { var readOpts = resolveSnapshot(snapshot); readOpts.setVerifyChecksums(false); + readOpts.setFillCache(false); if (range.hasMin()) { readOpts.setIterateLowerBound(new Slice(range.getMin())); } @@ -355,49 +356,47 @@ public class LLLocalDictionary implements LLDictionary { } private Mono getPrevValue(byte[] key, LLDictionaryResultType resultType) { - return Mono.defer(() -> { - switch (resultType) { - case VALUE_CHANGED: - return containsKey(null, key).single().map(LLUtils::booleanToResponse); - case PREVIOUS_VALUE: - return Mono - .fromCallable(() -> { - StampedLock lock; - long stamp; - if (updateMode == UpdateMode.ALLOW) { - lock = itemsLock.getAt(getLockIndex(key)); - - stamp = lock.readLock(); - } else { - lock = null; - stamp = 0; - } - try { - logger.trace("Reading {}", key); - var data = new Holder(); - if (db.keyMayExist(cfh, key, data)) { - if (data.getValue() != null) { - return data.getValue(); - } else { - return db.get(cfh, key); - } + switch (resultType) { + case VALUE_CHANGED: + return containsKey(null, key).single().map(LLUtils::booleanToResponse); + case PREVIOUS_VALUE: + return Mono + .fromCallable(() -> { + StampedLock lock; + long stamp; + if (updateMode == UpdateMode.ALLOW) { + lock = itemsLock.getAt(getLockIndex(key)); + + stamp = lock.readLock(); + } else { + lock = null; + stamp = 0; + } + try { + logger.trace("Reading {}", key); + var data = new Holder(); + if (db.keyMayExist(cfh, key, data)) { + if (data.getValue() != null) { + return data.getValue(); } else { - return null; - } - } finally { - if (updateMode == UpdateMode.ALLOW) { - lock.unlockRead(stamp); + return db.get(cfh, key); } + } else { + return null; } - }) - .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(key), cause)) - .subscribeOn(dbScheduler); - case VOID: - return Mono.empty(); - default: - return Mono.error(new IllegalStateException("Unexpected value: " + resultType)); - } - }); + } finally { + if (updateMode == UpdateMode.ALLOW) { + lock.unlockRead(stamp); + } + } + }) + .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(key), cause)) + .subscribeOn(dbScheduler); + case VOID: + return Mono.empty(); + default: + return Mono.error(new IllegalStateException("Unexpected value: " + resultType)); + } } @Override @@ -522,26 +521,22 @@ public class LLLocalDictionary implements LLDictionary { @Override public Flux> getRange(@Nullable LLSnapshot snapshot, LLRange range) { - return Flux.defer(() -> { - if (range.isSingle()) { - return getRangeSingle(snapshot, range.getMin()); - } else { - return getRangeMulti(snapshot, range); - } - }); + if (range.isSingle()) { + return getRangeSingle(snapshot, range.getMin()); + } else { + return getRangeMulti(snapshot, range); + } } @Override public Flux>> getRangeGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) { - return Flux.defer(() -> { - if (range.isSingle()) { - return getRangeSingle(snapshot, range.getMin()).map(List::of); - } else { - return getRangeMultiGrouped(snapshot, range, prefixLength); - } - }); + if (range.isSingle()) { + return getRangeSingle(snapshot, range.getMin()).map(List::of); + } else { + return getRangeMultiGrouped(snapshot, range, prefixLength); + } } private Flux> getRangeSingle(LLSnapshot snapshot, byte[] key) { @@ -552,22 +547,30 @@ public class LLLocalDictionary implements LLDictionary { } private Flux> getRangeMulti(LLSnapshot snapshot, LLRange range) { - return new LLLocalLuceneEntryReactiveIterator(db, cfh, range, resolveSnapshot(snapshot)).subscribeOn(dbScheduler); + return new LLLocalLuceneEntryReactiveIterator(db, cfh, range, resolveSnapshot(snapshot)) + .flux() + .subscribeOn(dbScheduler); } private Flux>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) { - return new LLLocalLuceneGroupedEntryReactiveIterator(db, cfh, prefixLength, range, resolveSnapshot(snapshot)).subscribeOn(dbScheduler); + return new LLLocalLuceneGroupedEntryReactiveIterator(db, + cfh, + prefixLength, + range, + resolveSnapshot(snapshot), + "getRangeMultiGrouped" + ) + .flux() + .subscribeOn(dbScheduler); } @Override public Flux getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range) { - return Flux.defer(() -> { - if (range.isSingle()) { - return getRangeKeysSingle(snapshot, range.getMin()); - } else { - return getRangeKeysMulti(snapshot, range); - } - }); + if (range.isSingle()) { + return getRangeKeysSingle(snapshot, range.getMin()); + } else { + return getRangeKeysMulti(snapshot, range); + } } @Override @@ -576,8 +579,9 @@ public class LLLocalDictionary implements LLDictionary { cfh, prefixLength, range, - resolveSnapshot(snapshot) - ).subscribeOn(dbScheduler); + resolveSnapshot(snapshot), + "getRangeKeysGrouped" + ).flux().subscribeOn(dbScheduler); } private Flux getRangeKeysSingle(LLSnapshot snapshot, byte[] key) { @@ -589,14 +593,14 @@ public class LLLocalDictionary implements LLDictionary { } private Flux getRangeKeysMulti(LLSnapshot snapshot, LLRange range) { - return new LLLocalLuceneKeysReactiveIterator(db, cfh, range, resolveSnapshot(snapshot)).subscribeOn(dbScheduler); + return new LLLocalLuceneKeysReactiveIterator(db, cfh, range, resolveSnapshot(snapshot)).flux().subscribeOn(dbScheduler); } @Override public Flux> setRange(LLRange range, Flux> entries, boolean getOldValues) { - return Flux.defer(() -> Flux + return Flux .usingWhen( Mono .fromCallable(() -> new CappedWriteBatch(db, @@ -653,8 +657,7 @@ public class LLLocalDictionary implements LLDictionary { .subscribeOn(dbScheduler) ) .subscribeOn(dbScheduler) - .onErrorMap(cause -> new IOException("Failed to write range", cause)) - ); + .onErrorMap(cause -> new IOException("Failed to write range", cause)); } private static byte[] incrementLexicographically(byte[] key) { @@ -685,7 +688,7 @@ public class LLLocalDictionary implements LLDictionary { .fromCallable(() -> { var readOpts = getReadOptions(null); readOpts.setVerifyChecksums(false); - + // readOpts.setIgnoreRangeDeletions(true); readOpts.setFillCache(false); try (CappedWriteBatch writeBatch = new CappedWriteBatch(db, @@ -728,44 +731,41 @@ public class LLLocalDictionary implements LLDictionary { @Override public Mono sizeRange(@Nullable LLSnapshot snapshot, LLRange range, boolean fast) { - return Mono - .defer(() -> { - if (range.isAll()) { - return Mono - .fromCallable(() -> fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot)) - .onErrorMap(IOException::new) - .subscribeOn(dbScheduler); - } else { - return Mono - .fromCallable(() -> { - var readOpts = resolveSnapshot(snapshot); - readOpts.setFillCache(false); - readOpts.setVerifyChecksums(false); - if (range.hasMin()) { - readOpts.setIterateLowerBound(new Slice(range.getMin())); - } - if (range.hasMax()) { - readOpts.setIterateUpperBound(new Slice(range.getMax())); - } - if (fast) { - readOpts.setIgnoreRangeDeletions(true); - - } - try (var iter = db.newIterator(cfh, readOpts)) { - iter.seekToFirst(); - long i = 0; - while (iter.isValid()) { - iter.next(); - i++; - } - return i; - } - }) - .onErrorMap(cause -> new IOException("Failed to get size of range " - + range.toString(), cause)) - .subscribeOn(dbScheduler); - } - }); + if (range.isAll()) { + return Mono + .fromCallable(() -> fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot)) + .onErrorMap(IOException::new) + .subscribeOn(dbScheduler); + } else { + return Mono + .fromCallable(() -> { + var readOpts = resolveSnapshot(snapshot); + readOpts.setFillCache(false); + readOpts.setVerifyChecksums(false); + if (range.hasMin()) { + readOpts.setIterateLowerBound(new Slice(range.getMin())); + } + if (range.hasMax()) { + readOpts.setIterateUpperBound(new Slice(range.getMax())); + } + if (fast) { + readOpts.setIgnoreRangeDeletions(true); + + } + try (var iter = db.newIterator(cfh, readOpts)) { + iter.seekToFirst(); + long i = 0; + while (iter.isValid()) { + iter.next(); + i++; + } + return i; + } + }) + .onErrorMap(cause -> new IOException("Failed to get size of range " + + range.toString(), cause)) + .subscribeOn(dbScheduler); + } } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedEntryReactiveIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedEntryReactiveIterator.java index 8aaf252..61e2b85 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedEntryReactiveIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedEntryReactiveIterator.java @@ -13,8 +13,9 @@ public class LLLocalLuceneGroupedEntryReactiveIterator extends LLLocalLuceneGrou ColumnFamilyHandle cfh, int prefixLength, LLRange range, - ReadOptions readOptions) { - super(db, cfh, prefixLength, range, readOptions, true); + ReadOptions readOptions, + String debugName) { + super(db, cfh, prefixLength, range, readOptions, true, debugName); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedKeysReactiveIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedKeysReactiveIterator.java index 5ca0420..10e38a5 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedKeysReactiveIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedKeysReactiveIterator.java @@ -11,8 +11,9 @@ public class LLLocalLuceneGroupedKeysReactiveIterator extends LLLocalLuceneGroup ColumnFamilyHandle cfh, int prefixLength, LLRange range, - ReadOptions readOptions) { - super(db, cfh, prefixLength, range, readOptions, false); + ReadOptions readOptions, + String debugName) { + super(db, cfh, prefixLength, range, readOptions, false, debugName); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedReactiveIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedReactiveIterator.java index 7407159..b4e9df5 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedReactiveIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedReactiveIterator.java @@ -4,15 +4,13 @@ import it.cavallium.dbengine.database.LLRange; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.util.Arrays; import java.util.List; -import org.jetbrains.annotations.NotNull; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.Slice; -import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; -public abstract class LLLocalLuceneGroupedReactiveIterator extends Flux> { +public abstract class LLLocalLuceneGroupedReactiveIterator { private static final byte[] EMPTY = new byte[0]; @@ -22,24 +20,28 @@ public abstract class LLLocalLuceneGroupedReactiveIterator extends Flux> actual) { - Flux> flux = Flux + + @SuppressWarnings("Convert2MethodRef") + public Flux> flux() { + return Flux .generate(() -> { var readOptions = new ReadOptions(this.readOptions); readOptions.setFillCache(range.hasMin() && range.hasMax()); @@ -78,8 +80,7 @@ public abstract class LLLocalLuceneGroupedReactiveIterator extends Flux {}); - flux.subscribe(actual); + }, rocksIterator1 -> rocksIterator1.close()); } public abstract T getEntry(byte[] key, byte[] value); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneReactiveIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneReactiveIterator.java index 04018e3..e024913 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneReactiveIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneReactiveIterator.java @@ -1,16 +1,13 @@ package it.cavallium.dbengine.database.disk; import it.cavallium.dbengine.database.LLRange; -import org.jetbrains.annotations.NotNull; -import org.rocksdb.AbstractImmutableNativeReference; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.Slice; -import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; -public abstract class LLLocalLuceneReactiveIterator extends Flux { +public abstract class LLLocalLuceneReactiveIterator { private static final byte[] EMPTY = new byte[0]; @@ -32,9 +29,9 @@ public abstract class LLLocalLuceneReactiveIterator extends Flux { this.readValues = readValues; } - @Override - public void subscribe(@NotNull CoreSubscriber actual) { - Flux flux = Flux + @SuppressWarnings("Convert2MethodRef") + public Flux flux() { + return Flux .generate(() -> { var readOptions = new ReadOptions(this.readOptions); readOptions.setFillCache(range.hasMin() && range.hasMax()); @@ -61,8 +58,7 @@ public abstract class LLLocalLuceneReactiveIterator extends Flux { sink.complete(); } return rocksIterator; - }, AbstractImmutableNativeReference::close); - flux.subscribe(actual); + }, rocksIterator1 -> rocksIterator1.close()); } public abstract T getEntry(byte[] key, byte[] value);