Update dbengine

This commit is contained in:
Andrea Cavalli 2021-10-19 00:28:30 +02:00
parent 2cfe213280
commit e181432483
3 changed files with 74 additions and 46 deletions

View File

@ -32,7 +32,6 @@ import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.Serializer.DeserializationResult;
import it.unimi.dsi.fastutil.booleans.BooleanArrayList; import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
@ -56,7 +55,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
this.db = db; this.db = db;
this.fileContent = fileContent; this.fileContent = fileContent;
this.fileMetadata = fileMetadata; this.fileMetadata = fileMetadata;
this.diskMetadataSerializer = new DiskMetadataSerializer(db.getAllocator()); this.diskMetadataSerializer = new DiskMetadataSerializer();
} }
public static Mono<DiskCache> open(LLDatabaseConnection databaseConnection, public static Mono<DiskCache> open(LLDatabaseConnection databaseConnection,
@ -78,19 +77,54 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
@Override @Override
public Mono<Void> writeMetadata(URL url, Metadata metadata) { public Mono<Void> writeMetadata(URL url, Metadata metadata) {
Mono<Send<Buffer>> keyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url)); Mono<Send<Buffer>> keyMono = Mono.fromCallable(() -> serializeUrl(url));
return fileMetadata return fileMetadata
.update(keyMono, oldValue -> Objects.requireNonNullElseGet(oldValue, .update(keyMono, oldValue -> Objects.requireNonNullElseGet(oldValue,
() -> diskMetadataSerializer.serialize(new DiskMetadata(metadata.size(), () -> serializeMetadata(new DiskMetadata(metadata.size(),
BooleanArrayList.wrap(new boolean[DiskMetadata.getBlocksCount(metadata.size(), BLOCK_SIZE)]) BooleanArrayList.wrap(new boolean[DiskMetadata.getBlocksCount(metadata.size(), BLOCK_SIZE)])
)) ))
), UpdateReturnMode.NOTHING) ), UpdateReturnMode.NOTHING)
.then(); .then();
} }
private Send<Buffer> serializeUrl(URL url) {
var urlSerializer = url.getSerializer();
int sizeHint = urlSerializer.getSerializedSizeHint();
if (sizeHint == -1) sizeHint = 64;
try (var buffer = db.getAllocator().allocate(sizeHint)) {
try {
urlSerializer.serialize(url, buffer);
} catch (SerializationException ex) {
throw new IllegalStateException("Failed to serialize url", ex);
}
return buffer.send();
}
}
private Send<Buffer> serializeMetadata(DiskMetadata diskMetadata) {
int sizeHint = diskMetadataSerializer.getSerializedSizeHint();
if (sizeHint == -1) sizeHint = 64;
try (var buffer = db.getAllocator().allocate(sizeHint)) {
try {
diskMetadataSerializer.serialize(diskMetadata, buffer);
} catch (SerializationException ex) {
throw new IllegalStateException("Failed to serialize metadata", ex);
}
return buffer.send();
}
}
private DiskMetadata deserializeMetadata(Send<Buffer> prevBytes) {
try (var prevBytesBuf = prevBytes.receive()) {
return diskMetadataSerializer.deserialize(prevBytesBuf);
} catch (SerializationException ex) {
throw new IllegalStateException("Failed to deserialize metadata", ex);
}
}
@Override @Override
public Mono<Void> writeContentBlock(URL url, DataBlock dataBlock) { public Mono<Void> writeContentBlock(URL url, DataBlock dataBlock) {
Mono<Send<Buffer>> urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url)); Mono<Send<Buffer>> urlKeyMono = Mono.fromCallable(() -> serializeUrl(url));
Mono<Send<Buffer>> blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, dataBlock.getId())); Mono<Send<Buffer>> blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, dataBlock.getId()));
return Mono return Mono
.fromCallable(dataBlock::getData) .fromCallable(dataBlock::getData)
@ -106,7 +140,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
.then(fileMetadata.update(urlKeyMono, prevBytes -> { .then(fileMetadata.update(urlKeyMono, prevBytes -> {
@Nullable DiskMetadata result; @Nullable DiskMetadata result;
if (prevBytes != null) { if (prevBytes != null) {
DiskMetadata prevMeta = diskMetadataSerializer.deserialize(prevBytes).deserializedData(); DiskMetadata prevMeta = deserializeMetadata(prevBytes);
if (!prevMeta.isDownloadedBlock(dataBlock.getId())) { if (!prevMeta.isDownloadedBlock(dataBlock.getId())) {
BooleanArrayList bal = prevMeta.downloadedBlocks().clone(); BooleanArrayList bal = prevMeta.downloadedBlocks().clone();
if (prevMeta.size() == -1) { if (prevMeta.size() == -1) {
@ -130,7 +164,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
result = null; result = null;
} }
if (result != null) { if (result != null) {
return diskMetadataSerializer.serialize(result); return serializeMetadata(result);
} else { } else {
return null; return null;
} }
@ -178,8 +212,8 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
); );
} }
private Send<Buffer> getBlockKey(URL url, int blockId) throws SerializationException { private Send<Buffer> getBlockKey(URL url, int blockId) {
try (var urlBytes = url.getSerializer(db.getAllocator()).serialize(url).receive()) { try (var urlBytes = serializeUrl(url).receive()) {
Buffer blockIdBytes = this.db.getAllocator().allocate(Integer.BYTES); Buffer blockIdBytes = this.db.getAllocator().allocate(Integer.BYTES);
blockIdBytes.writeInt(blockId); blockIdBytes.writeInt(blockId);
return LLUtils.compositeBuffer(db.getAllocator(), urlBytes.send(), blockIdBytes.send()).send(); return LLUtils.compositeBuffer(db.getAllocator(), urlBytes.send(), blockIdBytes.send()).send();
@ -192,11 +226,10 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
@Override @Override
public Mono<DiskMetadata> requestDiskMetadata(URL url) { public Mono<DiskMetadata> requestDiskMetadata(URL url) {
Mono<Send<Buffer>> urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url)); Mono<Send<Buffer>> urlKeyMono = Mono.fromCallable(() -> serializeUrl(url));
return fileMetadata return fileMetadata
.get(null, urlKeyMono) .get(null, urlKeyMono)
.map(diskMetadataSerializer::deserialize) .map(this::deserializeMetadata);
.map(DeserializationResult::deserializedData);
} }
@Override @Override
@ -207,16 +240,15 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
@Override @Override
public Mono<Tuple2<Metadata, Flux<DataBlock>>> request(URL url) { public Mono<Tuple2<Metadata, Flux<DataBlock>>> request(URL url) {
Mono<Send<Buffer>> urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url)); Mono<Send<Buffer>> urlKeyMono = Mono.fromCallable(() -> serializeUrl(url));
return Mono return Mono
.using( .using(
() -> url.getSerializer(db.getAllocator()).serialize(url), () -> serializeUrl(url),
key -> fileMetadata.get(null, urlKeyMono), key -> fileMetadata.get(null, urlKeyMono),
Send::close Send::close
) )
.map(serialized -> { .map(serialized -> {
var diskMetadataDeserializationResult = diskMetadataSerializer.deserialize(serialized); var diskMeta = deserializeMetadata(serialized);
var diskMeta = diskMetadataDeserializationResult.deserializedData();
var meta = diskMeta.asMetadata(); var meta = diskMeta.asMetadata();
if (diskMeta.isDownloadedFully()) { if (diskMeta.isDownloadedFully()) {
return Tuples.of(meta, this.requestContent(url)); return Tuples.of(meta, this.requestContent(url));

View File

@ -21,8 +21,10 @@ package org.warp.filesponge;
import io.net5.buffer.api.Buffer; import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.serialization.BufferDataInput; import it.cavallium.dbengine.database.serialization.BufferDataInputOwned;
import it.cavallium.dbengine.database.serialization.BufferDataInputShared;
import it.cavallium.dbengine.database.serialization.BufferDataOutput; import it.cavallium.dbengine.database.serialization.BufferDataOutput;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.Serializer;
import it.unimi.dsi.fastutil.booleans.BooleanArrayList; import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
@ -37,19 +39,19 @@ public record DiskMetadata(int size, BooleanArrayList downloadedBlocks) {
// Ensure blocks count is valid by calling getBlocksCount() // Ensure blocks count is valid by calling getBlocksCount()
getBlocksCount(); getBlocksCount();
// It's fully downloaded if every block is true // It's fully downloaded if every block is true
downloadedFullyVal = !this.downloadedBlocks().contains(false); downloadedFullyVal = !this.downloadedBlocks.contains(false);
return downloadedFullyVal; return downloadedFullyVal;
} }
@SuppressWarnings("UnusedReturnValue") @SuppressWarnings("UnusedReturnValue")
private int getBlocksCount() { private int getBlocksCount() {
if (size == -1) { if (size == -1) {
return downloadedBlocks().size(); return downloadedBlocks.size();
} }
var expectedBlocksCount = getBlocksCount(size, FileSponge.BLOCK_SIZE); var expectedBlocksCount = getBlocksCount(size, FileSponge.BLOCK_SIZE);
if (this.downloadedBlocks().size() != expectedBlocksCount) { if (this.downloadedBlocks.size() != expectedBlocksCount) {
throw new IllegalStateException( throw new IllegalStateException(
"Blocks array length (" + this.downloadedBlocks().size() + ") != expected blocks count (" "Blocks array length (" + this.downloadedBlocks.size() + ") != expected blocks count ("
+ expectedBlocksCount + ")"); + expectedBlocksCount + ")");
} }
return expectedBlocksCount; return expectedBlocksCount;
@ -67,24 +69,18 @@ public record DiskMetadata(int size, BooleanArrayList downloadedBlocks) {
} }
public boolean isDownloadedBlock(int id) { public boolean isDownloadedBlock(int id) {
if (size == -1 && downloadedBlocks().size() <= id) { if (size == -1 && downloadedBlocks.size() <= id) {
return false; return false;
} else { } else {
return downloadedBlocks().getBoolean(id); return downloadedBlocks.getBoolean(id);
} }
} }
public static class DiskMetadataSerializer implements Serializer<DiskMetadata> { public static class DiskMetadataSerializer implements Serializer<DiskMetadata> {
private final BufferAllocator allocator;
public DiskMetadataSerializer(BufferAllocator allocator) {
this.allocator = allocator;
}
@Override @Override
public @NotNull DeserializationResult<DiskMetadata> deserialize(@NotNull Send<Buffer> serialized) { public @NotNull DiskMetadata deserialize(@NotNull Buffer serialized) throws SerializationException {
var dis = new BufferDataInput(serialized); var dis = new BufferDataInputShared(serialized);
int size = dis.readInt(); int size = dis.readInt();
int blocksCount; int blocksCount;
if (size == -1) { if (size == -1) {
@ -96,25 +92,25 @@ public record DiskMetadata(int size, BooleanArrayList downloadedBlocks) {
for (int i = 0; i < blocksCount; i++) { for (int i = 0; i < blocksCount; i++) {
downloadedBlocks.add(dis.readBoolean()); downloadedBlocks.add(dis.readBoolean());
} }
return new DeserializationResult<>(new DiskMetadata(size, downloadedBlocks), dis.getReadBytesCount()); return new DiskMetadata(size, downloadedBlocks);
} }
@Override @Override
public @NotNull Send<Buffer> serialize(@NotNull DiskMetadata deserialized) { public void serialize(@NotNull DiskMetadata deserialized, Buffer output) throws SerializationException {
try (var buffer = allocator.allocate(64)) { var dos = new BufferDataOutput(output);
var dos = new BufferDataOutput(buffer); dos.writeInt(deserialized.size);
dos.writeInt(deserialized.size()); var blocksCount = deserialized.getBlocksCount();
if (deserialized.size == -1) { if (deserialized.size == -1) {
dos.writeShort(deserialized.getBlocksCount()); dos.writeShort(blocksCount);
} else {
deserialized.getBlocksCount();
} }
for (boolean downloadedBlock : deserialized.downloadedBlocks()) { for (boolean downloadedBlock : deserialized.downloadedBlocks) {
dos.writeBoolean(downloadedBlock); dos.writeBoolean(downloadedBlock);
} }
return buffer.send();
}
} }
@Override
public int getSerializedSizeHint() {
return Integer.BYTES;
}
} }
} }

View File

@ -23,6 +23,6 @@ import it.cavallium.dbengine.database.serialization.Serializer;
public interface URL { public interface URL {
Serializer<URL> getSerializer(BufferAllocator allocator); Serializer<URL> getSerializer();
} }