From bee2fe1bf5114b2959894f0ccfeafbfa6a3d7276 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 23 May 2023 01:05:03 +0200 Subject: [PATCH] Close iterators --- .../cavallium/dbengine/database/LLUtils.java | 1 - .../database/disk/LLLocalDictionary.java | 115 +++++------------ .../LLLocalGroupedReactiveRocksIterator.java | 22 +--- ...LLLocalKeyPrefixReactiveRocksIterator.java | 118 ++++++++---------- ...LLLocalMigrationReactiveRocksIterator.java | 29 ++--- .../disk/LLLocalReactiveRocksIterator.java | 22 +--- 6 files changed, 106 insertions(+), 201 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 570b4f0..3d25966 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -74,7 +74,6 @@ public class LLUtils { private static final byte[] RESPONSE_FALSE_BUF = new byte[]{0}; public static final byte[][] LEXICONOGRAPHIC_ITERATION_SEEKS = new byte[256][1]; public static final boolean MANUAL_READAHEAD = false; - public static final boolean ALLOW_STATIC_OPTIONS = false; public static final boolean FORCE_DISABLE_CHECKSUM_VERIFICATION = Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checksum.disable.force", "false")); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 19789f0..474faf2 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -1,6 +1,5 @@ package it.cavallium.dbengine.database.disk; -import static it.cavallium.dbengine.database.LLUtils.ALLOW_STATIC_OPTIONS; import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; import static it.cavallium.dbengine.database.LLUtils.isBoundedRange; import static it.cavallium.dbengine.database.LLUtils.mapList; @@ -32,7 +31,6 @@ import it.cavallium.dbengine.database.SerializedKey; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions; -import it.cavallium.dbengine.database.disk.rocksdb.LLSlice; import it.cavallium.dbengine.database.disk.rocksdb.LLWriteOptions; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationFunction; @@ -57,12 +55,9 @@ import org.jetbrains.annotations.Nullable; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.CompactRangeOptions; import org.rocksdb.FlushOptions; -import org.rocksdb.ReadOptions; import org.rocksdb.RocksDBException; -import org.rocksdb.Slice; import org.rocksdb.Snapshot; import org.rocksdb.WriteBatch; -import org.rocksdb.WriteOptions; public class LLLocalDictionary implements LLDictionary { @@ -72,7 +67,6 @@ public class LLLocalDictionary implements LLDictionary { static final long MAX_WRITE_BATCH_SIZE = 1024L * 1024L * 1024L; // 1GiB static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations static final int MULTI_GET_WINDOW = 16; - private static final LLReadOptions EMPTY_READ_OPTIONS = LLUtils.ALLOW_STATIC_OPTIONS ? new LLReadOptions() : null; static final boolean PREFER_AUTO_SEEK_BOUND = false; /** * It used to be false, @@ -103,10 +97,6 @@ public class LLLocalDictionary implements LLDictionary { private static final boolean USE_NUM_ENTRIES_PRECISE_COUNTER = true; private static final byte[] FIRST_KEY = new byte[]{}; - /** - * 1KiB dummy buffer, write only, used for debugging purposes - */ - private static final ByteBuffer DUMMY_WRITE_ONLY_BYTE_BUFFER = ByteBuffer.allocateDirect(1024); private final RocksDBColumn db; private final ColumnFamilyHandle cfh; @@ -198,38 +188,16 @@ public class LLLocalDictionary implements LLDictionary { return columnName; } - @NotNull - private LLReadOptions generateReadOptionsOrStatic(LLSnapshot snapshot) { - var resolved = generateReadOptions(snapshot != null ? snapshotResolver.apply(snapshot) : null, true); - if (resolved != null) { - return resolved; - } else { - return new LLReadOptions(); - } - } - - @Nullable - private LLReadOptions generateReadOptionsOrNull(LLSnapshot snapshot) { - return generateReadOptions(snapshot != null ? snapshotResolver.apply(snapshot) : null, false); - } - @NotNull private LLReadOptions generateReadOptionsOrNew(LLSnapshot snapshot) { - var result = generateReadOptions(snapshot != null ? snapshotResolver.apply(snapshot) : null, false); - if (result != null) { - return result; - } else { - return new LLReadOptions(); - } + return generateReadOptions(snapshot != null ? snapshotResolver.apply(snapshot) : null); } - private LLReadOptions generateReadOptions(Snapshot snapshot, boolean orStaticOpts) { + private LLReadOptions generateReadOptions(Snapshot snapshot) { if (snapshot != null) { return new LLReadOptions().setSnapshot(snapshot); - } else if (ALLOW_STATIC_OPTIONS && orStaticOpts) { - return EMPTY_READ_OPTIONS; } else { - return null; + return new LLReadOptions(); } } @@ -241,18 +209,14 @@ public class LLLocalDictionary implements LLDictionary { private Buf getSync(LLSnapshot snapshot, Buf key) { logger.trace(MARKER_ROCKSDB, "Reading {}", () -> toStringSafe(key)); try { - var readOptions = generateReadOptionsOrStatic(snapshot); Buf result; startedGet.increment(); - try { + try (var readOptions = generateReadOptionsOrNew(snapshot)) { var initTime = System.nanoTime(); result = db.get(readOptions, key); getTime.record(Duration.ofNanos(System.nanoTime() - initTime)); } finally { endedGet.increment(); - if (readOptions != EMPTY_READ_OPTIONS) { - readOptions.close(); - } } logger.trace(MARKER_ROCKSDB, "Read {}: {}", () -> toStringSafe(key), () -> toStringSafe(result)); return result; @@ -272,7 +236,7 @@ public class LLLocalDictionary implements LLDictionary { } else { // Temporary resources to release after finished - try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), + try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNew(snapshot), true, isBoundedRange(range), true @@ -304,13 +268,8 @@ public class LLLocalDictionary implements LLDictionary { startedContains.increment(); try { var result = containsTime.recordCallable(() -> { - var readOptions = generateReadOptionsOrStatic(snapshot); - try { + try (var readOptions = generateReadOptionsOrNew(snapshot)) { return db.exists(readOptions, key); - } finally { - if (readOptions != EMPTY_READ_OPTIONS) { - readOptions.close(); - } } }); assert result != null; @@ -375,15 +334,12 @@ public class LLLocalDictionary implements LLDictionary { }; UpdateAtomicResult result = null; try { - var readOptions = generateReadOptionsOrStatic(null); startedUpdates.increment(); - try (var writeOptions = new LLWriteOptions()) { + try (var readOptions = generateReadOptionsOrNew(null); + var writeOptions = new LLWriteOptions()) { result = updateTime.recordCallable(() -> db.updateAtomic(readOptions, writeOptions, key, updater, returnMode)); } finally { endedUpdates.increment(); - if (readOptions != EMPTY_READ_OPTIONS) { - readOptions.close(); - } } assert result != null; return switch (updateReturnMode) { @@ -410,18 +366,19 @@ public class LLLocalDictionary implements LLDictionary { + "safe atomic operations"); } - UpdateAtomicResultDelta result = null; + UpdateAtomicResultDelta result; try { - var readOptions = generateReadOptionsOrStatic(null); startedUpdates.increment(); - try (var writeOptions = new LLWriteOptions()) { - result = updateTime.recordCallable(() -> - (UpdateAtomicResultDelta) db.updateAtomic(readOptions, writeOptions, key, updater, DELTA)); + try (var readOptions = generateReadOptionsOrNew(null); + var writeOptions = new LLWriteOptions()) { + result = updateTime.recordCallable(() -> (UpdateAtomicResultDelta) db.updateAtomic(readOptions, + writeOptions, + key, + updater, + DELTA + )); } finally { endedUpdates.increment(); - if (readOptions != EMPTY_READ_OPTIONS) { - readOptions.close(); - } } assert result != null; return result.delta(); @@ -462,13 +419,8 @@ public class LLLocalDictionary implements LLDictionary { case PREVIOUS_VALUE -> { assert !LLUtils.isInNonBlockingThread() : "Called getPreviousData in a nonblocking thread"; Buf result; - var readOptions = generateReadOptionsOrStatic(null); - try { + try (var readOptions = generateReadOptionsOrNew(null)) { result = db.get(readOptions, key); - } finally { - if (readOptions != EMPTY_READ_OPTIONS) { - readOptions.close(); - } } logger.trace(MARKER_ROCKSDB, "Read {}: {}", () -> toStringSafe(key), () -> toStringSafe(result)); yield result; @@ -532,31 +484,24 @@ public class LLLocalDictionary implements LLDictionary { } ArrayList> mappedInputs; { - var readOptions = generateReadOptionsOrStatic(null); - try { + try (var readOptions = generateReadOptionsOrNew(null)) { var inputs = db.multiGetAsList(readOptions, mapList(keyBufsWindow, Buf::asArray)); mappedInputs = new ArrayList<>(inputs.size()); for (int i = 0; i < inputs.size(); i++) { var val = inputs.get(i); if (val != null) { inputs.set(i, null); - mappedInputs.add(new MappedInput<>( - entriesWindow.get(i).key(), + mappedInputs.add(new MappedInput<>(entriesWindow.get(i).key(), keyBufsWindow.get(i), OptionalBuf.of(Buf.wrap(val)) )); } else { - mappedInputs.add(new MappedInput<>( - entriesWindow.get(i).key(), + mappedInputs.add(new MappedInput<>(entriesWindow.get(i).key(), keyBufsWindow.get(i), OptionalBuf.empty() )); } } - } finally { - if (readOptions != EMPTY_READ_OPTIONS) { - readOptions.close(); - } } } var updatedValuesToWrite = new ArrayList(mappedInputs.size()); @@ -640,7 +585,7 @@ public class LLLocalDictionary implements LLDictionary { boolean smallRange) { return new LLLocalEntryReactiveRocksIterator(db, range, - () -> generateReadOptionsOrNull(snapshot), + () -> generateReadOptionsOrNew(snapshot), reverse, smallRange ).stream(); @@ -651,7 +596,7 @@ public class LLLocalDictionary implements LLDictionary { return new LLLocalGroupedEntryReactiveRocksIterator(db, prefixLength, range, - () -> generateReadOptionsOrNull(snapshot), + () -> generateReadOptionsOrNew(snapshot), smallRange ).stream(); } @@ -676,7 +621,7 @@ public class LLLocalDictionary implements LLDictionary { return new LLLocalGroupedKeyReactiveRocksIterator(db, prefixLength, range, - () -> generateReadOptionsOrNull(snapshot), + () -> generateReadOptionsOrNew(snapshot), smallRange ).stream(); } @@ -729,7 +674,7 @@ public class LLLocalDictionary implements LLDictionary { return new LLLocalKeyPrefixReactiveRocksIterator(db, prefixLength, range, - () -> generateReadOptionsOrNull(snapshot), + () -> generateReadOptionsOrNew(snapshot), true, smallRange ).stream(); @@ -753,7 +698,7 @@ public class LLLocalDictionary implements LLDictionary { boolean smallRange) { return new LLLocalKeyReactiveRocksIterator(db, range, - () -> generateReadOptionsOrNull(snapshot), + () -> generateReadOptionsOrNew(snapshot), reverse, smallRange ).stream(); @@ -950,7 +895,7 @@ public class LLLocalDictionary implements LLDictionary { if (range.isAll()) { return fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot); } else { - try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), + try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNew(snapshot), false, isBoundedRange(range), false @@ -985,7 +930,7 @@ public class LLLocalDictionary implements LLDictionary { public LLEntry getOne(@Nullable LLSnapshot snapshot, LLRange range) { try { assert !LLUtils.isInNonBlockingThread() : "Called getOne in a nonblocking thread"; - try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), true, true, true)) { + try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNew(snapshot), true, true, true)) { try (var rocksIterator = db.newIterator(readOpts, range.getMin(), range.getMax())) { if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { rocksIterator.seekTo(range.getMin()); @@ -1010,7 +955,7 @@ public class LLLocalDictionary implements LLDictionary { public Buf getOneKey(@Nullable LLSnapshot snapshot, LLRange range) { try { assert !LLUtils.isInNonBlockingThread() : "Called getOneKey in a nonblocking thread"; - try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), true, true, true)) { + try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNew(snapshot), true, true, true)) { try (var rocksIterator = db.newIterator(readOpts, range.getMin(), range.getMax())) { if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { rocksIterator.seekTo(range.getMin()); @@ -1075,7 +1020,7 @@ public class LLLocalDictionary implements LLDictionary { if (LLUtils.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called exactSizeAll in a nonblocking thread"); } - try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), false, false, false)) { + try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNew(snapshot), false, false, false)) { if (LLUtils.MANUAL_READAHEAD) { readOpts.setReadaheadSize(128 * 1024); // 128KiB } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java index 6efe562..4c694c5 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java @@ -53,21 +53,12 @@ public abstract class LLLocalGroupedReactiveRocksIterator { } public final Stream> stream() { - var readOptions = generateCustomReadOptions(this.readOptions.get(), canFillCache, isBoundedRange(range), smallRange); - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); - } - - RocksIteratorObj rocksIterator; - try { - rocksIterator = db.newRocksIterator(readOptions, range, false); - } catch (RocksDBException e) { - readOptions.close(); - throw new DBException("Failed to iterate the range", e); - } - return StreamUtils.>streamWhileNonNull(() -> { - try { + try (var readOptions = generateCustomReadOptions(this.readOptions.get(), canFillCache, isBoundedRange(range), smallRange); + var rocksIterator = db.newRocksIterator(readOptions, range, false)) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); + } ObjectArrayList values = new ObjectArrayList<>(); Buf firstGroupKey = null; while (rocksIterator.isValid()) { @@ -113,9 +104,6 @@ public abstract class LLLocalGroupedReactiveRocksIterator { } throw new CompletionException(new DBException("Range failed", ex)); } - }).onClose(() -> { - rocksIterator.close(); - readOptions.close(); }); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java index f463b96..85b78db 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java @@ -50,71 +50,61 @@ public class LLLocalKeyPrefixReactiveRocksIterator { public Stream stream() { - try { - var readOptions - = generateCustomReadOptions(this.readOptions.get(), canFillCache, isBoundedRange(range), smallRange); - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); - } - var rocksIterator = db.newRocksIterator(readOptions, range, false); - - return streamWhileNonNull(() -> { - try { - Buf firstGroupKey = null; - while (rocksIterator.isValid()) { - // Note that the underlying array is subject to changes! - Buf key = rocksIterator.keyBuf(); - var keyLen = key.size(); - if (keyLen >= prefixLength) { - if (firstGroupKey == null) { - firstGroupKey = key.copy(); - assert firstGroupKey == null || firstGroupKey.size() >= prefixLength; - } else if (!LLUtils.equals(firstGroupKey, - 0, - key, - 0, - prefixLength - )) { - break; - } - } else { - logger.error("Skipped a key with length {}, the expected minimum prefix key length is {}!" - + " This key will be dropped", key.size(), prefixLength); - } - rocksIterator.next(); - } - - if (firstGroupKey != null) { - var groupKeyPrefix = firstGroupKey.subList(0, prefixLength); - - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, - "Range {} is reading prefix {}", - LLUtils.toStringSafe(range), - LLUtils.toStringSafe(groupKeyPrefix) - ); - } - - return groupKeyPrefix; - } else { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(range)); - } - return null; - } - } catch (RocksDBException ex) { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(range)); - } - throw new CompletionException(new DBException("Range failed", ex)); + return streamWhileNonNull(() -> { + try (var readOptions = generateCustomReadOptions(this.readOptions.get(), canFillCache, isBoundedRange(range), smallRange); + var rocksIterator = db.newRocksIterator(readOptions, range, false)) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); } - }).onClose(() -> { - rocksIterator.close(); - readOptions.close(); - }); - } catch (RocksDBException e) { - throw new DBException("Failed to open stream", e); - } + Buf firstGroupKey = null; + while (rocksIterator.isValid()) { + // Note that the underlying array is subject to changes! + Buf key = rocksIterator.keyBuf(); + var keyLen = key.size(); + if (keyLen >= prefixLength) { + if (firstGroupKey == null) { + firstGroupKey = key.copy(); + assert firstGroupKey == null || firstGroupKey.size() >= prefixLength; + } else if (!LLUtils.equals(firstGroupKey, + 0, + key, + 0, + prefixLength + )) { + break; + } + } else { + logger.error("Skipped a key with length {}, the expected minimum prefix key length is {}!" + + " This key will be dropped", key.size(), prefixLength); + } + rocksIterator.next(); + } + + if (firstGroupKey != null) { + var groupKeyPrefix = firstGroupKey.subList(0, prefixLength); + + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Range {} is reading prefix {}", + LLUtils.toStringSafe(range), + LLUtils.toStringSafe(groupKeyPrefix) + ); + } + + return groupKeyPrefix; + } else { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(range)); + } + return null; + } + } catch (RocksDBException ex) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(range)); + } + throw new CompletionException(new DBException("Range failed", ex)); + } + }); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java index 9d49aa1..6f3aa03 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java @@ -34,25 +34,20 @@ public final class LLLocalMigrationReactiveRocksIterator { } public Stream stream() { - var readOptions = generateCustomReadOptions(this.readOptions.get(), false, false, false); - RocksIteratorObj rocksIterator; - try { - rocksIterator = db.newRocksIterator(readOptions, range, false); - } catch (RocksDBException e) { - throw new DBException("Failed to open iterator", e); - } return streamWhileNonNull(() -> { - if (rocksIterator.isValid()) { - var key = rocksIterator.keyBuf().copy(); - var value = rocksIterator.valueBuf().copy(); - rocksIterator.next(false); - return LLEntry.of(key, value); - } else { - return null; + try (var readOptions = generateCustomReadOptions(this.readOptions.get(), false, false, false); + var rocksIterator = db.newRocksIterator(readOptions, range, false)) { + if (rocksIterator.isValid()) { + var key = rocksIterator.keyBuf().copy(); + var value = rocksIterator.valueBuf().copy(); + rocksIterator.next(false); + return LLEntry.of(key, value); + } else { + return null; + } + } catch (RocksDBException e) { + throw new DBException("Failed to open iterator", e); } - }).onClose(() -> { - rocksIterator.close(); - readOptions.close(); }); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java index 0a35247..4aabe74 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java @@ -48,21 +48,12 @@ public abstract class LLLocalReactiveRocksIterator { } public final Stream stream() { - var readOptions = generateCustomReadOptions(this.readOptions.get(), true, isBoundedRange(range), smallRange); - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); - } - - RocksIteratorObj rocksIterator; - try { - rocksIterator = db.newRocksIterator(readOptions, range, reverse); - } catch (RocksDBException e) { - readOptions.close(); - throw new DBException("Failed to iterate the range", e); - } - return streamWhileNonNull(() -> { - try { + try (var readOptions = generateCustomReadOptions(this.readOptions.get(), true, isBoundedRange(range), smallRange); + var rocksIterator = db.newRocksIterator(readOptions, range, reverse)) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); + } if (rocksIterator.isValid()) { // Note that the underlying array is subject to changes! Buf key; @@ -102,9 +93,6 @@ public abstract class LLLocalReactiveRocksIterator { } throw new CompletionException(ex); } - }).onClose(() -> { - rocksIterator.close(); - readOptions.close(); }); }