diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index eefe956..a2584c1 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -2,6 +2,8 @@ package it.cavallium.dbengine.database.collections; import static it.cavallium.dbengine.utils.StreamUtils.resourceStream; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; import it.cavallium.buffer.Buf; import it.cavallium.buffer.BufDataInput; import it.cavallium.buffer.BufDataOutput; @@ -24,7 +26,6 @@ import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationKeySt import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationState.RocksDBFileIterationStateBegin; import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationState.RocksDBFileIterationStateEnd; import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationState.RocksDBFileIterationStateKey; -import it.cavallium.dbengine.database.disk.SSTRange; import it.cavallium.dbengine.database.disk.SSTRange.SSTRangeFull; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationException; @@ -35,8 +36,8 @@ import it.cavallium.dbengine.utils.StreamUtils; import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps; -import java.util.Comparator; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; @@ -44,7 +45,6 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.BiFunction; -import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; @@ -568,38 +568,37 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep Stream>> getAllEntriesFastUnsafe(DatabaseMapDictionary dict, + public static List>> getAllEntriesFastUnsafe(DatabaseMapDictionary dict, BiConsumer, Throwable> deserializationErrorHandler) { try { - var liveFiles = StreamUtils.toListOn(dict.getDbReadPool(), - ((LLLocalDictionary) dict.dictionary).getAllLiveFiles()); - return liveFiles.stream() - .map(file -> file.iterate(new SSTRangeFull()).map(state -> switch (state) { - case RocksDBFileIterationStateBegin rocksDBFileIterationStateBegin: - yield null; - case RocksDBFileIterationStateEnd rocksDBFileIterationStateEnd: - yield null; - case RocksDBFileIterationStateKey rocksDBFileIterationStateKey: - yield switch (rocksDBFileIterationStateKey.state()) { - case RocksDBFileIterationStateKeyError e -> null; - case RocksDBFileIterationStateKeyOk rocksDBFileIterationStateKeyOk -> { - try { - yield Map.entry(dict.deserializeSuffix(BufDataInput.create(rocksDBFileIterationStateKey.key())), - dict.deserializeValue(rocksDBFileIterationStateKeyOk.value()) - ); - } catch (Throwable t) { - if (deserializationErrorHandler != null) { - deserializationErrorHandler.accept(Map.entry(rocksDBFileIterationStateKey.key().copy(), - rocksDBFileIterationStateKeyOk.value().copy()), t); - yield null; - } else { - throw t; - } - } + var liveFiles = ((LLLocalDictionary) dict.dictionary).getAllLiveFiles(); + return Lists.transform(liveFiles, file -> file.iterate(new SSTRangeFull()).map(state -> switch (state) { + case RocksDBFileIterationStateBegin rocksDBFileIterationStateBegin: + yield null; + case RocksDBFileIterationStateEnd rocksDBFileIterationStateEnd: + yield null; + case RocksDBFileIterationStateKey rocksDBFileIterationStateKey: + yield switch (rocksDBFileIterationStateKey.state()) { + case RocksDBFileIterationStateKeyError e -> null; + case RocksDBFileIterationStateKeyOk rocksDBFileIterationStateKeyOk -> { + try { + yield Map.entry(dict.deserializeSuffix(BufDataInput.create(rocksDBFileIterationStateKey.key())), + dict.deserializeValue(rocksDBFileIterationStateKeyOk.value()) + ); + } catch (Throwable t) { + if (deserializationErrorHandler != null) { + deserializationErrorHandler.accept(Map.entry(rocksDBFileIterationStateKey.key().copy(), + rocksDBFileIterationStateKeyOk.value().copy() + ), t); + yield null; + } else { + throw t; } + } + } - }; - }).filter(Objects::nonNull)); + }; + }).filter(Objects::nonNull)); } catch (RocksDBException e) { throw new RuntimeException(e); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index 3f07ff3..1aaec8b 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.database.disk; +import static it.cavallium.dbengine.utils.StreamUtils.toListOn; import static java.util.Objects.requireNonNull; import io.micrometer.core.instrument.Counter; @@ -559,16 +560,16 @@ public sealed abstract class AbstractRocksDBColumn implements } @Override - public Stream getAllLiveFiles() throws RocksDBException { + public List getAllLiveFiles() throws RocksDBException { var closeReadLock = closeLock.readLock(); try { ensureOpen(); byte[] cfhName = cfh.getName(); - return db.getLiveFilesMetaData() + return toListOn(dbReadPool, db.getLiveFilesMetaData() .parallelStream() .filter(x -> Arrays.equals(cfhName, x.columnFamilyName())) .sorted(Comparator.comparingInt(LiveFileMetaData::level).reversed()) - .map(sstFileMetaData -> new RocksDBColumnFile(db, cfh, sstFileMetaData, cfhName, sstFileMetaData.level())); + .map(sstFileMetaData -> new RocksDBColumnFile(db, cfh, sstFileMetaData, cfhName, sstFileMetaData.level()))); } finally { closeLock.unlockRead(closeReadLock); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 1e4a502..0ed51a9 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -663,7 +663,7 @@ public class LLLocalDictionary implements LLDictionary { Column column = ColumnUtils.special(columnName); try { - var liveFiles = db.getAllLiveFiles().toList(); + var liveFiles = db.getAllLiveFiles(); var liveFilesCount = liveFiles.size(); return liveFiles.stream() @@ -1205,7 +1205,7 @@ public class LLLocalDictionary implements LLDictionary { } } - public Stream getAllLiveFiles() throws RocksDBException { + public List getAllLiveFiles() throws RocksDBException { return db.getAllLiveFiles(); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java index 86b3692..7eaf74a 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java @@ -55,7 +55,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn { @NotNull RocksIteratorObj newIterator(@NotNull LLReadOptions readOptions, @Nullable Buf min, @Nullable Buf max); - Stream getAllLiveFiles() throws RocksDBException; + List getAllLiveFiles() throws RocksDBException; @NotNull UpdateAtomicResult updateAtomic(@NotNull LLReadOptions readOptions, @NotNull LLWriteOptions writeOptions,