Return list

This commit is contained in:
Andrea Cavalli 2024-10-02 00:42:15 +02:00
parent 591963f630
commit 86377a4e65
4 changed files with 37 additions and 37 deletions

View File

@ -2,6 +2,8 @@ package it.cavallium.dbengine.database.collections;
import static it.cavallium.dbengine.utils.StreamUtils.resourceStream; import static it.cavallium.dbengine.utils.StreamUtils.resourceStream;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import it.cavallium.buffer.Buf; import it.cavallium.buffer.Buf;
import it.cavallium.buffer.BufDataInput; import it.cavallium.buffer.BufDataInput;
import it.cavallium.buffer.BufDataOutput; import it.cavallium.buffer.BufDataOutput;
@ -24,7 +26,6 @@ import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationKeySt
import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationState.RocksDBFileIterationStateBegin; import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationState.RocksDBFileIterationStateBegin;
import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationState.RocksDBFileIterationStateEnd; import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationState.RocksDBFileIterationStateEnd;
import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationState.RocksDBFileIterationStateKey; import it.cavallium.dbengine.database.disk.RocksDBFile.RocksDBFileIterationState.RocksDBFileIterationStateKey;
import it.cavallium.dbengine.database.disk.SSTRange;
import it.cavallium.dbengine.database.disk.SSTRange.SSTRangeFull; import it.cavallium.dbengine.database.disk.SSTRange.SSTRangeFull;
import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationException;
@ -35,8 +36,8 @@ import it.cavallium.dbengine.utils.StreamUtils;
import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap; import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps;
import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Objects; import java.util.Objects;
@ -44,7 +45,6 @@ import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
@ -568,38 +568,37 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
} }
} }
public static <T, U> Stream<Stream<Entry<T, U>>> getAllEntriesFastUnsafe(DatabaseMapDictionary<T, U> dict, public static <T, U> List<Stream<Entry<T, U>>> getAllEntriesFastUnsafe(DatabaseMapDictionary<T, U> dict,
BiConsumer<Entry<Buf, Buf>, Throwable> deserializationErrorHandler) { BiConsumer<Entry<Buf, Buf>, Throwable> deserializationErrorHandler) {
try { try {
var liveFiles = StreamUtils.toListOn(dict.getDbReadPool(), var liveFiles = ((LLLocalDictionary) dict.dictionary).getAllLiveFiles();
((LLLocalDictionary) dict.dictionary).getAllLiveFiles()); return Lists.transform(liveFiles, file -> file.iterate(new SSTRangeFull()).map(state -> switch (state) {
return liveFiles.stream() case RocksDBFileIterationStateBegin rocksDBFileIterationStateBegin:
.map(file -> file.iterate(new SSTRangeFull()).map(state -> switch (state) { yield null;
case RocksDBFileIterationStateBegin rocksDBFileIterationStateBegin: case RocksDBFileIterationStateEnd rocksDBFileIterationStateEnd:
yield null; yield null;
case RocksDBFileIterationStateEnd rocksDBFileIterationStateEnd: case RocksDBFileIterationStateKey rocksDBFileIterationStateKey:
yield null; yield switch (rocksDBFileIterationStateKey.state()) {
case RocksDBFileIterationStateKey rocksDBFileIterationStateKey: case RocksDBFileIterationStateKeyError e -> null;
yield switch (rocksDBFileIterationStateKey.state()) { case RocksDBFileIterationStateKeyOk rocksDBFileIterationStateKeyOk -> {
case RocksDBFileIterationStateKeyError e -> null; try {
case RocksDBFileIterationStateKeyOk rocksDBFileIterationStateKeyOk -> { yield Map.entry(dict.deserializeSuffix(BufDataInput.create(rocksDBFileIterationStateKey.key())),
try { dict.deserializeValue(rocksDBFileIterationStateKeyOk.value())
yield Map.entry(dict.deserializeSuffix(BufDataInput.create(rocksDBFileIterationStateKey.key())), );
dict.deserializeValue(rocksDBFileIterationStateKeyOk.value()) } catch (Throwable t) {
); if (deserializationErrorHandler != null) {
} catch (Throwable t) { deserializationErrorHandler.accept(Map.entry(rocksDBFileIterationStateKey.key().copy(),
if (deserializationErrorHandler != null) { rocksDBFileIterationStateKeyOk.value().copy()
deserializationErrorHandler.accept(Map.entry(rocksDBFileIterationStateKey.key().copy(), ), t);
rocksDBFileIterationStateKeyOk.value().copy()), t); yield null;
yield null; } else {
} else { throw t;
throw t;
}
}
} }
}
}
}; };
}).filter(Objects::nonNull)); }).filter(Objects::nonNull));
} catch (RocksDBException e) { } catch (RocksDBException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine.database.disk; package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.utils.StreamUtils.toListOn;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Counter;
@ -559,16 +560,16 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
} }
@Override @Override
public Stream<RocksDBFile> getAllLiveFiles() throws RocksDBException { public List<RocksDBFile> getAllLiveFiles() throws RocksDBException {
var closeReadLock = closeLock.readLock(); var closeReadLock = closeLock.readLock();
try { try {
ensureOpen(); ensureOpen();
byte[] cfhName = cfh.getName(); byte[] cfhName = cfh.getName();
return db.getLiveFilesMetaData() return toListOn(dbReadPool, db.getLiveFilesMetaData()
.parallelStream() .parallelStream()
.filter(x -> Arrays.equals(cfhName, x.columnFamilyName())) .filter(x -> Arrays.equals(cfhName, x.columnFamilyName()))
.sorted(Comparator.comparingInt(LiveFileMetaData::level).reversed()) .sorted(Comparator.comparingInt(LiveFileMetaData::level).reversed())
.map(sstFileMetaData -> new RocksDBColumnFile(db, cfh, sstFileMetaData, cfhName, sstFileMetaData.level())); .map(sstFileMetaData -> new RocksDBColumnFile(db, cfh, sstFileMetaData, cfhName, sstFileMetaData.level())));
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
} }

View File

@ -663,7 +663,7 @@ public class LLLocalDictionary implements LLDictionary {
Column column = ColumnUtils.special(columnName); Column column = ColumnUtils.special(columnName);
try { try {
var liveFiles = db.getAllLiveFiles().toList(); var liveFiles = db.getAllLiveFiles();
var liveFilesCount = liveFiles.size(); var liveFilesCount = liveFiles.size();
return liveFiles.stream() return liveFiles.stream()
@ -1205,7 +1205,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
} }
public Stream<RocksDBFile> getAllLiveFiles() throws RocksDBException { public List<RocksDBFile> getAllLiveFiles() throws RocksDBException {
return db.getAllLiveFiles(); return db.getAllLiveFiles();
} }

View File

@ -55,7 +55,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
@NotNull RocksIteratorObj newIterator(@NotNull LLReadOptions readOptions, @Nullable Buf min, @Nullable Buf max); @NotNull RocksIteratorObj newIterator(@NotNull LLReadOptions readOptions, @Nullable Buf min, @Nullable Buf max);
Stream<RocksDBFile> getAllLiveFiles() throws RocksDBException; List<RocksDBFile> getAllLiveFiles() throws RocksDBException;
@NotNull UpdateAtomicResult updateAtomic(@NotNull LLReadOptions readOptions, @NotNull UpdateAtomicResult updateAtomic(@NotNull LLReadOptions readOptions,
@NotNull LLWriteOptions writeOptions, @NotNull LLWriteOptions writeOptions,