Add the possibility to iterate only a slice of the database
This commit is contained in:
parent
81b26eed82
commit
39811dc3f3
@ -72,13 +72,22 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
|||||||
|
|
||||||
public static <K, V> Flux<Entry<K, V>> getLeavesFrom(DatabaseMapDictionary<K, V> databaseMapDictionary,
|
public static <K, V> Flux<Entry<K, V>> getLeavesFrom(DatabaseMapDictionary<K, V> databaseMapDictionary,
|
||||||
CompositeSnapshot snapshot,
|
CompositeSnapshot snapshot,
|
||||||
Mono<K> key,
|
Mono<K> keyMin,
|
||||||
|
Mono<K> keyMax,
|
||||||
boolean reverse, boolean smallRange) {
|
boolean reverse, boolean smallRange) {
|
||||||
Mono<Optional<K>> keyOptMono = key.map(Optional::of).defaultIfEmpty(Optional.empty());
|
Mono<Optional<K>> keyMinOptMono = keyMin.map(Optional::of).defaultIfEmpty(Optional.empty());
|
||||||
|
Mono<Optional<K>> keyMaxOptMono = keyMax.map(Optional::of).defaultIfEmpty(Optional.empty());
|
||||||
|
|
||||||
return keyOptMono.flatMapMany(keyOpt -> {
|
return Mono.zip(keyMinOptMono, keyMaxOptMono).flatMapMany(entry -> {
|
||||||
if (keyOpt.isPresent()) {
|
var keyMinOpt = entry.getT1();
|
||||||
return databaseMapDictionary.getAllValues(snapshot, keyOpt.get(), reverse, smallRange);
|
var keyMaxOpt = entry.getT2();
|
||||||
|
if (keyMinOpt.isPresent() || keyMaxOpt.isPresent()) {
|
||||||
|
return databaseMapDictionary.getAllValues(snapshot,
|
||||||
|
keyMinOpt.orElse(null),
|
||||||
|
keyMaxOpt.orElse(null),
|
||||||
|
reverse,
|
||||||
|
smallRange
|
||||||
|
);
|
||||||
} else {
|
} else {
|
||||||
return databaseMapDictionary.getAllValues(snapshot, smallRange);
|
return databaseMapDictionary.getAllValues(snapshot, smallRange);
|
||||||
}
|
}
|
||||||
@ -87,15 +96,19 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
|||||||
|
|
||||||
public static <K> Flux<K> getKeyLeavesFrom(DatabaseMapDictionary<K, ?> databaseMapDictionary,
|
public static <K> Flux<K> getKeyLeavesFrom(DatabaseMapDictionary<K, ?> databaseMapDictionary,
|
||||||
CompositeSnapshot snapshot,
|
CompositeSnapshot snapshot,
|
||||||
Mono<K> key,
|
Mono<K> keyMin,
|
||||||
|
Mono<K> keyMax,
|
||||||
boolean reverse, boolean smallRange) {
|
boolean reverse, boolean smallRange) {
|
||||||
Mono<Optional<K>> keyOptMono = key.map(Optional::of).defaultIfEmpty(Optional.empty());
|
Mono<Optional<K>> keyMinOptMono = keyMin.map(Optional::of).defaultIfEmpty(Optional.empty());
|
||||||
|
Mono<Optional<K>> keyMaxOptMono = keyMax.map(Optional::of).defaultIfEmpty(Optional.empty());
|
||||||
|
|
||||||
return keyOptMono.flatMapMany(keyOpt -> {
|
return Mono.zip(keyMinOptMono, keyMaxOptMono).flatMapMany(keys -> {
|
||||||
|
var keyMinOpt = keys.getT1();
|
||||||
|
var keyMaxOpt = keys.getT2();
|
||||||
Flux<? extends Entry<K, ? extends DatabaseStageEntry<?>>> stagesFlux;
|
Flux<? extends Entry<K, ? extends DatabaseStageEntry<?>>> stagesFlux;
|
||||||
if (keyOpt.isPresent()) {
|
if (keyMinOpt.isPresent() || keyMaxOpt.isPresent()) {
|
||||||
stagesFlux = databaseMapDictionary
|
stagesFlux = databaseMapDictionary
|
||||||
.getAllStages(snapshot, keyOpt.get(), reverse, smallRange);
|
.getAllStages(snapshot, keyMinOpt.orElse(null), keyMaxOpt.orElse(null), reverse, smallRange);
|
||||||
} else {
|
} else {
|
||||||
stagesFlux = databaseMapDictionary.getAllStages(snapshot, smallRange);
|
stagesFlux = databaseMapDictionary.getAllStages(snapshot, smallRange);
|
||||||
}
|
}
|
||||||
@ -483,34 +496,63 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
|||||||
return getAllStages(snapshot, rangeMono, false, smallRange);
|
return getAllStages(snapshot, rangeMono, false, smallRange);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Send<LLRange> getPatchedRange(@NotNull Send<LLRange> rangeSend, @Nullable T keyMin, @Nullable T keyMax)
|
||||||
|
throws SerializationException {
|
||||||
|
try (var range = rangeSend.receive()) {
|
||||||
|
try (Send<Buffer> keyMinBuf = serializeSuffixForRange(keyMin)) {
|
||||||
|
try (Send<Buffer> keyMaxBuf = serializeSuffixForRange(keyMax)) {
|
||||||
|
Send<Buffer> keyMinBufSend;
|
||||||
|
if (keyMinBuf == null) {
|
||||||
|
keyMinBufSend = range.getMin();
|
||||||
|
} else {
|
||||||
|
keyMinBufSend = keyMinBuf;
|
||||||
|
}
|
||||||
|
Send<Buffer> keyMaxBufSend;
|
||||||
|
if (keyMaxBuf == null) {
|
||||||
|
keyMaxBufSend = range.getMax();
|
||||||
|
} else {
|
||||||
|
keyMaxBufSend = keyMaxBuf;
|
||||||
|
}
|
||||||
|
return LLRange.of(keyMinBufSend, keyMaxBufSend).send();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Send<Buffer> serializeSuffixForRange(@Nullable T key) throws SerializationException {
|
||||||
|
if (key == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
try (var keyWithoutExtBuf = keyPrefix == null ? alloc.allocate(keySuffixLength + keyExtLength)
|
||||||
|
// todo: use a read-only copy
|
||||||
|
: keyPrefix.copy()) {
|
||||||
|
keyWithoutExtBuf.ensureWritable(keySuffixLength + keyExtLength);
|
||||||
|
serializeSuffix(key, keyWithoutExtBuf);
|
||||||
|
return keyWithoutExtBuf.send();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get all stages
|
* Get all stages
|
||||||
* @param key from/to the specified key, if not null
|
|
||||||
* @param reverse if true, the results will go backwards from the specified key (inclusive)
|
* @param reverse if true, the results will go backwards from the specified key (inclusive)
|
||||||
* @param smallRange
|
* @param smallRange
|
||||||
*/
|
*/
|
||||||
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot,
|
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot,
|
||||||
@Nullable T key,
|
@Nullable T keyMin,
|
||||||
|
@Nullable T keyMax,
|
||||||
boolean reverse,
|
boolean reverse,
|
||||||
boolean smallRange) {
|
boolean smallRange) {
|
||||||
if (key == null) {
|
if (keyMin == null && keyMax == null) {
|
||||||
return getAllStages(snapshot, smallRange);
|
return getAllStages(snapshot, smallRange);
|
||||||
} else {
|
} else {
|
||||||
Mono<Send<LLRange>> boundedRangeMono = rangeMono.flatMap(fullRangeSend -> Mono.fromCallable(() -> {
|
Mono<Send<LLRange>> boundedRangeMono = rangeMono
|
||||||
try (var fullRange = fullRangeSend.receive()) {
|
.handle((fullRangeSend, sink) -> {
|
||||||
try (var keyWithoutExtBuf = keyPrefix == null ? alloc.allocate(keySuffixLength + keyExtLength)
|
try {
|
||||||
// todo: use a read-only copy
|
sink.next(getPatchedRange(fullRangeSend, keyMin, keyMax));
|
||||||
: keyPrefix.copy()) {
|
} catch (SerializationException e) {
|
||||||
keyWithoutExtBuf.ensureWritable(keySuffixLength + keyExtLength);
|
sink.error(e);
|
||||||
serializeSuffix(key, keyWithoutExtBuf);
|
|
||||||
if (reverse) {
|
|
||||||
return LLRange.of(fullRange.getMin(), keyWithoutExtBuf.send()).send();
|
|
||||||
} else {
|
|
||||||
return LLRange.of(keyWithoutExtBuf.send(), fullRange.getMax()).send();
|
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
}
|
|
||||||
}));
|
|
||||||
return getAllStages(snapshot, boundedRangeMono, reverse, smallRange);
|
return getAllStages(snapshot, boundedRangeMono, reverse, smallRange);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -546,32 +588,25 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Get all values
|
* Get all values
|
||||||
* @param key from/to the specified key, if not null
|
|
||||||
* @param reverse if true, the results will go backwards from the specified key (inclusive)
|
* @param reverse if true, the results will go backwards from the specified key (inclusive)
|
||||||
* @param smallRange
|
* @param smallRange
|
||||||
*/
|
*/
|
||||||
public Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot,
|
public Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot,
|
||||||
@Nullable T key,
|
@Nullable T keyMin,
|
||||||
|
@Nullable T keyMax,
|
||||||
boolean reverse,
|
boolean reverse,
|
||||||
boolean smallRange) {
|
boolean smallRange) {
|
||||||
if (key == null) {
|
if (keyMin == null && keyMax == null) {
|
||||||
return getAllValues(snapshot, smallRange);
|
return getAllValues(snapshot, smallRange);
|
||||||
} else {
|
} else {
|
||||||
Mono<Send<LLRange>> boundedRangeMono = rangeMono.flatMap(fullRangeSend -> Mono.fromCallable(() -> {
|
Mono<Send<LLRange>> boundedRangeMono = rangeMono
|
||||||
try (var fullRange = fullRangeSend.receive()) {
|
.handle((fullRangeSend, sink) -> {
|
||||||
try (var keyWithoutExtBuf = keyPrefix == null ? alloc.allocate(keySuffixLength + keyExtLength)
|
try {
|
||||||
// todo: use a read-only copy
|
sink.next(getPatchedRange(fullRangeSend, keyMin, keyMax));
|
||||||
: keyPrefix.copy()) {
|
} catch (SerializationException e) {
|
||||||
keyWithoutExtBuf.ensureWritable(keySuffixLength + keyExtLength);
|
sink.error(e);
|
||||||
serializeSuffix(key, keyWithoutExtBuf);
|
|
||||||
if (reverse) {
|
|
||||||
return LLRange.of(fullRange.getMin(), keyWithoutExtBuf.send()).send();
|
|
||||||
} else {
|
|
||||||
return LLRange.of(keyWithoutExtBuf.send(), fullRange.getMax()).send();
|
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
}
|
|
||||||
}));
|
|
||||||
return getAllValues(snapshot, boundedRangeMono, reverse, smallRange);
|
return getAllValues(snapshot, boundedRangeMono, reverse, smallRange);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user