From fb19a7a9f3c8de34f7c7747877e343d06a0fcf16 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 26 Jan 2022 19:56:51 +0100 Subject: [PATCH] Optimize some methods --- .../database/disk/LLLocalDictionary.java | 185 ++++++++---------- 1 file changed, 82 insertions(+), 103 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index ee44262..b9a53fd 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -14,7 +14,6 @@ import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.Send; import io.net5.buffer.api.internal.ResourceSupport; -import io.net5.util.internal.PlatformDependent; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.database.Column; @@ -38,10 +37,8 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.Callable; -import java.util.concurrent.CompletionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; -import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.IntStream; import org.apache.commons.lang3.tuple.Pair; @@ -781,38 +778,33 @@ public class LLLocalDictionary implements LLDictionary { public Flux> getRange(@Nullable LLSnapshot snapshot, Mono> rangeMono, boolean existsAlmostCertainly) { - return Flux.usingWhen(rangeMono, - rangeSend -> { - try (var range = rangeSend.receive()) { - if (range.isSingle()) { - var rangeSingleMono = rangeMono.map(r -> r.receive().getSingle()); - return getRangeSingle(snapshot, rangeSingleMono, existsAlmostCertainly); - } else { - return getRangeMulti(snapshot, rangeMono); - } - } - }, - rangeSend -> Mono.fromRunnable(rangeSend::close) - ); + return rangeMono.flatMapMany(rangeSend -> { + try (var range = rangeSend.receive()) { + if (range.isSingle()) { + var rangeSingleMono = rangeMono.map(r -> r.receive().getSingle()); + return getRangeSingle(snapshot, rangeSingleMono, existsAlmostCertainly); + } else { + return getRangeMulti(snapshot, rangeMono); + } + } + }); } @Override public Flux>> getRangeGrouped(@Nullable LLSnapshot snapshot, Mono> rangeMono, - int prefixLength, boolean existsAlmostCertainly) { - return Flux.usingWhen(rangeMono, - rangeSend -> { - try (var range = rangeSend.receive()) { - if (range.isSingle()) { - var rangeSingleMono = rangeMono.map(r -> r.receive().getSingle()); - return getRangeSingle(snapshot, rangeSingleMono, existsAlmostCertainly).map(List::of); - } else { - return getRangeMultiGrouped(snapshot, rangeMono, prefixLength); - } - } - }, - rangeSend -> Mono.fromRunnable(rangeSend::close) - ); + int prefixLength, + boolean existsAlmostCertainly) { + return rangeMono.flatMapMany(rangeSend -> { + try (var range = rangeSend.receive()) { + if (range.isSingle()) { + var rangeSingleMono = rangeMono.map(r -> r.receive().getSingle()); + return getRangeSingle(snapshot, rangeSingleMono, existsAlmostCertainly).map(List::of); + } else { + return getRangeMultiGrouped(snapshot, rangeMono, prefixLength); + } + } + }); } private Flux> getRangeSingle(LLSnapshot snapshot, @@ -825,58 +817,53 @@ public class LLLocalDictionary implements LLDictionary { } private Flux> getRangeMulti(LLSnapshot snapshot, Mono> rangeMono) { - return Flux.usingWhen(rangeMono, - rangeSend -> Flux.using( - () -> new LLLocalEntryReactiveRocksIterator(db, rangeSend, - nettyDirect, resolveSnapshot(snapshot)), - iterator -> iterator.flux().subscribeOn(dbScheduler, false), - LLLocalReactiveRocksIterator::close - ), - rangeSend -> Mono.fromRunnable(rangeSend::close) + Mono iteratorMono = rangeMono.map(rangeSend -> { + ReadOptions resolvedSnapshot = resolveSnapshot(snapshot); + return new LLLocalEntryReactiveRocksIterator(db, rangeSend, nettyDirect, resolvedSnapshot); + }); + return Flux.usingWhen(iteratorMono, + iterator -> iterator.flux().subscribeOn(dbScheduler, false), + iterator -> Mono.fromRunnable(iterator::close) ); } private Flux>> getRangeMultiGrouped(LLSnapshot snapshot, Mono> rangeMono, int prefixLength) { - return Flux.usingWhen(rangeMono, - rangeSend -> Flux.using( - () -> new LLLocalGroupedEntryReactiveRocksIterator(db, prefixLength, rangeSend, - nettyDirect, resolveSnapshot(snapshot)), - iterator -> iterator.flux().subscribeOn(dbScheduler, false), - LLLocalGroupedReactiveRocksIterator::close - ), - rangeSend -> Mono.fromRunnable(rangeSend::close) + Mono iteratorMono = rangeMono.map(rangeSend -> { + ReadOptions resolvedSnapshot = resolveSnapshot(snapshot); + return new LLLocalGroupedEntryReactiveRocksIterator(db, prefixLength, rangeSend, nettyDirect, resolvedSnapshot); + }); + return Flux.usingWhen( + iteratorMono, + iterator -> iterator.flux().subscribeOn(dbScheduler, false), + iterator -> Mono.fromRunnable(iterator::close) ); } @Override public Flux> getRangeKeys(@Nullable LLSnapshot snapshot, Mono> rangeMono) { - return Flux.usingWhen(rangeMono, - rangeSend -> { - try (var range = rangeSend.receive()) { - if (range.isSingle()) { - return this.getRangeKeysSingle(snapshot, rangeMono.map(r -> r.receive().getSingle())); - } else { - return this.getRangeKeysMulti(snapshot, rangeMono); - } - } - }, - rangeSend -> Mono.fromRunnable(rangeSend::close) - ); + return rangeMono.flatMapMany(rangeSend -> { + try (var range = rangeSend.receive()) { + if (range.isSingle()) { + return this.getRangeKeysSingle(snapshot, rangeMono.map(r -> r.receive().getSingle())); + } else { + return this.getRangeKeysMulti(snapshot, rangeMono); + } + } + }); } @Override public Flux>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, Mono> rangeMono, int prefixLength) { - return Flux.usingWhen(rangeMono, - rangeSend -> Flux.using( - () -> new LLLocalGroupedKeyReactiveRocksIterator(db, prefixLength, rangeSend, - nettyDirect, resolveSnapshot(snapshot)), - iterator -> iterator.flux().subscribeOn(dbScheduler, false), - LLLocalGroupedReactiveRocksIterator::close - ), - rangeSend -> Mono.fromRunnable(rangeSend::close) + Mono iteratorMono = rangeMono.map(rangeSend -> { + ReadOptions resolvedSnapshot = resolveSnapshot(snapshot); + return new LLLocalGroupedKeyReactiveRocksIterator(db, prefixLength, rangeSend, nettyDirect, resolvedSnapshot); + }); + return Flux.usingWhen(iteratorMono, + iterator -> iterator.flux().subscribeOn(dbScheduler, false), + iterator -> Mono.fromRunnable(iterator::close) ); } @@ -923,55 +910,47 @@ public class LLLocalDictionary implements LLDictionary { @Override public Flux> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono> rangeMono, int prefixLength) { - return Flux.usingWhen(rangeMono, - rangeSend -> Flux - .using( - () -> new LLLocalKeyPrefixReactiveRocksIterator(db, - prefixLength, - rangeSend, - nettyDirect, - resolveSnapshot(snapshot), - true - ), - LLLocalKeyPrefixReactiveRocksIterator::flux, - LLLocalKeyPrefixReactiveRocksIterator::close - ) - .subscribeOn(dbScheduler), - rangeSend -> Mono.fromRunnable(rangeSend::close) + Mono iteratorMono = rangeMono.map(range -> { + ReadOptions resolvedSnapshot = resolveSnapshot(snapshot); + return new LLLocalKeyPrefixReactiveRocksIterator(db, prefixLength, range, nettyDirect, resolvedSnapshot, true); + }); + return Flux.usingWhen(iteratorMono, + iterator -> iterator.flux().subscribeOn(dbScheduler), + iterator -> Mono.fromRunnable(iterator::close) ); } private Flux> getRangeKeysSingle(LLSnapshot snapshot, Mono> keyMono) { - return Flux.usingWhen(keyMono, - keySend -> this - .containsKey(snapshot, keyMono) - .>handle((contains, sink) -> { - if (contains) { - sink.next(keySend); - } else { - sink.complete(); - } - }) - .flux(), - keySend -> Mono.fromRunnable(keySend::close) - ); + return keyMono + .publishOn(dbScheduler) + .>handle((keySend, sink) -> { + try (var key = keySend.receive()) { + if (containsKey(snapshot, key)) { + sink.next(key.send()); + } else { + sink.complete(); + } + } catch (Throwable ex) { + sink.error(ex); + } + }) + .flux(); } private Flux> getRangeKeysMulti(LLSnapshot snapshot, Mono> rangeMono) { - return Flux.usingWhen(rangeMono, - rangeSend -> Flux.using( - () -> new LLLocalKeyReactiveRocksIterator(db, rangeSend, - nettyDirect, resolveSnapshot(snapshot) - ), - iterator -> iterator.flux().subscribeOn(dbScheduler, false), - LLLocalReactiveRocksIterator::close - ), - rangeSend -> Mono.fromRunnable(rangeSend::close) + Mono iteratorMono = rangeMono.map(range -> { + ReadOptions resolvedSnapshot = resolveSnapshot(snapshot); + return new LLLocalKeyReactiveRocksIterator(db, range, nettyDirect, resolvedSnapshot); + }); + return Flux.usingWhen(iteratorMono, + iterator -> iterator.flux().subscribeOn(dbScheduler, false), + iterator -> Mono.fromRunnable(iterator::close) ); } @Override public Mono setRange(Mono> rangeMono, Flux> entries) { + //todo: change usingWhen and use a better alternative return Mono.usingWhen(rangeMono, rangeSend -> { if (USE_WINDOW_IN_SET_RANGE) {