diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java index 953a679..5a15d8c 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java @@ -23,17 +23,17 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { ByteBufAllocator getAllocator(); - Mono get(@Nullable LLSnapshot snapshot, ByteBuf key, boolean existsAlmostCertainly); + Mono get(@Nullable LLSnapshot snapshot, Mono key, boolean existsAlmostCertainly); - default Mono get(@Nullable LLSnapshot snapshot, ByteBuf key) { + default Mono get(@Nullable LLSnapshot snapshot, Mono key) { return get(snapshot, key, false); } - Mono put(ByteBuf key, ByteBuf value, LLDictionaryResultType resultType); + Mono put(Mono key, Mono value, LLDictionaryResultType resultType); Mono getUpdateMode(); - default Mono update(ByteBuf key, + default Mono update(Mono key, Function<@Nullable ByteBuf, @Nullable ByteBuf> updater, UpdateReturnMode updateReturnMode, boolean existsAlmostCertainly) { @@ -42,24 +42,24 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { .transform(prev -> LLUtils.resolveDelta(prev, updateReturnMode)); } - default Mono update(ByteBuf key, + default Mono update(Mono key, Function<@Nullable ByteBuf, @Nullable ByteBuf> updater, UpdateReturnMode returnMode) { return update(key, updater, returnMode, false); } - Mono> updateAndGetDelta(ByteBuf key, + Mono> updateAndGetDelta(Mono key, Function<@Nullable ByteBuf, @Nullable ByteBuf> updater, boolean existsAlmostCertainly); - default Mono> updateAndGetDelta(ByteBuf key, + default Mono> updateAndGetDelta(Mono key, Function<@Nullable ByteBuf, @Nullable ByteBuf> updater) { return updateAndGetDelta(key, updater, false); } Mono clear(); - Mono remove(ByteBuf key, LLDictionaryResultType resultType); + Mono remove(Mono key, LLDictionaryResultType resultType); Flux>> getMulti(@Nullable LLSnapshot snapshot, Flux> keys, @@ -74,34 +74,34 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { Flux> updateMulti(Flux> entries, BiFunction updateFunction); - Flux> getRange(@Nullable LLSnapshot snapshot, LLRange range, boolean existsAlmostCertainly); + Flux> getRange(@Nullable LLSnapshot snapshot, Mono range, boolean existsAlmostCertainly); - default Flux> getRange(@Nullable LLSnapshot snapshot, LLRange range) { + default Flux> getRange(@Nullable LLSnapshot snapshot, Mono range) { return getRange(snapshot, range, false); } Flux>> getRangeGrouped(@Nullable LLSnapshot snapshot, - LLRange range, + Mono range, int prefixLength, boolean existsAlmostCertainly); default Flux>> getRangeGrouped(@Nullable LLSnapshot snapshot, - LLRange range, + Mono range, int prefixLength) { return getRangeGrouped(snapshot, range, prefixLength, false); } - Flux getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range); + Flux getRangeKeys(@Nullable LLSnapshot snapshot, Mono range); - Flux> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength); + Flux> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, Mono range, int prefixLength); - Flux getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength); + Flux getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono range, int prefixLength); - Flux badBlocks(LLRange range); + Flux badBlocks(Mono range); - Mono setRange(LLRange range, Flux> entries); + Mono setRange(Mono range, Flux> entries); - default Mono replaceRange(LLRange range, + default Mono replaceRange(Mono range, boolean canKeysChange, Function, Mono>> entriesReplacer, boolean existsAlmostCertainly) { @@ -122,19 +122,19 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { }); } - default Mono replaceRange(LLRange range, + default Mono replaceRange(Mono range, boolean canKeysChange, Function, Mono>> entriesReplacer) { return replaceRange(range, canKeysChange, entriesReplacer, false); } - Mono isRangeEmpty(@Nullable LLSnapshot snapshot, LLRange range); + Mono isRangeEmpty(@Nullable LLSnapshot snapshot, Mono range); - Mono sizeRange(@Nullable LLSnapshot snapshot, LLRange range, boolean fast); + Mono sizeRange(@Nullable LLSnapshot snapshot, Mono range, boolean fast); - Mono> getOne(@Nullable LLSnapshot snapshot, LLRange range); + Mono> getOne(@Nullable LLSnapshot snapshot, Mono range); - Mono getOneKey(@Nullable LLSnapshot snapshot, LLRange range); + Mono getOneKey(@Nullable LLSnapshot snapshot, Mono range); - Mono> removeOne(LLRange range); + Mono> removeOne(Mono range); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 33c0511..3363368 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -1,14 +1,15 @@ package it.cavallium.dbengine.database.disk; +import static io.netty.buffer.Unpooled.wrappedBuffer; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.util.ReferenceCounted; import it.cavallium.dbengine.client.BadBlock; -import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.client.DatabaseOptions; +import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.ExtraKeyOperationResult; -import it.cavallium.dbengine.database.KeyOperationResult; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLRange; @@ -17,16 +18,13 @@ import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.RepeatedElementList; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; -import it.unimi.dsi.fastutil.booleans.BooleanArrayList; import it.unimi.dsi.fastutil.ints.IntArrayList; -import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.io.IOException; import java.nio.ByteBuffer; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -68,7 +66,6 @@ import reactor.core.scheduler.Scheduler; import reactor.util.function.Tuple2; import reactor.util.function.Tuple3; import reactor.util.function.Tuples; -import static io.netty.buffer.Unpooled.*; @NotAtomic public class LLLocalDictionary implements LLDictionary { @@ -237,39 +234,37 @@ public class LLLocalDictionary implements LLDictionary { return alloc; } - @Override - public Mono get(@Nullable LLSnapshot snapshot, ByteBuf key, boolean existsAlmostCertainly) { - try { - return Mono - .fromCallable(() -> { - StampedLock lock; - long stamp; - if (updateMode == UpdateMode.ALLOW) { - lock = itemsLock.getAt(getLockIndex(key)); + private Mono runOnDb(Callable<@Nullable T> callable) { + return Mono.fromCallable(callable).subscribeOn(dbScheduler); + } - stamp = lock.readLock(); - } else { - lock = null; - stamp = 0; + @Override + public Mono get(@Nullable LLSnapshot snapshot, Mono keyMono, boolean existsAlmostCertainly) { + return Mono.usingWhen(keyMono, + key -> runOnDb(() -> { + StampedLock lock; + long stamp; + if (updateMode == UpdateMode.ALLOW) { + lock = itemsLock.getAt(getLockIndex(key)); + + stamp = lock.readLock(); + } else { + lock = null; + stamp = 0; + } + try { + if (logger.isTraceEnabled()) { + logger.trace("Reading {}", LLUtils.toStringSafe(key)); } - try { - if (logger.isTraceEnabled()) { - logger.trace("Reading {}", LLUtils.toStringSafe(key)); - } - return dbGet(cfh, resolveSnapshot(snapshot), key.retain(), existsAlmostCertainly); - } finally { - if (updateMode == UpdateMode.ALLOW) { - lock.unlockRead(stamp); - } + return dbGet(cfh, resolveSnapshot(snapshot), key.retain(), existsAlmostCertainly); + } finally { + if (updateMode == UpdateMode.ALLOW) { + lock.unlockRead(stamp); } - }) - .subscribeOn(dbScheduler) - .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)) - .doFirst(key::retain) - .doAfterTerminate(key::release); - } finally { - key.release(); - } + } + }).onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)), + key -> Mono.fromRunnable(key::release) + ); } private ByteBuf dbGet(ColumnFamilyHandle cfh, @@ -287,9 +282,7 @@ public class LLLocalDictionary implements LLDictionary { throw new RocksDBException("Key buffer must be direct"); } ByteBuffer keyNioBuffer = LLUtils.toDirect(key); - if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) { - assert keyNioBuffer.isDirect(); - } + assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || keyNioBuffer.isDirect(); // Create a direct result buffer because RocksDB works only with direct buffers ByteBuf resultBuf = alloc.directBuffer(LLLocalDictionary.INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES); try { @@ -388,6 +381,7 @@ public class LLLocalDictionary implements LLDictionary { } } + @SuppressWarnings("SameParameterValue") private void dbPut(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, ByteBuf key, ByteBuf value) throws RocksDBException { try { @@ -399,15 +393,11 @@ public class LLLocalDictionary implements LLDictionary { throw new RocksDBException("Value buffer must be direct"); } var keyNioBuffer = LLUtils.toDirect(key); - if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) { - assert keyNioBuffer.isDirect(); - } + assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || keyNioBuffer.isDirect(); var valueNioBuffer = LLUtils.toDirect(value); - if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) { - assert valueNioBuffer.isDirect(); - } + assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || valueNioBuffer.isDirect(); db.put(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), keyNioBuffer, valueNioBuffer); } else { db.put(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), LLUtils.toArray(key), LLUtils.toArray(value)); @@ -419,162 +409,134 @@ public class LLLocalDictionary implements LLDictionary { } @Override - public Mono isRangeEmpty(@Nullable LLSnapshot snapshot, LLRange range) { - try { - return Mono - .defer(() -> { - if (range.isSingle()) { - return this.containsKey(snapshot, range.getSingle().retain()); - } else { - return this.containsRange(snapshot, range.retain()); - } - }) - .map(isContained -> !isContained) - .doFirst(range::retain) - .doAfterTerminate(range::release); - } finally { - range.release(); - } + public Mono isRangeEmpty(@Nullable LLSnapshot snapshot, Mono rangeMono) { + return Mono.usingWhen(rangeMono, + range -> { + if (range.isSingle()) { + return this.containsKey(snapshot, Mono.just(range.getSingle()).map(ByteBuf::retain)); + } else { + return this.containsRange(snapshot, Mono.just(range).map(LLRange::retain)); + } + }, + range -> Mono.fromRunnable(range::release) + ).map(isContained -> !isContained); } - public Mono containsRange(@Nullable LLSnapshot snapshot, LLRange range) { - try { - return Mono - .fromCallable(() -> { - try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { - readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); - readOpts.setFillCache(false); - if (range.hasMin()) { + public Mono containsRange(@Nullable LLSnapshot snapshot, Mono rangeMono) { + return Mono.usingWhen(rangeMono, + range -> runOnDb(() -> { + try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { + readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); + readOpts.setFillCache(false); + if (range.hasMin()) { + if (databaseOptions.allowNettyDirect() && range.getMin().isDirect()) { + readOpts.setIterateLowerBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirect(range.getMin()), + "This range must use direct buffers" + ))); + } else { + readOpts.setIterateLowerBound(new Slice(LLUtils.toArray(range.getMin()))); + } + } + if (range.hasMax()) { + if (databaseOptions.allowNettyDirect() && range.getMax().isDirect()) { + readOpts.setIterateUpperBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirect(range.getMax()), + "This range must use direct buffers" + ))); + } else { + readOpts.setIterateUpperBound(new Slice(LLUtils.toArray(range.getMax()))); + } + } + try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) { + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { if (databaseOptions.allowNettyDirect() && range.getMin().isDirect()) { - readOpts.setIterateLowerBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirect(range.getMin()), + rocksIterator.seek(Objects.requireNonNull(LLUtils.toDirect(range.getMin()), "This range must use direct buffers" - ))); + )); } else { - readOpts.setIterateLowerBound(new Slice(LLUtils.toArray(range.getMin()))); + rocksIterator.seek(LLUtils.toArray(range.getMin())); } + } else { + rocksIterator.seekToFirst(); } - if (range.hasMax()) { - if (databaseOptions.allowNettyDirect() && range.getMax().isDirect()) { - readOpts.setIterateUpperBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirect(range.getMax()), - "This range must use direct buffers" - ))); - } else { - readOpts.setIterateUpperBound(new Slice(LLUtils.toArray(range.getMax()))); - } - } - try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) { - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - if (databaseOptions.allowNettyDirect() && range.getMin().isDirect()) { - rocksIterator.seek(Objects.requireNonNull(LLUtils.toDirect(range.getMin()), - "This range must use direct buffers" - )); - } else { - rocksIterator.seek(LLUtils.toArray(range.getMin())); - } - } else { - rocksIterator.seekToFirst(); - } - rocksIterator.status(); - return rocksIterator.isValid(); - } + rocksIterator.status(); + return rocksIterator.isValid(); } - }) - .onErrorMap(cause -> new IOException("Failed to read range " + range.toString(), cause)) - .subscribeOn(dbScheduler) - .doFirst(range::retain) - .doAfterTerminate(range::release); - } finally { - range.release(); - } + } + }).onErrorMap(cause -> new IOException("Failed to read range " + range.toString(), cause)), + range -> Mono.fromRunnable(range::release)); } - private Mono containsKey(@Nullable LLSnapshot snapshot, ByteBuf key) { - try { - return Mono - .fromCallable(() -> { - StampedLock lock; - long stamp; - if (updateMode == UpdateMode.ALLOW) { - lock = itemsLock.getAt(getLockIndex(key)); + private Mono containsKey(@Nullable LLSnapshot snapshot, Mono keyMono) { + return Mono.usingWhen(keyMono, + key -> runOnDb(() -> { - stamp = lock.readLock(); - } else { - lock = null; - stamp = 0; - } - try { - int size = RocksDB.NOT_FOUND; - byte[] keyBytes = LLUtils.toArray(key); - Holder data = new Holder<>(); - var unmodifiableReadOpts = resolveSnapshot(snapshot); - if (db.keyMayExist(cfh, unmodifiableReadOpts, keyBytes, data)) { - if (data.getValue() != null) { - size = data.getValue().length; - } else { - size = db.get(cfh, unmodifiableReadOpts, keyBytes, NO_DATA); - } - } - return size != RocksDB.NOT_FOUND; - } finally { - if (updateMode == UpdateMode.ALLOW) { - lock.unlockRead(stamp); + StampedLock lock; + long stamp; + if (updateMode == UpdateMode.ALLOW) { + lock = itemsLock.getAt(getLockIndex(key)); + + stamp = lock.readLock(); + } else { + lock = null; + stamp = 0; + } + try { + int size = RocksDB.NOT_FOUND; + byte[] keyBytes = LLUtils.toArray(key); + Holder data = new Holder<>(); + var unmodifiableReadOpts = resolveSnapshot(snapshot); + if (db.keyMayExist(cfh, unmodifiableReadOpts, keyBytes, data)) { + if (data.getValue() != null) { + size = data.getValue().length; + } else { + size = db.get(cfh, unmodifiableReadOpts, keyBytes, NO_DATA); } } - }) - .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)) - .subscribeOn(dbScheduler) - .doFirst(key::retain) - .doAfterTerminate(key::release); - } finally { - key.release(); - } + return size != RocksDB.NOT_FOUND; + } finally { + if (updateMode == UpdateMode.ALLOW) { + lock.unlockRead(stamp); + } + } + }).onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)), + key -> Mono.fromRunnable(key::release) + ); } @Override - public Mono put(ByteBuf key, ByteBuf value, LLDictionaryResultType resultType) { - try { - return Mono - .defer(() -> getPreviousData(key.retain(), resultType)) - .concatWith(Mono - .fromCallable(() -> { - StampedLock lock; - long stamp; - if (updateMode == UpdateMode.ALLOW) { - lock = itemsLock.getAt(getLockIndex(key)); - - stamp = lock.writeLock(); - } else { - lock = null; - stamp = 0; - } - try { - if (logger.isTraceEnabled()) { - logger.trace("Writing {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(value)); - } - dbPut(cfh, null, key.retain(), value.retain()); - return null; - } finally { + public Mono put(Mono keyMono, Mono valueMono, LLDictionaryResultType resultType) { + return Mono.usingWhen(keyMono, + key -> this + .getPreviousData(Mono.just(key).map(ByteBuf::retain), resultType) + .concatWith(Mono.usingWhen(valueMono, + value -> this.runOnDb(() -> { + StampedLock lock; + long stamp; if (updateMode == UpdateMode.ALLOW) { - lock.unlockWrite(stamp); + lock = itemsLock.getAt(getLockIndex(key)); + + stamp = lock.writeLock(); + } else { + lock = null; + stamp = 0; } - } - }) - .subscribeOn(dbScheduler) - .onErrorMap(cause -> new IOException("Failed to write " + LLUtils.toStringSafe(key), cause)) - ) - .singleOrEmpty() - .doFirst(() -> { - key.retain(); - value.retain(); - }) - .doAfterTerminate(() -> { - key.release(); - value.release(); - }); - } finally { - key.release(); - value.release(); - } + try { + if (logger.isTraceEnabled()) { + logger.trace("Writing {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(value)); + } + dbPut(cfh, null, key.retain(), value.retain()); + return null; + } finally { + if (updateMode == UpdateMode.ALLOW) { + lock.unlockWrite(stamp); + } + } + }), + value -> Mono.fromRunnable(value::release) + ).onErrorMap(cause -> new IOException("Failed to write " + LLUtils.toStringSafe(key), cause))) + .singleOrEmpty(), + key -> Mono.fromRunnable(key::release) + ); } @Override @@ -585,256 +547,243 @@ public class LLLocalDictionary implements LLDictionary { // Remember to change also updateAndGetDelta() if you are modifying this function @SuppressWarnings("DuplicatedCode") @Override - public Mono update(ByteBuf key, + public Mono update(Mono keyMono, Function<@Nullable ByteBuf, @Nullable ByteBuf> updater, UpdateReturnMode updateReturnMode, boolean existsAlmostCertainly) { - try { - return Mono - .fromCallable(() -> { - if (updateMode == UpdateMode.DISALLOW) { - throw new UnsupportedOperationException("update() is disallowed"); - } - StampedLock lock; - long stamp; - if (updateMode == UpdateMode.ALLOW) { - lock = itemsLock.getAt(getLockIndex(key)); + return Mono.usingWhen(keyMono, + key -> runOnDb(() -> { + if (updateMode == UpdateMode.DISALLOW) { + throw new UnsupportedOperationException("update() is disallowed"); + } + StampedLock lock; + long stamp; + if (updateMode == UpdateMode.ALLOW) { + lock = itemsLock.getAt(getLockIndex(key)); - stamp = lock.readLock(); - } else { - lock = null; - stamp = 0; + stamp = lock.readLock(); + } else { + lock = null; + stamp = 0; + } + try { + if (logger.isTraceEnabled()) { + logger.trace("Reading {}", LLUtils.toStringSafe(key)); } - try { - if (logger.isTraceEnabled()) { - logger.trace("Reading {}", LLUtils.toStringSafe(key)); - } - while (true) { - @Nullable ByteBuf prevData; - var prevDataHolder = existsAlmostCertainly ? null : new Holder(); - if (existsAlmostCertainly || db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) { - if (!existsAlmostCertainly && prevDataHolder.getValue() != null) { - byte @Nullable [] prevDataBytes = prevDataHolder.getValue(); - if (prevDataBytes != null) { - prevData = wrappedBuffer(prevDataBytes); - } else { - prevData = null; - } + while (true) { + @Nullable ByteBuf prevData; + var prevDataHolder = existsAlmostCertainly ? null : new Holder(); + if (existsAlmostCertainly || db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) { + if (!existsAlmostCertainly && prevDataHolder.getValue() != null) { + byte @Nullable [] prevDataBytes = prevDataHolder.getValue(); + if (prevDataBytes != null) { + prevData = wrappedBuffer(prevDataBytes); } else { - prevData = dbGet(cfh, null, key.retain(), existsAlmostCertainly); + prevData = null; } } else { - prevData = null; + prevData = dbGet(cfh, null, key.retain(), existsAlmostCertainly); } + } else { + prevData = null; + } + try { + @Nullable ByteBuf newData; + ByteBuf prevDataToSendToUpdater = prevData == null ? null : prevData.retainedSlice(); try { - @Nullable ByteBuf newData; - ByteBuf prevDataToSendToUpdater = prevData == null ? null : prevData.retainedSlice(); - try { - newData = updater.apply(prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.retain()); - if (!(prevDataToSendToUpdater == null - || prevDataToSendToUpdater.readerIndex() == 0 - || !prevDataToSendToUpdater.isReadable())) { - throw new IllegalStateException("The updater has read the previous data partially" - + " (read bytes: " + prevDataToSendToUpdater.readerIndex() - + " unread bytes: " + prevDataToSendToUpdater.readableBytes() + ")." - + " The only allowed options are reading the data fully or not reading it at all"); - } - } finally { - if (prevDataToSendToUpdater != null) { - prevDataToSendToUpdater.release(); - } - } - try { - if (prevData != null && newData == null) { - //noinspection DuplicatedCode - if (updateMode == UpdateMode.ALLOW) { - var ws = lock.tryConvertToWriteLock(stamp); - if (ws != 0) { - stamp = ws; - } else { - lock.unlockRead(stamp); - - stamp = lock.writeLock(); - continue; - } - } - if (logger.isTraceEnabled()) { - logger.trace("Deleting {}", LLUtils.toStringSafe(key)); - } - dbDelete(cfh, null, key.retain()); - } else if (newData != null - && (prevData == null || !LLUtils.equals(prevData, newData))) { - //noinspection DuplicatedCode - if (updateMode == UpdateMode.ALLOW) { - var ws = lock.tryConvertToWriteLock(stamp); - if (ws != 0) { - stamp = ws; - } else { - lock.unlockRead(stamp); - - stamp = lock.writeLock(); - continue; - } - } - if (logger.isTraceEnabled()) { - logger.trace("Writing {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData)); - } - dbPut(cfh, null, key.retain(), newData.retain()); - } - return switch (updateReturnMode) { - case GET_NEW_VALUE -> newData != null ? newData.retain() : null; - case GET_OLD_VALUE -> prevData != null ? prevData.retain() : null; - case NOTHING -> null; - //noinspection UnnecessaryDefault - default -> throw new IllegalArgumentException(); - }; - } finally { - if (newData != null) { - newData.release(); - } + newData = updater.apply(prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.retain()); + if (!(prevDataToSendToUpdater == null + || prevDataToSendToUpdater.readerIndex() == 0 + || !prevDataToSendToUpdater.isReadable())) { + throw new IllegalStateException("The updater has read the previous data partially" + + " (read bytes: " + prevDataToSendToUpdater.readerIndex() + + " unread bytes: " + prevDataToSendToUpdater.readableBytes() + ")." + + " The only allowed options are reading the data fully or not reading it at all"); } } finally { - if (prevData != null) { - prevData.release(); + if (prevDataToSendToUpdater != null) { + prevDataToSendToUpdater.release(); } } - } - } finally { - if (updateMode == UpdateMode.ALLOW) { - lock.unlock(stamp); + try { + if (prevData != null && newData == null) { + //noinspection DuplicatedCode + if (updateMode == UpdateMode.ALLOW) { + var ws = lock.tryConvertToWriteLock(stamp); + if (ws != 0) { + stamp = ws; + } else { + lock.unlockRead(stamp); + + stamp = lock.writeLock(); + continue; + } + } + if (logger.isTraceEnabled()) { + logger.trace("Deleting {}", LLUtils.toStringSafe(key)); + } + dbDelete(cfh, null, key.retain()); + } else if (newData != null + && (prevData == null || !LLUtils.equals(prevData, newData))) { + //noinspection DuplicatedCode + if (updateMode == UpdateMode.ALLOW) { + var ws = lock.tryConvertToWriteLock(stamp); + if (ws != 0) { + stamp = ws; + } else { + lock.unlockRead(stamp); + + stamp = lock.writeLock(); + continue; + } + } + if (logger.isTraceEnabled()) { + logger.trace("Writing {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData)); + } + dbPut(cfh, null, key.retain(), newData.retain()); + } + return switch (updateReturnMode) { + case GET_NEW_VALUE -> newData != null ? newData.retain() : null; + case GET_OLD_VALUE -> prevData != null ? prevData.retain() : null; + case NOTHING -> null; + //noinspection UnnecessaryDefault + default -> throw new IllegalArgumentException(); + }; + } finally { + if (newData != null) { + newData.release(); + } + } + } finally { + if (prevData != null) { + prevData.release(); + } } } - }) - .onErrorMap(cause -> new IOException("Failed to read or write " + LLUtils.toStringSafe(key), cause)) - .subscribeOn(dbScheduler) - .doFirst(key::retain) - .doAfterTerminate(key::release); - } finally { - key.release(); - } + } finally { + if (updateMode == UpdateMode.ALLOW) { + lock.unlock(stamp); + } + } + }).onErrorMap(cause -> new IOException("Failed to read or write " + LLUtils.toStringSafe(key), cause)), + key -> Mono.fromRunnable(key::release) + ); } // Remember to change also update() if you are modifying this function @SuppressWarnings("DuplicatedCode") @Override - public Mono> updateAndGetDelta(ByteBuf key, + public Mono> updateAndGetDelta(Mono keyMono, Function<@Nullable ByteBuf, @Nullable ByteBuf> updater, boolean existsAlmostCertainly) { - try { - return Mono - .fromCallable(() -> { - if (updateMode == UpdateMode.DISALLOW) throw new UnsupportedOperationException("update() is disallowed"); - StampedLock lock; - long stamp; - if (updateMode == UpdateMode.ALLOW) { - lock = itemsLock.getAt(getLockIndex(key)); + return Mono.usingWhen(keyMono, + key -> this.runOnDb(() -> { + if (updateMode == UpdateMode.DISALLOW) throw new UnsupportedOperationException("update() is disallowed"); + StampedLock lock; + long stamp; + if (updateMode == UpdateMode.ALLOW) { + lock = itemsLock.getAt(getLockIndex(key)); - stamp = lock.readLock(); - } else { - lock = null; - stamp = 0; + stamp = lock.readLock(); + } else { + lock = null; + stamp = 0; + } + try { + if (logger.isTraceEnabled()) { + logger.trace("Reading {}", LLUtils.toStringSafe(key)); } - try { - if (logger.isTraceEnabled()) { - logger.trace("Reading {}", LLUtils.toStringSafe(key)); - } - while (true) { - @Nullable ByteBuf prevData; - var prevDataHolder = existsAlmostCertainly ? null : new Holder(); - if (existsAlmostCertainly || db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) { - if (!existsAlmostCertainly && prevDataHolder.getValue() != null) { - byte @Nullable [] prevDataBytes = prevDataHolder.getValue(); - if (prevDataBytes != null) { - prevData = wrappedBuffer(prevDataBytes); - } else { - prevData = null; - } + while (true) { + @Nullable ByteBuf prevData; + var prevDataHolder = existsAlmostCertainly ? null : new Holder(); + if (existsAlmostCertainly || db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) { + if (!existsAlmostCertainly && prevDataHolder.getValue() != null) { + byte @Nullable [] prevDataBytes = prevDataHolder.getValue(); + if (prevDataBytes != null) { + prevData = wrappedBuffer(prevDataBytes); } else { - prevData = dbGet(cfh, null, key.retain(), existsAlmostCertainly); + prevData = null; } } else { - prevData = null; + prevData = dbGet(cfh, null, key.retain(), existsAlmostCertainly); + } + } else { + prevData = null; + } + try { + @Nullable ByteBuf newData; + ByteBuf prevDataToSendToUpdater = prevData == null ? null : prevData.retainedSlice(); + try { + newData = updater.apply(prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.retain()); + assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() + || prevDataToSendToUpdater == null + || prevDataToSendToUpdater.readerIndex() == 0 + || !prevDataToSendToUpdater.isReadable(); + } finally { + if (prevDataToSendToUpdater != null) { + prevDataToSendToUpdater.release(); + } } try { - @Nullable ByteBuf newData; - ByteBuf prevDataToSendToUpdater = prevData == null ? null : prevData.retainedSlice(); - try { - newData = updater.apply(prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.retain()); - if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) { - assert prevDataToSendToUpdater == null - || prevDataToSendToUpdater.readerIndex() == 0 - || !prevDataToSendToUpdater.isReadable(); - } - } finally { - if (prevDataToSendToUpdater != null) { - prevDataToSendToUpdater.release(); - } - } - try { - if (prevData != null && newData == null) { - //noinspection DuplicatedCode - if (updateMode == UpdateMode.ALLOW) { - var ws = lock.tryConvertToWriteLock(stamp); - if (ws != 0) { - stamp = ws; - } else { - lock.unlockRead(stamp); + if (prevData != null && newData == null) { + //noinspection DuplicatedCode + if (updateMode == UpdateMode.ALLOW) { + var ws = lock.tryConvertToWriteLock(stamp); + if (ws != 0) { + stamp = ws; + } else { + lock.unlockRead(stamp); - stamp = lock.writeLock(); - continue; - } + stamp = lock.writeLock(); + continue; } - if (logger.isTraceEnabled()) { - logger.trace("Deleting {}", LLUtils.toStringSafe(key)); - } - dbDelete(cfh, null, key.retain()); - } else if (newData != null - && (prevData == null || !LLUtils.equals(prevData, newData))) { - //noinspection DuplicatedCode - if (updateMode == UpdateMode.ALLOW) { - var ws = lock.tryConvertToWriteLock(stamp); - if (ws != 0) { - stamp = ws; - } else { - lock.unlockRead(stamp); + } + if (logger.isTraceEnabled()) { + logger.trace("Deleting {}", LLUtils.toStringSafe(key)); + } + dbDelete(cfh, null, key.retain()); + } else if (newData != null + && (prevData == null || !LLUtils.equals(prevData, newData))) { + //noinspection DuplicatedCode + if (updateMode == UpdateMode.ALLOW) { + var ws = lock.tryConvertToWriteLock(stamp); + if (ws != 0) { + stamp = ws; + } else { + lock.unlockRead(stamp); - stamp = lock.writeLock(); - continue; - } + stamp = lock.writeLock(); + continue; } - if (logger.isTraceEnabled()) { - logger.trace("Writing {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData)); - } - dbPut(cfh, null, key.retain(), newData.retain()); } - return new Delta<>( - prevData != null ? prevData.retain() : null, - newData != null ? newData.retain() : null - ); - } finally { - if (newData != null) { - newData.release(); + if (logger.isTraceEnabled()) { + logger.trace("Writing {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData)); } + dbPut(cfh, null, key.retain(), newData.retain()); } + return new Delta<>( + prevData != null ? prevData.retain() : null, + newData != null ? newData.retain() : null + ); } finally { - if (prevData != null) { - prevData.release(); + if (newData != null) { + newData.release(); } } - } - } finally { - if (updateMode == UpdateMode.ALLOW) { - lock.unlock(stamp); + } finally { + if (prevData != null) { + prevData.release(); + } } } - }) - .onErrorMap(cause -> new IOException("Failed to read or write " + LLUtils.toStringSafe(key), cause)) - .subscribeOn(dbScheduler) - .doFirst(key::retain) - .doAfterTerminate(key::release); - } finally { - key.release(); - } + } finally { + if (updateMode == UpdateMode.ALLOW) { + lock.unlock(stamp); + } + } + }).onErrorMap(cause -> new IOException("Failed to read or write " + LLUtils.toStringSafe(key), cause)), + key -> Mono.fromRunnable(key::release) + ); } private void dbDelete(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, ByteBuf key) @@ -855,106 +804,93 @@ public class LLLocalDictionary implements LLDictionary { } @Override - public Mono remove(ByteBuf key, LLDictionaryResultType resultType) { - try { - return Mono - .defer(() -> getPreviousData(key.retain(), resultType)) - .concatWith(Mono - .fromCallable(() -> { - StampedLock lock; - long stamp; - if (updateMode == UpdateMode.ALLOW) { - lock = itemsLock.getAt(getLockIndex(key)); - - stamp = lock.writeLock(); - } else { - lock = null; - stamp = 0; - } - try { - if (logger.isTraceEnabled()) { - logger.trace("Deleting {}", LLUtils.toStringSafe(key)); - } - dbDelete(cfh, null, key.retain()); - return null; - } finally { - if (updateMode == UpdateMode.ALLOW) { - lock.unlockWrite(stamp); - } - } - }) - .onErrorMap(cause -> new IOException("Failed to delete " + LLUtils.toStringSafe(key), cause)) - .subscribeOn(dbScheduler) - .then(Mono.empty()) - ) - .singleOrEmpty() - .doFirst(key::retain) - .doAfterTerminate(key::release); - } finally { - key.release(); - } - } - - private Mono getPreviousData(ByteBuf key, LLDictionaryResultType resultType) { - try { - return Mono - .defer(() -> switch (resultType) { - case PREVIOUS_VALUE_EXISTENCE -> this - .containsKey(null, key.retain()) - .single() - .map(LLUtils::booleanToResponseByteBuffer) - .doAfterTerminate(() -> { - if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) { - assert key.refCnt() > 0; - } - }); - case PREVIOUS_VALUE -> Mono - .fromCallable(() -> { + public Mono remove(Mono keyMono, LLDictionaryResultType resultType) { + return Mono.usingWhen(keyMono, + key -> this + .getPreviousData(Mono.just(key).map(ByteBuf::retain), resultType) + .concatWith(this + .runOnDb(() -> { StampedLock lock; long stamp; if (updateMode == UpdateMode.ALLOW) { lock = itemsLock.getAt(getLockIndex(key)); - - stamp = lock.readLock(); + + stamp = lock.writeLock(); } else { lock = null; stamp = 0; } try { if (logger.isTraceEnabled()) { - logger.trace("Reading {}", LLUtils.toArray(key)); - } - var data = new Holder(); - if (db.keyMayExist(cfh, LLUtils.toArray(key), data)) { - if (data.getValue() != null) { - return wrappedBuffer(data.getValue()); - } else { - try { - return dbGet(cfh, null, key.retain(), true); - } finally { - if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) { - assert key.refCnt() > 0; - } - } - } - } else { - return null; + logger.trace("Deleting {}", LLUtils.toStringSafe(key)); } + dbDelete(cfh, null, key.retain()); + return null; } finally { if (updateMode == UpdateMode.ALLOW) { - lock.unlockRead(stamp); + lock.unlockWrite(stamp); } } }) - .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)) - .subscribeOn(dbScheduler); - case VOID -> Mono.empty(); - }) - .doFirst(key::retain) - .doAfterTerminate(key::release); - } finally { - key.release(); - } + .onErrorMap(cause -> new IOException("Failed to delete " + LLUtils.toStringSafe(key), cause)) + ) + .singleOrEmpty(), + key -> Mono.fromCallable(key::release)); + } + + private Mono getPreviousData(Mono keyMono, LLDictionaryResultType resultType) { + return Mono + .usingWhen(keyMono, + key -> switch (resultType) { + case PREVIOUS_VALUE_EXISTENCE -> this + .containsKey(null, Mono.just(key).map(ByteBuf::retain)) + .single() + .map(LLUtils::booleanToResponseByteBuffer) + .doAfterTerminate(() -> { + assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || key.refCnt() > 0; + }); + case PREVIOUS_VALUE -> Mono + .fromCallable(() -> { + StampedLock lock; + long stamp; + if (updateMode == UpdateMode.ALLOW) { + lock = itemsLock.getAt(getLockIndex(key)); + + stamp = lock.readLock(); + } else { + lock = null; + stamp = 0; + } + try { + if (logger.isTraceEnabled()) { + logger.trace("Reading {}", LLUtils.toArray(key)); + } + var data = new Holder(); + if (db.keyMayExist(cfh, LLUtils.toArray(key), data)) { + if (data.getValue() != null) { + return wrappedBuffer(data.getValue()); + } else { + try { + return dbGet(cfh, null, key.retain(), true); + } finally { + assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || key.refCnt() > 0; + } + } + } else { + return null; + } + } finally { + if (updateMode == UpdateMode.ALLOW) { + lock.unlockRead(stamp); + } + } + }) + .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)) + .subscribeOn(dbScheduler); + case VOID -> Mono.empty(); + }, + key -> Mono.fromRunnable(key::release) + ); } @Override @@ -1284,433 +1220,406 @@ public class LLLocalDictionary implements LLDictionary { @Override public Flux> getRange(@Nullable LLSnapshot snapshot, - LLRange range, + Mono rangeMono, boolean existsAlmostCertainly) { - try { - return Flux - .defer(() -> { - if (range.isSingle()) { - return getRangeSingle(snapshot, range.getMin().retain(), existsAlmostCertainly); - } else { - return getRangeMulti(snapshot, range.retain()); - } - }) - .doFirst(range::retain) - .doAfterTerminate(range::release); - } finally { - range.release(); - } + return Flux.usingWhen(rangeMono, + range -> { + if (range.isSingle()) { + return getRangeSingle(snapshot, Mono.just(range.getMin()).map(ByteBuf::retain), existsAlmostCertainly); + } else { + return getRangeMulti(snapshot, Mono.just(range).map(LLRange::retain)); + } + }, + range -> Mono.fromRunnable(range::release) + ); } @Override public Flux>> getRangeGrouped(@Nullable LLSnapshot snapshot, - LLRange range, + Mono rangeMono, int prefixLength, boolean existsAlmostCertainly) { - try { - return Flux - .defer(() -> { - if (range.isSingle()) { - return getRangeSingle(snapshot, range.getMin().retain(), existsAlmostCertainly).map(List::of); - } else { - return getRangeMultiGrouped(snapshot, range.retain(), prefixLength); - } - }) - .doFirst(range::retain) - .doAfterTerminate(range::release); - } finally { - range.release(); - } - } - - private Flux> getRangeSingle(LLSnapshot snapshot, ByteBuf key, boolean existsAlmostCertainly) { - try { - return Mono - .defer(() -> this.get(snapshot, key.retain(), existsAlmostCertainly)) - .map(value -> Map.entry(key.retain(), value)) - .flux() - .doFirst(key::retain) - .doAfterTerminate(key::release); - } finally { - key.release(); - } - } - - @SuppressWarnings("Convert2MethodRef") - private Flux> getRangeMulti(LLSnapshot snapshot, LLRange range) { - try { - return Flux - .using( - () -> new LLLocalEntryReactiveRocksIterator(db, - alloc, - cfh, - range.retain(), - databaseOptions.allowNettyDirect(), - resolveSnapshot(snapshot), - getRangeMultiDebugName - ), - llLocalEntryReactiveRocksIterator -> llLocalEntryReactiveRocksIterator.flux(), - LLLocalReactiveRocksIterator::release - ) - .doOnDiscard(Entry.class, entry -> { - //noinspection unchecked - var castedEntry = (Entry) entry; - castedEntry.getKey().release(); - castedEntry.getValue().release(); - }) - .subscribeOn(dbScheduler) - .doFirst(range::retain) - .doAfterTerminate(range::release); - } finally { - range.release(); - } - } - - @SuppressWarnings("Convert2MethodRef") - private Flux>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) { - try { - return Flux - .using( - () -> new LLLocalGroupedEntryReactiveRocksIterator(db, - alloc, - cfh, - prefixLength, - range.retain(), - databaseOptions.allowNettyDirect(), - resolveSnapshot(snapshot), - "getRangeMultiGrouped" - ), - llLocalGroupedEntryReactiveRocksIterator -> llLocalGroupedEntryReactiveRocksIterator.flux(), - LLLocalGroupedReactiveRocksIterator::release - ) - .subscribeOn(dbScheduler) - .doFirst(range::retain) - .doAfterTerminate(range::release); - } finally { - range.release(); - } - } - - @Override - public Flux getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range) { - try { - return Flux - .defer(() -> { - if (range.isSingle()) { - return this.getRangeKeysSingle(snapshot, range.getMin().retain()); - } else { - return this.getRangeKeysMulti(snapshot, range.retain()); - } - }) - .doFirst(range::retain) - .doAfterTerminate(range::release); - } finally { - range.release(); - } - } - - @Override - public Flux> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) { - try { - return Flux - .using( - () -> new LLLocalGroupedKeyReactiveRocksIterator(db, - alloc, - cfh, - prefixLength, - range.retain(), - databaseOptions.allowNettyDirect(), - resolveSnapshot(snapshot), - "getRangeKeysGrouped" - ), - LLLocalGroupedReactiveRocksIterator::flux, - LLLocalGroupedReactiveRocksIterator::release - ) - .subscribeOn(dbScheduler) - .doFirst(range::retain) - .doAfterTerminate(range::release); - } finally { - range.release(); - } - } - - @Override - public Flux badBlocks(LLRange range) { - return Flux - .create(sink -> { - try (var ro = new ReadOptions(getReadOptions(null))) { - ro.setFillCache(false); - if (!range.isSingle()) { - ro.setReadaheadSize(32 * 1024); - } - ro.setVerifyChecksums(true); - var rocksIteratorTuple = getRocksIterator(databaseOptions.allowNettyDirect(), ro, range.retain(), db, cfh); - try { - try (var rocksIterator = rocksIteratorTuple.getT1()) { - rocksIterator.seekToFirst(); - rocksIterator.status(); - while (rocksIterator.isValid() && !sink.isCancelled()) { - try { - rocksIterator.status(); - rocksIterator.key(DUMMY_WRITE_ONLY_BYTE_BUFFER); - rocksIterator.status(); - rocksIterator.value(DUMMY_WRITE_ONLY_BYTE_BUFFER); - rocksIterator.status(); - } catch (RocksDBException ex) { - sink.next(new BadBlock(databaseName, Column.special(columnName), null, ex)); - } - rocksIterator.next(); - } - } - } finally { - rocksIteratorTuple.getT2().release(); - rocksIteratorTuple.getT3().release(); - } - sink.complete(); - } catch (Throwable ex) { - sink.error(ex); + return Flux.usingWhen(rangeMono, + range -> { + if (range.isSingle()) { + var rangeSingleMono = Mono.just(range.getMin()).map(ByteBuf::retain); + return getRangeSingle(snapshot, rangeSingleMono, existsAlmostCertainly).map(List::of); + } else { + return getRangeMultiGrouped(snapshot, Mono.just(range).map(LLRange::retain), prefixLength); } - }) - .subscribeOn(dbScheduler) - .doFirst(range::retain) - .doAfterTerminate(range::release); + }, + range -> Mono.fromRunnable(range::release) + ); + } + + private Flux> getRangeSingle(LLSnapshot snapshot, + Mono keyMono, + boolean existsAlmostCertainly) { + return Flux.usingWhen(keyMono, + key -> this + .get(snapshot, Mono.just(key).map(ByteBuf::retain), existsAlmostCertainly) + .map(value -> Map.entry(key.retain(), value)), + key -> Mono.fromRunnable(key::release) + ); + } + + private Flux> getRangeMulti(LLSnapshot snapshot, Mono rangeMono) { + return Flux.usingWhen(rangeMono, + range -> Flux + .using( + () -> new LLLocalEntryReactiveRocksIterator(db, + alloc, + cfh, + range.retain(), + databaseOptions.allowNettyDirect(), + resolveSnapshot(snapshot), + getRangeMultiDebugName + ), + llLocalEntryReactiveRocksIterator -> llLocalEntryReactiveRocksIterator + .flux() + .subscribeOn(dbScheduler), + LLLocalReactiveRocksIterator::release + ), + range -> Mono.fromRunnable(range::release) + ); + } + + private Flux>> getRangeMultiGrouped(LLSnapshot snapshot, Mono rangeMono, int prefixLength) { + return Flux.usingWhen(rangeMono, + range -> Flux + .using( + () -> new LLLocalGroupedEntryReactiveRocksIterator(db, + alloc, + cfh, + prefixLength, + range.retain(), + databaseOptions.allowNettyDirect(), + resolveSnapshot(snapshot), + "getRangeMultiGrouped" + ), + reactiveRocksIterator -> reactiveRocksIterator + .flux() + .subscribeOn(dbScheduler), + LLLocalGroupedReactiveRocksIterator::release + ), + range -> Mono.fromRunnable(range::release) + ); } @Override - public Flux getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) { - try { - return Flux - .using( - () -> new LLLocalKeyPrefixReactiveRocksIterator(db, - alloc, - cfh, - prefixLength, - range.retain(), - databaseOptions.allowNettyDirect(), - resolveSnapshot(snapshot), - true, - "getRangeKeysGrouped" - ), - LLLocalKeyPrefixReactiveRocksIterator::flux, - LLLocalKeyPrefixReactiveRocksIterator::release - ) - .subscribeOn(dbScheduler) - .doFirst(range::retain) - .doAfterTerminate(range::release); - } finally { - range.release(); - } + public Flux getRangeKeys(@Nullable LLSnapshot snapshot, Mono rangeMono) { + return Flux.usingWhen(rangeMono, + range -> { + if (range.isSingle()) { + return this.getRangeKeysSingle(snapshot, Mono.just(range.getMin()).map(ByteBuf::retain)); + } else { + return this.getRangeKeysMulti(snapshot, Mono.just(range).map(LLRange::retain)); + } + }, + range -> Mono.fromRunnable(range::release) + ); } - private Flux getRangeKeysSingle(LLSnapshot snapshot, ByteBuf key) { - try { - return Mono - .defer(() -> this.containsKey(snapshot, key.retain())) - .flux() - .handle((contains, sink) -> { - if (contains) { - sink.next(key.retain()); - } else { - sink.complete(); - } - }) - .doOnDiscard(ByteBuf.class, ReferenceCounted::release) - .doFirst(key::retain) - .doAfterTerminate(key::release); - } finally { - key.release(); - } + @Override + public Flux> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, + Mono rangeMono, + int prefixLength) { + return Flux.usingWhen(rangeMono, + range -> Flux + .using( + () -> new LLLocalGroupedKeyReactiveRocksIterator(db, + alloc, + cfh, + prefixLength, + range.retain(), + databaseOptions.allowNettyDirect(), + resolveSnapshot(snapshot), + "getRangeKeysGrouped" + ), reactiveRocksIterator -> reactiveRocksIterator.flux() + .subscribeOn(dbScheduler), + LLLocalGroupedReactiveRocksIterator::release + ), + range -> Mono.fromRunnable(range::release) + ); + } + + @Override + public Flux badBlocks(Mono rangeMono) { + return Flux.usingWhen(rangeMono, + range -> Flux + .create(sink -> { + try (var ro = new ReadOptions(getReadOptions(null))) { + ro.setFillCache(false); + if (!range.isSingle()) { + ro.setReadaheadSize(32 * 1024); + } + ro.setVerifyChecksums(true); + var rocksIteratorTuple = getRocksIterator(databaseOptions.allowNettyDirect(), ro, range.retain(), db, cfh); + try { + try (var rocksIterator = rocksIteratorTuple.getT1()) { + rocksIterator.seekToFirst(); + rocksIterator.status(); + while (rocksIterator.isValid() && !sink.isCancelled()) { + try { + rocksIterator.status(); + rocksIterator.key(DUMMY_WRITE_ONLY_BYTE_BUFFER); + rocksIterator.status(); + rocksIterator.value(DUMMY_WRITE_ONLY_BYTE_BUFFER); + rocksIterator.status(); + } catch (RocksDBException ex) { + sink.next(new BadBlock(databaseName, Column.special(columnName), null, ex)); + } + rocksIterator.next(); + } + } + } finally { + rocksIteratorTuple.getT2().release(); + rocksIteratorTuple.getT3().release(); + } + sink.complete(); + } catch (Throwable ex) { + sink.error(ex); + } + }) + .subscribeOn(dbScheduler), + range -> Mono.fromRunnable(range::release) + ); + } + + @Override + public Flux getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono rangeMono, int prefixLength) { + return Flux.usingWhen(rangeMono, + range -> Flux + .using( + () -> new LLLocalKeyPrefixReactiveRocksIterator(db, + alloc, + cfh, + prefixLength, + range.retain(), + databaseOptions.allowNettyDirect(), + resolveSnapshot(snapshot), + true, + "getRangeKeysGrouped" + ), + LLLocalKeyPrefixReactiveRocksIterator::flux, + LLLocalKeyPrefixReactiveRocksIterator::release + ) + .subscribeOn(dbScheduler), + range -> Mono.fromRunnable(range::release) + ); + } + + private Flux getRangeKeysSingle(LLSnapshot snapshot, Mono keyMono) { + return Flux.usingWhen(keyMono, + key -> this + .containsKey(snapshot, Mono.just(key).map(ByteBuf::retain)) + .flux() + .handle((contains, sink) -> { + if (contains) { + sink.next(key.retain()); + } else { + sink.complete(); + } + }) + .doOnDiscard(ByteBuf.class, ReferenceCounted::release), + key -> Mono.fromRunnable(key::release) + ); } @SuppressWarnings("Convert2MethodRef") - private Flux getRangeKeysMulti(LLSnapshot snapshot, LLRange range) { - try { - return Flux - .using( - () -> new LLLocalKeyReactiveRocksIterator(db, - alloc, - cfh, - range.retain(), - databaseOptions.allowNettyDirect(), - resolveSnapshot(snapshot), - getRangeKeysMultiDebugName - ), - llLocalKeyReactiveRocksIterator -> llLocalKeyReactiveRocksIterator.flux(), - LLLocalReactiveRocksIterator::release - ) - .doOnDiscard(ByteBuf.class, ReferenceCounted::release) - .subscribeOn(dbScheduler) - .doFirst(range::retain) - .doAfterTerminate(range::release); - } finally { - range.release(); - } + private Flux getRangeKeysMulti(LLSnapshot snapshot, Mono rangeMono) { + return Flux.usingWhen(rangeMono, + range -> Flux + .using( + () -> new LLLocalKeyReactiveRocksIterator(db, + alloc, + cfh, + range.retain(), + databaseOptions.allowNettyDirect(), + resolveSnapshot(snapshot), + getRangeKeysMultiDebugName + ), + llLocalKeyReactiveRocksIterator -> llLocalKeyReactiveRocksIterator.flux(), + LLLocalReactiveRocksIterator::release + ) + .doOnDiscard(ByteBuf.class, ReferenceCounted::release) + .subscribeOn(dbScheduler), + range -> Mono.fromRunnable(range::release) + ); } @Override - public Mono setRange(LLRange range, Flux> entries) { - try { - if (USE_WINDOW_IN_SET_RANGE) { - return Mono - .fromCallable(() -> { - if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) { - try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) { - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = setIterateBound(databaseOptions.allowNettyDirect(), - opts, - IterateBound.LOWER, - range.getMin().retain() - ); - } else { - minBound = emptyReleasableSlice(); - } - try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = setIterateBound(databaseOptions.allowNettyDirect(), - opts, - IterateBound.UPPER, - range.getMax().retain() - ); - } else { - maxBound = emptyReleasableSlice(); - } - try (RocksIterator it = db.newIterator(cfh, opts)) { - if (!PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterSeekTo(databaseOptions.allowNettyDirect(), it, range.getMin().retain()); + public Mono setRange(Mono rangeMono, Flux> entries) { + return Mono.usingWhen(rangeMono, + range -> { + if (USE_WINDOW_IN_SET_RANGE) { + return Mono + .fromCallable(() -> { + if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) { + try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) { + ReleasableSlice minBound; + if (range.hasMin()) { + minBound = setIterateBound(databaseOptions.allowNettyDirect(), + opts, + IterateBound.LOWER, + range.getMin().retain() + ); } else { - it.seekToFirst(); + minBound = emptyReleasableSlice(); } - it.status(); - while (it.isValid()) { - db.delete(cfh, it.key()); - it.next(); - it.status(); + try { + ReleasableSlice maxBound; + if (range.hasMax()) { + maxBound = setIterateBound(databaseOptions.allowNettyDirect(), + opts, + IterateBound.UPPER, + range.getMax().retain() + ); + } else { + maxBound = emptyReleasableSlice(); + } + try (RocksIterator it = db.newIterator(cfh, opts)) { + if (!PREFER_SEEK_TO_FIRST && range.hasMin()) { + rocksIterSeekTo(databaseOptions.allowNettyDirect(), it, range.getMin().retain()); + } else { + it.seekToFirst(); + } + it.status(); + while (it.isValid()) { + db.delete(cfh, it.key()); + it.next(); + it.status(); + } + } finally { + maxBound.release(); + } + } finally { + minBound.release(); } - } finally { - maxBound.release(); } - } finally { - minBound.release(); - } - } - } else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) { - try (var batch = new CappedWriteBatch(db, - CAPPED_WRITE_BATCH_CAP, - RESERVED_WRITE_BATCH_SIZE, - MAX_WRITE_BATCH_SIZE, - BATCH_WRITE_OPTIONS - )) { - if (range.isSingle()) { - batch.delete(cfh, range.getSingle().retain()); + } else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) { + try (var batch = new CappedWriteBatch(db, + CAPPED_WRITE_BATCH_CAP, + RESERVED_WRITE_BATCH_SIZE, + MAX_WRITE_BATCH_SIZE, + BATCH_WRITE_OPTIONS + )) { + if (range.isSingle()) { + batch.delete(cfh, range.getSingle().retain()); + } else { + deleteSmallRangeWriteBatch(batch, range.retain()); + } + batch.writeToDbAndClose(); + } } else { - deleteSmallRangeWriteBatch(batch, range.retain()); + try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) { + if (range.isSingle()) { + batch.delete(cfh, LLUtils.toArray(range.getSingle())); + } else { + deleteSmallRangeWriteBatch(batch, range.retain()); + } + db.write(EMPTY_WRITE_OPTIONS, batch); + batch.clear(); + } } - batch.writeToDbAndClose(); - } - } else { - try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) { - if (range.isSingle()) { - batch.delete(cfh, LLUtils.toArray(range.getSingle())); - } else { - deleteSmallRangeWriteBatch(batch, range.retain()); - } - db.write(EMPTY_WRITE_OPTIONS, batch); - batch.clear(); - } - } - return null; - }) - .subscribeOn(dbScheduler) - .thenMany(entries - .window(MULTI_GET_WINDOW) - ) - .flatMap(keysWindowFlux -> keysWindowFlux - .collectList() - .doOnDiscard(Entry.class, discardedEntry -> { - //noinspection unchecked - var entry = (Entry) discardedEntry; - entry.getKey().release(); - entry.getValue().release(); + return null; }) - .flatMap(entriesList -> Mono + .subscribeOn(dbScheduler) + .thenMany(entries + .window(MULTI_GET_WINDOW) + ) + .flatMap(keysWindowFlux -> keysWindowFlux + .collectList() + .doOnDiscard(Entry.class, discardedEntry -> { + //noinspection unchecked + var entry = (Entry) discardedEntry; + entry.getKey().release(); + entry.getValue().release(); + }) + .flatMap(entriesList -> Mono + .fromCallable(() -> { + try { + if (!USE_WRITE_BATCHES_IN_SET_RANGE) { + for (Entry entry : entriesList) { + db.put(cfh, EMPTY_WRITE_OPTIONS, entry.getKey().nioBuffer(), entry.getValue().nioBuffer()); + } + } else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) { + try (var batch = new CappedWriteBatch(db, + CAPPED_WRITE_BATCH_CAP, + RESERVED_WRITE_BATCH_SIZE, + MAX_WRITE_BATCH_SIZE, + BATCH_WRITE_OPTIONS + )) { + for (Entry entry : entriesList) { + batch.put(cfh, entry.getKey().retain(), entry.getValue().retain()); + } + batch.writeToDbAndClose(); + } + } else { + try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) { + for (Entry entry : entriesList) { + batch.put(cfh, LLUtils.toArray(entry.getKey()), LLUtils.toArray(entry.getValue())); + } + db.write(EMPTY_WRITE_OPTIONS, batch); + batch.clear(); + } + } + return null; + } finally { + for (Entry entry : entriesList) { + entry.getKey().release(); + entry.getValue().release(); + } + } + }) + .subscribeOn(dbScheduler) + ) + ) + .then() + .onErrorMap(cause -> new IOException("Failed to write range", cause)); + } else { + if (USE_WRITE_BATCHES_IN_SET_RANGE) { + return Mono.fromCallable(() -> { + throw new UnsupportedOperationException("Can't use write batches in setRange without window. Please fix params"); + }); + } + return this + .getRange(null, Mono.just(range).map(LLRange::retain), false) + .flatMap(oldValue -> Mono .fromCallable(() -> { try { - if (!USE_WRITE_BATCHES_IN_SET_RANGE) { - for (Entry entry : entriesList) { - db.put(cfh, EMPTY_WRITE_OPTIONS, entry.getKey().nioBuffer(), entry.getValue().nioBuffer()); - } - } else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) { - try (var batch = new CappedWriteBatch(db, - CAPPED_WRITE_BATCH_CAP, - RESERVED_WRITE_BATCH_SIZE, - MAX_WRITE_BATCH_SIZE, - BATCH_WRITE_OPTIONS - )) { - for (Entry entry : entriesList) { - batch.put(cfh, entry.getKey().retain(), entry.getValue().retain()); - } - batch.writeToDbAndClose(); - } - } else { - try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) { - for (Entry entry : entriesList) { - batch.put(cfh, LLUtils.toArray(entry.getKey()), LLUtils.toArray(entry.getValue())); - } - db.write(EMPTY_WRITE_OPTIONS, batch); - batch.clear(); - } - } + dbDelete(cfh, EMPTY_WRITE_OPTIONS, oldValue.getKey().retain()); return null; } finally { - for (Entry entry : entriesList) { - entry.getKey().release(); - entry.getValue().release(); - } + oldValue.getKey().release(); + oldValue.getValue().release(); } }) .subscribeOn(dbScheduler) ) - ) - .then() - .onErrorMap(cause -> new IOException("Failed to write range", cause)) - .doFirst(range::retain) - .doAfterTerminate(range::release); - } else { - if (USE_WRITE_BATCHES_IN_SET_RANGE) { - return Mono.fromCallable(() -> { - throw new UnsupportedOperationException("Can't use write batches in setRange without window. Please fix params"); - }); - } - return Flux - .defer(() -> this.getRange(null, range.retain(), false)) - .flatMap(oldValue -> Mono - .fromCallable(() -> { - try { - dbDelete(cfh, EMPTY_WRITE_OPTIONS, oldValue.getKey().retain()); - return null; - } finally { - oldValue.getKey().release(); - oldValue.getValue().release(); - } - }) - .subscribeOn(dbScheduler) - ) - .then(entries - .flatMap(entry -> this.put(entry.getKey(), entry.getValue(), LLDictionaryResultType.VOID)) - .doOnNext(ReferenceCounted::release) - .then(Mono.empty()) - ) - .onErrorMap(cause -> new IOException("Failed to write range", cause)) - .doFirst(range::retain) - .doAfterTerminate(range::release); - } - } finally { - range.release(); - } + .then(entries + .flatMap(entry -> Mono.using( + () -> entry, + releasableEntry -> this + .put(Mono.just(entry.getKey()).map(ByteBuf::retain), + Mono.just(entry.getValue()).map(ByteBuf::retain), + LLDictionaryResultType.VOID + ) + .doOnNext(ReferenceCounted::release), + releasableEntry -> { + releasableEntry.getKey().release(); + releasableEntry.getValue().release(); + }) + ) + .then(Mono.empty()) + ) + .onErrorMap(cause -> new IOException("Failed to write range", cause)); + } + }, + range -> Mono.fromRunnable(range::release) + ); } - //todo: this is broken, check why + //todo: this is broken, check why. (is this still true?) private void deleteSmallRangeWriteBatch(CappedWriteBatch writeBatch, LLRange range) throws RocksDBException { try (var readOpts = new ReadOptions(getReadOptions(null))) { @@ -1932,205 +1841,186 @@ public class LLLocalDictionary implements LLDictionary { } @Override - public Mono sizeRange(@Nullable LLSnapshot snapshot, LLRange range, boolean fast) { - try { - return Mono - .defer(() -> { - if (range.isAll()) { - return Mono - .fromCallable(() -> fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot)) - .onErrorMap(IOException::new) - .subscribeOn(dbScheduler); - } else { - return Mono - .fromCallable(() -> { - try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { - readOpts.setFillCache(false); - readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = setIterateBound(databaseOptions.allowNettyDirect(), - readOpts, - IterateBound.LOWER, + public Mono sizeRange(@Nullable LLSnapshot snapshot, Mono rangeMono, boolean fast) { + return Mono.usingWhen(rangeMono, + range -> { + if (range.isAll()) { + return this + .runOnDb(() -> fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot)) + .onErrorMap(IOException::new); + } else { + return runOnDb(() -> { + try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { + readOpts.setFillCache(false); + readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); + ReleasableSlice minBound; + if (range.hasMin()) { + minBound = setIterateBound(databaseOptions.allowNettyDirect(), + readOpts, + IterateBound.LOWER, + range.getMin().retain() + ); + } else { + minBound = emptyReleasableSlice(); + } + try { + ReleasableSlice maxBound; + if (range.hasMax()) { + maxBound = setIterateBound(databaseOptions.allowNettyDirect(), + readOpts, + IterateBound.UPPER, + range.getMax().retain() + ); + } else { + maxBound = emptyReleasableSlice(); + } + try { + if (fast) { + readOpts.setIgnoreRangeDeletions(true); + + } + try (var rocksIterator = db.newIterator(cfh, readOpts)) { + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + rocksIterSeekTo(databaseOptions.allowNettyDirect(), + rocksIterator, range.getMin().retain() ); } else { - minBound = emptyReleasableSlice(); + rocksIterator.seekToFirst(); } - try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = setIterateBound(databaseOptions.allowNettyDirect(), - readOpts, - IterateBound.UPPER, - range.getMax().retain() - ); - } else { - maxBound = emptyReleasableSlice(); - } - try { - if (fast) { - readOpts.setIgnoreRangeDeletions(true); - - } - try (var rocksIterator = db.newIterator(cfh, readOpts)) { - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterSeekTo(databaseOptions.allowNettyDirect(), - rocksIterator, - range.getMin().retain() - ); - } else { - rocksIterator.seekToFirst(); - } - long i = 0; - rocksIterator.status(); - while (rocksIterator.isValid()) { - rocksIterator.next(); - rocksIterator.status(); - i++; - } - return i; - } - } finally { - maxBound.release(); - } - } finally { - minBound.release(); + long i = 0; + rocksIterator.status(); + while (rocksIterator.isValid()) { + rocksIterator.next(); + rocksIterator.status(); + i++; } + return i; } - }) - .onErrorMap(cause -> new IOException("Failed to get size of range " - + range, cause)) - .subscribeOn(dbScheduler); - } - }) - .doFirst(range::retain) - .doAfterTerminate(range::release); - } finally { - range.release(); - } + } finally { + maxBound.release(); + } + } finally { + minBound.release(); + } + } + }).onErrorMap(cause -> new IOException("Failed to get size of range " + range, cause)); + } + }, + range -> Mono.fromRunnable(range::release) + ); } @Override - public Mono> getOne(@Nullable LLSnapshot snapshot, LLRange range) { - try { - return Mono - .fromCallable(() -> { - try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = setIterateBound(databaseOptions.allowNettyDirect(), + public Mono> getOne(@Nullable LLSnapshot snapshot, Mono rangeMono) { + return Mono.usingWhen(rangeMono, + range -> runOnDb(() -> { + try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { + ReleasableSlice minBound; + if (range.hasMin()) { + minBound = setIterateBound(databaseOptions.allowNettyDirect(), + readOpts, + IterateBound.LOWER, + range.getMin().retain() + ); + } else { + minBound = emptyReleasableSlice(); + } + try { + ReleasableSlice maxBound; + if (range.hasMax()) { + maxBound = setIterateBound(databaseOptions.allowNettyDirect(), readOpts, - IterateBound.LOWER, - range.getMin().retain() + IterateBound.UPPER, + range.getMax().retain() ); } else { - minBound = emptyReleasableSlice(); + maxBound = emptyReleasableSlice(); } - try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = setIterateBound(databaseOptions.allowNettyDirect(), - readOpts, - IterateBound.UPPER, - range.getMax().retain() - ); + try (var rocksIterator = db.newIterator(cfh, readOpts)) { + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator, range.getMin().retain()); } else { - maxBound = emptyReleasableSlice(); + rocksIterator.seekToFirst(); } - try (var rocksIterator = db.newIterator(cfh, readOpts)) { - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator, range.getMin().retain()); - } else { - rocksIterator.seekToFirst(); - } - rocksIterator.status(); - if (rocksIterator.isValid()) { - ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); + rocksIterator.status(); + if (rocksIterator.isValid()) { + ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); + try { + ByteBuf value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value); try { - ByteBuf value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value); - try { - return Map.entry(key.retain(), value.retain()); - } finally { - value.release(); - } + return Map.entry(key.retain(), value.retain()); } finally { - key.release(); + value.release(); } - } else { - return null; + } finally { + key.release(); } - } finally { - maxBound.release(); + } else { + return null; } } finally { - minBound.release(); + maxBound.release(); } + } finally { + minBound.release(); } - }) - .subscribeOn(dbScheduler) - .doFirst(range::retain) - .doAfterTerminate(range::release); - } finally { - range.release(); - } + } + }), + range -> Mono.fromRunnable(range::release) + ); } @Override - public Mono getOneKey(@Nullable LLSnapshot snapshot, LLRange range) { - try { - return Mono - .fromCallable(() -> { - try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = setIterateBound(databaseOptions.allowNettyDirect(), + public Mono getOneKey(@Nullable LLSnapshot snapshot, Mono rangeMono) { + return Mono.usingWhen(rangeMono, + range -> runOnDb(() -> { + try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { + ReleasableSlice minBound; + if (range.hasMin()) { + minBound = setIterateBound(databaseOptions.allowNettyDirect(), + readOpts, + IterateBound.LOWER, + range.getMin().retain() + ); + } else { + minBound = emptyReleasableSlice(); + } + try { + ReleasableSlice maxBound; + if (range.hasMax()) { + maxBound = setIterateBound(databaseOptions.allowNettyDirect(), readOpts, - IterateBound.LOWER, - range.getMin().retain() + IterateBound.UPPER, + range.getMax().retain() ); } else { - minBound = emptyReleasableSlice(); + maxBound = emptyReleasableSlice(); } - try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = setIterateBound(databaseOptions.allowNettyDirect(), - readOpts, - IterateBound.UPPER, - range.getMax().retain() - ); + try (var rocksIterator = db.newIterator(cfh, readOpts)) { + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator, range.getMin().retain()); } else { - maxBound = emptyReleasableSlice(); + rocksIterator.seekToFirst(); } - try (var rocksIterator = db.newIterator(cfh, readOpts)) { - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator, range.getMin().retain()); - } else { - rocksIterator.seekToFirst(); - } - ByteBuf key; - rocksIterator.status(); - if (rocksIterator.isValid()) { - key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); - return key; - } else { - return null; - } - } finally { - maxBound.release(); + ByteBuf key; + rocksIterator.status(); + if (rocksIterator.isValid()) { + key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); + return key; + } else { + return null; } } finally { - minBound.release(); + maxBound.release(); } + } finally { + minBound.release(); } - }) - .subscribeOn(dbScheduler) - .doFirst(range::retain) - .doAfterTerminate(range::release); - } finally { - range.release(); - } + } + }), + range -> Mono.fromRunnable(range::release) + ); } private long fastSizeAll(@Nullable LLSnapshot snapshot) throws RocksDBException { @@ -2242,61 +2132,55 @@ public class LLLocalDictionary implements LLDictionary { } @Override - public Mono> removeOne(LLRange range) { - try { - return Mono - .fromCallable(() -> { - try (var readOpts = new ReadOptions(getReadOptions(null))) { - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = setIterateBound(databaseOptions.allowNettyDirect(), + public Mono> removeOne(Mono rangeMono) { + return Mono.usingWhen(rangeMono, + range -> runOnDb(() -> { + try (var readOpts = new ReadOptions(getReadOptions(null))) { + ReleasableSlice minBound; + if (range.hasMin()) { + minBound = setIterateBound(databaseOptions.allowNettyDirect(), + readOpts, + IterateBound.LOWER, + range.getMin().retain() + ); + } else { + minBound = emptyReleasableSlice(); + } + try { + ReleasableSlice maxBound; + if (range.hasMax()) { + maxBound = setIterateBound(databaseOptions.allowNettyDirect(), readOpts, - IterateBound.LOWER, - range.getMin().retain() + IterateBound.UPPER, + range.getMax().retain() ); } else { - minBound = emptyReleasableSlice(); + maxBound = emptyReleasableSlice(); } - try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = setIterateBound(databaseOptions.allowNettyDirect(), - readOpts, - IterateBound.UPPER, - range.getMax().retain() - ); + try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) { + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator, range.getMin().retain()); } else { - maxBound = emptyReleasableSlice(); + rocksIterator.seekToFirst(); } - try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) { - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator, range.getMin().retain()); - } else { - rocksIterator.seekToFirst(); - } - rocksIterator.status(); - if (!rocksIterator.isValid()) { - return null; - } - ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); - ByteBuf value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value); - dbDelete(cfh, null, key); - return Map.entry(key, value); - } finally { - maxBound.release(); + rocksIterator.status(); + if (!rocksIterator.isValid()) { + return null; } + ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); + ByteBuf value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value); + dbDelete(cfh, null, key); + return Map.entry(key, value); } finally { - minBound.release(); + maxBound.release(); } + } finally { + minBound.release(); } - }) - .onErrorMap(cause -> new IOException("Failed to delete " + range.toString(), cause)) - .subscribeOn(dbScheduler) - .doFirst(range::retain) - .doAfterTerminate(range::release); - } finally { - range.release(); - } + } + }).onErrorMap(cause -> new IOException("Failed to delete " + range.toString(), cause)), + range -> Mono.fromRunnable(range::release) + ); } @NotNull