Add smallRange parameter
This commit is contained in:
parent
388b79c6d1
commit
81b26eed82
@ -48,43 +48,51 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
|
||||
<K> Flux<Boolean> updateMulti(Flux<K> keys, Flux<Send<Buffer>> serializedKeys,
|
||||
KVSerializationFunction<K, @Nullable Send<Buffer>, @Nullable Buffer> updateFunction);
|
||||
|
||||
Flux<Send<LLEntry>> getRange(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> range, boolean reverse);
|
||||
Flux<Send<LLEntry>> getRange(@Nullable LLSnapshot snapshot,
|
||||
Mono<Send<LLRange>> range,
|
||||
boolean reverse,
|
||||
boolean smallRange);
|
||||
|
||||
Flux<List<Send<LLEntry>>> getRangeGrouped(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> range, int prefixLength);
|
||||
Flux<List<Send<LLEntry>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
|
||||
Mono<Send<LLRange>> range,
|
||||
int prefixLength,
|
||||
boolean smallRange);
|
||||
|
||||
Flux<Send<Buffer>> getRangeKeys(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> range, boolean reverse);
|
||||
Flux<Send<Buffer>> getRangeKeys(@Nullable LLSnapshot snapshot,
|
||||
Mono<Send<LLRange>> range,
|
||||
boolean reverse,
|
||||
boolean smallRange);
|
||||
|
||||
Flux<List<Send<Buffer>>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> range, int prefixLength);
|
||||
Flux<List<Send<Buffer>>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot,
|
||||
Mono<Send<LLRange>> range,
|
||||
int prefixLength,
|
||||
boolean smallRange);
|
||||
|
||||
Flux<Send<Buffer>> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> range, int prefixLength);
|
||||
Flux<Send<Buffer>> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot,
|
||||
Mono<Send<LLRange>> range,
|
||||
int prefixLength,
|
||||
boolean smallRange);
|
||||
|
||||
Flux<BadBlock> badBlocks(Mono<Send<LLRange>> range);
|
||||
|
||||
Mono<Void> setRange(Mono<Send<LLRange>> range, Flux<Send<LLEntry>> entries);
|
||||
Mono<Void> setRange(Mono<Send<LLRange>> range, Flux<Send<LLEntry>> entries, boolean smallRange);
|
||||
|
||||
default Mono<Void> replaceRange(Mono<Send<LLRange>> range,
|
||||
boolean canKeysChange,
|
||||
Function<Send<LLEntry>, Mono<Send<LLEntry>>> entriesReplacer,
|
||||
boolean existsAlmostCertainly) {
|
||||
boolean smallRange) {
|
||||
return Mono.defer(() -> {
|
||||
if (canKeysChange) {
|
||||
return this
|
||||
.setRange(range, this
|
||||
.getRange(null, range, false)
|
||||
.flatMap(entriesReplacer)
|
||||
);
|
||||
.getRange(null, range, false, smallRange)
|
||||
.flatMap(entriesReplacer), smallRange);
|
||||
} else {
|
||||
return this.putMulti(this.getRange(null, range, false).flatMap(entriesReplacer));
|
||||
return this.putMulti(this.getRange(null, range, false, smallRange).flatMap(entriesReplacer));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
default Mono<Void> replaceRange(Mono<Send<LLRange>> range,
|
||||
boolean canKeysChange,
|
||||
Function<Send<LLEntry>, Mono<Send<LLEntry>>> entriesReplacer) {
|
||||
return replaceRange(range, canKeysChange, entriesReplacer, false);
|
||||
}
|
||||
|
||||
Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> range, boolean fillCache);
|
||||
|
||||
Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> range, boolean fast);
|
||||
|
@ -713,7 +713,7 @@ public class LLUtils {
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean isClosedRange(LLRange rangeShared) {
|
||||
public static boolean isBoundedRange(LLRange rangeShared) {
|
||||
return rangeShared.hasMin() && rangeShared.hasMax();
|
||||
}
|
||||
|
||||
@ -721,28 +721,25 @@ public class LLUtils {
|
||||
* Generate a copy of the passed ReadOptions, with some parameters modified to help with bulk iterations
|
||||
* @param readOptions the read options to copy
|
||||
* @param canFillCache true to fill the cache. If closedRange is false, this field will be ignored
|
||||
* @param closedRange true if the range is closed
|
||||
* @param prefixSameAsStart true if the prefix is same as start
|
||||
* @param boundedRange true if the range is bounded from both sides
|
||||
* @param smallRange true if the range is small
|
||||
* @return a new instance of ReadOptions
|
||||
*/
|
||||
public static ReadOptions generateCustomReadOptions(@Nullable ReadOptions readOptions,
|
||||
boolean canFillCache,
|
||||
boolean closedRange,
|
||||
boolean prefixSameAsStart) {
|
||||
boolean boundedRange,
|
||||
boolean smallRange) {
|
||||
if (readOptions != null) {
|
||||
readOptions = new ReadOptions(readOptions);
|
||||
} else {
|
||||
readOptions = new ReadOptions();
|
||||
}
|
||||
if (closedRange) {
|
||||
if (boundedRange || smallRange) {
|
||||
readOptions.setFillCache(canFillCache);
|
||||
readOptions.setPrefixSameAsStart(prefixSameAsStart);
|
||||
readOptions.setTotalOrderSeek(!prefixSameAsStart);
|
||||
} else {
|
||||
readOptions.setReadaheadSize(4 * 1024 * 1024); // 4MiB
|
||||
readOptions.setFillCache(false);
|
||||
readOptions.setVerifyChecksums(false);
|
||||
readOptions.setTotalOrderSeek(true);
|
||||
}
|
||||
|
||||
return readOptions;
|
||||
|
@ -73,14 +73,14 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
public static <K, V> Flux<Entry<K, V>> getLeavesFrom(DatabaseMapDictionary<K, V> databaseMapDictionary,
|
||||
CompositeSnapshot snapshot,
|
||||
Mono<K> key,
|
||||
boolean reverse) {
|
||||
boolean reverse, boolean smallRange) {
|
||||
Mono<Optional<K>> keyOptMono = key.map(Optional::of).defaultIfEmpty(Optional.empty());
|
||||
|
||||
return keyOptMono.flatMapMany(keyOpt -> {
|
||||
if (keyOpt.isPresent()) {
|
||||
return databaseMapDictionary.getAllValues(snapshot, keyOpt.get(), reverse);
|
||||
return databaseMapDictionary.getAllValues(snapshot, keyOpt.get(), reverse, smallRange);
|
||||
} else {
|
||||
return databaseMapDictionary.getAllValues(snapshot);
|
||||
return databaseMapDictionary.getAllValues(snapshot, smallRange);
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -88,16 +88,16 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
public static <K> Flux<K> getKeyLeavesFrom(DatabaseMapDictionary<K, ?> databaseMapDictionary,
|
||||
CompositeSnapshot snapshot,
|
||||
Mono<K> key,
|
||||
boolean reverse) {
|
||||
boolean reverse, boolean smallRange) {
|
||||
Mono<Optional<K>> keyOptMono = key.map(Optional::of).defaultIfEmpty(Optional.empty());
|
||||
|
||||
return keyOptMono.flatMapMany(keyOpt -> {
|
||||
Flux<? extends Entry<K, ? extends DatabaseStageEntry<?>>> stagesFlux;
|
||||
if (keyOpt.isPresent()) {
|
||||
stagesFlux = databaseMapDictionary
|
||||
.getAllStages(snapshot, keyOpt.get(), reverse);
|
||||
.getAllStages(snapshot, keyOpt.get(), reverse, smallRange);
|
||||
} else {
|
||||
stagesFlux = databaseMapDictionary.getAllStages(snapshot);
|
||||
stagesFlux = databaseMapDictionary.getAllStages(snapshot, smallRange);
|
||||
}
|
||||
return stagesFlux.doOnNext(e -> e.getValue().close())
|
||||
.doOnDiscard(Entry.class, e -> {
|
||||
@ -195,7 +195,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
@Override
|
||||
public Mono<Object2ObjectSortedMap<T, U>> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) {
|
||||
return dictionary
|
||||
.getRange(resolveSnapshot(snapshot), rangeMono, false)
|
||||
.getRange(resolveSnapshot(snapshot), rangeMono, false, true)
|
||||
.<Entry<T, U>>handle((entrySend, sink) -> {
|
||||
Entry<T, U> deserializedEntry;
|
||||
try {
|
||||
@ -228,8 +228,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
.get(null, false)
|
||||
.concatWith(dictionary.setRange(rangeMono, Flux
|
||||
.fromIterable(Collections.unmodifiableMap(value).entrySet())
|
||||
.handle(this::serializeEntrySink)
|
||||
).then(Mono.empty()))
|
||||
.handle(this::serializeEntrySink), true).then(Mono.empty()))
|
||||
.singleOrEmpty();
|
||||
}
|
||||
|
||||
@ -480,21 +479,22 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot) {
|
||||
return getAllStages(snapshot, rangeMono, false);
|
||||
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
|
||||
return getAllStages(snapshot, rangeMono, false, smallRange);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all stages
|
||||
* @param key from/to the specified key, if not null
|
||||
* @param reverse if true, the results will go backwards from the specified key (inclusive)
|
||||
* if false, the results will go forward from the specified key (inclusive)
|
||||
* @param smallRange
|
||||
*/
|
||||
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot,
|
||||
@Nullable T key,
|
||||
boolean reverse) {
|
||||
boolean reverse,
|
||||
boolean smallRange) {
|
||||
if (key == null) {
|
||||
return getAllStages(snapshot);
|
||||
return getAllStages(snapshot, smallRange);
|
||||
} else {
|
||||
Mono<Send<LLRange>> boundedRangeMono = rangeMono.flatMap(fullRangeSend -> Mono.fromCallable(() -> {
|
||||
try (var fullRange = fullRangeSend.receive()) {
|
||||
@ -511,14 +511,14 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
}
|
||||
}
|
||||
}));
|
||||
return getAllStages(snapshot, boundedRangeMono, reverse);
|
||||
return getAllStages(snapshot, boundedRangeMono, reverse, smallRange);
|
||||
}
|
||||
}
|
||||
|
||||
private Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot,
|
||||
Mono<Send<LLRange>> sliceRangeMono, boolean reverse) {
|
||||
Mono<Send<LLRange>> sliceRangeMono, boolean reverse, boolean smallRange) {
|
||||
return dictionary
|
||||
.getRangeKeys(resolveSnapshot(snapshot), sliceRangeMono, reverse)
|
||||
.getRangeKeys(resolveSnapshot(snapshot), sliceRangeMono, reverse, smallRange)
|
||||
.handle((keyBufToReceive, sink) -> {
|
||||
var keyBuf = keyBufToReceive.receive();
|
||||
try {
|
||||
@ -540,19 +540,22 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot) {
|
||||
return getAllValues(snapshot, rangeMono, false);
|
||||
public Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
|
||||
return getAllValues(snapshot, rangeMono, false, smallRange);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all values
|
||||
* @param key from/to the specified key, if not null
|
||||
* @param reverse if true, the results will go backwards from the specified key (inclusive)
|
||||
* if false, the results will go forward from the specified key (inclusive)
|
||||
* @param smallRange
|
||||
*/
|
||||
public Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot, @Nullable T key, boolean reverse) {
|
||||
public Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot,
|
||||
@Nullable T key,
|
||||
boolean reverse,
|
||||
boolean smallRange) {
|
||||
if (key == null) {
|
||||
return getAllValues(snapshot);
|
||||
return getAllValues(snapshot, smallRange);
|
||||
} else {
|
||||
Mono<Send<LLRange>> boundedRangeMono = rangeMono.flatMap(fullRangeSend -> Mono.fromCallable(() -> {
|
||||
try (var fullRange = fullRangeSend.receive()) {
|
||||
@ -569,15 +572,15 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
}
|
||||
}
|
||||
}));
|
||||
return getAllValues(snapshot, boundedRangeMono, reverse);
|
||||
return getAllValues(snapshot, boundedRangeMono, reverse, smallRange);
|
||||
}
|
||||
}
|
||||
|
||||
private Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot,
|
||||
Mono<Send<LLRange>> sliceRangeMono,
|
||||
boolean reverse) {
|
||||
boolean reverse, boolean smallRange) {
|
||||
return dictionary
|
||||
.getRange(resolveSnapshot(snapshot), sliceRangeMono, reverse)
|
||||
.getRange(resolveSnapshot(snapshot), sliceRangeMono, reverse, smallRange)
|
||||
.<Entry<T, U>>handle((serializedEntryToReceive, sink) -> {
|
||||
try {
|
||||
Entry<T, U> entry;
|
||||
@ -604,8 +607,8 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
@Override
|
||||
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
|
||||
return Flux.concat(
|
||||
this.getAllValues(null),
|
||||
dictionary.setRange(rangeMono, entries.handle(this::serializeEntrySink)).then(Mono.empty())
|
||||
this.getAllValues(null, false),
|
||||
dictionary.setRange(rangeMono, entries.handle(this::serializeEntrySink), false).then(Mono.empty())
|
||||
);
|
||||
}
|
||||
|
||||
@ -619,7 +622,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
.doOnNext(Send::close)
|
||||
.then();
|
||||
} else {
|
||||
return dictionary.setRange(rangeMono, Flux.empty());
|
||||
return dictionary.setRange(rangeMono, Flux.empty(), false);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -369,9 +369,9 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot) {
|
||||
public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
|
||||
return dictionary
|
||||
.getRangeKeyPrefixes(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength)
|
||||
.getRangeKeyPrefixes(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength, smallRange)
|
||||
.flatMapSequential(groupKeyWithoutExtSend_ -> Mono.using(
|
||||
groupKeyWithoutExtSend_::receive,
|
||||
groupKeyWithoutExtSend -> this.subStageGetter
|
||||
@ -418,7 +418,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
|
||||
@Override
|
||||
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
|
||||
return this
|
||||
.getAllValues(null)
|
||||
.getAllValues(null, false)
|
||||
.concatWith(this
|
||||
.clear()
|
||||
.then(this.putMulti(entries))
|
||||
@ -438,7 +438,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
|
||||
.doOnNext(Send::close)
|
||||
.then();
|
||||
} else {
|
||||
return dictionary.setRange(rangeMono, Flux.empty());
|
||||
return dictionary.setRange(rangeMono, Flux.empty(), false);
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -551,7 +551,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
|
||||
sink.next(fullRange.send());
|
||||
}
|
||||
}
|
||||
}), false)
|
||||
}), false, false)
|
||||
.concatMapIterable(entrySend -> {
|
||||
K1 key1 = null;
|
||||
Object key2 = null;
|
||||
|
@ -4,7 +4,6 @@ import io.netty5.buffer.api.Buffer;
|
||||
import io.netty5.buffer.api.BufferAllocator;
|
||||
import io.netty5.buffer.api.Drop;
|
||||
import io.netty5.buffer.api.Owned;
|
||||
import io.netty5.buffer.api.Resource;
|
||||
import io.netty5.buffer.api.Send;
|
||||
import it.cavallium.dbengine.client.BadBlock;
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
@ -18,7 +17,6 @@ import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap;
|
||||
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
|
||||
import it.unimi.dsi.fastutil.objects.ObjectArraySet;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Objects;
|
||||
@ -212,9 +210,9 @@ public class DatabaseMapDictionaryHashed<T, U, TH> extends
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot) {
|
||||
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
|
||||
return subDictionary
|
||||
.getAllValues(snapshot)
|
||||
.getAllValues(snapshot, smallRange)
|
||||
.map(Entry::getValue)
|
||||
.map(Collections::unmodifiableSet)
|
||||
.flatMap(bucket -> Flux
|
||||
@ -225,9 +223,9 @@ public class DatabaseMapDictionaryHashed<T, U, TH> extends
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot) {
|
||||
public Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
|
||||
return subDictionary
|
||||
.getAllValues(snapshot)
|
||||
.getAllValues(snapshot, smallRange)
|
||||
.map(Entry::getValue)
|
||||
.map(Collections::unmodifiableSet)
|
||||
.concatMapIterable(list -> list);
|
||||
|
@ -11,17 +11,14 @@ import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap;
|
||||
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
|
||||
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps;
|
||||
import java.time.Duration;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import java.util.logging.Level;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.SignalType;
|
||||
import reactor.util.function.Tuple2;
|
||||
import reactor.util.function.Tuples;
|
||||
|
||||
@ -123,11 +120,11 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
|
||||
return entries.flatMap(entry -> this.putValue(entry.getKey(), entry.getValue())).then();
|
||||
}
|
||||
|
||||
Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot);
|
||||
Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange);
|
||||
|
||||
default Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot) {
|
||||
default Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
|
||||
return this
|
||||
.getAllStages(snapshot)
|
||||
.getAllStages(snapshot, smallRange)
|
||||
.flatMapSequential(stage -> stage
|
||||
.getValue()
|
||||
.get(snapshot, true)
|
||||
@ -146,13 +143,14 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
|
||||
return setAllValues(Flux.empty());
|
||||
}
|
||||
|
||||
default Mono<Void> replaceAllValues(boolean canKeysChange, Function<Entry<T, U>,
|
||||
Mono<Entry<T, U>>> entriesReplacer) {
|
||||
default Mono<Void> replaceAllValues(boolean canKeysChange,
|
||||
Function<Entry<T, U>, Mono<Entry<T, U>>> entriesReplacer,
|
||||
boolean smallRange) {
|
||||
if (canKeysChange) {
|
||||
return this.setAllValues(this.getAllValues(null).flatMap(entriesReplacer)).then();
|
||||
return this.setAllValues(this.getAllValues(null, smallRange).flatMap(entriesReplacer)).then();
|
||||
} else {
|
||||
return this
|
||||
.getAllValues(null)
|
||||
.getAllValues(null, smallRange)
|
||||
.flatMap(entriesReplacer)
|
||||
.flatMap(replacedEntry -> this
|
||||
.at(null, replacedEntry.getKey())
|
||||
@ -167,7 +165,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
|
||||
|
||||
default Mono<Void> replaceAll(Function<Entry<T, US>, Mono<Void>> entriesReplacer) {
|
||||
return this
|
||||
.getAllStages(null)
|
||||
.getAllStages(null, false)
|
||||
.flatMap(stage -> entriesReplacer.apply(stage)
|
||||
.doFinally(s -> stage.getValue().close())
|
||||
)
|
||||
@ -199,7 +197,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
|
||||
.flatMap(updateMode -> {
|
||||
if (updateMode == UpdateMode.ALLOW_UNSAFE) {
|
||||
return this
|
||||
.getAllValues(null)
|
||||
.getAllValues(null, true)
|
||||
.collectMap(Entry::getKey, Entry::getValue, Object2ObjectLinkedOpenHashMap::new)
|
||||
.map(map -> (Object2ObjectSortedMap<T, U>) map)
|
||||
.single()
|
||||
@ -246,7 +244,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
|
||||
@Override
|
||||
default Mono<Object2ObjectSortedMap<T, U>> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) {
|
||||
return this
|
||||
.getAllValues(snapshot)
|
||||
.getAllValues(snapshot, true)
|
||||
.collectMap(Entry::getKey, Entry::getValue, Object2ObjectLinkedOpenHashMap::new)
|
||||
.map(map -> (Object2ObjectSortedMap<T, U>) map)
|
||||
.filter(map -> !map.isEmpty());
|
||||
@ -255,7 +253,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
|
||||
@Override
|
||||
default Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
|
||||
return this
|
||||
.getAllStages(snapshot)
|
||||
.getAllStages(snapshot, false)
|
||||
.doOnNext(stage -> stage.getValue().close())
|
||||
.count();
|
||||
}
|
||||
|
@ -768,14 +768,17 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Send<LLEntry>> getRange(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, boolean reverse) {
|
||||
public Flux<Send<LLEntry>> getRange(@Nullable LLSnapshot snapshot,
|
||||
Mono<Send<LLRange>> rangeMono,
|
||||
boolean reverse,
|
||||
boolean smallRange) {
|
||||
return rangeMono.flatMapMany(rangeSend -> {
|
||||
try (var range = rangeSend.receive()) {
|
||||
if (range.isSingle()) {
|
||||
var rangeSingleMono = rangeMono.map(r -> r.receive().getSingle());
|
||||
return getRangeSingle(snapshot, rangeSingleMono);
|
||||
} else {
|
||||
return getRangeMulti(snapshot, rangeMono, reverse);
|
||||
return getRangeMulti(snapshot, rangeMono, reverse, smallRange);
|
||||
}
|
||||
}
|
||||
});
|
||||
@ -784,14 +787,15 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
@Override
|
||||
public Flux<List<Send<LLEntry>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
|
||||
Mono<Send<LLRange>> rangeMono,
|
||||
int prefixLength) {
|
||||
int prefixLength,
|
||||
boolean smallRange) {
|
||||
return rangeMono.flatMapMany(rangeSend -> {
|
||||
try (var range = rangeSend.receive()) {
|
||||
if (range.isSingle()) {
|
||||
var rangeSingleMono = rangeMono.map(r -> r.receive().getSingle());
|
||||
return getRangeSingle(snapshot, rangeSingleMono).map(List::of);
|
||||
} else {
|
||||
return getRangeMultiGrouped(snapshot, rangeMono, prefixLength);
|
||||
return getRangeMultiGrouped(snapshot, rangeMono, prefixLength, smallRange);
|
||||
}
|
||||
}
|
||||
});
|
||||
@ -804,10 +808,13 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
.flux();
|
||||
}
|
||||
|
||||
private Flux<Send<LLEntry>> getRangeMulti(LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, boolean reverse) {
|
||||
private Flux<Send<LLEntry>> getRangeMulti(LLSnapshot snapshot,
|
||||
Mono<Send<LLRange>> rangeMono,
|
||||
boolean reverse,
|
||||
boolean smallRange) {
|
||||
Mono<LLLocalEntryReactiveRocksIterator> iteratorMono = rangeMono.map(rangeSend -> {
|
||||
ReadOptions resolvedSnapshot = resolveSnapshot(snapshot);
|
||||
return new LLLocalEntryReactiveRocksIterator(db, rangeSend, nettyDirect, resolvedSnapshot, reverse);
|
||||
return new LLLocalEntryReactiveRocksIterator(db, rangeSend, nettyDirect, resolvedSnapshot, reverse, smallRange);
|
||||
});
|
||||
return Flux.usingWhen(iteratorMono,
|
||||
iterator -> iterator.flux().subscribeOn(dbScheduler, false),
|
||||
@ -816,10 +823,16 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
}
|
||||
|
||||
private Flux<List<Send<LLEntry>>> getRangeMultiGrouped(LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono,
|
||||
int prefixLength) {
|
||||
int prefixLength, boolean smallRange) {
|
||||
Mono<LLLocalGroupedEntryReactiveRocksIterator> iteratorMono = rangeMono.map(rangeSend -> {
|
||||
ReadOptions resolvedSnapshot = resolveSnapshot(snapshot);
|
||||
return new LLLocalGroupedEntryReactiveRocksIterator(db, prefixLength, rangeSend, nettyDirect, resolvedSnapshot);
|
||||
return new LLLocalGroupedEntryReactiveRocksIterator(db,
|
||||
prefixLength,
|
||||
rangeSend,
|
||||
nettyDirect,
|
||||
resolvedSnapshot,
|
||||
smallRange
|
||||
);
|
||||
});
|
||||
return Flux.usingWhen(
|
||||
iteratorMono,
|
||||
@ -829,13 +842,16 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Send<Buffer>> getRangeKeys(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, boolean reverse) {
|
||||
public Flux<Send<Buffer>> getRangeKeys(@Nullable LLSnapshot snapshot,
|
||||
Mono<Send<LLRange>> rangeMono,
|
||||
boolean reverse,
|
||||
boolean smallRange) {
|
||||
return rangeMono.flatMapMany(rangeSend -> {
|
||||
try (var range = rangeSend.receive()) {
|
||||
if (range.isSingle()) {
|
||||
return this.getRangeKeysSingle(snapshot, rangeMono.map(r -> r.receive().getSingle()));
|
||||
} else {
|
||||
return this.getRangeKeysMulti(snapshot, rangeMono, reverse);
|
||||
return this.getRangeKeysMulti(snapshot, rangeMono, reverse, smallRange);
|
||||
}
|
||||
}
|
||||
});
|
||||
@ -844,10 +860,17 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
@Override
|
||||
public Flux<List<Send<Buffer>>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot,
|
||||
Mono<Send<LLRange>> rangeMono,
|
||||
int prefixLength) {
|
||||
int prefixLength,
|
||||
boolean smallRange) {
|
||||
Mono<LLLocalGroupedKeyReactiveRocksIterator> iteratorMono = rangeMono.map(rangeSend -> {
|
||||
ReadOptions resolvedSnapshot = resolveSnapshot(snapshot);
|
||||
return new LLLocalGroupedKeyReactiveRocksIterator(db, prefixLength, rangeSend, nettyDirect, resolvedSnapshot);
|
||||
return new LLLocalGroupedKeyReactiveRocksIterator(db,
|
||||
prefixLength,
|
||||
rangeSend,
|
||||
nettyDirect,
|
||||
resolvedSnapshot,
|
||||
smallRange
|
||||
);
|
||||
});
|
||||
return Flux.usingWhen(iteratorMono,
|
||||
iterator -> iterator.flux().subscribeOn(dbScheduler, false),
|
||||
@ -899,10 +922,12 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
|
||||
@Override
|
||||
public Flux<Send<Buffer>> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono,
|
||||
int prefixLength) {
|
||||
int prefixLength, boolean smallRange) {
|
||||
Mono<LLLocalKeyPrefixReactiveRocksIterator> iteratorMono = rangeMono.map(range -> {
|
||||
ReadOptions resolvedSnapshot = resolveSnapshot(snapshot);
|
||||
return new LLLocalKeyPrefixReactiveRocksIterator(db, prefixLength, range, nettyDirect, resolvedSnapshot, true);
|
||||
return new LLLocalKeyPrefixReactiveRocksIterator(db, prefixLength, range, nettyDirect, resolvedSnapshot, true,
|
||||
smallRange
|
||||
);
|
||||
});
|
||||
return Flux.usingWhen(iteratorMono,
|
||||
iterator -> iterator.flux().subscribeOn(dbScheduler),
|
||||
@ -927,10 +952,13 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
.flux();
|
||||
}
|
||||
|
||||
private Flux<Send<Buffer>> getRangeKeysMulti(LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, boolean reverse) {
|
||||
private Flux<Send<Buffer>> getRangeKeysMulti(LLSnapshot snapshot,
|
||||
Mono<Send<LLRange>> rangeMono,
|
||||
boolean reverse,
|
||||
boolean smallRange) {
|
||||
Mono<LLLocalKeyReactiveRocksIterator> iteratorMono = rangeMono.map(range -> {
|
||||
ReadOptions resolvedSnapshot = resolveSnapshot(snapshot);
|
||||
return new LLLocalKeyReactiveRocksIterator(db, range, nettyDirect, resolvedSnapshot, reverse);
|
||||
return new LLLocalKeyReactiveRocksIterator(db, range, nettyDirect, resolvedSnapshot, reverse, smallRange);
|
||||
});
|
||||
return Flux.usingWhen(iteratorMono,
|
||||
iterator -> iterator.flux().subscribeOn(dbScheduler, false),
|
||||
@ -939,7 +967,7 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> setRange(Mono<Send<LLRange>> rangeMono, Flux<Send<LLEntry>> entries) {
|
||||
public Mono<Void> setRange(Mono<Send<LLRange>> rangeMono, Flux<Send<LLEntry>> entries, boolean smallRange) {
|
||||
if (USE_WINDOW_IN_SET_RANGE) {
|
||||
return rangeMono
|
||||
.publishOn(dbScheduler)
|
||||
@ -1087,7 +1115,7 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
"Can't use write batches in setRange without window. Please fix the parameters"));
|
||||
}
|
||||
var deleteMono = this
|
||||
.getRange(null, rangeMono, false)
|
||||
.getRange(null, rangeMono, false, smallRange)
|
||||
.publishOn(dbScheduler)
|
||||
.handle((oldValueSend, sink) -> {
|
||||
try (var oldValue = oldValueSend.receive()) {
|
||||
|
@ -11,8 +11,10 @@ public class LLLocalEntryReactiveRocksIterator extends LLLocalReactiveRocksItera
|
||||
public LLLocalEntryReactiveRocksIterator(RocksDBColumn db,
|
||||
Send<LLRange> range,
|
||||
boolean allowNettyDirect,
|
||||
ReadOptions readOptions, boolean reverse) {
|
||||
super(db, range, allowNettyDirect, readOptions, true, reverse);
|
||||
ReadOptions readOptions,
|
||||
boolean reverse,
|
||||
boolean smallRange) {
|
||||
super(db, range, allowNettyDirect, readOptions, true, reverse, smallRange);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -13,8 +13,9 @@ public class LLLocalGroupedEntryReactiveRocksIterator extends
|
||||
int prefixLength,
|
||||
Send<LLRange> range,
|
||||
boolean allowNettyDirect,
|
||||
ReadOptions readOptions) {
|
||||
super(db, prefixLength, range, allowNettyDirect, readOptions, false, true);
|
||||
ReadOptions readOptions,
|
||||
boolean smallRange) {
|
||||
super(db, prefixLength, range, allowNettyDirect, readOptions, false, true, smallRange);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -11,8 +11,9 @@ public class LLLocalGroupedKeyReactiveRocksIterator extends LLLocalGroupedReacti
|
||||
int prefixLength,
|
||||
Send<LLRange> range,
|
||||
boolean allowNettyDirect,
|
||||
ReadOptions readOptions) {
|
||||
super(db, prefixLength, range, allowNettyDirect, readOptions, true, false);
|
||||
ReadOptions readOptions,
|
||||
boolean smallRange) {
|
||||
super(db, prefixLength, range, allowNettyDirect, readOptions, true, false, smallRange);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -2,7 +2,7 @@ package it.cavallium.dbengine.database.disk;
|
||||
|
||||
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
|
||||
import static it.cavallium.dbengine.database.LLUtils.generateCustomReadOptions;
|
||||
import static it.cavallium.dbengine.database.LLUtils.isClosedRange;
|
||||
import static it.cavallium.dbengine.database.LLUtils.isBoundedRange;
|
||||
|
||||
import io.netty5.buffer.api.Buffer;
|
||||
import io.netty5.buffer.api.Drop;
|
||||
@ -63,6 +63,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> extends
|
||||
private ReadOptions readOptions;
|
||||
private final boolean canFillCache;
|
||||
private final boolean readValues;
|
||||
private final boolean smallRange;
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
public LLLocalGroupedReactiveRocksIterator(RocksDBColumn db,
|
||||
@ -71,7 +72,8 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> extends
|
||||
boolean allowNettyDirect,
|
||||
ReadOptions readOptions,
|
||||
boolean canFillCache,
|
||||
boolean readValues) {
|
||||
boolean readValues,
|
||||
boolean smallRange) {
|
||||
super((Drop<LLLocalGroupedReactiveRocksIterator<T>>) (Drop) DROP);
|
||||
try (range) {
|
||||
this.db = db;
|
||||
@ -81,6 +83,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> extends
|
||||
this.readOptions = readOptions;
|
||||
this.canFillCache = canFillCache;
|
||||
this.readValues = readValues;
|
||||
this.smallRange = smallRange;
|
||||
}
|
||||
}
|
||||
|
||||
@ -88,7 +91,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> extends
|
||||
public final Flux<List<T>> flux() {
|
||||
return Flux
|
||||
.generate(() -> {
|
||||
var readOptions = generateCustomReadOptions(this.readOptions, true, isClosedRange(range), true);
|
||||
var readOptions = generateCustomReadOptions(this.readOptions, true, isBoundedRange(range), smallRange);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range));
|
||||
}
|
||||
@ -176,7 +179,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> extends
|
||||
allowNettyDirect,
|
||||
readOptions,
|
||||
canFillCache,
|
||||
readValues
|
||||
readValues, smallRange
|
||||
) {
|
||||
@Override
|
||||
public T getEntry(@Nullable Send<Buffer> key, @Nullable Send<Buffer> value) {
|
||||
|
@ -2,7 +2,7 @@ package it.cavallium.dbengine.database.disk;
|
||||
|
||||
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
|
||||
import static it.cavallium.dbengine.database.LLUtils.generateCustomReadOptions;
|
||||
import static it.cavallium.dbengine.database.LLUtils.isClosedRange;
|
||||
import static it.cavallium.dbengine.database.LLUtils.isBoundedRange;
|
||||
|
||||
import io.netty5.buffer.api.Buffer;
|
||||
import io.netty5.buffer.api.Drop;
|
||||
@ -59,13 +59,15 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends
|
||||
private final boolean allowNettyDirect;
|
||||
private ReadOptions readOptions;
|
||||
private final boolean canFillCache;
|
||||
private final boolean smallRange;
|
||||
|
||||
public LLLocalKeyPrefixReactiveRocksIterator(RocksDBColumn db,
|
||||
int prefixLength,
|
||||
Send<LLRange> range,
|
||||
boolean allowNettyDirect,
|
||||
ReadOptions readOptions,
|
||||
boolean canFillCache) {
|
||||
boolean canFillCache,
|
||||
boolean smallRange) {
|
||||
super(DROP);
|
||||
try (range) {
|
||||
this.db = db;
|
||||
@ -74,13 +76,18 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends
|
||||
this.allowNettyDirect = allowNettyDirect;
|
||||
this.readOptions = readOptions;
|
||||
this.canFillCache = canFillCache;
|
||||
this.smallRange = smallRange;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public Flux<Send<Buffer>> flux() {
|
||||
return Flux.generate(() -> {
|
||||
var readOptions = generateCustomReadOptions(this.readOptions, canFillCache, isClosedRange(rangeShared), true);
|
||||
var readOptions = generateCustomReadOptions(this.readOptions,
|
||||
canFillCache,
|
||||
isBoundedRange(rangeShared),
|
||||
smallRange
|
||||
);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared));
|
||||
}
|
||||
@ -163,7 +170,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends
|
||||
range,
|
||||
allowNettyDirect,
|
||||
readOptions,
|
||||
canFillCache
|
||||
canFillCache, smallRange
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -11,8 +11,9 @@ public class LLLocalKeyReactiveRocksIterator extends LLLocalReactiveRocksIterato
|
||||
Send<LLRange> range,
|
||||
boolean allowNettyDirect,
|
||||
ReadOptions readOptions,
|
||||
boolean reverse) {
|
||||
super(db, range, allowNettyDirect, readOptions, false, reverse);
|
||||
boolean reverse,
|
||||
boolean smallRange) {
|
||||
super(db, range, allowNettyDirect, readOptions, false, reverse, smallRange);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -2,7 +2,7 @@ package it.cavallium.dbengine.database.disk;
|
||||
|
||||
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
|
||||
import static it.cavallium.dbengine.database.LLUtils.generateCustomReadOptions;
|
||||
import static it.cavallium.dbengine.database.LLUtils.isClosedRange;
|
||||
import static it.cavallium.dbengine.database.LLUtils.isBoundedRange;
|
||||
import static it.cavallium.dbengine.database.disk.LLLocalDictionary.getRocksIterator;
|
||||
|
||||
import io.netty5.buffer.api.Buffer;
|
||||
@ -61,6 +61,7 @@ public abstract class LLLocalReactiveRocksIterator<T> extends
|
||||
private ReadOptions readOptions;
|
||||
private final boolean readValues;
|
||||
private final boolean reverse;
|
||||
private final boolean smallRange;
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
public LLLocalReactiveRocksIterator(RocksDBColumn db,
|
||||
@ -68,7 +69,8 @@ public abstract class LLLocalReactiveRocksIterator<T> extends
|
||||
boolean allowNettyDirect,
|
||||
ReadOptions readOptions,
|
||||
boolean readValues,
|
||||
boolean reverse) {
|
||||
boolean reverse,
|
||||
boolean smallRange) {
|
||||
super((Drop<LLLocalReactiveRocksIterator<T>>) (Drop) DROP);
|
||||
try (range) {
|
||||
this.db = db;
|
||||
@ -77,12 +79,13 @@ public abstract class LLLocalReactiveRocksIterator<T> extends
|
||||
this.readOptions = readOptions;
|
||||
this.readValues = readValues;
|
||||
this.reverse = reverse;
|
||||
this.smallRange = smallRange;
|
||||
}
|
||||
}
|
||||
|
||||
public final Flux<T> flux() {
|
||||
return Flux.generate(() -> {
|
||||
var readOptions = generateCustomReadOptions(this.readOptions, true, isClosedRange(rangeShared), true);
|
||||
var readOptions = generateCustomReadOptions(this.readOptions, true, isBoundedRange(rangeShared), smallRange);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared));
|
||||
}
|
||||
@ -160,7 +163,14 @@ public abstract class LLLocalReactiveRocksIterator<T> extends
|
||||
protected Owned<LLLocalReactiveRocksIterator<T>> prepareSend() {
|
||||
var range = this.rangeShared.send();
|
||||
var readOptions = this.readOptions;
|
||||
return drop -> new LLLocalReactiveRocksIterator<>(db, range, allowNettyDirect, readOptions, readValues, reverse) {
|
||||
return drop -> new LLLocalReactiveRocksIterator<>(db,
|
||||
range,
|
||||
allowNettyDirect,
|
||||
readOptions,
|
||||
readValues,
|
||||
reverse,
|
||||
smallRange
|
||||
) {
|
||||
@Override
|
||||
public T getEntry(@Nullable Send<Buffer> key, @Nullable Send<Buffer> value) {
|
||||
return LLLocalReactiveRocksIterator.this.getEntry(key, value);
|
||||
|
@ -17,7 +17,6 @@ import it.cavallium.dbengine.database.serialization.SerializationException;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import it.unimi.dsi.fastutil.bytes.ByteList;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
@ -296,7 +295,10 @@ public class LLMemoryDictionary implements LLDictionary {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Send<LLEntry>> getRange(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, boolean reverse) {
|
||||
public Flux<Send<LLEntry>> getRange(@Nullable LLSnapshot snapshot,
|
||||
Mono<Send<LLRange>> rangeMono,
|
||||
boolean reverse,
|
||||
boolean smallRange) {
|
||||
return Flux.usingWhen(rangeMono, rangeToReceive -> {
|
||||
try (var range = rangeToReceive.receive()) {
|
||||
if (range.isSingle()) {
|
||||
@ -331,7 +333,7 @@ public class LLMemoryDictionary implements LLDictionary {
|
||||
@Override
|
||||
public Flux<List<Send<LLEntry>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
|
||||
Mono<Send<LLRange>> rangeMono,
|
||||
int prefixLength) {
|
||||
int prefixLength, boolean smallRange) {
|
||||
return Flux.usingWhen(rangeMono, rangeToReceive -> {
|
||||
try (var range = rangeToReceive.receive()) {
|
||||
if (range.isSingle()) {
|
||||
@ -362,7 +364,10 @@ public class LLMemoryDictionary implements LLDictionary {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Send<Buffer>> getRangeKeys(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, boolean reverse) {
|
||||
public Flux<Send<Buffer>> getRangeKeys(@Nullable LLSnapshot snapshot,
|
||||
Mono<Send<LLRange>> rangeMono,
|
||||
boolean reverse,
|
||||
boolean smallRange) {
|
||||
return Flux.usingWhen(rangeMono,
|
||||
rangeToReceive -> {
|
||||
try (var range = rangeToReceive.receive()) {
|
||||
@ -396,7 +401,7 @@ public class LLMemoryDictionary implements LLDictionary {
|
||||
@Override
|
||||
public Flux<List<Send<Buffer>>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot,
|
||||
Mono<Send<LLRange>> rangeMono,
|
||||
int prefixLength) {
|
||||
int prefixLength, boolean smallRange) {
|
||||
return Flux.usingWhen(rangeMono, rangeToReceive -> {
|
||||
try (var range = rangeToReceive.receive()) {
|
||||
if (range.isSingle()) {
|
||||
@ -430,7 +435,7 @@ public class LLMemoryDictionary implements LLDictionary {
|
||||
@Override
|
||||
public Flux<Send<Buffer>> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot,
|
||||
Mono<Send<LLRange>> rangeMono,
|
||||
int prefixLength) {
|
||||
int prefixLength, boolean smallRange) {
|
||||
return Flux.usingWhen(rangeMono, rangeToReceive -> {
|
||||
try (var range = rangeToReceive.receive()) {
|
||||
if (range.isSingle()) {
|
||||
@ -465,7 +470,7 @@ public class LLMemoryDictionary implements LLDictionary {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> setRange(Mono<Send<LLRange>> rangeMono, Flux<Send<LLEntry>> entries) {
|
||||
public Mono<Void> setRange(Mono<Send<LLRange>> rangeMono, Flux<Send<LLEntry>> entries, boolean smallRange) {
|
||||
return Mono.usingWhen(rangeMono, rangeToReceive -> {
|
||||
try (var range = rangeToReceive.receive()) {
|
||||
Mono<Void> clearMono;
|
||||
@ -520,7 +525,7 @@ public class LLMemoryDictionary implements LLDictionary {
|
||||
|
||||
@Override
|
||||
public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, boolean fillCache) {
|
||||
return getRangeKeys(snapshot, rangeMono, false)
|
||||
return getRangeKeys(snapshot, rangeMono, false, false)
|
||||
.doOnNext(buf -> buf.receive().close())
|
||||
.count()
|
||||
.map(count -> count == 0);
|
||||
@ -536,14 +541,14 @@ public class LLMemoryDictionary implements LLDictionary {
|
||||
|
||||
@Override
|
||||
public Mono<Send<LLEntry>> getOne(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
|
||||
return getRange(snapshot, rangeMono, false)
|
||||
return getRange(snapshot, rangeMono, false, false)
|
||||
.take(1, true)
|
||||
.singleOrEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Send<Buffer>> getOneKey(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
|
||||
return getRangeKeys(snapshot, rangeMono, false)
|
||||
return getRangeKeys(snapshot, rangeMono, false, false)
|
||||
.take(1, true)
|
||||
.singleOrEmpty();
|
||||
}
|
||||
|
@ -15,7 +15,6 @@ import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -29,8 +28,6 @@ import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.StepVerifier;
|
||||
import reactor.test.StepVerifier.Step;
|
||||
import reactor.test.util.TestLogger;
|
||||
import reactor.util.Loggers;
|
||||
import reactor.util.function.Tuple2;
|
||||
import reactor.util.function.Tuples;
|
||||
|
||||
@ -609,7 +606,7 @@ public abstract class TestDictionaryMap {
|
||||
.flatMapMany(map -> Flux
|
||||
.concat(
|
||||
map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()),
|
||||
map.getAllValues(null)
|
||||
map.getAllValues(null, false)
|
||||
)
|
||||
.doFinally(s -> map.close())
|
||||
)
|
||||
@ -666,7 +663,7 @@ public abstract class TestDictionaryMap {
|
||||
.concat(
|
||||
map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()),
|
||||
map
|
||||
.getAllStages(null)
|
||||
.getAllStages(null, false)
|
||||
.flatMap(stage -> stage
|
||||
.getValue()
|
||||
.get(null)
|
||||
|
@ -6,7 +6,6 @@ import static it.cavallium.dbengine.DbTestUtils.isCIMode;
|
||||
import static it.cavallium.dbengine.DbTestUtils.newAllocator;
|
||||
import static it.cavallium.dbengine.DbTestUtils.destroyAllocator;
|
||||
import static it.cavallium.dbengine.DbTestUtils.tempDatabaseMapDictionaryDeepMap;
|
||||
import static it.cavallium.dbengine.DbTestUtils.tempDatabaseMapDictionaryMap;
|
||||
import static it.cavallium.dbengine.DbTestUtils.tempDb;
|
||||
import static it.cavallium.dbengine.DbTestUtils.tempDictionary;
|
||||
import static it.cavallium.dbengine.SyncUtils.*;
|
||||
@ -14,12 +13,9 @@ import static org.assertj.core.api.Assertions.*;
|
||||
|
||||
import io.netty5.buffer.api.internal.ResourceSupport;
|
||||
import it.cavallium.dbengine.DbTestUtils.TestAllocator;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.UpdateMode;
|
||||
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
|
||||
import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap;
|
||||
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
|
||||
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -42,7 +38,6 @@ import org.junit.jupiter.params.provider.MethodSource;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.StepVerifier;
|
||||
import reactor.test.StepVerifier.FirstStep;
|
||||
import reactor.test.StepVerifier.Step;
|
||||
import reactor.util.function.Tuple2;
|
||||
import reactor.util.function.Tuple3;
|
||||
@ -245,7 +240,7 @@ public abstract class TestDictionaryMapDeep {
|
||||
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
|
||||
.flatMapMany(map -> map
|
||||
.putValue(key, value)
|
||||
.thenMany(map.getAllValues(null))
|
||||
.thenMany(map.getAllValues(null, false))
|
||||
.doFinally(s -> map.close())
|
||||
)
|
||||
));
|
||||
@ -285,9 +280,9 @@ public abstract class TestDictionaryMapDeep {
|
||||
))
|
||||
)
|
||||
.thenMany(map
|
||||
.getAllStages(null)
|
||||
.getAllStages(null, false)
|
||||
.flatMap(v -> v.getValue()
|
||||
.getAllValues(null)
|
||||
.getAllValues(null, false)
|
||||
.map(result -> Tuples.of(v.getKey(), result.getKey(), result.getValue()))
|
||||
.doFinally(s -> v.getValue().close())
|
||||
)
|
||||
@ -1017,7 +1012,7 @@ public abstract class TestDictionaryMapDeep {
|
||||
.flatMapMany(map -> Flux
|
||||
.concat(
|
||||
map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()),
|
||||
map.getAllValues(null)
|
||||
map.getAllValues(null, false)
|
||||
)
|
||||
.doFinally(s -> map.close())
|
||||
)
|
||||
@ -1075,7 +1070,7 @@ public abstract class TestDictionaryMapDeep {
|
||||
.concat(
|
||||
map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()),
|
||||
map
|
||||
.getAllStages(null)
|
||||
.getAllStages(null, false)
|
||||
.flatMap(stage -> stage
|
||||
.getValue()
|
||||
.get(null)
|
||||
|
@ -11,14 +11,8 @@ import static it.cavallium.dbengine.DbTestUtils.tempDictionary;
|
||||
|
||||
import it.cavallium.dbengine.DbTestUtils.TestAllocator;
|
||||
import it.cavallium.dbengine.database.UpdateMode;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
@ -26,11 +20,7 @@ import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.StepVerifier;
|
||||
import reactor.test.StepVerifier.Step;
|
||||
import reactor.util.function.Tuple2;
|
||||
import reactor.util.function.Tuple3;
|
||||
import reactor.util.function.Tuple4;
|
||||
import reactor.util.function.Tuples;
|
||||
|
||||
@ -123,7 +113,7 @@ public abstract class TestDictionaryMapDeepHashMap {
|
||||
.flatMapMany(map -> map
|
||||
.at(null, key1).flatMap(v -> v.putValue(key2, value).doFinally(s -> v.close()))
|
||||
.thenMany(map
|
||||
.getAllValues(null)
|
||||
.getAllValues(null, false)
|
||||
.map(Entry::getValue)
|
||||
.flatMap(maps -> Flux.fromIterable(maps.entrySet()))
|
||||
.map(Entry::getValue)
|
||||
|
@ -192,8 +192,8 @@ public abstract class TestLLDictionary {
|
||||
var afterSize = run(dict.sizeRange(null, Mono.fromCallable(() -> LLRange.all().send()), false));
|
||||
Assertions.assertEquals(1, afterSize - beforeSize);
|
||||
|
||||
Assertions.assertTrue(run(dict.getRangeKeys(null, RANGE_ALL, false).map(this::toString).collectList()).contains("test-nonexistent"));
|
||||
Assertions.assertTrue(run(dict.getRangeKeys(null, RANGE_ALL, true).map(this::toString).collectList()).contains("test-nonexistent"));
|
||||
Assertions.assertTrue(run(dict.getRangeKeys(null, RANGE_ALL, false, false).map(this::toString).collectList()).contains("test-nonexistent"));
|
||||
Assertions.assertTrue(run(dict.getRangeKeys(null, RANGE_ALL, true, false).map(this::toString).collectList()).contains("test-nonexistent"));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ -252,9 +252,9 @@ public abstract class TestLLDictionary {
|
||||
assertEquals(expected, afterSize - beforeSize);
|
||||
|
||||
if (updateMode != UpdateMode.DISALLOW) {
|
||||
Assertions.assertTrue(run(dict.getRangeKeys(null, RANGE_ALL, false).map(this::toString).collectList()).contains(
|
||||
Assertions.assertTrue(run(dict.getRangeKeys(null, RANGE_ALL, false, false).map(this::toString).collectList()).contains(
|
||||
"test-nonexistent"));
|
||||
Assertions.assertTrue(run(dict.getRangeKeys(null, RANGE_ALL, true).map(this::toString).collectList()).contains(
|
||||
Assertions.assertTrue(run(dict.getRangeKeys(null, RANGE_ALL, true, false).map(this::toString).collectList()).contains(
|
||||
"test-nonexistent"));
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user