diff --git a/src/main/java/org/warp/filesponge/DataBlock.java b/src/main/java/org/warp/filesponge/DataBlock.java index 6638d6e..dfd50f0 100644 --- a/src/main/java/org/warp/filesponge/DataBlock.java +++ b/src/main/java/org/warp/filesponge/DataBlock.java @@ -71,9 +71,9 @@ public final class DataBlock extends ResourceSupport { private final int length; private final Buffer data; - public Send getData() { + public Buffer getDataCopy() { assert data.isAccessible(); - return data.copy().send(); + return data.copy(); } public Buffer getDataUnsafe() { @@ -106,8 +106,8 @@ public final class DataBlock extends ResourceSupport { if (this.getLength() != other.getLength()) { return false; } - final Object this$data = this.getData(); - final Object other$data = other.getData(); + final Object this$data = this.getDataUnsafe(); + final Object other$data = other.getDataUnsafe(); if (!Objects.equals(this$data, other$data)) { return false; } @@ -119,13 +119,13 @@ public final class DataBlock extends ResourceSupport { int result = 1; result = result * PRIME + this.getOffset(); result = result * PRIME + this.getLength(); - final Object $data = this.getData(); + final Object $data = this.getDataUnsafe(); result = result * PRIME + ($data == null ? 43 : $data.hashCode()); return result; } public String toString() { - return "DataBlock(offset=" + this.getOffset() + ", length=" + this.getLength() + ", data=" + this.getData() + ")"; + return "DataBlock(offset=" + this.getOffset() + ", length=" + this.getLength() + ", data=" + this.getDataUnsafe() + ")"; } @Override diff --git a/src/main/java/org/warp/filesponge/DiskCache.java b/src/main/java/org/warp/filesponge/DiskCache.java index 64e8a6f..e9b08f1 100644 --- a/src/main/java/org/warp/filesponge/DiskCache.java +++ b/src/main/java/org/warp/filesponge/DiskCache.java @@ -21,7 +21,9 @@ package org.warp.filesponge; import static org.warp.filesponge.FileSponge.BLOCK_SIZE; import io.netty5.buffer.api.Buffer; +import io.netty5.buffer.api.Resource; import io.netty5.buffer.api.Send; +import it.cavallium.dbengine.database.BufSupplier; import it.cavallium.dbengine.database.ColumnUtils; import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLDictionary; @@ -87,45 +89,53 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { // Check if this cache should cache the url, otherwise do nothing if (!shouldCache.test(url)) return Mono.empty(); - Mono> keyMono = Mono.fromCallable(() -> serializeUrl(url)); + Mono keyMono = Mono.fromCallable(() -> serializeUrl(url)); return fileMetadata .update(keyMono, oldValue -> Objects.requireNonNullElseGet(oldValue, () -> serializeMetadata(new DiskMetadata(metadata.size(), BooleanArrayList.wrap(new boolean[DiskMetadata.getBlocksCount(metadata.size(), BLOCK_SIZE)]) - )).receive() + )) ), UpdateReturnMode.NOTHING ) .then(); } - private Send serializeUrl(T url) { + private Buffer serializeUrl(T url) { @SuppressWarnings("unchecked") URLSerializer urlSerializer = (URLSerializer) url.getSerializer(); int sizeHint = urlSerializer.getSerializedSizeHint(); if (sizeHint == -1) sizeHint = 64; - try (var buffer = db.getAllocator().allocate(sizeHint)) { + var buffer = db.getAllocator().allocate(sizeHint); + try { try { urlSerializer.serialize(url, buffer); } catch (SerializationException ex) { throw new IllegalStateException("Failed to serialize url", ex); } - return buffer.send(); + return buffer; + } catch (Throwable ex) { + buffer.close(); + throw ex; } } - private Send serializeMetadata(DiskMetadata diskMetadata) { + private Buffer serializeMetadata(DiskMetadata diskMetadata) { int sizeHint = diskMetadataSerializer.getSerializedSizeHint(); if (sizeHint == -1) sizeHint = 64; - try (var buffer = db.getAllocator().allocate(sizeHint)) { + var buffer = db.getAllocator().allocate(sizeHint); + try { try { diskMetadataSerializer.serialize(diskMetadata, buffer); } catch (SerializationException ex) { throw new IllegalStateException("Failed to serialize metadata", ex); } - return buffer.send(); + return buffer; + } catch (Throwable ex) { + buffer.close(); + throw ex; } } @@ -142,19 +152,16 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { // Check if this cache should cache the url, otherwise do nothing if (!shouldCache.test(url)) return Mono.empty(); - Mono> urlKeyMono = Mono.fromCallable(() -> serializeUrl(url)); - Mono> blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, dataBlock.getId())); - return Mono - .fromCallable(dataBlock::getData) - .subscribeOn(Schedulers.boundedElastic()) - .flatMap(bytes_ -> Mono.using( - () -> bytes_, - bytes -> fileContent - .put(blockKeyMono, Mono.just(bytes), LLDictionaryResultType.VOID) - .doOnNext(Send::close) + Mono urlKeyMono = Mono.fromCallable(() -> serializeUrl(url)); + Mono blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, dataBlock.getId())); + return Mono.using( + () -> BufSupplier.of(dataBlock::getDataCopy), + bufSupplier -> fileContent + .put(blockKeyMono, Mono.fromSupplier(bufSupplier::get), LLDictionaryResultType.VOID) + .doOnNext(Resource::close) .then(), - Send::close - )) + BufSupplier::close + ) .then(fileMetadata.update(urlKeyMono, prevBytes -> { @Nullable DiskMetadata result; if (prevBytes != null) { @@ -182,7 +189,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { result = null; } if (result != null) { - return serializeMetadata(result).receive(); + return serializeMetadata(result); } else { return null; } @@ -210,7 +217,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { return fileContent .get(null, blockKeyMono) .map(dataToReceive -> { - try (var data = dataToReceive.receive()) { + try (var data = dataToReceive) { int blockOffset = getBlockOffset(blockId); int blockLength = data.readableBytes(); if (meta.size() != -1) { @@ -230,11 +237,11 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { ); } - private Send getBlockKey(URL url, int blockId) { - try (var urlBytes = serializeUrl(url).receive()) { + private Buffer getBlockKey(URL url, int blockId) { + try (var urlBytes = serializeUrl(url)) { Buffer blockIdBytes = this.db.getAllocator().allocate(Integer.BYTES); blockIdBytes.writeInt(blockId); - return LLUtils.compositeBuffer(db.getAllocator(), urlBytes.send(), blockIdBytes.send()).send(); + return LLUtils.compositeBuffer(db.getAllocator(), urlBytes.send(), blockIdBytes.send()); } } @@ -244,11 +251,11 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { @Override public Mono requestDiskMetadata(URL url) { - Mono> urlKeyMono = Mono.fromCallable(() -> serializeUrl(url)); + Mono urlKeyMono = Mono.fromCallable(() -> serializeUrl(url)); return fileMetadata .get(null, urlKeyMono) .map(prevBytesSend -> { - try (var prevBytes = prevBytesSend.receive()) { + try (var prevBytes = prevBytesSend) { return deserializeMetadata(prevBytes); } }); @@ -262,17 +269,17 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { @Override public Mono>> request(URL url) { - Mono> urlKeyMono = Mono.fromCallable(() -> serializeUrl(url)); + Mono urlKeyMono = Mono.fromCallable(() -> serializeUrl(url)); return Mono .using( () -> serializeUrl(url), key -> fileMetadata.get(null, urlKeyMono), - Send::close + Resource::close ) .map(serialized -> { DiskMetadata diskMeta; - try (var serializedBuf = serialized.receive()) { - diskMeta = deserializeMetadata(serializedBuf); + try (serialized) { + diskMeta = deserializeMetadata(serialized); } var meta = diskMeta.asMetadata(); if (diskMeta.isDownloadedFully()) {