Fix memory usage

This commit is contained in:
Andrea Cavalli 2021-05-04 01:21:29 +02:00
parent f93cae96f3
commit 5b89be8ee0
3 changed files with 30 additions and 19 deletions

View File

@ -600,7 +600,7 @@ public class LLLocalDictionary implements LLDictionary {
}
}
})
.onErrorMap(cause -> new IOException("Failed to read or write " + LLUtils.toString(key), cause))
.onErrorMap(cause -> new IOException("Failed to read or write " + (key.refCnt() > 0 ? LLUtils.toString(key) : "(released)"), cause))
.subscribeOn(dbScheduler)
.doFinally(s -> key.release());
}

View File

@ -30,6 +30,7 @@ import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.CompactRangeOptions;
import org.rocksdb.CompactionPriority;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
@ -40,6 +41,7 @@ import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;
import org.rocksdb.TableFormatConfig;
import org.rocksdb.WALRecoveryMode;
import org.rocksdb.WriteBufferManager;
import org.warp.commonutils.log.Logger;
@ -181,7 +183,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
// end force compact
}
@SuppressWarnings("CommentedOutCode")
@SuppressWarnings({"CommentedOutCode", "PointlessArithmeticExpression"})
private static Options openRocksDb(Path path, boolean crashIfWalError, boolean lowMemory)
throws IOException {
// Get databases directory path
@ -224,7 +226,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
if (lowMemory) {
// LOW MEMORY
options
.setBytesPerSync(1024 * 1024)
.setBytesPerSync(512 * 1024) // 512KiB
.setWalBytesPerSync(1024 * 1024)
.setIncreaseParallelism(1)
.setMaxOpenFiles(2)
@ -245,8 +247,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.setAllowConcurrentMemtableWrite(true)
.setEnableWriteThreadAdaptiveYield(true)
.setIncreaseParallelism(Runtime.getRuntime().availableProcessors())
.setBytesPerSync(10 * 1024 * 1024)
.setBytesPerSync(1 * 1024 * 1024) // 1MiB
.setWalBytesPerSync(10 * 1024 * 1024)
.setMaxOpenFiles(15)
.optimizeLevelStyleCompaction(
128 * 1024 * 1024) // 128MiB of ram will be used for level style compaction
.setWriteBufferSize(64 * 1024 * 1024) // 64MB
@ -257,13 +260,18 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
new DbPath(databasesDirPath.resolve(path.getFileName() + "_cold"),
600L * 1024L * 1024L * 1024L))) // 600GiB
;
tableOptions.setBlockCache(new LRUCache(256L * 1024L * 1024L)); // 256MiB
options.setWriteBufferManager(new WriteBufferManager(256L * 1024L * 1024L, new LRUCache(256L * 1024L * 1024L))); // 256MiB
tableOptions.setBlockCache(new LRUCache(128L * 1024L * 1024L)); // 128MiB
options.setWriteBufferManager(new WriteBufferManager(256L * 1024L * 1024L, new LRUCache(128L * 1024L * 1024L))); // 128MiB
}
final BloomFilter bloomFilter = new BloomFilter(10, false);
tableOptions.setOptimizeFiltersForMemory(true);
tableOptions.setFilterPolicy(bloomFilter);
tableOptions.setBlockSize(16 * 1024); // 16MiB
tableOptions.setCacheIndexAndFilterBlocks(true);
tableOptions.setPinL0FilterAndIndexBlocksInCache(true);
options.setTableFormatConfig(tableOptions);
options.setCompactionPriority(CompactionPriority.MinOverlappingRatio);
return options;
}

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLDictionary;
@ -33,6 +34,7 @@ import reactor.core.scheduler.Schedulers;
public class DbTestUtils {
public static final ByteBufAllocator ALLOCATOR = new PooledByteBufAllocator(true);
public static final AtomicInteger dbId = new AtomicInteger(0);
public static <U> Flux<U> tempDb(Function<LLKeyValueDatabase, Publisher<U>> action) {
@ -52,7 +54,7 @@ public class DbTestUtils {
return null;
})
.subscribeOn(Schedulers.boundedElastic())
.then(new LLLocalDatabaseConnection(PooledByteBufAllocator.DEFAULT, wrkspcPath, true).connect())
.then(new LLLocalDatabaseConnection(DbTestUtils.ALLOCATOR, wrkspcPath, true).connect())
.flatMap(conn -> conn.getDatabase("testdb",
List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")),
false, true
@ -93,15 +95,15 @@ public class DbTestUtils {
LLDictionary dictionary,
DbType dbType,
int keyBytes) {
if (dbType == DbType.MAP || true) { //todo: fix hashmaps
if (dbType == DbType.MAP) {
return DatabaseMapDictionary.simple(dictionary,
SerializerFixedBinaryLength.utf8(PooledByteBufAllocator.DEFAULT, keyBytes),
Serializer.utf8(PooledByteBufAllocator.DEFAULT)
SerializerFixedBinaryLength.utf8(DbTestUtils.ALLOCATOR, keyBytes),
Serializer.utf8(DbTestUtils.ALLOCATOR)
);
} else {
return DatabaseMapDictionaryHashed.simple(dictionary,
SerializerFixedBinaryLength.utf8(PooledByteBufAllocator.DEFAULT, keyBytes),
Serializer.utf8(PooledByteBufAllocator.DEFAULT),
SerializerFixedBinaryLength.utf8(DbTestUtils.ALLOCATOR, keyBytes),
Serializer.utf8(DbTestUtils.ALLOCATOR),
String::hashCode,
new SerializerFixedBinaryLength<>() {
@Override
@ -112,8 +114,9 @@ public class DbTestUtils {
@Override
public @NotNull Integer deserialize(@NotNull ByteBuf serialized) {
try {
var prevReaderIdx = serialized.readerIndex();
var val = serialized.readInt();
serialized.readerIndex(serialized.readerIndex() + keyBytes);
serialized.readerIndex(prevReaderIdx + keyBytes);
return val;
} finally {
serialized.release();
@ -122,7 +125,7 @@ public class DbTestUtils {
@Override
public @NotNull ByteBuf serialize(@NotNull Integer deserialized) {
var out = PooledByteBufAllocator.DEFAULT.directBuffer(keyBytes);
var out = DbTestUtils.ALLOCATOR.directBuffer(keyBytes);
try {
out.writeInt(deserialized);
out.writerIndex(keyBytes);
@ -141,19 +144,19 @@ public class DbTestUtils {
int key1Bytes,
int key2Bytes) {
return DatabaseMapDictionaryDeep.deepTail(dictionary,
SerializerFixedBinaryLength.utf8(PooledByteBufAllocator.DEFAULT, key1Bytes),
SerializerFixedBinaryLength.utf8(DbTestUtils.ALLOCATOR, key1Bytes),
key2Bytes,
new SubStageGetterMap<>(SerializerFixedBinaryLength.utf8(PooledByteBufAllocator.DEFAULT, key2Bytes), Serializer.utf8(PooledByteBufAllocator.DEFAULT))
new SubStageGetterMap<>(SerializerFixedBinaryLength.utf8(DbTestUtils.ALLOCATOR, key2Bytes), Serializer.utf8(DbTestUtils.ALLOCATOR))
);
}
public static <T, U> DatabaseMapDictionaryHashed<String, String, Integer> tempDatabaseMapDictionaryHashMap(
LLDictionary dictionary) {
return DatabaseMapDictionaryHashed.simple(dictionary,
Serializer.utf8(PooledByteBufAllocator.DEFAULT),
Serializer.utf8(PooledByteBufAllocator.DEFAULT),
Serializer.utf8(DbTestUtils.ALLOCATOR),
Serializer.utf8(DbTestUtils.ALLOCATOR),
String::hashCode,
SerializerFixedBinaryLength.intSerializer(PooledByteBufAllocator.DEFAULT)
SerializerFixedBinaryLength.intSerializer(DbTestUtils.ALLOCATOR)
);
}
}