diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index d90c5a4..23f2072 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -16,6 +16,7 @@ import io.netty5.buffer.api.Send; import io.netty5.buffer.api.WritableComponent; import io.netty5.buffer.api.internal.Statics; import io.netty5.util.IllegalReferenceCountException; +import it.cavallium.dbengine.database.disk.RocksIteratorTuple; import it.cavallium.dbengine.database.disk.UpdateAtomicResultCurrent; import it.cavallium.dbengine.database.disk.UpdateAtomicResultDelta; import it.cavallium.dbengine.database.disk.UpdateAtomicResultPrevious; @@ -1012,6 +1013,8 @@ public class LLUtils { iterable.forEach(LLUtils::onNextDropped); } else if (next instanceof SafeCloseable safeCloseable) { safeCloseable.close(); + } else if (next instanceof RocksIteratorTuple iteratorTuple) { + iteratorTuple.close(); } else if (next instanceof UpdateAtomicResultDelta delta) { delta.delta().close(); } else if (next instanceof UpdateAtomicResultCurrent cur) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index f514d72..94e301a 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -302,25 +302,30 @@ public sealed abstract class AbstractRocksDBColumn implements sliceMax = emptyReleasableSlice(); } var rocksIterator = this.newIterator(readOptions); - SafeCloseable seekFromOrTo; - if (reverse) { - if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMax()) { - seekFromOrTo = Objects.requireNonNullElseGet(rocksIterator.seekFrom(range.getMaxUnsafe()), - () -> ((SafeCloseable) () -> {})); + try { + SafeCloseable seekFromOrTo; + if (reverse) { + if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMax()) { + seekFromOrTo = Objects.requireNonNullElseGet(rocksIterator.seekFrom(range.getMaxUnsafe()), + () -> ((SafeCloseable) () -> {})); + } else { + seekFromOrTo = () -> {}; + rocksIterator.seekToLast(); + } } else { - seekFromOrTo = () -> {}; - rocksIterator.seekToLast(); - } - } else { - if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { - seekFromOrTo = Objects.requireNonNullElseGet(rocksIterator.seekTo(range.getMinUnsafe()), - () -> ((SafeCloseable) () -> {})); - } else { - seekFromOrTo = () -> {}; - rocksIterator.seekToFirst(); + if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { + seekFromOrTo = Objects.requireNonNullElseGet(rocksIterator.seekTo(range.getMinUnsafe()), + () -> ((SafeCloseable) () -> {})); + } else { + seekFromOrTo = () -> {}; + rocksIterator.seekToFirst(); + } } + return new RocksIteratorTuple(List.of(readOptions), rocksIterator, sliceMin, sliceMax, seekFromOrTo); + } catch (Throwable ex) { + rocksIterator.close(); + throw ex; } - return new RocksIteratorTuple(List.of(readOptions), rocksIterator, sliceMin, sliceMax, seekFromOrTo); } protected T getDb() { @@ -904,15 +909,20 @@ public sealed abstract class AbstractRocksDBColumn implements ensureOpen(); ensureOwned(readOptions); var it = db.newIterator(cfh, readOptions); - return new RocksDBIterator(it, - nettyDirect, - this.startedIterSeek, - this.endedIterSeek, - this.iterSeekTime, - this.startedIterNext, - this.endedIterNext, - this.iterNextTime - ); + try { + return new RocksDBIterator(it, + nettyDirect, + this.startedIterSeek, + this.endedIterSeek, + this.iterSeekTime, + this.startedIterNext, + this.endedIterNext, + this.iterNextTime + ); + } catch (Throwable ex) { + it.close(); + throw ex; + } } finally { closeLock.unlockRead(closeReadLock); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java index 82b7e44..adede02 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java @@ -89,75 +89,78 @@ public abstract class LLLocalGroupedReactiveRocksIterator extends public final Flux> flux() { - return Flux - .generate(() -> { - var readOptions = generateCustomReadOptions(this.readOptions, true, isBoundedRange(range), smallRange); - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); - } - return db.getRocksIterator(allowNettyDirect, readOptions, range, false); - }, (tuple, sink) -> { - try { - var rocksIterator = tuple.iterator(); - ObjectArrayList values = new ObjectArrayList<>(); - Buffer firstGroupKey = null; - try { - while (rocksIterator.isValid()) { - try (Buffer key = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::key)) { - if (firstGroupKey == null) { - firstGroupKey = key.copy(); - } else if (!LLUtils.equals(firstGroupKey, firstGroupKey.readerOffset(), - key, key.readerOffset(), prefixLength)) { - break; - } - @Nullable Buffer value; - if (readValues) { - value = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::value); - } else { - value = null; - } + return Flux.generate(() -> { + var readOptions = generateCustomReadOptions(this.readOptions, true, isBoundedRange(range), smallRange); + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); + } + return db.getRocksIterator(allowNettyDirect, readOptions, range, false); + }, (tuple, sink) -> { + try { + var rocksIterator = tuple.iterator(); + ObjectArrayList values = new ObjectArrayList<>(); + Buffer firstGroupKey = null; + try { + while (rocksIterator.isValid()) { + try (Buffer key = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::key)) { + if (firstGroupKey == null) { + firstGroupKey = key.copy(); + } else if (!LLUtils.equals(firstGroupKey, + firstGroupKey.readerOffset(), + key, + key.readerOffset(), + prefixLength + )) { + break; + } + @Nullable Buffer value; + if (readValues) { + value = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::value); + } else { + value = null; + } - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, - "Range {} is reading {}: {}", - LLUtils.toStringSafe(range), - LLUtils.toStringSafe(key), - LLUtils.toStringSafe(value) - ); - } + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Range {} is reading {}: {}", + LLUtils.toStringSafe(range), + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(value) + ); + } - try { - rocksIterator.next(); - T entry = getEntry(key.send(), value == null ? null : value.send()); - values.add(entry); - } finally { - if (value != null) { - value.close(); - } - } + try { + rocksIterator.next(); + T entry = getEntry(key.send(), value == null ? null : value.send()); + values.add(entry); + } finally { + if (value != null) { + value.close(); } } - } finally { - if (firstGroupKey != null) { - firstGroupKey.close(); - } } - if (!values.isEmpty()) { - sink.next(values); - } else { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(range)); - } - sink.complete(); - } - } catch (RocksDBException ex) { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(range)); - } - sink.error(ex); } - return tuple; - }, RocksIteratorTuple::close); + } finally { + if (firstGroupKey != null) { + firstGroupKey.close(); + } + } + if (!values.isEmpty()) { + sink.next(values); + } else { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(range)); + } + sink.complete(); + } + } catch (RocksDBException ex) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(range)); + } + sink.error(ex); + } + return tuple; + }, RocksIteratorTuple::close); } public abstract T getEntry(@Nullable Send key, @Nullable Send value); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index eb438a6..e21cd0f 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -667,7 +667,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { if (closed) { return 0d; } - return database.getLongProperty(propertyName); + return database.getAggregatedLongProperty(propertyName) / (double) handles.size(); } catch (RocksDBException e) { if ("NotFound".equals(e.getMessage())) { return 0d; diff --git a/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbFileStore.java b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbFileStore.java index 033a2dd..6177c08 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbFileStore.java +++ b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbFileStore.java @@ -668,13 +668,14 @@ public class RocksdbFileStore { private List listKeyInternal() { List keys = new ArrayList<>(); - RocksIterator iterator = db.newIterator(filename); - iterator.seekToFirst(); - while (iterator.isValid()) { - keys.add(new String(iterator.key(), StandardCharsets.US_ASCII).intern()); - iterator.next(); + try (RocksIterator iterator = db.newIterator(filename)) { + iterator.seekToFirst(); + while (iterator.isValid()) { + keys.add(new String(iterator.key(), StandardCharsets.US_ASCII).intern()); + iterator.next(); + } + return keys; } - return keys; } public void append(String name, Buffer buf, int offset, int len) throws IOException {