From 95afa6f9dde113849f1f2cee6ac5087e64caea39 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 26 Jan 2022 19:03:51 +0100 Subject: [PATCH] Clean database code --- .../dbengine/database/LLDictionary.java | 8 +- .../collections/DatabaseMapDictionary.java | 2 +- .../collections/DatabaseStageMap.java | 7 +- .../database/disk/LLLocalDictionary.java | 458 +++++++++--------- .../LLLocalGroupedReactiveRocksIterator.java | 12 +- ...LLLocalKeyPrefixReactiveRocksIterator.java | 159 +++--- .../disk/LLLocalReactiveRocksIterator.java | 44 +- .../database/disk/ReleasableSlice.java | 6 - .../database/disk/RocksIteratorTuple.java | 18 + .../database/memory/LLMemoryDictionary.java | 11 +- .../cavallium/dbengine/TestLLDictionary.java | 17 +- 11 files changed, 355 insertions(+), 387 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/RocksIteratorTuple.java diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java index 0d5e29e..c53d0c5 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java @@ -67,7 +67,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { return getMulti(snapshot, keys, false); } - Flux> putMulti(Flux> entries, boolean getOldValues); + Mono putMulti(Flux> entries); Flux updateMulti(Flux keys, Flux> serializedKeys, KVSerializationFunction, @Nullable Buffer> updateFunction); @@ -111,11 +111,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { .flatMap(entriesReplacer) ); } else { - return this - .putMulti(this - .getRange(null, range, existsAlmostCertainly) - .flatMap(entriesReplacer), false) - .then(); + return this.putMulti(this.getRange(null, range, existsAlmostCertainly).flatMap(entriesReplacer)); } }); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 149023a..3b9cabc 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -391,7 +391,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> extends default Mono> updateValueAndGetDelta(T key, boolean existsAlmostCertainly, SerializationFunction<@Nullable U, @Nullable U> updater) { - return LLUtils.usingResource(this.at(null, key).single(), - stage -> stage.updateAndGetDelta(updater, existsAlmostCertainly), true); + var stageMono = this.at(null, key).single(); + return stageMono.flatMap(stage -> stage + .updateAndGetDelta(updater, existsAlmostCertainly) + .doFinally(s -> stage.close())); } default Mono> updateValueAndGetDelta(T key, SerializationFunction<@Nullable U, @Nullable U> updater) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index ad36710..ee44262 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -38,6 +38,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.Callable; +import java.util.concurrent.CompletionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import 
java.util.concurrent.TimeUnit; @@ -68,7 +69,6 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple2; import reactor.util.function.Tuple3; -import reactor.util.function.Tuple4; import reactor.util.function.Tuples; public class LLLocalDictionary implements LLDictionary { @@ -128,7 +128,6 @@ public class LLLocalDictionary implements LLDictionary { private final Scheduler dbScheduler; private final Function snapshotResolver; private final UpdateMode updateMode; - private final DatabaseOptions databaseOptions; private final boolean nettyDirect; private final BufferAllocator alloc; @@ -165,7 +164,6 @@ public class LLLocalDictionary implements LLDictionary { this.dbScheduler = dbScheduler; this.snapshotResolver = snapshotResolver; this.updateMode = updateMode; - this.databaseOptions = databaseOptions; alloc = allocator; this.nettyDirect = databaseOptions.allowNettyDirect() && alloc.getAllocationType() == OFF_HEAP; var meterRegistry = db.getMeterRegistry(); @@ -350,6 +348,9 @@ public class LLLocalDictionary implements LLDictionary { }); assert result != null; sink.next(!result); + } catch (RocksDBException ex) { + sink.error(new RocksDBException("Failed to read range " + LLUtils.toStringSafe(range) + + ": " + ex.getMessage())); } finally { endedContains.increment(); } @@ -397,39 +398,43 @@ public class LLLocalDictionary implements LLDictionary { // Obtain the previous value from the database var previousDataMono = this.getPreviousData(keyMono, resultType, false); // Write the new entry to the database - var putMono = entryMono + Mono> putMono = entryMono .publishOn(dbScheduler) - .handle((entry, sink) -> { + .handle((entry, sink) -> { try (var key = entry.getKey().receive()) { try (var value = entry.getValue().receive()) { assert key.isAccessible(); assert value.isAccessible(); - var varargs = new Supplier[]{() -> toStringSafe(key), () -> toStringSafe(value)}; - logger.trace(MARKER_ROCKSDB, "Writing {}: {}", varargs); - db.put(EMPTY_WRITE_OPTIONS, key, value); + if (logger.isTraceEnabled(MARKER_ROCKSDB)) { + var varargs = new Supplier[]{() -> toStringSafe(key), () -> toStringSafe(value)}; + logger.trace(MARKER_ROCKSDB, "Writing {}: {}", varargs); + } + startedPut.increment(); + try { + putTime.recordCallable(() -> { + db.put(EMPTY_WRITE_OPTIONS, key, value); + return null; + }); + } catch (RocksDBException ex) { + sink.error(new RocksDBException("Failed to write: " + ex.getMessage())); + return; + } catch (Exception ex) { + sink.error(ex); + return; + } finally { + endedPut.increment(); + } sink.complete(); } - } catch (Throwable ex) { - sink.error(ex); } }); // Read the previous data, then write the new data, then return the previous data - return Flux - .concat(previousDataMono, putMono.then(Mono.empty())) - .singleOrEmpty() - .onErrorMap(cause -> new IOException("Failed to write", cause)) - .elapsed() - .map(tuple -> { - putTime.record(tuple.getT1(), TimeUnit.MILLISECONDS); - return tuple.getT2(); - }) - .doFirst(startedPut::increment) - .doFinally(s -> endedPut.increment()); + return Flux.concat(previousDataMono, putMono).singleOrEmpty(); } @Override public Mono getUpdateMode() { - return Mono.fromSupplier(() -> updateMode); + return Mono.just(updateMode); } @SuppressWarnings("DuplicatedCode") @@ -438,34 +443,43 @@ public class LLLocalDictionary implements LLDictionary { SerializationFunction<@Nullable Send, @Nullable Buffer> updater, UpdateReturnMode updateReturnMode, boolean existsAlmostCertainly) { - return 
Mono.usingWhen(keyMono, keySend -> runOnDb(() -> { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called update in a nonblocking thread"); + return keyMono + .publishOn(dbScheduler) + .handle((keySend, sink) -> { + try (keySend) { + assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread"; + if (updateMode == UpdateMode.DISALLOW) { + sink.error(new UnsupportedOperationException("update() is disallowed")); + return; + } + UpdateAtomicResultMode returnMode = switch (updateReturnMode) { + case NOTHING -> UpdateAtomicResultMode.NOTHING; + case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT; + case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS; + }; + UpdateAtomicResult result; + startedUpdates.increment(); + try { + result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS, + EMPTY_WRITE_OPTIONS, keySend, updater, existsAlmostCertainly, returnMode)); + } finally { + endedUpdates.increment(); + } + assert result != null; + var previous = switch (updateReturnMode) { + case NOTHING -> null; + case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current(); + case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous(); + }; + if (previous != null) { + sink.next(previous); + } else { + sink.complete(); + } + } catch (Exception ex) { + sink.error(ex); } - if (updateMode == UpdateMode.DISALLOW) { - throw new UnsupportedOperationException("update() is disallowed"); - } - UpdateAtomicResultMode returnMode = switch (updateReturnMode) { - case NOTHING -> UpdateAtomicResultMode.NOTHING; - case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT; - case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS; - }; - UpdateAtomicResult result; - startedUpdates.increment(); - try { - result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS, - EMPTY_WRITE_OPTIONS, keySend, updater, existsAlmostCertainly, returnMode)); - } finally { - endedUpdates.increment(); - } - assert result != null; - return switch (updateReturnMode) { - case NOTHING -> null; - case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current(); - case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous(); - }; - }).onErrorMap(cause -> new IOException("Failed to read or write", cause)), - keySend -> Mono.fromRunnable(keySend::close)); + }); } @SuppressWarnings("DuplicatedCode") @@ -473,58 +487,65 @@ public class LLLocalDictionary implements LLDictionary { public Mono> updateAndGetDelta(Mono> keyMono, SerializationFunction<@Nullable Send, @Nullable Buffer> updater, boolean existsAlmostCertainly) { - return Mono.usingWhen(keyMono, keySend -> runOnDb(() -> { - if (Schedulers.isInNonBlockingThread()) { - keySend.close(); - throw new UnsupportedOperationException("Called update in a nonblocking thread"); + return keyMono + .publishOn(dbScheduler) + .handle((keySend, sink) -> { + try (keySend) { + assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread"; + if (updateMode == UpdateMode.DISALLOW) { + sink.error(new UnsupportedOperationException("update() is disallowed")); + return; + } + if (updateMode == UpdateMode.ALLOW && !db.supportsTransactions()) { + sink.error(new UnsupportedOperationException("update() is disallowed because the database doesn't support" + + "safe atomic operations")); + return; + } + + UpdateAtomicResult result; + startedUpdates.increment(); + try { + result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS, + EMPTY_WRITE_OPTIONS, keySend, updater, 
existsAlmostCertainly, UpdateAtomicResultMode.DELTA)); + } finally { + endedUpdates.increment(); + } + assert result != null; + sink.next(((UpdateAtomicResultDelta) result).delta()); + } catch (Exception ex) { + sink.error(ex); } - if (updateMode == UpdateMode.DISALLOW) { - keySend.close(); - throw new UnsupportedOperationException("update() is disallowed"); - } - if (updateMode == UpdateMode.ALLOW && !db.supportsTransactions()) { - throw new UnsupportedOperationException("update() is disallowed because the database doesn't support" - + "safe atomic operations"); - } - UpdateAtomicResult result; - startedUpdates.increment(); - try { - result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS, - EMPTY_WRITE_OPTIONS, keySend, updater, existsAlmostCertainly, UpdateAtomicResultMode.DELTA)); - } finally { - endedUpdates.increment(); - } - assert result != null; - return ((UpdateAtomicResultDelta) result).delta(); - }).onErrorMap(cause -> new IOException("Failed to read or write", cause)), - keySend -> Mono.fromRunnable(keySend::close)); + }); } @Override public Mono> remove(Mono> keyMono, LLDictionaryResultType resultType) { - return Mono.usingWhen(keyMono, - keySend -> this - .getPreviousData(keyMono, resultType, true) - .concatWith(this - .>runOnDb(() -> { - try (var key = keySend.receive()) { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Deleting {}", toStringSafe(key)); - db.delete(EMPTY_WRITE_OPTIONS, key); - } else { - db.delete(EMPTY_WRITE_OPTIONS, key); - } - } - return null; - }) - .onErrorMap(cause -> new IOException("Failed to delete", cause)) - ) - .singleOrEmpty(), - keySend -> Mono.fromRunnable(keySend::close) - ).elapsed().map(tuple -> { - removeTime.record(tuple.getT1(), TimeUnit.MILLISECONDS); - return tuple.getT2(); - }).doFirst(startedRemove::increment).doFinally(s -> endedRemove.increment()); + // Obtain the previous value from the database + Mono> previousDataMono = this.getPreviousData(keyMono, resultType, true); + // Delete the value from the database + Mono> removeMono = keyMono + .publishOn(dbScheduler) + .handle((keySend, sink) -> { + try (var key = keySend.receive()) { + logger.trace(MARKER_ROCKSDB, "Deleting {}", () -> toStringSafe(key)); + startedRemove.increment(); + try { + removeTime.recordCallable(() -> { + db.delete(EMPTY_WRITE_OPTIONS, key); + return null; + }); + } finally { + endedRemove.increment(); + } + sink.complete(); + } catch (RocksDBException ex) { + sink.error(new RocksDBException("Failed to delete: " + ex.getMessage())); + } catch (Exception ex) { + sink.error(ex); + } + }); + // Read the previous data, then delete the data, then return the previous data + return Flux.concat(previousDataMono, removeMono).singleOrEmpty(); } private Mono> getPreviousData(Mono> keyMono, LLDictionaryResultType resultType, @@ -534,28 +555,22 @@ public class LLLocalDictionary implements LLDictionary { .containsKey(null, keyMono) .single() .map((Boolean bool) -> LLUtils.booleanToResponseByteBuffer(alloc, bool)); - case PREVIOUS_VALUE -> Mono.usingWhen( - keyMono, - keySend -> this - .runOnDb(() -> { - try (var key = keySend.receive()) { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called getPreviousData in a nonblocking thread"); - } - - if (logger.isTraceEnabled()) { - var keyString = toStringSafe(key); - var result = db.get(EMPTY_READ_OPTIONS, key, existsAlmostCertainly); - logger.trace(MARKER_ROCKSDB, "Reading {}: {}", keyString, toStringSafe(result)); - return result == null ? 
null : result.send(); - } else { - var result = db.get(EMPTY_READ_OPTIONS, key, existsAlmostCertainly); - return result == null ? null : result.send(); - } - } - }) - .onErrorMap(cause -> new IOException("Failed to read ", cause)), - keySend -> Mono.fromRunnable(keySend::close)); + case PREVIOUS_VALUE -> keyMono + .publishOn(dbScheduler) + .handle((keySend, sink) -> { + try (var key = keySend.receive()) { + assert !Schedulers.isInNonBlockingThread() : "Called getPreviousData in a nonblocking thread"; + var result = db.get(EMPTY_READ_OPTIONS, key, existsAlmostCertainly); + logger.trace(MARKER_ROCKSDB, "Reading {}: {}", () -> toStringSafe(key), () -> toStringSafe(result)); + if (result == null) { + sink.complete(); + } else { + sink.next(result.send()); + } + } catch (Exception ex) { + sink.error(ex); + } + }); case VOID -> Mono.empty(); }; } @@ -566,15 +581,14 @@ public class LLLocalDictionary implements LLDictionary { boolean existsAlmostCertainly) { return keys .buffer(MULTI_GET_WINDOW) - .flatMapSequential(keysWindow -> runOnDb(() -> { + .publishOn(dbScheduler) + .>>handle((keysWindow, sink) -> { List keyBufsWindow = new ArrayList<>(keysWindow.size()); for (Send bufferSend : keysWindow) { keyBufsWindow.add(bufferSend.receive()); } try { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called getMulti in a nonblocking thread"); - } + assert !Schedulers.isInNonBlockingThread() : "Called getMulti in a nonblocking thread"; var readOptions = Objects.requireNonNullElse(resolveSnapshot(snapshot), EMPTY_READ_OPTIONS); List results = db.multiGetAsList(readOptions, LLUtils.toArray(keyBufsWindow)); var mappedResults = new ArrayList>(results.size()); @@ -591,84 +605,68 @@ public class LLLocalDictionary implements LLDictionary { } mappedResults.add(valueOpt); } - return mappedResults; + sink.next(mappedResults); + } catch (RocksDBException ex) { + sink.error(new RocksDBException("Failed to read keys: " + ex.getMessage())); } finally { for (Buffer buffer : keyBufsWindow) { buffer.close(); } } }) - .flatMapIterable(list -> list) - .onErrorMap(cause -> new IOException("Failed to read keys", cause)) - .doAfterTerminate(() -> keysWindow.forEach(Send::close)), 2); // Max concurrency is 2 to read data while preparing the next segment; + .flatMapIterable(list -> list); } @Override - public Flux> putMulti(Flux> entries, boolean getOldValues) { + public Mono putMulti(Flux> entries) { return entries .buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP)) - .flatMapSequential(ew -> Mono - .>>fromCallable(() -> { - var entriesWindow = new ArrayList(ew.size()); - for (Send entrySend : ew) { - entriesWindow.add(entrySend.receive()); - } - try { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called putMulti in a nonblocking thread"); - } - ArrayList> oldValues; - if (getOldValues) { - oldValues = new ArrayList<>(entriesWindow.size()); - try (var readOptions = resolveSnapshot(null)) { - for (LLEntry entry : entriesWindow) { - try (var key = entry.getKey().receive()) { - Buffer oldValue = db.get(readOptions, key, false); - if (oldValue != null) { - oldValues.add(LLEntry.of(key, oldValue).send()); - } - } + .publishOn(dbScheduler) + .handle((entriesWindowList, sink) -> { + var entriesWindow = new ArrayList(entriesWindowList.size()); + for (Send entrySend : entriesWindowList) { + entriesWindow.add(entrySend.receive()); + } + try { + assert !Schedulers.isInNonBlockingThread() : "Called putMulti in a nonblocking thread"; + if 
(USE_WRITE_BATCHES_IN_PUT_MULTI) { + var batch = new CappedWriteBatch(db, + alloc, + CAPPED_WRITE_BATCH_CAP, + RESERVED_WRITE_BATCH_SIZE, + MAX_WRITE_BATCH_SIZE, + BATCH_WRITE_OPTIONS + ); + for (LLEntry entry : entriesWindow) { + var k = entry.getKey(); + var v = entry.getValue(); + if (nettyDirect) { + batch.put(cfh, k, v); + } else { + try (var key = k.receive()) { + try (var value = v.receive()) { + batch.put(cfh, LLUtils.toArray(key), LLUtils.toArray(value)); } } - } else { - oldValues = null; - } - if (USE_WRITE_BATCHES_IN_PUT_MULTI) { - var batch = new CappedWriteBatch(db, - alloc, - CAPPED_WRITE_BATCH_CAP, - RESERVED_WRITE_BATCH_SIZE, - MAX_WRITE_BATCH_SIZE, - BATCH_WRITE_OPTIONS - ); - for (LLEntry entry : entriesWindow) { - var k = entry.getKey(); - var v = entry.getValue(); - if (nettyDirect) { - batch.put(cfh, k, v); - } else { - try (var key = k.receive()) { - try (var value = v.receive()) { - batch.put(cfh, LLUtils.toArray(key), LLUtils.toArray(value)); - } - } - } - } - batch.writeToDbAndClose(); - batch.close(); - } else { - for (LLEntry entry : entriesWindow) { - db.put(EMPTY_WRITE_OPTIONS, entry.getKeyUnsafe(), entry.getValueUnsafe()); - } - } - return oldValues; - } finally { - for (LLEntry llEntry : entriesWindow) { - llEntry.close(); } } - }).subscribeOn(dbScheduler), 2) // Max concurrency is 2 to read data while preparing the next segment - .flatMapIterable(oldValuesList -> oldValuesList); + batch.writeToDbAndClose(); + batch.close(); + } else { + for (LLEntry entry : entriesWindow) { + db.put(EMPTY_WRITE_OPTIONS, entry.getKeyUnsafe(), entry.getValueUnsafe()); + } + } + sink.complete(); + } catch (RocksDBException ex) { + sink.error(new RocksDBException("Failed to write: " + ex.getMessage())); + } finally { + for (LLEntry llEntry : entriesWindow) { + llEntry.close(); + } + } + }) + .then(); } @Override @@ -895,30 +893,22 @@ public class LLLocalDictionary implements LLDictionary { ro.setReadaheadSize(32 * 1024); } ro.setVerifyChecksums(true); - var rocksIteratorTuple = getRocksIterator(alloc, - nettyDirect, ro, range, db - ); - try { - try (var rocksIterator = rocksIteratorTuple.getT1()) { - rocksIterator.seekToFirst(); - rocksIterator.status(); - while (rocksIterator.isValid() && !sink.isCancelled()) { - try { - rocksIterator.status(); - rocksIterator.key(DUMMY_WRITE_ONLY_BYTE_BUFFER); - rocksIterator.status(); - rocksIterator.value(DUMMY_WRITE_ONLY_BYTE_BUFFER); - rocksIterator.status(); - } catch (RocksDBException ex) { - sink.next(new BadBlock(databaseName, Column.special(columnName), null, ex)); - } - rocksIterator.next(); + try (var rocksIteratorTuple = getRocksIterator(nettyDirect, ro, range, db)) { + var rocksIterator = rocksIteratorTuple.iterator(); + rocksIterator.seekToFirst(); + rocksIterator.status(); + while (rocksIterator.isValid() && !sink.isCancelled()) { + try { + rocksIterator.status(); + rocksIterator.key(DUMMY_WRITE_ONLY_BYTE_BUFFER); + rocksIterator.status(); + rocksIterator.value(DUMMY_WRITE_ONLY_BYTE_BUFFER); + rocksIterator.status(); + } catch (RocksDBException ex) { + sink.next(new BadBlock(databaseName, Column.special(columnName), null, ex)); } + rocksIterator.next(); } - } finally { - rocksIteratorTuple.getT2().close(); - rocksIteratorTuple.getT3().close(); - rocksIteratorTuple.getT4().close(); } sink.complete(); } catch (Throwable ex) { @@ -1271,6 +1261,9 @@ public class LLLocalDictionary implements LLDictionary { } } + /** + * This method should not modify or move the writerIndex/readerIndex of the key + */ private static ReleasableSlice 
setIterateBound(boolean allowNettyDirect, ReadOptions readOpts, IterateBound boundType, Buffer key) { requireNonNull(key); @@ -1281,46 +1274,36 @@ public class LLLocalDictionary implements LLDictionary { assert keyInternalByteBuffer.position() == 0; slice = new DirectSlice(keyInternalByteBuffer, key.readableBytes()); assert slice.size() == key.readableBytes(); - assert slice.compare(new Slice(LLUtils.toArray(key))) == 0; - if (boundType == IterateBound.LOWER) { - readOpts.setIterateLowerBound(slice); - } else { - readOpts.setIterateUpperBound(slice); - } - return new ReleasableSliceImpl(slice, null, key); } else { slice = new Slice(requireNonNull(LLUtils.toArray(key))); - if (boundType == IterateBound.LOWER) { - readOpts.setIterateLowerBound(slice); - } else { - readOpts.setIterateUpperBound(slice); - } - return new ReleasableSliceImpl(slice, null, null); } + if (boundType == IterateBound.LOWER) { + readOpts.setIterateLowerBound(slice); + } else { + readOpts.setIterateUpperBound(slice); + } + return new ReleasableSliceImplWithRelease(slice); } private static ReleasableSlice emptyReleasableSlice() { var arr = new byte[0]; - return new SimpleSliceWithoutRelease(new Slice(arr), null, arr); + return new ReleasableSliceImplWithoutRelease(new Slice(arr)); } - public record SimpleSliceWithoutRelease(AbstractSlice slice, @Nullable Buffer byteBuf, - @Nullable Object additionalData) implements ReleasableSlice {} + /** + * This method should not modify or move the writerIndex/readerIndex of the key + */ + public record ReleasableSliceImplWithoutRelease(AbstractSlice slice) implements ReleasableSlice {} - public record ReleasableSliceImpl(AbstractSlice slice, @Nullable Buffer byteBuf, - @Nullable Object additionalData) implements ReleasableSlice { + /** + * This class should not modify or move the writerIndex/readerIndex of the key + */ + public record ReleasableSliceImplWithRelease(AbstractSlice slice) implements ReleasableSlice { @Override public void close() { - slice.clear(); slice.close(); - if (byteBuf != null) { - byteBuf.close(); - } - if (additionalData instanceof ByteBuffer bb && bb.isDirect()) { - PlatformDependent.freeDirectBuffer(bb); - } } } @@ -1745,15 +1728,16 @@ public class LLLocalDictionary implements LLDictionary { ); } + /** + * This method should not modify or move the writerIndex/readerIndex of the buffers inside the range + */ @NotNull - public static Tuple4 getRocksIterator(BufferAllocator alloc, + public static RocksIteratorTuple getRocksIterator( boolean allowNettyDirect, ReadOptions readOptions, LLRange range, RocksDBColumn db) { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called getRocksIterator in a nonblocking thread"); - } + assert !Schedulers.isInNonBlockingThread() : "Called getRocksIterator in a nonblocking thread"; ReleasableSlice sliceMin; ReleasableSlice sliceMax; if (range.hasMin()) { @@ -1775,6 +1759,6 @@ public class LLLocalDictionary implements LLDictionary { seekTo = () -> {}; rocksIterator.seekToFirst(); } - return Tuples.of(rocksIterator, sliceMin, sliceMax, seekTo); + return new RocksIteratorTuple(rocksIterator, sliceMin, sliceMax, seekTo); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java index b6753d5..69d527b 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java +++ 
b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java @@ -91,10 +91,10 @@ public abstract class LLLocalGroupedReactiveRocksIterator extends if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); } - return LLLocalDictionary.getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, range, db); + return LLLocalDictionary.getRocksIterator(allowNettyDirect, readOptions, range, db); }, (tuple, sink) -> { try { - var rocksIterator = tuple.getT1(); + var rocksIterator = tuple.iterator(); ObjectArrayList values = new ObjectArrayList<>(); Buffer firstGroupKey = null; try { @@ -155,13 +155,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator extends sink.error(ex); } return tuple; - }, tuple -> { - var rocksIterator = tuple.getT1(); - rocksIterator.close(); - tuple.getT2().close(); - tuple.getT3().close(); - tuple.getT4().close(); - }); + }, RocksIteratorTuple::close); } public abstract T getEntry(@Nullable Send key, @Nullable Send value); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java index 7d2f243..7fc6622 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java @@ -3,7 +3,6 @@ package it.cavallium.dbengine.database.disk; import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; import io.net5.buffer.api.Buffer; -import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.Drop; import io.net5.buffer.api.Owned; import io.net5.buffer.api.Send; @@ -12,10 +11,7 @@ import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.jetbrains.annotations.Nullable; -import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; -import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import reactor.core.publisher.Flux; @@ -27,8 +23,8 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends @Override public void drop(LLLocalKeyPrefixReactiveRocksIterator obj) { try { - if (obj.range != null) { - obj.range.close(); + if (obj.rangeShared != null) { + obj.rangeShared.close(); } } catch (Throwable ex) { logger.error("Failed to close range", ex); @@ -57,7 +53,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends private final RocksDBColumn db; private final int prefixLength; - private LLRange range; + private LLRange rangeShared; private final boolean allowNettyDirect; private ReadOptions readOptions; private final boolean canFillCache; @@ -72,7 +68,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends try (range) { this.db = db; this.prefixLength = prefixLength; - this.range = range.receive(); + this.rangeShared = range.receive(); this.allowNettyDirect = allowNettyDirect; this.readOptions = readOptions; this.canFillCache = canFillCache; @@ -81,85 +77,78 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends public Flux> flux() { - return Flux.using( - () -> range.copy().send(), - rangeSend -> Flux - .generate(() -> { - var readOptions = new ReadOptions(this.readOptions); - if (!range.hasMin() || !range.hasMax()) { - readOptions.setReadaheadSize(32 * 1024); // 32KiB - readOptions.setFillCache(canFillCache); + return 
Flux.generate(() -> { + var readOptions = new ReadOptions(this.readOptions); + if (!rangeShared.hasMin() || !rangeShared.hasMax()) { + readOptions.setReadaheadSize(32 * 1024); // 32KiB + readOptions.setFillCache(canFillCache); + } + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared)); + } + return LLLocalDictionary.getRocksIterator(allowNettyDirect, readOptions, rangeShared, db); + }, (tuple, sink) -> { + try { + var rocksIterator = tuple.iterator(); + rocksIterator.status(); + Buffer firstGroupKey = null; + try { + while (rocksIterator.isValid()) { + Buffer key; + if (allowNettyDirect) { + key = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::key); + } else { + key = LLUtils.fromByteArray(db.getAllocator(), rocksIterator.key()); + } + try (key) { + if (firstGroupKey == null) { + firstGroupKey = key.copy(); + } else if (!LLUtils.equals(firstGroupKey, + firstGroupKey.readerOffset(), + key, + key.readerOffset(), + prefixLength + )) { + break; } - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); - } - return LLLocalDictionary.getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, range, db); - }, (tuple, sink) -> { - try { - var rocksIterator = tuple.getT1(); - rocksIterator.status(); - Buffer firstGroupKey = null; - try { - while (rocksIterator.isValid()) { - Buffer key; - if (allowNettyDirect) { - key = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::key); - } else { - key = LLUtils.fromByteArray(db.getAllocator(), rocksIterator.key()); - } - try (key) { - if (firstGroupKey == null) { - firstGroupKey = key.copy(); - } else if (!LLUtils.equals(firstGroupKey, firstGroupKey.readerOffset(), key, key.readerOffset(), - prefixLength)) { - break; - } - rocksIterator.next(); - rocksIterator.status(); - } - } + rocksIterator.next(); + rocksIterator.status(); + } + } - if (firstGroupKey != null) { - assert firstGroupKey.isAccessible(); - var groupKeyPrefix = firstGroupKey.copy(firstGroupKey.readerOffset(), prefixLength); - assert groupKeyPrefix.isAccessible(); + if (firstGroupKey != null) { + assert firstGroupKey.isAccessible(); + var groupKeyPrefix = firstGroupKey.copy(firstGroupKey.readerOffset(), prefixLength); + assert groupKeyPrefix.isAccessible(); - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, - "Range {} is reading prefix {}", - LLUtils.toStringSafe(range), - LLUtils.toStringSafe(groupKeyPrefix) - ); - } + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Range {} is reading prefix {}", + LLUtils.toStringSafe(rangeShared), + LLUtils.toStringSafe(groupKeyPrefix) + ); + } - sink.next(groupKeyPrefix.send()); - } else { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(range)); - } - sink.complete(); - } - } finally { - if (firstGroupKey != null) { - firstGroupKey.close(); - } - } - } catch (RocksDBException ex) { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(range)); - } - sink.error(ex); - } - return tuple; - }, tuple -> { - var rocksIterator = tuple.getT1(); - rocksIterator.close(); - tuple.getT2().close(); - tuple.getT3().close(); - tuple.getT4().close(); - }), - resource -> resource.close() - ); + sink.next(groupKeyPrefix.send()); + } else { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(rangeShared)); + } + sink.complete(); + } + } 
finally { + if (firstGroupKey != null) { + firstGroupKey.close(); + } + } + } catch (RocksDBException ex) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(rangeShared)); + } + sink.error(ex); + } + return tuple; + }, RocksIteratorTuple::close); } @Override @@ -169,7 +158,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends @Override protected Owned prepareSend() { - var range = this.range.send(); + var range = this.rangeShared.send(); var readOptions = new ReadOptions(this.readOptions); return drop -> new LLLocalKeyPrefixReactiveRocksIterator(db, prefixLength, @@ -181,7 +170,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends } protected void makeInaccessible() { - this.range = null; + this.rangeShared = null; this.readOptions = null; } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java index b367981..a7c5ac2 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java @@ -25,8 +25,8 @@ public abstract class LLLocalReactiveRocksIterator extends @Override public void drop(LLLocalReactiveRocksIterator obj) { try { - if (obj.range != null) { - obj.range.close(); + if (obj.rangeShared != null) { + obj.rangeShared.close(); } } catch (Throwable ex) { logger.error("Failed to close range", ex); @@ -54,7 +54,7 @@ public abstract class LLLocalReactiveRocksIterator extends }; private final RocksDBColumn db; - private LLRange range; + private LLRange rangeShared; private final boolean allowNettyDirect; private ReadOptions readOptions; private final boolean readValues; @@ -66,27 +66,29 @@ public abstract class LLLocalReactiveRocksIterator extends ReadOptions readOptions, boolean readValues) { super((Drop>) (Drop) DROP); - this.db = db; - this.range = range.receive(); - this.allowNettyDirect = allowNettyDirect; - this.readOptions = readOptions; - this.readValues = readValues; + try (range) { + this.db = db; + this.rangeShared = range.receive(); + this.allowNettyDirect = allowNettyDirect; + this.readOptions = readOptions; + this.readValues = readValues; + } } public final Flux flux() { return Flux.generate(() -> { var readOptions = new ReadOptions(this.readOptions); - if (!range.hasMin() || !range.hasMax()) { + if (!rangeShared.hasMin() || !rangeShared.hasMax()) { readOptions.setReadaheadSize(32 * 1024); // 32KiB readOptions.setFillCache(false); } if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); + logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared)); } - return getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, range, db); + return getRocksIterator(allowNettyDirect, readOptions, rangeShared, db); }, (tuple, sink) -> { try { - var rocksIterator = tuple.getT1(); + var rocksIterator = tuple.iterator(); rocksIterator.status(); if (rocksIterator.isValid()) { Buffer key; @@ -110,7 +112,7 @@ public abstract class LLLocalReactiveRocksIterator extends if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, "Range {} is reading {}: {}", - LLUtils.toStringSafe(range), + LLUtils.toStringSafe(rangeShared), LLUtils.toStringSafe(key), LLUtils.toStringSafe(value) ); @@ -128,24 +130,18 @@ public abstract class LLLocalReactiveRocksIterator extends } } else { if 
(logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(range)); + logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(rangeShared)); } sink.complete(); } } catch (RocksDBException ex) { if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(range)); + logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(rangeShared)); } sink.error(ex); } return tuple; - }, tuple -> { - var rocksIterator = tuple.getT1(); - rocksIterator.close(); - tuple.getT2().close(); - tuple.getT3().close(); - tuple.getT4().close(); - }); + }, RocksIteratorTuple::close); } public abstract T getEntry(@Nullable Send key, @Nullable Send value); @@ -157,7 +153,7 @@ public abstract class LLLocalReactiveRocksIterator extends @Override protected Owned> prepareSend() { - var range = this.range.send(); + var range = this.rangeShared.send(); var readOptions = new ReadOptions(this.readOptions); return drop -> new LLLocalReactiveRocksIterator<>(db, range, allowNettyDirect, readOptions, readValues) { @Override @@ -168,7 +164,7 @@ public abstract class LLLocalReactiveRocksIterator extends } protected void makeInaccessible() { - this.range = null; + this.rangeShared = null; this.readOptions = null; } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/ReleasableSlice.java b/src/main/java/it/cavallium/dbengine/database/disk/ReleasableSlice.java index 0dc925f..dab03b7 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/ReleasableSlice.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/ReleasableSlice.java @@ -10,10 +10,4 @@ public interface ReleasableSlice extends SafeCloseable { default void close() { } - - AbstractSlice slice(); - - Buffer byteBuf(); - - Object additionalData(); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksIteratorTuple.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksIteratorTuple.java new file mode 100644 index 0000000..c5b4fe8 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksIteratorTuple.java @@ -0,0 +1,18 @@ +package it.cavallium.dbengine.database.disk; + +import it.cavallium.dbengine.database.SafeCloseable; +import org.jetbrains.annotations.NotNull; +import org.rocksdb.RocksIterator; + +public record RocksIteratorTuple(@NotNull RocksIterator iterator, @NotNull ReleasableSlice sliceMin, + @NotNull ReleasableSlice sliceMax, @NotNull SafeCloseable seekTo) implements + SafeCloseable { + + @Override + public void close() { + iterator.close(); + sliceMin.close(); + sliceMax.close(); + seekTo.close(); + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java index 977e013..fb9c585 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java @@ -279,19 +279,16 @@ public class LLMemoryDictionary implements LLDictionary { } @Override - public Flux> putMulti(Flux> entries, boolean getOldValues) { - return entries.handle((entryToReceive, sink) -> { + public Mono putMulti(Flux> entries) { + return entries.doOnNext(entryToReceive -> { try (var entry = entryToReceive.receive()) { try (var key = entry.getKey().receive()) { try (var val = entry.getValue().receive()) { - var oldValue = mainDb.put(k(key.copy().send()), k(val.send())); - if (oldValue != null && getOldValues) { - 
sink.next(LLEntry.of(key.send(), kk(oldValue)).send()); - } + mainDb.put(k(key.copy().send()), k(val.send())); } } } - }); + }).then(); } @Override diff --git a/src/test/java/it/cavallium/dbengine/TestLLDictionary.java b/src/test/java/it/cavallium/dbengine/TestLLDictionary.java index 5c174b5..e7f4378 100644 --- a/src/test/java/it/cavallium/dbengine/TestLLDictionary.java +++ b/src/test/java/it/cavallium/dbengine/TestLLDictionary.java @@ -3,7 +3,8 @@ package it.cavallium.dbengine; import static it.cavallium.dbengine.DbTestUtils.destroyAllocator; import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks; import static it.cavallium.dbengine.DbTestUtils.newAllocator; -import static it.cavallium.dbengine.SyncUtils.*; +import static it.cavallium.dbengine.SyncUtils.run; +import static it.cavallium.dbengine.SyncUtils.runVoid; import static org.junit.jupiter.api.Assertions.assertEquals; import io.net5.buffer.api.Buffer; @@ -30,9 +31,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; public abstract class TestLLDictionary { @@ -211,17 +210,17 @@ public abstract class TestLLDictionary { var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false)); long afterSize; runVoid(updateMode == UpdateMode.DISALLOW, - dict.update(keyEx, old -> fromString("test-value"), updateReturnMode, true).then() + dict.update(keyEx, old -> fromString("test-value"), updateReturnMode, true).doOnNext(Send::close).then() ); afterSize = run(dict.sizeRange(null, RANGE_ALL, false)); assertEquals(0, afterSize - beforeSize); runVoid(updateMode == UpdateMode.DISALLOW, - dict.update(keyEx, old -> fromString("test-value"), updateReturnMode, false).then() + dict.update(keyEx, old -> fromString("test-value"), updateReturnMode, false).doOnNext(Send::close).then() ); afterSize = run(dict.sizeRange(null, RANGE_ALL, false)); assertEquals(0, afterSize - beforeSize); runVoid(updateMode == UpdateMode.DISALLOW, - dict.update(keyEx, old -> fromString("test-value"), updateReturnMode).then() + dict.update(keyEx, old -> fromString("test-value"), updateReturnMode).doOnNext(Send::close).then() ); afterSize = run(dict.sizeRange(null, RANGE_ALL, false)); assertEquals(0, afterSize - beforeSize); @@ -236,17 +235,17 @@ public abstract class TestLLDictionary { var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false)); long afterSize; runVoid(updateMode == UpdateMode.DISALLOW, - dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode, true).then() + dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode, true).doOnNext(Send::close).then() ); afterSize = run(dict.sizeRange(null, RANGE_ALL, false)); assertEquals(expected, afterSize - beforeSize); runVoid(updateMode == UpdateMode.DISALLOW, - dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode, false).then() + dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode, false).doOnNext(Send::close).then() ); afterSize = run(dict.sizeRange(null, RANGE_ALL, false)); assertEquals(expected, afterSize - beforeSize); runVoid(updateMode == UpdateMode.DISALLOW, - dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode).then() + dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode).doOnNext(Send::close).then() ); afterSize = run(dict.sizeRange(null, 
RANGE_ALL, false));
    assertEquals(expected, afterSize - beforeSize);
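
Usage notes on the API changes above (illustrative sketches, not part of the diff; the generic parameters stripped by the diff rendering are assumed to be Flux<Send<LLEntry>> and Mono<Void>):

putMulti no longer returns the previous values, so the getOldValues flag is gone and the method collapses to a write-only Mono<Void>. A caller that only ever passed false migrates roughly like this (variable names are placeholders):

    // before: dictionary.putMulti(entries, false).then()
    Mono<Void> written = dictionary.putMulti(entries); // entries: Flux<Send<LLEntry>>
    // callers that still need the previous values must read them explicitly first,
    // e.g. via getMulti(...) on the same keys, before writing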
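
The new RocksIteratorTuple record replaces the Tuple4<RocksIterator, ReleasableSlice, ReleasableSlice, SafeCloseable> that callers previously had to unpack and close field by field. Because the record implements SafeCloseable, the whole tuple fits in one try-with-resources block; a minimal sketch (range and readOptions are placeholders):

    try (RocksIteratorTuple tuple = LLLocalDictionary.getRocksIterator(nettyDirect, readOptions, range, db)) {
        RocksIterator it = tuple.iterator();
        it.seekToFirst();
        while (it.isValid()) {
            // read it.key() / it.value() here
            it.next();
        }
    } // close() releases the iterator, both bound slices and the seek handle in one call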
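
The put/remove/update paths drop Mono.usingWhen(...).elapsed() in favour of publishOn(dbScheduler).handle(...), with the Micrometer counters and timers wrapped directly around the blocking RocksDB call. A condensed sketch of the shape repeated throughout the patch (the metric fields match the diff, the rest is simplified):

    Mono<Void> putMono = entryMono
            .publishOn(dbScheduler)                      // hop to the blocking-friendly scheduler first
            .handle((entry, sink) -> {
                try (var key = entry.getKey().receive(); var value = entry.getValue().receive()) {
                    startedPut.increment();
                    try {
                        putTime.recordCallable(() -> {   // Timer.recordCallable times only the db.put
                            db.put(EMPTY_WRITE_OPTIONS, key, value);
                            return null;
                        });
                        sink.complete();
                    } catch (Exception ex) {
                        sink.error(ex);                  // surfaced through the sink instead of onErrorMap(IOException::new)
                    } finally {
                        endedPut.increment();
                    }
                }
            });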