From 85bfdc33e97699f970ad8c2a609a2f095c4e8c51 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 26 Sep 2023 00:34:44 +0200 Subject: [PATCH] Implement repair module, improve badBlocks function, read-only mode --- pom.xml | 92 ++++++- src/main/data-generator/quic-rpc.yaml | 2 + .../cavallium/dbengine/client/BadBlock.java | 8 - .../dbengine/client/CompositeDatabase.java | 2 +- .../client/DefaultDatabaseOptions.java | 2 + .../dbengine/client/VerificationProgress.java | 29 ++ .../dbengine/database/LLDictionary.java | 6 +- .../cavallium/dbengine/database/LLRange.java | 71 ++++- .../cavallium/dbengine/database/LLUtils.java | 4 +- .../DatabaseMapDictionaryDeep.java | 5 +- .../DatabaseMapDictionaryHashed.java | 4 +- .../collections/DatabaseMapSingle.java | 4 +- .../collections/DatabaseSingleBucket.java | 4 +- .../collections/DatabaseSingleMapped.java | 4 +- .../collections/DatabaseSingleton.java | 4 +- .../database/collections/DatabaseStage.java | 4 +- .../database/disk/AbstractRocksDBColumn.java | 13 + .../disk/LLLocalDatabaseConnection.java | 6 +- .../database/disk/LLLocalDictionary.java | 233 +++++++++++++--- .../disk/LLLocalKeyValueDatabase.java | 25 +- .../database/disk/LiveFileMetadata.java | 8 + .../dbengine/database/disk/RocksDBColumn.java | 3 + .../database/disk/rocksdb/LLReadOptions.java | 3 + .../database/memory/LLMemoryDictionary.java | 7 +- .../it/cavallium/dbengine/repair/Repair.java | 248 ++++++++++++++++++ src/repair/resources/log4j2.xml | 23 ++ .../cavallium/dbengine/tests/LLRangeTest.java | 120 +++++++++ src/test/java/module-info.java | 1 + 28 files changed, 839 insertions(+), 96 deletions(-) delete mode 100644 src/main/java/it/cavallium/dbengine/client/BadBlock.java create mode 100644 src/main/java/it/cavallium/dbengine/client/VerificationProgress.java create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/LiveFileMetadata.java create mode 100644 src/repair/java/it/cavallium/dbengine/repair/Repair.java create mode 100644 src/repair/resources/log4j2.xml create mode 100644 src/test/java/it/cavallium/dbengine/tests/LLRangeTest.java diff --git a/pom.xml b/pom.xml index 3b1c9ed..7911d47 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ 9.7.0 8.5.3 5.9.0 - 1.0.10 + 1.0.12 @@ -364,14 +364,6 @@ src/test/java - - - ../src/main/libs - - **/*.jar - - - kr.motd.maven @@ -390,7 +382,7 @@ maven-compiler-plugin 3.11.0 - 17 + 21 io.soabase.record-builder @@ -401,7 +393,9 @@ io.soabase.recordbuilder.processor.RecordBuilderProcessor - + 21 + 21 + it.cavallium @@ -535,4 +529,80 @@ + + + repair + + false + + dbengine.build + repair + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 3.4.0 + + + add-source + generate-sources + + add-source + + + + src/repair/java + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + 3.6.0 + + + jar-with-dependencies + + + + it.cavallium.dbengine.repair.Repair + + + + + + make-assembly + package + + single + + + + + + + + src/repair/resources + + + + + + org.apache.logging.log4j + log4j-slf4j2-impl + 2.20.0 + + + com.lmax + disruptor + 3.4.4 + + + + diff --git a/src/main/data-generator/quic-rpc.yaml b/src/main/data-generator/quic-rpc.yaml index df09519..afee9c9 100644 --- a/src/main/data-generator/quic-rpc.yaml +++ b/src/main/data-generator/quic-rpc.yaml @@ -255,6 +255,8 @@ baseTypesData: columnOptions: NamedColumnOptions[] logPath: -String walPath: -String + openAsSecondary: boolean + secondaryDirectoryName: -String # Remember to update ColumnOptions common getters DefaultColumnOptions: data: diff --git a/src/main/java/it/cavallium/dbengine/client/BadBlock.java b/src/main/java/it/cavallium/dbengine/client/BadBlock.java deleted file mode 100644 index 1edfdbc..0000000 --- a/src/main/java/it/cavallium/dbengine/client/BadBlock.java +++ /dev/null @@ -1,8 +0,0 @@ -package it.cavallium.dbengine.client; - -import it.cavallium.buffer.Buf; -import it.cavallium.dbengine.rpc.current.data.Column; -import org.jetbrains.annotations.Nullable; - -public record BadBlock(String databaseName, @Nullable Column column, @Nullable Buf rawKey, - @Nullable Throwable ex) {} diff --git a/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java b/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java index 8d51878..30afe41 100644 --- a/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java +++ b/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java @@ -26,7 +26,7 @@ public interface CompositeDatabase extends DatabaseProperties, DatabaseOperation /** * Find corrupted items */ - Stream badBlocks(); + Stream verify(); void verifyChecksum(); } diff --git a/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java b/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java index 9dbec59..18567fe 100644 --- a/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java +++ b/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java @@ -66,6 +66,8 @@ public class DefaultDatabaseOptions { DEFAULT_DEFAULT_COLUMN_OPTIONS, List.of(), NullableString.empty(), + NullableString.empty(), + false, NullableString.empty() ); diff --git a/src/main/java/it/cavallium/dbengine/client/VerificationProgress.java b/src/main/java/it/cavallium/dbengine/client/VerificationProgress.java new file mode 100644 index 0000000..22f2da8 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/client/VerificationProgress.java @@ -0,0 +1,29 @@ +package it.cavallium.dbengine.client; + +import it.cavallium.buffer.Buf; +import it.cavallium.dbengine.rpc.current.data.Column; +import org.jetbrains.annotations.Nullable; + +public sealed interface VerificationProgress { + record BlockBad(String databaseName, Column column, Buf rawKey, String file, Throwable ex) + implements VerificationProgress {} + record FileOk(String databaseName, Column column, String file) + implements VerificationProgress {} + record Progress(String databaseName, Column column, String file, + long scanned, long total, + long fileScanned, long fileTotal) + implements VerificationProgress { + + public double getProgress() { + return scanned / (double) total; + } + + public double getFileProgress() { + return fileScanned / (double) fileTotal; + } + } + + @Nullable String databaseName(); + @Nullable Column column(); + @Nullable String file(); +} diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java index f9909dc..ad2c6d1 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java @@ -1,16 +1,14 @@ package it.cavallium.dbengine.database; import it.cavallium.buffer.Buf; -import it.cavallium.dbengine.client.BadBlock; +import it.cavallium.dbengine.client.VerificationProgress; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationFunction; -import java.io.IOException; import java.util.List; import java.util.function.Function; import java.util.stream.Stream; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.rocksdb.RocksDBException; @SuppressWarnings("unused") public interface LLDictionary extends LLKeyValueDatabaseStructure { @@ -66,7 +64,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { int prefixLength, boolean smallRange); - Stream badBlocks(LLRange range); + Stream badBlocks(LLRange range); void setRange(LLRange range, Stream entries, boolean smallRange); diff --git a/src/main/java/it/cavallium/dbengine/database/LLRange.java b/src/main/java/it/cavallium/dbengine/database/LLRange.java index 7d7306d..defb906 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLRange.java +++ b/src/main/java/it/cavallium/dbengine/database/LLRange.java @@ -1,6 +1,8 @@ package it.cavallium.dbengine.database; import it.cavallium.buffer.Buf; +import it.cavallium.dbengine.database.disk.LiveFileMetadata; +import java.util.Objects; import java.util.StringJoiner; import org.jetbrains.annotations.Nullable; @@ -19,6 +21,8 @@ public class LLRange { private LLRange(@Nullable Buf min, @Nullable Buf max, @Nullable Buf single) { assert single == null || (min == null && max == null); + assert min == null || max == null || min.compareTo(max) <= 0 + : "Minimum buffer is bigger than maximum buffer: " + min + " > " + max; this.min = min; this.max = max; this.single = single; @@ -44,6 +48,57 @@ public class LLRange { return new LLRange(min, max, null); } + public static boolean isInside(LLRange rangeSub, LLRange rangeParent) { + if (rangeParent.isAll()) { + return true; + } else if (rangeParent.isSingle()) { + return Objects.equals(rangeSub, rangeParent); + } else { + return ((!rangeParent.hasMin() || (rangeSub.hasMin() && rangeParent.getMin().compareTo(rangeSub.getMin()) <= 0))) + && ((!rangeParent.hasMax() || (rangeSub.hasMax() && rangeParent.getMax().compareTo(rangeSub.getMax()) >= 0))); + } + } + + @Nullable + public static LLRange intersect(LLRange rangeA, LLRange rangeB) { + boolean aEndInclusive = rangeA.isSingle(); + boolean bEndInclusive = rangeB.isSingle(); + Buf min = rangeA.isAll() + ? rangeB.getMin() + : (rangeB.isAll() + ? rangeA.getMin() + : (rangeA.getMin().compareTo(rangeB.getMin()) <= 0 ? rangeB.getMin() : rangeA.getMin())); + int aComparedToB; + Buf max; + boolean maxInclusive; + if (rangeA.isAll()) { + max = rangeB.getMax(); + maxInclusive = bEndInclusive; + } else if (rangeB.isAll()) { + max = rangeA.getMax(); + maxInclusive = aEndInclusive; + } else if ((aComparedToB = rangeA.getMax().compareTo(rangeB.getMax())) >= 0) { + max = rangeB.getMax(); + if (aComparedToB == 0) { + maxInclusive = bEndInclusive && aEndInclusive; + } else { + maxInclusive = bEndInclusive; + } + } else { + max = rangeA.getMax(); + maxInclusive = aEndInclusive; + } + if (min != null && max != null && min.compareTo(max) >= (maxInclusive ? 1 : 0)) { + return null; + } else { + if (min != null && min.equals(max)) { + return LLRange.single(min); + } else { + return LLRange.of(min, max); + } + } + } + public boolean isAll() { return min == null && max == null && single == null; } @@ -108,12 +163,20 @@ public class LLRange { return result; } + @SuppressWarnings("UnnecessaryUnicodeEscape") @Override public String toString() { - return new StringJoiner(", ", LLRange.class.getSimpleName() + "[", "]") - .add("min=" + LLUtils.toString(min)) - .add("max=" + LLUtils.toString(max)) - .toString(); + if (single != null) { + return "[" + single + "]"; + } else if (min != null && max != null) { + return "[" + LLUtils.toString(min) + "," + LLUtils.toString(max) + ")"; + } else if (min != null) { + return "[" + min + ",\u221E)"; + } else if (max != null) { + return "[\u2205," + max + ")"; + } else { + return "[\u221E)"; + } } public LLRange copy() { diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 3d25966..b6f9fa9 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -91,7 +91,7 @@ public class LLUtils { private static final Consumer NULL_CONSUMER = ignored -> {}; private static final Buf BUF_TRUE = Buf.wrap(new byte[] {(byte) 1}); private static final Buf BUF_FALSE = Buf.wrap(new byte[] {(byte) 0}); - private static final HexFormat HEX_FORMAT = HexFormat.of().withUpperCase(); + public static final HexFormat HEX_FORMAT = HexFormat.of().withUpperCase(); static { for (int i1 = 0; i1 < 256; i1++) { @@ -487,7 +487,7 @@ public class LLUtils { } } readOptions.setFillCache(canFillCache && !hugeRange); - readOptions.setVerifyChecksums(!FORCE_DISABLE_CHECKSUM_VERIFICATION && !hugeRange); + readOptions.setVerifyChecksums(!FORCE_DISABLE_CHECKSUM_VERIFICATION || !hugeRange); return readOptions; } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index a9b2e71..e5a2148 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -5,7 +5,7 @@ import static it.cavallium.dbengine.utils.StreamUtils.resourceStream; import it.cavallium.buffer.Buf; import it.cavallium.buffer.BufDataInput; import it.cavallium.buffer.BufDataOutput; -import it.cavallium.dbengine.client.BadBlock; +import it.cavallium.dbengine.client.VerificationProgress; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; @@ -18,7 +18,6 @@ import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; -import it.cavallium.dbengine.utils.StreamUtils; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap; import java.util.Map.Entry; import java.util.Optional; @@ -270,7 +269,7 @@ public class DatabaseMapDictionaryDeep> implem } @Override - public Stream badBlocks() { + public Stream badBlocks() { return dictionary.badBlocks(range); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java index dd760ea..a4777b7 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java @@ -1,7 +1,7 @@ package it.cavallium.dbengine.database.collections; import it.cavallium.buffer.Buf; -import it.cavallium.dbengine.client.BadBlock; +import it.cavallium.dbengine.client.VerificationProgress; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.SubStageEntry; @@ -148,7 +148,7 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap badBlocks() { + public Stream badBlocks() { return this.subDictionary.badBlocks(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java index 22cf09f..6ba1c83 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java @@ -3,7 +3,7 @@ package it.cavallium.dbengine.database.collections; import it.cavallium.buffer.Buf; import it.cavallium.buffer.BufDataInput; import it.cavallium.buffer.BufDataOutput; -import it.cavallium.dbengine.client.BadBlock; +import it.cavallium.dbengine.client.VerificationProgress; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLDictionary; @@ -121,7 +121,7 @@ public final class DatabaseMapSingle implements DatabaseStageEntry { } @Override - public Stream badBlocks() { + public Stream badBlocks() { return dictionary.badBlocks(LLRange.single(key)); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java index 280a2de..5254882 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java @@ -1,6 +1,6 @@ package it.cavallium.dbengine.database.collections; -import it.cavallium.dbengine.client.BadBlock; +import it.cavallium.dbengine.client.VerificationProgress; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLUtils; @@ -124,7 +124,7 @@ public class DatabaseSingleBucket implements DatabaseStageEntry { } @Override - public Stream badBlocks() { + public Stream badBlocks() { return bucketStage.badBlocks(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java index 467fdf6..973c895 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java @@ -1,6 +1,6 @@ package it.cavallium.dbengine.database.collections; -import it.cavallium.dbengine.client.BadBlock; +import it.cavallium.dbengine.client.VerificationProgress; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.client.Mapper; import it.cavallium.dbengine.database.Delta; @@ -107,7 +107,7 @@ public class DatabaseSingleMapped implements DatabaseStageEntry { } @Override - public Stream badBlocks() { + public Stream badBlocks() { return this.serializedSingle.badBlocks(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java index 8d2b4c5..2440633 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java @@ -3,7 +3,7 @@ package it.cavallium.dbengine.database.collections; import it.cavallium.buffer.Buf; import it.cavallium.buffer.BufDataInput; import it.cavallium.buffer.BufDataOutput; -import it.cavallium.dbengine.client.BadBlock; +import it.cavallium.dbengine.client.VerificationProgress; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLSingleton; @@ -119,7 +119,7 @@ public class DatabaseSingleton implements DatabaseStageEntry { } @Override - public Stream badBlocks() { + public Stream badBlocks() { return Stream.empty(); } } \ No newline at end of file diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java index dafa877..bc77c49 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java @@ -1,6 +1,6 @@ package it.cavallium.dbengine.database.collections; -import it.cavallium.dbengine.client.BadBlock; +import it.cavallium.dbengine.client.VerificationProgress; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLUtils; @@ -64,5 +64,5 @@ public interface DatabaseStage extends DatabaseStageWithEntry { return leavesCount(snapshot, false) <= 0; } - Stream badBlocks(); + Stream badBlocks(); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index 186fd0e..e453415 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -7,6 +7,7 @@ import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Timer; import it.cavallium.buffer.Buf; +import it.cavallium.dbengine.database.ColumnUtils; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.RepeatedElementList; @@ -18,10 +19,12 @@ import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.utils.SimpleResource; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.StampedLock; +import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; @@ -32,6 +35,7 @@ import org.rocksdb.CompactRangeOptions; import org.rocksdb.FlushOptions; import org.rocksdb.Holder; import org.rocksdb.KeyMayExist; +import org.rocksdb.LiveFileMetaData; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.RocksObject; @@ -555,6 +559,15 @@ public sealed abstract class AbstractRocksDBColumn implements } } + @Override + public Stream getAllLiveFiles() throws RocksDBException { + byte[] cfhName = cfh.getName(); + return db.getLiveFilesMetaData().stream() + .filter(file -> Arrays.equals(cfhName, file.columnFamilyName())) + .map(file -> new LiveFileMetadata(file.path(), file.fileName(), file.level(), columnName, + file.numEntries(),file.size(), LLRange.of(Buf.wrap(file.smallestKey()), Buf.wrap(file.largestKey())))); + } + protected int getLevels() { var closeReadLock = closeLock.readLock(); try { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java index a8b2205..38625bd 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java @@ -60,13 +60,17 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection { return new LLLocalKeyValueDatabase(meterRegistry, name, inMemory, - basePath.resolve("database_" + name), + getDatabasePath(name), columns, new LinkedList<>(), databaseOptions ); } + public Path getDatabasePath(String databaseName) { + return basePath.resolve("database_" + databaseName); + } + @Override public LLLuceneIndex getLuceneIndex(String clusterName, LuceneIndexStructure indexStructure, diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 12eb764..e2d7a30 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -4,7 +4,6 @@ import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; import static it.cavallium.dbengine.database.LLUtils.isBoundedRange; import static it.cavallium.dbengine.database.LLUtils.mapList; import static it.cavallium.dbengine.database.LLUtils.toStringSafe; -import static it.cavallium.dbengine.database.LLUtils.wrapNullable; import static it.cavallium.dbengine.database.disk.UpdateAtomicResultMode.DELTA; import static it.cavallium.dbengine.utils.StreamUtils.ROCKSDB_POOL; import static it.cavallium.dbengine.utils.StreamUtils.collectOn; @@ -18,7 +17,10 @@ import static it.cavallium.dbengine.utils.StreamUtils.batches; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Timer; import it.cavallium.buffer.Buf; -import it.cavallium.dbengine.client.BadBlock; +import it.cavallium.dbengine.client.VerificationProgress; +import it.cavallium.dbengine.client.VerificationProgress.BlockBad; +import it.cavallium.dbengine.client.VerificationProgress.FileOk; +import it.cavallium.dbengine.client.VerificationProgress.Progress; import it.cavallium.dbengine.database.ColumnUtils; import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLDictionary; @@ -36,19 +38,26 @@ import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions; import it.cavallium.dbengine.database.disk.rocksdb.LLWriteOptions; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationFunction; -import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; +import it.cavallium.dbengine.rpc.current.data.Column; import it.cavallium.dbengine.utils.DBException; +import it.cavallium.dbengine.utils.StreamUtils; import java.io.IOException; -import java.nio.ByteBuffer; +import java.math.BigInteger; +import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; import java.util.stream.IntStream; import java.util.stream.Stream; -import org.apache.commons.lang3.tuple.Pair; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.util.Supplier; @@ -628,38 +637,95 @@ public class LLLocalDictionary implements LLDictionary { } @Override - public Stream badBlocks(LLRange range) { + public Stream badBlocks(LLRange rangeFull) { + Set brokenFiles = new ConcurrentHashMap().keySet(true); + LongAdder totalScanned = new LongAdder(); + AtomicLong totalEstimate = new AtomicLong(); + try { - return resourceStream( - () -> LLUtils.generateCustomReadOptions(null, false, isBoundedRange(range), false), - ro -> { - ro.setFillCache(false); - if (!range.isSingle()) { - if (LLUtils.MANUAL_READAHEAD) { - ro.setReadaheadSize(32 * 1024); - } - } - ro.setVerifyChecksums(true); - return resourceStream(() -> db.newRocksIterator(ro, range, false), rocksIterator -> { - rocksIterator.seekToFirst(); - return streamWhileNonNull(() -> { - if (!rocksIterator.isValid()) return null; - Buf rawKey = null; + totalEstimate.set(db.getNumEntries()); + } catch (Throwable ex) { + logger.warn("Failed to get total entries count", ex); + } + + Column column = ColumnUtils.special(columnName); + try { + record FileRange(String path, @Nullable LLRange range, long countEstimate) {} + + return db.getAllLiveFiles() + .map(metadata -> new FileRange(Path.of(metadata.path()).resolve("./" + metadata.fileName()).normalize().toString(), + LLRange.intersect(metadata.keysRange(), rangeFull), + metadata.numEntries() + )) + .filter(fr -> fr.range != null) + .parallel() + .flatMap(fr -> { + String path = fr.path; + LLRange rangePartial = fr.range; + AtomicLong fileScanned = new AtomicLong(); + final long fileEstimate = fr.countEstimate; + AtomicBoolean streamEnded = new AtomicBoolean(false); + totalScanned.add(fileEstimate); try { - rawKey = rocksIterator.keyBuf().copy(); - rocksIterator.next(); - } catch (RocksDBException ex) { - return new BadBlock(databaseName, ColumnUtils.special(columnName), rawKey, ex); + return resourceStream( + () -> LLUtils.generateCustomReadOptions(null, false, isBoundedRange(rangePartial), false), + ro -> { + ro.setFillCache(false); + if (!rangePartial.isSingle()) { + if (LLUtils.MANUAL_READAHEAD) { + ro.setReadaheadSize(32 * 1024); + } + } + ro.setVerifyChecksums(true); + return resourceStream(() -> db.newRocksIterator(ro, rangePartial, false), rocksIterator -> { + rocksIterator.seekToFirst(); + return StreamUtils.>streamWhileNonNull(() -> { + if (!rocksIterator.isValid()) { + if (streamEnded.compareAndSet(false, true)) { + totalScanned.add(fileScanned.get() - fileEstimate); + return Optional.of(new FileOk(databaseName, column, path)); + } else { + //noinspection OptionalAssignedToNull + return null; + } + } + boolean shouldSendStatus; + Buf rawKey = null; + try { + rawKey = rocksIterator.keyBuf().copy(); + shouldSendStatus = fileScanned.incrementAndGet() % 1_000_000 == 0; + rocksIterator.next(); + } catch (RocksDBException ex) { + return Optional.of(new BlockBad(databaseName, column, rawKey, path, ex)); + } + if (shouldSendStatus) { + long totalScannedValue = totalScanned.sum(); + long fileScannedVal = fileScanned.get(); + return Optional.of(new Progress(databaseName, + column, + path, + totalScannedValue, + Math.max(totalEstimate.get(), totalScannedValue), + fileScannedVal, + Math.max(fileEstimate, fileScannedVal) + )); + } else { + return Optional.empty(); + } + }).filter(Optional::isPresent).map(Optional::get).onClose(() -> { + rocksIterator.close(); + ro.close(); + }); + }); + } + ); + } catch (RocksDBException e) { + return Stream.of(new BlockBad(databaseName, column, null, path, e)); } - return null; - }).takeWhile(x -> rocksIterator.isValid()).onClose(() -> { - rocksIterator.close(); - ro.close(); - }); - }); - }); + }) + .filter(err -> !(err instanceof BlockBad blockBad && blockBad.rawKey() == null && !brokenFiles.add(blockBad.file()))); } catch (RocksDBException e) { - throw new DBException("Failed to get bad blocks", e); + return Stream.of(new BlockBad(databaseName, column, null, null, e)); } } @@ -1011,6 +1077,95 @@ public class LLLocalDictionary implements LLDictionary { } } + private static BigInteger getRangePointAt(LLRange range, + BigInteger minBi, + BigInteger maxBi, + BigInteger rangeBi, + int pointsCount, + BigInteger pointsCountBi, + int i) { + if (i == 0) { + return range.hasMin() ? minBi : null; + } else if (i + 1 == pointsCount) { + return range.hasMax() ? maxBi : null; + } else { + return minBi.add(rangeBi.multiply(BigInteger.valueOf(i)).divide(pointsCountBi)); + } + } + + private static Buf getMinBufForParallelization(LLRange range) { + Buf min = range.getMin(); + if (min != null) { + return min; + } else { + return Buf.wrap(); + } + } + + private static Buf getMaxBufForParallelization(LLRange range) { + Buf max = range.getMax(); + if (max != null) { + return max; + } else { + max = range.getMin().copy(); + max.add(Byte.MAX_VALUE); + return max.freeze(); + } + } + + public static Stream parallelizeRange(LLRange range) { + return parallelizeRange(range, Math.max(Runtime.getRuntime().availableProcessors() - 1, 1)); + } + + public static Stream parallelizeRange(LLRange range, int threads) { + if (threads < 1) { + throw new IllegalArgumentException(); + } + if (range.isAll()) { + return IntStream + .range(-1, LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS.length) + .mapToObj(idx -> LLRange.of( + Buf.wrap(idx == -1 ? new byte[0] : LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS[idx]), + idx + 1 >= LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS.length ? null : Buf.wrap(LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS[idx + 1]))); + } else if (range.isSingle()) { + return Stream.of(range); + } else { + Buf minBuf = getMinBufForParallelization(range); + Buf maxBuf = getMaxBufForParallelization(range); + byte[] minUnboundedArray = minBuf.asUnboundedArray(); + byte[] maxUnboundedArray = maxBuf.asUnboundedArray(); + var minBi = minBuf.isEmpty() ? BigInteger.ZERO : new BigInteger(minUnboundedArray, 0, minBuf.size()); + var maxBi = new BigInteger(maxUnboundedArray, 0, maxBuf.size()); + BigInteger rangeBi = maxBi.subtract(minBi); + int pointsCount = rangeBi.min(BigInteger.valueOf(threads).add(BigInteger.ONE)).intValueExact(); + int segmentsCount = pointsCount - 1; + if (threads > 2 && segmentsCount <= 2) { + return Stream.of(range); + } + BigInteger pointsCountBi = BigInteger.valueOf(pointsCount); + + byte[][] points = new byte[pointsCount][]; + for (int i = 0; i < pointsCount; i++) { + BigInteger rangePoint = getRangePointAt(range, minBi, maxBi, rangeBi, pointsCount, pointsCountBi, i); + points[i] = rangePoint != null ? rangePoint.toByteArray() : null; + } + + LLRange[] ranges = new LLRange[segmentsCount]; + for (int i = 0; i < segmentsCount; i++) { + byte[] min = points[i]; + byte[] max = points[i + 1]; + if (min == null) { + ranges[i] = LLRange.to(Buf.wrap(max)); + } else if (max == null) { + ranges[i] = LLRange.from(Buf.wrap(min)); + } else { + ranges[i] = LLRange.of(Buf.wrap(min), Buf.wrap(max)); + } + } + return Stream.of(ranges); + } + } + private long exactSizeAll(@Nullable LLSnapshot snapshot) { if (LLUtils.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called exactSizeAll in a nonblocking thread"); @@ -1022,19 +1177,11 @@ public class LLLocalDictionary implements LLDictionary { readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); if (PARALLEL_EXACT_SIZE) { - return collectOn(ROCKSDB_POOL, IntStream - .range(-1, LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS.length) - .mapToObj(idx -> Pair.of(idx == -1 ? new byte[0] : LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS[idx], - idx + 1 >= LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS.length ? null - : LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS[idx + 1] - )).map(range -> { + return collectOn(ROCKSDB_POOL, parallelizeRange(LLRange.all()).map(range -> { long partialCount = 0; try (var rangeReadOpts = readOpts.copy()) { try { - try (var rocksIterator = db.newIterator(rangeReadOpts, - wrapNullable(range.getKey()), - wrapNullable(range.getValue()) - )) { + try (var rocksIterator = db.newRocksIterator(rangeReadOpts, range, false)) { rocksIterator.seekToFirst(); while (rocksIterator.isValid()) { partialCount++; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 64ec808..87321df 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -34,6 +34,7 @@ import it.cavallium.dbengine.rpc.current.data.NoFilter; import java.io.File; import java.io.IOException; import it.cavallium.dbengine.utils.DBException; +import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -45,6 +46,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; @@ -425,7 +427,23 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa while (true) { try { // a factory method that returns a RocksDB instance - if (databaseOptions.optimistic()) { + if (databaseOptions.openAsSecondary()) { + var secondaryPath = dbPath + .resolve("secondary-log") + .resolve(databaseOptions.secondaryDirectoryName().orElse("unnamed-" + UUID.randomUUID())); + try { + Files.createDirectories(secondaryPath); + } catch (IOException e) { + throw new RocksDBException("Failed to create secondary exception: " + e); + } + this.db = RocksDB.openAsSecondary(rocksdbOptions, + dbPathString, + secondaryPath + .toString(), + descriptors, + handles + ); + } else if (databaseOptions.optimistic()) { this.db = OptimisticTransactionDB.open(rocksdbOptions, dbPathString, descriptors, handles); } else { var transactionOptions = new TransactionDBOptions() @@ -475,7 +493,10 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa handles.forEach(refs::track); // compactDb(db, handles); - flushDb(db, handles); + if (!databaseOptions.openAsSecondary()) { + logger.info("Flushing database at {}", dbPathString); + flushDb(db, handles); + } } catch (RocksDBException ex) { throw new DBException(ex); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LiveFileMetadata.java b/src/main/java/it/cavallium/dbengine/database/disk/LiveFileMetadata.java new file mode 100644 index 0000000..ae1e6bd --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/LiveFileMetadata.java @@ -0,0 +1,8 @@ +package it.cavallium.dbengine.database.disk; + +import it.cavallium.dbengine.database.LLRange; + +public record LiveFileMetadata(String path, String fileName, int level, String columnName, long numEntries, long size, + LLRange keysRange) { + +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java index 87e090f..4be3751 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java @@ -10,6 +10,7 @@ import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj; import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.io.IOException; import java.util.List; +import java.util.stream.Stream; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.rocksdb.ColumnFamilyHandle; @@ -53,6 +54,8 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn { @NotNull RocksIteratorObj newIterator(@NotNull LLReadOptions readOptions, @Nullable Buf min, @Nullable Buf max); + Stream getAllLiveFiles() throws RocksDBException; + @NotNull UpdateAtomicResult updateAtomic(@NotNull LLReadOptions readOptions, @NotNull LLWriteOptions writeOptions, Buf key, diff --git a/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLReadOptions.java b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLReadOptions.java index 95bb60a..a5b89b8 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLReadOptions.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLReadOptions.java @@ -2,8 +2,11 @@ package it.cavallium.dbengine.database.disk.rocksdb; import it.cavallium.dbengine.database.disk.IteratorMetrics; import it.cavallium.dbengine.utils.SimpleResource; +import java.util.Arrays; import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.LiveFileMetaData; import org.rocksdb.ReadOptions; +import org.rocksdb.ReadTier; import org.rocksdb.RocksDB; import org.rocksdb.Snapshot; diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java index d8d8670..30a31df 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java @@ -5,7 +5,7 @@ import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.mapping; import it.cavallium.buffer.Buf; -import it.cavallium.dbengine.client.BadBlock; +import it.cavallium.dbengine.client.VerificationProgress; import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; @@ -19,14 +19,11 @@ import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; -import it.cavallium.dbengine.utils.DBException; -import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Set; -import java.util.SortedMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -361,7 +358,7 @@ public class LLMemoryDictionary implements LLDictionary { } @Override - public Stream badBlocks(LLRange range) { + public Stream badBlocks(LLRange range) { return Stream.empty(); } diff --git a/src/repair/java/it/cavallium/dbengine/repair/Repair.java b/src/repair/java/it/cavallium/dbengine/repair/Repair.java new file mode 100644 index 0000000..fa91015 --- /dev/null +++ b/src/repair/java/it/cavallium/dbengine/repair/Repair.java @@ -0,0 +1,248 @@ +package it.cavallium.dbengine.repair; + +import io.micrometer.core.instrument.Clock; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.logging.LoggingMeterRegistry; +import it.cavallium.datagen.nativedata.NullableString; +import it.cavallium.datagen.nativedata.Nullableboolean; +import it.cavallium.datagen.nativedata.Nullableint; +import it.cavallium.datagen.nativedata.Nullablelong; +import it.cavallium.dbengine.client.VerificationProgress.BlockBad; +import it.cavallium.dbengine.client.VerificationProgress.FileOk; +import it.cavallium.dbengine.client.VerificationProgress.Progress; +import it.cavallium.dbengine.database.LLRange; +import it.cavallium.dbengine.database.UpdateMode; +import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; +import it.cavallium.dbengine.database.disk.LLLocalDictionary; +import it.cavallium.dbengine.database.disk.LLLocalKeyValueDatabase; +import it.cavallium.dbengine.rpc.current.data.Column; +import it.cavallium.dbengine.rpc.current.data.DatabaseOptionsBuilder; +import it.cavallium.dbengine.rpc.current.data.DatabaseVolume; +import it.cavallium.dbengine.rpc.current.data.DefaultColumnOptions; +import it.cavallium.dbengine.rpc.current.data.NamedColumnOptions; +import it.cavallium.dbengine.rpc.current.data.nullables.NullableCompression; +import it.cavallium.dbengine.rpc.current.data.nullables.NullableFilter; +import it.cavallium.dbengine.utils.StreamUtils; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import it.unimi.dsi.fastutil.objects.ObjectList; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Base64; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; + +public class Repair { + + public static final MeterRegistry METER = LoggingMeterRegistry + .builder(key -> null) + .clock(Clock.SYSTEM) + .loggingSink(System.err::println) + .build(); + + public static void main(String[] argsArray) throws RocksDBException { + ObjectList initialArgs = ObjectArrayList.wrap(argsArray), args = initialArgs; + if (args.isEmpty() || args.contains("--help")) { + printHelp(initialArgs); + } + String command = args.get(0); + args = args.subList(1, args.size()); + switch (command.toLowerCase(Locale.ROOT)) { + case "list-column-families" -> { + if (args.size() < 3) { + printHelp(initialArgs); + } + Path path = Path.of(args.get(0)); + String dbName = args.get(1); + getColumnFamilyNames(path, dbName).forEach(s -> System.err.printf("\tColumn family: %s%n", s)); + } + case "scan-all" -> { + if (args.size() < 2) { + printHelp(initialArgs); + } + Path path = Path.of(args.get(0)).toAbsolutePath(); + String dbName = args.get(1); + List columnNames = args.size() < 3 ? getColumnFamilyNames(path, dbName).toList() : args.subList(2, args.size()); + System.err.printf("Scanning database \"%s\" at \"%s\", column families to scan:\n\t%s%n", dbName, path, String.join("\n\t", columnNames)); + var conn = new LLLocalDatabaseConnection(METER, path, false); + conn.connect(); + LLLocalKeyValueDatabase db = getDatabase(conn, dbName, columnNames); + db.getAllColumnFamilyHandles().forEach((column, cfh) -> { + System.err.printf("Scanning column: %s%n", column.name()); + LLLocalDictionary dict = db.getDictionary(column.name().getBytes(StandardCharsets.UTF_8), UpdateMode.DISALLOW); + StreamUtils.collectOn(StreamUtils.ROCKSDB_POOL, dict.badBlocks(LLRange.all()), StreamUtils.executing(block -> { + synchronized (Repair.class) { + switch (block) { + case null -> {} + case BlockBad blockBad -> { + System.err.println("[ ! ] Bad block found: " + block.databaseName() + (block.column() != null ? "->" + block.column().name() : "") + "->" + blockBad.rawKey() + "->" + block.file()); + if (blockBad.ex() != null) { + if (blockBad.ex() instanceof RocksDBException ex) { + System.err.println("\t" + ex); + } else { + blockBad.ex().printStackTrace(System.err); + } + } + } + case FileOk fileOk -> { + System.err.println("File is ok: " + block.databaseName() + (block.column() != null ? "->" + block.column().name() : "") + "->" + block.file()); + } + case Progress progress -> { + System.err.printf("Progress: [%d/%d] file: [%d/%d] %s->%s->%s%n", + progress.scanned(), + progress.total(), + progress.fileScanned(), + progress.fileTotal(), + block.databaseName(), + block.column() != null ? "->" + block.column().name() : "", + block.file() + ); + } + default -> throw new IllegalStateException("Unexpected value: " + block); + } + } + })); + }); + } + case "dump-all" -> { + if (args.size() < 2) { + printHelp(initialArgs); + } + Path path = Path.of(args.get(0)).toAbsolutePath(); + String dbName = args.get(1); + List columnNames = args.size() < 3 ? getColumnFamilyNames(path, dbName).toList() : args.subList(2, args.size()); + System.err.printf("Dumping database \"%s\" at \"%s\", column families to dump:\n\t%s%n", dbName, path, String.join("\n\t", columnNames)); + var conn = new LLLocalDatabaseConnection(METER, path, false); + conn.connect(); + LLLocalKeyValueDatabase db = getDatabase(conn, dbName, columnNames); + System.out.println("{\"columns\": {"); + AtomicBoolean first = new AtomicBoolean(true); + db.getAllColumnFamilyHandles().forEach((column, cfh) -> { + if (!first.compareAndSet(true, false)) { + System.out.println(","); + } + System.out.printf("\"%s\": [", column.name()); + System.err.printf("Dumping column: %s%n", column.name()); + AtomicBoolean firstElem = new AtomicBoolean(true); + var dict = db.getDictionary(column.name().getBytes(StandardCharsets.UTF_8), UpdateMode.DISALLOW); + dict.getRange(null, LLRange.all(), false, false).forEach(elem -> { + if (!firstElem.compareAndSet(true, false)) { + System.out.println(","); + } else { + System.out.println(); + } + System.out.printf("{\"key\": \"%s\", \"value\": \"%s\"}", + Base64.getEncoder().encodeToString(elem.getKey().toByteArray()), + Base64.getEncoder().encodeToString(elem.getValue().toByteArray()) + ); + System.err.printf("\t\tkey: %s\tvalue: %s%n", elem.getKey().toString(), elem.getValue().toString(StandardCharsets.UTF_8)); + }); + System.out.printf("%n]"); + }); + System.out.println(); + System.out.println("}"); + System.out.println("}"); + } + case "verify-checksum" -> { + if (args.size() != 2) { + printHelp(initialArgs); + } + Path path = Path.of(args.get(0)).toAbsolutePath(); + String dbName = args.get(1); + List columnNames = getColumnFamilyNames(path, dbName).toList(); + System.err.printf("Verifying checksum of database \"%s\" at \"%s\", column families:\n\t%s%n", dbName, path, String.join("\n\t", columnNames)); + var conn = new LLLocalDatabaseConnection(METER, path, false); + conn.connect(); + LLLocalKeyValueDatabase db = getDatabase(conn, dbName, columnNames); + db.verifyChecksum(); + System.err.println("Done"); + } + default -> printHelp(initialArgs); + } + System.exit(0); + } + + private static Stream getColumnFamilyNames(Path path, String dbName) throws RocksDBException { + String dbPath = path.resolve("database_" + dbName).toString(); + System.err.printf("Listing column families of database: %s%n", dbPath); + try (var options = new Options()) { + options.setCreateIfMissing(false).setCreateMissingColumnFamilies(false); + return RocksDB.listColumnFamilies(options, dbPath).stream().map(s -> new String(s, StandardCharsets.UTF_8)); + } + } + + private static LLLocalKeyValueDatabase getDatabase(LLLocalDatabaseConnection conn, + String dbName, + List columnNames) { + return conn.getDatabase(dbName, columnNames.stream() + .map(Column::of) + .toList(), DatabaseOptionsBuilder.builder() + .openAsSecondary(true) + .absoluteConsistency(true) + .allowMemoryMapping(true) + .blockCache(Nullablelong.empty()) + .lowMemory(false) + .maxOpenFiles(Nullableint.of(-1)) + .optimistic(false) + .spinning(false) + .useDirectIO(false) + .extraFlags(Map.of()) + .logPath(NullableString.empty()) + .walPath(NullableString.empty()) + .secondaryDirectoryName(NullableString.of("scan-all")) + .persistentCaches(List.of()) + .volumes(Stream.of( + DatabaseVolume.of(Path.of(conn.getDatabasePath(dbName).toString()), -1), + DatabaseVolume.of(Path.of(conn.getDatabasePath(dbName).toString() + "_hot"), -1), + DatabaseVolume.of(Path.of(conn.getDatabasePath(dbName).toString() + "_cold"), -1), + DatabaseVolume.of(Path.of(conn.getDatabasePath(dbName).toString() + "_colder"), -1) + ).filter(x -> Files.exists(x.volumePath())).toList()) + .defaultColumnOptions(DefaultColumnOptions.of( + List.of(), + Nullablelong.empty(), + Nullableboolean.empty(), + Nullableboolean.empty(), + NullableFilter.empty(), + Nullableint.empty(), + NullableString.empty(), + Nullablelong.empty(), + false, + Nullablelong.empty(), + Nullablelong.empty(), + NullableCompression.empty() + )) + .columnOptions(columnNames.stream() + .map(columnName -> NamedColumnOptions.of(columnName, + List.of(), + Nullablelong.empty(), + Nullableboolean.empty(), + Nullableboolean.empty(), + NullableFilter.empty(), + Nullableint.empty(), + NullableString.empty(), + Nullablelong.empty(), + false, + Nullablelong.empty(), + Nullablelong.empty(), + NullableCompression.empty())) + .toList()) + .writeBufferManager(Nullablelong.empty()) + .build()); + } + + private static void printHelp(List args) { + System.err.println(""" + Usage: repair scan-all DIRECTORY DB_NAME COLUMN_NAME... + or: repair dump-all DIRECTORY DB_NAME COLUMN_NAME... + or: repair verify-checksum DIRECTORY DB_NAME + or: repair list-column-families DIRECTORY DB_NAME + """); + System.exit(1); + } +} diff --git a/src/repair/resources/log4j2.xml b/src/repair/resources/log4j2.xml new file mode 100644 index 0000000..b33854b --- /dev/null +++ b/src/repair/resources/log4j2.xml @@ -0,0 +1,23 @@ + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/test/java/it/cavallium/dbengine/tests/LLRangeTest.java b/src/test/java/it/cavallium/dbengine/tests/LLRangeTest.java new file mode 100644 index 0000000..94e9f35 --- /dev/null +++ b/src/test/java/it/cavallium/dbengine/tests/LLRangeTest.java @@ -0,0 +1,120 @@ +package it.cavallium.dbengine.tests; + +import static org.junit.jupiter.api.Assertions.*; + +import it.cavallium.buffer.Buf; +import it.cavallium.dbengine.database.LLRange; +import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.LLLocalDictionary; +import java.math.BigInteger; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.stream.Stream; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsSource; +import org.junit.jupiter.params.provider.MethodSource; + +class LLRangeTest { + + private void printRanges(LLRange range, List ranges) { + for (int i = 1, size = ranges.size(); i < size; i++) { + LLRange prev = ranges.get(i - 1); + LLRange cur = ranges.get(i); + assertEquals(0, Objects.compare(prev.getMax(), cur.getMin(), Comparator.nullsFirst(Comparator.naturalOrder()))); + } + System.out.println(toStringRange(range) + " ---> " + String.join(", ", ranges.stream().map(this::toStringRange).toList())); + } + + private String toStringRange(LLRange r) { + if (r.isSingle()) { + return LLUtils.HEX_FORMAT.formatHex(r.getSingle().toByteArray()); + } else if (r.hasMin() && r.hasMax()) { + return LLUtils.HEX_FORMAT.formatHex(r.getMin().toByteArray()) + + "-" + + LLUtils.HEX_FORMAT.formatHex(r.getMax().toByteArray()); + } else if (r.hasMin()) { + return LLUtils.HEX_FORMAT.formatHex(r.getMin().toByteArray()) + "-MAX"; + } else if (r.hasMax()) { + return "MIN-" + LLUtils.HEX_FORMAT.formatHex(r.getMax().toByteArray()); + } else { + return "MIN-MAX"; + } + } + + @Test + void parallelizeRange() { + var minBi = BigInteger.valueOf(50); + var maxBi = BigInteger.valueOf(100); + var range = LLRange.of(Buf.wrap(minBi.toByteArray()), Buf.wrap(maxBi.toByteArray())); + var ranges = LLLocalDictionary.parallelizeRange(range, 8).toList(); + printRanges(range, ranges); + } + + @Test + void parallelizeRangeNegative() { + var minBi = BigInteger.valueOf(Byte.MIN_VALUE); + var maxBi = BigInteger.valueOf(-50); + var range = LLRange.of(Buf.wrap(minBi.toByteArray()), Buf.wrap(maxBi.toByteArray())); + var ranges = LLLocalDictionary.parallelizeRange(range, 8).toList(); + printRanges(range, ranges); + } + + @Test + void parallelizeRangeNegativeToPositive() { + var minBi = BigInteger.valueOf(0); + var maxBi = BigInteger.valueOf(Byte.MIN_VALUE); + var range = LLRange.of(Buf.wrap(minBi.toByteArray()), Buf.wrap(maxBi.toByteArray())); + var ranges = LLLocalDictionary.parallelizeRange(range, 8).toList(); + printRanges(range, ranges); + } + + @Test + void parallelizeRangeMin() { + var minBi = BigInteger.valueOf(50); + var range = LLRange.of(Buf.wrap(minBi.toByteArray()), null); + var ranges = LLLocalDictionary.parallelizeRange(range, 8).toList(); + printRanges(range, ranges); + } + + @Test + void parallelizeRangeMax() { + var maxBi = BigInteger.valueOf(50); + var range = LLRange.of(null, Buf.wrap(maxBi.toByteArray())); + var ranges = LLLocalDictionary.parallelizeRange(range, 8).toList(); + printRanges(range, ranges); + } + + @ParameterizedTest + @MethodSource("provideRanges") + public void intersectTest(IntersectArgs args) { + Assertions.assertEquals(args.expected, LLRange.intersect(args.rangeA, args.rangeB)); + } + + public record IntersectArgs(LLRange expected, LLRange rangeA, LLRange rangeB) {} + + public static Stream provideRanges() { + return Stream.of( + new IntersectArgs(LLRange.all(), LLRange.all(), LLRange.all()), + new IntersectArgs(LLRange.single(Buf.wrap((byte) 1)), LLRange.single(Buf.wrap((byte) 1)), LLRange.all()), + new IntersectArgs(LLRange.single(Buf.wrap((byte) 1)), LLRange.all(), LLRange.single(Buf.wrap((byte) 1))), + new IntersectArgs(null, LLRange.single(Buf.wrap((byte) 1)), LLRange.single(Buf.wrap((byte) 2))), + new IntersectArgs(null, LLRange.of(Buf.wrap((byte) 1), Buf.wrap((byte) 2)), LLRange.of(Buf.wrap((byte) 2), Buf.wrap((byte) 3))), + new IntersectArgs(LLRange.single(Buf.wrap((byte) 1)), LLRange.single(Buf.wrap((byte) 1)), LLRange.of(Buf.wrap((byte) 1), Buf.wrap((byte) 2))), + new IntersectArgs(LLRange.single(Buf.wrap((byte) 1)), LLRange.of(Buf.wrap((byte) 1), Buf.wrap((byte) 2)), LLRange.single(Buf.wrap((byte) 1))), + new IntersectArgs(null, LLRange.single(Buf.wrap((byte) 2)), LLRange.of(Buf.wrap((byte) 1), Buf.wrap((byte) 2))), + new IntersectArgs(null, LLRange.of(Buf.wrap((byte) 1), Buf.wrap((byte) 2)), LLRange.single(Buf.wrap((byte) 2))), + new IntersectArgs(null, LLRange.of(Buf.wrap((byte) 1), Buf.wrap((byte) 2)), LLRange.of(Buf.wrap((byte) 3), Buf.wrap((byte) 4))), + new IntersectArgs(null, LLRange.of(Buf.wrap((byte) 1), Buf.wrap((byte) 2)), LLRange.of(Buf.wrap((byte) 2), Buf.wrap((byte) 3))), + new IntersectArgs(LLRange.of(Buf.wrap((byte) 1), Buf.wrap((byte) 2)), LLRange.of(Buf.wrap((byte) 1), Buf.wrap((byte) 2)), LLRange.of(Buf.wrap((byte) 1), Buf.wrap((byte) 3))), + new IntersectArgs(LLRange.of(Buf.wrap((byte) 4), Buf.wrap((byte) 7)), LLRange.of(Buf.wrap((byte) 1), Buf.wrap((byte) 7)), LLRange.of(Buf.wrap((byte) 4), Buf.wrap((byte) 9))), + new IntersectArgs(LLRange.of(Buf.wrap((byte) 1), Buf.wrap((byte) 2)), LLRange.of(Buf.wrap((byte) 1), Buf.wrap((byte) 2)), LLRange.of(Buf.wrap((byte) 1), Buf.wrap((byte) 2))), + new IntersectArgs(null, LLRange.of(Buf.wrap((byte) 1), Buf.wrap((byte) 2)), LLRange.of(Buf.wrap((byte) 3), Buf.wrap((byte) 4))) + ); + } +} \ No newline at end of file diff --git a/src/test/java/module-info.java b/src/test/java/module-info.java index 3f8ad69..eb79b9f 100644 --- a/src/test/java/module-info.java +++ b/src/test/java/module-info.java @@ -14,4 +14,5 @@ module dbengine.tests { requires org.apache.commons.lang3; requires rocksdbjni; opens it.cavallium.dbengine.tests; + opens it.cavallium.dbengine.database.disk.test; } \ No newline at end of file