diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index ba95f14..2be0b7c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -68,9 +68,9 @@ public class LLLocalDictionary implements LLDictionary { static final long MAX_WRITE_BATCH_SIZE = 1024L * 1024L * 1024L; // 1GiB static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations static final int MULTI_GET_WINDOW = 500; - static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions(); - static final WriteOptions EMPTY_WRITE_OPTIONS = new WriteOptions(); - static final WriteOptions BATCH_WRITE_OPTIONS = new WriteOptions(); + static final ReadOptions EMPTY_READ_OPTIONS = new UnmodifiableReadOptions(); + static final WriteOptions EMPTY_WRITE_OPTIONS = new UnmodifiableWriteOptions(); + static final WriteOptions BATCH_WRITE_OPTIONS = new UnmodifiableWriteOptions(); static final boolean PREFER_SEEK_TO_FIRST = false; static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = false; public static final boolean DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED = true; @@ -153,6 +153,9 @@ public class LLLocalDictionary implements LLDictionary { return databaseName; } + /** + * Please don't modify the returned ReadOptions! If you want to modify it, wrap it into a new ReadOptions! + */ private ReadOptions resolveSnapshot(LLSnapshot snapshot) { if (snapshot != null) { return getReadOptions(snapshotResolver.apply(snapshot)); @@ -161,6 +164,9 @@ public class LLLocalDictionary implements LLDictionary { } } + /** + * Please don't modify the returned ReadOptions! If you want to modify it, wrap it into a new ReadOptions! + */ private ReadOptions getReadOptions(Snapshot snapshot) { if (snapshot != null) { return new ReadOptions().setSnapshot(snapshot); @@ -388,40 +394,41 @@ public class LLLocalDictionary implements LLDictionary { try { return Mono .fromCallable(() -> { - var readOpts = resolveSnapshot(snapshot); - readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); - readOpts.setFillCache(false); - if (range.hasMin()) { - if (range.getMin().isDirect()) { - readOpts.setIterateLowerBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirect(range.getMin()), - "This range must use direct buffers" - ))); - } else { - readOpts.setIterateLowerBound(new Slice(LLUtils.toArray(range.getMin()))); - } - } - if (range.hasMax()) { - if (range.getMax().isDirect()) { - readOpts.setIterateUpperBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirect(range.getMax()), - "This range must use direct buffers" - ))); - } else { - readOpts.setIterateUpperBound(new Slice(LLUtils.toArray(range.getMax()))); - } - } - try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) { - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { + readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); + readOpts.setFillCache(false); + if (range.hasMin()) { if (range.getMin().isDirect()) { - rocksIterator.seek(Objects.requireNonNull(LLUtils.toDirect(range.getMin()), + readOpts.setIterateLowerBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirect(range.getMin()), "This range must use direct buffers" - )); + ))); } else { - rocksIterator.seek(LLUtils.toArray(range.getMin())); + readOpts.setIterateLowerBound(new Slice(LLUtils.toArray(range.getMin()))); } - } else { - rocksIterator.seekToFirst(); } - return rocksIterator.isValid(); + if (range.hasMax()) { + if (range.getMax().isDirect()) { + readOpts.setIterateUpperBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirect(range.getMax()), + "This range must use direct buffers" + ))); + } else { + readOpts.setIterateUpperBound(new Slice(LLUtils.toArray(range.getMax()))); + } + } + try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) { + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + if (range.getMin().isDirect()) { + rocksIterator.seek(Objects.requireNonNull(LLUtils.toDirect(range.getMin()), + "This range must use direct buffers" + )); + } else { + rocksIterator.seek(LLUtils.toArray(range.getMin())); + } + } else { + rocksIterator.seekToFirst(); + } + return rocksIterator.isValid(); + } } }) .onErrorMap(cause -> new IOException("Failed to read range " + range.toString(), cause)) @@ -451,11 +458,12 @@ public class LLLocalDictionary implements LLDictionary { int size = RocksDB.NOT_FOUND; byte[] keyBytes = LLUtils.toArray(key); Holder data = new Holder<>(); - if (db.keyMayExist(cfh, resolveSnapshot(snapshot), keyBytes, data)) { + var unmodifiableReadOpts = resolveSnapshot(snapshot); + if (db.keyMayExist(cfh, unmodifiableReadOpts, keyBytes, data)) { if (data.getValue() != null) { size = data.getValue().length; } else { - size = db.get(cfh, resolveSnapshot(snapshot), keyBytes, NO_DATA); + size = db.get(cfh, unmodifiableReadOpts, keyBytes, NO_DATA); } } return size != RocksDB.NOT_FOUND; @@ -1296,35 +1304,36 @@ public class LLLocalDictionary implements LLDictionary { return Mono .fromCallable(() -> { if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) { - var opts = new ReadOptions(EMPTY_READ_OPTIONS); - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = setIterateBound(opts, IterateBound.LOWER, range.getMin().retain()); - } else { - minBound = emptyReleasableSlice(); - } - try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = setIterateBound(opts, IterateBound.UPPER, range.getMax().retain()); + try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) { + ReleasableSlice minBound; + if (range.hasMin()) { + minBound = setIterateBound(opts, IterateBound.LOWER, range.getMin().retain()); } else { - maxBound = emptyReleasableSlice(); + minBound = emptyReleasableSlice(); } - try (RocksIterator it = db.newIterator(cfh, opts)) { - if (!PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterSeekTo(it, range.getMin().retain()); + try { + ReleasableSlice maxBound; + if (range.hasMax()) { + maxBound = setIterateBound(opts, IterateBound.UPPER, range.getMax().retain()); } else { - it.seekToFirst(); + maxBound = emptyReleasableSlice(); } - while (it.isValid()) { - db.delete(cfh, it.key()); - it.next(); + try (RocksIterator it = db.newIterator(cfh, opts)) { + if (!PREFER_SEEK_TO_FIRST && range.hasMin()) { + rocksIterSeekTo(it, range.getMin().retain()); + } else { + it.seekToFirst(); + } + while (it.isValid()) { + db.delete(cfh, it.key()); + it.next(); + } + } finally { + maxBound.release(); } } finally { - maxBound.release(); + minBound.release(); } - } finally { - minBound.release(); } } else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) { try (var batch = new CappedWriteBatch(db, @@ -1445,8 +1454,7 @@ public class LLLocalDictionary implements LLDictionary { //todo: this is broken, check why private void deleteSmallRangeWriteBatch(CappedWriteBatch writeBatch, LLRange range) throws RocksDBException { - try { - var readOpts = getReadOptions(null); + try (var readOpts = new ReadOptions(getReadOptions(null))) { readOpts.setFillCache(false); ReleasableSlice minBound; if (range.hasMin()) { @@ -1484,8 +1492,7 @@ public class LLLocalDictionary implements LLDictionary { private void deleteSmallRangeWriteBatch(WriteBatch writeBatch, LLRange range) throws RocksDBException { - try { - var readOpts = getReadOptions(null); + try (var readOpts = new ReadOptions(getReadOptions(null))) { readOpts.setFillCache(false); ReleasableSlice minBound; if (range.hasMin()) { @@ -1656,45 +1663,46 @@ public class LLLocalDictionary implements LLDictionary { } else { return Mono .fromCallable(() -> { - var readOpts = resolveSnapshot(snapshot); - readOpts.setFillCache(false); - readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain()); - } else { - minBound = emptyReleasableSlice(); - } - try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain()); + try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { + readOpts.setFillCache(false); + readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); + ReleasableSlice minBound; + if (range.hasMin()) { + minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain()); } else { - maxBound = emptyReleasableSlice(); + minBound = emptyReleasableSlice(); } try { - if (fast) { - readOpts.setIgnoreRangeDeletions(true); - + ReleasableSlice maxBound; + if (range.hasMax()) { + maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain()); + } else { + maxBound = emptyReleasableSlice(); } - try (var rocksIterator = db.newIterator(cfh, readOpts)) { - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterSeekTo(rocksIterator, range.getMin().retain()); - } else { - rocksIterator.seekToFirst(); + try { + if (fast) { + readOpts.setIgnoreRangeDeletions(true); + } - long i = 0; - while (rocksIterator.isValid()) { - rocksIterator.next(); - i++; + try (var rocksIterator = db.newIterator(cfh, readOpts)) { + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + rocksIterSeekTo(rocksIterator, range.getMin().retain()); + } else { + rocksIterator.seekToFirst(); + } + long i = 0; + while (rocksIterator.isValid()) { + rocksIterator.next(); + i++; + } + return i; } - return i; + } finally { + maxBound.release(); } } finally { - maxBound.release(); + minBound.release(); } - } finally { - minBound.release(); } }) .onErrorMap(cause -> new IOException("Failed to get size of range " @@ -1714,46 +1722,47 @@ public class LLLocalDictionary implements LLDictionary { try { return Mono .fromCallable(() -> { - var readOpts = resolveSnapshot(snapshot); - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain()); - } else { - minBound = emptyReleasableSlice(); - } - try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain()); + try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { + ReleasableSlice minBound; + if (range.hasMin()) { + minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain()); } else { - maxBound = emptyReleasableSlice(); + minBound = emptyReleasableSlice(); } - try (var rocksIterator = db.newIterator(cfh, readOpts)) { - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterSeekTo(rocksIterator, range.getMin().retain()); + try { + ReleasableSlice maxBound; + if (range.hasMax()) { + maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain()); } else { - rocksIterator.seekToFirst(); + maxBound = emptyReleasableSlice(); } - if (rocksIterator.isValid()) { - ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); - try { - ByteBuf value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value); - try { - return Map.entry(key.retain(), value.retain()); - } finally { - value.release(); - } - } finally { - key.release(); + try (var rocksIterator = db.newIterator(cfh, readOpts)) { + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + rocksIterSeekTo(rocksIterator, range.getMin().retain()); + } else { + rocksIterator.seekToFirst(); } - } else { - return null; + if (rocksIterator.isValid()) { + ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); + try { + ByteBuf value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value); + try { + return Map.entry(key.retain(), value.retain()); + } finally { + value.release(); + } + } finally { + key.release(); + } + } else { + return null; + } + } finally { + maxBound.release(); } } finally { - maxBound.release(); + minBound.release(); } - } finally { - minBound.release(); } }) .subscribeOn(dbScheduler) @@ -1769,38 +1778,39 @@ public class LLLocalDictionary implements LLDictionary { try { return Mono .fromCallable(() -> { - var readOpts = resolveSnapshot(snapshot); - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain()); - } else { - minBound = emptyReleasableSlice(); - } - try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain()); + try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { + ReleasableSlice minBound; + if (range.hasMin()) { + minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain()); } else { - maxBound = emptyReleasableSlice(); + minBound = emptyReleasableSlice(); } - try (var rocksIterator = db.newIterator(cfh, readOpts)) { - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterSeekTo(rocksIterator, range.getMin().retain()); + try { + ReleasableSlice maxBound; + if (range.hasMax()) { + maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain()); } else { - rocksIterator.seekToFirst(); + maxBound = emptyReleasableSlice(); } - ByteBuf key; - if (rocksIterator.isValid()) { - key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); - return key; - } else { - return null; + try (var rocksIterator = db.newIterator(cfh, readOpts)) { + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + rocksIterSeekTo(rocksIterator, range.getMin().retain()); + } else { + rocksIterator.seekToFirst(); + } + ByteBuf key; + if (rocksIterator.isValid()) { + key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); + return key; + } else { + return null; + } + } finally { + maxBound.release(); } } finally { - maxBound.release(); + minBound.release(); } - } finally { - minBound.release(); } }) .subscribeOn(dbScheduler) @@ -1812,102 +1822,105 @@ public class LLLocalDictionary implements LLDictionary { } private long fastSizeAll(@Nullable LLSnapshot snapshot) { - var rocksdbSnapshot = resolveSnapshot(snapshot); - if (USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS || rocksdbSnapshot.snapshot() == null) { - try { - return db.getLongProperty(cfh, "rocksdb.estimate-num-keys"); - } catch (RocksDBException e) { - e.printStackTrace(); - return 0; - } - } else if (PARALLEL_EXACT_SIZE) { - return exactSizeAll(snapshot); - } else { - rocksdbSnapshot.setFillCache(false); - rocksdbSnapshot.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); - rocksdbSnapshot.setIgnoreRangeDeletions(true); - long count = 0; - try (RocksIterator iter = db.newIterator(cfh, rocksdbSnapshot)) { - iter.seekToFirst(); - // If it's a fast size of a snapshot, count only up to 100'000 elements - while (iter.isValid() && count < 100_000) { - count++; - iter.next(); + try (var rocksdbSnapshot = new ReadOptions(resolveSnapshot(snapshot))) { + if (USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS || rocksdbSnapshot.snapshot() == null) { + try { + return db.getLongProperty(cfh, "rocksdb.estimate-num-keys"); + } catch (RocksDBException e) { + e.printStackTrace(); + return 0; + } + } else if (PARALLEL_EXACT_SIZE) { + return exactSizeAll(snapshot); + } else { + rocksdbSnapshot.setFillCache(false); + rocksdbSnapshot.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); + rocksdbSnapshot.setIgnoreRangeDeletions(true); + long count = 0; + try (RocksIterator iter = db.newIterator(cfh, rocksdbSnapshot)) { + iter.seekToFirst(); + // If it's a fast size of a snapshot, count only up to 100'000 elements + while (iter.isValid() && count < 100_000) { + count++; + iter.next(); + } + return count; } - return count; } } } private long exactSizeAll(@Nullable LLSnapshot snapshot) { - var readOpts = resolveSnapshot(snapshot); - readOpts.setFillCache(false); - readOpts.setReadaheadSize(32 * 1024); // 32KiB - readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); + try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { + readOpts.setFillCache(false); + readOpts.setReadaheadSize(32 * 1024); // 32KiB + readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); - if (PARALLEL_EXACT_SIZE) { - var commonPool = ForkJoinPool.commonPool(); - var futures = IntStream - .range(-1, LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS.length) - .mapToObj(idx -> Pair.of(idx == -1 ? new byte[0] : LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS[idx], - idx + 1 >= LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS.length ? null - : LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS[idx + 1] - )) - .map(range -> (Callable) () -> { - long partialCount = 0; - var rangeReadOpts = new ReadOptions(readOpts); - Slice sliceBegin; - if (range.getKey() != null) { - sliceBegin = new Slice(range.getKey()); - } else { - sliceBegin = null; - } - Slice sliceEnd; - if (range.getValue() != null) { - sliceEnd = new Slice(range.getValue()); - } else { - sliceEnd = null; - } - try { - if (sliceBegin != null) { - rangeReadOpts.setIterateLowerBound(sliceBegin); - } - if (sliceBegin != null) { - rangeReadOpts.setIterateUpperBound(sliceEnd); - } - try (RocksIterator iter = db.newIterator(cfh, rangeReadOpts)) { - iter.seekToFirst(); - while (iter.isValid()) { - partialCount++; - iter.next(); + if (PARALLEL_EXACT_SIZE) { + var commonPool = ForkJoinPool.commonPool(); + var futures = IntStream + .range(-1, LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS.length) + .mapToObj(idx -> Pair.of(idx == -1 ? new byte[0] : LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS[idx], + idx + 1 >= LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS.length ? null + : LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS[idx + 1] + )) + .map(range -> (Callable) () -> { + long partialCount = 0; + try (var rangeReadOpts = new ReadOptions(readOpts)) { + Slice sliceBegin; + if (range.getKey() != null) { + sliceBegin = new Slice(range.getKey()); + } else { + sliceBegin = null; + } + Slice sliceEnd; + if (range.getValue() != null) { + sliceEnd = new Slice(range.getValue()); + } else { + sliceEnd = null; + } + try { + if (sliceBegin != null) { + rangeReadOpts.setIterateLowerBound(sliceBegin); + } + if (sliceBegin != null) { + rangeReadOpts.setIterateUpperBound(sliceEnd); + } + try (RocksIterator iter = db.newIterator(cfh, rangeReadOpts)) { + iter.seekToFirst(); + while (iter.isValid()) { + partialCount++; + iter.next(); + } + return partialCount; + } + } finally { + if (sliceBegin != null) { + sliceBegin.close(); + } + if (sliceEnd != null) { + sliceEnd.close(); + } } - return partialCount; } - } finally { - if (sliceBegin != null) { - sliceBegin.close(); - } - if (sliceEnd != null) { - sliceEnd.close(); - } - } - }) - .map(commonPool::submit) - .collect(Collectors.toList()); - long count = 0; - for (ForkJoinTask future : futures) { - count += future.join(); - } - return count; - } else { - long count = 0; - try (RocksIterator iter = db.newIterator(cfh, readOpts)) { - iter.seekToFirst(); - while (iter.isValid()) { - count++; - iter.next(); + }) + .map(commonPool::submit) + .collect(Collectors.toList()); + long count = 0; + for (ForkJoinTask future : futures) { + count += future.join(); } return count; + } else { + long count = 0; + try (RocksIterator iter = db.newIterator(cfh, readOpts)) { + iter.seekToFirst(); + while (iter.isValid()) { + count++; + iter.next(); + } + return count; + } } } } @@ -1917,38 +1930,39 @@ public class LLLocalDictionary implements LLDictionary { try { return Mono .fromCallable(() -> { - var readOpts = getReadOptions(null); - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain()); - } else { - minBound = emptyReleasableSlice(); - } - try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain()); + try (var readOpts = new ReadOptions(getReadOptions(null))) { + ReleasableSlice minBound; + if (range.hasMin()) { + minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain()); } else { - maxBound = emptyReleasableSlice(); + minBound = emptyReleasableSlice(); } - try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) { - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterSeekTo(rocksIterator, range.getMin().retain()); + try { + ReleasableSlice maxBound; + if (range.hasMax()) { + maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain()); } else { - rocksIterator.seekToFirst(); + maxBound = emptyReleasableSlice(); } - if (!rocksIterator.isValid()) { - return null; + try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) { + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + rocksIterSeekTo(rocksIterator, range.getMin().retain()); + } else { + rocksIterator.seekToFirst(); + } + if (!rocksIterator.isValid()) { + return null; + } + ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); + ByteBuf value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value); + dbDelete(cfh, null, key); + return Map.entry(key, value); + } finally { + maxBound.release(); } - ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); - ByteBuf value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value); - dbDelete(cfh, null, key); - return Map.entry(key, value); } finally { - maxBound.release(); + minBound.release(); } - } finally { - minBound.release(); } }) .onErrorMap(cause -> new IOException("Failed to delete " + range.toString(), cause)) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/UnmodifiableReadOptions.java b/src/main/java/it/cavallium/dbengine/database/disk/UnmodifiableReadOptions.java new file mode 100644 index 0000000..6c1afc5 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/UnmodifiableReadOptions.java @@ -0,0 +1,108 @@ +package it.cavallium.dbengine.database.disk; + +import org.rocksdb.AbstractSlice; +import org.rocksdb.AbstractTableFilter; +import org.rocksdb.ReadOptions; +import org.rocksdb.ReadTier; +import org.rocksdb.Snapshot; +import org.warp.commonutils.range.UnmodifiableRange; + +public class UnmodifiableReadOptions extends ReadOptions { + + public UnmodifiableReadOptions() { + + } + + public UnmodifiableReadOptions(ReadOptions readOptions) { + super(readOptions); + } + + @Override + public ReadOptions setBackgroundPurgeOnIteratorCleanup(boolean b) { + throw uoe(); + } + + @Override + public ReadOptions setFillCache(boolean b) { + throw uoe(); + } + + @Override + public ReadOptions setSnapshot(Snapshot snapshot) { + throw uoe(); + } + + @Override + public ReadOptions setReadTier(ReadTier readTier) { + throw uoe(); + } + + @Override + public ReadOptions setTailing(boolean b) { + throw uoe(); + } + + @Override + public ReadOptions setVerifyChecksums(boolean b) { + throw uoe(); + } + + @Override + public ReadOptions setManaged(boolean b) { + throw uoe(); + } + + @Override + public ReadOptions setTotalOrderSeek(boolean b) { + throw uoe(); + } + + @Override + public ReadOptions setPrefixSameAsStart(boolean b) { + throw uoe(); + } + + @Override + public ReadOptions setPinData(boolean b) { + throw uoe(); + } + + @Override + public ReadOptions setReadaheadSize(long l) { + throw uoe(); + } + + @Override + public ReadOptions setMaxSkippableInternalKeys(long l) { + throw uoe(); + } + + @Override + public ReadOptions setIgnoreRangeDeletions(boolean b) { + throw uoe(); + } + + @Override + public ReadOptions setIterateLowerBound(AbstractSlice abstractSlice) { + throw uoe(); + } + + @Override + public ReadOptions setIterateUpperBound(AbstractSlice abstractSlice) { + throw uoe(); + } + + @Override + public ReadOptions setIterStartSeqnum(long l) { + throw uoe(); + } + + @Override + public ReadOptions setTableFilter(AbstractTableFilter abstractTableFilter) { + throw uoe(); + } + + private UnsupportedOperationException uoe() { + return new UnsupportedOperationException("Unmodifiable read options"); + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/UnmodifiableWriteOptions.java b/src/main/java/it/cavallium/dbengine/database/disk/UnmodifiableWriteOptions.java new file mode 100644 index 0000000..db46fb3 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/UnmodifiableWriteOptions.java @@ -0,0 +1,48 @@ +package it.cavallium.dbengine.database.disk; + +import org.rocksdb.AbstractSlice; +import org.rocksdb.AbstractTableFilter; +import org.rocksdb.ReadOptions; +import org.rocksdb.ReadTier; +import org.rocksdb.Snapshot; +import org.rocksdb.WriteOptions; + +public class UnmodifiableWriteOptions extends WriteOptions { + + public UnmodifiableWriteOptions() { + + } + + public UnmodifiableWriteOptions(WriteOptions writeOptions) { + super(writeOptions); + } + + @Override + public WriteOptions setDisableWAL(boolean b) { + throw uoe(); + } + + @Override + public WriteOptions setIgnoreMissingColumnFamilies(boolean b) { + throw uoe(); + } + + @Override + public WriteOptions setLowPri(boolean b) { + throw uoe(); + } + + @Override + public WriteOptions setNoSlowdown(boolean b) { + throw uoe(); + } + + @Override + public WriteOptions setSync(boolean b) { + throw uoe(); + } + + private UnsupportedOperationException uoe() { + return new UnsupportedOperationException("Unmodifiable read options"); + } +}