diff --git a/src/main/java/org/warp/filesponge/FileSponge.java b/src/main/java/org/warp/filesponge/FileSponge.java index 79bb219..bc70bd8 100644 --- a/src/main/java/org/warp/filesponge/FileSponge.java +++ b/src/main/java/org/warp/filesponge/FileSponge.java @@ -18,6 +18,7 @@ package org.warp.filesponge; +import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -33,89 +34,101 @@ public class FileSponge implements URLsHandler { public static final int BLOCK_SIZE = 1024 * 1024; // 1 MiB - private final Set urlsHandlers = new ConcurrentHashMap().keySet(new Object()); - - private final Set cacheAccess = new ConcurrentHashMap().keySet(new Object()); - private final Set cacheWrite = new ConcurrentHashMap().keySet(new Object()); + private final Object structuresLock = new Object(); + private volatile ObjectOpenHashSet urlsHandlers = ObjectOpenHashSet.of(); + private volatile ObjectOpenHashSet cacheAccess = ObjectOpenHashSet.of(); + private volatile ObjectOpenHashSet cacheWrite = ObjectOpenHashSet.of(); public FileSponge() { } public Mono registerSource(URLsHandler urLsHandler) { - return Mono.fromRunnable(() -> urlsHandlers.add(urLsHandler)); + return Mono + .fromRunnable(() -> { + synchronized (structuresLock) { + var clone = urlsHandlers.clone(); + clone.add(urLsHandler); + this.urlsHandlers = clone; + } + }) + .subscribeOn(Schedulers.boundedElastic()); } public Mono registerCache(T urlsCache) { - return Mono.fromRunnable(() -> { - cacheAccess.add(urlsCache); - cacheWrite.add(urlsCache); - }); + return Mono + .fromRunnable(() -> { + synchronized (structuresLock) { + var cacheAccessClone = cacheAccess.clone(); + cacheAccessClone.add(urlsCache); + this.cacheAccess = cacheAccessClone; + + var cacheWriteClone = cacheWrite.clone(); + cacheWriteClone.add(urlsCache); + this.cacheWrite = cacheWriteClone; + } + }) + .subscribeOn(Schedulers.boundedElastic()); } @Override public Flux requestContent(URL url) { AtomicBoolean alreadyPrintedDebug = new AtomicBoolean(false); return Flux - .fromStream(cacheAccess::stream) + .defer(() -> Flux.fromIterable(cacheAccess)) .map(urlsHandler -> urlsHandler.requestContent(url)) .collectList() - .flatMapMany(monos -> FileSpongeUtils.firstWithValueFlux(monos)) + .flatMapMany(FileSpongeUtils::firstWithValueFlux) .doOnNext(dataBlock -> { if (alreadyPrintedDebug.compareAndSet(false, true)) { logger.debug("File \"{}\" content has been found in the cache", url); } }) .switchIfEmpty(Flux - .fromStream(urlsHandlers::stream) + .defer(() -> Flux.fromIterable(urlsHandlers)) .doOnSubscribe(s -> logger.debug("Downloading file \"{}\" content", url)) .map(urlsHandler -> urlsHandler .requestContent(url) .flatMapSequential(dataBlock -> Flux - .fromStream(cacheWrite::stream) + .defer(() -> Flux.fromIterable(cacheWrite)) .flatMapSequential(cw -> cw.writeContentBlock(url, dataBlock)) .then(Mono.just(dataBlock)) ) ) .collectList() - .flatMapMany(monos -> FileSpongeUtils.firstWithValueFlux(monos)) + .flatMapMany(FileSpongeUtils::firstWithValueFlux) .doOnComplete(() -> logger.debug("Downloaded file \"{}\" content", url)) ) .distinct(DataBlock::getId) - .doOnDiscard(DataBlock.class, DataBlock::release) - .doOnDiscard(Flux.class, f -> { - //noinspection unchecked - Flux flux = (Flux) f; - flux.doOnNext(DataBlock::release).subscribeOn(Schedulers.single()).subscribe(); - }); + .doOnDiscard(DataBlock.class, DataBlock::release); } @Override public Mono requestMetadata(URL url) { return Flux - .fromStream(cacheAccess::stream) + .defer(() -> Flux.fromIterable(cacheAccess)) .map(urlsHandler -> urlsHandler.requestMetadata(url)) .collectList() - .flatMap(monos -> FileSpongeUtils.firstWithValueMono(monos)) + .flatMap(FileSpongeUtils::firstWithValueMono) .doOnSuccess(metadata -> { if (metadata != null) { logger.debug("File \"{}\" metadata has been found in the cache", url); } }) .switchIfEmpty(Flux - .fromStream(urlsHandlers::stream) + .defer(() -> Flux.fromIterable(urlsHandlers)) .doOnSubscribe(s -> logger.debug("Downloading file \"{}\" metadata", url)) .map(urlsHandler -> urlsHandler .requestMetadata(url) .flatMap(dataBlock -> Flux - .fromStream(cacheWrite::stream) + .defer(() -> Flux.fromIterable(cacheWrite)) .flatMapSequential(cw -> cw.writeMetadata(url, dataBlock)) .then(Mono.just(dataBlock)) ) ) .collectList() - .flatMap(monos -> FileSpongeUtils.firstWithValueMono(monos)) + .flatMap(FileSpongeUtils::firstWithValueMono) .doOnSuccess(s -> { if (s != null) { logger.debug("Downloaded file \"{}\" metadata", url);