Add verifyChecksum method

This commit is contained in:
Andrea Cavalli 2021-06-27 15:06:48 +02:00
parent 1d281d0305
commit 507101e453
4 changed files with 22 additions and 4 deletions

View File

@ -24,4 +24,6 @@ public interface CompositeDatabase {
* Find corrupted items
*/
Flux<BadBlock> badBlocks();
Mono<Void> verifyChecksum();
}

View File

@ -44,6 +44,8 @@ public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseS
Mono<Long> getProperty(String propertyName);
Mono<Void> verifyChecksum();
ByteBufAllocator getAllocator();
Mono<Void> close();

View File

@ -1263,7 +1263,7 @@ public class LLLocalDictionary implements LLDictionary {
try (var rocksIterator = rocksIteratorTuple.getT1()) {
rocksIterator.seekToFirst();
rocksIterator.status();
while (rocksIterator.isValid()) {
while (rocksIterator.isValid() && !sink.isCancelled()) {
try {
rocksIterator.status();
rocksIterator.key(DUMMY_WRITE_ONLY_BYTE_BUFFER);

View File

@ -132,7 +132,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
logger.warn(ex.getLocalizedMessage());
options
.setUseDirectReads(false)
.setUseDirectIoForFlushAndCompaction(false);
.setUseDirectIoForFlushAndCompaction(false)
.setAllowMmapReads(true)
.setAllowMmapWrites(true);
}
default -> throw ex;
}
@ -270,8 +272,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
// LOW MEMORY
options
.setLevelCompactionDynamicLevelBytes(false)
.setAllowMmapReads(false)
.setAllowMmapWrites(false)
.setBytesPerSync(0) // default
.setWalBytesPerSync(0) // default
.setIncreaseParallelism(1)
@ -317,6 +317,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
if (USE_DIRECT_IO) {
options
.setAllowMmapReads(false)
.setAllowMmapWrites(false)
.setUseDirectIoForFlushAndCompaction(true)
.setUseDirectReads(true)
// Option to enable readahead in compaction
@ -486,6 +488,18 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.subscribeOn(dbScheduler);
}
@Override
public Mono<Void> verifyChecksum() {
return Mono
.<Void>fromCallable(() -> {
db.verifyChecksum();
return null;
})
.onErrorMap(cause -> new IOException("Failed to verify checksum of database \""
+ getDatabaseName() + "\"", cause))
.subscribeOn(dbScheduler);
}
@Override
public ByteBufAllocator getAllocator() {
return allocator;