diff --git a/pom.xml b/pom.xml
index e7f9b65..9a2a928 100644
--- a/pom.xml
+++ b/pom.xml
@@ -13,8 +13,8 @@
0-SNAPSHOT
false
1.10.4
- 9.7.0
- 8.5.3
+ 9.8.0
+ 8.5.4
5.9.0
1.0.13
diff --git a/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java b/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java
index 30afe41..28740b2 100644
--- a/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java
+++ b/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java
@@ -26,7 +26,7 @@ public interface CompositeDatabase extends DatabaseProperties, DatabaseOperation
/**
* Find corrupted items
*/
- Stream verify();
+ Stream> verify();
void verifyChecksum();
}
diff --git a/src/main/java/it/cavallium/dbengine/client/DbProgress.java b/src/main/java/it/cavallium/dbengine/client/DbProgress.java
new file mode 100644
index 0000000..6309251
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/client/DbProgress.java
@@ -0,0 +1,80 @@
+package it.cavallium.dbengine.client;
+
+import it.cavallium.dbengine.client.SSTProgress.SSTProgressReport;
+import it.cavallium.dbengine.client.SSTProgress.SSTStart;
+import it.cavallium.dbengine.rpc.current.data.Column;
+import java.nio.file.Path;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Progress event of an SST scan, tagged with the name of the database it belongs to.
+ *
+ * @param <T> type of the per-file event wrapped by the progress event
+ */
+public interface DbProgress<T extends SSTProgress> {
+
+  /** Returns the name of the database this event refers to. */
+  String databaseName();
+
+  /**
+   * Per-file event enriched with database-level counters.
+   *
+   * @param databaseName name of the database
+   * @param column column the file belongs to
+   * @param file path of the file, or {@code null} when it is not known
+   * @param scanned amount of work done so far
+   * @param total total amount of work
+   * @param sstProgress the wrapped per-file event
+   */
+  record DbSSTProgress<T extends SSTProgress>(String databaseName, Column column, @Nullable Path file,
+      long scanned, long total, T sstProgress) implements DbProgress<T> {
+
+    /** Returns {@code scanned / total}, or {@code 0} when {@code total} is zero. */
+    public double getProgress() {
+      if (total == 0) {
+        return 0d;
+      }
+      return scanned / (double) total;
+    }
+
+    /** Returns the normalized file path as text, or {@code null} when there is no file. */
+    @Nullable
+    public String fileString() {
+      return file != null ? file.normalize().toString() : null;
+    }
+  }
+
+  /**
+   * Wraps every element of {@code stream} into a {@link DbSSTProgress}.
+   *
+   * <p>An {@link SSTStart} element updates the file path attached to the following events; an
+   * {@link SSTProgressReport} element increments {@code totalTracker} by one. The mapper keeps
+   * that state between elements, so it presumably expects a sequential stream (TODO confirm).
+   *
+   * @param dbName database name copied into every event
+   * @param columnName column name, converted with {@code Column.of}
+   * @param totalTracker source of the {@code scanned} and {@code total} counters
+   * @param stream per-file events to wrap
+   * @param <T> type of the per-file events
+   * @return the wrapped events
+   */
+  static <T extends SSTProgress> Stream<DbProgress<T>> toDbProgress(String dbName,
+      String columnName,
+      LongProgressTracker totalTracker,
+      Stream<T> stream) {
+    Column column = Column.of(columnName);
+    // Path of the file announced by the most recent SSTStart; null until one is seen
+    AtomicReference<Path> filePath = new AtomicReference<>();
+    return stream.map(state -> {
+      switch (state) {
+        case SSTStart start -> filePath.set(start.metadata().filePath());
+        case SSTProgressReport progress -> totalTracker.incrementAndGet();
+        default -> {}
+      }
+      return new DbSSTProgress<>(dbName, column, filePath.get(), totalTracker.getCurrent(),
+          totalTracker.getTotal(), state);
+    });
+  }
+}
diff --git a/src/main/java/it/cavallium/dbengine/client/LongProgressTracker.java b/src/main/java/it/cavallium/dbengine/client/LongProgressTracker.java
new file mode 100644
index 0000000..5549f27
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/client/LongProgressTracker.java
@@ -0,0 +1,65 @@
+package it.cavallium.dbengine.client;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Pair of counters: the amount of work done so far and the expected total.
+ */
+public class LongProgressTracker {
+
+  private final AtomicLong total = new AtomicLong();
+  private final AtomicLong current = new AtomicLong();
+
+  /** Creates a tracker whose counters both start at zero. */
+  public LongProgressTracker() {
+  }
+
+  /**
+   * Creates a tracker with the given expected total.
+   *
+   * @param size expected total
+   */
+  public LongProgressTracker(long size) {
+    setTotal(size);
+  }
+
+  /**
+   * Replaces the expected total.
+   *
+   * @param estimate new expected total
+   * @return this tracker
+   */
+  public LongProgressTracker setTotal(long estimate) {
+    this.total.set(estimate);
+    return this;
+  }
+
+  /** Returns the expected total, raised to the done count when that one is larger. */
+  public long getTotal() {
+    long done = this.current.get();
+    long expected = this.total.get();
+    return done > expected ? done : expected;
+  }
+
+  /** Returns the amount of work done so far. */
+  public long getCurrent() {
+    return this.current.get();
+  }
+
+  /** Adds one to the done count and returns the value it had before. */
+  public long getAndIncrement() {
+    return this.current.getAndIncrement();
+  }
+
+  /** Adds one to the done count and returns the new value. */
+  public long incrementAndGet() {
+    return this.current.incrementAndGet();
+  }
+
+  /** Returns done / total, using a total of at least one to avoid dividing by zero. */
+  public double progress() {
+    long done = getCurrent();
+    long divisor = Math.max(1L, getTotal());
+    return done / (double) divisor;
+  }
+}
diff --git a/src/main/java/it/cavallium/dbengine/client/SSTDumpProgress.java b/src/main/java/it/cavallium/dbengine/client/SSTDumpProgress.java
new file mode 100644
index 0000000..7e7ac9a
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/client/SSTDumpProgress.java
@@ -0,0 +1,36 @@
+package it.cavallium.dbengine.client;
+
+import it.cavallium.buffer.Buf;
+import it.cavallium.dbengine.client.SSTDumpProgress.SSTBlockFail;
+import it.cavallium.dbengine.client.SSTDumpProgress.SSTBlockKeyValue;
+import it.cavallium.dbengine.client.SSTProgress.SSTOk;
+import it.cavallium.dbengine.client.SSTProgress.SSTProgressReport;
+import it.cavallium.dbengine.client.SSTProgress.SSTStart;
+import org.rocksdb.RocksDBException;
+
+/**
+ * Closed family of {@link SSTProgress} events: the three records shared through
+ * {@link SSTProgress} plus the two declared here. Presumably emitted while dumping the content
+ * of an SST file (TODO confirm against the producer).
+ */
+public sealed interface SSTDumpProgress
+    extends SSTProgress
+    permits SSTBlockFail, SSTBlockKeyValue, SSTOk, SSTProgressReport, SSTStart {
+
+  /**
+   * Event carrying one key/value pair.
+   *
+   * @param rawKey key bytes
+   * @param rawValue value bytes
+   */
+  record SSTBlockKeyValue(Buf rawKey, Buf rawValue) implements SSTDumpProgress {
+  }
+
+  /**
+   * Event carrying a RocksDB failure.
+   *
+   * @param ex the RocksDB exception carried by the event
+   */
+  record SSTBlockFail(RocksDBException ex) implements SSTDumpProgress {
+  }
+}
diff --git a/src/main/java/it/cavallium/dbengine/client/SSTProgress.java b/src/main/java/it/cavallium/dbengine/client/SSTProgress.java
new file mode 100644
index 0000000..7bbe43d
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/client/SSTProgress.java
@@ -0,0 +1,47 @@
+package it.cavallium.dbengine.client;
+
+import it.cavallium.dbengine.database.disk.RocksDBFile.IterationMetadata;
+import it.cavallium.dbengine.rpc.current.data.Column;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Common parent of the SST progress events. The three records declared here also implement
+ * {@link SSTVerificationProgress} and {@link SSTDumpProgress}.
+ */
+public interface SSTProgress {
+
+  /**
+   * Event carrying the metadata of an iteration; by its name, presumably the first event of a
+   * file (TODO confirm).
+   *
+   * @param metadata iteration metadata
+   */
+  record SSTStart(IterationMetadata metadata)
+      implements SSTProgress, SSTVerificationProgress, SSTDumpProgress {
+  }
+
+  /**
+   * Event carrying a scanned-entries count; by its name, presumably the successful end of a
+   * file (TODO confirm).
+   *
+   * @param scannedCount number of scanned entries
+   */
+  record SSTOk(long scannedCount)
+      implements SSTProgress, SSTVerificationProgress, SSTDumpProgress {
+  }
+
+  /**
+   * Event carrying the per-file counters.
+   *
+   * @param fileScanned amount scanned in the file
+   * @param fileTotal total amount in the file
+   */
+  record SSTProgressReport(long fileScanned, long fileTotal)
+      implements SSTProgress, SSTVerificationProgress, SSTDumpProgress {
+
+    /** Returns {@code fileScanned / fileTotal}, or {@code 0} when {@code fileTotal} is zero. */
+    public double getFileProgress() {
+      return fileTotal == 0 ? 0d : fileScanned / (double) fileTotal;
+    }
+  }
+}
diff --git a/src/main/java/it/cavallium/dbengine/client/SSTVerificationProgress.java b/src/main/java/it/cavallium/dbengine/client/SSTVerificationProgress.java
new file mode 100644
index 0000000..f1404da
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/client/SSTVerificationProgress.java
@@ -0,0 +1,30 @@
+package it.cavallium.dbengine.client;
+
+import it.cavallium.buffer.Buf;
+import it.cavallium.dbengine.client.DbProgress.DbSSTProgress;
+import it.cavallium.dbengine.client.SSTProgress.SSTOk;
+import it.cavallium.dbengine.client.SSTProgress.SSTProgressReport;
+import it.cavallium.dbengine.client.SSTProgress.SSTStart;
+import it.cavallium.dbengine.client.SSTVerificationProgress.SSTBlockBad;
+import it.cavallium.dbengine.rpc.current.data.Column;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+/**
+ * Closed family of {@link SSTProgress} events: the three records shared through
+ * {@link SSTProgress} plus {@link SSTBlockBad}. Presumably emitted while verifying an SST file
+ * (TODO confirm against the producer).
+ */
+public sealed interface SSTVerificationProgress
+    extends SSTProgress
+    permits SSTOk, SSTProgressReport, SSTStart, SSTBlockBad {
+
+  /**
+   * Event reporting a failure.
+   *
+   * @param rawKey key bytes the failure refers to; may be {@code null}
+   * @param ex the failure
+   */
+  record SSTBlockBad(Buf rawKey, Throwable ex) implements SSTVerificationProgress {
+  }
+}
diff --git a/src/main/java/it/cavallium/dbengine/client/VerificationProgress.java b/src/main/java/it/cavallium/dbengine/client/VerificationProgress.java
deleted file mode 100644
index 0ee0d4e..0000000
--- a/src/main/java/it/cavallium/dbengine/client/VerificationProgress.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package it.cavallium.dbengine.client;
-
-import it.cavallium.buffer.Buf;
-import it.cavallium.dbengine.rpc.current.data.Column;
-import org.jetbrains.annotations.Nullable;
-
-public sealed interface VerificationProgress {
- record BlockBad(String databaseName, Column column, Buf rawKey, String file, Throwable ex)
- implements VerificationProgress {}
- record FileStart(String databaseName, Column column, String file, @Nullable Long numEntriesEstimate)
- implements VerificationProgress {}
- record FileOk(String databaseName, Column column, String file, long scanned)
- implements VerificationProgress {}
- record Progress(String databaseName, Column column, String file,
- long scanned, long total,
- long fileScanned, long fileTotal)
- implements VerificationProgress {
-
- public double getProgress() {
- return scanned / (double) total;
- }
-
- public double getFileProgress() {
- return fileScanned / (double) fileTotal;
- }
- }
-
- @Nullable String databaseName();
- @Nullable Column column();
- @Nullable String file();
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java
index 71a4f76..8828ac9 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java
@@ -1,7 +1,8 @@
package it.cavallium.dbengine.database;
import it.cavallium.buffer.Buf;
-import it.cavallium.dbengine.client.VerificationProgress;
+import it.cavallium.dbengine.client.DbProgress;
+import it.cavallium.dbengine.client.SSTVerificationProgress;
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import java.util.List;
@@ -64,7 +65,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
int prefixLength,
boolean smallRange);
- Stream verifyChecksum(LLRange range);
+ Stream> verifyChecksum(LLRange range);
void setRange(LLRange range, Stream entries, boolean smallRange);
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java
index 37b1df4..d7fc254 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java
@@ -5,8 +5,9 @@ import static it.cavallium.dbengine.utils.StreamUtils.resourceStream;
import it.cavallium.buffer.Buf;
import it.cavallium.buffer.BufDataInput;
import it.cavallium.buffer.BufDataOutput;
-import it.cavallium.dbengine.client.VerificationProgress;
+import it.cavallium.dbengine.client.DbProgress;
import it.cavallium.dbengine.client.CompositeSnapshot;
+import it.cavallium.dbengine.client.SSTVerificationProgress;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLRange;
@@ -269,7 +270,7 @@ public class DatabaseMapDictionaryDeep> implem
}
@Override
- public Stream verifyChecksum() {
+ public Stream> verifyChecksum() {
return dictionary.verifyChecksum(range);
}
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java
index abdd110..71678ad 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java
@@ -1,8 +1,9 @@
package it.cavallium.dbengine.database.collections;
import it.cavallium.buffer.Buf;
-import it.cavallium.dbengine.client.VerificationProgress;
import it.cavallium.dbengine.client.CompositeSnapshot;
+import it.cavallium.dbengine.client.DbProgress;
+import it.cavallium.dbengine.client.SSTVerificationProgress;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.SubStageEntry;
import it.cavallium.dbengine.database.UpdateMode;
@@ -148,7 +149,7 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap verifyChecksum() {
+ public Stream> verifyChecksum() {
return this.subDictionary.verifyChecksum();
}
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java
index c7cf60c..dc8f8c9 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java
@@ -3,8 +3,9 @@ package it.cavallium.dbengine.database.collections;
import it.cavallium.buffer.Buf;
import it.cavallium.buffer.BufDataInput;
import it.cavallium.buffer.BufDataOutput;
-import it.cavallium.dbengine.client.VerificationProgress;
import it.cavallium.dbengine.client.CompositeSnapshot;
+import it.cavallium.dbengine.client.DbProgress;
+import it.cavallium.dbengine.client.SSTVerificationProgress;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
@@ -121,7 +122,7 @@ public final class DatabaseMapSingle implements DatabaseStageEntry {
}
@Override
- public Stream verifyChecksum() {
+ public Stream> verifyChecksum() {
return dictionary.verifyChecksum(LLRange.single(key));
}
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java
index 7d05136..374f76c 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java
@@ -1,7 +1,8 @@
package it.cavallium.dbengine.database.collections;
-import it.cavallium.dbengine.client.VerificationProgress;
import it.cavallium.dbengine.client.CompositeSnapshot;
+import it.cavallium.dbengine.client.DbProgress;
+import it.cavallium.dbengine.client.SSTVerificationProgress;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateReturnMode;
@@ -124,7 +125,7 @@ public class DatabaseSingleBucket implements DatabaseStageEntry {
}
@Override
- public Stream verifyChecksum() {
+ public Stream> verifyChecksum() {
return bucketStage.verifyChecksum();
}
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java
index e2cb7e3..8e4fb39 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java
@@ -1,8 +1,9 @@
package it.cavallium.dbengine.database.collections;
-import it.cavallium.dbengine.client.VerificationProgress;
import it.cavallium.dbengine.client.CompositeSnapshot;
+import it.cavallium.dbengine.client.DbProgress;
import it.cavallium.dbengine.client.Mapper;
+import it.cavallium.dbengine.client.SSTVerificationProgress;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.disk.CachedSerializationFunction;
@@ -107,7 +108,7 @@ public class DatabaseSingleMapped implements DatabaseStageEntry {
}
@Override
- public Stream verifyChecksum() {
+ public Stream> verifyChecksum() {
return this.serializedSingle.verifyChecksum();
}
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java
index 001d27b..69dd3d4 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java
@@ -3,8 +3,9 @@ package it.cavallium.dbengine.database.collections;
import it.cavallium.buffer.Buf;
import it.cavallium.buffer.BufDataInput;
import it.cavallium.buffer.BufDataOutput;
-import it.cavallium.dbengine.client.VerificationProgress;
import it.cavallium.dbengine.client.CompositeSnapshot;
+import it.cavallium.dbengine.client.DbProgress;
+import it.cavallium.dbengine.client.SSTVerificationProgress;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot;
@@ -119,7 +120,7 @@ public class DatabaseSingleton implements DatabaseStageEntry {
}
@Override
- public Stream verifyChecksum() {
+ public Stream> verifyChecksum() {
return Stream.empty();
}
}
\ No newline at end of file
diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java
index c45337e..f1b13fa 100644
--- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java
+++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java
@@ -1,7 +1,8 @@
package it.cavallium.dbengine.database.collections;
-import it.cavallium.dbengine.client.VerificationProgress;
+import it.cavallium.dbengine.client.DbProgress;
import it.cavallium.dbengine.client.CompositeSnapshot;
+import it.cavallium.dbengine.client.SSTVerificationProgress;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateReturnMode;
@@ -64,5 +65,5 @@ public interface DatabaseStage extends DatabaseStageWithEntry {
return leavesCount(snapshot, false) <= 0;
}
- Stream verifyChecksum();
+ Stream> verifyChecksum();
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java
index d9cd1a4..c667a80 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java
@@ -7,12 +7,10 @@ import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import it.cavallium.buffer.Buf;
-import it.cavallium.dbengine.database.ColumnUtils;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.RepeatedElementList;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
-import it.cavallium.dbengine.database.disk.rocksdb.LLSlice;
import it.cavallium.dbengine.database.disk.rocksdb.LLWriteOptions;
import it.cavallium.dbengine.database.disk.rocksdb.RocksIteratorObj;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
@@ -22,6 +20,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;
import java.util.stream.Stream;
@@ -35,10 +34,11 @@ import org.rocksdb.CompactRangeOptions;
import org.rocksdb.FlushOptions;
import org.rocksdb.Holder;
import org.rocksdb.KeyMayExist;
-import org.rocksdb.LiveFileMetaData;
+import org.rocksdb.LevelMetaData;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksObject;
+import org.rocksdb.SstFileMetaData;
import org.rocksdb.TableProperties;
import org.rocksdb.Transaction;
import org.rocksdb.TransactionOptions;
@@ -564,11 +564,10 @@ public sealed abstract class AbstractRocksDBColumn implements
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
- db.getLiveFiles(); // flushes the memtable
byte[] cfhName = cfh.getName();
- return db.getLiveFilesMetaData().stream()
- .filter(file -> Arrays.equals(cfhName, file.columnFamilyName()))
- .map(file -> new RocksDBFile(db, cfh, file));
+ return db.getColumnFamilyMetaData(cfh).levels().stream()
+ .flatMap(l -> l.files().stream()
+ .map(sstFileMetaData -> new RocksDBColumnFile(db, cfh, sstFileMetaData, cfhName, l.level())));
} finally {
closeLock.unlockRead(closeReadLock);
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java
index 38625bd..7b842d7 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java
@@ -68,6 +68,10 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
}
public Path getDatabasePath(String databaseName) {
+ return getDatabasePath(basePath, databaseName);
+ }
+
+ public static Path getDatabasePath(Path basePath, String databaseName) {
return basePath.resolve("database_" + databaseName);
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
index 5924f78..25830a8 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
@@ -4,27 +4,25 @@ import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import static it.cavallium.dbengine.database.LLUtils.isBoundedRange;
import static it.cavallium.dbengine.database.LLUtils.mapList;
import static it.cavallium.dbengine.database.LLUtils.toStringSafe;
-import static it.cavallium.dbengine.database.disk.LLLocalKeyValueDatabase.PRINT_ALL_CHECKSUM_VERIFICATION_STEPS;
import static it.cavallium.dbengine.database.disk.UpdateAtomicResultMode.DELTA;
import static it.cavallium.dbengine.utils.StreamUtils.ROCKSDB_POOL;
import static it.cavallium.dbengine.utils.StreamUtils.collectOn;
import static it.cavallium.dbengine.utils.StreamUtils.executing;
import static it.cavallium.dbengine.utils.StreamUtils.fastSummingLong;
import static it.cavallium.dbengine.utils.StreamUtils.resourceStream;
-import static it.cavallium.dbengine.utils.StreamUtils.streamWhileNonNull;
import static java.util.Objects.requireNonNull;
import static it.cavallium.dbengine.utils.StreamUtils.batches;
-import static java.util.Objects.requireNonNullElse;
-import com.google.common.primitives.Longs;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import it.cavallium.buffer.Buf;
-import it.cavallium.dbengine.client.VerificationProgress;
-import it.cavallium.dbengine.client.VerificationProgress.BlockBad;
-import it.cavallium.dbengine.client.VerificationProgress.FileOk;
-import it.cavallium.dbengine.client.VerificationProgress.FileStart;
-import it.cavallium.dbengine.client.VerificationProgress.Progress;
+import it.cavallium.dbengine.client.DbProgress;
+import it.cavallium.dbengine.client.DbProgress.DbSSTProgress;
+import it.cavallium.dbengine.client.SSTProgress.SSTOk;
+import it.cavallium.dbengine.client.SSTProgress.SSTProgressReport;
+import it.cavallium.dbengine.client.SSTProgress.SSTStart;
+import it.cavallium.dbengine.client.SSTVerificationProgress;
+import it.cavallium.dbengine.client.SSTVerificationProgress.SSTBlockBad;
import it.cavallium.dbengine.database.ColumnUtils;
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLDictionary;
@@ -38,32 +36,28 @@ import it.cavallium.dbengine.database.RocksDBLongProperty;
import it.cavallium.dbengine.database.SerializedKey;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
+import it.cavallium.dbengine.database.disk.RocksDBFile.IterationMetadata;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import it.cavallium.dbengine.database.disk.rocksdb.LLWriteOptions;
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.rpc.current.data.Column;
import it.cavallium.dbengine.utils.DBException;
-import it.cavallium.dbengine.utils.StreamUtils;
import java.io.IOException;
import java.math.BigInteger;
-import java.nio.file.LinkOption;
-import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
-import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
@@ -645,7 +639,7 @@ public class LLLocalDictionary implements LLDictionary {
}
@Override
- public Stream verifyChecksum(LLRange rangeFull) {
+ public Stream> verifyChecksum(LLRange rangeFull) {
Set brokenFiles = new ConcurrentHashMap().keySet(true);
LongAdder totalScanned = new LongAdder();
long totalEstimate;
@@ -665,30 +659,38 @@ public class LLLocalDictionary implements LLDictionary {
var liveFiles = db.getAllLiveFiles().toList();
var liveFilesCount = liveFiles.size();
- ConcurrentHashMap fileToInitialEstimate = new ConcurrentHashMap<>();
return liveFiles.stream()
.sorted(Comparator.reverseOrder())
- .flatMap(fr -> fr.verify(databaseName, columnName, rangeFull))
- .map(status -> switch (status) {
- case FileStart fileStart -> {
- Long numEntriesEstimate = Objects.requireNonNullElse(fileStart.numEntriesEstimate(), totalEstimate / liveFilesCount);
- totalScanned.add(numEntriesEstimate);
- fileToInitialEstimate.put(fileStart.file(), numEntriesEstimate);
- yield status;
- }
- case FileOk fileEnd -> {
- long initialNumEntriesEstimate = Objects.requireNonNullElse(fileToInitialEstimate.remove(fileEnd.file()), 0L);
- long numEntriesScanned = fileEnd.scanned();
- totalScanned.add(numEntriesScanned - initialNumEntriesEstimate);
- yield status;
- }
- case Progress progress -> new Progress(progress.databaseName(), progress.column(), progress.file(), totalScanned.longValue(), totalEstimate, progress.fileScanned(), progress.fileTotal());
- default -> status;
+ .flatMap(fr -> {
+ AtomicReference metadataRef = new AtomicReference<>();
+ AtomicLong initialEstimate = new AtomicLong();
+ return fr
+ .verify(SSTRange.parse(rangeFull))
+ .map(status -> switch (status) {
+ case SSTStart fileStart -> {
+ metadataRef.set(fileStart.metadata());
+ long numEntriesEstimate = Objects.requireNonNullElse(fileStart.metadata().countEstimate(), totalEstimate / liveFilesCount);
+ totalScanned.add(numEntriesEstimate);
+ initialEstimate.set(numEntriesEstimate);
+ yield status;
+ }
+ case SSTOk fileEnd -> {
+ long initialNumEntriesEstimate = initialEstimate.get();
+ long numEntriesScanned = fileEnd.scannedCount();
+ totalScanned.add(numEntriesScanned - initialNumEntriesEstimate);
+ yield status;
+ }
+ case SSTProgressReport ignored -> status;
+ case SSTBlockBad ignored -> status;
+ })
+ .>map(sstProgress -> new DbProgress.DbSSTProgress<>(databaseName, column, metadataRef.get().filePath(), totalScanned.longValue(), totalEstimate, sstProgress));
})
- .filter(err -> !(err instanceof BlockBad blockBad && blockBad.rawKey() == null && !brokenFiles.add(blockBad.file())));
+ .filter(err -> !(err instanceof DbProgress.DbSSTProgress sstProgress
+ && sstProgress.sstProgress() instanceof SSTBlockBad blockBad
+ && blockBad.rawKey() == null && !brokenFiles.add(sstProgress.fileString())));
} catch (RocksDBException e) {
- return Stream.of(new BlockBad(databaseName, column, null, null, e));
+ return Stream.of(new DbProgress.DbSSTProgress<>(databaseName, column, null, 0, 0, new SSTBlockBad(null, e)));
}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java
index 518800c..09b9d2c 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java
@@ -34,7 +34,6 @@ import it.cavallium.dbengine.rpc.current.data.NoFilter;
import java.io.File;
import java.io.IOException;
import it.cavallium.dbengine.utils.DBException;
-import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -115,8 +114,6 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checks.paranoidfilechecks", "false"));
private static final boolean FORCE_COLUMN_FAMILY_CONSISTENCY_CHECKS
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checks.forcecolumnfamilyconsistencychecks", "true"));
- static final boolean PRINT_ALL_CHECKSUM_VERIFICATION_STEPS
- = Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checks.verification.print", "false"));
private static final InfoLogLevel LOG_LEVEL = InfoLogLevel.getInfoLogLevel(Byte.parseByte(System.getProperty("it.cavallium.dbengine.log.levelcode", "" + InfoLogLevel.WARN_LEVEL.getValue())));
private static final CacheFactory CACHE_FACTORY = USE_CLOCK_CACHE ? new ClockCacheFactory() : new LRUCacheFactory();
@@ -631,7 +628,7 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa
var liveFilesMetadata = db.getLiveFilesMetaData();
List files = new ArrayList<>();
for (LiveFileMetaData file : liveFilesMetadata) {
- files.add(new RocksDBFile(db, getCfh(file.columnFamilyName()), file));
+ files.add(new RocksDBColumnFile(db, getCfh(file.columnFamilyName()), file));
}
return files.stream();
} finally {
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumnFile.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumnFile.java
new file mode 100644
index 0000000..95539f6
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumnFile.java
@@ -0,0 +1,71 @@
+package it.cavallium.dbengine.database.disk;
+
+import static it.cavallium.dbengine.utils.StreamUtils.resourceStream;
+import static java.util.Objects.requireNonNull;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.LiveFileMetaData;
+import org.rocksdb.RocksDB;
+import org.rocksdb.SstFileMetaData;
+
+/**
+ * {@link RocksDBFile} that also keeps the handle of the column family passed at construction.
+ */
+public class RocksDBColumnFile extends RocksDBFile {
+
+  private final ColumnFamilyHandle cfh;
+
+  /**
+   * Creates a column file from already-built metadata.
+   *
+   * @param db database the file belongs to (not stored by this constructor)
+   * @param cfh handle of the column family
+   * @param metadata metadata passed to {@link RocksDBFile}
+   */
+  public RocksDBColumnFile(RocksDB db, ColumnFamilyHandle cfh, RocksDBFileMetadata metadata) {
+    super(metadata);
+    this.cfh = cfh;
+  }
+
+  /**
+   * Creates a column file from live-file metadata, taking column family name and level from it.
+   *
+   * @param db database the file belongs to
+   * @param cfh handle of the column family
+   * @param file live-file metadata reported by RocksDB
+   * @param <T> concrete database type
+   */
+  public <T extends RocksDB> RocksDBColumnFile(T db, ColumnFamilyHandle cfh, LiveFileMetaData file) {
+    // LiveFileMetaData extends SstFileMetaData, so the shared conversion below applies unchanged
+    this(db, cfh, file, file.columnFamilyName(), file.level());
+  }
+
+  /**
+   * Creates a column file from SST metadata plus an explicit column family name and level.
+   *
+   * @param db database the file belongs to
+   * @param cfh handle of the column family
+   * @param file SST file metadata reported by RocksDB
+   * @param columnFamilyName column family name bytes, decoded as UTF-8
+   * @param level level the file belongs to
+   * @param <T> concrete database type
+   */
+  public <T extends RocksDB> RocksDBColumnFile(T db, ColumnFamilyHandle cfh,
+      SstFileMetaData file, byte[] columnFamilyName, int level) {
+    this(db,
+        cfh,
+        new RocksDBFileMetadata(
+            // path() and fileName() are concatenated as-is, with no separator added
+            Path.of(file.path() + file.fileName()),
+            file.fileName(),
+            level,
+            new String(columnFamilyName, StandardCharsets.UTF_8),
+            file.numEntries(),
+            file.size(),
+            decodeRange(file.smallestKey(), file.largestKey())
+        )
+    );
+  }
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFile.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFile.java
index 1596f49..8dd4376 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFile.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFile.java
@@ -1,54 +1,56 @@
package it.cavallium.dbengine.database.disk;
-import static it.cavallium.dbengine.database.LLUtils.isBoundedRange;
-import static it.cavallium.dbengine.database.disk.LLLocalKeyValueDatabase.PRINT_ALL_CHECKSUM_VERIFICATION_STEPS;
import static it.cavallium.dbengine.utils.StreamUtils.resourceStream;
import static java.util.Objects.requireNonNull;
import com.google.common.primitives.Longs;
import it.cavallium.buffer.Buf;
-import it.cavallium.dbengine.client.VerificationProgress;
-import it.cavallium.dbengine.client.VerificationProgress.BlockBad;
-import it.cavallium.dbengine.client.VerificationProgress.FileOk;
-import it.cavallium.dbengine.client.VerificationProgress.FileStart;
-import it.cavallium.dbengine.client.VerificationProgress.Progress;
+import it.cavallium.dbengine.client.SSTDumpProgress;
+import it.cavallium.dbengine.client.SSTDumpProgress.SSTBlockFail;
+import it.cavallium.dbengine.client.SSTDumpProgress.SSTBlockKeyValue;
+import it.cavallium.dbengine.client.SSTProgress.SSTOk;
+import it.cavallium.dbengine.client.SSTProgress.SSTProgressReport;
+import it.cavallium.dbengine.client.SSTProgress.SSTStart;
+import it.cavallium.dbengine.client.SSTVerificationProgress;
+import it.cavallium.dbengine.client.SSTVerificationProgress.SSTBlockBad;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
-import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
-import it.cavallium.dbengine.rpc.current.data.Column;
+import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationKeyState.RocksDBFileIterationStateKeyError;
+import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationKeyState.RocksDBFileIterationStateKeyOk;
+import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationState.RocksDBFileIterationStateBegin;
+import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationState.RocksDBFileIterationStateEnd;
+import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationState.RocksDBFileIterationStateKey;
+import it.cavallium.dbengine.database.disk.SSTRange.SSTLLRange;
+import it.cavallium.dbengine.database.disk.SSTRange.SSTRangeFull;
+import it.cavallium.dbengine.database.disk.SSTRange.SSTRangeKey;
+import it.cavallium.dbengine.database.disk.SSTRange.SSTRangeNone;
+import it.cavallium.dbengine.database.disk.SSTRange.SSTRangeOffset;
+import it.cavallium.dbengine.database.disk.SSTRange.SSTSingleKey;
+import it.cavallium.dbengine.database.disk.rocksdb.LLSstFileReader;
import it.cavallium.dbengine.utils.StreamUtils;
-import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
-import java.util.Objects;
-import java.util.Optional;
import java.util.StringJoiner;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.LiveFileMetaData;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
-import org.rocksdb.SstFileMetaData;
public class RocksDBFile implements Comparable {
protected static final Logger logger = LogManager.getLogger(RocksDBFile.class);
+ protected final RocksDBFileMetadata metadata;
+ protected final Long sstNumber;
- private final RocksDB db;
- private final ColumnFamilyHandle cfh;
- private final RocksDBFileMetadata metadata;
- private final Long sstNumber;
-
- public RocksDBFile(RocksDB db, ColumnFamilyHandle cfh, RocksDBFileMetadata metadata) {
- this.db = db;
- this.cfh = cfh;
+ public RocksDBFile(RocksDBFileMetadata metadata) {
this.metadata = metadata;
- String fileName = metadata.fileName().replace("/", "");
+ String fileName = metadata.fileName().startsWith("/") ? metadata.fileName().substring(1) : metadata.fileName();
int extensionIndex = fileName.indexOf(".");
Long sstNumber = null;
if (extensionIndex != -1) {
@@ -60,137 +62,194 @@ public class RocksDBFile implements Comparable {
}
}
- public RocksDBFile(T db, ColumnFamilyHandle cfh, LiveFileMetaData file) {
- this(db,
- cfh,
- new RocksDBFileMetadata(file.path(),
- file.fileName(),
- file.level(),
- new String(file.columnFamilyName(), StandardCharsets.UTF_8),
- file.numEntries(),
- file.size(),
- decodeRange(file.smallestKey(), file.largestKey())
+	/**
+	 * Creates a descriptor for an SST file that is addressed by name only, without RocksDB-provided metadata.
+	 * <p>
+	 * The metadata is filled with placeholders: level {@code 0}, column {@code "any"}, zero entries, zero size and
+	 * the {@code LLRange.all()} key range.
+	 *
+	 * @param dbBaseDir directory against which {@code file} is resolved
+	 * @param file      file name, with or without a leading {@code "/"}
+	 */
+	public RocksDBFile(Path dbBaseDir, String file) {
+		this(new RocksDBFileMetadata(dbBaseDir.resolve(file.startsWith("/") ? file.substring(1) : file),
+				// substringAfter returns "" when there is no "/": that would erase the name and, with it, the SST number
+				file.indexOf('/') >= 0 ? StringUtils.substringAfter(file, '/') : file,
+				0,
+				"any",
+				0,
+				0,
+				LLRange.all()
)
);
}
- public RocksDBFile(T db, ColumnFamilyHandle cfh, SstFileMetaData file, byte[] columnFamilyName, int level) {
- this(db,
- cfh,
- new RocksDBFileMetadata(file.path(),
- file.fileName(),
- level,
- new String(columnFamilyName, StandardCharsets.UTF_8),
- file.numEntries(),
- file.size(),
- decodeRange(file.smallestKey(), file.largestKey())
- )
- );
- }
-
- private static LLRange decodeRange(byte[] smallestKey, byte[] largestKey) {
+ protected static LLRange decodeRange(byte[] smallestKey, byte[] largestKey) { // wraps both raw keys in Buf and builds the LLRange between them
return LLRange.of(Buf.wrap(smallestKey), Buf.wrap(largestKey));
}
+	/**
+	 * Narrows a requested range to the key range recorded in the file metadata.
+	 * <p>
+	 * Key-based requests (single key, key range) are intersected with {@code metadataRange}; a full request becomes
+	 * {@code metadataRange} itself; "none" and offset-based requests are returned untouched.
+	 *
+	 * @param metadataRange key range covered by the file
+	 * @param innerRange    range requested by the caller
+	 * @return the range to iterate
+	 */
+	private static SSTRange intersectWithMetadata(LLRange metadataRange, SSTRange innerRange) {
+		return switch (innerRange) {
+			// Not expressed in keys: there is nothing to intersect
+			case SSTRangeNone nothing -> nothing;
+			case SSTRangeOffset byOffset -> byOffset;
+			// Everything was requested: the file's own key range is the answer
+			case SSTRangeFull everything -> SSTRange.parse(metadataRange);
+			case SSTRangeKey keyBounds -> {
+				LLRange narrowed = LLRange.intersect(metadataRange, keyBounds.toLLRange());
+				yield SSTRange.parse(narrowed);
+			}
+			case SSTSingleKey oneKey -> {
+				LLRange narrowed = LLRange.intersect(metadataRange, oneKey.toLLRange());
+				yield SSTRange.parse(narrowed);
+			}
+		};
+	}
+
public RocksDBFileMetadata getMetadata() {
return metadata;
}
- public record VerificationFileRange(String filePath, String filename, @Nullable LLRange range, long countEstimate,
- @Nullable Long sstNumber) {}
-
- public Stream verify(String databaseDisplayName, String columnDisplayName, LLRange rangeFull) {
- var intersectedRange = LLRange.intersect(metadata.keysRange(), rangeFull);
- // Ignore the file if it's outside the requested range
- if (intersectedRange == null) {
- return Stream.of();
- }
-
- String filePath = Path.of(metadata.path()).resolve("./" + metadata.fileName()).normalize().toString();
- var fr = new VerificationFileRange(filePath,
- metadata.fileName().replace("/", ""),
- intersectedRange,
- metadata.numEntries(),
- sstNumber
- );
- return verify(databaseDisplayName, columnDisplayName, rangeFull, fr);
+ public Stream verify(SSTRange range) {
+ AtomicLong fileScanned = new AtomicLong();
+ AtomicLong fileTotal = new AtomicLong();
+ return iterate(range).map(state -> switch (state) {
+ case RocksDBFileIterationStateBegin begin -> {
+ var countEstimate = begin.metadata().countEstimate();
+ if (countEstimate != null) {
+ fileTotal.set(countEstimate);
+ }
+ yield new SSTStart(begin.metadata());
+ }
+ case RocksDBFileIterationStateKey key -> {
+ var scanned = fileScanned.incrementAndGet();
+ yield switch (key.state()) {
+ case RocksDBFileIterationStateKeyOk ignored ->
+ new SSTProgressReport(scanned, Math.max(scanned, fileTotal.get()));
+ case RocksDBFileIterationStateKeyError keyError -> new SSTBlockBad(key.key, keyError.exception);
+ };
+ }
+ case RocksDBFileIterationStateEnd end -> new SSTOk(end.scannedCount());
+ });
}
- private Stream verify(String databaseDisplayName, String columnDisplayName, LLRange rangeFull, VerificationFileRange fr) {
- var columnObj = Column.of(columnDisplayName);
- Stream streamInit = Stream.of(new FileStart(databaseDisplayName, columnObj, fr.filePath, fr.countEstimate > 0 ? fr.countEstimate : null));
- Stream streamContent;
- Objects.requireNonNull(fr.range);
-
- String filename = fr.filename;
- String path = fr.filePath;
- LLRange rangePartial = fr.range;
+	/**
+	 * Reads the entries of the file over {@code range} and emits them together with progress events.
+	 * <p>
+	 * The begin marker becomes a start event; each readable key becomes a key/value event followed by a progress
+	 * report; the end marker becomes an ok event. The internal {@code SSTBlockFail} event only terminates the
+	 * stream: it is filtered out by {@code takeWhile} and never reaches the caller.
+	 *
+	 * @param range       portion of the file to read
+	 * @param failOnError if {@code true} a read failure is rethrown wrapped in a {@link CompletionException};
+	 *                    otherwise the failure is logged, an ok event with the keys scanned so far is emitted and
+	 *                    the stream ends
+	 * @return the stream of dump events
+	 */
+	public Stream readAllSST(SSTRange range, boolean failOnError) {
AtomicLong fileScanned = new AtomicLong();
- final long fileEstimate = fr.countEstimate;
- AtomicBoolean mustSeek = new AtomicBoolean(true);
- AtomicBoolean streamEnded = new AtomicBoolean(false);
- streamContent = resourceStream(
- () -> LLUtils.generateCustomReadOptions(null, false, isBoundedRange(rangePartial), false),
- ro -> {
- ro.setIterateLowerBound(rangePartial.getMin() != null ? requireNonNull(LLUtils.asArray(rangePartial.getMin())) : null);
- ro.setIterateUpperBound(rangePartial.getMax() != null ? requireNonNull(LLUtils.asArray(rangePartial.getMax())) : null);
- ro.setFillCache(false);
- ro.setIgnoreRangeDeletions(true);
- if (!rangePartial.isSingle()) {
- ro.setReadaheadSize(256 * 1024 * 1024);
+		AtomicLong fileTotal = new AtomicLong();
+		return iterate(range).mapMulti((state, consumer) -> {
+			switch (state) {
+				case RocksDBFileIterationStateBegin begin -> {
+					var countEstimate = begin.metadata().countEstimate();
+					if (countEstimate != null) {
+						fileTotal.set(countEstimate);
}
- ro.setVerifyChecksums(true);
- return resourceStream(() -> ro.newIterator(db, cfh, IteratorMetrics.NO_OP), rocksIterator -> StreamUtils
- .>streamWhileNonNull(() -> {
- boolean mustSeekVal = mustSeek.compareAndSet(true, false);
- if (!mustSeekVal && !rocksIterator.isValid()) {
- if (streamEnded.compareAndSet(false, true)) {
- return Optional.of(new FileOk(databaseDisplayName, columnObj, path, fileScanned.get()));
- } else {
- //noinspection OptionalAssignedToNull
- return null;
- }
- }
- boolean shouldSendStatus;
- Buf rawKey = null;
- try {
- if (mustSeekVal) {
- if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS) {
- logger.info("Seeking to {}->{}->first on file {}", databaseDisplayName, columnDisplayName, filename);
- }
- rocksIterator.seekToFirst();
- shouldSendStatus = true;
- } else {
- rawKey = rocksIterator.keyBuf().copy();
- shouldSendStatus = fileScanned.incrementAndGet() % 1_000_000 == 0;
- if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS) {
- logger.info("Checking {}->{}->{} on file {}", databaseDisplayName, columnDisplayName, rawKey.toString(), filename);
- }
- rocksIterator.next();
- }
- } catch (RocksDBException ex) {
- return Optional.of(new BlockBad(databaseDisplayName, columnObj, rawKey, path, ex));
- }
- if (shouldSendStatus) {
- long fileScannedVal = fileScanned.get();
- return Optional.of(new Progress(databaseDisplayName,
- columnObj,
- path,
- -1,
- -1,
- fileScannedVal,
- Math.max(fileEstimate, fileScannedVal)
- ));
- } else {
- return Optional.empty();
- }
- }).filter(Optional::isPresent).map(Optional::get).onClose(() -> {
- rocksIterator.close();
- ro.close();
- }));
+					consumer.accept(new SSTStart(begin.metadata()));
}
+				case RocksDBFileIterationStateKey key -> {
+					var scanned = fileScanned.incrementAndGet();
+					switch (key.state()) {
+						case RocksDBFileIterationStateKeyOk ignored -> {
+							consumer.accept(new SSTBlockKeyValue(key.key(), ignored.value()));
+							consumer.accept(new SSTProgressReport(scanned, Math.max(scanned, fileTotal.get())));
+						}
+						case RocksDBFileIterationStateKeyError keyError -> {
+							if (failOnError) {
+								throw new CompletionException(keyError.exception());
+							} else {
+								// The exception is passed as the trailing argument: the SSTBlockFail that carries it is
+								// dropped by takeWhile below, so this log entry is the only place where the cause survives
+								logger.error("Corrupted SST \"{}\" after \"{}\" scanned keys", sstNumber, scanned,
+										keyError.exception());
+								// This is sent before bad block, so takewhile still returns ok before the end, if failOnError is false
+								consumer.accept(new SSTOk(scanned));
+							}
+							consumer.accept(new SSTBlockFail(keyError.exception()));
+						}
+					}
+				}
+				case RocksDBFileIterationStateEnd end -> consumer.accept(new SSTOk(end.scannedCount()));
+			}
+		}).takeWhile(data -> !(data instanceof SSTBlockFail));
+	}
+
+	/**
+	 * Opens this SST file with an {@link LLSstFileReader} and streams its raw iteration states.
+	 * <p>
+	 * The stream starts with a begin marker, continues with one key state per entry (either the value that was
+	 * read or the {@link RocksDBException} that prevented reading it) and finishes with an end marker carrying
+	 * the number of key states emitted. If the requested range does not overlap the file, only the begin and end
+	 * markers are emitted. If the file cannot be opened, the begin marker is followed by a single failed key state.
+	 * <p>
+	 * For offset ranges, {@code offsetMin} entries are skipped after seeking to the first entry, and at most
+	 * {@code offsetMax - offsetMin} key states are emitted.
+	 *
+	 * @param rangeFull requested range; it is intersected with the key range stored in the file metadata
+	 * @return the stream of iteration states
+	 */
+	public Stream iterate(SSTRange rangeFull) {
+		var intersectedRange = RocksDBFile.intersectWithMetadata(metadata.keysRange(), rangeFull);
+
+		Path filePath = metadata.filePath();
+		String filePathString = filePath.toString();
+		var meta = new IterationMetadata(filePath,
+				metadata.fileName().replace("/", ""),
+				intersectedRange,
+				metadata.numEntries() > 0 ? metadata.numEntries() : null,
+				sstNumber
);
- return Stream.concat(streamInit, streamContent);
+
+		Stream streamContent;
+		// Ignore the file if it's outside the requested range
+		if (intersectedRange instanceof SSTRangeNone) {
+			streamContent = Stream.of(new RocksDBFileIterationStateBegin(meta), new RocksDBFileIterationStateEnd(0L));
+		} else {
+			AtomicLong fileScanned = new AtomicLong();
+			AtomicBoolean mustSeek = new AtomicBoolean(true);
+			try {
+				streamContent = resourceStream(() -> new LLSstFileReader(false, filePathString),
+						r -> resourceStream(() -> LLUtils.generateCustomReadOptions(null, false, intersectedRange.isBounded(), false),
+								ro -> {
+									long skipToIndex;
+									long readToCount;
+									switch (intersectedRange) {
+										case SSTLLRange sstllRange -> {
+											var llRange = sstllRange.toLLRange();
+											requireNonNull(llRange);
+											ro.setIterateLowerBound(
+													llRange.getMin() != null ? requireNonNull(LLUtils.asArray(llRange.getMin())) : null);
+											ro.setIterateUpperBound(
+													llRange.getMax() != null ? requireNonNull(LLUtils.asArray(llRange.getMax())) : null);
+											skipToIndex = 0;
+											readToCount = Long.MAX_VALUE;
+										}
+										case SSTRangeOffset offset -> {
+											skipToIndex = offset.offsetMin() == null ? 0 : offset.offsetMin();
+											readToCount = offset.offsetMax() == null ? Long.MAX_VALUE : (offset.offsetMax() - skipToIndex);
+										}
+										default -> throw new IllegalStateException("Unexpected value: " + intersectedRange);
+									}
+									ro.setFillCache(true);
+									ro.setIgnoreRangeDeletions(true);
+									if (!(intersectedRange instanceof SSTSingleKey)) {
+										ro.setReadaheadSize(256 * 1024 * 1024);
+									}
+									ro.setVerifyChecksums(true);
+									return resourceStream(() -> ro.newIterator(r.get(), IteratorMetrics.NO_OP),
+											rocksIterator -> StreamUtils.streamUntil(() -> {
+												if (mustSeek.compareAndSet(true, false)) {
+													// First call: position the iterator. The begin marker is not a key, so it must not
+													// consume a key index, otherwise offset ranges lose one key and the end count is off by one
+													try {
+														rocksIterator.seekToFirstUnsafe();
+														for (long i = 0; i < skipToIndex && rocksIterator.isValid(); i++) {
+															rocksIterator.nextUnsafe();
+														}
+														return new RocksDBFileIterationStateBegin(meta);
+													} catch (RocksDBException ex) {
+														// The seek itself failed: report it as a failed key without key bytes
+														return new RocksDBFileIterationStateKey(null,
+																new RocksDBFileIterationStateKeyError(ex),
+																fileScanned.getAndIncrement()
+														);
+													}
+												}
+												// Stop at the end of the data or as soon as the requested amount of keys was emitted
+												if (!rocksIterator.isValid() || fileScanned.get() >= readToCount) {
+													return new RocksDBFileIterationStateEnd(fileScanned.get());
+												}
+												long index = fileScanned.getAndIncrement();
+												Buf rawKey = null;
+												RocksDBFileIterationKeyState keyResult;
+												try {
+													rawKey = rocksIterator.keyBuf().copy();
+													Buf rawValue = rocksIterator.valueBuf().copy();
+													rocksIterator.next();
+													keyResult = new RocksDBFileIterationStateKeyOk(rawValue);
+												} catch (RocksDBException ex) {
+													keyResult = new RocksDBFileIterationStateKeyError(ex);
+												}
+												return new RocksDBFileIterationStateKey(rawKey, keyResult, index);
+											}, x -> x instanceof RocksDBFileIterationStateEnd).onClose(() -> {
+												rocksIterator.close();
+												ro.close();
+											})
+									);
+								}
+						)
+				);
+			} catch (RocksDBException e) {
+				streamContent = Stream.of(new RocksDBFileIterationStateBegin(meta),
+						new RocksDBFileIterationStateKey(null, new RocksDBFileIterationStateKeyError(e), 0));
+			}
+		}
+		return streamContent;
}
@Override
@@ -211,4 +270,28 @@ public class RocksDBFile implements Comparable {
}
return Long.compare(this.sstNumber, o.sstNumber);
}
+
+ public Long getSstNumber() { // returns the sstNumber field; it is a boxed Long, NOTE(review): presumably null when the file name carries no number - confirm
+ return sstNumber;
+ }
+
+ public sealed interface RocksDBFileIterationState { // one element of the stream returned by iterate(): begin marker, key, or end marker
+
+ record RocksDBFileIterationStateBegin(IterationMetadata metadata) implements RocksDBFileIterationState {} // begin marker, carries the metadata of the iteration
+
+ record RocksDBFileIterationStateKey(Buf key, RocksDBFileIterationKeyState state, long scannedCount) implements // one visited key with its read outcome; key is null when the failure happened before a key was read
+ RocksDBFileIterationState {}
+
+ record RocksDBFileIterationStateEnd(long scannedCount) implements RocksDBFileIterationState {} // end marker, carries the scanned-keys counter
+ }
+
+ public sealed interface RocksDBFileIterationKeyState { // outcome of reading a single key: either its value or the error
+
+ record RocksDBFileIterationStateKeyOk(Buf value) implements RocksDBFileIterationKeyState {} // the key was read; value is a copy of the value buffer
+
+ record RocksDBFileIterationStateKeyError(RocksDBException exception) implements RocksDBFileIterationKeyState {} // the read failed with this exception
+ }
+
+ public record IterationMetadata(Path filePath, String filename, @NotNull SSTRange range, // describes one iteration: the file, the effective range, and optional entry-count estimate / SST number
+ @Nullable Long countEstimate, @Nullable Long sstNumber) {}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFileMetadata.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFileMetadata.java
index 34e7b9c..a9b941d 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFileMetadata.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBFileMetadata.java
@@ -1,8 +1,9 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLRange;
+import java.nio.file.Path;
-public record RocksDBFileMetadata(String path, String fileName, int level, String columnName, long numEntries, long size,
+public record RocksDBFileMetadata(Path filePath, String fileName, int level, String columnName, long numEntries, long size, // first component is now a java.nio.file.Path named filePath (it was a String named path)
LLRange keysRange) {
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java
index f47fb9f..40f0b70 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java
@@ -34,7 +34,7 @@ public class RocksDBUtils {
if (!excludeLastLevel || level.level() < lastLevelId) {
for (SstFileMetaData file : level.files()) {
if (file.fileName().endsWith(".sst")) {
- files.add(new RocksDBFile(db, cfh, file, meta.name(), level.level()));
+ files.add(new RocksDBColumnFile(db, cfh, file, meta.name(), level.level()));
}
}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/SSTRange.java b/src/main/java/it/cavallium/dbengine/database/disk/SSTRange.java
new file mode 100644
index 0000000..dea5ff2
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/disk/SSTRange.java
@@ -0,0 +1,117 @@
+package it.cavallium.dbengine.database.disk;
+
+import it.cavallium.buffer.Buf;
+import it.cavallium.dbengine.database.LLRange;
+import it.cavallium.dbengine.database.LLUtils;
+import java.util.HexFormat;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Selection of the entries of an SST file: nothing, everything, a key range, a single key, or a range of entry
+ * positions.
+ * <p>
+ * Every variant has a textual form, produced by its {@code toString()} and read back by {@link #parse(String)}:
+ * {@code none}, {@code full}, {@code offset-<min>-<max>}, {@code key-<hexMin>-<hexMax>} and
+ * {@code single-key-<hexKey>}. A missing bound is written as an empty token, e.g. {@code offset--100}.
+ */
+public sealed interface SSTRange {
+
+	/**
+	 * @return {@code true} if the selection is limited on both sides
+	 */
+	boolean isBounded();
+
+	/**
+	 * A selection that can be expressed as an {@link LLRange} of keys.
+	 */
+	sealed interface SSTLLRange extends SSTRange permits SSTRangeNone, SSTRangeFull, SSTRangeKey, SSTSingleKey {
+
+		/**
+		 * @return the equivalent key range, or {@code null} for the empty selection
+		 */
+		@Nullable LLRange toLLRange();
+
+		@Override
+		default boolean isBounded() {
+			LLRange r = toLLRange();
+			return r == null || LLUtils.isBoundedRange(r);
+		}
+	}
+
+	/**
+	 * Parses the textual form produced by the {@code toString()} of the variants.
+	 *
+	 * @param raw textual range, e.g. {@code "full"}, {@code "offset-10-"}, {@code "key-00ff-"} or
+	 *            {@code "single-key-00ff"}
+	 * @return the parsed range
+	 * @throws IllegalStateException if the kind is unknown or a required token is missing
+	 * @throws NullPointerException if {@code raw} is {@code null}
+	 */
+	static SSTRange parse(String raw) {
+		// Empty tokens must survive the split: a missing bound is encoded as "" (e.g. "offset--100"),
+		// while StringUtils.split would merge the adjacent separators and shift the remaining tokens
+		var parts = StringUtils.splitPreserveAllTokens(raw, '-');
+		if (parts.length == 0) {
+			throw new IllegalStateException("Unexpected value: " + raw);
+		}
+		return switch (parts[0]) {
+			case "none" -> new SSTRangeNone();
+			case "full" -> new SSTRangeFull();
+			case "offset" -> {
+				requireTokens(raw, parts, 3);
+				yield new SSTRangeOffset(parts[1].isBlank() ? null : Long.parseUnsignedLong(parts[1]),
+						parts[2].isBlank() ? null : Long.parseUnsignedLong(parts[2])
+				);
+			}
+			case "key" -> {
+				requireTokens(raw, parts, 3);
+				yield new SSTRangeKey(parts[1].isBlank() ? null : Buf.wrap(LLUtils.parseHex(parts[1])),
+						parts[2].isBlank() ? null : Buf.wrap(LLUtils.parseHex(parts[2]))
+				);
+			}
+			// "single-key-<hex>" is split on its own dash, so the kind arrives as the two tokens "single" and "key"
+			case "single" -> {
+				requireTokens(raw, parts, 3);
+				if (!"key".equals(parts[1])) {
+					throw new IllegalStateException("Unexpected value: " + raw);
+				}
+				yield new SSTSingleKey(Buf.wrap(LLUtils.parseHex(parts[2])));
+			}
+			default -> throw new IllegalStateException("Unexpected value: " + parts[0]);
+		};
+	}
+
+	/**
+	 * Fails with an {@link IllegalStateException} if the textual range has fewer than {@code count} tokens.
+	 */
+	private static void requireTokens(String raw, String[] parts, int count) {
+		if (parts.length < count) {
+			throw new IllegalStateException("Unexpected value: " + raw);
+		}
+	}
+
+	/**
+	 * Converts a key range into the matching variant.
+	 *
+	 * @param range key range, or {@code null} for the empty selection
+	 * @return "none" for {@code null}, "full" for an all-range, a single key or a key range otherwise
+	 */
+	static SSTRange parse(LLRange range) {
+		if (range == null) {
+			return new SSTRangeNone();
+		}
+		return range.isAll() ? new SSTRangeFull()
+				: range.isSingle() ? new SSTSingleKey(range.getSingle()) : new SSTRangeKey(range.getMin(), range.getMax());
+	}
+
+	/**
+	 * The empty selection.
+	 */
+	record SSTRangeNone() implements SSTRange, SSTLLRange {
+
+		@Override
+		public String toString() {
+			return "none";
+		}
+
+		@Override
+		public LLRange toLLRange() {
+			return null;
+		}
+	}
+
+	/**
+	 * The whole file.
+	 */
+	record SSTRangeFull() implements SSTRange, SSTLLRange {
+
+		@Override
+		public String toString() {
+			return "full";
+		}
+
+		@Override
+		public LLRange toLLRange() {
+			return LLRange.all();
+		}
+	}
+
+	/**
+	 * A selection by entry position; either bound may be {@code null}, meaning "not limited on that side".
+	 */
+	record SSTRangeOffset(@Nullable Long offsetMin, @Nullable Long offsetMax) implements SSTRange {
+
+		@Override
+		public String toString() {
+			return "offset-" + (offsetMin != null ? offsetMin : "") + "-" + (offsetMax != null ? offsetMax : "");
+		}
+
+		@Override
+		public boolean isBounded() {
+			return offsetMin != null && offsetMax != null;
+		}
+	}
+
+	/**
+	 * A selection by key bounds; the textual form writes the keys in hexadecimal.
+	 */
+	record SSTRangeKey(@Nullable Buf min, @Nullable Buf max) implements SSTRange, SSTLLRange {
+
+		private static final HexFormat HF = HexFormat.of();
+
+		@Override
+		public String toString() {
+			return "key-" + (min != null ? HF.formatHex(min.asUnboundedArray(), 0, min.size()) : "") + "-" + (max != null
+					? HF.formatHex(max.asUnboundedArray(), 0, max.size()) : "");
+		}
+
+		@Override
+		public LLRange toLLRange() {
+			return LLRange.of(min, max);
+		}
+	}
+
+	/**
+	 * A selection of exactly one key; the textual form writes the key in hexadecimal.
+	 */
+	record SSTSingleKey(@NotNull Buf key) implements SSTRange, SSTLLRange {
+
+		private static final HexFormat HF = HexFormat.of();
+
+		@Override
+		public String toString() {
+			return "single-key-" + HF.formatHex(key.asUnboundedArray(), 0, key.size());
+		}
+
+		@Override
+		public LLRange toLLRange() {
+			return LLRange.single(key);
+		}
+	}
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLReadOptions.java b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLReadOptions.java
index a5b89b8..0f7b1c6 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLReadOptions.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLReadOptions.java
@@ -9,6 +9,7 @@ import org.rocksdb.ReadOptions;
import org.rocksdb.ReadTier;
import org.rocksdb.RocksDB;
import org.rocksdb.Snapshot;
+import org.rocksdb.SstFileReader;
public final class LLReadOptions extends SimpleResource {
@@ -115,7 +116,11 @@ public final class LLReadOptions extends SimpleResource {
}
public RocksIteratorObj newIterator(RocksDB db, ColumnFamilyHandle cfh, IteratorMetrics iteratorMetrics) {
- return new RocksIteratorObj(db.newIterator(cfh, val), this, iteratorMetrics);
+ return RocksIteratorObj.create(db.newIterator(cfh, val), this, iteratorMetrics); // the factory method replaces the direct constructor call
+ }
+
+ public RocksIteratorObj newIterator(SstFileReader r, IteratorMetrics iteratorMetrics) { // same as the RocksDB overload, but the native iterator comes from the given SstFileReader
+ return RocksIteratorObj.create(r.newIterator(val), this, iteratorMetrics);
}
public void setFillCache(boolean fillCache) {
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLSstFileReader.java b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLSstFileReader.java
new file mode 100644
index 0000000..edc4240
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLSstFileReader.java
@@ -0,0 +1,31 @@
+package it.cavallium.dbengine.database.disk.rocksdb;
+
+import it.cavallium.dbengine.utils.SimpleResource;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.SstFileReader;
+
+public class LLSstFileReader extends SimpleResource {
+
+ private final Options options;
+ private final SstFileReader r;
+
+ public LLSstFileReader(boolean checks, String filePath) throws RocksDBException {
+ this.options = new Options();
+ this.r = new SstFileReader(options
+ .setAllowMmapReads(true)
+ .setParanoidChecks(checks)
+ );
+ r.open(filePath);
+ }
+
+ public SstFileReader get() {
+ return r;
+ }
+
+ @Override
+ protected void onClose() {
+ r.close();
+ options.close();
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLSstFileWriter.java b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLSstFileWriter.java
new file mode 100644
index 0000000..13a1efb
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/LLSstFileWriter.java
@@ -0,0 +1,40 @@
+package it.cavallium.dbengine.database.disk.rocksdb;
+
+import it.cavallium.dbengine.utils.SimpleResource;
+import org.rocksdb.CompressionOptions;
+import org.rocksdb.CompressionType;
+import org.rocksdb.EnvOptions;
+import org.rocksdb.Options;
+import org.rocksdb.SstFileWriter;
+
+/**
+ * Owns a RocksDB {@link SstFileWriter} configured for ZSTD compression, together with every native options
+ * object it was created with. All of them are released when this resource is closed.
+ */
+public class LLSstFileWriter extends SimpleResource {
+
+	private final Options options;
+	private final CompressionOptions compressionOptions;
+	// Kept as a field so that it can be closed: it was previously created inline and never released
+	private final EnvOptions envOptions;
+	private final SstFileWriter w;
+
+	/**
+	 * @param unorderedWrite value passed to {@code Options.setUnorderedWrite}
+	 */
+	public LLSstFileWriter(boolean unorderedWrite) {
+		this.options = new Options();
+		this.compressionOptions = new CompressionOptions();
+		this.envOptions = new EnvOptions();
+		this.w = new SstFileWriter(envOptions,
+				options
+						.setCompressionOptions(compressionOptions
+								.setEnabled(true)
+								.setMaxDictBytes(32768)
+								.setZStdMaxTrainBytes(32768 * 4))
+						.setCompressionType(CompressionType.ZSTD_COMPRESSION)
+						.setUnorderedWrite(unorderedWrite)
+		);
+	}
+
+	/**
+	 * @return the wrapped writer; it stays owned by this resource
+	 */
+	public SstFileWriter get() {
+		return w;
+	}
+
+	@Override
+	protected void onClose() {
+		// The writer goes first, then the options objects it was built from
+		w.close();
+		envOptions.close();
+		compressionOptions.close();
+		options.close();
+	}
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/RocksIteratorObj.java b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/RocksIteratorObj.java
index 590f6bf..b77122c 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/RocksIteratorObj.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/rocksdb/RocksIteratorObj.java
@@ -7,25 +7,25 @@ import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.IteratorMetrics;
import it.cavallium.dbengine.utils.SimpleResource;
import java.nio.ByteBuffer;
-import org.rocksdb.AbstractSlice;
+import org.rocksdb.AbstractRocksIterator;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
+import org.rocksdb.SstFileReaderIterator;
-public class RocksIteratorObj extends SimpleResource {
+public abstract class RocksIteratorObj extends SimpleResource {
- private LLReadOptions readOptions;
- private final RocksIterator rocksIterator;
- private final Counter startedIterSeek;
- private final Counter endedIterSeek;
- private final Timer iterSeekTime;
- private final Counter startedIterNext;
- private final Counter endedIterNext;
- private final Timer iterNextTime;
- private byte[] seekingFrom;
+ protected LLReadOptions readOptions;
+ protected final AbstractRocksIterator> rocksIterator;
+ protected final Counter startedIterSeek;
+ protected final Counter endedIterSeek;
+ protected final Timer iterSeekTime;
+ protected final Counter startedIterNext;
+ protected final Counter endedIterNext;
+ protected final Timer iterNextTime;
+ protected byte[] seekingFrom;
private byte[] seekingTo;
- RocksIteratorObj(RocksIterator rocksIterator,
- LLReadOptions readOptions, IteratorMetrics iteratorMetrics) {
+ RocksIteratorObj(AbstractRocksIterator> rocksIterator, LLReadOptions readOptions, IteratorMetrics iteratorMetrics) {
super(rocksIterator::close);
this.readOptions = readOptions;
this.rocksIterator = rocksIterator;
@@ -37,6 +37,92 @@ public class RocksIteratorObj extends SimpleResource {
this.iterNextTime = iteratorMetrics.iterNextTime();
}
+	/**
+	 * Wraps a native iterator in the {@code RocksIteratorObj} implementation that matches its concrete type.
+	 *
+	 * @param rocksIterator   native iterator, either a {@link RocksIterator} or a {@link SstFileReaderIterator}
+	 * @param readOptions     read options handed to the wrapper
+	 * @param iteratorMetrics metrics handed to the wrapper
+	 * @return the wrapper
+	 * @throws IllegalStateException if the iterator is of any other type
+	 */
+	public static RocksIteratorObj create(AbstractRocksIterator> rocksIterator,
+			LLReadOptions readOptions,
+			IteratorMetrics iteratorMetrics) {
+		final RocksIteratorObj wrapper;
+		switch (rocksIterator) {
+			case RocksIterator dbIterator -> wrapper = new RocksIteratorObj1(dbIterator, readOptions, iteratorMetrics);
+			case SstFileReaderIterator sstIterator ->
+					wrapper = new RocksIteratorObj2(sstIterator, readOptions, iteratorMetrics);
+			default -> throw new IllegalStateException("Unsupported iterator type");
+		}
+		return wrapper;
+	}
+
+	/**
+	 * Implementation backed by a {@link RocksIterator}. It keeps its own typed reference to the iterator in order
+	 * to call the {@code key}/{@code value} accessors on the concrete type.
+	 */
+	private static final class RocksIteratorObj1 extends RocksIteratorObj {
+
+		private final RocksIterator rocksIterator;
+
+		private RocksIteratorObj1(RocksIterator rocksIterator, LLReadOptions readOptions, IteratorMetrics iteratorMetrics) {
+			super(rocksIterator, readOptions, iteratorMetrics);
+			this.rocksIterator = rocksIterator;
+		}
+
+		@Override
+		@Deprecated(forRemoval = true)
+		public synchronized int key(ByteBuffer buffer) {
+			ensureOpen();
+			return rocksIterator.key(buffer);
+		}
+
+		@Override
+		@Deprecated(forRemoval = true)
+		public synchronized int value(ByteBuffer buffer) {
+			ensureOpen();
+			return rocksIterator.value(buffer);
+		}
+
+		/**
+		 * The returned buffer may change when calling next() or when the iterator is not valid anymore
+		 */
+		@Override
+		public synchronized byte[] key() {
+			ensureOpen();
+			return rocksIterator.key();
+		}
+
+		/**
+		 * The returned buffer may change when calling next() or when the iterator is not valid anymore
+		 */
+		@Override
+		public synchronized byte[] value() {
+			ensureOpen();
+			return rocksIterator.value();
+		}
+	}
+
+	/**
+	 * Implementation backed by a {@link SstFileReaderIterator}. It keeps its own typed reference to the iterator
+	 * in order to call the {@code key}/{@code value} accessors on the concrete type.
+	 */
+	private static final class RocksIteratorObj2 extends RocksIteratorObj {
+
+		private final SstFileReaderIterator rocksIterator;
+
+		private RocksIteratorObj2(SstFileReaderIterator rocksIterator, LLReadOptions readOptions, IteratorMetrics iteratorMetrics) {
+			super(rocksIterator, readOptions, iteratorMetrics);
+			this.rocksIterator = rocksIterator;
+		}
+
+		@Override
+		@Deprecated(forRemoval = true)
+		public synchronized int key(ByteBuffer buffer) {
+			ensureOpen();
+			return rocksIterator.key(buffer);
+		}
+
+		@Override
+		@Deprecated(forRemoval = true)
+		public synchronized int value(ByteBuffer buffer) {
+			ensureOpen();
+			return rocksIterator.value(buffer);
+		}
+
+		/**
+		 * The returned buffer may change when calling next() or when the iterator is not valid anymore
+		 */
+		@Override
+		public synchronized byte[] key() {
+			ensureOpen();
+			return rocksIterator.key();
+		}
+
+		/**
+		 * The returned buffer may change when calling next() or when the iterator is not valid anymore
+		 */
+		@Override
+		public synchronized byte[] value() {
+			ensureOpen();
+			return rocksIterator.value();
+		}
+	}
+
public synchronized void seek(ByteBuffer seekBuf) throws RocksDBException {
ensureOpen();
startedIterSeek.increment();
@@ -70,6 +156,18 @@ public class RocksIteratorObj extends SimpleResource {
rocksIterator.status();
}
+ public synchronized void seekToFirstUnsafe() throws RocksDBException { // calls the native seekToFirst() directly: no ensureOpen() check and no seek counters in this method
+ rocksIterator.seekToFirst();
+ }
+
+ public synchronized void seekToLastUnsafe() throws RocksDBException { // calls the native seekToLast() directly: unlike seekToLast(), no ensureOpen() check and no startedIterSeek increment
+ rocksIterator.seekToLast();
+ }
+
+ public synchronized void nextUnsafe() throws RocksDBException { // calls the native next() directly: no ensureOpen() check and no next counters in this method
+ rocksIterator.next();
+ }
+
public synchronized void seekToLast() throws RocksDBException {
ensureOpen();
startedIterSeek.increment();
@@ -118,33 +216,25 @@ public class RocksIteratorObj extends SimpleResource {
return rocksIterator.isValid();
}
- @Deprecated(forRemoval = true)
- public synchronized int key(ByteBuffer buffer) {
- ensureOpen();
- return rocksIterator.key(buffer);
+ public synchronized boolean isValidUnsafe() { // returns the native isValid() with no other step; NOTE(review): presumably isValid() additionally calls ensureOpen() - confirm
+ return rocksIterator.isValid();
}
@Deprecated(forRemoval = true)
- public synchronized int value(ByteBuffer buffer) {
- ensureOpen();
- return rocksIterator.value(buffer);
- }
+ public abstract int key(ByteBuffer buffer);
+
+ @Deprecated(forRemoval = true)
+ public abstract int value(ByteBuffer buffer);
/**
* The returned buffer may change when calling next() or when the iterator is not valid anymore
*/
- public synchronized byte[] key() {
- ensureOpen();
- return rocksIterator.key();
- }
+ public abstract byte[] key();
/**
* The returned buffer may change when calling next() or when the iterator is not valid anymore
*/
- public synchronized byte[] value() {
- ensureOpen();
- return rocksIterator.value();
- }
+ public abstract byte[] value();
/**
* The returned buffer may change when calling next() or when the iterator is not valid anymore
diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java
index 09f2335..e7e6651 100644
--- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java
@@ -5,7 +5,8 @@ import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import it.cavallium.buffer.Buf;
-import it.cavallium.dbengine.client.VerificationProgress;
+import it.cavallium.dbengine.client.DbProgress;
+import it.cavallium.dbengine.client.SSTVerificationProgress;
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
@@ -358,7 +359,7 @@ public class LLMemoryDictionary implements LLDictionary {
}
@Override
- public Stream verifyChecksum(LLRange range) {
+ public Stream> verifyChecksum(LLRange range) { // nothing is verified by this implementation: the result is always an empty stream
return Stream.empty();
}
diff --git a/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java b/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java
index abbe48b..d434b15 100644
--- a/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java
+++ b/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java
@@ -11,6 +11,7 @@ import java.util.Comparator;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.Spliterator;
@@ -62,6 +63,7 @@ public class StreamUtils {
private static final BinaryOperator