This commit is contained in:
Andrea Cavalli 2021-03-21 13:06:54 +01:00
parent 4394c74ad9
commit 62b53c1399
4 changed files with 25 additions and 9 deletions

View File

@ -746,6 +746,7 @@ public class LLLocalDictionary implements LLDictionary {
// readOpts.setIgnoreRangeDeletions(true);
readOpts.setFillCache(false);
readOpts.setReadaheadSize(2 * 1024 * 1024);
try (CappedWriteBatch writeBatch = new CappedWriteBatch(db,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
@ -772,7 +773,14 @@ public class LLLocalDictionary implements LLDictionary {
// Compact range
db.suggestCompactRange(cfh);
if (firstDeletedKey != null && lastDeletedKey != null) {
db.compactRange(cfh, firstDeletedKey, lastDeletedKey, new CompactRangeOptions().setChangeLevel(false));
db.compactRange(cfh,
firstDeletedKey,
lastDeletedKey,
new CompactRangeOptions()
.setAllowWriteStall(false)
.setExclusiveManualCompaction(false)
.setChangeLevel(false)
);
}
db.flush(new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true), cfh);
@ -917,7 +925,7 @@ public class LLLocalDictionary implements LLDictionary {
private long exactSizeAll(@Nullable LLSnapshot snapshot) {
var readOpts = resolveSnapshot(snapshot);
readOpts.setFillCache(false);
// readOpts.setReadaheadSize(2 * 1024 * 1024);
readOpts.setReadaheadSize(2 * 1024 * 1024);
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
if (PARALLEL_EXACT_SIZE) {

View File

@ -37,13 +37,12 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
}
@SuppressWarnings("Convert2MethodRef")
public Flux<byte[]> flux() {
return Flux
.generate(() -> {
var readOptions = new ReadOptions(this.readOptions);
if (!range.hasMin() || !range.hasMax()) {
// readOptions.setReadaheadSize(2 * 1024 * 1024);
readOptions.setReadaheadSize(2 * 1024 * 1024);
readOptions.setFillCache(false);
}
Slice sliceMin;

View File

@ -21,8 +21,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.commons.lang3.time.StopWatch;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyDescriptor;
@ -107,7 +110,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
this.handles.put(columns.get(i), handles.get(i));
}
compactDb(db, handles);
// compactDb(db, handles);
flushDb(db, handles);
} catch (RocksDBException ex) {
throw new IOException(ex);
@ -144,9 +147,15 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
// force compact the database
for (ColumnFamilyHandle cfh : handles) {
var t = new Thread(() -> {
int r = ThreadLocalRandom.current().nextInt();
var s = StopWatch.createStarted();
try {
// Range rangeToCompact = db.suggestCompactRange(cfh);
db.compactRange(cfh, null, null, new CompactRangeOptions().setAllowWriteStall(false).setChangeLevel(false));
logger.info("Compacting range {}", r);
db.compactRange(cfh, null, null, new CompactRangeOptions()
.setAllowWriteStall(true)
.setExclusiveManualCompaction(true)
.setChangeLevel(false));
} catch (RocksDBException e) {
if ("Database shutdown".equalsIgnoreCase(e.getMessage())) {
logger.warn("Compaction cancelled: database shutdown");
@ -154,6 +163,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
logger.warn("Failed to compact range", e);
}
}
logger.info("Compacted range {} in {} milliseconds", r, s.getTime(TimeUnit.MILLISECONDS));
}, "Compaction");
t.setDaemon(true);
t.start();
@ -198,8 +208,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
// Direct I/O parameters. Removed because they use too much disk.
//options.setUseDirectReads(true);
//options.setUseDirectIoForFlushAndCompaction(true);
//options.setCompactionReadaheadSize(2 * 1024 * 1024); // recommend at least 2MB
//options.setWritableFileMaxBufferSize(1024 * 1024); // 1MB by default
options.setCompactionReadaheadSize(2 * 1024 * 1024); // recommend at least 2MB
final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
if (lowMemory) {
// LOW MEMORY

View File

@ -32,13 +32,12 @@ public abstract class LLLocalReactiveRocksIterator<T> {
this.readValues = readValues;
}
@SuppressWarnings("Convert2MethodRef")
public Flux<T> flux() {
return Flux
.generate(() -> {
var readOptions = new ReadOptions(this.readOptions);
if (!range.hasMin() || !range.hasMax()) {
// readOptions.setReadaheadSize(2 * 1024 * 1024);
readOptions.setReadaheadSize(2 * 1024 * 1024);
readOptions.setFillCache(false);
}
Slice sliceMin;