Update dbengine

This commit is contained in:
Andrea Cavalli 2022-05-20 10:25:59 +02:00
parent 7346376d0e
commit 242bd588c6
2 changed files with 44 additions and 37 deletions

View File

@ -71,9 +71,9 @@ public final class DataBlock extends ResourceSupport<DataBlock, DataBlock> {
private final int length;
private final Buffer data;
public Send<Buffer> getData() {
public Buffer getDataCopy() {
assert data.isAccessible();
return data.copy().send();
return data.copy();
}
public Buffer getDataUnsafe() {
@ -106,8 +106,8 @@ public final class DataBlock extends ResourceSupport<DataBlock, DataBlock> {
if (this.getLength() != other.getLength()) {
return false;
}
final Object this$data = this.getData();
final Object other$data = other.getData();
final Object this$data = this.getDataUnsafe();
final Object other$data = other.getDataUnsafe();
if (!Objects.equals(this$data, other$data)) {
return false;
}
@ -119,13 +119,13 @@ public final class DataBlock extends ResourceSupport<DataBlock, DataBlock> {
int result = 1;
result = result * PRIME + this.getOffset();
result = result * PRIME + this.getLength();
final Object $data = this.getData();
final Object $data = this.getDataUnsafe();
result = result * PRIME + ($data == null ? 43 : $data.hashCode());
return result;
}
public String toString() {
return "DataBlock(offset=" + this.getOffset() + ", length=" + this.getLength() + ", data=" + this.getData() + ")";
return "DataBlock(offset=" + this.getOffset() + ", length=" + this.getLength() + ", data=" + this.getDataUnsafe() + ")";
}
@Override

View File

@ -21,7 +21,9 @@ package org.warp.filesponge;
import static org.warp.filesponge.FileSponge.BLOCK_SIZE;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.Resource;
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.BufSupplier;
import it.cavallium.dbengine.database.ColumnUtils;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLDictionary;
@ -87,45 +89,53 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
// Check if this cache should cache the url, otherwise do nothing
if (!shouldCache.test(url)) return Mono.empty();
Mono<Send<Buffer>> keyMono = Mono.fromCallable(() -> serializeUrl(url));
Mono<Buffer> keyMono = Mono.fromCallable(() -> serializeUrl(url));
return fileMetadata
.update(keyMono,
oldValue -> Objects.requireNonNullElseGet(oldValue,
() -> serializeMetadata(new DiskMetadata(metadata.size(),
BooleanArrayList.wrap(new boolean[DiskMetadata.getBlocksCount(metadata.size(), BLOCK_SIZE)])
)).receive()
))
),
UpdateReturnMode.NOTHING
)
.then();
}
private <T extends URL> Send<Buffer> serializeUrl(T url) {
private <T extends URL> Buffer serializeUrl(T url) {
@SuppressWarnings("unchecked")
URLSerializer<T> urlSerializer = (URLSerializer<T>) url.getSerializer();
int sizeHint = urlSerializer.getSerializedSizeHint();
if (sizeHint == -1) sizeHint = 64;
try (var buffer = db.getAllocator().allocate(sizeHint)) {
var buffer = db.getAllocator().allocate(sizeHint);
try {
try {
urlSerializer.serialize(url, buffer);
} catch (SerializationException ex) {
throw new IllegalStateException("Failed to serialize url", ex);
}
return buffer.send();
return buffer;
} catch (Throwable ex) {
buffer.close();
throw ex;
}
}
private Send<Buffer> serializeMetadata(DiskMetadata diskMetadata) {
private Buffer serializeMetadata(DiskMetadata diskMetadata) {
int sizeHint = diskMetadataSerializer.getSerializedSizeHint();
if (sizeHint == -1) sizeHint = 64;
try (var buffer = db.getAllocator().allocate(sizeHint)) {
var buffer = db.getAllocator().allocate(sizeHint);
try {
try {
diskMetadataSerializer.serialize(diskMetadata, buffer);
} catch (SerializationException ex) {
throw new IllegalStateException("Failed to serialize metadata", ex);
}
return buffer.send();
return buffer;
} catch (Throwable ex) {
buffer.close();
throw ex;
}
}
@ -142,19 +152,16 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
// Check if this cache should cache the url, otherwise do nothing
if (!shouldCache.test(url)) return Mono.empty();
Mono<Send<Buffer>> urlKeyMono = Mono.fromCallable(() -> serializeUrl(url));
Mono<Send<Buffer>> blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, dataBlock.getId()));
return Mono
.fromCallable(dataBlock::getData)
.subscribeOn(Schedulers.boundedElastic())
.flatMap(bytes_ -> Mono.using(
() -> bytes_,
bytes -> fileContent
.put(blockKeyMono, Mono.just(bytes), LLDictionaryResultType.VOID)
.doOnNext(Send::close)
Mono<Buffer> urlKeyMono = Mono.fromCallable(() -> serializeUrl(url));
Mono<Buffer> blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, dataBlock.getId()));
return Mono.using(
() -> BufSupplier.of(dataBlock::getDataCopy),
bufSupplier -> fileContent
.put(blockKeyMono, Mono.fromSupplier(bufSupplier::get), LLDictionaryResultType.VOID)
.doOnNext(Resource::close)
.then(),
Send::close
))
BufSupplier::close
)
.then(fileMetadata.update(urlKeyMono, prevBytes -> {
@Nullable DiskMetadata result;
if (prevBytes != null) {
@ -182,7 +189,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
result = null;
}
if (result != null) {
return serializeMetadata(result).receive();
return serializeMetadata(result);
} else {
return null;
}
@ -210,7 +217,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
return fileContent
.get(null, blockKeyMono)
.map(dataToReceive -> {
try (var data = dataToReceive.receive()) {
try (var data = dataToReceive) {
int blockOffset = getBlockOffset(blockId);
int blockLength = data.readableBytes();
if (meta.size() != -1) {
@ -230,11 +237,11 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
);
}
private Send<Buffer> getBlockKey(URL url, int blockId) {
try (var urlBytes = serializeUrl(url).receive()) {
private Buffer getBlockKey(URL url, int blockId) {
try (var urlBytes = serializeUrl(url)) {
Buffer blockIdBytes = this.db.getAllocator().allocate(Integer.BYTES);
blockIdBytes.writeInt(blockId);
return LLUtils.compositeBuffer(db.getAllocator(), urlBytes.send(), blockIdBytes.send()).send();
return LLUtils.compositeBuffer(db.getAllocator(), urlBytes.send(), blockIdBytes.send());
}
}
@ -244,11 +251,11 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
@Override
public Mono<DiskMetadata> requestDiskMetadata(URL url) {
Mono<Send<Buffer>> urlKeyMono = Mono.fromCallable(() -> serializeUrl(url));
Mono<Buffer> urlKeyMono = Mono.fromCallable(() -> serializeUrl(url));
return fileMetadata
.get(null, urlKeyMono)
.map(prevBytesSend -> {
try (var prevBytes = prevBytesSend.receive()) {
try (var prevBytes = prevBytesSend) {
return deserializeMetadata(prevBytes);
}
});
@ -262,17 +269,17 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
@Override
public Mono<Tuple2<Metadata, Flux<DataBlock>>> request(URL url) {
Mono<Send<Buffer>> urlKeyMono = Mono.fromCallable(() -> serializeUrl(url));
Mono<Buffer> urlKeyMono = Mono.fromCallable(() -> serializeUrl(url));
return Mono
.using(
() -> serializeUrl(url),
key -> fileMetadata.get(null, urlKeyMono),
Send::close
Resource::close
)
.map(serialized -> {
DiskMetadata diskMeta;
try (var serializedBuf = serialized.receive()) {
diskMeta = deserializeMetadata(serializedBuf);
try (serialized) {
diskMeta = deserializeMetadata(serialized);
}
var meta = diskMeta.asMetadata();
if (diskMeta.isDownloadedFully()) {