diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index b83e841..0786e47 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -1,5 +1,7 @@ package it.cavallium.dbengine.database.collections; +import static it.cavallium.dbengine.utils.StreamUtils.resourceStream; + import it.cavallium.buffer.Buf; import it.cavallium.buffer.BufDataInput; import it.cavallium.buffer.BufDataOutput; @@ -535,8 +537,10 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> setAllEntriesAndGetPrevious(Stream> entries) { - return getAllEntries(null, false) - .onClose(() -> dictionary.setRange(range, entries.map(entry -> serializeEntry(entry)), false)); + return resourceStream( + () -> getAllEntries(null, false), + () -> dictionary.setRange(range, entries.map(entry -> serializeEntry(entry)), false) + ); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index c6a5ec6..a9b2e71 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -1,5 +1,7 @@ package it.cavallium.dbengine.database.collections; +import static it.cavallium.dbengine.utils.StreamUtils.resourceStream; + import it.cavallium.buffer.Buf; import it.cavallium.buffer.BufDataInput; import it.cavallium.buffer.BufDataOutput; @@ -16,6 +18,7 @@ import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; +import it.cavallium.dbengine.utils.StreamUtils; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap; import java.util.Map.Entry; import java.util.Optional; @@ -304,7 +307,7 @@ public class DatabaseMapDictionaryDeep> implem @Override public Stream> setAllEntriesAndGetPrevious(Stream> entries) { - return this.getAllEntries(null, false).onClose(() -> setAllEntries(entries)); + return resourceStream(() -> this.getAllEntries(null, false), () -> setAllEntries(entries)); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 474faf2..0f5815a 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -10,6 +10,7 @@ import static it.cavallium.dbengine.utils.StreamUtils.ROCKSDB_POOL; import static it.cavallium.dbengine.utils.StreamUtils.collectOn; import static it.cavallium.dbengine.utils.StreamUtils.executing; import static it.cavallium.dbengine.utils.StreamUtils.fastSummingLong; +import static it.cavallium.dbengine.utils.StreamUtils.resourceStream; import static it.cavallium.dbengine.utils.StreamUtils.streamWhileNonNull; import static java.util.Objects.requireNonNull; import static it.cavallium.dbengine.utils.StreamUtils.batches; @@ -629,39 +630,33 @@ public class LLLocalDictionary implements LLDictionary { @Override public Stream badBlocks(LLRange range) { try { - var ro = LLUtils.generateCustomReadOptions(null, - false, - isBoundedRange(range), - false - ); - ro.setFillCache(false); - if (!range.isSingle()) { - if (LLUtils.MANUAL_READAHEAD) { - ro.setReadaheadSize(32 * 1024); + return resourceStream( + () -> LLUtils.generateCustomReadOptions(null, false, isBoundedRange(range), false), + ro -> { + ro.setFillCache(false); + if (!range.isSingle()) { + if (LLUtils.MANUAL_READAHEAD) { + ro.setReadaheadSize(32 * 1024); + } } - } - ro.setVerifyChecksums(true); - var rocksIterator = db.newRocksIterator(ro, range, false); - try { - rocksIterator.seekToFirst(); - } catch (Exception ex) { - rocksIterator.close(); - ro.close(); - throw new DBException("Failed to open rocksdb iterator", ex); - } - return streamWhileNonNull(() -> { - if (!rocksIterator.isValid()) return null; - Buf rawKey = null; - try { - rawKey = rocksIterator.keyBuf().copy(); - rocksIterator.next(); - } catch (RocksDBException ex) { - return new BadBlock(databaseName, ColumnUtils.special(columnName), rawKey, ex); - } - return null; - }).takeWhile(x -> rocksIterator.isValid()).onClose(() -> { - rocksIterator.close(); - ro.close(); + ro.setVerifyChecksums(true); + return resourceStream(() -> db.newRocksIterator(ro, range, false), rocksIterator -> { + rocksIterator.seekToFirst(); + return streamWhileNonNull(() -> { + if (!rocksIterator.isValid()) return null; + Buf rawKey = null; + try { + rawKey = rocksIterator.keyBuf().copy(); + rocksIterator.next(); + } catch (RocksDBException ex) { + return new BadBlock(databaseName, ColumnUtils.special(columnName), rawKey, ex); + } + return null; + }).takeWhile(x -> rocksIterator.isValid()).onClose(() -> { + rocksIterator.close(); + ro.close(); + }); + }); }); } catch (RocksDBException e) { throw new DBException("Failed to get bad blocks", e); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java index 4c694c5..74b4ebc 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java @@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk; import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; import static it.cavallium.dbengine.database.LLUtils.generateCustomReadOptions; import static it.cavallium.dbengine.database.LLUtils.isBoundedRange; +import static it.cavallium.dbengine.utils.StreamUtils.resourceStream; import it.cavallium.buffer.Buf; import it.cavallium.dbengine.database.LLRange; @@ -20,6 +21,7 @@ import java.util.function.Supplier; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDBException; @@ -53,58 +55,71 @@ public abstract class LLLocalGroupedReactiveRocksIterator { } public final Stream> stream() { - return StreamUtils.>streamWhileNonNull(() -> { - try (var readOptions = generateCustomReadOptions(this.readOptions.get(), canFillCache, isBoundedRange(range), smallRange); - var rocksIterator = db.newRocksIterator(readOptions, range, false)) { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); - } - ObjectArrayList values = new ObjectArrayList<>(); - Buf firstGroupKey = null; - while (rocksIterator.isValid()) { - // Note that the underlying array is subject to changes! - Buf key = rocksIterator.keyBuf(); - if (firstGroupKey == null) { - firstGroupKey = key.copy(); - } else if (!LLUtils.equals(firstGroupKey, 0, key, 0, prefixLength)) { - break; - } - // Note that the underlying array is subject to changes! - @Nullable Buf value; - if (readValues) { - value = rocksIterator.valueBuf(); - } else { - value = null; - } + try { + return resourceStream( + () -> generateCustomReadOptions(this.readOptions.get(), canFillCache, isBoundedRange(range), smallRange), + readOptions -> StreamUtils.resourceStream( + () -> db.newRocksIterator(readOptions, range, false), + rocksIterator -> StreamUtils.streamWhileNonNull(() -> { + try { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); + } + ObjectArrayList values = new ObjectArrayList<>(); + Buf firstGroupKey = null; + while (rocksIterator.isValid()) { + // Note that the underlying array is subject to changes! + Buf key = rocksIterator.keyBuf(); + if (firstGroupKey == null) { + firstGroupKey = key.copy(); + } else if (!LLUtils.equals(firstGroupKey, 0, key, 0, prefixLength)) { + break; + } + // Note that the underlying array is subject to changes! + @Nullable Buf value; + if (readValues) { + value = rocksIterator.valueBuf(); + } else { + value = null; + } - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, - "Range {} is reading {}: {}", - LLUtils.toStringSafe(range), - LLUtils.toStringSafe(key), - LLUtils.toStringSafe(value) - ); - } + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Range {} is reading {}: {}", + LLUtils.toStringSafe(range), + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(value) + ); + } - rocksIterator.next(); - T entry = getEntry(key, value); - values.add(entry); - } - if (!values.isEmpty()) { - return values; - } else { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(range)); - } - return null; - } - } catch (RocksDBException ex) { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(range)); - } - throw new CompletionException(new DBException("Range failed", ex)); - } - }); + rocksIterator.next(); + T entry = getEntry(key, value); + values.add(entry); + } + if (!values.isEmpty()) { + return values; + } else { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(range)); + } + return null; + } + } catch (RocksDBException ex) { + throw new CompletionException(generateRangeFailedException(ex)); + } + }) + ) + ); + } catch (RocksDBException ex) { + throw generateRangeFailedException(ex); + } + } + + private DBException generateRangeFailedException(RocksDBException ex) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(range)); + } + throw new DBException("Range failed", ex); } /** diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java index 85b78db..95f7901 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java @@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk; import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; import static it.cavallium.dbengine.database.LLUtils.generateCustomReadOptions; import static it.cavallium.dbengine.database.LLUtils.isBoundedRange; +import static it.cavallium.dbengine.utils.StreamUtils.resourceStream; import static it.cavallium.dbengine.utils.StreamUtils.streamWhileNonNull; import com.google.common.collect.Iterators; @@ -50,61 +51,74 @@ public class LLLocalKeyPrefixReactiveRocksIterator { public Stream stream() { - return streamWhileNonNull(() -> { - try (var readOptions = generateCustomReadOptions(this.readOptions.get(), canFillCache, isBoundedRange(range), smallRange); - var rocksIterator = db.newRocksIterator(readOptions, range, false)) { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); - } - Buf firstGroupKey = null; - while (rocksIterator.isValid()) { - // Note that the underlying array is subject to changes! - Buf key = rocksIterator.keyBuf(); - var keyLen = key.size(); - if (keyLen >= prefixLength) { - if (firstGroupKey == null) { - firstGroupKey = key.copy(); - assert firstGroupKey == null || firstGroupKey.size() >= prefixLength; - } else if (!LLUtils.equals(firstGroupKey, - 0, - key, - 0, - prefixLength - )) { - break; - } - } else { - logger.error("Skipped a key with length {}, the expected minimum prefix key length is {}!" - + " This key will be dropped", key.size(), prefixLength); - } - rocksIterator.next(); - } + try { + return resourceStream( + () -> generateCustomReadOptions(this.readOptions.get(), canFillCache, isBoundedRange(range), smallRange), + readOptions -> resourceStream( + () -> db.newRocksIterator(readOptions, range, false), + rocksIterator -> streamWhileNonNull(() -> { + try { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); + } + Buf firstGroupKey = null; + while (rocksIterator.isValid()) { + // Note that the underlying array is subject to changes! + Buf key = rocksIterator.keyBuf(); + var keyLen = key.size(); + if (keyLen >= prefixLength) { + if (firstGroupKey == null) { + firstGroupKey = key.copy(); + assert firstGroupKey == null || firstGroupKey.size() >= prefixLength; + } else if (!LLUtils.equals(firstGroupKey, + 0, + key, + 0, + prefixLength + )) { + break; + } + } else { + logger.error("Skipped a key with length {}, the expected minimum prefix key length is {}!" + + " This key will be dropped", key.size(), prefixLength); + } + rocksIterator.next(); + } - if (firstGroupKey != null) { - var groupKeyPrefix = firstGroupKey.subList(0, prefixLength); + if (firstGroupKey != null) { + var groupKeyPrefix = firstGroupKey.subList(0, prefixLength); - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, - "Range {} is reading prefix {}", - LLUtils.toStringSafe(range), - LLUtils.toStringSafe(groupKeyPrefix) - ); - } + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Range {} is reading prefix {}", + LLUtils.toStringSafe(range), + LLUtils.toStringSafe(groupKeyPrefix) + ); + } - return groupKeyPrefix; - } else { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(range)); - } - return null; - } - } catch (RocksDBException ex) { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(range)); - } - throw new CompletionException(new DBException("Range failed", ex)); - } - }); + return groupKeyPrefix; + } else { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(range)); + } + return null; + } + } catch (RocksDBException ex) { + throw new CompletionException(generateRangeFailedException(ex)); + } + } + )) + ); + } catch (RocksDBException e) { + throw generateRangeFailedException(e); + } + } + + private DBException generateRangeFailedException(RocksDBException ex) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(range)); + } + throw new DBException("Range failed", ex); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java index 6f3aa03..cc6cfaa 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMigrationReactiveRocksIterator.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine.database.disk; import static it.cavallium.dbengine.database.LLUtils.generateCustomReadOptions; +import static it.cavallium.dbengine.utils.StreamUtils.resourceStream; import static it.cavallium.dbengine.utils.StreamUtils.streamWhileNonNull; import it.cavallium.buffer.Buf; @@ -34,20 +35,26 @@ public final class LLLocalMigrationReactiveRocksIterator { } public Stream stream() { - return streamWhileNonNull(() -> { - try (var readOptions = generateCustomReadOptions(this.readOptions.get(), false, false, false); - var rocksIterator = db.newRocksIterator(readOptions, range, false)) { - if (rocksIterator.isValid()) { - var key = rocksIterator.keyBuf().copy(); - var value = rocksIterator.valueBuf().copy(); - rocksIterator.next(false); - return LLEntry.of(key, value); - } else { - return null; - } - } catch (RocksDBException e) { - throw new DBException("Failed to open iterator", e); - } - }); + try { + return resourceStream( + // Create the read options + () -> generateCustomReadOptions(this.readOptions.get(), false, false, false), + readOptions -> resourceStream( + // Create the iterator + () -> db.newRocksIterator(readOptions, range, false), + // Stream the iterator values until null is returned + iterator -> streamWhileNonNull(() -> { + if (iterator.isValid()) { + var key = iterator.keyBuf().copy(); + var value = iterator.valueBuf().copy(); + iterator.next(false); + return LLEntry.of(key, value); + } else { + return null; + } + }))); + } catch (RocksDBException e) { + throw new DBException("Failed to open iterator", e); + } } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 37918b3..e5add52 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -67,10 +67,6 @@ import org.jetbrains.annotations.Nullable; public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneIndex, LuceneCloseable { private static final Logger LOG = LogManager.getLogger(LLLuceneIndex.class); - private static final boolean BYPASS_GROUPBY_BUG = Boolean.parseBoolean(System.getProperty( - "it.cavallium.dbengine.bypassGroupByBug", - "false" - )); private final String clusterName; private final boolean lowMemory; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java index 4aabe74..3845dc7 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java @@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk; import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; import static it.cavallium.dbengine.database.LLUtils.generateCustomReadOptions; import static it.cavallium.dbengine.database.LLUtils.isBoundedRange; +import static it.cavallium.dbengine.utils.StreamUtils.resourceStream; import static it.cavallium.dbengine.utils.StreamUtils.streamWhileNonNull; import it.cavallium.buffer.Buf; @@ -48,52 +49,65 @@ public abstract class LLLocalReactiveRocksIterator { } public final Stream stream() { - return streamWhileNonNull(() -> { - try (var readOptions = generateCustomReadOptions(this.readOptions.get(), true, isBoundedRange(range), smallRange); - var rocksIterator = db.newRocksIterator(readOptions, range, reverse)) { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); - } - if (rocksIterator.isValid()) { - // Note that the underlying array is subject to changes! - Buf key; - key = rocksIterator.keyBuf(); - // Note that the underlying array is subject to changes! - Buf value; - if (readValues) { - value = rocksIterator.valueBuf(); - } else { - value = null; - } + try { + return resourceStream( + () -> generateCustomReadOptions(this.readOptions.get(), true, isBoundedRange(range), smallRange), + readOptions -> resourceStream( + () -> db.newRocksIterator(readOptions, range, reverse), + rocksIterator -> streamWhileNonNull(() -> { + try { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); + } + if (rocksIterator.isValid()) { + // Note that the underlying array is subject to changes! + Buf key; + key = rocksIterator.keyBuf(); + // Note that the underlying array is subject to changes! + Buf value; + if (readValues) { + value = rocksIterator.valueBuf(); + } else { + value = null; + } - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, - "Range {} is reading {}: {}", - LLUtils.toStringSafe(range), - LLUtils.toStringSafe(key), - LLUtils.toStringSafe(value) - ); - } + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Range {} is reading {}: {}", + LLUtils.toStringSafe(range), + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(value) + ); + } - if (reverse) { - rocksIterator.prev(); - } else { - rocksIterator.next(); - } - return getEntry(key, value); - } else { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(range)); - } - return null; - } - } catch (RocksDBException ex) { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(range)); - } - throw new CompletionException(ex); - } - }); + if (reverse) { + rocksIterator.prev(); + } else { + rocksIterator.next(); + } + return getEntry(key, value); + } else { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} ended", LLUtils.toStringSafe(range)); + } + return null; + } + } catch (RocksDBException ex) { + throw new CompletionException(generateRangeFailedException(ex)); + } + }) + ) + ); + } catch (RocksDBException ex) { + throw generateRangeFailedException(ex); + } + } + + private DBException generateRangeFailedException(RocksDBException ex) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Range {} failed", LLUtils.toStringSafe(range)); + } + throw new DBException("Range failed", ex); } /** diff --git a/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java b/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java index 384685a..3a0f939 100644 --- a/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java +++ b/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java @@ -32,6 +32,8 @@ import java.util.stream.Collector; import java.util.stream.Collector.Characteristics; import java.util.stream.Stream; import java.util.stream.StreamSupport; +import org.apache.commons.lang3.function.FailableFunction; +import org.apache.commons.lang3.function.FailableSupplier; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -324,6 +326,57 @@ public class StreamUtils { return Streams.mapWithIndex(stream, mapper::apply); } + /** + * Checks if stream.onClose() will be called during the stream lifetime + */ + public static Stream resourceStream(Stream stream) { + var sr = new StreamResource(null); + return stream.onClose(sr::close); + } + + /** + * Checks if stream.onClose() will be called during the stream lifetime + */ + public static Stream resourceStream( + FailableSupplier resourceInitializer, + FailableFunction, ? extends EX> streamInitializer) throws EX { + SR resource = resourceInitializer.get(); + var sr = new StreamResource(resource::close); + try { + Stream stream = streamInitializer.apply(resource); + return stream.onClose(sr::close); + } catch (Throwable ex) { + sr.close(); + throw ex; + } + } + + /** + * Checks if stream.onClose() will be called during the stream lifetime + */ + public static Stream resourceStream(Stream stream, Runnable finalization) { + var sr = new StreamResource(finalization); + try { + return stream.onClose(sr::close); + } catch (Throwable ex) { + sr.close(); + throw ex; + } + } + + /** + * Checks if stream.onClose() will be called during the stream lifetime + */ + public static Stream resourceStream(Supplier> stream, Runnable finalization) { + var sr = new StreamResource(finalization); + try { + return stream.get().onClose(sr::close); + } catch (Throwable ex) { + sr.close(); + throw ex; + } + } + private record BatchSpliterator(Spliterator base, int batchSize) implements Spliterator> { @Override @@ -556,4 +609,20 @@ public class StreamUtils { return worker; } } + + private static class StreamResource extends SimpleResource { + + private final Runnable finalization; + + public StreamResource(Runnable finalization) { + this.finalization = finalization; + } + + @Override + protected void onClose() { + if (finalization != null) { + finalization.run(); + } + } + } }