This commit is contained in:
Andrea Cavalli 2022-05-04 12:36:32 +02:00
parent a1c0e19adc
commit 02f1276181
5 changed files with 112 additions and 95 deletions

View File

@ -16,6 +16,7 @@ import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.WritableComponent; import io.netty5.buffer.api.WritableComponent;
import io.netty5.buffer.api.internal.Statics; import io.netty5.buffer.api.internal.Statics;
import io.netty5.util.IllegalReferenceCountException; import io.netty5.util.IllegalReferenceCountException;
import it.cavallium.dbengine.database.disk.RocksIteratorTuple;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultCurrent; import it.cavallium.dbengine.database.disk.UpdateAtomicResultCurrent;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultDelta; import it.cavallium.dbengine.database.disk.UpdateAtomicResultDelta;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultPrevious; import it.cavallium.dbengine.database.disk.UpdateAtomicResultPrevious;
@ -1012,6 +1013,8 @@ public class LLUtils {
iterable.forEach(LLUtils::onNextDropped); iterable.forEach(LLUtils::onNextDropped);
} else if (next instanceof SafeCloseable safeCloseable) { } else if (next instanceof SafeCloseable safeCloseable) {
safeCloseable.close(); safeCloseable.close();
} else if (next instanceof RocksIteratorTuple iteratorTuple) {
iteratorTuple.close();
} else if (next instanceof UpdateAtomicResultDelta delta) { } else if (next instanceof UpdateAtomicResultDelta delta) {
delta.delta().close(); delta.delta().close();
} else if (next instanceof UpdateAtomicResultCurrent cur) { } else if (next instanceof UpdateAtomicResultCurrent cur) {

View File

@ -302,25 +302,30 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
sliceMax = emptyReleasableSlice(); sliceMax = emptyReleasableSlice();
} }
var rocksIterator = this.newIterator(readOptions); var rocksIterator = this.newIterator(readOptions);
SafeCloseable seekFromOrTo; try {
if (reverse) { SafeCloseable seekFromOrTo;
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMax()) { if (reverse) {
seekFromOrTo = Objects.requireNonNullElseGet(rocksIterator.seekFrom(range.getMaxUnsafe()), if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMax()) {
() -> ((SafeCloseable) () -> {})); seekFromOrTo = Objects.requireNonNullElseGet(rocksIterator.seekFrom(range.getMaxUnsafe()),
() -> ((SafeCloseable) () -> {}));
} else {
seekFromOrTo = () -> {};
rocksIterator.seekToLast();
}
} else { } else {
seekFromOrTo = () -> {}; if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
rocksIterator.seekToLast(); seekFromOrTo = Objects.requireNonNullElseGet(rocksIterator.seekTo(range.getMinUnsafe()),
} () -> ((SafeCloseable) () -> {}));
} else { } else {
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) { seekFromOrTo = () -> {};
seekFromOrTo = Objects.requireNonNullElseGet(rocksIterator.seekTo(range.getMinUnsafe()), rocksIterator.seekToFirst();
() -> ((SafeCloseable) () -> {})); }
} else {
seekFromOrTo = () -> {};
rocksIterator.seekToFirst();
} }
return new RocksIteratorTuple(List.of(readOptions), rocksIterator, sliceMin, sliceMax, seekFromOrTo);
} catch (Throwable ex) {
rocksIterator.close();
throw ex;
} }
return new RocksIteratorTuple(List.of(readOptions), rocksIterator, sliceMin, sliceMax, seekFromOrTo);
} }
protected T getDb() { protected T getDb() {
@ -904,15 +909,20 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
ensureOpen(); ensureOpen();
ensureOwned(readOptions); ensureOwned(readOptions);
var it = db.newIterator(cfh, readOptions); var it = db.newIterator(cfh, readOptions);
return new RocksDBIterator(it, try {
nettyDirect, return new RocksDBIterator(it,
this.startedIterSeek, nettyDirect,
this.endedIterSeek, this.startedIterSeek,
this.iterSeekTime, this.endedIterSeek,
this.startedIterNext, this.iterSeekTime,
this.endedIterNext, this.startedIterNext,
this.iterNextTime this.endedIterNext,
); this.iterNextTime
);
} catch (Throwable ex) {
it.close();
throw ex;
}
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
} }

View File

@ -89,75 +89,78 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> extends
public final Flux<List<T>> flux() { public final Flux<List<T>> flux() {
return Flux return Flux.generate(() -> {
.generate(() -> { var readOptions = generateCustomReadOptions(this.readOptions, true, isBoundedRange(range), smallRange);
var readOptions = generateCustomReadOptions(this.readOptions, true, isBoundedRange(range), smallRange); if (logger.isTraceEnabled()) {
if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range));
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); }
} return db.getRocksIterator(allowNettyDirect, readOptions, range, false);
return db.getRocksIterator(allowNettyDirect, readOptions, range, false); }, (tuple, sink) -> {
}, (tuple, sink) -> { try {
try { var rocksIterator = tuple.iterator();
var rocksIterator = tuple.iterator(); ObjectArrayList<T> values = new ObjectArrayList<>();
ObjectArrayList<T> values = new ObjectArrayList<>(); Buffer firstGroupKey = null;
Buffer firstGroupKey = null; try {
try { while (rocksIterator.isValid()) {
while (rocksIterator.isValid()) { try (Buffer key = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::key)) {
try (Buffer key = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::key)) { if (firstGroupKey == null) {
if (firstGroupKey == null) { firstGroupKey = key.copy();
firstGroupKey = key.copy(); } else if (!LLUtils.equals(firstGroupKey,
} else if (!LLUtils.equals(firstGroupKey, firstGroupKey.readerOffset(), firstGroupKey.readerOffset(),
key, key.readerOffset(), prefixLength)) { key,
break; key.readerOffset(),
} prefixLength
@Nullable Buffer value; )) {
if (readValues) { break;
value = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::value); }
} else { @Nullable Buffer value;
value = null; if (readValues) {
} value = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::value);
} else {
value = null;
}
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, logger.trace(MARKER_ROCKSDB,
"Range {} is reading {}: {}", "Range {} is reading {}: {}",
LLUtils.toStringSafe(range), LLUtils.toStringSafe(range),
LLUtils.toStringSafe(key), LLUtils.toStringSafe(key),
LLUtils.toStringSafe(value) LLUtils.toStringSafe(value)
); );
} }
try { try {
rocksIterator.next(); rocksIterator.next();
T entry = getEntry(key.send(), value == null ? null : value.send()); T entry = getEntry(key.send(), value == null ? null : value.send());
values.add(entry); values.add(entry);
} finally { } finally {
if (value != null) { if (value != null) {
value.close(); value.close();
}
}
} }
} }
} finally {
if (firstGroupKey != null) {
firstGroupKey.close();
}
} }
if (!values.isEmpty()) {
sink.next(values);
} else {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(range));
}
sink.complete();
}
} catch (RocksDBException ex) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(range));
}
sink.error(ex);
} }
return tuple; } finally {
}, RocksIteratorTuple::close); if (firstGroupKey != null) {
firstGroupKey.close();
}
}
if (!values.isEmpty()) {
sink.next(values);
} else {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(range));
}
sink.complete();
}
} catch (RocksDBException ex) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(range));
}
sink.error(ex);
}
return tuple;
}, RocksIteratorTuple::close);
} }
public abstract T getEntry(@Nullable Send<Buffer> key, @Nullable Send<Buffer> value); public abstract T getEntry(@Nullable Send<Buffer> key, @Nullable Send<Buffer> value);

View File

@ -667,7 +667,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
if (closed) { if (closed) {
return 0d; return 0d;
} }
return database.getLongProperty(propertyName); return database.getAggregatedLongProperty(propertyName) / (double) handles.size();
} catch (RocksDBException e) { } catch (RocksDBException e) {
if ("NotFound".equals(e.getMessage())) { if ("NotFound".equals(e.getMessage())) {
return 0d; return 0d;

View File

@ -668,13 +668,14 @@ public class RocksdbFileStore {
private List<String> listKeyInternal() { private List<String> listKeyInternal() {
List<String> keys = new ArrayList<>(); List<String> keys = new ArrayList<>();
RocksIterator iterator = db.newIterator(filename); try (RocksIterator iterator = db.newIterator(filename)) {
iterator.seekToFirst(); iterator.seekToFirst();
while (iterator.isValid()) { while (iterator.isValid()) {
keys.add(new String(iterator.key(), StandardCharsets.US_ASCII).intern()); keys.add(new String(iterator.key(), StandardCharsets.US_ASCII).intern());
iterator.next(); iterator.next();
}
return keys;
} }
return keys;
} }
public void append(String name, Buffer buf, int offset, int len) throws IOException { public void append(String name, Buffer buf, int offset, int len) throws IOException {