From b0b22a1358537bca111b5deec6010c7d23b39c7e Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 22 Aug 2023 12:41:24 +0200 Subject: [PATCH] Cached url handlers --- pom.xml | 2 +- .../java/org/warp/filesponge/DiskCache.java | 5 ++-- .../org/warp/filesponge/DiskCacheImpl.java | 13 ++++---- .../java/org/warp/filesponge/FileSponge.java | 18 ++++++----- .../java/org/warp/filesponge/URLWriter.java | 4 +-- .../java/org/warp/filesponge/URLsHandler.java | 3 ++ .../warp/filesponge/URLsHandlerCached.java | 30 +++++++++++++++++++ .../java/org/warp/filesponge/URLsWriter.java | 25 ++++++++++++---- 8 files changed, 73 insertions(+), 27 deletions(-) create mode 100644 src/main/java/org/warp/filesponge/URLsHandlerCached.java diff --git a/pom.xml b/pom.xml index 64d4604..3e89d1b 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ org.warp.filesponge FileSponge - 0.2.0 + 0.3.1 FileSponge diff --git a/src/main/java/org/warp/filesponge/DiskCache.java b/src/main/java/org/warp/filesponge/DiskCache.java index 4bd734b..e769348 100644 --- a/src/main/java/org/warp/filesponge/DiskCache.java +++ b/src/main/java/org/warp/filesponge/DiskCache.java @@ -22,7 +22,6 @@ import it.cavallium.dbengine.client.IBackuppable; import it.cavallium.dbengine.database.ColumnUtils; import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLDictionary; -import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.SafeCloseable; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; @@ -33,9 +32,9 @@ import reactor.util.function.Tuple2; public interface DiskCache extends URLsDiskHandler, URLsWriter, SafeCloseable, IBackuppable { - void writeMetadataSync(URL url, Metadata metadata); + void writeMetadataSync(URL url, Metadata metadata, boolean force); - void writeContentBlockSync(URL url, DataBlock dataBlock); + void writeContentBlockSync(URL url, DataBlock dataBlock, boolean force); Stream requestContentSync(URL url); diff --git a/src/main/java/org/warp/filesponge/DiskCacheImpl.java b/src/main/java/org/warp/filesponge/DiskCacheImpl.java index 8200159..1555678 100644 --- a/src/main/java/org/warp/filesponge/DiskCacheImpl.java +++ b/src/main/java/org/warp/filesponge/DiskCacheImpl.java @@ -24,7 +24,6 @@ import static org.warp.filesponge.FileSponge.BLOCK_SIZE; import it.cavallium.buffer.Buf; import it.cavallium.buffer.BufDataInput; import it.cavallium.buffer.BufDataOutput; -import it.cavallium.dbengine.client.IBackuppable; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLKeyValueDatabase; @@ -64,12 +63,12 @@ class DiskCacheImpl implements DiskCache { } @Override - public Mono writeMetadata(URL url, Metadata metadata) { - return Mono.fromRunnable(() -> writeMetadataSync(url, metadata)).subscribeOn(Schedulers.boundedElastic()); + public Mono writeMetadata(URL url, Metadata metadata, boolean force) { + return Mono.fromRunnable(() -> writeMetadataSync(url, metadata, force)).subscribeOn(Schedulers.boundedElastic()); } @Override - public void writeMetadataSync(URL url, Metadata metadata) { + public void writeMetadataSync(URL url, Metadata metadata, boolean force) { // Check if this cache should cache the url, otherwise do nothing if (!shouldCache.test(url)) return; @@ -122,14 +121,14 @@ class DiskCacheImpl implements DiskCache { } @Override - public Mono writeContentBlock(URL url, DataBlock dataBlock) { + public Mono writeContentBlock(URL url, DataBlock dataBlock, boolean force) { return Mono - .fromRunnable(() -> writeContentBlockSync(url, dataBlock)) + .fromRunnable(() -> writeContentBlockSync(url, dataBlock, force)) .subscribeOn(Schedulers.boundedElastic()); } @Override - public void writeContentBlockSync(URL url, DataBlock dataBlock) { + public void writeContentBlockSync(URL url, DataBlock dataBlock, boolean force) { // Check if this cache should cache the url, otherwise do nothing if (!shouldCache.test(url)) { return; diff --git a/src/main/java/org/warp/filesponge/FileSponge.java b/src/main/java/org/warp/filesponge/FileSponge.java index d7b3b23..c25df4e 100644 --- a/src/main/java/org/warp/filesponge/FileSponge.java +++ b/src/main/java/org/warp/filesponge/FileSponge.java @@ -22,8 +22,6 @@ import it.cavallium.dbengine.database.LLUtils; import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; import java.util.ArrayList; import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -31,7 +29,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; -public class FileSponge implements URLsHandler { +public class FileSponge implements URLsHandlerCached { private static final Logger logger = LogManager.getLogger(FileSponge.class); @@ -104,7 +102,7 @@ public class FileSponge implements URLsHandler { var cw = this.cacheWrite; List> cacheWriteActions = new ArrayList<>(cw.size()); for (URLsWriter urlsWriter : cw) { - cacheWriteActions.add(urlsWriter.writeContentBlock(url, dataBlock)); + cacheWriteActions.add(urlsWriter.writeContentBlock(url, dataBlock, false)); } return Mono.whenDelayError(cacheWriteActions).thenReturn(dataBlock); }) @@ -120,8 +118,7 @@ public class FileSponge implements URLsHandler { .doOnDiscard(DataBlock.class, LLUtils::onDiscard); } - @Override - public Mono requestMetadata(URL url) { + public Mono requestCachedMetadata(URL url) { return Mono .fromCallable(() -> { var ca = this.cacheAccess; @@ -136,7 +133,12 @@ public class FileSponge implements URLsHandler { if (metadata != null) { logger.debug("File \"{}\" metadata has been found in the cache", url); } - }) + }); + } + + @Override + public Mono requestMetadata(URL url) { + return requestCachedMetadata(url) .switchIfEmpty(Mono .fromCallable(() -> { logger.debug("Downloading file \"{}\" metadata", url); @@ -149,7 +151,7 @@ public class FileSponge implements URLsHandler { var cw = this.cacheWrite; List> cacheWriteActions = new ArrayList<>(cw.size()); for (URLsWriter urlsWriter : cw) { - cacheWriteActions.add(urlsWriter.writeMetadata(url, meta)); + cacheWriteActions.add(urlsWriter.writeMetadata(url, meta, false)); } return Mono.whenDelayError(cacheWriteActions).thenReturn(meta); }) diff --git a/src/main/java/org/warp/filesponge/URLWriter.java b/src/main/java/org/warp/filesponge/URLWriter.java index eeeb1b9..52d4984 100644 --- a/src/main/java/org/warp/filesponge/URLWriter.java +++ b/src/main/java/org/warp/filesponge/URLWriter.java @@ -22,8 +22,8 @@ import reactor.core.publisher.Mono; public interface URLWriter { - Mono writeMetadata(Metadata metadata); + Mono writeMetadata(Metadata metadata, boolean force); - Mono writeContentBlock(DataBlock dataBlock); + Mono writeContentBlock(DataBlock dataBlock, boolean force); } diff --git a/src/main/java/org/warp/filesponge/URLsHandler.java b/src/main/java/org/warp/filesponge/URLsHandler.java index c60916a..f543360 100644 --- a/src/main/java/org/warp/filesponge/URLsHandler.java +++ b/src/main/java/org/warp/filesponge/URLsHandler.java @@ -27,6 +27,9 @@ public interface URLsHandler { Flux requestContent(URL url); + /** + * Get metadata from cached sources, if not found, retrieve it online + */ Mono requestMetadata(URL url); default Mono>> request(URL url) { diff --git a/src/main/java/org/warp/filesponge/URLsHandlerCached.java b/src/main/java/org/warp/filesponge/URLsHandlerCached.java new file mode 100644 index 0000000..e99d3ef --- /dev/null +++ b/src/main/java/org/warp/filesponge/URLsHandlerCached.java @@ -0,0 +1,30 @@ +/* + * FileSponge + * Copyright (C) 2023 Andrea Cavalli + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package org.warp.filesponge; + +import reactor.core.publisher.Mono; + +public interface URLsHandlerCached extends URLsHandler { + + /** + * Get metadata only from cached sources + */ + Mono requestCachedMetadata(URL url); + +} diff --git a/src/main/java/org/warp/filesponge/URLsWriter.java b/src/main/java/org/warp/filesponge/URLsWriter.java index 3490609..5e6b6e8 100644 --- a/src/main/java/org/warp/filesponge/URLsWriter.java +++ b/src/main/java/org/warp/filesponge/URLsWriter.java @@ -22,20 +22,33 @@ import reactor.core.publisher.Mono; public interface URLsWriter { - Mono writeMetadata(URL url, Metadata metadata); + /** + * @param force true to force writing onto a cache, ignoring the shouldCache predicate + */ + Mono writeMetadata(URL url, Metadata metadata, boolean force); - Mono writeContentBlock(URL url, DataBlock dataBlock); + /** + * @param force true to force writing onto a cache, ignoring the shouldCache predicate + */ + Mono writeContentBlock(URL url, DataBlock dataBlock, boolean force); default URLWriter getUrlWriter(URL url) { return new URLWriter() { + + /** + * @param force true to force writing onto a cache, ignoring the shouldCache predicate + */ @Override - public Mono writeMetadata(Metadata metadata) { - return URLsWriter.this.writeMetadata(url, metadata); + public Mono writeMetadata(Metadata metadata, boolean force) { + return URLsWriter.this.writeMetadata(url, metadata, force); } + /** + * @param force true to force writing onto a cache, ignoring the shouldCache predicate + */ @Override - public Mono writeContentBlock(DataBlock dataBlock) { - return URLsWriter.this.writeContentBlock(url, dataBlock); + public Mono writeContentBlock(DataBlock dataBlock, boolean force) { + return URLsWriter.this.writeContentBlock(url, dataBlock, force); } }; }