diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index ff4913e..e1ea6a0 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -72,13 +72,22 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep Flux> getLeavesFrom(DatabaseMapDictionary databaseMapDictionary, CompositeSnapshot snapshot, - Mono key, + Mono keyMin, + Mono keyMax, boolean reverse, boolean smallRange) { - Mono> keyOptMono = key.map(Optional::of).defaultIfEmpty(Optional.empty()); + Mono> keyMinOptMono = keyMin.map(Optional::of).defaultIfEmpty(Optional.empty()); + Mono> keyMaxOptMono = keyMax.map(Optional::of).defaultIfEmpty(Optional.empty()); - return keyOptMono.flatMapMany(keyOpt -> { - if (keyOpt.isPresent()) { - return databaseMapDictionary.getAllValues(snapshot, keyOpt.get(), reverse, smallRange); + return Mono.zip(keyMinOptMono, keyMaxOptMono).flatMapMany(entry -> { + var keyMinOpt = entry.getT1(); + var keyMaxOpt = entry.getT2(); + if (keyMinOpt.isPresent() || keyMaxOpt.isPresent()) { + return databaseMapDictionary.getAllValues(snapshot, + keyMinOpt.orElse(null), + keyMaxOpt.orElse(null), + reverse, + smallRange + ); } else { return databaseMapDictionary.getAllValues(snapshot, smallRange); } @@ -87,15 +96,19 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep Flux getKeyLeavesFrom(DatabaseMapDictionary databaseMapDictionary, CompositeSnapshot snapshot, - Mono key, + Mono keyMin, + Mono keyMax, boolean reverse, boolean smallRange) { - Mono> keyOptMono = key.map(Optional::of).defaultIfEmpty(Optional.empty()); + Mono> keyMinOptMono = keyMin.map(Optional::of).defaultIfEmpty(Optional.empty()); + Mono> keyMaxOptMono = keyMax.map(Optional::of).defaultIfEmpty(Optional.empty()); - return keyOptMono.flatMapMany(keyOpt -> { + return Mono.zip(keyMinOptMono, keyMaxOptMono).flatMapMany(keys -> { + var keyMinOpt = keys.getT1(); + var keyMaxOpt = keys.getT2(); Flux>> stagesFlux; - if (keyOpt.isPresent()) { + if (keyMinOpt.isPresent() || keyMaxOpt.isPresent()) { stagesFlux = databaseMapDictionary - .getAllStages(snapshot, keyOpt.get(), reverse, smallRange); + .getAllStages(snapshot, keyMinOpt.orElse(null), keyMaxOpt.orElse(null), reverse, smallRange); } else { stagesFlux = databaseMapDictionary.getAllStages(snapshot, smallRange); } @@ -483,34 +496,63 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep getPatchedRange(@NotNull Send rangeSend, @Nullable T keyMin, @Nullable T keyMax) + throws SerializationException { + try (var range = rangeSend.receive()) { + try (Send keyMinBuf = serializeSuffixForRange(keyMin)) { + try (Send keyMaxBuf = serializeSuffixForRange(keyMax)) { + Send keyMinBufSend; + if (keyMinBuf == null) { + keyMinBufSend = range.getMin(); + } else { + keyMinBufSend = keyMinBuf; + } + Send keyMaxBufSend; + if (keyMaxBuf == null) { + keyMaxBufSend = range.getMax(); + } else { + keyMaxBufSend = keyMaxBuf; + } + return LLRange.of(keyMinBufSend, keyMaxBufSend).send(); + } + } + } + } + + private Send serializeSuffixForRange(@Nullable T key) throws SerializationException { + if (key == null) { + return null; + } + try (var keyWithoutExtBuf = keyPrefix == null ? alloc.allocate(keySuffixLength + keyExtLength) + // todo: use a read-only copy + : keyPrefix.copy()) { + keyWithoutExtBuf.ensureWritable(keySuffixLength + keyExtLength); + serializeSuffix(key, keyWithoutExtBuf); + return keyWithoutExtBuf.send(); + } + } + /** * Get all stages - * @param key from/to the specified key, if not null * @param reverse if true, the results will go backwards from the specified key (inclusive) * @param smallRange */ public Flux>> getAllStages(@Nullable CompositeSnapshot snapshot, - @Nullable T key, + @Nullable T keyMin, + @Nullable T keyMax, boolean reverse, boolean smallRange) { - if (key == null) { + if (keyMin == null && keyMax == null) { return getAllStages(snapshot, smallRange); } else { - Mono> boundedRangeMono = rangeMono.flatMap(fullRangeSend -> Mono.fromCallable(() -> { - try (var fullRange = fullRangeSend.receive()) { - try (var keyWithoutExtBuf = keyPrefix == null ? alloc.allocate(keySuffixLength + keyExtLength) - // todo: use a read-only copy - : keyPrefix.copy()) { - keyWithoutExtBuf.ensureWritable(keySuffixLength + keyExtLength); - serializeSuffix(key, keyWithoutExtBuf); - if (reverse) { - return LLRange.of(fullRange.getMin(), keyWithoutExtBuf.send()).send(); - } else { - return LLRange.of(keyWithoutExtBuf.send(), fullRange.getMax()).send(); + Mono> boundedRangeMono = rangeMono + .handle((fullRangeSend, sink) -> { + try { + sink.next(getPatchedRange(fullRangeSend, keyMin, keyMax)); + } catch (SerializationException e) { + sink.error(e); } - } - } - })); + }); return getAllStages(snapshot, boundedRangeMono, reverse, smallRange); } } @@ -546,32 +588,25 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> getAllValues(@Nullable CompositeSnapshot snapshot, - @Nullable T key, + @Nullable T keyMin, + @Nullable T keyMax, boolean reverse, boolean smallRange) { - if (key == null) { + if (keyMin == null && keyMax == null) { return getAllValues(snapshot, smallRange); } else { - Mono> boundedRangeMono = rangeMono.flatMap(fullRangeSend -> Mono.fromCallable(() -> { - try (var fullRange = fullRangeSend.receive()) { - try (var keyWithoutExtBuf = keyPrefix == null ? alloc.allocate(keySuffixLength + keyExtLength) - // todo: use a read-only copy - : keyPrefix.copy()) { - keyWithoutExtBuf.ensureWritable(keySuffixLength + keyExtLength); - serializeSuffix(key, keyWithoutExtBuf); - if (reverse) { - return LLRange.of(fullRange.getMin(), keyWithoutExtBuf.send()).send(); - } else { - return LLRange.of(keyWithoutExtBuf.send(), fullRange.getMax()).send(); + Mono> boundedRangeMono = rangeMono + .handle((fullRangeSend, sink) -> { + try { + sink.next(getPatchedRange(fullRangeSend, keyMin, keyMax)); + } catch (SerializationException e) { + sink.error(e); } - } - } - })); + }); return getAllValues(snapshot, boundedRangeMono, reverse, smallRange); } }