diff --git a/pom.xml b/pom.xml index 406fa07..faccc42 100644 --- a/pom.xml +++ b/pom.xml @@ -104,12 +104,12 @@ io.projectreactor reactor-core - 3.4.8 + 3.4.9 io.projectreactor reactor-test - 3.4.8 + 3.4.9 it.cavallium diff --git a/src/main/java/org/warp/filesponge/DataBlock.java b/src/main/java/org/warp/filesponge/DataBlock.java index 1cf82c5..456e228 100644 --- a/src/main/java/org/warp/filesponge/DataBlock.java +++ b/src/main/java/org/warp/filesponge/DataBlock.java @@ -19,27 +19,40 @@ package org.warp.filesponge; import io.netty.buffer.ByteBuf; +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.Drop; +import io.netty.buffer.api.Owned; +import io.netty.buffer.api.Send; +import io.netty.buffer.api.internal.ResourceSupport; +import it.cavallium.dbengine.database.LLEntry; import java.util.Objects; -public final class DataBlock { +public final class DataBlock extends ResourceSupport { - public DataBlock(int offset, int length, ByteBuf data) { - try { + public static DataBlock of(int offset, int length, Send data) { + return new DataBlock(offset, length, data, d -> {}); + } + + private DataBlock(int offset, int length, Send data, Drop drop) { + super(new DataBlock.CloseOnDrop(drop)); + try (data) { this.offset = offset; this.length = length; - this.data = data.retain(); - } finally { - data.release(); + this.data = data.receive(); } } private final int offset; private final int length; - private final ByteBuf data; + private final Buffer data; - public ByteBuf getData() { - assert data.isReadable(); - return data.retain(); + public Send getData() { + assert data.isAccessible(); + return data.copy().send(); + } + + public Buffer getDataUnsafe() { + return data; } public int getId() { @@ -90,11 +103,32 @@ public final class DataBlock { return "DataBlock(offset=" + this.getOffset() + ", length=" + this.getLength() + ", data=" + this.getData() + ")"; } - public void retain() { - this.data.retain(); + @Override + protected RuntimeException createResourceClosedException() { + return new IllegalStateException("Closed"); } - public void release() { - this.data.release(); + @Override + protected Owned prepareSend() { + Send dataSend; + dataSend = this.data.send(); + return drop -> new DataBlock(offset, length, dataSend, drop); + } + + private static class CloseOnDrop implements Drop { + + private final Drop delegate; + + public CloseOnDrop(Drop drop) { + this.delegate = drop; + } + + @Override + public void drop(DataBlock obj) { + if (obj.data.isAccessible()) { + obj.data.close(); + } + delegate.drop(obj); + } } } diff --git a/src/main/java/org/warp/filesponge/DiskCache.java b/src/main/java/org/warp/filesponge/DiskCache.java index 7db489b..26cc309 100644 --- a/src/main/java/org/warp/filesponge/DiskCache.java +++ b/src/main/java/org/warp/filesponge/DiskCache.java @@ -22,16 +22,21 @@ import static org.warp.filesponge.FileSponge.BLOCK_SIZE; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.CompositeBuffer; +import io.netty.buffer.api.Send; import io.netty.util.ReferenceCounted; import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLKeyValueDatabase; +import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.database.serialization.SerializationException; +import it.cavallium.dbengine.database.serialization.Serializer.DeserializationResult; import it.unimi.dsi.fastutil.booleans.BooleanArrayList; import java.util.List; import java.util.Objects; @@ -77,7 +82,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { @Override public Mono writeMetadata(URL url, Metadata metadata) { - Mono keyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url)); + Mono> keyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url)); return fileMetadata .update(keyMono, oldValue -> Objects.requireNonNullElseGet(oldValue, () -> diskMetadataSerializer.serialize(new DiskMetadata(metadata.size(), @@ -89,22 +94,23 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { @Override public Mono writeContentBlock(URL url, DataBlock dataBlock) { - Mono urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url)); - Mono blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, dataBlock.getId())); + Mono> urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url)); + Mono> blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, dataBlock.getId())); return Mono .fromCallable(dataBlock::getData) .subscribeOn(Schedulers.boundedElastic()) - .flatMap(bytes -> { - Mono bytesMono = Mono.just(bytes).map(ByteBuf::retain); - return fileContent - .put(blockKeyMono, bytesMono, LLDictionaryResultType.VOID) - .doOnNext(ReferenceCounted::release) - .then(); - }) + .flatMap(bytes_ -> Mono.using( + () -> bytes_, + bytes -> fileContent + .put(blockKeyMono, Mono.just(bytes), LLDictionaryResultType.VOID) + .doOnNext(Send::close) + .then(), + Send::close + )) .then(fileMetadata.update(urlKeyMono, prevBytes -> { @Nullable DiskMetadata result; if (prevBytes != null) { - DiskMetadata prevMeta = diskMetadataSerializer.deserialize(prevBytes); + DiskMetadata prevMeta = diskMetadataSerializer.deserialize(prevBytes).deserializedData(); if (!prevMeta.isDownloadedBlock(dataBlock.getId())) { BooleanArrayList bal = prevMeta.downloadedBlocks().clone(); if (prevMeta.size() == -1) { @@ -152,11 +158,11 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { if (!downloaded) { return Mono.empty(); } - Mono blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, blockId)); + var blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, blockId)); return fileContent .get(null, blockKeyMono) - .map(data -> { - try { + .map(dataToReceive -> { + try (var data = dataToReceive.receive()) { int blockOffset = getBlockOffset(blockId); int blockLength = data.readableBytes(); if (meta.size() != -1) { @@ -169,20 +175,19 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { assert data.readableBytes() == BLOCK_SIZE; } } - return new DataBlock(blockOffset, blockLength, data.retain()); - } finally { - data.release(); + return DataBlock.of(blockOffset, blockLength, data.send()); } }); }) ); } - private ByteBuf getBlockKey(URL url, int blockId) throws SerializationException { - ByteBuf urlBytes = url.getSerializer(db.getAllocator()).serialize(url); - ByteBuf blockIdBytes = this.db.getAllocator().directBuffer(Integer.BYTES, Integer.BYTES); - blockIdBytes.writeInt(blockId); - return Unpooled.wrappedBuffer(urlBytes, blockIdBytes); + private Send getBlockKey(URL url, int blockId) throws SerializationException { + try (var urlBytes = url.getSerializer(db.getAllocator()).serialize(url).receive()) { + Buffer blockIdBytes = this.db.getAllocator().allocate(Integer.BYTES); + blockIdBytes.writeInt(blockId); + return LLUtils.compositeBuffer(db.getAllocator(), urlBytes.send(), blockIdBytes.send()); + } } private static int getBlockOffset(int blockId) { @@ -191,10 +196,11 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { @Override public Mono requestDiskMetadata(URL url) { - Mono urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url)); + Mono> urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url)); return fileMetadata .get(null, urlKeyMono) - .map(diskMetadataSerializer::deserialize); + .map(diskMetadataSerializer::deserialize) + .map(DeserializationResult::deserializedData); } @Override @@ -205,14 +211,15 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { @Override public Mono>> request(URL url) { - Mono urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url)); + Mono> urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url)); return Mono .using( () -> url.getSerializer(db.getAllocator()).serialize(url), key -> fileMetadata.get(null, urlKeyMono), - ReferenceCounted::release + Send::close ) .map(diskMetadataSerializer::deserialize) + .map(DeserializationResult::deserializedData) .map(diskMeta -> { var meta = diskMeta.asMetadata(); if (diskMeta.isDownloadedFully()) { diff --git a/src/main/java/org/warp/filesponge/DiskMetadata.java b/src/main/java/org/warp/filesponge/DiskMetadata.java index 3ff3d95..a2a1da4 100644 --- a/src/main/java/org/warp/filesponge/DiskMetadata.java +++ b/src/main/java/org/warp/filesponge/DiskMetadata.java @@ -23,6 +23,11 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.BufferAllocator; +import io.netty.buffer.api.Send; +import it.cavallium.dbengine.database.serialization.BufferDataInput; +import it.cavallium.dbengine.database.serialization.BufferDataOutput; import it.cavallium.dbengine.database.serialization.Serializer; import it.unimi.dsi.fastutil.booleans.BooleanArrayList; import java.io.ByteArrayInputStream; @@ -80,56 +85,45 @@ public record DiskMetadata(int size, BooleanArrayList downloadedBlocks) { } } - public static class DiskMetadataSerializer implements Serializer { + public static class DiskMetadataSerializer implements Serializer { - private final ByteBufAllocator allocator; + private final BufferAllocator allocator; - public DiskMetadataSerializer(ByteBufAllocator allocator) { + public DiskMetadataSerializer(BufferAllocator allocator) { this.allocator = allocator; } @Override - public @NotNull DiskMetadata deserialize(@NotNull ByteBuf serialized) { - try { - var bais = new ByteBufInputStream(serialized); - var dis = new DataInputStream(bais); - int size = dis.readInt(); - int blocksCount; - if (size == -1) { - blocksCount = dis.readShort(); - } else { - blocksCount = getBlocksCount(size, FileSponge.BLOCK_SIZE); - } - var downloadedBlocks = new BooleanArrayList(blocksCount); - for (int i = 0; i < blocksCount; i++) { - downloadedBlocks.add(dis.readBoolean()); - } - return new DiskMetadata(size, downloadedBlocks); - } catch (IOException e) { - throw new SerializationException(e); - } finally { - serialized.release(); + public @NotNull DeserializationResult deserialize(@NotNull Send serialized) { + var dis = new BufferDataInput(serialized); + int size = dis.readInt(); + int blocksCount; + if (size == -1) { + blocksCount = dis.readShort(); + } else { + blocksCount = getBlocksCount(size, FileSponge.BLOCK_SIZE); } + var downloadedBlocks = new BooleanArrayList(blocksCount); + for (int i = 0; i < blocksCount; i++) { + downloadedBlocks.add(dis.readBoolean()); + } + return new DeserializationResult<>(new DiskMetadata(size, downloadedBlocks), dis.getReadBytesCount()); } @Override - public @NotNull ByteBuf serialize(@NotNull DiskMetadata deserialized) { - ByteBuf buffer = allocator.buffer(); - try (var bos = new ByteBufOutputStream(buffer)) { - try (var dos = new DataOutputStream(bos)) { - dos.writeInt(deserialized.size()); - if (deserialized.size == -1) { - dos.writeShort(deserialized.getBlocksCount()); - } else { - deserialized.getBlocksCount(); - } - for (boolean downloadedBlock : deserialized.downloadedBlocks()) { - dos.writeBoolean(downloadedBlock); - } + public @NotNull Send serialize(@NotNull DiskMetadata deserialized) { + try (var buffer = allocator.allocate(64)) { + var dos = new BufferDataOutput(buffer); + dos.writeInt(deserialized.size()); + if (deserialized.size == -1) { + dos.writeShort(deserialized.getBlocksCount()); + } else { + deserialized.getBlocksCount(); } - return buffer; - } catch (IOException e) { - throw new SerializationException(e); + for (boolean downloadedBlock : deserialized.downloadedBlocks()) { + dos.writeBoolean(downloadedBlock); + } + return buffer.send(); } } diff --git a/src/main/java/org/warp/filesponge/FileSponge.java b/src/main/java/org/warp/filesponge/FileSponge.java index bc70bd8..f69d675 100644 --- a/src/main/java/org/warp/filesponge/FileSponge.java +++ b/src/main/java/org/warp/filesponge/FileSponge.java @@ -101,7 +101,7 @@ public class FileSponge implements URLsHandler { ) .distinct(DataBlock::getId) - .doOnDiscard(DataBlock.class, DataBlock::release); + .doOnDiscard(DataBlock.class, DataBlock::close); } @Override diff --git a/src/main/java/org/warp/filesponge/URL.java b/src/main/java/org/warp/filesponge/URL.java index aae425f..9ac8222 100644 --- a/src/main/java/org/warp/filesponge/URL.java +++ b/src/main/java/org/warp/filesponge/URL.java @@ -20,10 +20,11 @@ package org.warp.filesponge; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.api.BufferAllocator; import it.cavallium.dbengine.database.serialization.Serializer; public interface URL { - Serializer getSerializer(ByteBufAllocator allocator); + Serializer getSerializer(BufferAllocator allocator); }