From cdb65b31f377355101d44d8361d4c79c3dd26537 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 26 Jan 2022 16:06:15 +0100 Subject: [PATCH] Various local dict optimizations, customize fillCache in containsRange --- .../dbengine/database/LLDictionary.java | 2 +- .../collections/DatabaseMapDictionary.java | 5 +- .../DatabaseMapDictionaryDeep.java | 2 +- .../database/collections/DatabaseSingle.java | 4 +- .../database/disk/LLLocalDictionary.java | 149 +++++++++--------- .../database/memory/LLMemoryDictionary.java | 2 +- 6 files changed, 79 insertions(+), 85 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java index e865b5a..0d5e29e 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java @@ -126,7 +126,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { return replaceRange(range, canKeysChange, entriesReplacer, false); } - Mono isRangeEmpty(@Nullable LLSnapshot snapshot, Mono> range); + Mono isRangeEmpty(@Nullable LLSnapshot snapshot, Mono> range, boolean fillCache); Mono sizeRange(@Nullable LLSnapshot snapshot, Mono> range, boolean fast); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 5abd110..149023a 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -181,7 +181,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep isEmpty(@Nullable CompositeSnapshot snapshot) { - return dictionary.isRangeEmpty(resolveSnapshot(snapshot), rangeMono); + return dictionary.isRangeEmpty(resolveSnapshot(snapshot), rangeMono, false); } @Override @@ -194,7 +194,8 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep containsKey(@Nullable CompositeSnapshot snapshot, T keySuffix) { return dictionary .isRangeEmpty(resolveSnapshot(snapshot), - Mono.fromCallable(() -> LLRange.singleUnsafe(serializeKeySuffixToKey(keySuffix)).send()) + Mono.fromCallable(() -> LLRange.singleUnsafe(serializeKeySuffixToKey(keySuffix)).send()), + true ) .map(empty -> !empty); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index baa2b6a..3de6aa9 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -287,7 +287,7 @@ public class DatabaseMapDictionaryDeep> extend @Override public Mono isEmpty(@Nullable CompositeSnapshot snapshot) { - return dictionary.isRangeEmpty(resolveSnapshot(snapshot), rangeMono); + return dictionary.isRangeEmpty(resolveSnapshot(snapshot), rangeMono, false); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java index a51f0f9..f745eb5 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java @@ -181,14 +181,14 @@ public class DatabaseSingle extends ResourceSupport, Databas @Override public Mono leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) { return dictionary - .isRangeEmpty(resolveSnapshot(snapshot), keyMono.map(LLRange::single).map(ResourceSupport::send)) + .isRangeEmpty(resolveSnapshot(snapshot), keyMono.map(LLRange::single).map(ResourceSupport::send), false) .map(empty -> empty ? 0L : 1L); } @Override public Mono isEmpty(@Nullable CompositeSnapshot snapshot) { return dictionary - .isRangeEmpty(resolveSnapshot(snapshot), keyMono.map(LLRange::single).map(ResourceSupport::send)); + .isRangeEmpty(resolveSnapshot(snapshot), keyMono.map(LLRange::single).map(ResourceSupport::send), true); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index d5dd575..ad36710 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -12,7 +12,6 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Timer; import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; -import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; import io.net5.buffer.api.internal.ResourceSupport; import io.net5.util.internal.PlatformDependent; @@ -36,7 +35,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.concurrent.Callable; @@ -65,7 +63,6 @@ import org.rocksdb.Snapshot; import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; import reactor.core.publisher.Flux; -import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; @@ -263,7 +260,7 @@ public class LLLocalDictionary implements LLDictionary { boolean existsAlmostCertainly) { return keyMono .publishOn(dbScheduler) - .>handle((keySend, sink) -> { + .handle((keySend, sink) -> { try (var key = keySend.receive()) { try { var readOptions = requireNonNullElse(resolveSnapshot(snapshot), EMPTY_READ_OPTIONS); @@ -280,90 +277,86 @@ public class LLLocalDictionary implements LLDictionary { } else { sink.complete(); } + } catch (RocksDBException ex) { + sink.error(new IOException("Failed to read " + toStringSafe(key) + ": " + ex.getMessage())); } catch (Exception ex) { - sink.error(new IOException("Failed to read " + toStringSafe(key), ex)); + sink.error(ex); } } - }) - .onErrorMap(cause -> new IOException("Failed to read", cause)); - } - - @Override - public Mono isRangeEmpty(@Nullable LLSnapshot snapshot, Mono> rangeMono) { - return rangeMono - .publishOn(dbScheduler) - .handle((rangeSend, sink) -> { - try (var range = rangeSend.receive()) { - boolean rangeEmpty = !containsRange(snapshot, range); - sink.next(rangeEmpty); - } catch (Throwable ex) { - sink.error(ex); - } }); } - public boolean containsRange(@Nullable LLSnapshot snapshot, LLRange range) throws RocksDBException { - assert !Schedulers.isInNonBlockingThread() : "Called containsRange in a nonblocking thread"; - startedContains.increment(); - try { - var result = containsTime.recordCallable(() -> { - if (range.isSingle()) { - var unmodifiableReadOpts = resolveSnapshot(snapshot); - return db.exists(unmodifiableReadOpts, range.getSingleUnsafe()); - } else { - // Temporary resources to release after finished - AbstractSlice slice1 = null; - AbstractSlice slice2 = null; - try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { - readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); - readOpts.setFillCache(false); - if (range.hasMin()) { - var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe()); - if (nettyDirect && rangeMinInternalByteBuffer != null) { - readOpts.setIterateLowerBound(slice1 = new DirectSlice(rangeMinInternalByteBuffer, - range.getMinUnsafe().readableBytes())); - } else { - readOpts.setIterateLowerBound(slice1 = new Slice(LLUtils.toArray(range.getMinUnsafe()))); - } - } - if (range.hasMax()) { - var rangeMaxInternalByteBuffer = asReadOnlyDirect(range.getMaxUnsafe()); - if (nettyDirect && rangeMaxInternalByteBuffer != null) { - readOpts.setIterateUpperBound(slice2 = new DirectSlice(rangeMaxInternalByteBuffer, - range.getMaxUnsafe().readableBytes())); - } else { - readOpts.setIterateUpperBound(slice2 = new Slice(LLUtils.toArray(range.getMaxUnsafe()))); - } - } - try (RocksIterator rocksIterator = db.newIterator(readOpts)) { - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe()); - if (nettyDirect && rangeMinInternalByteBuffer != null) { - rocksIterator.seek(rangeMinInternalByteBuffer); - } else { - rocksIterator.seek(LLUtils.toArray(range.getMinUnsafe())); + @Override + public Mono isRangeEmpty(@Nullable LLSnapshot snapshot, Mono> rangeMono, boolean fillCache) { + return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> { + try (var range = rangeSend.receive()) { + assert !Schedulers.isInNonBlockingThread() : "Called isRangeEmpty in a nonblocking thread"; + startedContains.increment(); + try { + var result = containsTime.recordCallable(() -> { + if (range.isSingle()) { + return !containsKey(snapshot, range.getSingleUnsafe()); + } else { + // Temporary resources to release after finished + AbstractSlice slice1 = null; + AbstractSlice slice2 = null; + + try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { + readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); + readOpts.setFillCache(fillCache); + if (range.hasMin()) { + var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe()); + if (nettyDirect && rangeMinInternalByteBuffer != null) { + readOpts.setIterateLowerBound(slice1 = new DirectSlice(rangeMinInternalByteBuffer, + range.getMinUnsafe().readableBytes() + )); + } else { + readOpts.setIterateLowerBound(slice1 = new Slice(LLUtils.toArray(range.getMinUnsafe()))); + } + } + if (range.hasMax()) { + var rangeMaxInternalByteBuffer = asReadOnlyDirect(range.getMaxUnsafe()); + if (nettyDirect && rangeMaxInternalByteBuffer != null) { + readOpts.setIterateUpperBound(slice2 = new DirectSlice(rangeMaxInternalByteBuffer, + range.getMaxUnsafe().readableBytes() + )); + } else { + readOpts.setIterateUpperBound(slice2 = new Slice(LLUtils.toArray(range.getMaxUnsafe()))); + } + } + try (RocksIterator rocksIterator = db.newIterator(readOpts)) { + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe()); + if (nettyDirect && rangeMinInternalByteBuffer != null) { + rocksIterator.seek(rangeMinInternalByteBuffer); + } else { + rocksIterator.seek(LLUtils.toArray(range.getMinUnsafe())); + } + } else { + rocksIterator.seekToFirst(); + } + rocksIterator.status(); + return rocksIterator.isValid(); + } + } finally { + if (slice1 != null) { + slice1.close(); + } + if (slice2 != null) { + slice2.close(); } - } else { - rocksIterator.seekToFirst(); } - rocksIterator.status(); - return rocksIterator.isValid(); } - } finally { - if (slice1 != null) slice1.close(); - if (slice2 != null) slice2.close(); - } + }); + assert result != null; + sink.next(!result); + } finally { + endedContains.increment(); } - }); - assert result != null; - return result; - } catch (RocksDBException | RuntimeException e) { - throw e; - } catch (Exception ex) { - throw new RuntimeException(ex); - } finally { - endedContains.increment(); - } + } catch (Throwable ex) { + sink.error(ex); + } + }); } private Mono containsKey(@Nullable LLSnapshot snapshot, Mono> keyMono) { diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java index f193c61..977e013 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java @@ -516,7 +516,7 @@ public class LLMemoryDictionary implements LLDictionary { } @Override - public Mono isRangeEmpty(@Nullable LLSnapshot snapshot, Mono> rangeMono) { + public Mono isRangeEmpty(@Nullable LLSnapshot snapshot, Mono> rangeMono, boolean fillCache) { return getRangeKeys(snapshot, rangeMono) .map(buf -> { buf.receive().close();