Fix streaming files

Andrea Cavalli 2021-06-02 13:21:34 +02:00
parent 8325bf7a73
commit eeadda9b78
2 changed files with 50 additions and 12 deletions

View File

@@ -115,9 +115,21 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
     @Nullable DiskMetadata result;
     if (prevBytes != null) {
         DiskMetadata prevMeta = diskMetadataSerializer.deserialize(prevBytes);
-        if (!prevMeta.downloadedBlocks().getBoolean(dataBlock.getId())) {
+        if (!prevMeta.isDownloadedBlock(dataBlock.getId())) {
             BooleanArrayList bal = prevMeta.downloadedBlocks().clone();
+            if (prevMeta.size() == -1) {
+                if (bal.size() > dataBlock.getId()) {
+                    bal.set(dataBlock.getId(), true);
+                } else if (bal.size() == dataBlock.getId()) {
+                    bal.add(true);
+                } else {
+                    throw new IndexOutOfBoundsException(
+                            "Trying to write a block too much far from the last block. Previous total blocks: "
+                                    + bal.size() + " Current block id: " + dataBlock.getId());
+                }
+            } else {
                 bal.set(dataBlock.getId(), true);
+            }
             result = new DiskMetadata(prevMeta.size(), bal);
         } else {
             result = prevMeta;
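
The hunk above only changes behavior for streaming downloads whose total size is not yet known (size == -1): instead of pre-sizing downloadedBlocks from the file size, the list now grows one entry at a time as sequential blocks arrive. A minimal standalone sketch of that rule, assuming fastutil on the classpath (markBlock is a hypothetical helper, not part of this commit):

import it.unimi.dsi.fastutil.booleans.BooleanArrayList;

public class StreamingBlocksSketch {

    // Hypothetical helper mirroring the new branch: mark a block as downloaded,
    // letting the list grow by at most one block past its current end.
    static void markBlock(BooleanArrayList blocks, int blockId) {
        if (blocks.size() > blockId) {
            blocks.set(blockId, true); // slot already exists, just flip it
        } else if (blocks.size() == blockId) {
            blocks.add(true); // append the next sequential block
        } else {
            throw new IndexOutOfBoundsException("Block " + blockId
                    + " is more than one past the last known block (" + blocks.size() + ")");
        }
    }

    public static void main(String[] args) {
        BooleanArrayList blocks = new BooleanArrayList();
        markBlock(blocks, 0); // appends
        markBlock(blocks, 1); // appends
        markBlock(blocks, 0); // re-marks an existing block
        // markBlock(blocks, 5) would throw: blocks must arrive without gaps
        System.out.println(blocks); // [true, true]
    }
}
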
@@ -161,6 +173,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
     try {
         int blockOffset = getBlockOffset(blockId);
         int blockLength = data.readableBytes();
+        if (meta.size() != -1) {
             if (blockOffset + blockLength >= meta.size()) {
                 if (blockOffset + blockLength > meta.size()) {
                     throw new IllegalStateException("Overflowed data size");
@@ -169,6 +182,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
                 // Intermediate blocks must be of max size
                 assert data.readableBytes() == BLOCK_SIZE;
             }
+        }
         return new DataBlock(blockOffset, blockLength, data.retain());
     } finally {
         data.release();
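
The two hunks above wrap the existing bounds check in an if (meta.size() != -1) guard: while the total size is still unknown there is nothing to validate a block against, so both the overflow check and the full-block assertion are skipped. A condensed, standalone version of the guarded check (class, method and parameter names are illustrative only):

public class BlockBoundsSketch {

    // Condensed form of the validation above; a no-op while the total size is unknown (-1).
    static void checkBlockBounds(int blockOffset, int blockLength, int totalSize, int blockSize) {
        if (totalSize == -1) {
            return; // streaming download: size not known yet
        }
        if (blockOffset + blockLength >= totalSize) {
            if (blockOffset + blockLength > totalSize) {
                throw new IllegalStateException("Overflowed data size");
            }
        } else {
            // Intermediate blocks must be of max size
            assert blockLength == blockSize;
        }
    }
}
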

View File

@@ -47,9 +47,13 @@ public record DiskMetadata(int size, BooleanArrayList downloadedBlocks) {
         return downloadedFullyVal;
     }
 
+    @SuppressWarnings("UnusedReturnValue")
     private int getBlocksCount() {
+        if (size == -1) {
+            return downloadedBlocks().size();
+        }
         var expectedBlocksCount = getBlocksCount(size, FileSponge.BLOCK_SIZE);
-        if (this.downloadedBlocks.size() != expectedBlocksCount) {
+        if (this.downloadedBlocks().size() != expectedBlocksCount) {
             throw new IllegalStateException(
                     "Blocks array length (" + this.downloadedBlocks().size() + ") != expected blocks count ("
                             + expectedBlocksCount + ")");
@@ -58,6 +62,9 @@ public record DiskMetadata(int size, BooleanArrayList downloadedBlocks) {
     }
 
     public static int getBlocksCount(int size, int blockSize) {
+        if (size == -1) {
+            return 0;
+        }
         return (size + (blockSize - size % blockSize)) / blockSize;
     }
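
getBlocksCount rounds the size up to whole blocks, and with the new guard an unknown size (-1) now reports zero blocks instead of feeding a negative size into that expression. A worked example; the 4096 block size is chosen only for illustration, the real value comes from FileSponge.BLOCK_SIZE, which this diff does not show:

public class BlocksCountExample {
    public static void main(String[] args) {
        int blockSize = 4096;
        int size = 10_000;
        // (10000 + (4096 - 10000 % 4096)) / 4096 = (10000 + 2288) / 4096 = 3 blocks
        System.out.println((size + (blockSize - size % blockSize)) / blockSize); // 3
        // size == -1 now short-circuits to 0 before this expression is reached
    }
}
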
@@ -65,6 +72,14 @@ public record DiskMetadata(int size, BooleanArrayList downloadedBlocks) {
         return new Metadata(size);
     }
 
+    public boolean isDownloadedBlock(int id) {
+        if (size == -1 && downloadedBlocks().size() <= id) {
+            return false;
+        } else {
+            return downloadedBlocks().getBoolean(id);
+        }
+    }
+
     public static class DiskMetadataSerializer implements Serializer<DiskMetadata, ByteBuf> {
 
         private final ByteBufAllocator allocator;
@@ -79,7 +94,12 @@ public record DiskMetadata(int size, BooleanArrayList downloadedBlocks) {
             var bais = new ByteBufInputStream(serialized);
             var dis = new DataInputStream(bais);
             int size = dis.readInt();
-            int blocksCount = getBlocksCount(size, FileSponge.BLOCK_SIZE);
+            int blocksCount;
+            if (size == -1) {
+                blocksCount = dis.readShort();
+            } else {
+                blocksCount = getBlocksCount(size, FileSponge.BLOCK_SIZE);
+            }
             var downloadedBlocks = new BooleanArrayList(blocksCount);
             for (int i = 0; i < blocksCount; i++) {
                 downloadedBlocks.add(dis.readBoolean());
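
Together with the write-side change below, the metadata layout becomes: a 4-byte size, then (only when size == -1) a 2-byte block count, then one boolean per block. A minimal sketch of reading that layout with plain java.io streams, independent of the Netty ByteBuf plumbing used by the real serializer:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

public class MetadataReadSketch {

    // Reads: int size, then (only if size == -1) a short block count, then one boolean per block.
    static boolean[] readDownloadedBlocks(byte[] bytes, int blockSize) throws IOException {
        try (var dis = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int size = dis.readInt();
            int blocksCount = (size == -1)
                    ? dis.readShort() // streaming file: count stored explicitly
                    : (size + (blockSize - size % blockSize)) / blockSize; // known size: count derived
            boolean[] downloaded = new boolean[blocksCount];
            for (int i = 0; i < blocksCount; i++) {
                downloaded[i] = dis.readBoolean();
            }
            return downloaded;
        }
    }
}
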
@@ -98,8 +118,12 @@ public record DiskMetadata(int size, BooleanArrayList downloadedBlocks) {
             try (var bos = new ByteBufOutputStream(buffer)) {
                 try (var dos = new DataOutputStream(bos)) {
                     dos.writeInt(deserialized.size());
+                    if (deserialized.size == -1) {
+                        dos.writeShort(deserialized.getBlocksCount());
+                    } else {
                         deserialized.getBlocksCount();
-                    for (boolean downloadedBlock : deserialized.downloadedBlocks) {
+                    }
+                    for (boolean downloadedBlock : deserialized.downloadedBlocks()) {
                         dos.writeBoolean(downloadedBlock);
                     }
                 }
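
The write side now mirrors the read side: the block count is written as a short only when the size is unknown (which in practice caps such entries at Short.MAX_VALUE blocks), while for known sizes getBlocksCount() is still called purely for its length validation, which is presumably why @SuppressWarnings("UnusedReturnValue") appears on it above. A matching write-side sketch with plain streams, hypothetical and outside the real DiskMetadataSerializer:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class MetadataWriteSketch {

    // Writes: int size, then (only if size == -1) a short block count, then one boolean per block;
    // mirrors readDownloadedBlocks in the read sketch above.
    static byte[] writeDownloadedBlocks(int size, boolean[] downloaded) throws IOException {
        var baos = new ByteArrayOutputStream();
        try (var dos = new DataOutputStream(baos)) {
            dos.writeInt(size);
            if (size == -1) {
                dos.writeShort(downloaded.length); // count cannot be derived from an unknown size
            }
            for (boolean b : downloaded) {
                dos.writeBoolean(b);
            }
        }
        return baos.toByteArray();
    }
}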