Fix unmodifiable read options
This commit is contained in:
parent
cb8dbf2fa2
commit
2df2c00e36
|
@ -1251,8 +1251,7 @@ public class LLLocalDictionary implements LLDictionary {
|
||||||
public Flux<BadBlock> badBlocks(LLRange range) {
|
public Flux<BadBlock> badBlocks(LLRange range) {
|
||||||
return Flux
|
return Flux
|
||||||
.<BadBlock>create(sink -> {
|
.<BadBlock>create(sink -> {
|
||||||
try {
|
try (var ro = new ReadOptions(getReadOptions(null))) {
|
||||||
var ro = new ReadOptions(getReadOptions(null));
|
|
||||||
ro.setFillCache(false);
|
ro.setFillCache(false);
|
||||||
if (!range.isSingle()) {
|
if (!range.isSingle()) {
|
||||||
ro.setReadaheadSize(32 * 1024);
|
ro.setReadaheadSize(32 * 1024);
|
||||||
|
@ -1666,53 +1665,54 @@ public class LLLocalDictionary implements LLDictionary {
|
||||||
public Mono<Void> clear() {
|
public Mono<Void> clear() {
|
||||||
return Mono
|
return Mono
|
||||||
.<Void>fromCallable(() -> {
|
.<Void>fromCallable(() -> {
|
||||||
var readOpts = getReadOptions(null);
|
try (var readOpts = new ReadOptions(getReadOptions(null))) {
|
||||||
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
|
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
|
||||||
|
|
||||||
// readOpts.setIgnoreRangeDeletions(true);
|
// readOpts.setIgnoreRangeDeletions(true);
|
||||||
readOpts.setFillCache(false);
|
readOpts.setFillCache(false);
|
||||||
readOpts.setReadaheadSize(32 * 1024); // 32KiB
|
readOpts.setReadaheadSize(32 * 1024); // 32KiB
|
||||||
try (CappedWriteBatch writeBatch = new CappedWriteBatch(db,
|
try (CappedWriteBatch writeBatch = new CappedWriteBatch(db,
|
||||||
CAPPED_WRITE_BATCH_CAP,
|
CAPPED_WRITE_BATCH_CAP,
|
||||||
RESERVED_WRITE_BATCH_SIZE,
|
RESERVED_WRITE_BATCH_SIZE,
|
||||||
MAX_WRITE_BATCH_SIZE,
|
MAX_WRITE_BATCH_SIZE,
|
||||||
BATCH_WRITE_OPTIONS
|
BATCH_WRITE_OPTIONS
|
||||||
)) {
|
)) {
|
||||||
|
|
||||||
byte[] firstDeletedKey = null;
|
byte[] firstDeletedKey = null;
|
||||||
byte[] lastDeletedKey = null;
|
byte[] lastDeletedKey = null;
|
||||||
try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) {
|
try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) {
|
||||||
rocksIterator.seekToLast();
|
rocksIterator.seekToLast();
|
||||||
|
|
||||||
rocksIterator.status();
|
rocksIterator.status();
|
||||||
if (rocksIterator.isValid()) {
|
if (rocksIterator.isValid()) {
|
||||||
firstDeletedKey = FIRST_KEY;
|
firstDeletedKey = FIRST_KEY;
|
||||||
lastDeletedKey = rocksIterator.key();
|
lastDeletedKey = rocksIterator.key();
|
||||||
writeBatch.deleteRange(cfh, FIRST_KEY, rocksIterator.key());
|
writeBatch.deleteRange(cfh, FIRST_KEY, rocksIterator.key());
|
||||||
writeBatch.delete(cfh, rocksIterator.key());
|
writeBatch.delete(cfh, rocksIterator.key());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
writeBatch.writeToDbAndClose();
|
||||||
|
|
||||||
|
|
||||||
|
// Compact range
|
||||||
|
db.suggestCompactRange(cfh);
|
||||||
|
if (firstDeletedKey != null && lastDeletedKey != null) {
|
||||||
|
db.compactRange(cfh,
|
||||||
|
firstDeletedKey,
|
||||||
|
lastDeletedKey,
|
||||||
|
new CompactRangeOptions()
|
||||||
|
.setAllowWriteStall(false)
|
||||||
|
.setExclusiveManualCompaction(false)
|
||||||
|
.setChangeLevel(false)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
db.flush(new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true), cfh);
|
||||||
|
db.flushWal(true);
|
||||||
}
|
}
|
||||||
|
return null;
|
||||||
writeBatch.writeToDbAndClose();
|
|
||||||
|
|
||||||
|
|
||||||
// Compact range
|
|
||||||
db.suggestCompactRange(cfh);
|
|
||||||
if (firstDeletedKey != null && lastDeletedKey != null) {
|
|
||||||
db.compactRange(cfh,
|
|
||||||
firstDeletedKey,
|
|
||||||
lastDeletedKey,
|
|
||||||
new CompactRangeOptions()
|
|
||||||
.setAllowWriteStall(false)
|
|
||||||
.setExclusiveManualCompaction(false)
|
|
||||||
.setChangeLevel(false)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
db.flush(new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true), cfh);
|
|
||||||
db.flushWal(true);
|
|
||||||
}
|
}
|
||||||
return null;
|
|
||||||
})
|
})
|
||||||
.onErrorMap(cause -> new IOException("Failed to clear", cause))
|
.onErrorMap(cause -> new IOException("Failed to clear", cause))
|
||||||
.subscribeOn(dbScheduler);
|
.subscribeOn(dbScheduler);
|
||||||
|
|
|
@ -481,6 +481,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||||
|
|
||||||
private ColumnFamilyHandle getCfh(byte[] columnName) throws RocksDBException {
|
private ColumnFamilyHandle getCfh(byte[] columnName) throws RocksDBException {
|
||||||
ColumnFamilyHandle cfh = handles.get(Column.special(Column.toString(columnName)));
|
ColumnFamilyHandle cfh = handles.get(Column.special(Column.toString(columnName)));
|
||||||
|
//noinspection RedundantIfStatement
|
||||||
if (!enableColumnsBug) {
|
if (!enableColumnsBug) {
|
||||||
assert Arrays.equals(cfh.getName(), columnName);
|
assert Arrays.equals(cfh.getName(), columnName);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user