From 8522e7a866bf546594e3bc66c22b14a22eed0edb Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 17 Sep 2021 20:38:28 +0200 Subject: [PATCH] Performance optimization --- .../java/org/warp/filesponge/FileSponge.java | 86 ++++++++++++------- 1 file changed, 54 insertions(+), 32 deletions(-) diff --git a/src/main/java/org/warp/filesponge/FileSponge.java b/src/main/java/org/warp/filesponge/FileSponge.java index f69d675..086d760 100644 --- a/src/main/java/org/warp/filesponge/FileSponge.java +++ b/src/main/java/org/warp/filesponge/FileSponge.java @@ -19,6 +19,8 @@ package org.warp.filesponge; import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -74,28 +76,38 @@ public class FileSponge implements URLsHandler { @Override public Flux requestContent(URL url) { AtomicBoolean alreadyPrintedDebug = new AtomicBoolean(false); - return Flux - .defer(() -> Flux.fromIterable(cacheAccess)) - .map(urlsHandler -> urlsHandler.requestContent(url)) - .collectList() + return Mono + .fromCallable(() -> { + List> contentRequests = new ArrayList<>(cacheAccess.size()); + for (URLsDiskHandler urlsHandler : cacheAccess) { + contentRequests.add(urlsHandler.requestContent(url)); + } + return contentRequests; + }) .flatMapMany(FileSpongeUtils::firstWithValueFlux) .doOnNext(dataBlock -> { if (alreadyPrintedDebug.compareAndSet(false, true)) { logger.debug("File \"{}\" content has been found in the cache", url); } }) - .switchIfEmpty(Flux - .defer(() -> Flux.fromIterable(urlsHandlers)) - .doOnSubscribe(s -> logger.debug("Downloading file \"{}\" content", url)) - .map(urlsHandler -> urlsHandler - .requestContent(url) - .flatMapSequential(dataBlock -> Flux - .defer(() -> Flux.fromIterable(cacheWrite)) - .flatMapSequential(cw -> cw.writeContentBlock(url, dataBlock)) - .then(Mono.just(dataBlock)) - ) - ) - .collectList() + .switchIfEmpty(Mono + .fromCallable(() -> { + logger.debug("Downloading file \"{}\" content", url); + List> contentRequestsAndCaching = new ArrayList<>(urlsHandlers.size()); + for (URLsHandler urlsHandler : urlsHandlers) { + contentRequestsAndCaching.add(urlsHandler + .requestContent(url) + .flatMapSequential(dataBlock -> { + List> cacheWriteActions = new ArrayList<>(cacheWrite.size()); + for (URLsWriter urlsWriter : cacheWrite) { + cacheWriteActions.add(urlsWriter.writeContentBlock(url, dataBlock)); + } + return Mono.when(cacheWriteActions).thenReturn(dataBlock); + }) + ); + } + return contentRequestsAndCaching; + }) .flatMapMany(FileSpongeUtils::firstWithValueFlux) .doOnComplete(() -> logger.debug("Downloaded file \"{}\" content", url)) ) @@ -106,28 +118,38 @@ public class FileSponge implements URLsHandler { @Override public Mono requestMetadata(URL url) { - return Flux - .defer(() -> Flux.fromIterable(cacheAccess)) - .map(urlsHandler -> urlsHandler.requestMetadata(url)) - .collectList() + return Mono + .fromCallable(() -> { + List> metadataRequests = new ArrayList<>(cacheAccess.size()); + for (URLsDiskHandler urlsHandler : cacheAccess) { + metadataRequests.add(urlsHandler.requestMetadata(url)); + } + return metadataRequests; + }) .flatMap(FileSpongeUtils::firstWithValueMono) .doOnSuccess(metadata -> { if (metadata != null) { logger.debug("File \"{}\" metadata has been found in the cache", url); } }) - .switchIfEmpty(Flux - .defer(() -> Flux.fromIterable(urlsHandlers)) - .doOnSubscribe(s -> logger.debug("Downloading file \"{}\" metadata", url)) - .map(urlsHandler -> urlsHandler - .requestMetadata(url) - .flatMap(dataBlock -> Flux - .defer(() -> Flux.fromIterable(cacheWrite)) - .flatMapSequential(cw -> cw.writeMetadata(url, dataBlock)) - .then(Mono.just(dataBlock)) - ) - ) - .collectList() + .switchIfEmpty(Mono + .fromCallable(() -> { + logger.debug("Downloading file \"{}\" metadata", url); + List> metadataRequestsAndCaching = new ArrayList<>(urlsHandlers.size()); + for (URLsHandler urlsHandler : urlsHandlers) { + metadataRequestsAndCaching.add(urlsHandler + .requestMetadata(url) + .flatMap(meta -> { + List> cacheWriteActions = new ArrayList<>(cacheWrite.size()); + for (URLsWriter urlsWriter : cacheWrite) { + cacheWriteActions.add(urlsWriter.writeMetadata(url, meta)); + } + return Mono.when(cacheWriteActions).thenReturn(meta); + }) + ); + } + return metadataRequestsAndCaching; + }) .flatMap(FileSpongeUtils::firstWithValueMono) .doOnSuccess(s -> { if (s != null) {