Implement ResourceSupport

This commit is contained in:
Andrea Cavalli 2021-10-28 11:44:20 +02:00
parent 0ef0ad6830
commit 417c174761
7 changed files with 205 additions and 62 deletions

View File

@ -83,7 +83,6 @@ public class LLLocalDictionary implements LLDictionary {
static final ReadOptions EMPTY_READ_OPTIONS = new UnreleasableReadOptions(new UnmodifiableReadOptions()); static final ReadOptions EMPTY_READ_OPTIONS = new UnreleasableReadOptions(new UnmodifiableReadOptions());
static final WriteOptions EMPTY_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions()); static final WriteOptions EMPTY_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions());
static final WriteOptions BATCH_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions()); static final WriteOptions BATCH_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions());
static final TransactionOptions DEFAULT_TX_OPTIONS = new TransactionOptions();
static final boolean PREFER_SEEK_TO_FIRST = false; static final boolean PREFER_SEEK_TO_FIRST = false;
/** /**
* It used to be false, * It used to be false,
@ -113,7 +112,6 @@ public class LLLocalDictionary implements LLDictionary {
static final boolean PARALLEL_EXACT_SIZE = true; static final boolean PARALLEL_EXACT_SIZE = true;
private static final byte[] FIRST_KEY = new byte[]{}; private static final byte[] FIRST_KEY = new byte[]{};
private static final byte[] NO_DATA = new byte[0];
/** /**
* Default: true * Default: true
@ -134,11 +132,6 @@ public class LLLocalDictionary implements LLDictionary {
private final BufferAllocator alloc; private final BufferAllocator alloc;
private final DatabaseOptions databaseOptions; private final DatabaseOptions databaseOptions;
private final String getRangeMultiDebugName;
private final String getRangeMultiGroupedDebugName;
private final String getRangeKeysDebugName;
private final String getRangeKeysGroupedDebugName;
public LLLocalDictionary( public LLLocalDictionary(
BufferAllocator allocator, BufferAllocator allocator,
@NotNull RocksDBColumn db, @NotNull RocksDBColumn db,
@ -156,10 +149,6 @@ public class LLLocalDictionary implements LLDictionary {
this.dbScheduler = dbScheduler; this.dbScheduler = dbScheduler;
this.snapshotResolver = snapshotResolver; this.snapshotResolver = snapshotResolver;
this.updateMode = updateMode; this.updateMode = updateMode;
this.getRangeMultiDebugName = databaseName + "(" + columnName + ")" + "::getRangeMulti";
this.getRangeMultiGroupedDebugName = databaseName + "(" + columnName + ")" + "::getRangeMultiGrouped";
this.getRangeKeysDebugName = databaseName + "(" + columnName + ")" + "::getRangeKeys";
this.getRangeKeysGroupedDebugName = databaseName + "(" + columnName + ")" + "::getRangeKeysGrouped";
this.databaseOptions = databaseOptions; this.databaseOptions = databaseOptions;
alloc = allocator; alloc = allocator;
} }
@ -818,9 +807,9 @@ public class LLLocalDictionary implements LLDictionary {
return Flux.usingWhen(rangeMono, return Flux.usingWhen(rangeMono,
rangeSend -> Flux.using( rangeSend -> Flux.using(
() -> new LLLocalEntryReactiveRocksIterator(db, rangeSend, () -> new LLLocalEntryReactiveRocksIterator(db, rangeSend,
databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), getRangeMultiDebugName), databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot)),
llLocalEntryReactiveRocksIterator -> llLocalEntryReactiveRocksIterator.flux().subscribeOn(dbScheduler), llLocalEntryReactiveRocksIterator -> llLocalEntryReactiveRocksIterator.flux().subscribeOn(dbScheduler),
LLLocalReactiveRocksIterator::release LLLocalReactiveRocksIterator::close
).transform(LLUtils::handleDiscard), ).transform(LLUtils::handleDiscard),
rangeSend -> Mono.fromRunnable(rangeSend::close) rangeSend -> Mono.fromRunnable(rangeSend::close)
); );
@ -831,9 +820,9 @@ public class LLLocalDictionary implements LLDictionary {
return Flux.usingWhen(rangeMono, return Flux.usingWhen(rangeMono,
rangeSend -> Flux.using( rangeSend -> Flux.using(
() -> new LLLocalGroupedEntryReactiveRocksIterator(db, prefixLength, rangeSend, () -> new LLLocalGroupedEntryReactiveRocksIterator(db, prefixLength, rangeSend,
databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), getRangeMultiGroupedDebugName), databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot)),
reactiveRocksIterator -> reactiveRocksIterator.flux().subscribeOn(dbScheduler), reactiveRocksIterator -> reactiveRocksIterator.flux().subscribeOn(dbScheduler),
LLLocalGroupedReactiveRocksIterator::release LLLocalGroupedReactiveRocksIterator::close
).transform(LLUtils::handleDiscard), ).transform(LLUtils::handleDiscard),
rangeSend -> Mono.fromRunnable(rangeSend::close) rangeSend -> Mono.fromRunnable(rangeSend::close)
); );
@ -862,9 +851,9 @@ public class LLLocalDictionary implements LLDictionary {
return Flux.usingWhen(rangeMono, return Flux.usingWhen(rangeMono,
rangeSend -> Flux.using( rangeSend -> Flux.using(
() -> new LLLocalGroupedKeyReactiveRocksIterator(db, prefixLength, rangeSend, () -> new LLLocalGroupedKeyReactiveRocksIterator(db, prefixLength, rangeSend,
databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), getRangeKeysDebugName), databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot)),
reactiveRocksIterator -> reactiveRocksIterator.flux().subscribeOn(dbScheduler), reactiveRocksIterator -> reactiveRocksIterator.flux().subscribeOn(dbScheduler),
LLLocalGroupedReactiveRocksIterator::release LLLocalGroupedReactiveRocksIterator::close
).transform(LLUtils::handleDiscard), ).transform(LLUtils::handleDiscard),
rangeSend -> Mono.fromRunnable(rangeSend::close) rangeSend -> Mono.fromRunnable(rangeSend::close)
); );
@ -929,11 +918,10 @@ public class LLLocalDictionary implements LLDictionary {
rangeSend, rangeSend,
databaseOptions.allowNettyDirect(), databaseOptions.allowNettyDirect(),
resolveSnapshot(snapshot), resolveSnapshot(snapshot),
true, true
getRangeKeysGroupedDebugName
), ),
LLLocalKeyPrefixReactiveRocksIterator::flux, LLLocalKeyPrefixReactiveRocksIterator::flux,
LLLocalKeyPrefixReactiveRocksIterator::release LLLocalKeyPrefixReactiveRocksIterator::close
) )
.subscribeOn(dbScheduler), .subscribeOn(dbScheduler),
rangeSend -> Mono.fromRunnable(rangeSend::close) rangeSend -> Mono.fromRunnable(rangeSend::close)
@ -964,7 +952,7 @@ public class LLLocalDictionary implements LLDictionary {
databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot) databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot)
), ),
llLocalKeyReactiveRocksIterator -> llLocalKeyReactiveRocksIterator.flux().subscribeOn(dbScheduler), llLocalKeyReactiveRocksIterator -> llLocalKeyReactiveRocksIterator.flux().subscribeOn(dbScheduler),
LLLocalReactiveRocksIterator::release LLLocalReactiveRocksIterator::close
).transform(LLUtils::handleDiscard), ).transform(LLUtils::handleDiscard),
rangeSend -> Mono.fromRunnable(rangeSend::close) rangeSend -> Mono.fromRunnable(rangeSend::close)
); );

View File

@ -2,9 +2,11 @@ package it.cavallium.dbengine.database.disk;
import io.net5.buffer.api.Buffer; import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Owned;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLEntry; import it.cavallium.dbengine.database.LLEntry;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.netty.NullableBuffer;
import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
@ -14,8 +16,7 @@ public class LLLocalEntryReactiveRocksIterator extends LLLocalReactiveRocksItera
public LLLocalEntryReactiveRocksIterator(RocksDBColumn db, public LLLocalEntryReactiveRocksIterator(RocksDBColumn db,
Send<LLRange> range, Send<LLRange> range,
boolean allowNettyDirect, boolean allowNettyDirect,
ReadOptions readOptions, ReadOptions readOptions) {
String debugName) {
super(db, range, allowNettyDirect, readOptions, true); super(db, range, allowNettyDirect, readOptions, true);
} }

View File

@ -1,13 +1,10 @@
package it.cavallium.dbengine.database.disk; package it.cavallium.dbengine.database.disk;
import io.net5.buffer.api.Buffer; import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLEntry; import it.cavallium.dbengine.database.LLEntry;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
public class LLLocalGroupedEntryReactiveRocksIterator extends public class LLLocalGroupedEntryReactiveRocksIterator extends
LLLocalGroupedReactiveRocksIterator<Send<LLEntry>> { LLLocalGroupedReactiveRocksIterator<Send<LLEntry>> {
@ -16,8 +13,7 @@ public class LLLocalGroupedEntryReactiveRocksIterator extends
int prefixLength, int prefixLength,
Send<LLRange> range, Send<LLRange> range,
boolean allowNettyDirect, boolean allowNettyDirect,
ReadOptions readOptions, ReadOptions readOptions) {
String debugName) {
super(db, prefixLength, range, allowNettyDirect, readOptions, false, true); super(db, prefixLength, range, allowNettyDirect, readOptions, false, true);
} }

View File

@ -1,12 +1,9 @@
package it.cavallium.dbengine.database.disk; package it.cavallium.dbengine.database.disk;
import io.net5.buffer.api.Buffer; import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
public class LLLocalGroupedKeyReactiveRocksIterator extends LLLocalGroupedReactiveRocksIterator<Send<Buffer>> { public class LLLocalGroupedKeyReactiveRocksIterator extends LLLocalGroupedReactiveRocksIterator<Send<Buffer>> {
@ -14,8 +11,7 @@ public class LLLocalGroupedKeyReactiveRocksIterator extends LLLocalGroupedReacti
int prefixLength, int prefixLength,
Send<LLRange> range, Send<LLRange> range,
boolean allowNettyDirect, boolean allowNettyDirect,
ReadOptions readOptions, ReadOptions readOptions) {
String debugName) {
super(db, prefixLength, range, allowNettyDirect, readOptions, true, false); super(db, prefixLength, range, allowNettyDirect, readOptions, true, false);
} }

View File

@ -3,32 +3,64 @@ package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import io.net5.buffer.api.Buffer; import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.Drop;
import io.net5.buffer.api.Owned;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.unimi.dsi.fastutil.objects.ObjectArrayList; import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.util.List; import java.util.List;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory; import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
public abstract class LLLocalGroupedReactiveRocksIterator<T> { public abstract class LLLocalGroupedReactiveRocksIterator<T> extends
ResourceSupport<LLLocalGroupedReactiveRocksIterator<T>, LLLocalGroupedReactiveRocksIterator<T>> {
protected static final Logger logger = LoggerFactory.getLogger(LLLocalGroupedReactiveRocksIterator.class); protected static final Logger logger = LoggerFactory.getLogger(LLLocalGroupedReactiveRocksIterator.class);
private static final Drop<LLLocalGroupedReactiveRocksIterator<?>> DROP = new Drop<>() {
@Override
public void drop(LLLocalGroupedReactiveRocksIterator<?> obj) {
try {
if (obj.range != null) {
obj.range.close();
}
} catch (Throwable ex) {
logger.error("Failed to close range", ex);
}
try {
if (obj.readOptions != null) {
obj.readOptions.close();
}
} catch (Throwable ex) {
logger.error("Failed to close readOptions", ex);
}
}
@Override
public Drop<LLLocalGroupedReactiveRocksIterator<?>> fork() {
return this;
}
@Override
public void attach(LLLocalGroupedReactiveRocksIterator<?> obj) {
}
};
private final RocksDBColumn db; private final RocksDBColumn db;
private final int prefixLength; private final int prefixLength;
private final LLRange range; private LLRange range;
private final boolean allowNettyDirect; private final boolean allowNettyDirect;
private final ReadOptions readOptions; private ReadOptions readOptions;
private final boolean canFillCache; private final boolean canFillCache;
private final boolean readValues; private final boolean readValues;
@SuppressWarnings({"unchecked", "rawtypes"})
public LLLocalGroupedReactiveRocksIterator(RocksDBColumn db, public LLLocalGroupedReactiveRocksIterator(RocksDBColumn db,
int prefixLength, int prefixLength,
Send<LLRange> range, Send<LLRange> range,
@ -36,6 +68,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
ReadOptions readOptions, ReadOptions readOptions,
boolean canFillCache, boolean canFillCache,
boolean readValues) { boolean readValues) {
super((Drop<LLLocalGroupedReactiveRocksIterator<T>>) (Drop) DROP);
try (range) { try (range) {
this.db = db; this.db = db;
this.prefixLength = prefixLength; this.prefixLength = prefixLength;
@ -48,7 +81,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
} }
public Flux<List<T>> flux() { public final Flux<List<T>> flux() {
return Flux return Flux
.generate(() -> { .generate(() -> {
var readOptions = new ReadOptions(this.readOptions); var readOptions = new ReadOptions(this.readOptions);
@ -131,7 +164,32 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
public abstract T getEntry(@Nullable Send<Buffer> key, @Nullable Send<Buffer> value); public abstract T getEntry(@Nullable Send<Buffer> key, @Nullable Send<Buffer> value);
public void release() { @Override
range.close(); protected final RuntimeException createResourceClosedException() {
return new IllegalStateException("Closed");
}
@Override
protected Owned<LLLocalGroupedReactiveRocksIterator<T>> prepareSend() {
var range = this.range.send();
var readOptions = new ReadOptions(this.readOptions);
return drop -> new LLLocalGroupedReactiveRocksIterator<>(db,
prefixLength,
range,
allowNettyDirect,
readOptions,
canFillCache,
readValues
) {
@Override
public T getEntry(@Nullable Send<Buffer> key, @Nullable Send<Buffer> value) {
return LLLocalGroupedReactiveRocksIterator.this.getEntry(key, value);
}
};
}
protected void makeInaccessible() {
this.range = null;
this.readOptions = null;
} }
} }

View File

@ -4,9 +4,13 @@ import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import io.net5.buffer.api.Buffer; import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Drop;
import io.net5.buffer.api.Owned;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
@ -15,24 +19,54 @@ import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory; import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
public class LLLocalKeyPrefixReactiveRocksIterator { public class LLLocalKeyPrefixReactiveRocksIterator extends
ResourceSupport<LLLocalKeyPrefixReactiveRocksIterator, LLLocalKeyPrefixReactiveRocksIterator> {
protected static final Logger logger = LoggerFactory.getLogger(LLLocalKeyPrefixReactiveRocksIterator.class); protected static final Logger logger = LoggerFactory.getLogger(LLLocalKeyPrefixReactiveRocksIterator.class);
private static final Drop<LLLocalKeyPrefixReactiveRocksIterator> DROP = new Drop<>() {
@Override
public void drop(LLLocalKeyPrefixReactiveRocksIterator obj) {
try {
if (obj.range != null) {
obj.range.close();
}
} catch (Throwable ex) {
logger.error("Failed to close range", ex);
}
try {
if (obj.readOptions != null) {
obj.readOptions.close();
}
} catch (Throwable ex) {
logger.error("Failed to close readOptions", ex);
}
}
@Override
public Drop<LLLocalKeyPrefixReactiveRocksIterator> fork() {
return this;
}
@Override
public void attach(LLLocalKeyPrefixReactiveRocksIterator obj) {
}
};
private final RocksDBColumn db; private final RocksDBColumn db;
private final int prefixLength; private final int prefixLength;
private final LLRange range; private LLRange range;
private final boolean allowNettyDirect; private final boolean allowNettyDirect;
private final ReadOptions readOptions; private ReadOptions readOptions;
private final boolean canFillCache; private final boolean canFillCache;
private final String debugName;
public LLLocalKeyPrefixReactiveRocksIterator(RocksDBColumn db, public LLLocalKeyPrefixReactiveRocksIterator(RocksDBColumn db,
int prefixLength, int prefixLength,
Send<LLRange> range, Send<LLRange> range,
boolean allowNettyDirect, boolean allowNettyDirect,
ReadOptions readOptions, ReadOptions readOptions,
boolean canFillCache, boolean canFillCache) {
String debugName) { super(DROP);
try (range) { try (range) {
this.db = db; this.db = db;
this.prefixLength = prefixLength; this.prefixLength = prefixLength;
@ -40,7 +74,6 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
this.allowNettyDirect = allowNettyDirect; this.allowNettyDirect = allowNettyDirect;
this.readOptions = readOptions; this.readOptions = readOptions;
this.canFillCache = canFillCache; this.canFillCache = canFillCache;
this.debugName = debugName;
} }
} }
@ -127,7 +160,26 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
); );
} }
public void release() { @Override
range.close(); protected final RuntimeException createResourceClosedException() {
return new IllegalStateException("Closed");
}
@Override
protected Owned<LLLocalKeyPrefixReactiveRocksIterator> prepareSend() {
var range = this.range.send();
var readOptions = new ReadOptions(this.readOptions);
return drop -> new LLLocalKeyPrefixReactiveRocksIterator(db,
prefixLength,
range,
allowNettyDirect,
readOptions,
canFillCache
);
}
protected void makeInaccessible() {
this.range = null;
this.readOptions = null;
} }
} }

View File

@ -5,10 +5,16 @@ import static it.cavallium.dbengine.database.disk.LLLocalDictionary.getRocksIter
import io.net5.buffer.api.Buffer; import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Drop;
import io.net5.buffer.api.Owned;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
import io.net5.util.IllegalReferenceCountException; import io.net5.util.IllegalReferenceCountException;
import it.cavallium.dbengine.client.SearchResult;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.netty.NullableBuffer;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyHandle;
@ -19,21 +25,53 @@ import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory; import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
public abstract class LLLocalReactiveRocksIterator<T> { public abstract class LLLocalReactiveRocksIterator<T> extends
ResourceSupport<LLLocalReactiveRocksIterator<T>, LLLocalReactiveRocksIterator<T>> {
protected static final Logger logger = LoggerFactory.getLogger(LLLocalReactiveRocksIterator.class); protected static final Logger logger = LoggerFactory.getLogger(LLLocalReactiveRocksIterator.class);
private final AtomicBoolean released = new AtomicBoolean(false); private static final Drop<LLLocalReactiveRocksIterator<?>> DROP = new Drop<>() {
@Override
public void drop(LLLocalReactiveRocksIterator<?> obj) {
try {
if (obj.range != null) {
obj.range.close();
}
} catch (Throwable ex) {
logger.error("Failed to close range", ex);
}
try {
if (obj.readOptions != null) {
obj.readOptions.close();
}
} catch (Throwable ex) {
logger.error("Failed to close readOptions", ex);
}
}
@Override
public Drop<LLLocalReactiveRocksIterator<?>> fork() {
return this;
}
@Override
public void attach(LLLocalReactiveRocksIterator<?> obj) {
}
};
private final RocksDBColumn db; private final RocksDBColumn db;
private final LLRange range; private LLRange range;
private final boolean allowNettyDirect; private final boolean allowNettyDirect;
private final ReadOptions readOptions; private ReadOptions readOptions;
private final boolean readValues; private final boolean readValues;
@SuppressWarnings({"unchecked", "rawtypes"})
public LLLocalReactiveRocksIterator(RocksDBColumn db, public LLLocalReactiveRocksIterator(RocksDBColumn db,
Send<LLRange> range, Send<LLRange> range,
boolean allowNettyDirect, boolean allowNettyDirect,
ReadOptions readOptions, ReadOptions readOptions,
boolean readValues) { boolean readValues) {
super((Drop<LLLocalReactiveRocksIterator<T>>) (Drop) DROP);
this.db = db; this.db = db;
this.range = range.receive(); this.range = range.receive();
this.allowNettyDirect = allowNettyDirect; this.allowNettyDirect = allowNettyDirect;
@ -41,7 +79,7 @@ public abstract class LLLocalReactiveRocksIterator<T> {
this.readValues = readValues; this.readValues = readValues;
} }
public Flux<T> flux() { public final Flux<T> flux() {
return Flux return Flux
.generate(() -> { .generate(() -> {
var readOptions = new ReadOptions(this.readOptions); var readOptions = new ReadOptions(this.readOptions);
@ -119,11 +157,25 @@ public abstract class LLLocalReactiveRocksIterator<T> {
public abstract T getEntry(@Nullable Send<Buffer> key, @Nullable Send<Buffer> value); public abstract T getEntry(@Nullable Send<Buffer> key, @Nullable Send<Buffer> value);
public void release() { @Override
if (released.compareAndSet(false, true)) { protected final RuntimeException createResourceClosedException() {
range.close(); return new IllegalStateException("Closed");
} else { }
throw new IllegalReferenceCountException(0, -1);
} @Override
protected Owned<LLLocalReactiveRocksIterator<T>> prepareSend() {
var range = this.range.send();
var readOptions = new ReadOptions(this.readOptions);
return drop -> new LLLocalReactiveRocksIterator<>(db, range, allowNettyDirect, readOptions, readValues) {
@Override
public T getEntry(@Nullable Send<Buffer> key, @Nullable Send<Buffer> value) {
return LLLocalReactiveRocksIterator.this.getEntry(key, value);
}
};
}
protected void makeInaccessible() {
this.range = null;
this.readOptions = null;
} }
} }