From 6fd7d249deec7513f5a3e834f2af29728d405238 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 2 Oct 2023 19:51:28 +0200 Subject: [PATCH] A lot of db repair features, rollback SearchEntityHandler, update log4j --- pom.xml | 3 + .../dbengine/client/VerificationProgress.java | 4 +- .../cavallium/dbengine/database/LLRange.java | 2 - .../database/disk/AbstractRocksDBColumn.java | 18 +- .../database/disk/IteratorMetrics.java | 17 +- .../database/disk/LLLocalDictionary.java | 135 +++-------- .../disk/LLLocalKeyValueDatabase.java | 19 +- .../database/disk/LiveFileMetadata.java | 8 - .../dbengine/database/disk/RocksDBColumn.java | 2 +- .../dbengine/database/disk/RocksDBFile.java | 215 ++++++++++++++++++ .../database/disk/RocksDBFileMetadata.java | 8 + .../dbengine/database/disk/RocksDBUtils.java | 15 +- .../cavallium/dbengine/repair/DataSize.java | 194 ++++++++++++++++ .../it/cavallium/dbengine/repair/Repair.java | 186 +++++++++++---- 14 files changed, 658 insertions(+), 168 deletions(-) delete mode 100644 src/main/java/it/cavallium/dbengine/database/disk/LiveFileMetadata.java create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/RocksDBFile.java create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/RocksDBFileMetadata.java create mode 100644 src/repair/java/it/cavallium/dbengine/repair/DataSize.java diff --git a/pom.xml b/pom.xml index 7e41226..e7f9b65 100644 --- a/pom.xml +++ b/pom.xml @@ -574,6 +574,9 @@ it.cavallium.dbengine.repair.Repair + + true + diff --git a/src/main/java/it/cavallium/dbengine/client/VerificationProgress.java b/src/main/java/it/cavallium/dbengine/client/VerificationProgress.java index 22f2da8..0ee0d4e 100644 --- a/src/main/java/it/cavallium/dbengine/client/VerificationProgress.java +++ b/src/main/java/it/cavallium/dbengine/client/VerificationProgress.java @@ -7,7 +7,9 @@ import org.jetbrains.annotations.Nullable; public sealed interface VerificationProgress { record BlockBad(String databaseName, Column column, Buf rawKey, String file, Throwable ex) implements VerificationProgress {} - record FileOk(String databaseName, Column column, String file) + record FileStart(String databaseName, Column column, String file, @Nullable Long numEntriesEstimate) + implements VerificationProgress {} + record FileOk(String databaseName, Column column, String file, long scanned) implements VerificationProgress {} record Progress(String databaseName, Column column, String file, long scanned, long total, diff --git a/src/main/java/it/cavallium/dbengine/database/LLRange.java b/src/main/java/it/cavallium/dbengine/database/LLRange.java index defb906..300e63a 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLRange.java +++ b/src/main/java/it/cavallium/dbengine/database/LLRange.java @@ -1,9 +1,7 @@ package it.cavallium.dbengine.database; import it.cavallium.buffer.Buf; -import it.cavallium.dbengine.database.disk.LiveFileMetadata; import java.util.Objects; -import java.util.StringJoiner; import org.jetbrains.annotations.Nullable; /** diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index e453415..d9cd1a4 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -560,12 +560,18 @@ public sealed abstract class AbstractRocksDBColumn implements } @Override - public Stream getAllLiveFiles() throws RocksDBException { - byte[] cfhName = cfh.getName(); - return db.getLiveFilesMetaData().stream() - .filter(file -> Arrays.equals(cfhName, file.columnFamilyName())) - .map(file -> new LiveFileMetadata(file.path(), file.fileName(), file.level(), columnName, - file.numEntries(),file.size(), LLRange.of(Buf.wrap(file.smallestKey()), Buf.wrap(file.largestKey())))); + public Stream getAllLiveFiles() throws RocksDBException { + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + db.getLiveFiles(); // flushes the memtable + byte[] cfhName = cfh.getName(); + return db.getLiveFilesMetaData().stream() + .filter(file -> Arrays.equals(cfhName, file.columnFamilyName())) + .map(file -> new RocksDBFile(db, cfh, file)); + } finally { + closeLock.unlockRead(closeReadLock); + } } protected int getLevels() { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/IteratorMetrics.java b/src/main/java/it/cavallium/dbengine/database/disk/IteratorMetrics.java index 090e4cb..04ac534 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/IteratorMetrics.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/IteratorMetrics.java @@ -1,7 +1,22 @@ package it.cavallium.dbengine.database.disk; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Meter.Id; +import io.micrometer.core.instrument.Meter.Type; +import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.noop.NoopCounter; +import io.micrometer.core.instrument.noop.NoopTimer; public record IteratorMetrics(Counter startedIterSeek, Counter endedIterSeek, Timer iterSeekTime, - Counter startedIterNext, Counter endedIterNext, Timer iterNextTime) {} + Counter startedIterNext, Counter endedIterNext, Timer iterNextTime) { + + public static final IteratorMetrics NO_OP = new IteratorMetrics( + new NoopCounter(new Id("no-op", Tags.empty(), null, null, Type.COUNTER)), + new NoopCounter(new Id("no-op", Tags.empty(), null, null, Type.COUNTER)), + new NoopTimer(new Id("no-op", Tags.empty(), null, null, Type.TIMER)), + new NoopCounter(new Id("no-op", Tags.empty(), null, null, Type.COUNTER)), + new NoopCounter(new Id("no-op", Tags.empty(), null, null, Type.COUNTER)), + new NoopTimer(new Id("no-op", Tags.empty(), null, null, Type.TIMER)) + ); +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 55b94b1..5924f78 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -14,13 +14,16 @@ import static it.cavallium.dbengine.utils.StreamUtils.resourceStream; import static it.cavallium.dbengine.utils.StreamUtils.streamWhileNonNull; import static java.util.Objects.requireNonNull; import static it.cavallium.dbengine.utils.StreamUtils.batches; +import static java.util.Objects.requireNonNullElse; +import com.google.common.primitives.Longs; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Timer; import it.cavallium.buffer.Buf; import it.cavallium.dbengine.client.VerificationProgress; import it.cavallium.dbengine.client.VerificationProgress.BlockBad; import it.cavallium.dbengine.client.VerificationProgress.FileOk; +import it.cavallium.dbengine.client.VerificationProgress.FileStart; import it.cavallium.dbengine.client.VerificationProgress.Progress; import it.cavallium.dbengine.database.ColumnUtils; import it.cavallium.dbengine.database.LLDelta; @@ -49,7 +52,9 @@ import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletionException; @@ -643,113 +648,43 @@ public class LLLocalDictionary implements LLDictionary { public Stream verifyChecksum(LLRange rangeFull) { Set brokenFiles = new ConcurrentHashMap().keySet(true); LongAdder totalScanned = new LongAdder(); - AtomicLong totalEstimate = new AtomicLong(); - Set filesToSkip = Stream - .of(System.getProperty("filenames.skip.list", "").split(",")) - .filter(x -> !x.isBlank()) - .map(String::trim) - .collect(Collectors.toUnmodifiableSet()); + long totalEstimate; - try { - totalEstimate.set(db.getNumEntries()); - } catch (Throwable ex) { - logger.warn("Failed to get total entries count", ex); + { + long totalEstimateTmp = 0; + try { + totalEstimateTmp = db.getNumEntries(); + } catch (Throwable ex) { + logger.warn("Failed to get total entries count", ex); + } + totalEstimate = totalEstimateTmp; } Column column = ColumnUtils.special(columnName); try { - record FileRange(String filePath, String filename, @Nullable LLRange range, long countEstimate) {} - return db.getAllLiveFiles() - .map(metadata -> new FileRange(Path.of(metadata.path()).resolve("./" + metadata.fileName()).normalize().toString(), - metadata.fileName().replace("/", ""), - LLRange.intersect(metadata.keysRange(), rangeFull), - metadata.numEntries() - )) - .filter(fr -> fr.range != null) - // Skip some files - .filter(fr -> !filesToSkip.contains(fr.filename)) - .parallel() - .flatMap(fr -> { - String filename = fr.filename; - String path = fr.filePath; - LLRange rangePartial = fr.range; - AtomicLong fileScanned = new AtomicLong(); - final long fileEstimate = fr.countEstimate; - AtomicBoolean streamStarted = new AtomicBoolean(false); - AtomicBoolean streamStarted2 = new AtomicBoolean(false); - AtomicBoolean streamEnded = new AtomicBoolean(false); - totalScanned.add(fileEstimate); - try { - return resourceStream( - () -> LLUtils.generateCustomReadOptions(null, false, isBoundedRange(rangePartial), false), - ro -> { - ro.setFillCache(false); - if (!rangePartial.isSingle()) { - if (LLUtils.MANUAL_READAHEAD) { - ro.setReadaheadSize(32 * 1024); - } - } - ro.setVerifyChecksums(true); - return resourceStream(() -> db.newRocksIterator(ro, rangePartial, false), rocksIterator -> { - return StreamUtils.>streamWhileNonNull(() -> { - boolean first = streamStarted.compareAndSet(false, true); - boolean second = !first && streamStarted.compareAndSet(false, true); - if (!first && !rocksIterator.isValid()) { - if (streamEnded.compareAndSet(false, true)) { - totalScanned.add(fileScanned.get() - fileEstimate); - return Optional.of(new FileOk(databaseName, column, path)); - } else { - //noinspection OptionalAssignedToNull - return null; - } - } - boolean shouldSendStatus; - Buf rawKey = null; - try { - if (second) { - if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS) { - logger.info("Seeking to {}->{}->first on file {}", databaseName, column.name(), filename); - } - rocksIterator.seekToFirst(); - } - if (first) { - shouldSendStatus = true; - } else { - rawKey = rocksIterator.keyBuf().copy(); - shouldSendStatus = fileScanned.incrementAndGet() % 1_000_000 == 0; - if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS) { - logger.info("Checking {}->{}->{} on file {}", databaseName, column.name(), rawKey.toString(), filename); - } - rocksIterator.next(); - } - } catch (RocksDBException ex) { - return Optional.of(new BlockBad(databaseName, column, rawKey, path, ex)); - } - if (shouldSendStatus) { - long totalScannedValue = totalScanned.sum(); - long fileScannedVal = fileScanned.get(); - return Optional.of(new Progress(databaseName, - column, - path, - totalScannedValue, - Math.max(totalEstimate.get(), totalScannedValue), - fileScannedVal, - Math.max(fileEstimate, fileScannedVal) - )); - } else { - return Optional.empty(); - } - }).filter(Optional::isPresent).map(Optional::get).onClose(() -> { - rocksIterator.close(); - ro.close(); - }); - }); - } - ); - } catch (RocksDBException e) { - return Stream.of(new BlockBad(databaseName, column, null, path, e)); + var liveFiles = db.getAllLiveFiles().toList(); + var liveFilesCount = liveFiles.size(); + ConcurrentHashMap fileToInitialEstimate = new ConcurrentHashMap<>(); + + return liveFiles.stream() + .sorted(Comparator.reverseOrder()) + .flatMap(fr -> fr.verify(databaseName, columnName, rangeFull)) + .map(status -> switch (status) { + case FileStart fileStart -> { + Long numEntriesEstimate = Objects.requireNonNullElse(fileStart.numEntriesEstimate(), totalEstimate / liveFilesCount); + totalScanned.add(numEntriesEstimate); + fileToInitialEstimate.put(fileStart.file(), numEntriesEstimate); + yield status; } + case FileOk fileEnd -> { + long initialNumEntriesEstimate = Objects.requireNonNullElse(fileToInitialEstimate.remove(fileEnd.file()), 0L); + long numEntriesScanned = fileEnd.scanned(); + totalScanned.add(numEntriesScanned - initialNumEntriesEstimate); + yield status; + } + case Progress progress -> new Progress(progress.databaseName(), progress.column(), progress.file(), totalScanned.longValue(), totalEstimate, progress.fileScanned(), progress.fileTotal()); + default -> status; }) .filter(err -> !(err instanceof BlockBad blockBad && blockBad.rawKey() == null && !brokenFiles.add(blockBad.file()))); } catch (RocksDBException e) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 34c428d..518800c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -80,6 +80,7 @@ import org.rocksdb.FlushOptions; import org.rocksdb.IndexType; import org.rocksdb.InfoLogLevel; import org.rocksdb.IngestExternalFileOptions; +import org.rocksdb.LiveFileMetaData; import org.rocksdb.OptimisticTransactionDB; import org.rocksdb.PersistentCache; import org.rocksdb.PlainTableConfig; @@ -622,7 +623,23 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa } } - public List getColumnFiles(Column column, boolean excludeLastLevel) { + public Stream getAllLiveFiles() throws RocksDBException { + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + db.getLiveFiles(); // flushes the memtable + var liveFilesMetadata = db.getLiveFilesMetaData(); + List files = new ArrayList<>(); + for (LiveFileMetaData file : liveFilesMetadata) { + files.add(new RocksDBFile(db, getCfh(file.columnFamilyName()), file)); + } + return files.stream(); + } finally { + closeLock.unlockRead(closeReadLock); + } + } + + public List getColumnFiles(Column column, boolean excludeLastLevel) { var closeReadLock = closeLock.readLock(); try { ensureOpen(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LiveFileMetadata.java b/src/main/java/it/cavallium/dbengine/database/disk/LiveFileMetadata.java deleted file mode 100644 index ae1e6bd..0000000 --- a/src/main/java/it/cavallium/dbengine/database/disk/LiveFileMetadata.java +++ /dev/null @@ -1,8 +0,0 @@ -package it.cavallium.dbengine.database.disk; - -import it.cavallium.dbengine.database.LLRange; - -public record LiveFileMetadata(String path, String fileName, int level, String columnName, long numEntries, long size, - LLRange keysRange) { - -} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java index 4be3751..0da1164 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java @@ -54,7 +54,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn { @NotNull RocksIteratorObj newIterator(@NotNull LLReadOptions readOptions, @Nullable Buf min, @Nullable Buf max); - Stream getAllLiveFiles() throws RocksDBException; + Stream getAllLiveFiles() throws RocksDBException; @NotNull UpdateAtomicResult updateAtomic(@NotNull LLReadOptions readOptions, @NotNull LLWriteOptions writeOptions, diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFile.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFile.java new file mode 100644 index 0000000..c335f14 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFile.java @@ -0,0 +1,215 @@ +package it.cavallium.dbengine.database.disk; + +import static it.cavallium.dbengine.database.LLUtils.isBoundedRange; +import static it.cavallium.dbengine.database.disk.LLLocalKeyValueDatabase.PRINT_ALL_CHECKSUM_VERIFICATION_STEPS; +import static it.cavallium.dbengine.utils.StreamUtils.resourceStream; +import static java.util.Objects.requireNonNull; + +import com.google.common.primitives.Longs; +import it.cavallium.buffer.Buf; +import it.cavallium.dbengine.client.VerificationProgress; +import it.cavallium.dbengine.client.VerificationProgress.BlockBad; +import it.cavallium.dbengine.client.VerificationProgress.FileOk; +import it.cavallium.dbengine.client.VerificationProgress.FileStart; +import it.cavallium.dbengine.client.VerificationProgress.Progress; +import it.cavallium.dbengine.database.LLRange; +import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions; +import it.cavallium.dbengine.rpc.current.data.Column; +import it.cavallium.dbengine.utils.StreamUtils; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.util.Objects; +import java.util.Optional; +import java.util.StringJoiner; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.LiveFileMetaData; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.SstFileMetaData; + +public class RocksDBFile implements Comparable { + + protected static final Logger logger = LogManager.getLogger(RocksDBFile.class); + + private final RocksDB db; + private final ColumnFamilyHandle cfh; + private final RocksDBFileMetadata metadata; + private final Long sstNumber; + + public RocksDBFile(RocksDB db, ColumnFamilyHandle cfh, RocksDBFileMetadata metadata) { + this.db = db; + this.cfh = cfh; + this.metadata = metadata; + String fileName = metadata.fileName().replace("/", ""); + int extensionIndex = fileName.indexOf("."); + Long sstNumber = null; + if (extensionIndex != -1) { + String numberRaw = fileName.substring(0, extensionIndex); + //noinspection UnstableApiUsage + this.sstNumber = Longs.tryParse(numberRaw); + } else { + this.sstNumber = null; + } + } + + public RocksDBFile(T db, ColumnFamilyHandle cfh, LiveFileMetaData file) { + this(db, + cfh, + new RocksDBFileMetadata(file.path(), + file.fileName(), + file.level(), + new String(file.columnFamilyName(), StandardCharsets.UTF_8), + file.numEntries(), + file.size(), + decodeRange(file.smallestKey(), file.largestKey()) + ) + ); + } + + public RocksDBFile(T db, ColumnFamilyHandle cfh, SstFileMetaData file, byte[] columnFamilyName, int level) { + this(db, + cfh, + new RocksDBFileMetadata(file.path(), + file.fileName(), + level, + new String(columnFamilyName, StandardCharsets.UTF_8), + file.numEntries(), + file.size(), + decodeRange(file.smallestKey(), file.largestKey()) + ) + ); + } + + private static LLRange decodeRange(byte[] smallestKey, byte[] largestKey) { + return LLRange.of(Buf.wrap(smallestKey), Buf.wrap(largestKey)); + } + + public RocksDBFileMetadata getMetadata() { + return metadata; + } + + public record VerificationFileRange(String filePath, String filename, @Nullable LLRange range, long countEstimate, + @Nullable Long sstNumber) {} + + public Stream verify(String databaseDisplayName, String columnDisplayName, LLRange rangeFull) { + var intersectedRange = LLRange.intersect(metadata.keysRange(), rangeFull); + // Ignore the file if it's outside the requested range + if (intersectedRange == null) { + return Stream.of(); + } + + String filePath = Path.of(metadata.path()).resolve("./" + metadata.fileName()).normalize().toString(); + var fr = new VerificationFileRange(filePath, + metadata.fileName().replace("/", ""), + intersectedRange, + metadata.numEntries(), + sstNumber + ); + return verify(databaseDisplayName, columnDisplayName, rangeFull, fr); + } + + private Stream verify(String databaseDisplayName, String columnDisplayName, LLRange rangeFull, VerificationFileRange fr) { + var columnObj = Column.of(columnDisplayName); + Stream streamInit = Stream.of(new FileStart(databaseDisplayName, columnObj, fr.filePath, fr.countEstimate > 0 ? fr.countEstimate : null)); + Stream streamContent; + Objects.requireNonNull(fr.range); + + String filename = fr.filename; + String path = fr.filePath; + LLRange rangePartial = fr.range; + AtomicLong fileScanned = new AtomicLong(); + final long fileEstimate = fr.countEstimate; + AtomicBoolean mustSeek = new AtomicBoolean(true); + AtomicBoolean streamEnded = new AtomicBoolean(false); + streamContent = resourceStream( + () -> LLUtils.generateCustomReadOptions(null, false, isBoundedRange(rangePartial), false), + ro -> { + ro.setIterateLowerBound(rangePartial.getMin() != null ? requireNonNull(LLUtils.asArray(rangePartial.getMin())) : null); + ro.setIterateUpperBound(rangePartial.getMax() != null ? requireNonNull(LLUtils.asArray(rangePartial.getMax())) : null); + ro.setFillCache(false); + if (!rangePartial.isSingle()) { + if (LLUtils.MANUAL_READAHEAD) { + ro.setReadaheadSize(32 * 1024); + } + } + ro.setVerifyChecksums(true); + return resourceStream(() -> ro.newIterator(db, cfh, IteratorMetrics.NO_OP), rocksIterator -> StreamUtils + .>streamWhileNonNull(() -> { + boolean mustSeekVal = mustSeek.compareAndSet(true, false); + if (!mustSeekVal && !rocksIterator.isValid()) { + if (streamEnded.compareAndSet(false, true)) { + return Optional.of(new FileOk(databaseDisplayName, columnObj, path, fileScanned.get())); + } else { + //noinspection OptionalAssignedToNull + return null; + } + } + boolean shouldSendStatus; + Buf rawKey = null; + try { + if (mustSeekVal) { + if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS) { + logger.info("Seeking to {}->{}->first on file {}", databaseDisplayName, columnDisplayName, filename); + } + rocksIterator.seekToFirst(); + shouldSendStatus = true; + } else { + rawKey = rocksIterator.keyBuf().copy(); + shouldSendStatus = fileScanned.incrementAndGet() % 1_000_000 == 0; + if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS) { + logger.info("Checking {}->{}->{} on file {}", databaseDisplayName, columnDisplayName, rawKey.toString(), filename); + } + rocksIterator.next(); + } + } catch (RocksDBException ex) { + return Optional.of(new BlockBad(databaseDisplayName, columnObj, rawKey, path, ex)); + } + if (shouldSendStatus) { + long fileScannedVal = fileScanned.get(); + return Optional.of(new Progress(databaseDisplayName, + columnObj, + path, + -1, + -1, + fileScannedVal, + Math.max(fileEstimate, fileScannedVal) + )); + } else { + return Optional.empty(); + } + }).filter(Optional::isPresent).map(Optional::get).onClose(() -> { + rocksIterator.close(); + ro.close(); + })); + } + ); + return Stream.concat(streamInit, streamContent); + } + + @Override + public String toString() { + return new StringJoiner(", ", RocksDBFile.class.getSimpleName() + "[", "]") + .add("fileMetadata=" + metadata) + .toString(); + } + + @Override + public int compareTo(@NotNull RocksDBFile o) { + if (this.sstNumber == null && o.sstNumber == null) { + return 0; + } else if (this.sstNumber == null) { + return 1; + } else if (o.sstNumber == null) { + return -1; + } + return Long.compare(this.sstNumber, o.sstNumber); + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFileMetadata.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFileMetadata.java new file mode 100644 index 0000000..34e7b9c --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFileMetadata.java @@ -0,0 +1,8 @@ +package it.cavallium.dbengine.database.disk; + +import it.cavallium.dbengine.database.LLRange; + +public record RocksDBFileMetadata(String path, String fileName, int level, String columnName, long numEntries, long size, + LLRange keysRange) { + +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java index 05afaa6..f47fb9f 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java @@ -26,15 +26,15 @@ public class RocksDBUtils { return db.numberLevels(cfh); } - public static List getColumnFiles(RocksDB db, ColumnFamilyHandle cfh, boolean excludeLastLevel) { - List files = new ArrayList<>(); + public static List getColumnFiles(RocksDB db, ColumnFamilyHandle cfh, boolean excludeLastLevel) { + List files = new ArrayList<>(); var meta = db.getColumnFamilyMetaData(cfh); var lastLevelId = excludeLastLevel ? (getLevels(db, cfh) - 1) : -1; for (LevelMetaData level : meta.levels()) { if (!excludeLastLevel || level.level() < lastLevelId) { for (SstFileMetaData file : level.files()) { if (file.fileName().endsWith(".sst")) { - files.add(file.fileName()); + files.add(new RocksDBFile(db, cfh, file, meta.name(), level.level())); } } } @@ -51,18 +51,18 @@ public class RocksDBUtils { .setCompression(CompressionType.LZ4_COMPRESSION) .setMaxSubcompactions(0) .setOutputFileSizeLimit(2 * SizeUnit.GB)) { - List filesToCompact = getColumnFiles(db, cfh, true); + var filesToCompact = getColumnFiles(db, cfh, true); if (!filesToCompact.isEmpty()) { var partitionSize = filesToCompact.size() / Runtime.getRuntime().availableProcessors(); - List> partitions; + List> partitions; if (partitionSize > 0) { partitions = partition(filesToCompact, partitionSize); } else { partitions = List.of(filesToCompact); } int finalBottommostLevelId = getLevels(db, cfh) - 1; - for (List partition : partitions) { + for (List partition : partitions) { logger.info("Compacting {} files in database {} in column family {} to level {}", partition.size(), logDbName, @@ -72,7 +72,8 @@ public class RocksDBUtils { if (!partition.isEmpty()) { var coi = new CompactionJobInfo(); try { - db.compactFiles(co, cfh, partition, finalBottommostLevelId, volumeId, coi); + var partitionFileNames = partition.stream().map(x -> x.getMetadata().fileName()).toList(); + db.compactFiles(co, cfh, partitionFileNames, finalBottommostLevelId, volumeId, coi); logger.info("Compacted {} files in database {} in column family {} to level {}: {}", partition.size(), logDbName, diff --git a/src/repair/java/it/cavallium/dbengine/repair/DataSize.java b/src/repair/java/it/cavallium/dbengine/repair/DataSize.java new file mode 100644 index 0000000..b297ce5 --- /dev/null +++ b/src/repair/java/it/cavallium/dbengine/repair/DataSize.java @@ -0,0 +1,194 @@ +package it.cavallium.dbengine.repair; + +import java.io.Serial; +import java.text.CharacterIterator; +import java.text.StringCharacterIterator; +import org.jetbrains.annotations.NotNull; + +@SuppressWarnings("unused") +public final class DataSize extends Number implements Comparable { + + @Serial + private static final long serialVersionUID = 7213411239846723568L; + + public static DataSize ZERO = new DataSize(0L); + public static DataSize ONE = new DataSize(1L); + public static DataSize KIB = new DataSize(1024L); + public static DataSize KB = new DataSize(1000L); + public static DataSize MIB = new DataSize(1024L * 1024); + public static DataSize MB = new DataSize(1000L * 1000); + public static DataSize GIB = new DataSize(1024L * 1024 * 1024); + public static DataSize GB = new DataSize(1000L * 1000 * 1000); + public static DataSize TIB = new DataSize(1024L * 1024 * 1024 * 1024); + public static DataSize TB = new DataSize(1000L * 1000 * 1000 * 1024); + public static DataSize PIB = new DataSize(1024L * 1024 * 1024 * 1024 * 1024); + public static DataSize PB = new DataSize(1000L * 1000 * 1000 * 1024 * 1024); + public static DataSize EIB = new DataSize(1024L * 1024 * 1024 * 1024 * 1024 * 1024); + public static DataSize EB = new DataSize(1000L * 1000 * 1000 * 1024 * 1024 * 1024); + public static DataSize MAX_VALUE = new DataSize(Long.MAX_VALUE); + + private final long size; + + public DataSize(long size) { + this.size = size; + } + + public DataSize(String size) { + size = size.replaceAll("\\s|_", ""); + switch (size) { + case "", "0", "-0", "+0" -> { + this.size = 0; + return; + } + case "∞", "inf", "infinite", "∞b" -> { + this.size = Long.MAX_VALUE; + return; + } + } + int numberStartOffset = 0; + int numberEndOffset = 0; + boolean negative = false; + { + boolean firstChar = true; + boolean numberMode = true; + for (char c : size.toCharArray()) { + if (c == '-') { + if (firstChar) { + negative = true; + numberStartOffset++; + numberEndOffset++; + } else { + throw new IllegalArgumentException("Found a minus character after index 0"); + } + } else if (Character.isDigit(c)) { + if (numberMode) { + numberEndOffset++; + } else { + throw new IllegalArgumentException("Found a number after the unit"); + } + } else if (Character.isLetter(c)) { + if (numberEndOffset - numberStartOffset <= 0) { + throw new IllegalArgumentException("No number found"); + } + if (numberMode) { + numberMode = false; + } + } else { + throw new IllegalArgumentException("Unsupported character"); + } + if (firstChar) { + firstChar = false; + } + } + } + var number = Long.parseUnsignedLong(size, numberStartOffset, numberEndOffset, 10); + if (numberEndOffset == size.length()) { + // No measurement + this.size = (negative ? -1 : 1) * number; + return; + } + // Measurements are like B, MB, or MiB, not longer + if (size.length() - numberEndOffset > 3) { + throw new IllegalArgumentException("Wrong measurement unit"); + } + var scaleChar = size.charAt(numberEndOffset); + boolean powerOf2 = numberEndOffset + 1 < size.length() && size.charAt(numberEndOffset + 1) == 'i'; + int k = powerOf2 ? 1024 : 1000; + var scale = switch (scaleChar) { + case 'B' -> 1; + case 'b' -> throw new IllegalArgumentException("Bits are not allowed"); + case 'K', 'k' -> k; + case 'M', 'm' -> k * k; + case 'G', 'g' -> k * k * k; + case 'T', 't' -> k * k * k * k; + case 'P', 'p' -> k * k * k * k * k; + case 'E', 'e' -> k * k * k * k * k * k; + case 'Z', 'z' -> k * k * k * k * k * k * k; + case 'Y', 'y' -> k * k * k * k * k * k * k * k; + default -> throw new IllegalStateException("Unexpected value: " + scaleChar); + }; + // if scale is 1, the unit should be "B", nothing more + if (scale == 1 && numberEndOffset + 1 != size.length()) { + throw new IllegalArgumentException("Invalid unit"); + } + this.size = (negative ? -1 : 1) * number * scale; + } + + public static Long get(DataSize value) { + if (value == null) { + return null; + } else { + return value.size; + } + } + + public static long getOrElse(DataSize value, @NotNull DataSize defaultValue) { + if (value == null) { + return defaultValue.size; + } else { + return value.size; + } + } + + + @Override + public int intValue() { + if (size >= Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + return (int) size; + } + + @Override + public long longValue() { + return size; + } + + @Override + public float floatValue() { + return size; + } + + @Override + public double doubleValue() { + return size; + } + + @Override + public String toString() { + return toString(true); + } + + public String toString(boolean precise) { + boolean siUnits = size % 1000 == 0; + int k = siUnits ? 1000 : 1024; + long lSize = size; + CharacterIterator ci = new StringCharacterIterator((siUnits ? "k" : "K") + "MGTPEZY"); + while ((precise ? lSize % k == 0 : lSize > k) && lSize != 0) { + lSize /= k; + ci.next(); + } + if (lSize == size) { + return lSize + "B"; + } + return lSize + "" + ci.previous() + (siUnits ? "B" : "iB"); + } + + @Override + public int compareTo(@NotNull DataSize anotherLong) { + return Long.compare(this.size, anotherLong.size); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof DataSize) { + return size == ((DataSize)obj).size; + } + return false; + } + + @Override + public int hashCode() { + return Long.hashCode(size); + } +} diff --git a/src/repair/java/it/cavallium/dbengine/repair/Repair.java b/src/repair/java/it/cavallium/dbengine/repair/Repair.java index a64caa5..45dde05 100644 --- a/src/repair/java/it/cavallium/dbengine/repair/Repair.java +++ b/src/repair/java/it/cavallium/dbengine/repair/Repair.java @@ -1,24 +1,23 @@ package it.cavallium.dbengine.repair; -import io.micrometer.core.instrument.Clock; -import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.logging.LoggingMeterRegistry; -import io.micrometer.core.instrument.noop.NoopMeter; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import it.cavallium.datagen.nativedata.NullableString; import it.cavallium.datagen.nativedata.Nullableboolean; import it.cavallium.datagen.nativedata.Nullableint; import it.cavallium.datagen.nativedata.Nullablelong; import it.cavallium.dbengine.client.Compression; +import it.cavallium.dbengine.client.VerificationProgress; import it.cavallium.dbengine.client.VerificationProgress.BlockBad; import it.cavallium.dbengine.client.VerificationProgress.FileOk; +import it.cavallium.dbengine.client.VerificationProgress.FileStart; import it.cavallium.dbengine.client.VerificationProgress.Progress; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; import it.cavallium.dbengine.database.disk.LLLocalDictionary; import it.cavallium.dbengine.database.disk.LLLocalKeyValueDatabase; +import it.cavallium.dbengine.database.disk.RocksDBFile; import it.cavallium.dbengine.rpc.current.data.Column; import it.cavallium.dbengine.rpc.current.data.DatabaseOptionsBuilder; import it.cavallium.dbengine.rpc.current.data.DatabaseVolume; @@ -26,18 +25,22 @@ import it.cavallium.dbengine.rpc.current.data.DefaultColumnOptions; import it.cavallium.dbengine.rpc.current.data.NamedColumnOptions; import it.cavallium.dbengine.rpc.current.data.nullables.NullableCompression; import it.cavallium.dbengine.rpc.current.data.nullables.NullableFilter; -import it.cavallium.dbengine.utils.StreamUtils; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import it.unimi.dsi.fastutil.objects.ObjectList; +import java.io.BufferedWriter; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.Base64; +import java.util.Comparator; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.rocksdb.InfoLogLevel; import org.rocksdb.Options; import org.rocksdb.RocksDB; @@ -47,7 +50,7 @@ public class Repair { public static final MeterRegistry METER = new SimpleMeterRegistry(); // LoggingMeterRegistry.builder(key -> null).clock(Clock.SYSTEM).loggingSink(System.err::println).build(); - public static void main(String[] argsArray) throws RocksDBException { + public static void main(String[] argsArray) throws RocksDBException, IOException { System.setProperty("it.cavallium.dbengine.checks.compression", "true"); System.setProperty("it.cavallium.dbengine.checks.paranoid", "true"); System.setProperty("it.cavallium.dbengine.checks.filesize", "true"); @@ -62,7 +65,7 @@ public class Repair { args = args.subList(1, args.size()); switch (command.toLowerCase(Locale.ROOT)) { case "list-column-families" -> { - if (args.size() < 3) { + if (args.size() != 2) { printHelp(initialArgs); } Path path = Path.of(args.get(0)); @@ -80,41 +83,87 @@ public class Repair { var conn = new LLLocalDatabaseConnection(METER, path, false); conn.connect(); LLLocalKeyValueDatabase db = getDatabase(conn, dbName, columnNames); - db.getAllColumnFamilyHandles().forEach((column, cfh) -> { - System.err.printf("Scanning column: %s%n", column.name()); - LLLocalDictionary dict = db.getDictionary(column.name().getBytes(StandardCharsets.UTF_8), UpdateMode.DISALLOW); - StreamUtils.collectOn(StreamUtils.ROCKSDB_POOL, dict.verifyChecksum(LLRange.all()), StreamUtils.executing(block -> { - synchronized (Repair.class) { - switch (block) { - case null -> {} - case BlockBad blockBad -> { - System.err.println("[ ! ] Bad block found: " + block.databaseName() + (block.column() != null ? "->" + block.column().name() : "") + "->" + blockBad.rawKey() + "->" + block.file()); - if (blockBad.ex() != null) { - if (blockBad.ex() instanceof RocksDBException ex) { - System.err.println("\t" + ex); - } else { - blockBad.ex().printStackTrace(System.err); - } - } + Path cwd = Path.of("."); + String errorsFileNamePrefix = "errors-" + dbName; + String errorsFileNameSuffix = ".log"; + Path errorsFile; + if (Files.isWritable(cwd)) { + errorsFile = cwd.resolve(errorsFileNamePrefix + "-" + System.currentTimeMillis() + errorsFileNameSuffix); + } else { + errorsFile = Files.createTempFile(errorsFileNamePrefix, errorsFileNameSuffix); + } + try (var os = Files.newBufferedWriter(errorsFile, + StandardCharsets.UTF_8, + StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING, + StandardOpenOption.DSYNC + )) { + db.getAllColumnFamilyHandles().forEach((column, cfh) -> { + System.err.printf("Scanning column: %s%n", column.name()); + LLLocalDictionary dict = db.getDictionary(column.name().getBytes(StandardCharsets.UTF_8), UpdateMode.DISALLOW); + consumeVerification(os, dict.verifyChecksum(LLRange.all())); + }); + } + } + case "scan-files" -> { + if (args.size() < 3) { + printHelp(initialArgs); + } + Path path = Path.of(args.get(0)).toAbsolutePath(); + String dbName = args.get(1); + List fileNames = args.subList(2, args.size()); + List columnNames = getColumnFamilyNames(path, dbName).toList(); + System.err.printf("Scanning database \"%s\" at \"%s\", files:%n%s%n", dbName, path, String.join("\n", fileNames)); + var conn = new LLLocalDatabaseConnection(METER, path, false); + conn.connect(); + LLLocalKeyValueDatabase db = getDatabase(conn, dbName, columnNames); + Path cwd = Path.of("."); + String errorsFileNamePrefix = "errors-" + dbName; + String errorsFileNameSuffix = ".log"; + Path errorsFile; + if (Files.isWritable(cwd)) { + errorsFile = cwd.resolve(errorsFileNamePrefix + "-" + System.currentTimeMillis() + errorsFileNameSuffix); + } else { + errorsFile = Files.createTempFile(errorsFileNamePrefix, errorsFileNameSuffix); + } + try (var os = Files.newBufferedWriter(errorsFile, + StandardCharsets.UTF_8, + StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING, + StandardOpenOption.DSYNC + )) { + consumeVerification(os, db.getAllLiveFiles() + .filter(file -> { + if (fileNames.contains(file.getMetadata().fileName())) { + return true; + } else { + System.err.printf("Ignoring file: \"%s\"%n", file.getMetadata().fileName()); + return false; } - case FileOk fileOk -> { - System.err.println("File is ok: " + block.databaseName() + (block.column() != null ? "->" + block.column().name() : "") + "->" + block.file()); - } - case Progress progress -> { - System.err.printf("Progress: [%d/%d] file: [%d/%d] %s%s->%s%n", - progress.scanned(), - progress.total(), - progress.fileScanned(), - progress.fileTotal(), - block.databaseName(), - block.column() != null ? "->" + block.column().name() : "", - block.file() - ); - } - default -> throw new IllegalStateException("Unexpected value: " + block); - } - } - })); + }) + .flatMap(file -> file.verify(dbName, "any", LLRange.all()))); + } + } + case "list-files" -> { + if (args.size() != 2) { + printHelp(initialArgs); + } + Path path = Path.of(args.get(0)).toAbsolutePath(); + String dbName = args.get(1); + List columnNames = getColumnFamilyNames(path, dbName).toList(); + System.err.printf("Getting files of database \"%s\" at \"%s\"%n", dbName, path); + System.out.println("Name \tPath \tColumn name \tKeys range \tSize \tEntries \tLevel"); + var conn = new LLLocalDatabaseConnection(METER, path, false); + conn.connect(); + LLLocalKeyValueDatabase db = getDatabase(conn, dbName, columnNames); + db.getAllLiveFiles().sorted(Comparator + .comparingInt(x -> x.getMetadata().level()) + .thenComparingLong(x -> x.getMetadata().size()) + .thenComparingLong(x -> x.getMetadata().numEntries()) + .thenComparing(x -> x.getMetadata().fileName()) + ).forEach(file -> { + var meta = file.getMetadata(); + System.out.printf("%s\t%s\t%s\t%s\t%s\t%d\t%d%n", meta.fileName(), meta.path(), meta.columnName(), meta.keysRange(), new DataSize(meta.size()).toString(false), meta.numEntries(), meta.level()); }); } case "dump-all" -> { @@ -175,6 +224,59 @@ public class Repair { System.exit(0); } + private static void consumeVerification(BufferedWriter os, Stream verificationProgressStream) { + var showProgress = !Boolean.getBoolean("it.cavallium.dbengine.repair.hideprogress"); + verificationProgressStream.parallel().forEach(block -> { + synchronized (Repair.class) { + switch (block) { + case null -> {} + case BlockBad blockBad -> { + StringBuilder errorLineBuilder = new StringBuilder() + .append("File scan ended with error, bad block found: ") + .append(block.databaseName()) + .append(block.column() != null ? "->" + block.column().name() : "") + .append("->") + .append(blockBad.rawKey()) + .append("->") + .append(block.file()); + if (blockBad.ex() != null) { + if (blockBad.ex() instanceof RocksDBException ex) { + errorLineBuilder.append("\n\t").append(ex); + } else { + errorLineBuilder.append("\n").append(ExceptionUtils.getStackTrace(blockBad.ex())); + } + } + String errorLine = errorLineBuilder.toString(); + System.err.println("[ ! ] " + errorLine); + try { + os.write(errorLine); + os.flush(); + } catch (IOException e) { + System.err.println("Can't write to errors file: " + e); + } + } + case FileOk ignored -> + System.err.println("File scan ended with success: " + block.databaseName() + (block.column() != null ? "->" + block.column().name() : "") + "->" + block.file()); + case FileStart ignored -> + System.err.println("File scan begin: " + block.databaseName() + (block.column() != null ? "->" + block.column().name() : "") + "->" + block.file()); + case Progress progress -> { + if (showProgress) { + System.err.printf("Progress: %s[%d/%d] %s%s->%s%n", + progress.total() != -1 ? "[%d/%d] file: ".formatted(progress.scanned(), progress.total()) : "", + progress.fileScanned(), + progress.fileTotal(), + block.databaseName(), + block.column() != null ? "->" + block.column().name() : "", + block.file() + ); + } + } + default -> throw new IllegalStateException("Unexpected value: " + block); + } + } + }); + } + private static Stream getColumnFamilyNames(Path path, String dbName) throws RocksDBException { String dbPath = path.resolve("database_" + dbName).toString(); System.err.printf("Listing column families of database: %s%n", dbPath); @@ -249,6 +351,8 @@ public class Repair { or: repair dump-all DIRECTORY DB_NAME COLUMN_NAME... or: repair verify-checksum DIRECTORY DB_NAME or: repair list-column-families DIRECTORY DB_NAME + or: repair scan-files DIRECTORY DB_NAME FILE-NAME... + or: repair list-files DIRECTORY DB_NAME """); System.exit(1); }