Optimize some methods

This commit is contained in:
Andrea Cavalli 2022-01-26 19:56:51 +01:00
parent 95afa6f9dd
commit fb19a7a9f3

View File

@ -14,7 +14,6 @@ import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport; import io.net5.buffer.api.internal.ResourceSupport;
import io.net5.util.internal.PlatformDependent;
import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.Column;
@ -38,10 +37,8 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask; import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
@ -781,38 +778,33 @@ public class LLLocalDictionary implements LLDictionary {
public Flux<Send<LLEntry>> getRange(@Nullable LLSnapshot snapshot, public Flux<Send<LLEntry>> getRange(@Nullable LLSnapshot snapshot,
Mono<Send<LLRange>> rangeMono, Mono<Send<LLRange>> rangeMono,
boolean existsAlmostCertainly) { boolean existsAlmostCertainly) {
return Flux.usingWhen(rangeMono, return rangeMono.flatMapMany(rangeSend -> {
rangeSend -> { try (var range = rangeSend.receive()) {
try (var range = rangeSend.receive()) { if (range.isSingle()) {
if (range.isSingle()) { var rangeSingleMono = rangeMono.map(r -> r.receive().getSingle());
var rangeSingleMono = rangeMono.map(r -> r.receive().getSingle()); return getRangeSingle(snapshot, rangeSingleMono, existsAlmostCertainly);
return getRangeSingle(snapshot, rangeSingleMono, existsAlmostCertainly); } else {
} else { return getRangeMulti(snapshot, rangeMono);
return getRangeMulti(snapshot, rangeMono); }
} }
} });
},
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
} }
@Override @Override
public Flux<List<Send<LLEntry>>> getRangeGrouped(@Nullable LLSnapshot snapshot, public Flux<List<Send<LLEntry>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
Mono<Send<LLRange>> rangeMono, Mono<Send<LLRange>> rangeMono,
int prefixLength, boolean existsAlmostCertainly) { int prefixLength,
return Flux.usingWhen(rangeMono, boolean existsAlmostCertainly) {
rangeSend -> { return rangeMono.flatMapMany(rangeSend -> {
try (var range = rangeSend.receive()) { try (var range = rangeSend.receive()) {
if (range.isSingle()) { if (range.isSingle()) {
var rangeSingleMono = rangeMono.map(r -> r.receive().getSingle()); var rangeSingleMono = rangeMono.map(r -> r.receive().getSingle());
return getRangeSingle(snapshot, rangeSingleMono, existsAlmostCertainly).map(List::of); return getRangeSingle(snapshot, rangeSingleMono, existsAlmostCertainly).map(List::of);
} else { } else {
return getRangeMultiGrouped(snapshot, rangeMono, prefixLength); return getRangeMultiGrouped(snapshot, rangeMono, prefixLength);
} }
} }
}, });
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
} }
private Flux<Send<LLEntry>> getRangeSingle(LLSnapshot snapshot, private Flux<Send<LLEntry>> getRangeSingle(LLSnapshot snapshot,
@ -825,58 +817,53 @@ public class LLLocalDictionary implements LLDictionary {
} }
private Flux<Send<LLEntry>> getRangeMulti(LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) { private Flux<Send<LLEntry>> getRangeMulti(LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
return Flux.usingWhen(rangeMono, Mono<LLLocalEntryReactiveRocksIterator> iteratorMono = rangeMono.map(rangeSend -> {
rangeSend -> Flux.using( ReadOptions resolvedSnapshot = resolveSnapshot(snapshot);
() -> new LLLocalEntryReactiveRocksIterator(db, rangeSend, return new LLLocalEntryReactiveRocksIterator(db, rangeSend, nettyDirect, resolvedSnapshot);
nettyDirect, resolveSnapshot(snapshot)), });
iterator -> iterator.flux().subscribeOn(dbScheduler, false), return Flux.usingWhen(iteratorMono,
LLLocalReactiveRocksIterator::close iterator -> iterator.flux().subscribeOn(dbScheduler, false),
), iterator -> Mono.fromRunnable(iterator::close)
rangeSend -> Mono.fromRunnable(rangeSend::close)
); );
} }
private Flux<List<Send<LLEntry>>> getRangeMultiGrouped(LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, private Flux<List<Send<LLEntry>>> getRangeMultiGrouped(LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono,
int prefixLength) { int prefixLength) {
return Flux.usingWhen(rangeMono, Mono<LLLocalGroupedEntryReactiveRocksIterator> iteratorMono = rangeMono.map(rangeSend -> {
rangeSend -> Flux.using( ReadOptions resolvedSnapshot = resolveSnapshot(snapshot);
() -> new LLLocalGroupedEntryReactiveRocksIterator(db, prefixLength, rangeSend, return new LLLocalGroupedEntryReactiveRocksIterator(db, prefixLength, rangeSend, nettyDirect, resolvedSnapshot);
nettyDirect, resolveSnapshot(snapshot)), });
iterator -> iterator.flux().subscribeOn(dbScheduler, false), return Flux.usingWhen(
LLLocalGroupedReactiveRocksIterator::close iteratorMono,
), iterator -> iterator.flux().subscribeOn(dbScheduler, false),
rangeSend -> Mono.fromRunnable(rangeSend::close) iterator -> Mono.fromRunnable(iterator::close)
); );
} }
@Override @Override
public Flux<Send<Buffer>> getRangeKeys(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) { public Flux<Send<Buffer>> getRangeKeys(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
return Flux.usingWhen(rangeMono, return rangeMono.flatMapMany(rangeSend -> {
rangeSend -> { try (var range = rangeSend.receive()) {
try (var range = rangeSend.receive()) { if (range.isSingle()) {
if (range.isSingle()) { return this.getRangeKeysSingle(snapshot, rangeMono.map(r -> r.receive().getSingle()));
return this.getRangeKeysSingle(snapshot, rangeMono.map(r -> r.receive().getSingle())); } else {
} else { return this.getRangeKeysMulti(snapshot, rangeMono);
return this.getRangeKeysMulti(snapshot, rangeMono); }
} }
} });
},
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
} }
@Override @Override
public Flux<List<Send<Buffer>>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, public Flux<List<Send<Buffer>>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot,
Mono<Send<LLRange>> rangeMono, Mono<Send<LLRange>> rangeMono,
int prefixLength) { int prefixLength) {
return Flux.usingWhen(rangeMono, Mono<LLLocalGroupedKeyReactiveRocksIterator> iteratorMono = rangeMono.map(rangeSend -> {
rangeSend -> Flux.using( ReadOptions resolvedSnapshot = resolveSnapshot(snapshot);
() -> new LLLocalGroupedKeyReactiveRocksIterator(db, prefixLength, rangeSend, return new LLLocalGroupedKeyReactiveRocksIterator(db, prefixLength, rangeSend, nettyDirect, resolvedSnapshot);
nettyDirect, resolveSnapshot(snapshot)), });
iterator -> iterator.flux().subscribeOn(dbScheduler, false), return Flux.usingWhen(iteratorMono,
LLLocalGroupedReactiveRocksIterator::close iterator -> iterator.flux().subscribeOn(dbScheduler, false),
), iterator -> Mono.fromRunnable(iterator::close)
rangeSend -> Mono.fromRunnable(rangeSend::close)
); );
} }
@ -923,55 +910,47 @@ public class LLLocalDictionary implements LLDictionary {
@Override @Override
public Flux<Send<Buffer>> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, public Flux<Send<Buffer>> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono,
int prefixLength) { int prefixLength) {
return Flux.usingWhen(rangeMono, Mono<LLLocalKeyPrefixReactiveRocksIterator> iteratorMono = rangeMono.map(range -> {
rangeSend -> Flux ReadOptions resolvedSnapshot = resolveSnapshot(snapshot);
.using( return new LLLocalKeyPrefixReactiveRocksIterator(db, prefixLength, range, nettyDirect, resolvedSnapshot, true);
() -> new LLLocalKeyPrefixReactiveRocksIterator(db, });
prefixLength, return Flux.usingWhen(iteratorMono,
rangeSend, iterator -> iterator.flux().subscribeOn(dbScheduler),
nettyDirect, iterator -> Mono.fromRunnable(iterator::close)
resolveSnapshot(snapshot),
true
),
LLLocalKeyPrefixReactiveRocksIterator::flux,
LLLocalKeyPrefixReactiveRocksIterator::close
)
.subscribeOn(dbScheduler),
rangeSend -> Mono.fromRunnable(rangeSend::close)
); );
} }
private Flux<Send<Buffer>> getRangeKeysSingle(LLSnapshot snapshot, Mono<Send<Buffer>> keyMono) { private Flux<Send<Buffer>> getRangeKeysSingle(LLSnapshot snapshot, Mono<Send<Buffer>> keyMono) {
return Flux.usingWhen(keyMono, return keyMono
keySend -> this .publishOn(dbScheduler)
.containsKey(snapshot, keyMono) .<Send<Buffer>>handle((keySend, sink) -> {
.<Send<Buffer>>handle((contains, sink) -> { try (var key = keySend.receive()) {
if (contains) { if (containsKey(snapshot, key)) {
sink.next(keySend); sink.next(key.send());
} else { } else {
sink.complete(); sink.complete();
} }
}) } catch (Throwable ex) {
.flux(), sink.error(ex);
keySend -> Mono.fromRunnable(keySend::close) }
); })
.flux();
} }
private Flux<Send<Buffer>> getRangeKeysMulti(LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) { private Flux<Send<Buffer>> getRangeKeysMulti(LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
return Flux.usingWhen(rangeMono, Mono<LLLocalKeyReactiveRocksIterator> iteratorMono = rangeMono.map(range -> {
rangeSend -> Flux.using( ReadOptions resolvedSnapshot = resolveSnapshot(snapshot);
() -> new LLLocalKeyReactiveRocksIterator(db, rangeSend, return new LLLocalKeyReactiveRocksIterator(db, range, nettyDirect, resolvedSnapshot);
nettyDirect, resolveSnapshot(snapshot) });
), return Flux.usingWhen(iteratorMono,
iterator -> iterator.flux().subscribeOn(dbScheduler, false), iterator -> iterator.flux().subscribeOn(dbScheduler, false),
LLLocalReactiveRocksIterator::close iterator -> Mono.fromRunnable(iterator::close)
),
rangeSend -> Mono.fromRunnable(rangeSend::close)
); );
} }
@Override @Override
public Mono<Void> setRange(Mono<Send<LLRange>> rangeMono, Flux<Send<LLEntry>> entries) { public Mono<Void> setRange(Mono<Send<LLRange>> rangeMono, Flux<Send<LLEntry>> entries) {
//todo: change usingWhen and use a better alternative
return Mono.usingWhen(rangeMono, return Mono.usingWhen(rangeMono,
rangeSend -> { rangeSend -> {
if (USE_WINDOW_IN_SET_RANGE) { if (USE_WINDOW_IN_SET_RANGE) {