Prevent reading corrupted database parts, reimplement badblocks

This commit is contained in:
Andrea Cavalli 2021-06-27 11:58:12 +02:00
parent 63cdb43644
commit 1d281d0305
7 changed files with 115 additions and 46 deletions

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.database;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import it.cavallium.dbengine.client.BadBlock;
import java.util.List;
import java.util.Map.Entry;
import java.util.function.Function;
@ -88,6 +89,8 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
Flux<ByteBuf> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength);
Flux<BadBlock> badBlocks(LLRange range);
Mono<Void> setRange(LLRange range, Flux<Entry<ByteBuf, ByteBuf>> entries);
default Mono<Void> replaceRange(LLRange range,

View File

@ -426,19 +426,10 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
@Override
public Flux<BadBlock> badBlocks() {
return this
.getAllValues(null)
.doOnNext(entry -> {
if (entry.getKey() instanceof ReferenceCounted referenceCounted) {
referenceCounted.release();
}
if (entry.getValue() instanceof ReferenceCounted referenceCounted) {
referenceCounted.release();
}
})
.concatMap(entry -> Mono.<BadBlock>empty())
.onErrorResume(ex -> Mono.just(new BadBlock(dictionary.getDatabaseName(),
Column.special(dictionary.getColumnName()), null, ex)));
return Flux
.defer(() -> dictionary.badBlocks(range.retain()))
.doFirst(range::retain)
.doAfterTerminate(range::release);
}
private static record GroupBuffers(ByteBuf groupKeyWithExt, ByteBuf groupKeyWithoutExt, ByteBuf groupSuffix) {}

View File

@ -151,19 +151,7 @@ public class DatabaseSingle<U> implements DatabaseStageEntry<U> {
@Override
public Flux<BadBlock> badBlocks() {
return this
.get(null, true)
.doOnNext(entry -> {
if (entry instanceof ReferenceCounted referenceCounted) {
referenceCounted.release();
}
})
.then(Mono.<BadBlock>empty())
.onErrorResume(ex -> Mono.just(new BadBlock(dictionary.getDatabaseName(),
Column.special(dictionary.getDatabaseName()),
ByteList.of(LLUtils.toArray(key)),
ex
)))
.flux();
return Flux
.defer(() -> dictionary.badBlocks(LLRange.single(key.retain())));
}
}

View File

@ -3,6 +3,8 @@ package it.cavallium.dbengine.database.disk;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
@ -11,10 +13,12 @@ import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.unimi.dsi.fastutil.bytes.ByteList;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -72,7 +76,10 @@ public class LLLocalDictionary implements LLDictionary {
static final WriteOptions EMPTY_WRITE_OPTIONS = new UnmodifiableWriteOptions();
static final WriteOptions BATCH_WRITE_OPTIONS = new UnmodifiableWriteOptions();
static final boolean PREFER_SEEK_TO_FIRST = false;
static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = false;
/**
* It used to be false, now it's true to avoid crashes during iterations on completely corrupted files
*/
static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = true;
public static final boolean DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED = true;
/**
* Default: true. Use false to debug problems with windowing.
@ -107,6 +114,11 @@ public class LLLocalDictionary implements LLDictionary {
private static final boolean USE_DIRECT_BUFFER_BOUNDS = true;
private static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096;
/**
* 1KiB dummy buffer, write only, used for debugging purposes
*/
private static final ByteBuffer DUMMY_WRITE_ONLY_BYTE_BUFFER = ByteBuffer.allocateDirect(1024);
static {
boolean assertionsEnabled = false;
//noinspection AssertWithSideEffects
@ -433,6 +445,7 @@ public class LLLocalDictionary implements LLDictionary {
} else {
rocksIterator.seekToFirst();
}
rocksIterator.status();
return rocksIterator.isValid();
}
}
@ -1234,6 +1247,49 @@ public class LLLocalDictionary implements LLDictionary {
}
}
@Override
public Flux<BadBlock> badBlocks(LLRange range) {
return Flux
.<BadBlock>create(sink -> {
try {
var ro = new ReadOptions(getReadOptions(null));
ro.setFillCache(false);
if (!range.isSingle()) {
ro.setReadaheadSize(32 * 1024);
}
ro.setVerifyChecksums(true);
var rocksIteratorTuple = getRocksIterator(ro, range.retain(), db, cfh);
try {
try (var rocksIterator = rocksIteratorTuple.getT1()) {
rocksIterator.seekToFirst();
rocksIterator.status();
while (rocksIterator.isValid()) {
try {
rocksIterator.status();
rocksIterator.key(DUMMY_WRITE_ONLY_BYTE_BUFFER);
rocksIterator.status();
rocksIterator.value(DUMMY_WRITE_ONLY_BYTE_BUFFER);
rocksIterator.status();
} catch (RocksDBException ex) {
sink.next(new BadBlock(databaseName, Column.special(columnName), null, ex));
}
rocksIterator.next();
}
}
} finally {
rocksIteratorTuple.getT2().release();
rocksIteratorTuple.getT3().release();
}
sink.complete();
} catch (Throwable ex) {
sink.error(ex);
}
})
.subscribeOn(dbScheduler)
.doFirst(range::retain)
.doAfterTerminate(range::release);
}
@Override
public Flux<ByteBuf> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
try {
@ -1330,9 +1386,11 @@ public class LLLocalDictionary implements LLDictionary {
} else {
it.seekToFirst();
}
it.status();
while (it.isValid()) {
db.delete(cfh, it.key());
it.next();
it.status();
}
} finally {
maxBound.release();
@ -1481,9 +1539,11 @@ public class LLLocalDictionary implements LLDictionary {
} else {
rocksIterator.seekToFirst();
}
rocksIterator.status();
while (rocksIterator.isValid()) {
writeBatch.delete(cfh, LLUtils.readDirectNioBuffer(alloc, rocksIterator::key));
rocksIterator.next();
rocksIterator.status();
}
} finally {
maxBound.release();
@ -1519,9 +1579,11 @@ public class LLLocalDictionary implements LLDictionary {
} else {
rocksIterator.seekToFirst();
}
rocksIterator.status();
while (rocksIterator.isValid()) {
writeBatch.delete(cfh, rocksIterator.key());
rocksIterator.next();
rocksIterator.status();
}
} finally {
maxBound.release();
@ -1619,14 +1681,15 @@ public class LLLocalDictionary implements LLDictionary {
byte[] firstDeletedKey = null;
byte[] lastDeletedKey = null;
try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
iter.seekToLast();
try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) {
rocksIterator.seekToLast();
if (iter.isValid()) {
rocksIterator.status();
if (rocksIterator.isValid()) {
firstDeletedKey = FIRST_KEY;
lastDeletedKey = iter.key();
writeBatch.deleteRange(cfh, FIRST_KEY, iter.key());
writeBatch.delete(cfh, iter.key());
lastDeletedKey = rocksIterator.key();
writeBatch.deleteRange(cfh, FIRST_KEY, rocksIterator.key());
writeBatch.delete(cfh, rocksIterator.key());
}
}
@ -1697,8 +1760,10 @@ public class LLLocalDictionary implements LLDictionary {
rocksIterator.seekToFirst();
}
long i = 0;
rocksIterator.status();
while (rocksIterator.isValid()) {
rocksIterator.next();
rocksIterator.status();
i++;
}
return i;
@ -1748,6 +1813,7 @@ public class LLLocalDictionary implements LLDictionary {
} else {
rocksIterator.seekToFirst();
}
rocksIterator.status();
if (rocksIterator.isValid()) {
ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
try {
@ -1805,6 +1871,7 @@ public class LLLocalDictionary implements LLDictionary {
rocksIterator.seekToFirst();
}
ByteBuf key;
rocksIterator.status();
if (rocksIterator.isValid()) {
key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
return key;
@ -1827,7 +1894,7 @@ public class LLLocalDictionary implements LLDictionary {
}
}
private long fastSizeAll(@Nullable LLSnapshot snapshot) {
private long fastSizeAll(@Nullable LLSnapshot snapshot) throws RocksDBException {
try (var rocksdbSnapshot = new ReadOptions(resolveSnapshot(snapshot))) {
if (USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS || rocksdbSnapshot.snapshot() == null) {
try {
@ -1843,12 +1910,14 @@ public class LLLocalDictionary implements LLDictionary {
rocksdbSnapshot.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
rocksdbSnapshot.setIgnoreRangeDeletions(true);
long count = 0;
try (RocksIterator iter = db.newIterator(cfh, rocksdbSnapshot)) {
iter.seekToFirst();
try (RocksIterator rocksIterator = db.newIterator(cfh, rocksdbSnapshot)) {
rocksIterator.seekToFirst();
rocksIterator.status();
// If it's a fast size of a snapshot, count only up to 100'000 elements
while (iter.isValid() && count < 100_000) {
while (rocksIterator.isValid() && count < 100_000) {
count++;
iter.next();
rocksIterator.next();
rocksIterator.status();
}
return count;
}
@ -1892,11 +1961,13 @@ public class LLLocalDictionary implements LLDictionary {
if (sliceBegin != null) {
rangeReadOpts.setIterateUpperBound(sliceEnd);
}
try (RocksIterator iter = db.newIterator(cfh, rangeReadOpts)) {
iter.seekToFirst();
while (iter.isValid()) {
try (RocksIterator rocksIterator = db.newIterator(cfh, rangeReadOpts)) {
rocksIterator.seekToFirst();
rocksIterator.status();
while (rocksIterator.isValid()) {
partialCount++;
iter.next();
rocksIterator.next();
rocksIterator.status();
}
return partialCount;
}
@ -1956,6 +2027,7 @@ public class LLLocalDictionary implements LLDictionary {
} else {
rocksIterator.seekToFirst();
}
rocksIterator.status();
if (!rocksIterator.isValid()) {
return null;
}

View File

@ -12,6 +12,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksMutableObject;
import reactor.core.publisher.Flux;
import static io.netty.buffer.Unpooled.*;
@ -57,6 +58,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
ObjectArrayList<T> values = new ObjectArrayList<>();
ByteBuf firstGroupKey = null;
try {
rocksIterator.status();
while (rocksIterator.isValid()) {
ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
try {
@ -73,6 +75,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
}
try {
rocksIterator.next();
rocksIterator.status();
T entry = getEntry(key.retain(), value.retain());
values.add(entry);
} finally {
@ -92,10 +95,12 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
} else {
sink.complete();
}
return tuple;
} catch (RocksDBException ex) {
sink.error(ex);
} finally {
range.release();
}
return tuple;
}, tuple -> {
var rocksIterator = tuple.getT1();
rocksIterator.close();

View File

@ -9,6 +9,7 @@ import java.util.Arrays;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksMutableObject;
import reactor.core.publisher.Flux;
import static io.netty.buffer.Unpooled.*;
@ -54,6 +55,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
range.retain();
try {
var rocksIterator = tuple.getT1();
rocksIterator.status();
ByteBuf firstGroupKey = null;
try {
while (rocksIterator.isValid()) {
@ -65,6 +67,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
break;
}
rocksIterator.next();
rocksIterator.status();
} finally {
key.release();
}
@ -80,10 +83,12 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
firstGroupKey.release();
}
}
return tuple;
} catch (RocksDBException ex) {
sink.error(ex);
} finally {
range.release();
}
return tuple;
}, tuple -> {
var rocksIterator = tuple.getT1();
rocksIterator.close();

View File

@ -13,6 +13,7 @@ import org.jetbrains.annotations.NotNull;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.RocksMutableObject;
import reactor.core.publisher.Flux;
@ -61,6 +62,7 @@ public abstract class LLLocalReactiveRocksIterator<T> {
range.retain();
try {
var rocksIterator = tuple.getT1();
rocksIterator.status();
if (rocksIterator.isValid()) {
ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
try {
@ -72,6 +74,7 @@ public abstract class LLLocalReactiveRocksIterator<T> {
}
try {
rocksIterator.next();
rocksIterator.status();
sink.next(getEntry(key.retain(), value.retain()));
} finally {
value.release();
@ -82,10 +85,12 @@ public abstract class LLLocalReactiveRocksIterator<T> {
} else {
sink.complete();
}
return tuple;
} catch (RocksDBException ex) {
sink.error(ex);
} finally {
range.release();
}
return tuple;
}, tuple -> {
var rocksIterator = tuple.getT1();
rocksIterator.close();