Use streams instead of iterables

This commit is contained in:
Andrea Cavalli 2021-06-06 02:24:21 +02:00
parent eeadda9b78
commit d567026008
4 changed files with 26 additions and 29 deletions

View File

@ -153,7 +153,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
public Flux<DataBlock> requestContent(URL url) { public Flux<DataBlock> requestContent(URL url) {
return requestDiskMetadata(url) return requestDiskMetadata(url)
.filter(DiskMetadata::isDownloadedFully) .filter(DiskMetadata::isDownloadedFully)
.flatMapMany(meta -> Flux.fromIterable(meta.downloadedBlocks()) .flatMapMany(meta -> Flux.fromStream(meta.downloadedBlocks()::stream)
.index() .index()
// Get only downloaded blocks // Get only downloaded blocks
.filter(Tuple2::getT2) .filter(Tuple2::getT2)

View File

@ -57,10 +57,9 @@ public class FileSponge implements URLsHandler {
public Flux<DataBlock> requestContent(URL url) { public Flux<DataBlock> requestContent(URL url) {
AtomicBoolean alreadyPrintedDebug = new AtomicBoolean(false); AtomicBoolean alreadyPrintedDebug = new AtomicBoolean(false);
return Flux return Flux
.fromIterable(cacheAccess) .fromStream(cacheAccess::stream)
.map(urlsHandler -> urlsHandler.requestContent(url)) .map(urlsHandler -> urlsHandler.requestContent(url))
.collectList() .collectList()
.doOnDiscard(DataBlock.class, DataBlock::release)
.flatMapMany(monos -> FileSpongeUtils.firstWithValueFlux(monos)) .flatMapMany(monos -> FileSpongeUtils.firstWithValueFlux(monos))
.doOnNext(dataBlock -> { .doOnNext(dataBlock -> {
if (alreadyPrintedDebug.compareAndSet(false, true)) { if (alreadyPrintedDebug.compareAndSet(false, true)) {
@ -68,33 +67,34 @@ public class FileSponge implements URLsHandler {
} }
}) })
.switchIfEmpty(Flux .switchIfEmpty(Flux
.fromIterable(urlsHandlers) .fromStream(urlsHandlers::stream)
.doOnSubscribe(s -> logger.debug("Downloading file \"{}\" content", url)) .doOnSubscribe(s -> logger.debug("Downloading file \"{}\" content", url))
.map(urlsHandler -> urlsHandler .map(urlsHandler -> urlsHandler
.requestContent(url) .requestContent(url)
.flatMapSequential(dataBlock -> Flux .flatMapSequential(dataBlock -> Flux
.fromIterable(cacheWrite) .fromStream(cacheWrite::stream)
.flatMapSequential(cw -> cw.writeContentBlock(url, dataBlock)) .flatMapSequential(cw -> cw.writeContentBlock(url, dataBlock))
.then(Mono.just(dataBlock)) .then(Mono.just(dataBlock))
) )
) )
.collectList() .collectList()
.doOnDiscard(Flux.class, f -> {
//noinspection unchecked
Flux<DataBlock> flux = (Flux<DataBlock>) f;
flux.doOnNext(DataBlock::release).subscribeOn(Schedulers.single()).subscribe();
})
.flatMapMany(monos -> FileSpongeUtils.firstWithValueFlux(monos)) .flatMapMany(monos -> FileSpongeUtils.firstWithValueFlux(monos))
.doOnComplete(() -> logger.debug("Downloaded file \"{}\" content", url)) .doOnComplete(() -> logger.debug("Downloaded file \"{}\" content", url))
) )
.distinct(DataBlock::getId) .distinct(DataBlock::getId)
.doOnDiscard(DataBlock.class, DataBlock::release);
.doOnDiscard(DataBlock.class, DataBlock::release)
.doOnDiscard(Flux.class, f -> {
//noinspection unchecked
Flux<DataBlock> flux = (Flux<DataBlock>) f;
flux.doOnNext(DataBlock::release).subscribeOn(Schedulers.single()).subscribe();
});
} }
@Override @Override
public Mono<Metadata> requestMetadata(URL url) { public Mono<Metadata> requestMetadata(URL url) {
return Flux return Flux
.fromIterable(cacheAccess) .fromStream(cacheAccess::stream)
.map(urlsHandler -> urlsHandler.requestMetadata(url)) .map(urlsHandler -> urlsHandler.requestMetadata(url))
.collectList() .collectList()
.flatMap(monos -> FileSpongeUtils.firstWithValueMono(monos)) .flatMap(monos -> FileSpongeUtils.firstWithValueMono(monos))
@ -104,12 +104,12 @@ public class FileSponge implements URLsHandler {
} }
}) })
.switchIfEmpty(Flux .switchIfEmpty(Flux
.fromIterable(urlsHandlers) .fromStream(urlsHandlers::stream)
.doOnSubscribe(s -> logger.debug("Downloading file \"{}\" metadata", url)) .doOnSubscribe(s -> logger.debug("Downloading file \"{}\" metadata", url))
.map(urlsHandler -> urlsHandler .map(urlsHandler -> urlsHandler
.requestMetadata(url) .requestMetadata(url)
.flatMap(dataBlock -> Flux .flatMap(dataBlock -> Flux
.fromIterable(cacheWrite) .fromStream(cacheWrite::stream)
.flatMapSequential(cw -> cw.writeMetadata(url, dataBlock)) .flatMapSequential(cw -> cw.writeMetadata(url, dataBlock))
.then(Mono.just(dataBlock)) .then(Mono.just(dataBlock))
) )

View File

@ -37,28 +37,25 @@ public class FileSpongeUtils {
private static final Logger logger = LoggerFactory.getLogger(FileSponge.class); private static final Logger logger = LoggerFactory.getLogger(FileSponge.class);
public static <T> Mono<T> firstWithValueMono(List<Mono<T>> monos) { public static <T> Mono<T> firstWithValueMono(List<Mono<T>> monos) {
return Mono.firstWithValue(monos).onErrorResume(FileSpongeUtils::ignoreFakeErrors); return Mono.firstWithValue(monos).doOnError(ex -> {}).onErrorResume(FileSpongeUtils::ignoreFakeErrors);
} }
public static <T> Flux<T> firstWithValueFlux(List<Flux<T>> monos) { public static <T> Flux<T> firstWithValueFlux(List<Flux<T>> monos) {
return Flux.firstWithValue(monos).onErrorResume(FileSpongeUtils::ignoreFakeErrors); return Flux.firstWithValue(monos).doOnError(ex -> {}).onErrorResume(FileSpongeUtils::ignoreFakeErrors);
} }
private static <T> Mono<T> ignoreFakeErrors(Throwable ex) { private static <T> Mono<T> ignoreFakeErrors(Throwable ex) {
return Mono.create(sink -> { if (ex instanceof NoSuchElementException) {
if (ex instanceof NoSuchElementException) { var multiple = Exceptions.unwrapMultiple(ex.getCause());
var multiple = Exceptions.unwrapMultiple(ex.getCause()); for (Throwable throwable : multiple) {
for (Throwable throwable : multiple) { if (!(throwable instanceof NoSuchElementException)) {
if (!(throwable instanceof NoSuchElementException)) { return Mono.error(ex);
sink.error(ex);
return;
}
} }
sink.success();
} else {
sink.error(ex);
} }
}); return Mono.empty();
} else {
return Mono.error(ex);
}
} }
public static Mono<Path> deleteFileAfter(Path path, Duration delay) { public static Mono<Path> deleteFileAfter(Path path, Duration delay) {

View File

@ -28,7 +28,7 @@ public class ThreadSafety {
return s; return s;
})).subscribeOn(schedulerSingle)) })).subscribeOn(schedulerSingle))
.ignoreElements() .ignoreElements()
.thenMany(Flux.defer(() -> Flux.fromIterable(list))) .thenMany(Flux.defer(() -> Flux.fromStream(list::stream)))
.subscribeOn(schedulerParallel); .subscribeOn(schedulerParallel);
Integer[] checks = new Integer[iterations * 2]; Integer[] checks = new Integer[iterations * 2];