diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java index da0d6b4..7ca5330 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java @@ -74,79 +74,78 @@ public abstract class LLLocalReactiveRocksIterator extends } public final Flux flux() { - return Flux - .generate(() -> { - var readOptions = new ReadOptions(this.readOptions); - if (!range.hasMin() || !range.hasMax()) { - readOptions.setReadaheadSize(32 * 1024); // 32KiB - readOptions.setFillCache(false); + return Flux.generate(() -> { + var readOptions = new ReadOptions(this.readOptions); + if (!range.hasMin() || !range.hasMax()) { + readOptions.setReadaheadSize(32 * 1024); // 32KiB + readOptions.setFillCache(false); + } + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); + } + return getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, range.copy().send(), db); + }, (tuple, sink) -> { + try { + var rocksIterator = tuple.getT1(); + rocksIterator.status(); + if (rocksIterator.isValid()) { + Buffer key; + if (allowNettyDirect) { + key = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::key); + } else { + key = LLUtils.fromByteArray(db.getAllocator(), rocksIterator.key()); } - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); - } - return getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, range.copy().send(), db); - }, (tuple, sink) -> { - try { - var rocksIterator = tuple.getT1(); - rocksIterator.status(); - if (rocksIterator.isValid()) { - Buffer key; + try (key) { + Buffer value; + if (readValues) { if (allowNettyDirect) { - key = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::key); + value = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::value); } else { - key = LLUtils.fromByteArray(db.getAllocator(), rocksIterator.key()); - } - try (key) { - Buffer value; - if (readValues) { - if (allowNettyDirect) { - value = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::value); - } else { - value = LLUtils.fromByteArray(db.getAllocator(), rocksIterator.value()); - } - } else { - value = null; - } - - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, - "Range {} is reading {}: {}", - LLUtils.toStringSafe(range), - LLUtils.toStringSafe(key), - LLUtils.toStringSafe(value) - ); - } - - try { - rocksIterator.next(); - rocksIterator.status(); - sink.next(getEntry(key.send(), value == null ? null : value.send())); - } finally { - if (value != null) { - value.close(); - } - } + value = LLUtils.fromByteArray(db.getAllocator(), rocksIterator.value()); } } else { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(range)); - } - sink.complete(); + value = null; } - } catch (RocksDBException ex) { + if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(range)); + logger.trace(MARKER_ROCKSDB, + "Range {} is reading {}: {}", + LLUtils.toStringSafe(range), + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(value) + ); + } + + try { + rocksIterator.next(); + rocksIterator.status(); + sink.next(getEntry(key.send(), value == null ? null : value.send())); + } finally { + if (value != null) { + value.close(); + } } - sink.error(ex); } - return tuple; - }, tuple -> { - var rocksIterator = tuple.getT1(); - rocksIterator.close(); - tuple.getT2().close(); - tuple.getT3().close(); - tuple.getT4().close(); - }); + } else { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(range)); + } + sink.complete(); + } + } catch (RocksDBException ex) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(range)); + } + sink.error(ex); + } + return tuple; + }, tuple -> { + var rocksIterator = tuple.getT1(); + rocksIterator.close(); + tuple.getT2().close(); + tuple.getT3().close(); + tuple.getT4().close(); + }); } public abstract T getEntry(@Nullable Send key, @Nullable Send value);