Andrea Cavalli 2021-03-20 12:41:11 +01:00
parent 8075694e15
commit 4394c74ad9
4 changed files with 75 additions and 43 deletions

pom.xml

@@ -143,32 +143,32 @@
 		<dependency>
 			<groupId>org.apache.lucene</groupId>
 			<artifactId>lucene-core</artifactId>
-			<version>8.6.2</version>
+			<version>8.8.1</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.lucene</groupId>
 			<artifactId>lucene-join</artifactId>
-			<version>8.6.2</version>
+			<version>8.8.1</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.lucene</groupId>
 			<artifactId>lucene-analyzers-common</artifactId>
-			<version>8.6.2</version>
+			<version>8.8.1</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.lucene</groupId>
 			<artifactId>lucene-codecs</artifactId>
-			<version>8.6.2</version>
+			<version>8.8.1</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.lucene</groupId>
 			<artifactId>lucene-backward-codecs</artifactId>
-			<version>8.6.2</version>
+			<version>8.8.1</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.lucene</groupId>
 			<artifactId>lucene-queries</artifactId>
-			<version>8.6.2</version>
+			<version>8.8.1</version>
 		</dependency>
 		<dependency>
 			<groupId>org.jetbrains</groupId>

LLLocalDictionary.java

@@ -27,6 +27,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.CompactRangeOptions;
 import org.rocksdb.FlushOptions;
 import org.rocksdb.Holder;
 import org.rocksdb.ReadOptions;
@@ -647,33 +648,8 @@ public class LLLocalDictionary implements LLDictionary
 				.fromCallable(() -> {
 					if (range.isSingle()) {
 						writeBatch.delete(cfh, range.getSingle());
-					} else if (range.hasMin() && range.hasMax()) {
-						writeBatch.deleteRange(cfh, range.getMin(), range.getMax());
-					} else if (range.hasMax()) {
-						writeBatch.deleteRange(cfh, FIRST_KEY, range.getMax());
-					} else if (range.hasMin()) {
-						// Delete from "min" to the end of the column
-						var readOpts = getReadOptions(null);
-						readOpts.setIterateLowerBound(new Slice(range.getMin()));
-						try (var it = db.newIterator(cfh, readOpts)) {
-							it.seekToLast();
-							if (it.isValid()) {
-								writeBatch.deleteRange(cfh, range.getMin(), it.key());
-								// Also point-delete the last key: deleteRange's upper bound is exclusive and there is no max bound here
-								writeBatch.delete(it.key());
-							}
-						}
 					} else {
-						// Delete everything
-						var readOpts = getReadOptions(null);
-						try (var it = db.newIterator(cfh, readOpts)) {
-							it.seekToLast();
-							if (it.isValid()) {
-								writeBatch.deleteRange(cfh, FIRST_KEY, it.key());
-								// Also point-delete the last key: deleteRange's upper bound is exclusive and there is no max bound here
-								writeBatch.delete(it.key());
-							}
-						}
+						deleteSmallRangeWriteBatch(writeBatch, range);
 					}
 					return null;
 				})
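The branches removed above existed because WriteBatch.deleteRange deletes the half-open interval [begin, end): with no upper bound available, the code had to seek to the last key and point-delete it separately. A minimal standalone sketch of that semantics, with db, cfh, beginKey, and lastKey as assumed placeholders:

	// Sketch: deleteRange covers [beginKey, lastKey) — lastKey itself survives
	// unless it is also point-deleted, which is what the removed branches did
	// after seekToLast().
	try (WriteBatch writeBatch = new WriteBatch(); WriteOptions writeOptions = new WriteOptions()) {
		writeBatch.deleteRange(cfh, beginKey, lastKey); // removes [beginKey, lastKey)
		writeBatch.delete(cfh, lastKey);                // removes lastKey itself
		db.write(writeOptions, writeBatch);
	}

The replacement, deleteSmallRangeWriteBatch (added below), trades range tombstones for per-key deletes, which is usually the cheaper choice when the range is known to be small.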
@@ -693,6 +669,52 @@ public class LLLocalDictionary implements LLDictionary {
 				.onErrorMap(cause -> new IOException("Failed to write range", cause));
 	}
+	private void deleteSmallRange(LLRange range)
+			throws RocksDBException {
+		var readOpts = getReadOptions(null);
+		readOpts.setFillCache(false);
+		if (range.hasMin()) {
+			readOpts.setIterateLowerBound(new Slice(range.getMin()));
+		}
+		if (range.hasMax()) {
+			readOpts.setIterateUpperBound(new Slice(range.getMax()));
+		}
+		try (var rocksIterator = db.newIterator(cfh, readOpts)) {
+			if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
+				rocksIterator.seek(range.getMin());
+			} else {
+				rocksIterator.seekToFirst();
+			}
+			while (rocksIterator.isValid()) {
+				db.delete(cfh, rocksIterator.key());
+				rocksIterator.next();
+			}
+		}
+	}
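A note on the bound pattern used here and in the twin method below: in RocksDB's Java binding, ReadOptions, Slice, and RocksIterator wrap native resources, and a Slice passed to setIterateLowerBound/setIterateUpperBound must stay alive (not closed or garbage-collected) for as long as the iterator is used. A minimal sketch of the same bounded delete scan with explicit resource management, assuming open db and cfh handles and placeholder minKey/maxKey bounds:

	// Sketch: delete every key in [minKey, maxKey) — the upper bound is exclusive.
	try (ReadOptions readOpts = new ReadOptions();
			Slice lower = new Slice(minKey);   // must outlive the iterator
			Slice upper = new Slice(maxKey)) {
		readOpts.setFillCache(false); // avoid polluting the block cache with doomed data
		readOpts.setIterateLowerBound(lower);
		readOpts.setIterateUpperBound(upper);
		try (RocksIterator it = db.newIterator(cfh, readOpts)) {
			for (it.seek(minKey); it.isValid(); it.next()) {
				db.delete(cfh, it.key());
			}
		}
	}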
+	private void deleteSmallRangeWriteBatch(CappedWriteBatch writeBatch, LLRange range)
+			throws RocksDBException {
+		var readOpts = getReadOptions(null);
+		readOpts.setFillCache(false);
+		if (range.hasMin()) {
+			readOpts.setIterateLowerBound(new Slice(range.getMin()));
+		}
+		if (range.hasMax()) {
+			readOpts.setIterateUpperBound(new Slice(range.getMax()));
+		}
+		try (var rocksIterator = db.newIterator(cfh, readOpts)) {
+			if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
+				rocksIterator.seek(range.getMin());
+			} else {
+				rocksIterator.seekToFirst();
+			}
+			while (rocksIterator.isValid()) {
+				writeBatch.delete(cfh, rocksIterator.key());
+				rocksIterator.next();
+			}
+		}
+	}
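The two methods above differ only in where the deletes are sent (straight to the database versus into the CappedWriteBatch). If the duplication ever becomes a burden, the iteration could be shared behind a small functional interface; a hypothetical sketch (KeyDeleter is not a type in this codebase), reusing the class's db, cfh, and getReadOptions members:

	@FunctionalInterface
	interface KeyDeleter {
		void delete(byte[] key) throws RocksDBException;
	}

	private void deleteSmallRangeTo(LLRange range, KeyDeleter deleter) throws RocksDBException {
		var readOpts = getReadOptions(null);
		readOpts.setFillCache(false);
		if (range.hasMin()) {
			readOpts.setIterateLowerBound(new Slice(range.getMin()));
		}
		if (range.hasMax()) {
			readOpts.setIterateUpperBound(new Slice(range.getMax()));
		}
		try (var it = db.newIterator(cfh, readOpts)) {
			if (!PREFER_SEEK_TO_FIRST && range.hasMin()) {
				it.seek(range.getMin());
			} else {
				it.seekToFirst();
			}
			while (it.isValid()) {
				deleter.delete(it.key());
				it.next();
			}
		}
	}

	// Usage: deleteSmallRangeTo(range, key -> db.delete(cfh, key));
	//        deleteSmallRangeTo(range, key -> writeBatch.delete(cfh, key));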
 	private static byte[] incrementLexicographically(byte[] key) {
 		boolean remainder = true;
 		int prefixLength = key.length;
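The helper above is truncated by the hunk boundary. For context, a lexicographic successor is typically computed by scanning backwards for the last byte below 0xFF, incrementing it, and dropping everything after it; a hypothetical, self-contained sketch of the technique (not necessarily this file's exact implementation):

	private static byte[] successorSketch(byte[] key) {
		int i = key.length - 1;
		while (i >= 0 && key[i] == (byte) 0xFF) {
			i--; // trailing 0xFF bytes cannot be incremented
		}
		if (i < 0) {
			// Every byte is 0xFF: append a zero byte, which yields the
			// immediate successor of the exact key.
			return java.util.Arrays.copyOf(key, key.length + 1);
		}
		byte[] result = java.util.Arrays.copyOf(key, i + 1);
		result[i]++; // bump the last non-0xFF byte and drop the tail
		return result;
	}

Such a successor turns an inclusive last key into an exclusive upper bound, which is presumably why the commented-out variant in clear() below passes incrementLexicographically(iter.key()).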
@@ -731,26 +753,27 @@
 				BATCH_WRITE_OPTIONS
 		)) {
-			//byte[] firstDeletedKey = null;
-			//byte[] lastDeletedKey = null;
+			byte[] firstDeletedKey = null;
+			byte[] lastDeletedKey = null;
 			try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
 				iter.seekToLast();
 				if (iter.isValid()) {
+					firstDeletedKey = FIRST_KEY;
+					lastDeletedKey = iter.key();
 					writeBatch.deleteRange(cfh, FIRST_KEY, iter.key());
 					writeBatch.delete(cfh, iter.key());
-					//firstDeletedKey = FIRST_KEY;
-					//lastDeletedKey = incrementLexicographically(iter.key());
 				}
 			}
 			writeBatch.writeToDbAndClose();
-			// Compact range
-			db.suggestCompactRange(cfh);
-			//if (firstDeletedKey != null && lastDeletedKey != null) {
-			//	db.compactRange(cfh, firstDeletedKey, lastDeletedKey, new CompactRangeOptions().setChangeLevel(false));
-			//}
+			if (firstDeletedKey != null && lastDeletedKey != null) {
+				db.compactRange(cfh, firstDeletedKey, lastDeletedKey, new CompactRangeOptions().setChangeLevel(false));
+			}
 			db.flush(new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true), cfh);
 			db.flushWal(true);
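For readers unfamiliar with the pattern in this hunk: deleteRange only writes a range tombstone, so the deleted data keeps occupying SST files until compaction runs over it. Compacting exactly the deleted span reclaims that space deterministically, instead of waiting for the previous suggestCompactRange hint to take effect. A minimal sketch, assuming open db and cfh handles and the key bounds tracked above:

	// Sketch: reclaim space after a bulk range delete. setChangeLevel(false)
	// keeps output files at their current level, matching the call above.
	try (CompactRangeOptions compactOptions = new CompactRangeOptions().setChangeLevel(false)) {
		db.compactRange(cfh, firstDeletedKey, lastDeletedKey, compactOptions);
	}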

LLLocalKeyValueDatabase.java

@@ -33,11 +33,13 @@ import org.rocksdb.CompressionType;
 import org.rocksdb.DBOptions;
 import org.rocksdb.DbPath;
 import org.rocksdb.FlushOptions;
+import org.rocksdb.LRUCache;
 import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.Snapshot;
 import org.rocksdb.WALRecoveryMode;
+import org.rocksdb.WriteBufferManager;
 import org.warp.commonutils.log.Logger;
 import org.warp.commonutils.log.LoggerFactory;
 import reactor.core.publisher.Mono;
@@ -190,11 +192,15 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
 		options.setDeleteObsoleteFilesPeriodMicros(20 * 1000000); // 20 seconds
 		options.setPreserveDeletes(false);
 		options.setKeepLogFileNum(10);
+		options.setAllowMmapReads(true);
+		options.setAllowMmapWrites(true);
+		options.setAllowFAllocate(true);
 		// Direct I/O parameters. Removed because they use too much disk I/O.
 		//options.setUseDirectReads(true);
 		//options.setUseDirectIoForFlushAndCompaction(true);
 		//options.setCompactionReadaheadSize(2 * 1024 * 1024); // at least 2MiB is recommended
 		//options.setWritableFileMaxBufferSize(1024 * 1024); // 1MiB (the default)
+		final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
 		if (lowMemory) {
 			// LOW MEMORY
 			options
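The hunk above swaps one I/O strategy for another: memory-mapped reads and writes go through the OS page cache, while the commented-out direct-I/O options bypass it entirely (at the cost, per the comment, of heavier disk usage). A minimal sketch of the two option groups side by side, assuming a fresh Options instance:

	Options opts = new Options();
	// Chosen here: mmap I/O plus fallocate()-based preallocation.
	opts.setAllowMmapReads(true);
	opts.setAllowMmapWrites(true);
	opts.setAllowFAllocate(true);
	// The direct-I/O alternative, disabled in this commit:
	//opts.setUseDirectReads(true);
	//opts.setUseDirectIoForFlushAndCompaction(true);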
@@ -211,6 +217,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
 					new DbPath(databasesDirPath.resolve(path.getFileName() + "_cold"),
 							600L * 1024L * 1024L * 1024L))) // 600GiB
 			;
+			tableOptions.setBlockCache(new LRUCache(8L * 1024L * 1024L)); // 8MiB
+			options.setWriteBufferManager(new WriteBufferManager(8L * 1024L * 1024L, new LRUCache(8L * 1024L * 1024L))); // 8MiB
 		} else {
 			// HIGH MEMORY
 			options
@@ -221,18 +229,19 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
 					.setWalBytesPerSync(10 * 1024 * 1024)
 					.optimizeLevelStyleCompaction(
 							128 * 1024 * 1024) // 128MiB of RAM will be used for level-style compaction
-					.setWriteBufferSize(128 * 1024 * 1024) // 128MiB
+					.setWriteBufferSize(64 * 1024 * 1024) // 64MiB
 					.setWalSizeLimitMB(1024) // 1024MB
-					.setMaxTotalWalSize(8L * 1024L * 1024L * 1024L) // 8GiB max WAL directory size
+					.setMaxTotalWalSize(2L * 1024L * 1024L * 1024L) // 2GiB max WAL directory size
 					.setDbPaths(List.of(new DbPath(databasesDirPath.resolve(path.getFileName() + "_hot"),
 							400L * 1024L * 1024L * 1024L), // 400GiB
 							new DbPath(databasesDirPath.resolve(path.getFileName() + "_cold"),
 									600L * 1024L * 1024L * 1024L))) // 600GiB
 			;
+			tableOptions.setBlockCache(new LRUCache(256L * 1024L * 1024L)); // 256MiB
+			options.setWriteBufferManager(new WriteBufferManager(256L * 1024L * 1024L, new LRUCache(256L * 1024L * 1024L))); // 256MiB
 		}
 		final BloomFilter bloomFilter = new BloomFilter(10, false);
-		final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
 		tableOptions.setFilterPolicy(bloomFilter);
 		options.setTableFormatConfig(tableOptions);
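One detail worth noting in both branches: the WriteBufferManager is constructed with a fresh LRUCache rather than the block cache installed on tableOptions, so memtable memory and block-cache memory are budgeted separately. RocksDB also allows charging write buffers to the block cache itself; a minimal sketch of that single-budget variant, using the high-memory figures above:

	// Sketch: one shared LRUCache accounts for both block-cache usage and
	// write-buffer usage, giving a single 256MiB memory budget.
	LRUCache sharedCache = new LRUCache(256L * 1024L * 1024L);
	tableOptions.setBlockCache(sharedCache);
	options.setWriteBufferManager(new WriteBufferManager(256L * 1024L * 1024L, sharedCache));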

LLLocalLuceneIndex.java

@@ -146,7 +146,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
 			indexWriterConfig.setRAMPerThreadHardLimitMB(32);
 		} else {
 			indexWriterConfig.setRAMBufferSizeMB(128);
-			indexWriterConfig.setRAMPerThreadHardLimitMB(512);
+			//indexWriterConfig.setRAMPerThreadHardLimitMB(512);
 		}
 		indexWriterConfig.setSimilarity(getSimilarity());
 		this.indexWriter = new IndexWriter(directory, indexWriterConfig);
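Context for this last change: in Lucene's IndexWriterConfig, setRAMBufferSizeMB sets the aggregate buffer across all indexing threads that triggers segment flushes, while setRAMPerThreadHardLimitMB caps a single DocumentsWriterPerThread before it is force-flushed. Commenting out the 512MB per-thread cap falls back to Lucene's default, leaving the 128MB aggregate trigger in charge of flushing in high-memory mode. A minimal sketch of the resulting writer setup, assuming analyzer and directory instances:

	IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer);
	indexWriterConfig.setRAMBufferSizeMB(128); // flush once ~128MB is buffered overall
	// Per-thread hard limit left at Lucene's default rather than 512MB:
	//indexWriterConfig.setRAMPerThreadHardLimitMB(512);
	IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig);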