diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index bc70f23..767f1d0 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -5,6 +5,7 @@ import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; +import it.cavallium.dbengine.database.disk.LLLocalDictionary; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import java.util.Arrays; import java.util.Map; @@ -214,7 +215,7 @@ public class DatabaseMapDictionaryDeep> implem public Mono at(@Nullable CompositeSnapshot snapshot, T keySuffix) { byte[] keySuffixData = serializeSuffix(keySuffix); Flux keyFlux; - if (this.subStageGetter.needsDebuggingKeyFlux()) { + if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED && this.subStageGetter.needsDebuggingKeyFlux()) { keyFlux = this.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData)); } else { keyFlux = Flux.empty(); @@ -229,7 +230,7 @@ public class DatabaseMapDictionaryDeep> implem @Override public Flux> getAllStages(@Nullable CompositeSnapshot snapshot) { - if (this.subStageGetter.needsDebuggingKeyFlux()) { + if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED && this.subStageGetter.needsDebuggingKeyFlux()) { return dictionary .getRangeKeysGrouped(resolveSnapshot(snapshot), range, keyPrefix.length + keySuffixLength) .flatMapSequential(rangeKeys -> { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 62cf658..8cd1b16 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -48,8 +48,9 @@ public class LLLocalDictionary implements LLDictionary { static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations static final int MULTI_GET_WINDOW = 500; static final WriteOptions BATCH_WRITE_OPTIONS = new WriteOptions().setLowPri(true); - static final boolean PREFER_ALWAYS_SEEK_TO_FIRST = true; - static final boolean ALWAYS_VERIFY_CHECKSUMS = true; + static final boolean PREFER_SEEK_TO_FIRST = false; + static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = false; + public static final boolean DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED = true; private static final int STRIPES = 512; private static final byte[] FIRST_KEY = new byte[]{}; @@ -169,7 +170,7 @@ public class LLLocalDictionary implements LLDictionary { return Mono .fromCallable(() -> { var readOpts = resolveSnapshot(snapshot); - readOpts.setVerifyChecksums(ALWAYS_VERIFY_CHECKSUMS); + readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); readOpts.setFillCache(false); if (range.hasMin()) { readOpts.setIterateLowerBound(new Slice(range.getMin())); @@ -700,7 +701,7 @@ public class LLLocalDictionary implements LLDictionary { return Mono .fromCallable(() -> { var readOpts = getReadOptions(null); - readOpts.setVerifyChecksums(ALWAYS_VERIFY_CHECKSUMS); + readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); // readOpts.setIgnoreRangeDeletions(true); readOpts.setFillCache(false); @@ -754,7 +755,7 @@ public class LLLocalDictionary implements LLDictionary { .fromCallable(() -> { var readOpts = resolveSnapshot(snapshot); readOpts.setFillCache(false); - readOpts.setVerifyChecksums(ALWAYS_VERIFY_CHECKSUMS); + readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); if (range.hasMin()) { readOpts.setIterateLowerBound(new Slice(range.getMin())); } @@ -842,7 +843,7 @@ public class LLLocalDictionary implements LLDictionary { } } else { rocksdbSnapshot.setFillCache(false); - rocksdbSnapshot.setVerifyChecksums(ALWAYS_VERIFY_CHECKSUMS); + rocksdbSnapshot.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); rocksdbSnapshot.setIgnoreRangeDeletions(false); long count = 0; try (RocksIterator iter = db.newIterator(cfh, rocksdbSnapshot)) { @@ -860,7 +861,7 @@ public class LLLocalDictionary implements LLDictionary { private long exactSizeAll(@Nullable LLSnapshot snapshot) { var readOpts = resolveSnapshot(snapshot); readOpts.setFillCache(false); - readOpts.setVerifyChecksums(ALWAYS_VERIFY_CHECKSUMS); + readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); long count = 0; try (RocksIterator iter = db.newIterator(cfh, readOpts)) { @@ -885,7 +886,7 @@ public class LLLocalDictionary implements LLDictionary { readOpts.setIterateUpperBound(new Slice(range.getMax())); } try (RocksIterator iter = db.newIterator(cfh, readOpts)) { - if (!LLLocalDictionary.PREFER_ALWAYS_SEEK_TO_FIRST && range.hasMin()) { + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { iter.seek(range.getMin()); } else { iter.seekToFirst(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedReactiveIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedReactiveIterator.java index 2967698..670c5a1 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedReactiveIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedReactiveIterator.java @@ -4,11 +4,14 @@ import it.cavallium.dbengine.database.LLRange; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.util.Arrays; import java.util.List; +import java.util.Optional; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; +import org.rocksdb.RocksMutableObject; import org.rocksdb.Slice; import reactor.core.publisher.Flux; +import reactor.util.function.Tuples; public abstract class LLLocalLuceneGroupedReactiveIterator { @@ -43,50 +46,56 @@ public abstract class LLLocalLuceneGroupedReactiveIterator { public Flux> flux() { return Flux .generate(() -> { - synchronized (this) { - var readOptions = new ReadOptions(this.readOptions); - readOptions.setFillCache(range.hasMin() && range.hasMax()); - if (range.hasMin()) { - readOptions.setIterateLowerBound(new Slice(range.getMin())); - } - if (range.hasMax()) { - readOptions.setIterateUpperBound(new Slice(range.getMax())); - } - var rocksIterator = db.newIterator(cfh, readOptions); - if (!LLLocalDictionary.PREFER_ALWAYS_SEEK_TO_FIRST && range.hasMin()) { - rocksIterator.seek(range.getMin()); - } else { - rocksIterator.seekToFirst(); - } - return rocksIterator; + var readOptions = new ReadOptions(this.readOptions); + readOptions.setFillCache(range.hasMin() && range.hasMax()); + Slice sliceMin; + Slice sliceMax; + if (range.hasMin()) { + sliceMin = new Slice(range.getMin()); + readOptions.setIterateLowerBound(sliceMin); + } else { + sliceMin = null; } - }, (rocksIterator, sink) -> { - synchronized (this) { - ObjectArrayList values = new ObjectArrayList<>(); - byte[] firstGroupKey = null; + if (range.hasMax()) { + sliceMax = new Slice(range.getMax()); + readOptions.setIterateUpperBound(sliceMax); + } else { + sliceMax = null; + } + var rocksIterator = db.newIterator(cfh, readOptions); + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + rocksIterator.seek(range.getMin()); + } else { + rocksIterator.seekToFirst(); + } + return Tuples.of(rocksIterator, Optional.ofNullable(sliceMin), Optional.ofNullable(sliceMax)); + }, (tuple, sink) -> { + var rocksIterator = tuple.getT1(); + ObjectArrayList values = new ObjectArrayList<>(); + byte[] firstGroupKey = null; - while (rocksIterator.isValid()) { - byte[] key = rocksIterator.key(); - if (firstGroupKey == null) { - firstGroupKey = key; - } else if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) { - break; - } - byte[] value = readValues ? rocksIterator.value() : EMPTY; - rocksIterator.next(); - values.add(getEntry(key, value)); + while (rocksIterator.isValid()) { + byte[] key = rocksIterator.key(); + if (firstGroupKey == null) { + firstGroupKey = key; + } else if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) { + break; } - if (!values.isEmpty()) { - sink.next(values); - } else { - sink.complete(); - } - return rocksIterator; + byte[] value = readValues ? rocksIterator.value() : EMPTY; + rocksIterator.next(); + values.add(getEntry(key, value)); } - }, rocksIterator1 -> { - synchronized (this) { - rocksIterator1.close(); + if (!values.isEmpty()) { + sink.next(values); + } else { + sink.complete(); } + return tuple; + }, tuple -> { + var rocksIterator = tuple.getT1(); + rocksIterator.close(); + tuple.getT2().ifPresent(RocksMutableObject::close); + tuple.getT3().ifPresent(RocksMutableObject::close); }); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneKeyPrefixesReactiveIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneKeyPrefixesReactiveIterator.java index 2ebb0fc..a18f59b 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneKeyPrefixesReactiveIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneKeyPrefixesReactiveIterator.java @@ -2,11 +2,14 @@ package it.cavallium.dbengine.database.disk; import it.cavallium.dbengine.database.LLRange; import java.util.Arrays; +import java.util.Optional; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; +import org.rocksdb.RocksMutableObject; import org.rocksdb.Slice; import reactor.core.publisher.Flux; +import reactor.util.function.Tuples; public class LLLocalLuceneKeyPrefixesReactiveIterator { @@ -38,50 +41,54 @@ public class LLLocalLuceneKeyPrefixesReactiveIterator { public Flux flux() { return Flux .generate(() -> { - synchronized (this) { - System.out.println(Thread.currentThread().getName()); - var readOptions = new ReadOptions(this.readOptions); - readOptions.setFillCache(range.hasMin() && range.hasMax()); - if (range.hasMin()) { - readOptions.setIterateLowerBound(new Slice(range.getMin())); - } - if (range.hasMax()) { - readOptions.setIterateUpperBound(new Slice(range.getMax())); - } - var rocksIterator = db.newIterator(cfh, readOptions); - if (!LLLocalDictionary.PREFER_ALWAYS_SEEK_TO_FIRST && range.hasMin()) { - rocksIterator.seek(range.getMin()); - } else { - rocksIterator.seekToFirst(); - } - return rocksIterator; + var readOptions = new ReadOptions(this.readOptions); + readOptions.setFillCache(range.hasMin() && range.hasMax()); + Slice sliceMin; + Slice sliceMax; + if (range.hasMin()) { + sliceMin = new Slice(range.getMin()); + readOptions.setIterateLowerBound(sliceMin); + } else { + sliceMin = null; } - }, (rocksIterator, sink) -> { - synchronized (this) { - System.out.println(Thread.currentThread().getName()); - byte[] firstGroupKey = null; + if (range.hasMax()) { + sliceMax = new Slice(range.getMax()); + readOptions.setIterateUpperBound(sliceMax); + } else { + sliceMax = null; + } + var rocksIterator = db.newIterator(cfh, readOptions); + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + rocksIterator.seek(range.getMin()); + } else { + rocksIterator.seekToFirst(); + } + return Tuples.of(rocksIterator, Optional.ofNullable(sliceMin), Optional.ofNullable(sliceMax)); + }, (tuple, sink) -> { + var rocksIterator = tuple.getT1(); + byte[] firstGroupKey = null; - while (rocksIterator.isValid()) { - byte[] key = rocksIterator.key(); - if (firstGroupKey == null) { - firstGroupKey = key; - } else if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) { - break; - } - rocksIterator.next(); + while (rocksIterator.isValid()) { + byte[] key = rocksIterator.key(); + if (firstGroupKey == null) { + firstGroupKey = key; + } else if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) { + break; } - if (firstGroupKey != null) { - var groupKeyPrefix = Arrays.copyOf(firstGroupKey, prefixLength); - sink.next(groupKeyPrefix); - } else { - sink.complete(); - } - return rocksIterator; + rocksIterator.next(); } - }, rocksIterator1 -> { - synchronized (this) { - rocksIterator1.close(); + if (firstGroupKey != null) { + var groupKeyPrefix = Arrays.copyOf(firstGroupKey, prefixLength); + sink.next(groupKeyPrefix); + } else { + sink.complete(); } + return tuple; + }, tuple -> { + var rocksIterator = tuple.getT1(); + rocksIterator.close(); + tuple.getT2().ifPresent(RocksMutableObject::close); + tuple.getT3().ifPresent(RocksMutableObject::close); }); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneReactiveIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneReactiveIterator.java index 1f709a0..be8d7fd 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneReactiveIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneReactiveIterator.java @@ -1,11 +1,14 @@ package it.cavallium.dbengine.database.disk; import it.cavallium.dbengine.database.LLRange; +import java.util.Optional; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; +import org.rocksdb.RocksMutableObject; import org.rocksdb.Slice; import reactor.core.publisher.Flux; +import reactor.util.function.Tuples; public abstract class LLLocalLuceneReactiveIterator { @@ -33,39 +36,45 @@ public abstract class LLLocalLuceneReactiveIterator { public Flux flux() { return Flux .generate(() -> { - synchronized (this) { - var readOptions = new ReadOptions(this.readOptions); - readOptions.setFillCache(range.hasMin() && range.hasMax()); - if (range.hasMin()) { - readOptions.setIterateLowerBound(new Slice(range.getMin())); - } - if (range.hasMax()) { - readOptions.setIterateUpperBound(new Slice(range.getMax())); - } - var rocksIterator = db.newIterator(cfh, readOptions); - if (!LLLocalDictionary.PREFER_ALWAYS_SEEK_TO_FIRST && range.hasMin()) { - rocksIterator.seek(range.getMin()); - } else { - rocksIterator.seekToFirst(); - } - return rocksIterator; + var readOptions = new ReadOptions(this.readOptions); + readOptions.setFillCache(range.hasMin() && range.hasMax()); + Slice sliceMin; + Slice sliceMax; + if (range.hasMin()) { + sliceMin = new Slice(range.getMin()); + readOptions.setIterateLowerBound(sliceMin); + } else { + sliceMin = null; } - }, (rocksIterator, sink) -> { - synchronized (this) { - if (rocksIterator.isValid()) { - byte[] key = rocksIterator.key(); - byte[] value = readValues ? rocksIterator.value() : EMPTY; - rocksIterator.next(); - sink.next(getEntry(key, value)); - } else { - sink.complete(); - } - return rocksIterator; + if (range.hasMax()) { + sliceMax = new Slice(range.getMax()); + readOptions.setIterateUpperBound(sliceMax); + } else { + sliceMax = null; } - }, rocksIterator1 -> { - synchronized (this) { - rocksIterator1.close(); + var rocksIterator = db.newIterator(cfh, readOptions); + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + rocksIterator.seek(range.getMin()); + } else { + rocksIterator.seekToFirst(); } + return Tuples.of(rocksIterator, Optional.ofNullable(sliceMin), Optional.ofNullable(sliceMax)); + }, (tuple, sink) -> { + var rocksIterator = tuple.getT1(); + if (rocksIterator.isValid()) { + byte[] key = rocksIterator.key(); + byte[] value = readValues ? rocksIterator.value() : EMPTY; + rocksIterator.next(); + sink.next(getEntry(key, value)); + } else { + sink.complete(); + } + return tuple; + }, tuple -> { + var rocksIterator = tuple.getT1(); + rocksIterator.close(); + tuple.getT2().ifPresent(RocksMutableObject::close); + tuple.getT3().ifPresent(RocksMutableObject::close); }); }