diff --git a/.gitignore b/.gitignore index 7ce4242..930bc63 100644 --- a/.gitignore +++ b/.gitignore @@ -186,3 +186,5 @@ dbengine.iml /.classpath /.project /.settings/ + +.flattened-pom.xml diff --git a/pom.xml b/pom.xml index 93754f0..e363bc1 100644 --- a/pom.xml +++ b/pom.xml @@ -188,7 +188,7 @@ org.rocksdb rocksdbjni - 7.1.2 + 7.2.2 org.apache.lucene @@ -374,7 +374,7 @@ it.cavallium data-generator-runtime - 1.0.66 + 1.0.68 org.jetbrains @@ -470,7 +470,7 @@ it.cavallium data-generator - 0.9.131 + 0.9.133 generate-lucene-query-sources diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index 77ab37f..095255e 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -277,14 +277,14 @@ public sealed abstract class AbstractRocksDBColumn implements static ReleasableSlice emptyReleasableSlice() { var arr = new byte[0]; - return new ReleasableSliceImplWithoutRelease(new Slice(arr)); + return new ReleasableSliceImplWithRelease(new Slice(arr)); } /** * This method should not modify or move the writerIndex/readerIndex of the buffers inside the range */ @NotNull - public RocksIteratorTuple getRocksIterator(boolean allowNettyDirect, + public RocksIteratorTuple newRocksIterator(boolean allowNettyDirect, ReadOptions readOptions, LLRange range, boolean reverse) throws RocksDBException { @@ -301,9 +301,9 @@ public sealed abstract class AbstractRocksDBColumn implements } else { sliceMax = emptyReleasableSlice(); } + SafeCloseable seekFromOrTo = null; var rocksIterator = this.newIterator(readOptions); try { - SafeCloseable seekFromOrTo; if (reverse) { if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMax()) { seekFromOrTo = Objects.requireNonNullElseGet(rocksIterator.seekFrom(range.getMaxUnsafe()), @@ -324,6 +324,11 @@ public sealed abstract class AbstractRocksDBColumn implements return new RocksIteratorTuple(rocksIterator, sliceMin, sliceMax, seekFromOrTo); } catch (Throwable ex) { rocksIterator.close(); + sliceMax.close(); + sliceMax.close(); + if (seekFromOrTo != null) { + seekFromOrTo.close(); + } throw ex; } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index fd29a9f..ea5e912 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -8,7 +8,6 @@ import static it.cavallium.dbengine.database.LLUtils.isReadOnlyDirect; import static it.cavallium.dbengine.database.LLUtils.toStringSafe; import static it.cavallium.dbengine.database.disk.UpdateAtomicResultMode.DELTA; import static java.util.Objects.requireNonNull; -import static java.util.Objects.requireNonNullElse; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Timer; @@ -35,7 +34,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ForkJoinPool; @@ -926,16 +924,17 @@ public class LLLocalDictionary implements LLDictionary { } } ro.setVerifyChecksums(true); - try (var rocksIteratorTuple = db.getRocksIterator(nettyDirect, ro, range, false)) { - var rocksIterator = rocksIteratorTuple.iterator(); - rocksIterator.seekToFirst(); - while (rocksIterator.isValid() && !sink.isCancelled()) { - try { - rocksIterator.key(DUMMY_WRITE_ONLY_BYTE_BUFFER); - rocksIterator.value(DUMMY_WRITE_ONLY_BYTE_BUFFER); - rocksIterator.next(); - } catch (RocksDBException ex) { - sink.next(new BadBlock(databaseName, ColumnUtils.special(columnName), null, ex)); + try (var rocksIteratorTuple = db.newRocksIterator(nettyDirect, ro, range, false)) { + try (var rocksIterator = rocksIteratorTuple.iterator()) { + rocksIterator.seekToFirst(); + while (rocksIterator.isValid() && !sink.isCancelled()) { + try { + rocksIterator.key(DUMMY_WRITE_ONLY_BYTE_BUFFER); + rocksIterator.value(DUMMY_WRITE_ONLY_BYTE_BUFFER); + rocksIterator.next(); + } catch (RocksDBException ex) { + sink.next(new BadBlock(databaseName, ColumnUtils.special(columnName), null, ex)); + } } } } @@ -1603,7 +1602,7 @@ public class LLLocalDictionary implements LLDictionary { if (sliceBegin != null) { rangeReadOpts.setIterateLowerBound(sliceBegin); } - if (sliceBegin != null) { + if (sliceEnd != null) { rangeReadOpts.setIterateUpperBound(sliceEnd); } try (var rocksIterator = db.newIterator(rangeReadOpts)) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java index 3a8158c..4bf3a4b 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java @@ -19,7 +19,6 @@ import org.jetbrains.annotations.Nullable; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDBException; import reactor.core.publisher.Flux; -import reactor.util.function.Tuples; public abstract class LLLocalGroupedReactiveRocksIterator extends ResourceSupport, LLLocalGroupedReactiveRocksIterator> { @@ -86,17 +85,16 @@ public abstract class LLLocalGroupedReactiveRocksIterator extends } } - public final Flux> flux() { return Flux.generate(() -> { var readOptions = generateCustomReadOptions(this.readOptions, true, isBoundedRange(range), smallRange); if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); } - return Tuples.of(readOptions, db.getRocksIterator(allowNettyDirect, readOptions, range, false)); + return new RocksIterWithReadOpts(readOptions, db.newRocksIterator(allowNettyDirect, readOptions, range, false)); }, (tuple, sink) -> { try { - var rocksIterator = tuple.getT2().iterator(); + var rocksIterator = tuple.iter().iterator(); ObjectArrayList values = new ObjectArrayList<>(); Buffer firstGroupKey = null; try { @@ -159,10 +157,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator extends sink.error(ex); } return tuple; - }, t -> { - t.getT2().close(); - t.getT1().close(); - }); + }, RocksIterWithReadOpts::close); } public abstract T getEntry(@Nullable Send key, @Nullable Send value); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java index 4cecb0a..a038312 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java @@ -90,10 +90,10 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared)); } - return Tuples.of(readOptions, db.getRocksIterator(allowNettyDirect, readOptions, rangeShared, false)); + return new RocksIterWithReadOpts(readOptions, db.newRocksIterator(allowNettyDirect, readOptions, rangeShared, false)); }, (tuple, sink) -> { try { - var rocksIterator = tuple.getT2().iterator(); + var rocksIterator = tuple.iter().iterator(); Buffer firstGroupKey = null; try { while (rocksIterator.isValid()) { @@ -150,10 +150,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends sink.error(ex); } return tuple; - }, t -> { - t.getT2().close(); - t.getT1().close(); - }); + }, RocksIterWithReadOpts::close); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java index 550dfa0..43c9ba3 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java @@ -1,21 +1,14 @@ package it.cavallium.dbengine.database.disk; -import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; import static it.cavallium.dbengine.database.LLUtils.generateCustomReadOptions; -import static it.cavallium.dbengine.database.LLUtils.isBoundedRange; -import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Drop; import io.netty5.buffer.api.Owned; import io.netty5.buffer.api.Send; import io.netty5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.database.LLRange; -import it.cavallium.dbengine.database.LLUtils; -import it.cavallium.dbengine.database.disk.LLLocalMigrationReactiveRocksIterator.ByteEntry; -import java.util.Map; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.jetbrains.annotations.Nullable; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDBException; import reactor.core.publisher.Flux; @@ -76,11 +69,11 @@ public final class LLLocalMigrationReactiveRocksIterator extends public Flux flux() { return Flux.generate(() -> { var readOptions = generateCustomReadOptions(this.readOptions, false, false, false); - return Tuples.of(readOptions, db.getRocksIterator(false, readOptions, rangeShared, false)); + return new RocksIterWithReadOpts(readOptions, db.newRocksIterator(false, readOptions, rangeShared, false)); }, (tuple, sink) -> { try { //noinspection resource - var rocksIterator = tuple.getT2().iterator(); + var rocksIterator = tuple.iter().iterator(); if (rocksIterator.isValid()) { byte[] key = rocksIterator.key(); byte[] value = rocksIterator.value(); @@ -93,11 +86,7 @@ public final class LLLocalMigrationReactiveRocksIterator extends sink.error(ex); } return tuple; - }, t -> { - t.getT2().close(); - t.getT1().close(); - this.close(); - }); + }, RocksIterWithReadOpts::close); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java index b6aeb5f..f613465 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java @@ -87,10 +87,10 @@ public abstract class LLLocalReactiveRocksIterator extends if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared)); } - return Tuples.of(readOptions, db.getRocksIterator(allowNettyDirect, readOptions, rangeShared, reverse)); + return new RocksIterWithReadOpts(readOptions, db.newRocksIterator(allowNettyDirect, readOptions, rangeShared, reverse)); }, (tuple, sink) -> { try { - var rocksIterator = tuple.getT2().iterator(); + var rocksIterator = tuple.iter().iterator(); if (rocksIterator.isValid()) { Buffer key; if (allowNettyDirect) { @@ -145,10 +145,7 @@ public abstract class LLLocalReactiveRocksIterator extends sink.error(ex); } return tuple; - }, t -> { - t.getT2().close(); - t.getT1().close(); - }); + }, RocksIterWithReadOpts::close); } public abstract T getEntry(@Nullable Send key, @Nullable Send value); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java index 47a0c94..ba3f775 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java @@ -3,10 +3,8 @@ package it.cavallium.dbengine.database.disk; import io.micrometer.core.instrument.MeterRegistry; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.BufferAllocator; -import io.netty5.buffer.api.Send; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLUtils; -import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.io.IOException; import java.util.List; import org.jetbrains.annotations.NotNull; @@ -16,7 +14,6 @@ import org.rocksdb.CompactRangeOptions; import org.rocksdb.FlushOptions; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDBException; -import org.rocksdb.RocksIterator; import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; @@ -26,7 +23,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn { * This method should not modify or move the writerIndex/readerIndex of the buffers inside the range */ @NotNull - RocksIteratorTuple getRocksIterator(boolean allowNettyDirect, + RocksIteratorTuple newRocksIterator(boolean allowNettyDirect, ReadOptions readOptions, LLRange range, boolean reverse) throws RocksDBException; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksIterWithReadOpts.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksIterWithReadOpts.java new file mode 100644 index 0000000..b257021 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksIterWithReadOpts.java @@ -0,0 +1,15 @@ +package it.cavallium.dbengine.database.disk; + +import it.cavallium.dbengine.database.SafeCloseable; +import org.rocksdb.ReadOptions; + +record RocksIterWithReadOpts(ReadOptions readOptions, RocksIteratorTuple iter) implements SafeCloseable { + + @Override + public void close() { + if (readOptions != null) { + readOptions.close(); + } + iter.close(); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java b/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java index 766b39f..51aa3d1 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java +++ b/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java @@ -101,16 +101,17 @@ public class HugePqPriorityQueue implements PriorityQueue, Reversable implements PriorityQueue, Reversable implements PriorityQueue, Reversable iterate(long skips, boolean reverse) { return Flux., RocksIteratorTuple>generate(() -> { - var it = rocksDB.getRocksIterator(true, READ_OPTIONS, LLRange.all(), reverse); + var it = rocksDB.newRocksIterator(true, READ_OPTIONS, LLRange.all(), reverse); var rocksIterator = it.iterator(); if (reverse) { rocksIterator.seekToLast();