Fix more leaks

This commit is contained in:
Andrea Cavalli 2022-05-11 20:32:56 +02:00
parent bfc78f1465
commit 6a5a9a3e94
11 changed files with 76 additions and 78 deletions

2
.gitignore vendored
View File

@ -186,3 +186,5 @@ dbengine.iml
/.classpath /.classpath
/.project /.project
/.settings/ /.settings/
.flattened-pom.xml

View File

@ -188,7 +188,7 @@
<dependency> <dependency>
<groupId>org.rocksdb</groupId> <groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId> <artifactId>rocksdbjni</artifactId>
<version>7.1.2</version> <version>7.2.2</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.lucene</groupId> <groupId>org.apache.lucene</groupId>
@ -374,7 +374,7 @@
<dependency> <dependency>
<groupId>it.cavallium</groupId> <groupId>it.cavallium</groupId>
<artifactId>data-generator-runtime</artifactId> <artifactId>data-generator-runtime</artifactId>
<version>1.0.66</version> <version>1.0.68</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.jetbrains</groupId> <groupId>org.jetbrains</groupId>
@ -470,7 +470,7 @@
<plugin> <plugin>
<groupId>it.cavallium</groupId> <groupId>it.cavallium</groupId>
<artifactId>data-generator</artifactId> <artifactId>data-generator</artifactId>
<version>0.9.131</version> <version>0.9.133</version>
<executions> <executions>
<execution> <execution>
<id>generate-lucene-query-sources</id> <id>generate-lucene-query-sources</id>

View File

@ -277,14 +277,14 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
static ReleasableSlice emptyReleasableSlice() { static ReleasableSlice emptyReleasableSlice() {
var arr = new byte[0]; var arr = new byte[0];
return new ReleasableSliceImplWithoutRelease(new Slice(arr)); return new ReleasableSliceImplWithRelease(new Slice(arr));
} }
/** /**
* This method should not modify or move the writerIndex/readerIndex of the buffers inside the range * This method should not modify or move the writerIndex/readerIndex of the buffers inside the range
*/ */
@NotNull @NotNull
public RocksIteratorTuple getRocksIterator(boolean allowNettyDirect, public RocksIteratorTuple newRocksIterator(boolean allowNettyDirect,
ReadOptions readOptions, ReadOptions readOptions,
LLRange range, LLRange range,
boolean reverse) throws RocksDBException { boolean reverse) throws RocksDBException {
@ -301,9 +301,9 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} else { } else {
sliceMax = emptyReleasableSlice(); sliceMax = emptyReleasableSlice();
} }
SafeCloseable seekFromOrTo = null;
var rocksIterator = this.newIterator(readOptions); var rocksIterator = this.newIterator(readOptions);
try { try {
SafeCloseable seekFromOrTo;
if (reverse) { if (reverse) {
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMax()) { if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMax()) {
seekFromOrTo = Objects.requireNonNullElseGet(rocksIterator.seekFrom(range.getMaxUnsafe()), seekFromOrTo = Objects.requireNonNullElseGet(rocksIterator.seekFrom(range.getMaxUnsafe()),
@ -324,6 +324,11 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
return new RocksIteratorTuple(rocksIterator, sliceMin, sliceMax, seekFromOrTo); return new RocksIteratorTuple(rocksIterator, sliceMin, sliceMax, seekFromOrTo);
} catch (Throwable ex) { } catch (Throwable ex) {
rocksIterator.close(); rocksIterator.close();
sliceMax.close();
sliceMax.close();
if (seekFromOrTo != null) {
seekFromOrTo.close();
}
throw ex; throw ex;
} }
} }

View File

@ -8,7 +8,6 @@ import static it.cavallium.dbengine.database.LLUtils.isReadOnlyDirect;
import static it.cavallium.dbengine.database.LLUtils.toStringSafe; import static it.cavallium.dbengine.database.LLUtils.toStringSafe;
import static it.cavallium.dbengine.database.disk.UpdateAtomicResultMode.DELTA; import static it.cavallium.dbengine.database.disk.UpdateAtomicResultMode.DELTA;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNullElse;
import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.Timer;
@ -35,7 +34,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinPool;
@ -926,16 +924,17 @@ public class LLLocalDictionary implements LLDictionary {
} }
} }
ro.setVerifyChecksums(true); ro.setVerifyChecksums(true);
try (var rocksIteratorTuple = db.getRocksIterator(nettyDirect, ro, range, false)) { try (var rocksIteratorTuple = db.newRocksIterator(nettyDirect, ro, range, false)) {
var rocksIterator = rocksIteratorTuple.iterator(); try (var rocksIterator = rocksIteratorTuple.iterator()) {
rocksIterator.seekToFirst(); rocksIterator.seekToFirst();
while (rocksIterator.isValid() && !sink.isCancelled()) { while (rocksIterator.isValid() && !sink.isCancelled()) {
try { try {
rocksIterator.key(DUMMY_WRITE_ONLY_BYTE_BUFFER); rocksIterator.key(DUMMY_WRITE_ONLY_BYTE_BUFFER);
rocksIterator.value(DUMMY_WRITE_ONLY_BYTE_BUFFER); rocksIterator.value(DUMMY_WRITE_ONLY_BYTE_BUFFER);
rocksIterator.next(); rocksIterator.next();
} catch (RocksDBException ex) { } catch (RocksDBException ex) {
sink.next(new BadBlock(databaseName, ColumnUtils.special(columnName), null, ex)); sink.next(new BadBlock(databaseName, ColumnUtils.special(columnName), null, ex));
}
} }
} }
} }
@ -1603,7 +1602,7 @@ public class LLLocalDictionary implements LLDictionary {
if (sliceBegin != null) { if (sliceBegin != null) {
rangeReadOpts.setIterateLowerBound(sliceBegin); rangeReadOpts.setIterateLowerBound(sliceBegin);
} }
if (sliceBegin != null) { if (sliceEnd != null) {
rangeReadOpts.setIterateUpperBound(sliceEnd); rangeReadOpts.setIterateUpperBound(sliceEnd);
} }
try (var rocksIterator = db.newIterator(rangeReadOpts)) { try (var rocksIterator = db.newIterator(rangeReadOpts)) {

View File

@ -19,7 +19,6 @@ import org.jetbrains.annotations.Nullable;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;
public abstract class LLLocalGroupedReactiveRocksIterator<T> extends public abstract class LLLocalGroupedReactiveRocksIterator<T> extends
ResourceSupport<LLLocalGroupedReactiveRocksIterator<T>, LLLocalGroupedReactiveRocksIterator<T>> { ResourceSupport<LLLocalGroupedReactiveRocksIterator<T>, LLLocalGroupedReactiveRocksIterator<T>> {
@ -86,17 +85,16 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> extends
} }
} }
public final Flux<List<T>> flux() { public final Flux<List<T>> flux() {
return Flux.generate(() -> { return Flux.generate(() -> {
var readOptions = generateCustomReadOptions(this.readOptions, true, isBoundedRange(range), smallRange); var readOptions = generateCustomReadOptions(this.readOptions, true, isBoundedRange(range), smallRange);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range));
} }
return Tuples.of(readOptions, db.getRocksIterator(allowNettyDirect, readOptions, range, false)); return new RocksIterWithReadOpts(readOptions, db.newRocksIterator(allowNettyDirect, readOptions, range, false));
}, (tuple, sink) -> { }, (tuple, sink) -> {
try { try {
var rocksIterator = tuple.getT2().iterator(); var rocksIterator = tuple.iter().iterator();
ObjectArrayList<T> values = new ObjectArrayList<>(); ObjectArrayList<T> values = new ObjectArrayList<>();
Buffer firstGroupKey = null; Buffer firstGroupKey = null;
try { try {
@ -159,10 +157,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> extends
sink.error(ex); sink.error(ex);
} }
return tuple; return tuple;
}, t -> { }, RocksIterWithReadOpts::close);
t.getT2().close();
t.getT1().close();
});
} }
public abstract T getEntry(@Nullable Send<Buffer> key, @Nullable Send<Buffer> value); public abstract T getEntry(@Nullable Send<Buffer> key, @Nullable Send<Buffer> value);

View File

@ -90,10 +90,10 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared)); logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared));
} }
return Tuples.of(readOptions, db.getRocksIterator(allowNettyDirect, readOptions, rangeShared, false)); return new RocksIterWithReadOpts(readOptions, db.newRocksIterator(allowNettyDirect, readOptions, rangeShared, false));
}, (tuple, sink) -> { }, (tuple, sink) -> {
try { try {
var rocksIterator = tuple.getT2().iterator(); var rocksIterator = tuple.iter().iterator();
Buffer firstGroupKey = null; Buffer firstGroupKey = null;
try { try {
while (rocksIterator.isValid()) { while (rocksIterator.isValid()) {
@ -150,10 +150,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends
sink.error(ex); sink.error(ex);
} }
return tuple; return tuple;
}, t -> { }, RocksIterWithReadOpts::close);
t.getT2().close();
t.getT1().close();
});
} }
@Override @Override

View File

@ -1,21 +1,14 @@
package it.cavallium.dbengine.database.disk; package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import static it.cavallium.dbengine.database.LLUtils.generateCustomReadOptions; import static it.cavallium.dbengine.database.LLUtils.generateCustomReadOptions;
import static it.cavallium.dbengine.database.LLUtils.isBoundedRange;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.Drop; import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.Owned; import io.netty5.buffer.api.Owned;
import io.netty5.buffer.api.Send; import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.internal.ResourceSupport; import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLLocalMigrationReactiveRocksIterator.ByteEntry;
import java.util.Map;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
@ -76,11 +69,11 @@ public final class LLLocalMigrationReactiveRocksIterator extends
public Flux<ByteEntry> flux() { public Flux<ByteEntry> flux() {
return Flux.generate(() -> { return Flux.generate(() -> {
var readOptions = generateCustomReadOptions(this.readOptions, false, false, false); var readOptions = generateCustomReadOptions(this.readOptions, false, false, false);
return Tuples.of(readOptions, db.getRocksIterator(false, readOptions, rangeShared, false)); return new RocksIterWithReadOpts(readOptions, db.newRocksIterator(false, readOptions, rangeShared, false));
}, (tuple, sink) -> { }, (tuple, sink) -> {
try { try {
//noinspection resource //noinspection resource
var rocksIterator = tuple.getT2().iterator(); var rocksIterator = tuple.iter().iterator();
if (rocksIterator.isValid()) { if (rocksIterator.isValid()) {
byte[] key = rocksIterator.key(); byte[] key = rocksIterator.key();
byte[] value = rocksIterator.value(); byte[] value = rocksIterator.value();
@ -93,11 +86,7 @@ public final class LLLocalMigrationReactiveRocksIterator extends
sink.error(ex); sink.error(ex);
} }
return tuple; return tuple;
}, t -> { }, RocksIterWithReadOpts::close);
t.getT2().close();
t.getT1().close();
this.close();
});
} }
@Override @Override

View File

@ -87,10 +87,10 @@ public abstract class LLLocalReactiveRocksIterator<T> extends
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared)); logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(rangeShared));
} }
return Tuples.of(readOptions, db.getRocksIterator(allowNettyDirect, readOptions, rangeShared, reverse)); return new RocksIterWithReadOpts(readOptions, db.newRocksIterator(allowNettyDirect, readOptions, rangeShared, reverse));
}, (tuple, sink) -> { }, (tuple, sink) -> {
try { try {
var rocksIterator = tuple.getT2().iterator(); var rocksIterator = tuple.iter().iterator();
if (rocksIterator.isValid()) { if (rocksIterator.isValid()) {
Buffer key; Buffer key;
if (allowNettyDirect) { if (allowNettyDirect) {
@ -145,10 +145,7 @@ public abstract class LLLocalReactiveRocksIterator<T> extends
sink.error(ex); sink.error(ex);
} }
return tuple; return tuple;
}, t -> { }, RocksIterWithReadOpts::close);
t.getT2().close();
t.getT1().close();
});
} }
public abstract T getEntry(@Nullable Send<Buffer> key, @Nullable Send<Buffer> value); public abstract T getEntry(@Nullable Send<Buffer> key, @Nullable Send<Buffer> value);

View File

@ -3,10 +3,8 @@ package it.cavallium.dbengine.database.disk;
import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.MeterRegistry;
import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
@ -16,7 +14,6 @@ import org.rocksdb.CompactRangeOptions;
import org.rocksdb.FlushOptions; import org.rocksdb.FlushOptions;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch; import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions; import org.rocksdb.WriteOptions;
@ -26,7 +23,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
* This method should not modify or move the writerIndex/readerIndex of the buffers inside the range * This method should not modify or move the writerIndex/readerIndex of the buffers inside the range
*/ */
@NotNull @NotNull
RocksIteratorTuple getRocksIterator(boolean allowNettyDirect, RocksIteratorTuple newRocksIterator(boolean allowNettyDirect,
ReadOptions readOptions, ReadOptions readOptions,
LLRange range, LLRange range,
boolean reverse) throws RocksDBException; boolean reverse) throws RocksDBException;

View File

@ -0,0 +1,15 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.SafeCloseable;
import org.rocksdb.ReadOptions;
record RocksIterWithReadOpts(ReadOptions readOptions, RocksIteratorTuple iter) implements SafeCloseable {
@Override
public void close() {
if (readOptions != null) {
readOptions.close();
}
iter.close();
}
}

View File

@ -101,16 +101,17 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
} }
private T databaseTop() { private T databaseTop() {
try (var it = rocksDB.getRocksIterator(true, READ_OPTIONS, LLRange.all(), false)) { try (var it = rocksDB.newRocksIterator(true, READ_OPTIONS, LLRange.all(), false)) {
var rocksIterator = it.iterator(); try (var rocksIterator = it.iterator()) {
rocksIterator.seekToFirst(); rocksIterator.seekToFirst();
if (rocksIterator.isValid()) { if (rocksIterator.isValid()) {
var key = rocksIterator.key(); var key = rocksIterator.key();
try (var keyBuf = rocksDB.getAllocator().copyOf(key)) { try (var keyBuf = rocksDB.getAllocator().copyOf(key)) {
return deserializeKey(keyBuf); return deserializeKey(keyBuf);
}
} else {
return null;
} }
} else {
return null;
} }
} catch (RocksDBException e) { } catch (RocksDBException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
@ -120,18 +121,19 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
@Override @Override
public T pop() { public T pop() {
ensureThread(); ensureThread();
try (var it = rocksDB.getRocksIterator(true, READ_OPTIONS, LLRange.all(), false)) { try (var it = rocksDB.newRocksIterator(true, READ_OPTIONS, LLRange.all(), false)) {
var rocksIterator = it.iterator(); try (var rocksIterator = it.iterator()) {
rocksIterator.seekToFirst(); rocksIterator.seekToFirst();
if (rocksIterator.isValid()) { if (rocksIterator.isValid()) {
var key = rocksIterator.key(); var key = rocksIterator.key();
try (var keyBuf = rocksDB.getAllocator().copyOf(key)) { try (var keyBuf = rocksDB.getAllocator().copyOf(key)) {
rocksDB.updateAtomic(READ_OPTIONS, WRITE_OPTIONS, keyBuf, this::reduceOrRemove, UpdateAtomicResultMode.NOTHING); rocksDB.updateAtomic(READ_OPTIONS, WRITE_OPTIONS, keyBuf, this::reduceOrRemove, UpdateAtomicResultMode.NOTHING);
--size; --size;
return deserializeKey(keyBuf); return deserializeKey(keyBuf);
}
} else {
return null;
} }
} else {
return null;
} }
} catch (RocksDBException | IOException e) { } catch (RocksDBException | IOException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
@ -231,7 +233,7 @@ public class HugePqPriorityQueue<T> implements PriorityQueue<T>, Reversable<Reve
private Flux<T> iterate(long skips, boolean reverse) { private Flux<T> iterate(long skips, boolean reverse) {
return Flux.<List<T>, RocksIteratorTuple>generate(() -> { return Flux.<List<T>, RocksIteratorTuple>generate(() -> {
var it = rocksDB.getRocksIterator(true, READ_OPTIONS, LLRange.all(), reverse); var it = rocksDB.newRocksIterator(true, READ_OPTIONS, LLRange.all(), reverse);
var rocksIterator = it.iterator(); var rocksIterator = it.iterator();
if (reverse) { if (reverse) {
rocksIterator.seekToLast(); rocksIterator.seekToLast();