From 3bf2b968929d3ce99806fb706680ea54bcfd52c7 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 5 May 2021 17:31:50 +0200 Subject: [PATCH] Manage discards --- .../lombok/org/warp/filesponge/DataBlock.java | 77 ++++++++++++++++--- .../lombok/org/warp/filesponge/DiskCache.java | 39 +++++----- .../org/warp/filesponge/FileSponge.java | 11 ++- 3 files changed, 94 insertions(+), 33 deletions(-) diff --git a/src/main/lombok/org/warp/filesponge/DataBlock.java b/src/main/lombok/org/warp/filesponge/DataBlock.java index c3fe0fa..1cf82c5 100644 --- a/src/main/lombok/org/warp/filesponge/DataBlock.java +++ b/src/main/lombok/org/warp/filesponge/DataBlock.java @@ -19,29 +19,82 @@ package org.warp.filesponge; import io.netty.buffer.ByteBuf; -import java.nio.ByteBuffer; -import lombok.Value; +import java.util.Objects; -@Value -public class DataBlock { +public final class DataBlock { public DataBlock(int offset, int length, ByteBuf data) { - this.offset = offset; - assert data.isDirect(); - this.length = length; - this.data = data; + try { + this.offset = offset; + this.length = length; + this.data = data.retain(); + } finally { + data.release(); + } } - int offset; - int length; - ByteBuf data; + private final int offset; + private final int length; + private final ByteBuf data; public ByteBuf getData() { assert data.isReadable(); - return data; + return data.retain(); } public int getId() { return offset / FileSponge.BLOCK_SIZE; } + + public int getOffset() { + return this.offset; + } + + public int getLength() { + return this.length; + } + + public boolean equals(final Object o) { + if (o == this) { + return true; + } + if (!(o instanceof DataBlock)) { + return false; + } + final DataBlock other = (DataBlock) o; + if (this.getOffset() != other.getOffset()) { + return false; + } + if (this.getLength() != other.getLength()) { + return false; + } + final Object this$data = this.getData(); + final Object other$data = other.getData(); + if (!Objects.equals(this$data, other$data)) { + return false; + } + return true; + } + + public int hashCode() { + final int PRIME = 59; + int result = 1; + result = result * PRIME + this.getOffset(); + result = result * PRIME + this.getLength(); + final Object $data = this.getData(); + result = result * PRIME + ($data == null ? 43 : $data.hashCode()); + return result; + } + + public String toString() { + return "DataBlock(offset=" + this.getOffset() + ", length=" + this.getLength() + ", data=" + this.getData() + ")"; + } + + public void retain() { + this.data.retain(); + } + + public void release() { + this.data.release(); + } } diff --git a/src/main/lombok/org/warp/filesponge/DiskCache.java b/src/main/lombok/org/warp/filesponge/DiskCache.java index 4cacd83..4176a9b 100644 --- a/src/main/lombok/org/warp/filesponge/DiskCache.java +++ b/src/main/lombok/org/warp/filesponge/DiskCache.java @@ -92,18 +92,13 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { @Override public Mono writeContentBlock(URL url, DataBlock dataBlock) { return Mono - .fromCallable(() -> { - return dataBlock.getData(); - /* - ByteBuf bytes = PooledByteBufAllocator.DEFAULT.directBuffer(dataBlock.getLength()); - bytes.writeBytes(dataBlock.getData().slice()); - return bytes; - - */ - }) + .fromCallable(dataBlock::getData) .subscribeOn(Schedulers.boundedElastic()) - .flatMap(bytes -> fileContent.put(getBlockKey(url, dataBlock.getId()), bytes, LLDictionaryResultType.VOID)) - .doOnNext(ReferenceCounted::release) + .flatMap(bytes -> fileContent + .put(getBlockKey(url, dataBlock.getId()), bytes, LLDictionaryResultType.VOID) + .doOnNext(ReferenceCounted::release) + .then() + ) .then(fileMetadata.update(url.getSerializer(db.getAllocator()).serialize(url), prevBytes -> { @Nullable DiskMetadata result; if (prevBytes != null) { @@ -142,17 +137,21 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { return Mono.empty(); } return fileContent.get(null, getBlockKey(url, blockId)).map(data -> { - int blockOffset = getBlockOffset(blockId); - int blockLength = data.readableBytes(); - if (blockOffset + blockLength >= meta.getSize()) { - if (blockOffset + blockLength > meta.getSize()) { - throw new IllegalStateException("Overflowed data size"); + try { + int blockOffset = getBlockOffset(blockId); + int blockLength = data.readableBytes(); + if (blockOffset + blockLength >= meta.getSize()) { + if (blockOffset + blockLength > meta.getSize()) { + throw new IllegalStateException("Overflowed data size"); + } + } else { + // Intermediate blocks must be of max size + assert data.readableBytes() == BLOCK_SIZE; } - } else { - // Intermediate blocks must be of max size - assert data.readableBytes() == BLOCK_SIZE; + return new DataBlock(blockOffset, blockLength, data.retain()); + } finally { + data.release(); } - return new DataBlock(blockOffset, blockLength, data); }); })); } diff --git a/src/main/lombok/org/warp/filesponge/FileSponge.java b/src/main/lombok/org/warp/filesponge/FileSponge.java index 62c9cff..2daf454 100644 --- a/src/main/lombok/org/warp/filesponge/FileSponge.java +++ b/src/main/lombok/org/warp/filesponge/FileSponge.java @@ -18,6 +18,7 @@ package org.warp.filesponge; +import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSlice; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -25,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class FileSponge implements URLsHandler { @@ -59,6 +61,7 @@ public class FileSponge implements URLsHandler { .fromIterable(cacheAccess) .map(urlsHandler -> urlsHandler.requestContent(url)) .collectList() + .doOnDiscard(DataBlock.class, DataBlock::release) .flatMapMany(monos -> FileSpongeUtils.firstWithValueFlux(monos)) .doOnNext(dataBlock -> { if (alreadyPrintedDebug.compareAndSet(false, true)) { @@ -77,10 +80,16 @@ public class FileSponge implements URLsHandler { ) ) .collectList() + .doOnDiscard(Flux.class, f -> { + //noinspection unchecked + Flux flux = (Flux) f; + flux.doOnNext(DataBlock::release).subscribeOn(Schedulers.single()).subscribe(); + }) .flatMapMany(monos -> FileSpongeUtils.firstWithValueFlux(monos)) .doOnComplete(() -> logger.debug("Downloaded file \"{}\" content", url)) ) - .distinct(DataBlock::getId); + .distinct(DataBlock::getId) + .doOnDiscard(DataBlock.class, DataBlock::release); } @Override