Implement "badblocks" method

This commit is contained in:
Andrea Cavalli 2021-06-26 02:35:33 +02:00
parent ee05614115
commit bd8755c180
15 changed files with 105 additions and 17 deletions

View File

@ -0,0 +1,8 @@
package it.cavallium.dbengine.client;
import it.cavallium.dbengine.database.Column;
import it.unimi.dsi.fastutil.bytes.ByteList;
import org.jetbrains.annotations.Nullable;
public record BadBlock(String databaseName, @Nullable Column column, @Nullable ByteList rawKey,
@Nullable Throwable ex) {}

View File

@ -1,15 +1,27 @@
package it.cavallium.dbengine.client;
import io.netty.buffer.ByteBufAllocator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface CompositeDatabase {
Mono<Void> close();
Mono<CompositeSnapshot> takeSnapshot() throws SnapshotException;
/**
* Can return SnapshotException
*/
Mono<CompositeSnapshot> takeSnapshot();
Mono<Void> releaseSnapshot(CompositeSnapshot snapshot) throws SnapshotException;
/**
* Can return SnapshotException
*/
Mono<Void> releaseSnapshot(CompositeSnapshot snapshot);
ByteBufAllocator getAllocator();
/**
* Find corrupted items
*/
Flux<BadBlock> badBlocks();
}

View File

@ -15,6 +15,8 @@ import reactor.util.function.Tuple2;
@NotAtomic
public interface LLDictionary extends LLKeyValueDatabaseStructure {
String getColumnName();
ByteBufAllocator getAllocator();
Mono<ByteBuf> get(@Nullable LLSnapshot snapshot, ByteBuf key, boolean existsAlmostCertainly);

View File

@ -6,6 +6,7 @@ import io.netty.buffer.ByteBufAllocator;
import it.cavallium.dbengine.database.collections.DatabaseInt;
import it.cavallium.dbengine.database.collections.DatabaseLong;
import java.nio.charset.StandardCharsets;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseStructure {

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.collections;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLDictionary;

View File

@ -3,7 +3,9 @@ package it.cavallium.dbengine.database.collections;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLRange;
@ -421,6 +423,15 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
return dictionary.getUpdateMode();
}
@Override
public Flux<BadBlock> badBlocks() {
return this
.getAllValues(null)
.flatMap(result -> Mono.<BadBlock>empty())
.onErrorResume(ex -> Mono.just(new BadBlock(dictionary.getDatabaseName(),
Column.special(dictionary.getColumnName()), null, ex)));
}
private static record GroupBuffers(ByteBuf groupKeyWithExt, ByteBuf groupKeyWithoutExt, ByteBuf groupSuffix) {}
@Override

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.collections;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLDictionary;
@ -146,6 +147,11 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
return this;
}
@Override
public Flux<BadBlock> badBlocks() {
return this.subDictionary.badBlocks();
}
@Override
public void release() {
this.subDictionary.release();

View File

@ -2,7 +2,9 @@ package it.cavallium.dbengine.database.collections;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
@ -11,9 +13,11 @@ import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.unimi.dsi.fastutil.bytes.ByteList;
import java.util.Optional;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import static io.netty.buffer.Unpooled.*;
@ -144,4 +148,17 @@ public class DatabaseSingle<U> implements DatabaseStageEntry<U> {
public void release() {
key.release();
}
@Override
public Flux<BadBlock> badBlocks() {
return this
.get(null, true)
.then(Mono.<BadBlock>empty())
.onErrorResume(ex -> Mono.just(new BadBlock(dictionary.getDatabaseName(),
Column.special(dictionary.getDatabaseName()),
ByteList.of(LLUtils.toArray(key)),
ex
)))
.flux();
}
}

View File

@ -1,6 +1,8 @@
package it.cavallium.dbengine.database.collections;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateReturnMode;
@ -16,6 +18,7 @@ import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.functional.TriFunction;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@SuppressWarnings("unused")
@ -122,6 +125,11 @@ public class DatabaseSingleBucket<K, V, TH> implements DatabaseStageEntry<V> {
return this;
}
@Override
public Flux<BadBlock> badBlocks() {
return bucketStage.badBlocks();
}
@Override
public void release() {
bucketStage.release();

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine.database.collections;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLUtils;
@ -7,6 +8,7 @@ import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.Serializer;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@SuppressWarnings("unused")
@ -107,6 +109,11 @@ public class DatabaseSingleMapped<A, B> implements DatabaseStageEntry<A> {
return this;
}
@Override
public Flux<BadBlock> badBlocks() {
return this.serializedSingle.badBlocks();
}
@Override
public void release() {
serializedSingle.release();

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine.database.collections;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLUtils;
@ -7,6 +8,7 @@ import it.cavallium.dbengine.database.UpdateReturnMode;
import java.util.Objects;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface DatabaseStage<T> extends DatabaseStageWithEntry<T> {
@ -87,4 +89,6 @@ public interface DatabaseStage<T> extends DatabaseStageWithEntry<T> {
default Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
return leavesCount(snapshot, false).map(size -> size <= 0);
}
Flux<BadBlock> badBlocks();
}

View File

@ -1,5 +1,8 @@
package it.cavallium.dbengine.database.collections;
import it.cavallium.dbengine.client.BadBlock;
import reactor.core.publisher.Flux;
public interface DatabaseStageEntry<U> extends DatabaseStage<U> {
@Override

View File

@ -1,5 +1,8 @@
package it.cavallium.dbengine.database.collections;
import it.cavallium.dbengine.client.BadBlock;
import reactor.core.publisher.Mono;
public interface DatabaseStageWithEntry<T> {
DatabaseStageEntry<T> entry();

View File

@ -118,6 +118,7 @@ public class LLLocalDictionary implements LLDictionary {
private final RocksDB db;
private final ColumnFamilyHandle cfh;
private final String databaseName;
private final String columnName;
private final Scheduler dbScheduler;
private final Function<LLSnapshot, Snapshot> snapshotResolver;
private final Striped<StampedLock> itemsLock = Striped.readWriteStampedLock(STRIPES);
@ -131,7 +132,7 @@ public class LLLocalDictionary implements LLDictionary {
@NotNull RocksDB db,
@NotNull ColumnFamilyHandle columnFamilyHandle,
String databaseName,
String columnDisplayName,
String columnName,
Scheduler dbScheduler,
Function<LLSnapshot, Snapshot> snapshotResolver,
UpdateMode updateMode) {
@ -140,11 +141,12 @@ public class LLLocalDictionary implements LLDictionary {
Objects.requireNonNull(columnFamilyHandle);
this.cfh = columnFamilyHandle;
this.databaseName = databaseName;
this.columnName = columnName;
this.dbScheduler = dbScheduler;
this.snapshotResolver = snapshotResolver;
this.updateMode = updateMode;
this.getRangeMultiDebugName = databaseName + "(" + columnDisplayName + ")" + "::getRangeMulti";
this.getRangeKeysMultiDebugName = databaseName + "(" + columnDisplayName + ")" + "::getRangeKeysMulti";
this.getRangeMultiDebugName = databaseName + "(" + columnName + ")" + "::getRangeMulti";
this.getRangeKeysMultiDebugName = databaseName + "(" + columnName + ")" + "::getRangeKeysMulti";
alloc = allocator;
}
@ -153,6 +155,10 @@ public class LLLocalDictionary implements LLDictionary {
return databaseName;
}
public String getColumnName() {
return columnName;
}
/**
* Please don't modify the returned ReadOptions! If you want to modify it, wrap it into a new ReadOptions!
*/

View File

@ -48,6 +48,7 @@ import org.rocksdb.WALRecoveryMode;
import org.rocksdb.WriteBufferManager;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@ -457,18 +458,16 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
@Override
public Mono<LLLocalDictionary> getDictionary(byte[] columnName, UpdateMode updateMode) {
return Mono
.fromCallable(() -> {
return new LLLocalDictionary(
allocator,
db,
getCfh(columnName),
name,
Column.toString(columnName),
dbScheduler,
(snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()),
updateMode
);
})
.fromCallable(() -> new LLLocalDictionary(
allocator,
db,
getCfh(columnName),
name,
Column.toString(columnName),
dbScheduler,
(snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()),
updateMode
))
.subscribeOn(dbScheduler);
}