Add optimization
This commit is contained in:
parent
1dc14fe179
commit
aaa203f7ad
@ -13,23 +13,48 @@ import reactor.core.publisher.Mono;
|
||||
@NotAtomic
|
||||
public interface LLDictionary extends LLKeyValueDatabaseStructure {
|
||||
|
||||
Mono<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key);
|
||||
Mono<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key, boolean existsAlmostCertainly);
|
||||
|
||||
default Mono<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key) {
|
||||
return get(snapshot, key, false);
|
||||
}
|
||||
|
||||
Mono<byte[]> put(byte[] key, byte[] value, LLDictionaryResultType resultType);
|
||||
|
||||
Mono<Boolean> update(byte[] key, Function<Optional<byte[]>, Optional<byte[]>> updater);
|
||||
Mono<Boolean> update(byte[] key, Function<Optional<byte[]>, Optional<byte[]>> updater, boolean existsAlmostCertainly);
|
||||
|
||||
default Mono<Boolean> update(byte[] key, Function<Optional<byte[]>, Optional<byte[]>> updater) {
|
||||
return update(key, updater, false);
|
||||
}
|
||||
|
||||
Mono<Void> clear();
|
||||
|
||||
Mono<byte[]> remove(byte[] key, LLDictionaryResultType resultType);
|
||||
|
||||
Flux<Entry<byte[], byte[]>> getMulti(@Nullable LLSnapshot snapshot, Flux<byte[]> keys);
|
||||
Flux<Entry<byte[], byte[]>> getMulti(@Nullable LLSnapshot snapshot, Flux<byte[]> keys, boolean existsAlmostCertainly);
|
||||
|
||||
default Flux<Entry<byte[], byte[]>> getMulti(@Nullable LLSnapshot snapshot, Flux<byte[]> keys) {
|
||||
return getMulti(snapshot, keys, false);
|
||||
}
|
||||
|
||||
Flux<Entry<byte[], byte[]>> putMulti(Flux<Entry<byte[], byte[]>> entries, boolean getOldValues);
|
||||
|
||||
Flux<Entry<byte[], byte[]>> getRange(@Nullable LLSnapshot snapshot, LLRange range);
|
||||
Flux<Entry<byte[], byte[]>> getRange(@Nullable LLSnapshot snapshot, LLRange range, boolean existsAlmostCertainly);
|
||||
|
||||
Flux<List<Entry<byte[], byte[]>>> getRangeGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength);
|
||||
default Flux<Entry<byte[], byte[]>> getRange(@Nullable LLSnapshot snapshot, LLRange range) {
|
||||
return getRange(snapshot, range, false);
|
||||
}
|
||||
|
||||
Flux<List<Entry<byte[], byte[]>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
|
||||
LLRange range,
|
||||
int prefixLength,
|
||||
boolean existsAlmostCertainly);
|
||||
|
||||
default Flux<List<Entry<byte[], byte[]>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
|
||||
LLRange range,
|
||||
int prefixLength) {
|
||||
return getRangeGrouped(snapshot, range, prefixLength, false);
|
||||
}
|
||||
|
||||
Flux<byte[]> getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range);
|
||||
|
||||
@ -39,24 +64,33 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
|
||||
|
||||
Flux<Entry<byte[], byte[]>> setRange(LLRange range, Flux<Entry<byte[], byte[]>> entries, boolean getOldValues);
|
||||
|
||||
default Mono<Void> replaceRange(LLRange range, boolean canKeysChange, Function<Entry<byte[], byte[]>, Mono<Entry<byte[], byte[]>>> entriesReplacer) {
|
||||
default Mono<Void> replaceRange(LLRange range,
|
||||
boolean canKeysChange,
|
||||
Function<Entry<byte[], byte[]>, Mono<Entry<byte[], byte[]>>> entriesReplacer,
|
||||
boolean existsAlmostCertainly) {
|
||||
return Mono.defer(() -> {
|
||||
if (canKeysChange) {
|
||||
return this
|
||||
.setRange(range, this
|
||||
.getRange(null, range)
|
||||
.getRange(null, range, existsAlmostCertainly)
|
||||
.flatMap(entriesReplacer), false)
|
||||
.then();
|
||||
} else {
|
||||
return this
|
||||
.putMulti(this
|
||||
.getRange(null, range)
|
||||
.getRange(null, range, existsAlmostCertainly)
|
||||
.flatMap(entriesReplacer), false)
|
||||
.then();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
default Mono<Void> replaceRange(LLRange range,
|
||||
boolean canKeysChange,
|
||||
Function<Entry<byte[], byte[]>, Mono<Entry<byte[], byte[]>>> entriesReplacer) {
|
||||
return replaceRange(range, canKeysChange, entriesReplacer, false);
|
||||
}
|
||||
|
||||
Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, LLRange range);
|
||||
|
||||
Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, LLRange range, boolean fast);
|
||||
|
@ -52,9 +52,9 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Map<T, U>> get(@Nullable CompositeSnapshot snapshot) {
|
||||
public Mono<Map<T, U>> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) {
|
||||
return dictionary
|
||||
.getRange(resolveSnapshot(snapshot), range)
|
||||
.getRange(resolveSnapshot(snapshot), range, existsAlmostCertainly)
|
||||
.collectMap(
|
||||
entry -> deserializeSuffix(stripPrefix(entry.getKey())),
|
||||
entry -> deserialize(entry.getValue()),
|
||||
@ -104,8 +104,10 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T keySuffix) {
|
||||
return dictionary.get(resolveSnapshot(snapshot), toKey(serializeSuffix(keySuffix))).map(this::deserialize);
|
||||
public Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) {
|
||||
return dictionary
|
||||
.get(resolveSnapshot(snapshot), toKey(serializeSuffix(keySuffix)), existsAlmostCertainly)
|
||||
.map(this::deserialize);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -114,9 +116,12 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Boolean> updateValue(T keySuffix, Function<Optional<U>, Optional<U>> updater) {
|
||||
public Mono<Boolean> updateValue(T keySuffix,
|
||||
boolean existsAlmostCertainly,
|
||||
Function<Optional<U>, Optional<U>> updater) {
|
||||
return dictionary.update(toKey(serializeSuffix(keySuffix)),
|
||||
oldSerialized -> updater.apply(oldSerialized.map(this::deserialize)).map(this::serialize)
|
||||
oldSerialized -> updater.apply(oldSerialized.map(this::deserialize)).map(this::serialize),
|
||||
existsAlmostCertainly
|
||||
);
|
||||
}
|
||||
|
||||
@ -154,9 +159,9 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Entry<T, U>> getMulti(@Nullable CompositeSnapshot snapshot, Flux<T> keys) {
|
||||
public Flux<Entry<T, U>> getMulti(@Nullable CompositeSnapshot snapshot, Flux<T> keys, boolean existsAlmostCertainly) {
|
||||
return dictionary
|
||||
.getMulti(resolveSnapshot(snapshot), keys.map(keySuffix -> toKey(serializeSuffix(keySuffix))))
|
||||
.getMulti(resolveSnapshot(snapshot), keys.map(keySuffix -> toKey(serializeSuffix(keySuffix))), existsAlmostCertainly)
|
||||
.map(entry -> Map.entry(deserializeSuffix(stripPrefix(entry.getKey())), deserialize(entry.getValue())));
|
||||
}
|
||||
|
||||
|
@ -32,8 +32,8 @@ public class DatabaseSingle<U> implements DatabaseStageEntry<U> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<U> get(@Nullable CompositeSnapshot snapshot) {
|
||||
return dictionary.get(resolveSnapshot(snapshot), key).map(this::deserialize);
|
||||
public Mono<U> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) {
|
||||
return dictionary.get(resolveSnapshot(snapshot), key, existsAlmostCertainly).map(this::deserialize);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -42,9 +42,10 @@ public class DatabaseSingle<U> implements DatabaseStageEntry<U> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Boolean> update(Function<Optional<U>, Optional<U>> updater) {
|
||||
public Mono<Boolean> update(Function<Optional<U>, Optional<U>> updater, boolean existsAlmostCertainly) {
|
||||
return dictionary.update(key,
|
||||
(oldValueSer) -> updater.apply(oldValueSer.map(this::deserialize)).map(this::serialize)
|
||||
(oldValueSer) -> updater.apply(oldValueSer.map(this::deserialize)).map(this::serialize),
|
||||
existsAlmostCertainly
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -18,8 +18,8 @@ public class DatabaseSingleMapped<U> implements DatabaseStageEntry<U> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<U> get(@Nullable CompositeSnapshot snapshot) {
|
||||
return serializedSingle.get(snapshot).map(this::deserialize);
|
||||
public Mono<U> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) {
|
||||
return serializedSingle.get(snapshot, existsAlmostCertainly).map(this::deserialize);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -43,8 +43,9 @@ public class DatabaseSingleMapped<U> implements DatabaseStageEntry<U> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Boolean> update(Function<Optional<U>, Optional<U>> updater) {
|
||||
return serializedSingle.update(oldValue -> updater.apply(oldValue.map(this::deserialize)).map(this::serialize));
|
||||
public Mono<Boolean> update(Function<Optional<U>, Optional<U>> updater, boolean existsAlmostCertainly) {
|
||||
return serializedSingle
|
||||
.update(oldValue -> updater.apply(oldValue.map(this::deserialize)).map(this::serialize), existsAlmostCertainly);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -9,10 +9,20 @@ import reactor.core.publisher.Mono;
|
||||
|
||||
public interface DatabaseStage<T> extends DatabaseStageWithEntry<T> {
|
||||
|
||||
Mono<T> get(@Nullable CompositeSnapshot snapshot);
|
||||
default Mono<T> get(@Nullable CompositeSnapshot snapshot) {
|
||||
return get(snapshot, false);
|
||||
}
|
||||
|
||||
Mono<T> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly);
|
||||
|
||||
default Mono<T> getOrDefault(@Nullable CompositeSnapshot snapshot,
|
||||
Mono<T> defaultValue,
|
||||
boolean existsAlmostCertainly) {
|
||||
return get(snapshot, existsAlmostCertainly).switchIfEmpty(defaultValue).single();
|
||||
}
|
||||
|
||||
default Mono<T> getOrDefault(@Nullable CompositeSnapshot snapshot, Mono<T> defaultValue) {
|
||||
return get(snapshot).switchIfEmpty(defaultValue).single();
|
||||
return getOrDefault(snapshot, defaultValue, false);
|
||||
}
|
||||
|
||||
default Mono<Void> set(T value) {
|
||||
@ -25,7 +35,11 @@ public interface DatabaseStage<T> extends DatabaseStageWithEntry<T> {
|
||||
return setAndGetPrevious(value).map(oldValue -> !Objects.equals(oldValue, value)).defaultIfEmpty(false);
|
||||
}
|
||||
|
||||
Mono<Boolean> update(Function<Optional<T>, Optional<T>> updater);
|
||||
Mono<Boolean> update(Function<Optional<T>, Optional<T>> updater, boolean existsAlmostCertainly);
|
||||
|
||||
default Mono<Boolean> update(Function<Optional<T>, Optional<T>> updater) {
|
||||
return update(updater, false);
|
||||
}
|
||||
|
||||
default Mono<Void> clear() {
|
||||
return clearAndGetStatus().then();
|
||||
|
@ -17,8 +17,12 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
|
||||
|
||||
Mono<US> at(@Nullable CompositeSnapshot snapshot, T key);
|
||||
|
||||
default Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T key, boolean existsAlmostCertainly) {
|
||||
return this.at(snapshot, key).flatMap(v -> v.get(snapshot, existsAlmostCertainly));
|
||||
}
|
||||
|
||||
default Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T key) {
|
||||
return this.at(snapshot, key).flatMap(v -> v.get(snapshot));
|
||||
return getValue(snapshot, key, false);
|
||||
}
|
||||
|
||||
default Mono<U> getValueOrDefault(@Nullable CompositeSnapshot snapshot, T key, Mono<U> defaultValue) {
|
||||
@ -29,8 +33,12 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
|
||||
return at(null, key).single().flatMap(v -> v.set(value));
|
||||
}
|
||||
|
||||
default Mono<Boolean> updateValue(T key, boolean existsAlmostCertainly, Function<Optional<U>, Optional<U>> updater) {
|
||||
return at(null, key).single().flatMap(v -> v.update(updater, existsAlmostCertainly));
|
||||
}
|
||||
|
||||
default Mono<Boolean> updateValue(T key, Function<Optional<U>, Optional<U>> updater) {
|
||||
return at(null, key).single().flatMap(v -> v.update(updater));
|
||||
return updateValue(key, false, updater);
|
||||
}
|
||||
|
||||
default Mono<U> putValueAndGetPrevious(T key, U value) {
|
||||
@ -53,8 +61,14 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
|
||||
return removeAndGetPrevious(key).map(o -> true).defaultIfEmpty(false);
|
||||
}
|
||||
|
||||
default Flux<Entry<T, U>> getMulti(@Nullable CompositeSnapshot snapshot, Flux<T> keys, boolean existsAlmostCertainly) {
|
||||
return keys.flatMapSequential(key -> this
|
||||
.getValue(snapshot, key, existsAlmostCertainly)
|
||||
.map(value -> Map.entry(key, value)));
|
||||
}
|
||||
|
||||
default Flux<Entry<T, U>> getMulti(@Nullable CompositeSnapshot snapshot, Flux<T> keys) {
|
||||
return keys.flatMapSequential(key -> this.getValue(snapshot, key).map(value -> Map.entry(key, value)));
|
||||
return getMulti(snapshot, keys, false);
|
||||
}
|
||||
|
||||
default Mono<Void> putMulti(Flux<Entry<T, U>> entries) {
|
||||
@ -68,7 +82,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
|
||||
.getAllStages(snapshot)
|
||||
.flatMapSequential(entry -> entry
|
||||
.getValue()
|
||||
.get(snapshot)
|
||||
.get(snapshot, true)
|
||||
.map(value -> Map.entry(entry.getKey(), value))
|
||||
);
|
||||
}
|
||||
@ -112,7 +126,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
|
||||
}
|
||||
|
||||
@Override
|
||||
default Mono<Boolean> update(Function<Optional<Map<T, U>>, Optional<Map<T, U>>> updater) {
|
||||
default Mono<Boolean> update(Function<Optional<Map<T, U>>, Optional<Map<T, U>>> updater, boolean existsAlmostCertainly) {
|
||||
return this
|
||||
.getAllValues(null)
|
||||
.collectMap(Entry::getKey, Entry::getValue, HashMap::new)
|
||||
@ -132,7 +146,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
|
||||
}
|
||||
|
||||
@Override
|
||||
default Mono<Map<T, U>> get(@Nullable CompositeSnapshot snapshot) {
|
||||
default Mono<Map<T, U>> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) {
|
||||
return getAllValues(snapshot)
|
||||
.collectMap(Entry::getKey, Entry::getValue, HashMap::new);
|
||||
}
|
||||
|
@ -122,7 +122,7 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key) {
|
||||
public Mono<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key, boolean existsAlmostCertainly) {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
StampedLock lock;
|
||||
@ -137,9 +137,9 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
}
|
||||
try {
|
||||
logger.trace("Reading {}", key);
|
||||
Holder<byte[]> data = new Holder<>();
|
||||
if (db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) {
|
||||
if (data.getValue() != null) {
|
||||
Holder<byte[]> data = existsAlmostCertainly ? null : new Holder<>();
|
||||
if (existsAlmostCertainly || db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) {
|
||||
if (!existsAlmostCertainly && data.getValue() != null) {
|
||||
return data.getValue();
|
||||
} else {
|
||||
return db.get(cfh, resolveSnapshot(snapshot), key);
|
||||
@ -253,7 +253,9 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Boolean> update(byte[] key, Function<Optional<byte[]>, Optional<byte[]>> value) {
|
||||
public Mono<Boolean> update(byte[] key,
|
||||
Function<Optional<byte[]>, Optional<byte[]>> value,
|
||||
boolean existsAlmostCertainly) {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
if (updateMode == UpdateMode.DISALLOW) throw new UnsupportedOperationException("update() is disallowed");
|
||||
@ -272,9 +274,9 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
while (true) {
|
||||
boolean changed = false;
|
||||
Optional<byte[]> prevData;
|
||||
var prevDataHolder = new Holder<byte[]>();
|
||||
if (db.keyMayExist(cfh, key, prevDataHolder)) {
|
||||
if (prevDataHolder.getValue() != null) {
|
||||
var prevDataHolder = existsAlmostCertainly ? null : new Holder<byte[]>();
|
||||
if (existsAlmostCertainly || db.keyMayExist(cfh, key, prevDataHolder)) {
|
||||
if (!existsAlmostCertainly && prevDataHolder.getValue() != null) {
|
||||
prevData = Optional.ofNullable(prevDataHolder.getValue());
|
||||
} else {
|
||||
prevData = Optional.ofNullable(db.get(cfh, key));
|
||||
@ -403,7 +405,9 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Entry<byte[], byte[]>> getMulti(@Nullable LLSnapshot snapshot, Flux<byte[]> keys) {
|
||||
public Flux<Entry<byte[], byte[]>> getMulti(@Nullable LLSnapshot snapshot,
|
||||
Flux<byte[]> keys,
|
||||
boolean existsAlmostCertainly) {
|
||||
return keys
|
||||
.window(MULTI_GET_WINDOW)
|
||||
.flatMap(keysWindowFlux -> keysWindowFlux.collectList()
|
||||
@ -460,7 +464,7 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
.window(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
|
||||
.flatMap(Flux::collectList)
|
||||
.flatMap(entriesWindow -> this
|
||||
.getMulti(null, Flux.fromIterable(entriesWindow).map(Entry::getKey))
|
||||
.getMulti(null, Flux.fromIterable(entriesWindow).map(Entry::getKey), false)
|
||||
.publishOn(dbScheduler)
|
||||
.concatWith(Mono.fromCallable(() -> {
|
||||
Iterable<StampedLock> locks;
|
||||
@ -502,11 +506,13 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
|
||||
|
||||
@NotNull
|
||||
private Mono<Entry<byte[], byte[]>> putEntryToWriteBatch(Entry<byte[], byte[]> newEntry, boolean getOldValues,
|
||||
private Mono<Entry<byte[], byte[]>> putEntryToWriteBatch(Entry<byte[], byte[]> newEntry,
|
||||
boolean getOldValues,
|
||||
boolean existsAlmostCertainly,
|
||||
CappedWriteBatch writeBatch) {
|
||||
Mono<byte[]> getOldValueMono;
|
||||
if (getOldValues) {
|
||||
getOldValueMono = get(null, newEntry.getKey());
|
||||
getOldValueMono = get(null, newEntry.getKey(), existsAlmostCertainly);
|
||||
} else {
|
||||
getOldValueMono = Mono.empty();
|
||||
}
|
||||
@ -523,9 +529,11 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Entry<byte[], byte[]>> getRange(@Nullable LLSnapshot snapshot, LLRange range) {
|
||||
public Flux<Entry<byte[], byte[]>> getRange(@Nullable LLSnapshot snapshot,
|
||||
LLRange range,
|
||||
boolean existsAlmostCertainly) {
|
||||
if (range.isSingle()) {
|
||||
return getRangeSingle(snapshot, range.getMin());
|
||||
return getRangeSingle(snapshot, range.getMin(), existsAlmostCertainly);
|
||||
} else {
|
||||
return getRangeMulti(snapshot, range);
|
||||
}
|
||||
@ -534,17 +542,17 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
@Override
|
||||
public Flux<List<Entry<byte[], byte[]>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
|
||||
LLRange range,
|
||||
int prefixLength) {
|
||||
int prefixLength, boolean existsAlmostCertainly) {
|
||||
if (range.isSingle()) {
|
||||
return getRangeSingle(snapshot, range.getMin()).map(List::of);
|
||||
return getRangeSingle(snapshot, range.getMin(), existsAlmostCertainly).map(List::of);
|
||||
} else {
|
||||
return getRangeMultiGrouped(snapshot, range, prefixLength);
|
||||
}
|
||||
}
|
||||
|
||||
private Flux<Entry<byte[],byte[]>> getRangeSingle(LLSnapshot snapshot, byte[] key) {
|
||||
private Flux<Entry<byte[],byte[]>> getRangeSingle(LLSnapshot snapshot, byte[] key, boolean existsAlmostCertainly) {
|
||||
return this
|
||||
.get(snapshot, key)
|
||||
.get(snapshot, key, existsAlmostCertainly)
|
||||
.map(value -> Map.entry(key, value))
|
||||
.flux();
|
||||
}
|
||||
@ -660,7 +668,7 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
})
|
||||
.subscribeOn(dbScheduler)
|
||||
.thenMany(entries)
|
||||
.flatMapSequential(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, writeBatch)),
|
||||
.flatMapSequential(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, false, writeBatch)),
|
||||
writeBatch -> Mono
|
||||
.fromCallable(() -> {
|
||||
try (writeBatch) {
|
||||
|
Loading…
Reference in New Issue
Block a user