diff --git a/src/example/java/it.cavallium.dbengine.client/SpeedExample.java b/src/example/java/it.cavallium.dbengine.client/SpeedExample.java index 184fc5a..34f7768 100644 --- a/src/example/java/it.cavallium.dbengine.client/SpeedExample.java +++ b/src/example/java/it.cavallium.dbengine.client/SpeedExample.java @@ -187,7 +187,7 @@ public class SpeedExample { return test("MapDictionaryDeep::at::put (same key, same value, " + batchSize + " times)", tempDb() .flatMap(db -> db.getDictionary("testmap", UpdateMode.DISALLOW).map(dict -> Tuples.of(db, dict))) - .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ser, ssg))), + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, ssg))), tuple -> Flux.range(0, batchSize).flatMap(n -> Mono .defer(() -> Mono .fromRunnable(() -> { @@ -214,7 +214,7 @@ public class SpeedExample { return test("MapDictionaryDeep::putValueAndGetPrevious (same key, same value, " + batchSize + " times)", tempDb() .flatMap(db -> db.getDictionary("testmap", UpdateMode.DISALLOW).map(dict -> Tuples.of(db, dict))) - .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ser, ssg))), + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, ssg))), tuple -> Flux.range(0, batchSize).flatMap(n -> Mono .defer(() -> Mono .fromRunnable(() -> { @@ -233,7 +233,7 @@ public class SpeedExample { } private static Mono testPutValue(int valSize) { - var ssg = new SubStageGetterSingleBytes(); + var ssg = Serializer.noop(); var ser = SerializerFixedBinaryLength.noop(4); var itemKey = new byte[]{0, 1, 2, 3}; var newValue = new byte[valSize]; @@ -243,7 +243,7 @@ public class SpeedExample { return test("MapDictionaryDeep::putValue (same key, same value, " + valSize + " bytes, " + batchSize + " times)", tempDb() .flatMap(db -> db.getDictionary("testmap", UpdateMode.DISALLOW).map(dict -> Tuples.of(db, dict))) - .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ser, ssg))), + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, ssg))), tuple -> Flux.range(0, batchSize).flatMap(n -> Mono .defer(() -> Mono .fromRunnable(() -> { @@ -270,7 +270,7 @@ public class SpeedExample { return test("MapDictionaryDeep::updateValue (same key, alternating value, " + valSize + " bytes, " + batchSize + " times)", tempDb() .flatMap(db -> db.getDictionary("testmap", UpdateMode.ALLOW).map(dict -> Tuples.of(db, dict))) - .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ser, ssg))), + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, ssg))), tuple -> Flux.range(0, batchSize).flatMap(n -> Mono .defer(() -> tuple.getT2().updateValue(itemKey, (old) -> { if (old.isPresent()) { @@ -297,7 +297,7 @@ public class SpeedExample { return test("MapDictionaryDeep::putMulti (batch of " + batchSize + " entries)", tempDb() .flatMap(db -> db.getDictionary("testmap", UpdateMode.DISALLOW).map(dict -> Tuples.of(db, dict))) - .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ser, ssg))), + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionary.simple(dict, ser, ssg))), tuple -> Mono.defer(() -> tuple.getT2().putMulti(putMultiFlux)), numRepeats, tuple -> Mono diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index b973fba..d04b015 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -178,16 +178,26 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>> getAllStages(@Nullable CompositeSnapshot snapshot) { return dictionary .getRangeKeys(resolveSnapshot(snapshot), range) - .map(keySuffix -> Map.entry(deserializeSuffix(stripPrefix(keySuffix)), + .map(key -> Map.entry(deserializeSuffix(stripPrefix(key)), new DatabaseSingleMapped<>( new DatabaseSingle<>(dictionary, - toKey(stripPrefix(keySuffix)), + toKey(stripPrefix(key)), Serializer.noop()), valueSerializer ) )); } + @Override + public Flux> getAllValues(@Nullable CompositeSnapshot snapshot) { + return dictionary + .getRange(resolveSnapshot(snapshot), range) + .map(serializedEntry -> Map.entry( + deserializeSuffix(stripPrefix(serializedEntry.getKey())), + valueSerializer.deserialize(serializedEntry.getValue()) + )); + } + @Override public Flux> setAllValuesAndGetPrevious(Flux> entries) { return dictionary diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 929bce5..fac377d 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -614,6 +614,7 @@ public class LLLocalDictionary implements LLDictionary { prefixLength, range, resolveSnapshot(snapshot), + true, "getRangeKeysGrouped" ).flux().subscribeOn(dbScheduler); } @@ -746,7 +747,7 @@ public class LLLocalDictionary implements LLDictionary { // readOpts.setIgnoreRangeDeletions(true); readOpts.setFillCache(false); - readOpts.setReadaheadSize(2 * 1024 * 1024); + //readOpts.setReadaheadSize(2 * 1024 * 1024); try (CappedWriteBatch writeBatch = new CappedWriteBatch(db, CAPPED_WRITE_BATCH_CAP, RESERVED_WRITE_BATCH_SIZE, @@ -925,7 +926,7 @@ public class LLLocalDictionary implements LLDictionary { private long exactSizeAll(@Nullable LLSnapshot snapshot) { var readOpts = resolveSnapshot(snapshot); readOpts.setFillCache(false); - readOpts.setReadaheadSize(2 * 1024 * 1024); + //readOpts.setReadaheadSize(2 * 1024 * 1024); readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); if (PARALLEL_EXACT_SIZE) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedEntryReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedEntryReactiveRocksIterator.java index c1804ee..f91244c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedEntryReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedEntryReactiveRocksIterator.java @@ -16,7 +16,7 @@ public class LLLocalGroupedEntryReactiveRocksIterator extends LLRange range, ReadOptions readOptions, String debugName) { - super(db, cfh, prefixLength, range, readOptions, true, debugName); + super(db, cfh, prefixLength, range, readOptions, false, true, debugName); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedKeyReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedKeyReactiveRocksIterator.java index 64656df..e6ddf86 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedKeyReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedKeyReactiveRocksIterator.java @@ -13,7 +13,7 @@ public class LLLocalGroupedKeyReactiveRocksIterator extends LLLocalGroupedReacti LLRange range, ReadOptions readOptions, String debugName) { - super(db, cfh, prefixLength, range, readOptions, false, debugName); + super(db, cfh, prefixLength, range, readOptions, true, false, debugName); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java index b8003d8..92b0018 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java @@ -22,6 +22,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator { private final int prefixLength; private final LLRange range; private final ReadOptions readOptions; + private final boolean canFillCache; private final boolean readValues; private final String debugName; @@ -30,6 +31,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator { int prefixLength, LLRange range, ReadOptions readOptions, + boolean canFillCache, boolean readValues, String debugName) { this.db = db; @@ -37,6 +39,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator { this.prefixLength = prefixLength; this.range = range; this.readOptions = readOptions; + this.canFillCache = canFillCache; this.readValues = readValues; this.debugName = debugName; } @@ -47,7 +50,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator { return Flux .generate(() -> { var readOptions = new ReadOptions(this.readOptions); - readOptions.setFillCache(range.hasMin() && range.hasMax()); + readOptions.setFillCache(canFillCache && range.hasMin() && range.hasMax()); Slice sliceMin; Slice sliceMax; if (range.hasMin()) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java index 7c4d62a..8268761 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java @@ -20,6 +20,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator { private final int prefixLength; private final LLRange range; private final ReadOptions readOptions; + private final boolean canFillCache; private final String debugName; public LLLocalKeyPrefixReactiveRocksIterator(RocksDB db, @@ -27,12 +28,14 @@ public class LLLocalKeyPrefixReactiveRocksIterator { int prefixLength, LLRange range, ReadOptions readOptions, + boolean canFillCache, String debugName) { this.db = db; this.cfh = cfh; this.prefixLength = prefixLength; this.range = range; this.readOptions = readOptions; + this.canFillCache = canFillCache; this.debugName = debugName; } @@ -42,8 +45,8 @@ public class LLLocalKeyPrefixReactiveRocksIterator { .generate(() -> { var readOptions = new ReadOptions(this.readOptions); if (!range.hasMin() || !range.hasMax()) { - readOptions.setReadaheadSize(2 * 1024 * 1024); - readOptions.setFillCache(false); + //readOptions.setReadaheadSize(2 * 1024 * 1024); + readOptions.setFillCache(canFillCache); } Slice sliceMin; Slice sliceMax; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 2dfa8a3..7ea8660 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -91,7 +91,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { if (lowMemory) { this.dbScheduler = lowMemorySupplier.get(); } else { - this.dbScheduler = Schedulers.newBoundedElastic(6, + this.dbScheduler = Schedulers.newBoundedElastic(Math.max(8, Runtime.getRuntime().availableProcessors()), Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "db-" + name, 60, @@ -209,7 +209,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { //options.setUseDirectReads(true); //options.setUseDirectIoForFlushAndCompaction(true); //options.setWritableFileMaxBufferSize(1024 * 1024); // 1MB by default - options.setCompactionReadaheadSize(2 * 1024 * 1024); // recommend at least 2MB + //options.setCompactionReadaheadSize(2 * 1024 * 1024); // recommend at least 2MB final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig(); if (lowMemory) { // LOW MEMORY