From 99e101914dc7b2650ac93783535d323e19296749 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 10 May 2022 16:57:41 +0200 Subject: [PATCH] Fix some possible leaks --- .../cavallium/dbengine/database/LLUtils.java | 11 +- .../database/disk/AbstractRocksDBColumn.java | 184 ++++++++---------- .../database/disk/LLLocalDictionary.java | 177 ++++++++++------- .../LLLocalGroupedReactiveRocksIterator.java | 20 +- ...LLLocalKeyPrefixReactiveRocksIterator.java | 20 +- ...LLLocalMigrationReactiveRocksIterator.java | 16 +- .../disk/LLLocalReactiveRocksIterator.java | 15 +- .../database/disk/LLLocalSingleton.java | 14 +- .../database/disk/RocksIteratorTuple.java | 17 +- .../disk/UnmodifiableReadOptions.java | 102 ---------- .../disk/UnmodifiableWriteOptions.java | 48 ----- .../disk/UnreleasableReadOptions.java | 19 -- .../disk/UnreleasableWriteOptions.java | 19 -- .../dbengine/lucene/HugePqArray.java | 16 +- .../dbengine/lucene/HugePqPriorityQueue.java | 24 +-- .../dbengine/lucene/LLSlotDocCodec.java | 6 +- 16 files changed, 264 insertions(+), 444 deletions(-) delete mode 100644 src/main/java/it/cavallium/dbengine/database/disk/UnmodifiableReadOptions.java delete mode 100644 src/main/java/it/cavallium/dbengine/database/disk/UnmodifiableWriteOptions.java delete mode 100644 src/main/java/it/cavallium/dbengine/database/disk/UnreleasableReadOptions.java delete mode 100644 src/main/java/it/cavallium/dbengine/database/disk/UnreleasableWriteOptions.java diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 23f2072..323f98b 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -722,20 +722,19 @@ public class LLUtils { } /** - * Generate a copy of the passed ReadOptions, with some parameters modified to help with bulk iterations - * @param readOptions the read options to copy + * Generate a ReadOptions, with some parameters modified to help with bulk iterations + * @param readOptions the read options to start with * @param canFillCache true to fill the cache. If closedRange is false, this field will be ignored * @param boundedRange true if the range is bounded from both sides * @param smallRange true if the range is small - * @return a new instance of ReadOptions + * @return the passed instance of ReadOptions, or a new one if the passed readOptions is null */ public static ReadOptions generateCustomReadOptions(@Nullable ReadOptions readOptions, boolean canFillCache, boolean boundedRange, boolean smallRange) { - if (readOptions != null) { - readOptions = new ReadOptions(readOptions); - } else { + if (readOptions == null) { + //noinspection resource readOptions = new ReadOptions(); } if (boundedRange || smallRange) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index 94e301a..77ab37f 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -321,7 +321,7 @@ public sealed abstract class AbstractRocksDBColumn implements rocksIterator.seekToFirst(); } } - return new RocksIteratorTuple(List.of(readOptions), rocksIterator, sliceMin, sliceMax, seekFromOrTo); + return new RocksIteratorTuple(rocksIterator, sliceMin, sliceMax, seekFromOrTo); } catch (Throwable ex) { rocksIterator.close(); throw ex; @@ -463,39 +463,33 @@ public sealed abstract class AbstractRocksDBColumn implements } } } else { - try { - byte[] keyArray = LLUtils.toArray(key); - requireNonNull(keyArray); - Holder data = new Holder<>(); - if (db.keyMayExist(cfh, readOptions, keyArray, data)) { - // todo: "data.getValue().length > 0" is checked because keyMayExist is broken, and sometimes it - // returns an empty array, as if it exists - if (data.getValue() != null && data.getValue().length > 0) { - readValueFoundWithBloomCacheBufferSize.record(data.getValue().length); - return LLUtils.fromByteArray(alloc, data.getValue()); - } else { - readAttemptsCount++; - byte[] result = db.get(cfh, readOptions, keyArray); - if (result == null) { - if (data.getValue() != null) { - readValueNotFoundWithBloomBufferSize.record(0); - } else { - readValueNotFoundWithMayExistBloomBufferSize.record(0); - } - return null; - } else { - readValueFoundWithBloomUncachedBufferSize.record(0); - return LLUtils.fromByteArray(alloc, result); - } - } + byte[] keyArray = LLUtils.toArray(key); + requireNonNull(keyArray); + Holder data = new Holder<>(); + if (db.keyMayExist(cfh, readOptions, keyArray, data)) { + // todo: "data.getValue().length > 0" is checked because keyMayExist is broken, and sometimes it + // returns an empty array, as if it exists + if (data.getValue() != null && data.getValue().length > 0) { + readValueFoundWithBloomCacheBufferSize.record(data.getValue().length); + return LLUtils.fromByteArray(alloc, data.getValue()); } else { - readValueNotFoundWithBloomBufferSize.record(0); - return null; - } - } finally { - if (!(readOptions instanceof UnreleasableReadOptions)) { - readOptions.close(); + readAttemptsCount++; + byte[] result = db.get(cfh, readOptions, keyArray); + if (result == null) { + if (data.getValue() != null) { + readValueNotFoundWithBloomBufferSize.record(0); + } else { + readValueNotFoundWithMayExistBloomBufferSize.record(0); + } + return null; + } else { + readValueFoundWithBloomUncachedBufferSize.record(0); + return LLUtils.fromByteArray(alloc, result); + } } + } else { + readValueNotFoundWithBloomBufferSize.record(0); + return null; } } } finally { @@ -510,69 +504,63 @@ public sealed abstract class AbstractRocksDBColumn implements public void put(@NotNull WriteOptions writeOptions, Buffer key, Buffer value) throws RocksDBException { var closeReadLock = closeLock.readLock(); try { - try { - ensureOpen(); - ensureOwned(writeOptions); - assert key.isAccessible(); - assert value.isAccessible(); - this.keyBufferSize.record(key.readableBytes()); - this.writeValueBufferSize.record(value.readableBytes()); - if (nettyDirect) { - // Get the key nio buffer to pass to RocksDB - ByteBuffer keyNioBuffer; - boolean mustCloseKey; + ensureOpen(); + ensureOwned(writeOptions); + assert key.isAccessible(); + assert value.isAccessible(); + this.keyBufferSize.record(key.readableBytes()); + this.writeValueBufferSize.record(value.readableBytes()); + if (nettyDirect) { + // Get the key nio buffer to pass to RocksDB + ByteBuffer keyNioBuffer; + boolean mustCloseKey; + { + if (!LLUtils.isReadOnlyDirect(key)) { + // If the nio buffer is not available, copy the netty buffer into a new direct buffer + mustCloseKey = true; + var directKey = DefaultBufferAllocators.offHeapAllocator().allocate(key.readableBytes()); + key.copyInto(key.readerOffset(), directKey, 0, key.readableBytes()); + key = directKey; + } else { + mustCloseKey = false; + } + keyNioBuffer = ((ReadableComponent) key).readableBuffer(); + assert keyNioBuffer.isDirect(); + assert keyNioBuffer.limit() == key.readableBytes(); + } + try { + // Get the value nio buffer to pass to RocksDB + ByteBuffer valueNioBuffer; + boolean mustCloseValue; { - if (!LLUtils.isReadOnlyDirect(key)) { + if (!LLUtils.isReadOnlyDirect(value)) { // If the nio buffer is not available, copy the netty buffer into a new direct buffer - mustCloseKey = true; - var directKey = DefaultBufferAllocators.offHeapAllocator().allocate(key.readableBytes()); - key.copyInto(key.readerOffset(), directKey, 0, key.readableBytes()); - key = directKey; + mustCloseValue = true; + var directValue = DefaultBufferAllocators.offHeapAllocator().allocate(value.readableBytes()); + value.copyInto(value.readerOffset(), directValue, 0, value.readableBytes()); + value = directValue; } else { - mustCloseKey = false; + mustCloseValue = false; } - keyNioBuffer = ((ReadableComponent) key).readableBuffer(); - assert keyNioBuffer.isDirect(); - assert keyNioBuffer.limit() == key.readableBytes(); + valueNioBuffer = ((ReadableComponent) value).readableBuffer(); + assert valueNioBuffer.isDirect(); + assert valueNioBuffer.limit() == value.readableBytes(); } - try { - // Get the value nio buffer to pass to RocksDB - ByteBuffer valueNioBuffer; - boolean mustCloseValue; - { - if (!LLUtils.isReadOnlyDirect(value)) { - // If the nio buffer is not available, copy the netty buffer into a new direct buffer - mustCloseValue = true; - var directValue = DefaultBufferAllocators.offHeapAllocator().allocate(value.readableBytes()); - value.copyInto(value.readerOffset(), directValue, 0, value.readableBytes()); - value = directValue; - } else { - mustCloseValue = false; - } - valueNioBuffer = ((ReadableComponent) value).readableBuffer(); - assert valueNioBuffer.isDirect(); - assert valueNioBuffer.limit() == value.readableBytes(); - } - try { - db.put(cfh, writeOptions, keyNioBuffer, valueNioBuffer); - } finally { - if (mustCloseValue) { - value.close(); - } - } + try { + db.put(cfh, writeOptions, keyNioBuffer, valueNioBuffer); } finally { - if (mustCloseKey) { - key.close(); + if (mustCloseValue) { + value.close(); } } - } else { - db.put(cfh, writeOptions, LLUtils.toArray(key), LLUtils.toArray(value)); - } - } finally { - if (!(writeOptions instanceof UnreleasableWriteOptions)) { - writeOptions.close(); + } finally { + if (mustCloseKey) { + key.close(); + } } + } else { + db.put(cfh, writeOptions, LLUtils.toArray(key), LLUtils.toArray(value)); } } finally { closeLock.unlockRead(closeReadLock); @@ -628,18 +616,12 @@ public sealed abstract class AbstractRocksDBColumn implements byte[] keyBytes = LLUtils.toArray(key); Holder data = new Holder<>(); boolean mayExistHit = false; - try { - if (db.keyMayExist(cfh, readOptions, keyBytes, data)) { - mayExistHit = true; - if (data.getValue() != null) { - size = data.getValue().length; - } else { - size = db.get(cfh, readOptions, keyBytes, NO_DATA); - } - } - } finally { - if (!(readOptions instanceof UnreleasableReadOptions)) { - readOptions.close(); + if (db.keyMayExist(cfh, readOptions, keyBytes, data)) { + mayExistHit = true; + if (data.getValue() != null) { + size = data.getValue().length; + } else { + size = db.get(cfh, readOptions, keyBytes, NO_DATA); } } boolean found = size != RocksDB.NOT_FOUND; @@ -692,13 +674,7 @@ public sealed abstract class AbstractRocksDBColumn implements } } else { byte[] keyBytes = LLUtils.toArray(key); - try { - return db.keyMayExist(cfh, readOptions, keyBytes, null); - } finally { - if (!(readOptions instanceof UnreleasableReadOptions)) { - readOptions.close(); - } - } + return db.keyMayExist(cfh, readOptions, keyBytes, null); } } finally { closeLock.unlockRead(closeReadLock); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 6eefe00..68b1645 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -75,9 +75,9 @@ public class LLLocalDictionary implements LLDictionary { static final long MAX_WRITE_BATCH_SIZE = 1024L * 1024L * 1024L; // 1GiB static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations static final int MULTI_GET_WINDOW = 16; - static final ReadOptions EMPTY_READ_OPTIONS = new UnreleasableReadOptions(new UnmodifiableReadOptions()); - static final WriteOptions EMPTY_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions()); - static final WriteOptions BATCH_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions()); + private static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions(); + private static final WriteOptions EMPTY_WRITE_OPTIONS = new WriteOptions(); + private static final WriteOptions BATCH_WRITE_OPTIONS = new WriteOptions(); static final boolean PREFER_AUTO_SEEK_BOUND = false; /** * It used to be false, @@ -212,27 +212,23 @@ public class LLLocalDictionary implements LLDictionary { return columnName; } - /** - * Please don't modify the returned ReadOptions! - * If you want to modify it, wrap it into a new ReadOptions! - */ - private ReadOptions resolveSnapshot(LLSnapshot snapshot) { - if (snapshot != null) { - return getReadOptions(snapshotResolver.apply(snapshot)); - } else { - return EMPTY_READ_OPTIONS; - } + @NotNull + private ReadOptions generateReadOptionsOrStatic(LLSnapshot snapshot) { + return generateReadOptions(snapshot != null ? snapshotResolver.apply(snapshot) : null, true); } - /** - * Please don't modify the returned ReadOptions! - * If you want to modify it, wrap it into a new ReadOptions! - */ - private ReadOptions getReadOptions(Snapshot snapshot) { + @Nullable + private ReadOptions generateReadOptionsOrNull(LLSnapshot snapshot) { + return generateReadOptions(snapshot != null ? snapshotResolver.apply(snapshot) : null, false); + } + + private ReadOptions generateReadOptions(Snapshot snapshot, boolean orStaticOpts) { if (snapshot != null) { - return LLUtils.generateCustomReadOptions(null, true, true, true).setSnapshot(snapshot); - } else { + return new ReadOptions().setSnapshot(snapshot); + } else if (orStaticOpts) { return EMPTY_READ_OPTIONS; + } else { + return null; } } @@ -253,13 +249,16 @@ public class LLLocalDictionary implements LLDictionary { try (var key = keySend.receive()) { logger.trace(MARKER_ROCKSDB, "Reading {}", () -> toStringSafe(key)); try { - var readOptions = requireNonNullElse(resolveSnapshot(snapshot), EMPTY_READ_OPTIONS); + var readOptions = generateReadOptionsOrStatic(snapshot); Buffer result; startedGet.increment(); try { result = getTime.recordCallable(() -> db.get(readOptions, key)); } finally { endedGet.increment(); + if (readOptions != EMPTY_READ_OPTIONS) { + readOptions.close(); + } } logger.trace(MARKER_ROCKSDB, "Read {}: {}", () -> toStringSafe(key), () -> toStringSafe(result)); if (result != null) { @@ -291,7 +290,7 @@ public class LLLocalDictionary implements LLDictionary { AbstractSlice slice1 = null; AbstractSlice slice2 = null; - try (var readOpts = LLUtils.generateCustomReadOptions(resolveSnapshot(snapshot), + try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), true, isBoundedRange(range), true @@ -360,8 +359,14 @@ public class LLLocalDictionary implements LLDictionary { startedContains.increment(); try { var result = containsTime.recordCallable(() -> { - var unmodifiableReadOpts = resolveSnapshot(snapshot); - return db.exists(unmodifiableReadOpts, key); + var readOptions = generateReadOptionsOrStatic(snapshot); + try { + return db.exists(readOptions, key); + } finally { + if (readOptions != EMPTY_READ_OPTIONS) { + readOptions.close(); + } + } }); assert result != null; return result; @@ -580,21 +585,28 @@ public class LLLocalDictionary implements LLDictionary { } try { assert !Schedulers.isInNonBlockingThread() : "Called getMulti in a nonblocking thread"; - var readOptions = Objects.requireNonNullElse(resolveSnapshot(snapshot), EMPTY_READ_OPTIONS); - List results = db.multiGetAsList(readOptions, LLUtils.toArray(keyBufsWindow)); - var mappedResults = new ArrayList>(results.size()); - for (int i = 0; i < results.size(); i++) { - byte[] val = results.get(i); - Optional valueOpt; - if (val != null) { - // free memory - results.set(i, null); + ArrayList> mappedResults; + var readOptions = generateReadOptionsOrStatic(snapshot); + try { + List results = db.multiGetAsList(readOptions, LLUtils.toArray(keyBufsWindow)); + mappedResults = new ArrayList>(results.size()); + for (int i = 0; i < results.size(); i++) { + byte[] val = results.get(i); + Optional valueOpt; + if (val != null) { + // free memory + results.set(i, null); - valueOpt = Optional.of(LLUtils.fromByteArray(alloc, val)); - } else { - valueOpt = Optional.empty(); + valueOpt = Optional.of(LLUtils.fromByteArray(alloc, val)); + } else { + valueOpt = Optional.empty(); + } + mappedResults.add(valueOpt); + } + } finally { + if (readOptions != EMPTY_READ_OPTIONS) { + readOptions.close(); } - mappedResults.add(valueOpt); } sink.next(mappedResults); } catch (RocksDBException ex) { @@ -680,24 +692,30 @@ public class LLLocalDictionary implements LLDictionary { } ArrayList, Optional>>> mappedInputs; { - var readOptions = Objects.requireNonNullElse(resolveSnapshot(null), EMPTY_READ_OPTIONS); - var inputs = db.multiGetAsList(readOptions, LLUtils.toArray(keyBufsWindow)); - mappedInputs = new ArrayList<>(inputs.size()); - for (int i = 0; i < inputs.size(); i++) { - var val = inputs.get(i); - if (val != null) { - inputs.set(i, null); - mappedInputs.add(Tuples.of( - entriesWindow.get(i).getT1(), - keyBufsWindow.get(i).send(), - Optional.of(fromByteArray(alloc, val).send()) - )); - } else { - mappedInputs.add(Tuples.of( - entriesWindow.get(i).getT1(), - keyBufsWindow.get(i).send(), - Optional.empty() - )); + var readOptions = generateReadOptionsOrStatic(null); + try { + var inputs = db.multiGetAsList(readOptions, LLUtils.toArray(keyBufsWindow)); + mappedInputs = new ArrayList<>(inputs.size()); + for (int i = 0; i < inputs.size(); i++) { + var val = inputs.get(i); + if (val != null) { + inputs.set(i, null); + mappedInputs.add(Tuples.of( + entriesWindow.get(i).getT1(), + keyBufsWindow.get(i).send(), + Optional.of(fromByteArray(alloc, val).send()) + )); + } else { + mappedInputs.add(Tuples.of( + entriesWindow.get(i).getT1(), + keyBufsWindow.get(i).send(), + Optional.empty() + )); + } + } + } finally { + if (readOptions != EMPTY_READ_OPTIONS) { + readOptions.close(); } } } @@ -814,7 +832,7 @@ public class LLLocalDictionary implements LLDictionary { boolean reverse, boolean smallRange) { Mono iteratorMono = rangeMono.map(rangeSend -> { - ReadOptions resolvedSnapshot = resolveSnapshot(snapshot); + ReadOptions resolvedSnapshot = generateReadOptionsOrNull(snapshot); return new LLLocalEntryReactiveRocksIterator(db, rangeSend, nettyDirect, resolvedSnapshot, reverse, smallRange); }); return Flux.usingWhen(iteratorMono, @@ -826,7 +844,7 @@ public class LLLocalDictionary implements LLDictionary { private Flux>> getRangeMultiGrouped(LLSnapshot snapshot, Mono> rangeMono, int prefixLength, boolean smallRange) { Mono iteratorMono = rangeMono.map(rangeSend -> { - ReadOptions resolvedSnapshot = resolveSnapshot(snapshot); + ReadOptions resolvedSnapshot = generateReadOptionsOrNull(snapshot); return new LLLocalGroupedEntryReactiveRocksIterator(db, prefixLength, rangeSend, @@ -864,7 +882,7 @@ public class LLLocalDictionary implements LLDictionary { int prefixLength, boolean smallRange) { Mono iteratorMono = rangeMono.map(rangeSend -> { - ReadOptions resolvedSnapshot = resolveSnapshot(snapshot); + ReadOptions resolvedSnapshot = generateReadOptionsOrNull(snapshot); return new LLLocalGroupedKeyReactiveRocksIterator(db, prefixLength, rangeSend, @@ -886,7 +904,7 @@ public class LLLocalDictionary implements LLDictionary { .create(sink -> { var range = rangeSend.receive(); sink.onDispose(range::close); - try (var ro = LLUtils.generateCustomReadOptions(getReadOptions(null), + try (var ro = LLUtils.generateCustomReadOptions(null, false, isBoundedRange(range), false @@ -925,7 +943,7 @@ public class LLLocalDictionary implements LLDictionary { public Flux> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono> rangeMono, int prefixLength, boolean smallRange) { Mono iteratorMono = rangeMono.map(range -> { - ReadOptions resolvedSnapshot = resolveSnapshot(snapshot); + ReadOptions resolvedSnapshot = generateReadOptionsOrNull(snapshot); return new LLLocalKeyPrefixReactiveRocksIterator(db, prefixLength, range, nettyDirect, resolvedSnapshot, true, smallRange ); @@ -958,7 +976,7 @@ public class LLLocalDictionary implements LLDictionary { boolean reverse, boolean smallRange) { Mono iteratorMono = rangeMono.map(range -> { - ReadOptions resolvedSnapshot = resolveSnapshot(snapshot); + ReadOptions resolvedSnapshot = generateReadOptionsOrNull(snapshot); return new LLLocalKeyReactiveRocksIterator(db, range, nettyDirect, resolvedSnapshot, reverse, smallRange); }); return Flux.usingWhen(iteratorMono, @@ -976,8 +994,7 @@ public class LLLocalDictionary implements LLDictionary { try (var range = rangeSend.receive()) { assert !Schedulers.isInNonBlockingThread() : "Called setRange in a nonblocking thread"; if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) { - assert EMPTY_READ_OPTIONS.isOwningHandle(); - try (var opts = LLUtils.generateCustomReadOptions(EMPTY_READ_OPTIONS, + try (var opts = LLUtils.generateCustomReadOptions(null, true, isBoundedRange(range), smallRange @@ -1152,7 +1169,8 @@ public class LLLocalDictionary implements LLDictionary { private void deleteSmallRangeWriteBatch(CappedWriteBatch writeBatch, Send rangeToReceive) throws RocksDBException { var range = rangeToReceive.receive(); - try (var readOpts = new ReadOptions(getReadOptions(null))) { + var readOpts = generateReadOptionsOrStatic(null); + try { readOpts.setFillCache(false); ReleasableSlice minBound; if (range.hasMin()) { @@ -1194,13 +1212,17 @@ public class LLLocalDictionary implements LLDictionary { } catch (Throwable e) { range.close(); throw e; + } finally { + if (readOpts != EMPTY_READ_OPTIONS) { + readOpts.close(); + } } } private void deleteSmallRangeWriteBatch(WriteBatch writeBatch, Send rangeToReceive) throws RocksDBException { try (var range = rangeToReceive.receive()) { - try (var readOpts = LLUtils.generateCustomReadOptions(getReadOptions(null), true, isBoundedRange(range), true)) { + try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(null), true, isBoundedRange(range), true)) { readOpts.setFillCache(false); ReleasableSlice minBound; if (range.hasMin()) { @@ -1264,7 +1286,7 @@ public class LLLocalDictionary implements LLDictionary { .fromCallable(() -> { assert !Schedulers.isInNonBlockingThread() : "Called clear in a nonblocking thread"; boolean shouldCompactLater = false; - try (var readOpts = LLUtils.generateCustomReadOptions(getReadOptions(null), false, false, false)) { + try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(null), false, false, false)) { readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); // readOpts.setIgnoreRangeDeletions(true); @@ -1336,7 +1358,7 @@ public class LLLocalDictionary implements LLDictionary { if (range.isAll()) { sink.next(fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot)); } else { - try (var readOpts = LLUtils.generateCustomReadOptions(resolveSnapshot(snapshot), false, isBoundedRange(range), false)) { + try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), false, isBoundedRange(range), false)) { readOpts.setFillCache(false); readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); ReleasableSlice minBound; @@ -1397,7 +1419,7 @@ public class LLLocalDictionary implements LLDictionary { return rangeMono.publishOn(dbRScheduler).handle((rangeSend, sink) -> { try (var range = rangeSend.receive()) { assert !Schedulers.isInNonBlockingThread() : "Called getOne in a nonblocking thread"; - try (var readOpts = LLUtils.generateCustomReadOptions(resolveSnapshot(snapshot), true, true, true)) { + try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), true, true, true)) { ReleasableSlice minBound; if (range.hasMin()) { minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); @@ -1452,7 +1474,7 @@ public class LLLocalDictionary implements LLDictionary { return rangeMono.publishOn(dbRScheduler).handle((rangeSend, sink) -> { try (var range = rangeSend.receive()) { assert !Schedulers.isInNonBlockingThread() : "Called getOneKey in a nonblocking thread"; - try (var readOpts = LLUtils.generateCustomReadOptions(resolveSnapshot(snapshot), true, true, true)) { + try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), true, true, true)) { ReleasableSlice minBound; if (range.hasMin()) { minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); @@ -1499,7 +1521,11 @@ public class LLLocalDictionary implements LLDictionary { } private long fastSizeAll(@Nullable LLSnapshot snapshot) throws RocksDBException { - try (var rocksdbSnapshot = new ReadOptions(resolveSnapshot(snapshot))) { + var rocksdbSnapshot = generateReadOptionsOrNull(snapshot); + if (rocksdbSnapshot == null) { + rocksdbSnapshot = new ReadOptions(); + } + try { if (USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS || rocksdbSnapshot.snapshot() == null) { try { return db.getLongProperty("rocksdb.estimate-num-keys"); @@ -1524,6 +1550,8 @@ public class LLLocalDictionary implements LLDictionary { return count; } } + } finally { + rocksdbSnapshot.close(); } } @@ -1531,7 +1559,7 @@ public class LLLocalDictionary implements LLDictionary { if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called exactSizeAll in a nonblocking thread"); } - try (var readOpts = LLUtils.generateCustomReadOptions(resolveSnapshot(snapshot), false, false, false)) { + try (var readOpts = LLUtils.generateCustomReadOptions(generateReadOptionsOrNull(snapshot), false, false, false)) { if (LLUtils.MANUAL_READAHEAD) { readOpts.setReadaheadSize(128 * 1024); // 128KiB } @@ -1613,7 +1641,8 @@ public class LLLocalDictionary implements LLDictionary { return rangeMono.publishOn(dbWScheduler).handle((rangeSend, sink) -> { try (var range = rangeSend.receive()) { assert !Schedulers.isInNonBlockingThread() : "Called removeOne in a nonblocking thread"; - try (var readOpts = new ReadOptions(getReadOptions(null))) { + var readOpts = generateReadOptionsOrStatic(null); + try { ReleasableSlice minBound; if (range.hasMin()) { minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); @@ -1655,6 +1684,10 @@ public class LLLocalDictionary implements LLDictionary { } finally { minBound.close(); } + } finally { + if (readOpts != EMPTY_READ_OPTIONS) { + readOpts.close(); + } } } catch (RocksDBException ex) { sink.error(ex); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java index adede02..42fa1f4 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java @@ -19,6 +19,7 @@ import org.jetbrains.annotations.Nullable; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDBException; import reactor.core.publisher.Flux; +import reactor.util.function.Tuples; public abstract class LLLocalGroupedReactiveRocksIterator extends ResourceSupport, LLLocalGroupedReactiveRocksIterator> { @@ -36,9 +37,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator extends } try { if (obj.readOptions != null) { - if (!(obj.readOptions instanceof UnreleasableReadOptions)) { - obj.readOptions.close(); - } + obj.readOptions.close(); } } catch (Throwable ex) { logger.error("Failed to close readOptions", ex); @@ -94,10 +93,10 @@ public abstract class LLLocalGroupedReactiveRocksIterator extends if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); } - return db.getRocksIterator(allowNettyDirect, readOptions, range, false); + return Tuples.of(readOptions, db.getRocksIterator(allowNettyDirect, readOptions, range, false)); }, (tuple, sink) -> { try { - var rocksIterator = tuple.iterator(); + var rocksIterator = tuple.getT2().iterator(); ObjectArrayList values = new ObjectArrayList<>(); Buffer firstGroupKey = null; try { @@ -160,7 +159,11 @@ public abstract class LLLocalGroupedReactiveRocksIterator extends sink.error(ex); } return tuple; - }, RocksIteratorTuple::close); + }, t -> { + t.getT2().close(); + t.getT1().close(); + this.close(); + }); } public abstract T getEntry(@Nullable Send key, @Nullable Send value); @@ -173,14 +176,15 @@ public abstract class LLLocalGroupedReactiveRocksIterator extends @Override protected Owned> prepareSend() { var range = this.range.send(); - var readOptions = new ReadOptions(this.readOptions); + var readOptions = this.readOptions; return drop -> new LLLocalGroupedReactiveRocksIterator<>(db, prefixLength, range, allowNettyDirect, readOptions, canFillCache, - readValues, smallRange + readValues, + smallRange ) { @Override public T getEntry(@Nullable Send key, @Nullable Send value) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java index ce6ef3e..b8fbf55 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java @@ -16,6 +16,7 @@ import org.apache.logging.log4j.Logger; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDBException; import reactor.core.publisher.Flux; +import reactor.util.function.Tuples; public class LLLocalKeyPrefixReactiveRocksIterator extends ResourceSupport { @@ -33,9 +34,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends } try { if (obj.readOptions != null) { - if (!(obj.readOptions instanceof UnreleasableReadOptions)) { - obj.readOptions.close(); - } + obj.readOptions.close(); } } catch (Throwable ex) { logger.error("Failed to close readOptions", ex); @@ -91,10 +90,10 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared)); } - return db.getRocksIterator(allowNettyDirect, readOptions, rangeShared, false); + return Tuples.of(readOptions, db.getRocksIterator(allowNettyDirect, readOptions, rangeShared, false)); }, (tuple, sink) -> { try { - var rocksIterator = tuple.iterator(); + var rocksIterator = tuple.getT2().iterator(); Buffer firstGroupKey = null; try { while (rocksIterator.isValid()) { @@ -151,7 +150,11 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends sink.error(ex); } return tuple; - }, RocksIteratorTuple::close); + }, t -> { + t.getT2().close(); + t.getT1().close(); + this.close(); + }); } @Override @@ -162,13 +165,14 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends @Override protected Owned prepareSend() { var range = this.rangeShared.send(); - var readOptions = new ReadOptions(this.readOptions); + var readOptions = this.readOptions; return drop -> new LLLocalKeyPrefixReactiveRocksIterator(db, prefixLength, range, allowNettyDirect, readOptions, - canFillCache, smallRange + canFillCache, + smallRange ); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java index 38e4ec8..550dfa0 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java @@ -19,6 +19,7 @@ import org.jetbrains.annotations.Nullable; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDBException; import reactor.core.publisher.Flux; +import reactor.util.function.Tuples; public final class LLLocalMigrationReactiveRocksIterator extends ResourceSupport { @@ -36,9 +37,7 @@ public final class LLLocalMigrationReactiveRocksIterator extends } try { if (obj.readOptions != null) { - if (!(obj.readOptions instanceof UnreleasableReadOptions)) { - obj.readOptions.close(); - } + obj.readOptions.close(); } } catch (Throwable ex) { logger.error("Failed to close readOptions", ex); @@ -77,10 +76,11 @@ public final class LLLocalMigrationReactiveRocksIterator extends public Flux flux() { return Flux.generate(() -> { var readOptions = generateCustomReadOptions(this.readOptions, false, false, false); - return db.getRocksIterator(false, readOptions, rangeShared, false); + return Tuples.of(readOptions, db.getRocksIterator(false, readOptions, rangeShared, false)); }, (tuple, sink) -> { try { - var rocksIterator = tuple.iterator(); + //noinspection resource + var rocksIterator = tuple.getT2().iterator(); if (rocksIterator.isValid()) { byte[] key = rocksIterator.key(); byte[] value = rocksIterator.value(); @@ -93,7 +93,11 @@ public final class LLLocalMigrationReactiveRocksIterator extends sink.error(ex); } return tuple; - }, RocksIteratorTuple::close); + }, t -> { + t.getT2().close(); + t.getT1().close(); + this.close(); + }); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java index 264a7ec..a16c397 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java @@ -17,6 +17,7 @@ import org.jetbrains.annotations.Nullable; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDBException; import reactor.core.publisher.Flux; +import reactor.util.function.Tuples; public abstract class LLLocalReactiveRocksIterator extends ResourceSupport, LLLocalReactiveRocksIterator> { @@ -34,9 +35,7 @@ public abstract class LLLocalReactiveRocksIterator extends } try { if (obj.readOptions != null) { - if (!(obj.readOptions instanceof UnreleasableReadOptions)) { - obj.readOptions.close(); - } + obj.readOptions.close(); } } catch (Throwable ex) { logger.error("Failed to close readOptions", ex); @@ -88,10 +87,10 @@ public abstract class LLLocalReactiveRocksIterator extends if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared)); } - return db.getRocksIterator(allowNettyDirect, readOptions, rangeShared, reverse); + return Tuples.of(readOptions, db.getRocksIterator(allowNettyDirect, readOptions, rangeShared, reverse)); }, (tuple, sink) -> { try { - var rocksIterator = tuple.iterator(); + var rocksIterator = tuple.getT2().iterator(); if (rocksIterator.isValid()) { Buffer key; if (allowNettyDirect) { @@ -146,7 +145,11 @@ public abstract class LLLocalReactiveRocksIterator extends sink.error(ex); } return tuple; - }, RocksIteratorTuple::close); + }, t -> { + t.getT2().close(); + t.getT1().close(); + this.close(); + }); } public abstract T getEntry(@Nullable Send key, @Nullable Send value); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java index 2ef031c..2209195 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java @@ -26,8 +26,8 @@ import reactor.core.scheduler.Schedulers; public class LLLocalSingleton implements LLSingleton { - static final ReadOptions EMPTY_READ_OPTIONS = new UnreleasableReadOptions(new UnmodifiableReadOptions()); - static final WriteOptions EMPTY_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions()); + private static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions(); + private static final WriteOptions EMPTY_WRITE_OPTIONS = new WriteOptions(); private final RocksDBColumn db; private final Function snapshotResolver; private final byte[] name; @@ -71,11 +71,13 @@ public class LLLocalSingleton implements LLSingleton { return Mono.fromCallable(callable).subscribeOn(write ? dbWScheduler : dbRScheduler); } - private ReadOptions resolveSnapshot(LLSnapshot snapshot) { + private ReadOptions generateReadOptions(LLSnapshot snapshot, boolean orStaticOpts) { if (snapshot != null) { - return LLUtils.generateCustomReadOptions(null, true, true, true).setSnapshot(snapshotResolver.apply(snapshot)); - } else { + return new ReadOptions().setSnapshot(snapshotResolver.apply(snapshot)); + } else if (orStaticOpts) { return EMPTY_READ_OPTIONS; + } else { + return null; } } @@ -88,7 +90,7 @@ public class LLLocalSingleton implements LLSingleton { public Mono> get(@Nullable LLSnapshot snapshot) { return nameMono.publishOn(dbRScheduler).handle((nameSend, sink) -> { try (Buffer name = nameSend.receive()) { - Buffer result = db.get(resolveSnapshot(snapshot), name); + Buffer result = db.get(generateReadOptions(snapshot, true), name); if (result != null) { sink.next(result.send()); } else { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksIteratorTuple.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksIteratorTuple.java index 9ea797b..600b927 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/RocksIteratorTuple.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksIteratorTuple.java @@ -8,22 +8,13 @@ import org.rocksdb.ReadOptions; import org.rocksdb.RocksIterator; import org.rocksdb.RocksObject; -public record RocksIteratorTuple(List refs, @NotNull RocksDBIterator iterator, - @NotNull ReleasableSlice sliceMin, @NotNull ReleasableSlice sliceMax, - @NotNull SafeCloseable seekTo) implements - SafeCloseable { +public record RocksIteratorTuple(@NotNull RocksDBIterator iterator, + @NotNull ReleasableSlice sliceMin, + @NotNull ReleasableSlice sliceMax, + @NotNull SafeCloseable seekTo) implements SafeCloseable { @Override public void close() { - for (RocksObject rocksObject : refs) { - if (rocksObject instanceof UnreleasableReadOptions) { - continue; - } - if (rocksObject instanceof UnreleasableWriteOptions) { - continue; - } - rocksObject.close(); - } iterator.close(); sliceMin.close(); sliceMax.close(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/UnmodifiableReadOptions.java b/src/main/java/it/cavallium/dbengine/database/disk/UnmodifiableReadOptions.java deleted file mode 100644 index 5927a70..0000000 --- a/src/main/java/it/cavallium/dbengine/database/disk/UnmodifiableReadOptions.java +++ /dev/null @@ -1,102 +0,0 @@ -package it.cavallium.dbengine.database.disk; - -import org.rocksdb.AbstractSlice; -import org.rocksdb.AbstractTableFilter; -import org.rocksdb.ReadOptions; -import org.rocksdb.ReadTier; -import org.rocksdb.Snapshot; - -public class UnmodifiableReadOptions extends ReadOptions { - - public UnmodifiableReadOptions() { - - } - - public UnmodifiableReadOptions(ReadOptions readOptions) { - super(readOptions); - } - - @Override - public ReadOptions setBackgroundPurgeOnIteratorCleanup(boolean b) { - throw uoe(); - } - - @Override - public ReadOptions setFillCache(boolean b) { - throw uoe(); - } - - @Override - public ReadOptions setSnapshot(Snapshot snapshot) { - throw uoe(); - } - - @Override - public ReadOptions setReadTier(ReadTier readTier) { - throw uoe(); - } - - @Override - public ReadOptions setTailing(boolean b) { - throw uoe(); - } - - @Override - public ReadOptions setVerifyChecksums(boolean b) { - throw uoe(); - } - - @Override - public ReadOptions setManaged(boolean b) { - throw uoe(); - } - - @Override - public ReadOptions setTotalOrderSeek(boolean b) { - throw uoe(); - } - - @Override - public ReadOptions setPrefixSameAsStart(boolean b) { - throw uoe(); - } - - @Override - public ReadOptions setPinData(boolean b) { - throw uoe(); - } - - @Override - public ReadOptions setReadaheadSize(long l) { - throw uoe(); - } - - @Override - public ReadOptions setMaxSkippableInternalKeys(long l) { - throw uoe(); - } - - @Override - public ReadOptions setIgnoreRangeDeletions(boolean b) { - throw uoe(); - } - - @Override - public ReadOptions setIterateLowerBound(AbstractSlice abstractSlice) { - throw uoe(); - } - - @Override - public ReadOptions setIterateUpperBound(AbstractSlice abstractSlice) { - throw uoe(); - } - - @Override - public ReadOptions setTableFilter(AbstractTableFilter abstractTableFilter) { - throw uoe(); - } - - private UnsupportedOperationException uoe() { - return new UnsupportedOperationException("Unmodifiable read options"); - } -} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/UnmodifiableWriteOptions.java b/src/main/java/it/cavallium/dbengine/database/disk/UnmodifiableWriteOptions.java deleted file mode 100644 index db46fb3..0000000 --- a/src/main/java/it/cavallium/dbengine/database/disk/UnmodifiableWriteOptions.java +++ /dev/null @@ -1,48 +0,0 @@ -package it.cavallium.dbengine.database.disk; - -import org.rocksdb.AbstractSlice; -import org.rocksdb.AbstractTableFilter; -import org.rocksdb.ReadOptions; -import org.rocksdb.ReadTier; -import org.rocksdb.Snapshot; -import org.rocksdb.WriteOptions; - -public class UnmodifiableWriteOptions extends WriteOptions { - - public UnmodifiableWriteOptions() { - - } - - public UnmodifiableWriteOptions(WriteOptions writeOptions) { - super(writeOptions); - } - - @Override - public WriteOptions setDisableWAL(boolean b) { - throw uoe(); - } - - @Override - public WriteOptions setIgnoreMissingColumnFamilies(boolean b) { - throw uoe(); - } - - @Override - public WriteOptions setLowPri(boolean b) { - throw uoe(); - } - - @Override - public WriteOptions setNoSlowdown(boolean b) { - throw uoe(); - } - - @Override - public WriteOptions setSync(boolean b) { - throw uoe(); - } - - private UnsupportedOperationException uoe() { - return new UnsupportedOperationException("Unmodifiable read options"); - } -} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/UnreleasableReadOptions.java b/src/main/java/it/cavallium/dbengine/database/disk/UnreleasableReadOptions.java deleted file mode 100644 index c410572..0000000 --- a/src/main/java/it/cavallium/dbengine/database/disk/UnreleasableReadOptions.java +++ /dev/null @@ -1,19 +0,0 @@ -package it.cavallium.dbengine.database.disk; - -import org.rocksdb.*; - -public class UnreleasableReadOptions extends ReadOptions { - - public UnreleasableReadOptions() { - - } - - public UnreleasableReadOptions(ReadOptions readOptions) { - super(readOptions); - } - - @Override - public void close() { - throw new UnsupportedOperationException("Can't close " + this.getClass().getSimpleName()); - } -} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/UnreleasableWriteOptions.java b/src/main/java/it/cavallium/dbengine/database/disk/UnreleasableWriteOptions.java deleted file mode 100644 index 008cf73..0000000 --- a/src/main/java/it/cavallium/dbengine/database/disk/UnreleasableWriteOptions.java +++ /dev/null @@ -1,19 +0,0 @@ -package it.cavallium.dbengine.database.disk; - -import org.rocksdb.WriteOptions; - -public class UnreleasableWriteOptions extends WriteOptions { - - public UnreleasableWriteOptions() { - - } - - public UnreleasableWriteOptions(WriteOptions writeOptions) { - super(writeOptions); - } - - @Override - public void close() { - throw new UnsupportedOperationException("Can't close " + this.getClass().getSimpleName()); - } -} diff --git a/src/main/java/it/cavallium/dbengine/lucene/HugePqArray.java b/src/main/java/it/cavallium/dbengine/lucene/HugePqArray.java index 0d1c5d6..dd97940 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/HugePqArray.java +++ b/src/main/java/it/cavallium/dbengine/lucene/HugePqArray.java @@ -6,11 +6,8 @@ import it.cavallium.dbengine.database.SafeCloseable; import it.cavallium.dbengine.database.disk.LLTempHugePqEnv; import it.cavallium.dbengine.database.disk.HugePqEnv; import it.cavallium.dbengine.database.disk.StandardRocksDBColumn; -import it.cavallium.dbengine.database.disk.UnreleasableReadOptions; -import it.cavallium.dbengine.database.disk.UnreleasableWriteOptions; import java.util.concurrent.atomic.AtomicBoolean; import org.jetbrains.annotations.Nullable; -import org.rocksdb.FlushOptions; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -28,11 +25,8 @@ public class HugePqArray implements IArray, SafeCloseable { private final HugePqEnv env; private final int hugePqId; private final StandardRocksDBColumn rocksDB; - private static final UnreleasableWriteOptions writeOptions = new UnreleasableWriteOptions(new WriteOptions() - .setDisableWAL(true) - .setSync(false)); - private static final UnreleasableReadOptions readOptions = new UnreleasableReadOptions(new ReadOptions() - .setVerifyChecksums(false)); + private static final WriteOptions WRITE_OPTIONS = new WriteOptions().setDisableWAL(true).setSync(false); + private static final ReadOptions READ_OPTIONS = new ReadOptions().setVerifyChecksums(false); private final V defaultValue; private final long virtualSize; @@ -67,7 +61,7 @@ public class HugePqArray implements IArray, SafeCloseable { var keyBuf = allocate(Long.BYTES); try (var valueBuf = valueCodec.serialize(this::allocate, value); keyBuf) { keyBuf.writeLong(index); - rocksDB.put(writeOptions, keyBuf, valueBuf); + rocksDB.put(WRITE_OPTIONS, keyBuf, valueBuf); } catch (RocksDBException e) { throw new IllegalStateException(e); } @@ -80,7 +74,7 @@ public class HugePqArray implements IArray, SafeCloseable { var keyBuf = allocate(Long.BYTES); try (keyBuf) { keyBuf.writeLong(index); - rocksDB.delete(writeOptions, keyBuf); + rocksDB.delete(WRITE_OPTIONS, keyBuf); } catch (RocksDBException e) { throw new IllegalStateException(e); } @@ -94,7 +88,7 @@ public class HugePqArray implements IArray, SafeCloseable { var keyBuf = allocate(Long.BYTES); try (keyBuf) { keyBuf.writeLong(index); - try (var value = rocksDB.get(readOptions, keyBuf)) { + try (var value = rocksDB.get(READ_OPTIONS, keyBuf)) { if (value == null) { return null; } diff --git a/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java b/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java index bd2b164..766b39f 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java +++ b/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java @@ -8,8 +8,6 @@ import it.cavallium.dbengine.database.disk.LLTempHugePqEnv; import it.cavallium.dbengine.database.disk.HugePqEnv; import it.cavallium.dbengine.database.disk.RocksIteratorTuple; import it.cavallium.dbengine.database.disk.StandardRocksDBColumn; -import it.cavallium.dbengine.database.disk.UnreleasableReadOptions; -import it.cavallium.dbengine.database.disk.UnreleasableWriteOptions; import it.cavallium.dbengine.database.disk.UpdateAtomicResultMode; import it.cavallium.dbengine.database.disk.UpdateAtomicResultPrevious; import java.io.IOException; @@ -39,11 +37,8 @@ public class HugePqPriorityQueue implements PriorityQueue, Reversable codec; private long size = 0; @@ -74,7 +69,7 @@ public class HugePqPriorityQueue implements PriorityQueue, Reversable implements PriorityQueue, Reversable implements PriorityQueue, Reversable implements PriorityQueue, Reversable implements PriorityQueue, Reversable implements PriorityQueue, Reversable iterate(long skips, boolean reverse) { return Flux., RocksIteratorTuple>generate(() -> { - var it = rocksDB.getRocksIterator(true, readOptions, LLRange.all(), reverse); + var it = rocksDB.getRocksIterator(true, READ_OPTIONS, LLRange.all(), reverse); var rocksIterator = it.iterator(); if (reverse) { rocksIterator.seekToLast(); diff --git a/src/main/java/it/cavallium/dbengine/lucene/LLSlotDocCodec.java b/src/main/java/it/cavallium/dbengine/lucene/LLSlotDocCodec.java index 569c95c..a058a8d 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LLSlotDocCodec.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LLSlotDocCodec.java @@ -24,6 +24,7 @@ public class LLSlotDocCodec implements HugePqCodec, FieldValueHitQueu protected final FieldComparator[] comparators; protected final int[] reverseMul; + private final ComparatorOptions comparatorOptions; private final AbstractComparator comparator; public LLSlotDocCodec(LLTempHugePqEnv env, int numHits, SortField[] fields) { @@ -42,7 +43,8 @@ public class LLSlotDocCodec implements HugePqCodec, FieldValueHitQueu reverseMul[i] = field.getReverse() ? -1 : 1; comparators[i] = HugePqComparator.getComparator(env, field, numHits, i == 0); } - comparator = new AbstractComparator(new ComparatorOptions().setMaxReusedBufferSize(0)) { + comparatorOptions = new ComparatorOptions().setMaxReusedBufferSize(0); + comparator = new AbstractComparator(comparatorOptions) { @Override public String name() { return "slot-doc-codec-comparator"; @@ -193,6 +195,8 @@ public class LLSlotDocCodec implements HugePqCodec, FieldValueHitQueu closeable.close(); } } + comparator.close(); + comparatorOptions.close(); } @Override