Use netty
This commit is contained in:
parent
1b2268ce52
commit
64cb045d61
@ -18,16 +18,28 @@
|
||||
|
||||
package org.warp.filesponge;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import java.nio.ByteBuffer;
|
||||
import lombok.Value;
|
||||
|
||||
@Value
|
||||
public class DataBlock {
|
||||
|
||||
public DataBlock(int offset, int length, ByteBuf data) {
|
||||
this.offset = offset;
|
||||
assert data.isDirect();
|
||||
this.length = length;
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
int offset;
|
||||
int length;
|
||||
ByteBuffer data;
|
||||
ByteBuf data;
|
||||
|
||||
public ByteBuf getData() {
|
||||
assert data.isReadable();
|
||||
return data;
|
||||
}
|
||||
|
||||
public int getId() {
|
||||
return offset / FileSponge.BLOCK_SIZE;
|
||||
|
@ -21,6 +21,9 @@ package org.warp.filesponge;
|
||||
import static org.warp.filesponge.FileSponge.BLOCK_SIZE;
|
||||
|
||||
import com.google.common.primitives.Ints;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import it.cavallium.dbengine.database.Column;
|
||||
import it.cavallium.dbengine.database.LLDatabaseConnection;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
@ -34,6 +37,7 @@ import java.util.List;
|
||||
import java.util.Optional;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.AllArgsConstructor;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.warp.filesponge.DiskMetadata.DiskMetadataSerializer;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
@ -52,7 +56,7 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
|
||||
|
||||
public static Mono<DiskCache> open(LLDatabaseConnection databaseConnection, String dbName, boolean lowMemory) {
|
||||
return databaseConnection
|
||||
.getDatabase(dbName, List.of(Column.dictionary("file-content"), Column.dictionary("file-metadata")), lowMemory)
|
||||
.getDatabase(dbName, List.of(Column.dictionary("file-content"), Column.dictionary("file-metadata")), lowMemory, false)
|
||||
.flatMap(db -> Mono.zip(
|
||||
Mono.just(db).single(),
|
||||
db.getDictionary("file-content", UpdateMode.ALLOW).single(),
|
||||
@ -66,15 +70,13 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
|
||||
public Mono<Void> writeMetadata(URL url, Metadata metadata) {
|
||||
return fileMetadata
|
||||
.update(url.getSerializer().serialize(url), oldValue -> {
|
||||
if (oldValue.isPresent()) {
|
||||
if (oldValue != null) {
|
||||
return oldValue;
|
||||
} else {
|
||||
return Optional
|
||||
.of(new DiskMetadata(
|
||||
return diskMetadataSerializer.serialize(new DiskMetadata(
|
||||
metadata.getSize(),
|
||||
BooleanArrayList.wrap(new boolean[DiskMetadata.getBlocksCount(metadata.getSize(), BLOCK_SIZE)])
|
||||
))
|
||||
.map(diskMetadataSerializer::serialize);
|
||||
));
|
||||
}
|
||||
})
|
||||
.then();
|
||||
@ -84,26 +86,36 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
|
||||
public Mono<Void> writeContentBlock(URL url, DataBlock dataBlock) {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
byte[] bytes = new byte[dataBlock.getLength()];
|
||||
dataBlock.getData().get(bytes);
|
||||
return dataBlock.getData();
|
||||
/*
|
||||
ByteBuf bytes = PooledByteBufAllocator.DEFAULT.directBuffer(dataBlock.getLength());
|
||||
bytes.writeBytes(dataBlock.getData().slice());
|
||||
return bytes;
|
||||
}).subscribeOn(Schedulers.boundedElastic())
|
||||
|
||||
*/
|
||||
})
|
||||
.subscribeOn(Schedulers.boundedElastic())
|
||||
.flatMap(bytes -> fileContent.put(getBlockKey(url, dataBlock.getId()), bytes, LLDictionaryResultType.VOID))
|
||||
.then(fileMetadata
|
||||
.update(url.getSerializer().serialize(url), prevBytes -> prevBytes
|
||||
.map(diskMetadataSerializer::deserialize)
|
||||
.map(prevMeta -> {
|
||||
if (!prevMeta.getDownloadedBlocks().getBoolean(dataBlock.getId())) {
|
||||
BooleanArrayList bal = prevMeta.getDownloadedBlocks().clone();
|
||||
bal.set(dataBlock.getId(), true);
|
||||
return new DiskMetadata(prevMeta.getSize(), bal);
|
||||
} else {
|
||||
return prevMeta;
|
||||
}
|
||||
})
|
||||
.map(diskMetadataSerializer::serialize)
|
||||
)
|
||||
)
|
||||
.then(fileMetadata.update(url.getSerializer().serialize(url), prevBytes -> {
|
||||
@Nullable DiskMetadata result;
|
||||
if (prevBytes != null) {
|
||||
DiskMetadata prevMeta = diskMetadataSerializer.deserialize(prevBytes);
|
||||
if (!prevMeta.getDownloadedBlocks().getBoolean(dataBlock.getId())) {
|
||||
BooleanArrayList bal = prevMeta.getDownloadedBlocks().clone();
|
||||
bal.set(dataBlock.getId(), true);
|
||||
result = new DiskMetadata(prevMeta.getSize(), bal);
|
||||
} else {
|
||||
result = prevMeta;
|
||||
}
|
||||
} else {
|
||||
result = null;
|
||||
}
|
||||
if (result != null) {
|
||||
return diskMetadataSerializer.serialize(result);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}))
|
||||
.then();
|
||||
}
|
||||
|
||||
@ -123,26 +135,25 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
|
||||
}
|
||||
return fileContent.get(null, getBlockKey(url, blockId)).map(data -> {
|
||||
int blockOffset = getBlockOffset(blockId);
|
||||
int blockLength = data.length;
|
||||
int blockLength = data.readableBytes();
|
||||
if (blockOffset + blockLength >= meta.getSize()) {
|
||||
if (blockOffset + blockLength > meta.getSize()) {
|
||||
throw new IllegalStateException("Overflowed data size");
|
||||
}
|
||||
} else {
|
||||
// Intermediate blocks must be of max size
|
||||
assert data.length == BLOCK_SIZE;
|
||||
assert data.readableBytes() == BLOCK_SIZE;
|
||||
}
|
||||
return new DataBlock(blockOffset, blockLength, ByteBuffer.wrap(data, 0, blockLength));
|
||||
return new DataBlock(blockOffset, blockLength, data);
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
private byte[] getBlockKey(URL url, int blockId) {
|
||||
byte[] urlBytes = url.getSerializer().serialize(url);
|
||||
byte[] blockIdBytes = Ints.toByteArray(blockId);
|
||||
byte[] resultBytes = Arrays.copyOf(urlBytes, urlBytes.length + blockIdBytes.length);
|
||||
System.arraycopy(blockIdBytes, 0, resultBytes, urlBytes.length, blockIdBytes.length);
|
||||
return resultBytes;
|
||||
private ByteBuf getBlockKey(URL url, int blockId) {
|
||||
ByteBuf urlBytes = url.getSerializer().serialize(url);
|
||||
ByteBuf blockIdBytes = PooledByteBufAllocator.DEFAULT.directBuffer(Integer.BYTES, Integer.BYTES);
|
||||
blockIdBytes.writeInt(blockId);
|
||||
return Unpooled.wrappedBuffer(urlBytes, blockIdBytes);
|
||||
}
|
||||
|
||||
private static int getBlockOffset(int blockId) {
|
||||
|
@ -18,6 +18,10 @@
|
||||
|
||||
package org.warp.filesponge;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufInputStream;
|
||||
import io.netty.buffer.ByteBufOutputStream;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import it.cavallium.dbengine.database.serialization.Serializer;
|
||||
import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
|
||||
import java.io.ByteArrayInputStream;
|
||||
@ -68,26 +72,31 @@ public class DiskMetadata {
|
||||
return new Metadata(size);
|
||||
}
|
||||
|
||||
public static class DiskMetadataSerializer implements Serializer<DiskMetadata, byte[]> {
|
||||
public static class DiskMetadataSerializer implements Serializer<DiskMetadata, ByteBuf> {
|
||||
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public @NotNull DiskMetadata deserialize(byte @NotNull [] serialized) {
|
||||
var bais = new ByteArrayInputStream(serialized);
|
||||
var dis = new DataInputStream(bais);
|
||||
int size = dis.readInt();
|
||||
int blocksCount = getBlocksCount(size, FileSponge.BLOCK_SIZE);
|
||||
var downloadedBlocks = new BooleanArrayList(blocksCount);
|
||||
for (int i = 0; i < blocksCount; i++) {
|
||||
downloadedBlocks.add(dis.readBoolean());
|
||||
public @NotNull DiskMetadata deserialize(@NotNull ByteBuf serialized) {
|
||||
try {
|
||||
var bais = new ByteBufInputStream(serialized);
|
||||
var dis = new DataInputStream(bais);
|
||||
int size = dis.readInt();
|
||||
int blocksCount = getBlocksCount(size, FileSponge.BLOCK_SIZE);
|
||||
var downloadedBlocks = new BooleanArrayList(blocksCount);
|
||||
for (int i = 0; i < blocksCount; i++) {
|
||||
downloadedBlocks.add(dis.readBoolean());
|
||||
}
|
||||
return new DiskMetadata(size, downloadedBlocks);
|
||||
} finally {
|
||||
serialized.release();
|
||||
}
|
||||
return new DiskMetadata(size, downloadedBlocks);
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public byte @NotNull [] serialize(@NotNull DiskMetadata deserialized) {
|
||||
try (var bos = new ByteArrayOutputStream()) {
|
||||
public @NotNull ByteBuf serialize(@NotNull DiskMetadata deserialized) {
|
||||
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer();
|
||||
try (var bos = new ByteBufOutputStream(buffer)) {
|
||||
try (var dos = new DataOutputStream(bos)) {
|
||||
dos.writeInt(deserialized.getSize());
|
||||
deserialized.getBlocksCount();
|
||||
@ -95,7 +104,7 @@ public class DiskMetadata {
|
||||
dos.writeBoolean(downloadedBlock);
|
||||
}
|
||||
}
|
||||
return bos.toByteArray();
|
||||
return buffer;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -18,10 +18,11 @@
|
||||
|
||||
package org.warp.filesponge;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import it.cavallium.dbengine.database.serialization.Serializer;
|
||||
|
||||
public interface URL {
|
||||
|
||||
Serializer<URL, byte[]> getSerializer();
|
||||
Serializer<URL, ByteBuf> getSerializer();
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user