Customizable allocators

This commit is contained in:
Andrea Cavalli 2021-05-05 00:08:08 +02:00
parent b8a515acf6
commit e2774d55f2
3 changed files with 25 additions and 10 deletions

View File

@@ -46,15 +46,21 @@ import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2; import reactor.util.function.Tuple2;
import reactor.util.function.Tuples; import reactor.util.function.Tuples;
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class DiskCache implements URLsDiskHandler, URLsWriter { public class DiskCache implements URLsDiskHandler, URLsWriter {
private static final DiskMetadataSerializer diskMetadataSerializer = new DiskMetadataSerializer(); private final DiskMetadataSerializer diskMetadataSerializer;
private final LLKeyValueDatabase db; private final LLKeyValueDatabase db;
private final LLDictionary fileContent; private final LLDictionary fileContent;
private final LLDictionary fileMetadata; private final LLDictionary fileMetadata;
public DiskCache(LLKeyValueDatabase db, LLDictionary fileContent, LLDictionary fileMetadata) {
this.db = db;
this.fileContent = fileContent;
this.fileMetadata = fileMetadata;
this.diskMetadataSerializer = new DiskMetadataSerializer(db.getAllocator());
}
public static Mono<DiskCache> open(LLDatabaseConnection databaseConnection, String dbName, boolean lowMemory) { public static Mono<DiskCache> open(LLDatabaseConnection databaseConnection, String dbName, boolean lowMemory) {
return databaseConnection return databaseConnection
.getDatabase(dbName, List.of(Column.dictionary("file-content"), Column.dictionary("file-metadata")), lowMemory, false) .getDatabase(dbName, List.of(Column.dictionary("file-content"), Column.dictionary("file-metadata")), lowMemory, false)
@@ -70,7 +76,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
@Override @Override
public Mono<Void> writeMetadata(URL url, Metadata metadata) { public Mono<Void> writeMetadata(URL url, Metadata metadata) {
return fileMetadata return fileMetadata
.update(url.getSerializer().serialize(url), oldValue -> { .update(url.getSerializer(db.getAllocator()).serialize(url), oldValue -> {
if (oldValue != null) { if (oldValue != null) {
return oldValue; return oldValue;
} else { } else {
@@ -98,7 +104,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(Schedulers.boundedElastic())
.flatMap(bytes -> fileContent.put(getBlockKey(url, dataBlock.getId()), bytes, LLDictionaryResultType.VOID)) .flatMap(bytes -> fileContent.put(getBlockKey(url, dataBlock.getId()), bytes, LLDictionaryResultType.VOID))
.doOnNext(ReferenceCounted::release) .doOnNext(ReferenceCounted::release)
.then(fileMetadata.update(url.getSerializer().serialize(url), prevBytes -> { .then(fileMetadata.update(url.getSerializer(db.getAllocator()).serialize(url), prevBytes -> {
@Nullable DiskMetadata result; @Nullable DiskMetadata result;
if (prevBytes != null) { if (prevBytes != null) {
DiskMetadata prevMeta = diskMetadataSerializer.deserialize(prevBytes); DiskMetadata prevMeta = diskMetadataSerializer.deserialize(prevBytes);
@@ -152,8 +158,8 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
} }
private ByteBuf getBlockKey(URL url, int blockId) { private ByteBuf getBlockKey(URL url, int blockId) {
ByteBuf urlBytes = url.getSerializer().serialize(url); ByteBuf urlBytes = url.getSerializer(db.getAllocator()).serialize(url);
ByteBuf blockIdBytes = PooledByteBufAllocator.DEFAULT.directBuffer(Integer.BYTES, Integer.BYTES); ByteBuf blockIdBytes = this.db.getAllocator().directBuffer(Integer.BYTES, Integer.BYTES);
blockIdBytes.writeInt(blockId); blockIdBytes.writeInt(blockId);
return Unpooled.wrappedBuffer(urlBytes, blockIdBytes); return Unpooled.wrappedBuffer(urlBytes, blockIdBytes);
} }
@@ -165,7 +171,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
@Override @Override
public Mono<DiskMetadata> requestDiskMetadata(URL url) { public Mono<DiskMetadata> requestDiskMetadata(URL url) {
return fileMetadata return fileMetadata
.get(null, url.getSerializer().serialize(url)) .get(null, url.getSerializer(db.getAllocator()).serialize(url))
.map(diskMetadataSerializer::deserialize); .map(diskMetadataSerializer::deserialize);
} }
@@ -178,7 +184,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
@Override @Override
public Mono<Tuple2<Metadata, Flux<DataBlock>>> request(URL url) { public Mono<Tuple2<Metadata, Flux<DataBlock>>> request(URL url) {
return fileMetadata return fileMetadata
.get(null, url.getSerializer().serialize(url)) .get(null, url.getSerializer(db.getAllocator()).serialize(url))
.map(diskMetadataSerializer::deserialize) .map(diskMetadataSerializer::deserialize)
.map(diskMeta -> { .map(diskMeta -> {
var meta = diskMeta.asMetadata(); var meta = diskMeta.asMetadata();

View File

@@ -19,6 +19,7 @@
package org.warp.filesponge; package org.warp.filesponge;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator;
@@ -31,6 +32,7 @@ import java.io.DataOutputStream;
import lombok.Data; import lombok.Data;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.warp.filesponge.DiskMetadata.DiskMetadataSerializer;
@Data @Data
public class DiskMetadata { public class DiskMetadata {
@@ -74,6 +76,12 @@ public class DiskMetadata {
public static class DiskMetadataSerializer implements Serializer<DiskMetadata, ByteBuf> { public static class DiskMetadataSerializer implements Serializer<DiskMetadata, ByteBuf> {
private final ByteBufAllocator allocator;
public DiskMetadataSerializer(ByteBufAllocator allocator) {
this.allocator = allocator;
}
@SneakyThrows @SneakyThrows
@Override @Override
public @NotNull DiskMetadata deserialize(@NotNull ByteBuf serialized) { public @NotNull DiskMetadata deserialize(@NotNull ByteBuf serialized) {
@@ -95,7 +103,7 @@ public class DiskMetadata {
@SneakyThrows @SneakyThrows
@Override @Override
public @NotNull ByteBuf serialize(@NotNull DiskMetadata deserialized) { public @NotNull ByteBuf serialize(@NotNull DiskMetadata deserialized) {
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(); ByteBuf buffer = allocator.buffer();
try (var bos = new ByteBufOutputStream(buffer)) { try (var bos = new ByteBufOutputStream(buffer)) {
try (var dos = new DataOutputStream(bos)) { try (var dos = new DataOutputStream(bos)) {
dos.writeInt(deserialized.getSize()); dos.writeInt(deserialized.getSize());

View File

@@ -19,10 +19,11 @@
package org.warp.filesponge; package org.warp.filesponge;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.Serializer;
public interface URL { public interface URL {
Serializer<URL, ByteBuf> getSerializer(); Serializer<URL, ByteBuf> getSerializer(ByteBufAllocator allocator);
} }