diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index 60bcaa3..e9e21a4 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -6,6 +6,7 @@ import io.net5.buffer.api.Drop; import io.net5.buffer.api.Owned; import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; +import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; @@ -13,13 +14,14 @@ import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; -import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.function.TriFunction; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; @@ -73,6 +75,7 @@ public class DatabaseMapDictionaryDeep> extend protected final LLDictionary dictionary; private final BufferAllocator alloc; + private final AtomicLong totalZeroBytesErrors = new AtomicLong(); protected final SubStageGetter subStageGetter; protected final SerializerFixedBinaryLength keySuffixSerializer; protected final int keyPrefixLength; @@ -437,4 +440,50 @@ public class 
DatabaseMapDictionaryDeep> extend this.range = null; this.onClose = null; } + + /** Reads the raw entries of deepMap.dictionary over deepMap.rangeMono (at the snapshot resolved by deepMap.resolveSnapshot) and, for each entry, emits merger.apply(key1, key2, value). key1 is deserialized from the part split off by keyBuf.split(deepMap.keySuffixLength), key2 from what is left of the key buffer, and value from the value buffer. Only works when deepMap.subStageGetter is a SubStageGetterMap, whose key and value serializers are reused; otherwise an IllegalArgumentException is thrown. NOTE(review): K1/K2 are used below but no type parameters are declared in this text, so generic arguments look stripped here; confirm the signature against the real file. */ public static Flux getAllLeaves2(DatabaseMapDictionaryDeep, DatabaseMapDictionary> deepMap, + CompositeSnapshot snapshot, + TriFunction merger) { + if (deepMap.subStageGetter instanceof SubStageGetterMap subStageGetterMap) { + var keySuffix1Serializer = deepMap.keySuffixSerializer; + var keySuffix2Serializer = subStageGetterMap.keySerializer; + var valueSerializer = subStageGetterMap.valueSerializer; + return deepMap + .dictionary + .getRange(deepMap.resolveSnapshot(snapshot), deepMap.rangeMono) + .handle((entrySend, sink) -> { + /* the received entry is closed by this try-with-resources once the element has been handled */ try (var entry = entrySend.receive()) { + var keyBuf = entry.getKeyUnsafe(); + var valueBuf = entry.getValueUnsafe(); + assert keyBuf != null; + /* presumably skips this map's own key prefix so that only the two key suffixes remain readable; confirm */ keyBuf.skipReadable(deepMap.keyPrefixLength); + K1 key1; + K2 key2; + try (var key1Buf = keyBuf.split(deepMap.keySuffixLength)) { + key1 = keySuffix1Serializer.deserialize(key1Buf); + } + key2 = keySuffix2Serializer.deserialize(keyBuf); + assert valueBuf != null; + var value = valueSerializer.deserialize(valueBuf); + sink.next(merger.apply(key1, key2, value)); + } catch (IndexOutOfBoundsException ex) { + /* One specific failure is recognized purely by its message text; every other IndexOutOfBoundsException is forwarded through sink.error. NOTE(review): matching on message text is brittle if the buffer library rewords it. */ var exMessage = ex.getMessage(); + if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) { + var totalZeroBytesErrors = deepMap.totalZeroBytesErrors.incrementAndGet(); + /* rate-limited logging: logs while the per-deepMap counter is below 512, afterwards only on every 10000th occurrence */ if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) { + LOG.error("Unexpected zero-bytes value in column " + deepMap.dictionary.getDatabaseName() + ":" + + deepMap.dictionary.getColumnName() + " total=" + totalZeroBytesErrors); + } + /* NOTE(review): complete() appears to end the whole sequence, so entries after the first zero-bytes value would never be emitted, which sits oddly with a counter built for many occurrences. If the intent is to skip only this entry, returning without calling the sink would do that; confirm the intended behavior. */ sink.complete(); + } else { + sink.error(ex); + } + } catch (SerializationException ex) { + sink.error(ex); + } + }); + } else { + /* NOTE(review): thrown synchronously (not as an error signal) and without a message; consider naming the unsupported subStageGetter type. */ throw new IllegalArgumentException(); + } + } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java index ad0c905..81c7bc1 100644 --- 
a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java @@ -16,8 +16,8 @@ import reactor.core.publisher.Mono; public class SubStageGetterMap implements SubStageGetter, DatabaseMapDictionary> { - private final SerializerFixedBinaryLength keySerializer; - private final Serializer valueSerializer; + final SerializerFixedBinaryLength keySerializer; + final Serializer valueSerializer; public SubStageGetterMap(SerializerFixedBinaryLength keySerializer, Serializer valueSerializer) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index e63d1ab..c8b8442 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -137,7 +137,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } } - var optionsWithCache = openRocksDb(path, databaseOptions); + OptionsWithCache optionsWithCache = openRocksDb(path, databaseOptions); var rocksdbOptions = optionsWithCache.options(); try { List descriptors = new LinkedList<>(); @@ -175,7 +175,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { tableOptions .setIndexType(IndexType.kTwoLevelIndexSearch) .setPartitionFilters(true) - .setBlockCache(optionsWithCache.cache()) .setCacheIndexAndFilterBlocks(databaseOptions.setCacheIndexAndFilterBlocks().orElse(true)) .setCacheIndexAndFilterBlocksWithHighPriority(true) .setPinTopLevelIndexAndFilter(true) @@ -185,7 +184,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { tableOptions .setIndexType(IndexType.kTwoLevelIndexSearch) .setPartitionFilters(true) - .setBlockCache(optionsWithCache.cache()) .setCacheIndexAndFilterBlocks(databaseOptions.setCacheIndexAndFilterBlocks().orElse( true)) 
.setCacheIndexAndFilterBlocksWithHighPriority(true) .setPinTopLevelIndexAndFilter(true) @@ -195,7 +193,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { tableOptions.setFilterPolicy(bloomFilter); tableOptions.setOptimizeFiltersForMemory(true); } - tableOptions.setBlockSize(16 * 1024); // 16KiB + tableOptions + .setBlockCacheCompressed(optionsWithCache.compressedCache()) + .setBlockCache(optionsWithCache.standardCache()) + .setBlockSize(16 * 1024); // 16KiB columnOptions.setTableFormatConfig(tableOptions); columnOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio); @@ -391,7 +392,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { // end force compact } - record OptionsWithCache(DBOptions options, Cache cache) {} + record OptionsWithCache(DBOptions options, Cache standardCache, Cache compressedCache) {} private static OptionsWithCache openRocksDb(@Nullable Path path, DatabaseOptions databaseOptions) throws IOException { // Get databases directory path @@ -409,6 +410,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { // the Options class contains a set of configurable DB options // that determines the behaviour of the database. 
var options = new DBOptions(); + options.setParanoidChecks(false); options.setCreateIfMissing(true); options.setCreateMissingColumnFamilies(true); options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL); @@ -427,6 +429,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { options.setMaxOpenFiles(databaseOptions.maxOpenFiles().orElse(-1)); Cache blockCache; + Cache compressedCache; if (databaseOptions.lowMemory()) { // LOW MEMORY options @@ -438,7 +441,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .setWalSizeLimitMB(0) // 16MB .setMaxTotalWalSize(0) // automatic ; - blockCache = new ClockCache(databaseOptions.blockCache().orElse( 8L * SizeUnit.MB), -1, true); + blockCache = new ClockCache(databaseOptions.blockCache().orElse( 8L * SizeUnit.MB) / 2, -1, true); + compressedCache = new ClockCache(databaseOptions.blockCache().orElse( 8L * SizeUnit.MB) / 2, -1, true); if (databaseOptions.useDirectIO()) { options @@ -460,7 +464,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .setWalSizeLimitMB(1024) // 1024MB .setMaxTotalWalSize(2L * SizeUnit.GB) // 2GiB max wal directory size ; - blockCache = new ClockCache(databaseOptions.blockCache().orElse( 512 * SizeUnit.MB), -1, true); + blockCache = new ClockCache(databaseOptions.blockCache().orElse( 512 * SizeUnit.MB) / 2); + compressedCache = new ClockCache(databaseOptions.blockCache().orElse( 512 * SizeUnit.MB) / 2); if (databaseOptions.useDirectIO()) { options @@ -491,7 +496,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { options.setUseDirectIoForFlushAndCompaction(true); } - return new OptionsWithCache(options, blockCache); + return new OptionsWithCache(options, blockCache, compressedCache); } private static List convertPaths(Path databasesDirPath, Path path, List volumes) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java 
b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java index 4622343..da02793 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java @@ -79,8 +79,8 @@ public abstract class LLLocalReactiveRocksIterator extends return Flux.generate(() -> { var readOptions = new ReadOptions(this.readOptions); if (!rangeShared.hasMin() || !rangeShared.hasMax()) { - readOptions.setReadaheadSize(32 * 1024); // 32KiB - readOptions.setFillCache(false); + //readOptions.setReadaheadSize(32 * 1024); // 32KiB + //readOptions.setFillCache(false); readOptions.setVerifyChecksums(false); } if (logger.isTraceEnabled()) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbFileStore.java b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbFileStore.java index 055cd80..cde585c 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbFileStore.java +++ b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbFileStore.java @@ -130,6 +130,7 @@ public class RocksdbFileStore { private static DBOptions getDBOptions() { var options = new DBOptions(); + options.setParanoidChecks(false); options.setWalSizeLimitMB(256); options.setMaxWriteBatchGroupSizeBytes(2 * SizeUnit.MB); //options.setAtomicFlush(false);