Partial refactoring to netty 5

This commit is contained in:
Andrea Cavalli 2021-09-03 02:23:19 +02:00
parent 19b80c3656
commit 59a209cfb5
6 changed files with 119 additions and 83 deletions

View File

@ -104,12 +104,12 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.8</version>
<version>3.4.9</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.8</version>
<version>3.4.9</version>
</dependency>
<dependency>
<groupId>it.cavallium</groupId>

View File

@ -19,27 +19,40 @@
package org.warp.filesponge;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Drop;
import io.netty.buffer.api.Owned;
import io.netty.buffer.api.Send;
import io.netty.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.LLEntry;
import java.util.Objects;
public final class DataBlock {
public final class DataBlock extends ResourceSupport<DataBlock, DataBlock> {
public DataBlock(int offset, int length, ByteBuf data) {
try {
public static DataBlock of(int offset, int length, Send<Buffer> data) {
return new DataBlock(offset, length, data, d -> {});
}
private DataBlock(int offset, int length, Send<Buffer> data, Drop<DataBlock> drop) {
super(new DataBlock.CloseOnDrop(drop));
try (data) {
this.offset = offset;
this.length = length;
this.data = data.retain();
} finally {
data.release();
this.data = data.receive();
}
}
private final int offset;
private final int length;
private final ByteBuf data;
private final Buffer data;
public ByteBuf getData() {
assert data.isReadable();
return data.retain();
public Send<Buffer> getData() {
assert data.isAccessible();
return data.copy().send();
}
public Buffer getDataUnsafe() {
return data;
}
public int getId() {
@ -90,11 +103,32 @@ public final class DataBlock {
return "DataBlock(offset=" + this.getOffset() + ", length=" + this.getLength() + ", data=" + this.getData() + ")";
}
public void retain() {
this.data.retain();
@Override
protected RuntimeException createResourceClosedException() {
return new IllegalStateException("Closed");
}
public void release() {
this.data.release();
@Override
protected Owned<DataBlock> prepareSend() {
Send<Buffer> dataSend;
dataSend = this.data.send();
return drop -> new DataBlock(offset, length, dataSend, drop);
}
private static class CloseOnDrop implements Drop<DataBlock> {
private final Drop<DataBlock> delegate;
public CloseOnDrop(Drop<DataBlock> drop) {
this.delegate = drop;
}
@Override
public void drop(DataBlock obj) {
if (obj.data.isAccessible()) {
obj.data.close();
}
delegate.drop(obj);
}
}
}

View File

@ -22,16 +22,21 @@ import static org.warp.filesponge.FileSponge.BLOCK_SIZE;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.CompositeBuffer;
import io.netty.buffer.api.Send;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.Serializer.DeserializationResult;
import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
import java.util.List;
import java.util.Objects;
@ -77,7 +82,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
@Override
public Mono<Void> writeMetadata(URL url, Metadata metadata) {
Mono<ByteBuf> keyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url));
Mono<Send<Buffer>> keyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url));
return fileMetadata
.update(keyMono, oldValue -> Objects.requireNonNullElseGet(oldValue,
() -> diskMetadataSerializer.serialize(new DiskMetadata(metadata.size(),
@ -89,22 +94,23 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
@Override
public Mono<Void> writeContentBlock(URL url, DataBlock dataBlock) {
Mono<ByteBuf> urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url));
Mono<ByteBuf> blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, dataBlock.getId()));
Mono<Send<Buffer>> urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url));
Mono<Send<Buffer>> blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, dataBlock.getId()));
return Mono
.fromCallable(dataBlock::getData)
.subscribeOn(Schedulers.boundedElastic())
.flatMap(bytes -> {
Mono<ByteBuf> bytesMono = Mono.just(bytes).map(ByteBuf::retain);
return fileContent
.put(blockKeyMono, bytesMono, LLDictionaryResultType.VOID)
.doOnNext(ReferenceCounted::release)
.then();
})
.flatMap(bytes_ -> Mono.using(
() -> bytes_,
bytes -> fileContent
.put(blockKeyMono, Mono.just(bytes), LLDictionaryResultType.VOID)
.doOnNext(Send::close)
.then(),
Send::close
))
.then(fileMetadata.update(urlKeyMono, prevBytes -> {
@Nullable DiskMetadata result;
if (prevBytes != null) {
DiskMetadata prevMeta = diskMetadataSerializer.deserialize(prevBytes);
DiskMetadata prevMeta = diskMetadataSerializer.deserialize(prevBytes).deserializedData();
if (!prevMeta.isDownloadedBlock(dataBlock.getId())) {
BooleanArrayList bal = prevMeta.downloadedBlocks().clone();
if (prevMeta.size() == -1) {
@ -152,11 +158,11 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
if (!downloaded) {
return Mono.empty();
}
Mono<ByteBuf> blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, blockId));
var blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, blockId));
return fileContent
.get(null, blockKeyMono)
.map(data -> {
try {
.map(dataToReceive -> {
try (var data = dataToReceive.receive()) {
int blockOffset = getBlockOffset(blockId);
int blockLength = data.readableBytes();
if (meta.size() != -1) {
@ -169,20 +175,19 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
assert data.readableBytes() == BLOCK_SIZE;
}
}
return new DataBlock(blockOffset, blockLength, data.retain());
} finally {
data.release();
return DataBlock.of(blockOffset, blockLength, data.send());
}
});
})
);
}
private ByteBuf getBlockKey(URL url, int blockId) throws SerializationException {
ByteBuf urlBytes = url.getSerializer(db.getAllocator()).serialize(url);
ByteBuf blockIdBytes = this.db.getAllocator().directBuffer(Integer.BYTES, Integer.BYTES);
blockIdBytes.writeInt(blockId);
return Unpooled.wrappedBuffer(urlBytes, blockIdBytes);
private Send<Buffer> getBlockKey(URL url, int blockId) throws SerializationException {
try (var urlBytes = url.getSerializer(db.getAllocator()).serialize(url).receive()) {
Buffer blockIdBytes = this.db.getAllocator().allocate(Integer.BYTES);
blockIdBytes.writeInt(blockId);
return LLUtils.compositeBuffer(db.getAllocator(), urlBytes.send(), blockIdBytes.send());
}
}
private static int getBlockOffset(int blockId) {
@ -191,10 +196,11 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
@Override
public Mono<DiskMetadata> requestDiskMetadata(URL url) {
Mono<ByteBuf> urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url));
Mono<Send<Buffer>> urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url));
return fileMetadata
.get(null, urlKeyMono)
.map(diskMetadataSerializer::deserialize);
.map(diskMetadataSerializer::deserialize)
.map(DeserializationResult::deserializedData);
}
@Override
@ -205,14 +211,15 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
@Override
public Mono<Tuple2<Metadata, Flux<DataBlock>>> request(URL url) {
Mono<ByteBuf> urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url));
Mono<Send<Buffer>> urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url));
return Mono
.using(
() -> url.getSerializer(db.getAllocator()).serialize(url),
key -> fileMetadata.get(null, urlKeyMono),
ReferenceCounted::release
Send::close
)
.map(diskMetadataSerializer::deserialize)
.map(DeserializationResult::deserializedData)
.map(diskMeta -> {
var meta = diskMeta.asMetadata();
if (diskMeta.isDownloadedFully()) {

View File

@ -23,6 +23,11 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.Send;
import it.cavallium.dbengine.database.serialization.BufferDataInput;
import it.cavallium.dbengine.database.serialization.BufferDataOutput;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
import java.io.ByteArrayInputStream;
@ -80,56 +85,45 @@ public record DiskMetadata(int size, BooleanArrayList downloadedBlocks) {
}
}
public static class DiskMetadataSerializer implements Serializer<DiskMetadata, ByteBuf> {
public static class DiskMetadataSerializer implements Serializer<DiskMetadata> {
private final ByteBufAllocator allocator;
private final BufferAllocator allocator;
public DiskMetadataSerializer(ByteBufAllocator allocator) {
public DiskMetadataSerializer(BufferAllocator allocator) {
this.allocator = allocator;
}
@Override
public @NotNull DiskMetadata deserialize(@NotNull ByteBuf serialized) {
try {
var bais = new ByteBufInputStream(serialized);
var dis = new DataInputStream(bais);
int size = dis.readInt();
int blocksCount;
if (size == -1) {
blocksCount = dis.readShort();
} else {
blocksCount = getBlocksCount(size, FileSponge.BLOCK_SIZE);
}
var downloadedBlocks = new BooleanArrayList(blocksCount);
for (int i = 0; i < blocksCount; i++) {
downloadedBlocks.add(dis.readBoolean());
}
return new DiskMetadata(size, downloadedBlocks);
} catch (IOException e) {
throw new SerializationException(e);
} finally {
serialized.release();
public @NotNull DeserializationResult<DiskMetadata> deserialize(@NotNull Send<Buffer> serialized) {
var dis = new BufferDataInput(serialized);
int size = dis.readInt();
int blocksCount;
if (size == -1) {
blocksCount = dis.readShort();
} else {
blocksCount = getBlocksCount(size, FileSponge.BLOCK_SIZE);
}
var downloadedBlocks = new BooleanArrayList(blocksCount);
for (int i = 0; i < blocksCount; i++) {
downloadedBlocks.add(dis.readBoolean());
}
return new DeserializationResult<>(new DiskMetadata(size, downloadedBlocks), dis.getReadBytesCount());
}
@Override
public @NotNull ByteBuf serialize(@NotNull DiskMetadata deserialized) {
ByteBuf buffer = allocator.buffer();
try (var bos = new ByteBufOutputStream(buffer)) {
try (var dos = new DataOutputStream(bos)) {
dos.writeInt(deserialized.size());
if (deserialized.size == -1) {
dos.writeShort(deserialized.getBlocksCount());
} else {
deserialized.getBlocksCount();
}
for (boolean downloadedBlock : deserialized.downloadedBlocks()) {
dos.writeBoolean(downloadedBlock);
}
public @NotNull Send<Buffer> serialize(@NotNull DiskMetadata deserialized) {
try (var buffer = allocator.allocate(64)) {
var dos = new BufferDataOutput(buffer);
dos.writeInt(deserialized.size());
if (deserialized.size == -1) {
dos.writeShort(deserialized.getBlocksCount());
} else {
deserialized.getBlocksCount();
}
return buffer;
} catch (IOException e) {
throw new SerializationException(e);
for (boolean downloadedBlock : deserialized.downloadedBlocks()) {
dos.writeBoolean(downloadedBlock);
}
return buffer.send();
}
}

View File

@ -101,7 +101,7 @@ public class FileSponge implements URLsHandler {
)
.distinct(DataBlock::getId)
.doOnDiscard(DataBlock.class, DataBlock::release);
.doOnDiscard(DataBlock.class, DataBlock::close);
}
@Override

View File

@ -20,10 +20,11 @@ package org.warp.filesponge;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.api.BufferAllocator;
import it.cavallium.dbengine.database.serialization.Serializer;
public interface URL {
Serializer<URL, ByteBuf> getSerializer(ByteBufAllocator allocator);
Serializer<URL> getSerializer(BufferAllocator allocator);
}