From 0ab75623ba8e6e9ac369b556ce8b9c6d9bca97a6 Mon Sep 17 00:00:00 2001
From: Andrea Cavalli
Date: Tue, 10 Oct 2023 00:39:41 +0200
Subject: [PATCH] Update rocksdb, lucene, Add SST Reader/Writer, update blocks verification, live files, refactor iterator

---
 pom.xml | 4 +-
 .../dbengine/client/CompositeDatabase.java | 2 +-
 .../cavallium/dbengine/client/DbProgress.java | 45 +++
 .../dbengine/client/LongProgressTracker.java | 42 ++
 .../dbengine/client/SSTDumpProgress.java | 17 +
 .../dbengine/client/SSTProgress.java | 21 +
 .../client/SSTVerificationProgress.java | 17 +
 .../dbengine/client/VerificationProgress.java | 31 --
 .../dbengine/database/LLDictionary.java | 5 +-
 .../DatabaseMapDictionaryDeep.java | 5 +-
 .../DatabaseMapDictionaryHashed.java | 5 +-
 .../collections/DatabaseMapSingle.java | 5 +-
 .../collections/DatabaseSingleBucket.java | 5 +-
 .../collections/DatabaseSingleMapped.java | 5 +-
 .../collections/DatabaseSingleton.java | 5 +-
 .../database/collections/DatabaseStage.java | 5 +-
 .../database/disk/AbstractRocksDBColumn.java | 13 +-
 .../disk/LLLocalDatabaseConnection.java | 4 +
 .../database/disk/LLLocalDictionary.java | 72 ++--
 .../disk/LLLocalKeyValueDatabase.java | 5 +-
 .../database/disk/RocksDBColumnFile.java | 52 +++
 .../dbengine/database/disk/RocksDBFile.java | 359 +++++++++++-------
 .../database/disk/RocksDBFileMetadata.java | 3 +-
 .../dbengine/database/disk/RocksDBUtils.java | 2 +-
 .../dbengine/database/disk/SSTRange.java | 117 ++++++
 .../database/disk/rocksdb/LLReadOptions.java | 7 +-
 .../disk/rocksdb/LLSstFileReader.java | 31 ++
 .../disk/rocksdb/LLSstFileWriter.java | 40 ++
 .../disk/rocksdb/RocksIteratorObj.java | 148 ++++++--
 .../database/memory/LLMemoryDictionary.java | 5 +-
 .../cavallium/dbengine/utils/StreamUtils.java | 49 ++-
 .../it/cavallium/dbengine/repair/Repair.java | 341 ++++++++++++-----
 32 files changed, 1109 insertions(+), 358 deletions(-)
 create mode 100644 src/main/java/it/cavallium/dbengine/client/DbProgress.java
 create mode 100644 src/main/java/it/cavallium/dbengine/client/LongProgressTracker.java
 create mode 100644 src/main/java/it/cavallium/dbengine/client/SSTDumpProgress.java
 create mode 100644 src/main/java/it/cavallium/dbengine/client/SSTProgress.java
 create mode 100644 src/main/java/it/cavallium/dbengine/client/SSTVerificationProgress.java
 delete mode 100644 src/main/java/it/cavallium/dbengine/client/VerificationProgress.java
 create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumnFile.java
 create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/SSTRange.java
 create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLSstFileReader.java
 create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLSstFileWriter.java

diff --git a/pom.xml b/pom.xml
index e7f9b65..9a2a928 100644
--- a/pom.xml
+++ b/pom.xml
@@ -13,8 +13,8 @@
 	0-SNAPSHOT
 	false
 	1.10.4
-	9.7.0
-	8.5.3
+	9.8.0
+	8.5.4
 	5.9.0
 	1.0.13
diff --git a/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java b/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java
index 30afe41..28740b2 100644
--- a/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java
+++ b/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java
@@ -26,7 +26,7 @@ public interface CompositeDatabase extends DatabaseProperties, DatabaseOperation
 	/**
 	 * Find corrupted items
 	 */
-	Stream<VerificationProgress> verify();
+	Stream<DbProgress<SSTVerificationProgress>> verify();
 
 	void verifyChecksum();
 }
diff --git a/src/main/java/it/cavallium/dbengine/client/DbProgress.java b/src/main/java/it/cavallium/dbengine/client/DbProgress.java
new file mode 100644
index 0000000..6309251
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/client/DbProgress.java
@@ -0,0 +1,45 @@
+package it.cavallium.dbengine.client;
+
+import it.cavallium.dbengine.client.SSTProgress.SSTProgressReport;
+import it.cavallium.dbengine.client.SSTProgress.SSTStart;
+import it.cavallium.dbengine.rpc.current.data.Column;
+import java.nio.file.Path;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+import org.jetbrains.annotations.Nullable;
+
+public interface DbProgress<T extends SSTProgress> {
+
+	String databaseName();
+
+	record DbSSTProgress<T extends SSTProgress>(String databaseName, Column column, @Nullable Path file, long scanned,
+			long total, T sstProgress) implements DbProgress<T> {
+
+		public double getProgress() {
+			if (total == 0) {
+				return 0d;
+			}
+			return scanned / (double) total;
+		}
+
+		public String fileString() {
+			return file != null ? file.normalize().toString() : null;
+		}
+	}
+
+	static <T extends SSTProgress> Stream<DbProgress<T>> toDbProgress(String dbName,
+			String columnName,
+			LongProgressTracker totalTracker,
+			Stream<T> stream) {
+		Column column = Column.of(columnName);
+		AtomicReference<Path> filePath = new AtomicReference<>();
+		return stream.map(state -> {
+			switch (state) {
+				case SSTStart start -> filePath.set(start.metadata().filePath());
+				case SSTProgressReport progress -> totalTracker.incrementAndGet();
+				default -> {}
+			}
+			return new DbSSTProgress<>(dbName, column, filePath.get(), totalTracker.getCurrent(), totalTracker.getTotal(), state);
+		});
+	}
+}
diff --git a/src/main/java/it/cavallium/dbengine/client/LongProgressTracker.java b/src/main/java/it/cavallium/dbengine/client/LongProgressTracker.java
new file mode 100644
index 0000000..5549f27
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/client/LongProgressTracker.java
@@ -0,0 +1,42 @@
+package it.cavallium.dbengine.client;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class LongProgressTracker {
+
+	private final AtomicLong current = new AtomicLong();
+	private final AtomicLong total = new AtomicLong();
+
+	public LongProgressTracker(long size) {
+		setTotal(size);
+	}
+
+	public LongProgressTracker() {
+
+	}
+
+	public LongProgressTracker setTotal(long estimate) {
+		total.set(estimate);
+		return this;
+	}
+
+	public long getCurrent() {
+		return current.get();
+	}
+
+	public long incrementAndGet() {
+		return current.incrementAndGet();
+	}
+
+	public long getAndIncrement() {
+		return current.getAndIncrement();
+	}
+
+	public long getTotal() {
+		return Math.max(current.get(), total.get());
+	}
+
+	public double progress() {
+		return getCurrent() / (double) Math.max(1L, getTotal());
+	}
+}
diff --git a/src/main/java/it/cavallium/dbengine/client/SSTDumpProgress.java b/src/main/java/it/cavallium/dbengine/client/SSTDumpProgress.java
new file mode 100644
index 0000000..7e7ac9a
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/client/SSTDumpProgress.java
@@ -0,0 +1,17 @@
+package it.cavallium.dbengine.client;
+
+import it.cavallium.buffer.Buf;
+import it.cavallium.dbengine.client.SSTDumpProgress.SSTBlockFail;
+import it.cavallium.dbengine.client.SSTDumpProgress.SSTBlockKeyValue;
+import it.cavallium.dbengine.client.SSTProgress.SSTOk;
+import it.cavallium.dbengine.client.SSTProgress.SSTProgressReport;
+import it.cavallium.dbengine.client.SSTProgress.SSTStart;
+import org.rocksdb.RocksDBException;
+
+public sealed interface SSTDumpProgress extends SSTProgress permits SSTBlockFail, SSTBlockKeyValue, SSTOk,
+		SSTProgressReport, SSTStart {
+
+	record SSTBlockKeyValue(Buf rawKey, Buf rawValue) implements SSTDumpProgress {}
+
+	record SSTBlockFail(RocksDBException ex) implements SSTDumpProgress {}
+}
diff --git a/src/main/java/it/cavallium/dbengine/client/SSTProgress.java b/src/main/java/it/cavallium/dbengine/client/SSTProgress.java
new file mode 100644
index 0000000..7bbe43d
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/client/SSTProgress.java
@@ -0,0 +1,21 @@
+package it.cavallium.dbengine.client;
+
+import it.cavallium.dbengine.database.disk.RocksDBFile.IterationMetadata;
+import it.cavallium.dbengine.rpc.current.data.Column;
+import org.jetbrains.annotations.Nullable;
+
+public interface SSTProgress {
+
+	record SSTStart(IterationMetadata metadata) implements SSTProgress, SSTVerificationProgress, SSTDumpProgress {}
+
+	record SSTOk(long scannedCount) implements SSTProgress, SSTVerificationProgress, SSTDumpProgress {}
+
+	record SSTProgressReport(long fileScanned, long fileTotal) implements SSTProgress, SSTVerificationProgress,
+			SSTDumpProgress {
+
+		public double getFileProgress() {
+			if (fileTotal == 0) return 0d;
+			return fileScanned / (double) fileTotal;
+		}
+	}
+}
diff --git a/src/main/java/it/cavallium/dbengine/client/SSTVerificationProgress.java b/src/main/java/it/cavallium/dbengine/client/SSTVerificationProgress.java
new file mode 100644
index 0000000..f1404da
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/client/SSTVerificationProgress.java
@@ -0,0 +1,17 @@
+package it.cavallium.dbengine.client;
+
+import it.cavallium.buffer.Buf;
+import it.cavallium.dbengine.client.DbProgress.DbSSTProgress;
+import it.cavallium.dbengine.client.SSTProgress.SSTOk;
+import it.cavallium.dbengine.client.SSTProgress.SSTProgressReport;
+import it.cavallium.dbengine.client.SSTProgress.SSTStart;
+import it.cavallium.dbengine.client.SSTVerificationProgress.SSTBlockBad;
+import it.cavallium.dbengine.rpc.current.data.Column;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+public sealed interface SSTVerificationProgress extends SSTProgress permits SSTOk, SSTProgressReport, SSTStart,
+		SSTBlockBad {
+
+	record SSTBlockBad(Buf rawKey, Throwable ex) implements SSTVerificationProgress {}
+}
diff --git a/src/main/java/it/cavallium/dbengine/client/VerificationProgress.java b/src/main/java/it/cavallium/dbengine/client/VerificationProgress.java
deleted file mode 100644
index 0ee0d4e..0000000
--- a/src/main/java/it/cavallium/dbengine/client/VerificationProgress.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package it.cavallium.dbengine.client;
-
-import it.cavallium.buffer.Buf;
-import it.cavallium.dbengine.rpc.current.data.Column;
-import org.jetbrains.annotations.Nullable;
-
-public sealed interface VerificationProgress {
-	record BlockBad(String databaseName, Column column, Buf rawKey, String file, Throwable ex)
-			implements VerificationProgress {}
-	record FileStart(String databaseName, Column column, String file, @Nullable Long numEntriesEstimate)
-			implements VerificationProgress {}
-	record FileOk(String databaseName, Column column, String file, long scanned)
-			implements VerificationProgress {}
-	record Progress(String databaseName, Column column, String file,
-			long scanned, long total,
-			long fileScanned, long fileTotal)
-			implements VerificationProgress {
-
-		public double getProgress() {
-			return scanned / (double) total;
-		}
-
-		public double getFileProgress() {
-			return fileScanned / (double) fileTotal;
-		}
-	}
-
-	@Nullable String databaseName();
-	@Nullable Column column();
-	@Nullable
String file(); -} diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java index 71a4f76..8828ac9 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java @@ -1,7 +1,8 @@ package it.cavallium.dbengine.database; import it.cavallium.buffer.Buf; -import it.cavallium.dbengine.client.VerificationProgress; +import it.cavallium.dbengine.client.DbProgress; +import it.cavallium.dbengine.client.SSTVerificationProgress; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.util.List; @@ -64,7 +65,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { int prefixLength, boolean smallRange); - Stream verifyChecksum(LLRange range); + Stream> verifyChecksum(LLRange range); void setRange(LLRange range, Stream entries, boolean smallRange); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index 37b1df4..d7fc254 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -5,8 +5,9 @@ import static it.cavallium.dbengine.utils.StreamUtils.resourceStream; import it.cavallium.buffer.Buf; import it.cavallium.buffer.BufDataInput; import it.cavallium.buffer.BufDataOutput; -import it.cavallium.dbengine.client.VerificationProgress; +import it.cavallium.dbengine.client.DbProgress; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.client.SSTVerificationProgress; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLRange; @@ -269,7 +270,7 @@ public class DatabaseMapDictionaryDeep> implem } @Override - public Stream verifyChecksum() { + public Stream> verifyChecksum() { return dictionary.verifyChecksum(range); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java index abdd110..71678ad 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java @@ -1,8 +1,9 @@ package it.cavallium.dbengine.database.collections; import it.cavallium.buffer.Buf; -import it.cavallium.dbengine.client.VerificationProgress; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.client.DbProgress; +import it.cavallium.dbengine.client.SSTVerificationProgress; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.SubStageEntry; import it.cavallium.dbengine.database.UpdateMode; @@ -148,7 +149,7 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap verifyChecksum() { + public Stream> verifyChecksum() { return this.subDictionary.verifyChecksum(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java index c7cf60c..dc8f8c9 100644 --- 
a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java @@ -3,8 +3,9 @@ package it.cavallium.dbengine.database.collections; import it.cavallium.buffer.Buf; import it.cavallium.buffer.BufDataInput; import it.cavallium.buffer.BufDataOutput; -import it.cavallium.dbengine.client.VerificationProgress; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.client.DbProgress; +import it.cavallium.dbengine.client.SSTVerificationProgress; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; @@ -121,7 +122,7 @@ public final class DatabaseMapSingle implements DatabaseStageEntry { } @Override - public Stream verifyChecksum() { + public Stream> verifyChecksum() { return dictionary.verifyChecksum(LLRange.single(key)); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java index 7d05136..374f76c 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java @@ -1,7 +1,8 @@ package it.cavallium.dbengine.database.collections; -import it.cavallium.dbengine.client.VerificationProgress; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.client.DbProgress; +import it.cavallium.dbengine.client.SSTVerificationProgress; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateReturnMode; @@ -124,7 +125,7 @@ public class DatabaseSingleBucket implements DatabaseStageEntry { } @Override - public Stream verifyChecksum() { + public Stream> verifyChecksum() { return bucketStage.verifyChecksum(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java index e2cb7e3..8e4fb39 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java @@ -1,8 +1,9 @@ package it.cavallium.dbengine.database.collections; -import it.cavallium.dbengine.client.VerificationProgress; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.client.DbProgress; import it.cavallium.dbengine.client.Mapper; +import it.cavallium.dbengine.client.SSTVerificationProgress; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.disk.CachedSerializationFunction; @@ -107,7 +108,7 @@ public class DatabaseSingleMapped implements DatabaseStageEntry { } @Override - public Stream verifyChecksum() { + public Stream> verifyChecksum() { return this.serializedSingle.verifyChecksum(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java index 001d27b..69dd3d4 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java @@ -3,8 +3,9 @@ package 
it.cavallium.dbengine.database.collections; import it.cavallium.buffer.Buf; import it.cavallium.buffer.BufDataInput; import it.cavallium.buffer.BufDataOutput; -import it.cavallium.dbengine.client.VerificationProgress; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.client.DbProgress; +import it.cavallium.dbengine.client.SSTVerificationProgress; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; @@ -119,7 +120,7 @@ public class DatabaseSingleton implements DatabaseStageEntry { } @Override - public Stream verifyChecksum() { + public Stream> verifyChecksum() { return Stream.empty(); } } \ No newline at end of file diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java index c45337e..f1b13fa 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java @@ -1,7 +1,8 @@ package it.cavallium.dbengine.database.collections; -import it.cavallium.dbengine.client.VerificationProgress; +import it.cavallium.dbengine.client.DbProgress; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.client.SSTVerificationProgress; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateReturnMode; @@ -64,5 +65,5 @@ public interface DatabaseStage extends DatabaseStageWithEntry { return leavesCount(snapshot, false) <= 0; } - Stream verifyChecksum(); + Stream> verifyChecksum(); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index d9cd1a4..c667a80 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -7,12 +7,10 @@ import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Timer; import it.cavallium.buffer.Buf; -import it.cavallium.dbengine.database.ColumnUtils; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.RepeatedElementList; import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions; -import it.cavallium.dbengine.database.disk.rocksdb.LLSlice; import it.cavallium.dbengine.database.disk.rocksdb.LLWriteOptions; import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj; import it.cavallium.dbengine.database.serialization.SerializationFunction; @@ -22,6 +20,7 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.StampedLock; import java.util.stream.Stream; @@ -35,10 +34,11 @@ import org.rocksdb.CompactRangeOptions; import org.rocksdb.FlushOptions; import org.rocksdb.Holder; import org.rocksdb.KeyMayExist; -import org.rocksdb.LiveFileMetaData; +import org.rocksdb.LevelMetaData; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.RocksObject; +import org.rocksdb.SstFileMetaData; import org.rocksdb.TableProperties; import 
org.rocksdb.Transaction; import org.rocksdb.TransactionOptions; @@ -564,11 +564,10 @@ public sealed abstract class AbstractRocksDBColumn implements var closeReadLock = closeLock.readLock(); try { ensureOpen(); - db.getLiveFiles(); // flushes the memtable byte[] cfhName = cfh.getName(); - return db.getLiveFilesMetaData().stream() - .filter(file -> Arrays.equals(cfhName, file.columnFamilyName())) - .map(file -> new RocksDBFile(db, cfh, file)); + return db.getColumnFamilyMetaData(cfh).levels().stream() + .flatMap(l -> l.files().stream() + .map(sstFileMetaData -> new RocksDBColumnFile(db, cfh, sstFileMetaData, cfhName, l.level()))); } finally { closeLock.unlockRead(closeReadLock); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java index 38625bd..7b842d7 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java @@ -68,6 +68,10 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection { } public Path getDatabasePath(String databaseName) { + return getDatabasePath(basePath, databaseName); + } + + public static Path getDatabasePath(Path basePath, String databaseName) { return basePath.resolve("database_" + databaseName); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 5924f78..25830a8 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -4,27 +4,25 @@ import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; import static it.cavallium.dbengine.database.LLUtils.isBoundedRange; import static it.cavallium.dbengine.database.LLUtils.mapList; import static it.cavallium.dbengine.database.LLUtils.toStringSafe; -import static it.cavallium.dbengine.database.disk.LLLocalKeyValueDatabase.PRINT_ALL_CHECKSUM_VERIFICATION_STEPS; import static it.cavallium.dbengine.database.disk.UpdateAtomicResultMode.DELTA; import static it.cavallium.dbengine.utils.StreamUtils.ROCKSDB_POOL; import static it.cavallium.dbengine.utils.StreamUtils.collectOn; import static it.cavallium.dbengine.utils.StreamUtils.executing; import static it.cavallium.dbengine.utils.StreamUtils.fastSummingLong; import static it.cavallium.dbengine.utils.StreamUtils.resourceStream; -import static it.cavallium.dbengine.utils.StreamUtils.streamWhileNonNull; import static java.util.Objects.requireNonNull; import static it.cavallium.dbengine.utils.StreamUtils.batches; -import static java.util.Objects.requireNonNullElse; -import com.google.common.primitives.Longs; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Timer; import it.cavallium.buffer.Buf; -import it.cavallium.dbengine.client.VerificationProgress; -import it.cavallium.dbengine.client.VerificationProgress.BlockBad; -import it.cavallium.dbengine.client.VerificationProgress.FileOk; -import it.cavallium.dbengine.client.VerificationProgress.FileStart; -import it.cavallium.dbengine.client.VerificationProgress.Progress; +import it.cavallium.dbengine.client.DbProgress; +import it.cavallium.dbengine.client.DbProgress.DbSSTProgress; +import it.cavallium.dbengine.client.SSTProgress.SSTOk; +import it.cavallium.dbengine.client.SSTProgress.SSTProgressReport; +import 
it.cavallium.dbengine.client.SSTProgress.SSTStart; +import it.cavallium.dbengine.client.SSTVerificationProgress; +import it.cavallium.dbengine.client.SSTVerificationProgress.SSTBlockBad; import it.cavallium.dbengine.database.ColumnUtils; import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLDictionary; @@ -38,32 +36,28 @@ import it.cavallium.dbengine.database.RocksDBLongProperty; import it.cavallium.dbengine.database.SerializedKey; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.disk.RocksDBFile.IterationMetadata; import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions; import it.cavallium.dbengine.database.disk.rocksdb.LLWriteOptions; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.rpc.current.data.Column; import it.cavallium.dbengine.utils.DBException; -import it.cavallium.dbengine.utils.StreamUtils; import java.io.IOException; import java.math.BigInteger; -import java.nio.file.LinkOption; -import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; -import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; @@ -645,7 +639,7 @@ public class LLLocalDictionary implements LLDictionary { } @Override - public Stream verifyChecksum(LLRange rangeFull) { + public Stream> verifyChecksum(LLRange rangeFull) { Set brokenFiles = new ConcurrentHashMap().keySet(true); LongAdder totalScanned = new LongAdder(); long totalEstimate; @@ -665,30 +659,38 @@ public class LLLocalDictionary implements LLDictionary { var liveFiles = db.getAllLiveFiles().toList(); var liveFilesCount = liveFiles.size(); - ConcurrentHashMap fileToInitialEstimate = new ConcurrentHashMap<>(); return liveFiles.stream() .sorted(Comparator.reverseOrder()) - .flatMap(fr -> fr.verify(databaseName, columnName, rangeFull)) - .map(status -> switch (status) { - case FileStart fileStart -> { - Long numEntriesEstimate = Objects.requireNonNullElse(fileStart.numEntriesEstimate(), totalEstimate / liveFilesCount); - totalScanned.add(numEntriesEstimate); - fileToInitialEstimate.put(fileStart.file(), numEntriesEstimate); - yield status; - } - case FileOk fileEnd -> { - long initialNumEntriesEstimate = Objects.requireNonNullElse(fileToInitialEstimate.remove(fileEnd.file()), 0L); - long numEntriesScanned = fileEnd.scanned(); - totalScanned.add(numEntriesScanned - initialNumEntriesEstimate); - yield status; - } - case Progress progress -> new Progress(progress.databaseName(), progress.column(), progress.file(), totalScanned.longValue(), totalEstimate, progress.fileScanned(), progress.fileTotal()); - default -> status; + .flatMap(fr -> { + AtomicReference metadataRef = new AtomicReference<>(); + AtomicLong initialEstimate = new AtomicLong(); + return fr + .verify(SSTRange.parse(rangeFull)) + .map(status 
-> switch (status) { + case SSTStart fileStart -> { + metadataRef.set(fileStart.metadata()); + long numEntriesEstimate = Objects.requireNonNullElse(fileStart.metadata().countEstimate(), totalEstimate / liveFilesCount); + totalScanned.add(numEntriesEstimate); + initialEstimate.set(numEntriesEstimate); + yield status; + } + case SSTOk fileEnd -> { + long initialNumEntriesEstimate = initialEstimate.get(); + long numEntriesScanned = fileEnd.scannedCount(); + totalScanned.add(numEntriesScanned - initialNumEntriesEstimate); + yield status; + } + case SSTProgressReport ignored -> status; + case SSTBlockBad ignored -> status; + }) + .>map(sstProgress -> new DbProgress.DbSSTProgress<>(databaseName, column, metadataRef.get().filePath(), totalScanned.longValue(), totalEstimate, sstProgress)); }) - .filter(err -> !(err instanceof BlockBad blockBad && blockBad.rawKey() == null && !brokenFiles.add(blockBad.file()))); + .filter(err -> !(err instanceof DbProgress.DbSSTProgress sstProgress + && sstProgress.sstProgress() instanceof SSTBlockBad blockBad + && blockBad.rawKey() == null && !brokenFiles.add(sstProgress.fileString()))); } catch (RocksDBException e) { - return Stream.of(new BlockBad(databaseName, column, null, null, e)); + return Stream.of(new DbProgress.DbSSTProgress<>(databaseName, column, null, 0, 0, new SSTBlockBad(null, e))); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 518800c..09b9d2c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -34,7 +34,6 @@ import it.cavallium.dbengine.rpc.current.data.NoFilter; import java.io.File; import java.io.IOException; import it.cavallium.dbengine.utils.DBException; -import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -115,8 +114,6 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa = Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checks.paranoidfilechecks", "false")); private static final boolean FORCE_COLUMN_FAMILY_CONSISTENCY_CHECKS = Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checks.forcecolumnfamilyconsistencychecks", "true")); - static final boolean PRINT_ALL_CHECKSUM_VERIFICATION_STEPS - = Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checks.verification.print", "false")); private static final InfoLogLevel LOG_LEVEL = InfoLogLevel.getInfoLogLevel(Byte.parseByte(System.getProperty("it.cavallium.dbengine.log.levelcode", "" + InfoLogLevel.WARN_LEVEL.getValue()))); private static final CacheFactory CACHE_FACTORY = USE_CLOCK_CACHE ? 
new ClockCacheFactory() : new LRUCacheFactory(); @@ -631,7 +628,7 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa var liveFilesMetadata = db.getLiveFilesMetaData(); List files = new ArrayList<>(); for (LiveFileMetaData file : liveFilesMetadata) { - files.add(new RocksDBFile(db, getCfh(file.columnFamilyName()), file)); + files.add(new RocksDBColumnFile(db, getCfh(file.columnFamilyName()), file)); } return files.stream(); } finally { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumnFile.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumnFile.java new file mode 100644 index 0000000..95539f6 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumnFile.java @@ -0,0 +1,52 @@ +package it.cavallium.dbengine.database.disk; + +import static it.cavallium.dbengine.utils.StreamUtils.resourceStream; +import static java.util.Objects.requireNonNull; + +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.LiveFileMetaData; +import org.rocksdb.RocksDB; +import org.rocksdb.SstFileMetaData; + +public class RocksDBColumnFile extends RocksDBFile { + + private final ColumnFamilyHandle cfh; + + public RocksDBColumnFile(RocksDB db, ColumnFamilyHandle cfh, RocksDBFileMetadata metadata) { + super(metadata); + this.cfh = cfh; + } + + public RocksDBColumnFile(T db, ColumnFamilyHandle cfh, LiveFileMetaData file) { + this(db, + cfh, + new RocksDBFileMetadata(Path.of(file.path() + file.fileName()), + file.fileName(), + file.level(), + new String(file.columnFamilyName(), StandardCharsets.UTF_8), + file.numEntries(), + file.size(), + decodeRange(file.smallestKey(), file.largestKey()) + ) + ); + } + + public RocksDBColumnFile(T db, ColumnFamilyHandle cfh, + SstFileMetaData file, byte[] columnFamilyName, int level) { + this(db, + cfh, + new RocksDBFileMetadata(Path.of(file.path()+ file.fileName()), + file.fileName(), + level, + new String(columnFamilyName, StandardCharsets.UTF_8), + file.numEntries(), + file.size(), + decodeRange(file.smallestKey(), file.largestKey()) + ) + ); + } + + +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFile.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFile.java index 1596f49..8dd4376 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFile.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFile.java @@ -1,54 +1,56 @@ package it.cavallium.dbengine.database.disk; -import static it.cavallium.dbengine.database.LLUtils.isBoundedRange; -import static it.cavallium.dbengine.database.disk.LLLocalKeyValueDatabase.PRINT_ALL_CHECKSUM_VERIFICATION_STEPS; import static it.cavallium.dbengine.utils.StreamUtils.resourceStream; import static java.util.Objects.requireNonNull; import com.google.common.primitives.Longs; import it.cavallium.buffer.Buf; -import it.cavallium.dbengine.client.VerificationProgress; -import it.cavallium.dbengine.client.VerificationProgress.BlockBad; -import it.cavallium.dbengine.client.VerificationProgress.FileOk; -import it.cavallium.dbengine.client.VerificationProgress.FileStart; -import it.cavallium.dbengine.client.VerificationProgress.Progress; +import it.cavallium.dbengine.client.SSTDumpProgress; +import it.cavallium.dbengine.client.SSTDumpProgress.SSTBlockFail; +import it.cavallium.dbengine.client.SSTDumpProgress.SSTBlockKeyValue; +import it.cavallium.dbengine.client.SSTProgress.SSTOk; +import 
it.cavallium.dbengine.client.SSTProgress.SSTProgressReport; +import it.cavallium.dbengine.client.SSTProgress.SSTStart; +import it.cavallium.dbengine.client.SSTVerificationProgress; +import it.cavallium.dbengine.client.SSTVerificationProgress.SSTBlockBad; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLUtils; -import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions; -import it.cavallium.dbengine.rpc.current.data.Column; +import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationKeyState.RocksDBFileIterationStateKeyError; +import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationKeyState.RocksDBFileIterationStateKeyOk; +import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationState.RocksDBFileIterationStateBegin; +import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationState.RocksDBFileIterationStateEnd; +import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationState.RocksDBFileIterationStateKey; +import it.cavallium.dbengine.database.disk.SSTRange.SSTLLRange; +import it.cavallium.dbengine.database.disk.SSTRange.SSTRangeFull; +import it.cavallium.dbengine.database.disk.SSTRange.SSTRangeKey; +import it.cavallium.dbengine.database.disk.SSTRange.SSTRangeNone; +import it.cavallium.dbengine.database.disk.SSTRange.SSTRangeOffset; +import it.cavallium.dbengine.database.disk.SSTRange.SSTSingleKey; +import it.cavallium.dbengine.database.disk.rocksdb.LLSstFileReader; import it.cavallium.dbengine.utils.StreamUtils; -import java.nio.charset.StandardCharsets; import java.nio.file.Path; -import java.util.Objects; -import java.util.Optional; import java.util.StringJoiner; +import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.LiveFileMetaData; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; -import org.rocksdb.SstFileMetaData; public class RocksDBFile implements Comparable { protected static final Logger logger = LogManager.getLogger(RocksDBFile.class); + protected final RocksDBFileMetadata metadata; + protected final Long sstNumber; - private final RocksDB db; - private final ColumnFamilyHandle cfh; - private final RocksDBFileMetadata metadata; - private final Long sstNumber; - - public RocksDBFile(RocksDB db, ColumnFamilyHandle cfh, RocksDBFileMetadata metadata) { - this.db = db; - this.cfh = cfh; + public RocksDBFile(RocksDBFileMetadata metadata) { this.metadata = metadata; - String fileName = metadata.fileName().replace("/", ""); + String fileName = metadata.fileName().startsWith("/") ? 
metadata.fileName().substring(1) : metadata.fileName(); int extensionIndex = fileName.indexOf("."); Long sstNumber = null; if (extensionIndex != -1) { @@ -60,137 +62,194 @@ public class RocksDBFile implements Comparable { } } - public RocksDBFile(T db, ColumnFamilyHandle cfh, LiveFileMetaData file) { - this(db, - cfh, - new RocksDBFileMetadata(file.path(), - file.fileName(), - file.level(), - new String(file.columnFamilyName(), StandardCharsets.UTF_8), - file.numEntries(), - file.size(), - decodeRange(file.smallestKey(), file.largestKey()) + public RocksDBFile(Path dbBaseDir, String file) { + this(new RocksDBFileMetadata(dbBaseDir.resolve(file.startsWith("/") ? file.substring(1) : file), + StringUtils.substringAfter(file, '/'), + 0, + "any", + 0, + 0, + LLRange.all() ) ); } - public RocksDBFile(T db, ColumnFamilyHandle cfh, SstFileMetaData file, byte[] columnFamilyName, int level) { - this(db, - cfh, - new RocksDBFileMetadata(file.path(), - file.fileName(), - level, - new String(columnFamilyName, StandardCharsets.UTF_8), - file.numEntries(), - file.size(), - decodeRange(file.smallestKey(), file.largestKey()) - ) - ); - } - - private static LLRange decodeRange(byte[] smallestKey, byte[] largestKey) { + protected static LLRange decodeRange(byte[] smallestKey, byte[] largestKey) { return LLRange.of(Buf.wrap(smallestKey), Buf.wrap(largestKey)); } + private static SSTRange intersectWithMetadata(LLRange metadataRange, SSTRange innerRange) { + return switch (innerRange) { + case SSTRangeFull ignored -> SSTRange.parse(metadataRange); + case SSTSingleKey singleKey -> SSTRange.parse(LLRange.intersect(metadataRange, singleKey.toLLRange())); + case SSTRangeKey rangeKey -> SSTRange.parse(LLRange.intersect(metadataRange, rangeKey.toLLRange())); + case SSTRangeNone none -> none; + case SSTRangeOffset offset -> offset; + }; + } + public RocksDBFileMetadata getMetadata() { return metadata; } - public record VerificationFileRange(String filePath, String filename, @Nullable LLRange range, long countEstimate, - @Nullable Long sstNumber) {} - - public Stream verify(String databaseDisplayName, String columnDisplayName, LLRange rangeFull) { - var intersectedRange = LLRange.intersect(metadata.keysRange(), rangeFull); - // Ignore the file if it's outside the requested range - if (intersectedRange == null) { - return Stream.of(); - } - - String filePath = Path.of(metadata.path()).resolve("./" + metadata.fileName()).normalize().toString(); - var fr = new VerificationFileRange(filePath, - metadata.fileName().replace("/", ""), - intersectedRange, - metadata.numEntries(), - sstNumber - ); - return verify(databaseDisplayName, columnDisplayName, rangeFull, fr); + public Stream verify(SSTRange range) { + AtomicLong fileScanned = new AtomicLong(); + AtomicLong fileTotal = new AtomicLong(); + return iterate(range).map(state -> switch (state) { + case RocksDBFileIterationStateBegin begin -> { + var countEstimate = begin.metadata().countEstimate(); + if (countEstimate != null) { + fileTotal.set(countEstimate); + } + yield new SSTStart(begin.metadata()); + } + case RocksDBFileIterationStateKey key -> { + var scanned = fileScanned.incrementAndGet(); + yield switch (key.state()) { + case RocksDBFileIterationStateKeyOk ignored -> + new SSTProgressReport(scanned, Math.max(scanned, fileTotal.get())); + case RocksDBFileIterationStateKeyError keyError -> new SSTBlockBad(key.key, keyError.exception); + }; + } + case RocksDBFileIterationStateEnd end -> new SSTOk(end.scannedCount()); + }); } - private Stream verify(String 
databaseDisplayName, String columnDisplayName, LLRange rangeFull, VerificationFileRange fr) { - var columnObj = Column.of(columnDisplayName); - Stream streamInit = Stream.of(new FileStart(databaseDisplayName, columnObj, fr.filePath, fr.countEstimate > 0 ? fr.countEstimate : null)); - Stream streamContent; - Objects.requireNonNull(fr.range); - - String filename = fr.filename; - String path = fr.filePath; - LLRange rangePartial = fr.range; + public Stream readAllSST(SSTRange range, boolean failOnError) { AtomicLong fileScanned = new AtomicLong(); - final long fileEstimate = fr.countEstimate; - AtomicBoolean mustSeek = new AtomicBoolean(true); - AtomicBoolean streamEnded = new AtomicBoolean(false); - streamContent = resourceStream( - () -> LLUtils.generateCustomReadOptions(null, false, isBoundedRange(rangePartial), false), - ro -> { - ro.setIterateLowerBound(rangePartial.getMin() != null ? requireNonNull(LLUtils.asArray(rangePartial.getMin())) : null); - ro.setIterateUpperBound(rangePartial.getMax() != null ? requireNonNull(LLUtils.asArray(rangePartial.getMax())) : null); - ro.setFillCache(false); - ro.setIgnoreRangeDeletions(true); - if (!rangePartial.isSingle()) { - ro.setReadaheadSize(256 * 1024 * 1024); + AtomicLong fileTotal = new AtomicLong(); + return iterate(range).mapMulti((state, consumer) -> { + switch (state) { + case RocksDBFileIterationStateBegin begin -> { + var countEstimate = begin.metadata().countEstimate(); + if (countEstimate != null) { + fileTotal.set(countEstimate); } - ro.setVerifyChecksums(true); - return resourceStream(() -> ro.newIterator(db, cfh, IteratorMetrics.NO_OP), rocksIterator -> StreamUtils - .>streamWhileNonNull(() -> { - boolean mustSeekVal = mustSeek.compareAndSet(true, false); - if (!mustSeekVal && !rocksIterator.isValid()) { - if (streamEnded.compareAndSet(false, true)) { - return Optional.of(new FileOk(databaseDisplayName, columnObj, path, fileScanned.get())); - } else { - //noinspection OptionalAssignedToNull - return null; - } - } - boolean shouldSendStatus; - Buf rawKey = null; - try { - if (mustSeekVal) { - if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS) { - logger.info("Seeking to {}->{}->first on file {}", databaseDisplayName, columnDisplayName, filename); - } - rocksIterator.seekToFirst(); - shouldSendStatus = true; - } else { - rawKey = rocksIterator.keyBuf().copy(); - shouldSendStatus = fileScanned.incrementAndGet() % 1_000_000 == 0; - if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS) { - logger.info("Checking {}->{}->{} on file {}", databaseDisplayName, columnDisplayName, rawKey.toString(), filename); - } - rocksIterator.next(); - } - } catch (RocksDBException ex) { - return Optional.of(new BlockBad(databaseDisplayName, columnObj, rawKey, path, ex)); - } - if (shouldSendStatus) { - long fileScannedVal = fileScanned.get(); - return Optional.of(new Progress(databaseDisplayName, - columnObj, - path, - -1, - -1, - fileScannedVal, - Math.max(fileEstimate, fileScannedVal) - )); - } else { - return Optional.empty(); - } - }).filter(Optional::isPresent).map(Optional::get).onClose(() -> { - rocksIterator.close(); - ro.close(); - })); + consumer.accept(new SSTStart(begin.metadata())); } + case RocksDBFileIterationStateKey key -> { + var scanned = fileScanned.incrementAndGet(); + switch (key.state()) { + case RocksDBFileIterationStateKeyOk ignored -> { + consumer.accept(new SSTBlockKeyValue(key.key(), ignored.value())); + consumer.accept(new SSTProgressReport(scanned, Math.max(scanned, fileTotal.get()))); + } + case RocksDBFileIterationStateKeyError keyError -> { + 
if (failOnError) { + throw new CompletionException(keyError.exception()); + } else { + logger.error("Corrupted SST \"{}\" after \"{}\" scanned keys", sstNumber, scanned); + // This is sent before bad block, so takewhile still returns ok before the end, if failOnError is false + consumer.accept(new SSTOk(scanned)); + } + consumer.accept(new SSTBlockFail(keyError.exception)); + } + } + } + case RocksDBFileIterationStateEnd end -> consumer.accept(new SSTOk(end.scannedCount())); + } + }).takeWhile(data -> !(data instanceof SSTBlockFail)); + } + + public Stream iterate(SSTRange rangeFull) { + var intersectedRange = RocksDBFile.intersectWithMetadata(metadata.keysRange(), rangeFull); + + Path filePath = metadata.filePath(); + String filePathString = filePath.toString(); + var meta = new IterationMetadata(filePath, + metadata.fileName().replace("/", ""), + intersectedRange, + metadata.numEntries() > 0 ? metadata.numEntries() : null, + sstNumber ); - return Stream.concat(streamInit, streamContent); + + Stream streamContent; + // Ignore the file if it's outside the requested range + if (intersectedRange instanceof SSTRangeNone) { + streamContent = Stream.of(new RocksDBFileIterationStateBegin(meta), new RocksDBFileIterationStateEnd(0L)); + } else { + AtomicLong fileScanned = new AtomicLong(); + AtomicBoolean mustSeek = new AtomicBoolean(true); + try { + streamContent = resourceStream(() -> new LLSstFileReader(false, filePathString), + r -> resourceStream(() -> LLUtils.generateCustomReadOptions(null, false, intersectedRange.isBounded(), false), + ro -> { + long skipToIndex; + long readToCount; + switch (intersectedRange) { + case SSTLLRange sstllRange -> { + var llRange = sstllRange.toLLRange(); + requireNonNull(llRange); + ro.setIterateLowerBound( + llRange.getMin() != null ? requireNonNull(LLUtils.asArray(llRange.getMin())) : null); + ro.setIterateUpperBound( + llRange.getMax() != null ? requireNonNull(LLUtils.asArray(llRange.getMax())) : null); + skipToIndex = 0; + readToCount = Long.MAX_VALUE; + } + case SSTRangeOffset offset -> { + skipToIndex = offset.offsetMin() == null ? 0 : offset.offsetMin(); + readToCount = offset.offsetMax() == null ? 
Long.MAX_VALUE : (offset.offsetMax() - skipToIndex); + } + default -> throw new IllegalStateException("Unexpected value: " + intersectedRange); + } + ro.setFillCache(true); + ro.setIgnoreRangeDeletions(true); + if (!(intersectedRange instanceof SSTSingleKey)) { + ro.setReadaheadSize(256 * 1024 * 1024); + } + ro.setVerifyChecksums(true); + return resourceStream(() -> ro.newIterator(r.get(), IteratorMetrics.NO_OP), + rocksIterator -> StreamUtils.streamUntil(() -> { + boolean mustSeekVal = mustSeek.compareAndSet(true, false); + if (!mustSeekVal && !rocksIterator.isValid()) { + return new RocksDBFileIterationStateEnd(fileScanned.get()); + } + Buf rawKey = null; + Buf rawValue = null; + RocksDBFileIterationKeyState keyResult; + var index = fileScanned.getAndIncrement(); + if (index >= readToCount) { + return new RocksDBFileIterationStateEnd(fileScanned.get()); + } else { + try { + if (mustSeekVal) { + rocksIterator.seekToFirstUnsafe(); + if (skipToIndex > 0) { + for (long i = 0; i < skipToIndex; i++) { + if (!rocksIterator.isValid()) { + break; + } + rocksIterator.nextUnsafe(); + } + } + return new RocksDBFileIterationStateBegin(meta); + } else { + rawKey = rocksIterator.keyBuf().copy(); + rawValue = rocksIterator.valueBuf().copy(); + rocksIterator.next(); + } + keyResult = new RocksDBFileIterationStateKeyOk(rawValue); + } catch (RocksDBException ex) { + keyResult = new RocksDBFileIterationStateKeyError(ex); + } + + return new RocksDBFileIterationStateKey(rawKey, keyResult, index); + } + }, x -> x instanceof RocksDBFileIterationStateEnd).onClose(() -> { + rocksIterator.close(); + ro.close(); + }) + ); + } + ) + ); + } catch (RocksDBException e) { + streamContent = Stream.of(new RocksDBFileIterationStateBegin(meta), + new RocksDBFileIterationStateKey(null, new RocksDBFileIterationStateKeyError(e), 0)); + } + } + return streamContent; } @Override @@ -211,4 +270,28 @@ public class RocksDBFile implements Comparable { } return Long.compare(this.sstNumber, o.sstNumber); } + + public Long getSstNumber() { + return sstNumber; + } + + public sealed interface RocksDBFileIterationState { + + record RocksDBFileIterationStateBegin(IterationMetadata metadata) implements RocksDBFileIterationState {} + + record RocksDBFileIterationStateKey(Buf key, RocksDBFileIterationKeyState state, long scannedCount) implements + RocksDBFileIterationState {} + + record RocksDBFileIterationStateEnd(long scannedCount) implements RocksDBFileIterationState {} + } + + public sealed interface RocksDBFileIterationKeyState { + + record RocksDBFileIterationStateKeyOk(Buf value) implements RocksDBFileIterationKeyState {} + + record RocksDBFileIterationStateKeyError(RocksDBException exception) implements RocksDBFileIterationKeyState {} + } + + public record IterationMetadata(Path filePath, String filename, @NotNull SSTRange range, + @Nullable Long countEstimate, @Nullable Long sstNumber) {} } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFileMetadata.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFileMetadata.java index 34e7b9c..a9b941d 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFileMetadata.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFileMetadata.java @@ -1,8 +1,9 @@ package it.cavallium.dbengine.database.disk; import it.cavallium.dbengine.database.LLRange; +import java.nio.file.Path; -public record RocksDBFileMetadata(String path, String fileName, int level, String columnName, long numEntries, long size, +public record RocksDBFileMetadata(Path 
filePath, String fileName, int level, String columnName, long numEntries, long size, LLRange keysRange) { }
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java
index f47fb9f..40f0b70 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java
@@ -34,7 +34,7 @@ public class RocksDBUtils {
 			if (!excludeLastLevel || level.level() < lastLevelId) {
 				for (SstFileMetaData file : level.files()) {
 					if (file.fileName().endsWith(".sst")) {
-						files.add(new RocksDBFile(db, cfh, file, meta.name(), level.level()));
+						files.add(new RocksDBColumnFile(db, cfh, file, meta.name(), level.level()));
 					}
 				}
 			}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/SSTRange.java b/src/main/java/it/cavallium/dbengine/database/disk/SSTRange.java
new file mode 100644
index 0000000..dea5ff2
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/disk/SSTRange.java
@@ -0,0 +1,117 @@
+package it.cavallium.dbengine.database.disk;
+
+import it.cavallium.buffer.Buf;
+import it.cavallium.dbengine.database.LLRange;
+import it.cavallium.dbengine.database.LLUtils;
+import java.util.HexFormat;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+public sealed interface SSTRange {
+
+	boolean isBounded();
+
+	sealed interface SSTLLRange extends SSTRange permits SSTRangeNone, SSTRangeFull, SSTRangeKey, SSTSingleKey {
+
+		@Nullable LLRange toLLRange();
+
+		default boolean isBounded() {
+			LLRange r = toLLRange();
+			return r == null || LLUtils.isBoundedRange(r);
+		}
+	}
+
+	;
+
+	static SSTRange parse(String raw) {
+		var parts = StringUtils.split(raw, '-');
+		return switch (parts[0]) {
+			case "none" -> new SSTRangeNone();
+			case "full" -> new SSTRangeFull();
+			case "offset" -> new SSTRangeOffset(parts[1].isBlank() ? null : Long.parseUnsignedLong(parts[1]),
+					parts[2].isBlank() ? null : Long.parseUnsignedLong(parts[2])
+			);
+			case "key" -> new SSTRangeKey(parts[1].isBlank() ? null : Buf.wrap(LLUtils.parseHex(parts[1])),
+					parts[2].isBlank() ? null : Buf.wrap(LLUtils.parseHex(parts[2]))
+			);
+			case "single-key" -> new SSTSingleKey(Buf.wrap(LLUtils.parseHex(parts[1])));
+			default -> throw new IllegalStateException("Unexpected value: " + parts[0]);
+		};
+	}
+
+	static SSTRange parse(LLRange range) {
+		if (range == null) {
+			return new SSTRangeNone();
+		}
+		return range.isAll() ? new SSTRangeFull()
+				: range.isSingle() ? new SSTSingleKey(range.getSingle()) : new SSTRangeKey(range.getMin(), range.getMax());
+	}
+
+	record SSTRangeNone() implements SSTRange, SSTLLRange {
+
+		@Override
+		public String toString() {
+			return "none";
+		}
+
+		public LLRange toLLRange() {
+			return null;
+		}
+	}
+
+	record SSTRangeFull() implements SSTRange, SSTLLRange {
+
+		@Override
+		public String toString() {
+			return "full";
+		}
+
+		public LLRange toLLRange() {
+			return LLRange.all();
+		}
+	}
+
+	record SSTRangeOffset(@Nullable Long offsetMin, @Nullable Long offsetMax) implements SSTRange {
+
+		@Override
+		public String toString() {
+			return "offset-" + (offsetMin != null ? offsetMin : "") + "-" + (offsetMax != null ? offsetMax : "");
+		}
+
+		@Override
+		public boolean isBounded() {
+			return offsetMin != null && offsetMax != null;
+		}
+	}
+
+	record SSTRangeKey(@Nullable Buf min, @Nullable Buf max) implements SSTRange, SSTLLRange {
+
+		private static final HexFormat HF = HexFormat.of();
+
+		@Override
+		public String toString() {
+			return "key-" + (min != null ? HF.formatHex(min.asUnboundedArray(), 0, min.size()) : "") + "-" + (max != null
+					? HF.formatHex(max.asUnboundedArray(), 0, max.size()) : "");
+		}
+
+		public LLRange toLLRange() {
+			return LLRange.of(min, max);
+		}
+	}
+
+	record SSTSingleKey(@NotNull Buf key) implements SSTRange, SSTLLRange {
+
+		private static final HexFormat HF = HexFormat.of();
+
+		@Override
+		public String toString() {
+			return "single-key-" + HF.formatHex(key.asUnboundedArray(), 0, key.size());
+		}
+
+		public LLRange toLLRange() {
+			return LLRange.single(key);
+		}
+	}
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLReadOptions.java b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLReadOptions.java
index a5b89b8..0f7b1c6 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLReadOptions.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLReadOptions.java
@@ -9,6 +9,7 @@ import org.rocksdb.ReadOptions;
 import org.rocksdb.ReadTier;
 import org.rocksdb.RocksDB;
 import org.rocksdb.Snapshot;
+import org.rocksdb.SstFileReader;
 
 public final class LLReadOptions extends SimpleResource {
 
@@ -115,7 +116,11 @@ public final class LLReadOptions extends SimpleResource {
 	}
 
 	public RocksIteratorObj newIterator(RocksDB db, ColumnFamilyHandle cfh, IteratorMetrics iteratorMetrics) {
-		return new RocksIteratorObj(db.newIterator(cfh, val), this, iteratorMetrics);
+		return RocksIteratorObj.create(db.newIterator(cfh, val), this, iteratorMetrics);
+	}
+
+	public RocksIteratorObj newIterator(SstFileReader r, IteratorMetrics iteratorMetrics) {
+		return RocksIteratorObj.create(r.newIterator(val), this, iteratorMetrics);
 	}
 
 	public void setFillCache(boolean fillCache) {
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLSstFileReader.java b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLSstFileReader.java
new file mode 100644
index 0000000..edc4240
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLSstFileReader.java
@@ -0,0 +1,31 @@
+package it.cavallium.dbengine.database.disk.rocksdb;
+
+import it.cavallium.dbengine.utils.SimpleResource;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.SstFileReader;
+
+public class LLSstFileReader extends SimpleResource {
+
+	private final Options options;
+	private final SstFileReader r;
+
+	public LLSstFileReader(boolean checks, String filePath) throws RocksDBException {
+		this.options = new Options();
+		this.r = new SstFileReader(options
+				.setAllowMmapReads(true)
+				.setParanoidChecks(checks)
+		);
+		r.open(filePath);
+	}
+
+	public SstFileReader get() {
+		return r;
+	}
+
+	@Override
+	protected void onClose() {
+		r.close();
+		options.close();
+	}
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLSstFileWriter.java b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLSstFileWriter.java
new file mode 100644
index 0000000..13a1efb
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLSstFileWriter.java
@@ -0,0 +1,40 @@
+package it.cavallium.dbengine.database.disk.rocksdb;
+
+import it.cavallium.dbengine.utils.SimpleResource;
+import org.rocksdb.CompressionOptions;
+import org.rocksdb.CompressionType;
+import org.rocksdb.EnvOptions;
+import org.rocksdb.Options;
+import org.rocksdb.SstFileWriter;
+
+public class LLSstFileWriter extends SimpleResource {
+
+	private final Options options;
+	private final CompressionOptions compressionOptions;
+	private final SstFileWriter w;
+
+	public LLSstFileWriter(boolean unorderedWrite) {
+		this.options = new Options();
+		this.compressionOptions = new CompressionOptions();
+		this.w = new SstFileWriter(new EnvOptions(),
+				options
+						.setCompressionOptions(compressionOptions
+								.setEnabled(true)
+								.setMaxDictBytes(32768)
+								.setZStdMaxTrainBytes(32768 * 4))
+						.setCompressionType(CompressionType.ZSTD_COMPRESSION)
+						.setUnorderedWrite(unorderedWrite)
+		);
+	}
+
+	public SstFileWriter get() {
+		return w;
+	}
+
+	@Override
+	protected void onClose() {
+		w.close();
+		compressionOptions.close();
+		options.close();
+	}
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/RocksIteratorObj.java b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/RocksIteratorObj.java
index 590f6bf..b77122c 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/RocksIteratorObj.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/RocksIteratorObj.java
@@ -7,25 +7,25 @@ import it.cavallium.dbengine.database.LLUtils;
 import it.cavallium.dbengine.database.disk.IteratorMetrics;
 import it.cavallium.dbengine.utils.SimpleResource;
 import java.nio.ByteBuffer;
-import org.rocksdb.AbstractSlice;
+import org.rocksdb.AbstractRocksIterator;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
+import org.rocksdb.SstFileReaderIterator;
 
-public class RocksIteratorObj extends SimpleResource {
+public abstract class RocksIteratorObj extends SimpleResource {
 
-	private LLReadOptions readOptions;
-	private final RocksIterator rocksIterator;
-	private final Counter startedIterSeek;
-	private final Counter endedIterSeek;
-	private final Timer iterSeekTime;
-	private final Counter startedIterNext;
-	private final Counter endedIterNext;
-	private final Timer iterNextTime;
-	private byte[] seekingFrom;
+	protected LLReadOptions readOptions;
+	protected final AbstractRocksIterator rocksIterator;
+	protected final Counter startedIterSeek;
+	protected final Counter endedIterSeek;
+	protected final Timer iterSeekTime;
+	protected final Counter startedIterNext;
+	protected final Counter endedIterNext;
+	protected final Timer iterNextTime;
+	protected byte[] seekingFrom;
 	private byte[] seekingTo;
 
-	RocksIteratorObj(RocksIterator rocksIterator,
-			LLReadOptions readOptions, IteratorMetrics iteratorMetrics) {
+	RocksIteratorObj(AbstractRocksIterator rocksIterator, LLReadOptions readOptions, IteratorMetrics iteratorMetrics) {
 		super(rocksIterator::close);
 		this.readOptions = readOptions;
 		this.rocksIterator = rocksIterator;
@@ -37,6 +37,92 @@ public class RocksIteratorObj extends SimpleResource {
 		this.iterNextTime = iteratorMetrics.iterNextTime();
 	}
 
+	public static RocksIteratorObj create(AbstractRocksIterator rocksIterator,
+			LLReadOptions readOptions,
+			IteratorMetrics iteratorMetrics) {
+		return switch (rocksIterator) {
+			case RocksIterator it -> new RocksIteratorObj1(it, readOptions, iteratorMetrics);
+			case SstFileReaderIterator it -> new RocksIteratorObj2(it, readOptions, iteratorMetrics);
+			default -> throw new IllegalStateException("Unsupported iterator type");
+		};
+	}
+
+	private static class RocksIteratorObj1 extends RocksIteratorObj {
+
+		private final
RocksIterator rocksIterator; + + private RocksIteratorObj1(RocksIterator rocksIterator, LLReadOptions readOptions, IteratorMetrics iteratorMetrics) { + super(rocksIterator, readOptions, iteratorMetrics); + this.rocksIterator = rocksIterator; + } + + @Deprecated(forRemoval = true) + public synchronized int key(ByteBuffer buffer) { + ensureOpen(); + return rocksIterator.key(buffer); + } + + @Deprecated(forRemoval = true) + public synchronized int value(ByteBuffer buffer) { + ensureOpen(); + return rocksIterator.value(buffer); + } + + /** + * The returned buffer may change when calling next() or when the iterator is not valid anymore + */ + public synchronized byte[] key() { + ensureOpen(); + return rocksIterator.key(); + } + + /** + * The returned buffer may change when calling next() or when the iterator is not valid anymore + */ + public synchronized byte[] value() { + ensureOpen(); + return rocksIterator.value(); + } + } + + private static class RocksIteratorObj2 extends RocksIteratorObj { + + private final SstFileReaderIterator rocksIterator; + + private RocksIteratorObj2(SstFileReaderIterator rocksIterator, LLReadOptions readOptions, IteratorMetrics iteratorMetrics) { + super(rocksIterator, readOptions, iteratorMetrics); + this.rocksIterator = rocksIterator; + } + + @Deprecated(forRemoval = true) + public synchronized int key(ByteBuffer buffer) { + ensureOpen(); + return rocksIterator.key(buffer); + } + + @Deprecated(forRemoval = true) + public synchronized int value(ByteBuffer buffer) { + ensureOpen(); + return rocksIterator.value(buffer); + } + + /** + * The returned buffer may change when calling next() or when the iterator is not valid anymore + */ + public synchronized byte[] key() { + ensureOpen(); + return rocksIterator.key(); + } + + /** + * The returned buffer may change when calling next() or when the iterator is not valid anymore + */ + public synchronized byte[] value() { + ensureOpen(); + return rocksIterator.value(); + } + } + public synchronized void seek(ByteBuffer seekBuf) throws RocksDBException { ensureOpen(); startedIterSeek.increment(); @@ -70,6 +156,18 @@ public class RocksIteratorObj extends SimpleResource { rocksIterator.status(); } + public synchronized void seekToFirstUnsafe() throws RocksDBException { + rocksIterator.seekToFirst(); + } + + public synchronized void seekToLastUnsafe() throws RocksDBException { + rocksIterator.seekToLast(); + } + + public synchronized void nextUnsafe() throws RocksDBException { + rocksIterator.next(); + } + public synchronized void seekToLast() throws RocksDBException { ensureOpen(); startedIterSeek.increment(); @@ -118,33 +216,25 @@ public class RocksIteratorObj extends SimpleResource { return rocksIterator.isValid(); } - @Deprecated(forRemoval = true) - public synchronized int key(ByteBuffer buffer) { - ensureOpen(); - return rocksIterator.key(buffer); + public synchronized boolean isValidUnsafe() { + return rocksIterator.isValid(); } @Deprecated(forRemoval = true) - public synchronized int value(ByteBuffer buffer) { - ensureOpen(); - return rocksIterator.value(buffer); - } + public abstract int key(ByteBuffer buffer); + + @Deprecated(forRemoval = true) + public abstract int value(ByteBuffer buffer); /** * The returned buffer may change when calling next() or when the iterator is not valid anymore */ - public synchronized byte[] key() { - ensureOpen(); - return rocksIterator.key(); - } + public abstract byte[] key(); /** * The returned buffer may change when calling next() or when the iterator is not valid anymore */ - public 
synchronized byte[] value() { - ensureOpen(); - return rocksIterator.value(); - } + public abstract byte[] value(); /** * The returned buffer may change when calling next() or when the iterator is not valid anymore diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java index 09f2335..e7e6651 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java @@ -5,7 +5,8 @@ import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.mapping; import it.cavallium.buffer.Buf; -import it.cavallium.dbengine.client.VerificationProgress; +import it.cavallium.dbengine.client.DbProgress; +import it.cavallium.dbengine.client.SSTVerificationProgress; import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; @@ -358,7 +359,7 @@ public class LLMemoryDictionary implements LLDictionary { } @Override - public Stream verifyChecksum(LLRange range) { + public Stream> verifyChecksum(LLRange range) { return Stream.empty(); } diff --git a/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java b/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java index abbe48b..d434b15 100644 --- a/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java +++ b/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java @@ -11,6 +11,7 @@ import java.util.Comparator; import java.util.EnumSet; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.Spliterator; @@ -62,6 +63,7 @@ public class StreamUtils { private static final BinaryOperator COMBINER = (a, b) -> NULL; private static final Function FINISHER = x -> null; private static final Collector SUMMING_LONG_COLLECTOR = new SummingLongCollector(); + private static final Consumer NOOP_CONSUMER = x -> {}; public static ForkJoinPool newNamedForkJoinPool(String name, boolean async) { final int MAX_CAP = 0x7fff; // max #workers - 1 @@ -139,8 +141,12 @@ public class StreamUtils { } } - @SuppressWarnings("UnstableApiUsage") public static Stream streamWhileNonNull(Supplier supplier) { + return streamWhile(supplier, Objects::nonNull); + } + + @SuppressWarnings("UnstableApiUsage") + public static Stream streamWhile(Supplier supplier, Predicate endPredicate) { var it = new Iterator() { private boolean nextSet = false; @@ -152,7 +158,7 @@ public class StreamUtils { next = supplier.get(); nextSet = true; } - return next != null; + return endPredicate.test(next); } @Override @@ -164,6 +170,40 @@ public class StreamUtils { return Streams.stream(it); } + @SuppressWarnings("UnstableApiUsage") + public static Stream streamUntil(Supplier supplier, Predicate endPredicate) { + var it = new Iterator() { + + private boolean nextSet = false; + private byte state = (byte) 0; + private X next; + + @Override + public boolean hasNext() { + if (state == (byte) 2) { + return false; + } else { + if (!nextSet) { + next = supplier.get(); + state = endPredicate.test(next) ? 
(byte) 1 : 0; + nextSet = true; + } + return true; + } + } + + @Override + public X next() { + if (state == (byte) 1) { + state = (byte) 2; + } + nextSet = false; + return next; + } + }; + return Streams.stream(it); + } + @SuppressWarnings("DataFlowIssue") @NotNull public static List toList(Stream stream) { @@ -285,6 +325,11 @@ public class StreamUtils { return new ExecutingCollector<>(consumer); } + public static Collector executing() { + //noinspection unchecked + return new ExecutingCollector<>((Consumer) NOOP_CONSUMER); + } + public static Collector countingExecuting(Consumer consumer) { return new CountingExecutingCollector<>(consumer); } diff --git a/src/repair/java/it/cavallium/dbengine/repair/Repair.java b/src/repair/java/it/cavallium/dbengine/repair/Repair.java index f796a00..8886dfb 100644 --- a/src/repair/java/it/cavallium/dbengine/repair/Repair.java +++ b/src/repair/java/it/cavallium/dbengine/repair/Repair.java @@ -1,5 +1,8 @@ package it.cavallium.dbengine.repair; +import static it.cavallium.dbengine.client.DbProgress.toDbProgress; +import static it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection.getDatabasePath; + import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import it.cavallium.datagen.nativedata.NullableString; @@ -7,17 +10,25 @@ import it.cavallium.datagen.nativedata.Nullableboolean; import it.cavallium.datagen.nativedata.Nullableint; import it.cavallium.datagen.nativedata.Nullablelong; import it.cavallium.dbengine.client.Compression; -import it.cavallium.dbengine.client.VerificationProgress; -import it.cavallium.dbengine.client.VerificationProgress.BlockBad; -import it.cavallium.dbengine.client.VerificationProgress.FileOk; -import it.cavallium.dbengine.client.VerificationProgress.FileStart; -import it.cavallium.dbengine.client.VerificationProgress.Progress; +import it.cavallium.dbengine.client.DbProgress; +import it.cavallium.dbengine.client.DbProgress.DbSSTProgress; +import it.cavallium.dbengine.client.LongProgressTracker; +import it.cavallium.dbengine.client.SSTDumpProgress; +import it.cavallium.dbengine.client.SSTDumpProgress.SSTBlockKeyValue; +import it.cavallium.dbengine.client.SSTProgress; +import it.cavallium.dbengine.client.SSTProgress.SSTOk; +import it.cavallium.dbengine.client.SSTProgress.SSTProgressReport; +import it.cavallium.dbengine.client.SSTProgress.SSTStart; +import it.cavallium.dbengine.client.SSTVerificationProgress.SSTBlockBad; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.UpdateMode; +import it.cavallium.dbengine.database.disk.RocksDBFile; +import it.cavallium.dbengine.database.disk.SSTRange; import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; import it.cavallium.dbengine.database.disk.LLLocalDictionary; import it.cavallium.dbengine.database.disk.LLLocalKeyValueDatabase; -import it.cavallium.dbengine.database.disk.RocksDBFile; +import it.cavallium.dbengine.database.disk.SSTRange.SSTRangeFull; +import it.cavallium.dbengine.database.disk.rocksdb.LLSstFileWriter; import it.cavallium.dbengine.rpc.current.data.Column; import it.cavallium.dbengine.rpc.current.data.DatabaseOptionsBuilder; import it.cavallium.dbengine.rpc.current.data.DatabaseVolume; @@ -25,6 +36,7 @@ import it.cavallium.dbengine.rpc.current.data.DefaultColumnOptions; import it.cavallium.dbengine.rpc.current.data.NamedColumnOptions; import it.cavallium.dbengine.rpc.current.data.nullables.NullableCompression; import 
it.cavallium.dbengine.rpc.current.data.nullables.NullableFilter; +import it.cavallium.dbengine.utils.StreamUtils; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import it.unimi.dsi.fastutil.objects.ObjectList; import java.io.BufferedWriter; @@ -33,24 +45,44 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.nio.file.attribute.BasicFileAttributeView; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.format.FormatStyle; import java.util.Base64; import java.util.Comparator; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.jetbrains.annotations.Nullable; import org.rocksdb.InfoLogLevel; import org.rocksdb.Options; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; +import org.rocksdb.SstFileWriter; public class Repair { public static final MeterRegistry METER = new SimpleMeterRegistry(); // LoggingMeterRegistry.builder(key -> null).clock(Clock.SYSTEM).loggingSink(System.err::println).build(); + static final boolean PRINT_ALL_CHECKSUM_VERIFICATION_STEPS + = Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checks.verification.print", "false")); + public static void main(String[] argsArray) throws RocksDBException, IOException { System.setProperty("it.cavallium.dbengine.checks.compression", "true"); System.setProperty("it.cavallium.dbengine.checks.paranoid", "true"); @@ -84,25 +116,15 @@ public class Repair { var conn = new LLLocalDatabaseConnection(METER, path, false); conn.connect(); LLLocalKeyValueDatabase db = getDatabase(conn, dbName, columnNames); - Path cwd = Path.of("."); - String errorsFileNamePrefix = "errors-" + dbName; - String errorsFileNameSuffix = ".log"; - Path errorsFile; - if (Files.isWritable(cwd)) { - errorsFile = cwd.resolve(errorsFileNamePrefix + "-" + System.currentTimeMillis() + errorsFileNameSuffix); - } else { - errorsFile = Files.createTempFile(errorsFileNamePrefix, errorsFileNameSuffix); - } - try (var os = Files.newBufferedWriter(errorsFile, - StandardCharsets.UTF_8, - StandardOpenOption.CREATE, - StandardOpenOption.TRUNCATE_EXISTING, - StandardOpenOption.DSYNC - )) { + try (var os = getErrorLogFile(dbName)) { db.getAllColumnFamilyHandles().forEach((column, cfh) -> { System.err.printf("Scanning column: %s%n", column.name()); LLLocalDictionary dict = db.getDictionary(column.name().getBytes(StandardCharsets.UTF_8), UpdateMode.DISALLOW); - consumeVerification(os, dict.verifyChecksum(LLRange.all())); + LongProgressTracker fileCountTotalTracker = new LongProgressTracker(0); + StreamUtils.collectOn(ForkJoinPool.commonPool(), + peekProgress(os, fileCountTotalTracker, 0, dict.verifyChecksum(LLRange.all())), + StreamUtils.executing() + ); }); } } @@ -112,31 +134,20 @@ public class Repair { } Path path = Path.of(args.get(0)).toAbsolutePath(); String 
dbName = args.get(1); - boolean skip = args.get(2).equals("--skip"); + boolean skip = args.get(2).equalsIgnoreCase("--skip"); List fileNames = args.subList(skip ? 3 : 2, args.size()).stream().map(f -> f.startsWith("/") ? f : ("/" + f)).toList(); List columnNames = getColumnFamilyNames(path, dbName).toList(); System.err.printf("Scanning database \"%s\" at \"%s\", files:%n%s%n", dbName, path, String.join("\n", fileNames)); var conn = new LLLocalDatabaseConnection(METER, path, false); conn.connect(); LLLocalKeyValueDatabase db = getDatabase(conn, dbName, columnNames); - Path cwd = Path.of("."); - String errorsFileNamePrefix = "errors-" + dbName; - String errorsFileNameSuffix = ".log"; - Path errorsFile; - if (Files.isWritable(cwd)) { - errorsFile = cwd.resolve(errorsFileNamePrefix + "-" + System.currentTimeMillis() + errorsFileNameSuffix); - } else { - errorsFile = Files.createTempFile(errorsFileNamePrefix, errorsFileNameSuffix); - } - try (var os = Files.newBufferedWriter(errorsFile, - StandardCharsets.UTF_8, - StandardOpenOption.CREATE, - StandardOpenOption.TRUNCATE_EXISTING, - StandardOpenOption.DSYNC - )) { + try (var os = getErrorLogFile(dbName)) { AtomicLong ignoredFiles = new AtomicLong(); var fileList = db.getAllLiveFiles() .filter(file -> { + if (file.getSstNumber() == null) { + System.err.printf("Empty SST number: %s%n", file); + } if (!skip && !fileNames.contains(file.getMetadata().fileName())) { ignoredFiles.incrementAndGet(); System.err.printf("Ignoring file: \"%s\"%n", file.getMetadata().fileName()); @@ -146,15 +157,70 @@ public class Repair { System.err.printf("Ignoring file: \"%s\"%n", file.getMetadata().fileName()); return false; } else { + System.err.printf("About to scan file: \"%s\"%n", file.getMetadata().fileName()); return true; } - }).toList(); - AtomicLong progress = new AtomicLong(); - consumeVerification(os, fileList.stream() - .flatMap(file -> { - System.err.printf("Processing file [%d/%d (+%d ignored)]: %s%n", progress.incrementAndGet(), fileList.size(), ignoredFiles.get(), file.getMetadata().fileName()); - return file.verify(dbName, "any", LLRange.all()); - })); + }) + .sorted(Comparator.reverseOrder()) + .toList(); + var keyTotalTracker = new LongProgressTracker(); + var parallelism = Math.max(1, Runtime.getRuntime().availableProcessors()); + try (var pool = Executors.newWorkStealingPool(parallelism)) { + LongProgressTracker fileCountTotalTracker = new LongProgressTracker(fileList.size()); + for (int i = 0; i < parallelism; i++) { + var iF = i; + + pool.execute(() -> peekProgress(os, fileCountTotalTracker, ignoredFiles.get(), + + IntStream.range(0, fileList.size()) + .filter(n -> n % parallelism == iF) + .mapToObj(fileList::get) + + .flatMap(file -> toDbProgress(dbName, "any", keyTotalTracker, file.verify(new SSTRangeFull())))) + .sequential() + .forEachOrdered(x -> {})); + } + } + } + } + case "export-files" -> { + if (args.size() < 4) { + printHelp(initialArgs); + } + Path path = Path.of(args.get(0)).toAbsolutePath(); + String dbName = args.get(1); + Path dbPath = getDatabasePath(path, dbName); + Path destPath = Path.of(args.get(2)).toAbsolutePath(); + record Input(String fileName, SSTRange exportOption) { + + @Override + public String toString() { + return Input.this.fileName + "[" + Input.this.exportOption + "]"; + } + } + Map fileNames = args.subList(3, args.size()).stream().map(f -> { + var fn = f.startsWith("/") ? 
f : ("/" + f); + if (fn.indexOf('[') == -1) { + fn = fn + "[full]"; + } + var fileName = StringUtils.substringBeforeLast(fn, "["); + var exportOptions = SSTRange.parse(StringUtils.substringBetween(fn, "[", "]")); + return new Input(fileName, exportOptions); + }).collect(Collectors.toMap(Input::fileName, Function.identity())); + List columnNames = getColumnFamilyNames(path, dbName).toList(); + System.err.printf("Exporting database \"%s\" at \"%s\" to \"%s\", files:%n%s%n", dbName, path, destPath, String.join("\n", fileNames.values().stream().map(Input::toString).toList())); + var conn = new LLLocalDatabaseConnection(METER, path, false); + conn.connect(); + try (var os = getErrorLogFile(dbName)) { + var fileList = fileNames.values().stream() + .map(input -> Map.entry(new RocksDBFile(dbPath, input.fileName()), input.exportOption())) + .toList(); + var keyTotalTracker = new LongProgressTracker(); + LongProgressTracker fileCountTotalTracker = new LongProgressTracker(fileList.size()); + peekProgress(os, fileCountTotalTracker, 0, fileList.stream().flatMap(e -> + toDbProgress(dbName, "any", keyTotalTracker, + peekExportSST(destPath, e.getKey().readAllSST(e.getValue(), false)))) + ).sequential().forEachOrdered(x -> {}); } } case "list-files" -> { @@ -176,7 +242,7 @@ public class Repair { .thenComparing(x -> x.getMetadata().fileName()) ).forEach(file -> { var meta = file.getMetadata(); - System.out.printf("%s\t%s\t%s\t%s\t%s\t%d\t%d%n", meta.fileName(), meta.path(), meta.columnName(), meta.keysRange(), new DataSize(meta.size()).toString(false), meta.numEntries(), meta.level()); + System.out.printf("%s\t%s\t%s\t%s\t%s\t%d\t%d%n", meta.fileName(), meta.filePath(), meta.columnName(), meta.keysRange(), new DataSize(meta.size()).toString(false), meta.numEntries(), meta.level()); }); } case "dump-all" -> { @@ -237,51 +303,149 @@ public class Repair { System.exit(0); } - private static void consumeVerification(BufferedWriter os, Stream verificationProgressStream) { + private static Stream peekExportSST(Path destDirectory, Stream sstProgressStream) { + try { + Files.createDirectories(destDirectory); + } catch (IOException e) { + throw new CompletionException(e); + } + return StreamUtils.resourceStream(() -> new LLSstFileWriter(false), w -> { + AtomicInteger fileNum = new AtomicInteger(1); + AtomicReference fileName = new AtomicReference<>(); + AtomicLong memory = new AtomicLong(0); + SstFileWriter writer = w.get(); + return sstProgressStream.peek(r -> { + synchronized (writer) { + try { + switch (r) { + case SSTStart start -> { + var sst = destDirectory.resolve(start.metadata().filePath().getFileName()).toString(); + fileName.set(sst); + System.err.printf("Creating SST at \"%s\"%n", sst); + writer.open(sst); + } + case SSTBlockKeyValue report -> { + var k = report.rawKey().toByteArray(); + var v = report.rawValue().toByteArray(); + long currentMem = k.length + v.length; + writer.put(k, v); + var memVal = memory.addAndGet(currentMem); + // 4GiB + if (memVal > 4L * 1024 * 1024 * 1024) { + memory.set(0); + writer.finish(); + writer.open(fileName + "." 
+ fileNum.getAndIncrement() + ".sst"); + } + } + case SSTOk ok -> writer.finish(); + default -> { + } + } + } catch (RocksDBException ex) { + throw new CompletionException(ex); + } + } + }); + }); + } + + private static BufferedWriter getErrorLogFile(String dbName) throws IOException { + Path cwd = Path.of("."); + String errorsFileNamePrefix = "errors-" + dbName; + String errorsFileNameSuffix = ".log"; + Path errorsFile; + if (Files.isWritable(cwd)) { + errorsFile = cwd.resolve(errorsFileNamePrefix + "-" + System.currentTimeMillis() + errorsFileNameSuffix); + } else { + errorsFile = Files.createTempFile(errorsFileNamePrefix, errorsFileNameSuffix); + } + return Files.newBufferedWriter(errorsFile, + StandardCharsets.UTF_8, + StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING, + StandardOpenOption.DSYNC + ); + } + + private static > Stream peekProgress(@Nullable BufferedWriter os, LongProgressTracker fileCountTotalTracker, + long ignoredFiles, Stream verificationProgressStream) { var showProgress = !Boolean.getBoolean("it.cavallium.dbengine.repair.hideprogress"); - verificationProgressStream.parallel().forEach(block -> { + AtomicLong startMs = new AtomicLong(); + return verificationProgressStream.peek(block -> { synchronized (Repair.class) { switch (block) { case null -> {} - case BlockBad blockBad -> { - StringBuilder errorLineBuilder = new StringBuilder() - .append("File scan ended with error, bad block found: ") - .append(block.databaseName()) - .append(block.column() != null ? "->" + block.column().name() : "") - .append("->") - .append(blockBad.rawKey()) - .append("->") - .append(block.file()); - if (blockBad.ex() != null) { - if (blockBad.ex() instanceof RocksDBException ex) { - errorLineBuilder.append("\n\t").append(ex); - } else { - errorLineBuilder.append("\n").append(ExceptionUtils.getStackTrace(blockBad.ex())); + case DbSSTProgress dbSstProgress -> { + var sstProgress = dbSstProgress.sstProgress(); + switch (sstProgress) { + case SSTStart sstStart -> { + startMs.set(System.currentTimeMillis()); + System.err.printf("Processing file [%d/%d%s]: %s%n", fileCountTotalTracker.incrementAndGet(), + fileCountTotalTracker.getTotal(), + (ignoredFiles > 0 ? " (+%d ignored)".formatted(ignoredFiles) : ""), sstStart.metadata().filename()); + String date; + try { + var dateInstant = Files.getFileAttributeView(sstStart.metadata().filePath(), BasicFileAttributeView.class).readAttributes().creationTime().toInstant().atZone(ZoneId.systemDefault()); + date = DateTimeFormatter.ofLocalizedDateTime(FormatStyle.FULL).format(dateInstant); + } catch (Exception e) { + date = "unknown"; + e.printStackTrace(); + } + System.err.println( + "File scan begin: " + block.databaseName() + (dbSstProgress.column() != null ? "->" + dbSstProgress.column().name() : "") + "->" + dbSstProgress.fileString() + ". The file creation date is: " + date); + if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS) { + System.err.printf("Seeking to %s->%s->first on file %s%n", block.databaseName(), dbSstProgress.column(), dbSstProgress.file()); + } } - } - String errorLine = errorLineBuilder.toString(); - System.err.println("[ ! ] " + errorLine); - try { - os.write(errorLine); - os.flush(); - } catch (IOException e) { - System.err.println("Can't write to errors file: " + e); - } - } - case FileOk ignored -> - System.err.println("File scan ended with success: " + block.databaseName() + (block.column() != null ? 
"->" + block.column().name() : "") + "->" + block.file()); - case FileStart ignored -> - System.err.println("File scan begin: " + block.databaseName() + (block.column() != null ? "->" + block.column().name() : "") + "->" + block.file()); - case Progress progress -> { - if (showProgress) { - System.err.printf("Progress: %s[%d/%d] %s%s->%s%n", - progress.total() != -1 ? "[%d/%d] file: ".formatted(progress.scanned(), progress.total()) : "", - progress.fileScanned(), - progress.fileTotal(), - block.databaseName(), - block.column() != null ? "->" + block.column().name() : "", - block.file() - ); + case SSTBlockBad blockBad -> { + StringBuilder errorLineBuilder = new StringBuilder() + .append("File scan ended with error, bad block found: ") + .append(block.databaseName()) + .append(dbSstProgress.column() != null ? "->" + dbSstProgress.column().name() : "") + .append("->") + .append(blockBad.rawKey()) + .append("->") + .append(dbSstProgress.file()); + if (blockBad.ex() != null) { + if (blockBad.ex() instanceof RocksDBException ex) { + errorLineBuilder.append("\n\t").append(ex); + } else { + errorLineBuilder.append("\n").append(ExceptionUtils.getStackTrace(blockBad.ex())); + } + } + String errorLine = errorLineBuilder.toString(); + System.err.println("[ ! ] " + errorLine); + if (os != null) { + try { + os.write(errorLine); + os.flush(); + } catch (IOException e) { + System.err.println("Can't write to errors file: " + e); + } + } + } + case SSTOk end -> { + var scanned = end.scannedCount(); + var time = Duration.ofMillis(System.currentTimeMillis() - startMs.get()); + var rate = (int) (scanned / Math.max(1, time.toMillis() / 1000d)); + System.err.println("File scan ended with success: " + block.databaseName() + (dbSstProgress.column() != null ? "->" + dbSstProgress.column().name() : "") + "->" + dbSstProgress.fileString() + " - " + end.scannedCount() + " keys scanned in " + time + ". Speed: " + rate + "keys/sec."); + } + case SSTProgressReport progress -> { + if (showProgress) { + boolean shouldSendStatus = progress.fileScanned() % 1_000_000 == 0; + if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS || shouldSendStatus) { + System.err.printf("Progress: %s[%d/%d] %s%s->%s%n", + dbSstProgress.total() != -1 ? "[%d/%d] file: ".formatted(dbSstProgress.scanned(), dbSstProgress.total()) : "", + progress.fileScanned(), + progress.fileTotal(), + block.databaseName(), + dbSstProgress.column() != null ? "->" + dbSstProgress.column().name() : "", + dbSstProgress.fileString() + ); + } + } + } + default -> {} } } default -> throw new IllegalStateException("Unexpected value: " + block); @@ -308,9 +472,9 @@ public class Repair { .openAsSecondary(true) .absoluteConsistency(true) .allowMemoryMapping(true) - .blockCache(Nullablelong.of(3L * 1024 * 1024 * 1024)) + .blockCache(Nullablelong.of(4L * 1024 * 1024 * 1024)) .lowMemory(false) - .maxOpenFiles(Nullableint.of(-1)) + .maxOpenFiles(Nullableint.of(Math.max(40, Runtime.getRuntime().availableProcessors() * 3))) .optimistic(false) .spinning(true) .useDirectIO(false) @@ -365,6 +529,7 @@ public class Repair { or: repair verify-checksum DIRECTORY DB_NAME or: repair list-column-families DIRECTORY DB_NAME or: repair scan-files DIRECTORY DB_NAME [--skip] FILE-NAME... + or: repair export-files DIRECTORY DB_NAME DESTINATION_DIRECTORY '{fileName}[{key|offset}-{from}-{to}]'... or: repair list-files DIRECTORY DB_NAME """); System.exit(1);