Cached url handlers

This commit is contained in:
Andrea Cavalli 2023-08-22 12:41:24 +02:00
parent bf8d6ee38f
commit b0b22a1358
8 changed files with 73 additions and 27 deletions

View File

@ -23,7 +23,7 @@
<groupId>org.warp.filesponge</groupId>
<artifactId>FileSponge</artifactId>
<version>0.2.0</version>
<version>0.3.1</version>
<name>FileSponge</name>

View File

@ -22,7 +22,6 @@ import it.cavallium.dbengine.client.IBackuppable;
import it.cavallium.dbengine.database.ColumnUtils;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
@ -33,9 +32,9 @@ import reactor.util.function.Tuple2;
public interface DiskCache extends URLsDiskHandler, URLsWriter, SafeCloseable, IBackuppable {
void writeMetadataSync(URL url, Metadata metadata);
void writeMetadataSync(URL url, Metadata metadata, boolean force);
void writeContentBlockSync(URL url, DataBlock dataBlock);
void writeContentBlockSync(URL url, DataBlock dataBlock, boolean force);
Stream<DataBlock> requestContentSync(URL url);

View File

@ -24,7 +24,6 @@ import static org.warp.filesponge.FileSponge.BLOCK_SIZE;
import it.cavallium.buffer.Buf;
import it.cavallium.buffer.BufDataInput;
import it.cavallium.buffer.BufDataOutput;
import it.cavallium.dbengine.client.IBackuppable;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
@ -64,12 +63,12 @@ class DiskCacheImpl implements DiskCache {
}
@Override
public Mono<Void> writeMetadata(URL url, Metadata metadata) {
return Mono.<Void>fromRunnable(() -> writeMetadataSync(url, metadata)).subscribeOn(Schedulers.boundedElastic());
public Mono<Void> writeMetadata(URL url, Metadata metadata, boolean force) {
return Mono.<Void>fromRunnable(() -> writeMetadataSync(url, metadata, force)).subscribeOn(Schedulers.boundedElastic());
}
@Override
public void writeMetadataSync(URL url, Metadata metadata) {
public void writeMetadataSync(URL url, Metadata metadata, boolean force) {
// Check if this cache should cache the url, otherwise do nothing
if (!shouldCache.test(url)) return;
@ -122,14 +121,14 @@ class DiskCacheImpl implements DiskCache {
}
@Override
public Mono<Void> writeContentBlock(URL url, DataBlock dataBlock) {
public Mono<Void> writeContentBlock(URL url, DataBlock dataBlock, boolean force) {
return Mono
.<Void>fromRunnable(() -> writeContentBlockSync(url, dataBlock))
.<Void>fromRunnable(() -> writeContentBlockSync(url, dataBlock, force))
.subscribeOn(Schedulers.boundedElastic());
}
@Override
public void writeContentBlockSync(URL url, DataBlock dataBlock) {
public void writeContentBlockSync(URL url, DataBlock dataBlock, boolean force) {
// Check if this cache should cache the url, otherwise do nothing
if (!shouldCache.test(url)) {
return;

View File

@ -22,8 +22,6 @@ import it.cavallium.dbengine.database.LLUtils;
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -31,7 +29,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class FileSponge implements URLsHandler {
public class FileSponge implements URLsHandlerCached {
private static final Logger logger = LogManager.getLogger(FileSponge.class);
@ -104,7 +102,7 @@ public class FileSponge implements URLsHandler {
var cw = this.cacheWrite;
List<Mono<Void>> cacheWriteActions = new ArrayList<>(cw.size());
for (URLsWriter urlsWriter : cw) {
cacheWriteActions.add(urlsWriter.writeContentBlock(url, dataBlock));
cacheWriteActions.add(urlsWriter.writeContentBlock(url, dataBlock, false));
}
return Mono.whenDelayError(cacheWriteActions).thenReturn(dataBlock);
})
@ -120,8 +118,7 @@ public class FileSponge implements URLsHandler {
.doOnDiscard(DataBlock.class, LLUtils::onDiscard);
}
@Override
public Mono<Metadata> requestMetadata(URL url) {
public Mono<Metadata> requestCachedMetadata(URL url) {
return Mono
.fromCallable(() -> {
var ca = this.cacheAccess;
@ -136,7 +133,12 @@ public class FileSponge implements URLsHandler {
if (metadata != null) {
logger.debug("File \"{}\" metadata has been found in the cache", url);
}
})
});
}
@Override
public Mono<Metadata> requestMetadata(URL url) {
return requestCachedMetadata(url)
.switchIfEmpty(Mono
.fromCallable(() -> {
logger.debug("Downloading file \"{}\" metadata", url);
@ -149,7 +151,7 @@ public class FileSponge implements URLsHandler {
var cw = this.cacheWrite;
List<Mono<Void>> cacheWriteActions = new ArrayList<>(cw.size());
for (URLsWriter urlsWriter : cw) {
cacheWriteActions.add(urlsWriter.writeMetadata(url, meta));
cacheWriteActions.add(urlsWriter.writeMetadata(url, meta, false));
}
return Mono.whenDelayError(cacheWriteActions).thenReturn(meta);
})

View File

@ -22,8 +22,8 @@ import reactor.core.publisher.Mono;
public interface URLWriter {
Mono<Void> writeMetadata(Metadata metadata);
Mono<Void> writeMetadata(Metadata metadata, boolean force);
Mono<Void> writeContentBlock(DataBlock dataBlock);
Mono<Void> writeContentBlock(DataBlock dataBlock, boolean force);
}

View File

@ -27,6 +27,9 @@ public interface URLsHandler {
Flux<DataBlock> requestContent(URL url);
/**
* Get metadata from cached sources, if not found, retrieve it online
*/
Mono<Metadata> requestMetadata(URL url);
default Mono<Tuple2<Metadata, Flux<DataBlock>>> request(URL url) {

View File

@ -0,0 +1,30 @@
/*
* FileSponge
* Copyright (C) 2023 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge;
import reactor.core.publisher.Mono;
public interface URLsHandlerCached extends URLsHandler {
/**
* Get metadata only from cached sources
*/
Mono<Metadata> requestCachedMetadata(URL url);
}

View File

@ -22,20 +22,33 @@ import reactor.core.publisher.Mono;
public interface URLsWriter {
Mono<Void> writeMetadata(URL url, Metadata metadata);
/**
* @param force true to force writing onto a cache, ignoring the shouldCache predicate
*/
Mono<Void> writeMetadata(URL url, Metadata metadata, boolean force);
Mono<Void> writeContentBlock(URL url, DataBlock dataBlock);
/**
* @param force true to force writing onto a cache, ignoring the shouldCache predicate
*/
Mono<Void> writeContentBlock(URL url, DataBlock dataBlock, boolean force);
default URLWriter getUrlWriter(URL url) {
return new URLWriter() {
/**
* @param force true to force writing onto a cache, ignoring the shouldCache predicate
*/
@Override
public Mono<Void> writeMetadata(Metadata metadata) {
return URLsWriter.this.writeMetadata(url, metadata);
public Mono<Void> writeMetadata(Metadata metadata, boolean force) {
return URLsWriter.this.writeMetadata(url, metadata, force);
}
/**
* @param force true to force writing onto a cache, ignoring the shouldCache predicate
*/
@Override
public Mono<Void> writeContentBlock(DataBlock dataBlock) {
return URLsWriter.this.writeContentBlock(url, dataBlock);
public Mono<Void> writeContentBlock(DataBlock dataBlock, boolean force) {
return URLsWriter.this.writeContentBlock(url, dataBlock, force);
}
};
}