From 5b89be8ee08ad69712a9ecf0adb96caeb342316b Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 4 May 2021 01:21:29 +0200 Subject: [PATCH] Fix memory usage --- .../database/disk/LLLocalDictionary.java | 2 +- .../disk/LLLocalKeyValueDatabase.java | 18 ++++++++---- .../it/cavallium/dbengine/DbTestUtils.java | 29 ++++++++++--------- 3 files changed, 30 insertions(+), 19 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 937ce96..596fb94 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -600,7 +600,7 @@ public class LLLocalDictionary implements LLDictionary { } } }) - .onErrorMap(cause -> new IOException("Failed to read or write " + LLUtils.toString(key), cause)) + .onErrorMap(cause -> new IOException("Failed to read or write " + (key.refCnt() > 0 ? LLUtils.toString(key) : "(released)"), cause)) .subscribeOn(dbScheduler) .doFinally(s -> key.release()); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 7c37543..78f86f0 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -30,6 +30,7 @@ import org.rocksdb.BloomFilter; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.CompactRangeOptions; +import org.rocksdb.CompactionPriority; import org.rocksdb.CompactionStyle; import org.rocksdb.CompressionType; import org.rocksdb.DBOptions; @@ -40,6 +41,7 @@ import org.rocksdb.Options; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.Snapshot; +import org.rocksdb.TableFormatConfig; import org.rocksdb.WALRecoveryMode; import org.rocksdb.WriteBufferManager; import org.warp.commonutils.log.Logger; @@ -181,7 +183,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { // end force compact } - @SuppressWarnings("CommentedOutCode") + @SuppressWarnings({"CommentedOutCode", "PointlessArithmeticExpression"}) private static Options openRocksDb(Path path, boolean crashIfWalError, boolean lowMemory) throws IOException { // Get databases directory path @@ -224,7 +226,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { if (lowMemory) { // LOW MEMORY options - .setBytesPerSync(1024 * 1024) + .setBytesPerSync(512 * 1024) // 512KiB .setWalBytesPerSync(1024 * 1024) .setIncreaseParallelism(1) .setMaxOpenFiles(2) @@ -245,8 +247,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .setAllowConcurrentMemtableWrite(true) .setEnableWriteThreadAdaptiveYield(true) .setIncreaseParallelism(Runtime.getRuntime().availableProcessors()) - .setBytesPerSync(10 * 1024 * 1024) + .setBytesPerSync(1 * 1024 * 1024) // 1MiB .setWalBytesPerSync(10 * 1024 * 1024) + .setMaxOpenFiles(15) .optimizeLevelStyleCompaction( 128 * 1024 * 1024) // 128MiB of ram will be used for level style compaction .setWriteBufferSize(64 * 1024 * 1024) // 64MB @@ -257,13 +260,18 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { new DbPath(databasesDirPath.resolve(path.getFileName() + "_cold"), 600L * 1024L * 1024L * 1024L))) // 600GiB ; - tableOptions.setBlockCache(new LRUCache(256L * 1024L * 1024L)); // 256MiB - options.setWriteBufferManager(new WriteBufferManager(256L * 1024L * 1024L, new LRUCache(256L * 1024L * 1024L))); // 256MiB + tableOptions.setBlockCache(new LRUCache(128L * 1024L * 1024L)); // 128MiB + options.setWriteBufferManager(new WriteBufferManager(256L * 1024L * 1024L, new LRUCache(128L * 1024L * 1024L))); // 128MiB } final BloomFilter bloomFilter = new BloomFilter(10, false); + tableOptions.setOptimizeFiltersForMemory(true); tableOptions.setFilterPolicy(bloomFilter); + tableOptions.setBlockSize(16 * 1024); // 16MiB + tableOptions.setCacheIndexAndFilterBlocks(true); + tableOptions.setPinL0FilterAndIndexBlocksInCache(true); options.setTableFormatConfig(tableOptions); + options.setCompactionPriority(CompactionPriority.MinOverlappingRatio); return options; } diff --git a/src/test/java/it/cavallium/dbengine/DbTestUtils.java b/src/test/java/it/cavallium/dbengine/DbTestUtils.java index 7177e26..04fd0bd 100644 --- a/src/test/java/it/cavallium/dbengine/DbTestUtils.java +++ b/src/test/java/it/cavallium/dbengine/DbTestUtils.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator; import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.LLDictionary; @@ -33,6 +34,7 @@ import reactor.core.scheduler.Schedulers; public class DbTestUtils { + public static final ByteBufAllocator ALLOCATOR = new PooledByteBufAllocator(true); public static final AtomicInteger dbId = new AtomicInteger(0); public static Flux tempDb(Function> action) { @@ -52,7 +54,7 @@ public class DbTestUtils { return null; }) .subscribeOn(Schedulers.boundedElastic()) - .then(new LLLocalDatabaseConnection(PooledByteBufAllocator.DEFAULT, wrkspcPath, true).connect()) + .then(new LLLocalDatabaseConnection(DbTestUtils.ALLOCATOR, wrkspcPath, true).connect()) .flatMap(conn -> conn.getDatabase("testdb", List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")), false, true @@ -93,15 +95,15 @@ public class DbTestUtils { LLDictionary dictionary, DbType dbType, int keyBytes) { - if (dbType == DbType.MAP || true) { //todo: fix hashmaps + if (dbType == DbType.MAP) { return DatabaseMapDictionary.simple(dictionary, - SerializerFixedBinaryLength.utf8(PooledByteBufAllocator.DEFAULT, keyBytes), - Serializer.utf8(PooledByteBufAllocator.DEFAULT) + SerializerFixedBinaryLength.utf8(DbTestUtils.ALLOCATOR, keyBytes), + Serializer.utf8(DbTestUtils.ALLOCATOR) ); } else { return DatabaseMapDictionaryHashed.simple(dictionary, - SerializerFixedBinaryLength.utf8(PooledByteBufAllocator.DEFAULT, keyBytes), - Serializer.utf8(PooledByteBufAllocator.DEFAULT), + SerializerFixedBinaryLength.utf8(DbTestUtils.ALLOCATOR, keyBytes), + Serializer.utf8(DbTestUtils.ALLOCATOR), String::hashCode, new SerializerFixedBinaryLength<>() { @Override @@ -112,8 +114,9 @@ public class DbTestUtils { @Override public @NotNull Integer deserialize(@NotNull ByteBuf serialized) { try { + var prevReaderIdx = serialized.readerIndex(); var val = serialized.readInt(); - serialized.readerIndex(serialized.readerIndex() + keyBytes); + serialized.readerIndex(prevReaderIdx + keyBytes); return val; } finally { serialized.release(); @@ -122,7 +125,7 @@ public class DbTestUtils { @Override public @NotNull ByteBuf serialize(@NotNull Integer deserialized) { - var out = PooledByteBufAllocator.DEFAULT.directBuffer(keyBytes); + var out = DbTestUtils.ALLOCATOR.directBuffer(keyBytes); try { out.writeInt(deserialized); out.writerIndex(keyBytes); @@ -141,19 +144,19 @@ public class DbTestUtils { int key1Bytes, int key2Bytes) { return DatabaseMapDictionaryDeep.deepTail(dictionary, - SerializerFixedBinaryLength.utf8(PooledByteBufAllocator.DEFAULT, key1Bytes), + SerializerFixedBinaryLength.utf8(DbTestUtils.ALLOCATOR, key1Bytes), key2Bytes, - new SubStageGetterMap<>(SerializerFixedBinaryLength.utf8(PooledByteBufAllocator.DEFAULT, key2Bytes), Serializer.utf8(PooledByteBufAllocator.DEFAULT)) + new SubStageGetterMap<>(SerializerFixedBinaryLength.utf8(DbTestUtils.ALLOCATOR, key2Bytes), Serializer.utf8(DbTestUtils.ALLOCATOR)) ); } public static DatabaseMapDictionaryHashed tempDatabaseMapDictionaryHashMap( LLDictionary dictionary) { return DatabaseMapDictionaryHashed.simple(dictionary, - Serializer.utf8(PooledByteBufAllocator.DEFAULT), - Serializer.utf8(PooledByteBufAllocator.DEFAULT), + Serializer.utf8(DbTestUtils.ALLOCATOR), + Serializer.utf8(DbTestUtils.ALLOCATOR), String::hashCode, - SerializerFixedBinaryLength.intSerializer(PooledByteBufAllocator.DEFAULT) + SerializerFixedBinaryLength.intSerializer(DbTestUtils.ALLOCATOR) ); } }