Handle errors

This commit is contained in:
Andrea Cavalli 2024-09-28 15:14:18 +02:00
parent 06e754d437
commit ceff8f5022

View File

@ -42,7 +42,9 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
@ -566,7 +568,8 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
}
}
public static <T, U> Stream<Stream<Entry<T, U>>> getAllEntriesFastUnsafe(DatabaseMapDictionary<T, U> dict) {
public static <T, U> Stream<Stream<Entry<T, U>>> getAllEntriesFastUnsafe(DatabaseMapDictionary<T, U> dict,
BiConsumer<Entry<Buf, Buf>, Throwable> deserializationErrorHandler) {
try {
Comparator<RocksDBFile> comparator = Comparator.<RocksDBFile>comparingInt(x -> x.getMetadata().level()).reversed();
return ((LLLocalDictionary) dict.dictionary)
@ -580,10 +583,22 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
case RocksDBFileIterationStateKey rocksDBFileIterationStateKey:
yield switch (rocksDBFileIterationStateKey.state()) {
case RocksDBFileIterationStateKeyError e -> null;
case RocksDBFileIterationStateKeyOk rocksDBFileIterationStateKeyOk ->
Map.entry(dict.deserializeSuffix(BufDataInput.create(rocksDBFileIterationStateKey.key())),
case RocksDBFileIterationStateKeyOk rocksDBFileIterationStateKeyOk -> {
try {
yield Map.entry(dict.deserializeSuffix(BufDataInput.create(rocksDBFileIterationStateKey.key())),
dict.deserializeValue(rocksDBFileIterationStateKeyOk.value())
);
} catch (Throwable t) {
if (deserializationErrorHandler != null) {
deserializationErrorHandler.accept(Map.entry(rocksDBFileIterationStateKey.key().copy(),
rocksDBFileIterationStateKeyOk.value().copy()), t);
yield null;
} else {
throw t;
}
}
}
};
}).filter(Objects::nonNull));
} catch (RocksDBException e) {