From e18143248330a02e0faa3269de02eef6c4519586 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 19 Oct 2021 00:28:30 +0200 Subject: [PATCH] Update dbengine --- .../java/org/warp/filesponge/DiskCache.java | 64 ++++++++++++++----- .../org/warp/filesponge/DiskMetadata.java | 54 ++++++++-------- src/main/java/org/warp/filesponge/URL.java | 2 +- 3 files changed, 74 insertions(+), 46 deletions(-) diff --git a/src/main/java/org/warp/filesponge/DiskCache.java b/src/main/java/org/warp/filesponge/DiskCache.java index 0cc26df..e30c756 100644 --- a/src/main/java/org/warp/filesponge/DiskCache.java +++ b/src/main/java/org/warp/filesponge/DiskCache.java @@ -32,7 +32,6 @@ import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.database.serialization.SerializationException; -import it.cavallium.dbengine.database.serialization.Serializer.DeserializationResult; import it.unimi.dsi.fastutil.booleans.BooleanArrayList; import java.util.List; import java.util.Objects; @@ -56,7 +55,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { this.db = db; this.fileContent = fileContent; this.fileMetadata = fileMetadata; - this.diskMetadataSerializer = new DiskMetadataSerializer(db.getAllocator()); + this.diskMetadataSerializer = new DiskMetadataSerializer(); } public static Mono open(LLDatabaseConnection databaseConnection, @@ -78,19 +77,54 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { @Override public Mono writeMetadata(URL url, Metadata metadata) { - Mono> keyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url)); + Mono> keyMono = Mono.fromCallable(() -> serializeUrl(url)); return fileMetadata .update(keyMono, oldValue -> Objects.requireNonNullElseGet(oldValue, - () -> diskMetadataSerializer.serialize(new DiskMetadata(metadata.size(), + () -> serializeMetadata(new DiskMetadata(metadata.size(), BooleanArrayList.wrap(new boolean[DiskMetadata.getBlocksCount(metadata.size(), BLOCK_SIZE)]) )) ), UpdateReturnMode.NOTHING) .then(); } + private Send serializeUrl(URL url) { + var urlSerializer = url.getSerializer(); + int sizeHint = urlSerializer.getSerializedSizeHint(); + if (sizeHint == -1) sizeHint = 64; + try (var buffer = db.getAllocator().allocate(sizeHint)) { + try { + urlSerializer.serialize(url, buffer); + } catch (SerializationException ex) { + throw new IllegalStateException("Failed to serialize url", ex); + } + return buffer.send(); + } + } + + private Send serializeMetadata(DiskMetadata diskMetadata) { + int sizeHint = diskMetadataSerializer.getSerializedSizeHint(); + if (sizeHint == -1) sizeHint = 64; + try (var buffer = db.getAllocator().allocate(sizeHint)) { + try { + diskMetadataSerializer.serialize(diskMetadata, buffer); + } catch (SerializationException ex) { + throw new IllegalStateException("Failed to serialize metadata", ex); + } + return buffer.send(); + } + } + + private DiskMetadata deserializeMetadata(Send prevBytes) { + try (var prevBytesBuf = prevBytes.receive()) { + return diskMetadataSerializer.deserialize(prevBytesBuf); + } catch (SerializationException ex) { + throw new IllegalStateException("Failed to deserialize metadata", ex); + } + } + @Override public Mono writeContentBlock(URL url, DataBlock dataBlock) { - Mono> urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url)); + Mono> urlKeyMono = Mono.fromCallable(() -> serializeUrl(url)); Mono> blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, dataBlock.getId())); return Mono .fromCallable(dataBlock::getData) @@ -106,7 +140,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { .then(fileMetadata.update(urlKeyMono, prevBytes -> { @Nullable DiskMetadata result; if (prevBytes != null) { - DiskMetadata prevMeta = diskMetadataSerializer.deserialize(prevBytes).deserializedData(); + DiskMetadata prevMeta = deserializeMetadata(prevBytes); if (!prevMeta.isDownloadedBlock(dataBlock.getId())) { BooleanArrayList bal = prevMeta.downloadedBlocks().clone(); if (prevMeta.size() == -1) { @@ -130,7 +164,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { result = null; } if (result != null) { - return diskMetadataSerializer.serialize(result); + return serializeMetadata(result); } else { return null; } @@ -178,8 +212,8 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { ); } - private Send getBlockKey(URL url, int blockId) throws SerializationException { - try (var urlBytes = url.getSerializer(db.getAllocator()).serialize(url).receive()) { + private Send getBlockKey(URL url, int blockId) { + try (var urlBytes = serializeUrl(url).receive()) { Buffer blockIdBytes = this.db.getAllocator().allocate(Integer.BYTES); blockIdBytes.writeInt(blockId); return LLUtils.compositeBuffer(db.getAllocator(), urlBytes.send(), blockIdBytes.send()).send(); @@ -192,11 +226,10 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { @Override public Mono requestDiskMetadata(URL url) { - Mono> urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url)); + Mono> urlKeyMono = Mono.fromCallable(() -> serializeUrl(url)); return fileMetadata .get(null, urlKeyMono) - .map(diskMetadataSerializer::deserialize) - .map(DeserializationResult::deserializedData); + .map(this::deserializeMetadata); } @Override @@ -207,16 +240,15 @@ public class DiskCache implements URLsDiskHandler, URLsWriter { @Override public Mono>> request(URL url) { - Mono> urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url)); + Mono> urlKeyMono = Mono.fromCallable(() -> serializeUrl(url)); return Mono .using( - () -> url.getSerializer(db.getAllocator()).serialize(url), + () -> serializeUrl(url), key -> fileMetadata.get(null, urlKeyMono), Send::close ) .map(serialized -> { - var diskMetadataDeserializationResult = diskMetadataSerializer.deserialize(serialized); - var diskMeta = diskMetadataDeserializationResult.deserializedData(); + var diskMeta = deserializeMetadata(serialized); var meta = diskMeta.asMetadata(); if (diskMeta.isDownloadedFully()) { return Tuples.of(meta, this.requestContent(url)); diff --git a/src/main/java/org/warp/filesponge/DiskMetadata.java b/src/main/java/org/warp/filesponge/DiskMetadata.java index ef0d8e5..bab552f 100644 --- a/src/main/java/org/warp/filesponge/DiskMetadata.java +++ b/src/main/java/org/warp/filesponge/DiskMetadata.java @@ -21,8 +21,10 @@ package org.warp.filesponge; import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.Send; -import it.cavallium.dbengine.database.serialization.BufferDataInput; +import it.cavallium.dbengine.database.serialization.BufferDataInputOwned; +import it.cavallium.dbengine.database.serialization.BufferDataInputShared; import it.cavallium.dbengine.database.serialization.BufferDataOutput; +import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.Serializer; import it.unimi.dsi.fastutil.booleans.BooleanArrayList; import org.jetbrains.annotations.NotNull; @@ -37,19 +39,19 @@ public record DiskMetadata(int size, BooleanArrayList downloadedBlocks) { // Ensure blocks count is valid by calling getBlocksCount() getBlocksCount(); // It's fully downloaded if every block is true - downloadedFullyVal = !this.downloadedBlocks().contains(false); + downloadedFullyVal = !this.downloadedBlocks.contains(false); return downloadedFullyVal; } @SuppressWarnings("UnusedReturnValue") private int getBlocksCount() { if (size == -1) { - return downloadedBlocks().size(); + return downloadedBlocks.size(); } var expectedBlocksCount = getBlocksCount(size, FileSponge.BLOCK_SIZE); - if (this.downloadedBlocks().size() != expectedBlocksCount) { + if (this.downloadedBlocks.size() != expectedBlocksCount) { throw new IllegalStateException( - "Blocks array length (" + this.downloadedBlocks().size() + ") != expected blocks count (" + "Blocks array length (" + this.downloadedBlocks.size() + ") != expected blocks count (" + expectedBlocksCount + ")"); } return expectedBlocksCount; @@ -67,24 +69,18 @@ public record DiskMetadata(int size, BooleanArrayList downloadedBlocks) { } public boolean isDownloadedBlock(int id) { - if (size == -1 && downloadedBlocks().size() <= id) { + if (size == -1 && downloadedBlocks.size() <= id) { return false; } else { - return downloadedBlocks().getBoolean(id); + return downloadedBlocks.getBoolean(id); } } public static class DiskMetadataSerializer implements Serializer { - private final BufferAllocator allocator; - - public DiskMetadataSerializer(BufferAllocator allocator) { - this.allocator = allocator; - } - @Override - public @NotNull DeserializationResult deserialize(@NotNull Send serialized) { - var dis = new BufferDataInput(serialized); + public @NotNull DiskMetadata deserialize(@NotNull Buffer serialized) throws SerializationException { + var dis = new BufferDataInputShared(serialized); int size = dis.readInt(); int blocksCount; if (size == -1) { @@ -96,25 +92,25 @@ public record DiskMetadata(int size, BooleanArrayList downloadedBlocks) { for (int i = 0; i < blocksCount; i++) { downloadedBlocks.add(dis.readBoolean()); } - return new DeserializationResult<>(new DiskMetadata(size, downloadedBlocks), dis.getReadBytesCount()); + return new DiskMetadata(size, downloadedBlocks); } @Override - public @NotNull Send serialize(@NotNull DiskMetadata deserialized) { - try (var buffer = allocator.allocate(64)) { - var dos = new BufferDataOutput(buffer); - dos.writeInt(deserialized.size()); - if (deserialized.size == -1) { - dos.writeShort(deserialized.getBlocksCount()); - } else { - deserialized.getBlocksCount(); - } - for (boolean downloadedBlock : deserialized.downloadedBlocks()) { - dos.writeBoolean(downloadedBlock); - } - return buffer.send(); + public void serialize(@NotNull DiskMetadata deserialized, Buffer output) throws SerializationException { + var dos = new BufferDataOutput(output); + dos.writeInt(deserialized.size); + var blocksCount = deserialized.getBlocksCount(); + if (deserialized.size == -1) { + dos.writeShort(blocksCount); + } + for (boolean downloadedBlock : deserialized.downloadedBlocks) { + dos.writeBoolean(downloadedBlock); } } + @Override + public int getSerializedSizeHint() { + return Integer.BYTES; + } } } diff --git a/src/main/java/org/warp/filesponge/URL.java b/src/main/java/org/warp/filesponge/URL.java index f7bf110..7b18f54 100644 --- a/src/main/java/org/warp/filesponge/URL.java +++ b/src/main/java/org/warp/filesponge/URL.java @@ -23,6 +23,6 @@ import it.cavallium.dbengine.database.serialization.Serializer; public interface URL { - Serializer getSerializer(BufferAllocator allocator); + Serializer getSerializer(); }