From 4394c74ad96ecfa5a2249a3538ba22db0ae10258 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 20 Mar 2021 12:41:11 +0100 Subject: [PATCH] Bugfix --- pom.xml | 12 +-- .../database/disk/LLLocalDictionary.java | 89 ++++++++++++------- .../disk/LLLocalKeyValueDatabase.java | 15 +++- .../database/disk/LLLocalLuceneIndex.java | 2 +- 4 files changed, 75 insertions(+), 43 deletions(-) diff --git a/pom.xml b/pom.xml index aa77247..69a8c5c 100644 --- a/pom.xml +++ b/pom.xml @@ -143,32 +143,32 @@ org.apache.lucene lucene-core - 8.6.2 + 8.8.1 org.apache.lucene lucene-join - 8.6.2 + 8.8.1 org.apache.lucene lucene-analyzers-common - 8.6.2 + 8.8.1 org.apache.lucene lucene-codecs - 8.6.2 + 8.8.1 org.apache.lucene lucene-backward-codecs - 8.6.2 + 8.8.1 org.apache.lucene lucene-queries - 8.6.2 + 8.8.1 org.jetbrains diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 27bfbf6..3ffad40 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -27,6 +27,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.CompactRangeOptions; import org.rocksdb.FlushOptions; import org.rocksdb.Holder; import org.rocksdb.ReadOptions; @@ -647,33 +648,8 @@ public class LLLocalDictionary implements LLDictionary { .fromCallable(() -> { if (range.isSingle()) { writeBatch.delete(cfh, range.getSingle()); - } else if (range.hasMin() && range.hasMax()) { - writeBatch.deleteRange(cfh, range.getMin(), range.getMax()); - } else if (range.hasMax()) { - writeBatch.deleteRange(cfh, FIRST_KEY, range.getMax()); - } else if (range.hasMin()) { - // Delete from x to end of column - var readOpts = getReadOptions(null); - readOpts.setIterateLowerBound(new Slice(range.getMin())); - try (var it = db.newIterator(cfh, readOpts)) { - it.seekToLast(); - if (it.isValid()) { - writeBatch.deleteRange(cfh, range.getMin(), it.key()); - // Delete the last key because we are deleting everything from "min" onward, without a max bound - writeBatch.delete(it.key()); - } - } } else { - // Delete all - var readOpts = getReadOptions(null); - try (var it = db.newIterator(cfh, readOpts)) { - it.seekToLast(); - if (it.isValid()) { - writeBatch.deleteRange(cfh, FIRST_KEY, it.key()); - // Delete the last key because we are deleting everything without a max bound - writeBatch.delete(it.key()); - } - } + deleteSmallRangeWriteBatch(writeBatch, range); } return null; }) @@ -693,6 +669,52 @@ public class LLLocalDictionary implements LLDictionary { .onErrorMap(cause -> new IOException("Failed to write range", cause)); } + private void deleteSmallRange(LLRange range) + throws RocksDBException { + var readOpts = getReadOptions(null); + readOpts.setFillCache(false); + if (range.hasMin()) { + readOpts.setIterateLowerBound(new Slice(range.getMin())); + } + if (range.hasMax()) { + readOpts.setIterateUpperBound(new Slice(range.getMax())); + } + try (var rocksIterator = db.newIterator(cfh, readOpts)) { + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + rocksIterator.seek(range.getMin()); + } else { + rocksIterator.seekToFirst(); + } + while (rocksIterator.isValid()) { + db.delete(cfh, rocksIterator.key()); + rocksIterator.next(); + } + } + } + + private void deleteSmallRangeWriteBatch(CappedWriteBatch writeBatch, LLRange range) + throws RocksDBException { + var readOpts = getReadOptions(null); + readOpts.setFillCache(false); + if (range.hasMin()) { + readOpts.setIterateLowerBound(new Slice(range.getMin())); + } + if (range.hasMax()) { + readOpts.setIterateUpperBound(new Slice(range.getMax())); + } + try (var rocksIterator = db.newIterator(cfh, readOpts)) { + if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { + rocksIterator.seek(range.getMin()); + } else { + rocksIterator.seekToFirst(); + } + while (rocksIterator.isValid()) { + writeBatch.delete(cfh, rocksIterator.key()); + rocksIterator.next(); + } + } + } + private static byte[] incrementLexicographically(byte[] key) { boolean remainder = true; int prefixLength = key.length; @@ -731,26 +753,27 @@ public class LLLocalDictionary implements LLDictionary { BATCH_WRITE_OPTIONS )) { - //byte[] firstDeletedKey = null; - //byte[] lastDeletedKey = null; + byte[] firstDeletedKey = null; + byte[] lastDeletedKey = null; try (RocksIterator iter = db.newIterator(cfh, readOpts)) { iter.seekToLast(); if (iter.isValid()) { + firstDeletedKey = FIRST_KEY; + lastDeletedKey = iter.key(); writeBatch.deleteRange(cfh, FIRST_KEY, iter.key()); writeBatch.delete(cfh, iter.key()); - //firstDeletedKey = FIRST_KEY; - //lastDeletedKey = incrementLexicographically(iter.key()); } } writeBatch.writeToDbAndClose(); + // Compact range db.suggestCompactRange(cfh); - //if (firstDeletedKey != null && lastDeletedKey != null) { - //db.compactRange(cfh, firstDeletedKey, lastDeletedKey, new CompactRangeOptions().setChangeLevel(false)); - //} + if (firstDeletedKey != null && lastDeletedKey != null) { + db.compactRange(cfh, firstDeletedKey, lastDeletedKey, new CompactRangeOptions().setChangeLevel(false)); + } db.flush(new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true), cfh); db.flushWal(true); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 0586f1f..a76b46f 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -33,11 +33,13 @@ import org.rocksdb.CompressionType; import org.rocksdb.DBOptions; import org.rocksdb.DbPath; import org.rocksdb.FlushOptions; +import org.rocksdb.LRUCache; import org.rocksdb.Options; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.Snapshot; import org.rocksdb.WALRecoveryMode; +import org.rocksdb.WriteBufferManager; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Mono; @@ -190,11 +192,15 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { options.setDeleteObsoleteFilesPeriodMicros(20 * 1000000); // 20 seconds options.setPreserveDeletes(false); options.setKeepLogFileNum(10); + options.setAllowMmapReads(true); + options.setAllowMmapWrites(true); + options.setAllowFAllocate(true); // Direct I/O parameters. Removed because they use too much disk. //options.setUseDirectReads(true); //options.setUseDirectIoForFlushAndCompaction(true); //options.setCompactionReadaheadSize(2 * 1024 * 1024); // recommend at least 2MB //options.setWritableFileMaxBufferSize(1024 * 1024); // 1MB by default + final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig(); if (lowMemory) { // LOW MEMORY options @@ -211,6 +217,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { new DbPath(databasesDirPath.resolve(path.getFileName() + "_cold"), 600L * 1024L * 1024L * 1024L))) // 600GiB ; + tableOptions.setBlockCache(new LRUCache(8L * 1024L * 1024L)); // 8MiB + options.setWriteBufferManager(new WriteBufferManager(8L * 1024L * 1024L, new LRUCache(8L * 1024L * 1024L))); // 8MiB } else { // HIGH MEMORY options @@ -221,18 +229,19 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .setWalBytesPerSync(10 * 1024 * 1024) .optimizeLevelStyleCompaction( 128 * 1024 * 1024) // 128MiB of ram will be used for level style compaction - .setWriteBufferSize(128 * 1024 * 1024) // 128MB + .setWriteBufferSize(64 * 1024 * 1024) // 64MB .setWalSizeLimitMB(1024) // 1024MB - .setMaxTotalWalSize(8L * 1024L * 1024L * 1024L) // 8GiB max wal directory size + .setMaxTotalWalSize(2L * 1024L * 1024L * 1024L) // 2GiB max wal directory size .setDbPaths(List.of(new DbPath(databasesDirPath.resolve(path.getFileName() + "_hot"), 400L * 1024L * 1024L * 1024L), // 400GiB new DbPath(databasesDirPath.resolve(path.getFileName() + "_cold"), 600L * 1024L * 1024L * 1024L))) // 600GiB ; + tableOptions.setBlockCache(new LRUCache(256L * 1024L * 1024L)); // 256MiB + options.setWriteBufferManager(new WriteBufferManager(256L * 1024L * 1024L, new LRUCache(256L * 1024L * 1024L))); // 256MiB } final BloomFilter bloomFilter = new BloomFilter(10, false); - final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig(); tableOptions.setFilterPolicy(bloomFilter); options.setTableFormatConfig(tableOptions); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index b588b48..fe88936 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -146,7 +146,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { indexWriterConfig.setRAMPerThreadHardLimitMB(32); } else { indexWriterConfig.setRAMBufferSizeMB(128); - indexWriterConfig.setRAMPerThreadHardLimitMB(512); + //indexWriterConfig.setRAMPerThreadHardLimitMB(512); } indexWriterConfig.setSimilarity(getSimilarity()); this.indexWriter = new IndexWriter(directory, indexWriterConfig);