From 574a35907d56c814b9490add36faa2c15044a5f5 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 26 Jan 2022 21:18:43 +0100 Subject: [PATCH] Further optimizations --- .../database/disk/LLLocalDictionary.java | 659 +++++++++--------- 1 file changed, 327 insertions(+), 332 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index b9a53fd..a0b79c3 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -13,7 +13,6 @@ import io.micrometer.core.instrument.Timer; import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.Send; -import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.database.Column; @@ -400,28 +399,10 @@ public class LLLocalDictionary implements LLDictionary { .handle((entry, sink) -> { try (var key = entry.getKey().receive()) { try (var value = entry.getValue().receive()) { - assert key.isAccessible(); - assert value.isAccessible(); - if (logger.isTraceEnabled(MARKER_ROCKSDB)) { - var varargs = new Supplier[]{() -> toStringSafe(key), () -> toStringSafe(value)}; - logger.trace(MARKER_ROCKSDB, "Writing {}: {}", varargs); - } - startedPut.increment(); - try { - putTime.recordCallable(() -> { - db.put(EMPTY_WRITE_OPTIONS, key, value); - return null; - }); - } catch (RocksDBException ex) { - sink.error(new RocksDBException("Failed to write: " + ex.getMessage())); - return; - } catch (Exception ex) { - sink.error(ex); - return; - } finally { - endedPut.increment(); - } + put(key, value); sink.complete(); + } catch (RocksDBException ex) { + sink.error(ex); } } }); @@ -429,6 +410,30 @@ public class LLLocalDictionary implements LLDictionary { return Flux.concat(previousDataMono, putMono).singleOrEmpty(); } + private void put(Buffer key, Buffer value) throws RocksDBException { + assert key.isAccessible(); + assert value.isAccessible(); + if (logger.isTraceEnabled(MARKER_ROCKSDB)) { + var varargs = new Supplier[]{() -> toStringSafe(key), () -> toStringSafe(value)}; + logger.trace(MARKER_ROCKSDB, "Writing {}: {}", varargs); + } + startedPut.increment(); + try { + putTime.recordCallable(() -> { + db.put(EMPTY_WRITE_OPTIONS, key, value); + return null; + }); + } catch (RocksDBException ex) { + throw new RocksDBException("Failed to write: " + ex.getMessage()); + } catch (RuntimeException ex) { + throw ex; + } catch (Exception ex) { + throw new RuntimeException("Failed to write", ex); + } finally { + endedPut.increment(); + } + } + @Override public Mono getUpdateMode() { return Mono.just(updateMode); @@ -950,180 +955,181 @@ public class LLLocalDictionary implements LLDictionary { @Override public Mono setRange(Mono> rangeMono, Flux> entries) { - //todo: change usingWhen and use a better alternative - return Mono.usingWhen(rangeMono, - rangeSend -> { - if (USE_WINDOW_IN_SET_RANGE) { - return this - .runOnDb(() -> { - try (var range = rangeSend.receive()) { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called setRange in a nonblocking thread"); - } - if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) { - assert EMPTY_READ_OPTIONS.isOwningHandle(); - try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) { - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = setIterateBound(nettyDirect, opts, IterateBound.LOWER, range.getMinUnsafe()); - } else { - minBound = emptyReleasableSlice(); - } - try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = setIterateBound(nettyDirect, opts, IterateBound.UPPER, range.getMaxUnsafe()); - } else { - maxBound = emptyReleasableSlice(); - } - assert cfh.isOwningHandle(); - assert opts.isOwningHandle(); - SafeCloseable seekTo; - try (RocksIterator it = db.newIterator(opts)) { - if (!PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(nettyDirect, it, range.getMinUnsafe()); - } else { - seekTo = null; - it.seekToFirst(); - } - try { - it.status(); - while (it.isValid()) { - db.delete(EMPTY_WRITE_OPTIONS, it.key()); - it.next(); - it.status(); - } - } finally { - if (seekTo != null) { - seekTo.close(); - } - } - } finally { - maxBound.close(); - } - } finally { - minBound.close(); - } - } - } else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) { - try (var batch = new CappedWriteBatch(db, - alloc, - CAPPED_WRITE_BATCH_CAP, - RESERVED_WRITE_BATCH_SIZE, - MAX_WRITE_BATCH_SIZE, - BATCH_WRITE_OPTIONS - )) { - if (range.isSingle()) { - batch.delete(cfh, range.getSingle()); - } else { - deleteSmallRangeWriteBatch(batch, range.copy().send()); - } - batch.writeToDbAndClose(); - } + if (USE_WINDOW_IN_SET_RANGE) { + return rangeMono + .publishOn(dbScheduler) + .handle((rangeSend, sink) -> { + try (var range = rangeSend.receive()) { + assert !Schedulers.isInNonBlockingThread() : "Called setRange in a nonblocking thread"; + if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) { + assert EMPTY_READ_OPTIONS.isOwningHandle(); + try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) { + ReleasableSlice minBound; + if (range.hasMin()) { + minBound = setIterateBound(nettyDirect, opts, IterateBound.LOWER, range.getMinUnsafe()); + } else { + minBound = emptyReleasableSlice(); + } + try { + ReleasableSlice maxBound; + if (range.hasMax()) { + maxBound = setIterateBound(nettyDirect, opts, IterateBound.UPPER, range.getMaxUnsafe()); } else { - try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) { - if (range.isSingle()) { - batch.delete(cfh, LLUtils.toArray(range.getSingleUnsafe())); - } else { - deleteSmallRangeWriteBatch(batch, range.copy().send()); + maxBound = emptyReleasableSlice(); + } + assert cfh.isOwningHandle(); + assert opts.isOwningHandle(); + SafeCloseable seekTo; + try (RocksIterator it = db.newIterator(opts)) { + if (!PREFER_SEEK_TO_FIRST && range.hasMin()) { + seekTo = rocksIterSeekTo(nettyDirect, it, range.getMinUnsafe()); + } else { + seekTo = null; + it.seekToFirst(); + } + try { + it.status(); + while (it.isValid()) { + db.delete(EMPTY_WRITE_OPTIONS, it.key()); + it.next(); + it.status(); } - db.write(EMPTY_WRITE_OPTIONS, batch); - batch.clear(); + } finally { + if (seekTo != null) { + seekTo.close(); + } + } + } finally { + maxBound.close(); + } + } finally { + minBound.close(); + } + } + } else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) { + try (var batch = new CappedWriteBatch(db, + alloc, + CAPPED_WRITE_BATCH_CAP, + RESERVED_WRITE_BATCH_SIZE, + MAX_WRITE_BATCH_SIZE, + BATCH_WRITE_OPTIONS + )) { + if (range.isSingle()) { + batch.delete(cfh, range.getSingle()); + } else { + deleteSmallRangeWriteBatch(batch, range.copy().send()); + } + batch.writeToDbAndClose(); + } + } else { + try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) { + if (range.isSingle()) { + batch.delete(cfh, LLUtils.toArray(range.getSingleUnsafe())); + } else { + deleteSmallRangeWriteBatch(batch, range.copy().send()); + } + db.write(EMPTY_WRITE_OPTIONS, batch); + batch.clear(); + } + } + sink.complete(); + } catch (RocksDBException ex) { + sink.error(new RocksDBException("Failed to set a range: " + ex.getMessage())); + } + }) + .thenMany(entries.window(MULTI_GET_WINDOW)) + .flatMap(keysWindowFlux -> keysWindowFlux + .collectList() + .flatMap(entriesListSend -> this + .runOnDb(() -> { + List entriesList = new ArrayList<>(entriesListSend.size()); + for (Send entrySend : entriesListSend) { + entriesList.add(entrySend.receive()); + } + try { + if (!USE_WRITE_BATCHES_IN_SET_RANGE) { + for (LLEntry entry : entriesList) { + assert entry.isAccessible(); + db.put(EMPTY_WRITE_OPTIONS, entry.getKeyUnsafe(), entry.getValueUnsafe()); + } + } else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) { + try (var batch = new CappedWriteBatch(db, + alloc, + CAPPED_WRITE_BATCH_CAP, + RESERVED_WRITE_BATCH_SIZE, + MAX_WRITE_BATCH_SIZE, + BATCH_WRITE_OPTIONS + )) { + for (LLEntry entry : entriesList) { + assert entry.isAccessible(); + if (nettyDirect) { + batch.put(cfh, entry.getKey(), entry.getValue()); + } else { + batch.put(cfh, + LLUtils.toArray(entry.getKeyUnsafe()), + LLUtils.toArray(entry.getValueUnsafe()) + ); + } + } + batch.writeToDbAndClose(); + } + } else { + try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) { + for (LLEntry entry : entriesList) { + assert entry.isAccessible(); + batch.put(cfh, LLUtils.toArray(entry.getKeyUnsafe()), + LLUtils.toArray(entry.getValueUnsafe())); + } + db.write(EMPTY_WRITE_OPTIONS, batch); + batch.clear(); + } + } + return null; + } finally { + for (LLEntry entry : entriesList) { + assert entry.isAccessible(); + entry.close(); } } - return null; - } - }) - .thenMany(entries.window(MULTI_GET_WINDOW)) - .flatMap(keysWindowFlux -> keysWindowFlux - .collectList() - .flatMap(entriesListSend -> this - .runOnDb(() -> { - List entriesList = new ArrayList<>(entriesListSend.size()); - for (Send entrySend : entriesListSend) { - entriesList.add(entrySend.receive()); - } - try { - if (!USE_WRITE_BATCHES_IN_SET_RANGE) { - for (LLEntry entry : entriesList) { - assert entry.isAccessible(); - db.put(EMPTY_WRITE_OPTIONS, entry.getKeyUnsafe(), entry.getValueUnsafe()); - } - } else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) { - try (var batch = new CappedWriteBatch(db, - alloc, - CAPPED_WRITE_BATCH_CAP, - RESERVED_WRITE_BATCH_SIZE, - MAX_WRITE_BATCH_SIZE, - BATCH_WRITE_OPTIONS - )) { - for (LLEntry entry : entriesList) { - assert entry.isAccessible(); - if (nettyDirect) { - batch.put(cfh, entry.getKey(), entry.getValue()); - } else { - batch.put(cfh, - LLUtils.toArray(entry.getKeyUnsafe()), - LLUtils.toArray(entry.getValueUnsafe()) - ); - } - } - batch.writeToDbAndClose(); - } - } else { - try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) { - for (LLEntry entry : entriesList) { - assert entry.isAccessible(); - batch.put(cfh, LLUtils.toArray(entry.getKeyUnsafe()), - LLUtils.toArray(entry.getValueUnsafe())); - } - db.write(EMPTY_WRITE_OPTIONS, batch); - batch.clear(); - } - } - return null; - } finally { - for (LLEntry entry : entriesList) { - assert entry.isAccessible(); - entry.close(); - } - } - }) - ) - ) - .then() - .onErrorMap(cause -> new IOException("Failed to write range", cause)); - } else { - if (USE_WRITE_BATCHES_IN_SET_RANGE) { - return Mono.fromCallable(() -> { - throw new UnsupportedOperationException("Can't use write batches in setRange without window." - + " Please fix the parameters"); - }); + }) + ) + ) + .then() + .onErrorMap(cause -> new IOException("Failed to write range", cause)); + } else { + if (USE_WRITE_BATCHES_IN_SET_RANGE) { + return Mono.error(() -> new UnsupportedOperationException( + "Can't use write batches in setRange without window. Please fix the parameters")); + } + var deleteMono = this + .getRange(null, rangeMono, false) + .publishOn(dbScheduler) + .handle((oldValueSend, sink) -> { + try (var oldValue = oldValueSend.receive()) { + db.delete(EMPTY_WRITE_OPTIONS, oldValue.getKeyUnsafe()); + sink.complete(); + } catch (RocksDBException ex) { + sink.error(new RocksDBException("Failed to write range: " + ex.getMessage())); } - return this - .getRange(null, rangeMono, false) - .flatMap(oldValueSend -> this.runOnDb(() -> { - try (var oldValue = oldValueSend.receive()) { - db.delete(EMPTY_WRITE_OPTIONS, oldValue.getKeyUnsafe()); - return null; - } - })) - .then(entries - .flatMap(entrySend -> Mono.using( - entrySend::receive, - entry -> this - .put(LLUtils.lazyRetain(entry::getKey), LLUtils.lazyRetain(entry::getValue), - LLDictionaryResultType.VOID) - .doOnNext(Send::close), - ResourceSupport::close - )) - .then(Mono.empty()) - ) - .onErrorMap(cause -> new IOException("Failed to write range", cause)); - } - }, - rangeSend -> Mono.fromRunnable(rangeSend::close) - ); + }) + .then(Mono.empty()); + + var putMono = entries + .publishOn(dbScheduler) + .handle((entrySend, sink) -> { + try (var entry = entrySend.receive()) { + if (entry.getKeyUnsafe() != null && entry.getValueUnsafe() != null) { + this.put(entry.getKeyUnsafe(), entry.getValueUnsafe()); + } + sink.complete(); + } catch (RocksDBException ex) { + sink.error(new RocksDBException("Failed to write range: " + ex.getMessage())); + } + }) + .then(Mono.empty()); + + return deleteMono.then(putMono); + } } //todo: this is broken, check why. (is this still true?) @@ -1289,9 +1295,7 @@ public class LLLocalDictionary implements LLDictionary { public Mono clear() { return Mono .fromCallable(() -> { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called clear in a nonblocking thread"); - } + assert !Schedulers.isInNonBlockingThread() : "Called clear in a nonblocking thread"; boolean shouldCompactLater = false; try (var readOpts = new ReadOptions(getReadOptions(null))) { readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); @@ -1360,13 +1364,11 @@ public class LLLocalDictionary implements LLDictionary { @Override public Mono sizeRange(@Nullable LLSnapshot snapshot, Mono> rangeMono, boolean fast) { - return Mono.usingWhen(rangeMono, rangeSend -> runOnDb(() -> { + return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> { try (var range = rangeSend.receive()) { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called sizeRange in a nonblocking thread"); - } + assert !Schedulers.isInNonBlockingThread() : "Called sizeRange in a nonblocking thread"; if (range.isAll()) { - return fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot); + sink.next(fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot)); } else { try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { readOpts.setFillCache(false); @@ -1405,7 +1407,7 @@ public class LLLocalDictionary implements LLDictionary { rocksIterator.status(); i++; } - return i; + sink.next(i); } finally { if (seekTo != null) { seekTo.close(); @@ -1420,123 +1422,118 @@ public class LLLocalDictionary implements LLDictionary { } } } + } catch (RocksDBException ex) { + sink.error(new RocksDBException("Failed to get size of range: " + ex.getMessage())); } - }).onErrorMap(cause -> new IOException("Failed to get size of range", cause)), - rangeSend -> Mono.fromRunnable(rangeSend::close)); + }); } @Override public Mono> getOne(@Nullable LLSnapshot snapshot, Mono> rangeMono) { - return Mono.usingWhen(rangeMono, - rangeSend -> runOnDb(() -> { - try (var range = rangeSend.receive()) { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called getOne in a nonblocking thread"); + return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> { + try (var range = rangeSend.receive()) { + assert !Schedulers.isInNonBlockingThread() : "Called getOne in a nonblocking thread"; + try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { + ReleasableSlice minBound; + if (range.hasMin()) { + minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); + } else { + minBound = emptyReleasableSlice(); + } + try { + ReleasableSlice maxBound; + if (range.hasMax()) { + maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe()); + } else { + maxBound = emptyReleasableSlice(); } - try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); + try (var rocksIterator = db.newIterator(readOpts)) { + SafeCloseable seekTo; + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe()); } else { - minBound = emptyReleasableSlice(); + seekTo = null; + rocksIterator.seekToFirst(); } try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe()); + rocksIterator.status(); + if (rocksIterator.isValid()) { + try (var key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) { + try (var value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value)) { + sink.next(LLEntry.of(key.send(), value.send()).send()); + } + } } else { - maxBound = emptyReleasableSlice(); - } - try (var rocksIterator = db.newIterator(readOpts)) { - SafeCloseable seekTo; - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe()); - } else { - seekTo = null; - rocksIterator.seekToFirst(); - } - try { - rocksIterator.status(); - if (rocksIterator.isValid()) { - try (var key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) { - try (var value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value)) { - return LLEntry.of(key.send(), value.send()).send(); - } - } - } else { - return null; - } - } finally { - if (seekTo != null) { - seekTo.close(); - } - } - } finally { - maxBound.close(); + sink.complete(); } } finally { - minBound.close(); + if (seekTo != null) { + seekTo.close(); + } } + } finally { + maxBound.close(); } + } finally { + minBound.close(); } - }), - rangeSend -> Mono.fromRunnable(rangeSend::close) - ); + } + } catch (RocksDBException ex) { + sink.error(new RocksDBException("Failed to get one entry: " + ex.getMessage())); + } + }); } @Override public Mono> getOneKey(@Nullable LLSnapshot snapshot, Mono> rangeMono) { - return Mono.usingWhen(rangeMono, - rangeSend -> runOnDb(() -> { - try (var range = rangeSend.receive()) { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called getOneKey in a nonblocking thread"); + return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> { + try (var range = rangeSend.receive()) { + assert !Schedulers.isInNonBlockingThread() : "Called getOneKey in a nonblocking thread"; + try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { + ReleasableSlice minBound; + if (range.hasMin()) { + minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); + } else { + minBound = emptyReleasableSlice(); + } + try { + ReleasableSlice maxBound; + if (range.hasMax()) { + maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe()); + } else { + maxBound = emptyReleasableSlice(); } - try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); + try (var rocksIterator = db.newIterator(readOpts)) { + SafeCloseable seekTo; + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe()); } else { - minBound = emptyReleasableSlice(); + seekTo = null; + rocksIterator.seekToFirst(); } try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe()); + rocksIterator.status(); + if (rocksIterator.isValid()) { + sink.next(LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).send()); } else { - maxBound = emptyReleasableSlice(); - } - try (var rocksIterator = db.newIterator(readOpts)) { - SafeCloseable seekTo; - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe()); - } else { - seekTo = null; - rocksIterator.seekToFirst(); - } - try { - rocksIterator.status(); - if (rocksIterator.isValid()) { - return LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).send(); - } else { - return null; - } - } finally { - if (seekTo != null) { - seekTo.close(); - } - } - } finally { - maxBound.close(); + sink.complete(); } } finally { - minBound.close(); + if (seekTo != null) { + seekTo.close(); + } } + } finally { + maxBound.close(); } + } finally { + minBound.close(); } - }), - rangeSend -> Mono.fromRunnable(rangeSend::close) - ); + } + } catch (RocksDBException ex) { + sink.error(new RocksDBException("Failed to get one key: " + ex.getMessage())); + } + }); } private long fastSizeAll(@Nullable LLSnapshot snapshot) throws RocksDBException { @@ -1652,59 +1649,57 @@ public class LLLocalDictionary implements LLDictionary { @Override public Mono> removeOne(Mono> rangeMono) { - return Mono.usingWhen(rangeMono, - rangeSend -> runOnDb(() -> { - try (var range = rangeSend.receive()) { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called removeOne in a nonblocking thread"); + return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> { + try (var range = rangeSend.receive()) { + assert !Schedulers.isInNonBlockingThread() : "Called removeOne in a nonblocking thread"; + try (var readOpts = new ReadOptions(getReadOptions(null))) { + ReleasableSlice minBound; + if (range.hasMin()) { + minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); + } else { + minBound = emptyReleasableSlice(); + } + try { + ReleasableSlice maxBound; + if (range.hasMax()) { + maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe()); + } else { + maxBound = emptyReleasableSlice(); } - try (var readOpts = new ReadOptions(getReadOptions(null))) { - ReleasableSlice minBound; - if (range.hasMin()) { - minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); + try (RocksIterator rocksIterator = db.newIterator(readOpts)) { + SafeCloseable seekTo; + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe()); } else { - minBound = emptyReleasableSlice(); + seekTo = null; + rocksIterator.seekToFirst(); } try { - ReleasableSlice maxBound; - if (range.hasMax()) { - maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe()); - } else { - maxBound = emptyReleasableSlice(); - } - try (RocksIterator rocksIterator = db.newIterator(readOpts)) { - SafeCloseable seekTo; - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe()); - } else { - seekTo = null; - rocksIterator.seekToFirst(); - } - try { - rocksIterator.status(); - if (!rocksIterator.isValid()) { - return null; - } - Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); - Buffer value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value); - db.delete(EMPTY_WRITE_OPTIONS, key); - return LLEntry.of(key, value).send(); - } finally { - if (seekTo != null) { - seekTo.close(); - } - } - } finally { - maxBound.close(); + rocksIterator.status(); + if (!rocksIterator.isValid()) { + sink.complete(); + return; } + Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); + Buffer value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value); + db.delete(EMPTY_WRITE_OPTIONS, key); + sink.next(LLEntry.of(key, value).send()); } finally { - minBound.close(); + if (seekTo != null) { + seekTo.close(); + } } + } finally { + maxBound.close(); } + } finally { + minBound.close(); } - }).onErrorMap(cause -> new IOException("Failed to delete", cause)), - rangeSend -> Mono.fromRunnable(rangeSend::close) - ); + } + } catch (RocksDBException ex) { + sink.error(ex); + } + }); } /**