From bce04a20a4ae6c9fb407ff07ee2632843055ebc1 Mon Sep 17 00:00:00 2001
From: Andrea Cavalli
Date: Sun, 16 May 2021 14:59:12 +0200
Subject: [PATCH] Fix refcounts

---
 .../lombok/org/warp/filesponge/DiskCache.java | 137 +++++++++++-------
 1 file changed, 84 insertions(+), 53 deletions(-)

diff --git a/src/main/lombok/org/warp/filesponge/DiskCache.java b/src/main/lombok/org/warp/filesponge/DiskCache.java
index 2758592..1bebe4a 100644
--- a/src/main/lombok/org/warp/filesponge/DiskCache.java
+++ b/src/main/lombok/org/warp/filesponge/DiskCache.java
@@ -32,6 +32,7 @@ import it.cavallium.dbengine.database.LLDictionaryResultType;
 import it.cavallium.dbengine.database.LLKeyValueDatabase;
 import it.cavallium.dbengine.database.UpdateMode;
 import it.cavallium.dbengine.database.UpdateReturnMode;
+import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSlice;
 import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -76,17 +77,22 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
 
 	@Override
 	public Mono<Void> writeMetadata(URL url, Metadata metadata) {
-		return fileMetadata
-				.update(url.getSerializer(db.getAllocator()).serialize(url), oldValue -> {
-					if (oldValue != null) {
-						return oldValue;
-					} else {
-						return diskMetadataSerializer.serialize(new DiskMetadata(
-								metadata.getSize(),
-								BooleanArrayList.wrap(new boolean[DiskMetadata.getBlocksCount(metadata.getSize(), BLOCK_SIZE)])
-						));
-					}
-				}, UpdateReturnMode.NOTHING)
+		return Mono
+				.using(
+						() -> url.getSerializer(db.getAllocator()).serialize(url),
+						key -> fileMetadata
+								.update(key.retain(), oldValue -> {
+									if (oldValue != null) {
+										return oldValue;
+									} else {
+										return diskMetadataSerializer.serialize(new DiskMetadata(
+												metadata.getSize(),
+												BooleanArrayList.wrap(new boolean[DiskMetadata.getBlocksCount(metadata.getSize(), BLOCK_SIZE)])
+										));
+									}
+								}, UpdateReturnMode.NOTHING),
+						ReferenceCounted::release
+				)
 				.then();
 	}
 
@@ -95,31 +101,42 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
 		return Mono
 				.fromCallable(dataBlock::getData)
 				.subscribeOn(Schedulers.boundedElastic())
-				.flatMap(bytes -> fileContent
-						.put(getBlockKey(url, dataBlock.getId()), bytes, LLDictionaryResultType.VOID)
+				.flatMap(bytes -> Mono
+						.using(
+								() -> getBlockKey(url, dataBlock.getId()),
+								key -> fileContent
+										.put(key.retain(), bytes, LLDictionaryResultType.VOID),
+								ReferenceCounted::release
+						)
 						.doOnNext(ReferenceCounted::release)
 						.then()
 				)
-				.then(fileMetadata.update(url.getSerializer(db.getAllocator()).serialize(url), prevBytes -> {
-					@Nullable DiskMetadata result;
-					if (prevBytes != null) {
-						DiskMetadata prevMeta = diskMetadataSerializer.deserialize(prevBytes);
-						if (!prevMeta.getDownloadedBlocks().getBoolean(dataBlock.getId())) {
-							BooleanArrayList bal = prevMeta.getDownloadedBlocks().clone();
-							bal.set(dataBlock.getId(), true);
-							result = new DiskMetadata(prevMeta.getSize(), bal);
-						} else {
-							result = prevMeta;
-						}
-					} else {
-						result = null;
-					}
-					if (result != null) {
-						return diskMetadataSerializer.serialize(result);
-					} else {
-						return null;
-					}
-				}, UpdateReturnMode.NOTHING))
+				.then(Mono
+						.using(
+								() -> url.getSerializer(db.getAllocator()).serialize(url),
+								key -> fileMetadata.update(key.retain(), prevBytes -> {
+									@Nullable DiskMetadata result;
+									if (prevBytes != null) {
+										DiskMetadata prevMeta = diskMetadataSerializer.deserialize(prevBytes);
+										if (!prevMeta.getDownloadedBlocks().getBoolean(dataBlock.getId())) {
+											BooleanArrayList bal = prevMeta.getDownloadedBlocks().clone();
+											bal.set(dataBlock.getId(), true);
+											result = new DiskMetadata(prevMeta.getSize(), bal);
+										} else {
+											result = prevMeta;
+										}
+									} else {
+										result = null;
+									}
+									if (result != null) {
+										return diskMetadataSerializer.serialize(result);
+									} else {
+										return null;
+									}
+								}, UpdateReturnMode.NOTHING),
+								ReferenceCounted::release
+						)
+				)
 				.then();
 	}
 
@@ -137,23 +154,29 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
 					if (!downloaded) {
 						return Mono.empty();
 					}
-					return fileContent.get(null, getBlockKey(url, blockId)).map(data -> {
-						try {
-							int blockOffset = getBlockOffset(blockId);
-							int blockLength = data.readableBytes();
-							if (blockOffset + blockLength >= meta.getSize()) {
-								if (blockOffset + blockLength > meta.getSize()) {
-									throw new IllegalStateException("Overflowed data size");
+					return Mono
+							.using(
+									() -> getBlockKey(url, blockId),
+									key -> fileContent.get(null, key.retain()),
+									ReferenceCounted::release
+							)
+							.map(data -> {
+								try {
+									int blockOffset = getBlockOffset(blockId);
+									int blockLength = data.readableBytes();
+									if (blockOffset + blockLength >= meta.getSize()) {
+										if (blockOffset + blockLength > meta.getSize()) {
+											throw new IllegalStateException("Overflowed data size");
+										}
+									} else {
+										// Intermediate blocks must be of max size
+										assert data.readableBytes() == BLOCK_SIZE;
+									}
+									return new DataBlock(blockOffset, blockLength, data.retain());
+								} finally {
+									data.release();
 								}
-							} else {
-								// Intermediate blocks must be of max size
-								assert data.readableBytes() == BLOCK_SIZE;
-							}
-							return new DataBlock(blockOffset, blockLength, data.retain());
-						} finally {
-							data.release();
-						}
-					});
+							});
 				}));
 	}
 
@@ -170,8 +193,12 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
 
 	@Override
 	public Mono<DiskMetadata> requestDiskMetadata(URL url) {
-		return fileMetadata
-				.get(null, url.getSerializer(db.getAllocator()).serialize(url))
+		return Mono
+				.using(
+						() -> url.getSerializer(db.getAllocator()).serialize(url),
+						key -> fileMetadata.get(null, key.retain()),
+						ReferenceCounted::release
+				)
 				.map(diskMetadataSerializer::deserialize);
 	}
 
@@ -183,8 +210,12 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
 
 	@Override
 	public Mono<Tuple2<Metadata, Flux<DataBlock>>> request(URL url) {
-		return fileMetadata
-				.get(null, url.getSerializer(db.getAllocator()).serialize(url))
+		return Mono
+				.using(
+						() -> url.getSerializer(db.getAllocator()).serialize(url),
+						key -> fileMetadata.get(null, key.retain()),
+						ReferenceCounted::release
+				)
 				.map(diskMetadataSerializer::deserialize)
 				.map(diskMeta -> {
 					var meta = diskMeta.asMetadata();
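
Note on the pattern: every hunk above makes the same fix. A refcounted key buffer (the serialized URL, or a block key) used to be built inline in the operator chain; it is now allocated through Mono.using(...), which ties its lifecycle to the subscription and releases it on every termination path: completion, error, and cancellation. The inner call receives key.retain() because the dbengine primitives (fileMetadata.get/update, fileContent.get/put) evidently take ownership of, and release, the buffer they are handed, so the using() cleanup still holds one reference of its own to drop.

Below is a minimal, self-contained sketch of the same idea, assuming Netty buffers and Reactor as in the patch; lookup() is a hypothetical stand-in for an owning callee such as fileMetadata.get(...), and the class name and URL are illustrative only, not part of this patch:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCounted;
import reactor.core.publisher.Mono;

public class RefCountedUsingSketch {

	// Hypothetical stand-in for a dbengine call such as fileMetadata.get(...):
	// it takes ownership of the key buffer and releases it when done.
	static Mono<String> lookup(ByteBuf key) {
		return Mono.fromCallable(() -> {
			try {
				return "value for " + key.toString(CharsetUtil.UTF_8);
			} finally {
				key.release(); // the callee drops the reference it was given
			}
		});
	}

	static Mono<String> get(String url) {
		return Mono.using(
				// acquire: allocate the refcounted key at subscription time
				() -> Unpooled.copiedBuffer(url, CharsetUtil.UTF_8),
				// use: hand the callee its own retained reference, because
				// lookup() releases whatever it receives
				key -> lookup(key.retain()),
				// release: drop our reference on complete, error, or cancel
				ReferenceCounted::release
		);
	}

	public static void main(String[] args) {
		get("https://example.org/file").subscribe(System.out::println);
	}
}

The deleted form built the key eagerly as the chain was assembled, so presumably nothing released the buffer when a stage failed or was never subscribed; scoping it with using() gives each buffer exactly one owner per subscription, which appears to be the leak the subject line refers to.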