diff --git a/src/main/java/org/warp/filesponge/DiskCache.java b/src/main/java/org/warp/filesponge/DiskCache.java index b097202..295fa9b 100644 --- a/src/main/java/org/warp/filesponge/DiskCache.java +++ b/src/main/java/org/warp/filesponge/DiskCache.java @@ -153,7 +153,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { public Flux requestContent(URL url) { return requestDiskMetadata(url) .filter(DiskMetadata::isDownloadedFully) - .flatMapMany(meta -> Flux.fromIterable(meta.downloadedBlocks()) + .flatMapMany(meta -> Flux.fromStream(meta.downloadedBlocks()::stream) .index() // Get only downloaded blocks .filter(Tuple2::getT2) diff --git a/src/main/java/org/warp/filesponge/FileSponge.java b/src/main/java/org/warp/filesponge/FileSponge.java index 499f743..79bb219 100644 --- a/src/main/java/org/warp/filesponge/FileSponge.java +++ b/src/main/java/org/warp/filesponge/FileSponge.java @@ -57,10 +57,9 @@ public class FileSponge implements URLsHandler { public Flux requestContent(URL url) { AtomicBoolean alreadyPrintedDebug = new AtomicBoolean(false); return Flux - .fromIterable(cacheAccess) + .fromStream(cacheAccess::stream) .map(urlsHandler -> urlsHandler.requestContent(url)) .collectList() - .doOnDiscard(DataBlock.class, DataBlock::release) .flatMapMany(monos -> FileSpongeUtils.firstWithValueFlux(monos)) .doOnNext(dataBlock -> { if (alreadyPrintedDebug.compareAndSet(false, true)) { @@ -68,33 +67,34 @@ public class FileSponge implements URLsHandler { } }) .switchIfEmpty(Flux - .fromIterable(urlsHandlers) + .fromStream(urlsHandlers::stream) .doOnSubscribe(s -> logger.debug("Downloading file \"{}\" content", url)) .map(urlsHandler -> urlsHandler .requestContent(url) .flatMapSequential(dataBlock -> Flux - .fromIterable(cacheWrite) + .fromStream(cacheWrite::stream) .flatMapSequential(cw -> cw.writeContentBlock(url, dataBlock)) .then(Mono.just(dataBlock)) ) ) .collectList() - .doOnDiscard(Flux.class, f -> { - //noinspection unchecked - Flux flux = (Flux) f; - flux.doOnNext(DataBlock::release).subscribeOn(Schedulers.single()).subscribe(); - }) .flatMapMany(monos -> FileSpongeUtils.firstWithValueFlux(monos)) .doOnComplete(() -> logger.debug("Downloaded file \"{}\" content", url)) ) .distinct(DataBlock::getId) - .doOnDiscard(DataBlock.class, DataBlock::release); + + .doOnDiscard(DataBlock.class, DataBlock::release) + .doOnDiscard(Flux.class, f -> { + //noinspection unchecked + Flux flux = (Flux) f; + flux.doOnNext(DataBlock::release).subscribeOn(Schedulers.single()).subscribe(); + }); } @Override public Mono requestMetadata(URL url) { return Flux - .fromIterable(cacheAccess) + .fromStream(cacheAccess::stream) .map(urlsHandler -> urlsHandler.requestMetadata(url)) .collectList() .flatMap(monos -> FileSpongeUtils.firstWithValueMono(monos)) @@ -104,12 +104,12 @@ public class FileSponge implements URLsHandler { } }) .switchIfEmpty(Flux - .fromIterable(urlsHandlers) + .fromStream(urlsHandlers::stream) .doOnSubscribe(s -> logger.debug("Downloading file \"{}\" metadata", url)) .map(urlsHandler -> urlsHandler .requestMetadata(url) .flatMap(dataBlock -> Flux - .fromIterable(cacheWrite) + .fromStream(cacheWrite::stream) .flatMapSequential(cw -> cw.writeMetadata(url, dataBlock)) .then(Mono.just(dataBlock)) ) diff --git a/src/main/java/org/warp/filesponge/FileSpongeUtils.java b/src/main/java/org/warp/filesponge/FileSpongeUtils.java index c73d0af..795ab4d 100644 --- a/src/main/java/org/warp/filesponge/FileSpongeUtils.java +++ b/src/main/java/org/warp/filesponge/FileSpongeUtils.java @@ -37,28 +37,25 @@ public class FileSpongeUtils { private static final Logger logger = LoggerFactory.getLogger(FileSponge.class); public static Mono firstWithValueMono(List> monos) { - return Mono.firstWithValue(monos).onErrorResume(FileSpongeUtils::ignoreFakeErrors); + return Mono.firstWithValue(monos).doOnError(ex -> {}).onErrorResume(FileSpongeUtils::ignoreFakeErrors); } public static Flux firstWithValueFlux(List> monos) { - return Flux.firstWithValue(monos).onErrorResume(FileSpongeUtils::ignoreFakeErrors); + return Flux.firstWithValue(monos).doOnError(ex -> {}).onErrorResume(FileSpongeUtils::ignoreFakeErrors); } private static Mono ignoreFakeErrors(Throwable ex) { - return Mono.create(sink -> { - if (ex instanceof NoSuchElementException) { - var multiple = Exceptions.unwrapMultiple(ex.getCause()); - for (Throwable throwable : multiple) { - if (!(throwable instanceof NoSuchElementException)) { - sink.error(ex); - return; - } + if (ex instanceof NoSuchElementException) { + var multiple = Exceptions.unwrapMultiple(ex.getCause()); + for (Throwable throwable : multiple) { + if (!(throwable instanceof NoSuchElementException)) { + return Mono.error(ex); } - sink.success(); - } else { - sink.error(ex); } - }); + return Mono.empty(); + } else { + return Mono.error(ex); + } } public static Mono deleteFileAfter(Path path, Duration delay) { diff --git a/src/test/java/org/warp/filesponge/ThreadSafety.java b/src/test/java/org/warp/filesponge/ThreadSafety.java index 98c5706..eca4bf4 100644 --- a/src/test/java/org/warp/filesponge/ThreadSafety.java +++ b/src/test/java/org/warp/filesponge/ThreadSafety.java @@ -28,7 +28,7 @@ public class ThreadSafety { return s; })).subscribeOn(schedulerSingle)) .ignoreElements() - .thenMany(Flux.defer(() -> Flux.fromIterable(list))) + .thenMany(Flux.defer(() -> Flux.fromStream(list::stream))) .subscribeOn(schedulerParallel); Integer[] checks = new Integer[iterations * 2];