Low-level sst entry
This commit is contained in:
parent
86377a4e65
commit
6af06ca90e
@ -2,7 +2,6 @@ package it.cavallium.dbengine.database.collections;
|
|||||||
|
|
||||||
import static it.cavallium.dbengine.utils.StreamUtils.resourceStream;
|
import static it.cavallium.dbengine.utils.StreamUtils.resourceStream;
|
||||||
|
|
||||||
import com.google.common.collect.Collections2;
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import it.cavallium.buffer.Buf;
|
import it.cavallium.buffer.Buf;
|
||||||
import it.cavallium.buffer.BufDataInput;
|
import it.cavallium.buffer.BufDataInput;
|
||||||
@ -20,7 +19,6 @@ import it.cavallium.dbengine.database.UpdateMode;
|
|||||||
import it.cavallium.dbengine.database.UpdateReturnMode;
|
import it.cavallium.dbengine.database.UpdateReturnMode;
|
||||||
import it.cavallium.dbengine.database.disk.CachedSerializationFunction;
|
import it.cavallium.dbengine.database.disk.CachedSerializationFunction;
|
||||||
import it.cavallium.dbengine.database.disk.LLLocalDictionary;
|
import it.cavallium.dbengine.database.disk.LLLocalDictionary;
|
||||||
import it.cavallium.dbengine.database.disk.RocksDBFile;
|
|
||||||
import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationKeyState.RocksDBFileIterationStateKeyError;
|
import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationKeyState.RocksDBFileIterationStateKeyError;
|
||||||
import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationKeyState.RocksDBFileIterationStateKeyOk;
|
import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationKeyState.RocksDBFileIterationStateKeyOk;
|
||||||
import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationState.RocksDBFileIterationStateBegin;
|
import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationState.RocksDBFileIterationStateBegin;
|
||||||
@ -36,7 +34,6 @@ import it.cavallium.dbengine.utils.StreamUtils;
|
|||||||
import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap;
|
import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap;
|
||||||
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
|
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
|
||||||
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps;
|
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
@ -568,11 +565,13 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T, U> List<Stream<Entry<T, U>>> getAllEntriesFastUnsafe(DatabaseMapDictionary<T, U> dict,
|
public static <T, U> List<Stream<UnsafeSSTEntry<T, U>>> getAllEntriesFastUnsafe(DatabaseMapDictionary<T, U> dict,
|
||||||
BiConsumer<Entry<Buf, Buf>, Throwable> deserializationErrorHandler) {
|
boolean disableRocksdbChecks,
|
||||||
|
BiConsumer<UnsafeRawSSTEntry<T, U>, Throwable> deserializationErrorHandler) {
|
||||||
try {
|
try {
|
||||||
var liveFiles = ((LLLocalDictionary) dict.dictionary).getAllLiveFiles();
|
var liveFiles = ((LLLocalDictionary) dict.dictionary).getAllLiveFiles();
|
||||||
return Lists.transform(liveFiles, file -> file.iterate(new SSTRangeFull()).map(state -> switch (state) {
|
return Lists.transform(liveFiles, file -> file.iterate(new SSTRangeFull(), disableRocksdbChecks)
|
||||||
|
.map(state -> switch (state) {
|
||||||
case RocksDBFileIterationStateBegin rocksDBFileIterationStateBegin:
|
case RocksDBFileIterationStateBegin rocksDBFileIterationStateBegin:
|
||||||
yield null;
|
yield null;
|
||||||
case RocksDBFileIterationStateEnd rocksDBFileIterationStateEnd:
|
case RocksDBFileIterationStateEnd rocksDBFileIterationStateEnd:
|
||||||
@ -581,14 +580,26 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
|||||||
yield switch (rocksDBFileIterationStateKey.state()) {
|
yield switch (rocksDBFileIterationStateKey.state()) {
|
||||||
case RocksDBFileIterationStateKeyError e -> null;
|
case RocksDBFileIterationStateKeyError e -> null;
|
||||||
case RocksDBFileIterationStateKeyOk rocksDBFileIterationStateKeyOk -> {
|
case RocksDBFileIterationStateKeyOk rocksDBFileIterationStateKeyOk -> {
|
||||||
|
var key = rocksDBFileIterationStateKey.key();
|
||||||
|
var value = rocksDBFileIterationStateKeyOk.value();
|
||||||
try {
|
try {
|
||||||
yield Map.entry(dict.deserializeSuffix(BufDataInput.create(rocksDBFileIterationStateKey.key())),
|
var deserializedKey = dict.deserializeSuffix(BufDataInput.create(key));
|
||||||
dict.deserializeValue(rocksDBFileIterationStateKeyOk.value())
|
var deserializedValue = dict.deserializeValue(value);
|
||||||
|
yield new UnsafeSSTEntry<>(file,
|
||||||
|
deserializedKey,
|
||||||
|
deserializedValue,
|
||||||
|
key,
|
||||||
|
value,
|
||||||
|
k -> dict.deserializeSuffix(BufDataInput.create(k)),
|
||||||
|
dict::deserializeValue
|
||||||
);
|
);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
if (deserializationErrorHandler != null) {
|
if (deserializationErrorHandler != null) {
|
||||||
deserializationErrorHandler.accept(Map.entry(rocksDBFileIterationStateKey.key().copy(),
|
deserializationErrorHandler.accept(new UnsafeRawSSTEntry<>(file,
|
||||||
rocksDBFileIterationStateKeyOk.value().copy()
|
key,
|
||||||
|
value,
|
||||||
|
k -> dict.deserializeSuffix(BufDataInput.create(k)),
|
||||||
|
dict::deserializeValue
|
||||||
), t);
|
), t);
|
||||||
yield null;
|
yield null;
|
||||||
} else {
|
} else {
|
||||||
@ -598,7 +609,8 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
|||||||
}
|
}
|
||||||
|
|
||||||
};
|
};
|
||||||
}).filter(Objects::nonNull));
|
})
|
||||||
|
.filter(Objects::nonNull));
|
||||||
} catch (RocksDBException e) {
|
} catch (RocksDBException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,10 @@
|
|||||||
|
package it.cavallium.dbengine.database.collections;
|
||||||
|
|
||||||
|
import it.cavallium.buffer.Buf;
|
||||||
|
import it.cavallium.dbengine.database.disk.RocksDBFile;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
public record UnsafeRawSSTEntry<T, U>(RocksDBFile file,
|
||||||
|
Buf rawKey, Buf rawValue,
|
||||||
|
Function<Buf, T> keyDeserializer,
|
||||||
|
Function<Buf, U> valueDeserializer) {}
|
@ -0,0 +1,11 @@
|
|||||||
|
package it.cavallium.dbengine.database.collections;
|
||||||
|
|
||||||
|
import it.cavallium.buffer.Buf;
|
||||||
|
import it.cavallium.dbengine.database.disk.RocksDBFile;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
public record UnsafeSSTEntry<T, U>(RocksDBFile file,
|
||||||
|
T key, U value,
|
||||||
|
Buf rawKey, Buf rawValue,
|
||||||
|
Function<Buf, T> keyDeserializer,
|
||||||
|
Function<Buf, U> valueDeserializer) {}
|
@ -55,7 +55,6 @@ public class RocksDBFile implements Comparable<RocksDBFile> {
|
|||||||
Long sstNumber = null;
|
Long sstNumber = null;
|
||||||
if (extensionIndex != -1) {
|
if (extensionIndex != -1) {
|
||||||
String numberRaw = fileName.substring(0, extensionIndex);
|
String numberRaw = fileName.substring(0, extensionIndex);
|
||||||
//noinspection UnstableApiUsage
|
|
||||||
this.sstNumber = Longs.tryParse(numberRaw);
|
this.sstNumber = Longs.tryParse(numberRaw);
|
||||||
} else {
|
} else {
|
||||||
this.sstNumber = null;
|
this.sstNumber = null;
|
||||||
@ -95,7 +94,7 @@ public class RocksDBFile implements Comparable<RocksDBFile> {
|
|||||||
public Stream<SSTVerificationProgress> verify(SSTRange range) {
|
public Stream<SSTVerificationProgress> verify(SSTRange range) {
|
||||||
AtomicLong fileScanned = new AtomicLong();
|
AtomicLong fileScanned = new AtomicLong();
|
||||||
AtomicLong fileTotal = new AtomicLong();
|
AtomicLong fileTotal = new AtomicLong();
|
||||||
return iterate(range).map(state -> switch (state) {
|
return iterate(range, true).map(state -> switch (state) {
|
||||||
case RocksDBFileIterationStateBegin begin -> {
|
case RocksDBFileIterationStateBegin begin -> {
|
||||||
var countEstimate = begin.metadata().countEstimate();
|
var countEstimate = begin.metadata().countEstimate();
|
||||||
if (countEstimate != null) {
|
if (countEstimate != null) {
|
||||||
@ -115,10 +114,10 @@ public class RocksDBFile implements Comparable<RocksDBFile> {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public Stream<SSTDumpProgress> readAllSST(SSTRange range, boolean failOnError) {
|
public Stream<SSTDumpProgress> readAllSST(SSTRange range, boolean failOnError, boolean disableRocksdbChecks) {
|
||||||
AtomicLong fileScanned = new AtomicLong();
|
AtomicLong fileScanned = new AtomicLong();
|
||||||
AtomicLong fileTotal = new AtomicLong();
|
AtomicLong fileTotal = new AtomicLong();
|
||||||
return iterate(range).<SSTDumpProgress>mapMulti((state, consumer) -> {
|
return iterate(range, disableRocksdbChecks).<SSTDumpProgress>mapMulti((state, consumer) -> {
|
||||||
switch (state) {
|
switch (state) {
|
||||||
case RocksDBFileIterationStateBegin begin -> {
|
case RocksDBFileIterationStateBegin begin -> {
|
||||||
var countEstimate = begin.metadata().countEstimate();
|
var countEstimate = begin.metadata().countEstimate();
|
||||||
@ -151,7 +150,7 @@ public class RocksDBFile implements Comparable<RocksDBFile> {
|
|||||||
}).takeWhile(data -> !(data instanceof SSTBlockFail));
|
}).takeWhile(data -> !(data instanceof SSTBlockFail));
|
||||||
}
|
}
|
||||||
|
|
||||||
public Stream<RocksDBFileIterationState> iterate(SSTRange rangeFull) {
|
public Stream<RocksDBFileIterationState> iterate(SSTRange rangeFull, boolean disableRocksdbChecks) {
|
||||||
var intersectedRange = RocksDBFile.intersectWithMetadata(metadata.keysRange(), rangeFull);
|
var intersectedRange = RocksDBFile.intersectWithMetadata(metadata.keysRange(), rangeFull);
|
||||||
|
|
||||||
Path filePath = metadata.filePath();
|
Path filePath = metadata.filePath();
|
||||||
@ -171,7 +170,7 @@ public class RocksDBFile implements Comparable<RocksDBFile> {
|
|||||||
AtomicLong fileScanned = new AtomicLong();
|
AtomicLong fileScanned = new AtomicLong();
|
||||||
AtomicBoolean mustSeek = new AtomicBoolean(true);
|
AtomicBoolean mustSeek = new AtomicBoolean(true);
|
||||||
try {
|
try {
|
||||||
streamContent = resourceStream(() -> new LLSstFileReader(false, filePathString),
|
streamContent = resourceStream(() -> new LLSstFileReader(!disableRocksdbChecks, filePathString),
|
||||||
r -> resourceStream(() -> LLUtils.generateCustomReadOptions(null, false, intersectedRange.isBounded(), false),
|
r -> resourceStream(() -> LLUtils.generateCustomReadOptions(null, false, intersectedRange.isBounded(), false),
|
||||||
ro -> {
|
ro -> {
|
||||||
long skipToIndex;
|
long skipToIndex;
|
||||||
|
Loading…
Reference in New Issue
Block a user