diff --git a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java index a6e93cf..eaa1da2 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java @@ -1135,27 +1135,27 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { throw ex; } }, res -> Flux.generate(() -> { - if (!reverse) { - res.it.seekToFirst(); - } else { - res.it.seekToLast(); - } - return res.it; + if (!reverse) { + res.it.seekToFirst(); + } else { + res.it.seekToLast(); + } + return res.it; }, (it, sink) -> { - if (!it.isValid()) { - sink.complete(); - } else { - var calculatedKey = toMemorySegment(arena, it.key()); - var calculatedValue = res.col.schema().hasValue() ? toMemorySegment(it.value()) : MemorySegment.NULL; - //noinspection unchecked - sink.next((T) decodeKVNoBuckets(arena, res.col, calculatedKey, calculatedValue)); - if (!reverse) { - res.it.next(); - } else { - res.it.prev(); - } - } - return it; + if (!it.isValid()) { + sink.complete(); + } else { + var calculatedKey = toMemorySegment(arena, it.key()); + var calculatedValue = res.col.schema().hasValue() ? toMemorySegment(it.value()) : MemorySegment.NULL; + if (!reverse) { + res.it.next(); + } else { + res.it.prev(); + } + //noinspection unchecked + sink.next((T) decodeKVNoBuckets(arena, res.col, calculatedKey, calculatedValue)); + } + return it; }), Resources::close) .subscribeOn(Schedulers.boundedElastic()) .doFirst(ops::beginOp)