diff --git a/src/main/lombok/org/warp/filesponge/DiskCache.java b/src/main/lombok/org/warp/filesponge/DiskCache.java index c061a48..4cacd83 100644 --- a/src/main/lombok/org/warp/filesponge/DiskCache.java +++ b/src/main/lombok/org/warp/filesponge/DiskCache.java @@ -46,15 +46,21 @@ import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; -@AllArgsConstructor(access = AccessLevel.PRIVATE) public class DiskCache implements URLsDiskHandler, URLsWriter { - private static final DiskMetadataSerializer diskMetadataSerializer = new DiskMetadataSerializer(); + private final DiskMetadataSerializer diskMetadataSerializer; private final LLKeyValueDatabase db; private final LLDictionary fileContent; private final LLDictionary fileMetadata; + public DiskCache(LLKeyValueDatabase db, LLDictionary fileContent, LLDictionary fileMetadata) { + this.db = db; + this.fileContent = fileContent; + this.fileMetadata = fileMetadata; + this.diskMetadataSerializer = new DiskMetadataSerializer(db.getAllocator()); + } + public static Mono open(LLDatabaseConnection databaseConnection, String dbName, boolean lowMemory) { return databaseConnection .getDatabase(dbName, List.of(Column.dictionary("file-content"), Column.dictionary("file-metadata")), lowMemory, false) @@ -70,7 +76,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { @Override public Mono writeMetadata(URL url, Metadata metadata) { return fileMetadata - .update(url.getSerializer().serialize(url), oldValue -> { + .update(url.getSerializer(db.getAllocator()).serialize(url), oldValue -> { if (oldValue != null) { return oldValue; } else { @@ -98,7 +104,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { .subscribeOn(Schedulers.boundedElastic()) .flatMap(bytes -> fileContent.put(getBlockKey(url, dataBlock.getId()), bytes, LLDictionaryResultType.VOID)) .doOnNext(ReferenceCounted::release) - .then(fileMetadata.update(url.getSerializer().serialize(url), prevBytes -> { + .then(fileMetadata.update(url.getSerializer(db.getAllocator()).serialize(url), prevBytes -> { @Nullable DiskMetadata result; if (prevBytes != null) { DiskMetadata prevMeta = diskMetadataSerializer.deserialize(prevBytes); @@ -152,8 +158,8 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { } private ByteBuf getBlockKey(URL url, int blockId) { - ByteBuf urlBytes = url.getSerializer().serialize(url); - ByteBuf blockIdBytes = PooledByteBufAllocator.DEFAULT.directBuffer(Integer.BYTES, Integer.BYTES); + ByteBuf urlBytes = url.getSerializer(db.getAllocator()).serialize(url); + ByteBuf blockIdBytes = this.db.getAllocator().directBuffer(Integer.BYTES, Integer.BYTES); blockIdBytes.writeInt(blockId); return Unpooled.wrappedBuffer(urlBytes, blockIdBytes); } @@ -165,7 +171,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { @Override public Mono requestDiskMetadata(URL url) { return fileMetadata - .get(null, url.getSerializer().serialize(url)) + .get(null, url.getSerializer(db.getAllocator()).serialize(url)) .map(diskMetadataSerializer::deserialize); } @@ -178,7 +184,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { @Override public Mono>> request(URL url) { return fileMetadata - .get(null, url.getSerializer().serialize(url)) + .get(null, url.getSerializer(db.getAllocator()).serialize(url)) .map(diskMetadataSerializer::deserialize) .map(diskMeta -> { var meta = diskMeta.asMetadata(); diff --git a/src/main/lombok/org/warp/filesponge/DiskMetadata.java b/src/main/lombok/org/warp/filesponge/DiskMetadata.java index ded5fb7..24ecf9e 100644 --- a/src/main/lombok/org/warp/filesponge/DiskMetadata.java +++ b/src/main/lombok/org/warp/filesponge/DiskMetadata.java @@ -19,6 +19,7 @@ package org.warp.filesponge; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.PooledByteBufAllocator; @@ -31,6 +32,7 @@ import java.io.DataOutputStream; import lombok.Data; import lombok.SneakyThrows; import org.jetbrains.annotations.NotNull; +import org.warp.filesponge.DiskMetadata.DiskMetadataSerializer; @Data public class DiskMetadata { @@ -74,6 +76,12 @@ public class DiskMetadata { public static class DiskMetadataSerializer implements Serializer { + private final ByteBufAllocator allocator; + + public DiskMetadataSerializer(ByteBufAllocator allocator) { + this.allocator = allocator; + } + @SneakyThrows @Override public @NotNull DiskMetadata deserialize(@NotNull ByteBuf serialized) { @@ -95,7 +103,7 @@ public class DiskMetadata { @SneakyThrows @Override public @NotNull ByteBuf serialize(@NotNull DiskMetadata deserialized) { - ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(); + ByteBuf buffer = allocator.buffer(); try (var bos = new ByteBufOutputStream(buffer)) { try (var dos = new DataOutputStream(bos)) { dos.writeInt(deserialized.getSize()); diff --git a/src/main/lombok/org/warp/filesponge/URL.java b/src/main/lombok/org/warp/filesponge/URL.java index 8628861..aae425f 100644 --- a/src/main/lombok/org/warp/filesponge/URL.java +++ b/src/main/lombok/org/warp/filesponge/URL.java @@ -19,10 +19,11 @@ package org.warp.filesponge; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import it.cavallium.dbengine.database.serialization.Serializer; public interface URL { - Serializer getSerializer(); + Serializer getSerializer(ByteBufAllocator allocator); }