Compare commits

...

3 Commits

37 changed files with 1530 additions and 277 deletions

View File

@ -13,8 +13,8 @@
<revision>0-SNAPSHOT</revision>
<dbengine.ci>false</dbengine.ci>
<micrometer.version>1.10.4</micrometer.version>
<lucene.version>9.7.0</lucene.version>
<rocksdb.version>8.5.3</rocksdb.version>
<lucene.version>9.8.0</lucene.version>
<rocksdb.version>8.5.4</rocksdb.version>
<junit.jupiter.version>5.9.0</junit.jupiter.version>
<data.generator.version>1.0.13</data.generator.version>
</properties>
@ -574,6 +574,9 @@
<manifest>
<mainClass>it.cavallium.dbengine.repair.Repair</mainClass>
</manifest>
<manifestEntries>
<Multi-Release>true</Multi-Release>
</manifestEntries>
</archive>
</configuration>
<executions>

View File

@ -26,7 +26,7 @@ public interface CompositeDatabase extends DatabaseProperties, DatabaseOperation
/**
* Find corrupted items
*/
Stream<VerificationProgress> verify();
Stream<DbProgress<SSTVerificationProgress>> verify();
void verifyChecksum();
}

View File

@ -0,0 +1,45 @@
package it.cavallium.dbengine.client;
import it.cavallium.dbengine.client.SSTProgress.SSTProgressReport;
import it.cavallium.dbengine.client.SSTProgress.SSTStart;
import it.cavallium.dbengine.rpc.current.data.Column;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.jetbrains.annotations.Nullable;
/**
 * A database-level progress event: wraps a raw per-SST-file progress event
 * with the database name, column and overall scanned/total counters.
 */
public interface DbProgress<T extends SSTProgress> {

	/** Name of the database this progress event belongs to. */
	String databaseName();

	/**
	 * SST-scan progress enriched with database-level context.
	 */
	record DbSSTProgress<T extends SSTProgress>(String databaseName, Column column, @Nullable Path file, long scanned,
			long total, T sstProgress) implements DbProgress<T> {

		/** Overall progress in [0, 1]; 0 when the total is unknown (zero). */
		public double getProgress() {
			return total == 0 ? 0d : scanned / (double) total;
		}

		/** Normalized string form of the current file path, or null when no file is known. */
		public String fileString() {
			if (file == null) {
				return null;
			}
			return file.normalize().toString();
		}
	}

	/**
	 * Adapts a stream of raw SST progress events into database-level progress events.
	 * Remembers the file announced by each {@code SSTStart} and counts every
	 * {@code SSTProgressReport} into the shared tracker.
	 */
	static <T extends SSTProgress> Stream<DbProgress<T>> toDbProgress(String dbName,
			String columnName,
			LongProgressTracker totalTracker,
			Stream<T> stream) {
		Column column = Column.of(columnName);
		AtomicReference<Path> lastFile = new AtomicReference<>();
		return stream.map(event -> {
			if (event instanceof SSTStart start) {
				lastFile.set(start.metadata().filePath());
			} else if (event instanceof SSTProgressReport) {
				totalTracker.incrementAndGet();
			}
			return new DbSSTProgress<>(dbName, column, lastFile.get(), totalTracker.getCurrent(),
					totalTracker.getTotal(), event);
		});
	}
}

View File

@ -0,0 +1,42 @@
package it.cavallium.dbengine.client;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Thread-safe progress counter with an estimated total.
 * The reported total never falls below the current count, so {@link #progress()}
 * always stays within [0, 1] even when the original estimate was too low.
 */
public class LongProgressTracker {

	private final AtomicLong current = new AtomicLong();
	private final AtomicLong total = new AtomicLong();

	/** Creates a tracker with the given total estimate. */
	public LongProgressTracker(long size) {
		setTotal(size);
	}

	/** Creates a tracker with an unknown (zero) total estimate. */
	public LongProgressTracker() {
	}

	/** Replaces the total estimate; returns this tracker for chaining. */
	public LongProgressTracker setTotal(long estimate) {
		this.total.set(estimate);
		return this;
	}

	/** Number of items processed so far. */
	public long getCurrent() {
		return this.current.get();
	}

	/** Increments the processed count and returns the new value. */
	public long incrementAndGet() {
		return this.current.incrementAndGet();
	}

	/** Returns the processed count, then increments it. */
	public long getAndIncrement() {
		return this.current.getAndIncrement();
	}

	/** Total estimate, clamped so it is never below the current count. */
	public long getTotal() {
		long done = this.current.get();
		long estimate = this.total.get();
		return done > estimate ? done : estimate;
	}

	/** Fraction completed in [0, 1]; 0 when nothing has been counted yet. */
	public double progress() {
		long denominator = Math.max(1L, getTotal());
		return getCurrent() / (double) denominator;
	}
}

View File

@ -0,0 +1,17 @@
package it.cavallium.dbengine.client;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.client.SSTDumpProgress.SSTBlockFail;
import it.cavallium.dbengine.client.SSTDumpProgress.SSTBlockKeyValue;
import it.cavallium.dbengine.client.SSTProgress.SSTOk;
import it.cavallium.dbengine.client.SSTProgress.SSTProgressReport;
import it.cavallium.dbengine.client.SSTProgress.SSTStart;
import org.rocksdb.RocksDBException;
/**
 * Progress events emitted while dumping the raw contents of an SST file.
 * Extends the common SST lifecycle events (start / periodic report / ok) with
 * per-entry payload and read-failure events.
 */
public sealed interface SSTDumpProgress extends SSTProgress permits SSTBlockFail, SSTBlockKeyValue, SSTOk,
SSTProgressReport, SSTStart {
/** A raw key/value pair read from the SST file. */
record SSTBlockKeyValue(Buf rawKey, Buf rawValue) implements SSTDumpProgress {}
/** A read failure; carries the underlying RocksDB error. */
record SSTBlockFail(RocksDBException ex) implements SSTDumpProgress {}
}

View File

@ -0,0 +1,21 @@
package it.cavallium.dbengine.client;
import it.cavallium.dbengine.database.disk.RocksDBFile.IterationMetadata;
import it.cavallium.dbengine.rpc.current.data.Column;
import org.jetbrains.annotations.Nullable;
/**
 * Base progress events shared by every SST-file scan (verification and dump).
 */
public interface SSTProgress {

	/** Emitted once when iteration of an SST file begins; carries the file metadata. */
	record SSTStart(IterationMetadata metadata) implements SSTProgress, SSTVerificationProgress, SSTDumpProgress {}

	/** Emitted once when an SST file was scanned completely; reports the scanned entry count. */
	record SSTOk(long scannedCount) implements SSTProgress, SSTVerificationProgress, SSTDumpProgress {}

	/** Periodic report with the per-file scanned/total counters. */
	record SSTProgressReport(long fileScanned, long fileTotal) implements SSTProgress, SSTVerificationProgress,
			SSTDumpProgress {

		/** Per-file progress in [0, 1]; 0 when the file total is unknown (zero). */
		public double getFileProgress() {
			return fileTotal == 0 ? 0d : fileScanned / (double) fileTotal;
		}
	}
}

View File

@ -0,0 +1,17 @@
package it.cavallium.dbengine.client;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.client.DbProgress.DbSSTProgress;
import it.cavallium.dbengine.client.SSTProgress.SSTOk;
import it.cavallium.dbengine.client.SSTProgress.SSTProgressReport;
import it.cavallium.dbengine.client.SSTProgress.SSTStart;
import it.cavallium.dbengine.client.SSTVerificationProgress.SSTBlockBad;
import it.cavallium.dbengine.rpc.current.data.Column;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
/**
 * Progress events emitted while verifying the checksums of an SST file.
 * Extends the common SST lifecycle events (start / periodic report / ok) with
 * a corrupted-block event.
 */
public sealed interface SSTVerificationProgress extends SSTProgress permits SSTOk, SSTProgressReport, SSTStart,
SSTBlockBad {
/** A corrupted block; rawKey is the last key read (may be null when no key was reached). */
record SSTBlockBad(Buf rawKey, Throwable ex) implements SSTVerificationProgress {}
}

View File

@ -1,29 +0,0 @@
package it.cavallium.dbengine.client;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.rpc.current.data.Column;
import org.jetbrains.annotations.Nullable;
/**
 * Progress events for database checksum verification.
 * Each event identifies the database, column and file being scanned
 * (any of which may be null when unknown, e.g. on an early failure).
 */
public sealed interface VerificationProgress {
	/** A corrupted block; rawKey may be null when no key was reached before the failure. */
	record BlockBad(String databaseName, Column column, Buf rawKey, String file, Throwable ex)
			implements VerificationProgress {}
	/** A file fully verified without errors. */
	record FileOk(String databaseName, Column column, String file)
			implements VerificationProgress {}
	/** Periodic progress report with overall and per-file scanned/total counters. */
	record Progress(String databaseName, Column column, String file,
			long scanned, long total,
			long fileScanned, long fileTotal)
			implements VerificationProgress {
		/** Overall progress in [0, 1]; 0 when the total is unknown (guards the 0/0 -> NaN case). */
		public double getProgress() {
			if (total == 0) {
				return 0d;
			}
			return scanned / (double) total;
		}
		/** Per-file progress in [0, 1]; 0 when the file total is unknown. */
		public double getFileProgress() {
			if (fileTotal == 0) {
				return 0d;
			}
			return fileScanned / (double) fileTotal;
		}
	}
	@Nullable String databaseName();
	@Nullable Column column();
	@Nullable String file();
}

View File

@ -1,7 +1,8 @@
package it.cavallium.dbengine.database;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.client.VerificationProgress;
import it.cavallium.dbengine.client.DbProgress;
import it.cavallium.dbengine.client.SSTVerificationProgress;
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import java.util.List;
@ -64,7 +65,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
int prefixLength,
boolean smallRange);
Stream<VerificationProgress> verifyChecksum(LLRange range);
Stream<DbProgress<SSTVerificationProgress>> verifyChecksum(LLRange range);
void setRange(LLRange range, Stream<LLEntry> entries, boolean smallRange);

View File

@ -1,9 +1,7 @@
package it.cavallium.dbengine.database;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.database.disk.LiveFileMetadata;
import java.util.Objects;
import java.util.StringJoiner;
import org.jetbrains.annotations.Nullable;
/**

View File

@ -5,8 +5,9 @@ import static it.cavallium.dbengine.utils.StreamUtils.resourceStream;
import it.cavallium.buffer.Buf;
import it.cavallium.buffer.BufDataInput;
import it.cavallium.buffer.BufDataOutput;
import it.cavallium.dbengine.client.VerificationProgress;
import it.cavallium.dbengine.client.DbProgress;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.client.SSTVerificationProgress;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLRange;
@ -269,7 +270,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
}
@Override
public Stream<VerificationProgress> verifyChecksum() {
public Stream<DbProgress<SSTVerificationProgress>> verifyChecksum() {
return dictionary.verifyChecksum(range);
}

View File

@ -1,8 +1,9 @@
package it.cavallium.dbengine.database.collections;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.client.VerificationProgress;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.client.DbProgress;
import it.cavallium.dbengine.client.SSTVerificationProgress;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.SubStageEntry;
import it.cavallium.dbengine.database.UpdateMode;
@ -148,7 +149,7 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
}
@Override
public Stream<VerificationProgress> verifyChecksum() {
public Stream<DbProgress<SSTVerificationProgress>> verifyChecksum() {
return this.subDictionary.verifyChecksum();
}

View File

@ -3,8 +3,9 @@ package it.cavallium.dbengine.database.collections;
import it.cavallium.buffer.Buf;
import it.cavallium.buffer.BufDataInput;
import it.cavallium.buffer.BufDataOutput;
import it.cavallium.dbengine.client.VerificationProgress;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.client.DbProgress;
import it.cavallium.dbengine.client.SSTVerificationProgress;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
@ -121,7 +122,7 @@ public final class DatabaseMapSingle<U> implements DatabaseStageEntry<U> {
}
@Override
public Stream<VerificationProgress> verifyChecksum() {
public Stream<DbProgress<SSTVerificationProgress>> verifyChecksum() {
return dictionary.verifyChecksum(LLRange.single(key));
}

View File

@ -1,7 +1,8 @@
package it.cavallium.dbengine.database.collections;
import it.cavallium.dbengine.client.VerificationProgress;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.client.DbProgress;
import it.cavallium.dbengine.client.SSTVerificationProgress;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateReturnMode;
@ -124,7 +125,7 @@ public class DatabaseSingleBucket<K, V, TH> implements DatabaseStageEntry<V> {
}
@Override
public Stream<VerificationProgress> verifyChecksum() {
public Stream<DbProgress<SSTVerificationProgress>> verifyChecksum() {
return bucketStage.verifyChecksum();
}

View File

@ -1,8 +1,9 @@
package it.cavallium.dbengine.database.collections;
import it.cavallium.dbengine.client.VerificationProgress;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.client.DbProgress;
import it.cavallium.dbengine.client.Mapper;
import it.cavallium.dbengine.client.SSTVerificationProgress;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.disk.CachedSerializationFunction;
@ -107,7 +108,7 @@ public class DatabaseSingleMapped<A, B> implements DatabaseStageEntry<A> {
}
@Override
public Stream<VerificationProgress> verifyChecksum() {
public Stream<DbProgress<SSTVerificationProgress>> verifyChecksum() {
return this.serializedSingle.verifyChecksum();
}

View File

@ -3,8 +3,9 @@ package it.cavallium.dbengine.database.collections;
import it.cavallium.buffer.Buf;
import it.cavallium.buffer.BufDataInput;
import it.cavallium.buffer.BufDataOutput;
import it.cavallium.dbengine.client.VerificationProgress;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.client.DbProgress;
import it.cavallium.dbengine.client.SSTVerificationProgress;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot;
@ -119,7 +120,7 @@ public class DatabaseSingleton<U> implements DatabaseStageEntry<U> {
}
@Override
public Stream<VerificationProgress> verifyChecksum() {
public Stream<DbProgress<SSTVerificationProgress>> verifyChecksum() {
return Stream.empty();
}
}

View File

@ -1,7 +1,8 @@
package it.cavallium.dbengine.database.collections;
import it.cavallium.dbengine.client.VerificationProgress;
import it.cavallium.dbengine.client.DbProgress;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.client.SSTVerificationProgress;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateReturnMode;
@ -64,5 +65,5 @@ public interface DatabaseStage<T> extends DatabaseStageWithEntry<T> {
return leavesCount(snapshot, false) <= 0;
}
Stream<VerificationProgress> verifyChecksum();
Stream<DbProgress<SSTVerificationProgress>> verifyChecksum();
}

View File

@ -7,12 +7,10 @@ import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.database.ColumnUtils;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.RepeatedElementList;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import it.cavallium.dbengine.database.disk.rocksdb.LLSlice;
import it.cavallium.dbengine.database.disk.rocksdb.LLWriteOptions;
import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
@ -22,6 +20,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;
import java.util.stream.Stream;
@ -35,10 +34,11 @@ import org.rocksdb.CompactRangeOptions;
import org.rocksdb.FlushOptions;
import org.rocksdb.Holder;
import org.rocksdb.KeyMayExist;
import org.rocksdb.LiveFileMetaData;
import org.rocksdb.LevelMetaData;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksObject;
import org.rocksdb.SstFileMetaData;
import org.rocksdb.TableProperties;
import org.rocksdb.Transaction;
import org.rocksdb.TransactionOptions;
@ -560,12 +560,17 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
}
@Override
public Stream<LiveFileMetadata> getAllLiveFiles() throws RocksDBException {
byte[] cfhName = cfh.getName();
return db.getLiveFilesMetaData().stream()
.filter(file -> Arrays.equals(cfhName, file.columnFamilyName()))
.map(file -> new LiveFileMetadata(file.path(), file.fileName(), file.level(), columnName,
file.numEntries(),file.size(), LLRange.of(Buf.wrap(file.smallestKey()), Buf.wrap(file.largestKey()))));
public Stream<RocksDBFile> getAllLiveFiles() throws RocksDBException {
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
byte[] cfhName = cfh.getName();
return db.getColumnFamilyMetaData(cfh).levels().stream()
.flatMap(l -> l.files().stream()
.map(sstFileMetaData -> new RocksDBColumnFile(db, cfh, sstFileMetaData, cfhName, l.level())));
} finally {
closeLock.unlockRead(closeReadLock);
}
}
protected int getLevels() {

View File

@ -1,7 +1,22 @@
package it.cavallium.dbengine.database.disk;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Meter.Id;
import io.micrometer.core.instrument.Meter.Type;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.noop.NoopCounter;
import io.micrometer.core.instrument.noop.NoopTimer;
public record IteratorMetrics(Counter startedIterSeek, Counter endedIterSeek, Timer iterSeekTime,
Counter startedIterNext, Counter endedIterNext, Timer iterNextTime) {}
Counter startedIterNext, Counter endedIterNext, Timer iterNextTime) {
public static final IteratorMetrics NO_OP = new IteratorMetrics(
new NoopCounter(new Id("no-op", Tags.empty(), null, null, Type.COUNTER)),
new NoopCounter(new Id("no-op", Tags.empty(), null, null, Type.COUNTER)),
new NoopTimer(new Id("no-op", Tags.empty(), null, null, Type.TIMER)),
new NoopCounter(new Id("no-op", Tags.empty(), null, null, Type.COUNTER)),
new NoopCounter(new Id("no-op", Tags.empty(), null, null, Type.COUNTER)),
new NoopTimer(new Id("no-op", Tags.empty(), null, null, Type.TIMER))
);
}

View File

@ -68,6 +68,10 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
}
/** Resolves the on-disk directory of the given database under this connection's base path. */
public Path getDatabasePath(String databaseName) {
return getDatabasePath(basePath, databaseName);
}
/** Resolves the on-disk directory of a database: {@code basePath/database_<databaseName>}. */
public static Path getDatabasePath(Path basePath, String databaseName) {
return basePath.resolve("database_" + databaseName);
}

View File

@ -4,24 +4,25 @@ import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import static it.cavallium.dbengine.database.LLUtils.isBoundedRange;
import static it.cavallium.dbengine.database.LLUtils.mapList;
import static it.cavallium.dbengine.database.LLUtils.toStringSafe;
import static it.cavallium.dbengine.database.disk.LLLocalKeyValueDatabase.PRINT_ALL_CHECKSUM_VERIFICATION_STEPS;
import static it.cavallium.dbengine.database.disk.UpdateAtomicResultMode.DELTA;
import static it.cavallium.dbengine.utils.StreamUtils.ROCKSDB_POOL;
import static it.cavallium.dbengine.utils.StreamUtils.collectOn;
import static it.cavallium.dbengine.utils.StreamUtils.executing;
import static it.cavallium.dbengine.utils.StreamUtils.fastSummingLong;
import static it.cavallium.dbengine.utils.StreamUtils.resourceStream;
import static it.cavallium.dbengine.utils.StreamUtils.streamWhileNonNull;
import static java.util.Objects.requireNonNull;
import static it.cavallium.dbengine.utils.StreamUtils.batches;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.client.VerificationProgress;
import it.cavallium.dbengine.client.VerificationProgress.BlockBad;
import it.cavallium.dbengine.client.VerificationProgress.FileOk;
import it.cavallium.dbengine.client.VerificationProgress.Progress;
import it.cavallium.dbengine.client.DbProgress;
import it.cavallium.dbengine.client.DbProgress.DbSSTProgress;
import it.cavallium.dbengine.client.SSTProgress.SSTOk;
import it.cavallium.dbengine.client.SSTProgress.SSTProgressReport;
import it.cavallium.dbengine.client.SSTProgress.SSTStart;
import it.cavallium.dbengine.client.SSTVerificationProgress;
import it.cavallium.dbengine.client.SSTVerificationProgress.SSTBlockBad;
import it.cavallium.dbengine.database.ColumnUtils;
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLDictionary;
@ -35,30 +36,28 @@ import it.cavallium.dbengine.database.RocksDBLongProperty;
import it.cavallium.dbengine.database.SerializedKey;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.disk.RocksDBFile.IterationMetadata;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import it.cavallium.dbengine.database.disk.rocksdb.LLWriteOptions;
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.rpc.current.data.Column;
import it.cavallium.dbengine.utils.DBException;
import it.cavallium.dbengine.utils.StreamUtils;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
@ -640,120 +639,58 @@ public class LLLocalDictionary implements LLDictionary {
}
@Override
public Stream<VerificationProgress> verifyChecksum(LLRange rangeFull) {
public Stream<DbProgress<SSTVerificationProgress>> verifyChecksum(LLRange rangeFull) {
Set<String> brokenFiles = new ConcurrentHashMap<String, Boolean>().keySet(true);
LongAdder totalScanned = new LongAdder();
AtomicLong totalEstimate = new AtomicLong();
Set<String> filesToSkip = Stream
.of(System.getProperty("filenames.skip.list", "").split(","))
.filter(x -> !x.isBlank())
.map(String::trim)
.collect(Collectors.toUnmodifiableSet());
long totalEstimate;
try {
totalEstimate.set(db.getNumEntries());
} catch (Throwable ex) {
logger.warn("Failed to get total entries count", ex);
{
long totalEstimateTmp = 0;
try {
totalEstimateTmp = db.getNumEntries();
} catch (Throwable ex) {
logger.warn("Failed to get total entries count", ex);
}
totalEstimate = totalEstimateTmp;
}
Column column = ColumnUtils.special(columnName);
try {
record FileRange(String filePath, String filename, @Nullable LLRange range, long countEstimate) {}
return db.getAllLiveFiles()
.map(metadata -> new FileRange(Path.of(metadata.path()).resolve("./" + metadata.fileName()).normalize().toString(),
metadata.fileName().replace("/", ""),
LLRange.intersect(metadata.keysRange(), rangeFull),
metadata.numEntries()
))
.filter(fr -> fr.range != null)
// Skip some files
.filter(fr -> !filesToSkip.contains(fr.filename))
.parallel()
.<VerificationProgress>flatMap(fr -> {
String filename = fr.filename;
String path = fr.filePath;
LLRange rangePartial = fr.range;
AtomicLong fileScanned = new AtomicLong();
final long fileEstimate = fr.countEstimate;
AtomicBoolean streamStarted = new AtomicBoolean(false);
AtomicBoolean streamStarted2 = new AtomicBoolean(false);
AtomicBoolean streamEnded = new AtomicBoolean(false);
totalScanned.add(fileEstimate);
try {
return resourceStream(
() -> LLUtils.generateCustomReadOptions(null, false, isBoundedRange(rangePartial), false),
ro -> {
ro.setFillCache(false);
if (!rangePartial.isSingle()) {
if (LLUtils.MANUAL_READAHEAD) {
ro.setReadaheadSize(32 * 1024);
}
}
ro.setVerifyChecksums(true);
return resourceStream(() -> db.newRocksIterator(ro, rangePartial, false), rocksIterator -> {
return StreamUtils.<Optional<VerificationProgress>>streamWhileNonNull(() -> {
boolean first = streamStarted.compareAndSet(false, true);
boolean second = !first && streamStarted.compareAndSet(false, true);
if (!first && !rocksIterator.isValid()) {
if (streamEnded.compareAndSet(false, true)) {
totalScanned.add(fileScanned.get() - fileEstimate);
return Optional.of(new FileOk(databaseName, column, path));
} else {
//noinspection OptionalAssignedToNull
return null;
}
}
boolean shouldSendStatus;
Buf rawKey = null;
try {
if (second) {
if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS) {
logger.info("Seeking to {}->{}->first on file {}", databaseName, column.name(), filename);
}
rocksIterator.seekToFirst();
}
if (first) {
shouldSendStatus = true;
} else {
rawKey = rocksIterator.keyBuf().copy();
shouldSendStatus = fileScanned.incrementAndGet() % 1_000_000 == 0;
if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS) {
logger.info("Checking {}->{}->{} on file {}", databaseName, column.name(), rawKey.toString(), filename);
}
rocksIterator.next();
}
} catch (RocksDBException ex) {
return Optional.of(new BlockBad(databaseName, column, rawKey, path, ex));
}
if (shouldSendStatus) {
long totalScannedValue = totalScanned.sum();
long fileScannedVal = fileScanned.get();
return Optional.of(new Progress(databaseName,
column,
path,
totalScannedValue,
Math.max(totalEstimate.get(), totalScannedValue),
fileScannedVal,
Math.max(fileEstimate, fileScannedVal)
));
} else {
return Optional.empty();
}
}).filter(Optional::isPresent).map(Optional::get).onClose(() -> {
rocksIterator.close();
ro.close();
});
});
var liveFiles = db.getAllLiveFiles().toList();
var liveFilesCount = liveFiles.size();
return liveFiles.stream()
.sorted(Comparator.reverseOrder())
.flatMap(fr -> {
AtomicReference<IterationMetadata> metadataRef = new AtomicReference<>();
AtomicLong initialEstimate = new AtomicLong();
return fr
.verify(SSTRange.parse(rangeFull))
.map(status -> switch (status) {
case SSTStart fileStart -> {
metadataRef.set(fileStart.metadata());
long numEntriesEstimate = Objects.requireNonNullElse(fileStart.metadata().countEstimate(), totalEstimate / liveFilesCount);
totalScanned.add(numEntriesEstimate);
initialEstimate.set(numEntriesEstimate);
yield status;
}
);
} catch (RocksDBException e) {
return Stream.of(new BlockBad(databaseName, column, null, path, e));
}
case SSTOk fileEnd -> {
long initialNumEntriesEstimate = initialEstimate.get();
long numEntriesScanned = fileEnd.scannedCount();
totalScanned.add(numEntriesScanned - initialNumEntriesEstimate);
yield status;
}
case SSTProgressReport ignored -> status;
case SSTBlockBad ignored -> status;
})
.<DbProgress<SSTVerificationProgress>>map(sstProgress -> new DbProgress.DbSSTProgress<>(databaseName, column, metadataRef.get().filePath(), totalScanned.longValue(), totalEstimate, sstProgress));
})
.filter(err -> !(err instanceof BlockBad blockBad && blockBad.rawKey() == null && !brokenFiles.add(blockBad.file())));
.filter(err -> !(err instanceof DbProgress.DbSSTProgress<SSTVerificationProgress> sstProgress
&& sstProgress.sstProgress() instanceof SSTBlockBad blockBad
&& blockBad.rawKey() == null && !brokenFiles.add(sstProgress.fileString())));
} catch (RocksDBException e) {
return Stream.of(new BlockBad(databaseName, column, null, null, e));
return Stream.of(new DbProgress.DbSSTProgress<>(databaseName, column, null, 0, 0, new SSTBlockBad(null, e)));
}
}

View File

@ -34,7 +34,6 @@ import it.cavallium.dbengine.rpc.current.data.NoFilter;
import java.io.File;
import java.io.IOException;
import it.cavallium.dbengine.utils.DBException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@ -80,6 +79,7 @@ import org.rocksdb.FlushOptions;
import org.rocksdb.IndexType;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.LiveFileMetaData;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.PersistentCache;
import org.rocksdb.PlainTableConfig;
@ -114,8 +114,6 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checks.paranoidfilechecks", "false"));
private static final boolean FORCE_COLUMN_FAMILY_CONSISTENCY_CHECKS
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checks.forcecolumnfamilyconsistencychecks", "true"));
static final boolean PRINT_ALL_CHECKSUM_VERIFICATION_STEPS
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checks.verification.print", "false"));
private static final InfoLogLevel LOG_LEVEL = InfoLogLevel.getInfoLogLevel(Byte.parseByte(System.getProperty("it.cavallium.dbengine.log.levelcode", "" + InfoLogLevel.WARN_LEVEL.getValue())));
private static final CacheFactory CACHE_FACTORY = USE_CLOCK_CACHE ? new ClockCacheFactory() : new LRUCacheFactory();
@ -622,7 +620,23 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa
}
}
public List<String> getColumnFiles(Column column, boolean excludeLastLevel) {
/**
 * Returns one {@link RocksDBFile} handle per live SST file of this database, across all
 * column families. The list is materialized eagerly while the close lock is held, so the
 * returned stream can be consumed safely after the lock is released.
 */
public Stream<RocksDBFile> getAllLiveFiles() throws RocksDBException {
	var readLockStamp = closeLock.readLock();
	try {
		ensureOpen();
		// getLiveFiles() is called for its side effect: it flushes the memtable first
		db.getLiveFiles();
		var liveFilesMetadata = db.getLiveFilesMetaData();
		var files = new ArrayList<RocksDBFile>(liveFilesMetadata.size());
		for (var file : liveFilesMetadata) {
			files.add(new RocksDBColumnFile(db, getCfh(file.columnFamilyName()), file));
		}
		return files.stream();
	} finally {
		closeLock.unlockRead(readLockStamp);
	}
}
public List<RocksDBFile> getColumnFiles(Column column, boolean excludeLastLevel) {
var closeReadLock = closeLock.readLock();
try {
ensureOpen();

View File

@ -1,8 +0,0 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLRange;
public record LiveFileMetadata(String path, String fileName, int level, String columnName, long numEntries, long size,
LLRange keysRange) {
}

View File

@ -54,7 +54,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
@NotNull RocksIteratorObj newIterator(@NotNull LLReadOptions readOptions, @Nullable Buf min, @Nullable Buf max);
Stream<LiveFileMetadata> getAllLiveFiles() throws RocksDBException;
Stream<RocksDBFile> getAllLiveFiles() throws RocksDBException;
@NotNull UpdateAtomicResult updateAtomic(@NotNull LLReadOptions readOptions,
@NotNull LLWriteOptions writeOptions,

View File

@ -0,0 +1,52 @@
package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.utils.StreamUtils.resourceStream;
import static java.util.Objects.requireNonNull;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.LiveFileMetaData;
import org.rocksdb.RocksDB;
import org.rocksdb.SstFileMetaData;
/**
 * A {@link RocksDBFile} that belongs to a specific column family of an open database.
 */
public class RocksDBColumnFile extends RocksDBFile {
// Handle of the column family that owns this SST file.
private final ColumnFamilyHandle cfh;
// NOTE(review): the db parameter is accepted but not stored or used by this constructor —
// presumably kept for API symmetry with the other constructors; confirm before removing.
public RocksDBColumnFile(RocksDB db, ColumnFamilyHandle cfh, RocksDBFileMetadata metadata) {
super(metadata);
this.cfh = cfh;
}
/**
 * Builds a file handle from whole-database live-file metadata.
 * Assumes file.fileName() begins with "/" so that path()+fileName() forms a valid
 * path (RocksDB's LiveFileMetaData convention) — TODO confirm.
 */
public <T extends RocksDB> RocksDBColumnFile(T db, ColumnFamilyHandle cfh, LiveFileMetaData file) {
this(db,
cfh,
new RocksDBFileMetadata(Path.of(file.path() + file.fileName()),
file.fileName(),
file.level(),
new String(file.columnFamilyName(), StandardCharsets.UTF_8),
file.numEntries(),
file.size(),
decodeRange(file.smallestKey(), file.largestKey())
)
);
}
/**
 * Builds a file handle from per-column-family SST metadata; the column family name and
 * LSM level are not part of SstFileMetaData, so they are passed in explicitly.
 */
public <T extends RocksDB> RocksDBColumnFile(T db, ColumnFamilyHandle cfh,
SstFileMetaData file, byte[] columnFamilyName, int level) {
this(db,
cfh,
new RocksDBFileMetadata(Path.of(file.path()+ file.fileName()),
file.fileName(),
level,
new String(columnFamilyName, StandardCharsets.UTF_8),
file.numEntries(),
file.size(),
decodeRange(file.smallestKey(), file.largestKey())
)
);
}
}

View File

@ -0,0 +1,297 @@
package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.utils.StreamUtils.resourceStream;
import static java.util.Objects.requireNonNull;
import com.google.common.primitives.Longs;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.client.SSTDumpProgress;
import it.cavallium.dbengine.client.SSTDumpProgress.SSTBlockFail;
import it.cavallium.dbengine.client.SSTDumpProgress.SSTBlockKeyValue;
import it.cavallium.dbengine.client.SSTProgress.SSTOk;
import it.cavallium.dbengine.client.SSTProgress.SSTProgressReport;
import it.cavallium.dbengine.client.SSTProgress.SSTStart;
import it.cavallium.dbengine.client.SSTVerificationProgress;
import it.cavallium.dbengine.client.SSTVerificationProgress.SSTBlockBad;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationKeyState.RocksDBFileIterationStateKeyError;
import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationKeyState.RocksDBFileIterationStateKeyOk;
import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationState.RocksDBFileIterationStateBegin;
import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationState.RocksDBFileIterationStateEnd;
import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationState.RocksDBFileIterationStateKey;
import it.cavallium.dbengine.database.disk.SSTRange.SSTLLRange;
import it.cavallium.dbengine.database.disk.SSTRange.SSTRangeFull;
import it.cavallium.dbengine.database.disk.SSTRange.SSTRangeKey;
import it.cavallium.dbengine.database.disk.SSTRange.SSTRangeNone;
import it.cavallium.dbengine.database.disk.SSTRange.SSTRangeOffset;
import it.cavallium.dbengine.database.disk.SSTRange.SSTSingleKey;
import it.cavallium.dbengine.database.disk.rocksdb.LLSstFileReader;
import it.cavallium.dbengine.utils.StreamUtils;
import java.nio.file.Path;
import java.util.StringJoiner;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
/**
 * Handle to a single RocksDB SST file. Wraps the file's {@link RocksDBFileMetadata} and, when the
 * file name has the usual {@code <number>.sst} shape, its SST number. Exposes streaming
 * verification ({@link #verify}), dumping ({@link #readAllSST}) and raw iteration
 * ({@link #iterate}) of the file contents through an {@link LLSstFileReader}.
 */
public class RocksDBFile implements Comparable<RocksDBFile> {

	protected static final Logger logger = LogManager.getLogger(RocksDBFile.class);

	protected final RocksDBFileMetadata metadata;
	// SST number parsed from the file name ("000123.sst" -> 123), or null when not parseable
	protected final Long sstNumber;

	/**
	 * Creates a handle from existing metadata, parsing the SST number from the file name.
	 */
	public RocksDBFile(RocksDBFileMetadata metadata) {
		this.metadata = metadata;
		// File names reported by RocksDB may be "/"-prefixed; strip it before parsing the number
		String fileName = metadata.fileName().startsWith("/") ? metadata.fileName().substring(1) : metadata.fileName();
		int extensionIndex = fileName.indexOf(".");
		// NOTE(review): this local is never read; the field is assigned directly in both branches
		Long sstNumber = null;
		if (extensionIndex != -1) {
			// Everything before the first dot is the SST number; tryParse returns null on garbage
			String numberRaw = fileName.substring(0, extensionIndex);
			//noinspection UnstableApiUsage
			this.sstNumber = Longs.tryParse(numberRaw);
		} else {
			this.sstNumber = null;
		}
	}

	/**
	 * Creates a handle for a file path relative to the database directory, with placeholder
	 * metadata (level 0, unknown column "any", zero counts, unbounded key range).
	 * NOTE(review): the type parameter {@code T} is unused — confirm it can be removed.
	 */
	public <T extends RocksDB> RocksDBFile(Path dbBaseDir, String file) {
		this(new RocksDBFileMetadata(dbBaseDir.resolve(file.startsWith("/") ? file.substring(1) : file),
				StringUtils.substringAfter(file, '/'),
				0,
				"any",
				0,
				0,
				LLRange.all()
		)
		);
	}

	/** Wraps the raw smallest/largest key bytes of an SST file into an {@link LLRange}. */
	protected static LLRange decodeRange(byte[] smallestKey, byte[] largestKey) {
		return LLRange.of(Buf.wrap(smallestKey), Buf.wrap(largestKey));
	}

	/**
	 * Restricts the requested range to the keys actually present in this file (per metadata).
	 * Offset-based and empty ranges are returned unchanged.
	 */
	private static SSTRange intersectWithMetadata(LLRange metadataRange, SSTRange innerRange) {
		return switch (innerRange) {
			case SSTRangeFull ignored -> SSTRange.parse(metadataRange);
			case SSTSingleKey singleKey -> SSTRange.parse(LLRange.intersect(metadataRange, singleKey.toLLRange()));
			case SSTRangeKey rangeKey -> SSTRange.parse(LLRange.intersect(metadataRange, rangeKey.toLLRange()));
			case SSTRangeNone none -> none;
			case SSTRangeOffset offset -> offset;
		};
	}

	public RocksDBFileMetadata getMetadata() {
		return metadata;
	}

	/**
	 * Verifies the given range of this file, mapping the raw iteration states to
	 * {@link SSTVerificationProgress} events: SSTStart, per-key progress reports (or SSTBlockBad
	 * on a read error), then SSTOk at the end.
	 */
	public Stream<SSTVerificationProgress> verify(SSTRange range) {
		AtomicLong fileScanned = new AtomicLong();
		AtomicLong fileTotal = new AtomicLong();
		return iterate(range).map(state -> switch (state) {
			case RocksDBFileIterationStateBegin begin -> {
				var countEstimate = begin.metadata().countEstimate();
				if (countEstimate != null) {
					fileTotal.set(countEstimate);
				}
				yield new SSTStart(begin.metadata());
			}
			case RocksDBFileIterationStateKey key -> {
				var scanned = fileScanned.incrementAndGet();
				yield switch (key.state()) {
					// total is clamped up to scanned so progress never exceeds 100%
					case RocksDBFileIterationStateKeyOk ignored ->
							new SSTProgressReport(scanned, Math.max(scanned, fileTotal.get()));
					// NOTE(review): direct field access (key.key) instead of accessors — works because
					// the records are nestmates, but is inconsistent with readAllSST below
					case RocksDBFileIterationStateKeyError keyError -> new SSTBlockBad(key.key, keyError.exception);
				};
			}
			case RocksDBFileIterationStateEnd end -> new SSTOk(end.scannedCount());
		});
	}

	/**
	 * Dumps every key/value pair in the given range as {@link SSTDumpProgress} events.
	 *
	 * @param failOnError when true a read error throws {@link CompletionException}; when false the
	 *                    stream emits SSTOk (so downstream takeWhile consumers still observe a
	 *                    successful prefix) followed by SSTBlockFail, then terminates
	 */
	public Stream<SSTDumpProgress> readAllSST(SSTRange range, boolean failOnError) {
		AtomicLong fileScanned = new AtomicLong();
		AtomicLong fileTotal = new AtomicLong();
		return iterate(range).<SSTDumpProgress>mapMulti((state, consumer) -> {
			switch (state) {
				case RocksDBFileIterationStateBegin begin -> {
					var countEstimate = begin.metadata().countEstimate();
					if (countEstimate != null) {
						fileTotal.set(countEstimate);
					}
					consumer.accept(new SSTStart(begin.metadata()));
				}
				case RocksDBFileIterationStateKey key -> {
					var scanned = fileScanned.incrementAndGet();
					switch (key.state()) {
						case RocksDBFileIterationStateKeyOk ignored -> {
							consumer.accept(new SSTBlockKeyValue(key.key(), ignored.value()));
							consumer.accept(new SSTProgressReport(scanned, Math.max(scanned, fileTotal.get())));
						}
						case RocksDBFileIterationStateKeyError keyError -> {
							if (failOnError) {
								throw new CompletionException(keyError.exception());
							} else {
								logger.error("Corrupted SST \"{}\" after \"{}\" scanned keys", sstNumber, scanned);
								// This is sent before bad block, so takewhile still returns ok before the end, if failOnError is false
								consumer.accept(new SSTOk(scanned));
							}
							consumer.accept(new SSTBlockFail(keyError.exception));
						}
					}
				}
				case RocksDBFileIterationStateEnd end -> consumer.accept(new SSTOk(end.scannedCount()));
			}
		}).takeWhile(data -> !(data instanceof SSTBlockFail));
	}

	/**
	 * Lazily iterates this SST file over the requested range, emitting exactly one
	 * {@link RocksDBFileIterationStateBegin}, then one {@link RocksDBFileIterationStateKey} per
	 * entry (or per read error), then a {@link RocksDBFileIterationStateEnd}.
	 * Resources (reader, read options, iterator) are opened lazily and released on stream close.
	 *
	 * NOTE(review): fileScanned is incremented for the Begin element too, so key indexes start at 1,
	 * End.scannedCount() includes the Begin element, and the {@code index >= readToCount} cutoff
	 * appears to stop one key early for offset ranges — confirm whether this is intended.
	 */
	public Stream<RocksDBFileIterationState> iterate(SSTRange rangeFull) {
		var intersectedRange = RocksDBFile.intersectWithMetadata(metadata.keysRange(), rangeFull);
		Path filePath = metadata.filePath();
		String filePathString = filePath.toString();
		var meta = new IterationMetadata(filePath,
				metadata.fileName().replace("/", ""),
				intersectedRange,
				metadata.numEntries() > 0 ? metadata.numEntries() : null,
				sstNumber
		);
		Stream<RocksDBFileIterationState> streamContent;
		// Ignore the file if it's outside the requested range
		if (intersectedRange instanceof SSTRangeNone) {
			streamContent = Stream.of(new RocksDBFileIterationStateBegin(meta), new RocksDBFileIterationStateEnd(0L));
		} else {
			AtomicLong fileScanned = new AtomicLong();
			// true until the first pull: the first stream element performs the initial seek
			AtomicBoolean mustSeek = new AtomicBoolean(true);
			try {
				streamContent = resourceStream(() -> new LLSstFileReader(false, filePathString),
						r -> resourceStream(() -> LLUtils.generateCustomReadOptions(null, false, intersectedRange.isBounded(), false),
								ro -> {
									long skipToIndex;
									long readToCount;
									switch (intersectedRange) {
										case SSTLLRange sstllRange -> {
											// Key-bounded range: delegate bounds to RocksDB read options
											var llRange = sstllRange.toLLRange();
											requireNonNull(llRange);
											ro.setIterateLowerBound(
													llRange.getMin() != null ? requireNonNull(LLUtils.asArray(llRange.getMin())) : null);
											ro.setIterateUpperBound(
													llRange.getMax() != null ? requireNonNull(LLUtils.asArray(llRange.getMax())) : null);
											skipToIndex = 0;
											readToCount = Long.MAX_VALUE;
										}
										case SSTRangeOffset offset -> {
											// Offset range: skip entries manually, then read a bounded count
											skipToIndex = offset.offsetMin() == null ? 0 : offset.offsetMin();
											readToCount = offset.offsetMax() == null ? Long.MAX_VALUE : (offset.offsetMax() - skipToIndex);
										}
										default -> throw new IllegalStateException("Unexpected value: " + intersectedRange);
									}
									ro.setFillCache(true);
									ro.setIgnoreRangeDeletions(true);
									if (!(intersectedRange instanceof SSTSingleKey)) {
										// Large readahead for sequential full-file scans
										ro.setReadaheadSize(256 * 1024 * 1024);
									}
									ro.setVerifyChecksums(true);
									return resourceStream(() -> ro.newIterator(r.get(), IteratorMetrics.NO_OP),
											rocksIterator -> StreamUtils.<RocksDBFileIterationState>streamUntil(() -> {
												// CAS succeeds only on the very first pull
												boolean mustSeekVal = mustSeek.compareAndSet(true, false);
												if (!mustSeekVal && !rocksIterator.isValid()) {
													return new RocksDBFileIterationStateEnd(fileScanned.get());
												}
												Buf rawKey = null;
												Buf rawValue = null;
												RocksDBFileIterationKeyState keyResult;
												var index = fileScanned.getAndIncrement();
												if (index >= readToCount) {
													return new RocksDBFileIterationStateEnd(fileScanned.get());
												} else {
													try {
														if (mustSeekVal) {
															// First pull: position the iterator and emit Begin without reading a key
															rocksIterator.seekToFirstUnsafe();
															if (skipToIndex > 0) {
																for (long i = 0; i < skipToIndex; i++) {
																	if (!rocksIterator.isValid()) {
																		break;
																	}
																	rocksIterator.nextUnsafe();
																}
															}
															return new RocksDBFileIterationStateBegin(meta);
														} else {
															// Copy before advancing: the iterator's buffers are invalidated by next()
															rawKey = rocksIterator.keyBuf().copy();
															rawValue = rocksIterator.valueBuf().copy();
															rocksIterator.next();
														}
														keyResult = new RocksDBFileIterationStateKeyOk(rawValue);
													} catch (RocksDBException ex) {
														keyResult = new RocksDBFileIterationStateKeyError(ex);
													}
													return new RocksDBFileIterationStateKey(rawKey, keyResult, index);
												}
											}, x -> x instanceof RocksDBFileIterationStateEnd).onClose(() -> {
												rocksIterator.close();
												ro.close();
											})
									);
								}
						)
				);
			} catch (RocksDBException e) {
				// Could not even open the file: report a single error key.
				// NOTE(review): this fallback emits no RocksDBFileIterationStateEnd, unlike every
				// other path — confirm downstream consumers do not rely on a terminal End element.
				streamContent = Stream.of(new RocksDBFileIterationStateBegin(meta),
						new RocksDBFileIterationStateKey(null, new RocksDBFileIterationStateKeyError(e), 0));
			}
		}
		return streamContent;
	}

	@Override
	public String toString() {
		return new StringJoiner(", ", RocksDBFile.class.getSimpleName() + "[", "]")
				.add("fileMetadata=" + metadata)
				.toString();
	}

	/**
	 * Orders by SST number, with files lacking a parseable number sorted last.
	 * NOTE(review): this ordering is inconsistent with equals (not overridden) — fine for sorting,
	 * but do not use these objects in sorted sets/maps.
	 */
	@Override
	public int compareTo(@NotNull RocksDBFile o) {
		if (this.sstNumber == null && o.sstNumber == null) {
			return 0;
		} else if (this.sstNumber == null) {
			return 1;
		} else if (o.sstNumber == null) {
			return -1;
		}
		return Long.compare(this.sstNumber, o.sstNumber);
	}

	public Long getSstNumber() {
		return sstNumber;
	}

	/** One element of the raw iteration stream: Begin, then Key*, then End. */
	public sealed interface RocksDBFileIterationState {

		/** Emitted once, after the initial seek; carries the effective iteration metadata. */
		record RocksDBFileIterationStateBegin(IterationMetadata metadata) implements RocksDBFileIterationState {}

		/** One entry (or one read failure); {@code key} is null when the read failed before decoding it. */
		record RocksDBFileIterationStateKey(Buf key, RocksDBFileIterationKeyState state, long scannedCount) implements
				RocksDBFileIterationState {}

		/** Terminal element carrying the total scanned count. */
		record RocksDBFileIterationStateEnd(long scannedCount) implements RocksDBFileIterationState {}
	}

	/** Outcome of reading a single entry. */
	public sealed interface RocksDBFileIterationKeyState {

		record RocksDBFileIterationStateKeyOk(Buf value) implements RocksDBFileIterationKeyState {}

		record RocksDBFileIterationStateKeyError(RocksDBException exception) implements RocksDBFileIterationKeyState {}
	}

	/** Effective parameters of one iteration: resolved range, optional entry-count estimate, SST number. */
	public record IterationMetadata(Path filePath, String filename, @NotNull SSTRange range,
			@Nullable Long countEstimate, @Nullable Long sstNumber) {}
}

View File

@ -0,0 +1,9 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLRange;
import java.nio.file.Path;
/**
 * Immutable descriptor of a single SST file.
 *
 * @param filePath   path of the SST file on disk
 * @param fileName   file name as reported by RocksDB (may be "/"-prefixed — see RocksDBFile)
 * @param level      LSM level the file belongs to (0 when unknown)
 * @param columnName name of the owning column family ("any" when unknown)
 * @param numEntries number of entries in the file (0 when unknown)
 * @param size       file size in bytes (0 when unknown)
 * @param keysRange  range spanning the smallest and largest keys in the file
 */
public record RocksDBFileMetadata(Path filePath, String fileName, int level, String columnName, long numEntries, long size,
		LLRange keysRange) {
}

View File

@ -26,15 +26,15 @@ public class RocksDBUtils {
return db.numberLevels(cfh);
}
public static List<String> getColumnFiles(RocksDB db, ColumnFamilyHandle cfh, boolean excludeLastLevel) {
List<String> files = new ArrayList<>();
public static List<RocksDBFile> getColumnFiles(RocksDB db, ColumnFamilyHandle cfh, boolean excludeLastLevel) {
List<RocksDBFile> files = new ArrayList<>();
var meta = db.getColumnFamilyMetaData(cfh);
var lastLevelId = excludeLastLevel ? (getLevels(db, cfh) - 1) : -1;
for (LevelMetaData level : meta.levels()) {
if (!excludeLastLevel || level.level() < lastLevelId) {
for (SstFileMetaData file : level.files()) {
if (file.fileName().endsWith(".sst")) {
files.add(file.fileName());
files.add(new RocksDBColumnFile(db, cfh, file, meta.name(), level.level()));
}
}
}
@ -51,18 +51,18 @@ public class RocksDBUtils {
.setCompression(CompressionType.LZ4_COMPRESSION)
.setMaxSubcompactions(0)
.setOutputFileSizeLimit(2 * SizeUnit.GB)) {
List<String> filesToCompact = getColumnFiles(db, cfh, true);
var filesToCompact = getColumnFiles(db, cfh, true);
if (!filesToCompact.isEmpty()) {
var partitionSize = filesToCompact.size() / Runtime.getRuntime().availableProcessors();
List<List<String>> partitions;
List<List<RocksDBFile>> partitions;
if (partitionSize > 0) {
partitions = partition(filesToCompact, partitionSize);
} else {
partitions = List.of(filesToCompact);
}
int finalBottommostLevelId = getLevels(db, cfh) - 1;
for (List<String> partition : partitions) {
for (List<RocksDBFile> partition : partitions) {
logger.info("Compacting {} files in database {} in column family {} to level {}",
partition.size(),
logDbName,
@ -72,7 +72,8 @@ public class RocksDBUtils {
if (!partition.isEmpty()) {
var coi = new CompactionJobInfo();
try {
db.compactFiles(co, cfh, partition, finalBottommostLevelId, volumeId, coi);
var partitionFileNames = partition.stream().map(x -> x.getMetadata().fileName()).toList();
db.compactFiles(co, cfh, partitionFileNames, finalBottommostLevelId, volumeId, coi);
logger.info("Compacted {} files in database {} in column family {} to level {}: {}",
partition.size(),
logDbName,

View File

@ -0,0 +1,117 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import java.util.HexFormat;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
 * A range of entries to read from an SST file: the whole file, nothing, a key interval, a single
 * key, or an entry-offset interval. The concrete types' {@code toString()} output is the textual
 * form accepted by {@link #parse(String)}.
 */
public sealed interface SSTRange {

	/** @return true when both endpoints of this range are constrained */
	boolean isBounded();

	/** Ranges expressible as an {@link LLRange} over keys (everything except offset ranges). */
	sealed interface SSTLLRange extends SSTRange permits SSTRangeNone, SSTRangeFull, SSTRangeKey, SSTSingleKey {

		/** @return the equivalent key range, or null for the empty range */
		@Nullable LLRange toLLRange();

		default boolean isBounded() {
			LLRange r = toLLRange();
			return r == null || LLUtils.isBoundedRange(r);
		}
	}

	/**
	 * Parses the textual form produced by the {@code toString()} of the concrete range types:
	 * {@code none}, {@code full}, {@code offset-<min>-<max>}, {@code key-<hexMin>-<hexMax>},
	 * {@code single-key-<hex>}. Blank min/max tokens mean "unbounded".
	 *
	 * @throws IllegalStateException when the tag is not recognized
	 */
	static SSTRange parse(String raw) {
		// "single-key-<hex>" must be matched by prefix: its tag contains the '-' separator, so a
		// plain split would yield ["single", "key", "<hex>"] and never match.
		if (raw.startsWith("single-key-")) {
			return new SSTSingleKey(Buf.wrap(LLUtils.parseHex(raw.substring("single-key-".length()))));
		}
		// splitPreserveAllTokens keeps the empty tokens produced by blank (unbounded) endpoints,
		// e.g. "offset--5"; StringUtils.split would collapse adjacent separators and shift indexes,
		// which made the isBlank() checks below unreachable.
		var parts = StringUtils.splitPreserveAllTokens(raw, '-');
		return switch (parts[0]) {
			case "none" -> new SSTRangeNone();
			case "full" -> new SSTRangeFull();
			case "offset" -> new SSTRangeOffset(parts[1].isBlank() ? null : Long.parseUnsignedLong(parts[1]),
					parts[2].isBlank() ? null : Long.parseUnsignedLong(parts[2])
			);
			case "key" -> new SSTRangeKey(parts[1].isBlank() ? null : Buf.wrap(LLUtils.parseHex(parts[1])),
					parts[2].isBlank() ? null : Buf.wrap(LLUtils.parseHex(parts[2]))
			);
			default -> throw new IllegalStateException("Unexpected value: " + parts[0]);
		};
	}

	/** Converts an {@link LLRange} (null meaning empty) to the narrowest SSTRange type. */
	static SSTRange parse(LLRange range) {
		if (range == null) {
			return new SSTRangeNone();
		}
		return range.isAll() ? new SSTRangeFull()
				: range.isSingle() ? new SSTSingleKey(range.getSingle()) : new SSTRangeKey(range.getMin(), range.getMax());
	}

	/** The empty range: no entries are selected. */
	record SSTRangeNone() implements SSTRange, SSTLLRange {

		@Override
		public String toString() {
			return "none";
		}

		@Override
		public LLRange toLLRange() {
			return null;
		}
	}

	/** The full range: every entry is selected. */
	record SSTRangeFull() implements SSTRange, SSTLLRange {

		@Override
		public String toString() {
			return "full";
		}

		@Override
		public LLRange toLLRange() {
			return LLRange.all();
		}
	}

	/** An interval of entry positions; a null endpoint is unbounded. */
	record SSTRangeOffset(@Nullable Long offsetMin, @Nullable Long offsetMax) implements SSTRange {

		@Override
		public String toString() {
			return "offset-" + (offsetMin != null ? offsetMin : "") + "-" + (offsetMax != null ? offsetMax : "");
		}

		@Override
		public boolean isBounded() {
			return offsetMin != null && offsetMax != null;
		}
	}

	/** An interval of keys; a null endpoint is unbounded. */
	record SSTRangeKey(@Nullable Buf min, @Nullable Buf max) implements SSTRange, SSTLLRange {

		private static final HexFormat HF = HexFormat.of();

		@Override
		public String toString() {
			return "key-" + (min != null ? HF.formatHex(min.asUnboundedArray(), 0, min.size()) : "") + "-" + (max != null
					? HF.formatHex(max.asUnboundedArray(), 0, max.size()) : "");
		}

		@Override
		public LLRange toLLRange() {
			return LLRange.of(min, max);
		}
	}

	/** Exactly one key. */
	record SSTSingleKey(@NotNull Buf key) implements SSTRange, SSTLLRange {

		private static final HexFormat HF = HexFormat.of();

		@Override
		public String toString() {
			return "single-key-" + HF.formatHex(key.asUnboundedArray(), 0, key.size());
		}

		@Override
		public LLRange toLLRange() {
			return LLRange.single(key);
		}
	}
}

View File

@ -9,6 +9,7 @@ import org.rocksdb.ReadOptions;
import org.rocksdb.ReadTier;
import org.rocksdb.RocksDB;
import org.rocksdb.Snapshot;
import org.rocksdb.SstFileReader;
public final class LLReadOptions extends SimpleResource {
@ -115,7 +116,11 @@ public final class LLReadOptions extends SimpleResource {
}
public RocksIteratorObj newIterator(RocksDB db, ColumnFamilyHandle cfh, IteratorMetrics iteratorMetrics) {
return new RocksIteratorObj(db.newIterator(cfh, val), this, iteratorMetrics);
return RocksIteratorObj.create(db.newIterator(cfh, val), this, iteratorMetrics);
}
public RocksIteratorObj newIterator(SstFileReader r, IteratorMetrics iteratorMetrics) {
return RocksIteratorObj.create(r.newIterator(val), this, iteratorMetrics);
}
public void setFillCache(boolean fillCache) {

View File

@ -0,0 +1,31 @@
package it.cavallium.dbengine.database.disk.rocksdb;
import it.cavallium.dbengine.utils.SimpleResource;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileReader;
/**
 * Resource wrapper around a RocksDB {@link SstFileReader} and the {@link Options} it needs,
 * releasing both native handles on close.
 */
public class LLSstFileReader extends SimpleResource {

	private final Options options;
	private final SstFileReader r;

	/**
	 * Opens the SST file at {@code filePath}.
	 *
	 * @param checks   enable paranoid checks while reading
	 * @param filePath path of the SST file to open
	 * @throws RocksDBException if the file cannot be opened; the native handles allocated so far
	 *                          are released before rethrowing (the original leaked them)
	 */
	public LLSstFileReader(boolean checks, String filePath) throws RocksDBException {
		this.options = new Options()
				.setAllowMmapReads(true)
				.setParanoidChecks(checks);
		this.r = new SstFileReader(options);
		try {
			r.open(filePath);
		} catch (Throwable ex) {
			// open() failed: close the native handles instead of leaking them
			r.close();
			options.close();
			throw ex;
		}
	}

	/** @return the underlying open reader */
	public SstFileReader get() {
		return r;
	}

	@Override
	protected void onClose() {
		r.close();
		options.close();
	}
}

View File

@ -0,0 +1,40 @@
package it.cavallium.dbengine.database.disk.rocksdb;
import it.cavallium.dbengine.utils.SimpleResource;
import org.rocksdb.CompressionOptions;
import org.rocksdb.CompressionType;
import org.rocksdb.EnvOptions;
import org.rocksdb.Options;
import org.rocksdb.SstFileWriter;
/**
 * Resource wrapper around a RocksDB {@link SstFileWriter} configured for ZSTD compression,
 * releasing the writer and all its native option objects on close.
 */
public class LLSstFileWriter extends SimpleResource {

	private final Options options;
	private final CompressionOptions compressionOptions;
	// Kept as a field so its native handle can be released in onClose
	// (the original created it inline and leaked it)
	private final EnvOptions envOptions;
	private final SstFileWriter w;

	/**
	 * @param unorderedWrite allow out-of-order key insertion (RocksDB unordered_write option)
	 */
	public LLSstFileWriter(boolean unorderedWrite) {
		this.options = new Options();
		this.compressionOptions = new CompressionOptions();
		this.envOptions = new EnvOptions();
		this.w = new SstFileWriter(envOptions,
				options
						.setCompressionOptions(compressionOptions
								.setEnabled(true)
								.setMaxDictBytes(32768)
								.setZStdMaxTrainBytes(32768 * 4))
						.setCompressionType(CompressionType.ZSTD_COMPRESSION)
						.setUnorderedWrite(unorderedWrite)
		);
	}

	/** @return the underlying writer */
	public SstFileWriter get() {
		return w;
	}

	@Override
	protected void onClose() {
		w.close();
		envOptions.close();
		compressionOptions.close();
		options.close();
	}
}

View File

@ -7,25 +7,25 @@ import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.IteratorMetrics;
import it.cavallium.dbengine.utils.SimpleResource;
import java.nio.ByteBuffer;
import org.rocksdb.AbstractSlice;
import org.rocksdb.AbstractRocksIterator;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.SstFileReaderIterator;
public class RocksIteratorObj extends SimpleResource {
public abstract class RocksIteratorObj extends SimpleResource {
private LLReadOptions readOptions;
private final RocksIterator rocksIterator;
private final Counter startedIterSeek;
private final Counter endedIterSeek;
private final Timer iterSeekTime;
private final Counter startedIterNext;
private final Counter endedIterNext;
private final Timer iterNextTime;
private byte[] seekingFrom;
protected LLReadOptions readOptions;
protected final AbstractRocksIterator<?> rocksIterator;
protected final Counter startedIterSeek;
protected final Counter endedIterSeek;
protected final Timer iterSeekTime;
protected final Counter startedIterNext;
protected final Counter endedIterNext;
protected final Timer iterNextTime;
protected byte[] seekingFrom;
private byte[] seekingTo;
RocksIteratorObj(RocksIterator rocksIterator,
LLReadOptions readOptions, IteratorMetrics iteratorMetrics) {
RocksIteratorObj(AbstractRocksIterator<?> rocksIterator, LLReadOptions readOptions, IteratorMetrics iteratorMetrics) {
super(rocksIterator::close);
this.readOptions = readOptions;
this.rocksIterator = rocksIterator;
@ -37,6 +37,92 @@ public class RocksIteratorObj extends SimpleResource {
this.iterNextTime = iteratorMetrics.iterNextTime();
}
public static RocksIteratorObj create(AbstractRocksIterator<?> rocksIterator,
LLReadOptions readOptions,
IteratorMetrics iteratorMetrics) {
return switch (rocksIterator) {
case RocksIterator it -> new RocksIteratorObj1(it, readOptions, iteratorMetrics);
case SstFileReaderIterator it -> new RocksIteratorObj2(it, readOptions, iteratorMetrics);
default -> throw new IllegalStateException("Unsupported iterator type");
};
}
private static class RocksIteratorObj1 extends RocksIteratorObj {
private final RocksIterator rocksIterator;
private RocksIteratorObj1(RocksIterator rocksIterator, LLReadOptions readOptions, IteratorMetrics iteratorMetrics) {
super(rocksIterator, readOptions, iteratorMetrics);
this.rocksIterator = rocksIterator;
}
@Deprecated(forRemoval = true)
public synchronized int key(ByteBuffer buffer) {
ensureOpen();
return rocksIterator.key(buffer);
}
@Deprecated(forRemoval = true)
public synchronized int value(ByteBuffer buffer) {
ensureOpen();
return rocksIterator.value(buffer);
}
/**
* The returned buffer may change when calling next() or when the iterator is not valid anymore
*/
public synchronized byte[] key() {
ensureOpen();
return rocksIterator.key();
}
/**
* The returned buffer may change when calling next() or when the iterator is not valid anymore
*/
public synchronized byte[] value() {
ensureOpen();
return rocksIterator.value();
}
}
private static class RocksIteratorObj2 extends RocksIteratorObj {
private final SstFileReaderIterator rocksIterator;
private RocksIteratorObj2(SstFileReaderIterator rocksIterator, LLReadOptions readOptions, IteratorMetrics iteratorMetrics) {
super(rocksIterator, readOptions, iteratorMetrics);
this.rocksIterator = rocksIterator;
}
@Deprecated(forRemoval = true)
public synchronized int key(ByteBuffer buffer) {
ensureOpen();
return rocksIterator.key(buffer);
}
@Deprecated(forRemoval = true)
public synchronized int value(ByteBuffer buffer) {
ensureOpen();
return rocksIterator.value(buffer);
}
/**
* The returned buffer may change when calling next() or when the iterator is not valid anymore
*/
public synchronized byte[] key() {
ensureOpen();
return rocksIterator.key();
}
/**
* The returned buffer may change when calling next() or when the iterator is not valid anymore
*/
public synchronized byte[] value() {
ensureOpen();
return rocksIterator.value();
}
}
public synchronized void seek(ByteBuffer seekBuf) throws RocksDBException {
ensureOpen();
startedIterSeek.increment();
@ -70,6 +156,18 @@ public class RocksIteratorObj extends SimpleResource {
rocksIterator.status();
}
public synchronized void seekToFirstUnsafe() throws RocksDBException {
rocksIterator.seekToFirst();
}
public synchronized void seekToLastUnsafe() throws RocksDBException {
rocksIterator.seekToLast();
}
public synchronized void nextUnsafe() throws RocksDBException {
rocksIterator.next();
}
public synchronized void seekToLast() throws RocksDBException {
ensureOpen();
startedIterSeek.increment();
@ -118,33 +216,25 @@ public class RocksIteratorObj extends SimpleResource {
return rocksIterator.isValid();
}
@Deprecated(forRemoval = true)
public synchronized int key(ByteBuffer buffer) {
ensureOpen();
return rocksIterator.key(buffer);
public synchronized boolean isValidUnsafe() {
return rocksIterator.isValid();
}
@Deprecated(forRemoval = true)
public synchronized int value(ByteBuffer buffer) {
ensureOpen();
return rocksIterator.value(buffer);
}
public abstract int key(ByteBuffer buffer);
@Deprecated(forRemoval = true)
public abstract int value(ByteBuffer buffer);
/**
* The returned buffer may change when calling next() or when the iterator is not valid anymore
*/
public synchronized byte[] key() {
ensureOpen();
return rocksIterator.key();
}
public abstract byte[] key();
/**
* The returned buffer may change when calling next() or when the iterator is not valid anymore
*/
public synchronized byte[] value() {
ensureOpen();
return rocksIterator.value();
}
public abstract byte[] value();
/**
* The returned buffer may change when calling next() or when the iterator is not valid anymore

View File

@ -5,7 +5,8 @@ import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import it.cavallium.buffer.Buf;
import it.cavallium.dbengine.client.VerificationProgress;
import it.cavallium.dbengine.client.DbProgress;
import it.cavallium.dbengine.client.SSTVerificationProgress;
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
@ -358,7 +359,7 @@ public class LLMemoryDictionary implements LLDictionary {
}
@Override
public Stream<VerificationProgress> verifyChecksum(LLRange range) {
public Stream<DbProgress<SSTVerificationProgress>> verifyChecksum(LLRange range) {
return Stream.empty();
}

View File

@ -11,6 +11,7 @@ import java.util.Comparator;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.Spliterator;
@ -62,6 +63,7 @@ public class StreamUtils {
private static final BinaryOperator<Object> COMBINER = (a, b) -> NULL;
private static final Function<Object, Void> FINISHER = x -> null;
private static final Collector<Long,?, Long> SUMMING_LONG_COLLECTOR = new SummingLongCollector();
private static final Consumer<?> NOOP_CONSUMER = x -> {};
public static ForkJoinPool newNamedForkJoinPool(String name, boolean async) {
final int MAX_CAP = 0x7fff; // max #workers - 1
@ -139,8 +141,12 @@ public class StreamUtils {
}
}
	@SuppressWarnings("UnstableApiUsage")
	/**
	 * Streams values pulled from {@code supplier} until it returns {@code null}; the terminating
	 * null is not emitted. Convenience overload of {@code streamWhile} with a non-null predicate.
	 */
	public static <X> Stream<X> streamWhileNonNull(Supplier<? extends X> supplier) {
		return streamWhile(supplier, Objects::nonNull);
	}
@SuppressWarnings("UnstableApiUsage")
public static <X> Stream<X> streamWhile(Supplier<? extends X> supplier, Predicate<? super X> endPredicate) {
var it = new Iterator<X>() {
private boolean nextSet = false;
@ -152,7 +158,7 @@ public class StreamUtils {
next = supplier.get();
nextSet = true;
}
return next != null;
return endPredicate.test(next);
}
@Override
@ -164,6 +170,40 @@ public class StreamUtils {
return Streams.stream(it);
}
	@SuppressWarnings("UnstableApiUsage")
	/**
	 * Streams values pulled from {@code supplier}, INCLUDING the first element that matches
	 * {@code endPredicate}, then stops — unlike {@code streamWhile}, which excludes the
	 * terminating element. The supplier is pulled lazily, one element per {@code hasNext()}.
	 */
	public static <X> Stream<X> streamUntil(Supplier<? extends X> supplier, Predicate<? super X> endPredicate) {
		var it = new Iterator<X>() {

			// true when `next` holds an element not yet handed out by next()
			private boolean nextSet = false;
			// 0 = streaming, 1 = terminal element fetched but not yet returned, 2 = exhausted
			private byte state = (byte) 0;
			private X next;

			@Override
			public boolean hasNext() {
				if (state == (byte) 2) {
					return false;
				} else {
					if (!nextSet) {
						next = supplier.get();
						// Mark the terminal element but still report it as available once
						state = endPredicate.test(next) ? (byte) 1 : 0;
						nextSet = true;
					}
					return true;
				}
			}

			@Override
			public X next() {
				// Returning the terminal element moves the iterator to the exhausted state
				if (state == (byte) 1) {
					state = (byte) 2;
				}
				nextSet = false;
				return next;
			}
		};
		return Streams.stream(it);
	}
@SuppressWarnings("DataFlowIssue")
@NotNull
public static <X> List<X> toList(Stream<X> stream) {
@ -285,6 +325,11 @@ public class StreamUtils {
return new ExecutingCollector<>(consumer);
}
public static <I> Collector<I, ?, Void> executing() {
//noinspection unchecked
return new ExecutingCollector<>((Consumer<? super I>) NOOP_CONSUMER);
}
public static <I> Collector<I, ?, Long> countingExecuting(Consumer<? super I> consumer) {
return new CountingExecutingCollector<>(consumer);
}

View File

@ -0,0 +1,194 @@
package it.cavallium.dbengine.repair;
import java.io.Serial;
import java.text.CharacterIterator;
import java.text.StringCharacterIterator;
import org.jetbrains.annotations.NotNull;
@SuppressWarnings("unused")
/**
 * An immutable byte count that parses and prints human-readable sizes with either SI units
 * (kB, MB, … powers of 1000) or binary units (KiB, MiB, … powers of 1024).
 */
public final class DataSize extends Number implements Comparable<DataSize> {

	@Serial
	private static final long serialVersionUID = 7213411239846723568L;

	// Constants are final (the originals were mutable public statics).
	public static final DataSize ZERO = new DataSize(0L);
	public static final DataSize ONE = new DataSize(1L);
	public static final DataSize KIB = new DataSize(1024L);
	public static final DataSize KB = new DataSize(1000L);
	public static final DataSize MIB = new DataSize(1024L * 1024);
	public static final DataSize MB = new DataSize(1000L * 1000);
	public static final DataSize GIB = new DataSize(1024L * 1024 * 1024);
	public static final DataSize GB = new DataSize(1000L * 1000 * 1000);
	public static final DataSize TIB = new DataSize(1024L * 1024 * 1024 * 1024);
	// The original SI constants from TB upward wrongly mixed 1024 factors in (e.g. TB was
	// 1000^3 * 1024); they are pure powers of 1000 now.
	public static final DataSize TB = new DataSize(1000L * 1000 * 1000 * 1000);
	public static final DataSize PIB = new DataSize(1024L * 1024 * 1024 * 1024 * 1024);
	public static final DataSize PB = new DataSize(1000L * 1000 * 1000 * 1000 * 1000);
	public static final DataSize EIB = new DataSize(1024L * 1024 * 1024 * 1024 * 1024 * 1024);
	public static final DataSize EB = new DataSize(1000L * 1000 * 1000 * 1000 * 1000 * 1000);
	public static final DataSize MAX_VALUE = new DataSize(Long.MAX_VALUE);

	private final long size;

	/** Wraps a raw byte count. */
	public DataSize(long size) {
		this.size = size;
	}

	/**
	 * Parses a size string such as "123", "5MB", "-2GiB", "1 000 kB", "inf".
	 * Whitespace and underscores are ignored; "B" means bytes, an "i" infix selects powers of 1024.
	 *
	 * @throws IllegalArgumentException on malformed input or forbidden units (bits)
	 * @throws ArithmeticException      when the value overflows a long (Z/Y SI units always do)
	 */
	public DataSize(String size) {
		// Ignore whitespace and underscores used as digit separators
		size = size.replaceAll("\\s|_", "");
		switch (size) {
			case "", "0", "-0", "+0" -> {
				this.size = 0;
				return;
			}
			// The original listed "" here a second time, which is a duplicate case label and did
			// not compile; the label was presumably a mangled "∞".
			case "∞", "∞B", "∞b", "inf", "infinite" -> {
				this.size = Long.MAX_VALUE;
				return;
			}
		}
		int numberStartOffset = 0;
		int numberEndOffset = 0;
		boolean negative = false;
		{
			// Validate shape: optional leading '-', digits, then an optional letter-only unit
			boolean firstChar = true;
			boolean numberMode = true;
			for (char c : size.toCharArray()) {
				if (c == '-') {
					if (firstChar) {
						negative = true;
						numberStartOffset++;
						numberEndOffset++;
					} else {
						throw new IllegalArgumentException("Found a minus character after index 0");
					}
				} else if (Character.isDigit(c)) {
					if (numberMode) {
						numberEndOffset++;
					} else {
						throw new IllegalArgumentException("Found a number after the unit");
					}
				} else if (Character.isLetter(c)) {
					if (numberEndOffset - numberStartOffset <= 0) {
						throw new IllegalArgumentException("No number found");
					}
					if (numberMode) {
						numberMode = false;
					}
				} else {
					throw new IllegalArgumentException("Unsupported character");
				}
				if (firstChar) {
					firstChar = false;
				}
			}
		}
		var number = Long.parseUnsignedLong(size, numberStartOffset, numberEndOffset, 10);
		if (numberEndOffset == size.length()) {
			// No measurement unit: a plain byte count
			this.size = (negative ? -1 : 1) * number;
			return;
		}
		// Measurement units are like B, MB, or MiB, not longer
		if (size.length() - numberEndOffset > 3) {
			throw new IllegalArgumentException("Wrong measurement unit");
		}
		var scaleChar = size.charAt(numberEndOffset);
		boolean powerOf2 = numberEndOffset + 1 < size.length() && size.charAt(numberEndOffset + 1) == 'i';
		long k = powerOf2 ? 1024 : 1000;
		// Map the unit letter to an exponent of k. The original multiplied ints (k*k*k*k...),
		// which silently overflowed from the 'T' unit upward (1000^4 > Integer.MAX_VALUE).
		int exponent = switch (scaleChar) {
			case 'B' -> 0;
			case 'b' -> throw new IllegalArgumentException("Bits are not allowed");
			case 'K', 'k' -> 1;
			case 'M', 'm' -> 2;
			case 'G', 'g' -> 3;
			case 'T', 't' -> 4;
			case 'P', 'p' -> 5;
			case 'E', 'e' -> 6;
			case 'Z', 'z' -> 7;
			case 'Y', 'y' -> 8;
			default -> throw new IllegalStateException("Unexpected value: " + scaleChar);
		};
		long scale = 1;
		for (int i = 0; i < exponent; i++) {
			// multiplyExact: 1000^7 and beyond overflow a long; fail loudly instead of wrapping
			scale = Math.multiplyExact(scale, k);
		}
		// If scale is 1, the unit must be exactly "B", nothing more
		if (scale == 1 && numberEndOffset + 1 != size.length()) {
			throw new IllegalArgumentException("Invalid unit");
		}
		this.size = (negative ? -1 : 1) * Math.multiplyExact(number, scale);
	}

	/** Null-safe unboxing: returns null when {@code value} is null. */
	public static Long get(DataSize value) {
		if (value == null) {
			return null;
		} else {
			return value.size;
		}
	}

	/** Null-safe unboxing with a default. */
	public static long getOrElse(DataSize value, DataSize defaultValue) {
		if (value == null) {
			return defaultValue.size;
		} else {
			return value.size;
		}
	}

	@Override
	public int intValue() {
		// Saturate at both int bounds (the original only clamped the positive end)
		if (size >= Integer.MAX_VALUE) {
			return Integer.MAX_VALUE;
		}
		if (size <= Integer.MIN_VALUE) {
			return Integer.MIN_VALUE;
		}
		return (int) size;
	}

	@Override
	public long longValue() {
		return size;
	}

	@Override
	public float floatValue() {
		return size;
	}

	@Override
	public double doubleValue() {
		return size;
	}

	@Override
	public String toString() {
		return toString(true);
	}

	/**
	 * Formats the size with the largest unit that represents it.
	 *
	 * @param precise when true only divides while the value stays exact; when false divides down
	 *                to an approximate single-unit figure
	 */
	public String toString(boolean precise) {
		// Multiples of 1000 print with SI units, everything else with binary units
		boolean siUnits = size % 1000 == 0;
		int k = siUnits ? 1000 : 1024;
		long lSize = size;
		CharacterIterator ci = new StringCharacterIterator((siUnits ? "k" : "K") + "MGTPEZY");
		while ((precise ? lSize % k == 0 : lSize > k) && lSize != 0) {
			lSize /= k;
			ci.next();
		}
		if (lSize == size) {
			return lSize + "B";
		}
		return lSize + "" + ci.previous() + (siUnits ? "B" : "iB");
	}

	@Override
	public int compareTo(DataSize anotherLong) {
		return Long.compare(this.size, anotherLong.size);
	}

	@Override
	public boolean equals(Object obj) {
		// Consistent with hashCode: equality is by raw byte count only
		if (obj instanceof DataSize) {
			return size == ((DataSize) obj).size;
		}
		return false;
	}

	@Override
	public int hashCode() {
		return Long.hashCode(size);
	}
}

View File

@ -1,24 +1,34 @@
package it.cavallium.dbengine.repair;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Meter;
import static it.cavallium.dbengine.client.DbProgress.toDbProgress;
import static it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection.getDatabasePath;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.logging.LoggingMeterRegistry;
import io.micrometer.core.instrument.noop.NoopMeter;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import it.cavallium.datagen.nativedata.NullableString;
import it.cavallium.datagen.nativedata.Nullableboolean;
import it.cavallium.datagen.nativedata.Nullableint;
import it.cavallium.datagen.nativedata.Nullablelong;
import it.cavallium.dbengine.client.Compression;
import it.cavallium.dbengine.client.VerificationProgress.BlockBad;
import it.cavallium.dbengine.client.VerificationProgress.FileOk;
import it.cavallium.dbengine.client.VerificationProgress.Progress;
import it.cavallium.dbengine.client.DbProgress;
import it.cavallium.dbengine.client.DbProgress.DbSSTProgress;
import it.cavallium.dbengine.client.LongProgressTracker;
import it.cavallium.dbengine.client.SSTDumpProgress;
import it.cavallium.dbengine.client.SSTDumpProgress.SSTBlockKeyValue;
import it.cavallium.dbengine.client.SSTProgress;
import it.cavallium.dbengine.client.SSTProgress.SSTOk;
import it.cavallium.dbengine.client.SSTProgress.SSTProgressReport;
import it.cavallium.dbengine.client.SSTProgress.SSTStart;
import it.cavallium.dbengine.client.SSTVerificationProgress.SSTBlockBad;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.disk.RocksDBFile;
import it.cavallium.dbengine.database.disk.SSTRange;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
import it.cavallium.dbengine.database.disk.LLLocalDictionary;
import it.cavallium.dbengine.database.disk.LLLocalKeyValueDatabase;
import it.cavallium.dbengine.database.disk.SSTRange.SSTRangeFull;
import it.cavallium.dbengine.database.disk.rocksdb.LLSstFileWriter;
import it.cavallium.dbengine.rpc.current.data.Column;
import it.cavallium.dbengine.rpc.current.data.DatabaseOptionsBuilder;
import it.cavallium.dbengine.rpc.current.data.DatabaseVolume;
@ -29,25 +39,51 @@ import it.cavallium.dbengine.rpc.current.data.nullables.NullableFilter;
import it.cavallium.dbengine.utils.StreamUtils;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import it.unimi.dsi.fastutil.objects.ObjectList;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.BasicFileAttributeView;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;
import java.util.Base64;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileWriter;
public class Repair {
public static final MeterRegistry METER = new SimpleMeterRegistry(); // LoggingMeterRegistry.builder(key -> null).clock(Clock.SYSTEM).loggingSink(System.err::println).build();
public static void main(String[] argsArray) throws RocksDBException {
static final boolean PRINT_ALL_CHECKSUM_VERIFICATION_STEPS
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checks.verification.print", "false"));
public static void main(String[] argsArray) throws RocksDBException, IOException {
System.setProperty("it.cavallium.dbengine.checks.compression", "true");
System.setProperty("it.cavallium.dbengine.checks.paranoid", "true");
System.setProperty("it.cavallium.dbengine.checks.filesize", "true");
@ -62,7 +98,7 @@ public class Repair {
args = args.subList(1, args.size());
switch (command.toLowerCase(Locale.ROOT)) {
case "list-column-families" -> {
if (args.size() < 3) {
if (args.size() != 2) {
printHelp(initialArgs);
}
Path path = Path.of(args.get(0));
@ -80,41 +116,133 @@ public class Repair {
var conn = new LLLocalDatabaseConnection(METER, path, false);
conn.connect();
LLLocalKeyValueDatabase db = getDatabase(conn, dbName, columnNames);
db.getAllColumnFamilyHandles().forEach((column, cfh) -> {
System.err.printf("Scanning column: %s%n", column.name());
LLLocalDictionary dict = db.getDictionary(column.name().getBytes(StandardCharsets.UTF_8), UpdateMode.DISALLOW);
StreamUtils.collectOn(StreamUtils.ROCKSDB_POOL, dict.verifyChecksum(LLRange.all()), StreamUtils.executing(block -> {
synchronized (Repair.class) {
switch (block) {
case null -> {}
case BlockBad blockBad -> {
System.err.println("[ ! ] Bad block found: " + block.databaseName() + (block.column() != null ? "->" + block.column().name() : "") + "->" + blockBad.rawKey() + "->" + block.file());
if (blockBad.ex() != null) {
if (blockBad.ex() instanceof RocksDBException ex) {
System.err.println("\t" + ex);
} else {
blockBad.ex().printStackTrace(System.err);
}
}
try (var os = getErrorLogFile(dbName)) {
db.getAllColumnFamilyHandles().forEach((column, cfh) -> {
System.err.printf("Scanning column: %s%n", column.name());
LLLocalDictionary dict = db.getDictionary(column.name().getBytes(StandardCharsets.UTF_8), UpdateMode.DISALLOW);
LongProgressTracker fileCountTotalTracker = new LongProgressTracker(0);
StreamUtils.collectOn(ForkJoinPool.commonPool(),
peekProgress(os, fileCountTotalTracker, 0, dict.verifyChecksum(LLRange.all())),
StreamUtils.executing()
);
});
}
}
case "scan-files" -> {
if (args.size() < 3) {
printHelp(initialArgs);
}
Path path = Path.of(args.get(0)).toAbsolutePath();
String dbName = args.get(1);
boolean skip = args.get(2).equalsIgnoreCase("--skip");
List<String> fileNames = args.subList(skip ? 3 : 2, args.size()).stream().map(f -> f.startsWith("/") ? f : ("/" + f)).toList();
List<String> columnNames = getColumnFamilyNames(path, dbName).toList();
System.err.printf("Scanning database \"%s\" at \"%s\", files:%n%s%n", dbName, path, String.join("\n", fileNames));
var conn = new LLLocalDatabaseConnection(METER, path, false);
conn.connect();
LLLocalKeyValueDatabase db = getDatabase(conn, dbName, columnNames);
try (var os = getErrorLogFile(dbName)) {
AtomicLong ignoredFiles = new AtomicLong();
var fileList = db.getAllLiveFiles()
.filter(file -> {
if (file.getSstNumber() == null) {
System.err.printf("Empty SST number: %s%n", file);
}
case FileOk fileOk -> {
System.err.println("File is ok: " + block.databaseName() + (block.column() != null ? "->" + block.column().name() : "") + "->" + block.file());
if (!skip && !fileNames.contains(file.getMetadata().fileName())) {
ignoredFiles.incrementAndGet();
System.err.printf("Ignoring file: \"%s\"%n", file.getMetadata().fileName());
return false;
} else if (skip && fileNames.contains(file.getMetadata().fileName())) {
ignoredFiles.incrementAndGet();
System.err.printf("Ignoring file: \"%s\"%n", file.getMetadata().fileName());
return false;
} else {
System.err.printf("About to scan file: \"%s\"%n", file.getMetadata().fileName());
return true;
}
case Progress progress -> {
System.err.printf("Progress: [%d/%d] file: [%d/%d] %s%s->%s%n",
progress.scanned(),
progress.total(),
progress.fileScanned(),
progress.fileTotal(),
block.databaseName(),
block.column() != null ? "->" + block.column().name() : "",
block.file()
);
}
default -> throw new IllegalStateException("Unexpected value: " + block);
}
})
.sorted(Comparator.reverseOrder())
.toList();
var keyTotalTracker = new LongProgressTracker();
var parallelism = Math.max(1, Runtime.getRuntime().availableProcessors());
try (var pool = Executors.newWorkStealingPool(parallelism)) {
LongProgressTracker fileCountTotalTracker = new LongProgressTracker(fileList.size());
for (int i = 0; i < parallelism; i++) {
var iF = i;
pool.execute(() -> peekProgress(os, fileCountTotalTracker, ignoredFiles.get(),
IntStream.range(0, fileList.size())
.filter(n -> n % parallelism == iF)
.mapToObj(fileList::get)
.flatMap(file -> toDbProgress(dbName, "any", keyTotalTracker, file.verify(new SSTRangeFull()))))
.sequential()
.forEachOrdered(x -> {}));
}
}));
}
}
}
case "export-files" -> {
if (args.size() < 4) {
printHelp(initialArgs);
}
Path path = Path.of(args.get(0)).toAbsolutePath();
String dbName = args.get(1);
Path dbPath = getDatabasePath(path, dbName);
Path destPath = Path.of(args.get(2)).toAbsolutePath();
record Input(String fileName, SSTRange exportOption) {
@Override
public String toString() {
return Input.this.fileName + "[" + Input.this.exportOption + "]";
}
}
Map<String, Input> fileNames = args.subList(3, args.size()).stream().<Input>map(f -> {
var fn = f.startsWith("/") ? f : ("/" + f);
if (fn.indexOf('[') == -1) {
fn = fn + "[full]";
}
var fileName = StringUtils.substringBeforeLast(fn, "[");
var exportOptions = SSTRange.parse(StringUtils.substringBetween(fn, "[", "]"));
return new Input(fileName, exportOptions);
}).collect(Collectors.toMap(Input::fileName, Function.identity()));
List<String> columnNames = getColumnFamilyNames(path, dbName).toList();
System.err.printf("Exporting database \"%s\" at \"%s\" to \"%s\", files:%n%s%n", dbName, path, destPath, String.join("\n", fileNames.values().stream().map(Input::toString).toList()));
var conn = new LLLocalDatabaseConnection(METER, path, false);
conn.connect();
try (var os = getErrorLogFile(dbName)) {
var fileList = fileNames.values().stream()
.map(input -> Map.entry(new RocksDBFile(dbPath, input.fileName()), input.exportOption()))
.toList();
var keyTotalTracker = new LongProgressTracker();
LongProgressTracker fileCountTotalTracker = new LongProgressTracker(fileList.size());
peekProgress(os, fileCountTotalTracker, 0, fileList.stream().flatMap(e ->
toDbProgress(dbName, "any", keyTotalTracker,
peekExportSST(destPath, e.getKey().readAllSST(e.getValue(), false))))
).sequential().forEachOrdered(x -> {});
}
}
case "list-files" -> {
if (args.size() != 2) {
printHelp(initialArgs);
}
Path path = Path.of(args.get(0)).toAbsolutePath();
String dbName = args.get(1);
List<String> columnNames = getColumnFamilyNames(path, dbName).toList();
System.err.printf("Getting files of database \"%s\" at \"%s\"%n", dbName, path);
System.out.println("Name \tPath \tColumn name \tKeys range \tSize \tEntries \tLevel");
var conn = new LLLocalDatabaseConnection(METER, path, false);
conn.connect();
LLLocalKeyValueDatabase db = getDatabase(conn, dbName, columnNames);
db.getAllLiveFiles().sorted(Comparator
.<RocksDBFile>comparingInt(x -> x.getMetadata().level())
.thenComparingLong(x -> x.getMetadata().size())
.thenComparingLong(x -> x.getMetadata().numEntries())
.thenComparing(x -> x.getMetadata().fileName())
).forEach(file -> {
var meta = file.getMetadata();
System.out.printf("%s\t%s\t%s\t%s\t%s\t%d\t%d%n", meta.fileName(), meta.filePath(), meta.columnName(), meta.keysRange(), new DataSize(meta.size()).toString(false), meta.numEntries(), meta.level());
});
}
case "dump-all" -> {
@ -175,6 +303,157 @@ public class Repair {
System.exit(0);
}
/**
 * Decorates an SST dump-progress stream so that every emitted key/value pair is also
 * written to new SST files under {@code destDirectory}.
 * <p>
 * A new output file is opened on each {@code SSTStart} (named after the source file),
 * and the writer rolls over to a numbered continuation file once roughly 4 GiB of raw
 * key+value bytes have been written. {@code SSTOk} finalizes the current file.
 *
 * @param destDirectory     directory receiving the exported SST files (created if absent)
 * @param sstProgressStream the dump progress events to pass through unchanged
 * @return the same stream with the export side effect attached via peek
 * @throws CompletionException wrapping any IOException/RocksDBException raised while writing
 */
private static Stream<SSTDumpProgress> peekExportSST(Path destDirectory, Stream<SSTDumpProgress> sstProgressStream) {
	try {
		Files.createDirectories(destDirectory);
	} catch (IOException e) {
		throw new CompletionException(e);
	}
	return StreamUtils.resourceStream(() -> new LLSstFileWriter(false), w -> {
		AtomicInteger fileNum = new AtomicInteger(1);
		// Path of the file currently being written; base name for roll-over continuations.
		AtomicReference<String> fileName = new AtomicReference<>();
		// Raw key+value bytes written since the last roll-over.
		AtomicLong memory = new AtomicLong(0);
		SstFileWriter writer = w.get();
		return sstProgressStream.peek(r -> {
			// The writer is not thread-safe; serialize all access to it.
			synchronized (writer) {
				try {
					switch (r) {
						case SSTStart start -> {
							var sst = destDirectory.resolve(start.metadata().filePath().getFileName()).toString();
							fileName.set(sst);
							System.err.printf("Creating SST at \"%s\"%n", sst);
							writer.open(sst);
						}
						case SSTBlockKeyValue report -> {
							var k = report.rawKey().toByteArray();
							var v = report.rawValue().toByteArray();
							long currentMem = k.length + v.length;
							writer.put(k, v);
							var memVal = memory.addAndGet(currentMem);
							// Roll over after ~4GiB of raw data to keep output files bounded.
							if (memVal > 4L * 1024 * 1024 * 1024) {
								memory.set(0);
								writer.finish();
								// FIX: read the path via get() — previously the AtomicReference object
								// itself was concatenated, relying on AtomicReference.toString().
								writer.open(fileName.get() + "." + fileNum.getAndIncrement() + ".sst");
							}
						}
						case SSTOk ok -> writer.finish();
						default -> {
						}
					}
				} catch (RocksDBException ex) {
					throw new CompletionException(ex);
				}
			}
		});
	});
}
private static BufferedWriter getErrorLogFile(String dbName) throws IOException {
Path cwd = Path.of(".");
String errorsFileNamePrefix = "errors-" + dbName;
String errorsFileNameSuffix = ".log";
Path errorsFile;
if (Files.isWritable(cwd)) {
errorsFile = cwd.resolve(errorsFileNamePrefix + "-" + System.currentTimeMillis() + errorsFileNameSuffix);
} else {
errorsFile = Files.createTempFile(errorsFileNamePrefix, errorsFileNameSuffix);
}
return Files.newBufferedWriter(errorsFile,
StandardCharsets.UTF_8,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING,
StandardOpenOption.DSYNC
);
}
/**
 * Decorates a per-file progress stream with console reporting and error logging.
 * <p>
 * Side effects per event: {@code SSTStart} increments {@code fileCountTotalTracker}
 * and records a start timestamp; {@code SSTBlockBad} prints the error and appends it
 * to {@code os} (flushed immediately); {@code SSTOk} prints a keys/sec summary;
 * {@code SSTProgressReport} prints a status line every 1,000,000 keys (or always when
 * the verbose system property is set). The stream elements pass through unchanged.
 *
 * @param os                    error-log writer, or null to skip file logging
 * @param fileCountTotalTracker shared counter of files processed vs. total
 * @param ignoredFiles          number of files skipped before this stream, for display only
 * @param verificationProgressStream the progress events to decorate
 * @return the same stream with reporting attached via peek
 */
private static <T extends DbProgress<? extends SSTProgress>> Stream<T> peekProgress(@Nullable BufferedWriter os, LongProgressTracker fileCountTotalTracker,
long ignoredFiles, Stream<T> verificationProgressStream) {
// Progress lines can be suppressed with -Dit.cavallium.dbengine.repair.hideprogress=true
var showProgress = !Boolean.getBoolean("it.cavallium.dbengine.repair.hideprogress");
// Wall-clock start of the file currently being scanned (set on SSTStart, read on SSTOk).
AtomicLong startMs = new AtomicLong();
return verificationProgressStream.peek(block -> {
// Serialize reporting across worker threads so output lines don't interleave.
synchronized (Repair.class) {
switch (block) {
case null -> {}
case DbSSTProgress<? extends SSTProgress> dbSstProgress -> {
var sstProgress = dbSstProgress.sstProgress();
switch (sstProgress) {
case SSTStart sstStart -> {
startMs.set(System.currentTimeMillis());
// NOTE: incrementAndGet() here is the side effect that advances the shared file counter.
System.err.printf("Processing file [%d/%d%s]: %s%n", fileCountTotalTracker.incrementAndGet(),
fileCountTotalTracker.getTotal(),
(ignoredFiles > 0 ? " (+%d ignored)".formatted(ignoredFiles) : ""), sstStart.metadata().filename());
String date;
// Best effort: file creation time may be unavailable on some filesystems.
try {
var dateInstant = Files.getFileAttributeView(sstStart.metadata().filePath(), BasicFileAttributeView.class).readAttributes().creationTime().toInstant().atZone(ZoneId.systemDefault());
date = DateTimeFormatter.ofLocalizedDateTime(FormatStyle.FULL).format(dateInstant);
} catch (Exception e) {
date = "unknown";
e.printStackTrace();
}
System.err.println(
"File scan begin: " + block.databaseName() + (dbSstProgress.column() != null ? "->" + dbSstProgress.column().name() : "") + "->" + dbSstProgress.fileString() + ". The file creation date is: " + date);
if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS) {
System.err.printf("Seeking to %s->%s->first on file %s%n", block.databaseName(), dbSstProgress.column(), dbSstProgress.file());
}
}
case SSTBlockBad blockBad -> {
// Build the full error line once so console and log file stay identical.
StringBuilder errorLineBuilder = new StringBuilder()
.append("File scan ended with error, bad block found: ")
.append(block.databaseName())
.append(dbSstProgress.column() != null ? "->" + dbSstProgress.column().name() : "")
.append("->")
.append(blockBad.rawKey())
.append("->")
.append(dbSstProgress.file());
if (blockBad.ex() != null) {
// RocksDBException messages are compact; other throwables get a full stack trace.
if (blockBad.ex() instanceof RocksDBException ex) {
errorLineBuilder.append("\n\t").append(ex);
} else {
errorLineBuilder.append("\n").append(ExceptionUtils.getStackTrace(blockBad.ex()));
}
}
String errorLine = errorLineBuilder.toString();
System.err.println("[ ! ] " + errorLine);
if (os != null) {
try {
os.write(errorLine);
// Flush right away so the log survives a later crash.
os.flush();
} catch (IOException e) {
System.err.println("Can't write to errors file: " + e);
}
}
}
case SSTOk end -> {
var scanned = end.scannedCount();
var time = Duration.ofMillis(System.currentTimeMillis() - startMs.get());
// Guard against sub-second scans: avoid division by zero.
var rate = (int) (scanned / Math.max(1, time.toMillis() / 1000d));
System.err.println("File scan ended with success: " + block.databaseName() + (dbSstProgress.column() != null ? "->" + dbSstProgress.column().name() : "") + "->" + dbSstProgress.fileString() + " - " + end.scannedCount() + " keys scanned in " + time + ". Speed: " + rate + "keys/sec.");
}
case SSTProgressReport progress -> {
if (showProgress) {
// Throttle: one status line per million keys unless verbose mode is on.
boolean shouldSendStatus = progress.fileScanned() % 1_000_000 == 0;
if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS || shouldSendStatus) {
System.err.printf("Progress: %s[%d/%d] %s%s->%s%n",
dbSstProgress.total() != -1 ? "[%d/%d] file: ".formatted(dbSstProgress.scanned(), dbSstProgress.total()) : "",
progress.fileScanned(),
progress.fileTotal(),
block.databaseName(),
dbSstProgress.column() != null ? "->" + dbSstProgress.column().name() : "",
dbSstProgress.fileString()
);
}
}
}
default -> {}
}
}
default -> throw new IllegalStateException("Unexpected value: " + block);
}
}
});
}
private static Stream<String> getColumnFamilyNames(Path path, String dbName) throws RocksDBException {
String dbPath = path.resolve("database_" + dbName).toString();
System.err.printf("Listing column families of database: %s%n", dbPath);
@ -193,11 +472,11 @@ public class Repair {
.openAsSecondary(true)
.absoluteConsistency(true)
.allowMemoryMapping(true)
.blockCache(Nullablelong.empty())
.blockCache(Nullablelong.of(4L * 1024 * 1024 * 1024))
.lowMemory(false)
.maxOpenFiles(Nullableint.of(-1))
.maxOpenFiles(Nullableint.of(Math.max(40, Runtime.getRuntime().availableProcessors() * 3)))
.optimistic(false)
.spinning(false)
.spinning(true)
.useDirectIO(false)
.extraFlags(Map.of())
.logPath(NullableString.empty())
@ -212,8 +491,8 @@ public class Repair {
).filter(x -> Files.exists(x.volumePath())).toList())
.defaultColumnOptions(DefaultColumnOptions.of(
List.of(),
Nullablelong.empty(),
Nullableboolean.empty(),
Nullablelong.of(512 * 1024 * 1024),
Nullableboolean.of(true),
Nullableboolean.empty(),
NullableFilter.empty(),
Nullableint.empty(),
@ -227,8 +506,8 @@ public class Repair {
.columnOptions(columnNames.stream()
.map(columnName -> NamedColumnOptions.of(columnName,
List.of(),
Nullablelong.empty(),
Nullableboolean.empty(),
Nullablelong.of(512 * 1024 * 1024),
Nullableboolean.of(true),
Nullableboolean.empty(),
NullableFilter.empty(),
Nullableint.empty(),
@ -249,6 +528,9 @@ public class Repair {
or: repair dump-all DIRECTORY DB_NAME COLUMN_NAME...
or: repair verify-checksum DIRECTORY DB_NAME
or: repair list-column-families DIRECTORY DB_NAME
or: repair scan-files DIRECTORY DB_NAME [--skip] FILE-NAME...
or: repair export-files DIRECTORY DB_NAME DESTINATION_DIRECTORY '{fileName}[{key|offset}-{from}-{to}]'...
or: repair list-files DIRECTORY DB_NAME
""");
System.exit(1);
}