diff --git a/src/main/lombok/org/warp/filesponge/DataBlock.java b/src/main/lombok/org/warp/filesponge/DataBlock.java index 7b917ca..c3fe0fa 100644 --- a/src/main/lombok/org/warp/filesponge/DataBlock.java +++ b/src/main/lombok/org/warp/filesponge/DataBlock.java @@ -18,16 +18,28 @@ package org.warp.filesponge; +import io.netty.buffer.ByteBuf; import java.nio.ByteBuffer; import lombok.Value; @Value public class DataBlock { + public DataBlock(int offset, int length, ByteBuf data) { + this.offset = offset; + assert data.isDirect(); + this.length = length; + this.data = data; + } + int offset; int length; - ByteBuffer data; + ByteBuf data; + public ByteBuf getData() { + assert data.isReadable(); + return data; + } public int getId() { return offset / FileSponge.BLOCK_SIZE; diff --git a/src/main/lombok/org/warp/filesponge/DiskCache.java b/src/main/lombok/org/warp/filesponge/DiskCache.java index 53c336c..e256791 100644 --- a/src/main/lombok/org/warp/filesponge/DiskCache.java +++ b/src/main/lombok/org/warp/filesponge/DiskCache.java @@ -21,6 +21,9 @@ package org.warp.filesponge; import static org.warp.filesponge.FileSponge.BLOCK_SIZE; import com.google.common.primitives.Ints; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLDictionary; @@ -34,6 +37,7 @@ import java.util.List; import java.util.Optional; import lombok.AccessLevel; import lombok.AllArgsConstructor; +import org.jetbrains.annotations.Nullable; import org.warp.filesponge.DiskMetadata.DiskMetadataSerializer; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -52,7 +56,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { public static Mono open(LLDatabaseConnection databaseConnection, String dbName, boolean lowMemory) { return databaseConnection - .getDatabase(dbName, List.of(Column.dictionary("file-content"), Column.dictionary("file-metadata")), lowMemory) + .getDatabase(dbName, List.of(Column.dictionary("file-content"), Column.dictionary("file-metadata")), lowMemory, false) .flatMap(db -> Mono.zip( Mono.just(db).single(), db.getDictionary("file-content", UpdateMode.ALLOW).single(), @@ -66,15 +70,13 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { public Mono writeMetadata(URL url, Metadata metadata) { return fileMetadata .update(url.getSerializer().serialize(url), oldValue -> { - if (oldValue.isPresent()) { + if (oldValue != null) { return oldValue; } else { - return Optional - .of(new DiskMetadata( + return diskMetadataSerializer.serialize(new DiskMetadata( metadata.getSize(), BooleanArrayList.wrap(new boolean[DiskMetadata.getBlocksCount(metadata.getSize(), BLOCK_SIZE)]) - )) - .map(diskMetadataSerializer::serialize); + )); } }) .then(); @@ -84,26 +86,36 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { public Mono writeContentBlock(URL url, DataBlock dataBlock) { return Mono .fromCallable(() -> { - byte[] bytes = new byte[dataBlock.getLength()]; - dataBlock.getData().get(bytes); + return dataBlock.getData(); + /* + ByteBuf bytes = PooledByteBufAllocator.DEFAULT.directBuffer(dataBlock.getLength()); + bytes.writeBytes(dataBlock.getData().slice()); return bytes; - }).subscribeOn(Schedulers.boundedElastic()) + + */ + }) + .subscribeOn(Schedulers.boundedElastic()) .flatMap(bytes -> fileContent.put(getBlockKey(url, dataBlock.getId()), bytes, LLDictionaryResultType.VOID)) - .then(fileMetadata - .update(url.getSerializer().serialize(url), prevBytes -> prevBytes - .map(diskMetadataSerializer::deserialize) - .map(prevMeta -> { - if (!prevMeta.getDownloadedBlocks().getBoolean(dataBlock.getId())) { - BooleanArrayList bal = prevMeta.getDownloadedBlocks().clone(); - bal.set(dataBlock.getId(), true); - return new DiskMetadata(prevMeta.getSize(), bal); - } else { - return prevMeta; - } - }) - .map(diskMetadataSerializer::serialize) - ) - ) + .then(fileMetadata.update(url.getSerializer().serialize(url), prevBytes -> { + @Nullable DiskMetadata result; + if (prevBytes != null) { + DiskMetadata prevMeta = diskMetadataSerializer.deserialize(prevBytes); + if (!prevMeta.getDownloadedBlocks().getBoolean(dataBlock.getId())) { + BooleanArrayList bal = prevMeta.getDownloadedBlocks().clone(); + bal.set(dataBlock.getId(), true); + result = new DiskMetadata(prevMeta.getSize(), bal); + } else { + result = prevMeta; + } + } else { + result = null; + } + if (result != null) { + return diskMetadataSerializer.serialize(result); + } else { + return null; + } + })) .then(); } @@ -123,26 +135,25 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { } return fileContent.get(null, getBlockKey(url, blockId)).map(data -> { int blockOffset = getBlockOffset(blockId); - int blockLength = data.length; + int blockLength = data.readableBytes(); if (blockOffset + blockLength >= meta.getSize()) { if (blockOffset + blockLength > meta.getSize()) { throw new IllegalStateException("Overflowed data size"); } } else { // Intermediate blocks must be of max size - assert data.length == BLOCK_SIZE; + assert data.readableBytes() == BLOCK_SIZE; } - return new DataBlock(blockOffset, blockLength, ByteBuffer.wrap(data, 0, blockLength)); + return new DataBlock(blockOffset, blockLength, data); }); })); } - private byte[] getBlockKey(URL url, int blockId) { - byte[] urlBytes = url.getSerializer().serialize(url); - byte[] blockIdBytes = Ints.toByteArray(blockId); - byte[] resultBytes = Arrays.copyOf(urlBytes, urlBytes.length + blockIdBytes.length); - System.arraycopy(blockIdBytes, 0, resultBytes, urlBytes.length, blockIdBytes.length); - return resultBytes; + private ByteBuf getBlockKey(URL url, int blockId) { + ByteBuf urlBytes = url.getSerializer().serialize(url); + ByteBuf blockIdBytes = PooledByteBufAllocator.DEFAULT.directBuffer(Integer.BYTES, Integer.BYTES); + blockIdBytes.writeInt(blockId); + return Unpooled.wrappedBuffer(urlBytes, blockIdBytes); } private static int getBlockOffset(int blockId) { diff --git a/src/main/lombok/org/warp/filesponge/DiskMetadata.java b/src/main/lombok/org/warp/filesponge/DiskMetadata.java index 4feb7e7..ded5fb7 100644 --- a/src/main/lombok/org/warp/filesponge/DiskMetadata.java +++ b/src/main/lombok/org/warp/filesponge/DiskMetadata.java @@ -18,6 +18,10 @@ package org.warp.filesponge; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.PooledByteBufAllocator; import it.cavallium.dbengine.database.serialization.Serializer; import it.unimi.dsi.fastutil.booleans.BooleanArrayList; import java.io.ByteArrayInputStream; @@ -68,26 +72,31 @@ public class DiskMetadata { return new Metadata(size); } - public static class DiskMetadataSerializer implements Serializer { + public static class DiskMetadataSerializer implements Serializer { @SneakyThrows @Override - public @NotNull DiskMetadata deserialize(byte @NotNull [] serialized) { - var bais = new ByteArrayInputStream(serialized); - var dis = new DataInputStream(bais); - int size = dis.readInt(); - int blocksCount = getBlocksCount(size, FileSponge.BLOCK_SIZE); - var downloadedBlocks = new BooleanArrayList(blocksCount); - for (int i = 0; i < blocksCount; i++) { - downloadedBlocks.add(dis.readBoolean()); + public @NotNull DiskMetadata deserialize(@NotNull ByteBuf serialized) { + try { + var bais = new ByteBufInputStream(serialized); + var dis = new DataInputStream(bais); + int size = dis.readInt(); + int blocksCount = getBlocksCount(size, FileSponge.BLOCK_SIZE); + var downloadedBlocks = new BooleanArrayList(blocksCount); + for (int i = 0; i < blocksCount; i++) { + downloadedBlocks.add(dis.readBoolean()); + } + return new DiskMetadata(size, downloadedBlocks); + } finally { + serialized.release(); } - return new DiskMetadata(size, downloadedBlocks); } @SneakyThrows @Override - public byte @NotNull [] serialize(@NotNull DiskMetadata deserialized) { - try (var bos = new ByteArrayOutputStream()) { + public @NotNull ByteBuf serialize(@NotNull DiskMetadata deserialized) { + ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(); + try (var bos = new ByteBufOutputStream(buffer)) { try (var dos = new DataOutputStream(bos)) { dos.writeInt(deserialized.getSize()); deserialized.getBlocksCount(); @@ -95,7 +104,7 @@ public class DiskMetadata { dos.writeBoolean(downloadedBlock); } } - return bos.toByteArray(); + return buffer; } } diff --git a/src/main/lombok/org/warp/filesponge/URL.java b/src/main/lombok/org/warp/filesponge/URL.java index 8e254de..8628861 100644 --- a/src/main/lombok/org/warp/filesponge/URL.java +++ b/src/main/lombok/org/warp/filesponge/URL.java @@ -18,10 +18,11 @@ package org.warp.filesponge; +import io.netty.buffer.ByteBuf; import it.cavallium.dbengine.database.serialization.Serializer; public interface URL { - Serializer getSerializer(); + Serializer getSerializer(); }