diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 323f98b..24511ea 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -723,7 +723,7 @@ public class LLUtils { /** * Generate a ReadOptions, with some parameters modified to help with bulk iterations - * @param readOptions the read options to start with + * @param readOptions the read options to start with, it will be modified * @param canFillCache true to fill the cache. If closedRange is false, this field will be ignored * @param boundedRange true if the range is bounded from both sides * @param smallRange true if the range is small diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 68b1645..fd29a9f 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -222,6 +222,16 @@ public class LLLocalDictionary implements LLDictionary { return generateReadOptions(snapshot != null ? snapshotResolver.apply(snapshot) : null, false); } + @NotNull + private ReadOptions generateReadOptionsOrNew(LLSnapshot snapshot) { + var result = generateReadOptions(snapshot != null ? snapshotResolver.apply(snapshot) : null, false); + if (result != null) { + return result; + } else { + return new ReadOptions(); + } + } + private ReadOptions generateReadOptions(Snapshot snapshot, boolean orStaticOpts) { if (snapshot != null) { return new ReadOptions().setSnapshot(snapshot); @@ -589,7 +599,7 @@ public class LLLocalDictionary implements LLDictionary { var readOptions = generateReadOptionsOrStatic(snapshot); try { List results = db.multiGetAsList(readOptions, LLUtils.toArray(keyBufsWindow)); - mappedResults = new ArrayList>(results.size()); + mappedResults = new ArrayList<>(results.size()); for (int i = 0; i < results.size(); i++) { byte[] val = results.get(i); Optional valueOpt; @@ -832,8 +842,8 @@ public class LLLocalDictionary implements LLDictionary { boolean reverse, boolean smallRange) { Mono iteratorMono = rangeMono.map(rangeSend -> { - ReadOptions resolvedSnapshot = generateReadOptionsOrNull(snapshot); - return new LLLocalEntryReactiveRocksIterator(db, rangeSend, nettyDirect, resolvedSnapshot, reverse, smallRange); + ReadOptions readOptions = generateReadOptionsOrNull(snapshot); + return new LLLocalEntryReactiveRocksIterator(db, rangeSend, nettyDirect, readOptions, reverse, smallRange); }); return Flux.usingWhen(iteratorMono, iterator -> iterator.flux().subscribeOn(dbRScheduler, false), @@ -844,12 +854,12 @@ public class LLLocalDictionary implements LLDictionary { private Flux>> getRangeMultiGrouped(LLSnapshot snapshot, Mono> rangeMono, int prefixLength, boolean smallRange) { Mono iteratorMono = rangeMono.map(rangeSend -> { - ReadOptions resolvedSnapshot = generateReadOptionsOrNull(snapshot); + ReadOptions readOptions = generateReadOptionsOrNull(snapshot); return new LLLocalGroupedEntryReactiveRocksIterator(db, prefixLength, rangeSend, nettyDirect, - resolvedSnapshot, + readOptions, smallRange ); }); @@ -882,12 +892,12 @@ public class LLLocalDictionary implements LLDictionary { int prefixLength, boolean smallRange) { Mono iteratorMono = rangeMono.map(rangeSend -> { - ReadOptions resolvedSnapshot = generateReadOptionsOrNull(snapshot); + ReadOptions readOptions = generateReadOptionsOrNull(snapshot); return new LLLocalGroupedKeyReactiveRocksIterator(db, prefixLength, rangeSend, nettyDirect, - resolvedSnapshot, + readOptions, smallRange ); }); @@ -943,8 +953,13 @@ public class LLLocalDictionary implements LLDictionary { public Flux> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono> rangeMono, int prefixLength, boolean smallRange) { Mono iteratorMono = rangeMono.map(range -> { - ReadOptions resolvedSnapshot = generateReadOptionsOrNull(snapshot); - return new LLLocalKeyPrefixReactiveRocksIterator(db, prefixLength, range, nettyDirect, resolvedSnapshot, true, + ReadOptions readOptions = generateReadOptionsOrNull(snapshot); + return new LLLocalKeyPrefixReactiveRocksIterator(db, + prefixLength, + range, + nettyDirect, + readOptions, + true, smallRange ); }); @@ -976,8 +991,8 @@ public class LLLocalDictionary implements LLDictionary { boolean reverse, boolean smallRange) { Mono iteratorMono = rangeMono.map(range -> { - ReadOptions resolvedSnapshot = generateReadOptionsOrNull(snapshot); - return new LLLocalKeyReactiveRocksIterator(db, range, nettyDirect, resolvedSnapshot, reverse, smallRange); + ReadOptions readOptions = generateReadOptionsOrNull(snapshot); + return new LLLocalKeyReactiveRocksIterator(db, range, nettyDirect, readOptions, reverse, smallRange); }); return Flux.usingWhen(iteratorMono, iterator -> iterator.flux().subscribeOn(dbRScheduler, false), @@ -1169,8 +1184,7 @@ public class LLLocalDictionary implements LLDictionary { private void deleteSmallRangeWriteBatch(CappedWriteBatch writeBatch, Send rangeToReceive) throws RocksDBException { var range = rangeToReceive.receive(); - var readOpts = generateReadOptionsOrStatic(null); - try { + try (var readOpts = generateReadOptionsOrNew(null)) { readOpts.setFillCache(false); ReleasableSlice minBound; if (range.hasMin()) { @@ -1212,17 +1226,13 @@ public class LLLocalDictionary implements LLDictionary { } catch (Throwable e) { range.close(); throw e; - } finally { - if (readOpts != EMPTY_READ_OPTIONS) { - readOpts.close(); - } } } private void deleteSmallRangeWriteBatch(WriteBatch writeBatch, Send rangeToReceive) throws RocksDBException { try (var range = rangeToReceive.receive()) { - try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(null), true, isBoundedRange(range), true)) { + try (var readOpts = LLUtils.generateCustomReadOptions(null, true, isBoundedRange(range), true)) { readOpts.setFillCache(false); ReleasableSlice minBound; if (range.hasMin()) { @@ -1286,7 +1296,7 @@ public class LLLocalDictionary implements LLDictionary { .fromCallable(() -> { assert !Schedulers.isInNonBlockingThread() : "Called clear in a nonblocking thread"; boolean shouldCompactLater = false; - try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(null), false, false, false)) { + try (var readOpts = LLUtils.generateCustomReadOptions(null, false, false, false)) { readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); // readOpts.setIgnoreRangeDeletions(true); @@ -1331,15 +1341,18 @@ public class LLLocalDictionary implements LLDictionary { // Compact range db.suggestCompactRange(); if (lastDeletedKey != null) { - db.compactRange(firstDeletedKey, lastDeletedKey, new CompactRangeOptions() + try (var cro = new CompactRangeOptions() .setAllowWriteStall(false) .setExclusiveManualCompaction(false) - .setChangeLevel(false) - ); + .setChangeLevel(false)) { + db.compactRange(firstDeletedKey, lastDeletedKey, cro); + } } } - db.flush(new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true)); + try (var fo = new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true)) { + db.flush(fo); + } db.flushWal(true); } return null; @@ -1358,7 +1371,11 @@ public class LLLocalDictionary implements LLDictionary { if (range.isAll()) { sink.next(fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot)); } else { - try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), false, isBoundedRange(range), false)) { + try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), + false, + isBoundedRange(range), + false + )) { readOpts.setFillCache(false); readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); ReleasableSlice minBound; @@ -1521,11 +1538,7 @@ public class LLLocalDictionary implements LLDictionary { } private long fastSizeAll(@Nullable LLSnapshot snapshot) throws RocksDBException { - var rocksdbSnapshot = generateReadOptionsOrNull(snapshot); - if (rocksdbSnapshot == null) { - rocksdbSnapshot = new ReadOptions(); - } - try { + try (var rocksdbSnapshot = generateReadOptionsOrNew(snapshot)) { if (USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS || rocksdbSnapshot.snapshot() == null) { try { return db.getLongProperty("rocksdb.estimate-num-keys"); @@ -1550,8 +1563,6 @@ public class LLLocalDictionary implements LLDictionary { return count; } } - } finally { - rocksdbSnapshot.close(); } } @@ -1641,8 +1652,7 @@ public class LLLocalDictionary implements LLDictionary { return rangeMono.publishOn(dbWScheduler).handle((rangeSend, sink) -> { try (var range = rangeSend.receive()) { assert !Schedulers.isInNonBlockingThread() : "Called removeOne in a nonblocking thread"; - var readOpts = generateReadOptionsOrStatic(null); - try { + try (var readOpts = new ReadOptions()) { ReleasableSlice minBound; if (range.hasMin()) { minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); @@ -1684,10 +1694,6 @@ public class LLLocalDictionary implements LLDictionary { } finally { minBound.close(); } - } finally { - if (readOpts != EMPTY_READ_OPTIONS) { - readOpts.close(); - } } } catch (RocksDBException ex) { sink.error(ex); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java index 42fa1f4..3a8158c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java @@ -79,7 +79,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator extends this.prefixLength = prefixLength; this.range = range.receive(); this.allowNettyDirect = allowNettyDirect; - this.readOptions = readOptions; + this.readOptions = readOptions != null ? readOptions : new ReadOptions(); this.canFillCache = canFillCache; this.readValues = readValues; this.smallRange = smallRange; @@ -162,7 +162,6 @@ public abstract class LLLocalGroupedReactiveRocksIterator extends }, t -> { t.getT2().close(); t.getT1().close(); - this.close(); }); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java index b8fbf55..4cecb0a 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java @@ -73,7 +73,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends this.prefixLength = prefixLength; this.rangeShared = range.receive(); this.allowNettyDirect = allowNettyDirect; - this.readOptions = readOptions; + this.readOptions = readOptions != null ? readOptions : new ReadOptions(); this.canFillCache = canFillCache; this.smallRange = smallRange; } @@ -153,7 +153,6 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends }, t -> { t.getT2().close(); t.getT1().close(); - this.close(); }); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index c63b92b..aed34e5 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -783,10 +783,12 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { try { // Range rangeToCompact = db.suggestCompactRange(cfh); logger.info("Compacting range {}", r); - db.compactRange(cfh, null, null, new CompactRangeOptions() + try (var cro = new CompactRangeOptions() .setAllowWriteStall(true) .setExclusiveManualCompaction(true) - .setChangeLevel(false)); + .setChangeLevel(false)) { + db.compactRange(cfh, null, null, cro); + } } catch (RocksDBException e) { if ("Database shutdown".equalsIgnoreCase(e.getMessage())) { logger.warn("Compaction cancelled: database shutdown"); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java index a16c397..b6aeb5f 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java @@ -148,7 +148,6 @@ public abstract class LLLocalReactiveRocksIterator extends }, t -> { t.getT2().close(); t.getT1().close(); - this.close(); }); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java index 2209195..97a93b7 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java @@ -90,7 +90,15 @@ public class LLLocalSingleton implements LLSingleton { public Mono> get(@Nullable LLSnapshot snapshot) { return nameMono.publishOn(dbRScheduler).handle((nameSend, sink) -> { try (Buffer name = nameSend.receive()) { - Buffer result = db.get(generateReadOptions(snapshot, true), name); + Buffer result; + var readOptions = generateReadOptions(snapshot, true); + try { + result = db.get(readOptions, name); + } finally { + if (readOptions != EMPTY_READ_OPTIONS) { + readOptions.close(); + } + } if (result != null) { sink.next(result.send()); } else {