diff --git a/src/main/java/org/warp/filesponge/DiskCache.java b/src/main/java/org/warp/filesponge/DiskCache.java index d492b33..af1d219 100644 --- a/src/main/java/org/warp/filesponge/DiskCache.java +++ b/src/main/java/org/warp/filesponge/DiskCache.java @@ -33,6 +33,7 @@ import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.client.DatabaseOptions; import it.unimi.dsi.fastutil.booleans.BooleanArrayList; import java.util.List; +import java.util.Objects; import org.jetbrains.annotations.Nullable; import org.warp.filesponge.DiskMetadata.DiskMetadataSerializer; import reactor.core.publisher.Flux; @@ -75,84 +76,70 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { @Override public Mono writeMetadata(URL url, Metadata metadata) { - return Mono - .using( - () -> url.getSerializer(db.getAllocator()).serialize(url), - key -> fileMetadata - .update(key.retain(), oldValue -> { - if (oldValue != null) { - return oldValue; - } else { - return diskMetadataSerializer.serialize(new DiskMetadata( - metadata.size(), - BooleanArrayList.wrap(new boolean[DiskMetadata.getBlocksCount(metadata.size(), BLOCK_SIZE)]) - )); - } - }, UpdateReturnMode.NOTHING), - ReferenceCounted::release - ) + Mono keyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url)); + return fileMetadata + .update(keyMono, oldValue -> Objects.requireNonNullElseGet(oldValue, + () -> diskMetadataSerializer.serialize(new DiskMetadata(metadata.size(), + BooleanArrayList.wrap(new boolean[DiskMetadata.getBlocksCount(metadata.size(), BLOCK_SIZE)]) + )) + ), UpdateReturnMode.NOTHING) .then(); } @Override public Mono writeContentBlock(URL url, DataBlock dataBlock) { + Mono urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url)); + Mono blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, dataBlock.getId())); return Mono .fromCallable(dataBlock::getData) .subscribeOn(Schedulers.boundedElastic()) - .flatMap(bytes -> Mono - .using( - () -> getBlockKey(url, dataBlock.getId()), - key -> fileContent - .put(key.retain(), bytes, LLDictionaryResultType.VOID), - ReferenceCounted::release - ) - .doOnNext(ReferenceCounted::release) - .then() - ) - .then(Mono - .using( - () -> url.getSerializer(db.getAllocator()).serialize(url), - key -> fileMetadata.update(key.retain(), prevBytes -> { - @Nullable DiskMetadata result; - if (prevBytes != null) { - DiskMetadata prevMeta = diskMetadataSerializer.deserialize(prevBytes); - if (!prevMeta.isDownloadedBlock(dataBlock.getId())) { - BooleanArrayList bal = prevMeta.downloadedBlocks().clone(); - if (prevMeta.size() == -1) { - if (bal.size() > dataBlock.getId()) { - bal.set(dataBlock.getId(), true); - } else if (bal.size() == dataBlock.getId()) { - bal.add(true); - } else { - throw new IndexOutOfBoundsException( - "Trying to write a block too much far from the last block. Previous total blocks: " - + bal.size() + " Current block id: " + dataBlock.getId()); - } - } else { - bal.set(dataBlock.getId(), true); - } - result = new DiskMetadata(prevMeta.size(), bal); + .flatMap(bytes -> { + Mono bytesMono = Mono.just(bytes).map(ByteBuf::retain); + return fileContent + .put(blockKeyMono, bytesMono, LLDictionaryResultType.VOID) + .doOnNext(ReferenceCounted::release) + .then(); + }) + .then(fileMetadata.update(urlKeyMono, prevBytes -> { + @Nullable DiskMetadata result; + if (prevBytes != null) { + DiskMetadata prevMeta = diskMetadataSerializer.deserialize(prevBytes); + if (!prevMeta.isDownloadedBlock(dataBlock.getId())) { + BooleanArrayList bal = prevMeta.downloadedBlocks().clone(); + if (prevMeta.size() == -1) { + if (bal.size() > dataBlock.getId()) { + bal.set(dataBlock.getId(), true); + } else if (bal.size() == dataBlock.getId()) { + bal.add(true); } else { - result = prevMeta; + throw new IndexOutOfBoundsException( + "Trying to write a block too much far from the last block. Previous total blocks: " + + bal.size() + " Current block id: " + dataBlock.getId()); } } else { - result = null; + bal.set(dataBlock.getId(), true); } - if (result != null) { - return diskMetadataSerializer.serialize(result); - } else { - return null; - } - }, UpdateReturnMode.NOTHING), - ReferenceCounted::release - ) + result = new DiskMetadata(prevMeta.size(), bal); + } else { + result = prevMeta; + } + } else { + result = null; + } + if (result != null) { + return diskMetadataSerializer.serialize(result); + } else { + return null; + } + }, UpdateReturnMode.NOTHING) ) .then(); } @Override public Flux requestContent(URL url) { - return requestDiskMetadata(url) + return this + .requestDiskMetadata(url) .filter(DiskMetadata::isDownloadedFully) .flatMapMany(meta -> Flux.fromStream(meta.downloadedBlocks()::stream) .index() @@ -162,14 +149,11 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { int blockId = Math.toIntExact(blockMeta.getT1()); boolean downloaded = blockMeta.getT2(); if (!downloaded) { - return Mono.empty(); + return Mono.empty(); } - return Mono - .using( - () -> getBlockKey(url, blockId), - key -> fileContent.get(null, key.retain()), - ReferenceCounted::release - ) + Mono blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, blockId)); + return fileContent + .get(null, blockKeyMono) .map(data -> { try { int blockOffset = getBlockOffset(blockId); @@ -189,7 +173,8 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { data.release(); } }); - })); + }) + ); } private ByteBuf getBlockKey(URL url, int blockId) { @@ -205,12 +190,9 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { @Override public Mono requestDiskMetadata(URL url) { - return Mono - .using( - () -> url.getSerializer(db.getAllocator()).serialize(url), - key -> fileMetadata.get(null, key.retain()), - ReferenceCounted::release - ) + Mono urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url)); + return fileMetadata + .get(null, urlKeyMono) .map(diskMetadataSerializer::deserialize); } @@ -222,10 +204,11 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { @Override public Mono>> request(URL url) { + Mono urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url)); return Mono .using( () -> url.getSerializer(db.getAllocator()).serialize(url), - key -> fileMetadata.get(null, key.retain()), + key -> fileMetadata.get(null, urlKeyMono), ReferenceCounted::release ) .map(diskMetadataSerializer::deserialize)