diff --git a/pom.xml b/pom.xml
index 7e41226..e7f9b65 100644
--- a/pom.xml
+++ b/pom.xml
@@ -574,6 +574,9 @@
it.cavallium.dbengine.repair.Repair
+
+ true
+
diff --git a/src/main/java/it/cavallium/dbengine/client/VerificationProgress.java b/src/main/java/it/cavallium/dbengine/client/VerificationProgress.java
index 22f2da8..0ee0d4e 100644
--- a/src/main/java/it/cavallium/dbengine/client/VerificationProgress.java
+++ b/src/main/java/it/cavallium/dbengine/client/VerificationProgress.java
@@ -7,7 +7,9 @@ import org.jetbrains.annotations.Nullable;
public sealed interface VerificationProgress {
record BlockBad(String databaseName, Column column, Buf rawKey, String file, Throwable ex)
implements VerificationProgress {}
- record FileOk(String databaseName, Column column, String file)
+ record FileStart(String databaseName, Column column, String file, @Nullable Long numEntriesEstimate)
+ implements VerificationProgress {}
+ record FileOk(String databaseName, Column column, String file, long scanned)
implements VerificationProgress {}
record Progress(String databaseName, Column column, String file,
long scanned, long total,
diff --git a/src/main/java/it/cavallium/dbengine/database/LLRange.java b/src/main/java/it/cavallium/dbengine/database/LLRange.java
index defb906..300e63a 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLRange.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLRange.java
@@ -1,9 +1,7 @@
package it.cavallium.dbengine.database;
import it.cavallium.buffer.Buf;
-import it.cavallium.dbengine.database.disk.LiveFileMetadata;
import java.util.Objects;
-import java.util.StringJoiner;
import org.jetbrains.annotations.Nullable;
/**
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java
index e453415..d9cd1a4 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java
@@ -560,12 +560,18 @@ public sealed abstract class AbstractRocksDBColumn implements
}
@Override
- public Stream getAllLiveFiles() throws RocksDBException {
- byte[] cfhName = cfh.getName();
- return db.getLiveFilesMetaData().stream()
- .filter(file -> Arrays.equals(cfhName, file.columnFamilyName()))
- .map(file -> new LiveFileMetadata(file.path(), file.fileName(), file.level(), columnName,
- file.numEntries(),file.size(), LLRange.of(Buf.wrap(file.smallestKey()), Buf.wrap(file.largestKey()))));
+ public Stream getAllLiveFiles() throws RocksDBException {
+ var closeReadLock = closeLock.readLock();
+ try {
+ ensureOpen();
+ db.getLiveFiles(); // flushes the memtable
+ byte[] cfhName = cfh.getName();
+ return db.getLiveFilesMetaData().stream()
+ .filter(file -> Arrays.equals(cfhName, file.columnFamilyName()))
+ .map(file -> new RocksDBFile(db, cfh, file));
+ } finally {
+ closeLock.unlockRead(closeReadLock);
+ }
}
protected int getLevels() {
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/IteratorMetrics.java b/src/main/java/it/cavallium/dbengine/database/disk/IteratorMetrics.java
index 090e4cb..04ac534 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/IteratorMetrics.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/IteratorMetrics.java
@@ -1,7 +1,22 @@
package it.cavallium.dbengine.database.disk;
import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.Meter.Id;
+import io.micrometer.core.instrument.Meter.Type;
+import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
+import io.micrometer.core.instrument.noop.NoopCounter;
+import io.micrometer.core.instrument.noop.NoopTimer;
public record IteratorMetrics(Counter startedIterSeek, Counter endedIterSeek, Timer iterSeekTime,
- Counter startedIterNext, Counter endedIterNext, Timer iterNextTime) {}
+ Counter startedIterNext, Counter endedIterNext, Timer iterNextTime) {
+
+ public static final IteratorMetrics NO_OP = new IteratorMetrics(
+ new NoopCounter(new Id("no-op", Tags.empty(), null, null, Type.COUNTER)),
+ new NoopCounter(new Id("no-op", Tags.empty(), null, null, Type.COUNTER)),
+ new NoopTimer(new Id("no-op", Tags.empty(), null, null, Type.TIMER)),
+ new NoopCounter(new Id("no-op", Tags.empty(), null, null, Type.COUNTER)),
+ new NoopCounter(new Id("no-op", Tags.empty(), null, null, Type.COUNTER)),
+ new NoopTimer(new Id("no-op", Tags.empty(), null, null, Type.TIMER))
+ );
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
index 55b94b1..5924f78 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
@@ -14,13 +14,16 @@ import static it.cavallium.dbengine.utils.StreamUtils.resourceStream;
import static it.cavallium.dbengine.utils.StreamUtils.streamWhileNonNull;
import static java.util.Objects.requireNonNull;
import static it.cavallium.dbengine.utils.StreamUtils.batches;
+import static java.util.Objects.requireNonNullElse;
+import com.google.common.primitives.Longs;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.client.VerificationProgress;
import it.cavallium.dbengine.client.VerificationProgress.BlockBad;
import it.cavallium.dbengine.client.VerificationProgress.FileOk;
+import it.cavallium.dbengine.client.VerificationProgress.FileStart;
import it.cavallium.dbengine.client.VerificationProgress.Progress;
import it.cavallium.dbengine.database.ColumnUtils;
import it.cavallium.dbengine.database.LLDelta;
@@ -49,7 +52,9 @@ import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
@@ -643,113 +648,43 @@ public class LLLocalDictionary implements LLDictionary {
public Stream verifyChecksum(LLRange rangeFull) {
Set brokenFiles = new ConcurrentHashMap().keySet(true);
LongAdder totalScanned = new LongAdder();
- AtomicLong totalEstimate = new AtomicLong();
- Set filesToSkip = Stream
- .of(System.getProperty("filenames.skip.list", "").split(","))
- .filter(x -> !x.isBlank())
- .map(String::trim)
- .collect(Collectors.toUnmodifiableSet());
+ long totalEstimate;
- try {
- totalEstimate.set(db.getNumEntries());
- } catch (Throwable ex) {
- logger.warn("Failed to get total entries count", ex);
+ {
+ long totalEstimateTmp = 0;
+ try {
+ totalEstimateTmp = db.getNumEntries();
+ } catch (Throwable ex) {
+ logger.warn("Failed to get total entries count", ex);
+ }
+ totalEstimate = totalEstimateTmp;
}
Column column = ColumnUtils.special(columnName);
try {
- record FileRange(String filePath, String filename, @Nullable LLRange range, long countEstimate) {}
- return db.getAllLiveFiles()
- .map(metadata -> new FileRange(Path.of(metadata.path()).resolve("./" + metadata.fileName()).normalize().toString(),
- metadata.fileName().replace("/", ""),
- LLRange.intersect(metadata.keysRange(), rangeFull),
- metadata.numEntries()
- ))
- .filter(fr -> fr.range != null)
- // Skip some files
- .filter(fr -> !filesToSkip.contains(fr.filename))
- .parallel()
- .flatMap(fr -> {
- String filename = fr.filename;
- String path = fr.filePath;
- LLRange rangePartial = fr.range;
- AtomicLong fileScanned = new AtomicLong();
- final long fileEstimate = fr.countEstimate;
- AtomicBoolean streamStarted = new AtomicBoolean(false);
- AtomicBoolean streamStarted2 = new AtomicBoolean(false);
- AtomicBoolean streamEnded = new AtomicBoolean(false);
- totalScanned.add(fileEstimate);
- try {
- return resourceStream(
- () -> LLUtils.generateCustomReadOptions(null, false, isBoundedRange(rangePartial), false),
- ro -> {
- ro.setFillCache(false);
- if (!rangePartial.isSingle()) {
- if (LLUtils.MANUAL_READAHEAD) {
- ro.setReadaheadSize(32 * 1024);
- }
- }
- ro.setVerifyChecksums(true);
- return resourceStream(() -> db.newRocksIterator(ro, rangePartial, false), rocksIterator -> {
- return StreamUtils.>streamWhileNonNull(() -> {
- boolean first = streamStarted.compareAndSet(false, true);
- boolean second = !first && streamStarted.compareAndSet(false, true);
- if (!first && !rocksIterator.isValid()) {
- if (streamEnded.compareAndSet(false, true)) {
- totalScanned.add(fileScanned.get() - fileEstimate);
- return Optional.of(new FileOk(databaseName, column, path));
- } else {
- //noinspection OptionalAssignedToNull
- return null;
- }
- }
- boolean shouldSendStatus;
- Buf rawKey = null;
- try {
- if (second) {
- if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS) {
- logger.info("Seeking to {}->{}->first on file {}", databaseName, column.name(), filename);
- }
- rocksIterator.seekToFirst();
- }
- if (first) {
- shouldSendStatus = true;
- } else {
- rawKey = rocksIterator.keyBuf().copy();
- shouldSendStatus = fileScanned.incrementAndGet() % 1_000_000 == 0;
- if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS) {
- logger.info("Checking {}->{}->{} on file {}", databaseName, column.name(), rawKey.toString(), filename);
- }
- rocksIterator.next();
- }
- } catch (RocksDBException ex) {
- return Optional.of(new BlockBad(databaseName, column, rawKey, path, ex));
- }
- if (shouldSendStatus) {
- long totalScannedValue = totalScanned.sum();
- long fileScannedVal = fileScanned.get();
- return Optional.of(new Progress(databaseName,
- column,
- path,
- totalScannedValue,
- Math.max(totalEstimate.get(), totalScannedValue),
- fileScannedVal,
- Math.max(fileEstimate, fileScannedVal)
- ));
- } else {
- return Optional.empty();
- }
- }).filter(Optional::isPresent).map(Optional::get).onClose(() -> {
- rocksIterator.close();
- ro.close();
- });
- });
- }
- );
- } catch (RocksDBException e) {
- return Stream.of(new BlockBad(databaseName, column, null, path, e));
+ var liveFiles = db.getAllLiveFiles().toList();
+ var liveFilesCount = liveFiles.size();
+ ConcurrentHashMap fileToInitialEstimate = new ConcurrentHashMap<>();
+
+ return liveFiles.stream()
+ .sorted(Comparator.reverseOrder())
+ .flatMap(fr -> fr.verify(databaseName, columnName, rangeFull))
+ .map(status -> switch (status) {
+ case FileStart fileStart -> {
+ Long numEntriesEstimate = Objects.requireNonNullElse(fileStart.numEntriesEstimate(), totalEstimate / liveFilesCount);
+ totalScanned.add(numEntriesEstimate);
+ fileToInitialEstimate.put(fileStart.file(), numEntriesEstimate);
+ yield status;
}
+ case FileOk fileEnd -> {
+ long initialNumEntriesEstimate = Objects.requireNonNullElse(fileToInitialEstimate.remove(fileEnd.file()), 0L);
+ long numEntriesScanned = fileEnd.scanned();
+ totalScanned.add(numEntriesScanned - initialNumEntriesEstimate);
+ yield status;
+ }
+ case Progress progress -> new Progress(progress.databaseName(), progress.column(), progress.file(), totalScanned.longValue(), totalEstimate, progress.fileScanned(), progress.fileTotal());
+ default -> status;
})
.filter(err -> !(err instanceof BlockBad blockBad && blockBad.rawKey() == null && !brokenFiles.add(blockBad.file())));
} catch (RocksDBException e) {
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java
index 34c428d..518800c 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java
@@ -80,6 +80,7 @@ import org.rocksdb.FlushOptions;
import org.rocksdb.IndexType;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.IngestExternalFileOptions;
+import org.rocksdb.LiveFileMetaData;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.PersistentCache;
import org.rocksdb.PlainTableConfig;
@@ -622,7 +623,23 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa
}
}
- public List getColumnFiles(Column column, boolean excludeLastLevel) {
+ public Stream getAllLiveFiles() throws RocksDBException {
+ var closeReadLock = closeLock.readLock();
+ try {
+ ensureOpen();
+ db.getLiveFiles(); // flushes the memtable
+ var liveFilesMetadata = db.getLiveFilesMetaData();
+ List files = new ArrayList<>();
+ for (LiveFileMetaData file : liveFilesMetadata) {
+ files.add(new RocksDBFile(db, getCfh(file.columnFamilyName()), file));
+ }
+ return files.stream();
+ } finally {
+ closeLock.unlockRead(closeReadLock);
+ }
+ }
+
+ public List getColumnFiles(Column column, boolean excludeLastLevel) {
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LiveFileMetadata.java b/src/main/java/it/cavallium/dbengine/database/disk/LiveFileMetadata.java
deleted file mode 100644
index ae1e6bd..0000000
--- a/src/main/java/it/cavallium/dbengine/database/disk/LiveFileMetadata.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package it.cavallium.dbengine.database.disk;
-
-import it.cavallium.dbengine.database.LLRange;
-
-public record LiveFileMetadata(String path, String fileName, int level, String columnName, long numEntries, long size,
- LLRange keysRange) {
-
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java
index 4be3751..0da1164 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java
@@ -54,7 +54,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
@NotNull RocksIteratorObj newIterator(@NotNull LLReadOptions readOptions, @Nullable Buf min, @Nullable Buf max);
- Stream getAllLiveFiles() throws RocksDBException;
+ Stream getAllLiveFiles() throws RocksDBException;
@NotNull UpdateAtomicResult updateAtomic(@NotNull LLReadOptions readOptions,
@NotNull LLWriteOptions writeOptions,
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFile.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFile.java
new file mode 100644
index 0000000..c335f14
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFile.java
@@ -0,0 +1,215 @@
+package it.cavallium.dbengine.database.disk;
+
+import static it.cavallium.dbengine.database.LLUtils.isBoundedRange;
+import static it.cavallium.dbengine.database.disk.LLLocalKeyValueDatabase.PRINT_ALL_CHECKSUM_VERIFICATION_STEPS;
+import static it.cavallium.dbengine.utils.StreamUtils.resourceStream;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.primitives.Longs;
+import it.cavallium.buffer.Buf;
+import it.cavallium.dbengine.client.VerificationProgress;
+import it.cavallium.dbengine.client.VerificationProgress.BlockBad;
+import it.cavallium.dbengine.client.VerificationProgress.FileOk;
+import it.cavallium.dbengine.client.VerificationProgress.FileStart;
+import it.cavallium.dbengine.client.VerificationProgress.Progress;
+import it.cavallium.dbengine.database.LLRange;
+import it.cavallium.dbengine.database.LLUtils;
+import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
+import it.cavallium.dbengine.rpc.current.data.Column;
+import it.cavallium.dbengine.utils.StreamUtils;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.LiveFileMetaData;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.SstFileMetaData;
+
+public class RocksDBFile implements Comparable {
+
+ protected static final Logger logger = LogManager.getLogger(RocksDBFile.class);
+
+ private final RocksDB db;
+ private final ColumnFamilyHandle cfh;
+ private final RocksDBFileMetadata metadata;
+ private final Long sstNumber;
+
+ public RocksDBFile(RocksDB db, ColumnFamilyHandle cfh, RocksDBFileMetadata metadata) {
+ this.db = db;
+ this.cfh = cfh;
+ this.metadata = metadata;
+ String fileName = metadata.fileName().replace("/", "");
+ int extensionIndex = fileName.indexOf(".");
+ Long sstNumber = null;
+ if (extensionIndex != -1) {
+ String numberRaw = fileName.substring(0, extensionIndex);
+ //noinspection UnstableApiUsage
+ this.sstNumber = Longs.tryParse(numberRaw);
+ } else {
+ this.sstNumber = null;
+ }
+ }
+
+ public RocksDBFile(T db, ColumnFamilyHandle cfh, LiveFileMetaData file) {
+ this(db,
+ cfh,
+ new RocksDBFileMetadata(file.path(),
+ file.fileName(),
+ file.level(),
+ new String(file.columnFamilyName(), StandardCharsets.UTF_8),
+ file.numEntries(),
+ file.size(),
+ decodeRange(file.smallestKey(), file.largestKey())
+ )
+ );
+ }
+
+ public RocksDBFile(T db, ColumnFamilyHandle cfh, SstFileMetaData file, byte[] columnFamilyName, int level) {
+ this(db,
+ cfh,
+ new RocksDBFileMetadata(file.path(),
+ file.fileName(),
+ level,
+ new String(columnFamilyName, StandardCharsets.UTF_8),
+ file.numEntries(),
+ file.size(),
+ decodeRange(file.smallestKey(), file.largestKey())
+ )
+ );
+ }
+
+ private static LLRange decodeRange(byte[] smallestKey, byte[] largestKey) {
+ return LLRange.of(Buf.wrap(smallestKey), Buf.wrap(largestKey));
+ }
+
+ public RocksDBFileMetadata getMetadata() {
+ return metadata;
+ }
+
+ public record VerificationFileRange(String filePath, String filename, @Nullable LLRange range, long countEstimate,
+ @Nullable Long sstNumber) {}
+
+ public Stream verify(String databaseDisplayName, String columnDisplayName, LLRange rangeFull) {
+ var intersectedRange = LLRange.intersect(metadata.keysRange(), rangeFull);
+ // Ignore the file if it's outside the requested range
+ if (intersectedRange == null) {
+ return Stream.of();
+ }
+
+ String filePath = Path.of(metadata.path()).resolve("./" + metadata.fileName()).normalize().toString();
+ var fr = new VerificationFileRange(filePath,
+ metadata.fileName().replace("/", ""),
+ intersectedRange,
+ metadata.numEntries(),
+ sstNumber
+ );
+ return verify(databaseDisplayName, columnDisplayName, rangeFull, fr);
+ }
+
+ private Stream verify(String databaseDisplayName, String columnDisplayName, LLRange rangeFull, VerificationFileRange fr) {
+ var columnObj = Column.of(columnDisplayName);
+ Stream streamInit = Stream.of(new FileStart(databaseDisplayName, columnObj, fr.filePath, fr.countEstimate > 0 ? fr.countEstimate : null));
+ Stream streamContent;
+ Objects.requireNonNull(fr.range);
+
+ String filename = fr.filename;
+ String path = fr.filePath;
+ LLRange rangePartial = fr.range;
+ AtomicLong fileScanned = new AtomicLong();
+ final long fileEstimate = fr.countEstimate;
+ AtomicBoolean mustSeek = new AtomicBoolean(true);
+ AtomicBoolean streamEnded = new AtomicBoolean(false);
+ streamContent = resourceStream(
+ () -> LLUtils.generateCustomReadOptions(null, false, isBoundedRange(rangePartial), false),
+ ro -> {
+ ro.setIterateLowerBound(rangePartial.getMin() != null ? requireNonNull(LLUtils.asArray(rangePartial.getMin())) : null);
+ ro.setIterateUpperBound(rangePartial.getMax() != null ? requireNonNull(LLUtils.asArray(rangePartial.getMax())) : null);
+ ro.setFillCache(false);
+ if (!rangePartial.isSingle()) {
+ if (LLUtils.MANUAL_READAHEAD) {
+ ro.setReadaheadSize(32 * 1024);
+ }
+ }
+ ro.setVerifyChecksums(true);
+ return resourceStream(() -> ro.newIterator(db, cfh, IteratorMetrics.NO_OP), rocksIterator -> StreamUtils
+ .>streamWhileNonNull(() -> {
+ boolean mustSeekVal = mustSeek.compareAndSet(true, false);
+ if (!mustSeekVal && !rocksIterator.isValid()) {
+ if (streamEnded.compareAndSet(false, true)) {
+ return Optional.of(new FileOk(databaseDisplayName, columnObj, path, fileScanned.get()));
+ } else {
+ //noinspection OptionalAssignedToNull
+ return null;
+ }
+ }
+ boolean shouldSendStatus;
+ Buf rawKey = null;
+ try {
+ if (mustSeekVal) {
+ if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS) {
+ logger.info("Seeking to {}->{}->first on file {}", databaseDisplayName, columnDisplayName, filename);
+ }
+ rocksIterator.seekToFirst();
+ shouldSendStatus = true;
+ } else {
+ rawKey = rocksIterator.keyBuf().copy();
+ shouldSendStatus = fileScanned.incrementAndGet() % 1_000_000 == 0;
+ if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS) {
+ logger.info("Checking {}->{}->{} on file {}", databaseDisplayName, columnDisplayName, rawKey.toString(), filename);
+ }
+ rocksIterator.next();
+ }
+ } catch (RocksDBException ex) {
+ return Optional.of(new BlockBad(databaseDisplayName, columnObj, rawKey, path, ex));
+ }
+ if (shouldSendStatus) {
+ long fileScannedVal = fileScanned.get();
+ return Optional.of(new Progress(databaseDisplayName,
+ columnObj,
+ path,
+ -1,
+ -1,
+ fileScannedVal,
+ Math.max(fileEstimate, fileScannedVal)
+ ));
+ } else {
+ return Optional.empty();
+ }
+ }).filter(Optional::isPresent).map(Optional::get).onClose(() -> {
+ rocksIterator.close();
+ ro.close();
+ }));
+ }
+ );
+ return Stream.concat(streamInit, streamContent);
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ", RocksDBFile.class.getSimpleName() + "[", "]")
+ .add("fileMetadata=" + metadata)
+ .toString();
+ }
+
+ @Override
+ public int compareTo(@NotNull RocksDBFile o) {
+ if (this.sstNumber == null && o.sstNumber == null) {
+ return 0;
+ } else if (this.sstNumber == null) {
+ return 1;
+ } else if (o.sstNumber == null) {
+ return -1;
+ }
+ return Long.compare(this.sstNumber, o.sstNumber);
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFileMetadata.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFileMetadata.java
new file mode 100644
index 0000000..34e7b9c
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFileMetadata.java
@@ -0,0 +1,8 @@
+package it.cavallium.dbengine.database.disk;
+
+import it.cavallium.dbengine.database.LLRange;
+
+public record RocksDBFileMetadata(String path, String fileName, int level, String columnName, long numEntries, long size,
+ LLRange keysRange) {
+
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java
index 05afaa6..f47fb9f 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java
@@ -26,15 +26,15 @@ public class RocksDBUtils {
return db.numberLevels(cfh);
}
- public static List getColumnFiles(RocksDB db, ColumnFamilyHandle cfh, boolean excludeLastLevel) {
- List files = new ArrayList<>();
+ public static List getColumnFiles(RocksDB db, ColumnFamilyHandle cfh, boolean excludeLastLevel) {
+ List files = new ArrayList<>();
var meta = db.getColumnFamilyMetaData(cfh);
var lastLevelId = excludeLastLevel ? (getLevels(db, cfh) - 1) : -1;
for (LevelMetaData level : meta.levels()) {
if (!excludeLastLevel || level.level() < lastLevelId) {
for (SstFileMetaData file : level.files()) {
if (file.fileName().endsWith(".sst")) {
- files.add(file.fileName());
+ files.add(new RocksDBFile(db, cfh, file, meta.name(), level.level()));
}
}
}
@@ -51,18 +51,18 @@ public class RocksDBUtils {
.setCompression(CompressionType.LZ4_COMPRESSION)
.setMaxSubcompactions(0)
.setOutputFileSizeLimit(2 * SizeUnit.GB)) {
- List filesToCompact = getColumnFiles(db, cfh, true);
+ var filesToCompact = getColumnFiles(db, cfh, true);
if (!filesToCompact.isEmpty()) {
var partitionSize = filesToCompact.size() / Runtime.getRuntime().availableProcessors();
- List> partitions;
+ List> partitions;
if (partitionSize > 0) {
partitions = partition(filesToCompact, partitionSize);
} else {
partitions = List.of(filesToCompact);
}
int finalBottommostLevelId = getLevels(db, cfh) - 1;
- for (List partition : partitions) {
+ for (List partition : partitions) {
logger.info("Compacting {} files in database {} in column family {} to level {}",
partition.size(),
logDbName,
@@ -72,7 +72,8 @@ public class RocksDBUtils {
if (!partition.isEmpty()) {
var coi = new CompactionJobInfo();
try {
- db.compactFiles(co, cfh, partition, finalBottommostLevelId, volumeId, coi);
+ var partitionFileNames = partition.stream().map(x -> x.getMetadata().fileName()).toList();
+ db.compactFiles(co, cfh, partitionFileNames, finalBottommostLevelId, volumeId, coi);
logger.info("Compacted {} files in database {} in column family {} to level {}: {}",
partition.size(),
logDbName,
diff --git a/src/repair/java/it/cavallium/dbengine/repair/DataSize.java b/src/repair/java/it/cavallium/dbengine/repair/DataSize.java
new file mode 100644
index 0000000..b297ce5
--- /dev/null
+++ b/src/repair/java/it/cavallium/dbengine/repair/DataSize.java
@@ -0,0 +1,194 @@
+package it.cavallium.dbengine.repair;
+
+import java.io.Serial;
+import java.text.CharacterIterator;
+import java.text.StringCharacterIterator;
+import org.jetbrains.annotations.NotNull;
+
+@SuppressWarnings("unused")
+public final class DataSize extends Number implements Comparable {
+
+ @Serial
+ private static final long serialVersionUID = 7213411239846723568L;
+
+ public static DataSize ZERO = new DataSize(0L);
+ public static DataSize ONE = new DataSize(1L);
+ public static DataSize KIB = new DataSize(1024L);
+ public static DataSize KB = new DataSize(1000L);
+ public static DataSize MIB = new DataSize(1024L * 1024);
+ public static DataSize MB = new DataSize(1000L * 1000);
+ public static DataSize GIB = new DataSize(1024L * 1024 * 1024);
+ public static DataSize GB = new DataSize(1000L * 1000 * 1000);
+ public static DataSize TIB = new DataSize(1024L * 1024 * 1024 * 1024);
+ public static DataSize TB = new DataSize(1000L * 1000 * 1000 * 1024);
+ public static DataSize PIB = new DataSize(1024L * 1024 * 1024 * 1024 * 1024);
+ public static DataSize PB = new DataSize(1000L * 1000 * 1000 * 1024 * 1024);
+ public static DataSize EIB = new DataSize(1024L * 1024 * 1024 * 1024 * 1024 * 1024);
+ public static DataSize EB = new DataSize(1000L * 1000 * 1000 * 1024 * 1024 * 1024);
+ public static DataSize MAX_VALUE = new DataSize(Long.MAX_VALUE);
+
+ private final long size;
+
+ public DataSize(long size) {
+ this.size = size;
+ }
+
+ public DataSize(String size) {
+ size = size.replaceAll("\\s|_", "");
+ switch (size) {
+ case "", "0", "-0", "+0" -> {
+ this.size = 0;
+ return;
+ }
+ case "∞", "inf", "infinite", "∞b" -> {
+ this.size = Long.MAX_VALUE;
+ return;
+ }
+ }
+ int numberStartOffset = 0;
+ int numberEndOffset = 0;
+ boolean negative = false;
+ {
+ boolean firstChar = true;
+ boolean numberMode = true;
+ for (char c : size.toCharArray()) {
+ if (c == '-') {
+ if (firstChar) {
+ negative = true;
+ numberStartOffset++;
+ numberEndOffset++;
+ } else {
+ throw new IllegalArgumentException("Found a minus character after index 0");
+ }
+ } else if (Character.isDigit(c)) {
+ if (numberMode) {
+ numberEndOffset++;
+ } else {
+ throw new IllegalArgumentException("Found a number after the unit");
+ }
+ } else if (Character.isLetter(c)) {
+ if (numberEndOffset - numberStartOffset <= 0) {
+ throw new IllegalArgumentException("No number found");
+ }
+ if (numberMode) {
+ numberMode = false;
+ }
+ } else {
+ throw new IllegalArgumentException("Unsupported character");
+ }
+ if (firstChar) {
+ firstChar = false;
+ }
+ }
+ }
+ var number = Long.parseUnsignedLong(size, numberStartOffset, numberEndOffset, 10);
+ if (numberEndOffset == size.length()) {
+ // No measurement
+ this.size = (negative ? -1 : 1) * number;
+ return;
+ }
+ // Measurements are like B, MB, or MiB, not longer
+ if (size.length() - numberEndOffset > 3) {
+ throw new IllegalArgumentException("Wrong measurement unit");
+ }
+ var scaleChar = size.charAt(numberEndOffset);
+ boolean powerOf2 = numberEndOffset + 1 < size.length() && size.charAt(numberEndOffset + 1) == 'i';
+ int k = powerOf2 ? 1024 : 1000;
+ var scale = switch (scaleChar) {
+ case 'B' -> 1;
+ case 'b' -> throw new IllegalArgumentException("Bits are not allowed");
+ case 'K', 'k' -> k;
+ case 'M', 'm' -> k * k;
+ case 'G', 'g' -> k * k * k;
+ case 'T', 't' -> k * k * k * k;
+ case 'P', 'p' -> k * k * k * k * k;
+ case 'E', 'e' -> k * k * k * k * k * k;
+ case 'Z', 'z' -> k * k * k * k * k * k * k;
+ case 'Y', 'y' -> k * k * k * k * k * k * k * k;
+ default -> throw new IllegalStateException("Unexpected value: " + scaleChar);
+ };
+ // if scale is 1, the unit should be "B", nothing more
+ if (scale == 1 && numberEndOffset + 1 != size.length()) {
+ throw new IllegalArgumentException("Invalid unit");
+ }
+ this.size = (negative ? -1 : 1) * number * scale;
+ }
+
+ public static Long get(DataSize value) {
+ if (value == null) {
+ return null;
+ } else {
+ return value.size;
+ }
+ }
+
+ public static long getOrElse(DataSize value, @NotNull DataSize defaultValue) {
+ if (value == null) {
+ return defaultValue.size;
+ } else {
+ return value.size;
+ }
+ }
+
+
+ @Override
+ public int intValue() {
+ if (size >= Integer.MAX_VALUE) {
+ return Integer.MAX_VALUE;
+ }
+ return (int) size;
+ }
+
+ @Override
+ public long longValue() {
+ return size;
+ }
+
+ @Override
+ public float floatValue() {
+ return size;
+ }
+
+ @Override
+ public double doubleValue() {
+ return size;
+ }
+
+ @Override
+ public String toString() {
+ return toString(true);
+ }
+
+ public String toString(boolean precise) {
+ boolean siUnits = size % 1000 == 0;
+ int k = siUnits ? 1000 : 1024;
+ long lSize = size;
+ CharacterIterator ci = new StringCharacterIterator((siUnits ? "k" : "K") + "MGTPEZY");
+ while ((precise ? lSize % k == 0 : lSize > k) && lSize != 0) {
+ lSize /= k;
+ ci.next();
+ }
+ if (lSize == size) {
+ return lSize + "B";
+ }
+ return lSize + "" + ci.previous() + (siUnits ? "B" : "iB");
+ }
+
+ @Override
+ public int compareTo(@NotNull DataSize anotherLong) {
+ return Long.compare(this.size, anotherLong.size);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof DataSize) {
+ return size == ((DataSize)obj).size;
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Long.hashCode(size);
+ }
+}
diff --git a/src/repair/java/it/cavallium/dbengine/repair/Repair.java b/src/repair/java/it/cavallium/dbengine/repair/Repair.java
index a64caa5..45dde05 100644
--- a/src/repair/java/it/cavallium/dbengine/repair/Repair.java
+++ b/src/repair/java/it/cavallium/dbengine/repair/Repair.java
@@ -1,24 +1,23 @@
package it.cavallium.dbengine.repair;
-import io.micrometer.core.instrument.Clock;
-import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
-import io.micrometer.core.instrument.logging.LoggingMeterRegistry;
-import io.micrometer.core.instrument.noop.NoopMeter;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import it.cavallium.datagen.nativedata.NullableString;
import it.cavallium.datagen.nativedata.Nullableboolean;
import it.cavallium.datagen.nativedata.Nullableint;
import it.cavallium.datagen.nativedata.Nullablelong;
import it.cavallium.dbengine.client.Compression;
+import it.cavallium.dbengine.client.VerificationProgress;
import it.cavallium.dbengine.client.VerificationProgress.BlockBad;
import it.cavallium.dbengine.client.VerificationProgress.FileOk;
+import it.cavallium.dbengine.client.VerificationProgress.FileStart;
import it.cavallium.dbengine.client.VerificationProgress.Progress;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
import it.cavallium.dbengine.database.disk.LLLocalDictionary;
import it.cavallium.dbengine.database.disk.LLLocalKeyValueDatabase;
+import it.cavallium.dbengine.database.disk.RocksDBFile;
import it.cavallium.dbengine.rpc.current.data.Column;
import it.cavallium.dbengine.rpc.current.data.DatabaseOptionsBuilder;
import it.cavallium.dbengine.rpc.current.data.DatabaseVolume;
@@ -26,18 +25,22 @@ import it.cavallium.dbengine.rpc.current.data.DefaultColumnOptions;
import it.cavallium.dbengine.rpc.current.data.NamedColumnOptions;
import it.cavallium.dbengine.rpc.current.data.nullables.NullableCompression;
import it.cavallium.dbengine.rpc.current.data.nullables.NullableFilter;
-import it.cavallium.dbengine.utils.StreamUtils;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import it.unimi.dsi.fastutil.objects.ObjectList;
+import java.io.BufferedWriter;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
import java.util.Base64;
+import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
@@ -47,7 +50,7 @@ public class Repair {
public static final MeterRegistry METER = new SimpleMeterRegistry(); // LoggingMeterRegistry.builder(key -> null).clock(Clock.SYSTEM).loggingSink(System.err::println).build();
- public static void main(String[] argsArray) throws RocksDBException {
+ public static void main(String[] argsArray) throws RocksDBException, IOException {
System.setProperty("it.cavallium.dbengine.checks.compression", "true");
System.setProperty("it.cavallium.dbengine.checks.paranoid", "true");
System.setProperty("it.cavallium.dbengine.checks.filesize", "true");
@@ -62,7 +65,7 @@ public class Repair {
args = args.subList(1, args.size());
switch (command.toLowerCase(Locale.ROOT)) {
case "list-column-families" -> {
- if (args.size() < 3) {
+ if (args.size() != 2) {
printHelp(initialArgs);
}
Path path = Path.of(args.get(0));
@@ -80,41 +83,87 @@ public class Repair {
var conn = new LLLocalDatabaseConnection(METER, path, false);
conn.connect();
LLLocalKeyValueDatabase db = getDatabase(conn, dbName, columnNames);
- db.getAllColumnFamilyHandles().forEach((column, cfh) -> {
- System.err.printf("Scanning column: %s%n", column.name());
- LLLocalDictionary dict = db.getDictionary(column.name().getBytes(StandardCharsets.UTF_8), UpdateMode.DISALLOW);
- StreamUtils.collectOn(StreamUtils.ROCKSDB_POOL, dict.verifyChecksum(LLRange.all()), StreamUtils.executing(block -> {
- synchronized (Repair.class) {
- switch (block) {
- case null -> {}
- case BlockBad blockBad -> {
- System.err.println("[ ! ] Bad block found: " + block.databaseName() + (block.column() != null ? "->" + block.column().name() : "") + "->" + blockBad.rawKey() + "->" + block.file());
- if (blockBad.ex() != null) {
- if (blockBad.ex() instanceof RocksDBException ex) {
- System.err.println("\t" + ex);
- } else {
- blockBad.ex().printStackTrace(System.err);
- }
- }
+ Path cwd = Path.of(".");
+ String errorsFileNamePrefix = "errors-" + dbName;
+ String errorsFileNameSuffix = ".log";
+ Path errorsFile;
+ if (Files.isWritable(cwd)) {
+ errorsFile = cwd.resolve(errorsFileNamePrefix + "-" + System.currentTimeMillis() + errorsFileNameSuffix);
+ } else {
+ errorsFile = Files.createTempFile(errorsFileNamePrefix, errorsFileNameSuffix);
+ }
+ try (var os = Files.newBufferedWriter(errorsFile,
+ StandardCharsets.UTF_8,
+ StandardOpenOption.CREATE,
+ StandardOpenOption.TRUNCATE_EXISTING,
+ StandardOpenOption.DSYNC
+ )) {
+ db.getAllColumnFamilyHandles().forEach((column, cfh) -> {
+ System.err.printf("Scanning column: %s%n", column.name());
+ LLLocalDictionary dict = db.getDictionary(column.name().getBytes(StandardCharsets.UTF_8), UpdateMode.DISALLOW);
+ consumeVerification(os, dict.verifyChecksum(LLRange.all()));
+ });
+ }
+ }
+ case "scan-files" -> {
+ if (args.size() < 3) {
+ printHelp(initialArgs);
+ }
+ Path path = Path.of(args.get(0)).toAbsolutePath();
+ String dbName = args.get(1);
+ List fileNames = args.subList(2, args.size());
+ List columnNames = getColumnFamilyNames(path, dbName).toList();
+ System.err.printf("Scanning database \"%s\" at \"%s\", files:%n%s%n", dbName, path, String.join("\n", fileNames));
+ var conn = new LLLocalDatabaseConnection(METER, path, false);
+ conn.connect();
+ LLLocalKeyValueDatabase db = getDatabase(conn, dbName, columnNames);
+ Path cwd = Path.of(".");
+ String errorsFileNamePrefix = "errors-" + dbName;
+ String errorsFileNameSuffix = ".log";
+ Path errorsFile;
+ if (Files.isWritable(cwd)) {
+ errorsFile = cwd.resolve(errorsFileNamePrefix + "-" + System.currentTimeMillis() + errorsFileNameSuffix);
+ } else {
+ errorsFile = Files.createTempFile(errorsFileNamePrefix, errorsFileNameSuffix);
+ }
+ try (var os = Files.newBufferedWriter(errorsFile,
+ StandardCharsets.UTF_8,
+ StandardOpenOption.CREATE,
+ StandardOpenOption.TRUNCATE_EXISTING,
+ StandardOpenOption.DSYNC
+ )) {
+ consumeVerification(os, db.getAllLiveFiles()
+ .filter(file -> {
+ if (fileNames.contains(file.getMetadata().fileName())) {
+ return true;
+ } else {
+ System.err.printf("Ignoring file: \"%s\"%n", file.getMetadata().fileName());
+ return false;
}
- case FileOk fileOk -> {
- System.err.println("File is ok: " + block.databaseName() + (block.column() != null ? "->" + block.column().name() : "") + "->" + block.file());
- }
- case Progress progress -> {
- System.err.printf("Progress: [%d/%d] file: [%d/%d] %s%s->%s%n",
- progress.scanned(),
- progress.total(),
- progress.fileScanned(),
- progress.fileTotal(),
- block.databaseName(),
- block.column() != null ? "->" + block.column().name() : "",
- block.file()
- );
- }
- default -> throw new IllegalStateException("Unexpected value: " + block);
- }
- }
- }));
+ })
+ .flatMap(file -> file.verify(dbName, "any", LLRange.all())));
+ }
+ }
+ case "list-files" -> {
+ if (args.size() != 2) {
+ printHelp(initialArgs);
+ }
+ Path path = Path.of(args.get(0)).toAbsolutePath();
+ String dbName = args.get(1);
+ List columnNames = getColumnFamilyNames(path, dbName).toList();
+ System.err.printf("Getting files of database \"%s\" at \"%s\"%n", dbName, path);
+ System.out.println("Name \tPath \tColumn name \tKeys range \tSize \tEntries \tLevel");
+ var conn = new LLLocalDatabaseConnection(METER, path, false);
+ conn.connect();
+ LLLocalKeyValueDatabase db = getDatabase(conn, dbName, columnNames);
+ db.getAllLiveFiles().sorted(Comparator
+ .comparingInt(x -> x.getMetadata().level())
+ .thenComparingLong(x -> x.getMetadata().size())
+ .thenComparingLong(x -> x.getMetadata().numEntries())
+ .thenComparing(x -> x.getMetadata().fileName())
+ ).forEach(file -> {
+ var meta = file.getMetadata();
+ System.out.printf("%s\t%s\t%s\t%s\t%s\t%d\t%d%n", meta.fileName(), meta.path(), meta.columnName(), meta.keysRange(), new DataSize(meta.size()).toString(false), meta.numEntries(), meta.level());
});
}
case "dump-all" -> {
@@ -175,6 +224,59 @@ public class Repair {
System.exit(0);
}
+ private static void consumeVerification(BufferedWriter os, Stream verificationProgressStream) {
+ var showProgress = !Boolean.getBoolean("it.cavallium.dbengine.repair.hideprogress");
+ verificationProgressStream.parallel().forEach(block -> {
+ synchronized (Repair.class) {
+ switch (block) {
+ case null -> {}
+ case BlockBad blockBad -> {
+ StringBuilder errorLineBuilder = new StringBuilder()
+ .append("File scan ended with error, bad block found: ")
+ .append(block.databaseName())
+ .append(block.column() != null ? "->" + block.column().name() : "")
+ .append("->")
+ .append(blockBad.rawKey())
+ .append("->")
+ .append(block.file());
+ if (blockBad.ex() != null) {
+ if (blockBad.ex() instanceof RocksDBException ex) {
+ errorLineBuilder.append("\n\t").append(ex);
+ } else {
+ errorLineBuilder.append("\n").append(ExceptionUtils.getStackTrace(blockBad.ex()));
+ }
+ }
+ String errorLine = errorLineBuilder.toString();
+ System.err.println("[ ! ] " + errorLine);
+ try {
+ os.write(errorLine);
+ os.flush();
+ } catch (IOException e) {
+ System.err.println("Can't write to errors file: " + e);
+ }
+ }
+ case FileOk ignored ->
+ System.err.println("File scan ended with success: " + block.databaseName() + (block.column() != null ? "->" + block.column().name() : "") + "->" + block.file());
+ case FileStart ignored ->
+ System.err.println("File scan begin: " + block.databaseName() + (block.column() != null ? "->" + block.column().name() : "") + "->" + block.file());
+ case Progress progress -> {
+ if (showProgress) {
+ System.err.printf("Progress: %s[%d/%d] %s%s->%s%n",
+ progress.total() != -1 ? "[%d/%d] file: ".formatted(progress.scanned(), progress.total()) : "",
+ progress.fileScanned(),
+ progress.fileTotal(),
+ block.databaseName(),
+ block.column() != null ? "->" + block.column().name() : "",
+ block.file()
+ );
+ }
+ }
+ default -> throw new IllegalStateException("Unexpected value: " + block);
+ }
+ }
+ });
+ }
+
private static Stream getColumnFamilyNames(Path path, String dbName) throws RocksDBException {
String dbPath = path.resolve("database_" + dbName).toString();
System.err.printf("Listing column families of database: %s%n", dbPath);
@@ -249,6 +351,8 @@ public class Repair {
or: repair dump-all DIRECTORY DB_NAME COLUMN_NAME...
or: repair verify-checksum DIRECTORY DB_NAME
or: repair list-column-families DIRECTORY DB_NAME
+ or: repair scan-files DIRECTORY DB_NAME FILE-NAME...
+ or: repair list-files DIRECTORY DB_NAME
""");
System.exit(1);
}