Various local dict optimizations, customize fillCache in containsRange

This commit is contained in:
Andrea Cavalli 2022-01-26 16:06:15 +01:00
parent cf53eb4f5a
commit cdb65b31f3
6 changed files with 79 additions and 85 deletions

View File

@ -126,7 +126,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
return replaceRange(range, canKeysChange, entriesReplacer, false);
}
Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> range);
Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> range, boolean fillCache);
Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> range, boolean fast);

View File

@ -181,7 +181,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
return dictionary.isRangeEmpty(resolveSnapshot(snapshot), rangeMono);
return dictionary.isRangeEmpty(resolveSnapshot(snapshot), rangeMono, false);
}
@Override
@ -194,7 +194,8 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Mono<Boolean> containsKey(@Nullable CompositeSnapshot snapshot, T keySuffix) {
return dictionary
.isRangeEmpty(resolveSnapshot(snapshot),
Mono.fromCallable(() -> LLRange.singleUnsafe(serializeKeySuffixToKey(keySuffix)).send())
Mono.fromCallable(() -> LLRange.singleUnsafe(serializeKeySuffixToKey(keySuffix)).send()),
true
)
.map(empty -> !empty);
}

View File

@ -287,7 +287,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
@Override
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
return dictionary.isRangeEmpty(resolveSnapshot(snapshot), rangeMono);
return dictionary.isRangeEmpty(resolveSnapshot(snapshot), rangeMono, false);
}
@Override

View File

@ -181,14 +181,14 @@ public class DatabaseSingle<U> extends ResourceSupport<DatabaseStage<U>, Databas
@Override
public Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
return dictionary
.isRangeEmpty(resolveSnapshot(snapshot), keyMono.map(LLRange::single).map(ResourceSupport::send))
.isRangeEmpty(resolveSnapshot(snapshot), keyMono.map(LLRange::single).map(ResourceSupport::send), false)
.map(empty -> empty ? 0L : 1L);
}
@Override
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
return dictionary
.isRangeEmpty(resolveSnapshot(snapshot), keyMono.map(LLRange::single).map(ResourceSupport::send));
.isRangeEmpty(resolveSnapshot(snapshot), keyMono.map(LLRange::single).map(ResourceSupport::send), true);
}
@Override

View File

@ -12,7 +12,6 @@ import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
import io.net5.util.internal.PlatformDependent;
@ -36,7 +35,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
@ -65,7 +63,6 @@ import org.rocksdb.Snapshot;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@ -263,7 +260,7 @@ public class LLLocalDictionary implements LLDictionary {
boolean existsAlmostCertainly) {
return keyMono
.publishOn(dbScheduler)
.<Send<Buffer>>handle((keySend, sink) -> {
.handle((keySend, sink) -> {
try (var key = keySend.receive()) {
try {
var readOptions = requireNonNullElse(resolveSnapshot(snapshot), EMPTY_READ_OPTIONS);
@ -280,90 +277,86 @@ public class LLLocalDictionary implements LLDictionary {
} else {
sink.complete();
}
} catch (RocksDBException ex) {
sink.error(new IOException("Failed to read " + toStringSafe(key) + ": " + ex.getMessage()));
} catch (Exception ex) {
sink.error(new IOException("Failed to read " + toStringSafe(key), ex));
sink.error(ex);
}
}
})
.onErrorMap(cause -> new IOException("Failed to read", cause));
}
@Override
public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
return rangeMono
.publishOn(dbScheduler)
.handle((rangeSend, sink) -> {
try (var range = rangeSend.receive()) {
boolean rangeEmpty = !containsRange(snapshot, range);
sink.next(rangeEmpty);
} catch (Throwable ex) {
sink.error(ex);
}
});
}
public boolean containsRange(@Nullable LLSnapshot snapshot, LLRange range) throws RocksDBException {
assert !Schedulers.isInNonBlockingThread() : "Called containsRange in a nonblocking thread";
startedContains.increment();
try {
var result = containsTime.recordCallable(() -> {
if (range.isSingle()) {
var unmodifiableReadOpts = resolveSnapshot(snapshot);
return db.exists(unmodifiableReadOpts, range.getSingleUnsafe());
} else {
// Temporary resources to release after finished
AbstractSlice<?> slice1 = null;
AbstractSlice<?> slice2 = null;
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
readOpts.setFillCache(false);
if (range.hasMin()) {
var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe());
if (nettyDirect && rangeMinInternalByteBuffer != null) {
readOpts.setIterateLowerBound(slice1 = new DirectSlice(rangeMinInternalByteBuffer,
range.getMinUnsafe().readableBytes()));
} else {
readOpts.setIterateLowerBound(slice1 = new Slice(LLUtils.toArray(range.getMinUnsafe())));
}
}
if (range.hasMax()) {
var rangeMaxInternalByteBuffer = asReadOnlyDirect(range.getMaxUnsafe());
if (nettyDirect && rangeMaxInternalByteBuffer != null) {
readOpts.setIterateUpperBound(slice2 = new DirectSlice(rangeMaxInternalByteBuffer,
range.getMaxUnsafe().readableBytes()));
} else {
readOpts.setIterateUpperBound(slice2 = new Slice(LLUtils.toArray(range.getMaxUnsafe())));
}
}
try (RocksIterator rocksIterator = db.newIterator(readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe());
if (nettyDirect && rangeMinInternalByteBuffer != null) {
rocksIterator.seek(rangeMinInternalByteBuffer);
} else {
rocksIterator.seek(LLUtils.toArray(range.getMinUnsafe()));
@Override
public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, boolean fillCache) {
return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> {
try (var range = rangeSend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called isRangeEmpty in a nonblocking thread";
startedContains.increment();
try {
var result = containsTime.recordCallable(() -> {
if (range.isSingle()) {
return !containsKey(snapshot, range.getSingleUnsafe());
} else {
// Temporary resources to release after finished
AbstractSlice<?> slice1 = null;
AbstractSlice<?> slice2 = null;
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
readOpts.setFillCache(fillCache);
if (range.hasMin()) {
var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe());
if (nettyDirect && rangeMinInternalByteBuffer != null) {
readOpts.setIterateLowerBound(slice1 = new DirectSlice(rangeMinInternalByteBuffer,
range.getMinUnsafe().readableBytes()
));
} else {
readOpts.setIterateLowerBound(slice1 = new Slice(LLUtils.toArray(range.getMinUnsafe())));
}
}
if (range.hasMax()) {
var rangeMaxInternalByteBuffer = asReadOnlyDirect(range.getMaxUnsafe());
if (nettyDirect && rangeMaxInternalByteBuffer != null) {
readOpts.setIterateUpperBound(slice2 = new DirectSlice(rangeMaxInternalByteBuffer,
range.getMaxUnsafe().readableBytes()
));
} else {
readOpts.setIterateUpperBound(slice2 = new Slice(LLUtils.toArray(range.getMaxUnsafe())));
}
}
try (RocksIterator rocksIterator = db.newIterator(readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe());
if (nettyDirect && rangeMinInternalByteBuffer != null) {
rocksIterator.seek(rangeMinInternalByteBuffer);
} else {
rocksIterator.seek(LLUtils.toArray(range.getMinUnsafe()));
}
} else {
rocksIterator.seekToFirst();
}
rocksIterator.status();
return rocksIterator.isValid();
}
} finally {
if (slice1 != null) {
slice1.close();
}
if (slice2 != null) {
slice2.close();
}
} else {
rocksIterator.seekToFirst();
}
rocksIterator.status();
return rocksIterator.isValid();
}
} finally {
if (slice1 != null) slice1.close();
if (slice2 != null) slice2.close();
}
});
assert result != null;
sink.next(!result);
} finally {
endedContains.increment();
}
});
assert result != null;
return result;
} catch (RocksDBException | RuntimeException e) {
throw e;
} catch (Exception ex) {
throw new RuntimeException(ex);
} finally {
endedContains.increment();
}
} catch (Throwable ex) {
sink.error(ex);
}
});
}
private Mono<Boolean> containsKey(@Nullable LLSnapshot snapshot, Mono<Send<Buffer>> keyMono) {

View File

@ -516,7 +516,7 @@ public class LLMemoryDictionary implements LLDictionary {
}
@Override
public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, boolean fillCache) {
return getRangeKeys(snapshot, rangeMono)
.map(buf -> {
buf.receive().close();