Code cleanup

Andrea Cavalli 2021-08-22 18:20:52 +02:00
parent 952b31daa0
commit 52cd3a235d
1 changed file with 58 additions and 75 deletions
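The theme of the cleanup: every call site used to wrap key serialization in Mono.using, retaining the ByteBuf for the call and releasing it afterwards. The new code instead builds the key as a cold Mono<ByteBuf> via Mono.fromCallable and hands that to the dictionary, which takes over the buffer's lifecycle. A minimal before/after sketch of the pattern (this::merge is a hypothetical stand-in for the update function; db, fileMetadata, and the serializer come from the surrounding class):

	// Before: the caller allocates the key, retains it for the call,
	// and Mono.using guarantees the release on every completion path.
	public Mono<Void> writeMetadataOld(URL url, Metadata metadata) {
		return Mono
				.using(
						() -> url.getSerializer(db.getAllocator()).serialize(url), // acquire
						key -> fileMetadata.update(key.retain(), this::merge, UpdateReturnMode.NOTHING),
						ReferenceCounted::release // always release the key
				)
				.then();
	}

	// After: the key is a lazy Mono; serialization happens per subscription
	// and the dictionary consumes (and releases) the buffer itself.
	public Mono<Void> writeMetadataNew(URL url, Metadata metadata) {
		Mono<ByteBuf> keyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url));
		return fileMetadata
				.update(keyMono, this::merge, UpdateReturnMode.NOTHING)
				.then();
	}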

DiskCache.java

@@ -33,6 +33,7 @@ import it.cavallium.dbengine.database.UpdateReturnMode;
 import it.cavallium.dbengine.client.DatabaseOptions;
 import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
 import java.util.List;
+import java.util.Objects;
 import org.jetbrains.annotations.Nullable;
 import org.warp.filesponge.DiskMetadata.DiskMetadataSerializer;
 import reactor.core.publisher.Flux;
@@ -75,84 +76,70 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
 	@Override
 	public Mono<Void> writeMetadata(URL url, Metadata metadata) {
-		return Mono
-				.using(
-						() -> url.getSerializer(db.getAllocator()).serialize(url),
-						key -> fileMetadata
-								.update(key.retain(), oldValue -> {
-									if (oldValue != null) {
-										return oldValue;
-									} else {
-										return diskMetadataSerializer.serialize(new DiskMetadata(
-												metadata.size(),
-												BooleanArrayList.wrap(new boolean[DiskMetadata.getBlocksCount(metadata.size(), BLOCK_SIZE)])
-										));
-									}
-								}, UpdateReturnMode.NOTHING),
-						ReferenceCounted::release
-				)
+		Mono<ByteBuf> keyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url));
+		return fileMetadata
+				.update(keyMono, oldValue -> Objects.requireNonNullElseGet(oldValue,
+						() -> diskMetadataSerializer.serialize(new DiskMetadata(metadata.size(),
+								BooleanArrayList.wrap(new boolean[DiskMetadata.getBlocksCount(metadata.size(), BLOCK_SIZE)])
+						))
+				), UpdateReturnMode.NOTHING)
 				.then();
 	}

 	@Override
 	public Mono<Void> writeContentBlock(URL url, DataBlock dataBlock) {
+		Mono<ByteBuf> urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url));
+		Mono<ByteBuf> blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, dataBlock.getId()));
 		return Mono
 				.fromCallable(dataBlock::getData)
 				.subscribeOn(Schedulers.boundedElastic())
-				.flatMap(bytes -> Mono
-						.using(
-								() -> getBlockKey(url, dataBlock.getId()),
-								key -> fileContent
-										.put(key.retain(), bytes, LLDictionaryResultType.VOID),
-								ReferenceCounted::release
-						)
-						.doOnNext(ReferenceCounted::release)
-						.then()
-				)
-				.then(Mono
-						.using(
-								() -> url.getSerializer(db.getAllocator()).serialize(url),
-								key -> fileMetadata.update(key.retain(), prevBytes -> {
-									@Nullable DiskMetadata result;
-									if (prevBytes != null) {
-										DiskMetadata prevMeta = diskMetadataSerializer.deserialize(prevBytes);
-										if (!prevMeta.isDownloadedBlock(dataBlock.getId())) {
-											BooleanArrayList bal = prevMeta.downloadedBlocks().clone();
-											if (prevMeta.size() == -1) {
-												if (bal.size() > dataBlock.getId()) {
-													bal.set(dataBlock.getId(), true);
-												} else if (bal.size() == dataBlock.getId()) {
-													bal.add(true);
-												} else {
-													throw new IndexOutOfBoundsException(
-															"Trying to write a block too much far from the last block. Previous total blocks: "
-																	+ bal.size() + " Current block id: " + dataBlock.getId());
-												}
-											} else {
-												bal.set(dataBlock.getId(), true);
-											}
-											result = new DiskMetadata(prevMeta.size(), bal);
-										} else {
-											result = prevMeta;
-										}
-									} else {
-										result = null;
-									}
-									if (result != null) {
-										return diskMetadataSerializer.serialize(result);
-									} else {
-										return null;
-									}
-								}, UpdateReturnMode.NOTHING),
-								ReferenceCounted::release
-						)
+				.flatMap(bytes -> {
+					Mono<ByteBuf> bytesMono = Mono.just(bytes).map(ByteBuf::retain);
+					return fileContent
+							.put(blockKeyMono, bytesMono, LLDictionaryResultType.VOID)
+							.doOnNext(ReferenceCounted::release)
+							.then();
+				})
+				.then(fileMetadata.update(urlKeyMono, prevBytes -> {
+					@Nullable DiskMetadata result;
+					if (prevBytes != null) {
+						DiskMetadata prevMeta = diskMetadataSerializer.deserialize(prevBytes);
+						if (!prevMeta.isDownloadedBlock(dataBlock.getId())) {
+							BooleanArrayList bal = prevMeta.downloadedBlocks().clone();
+							if (prevMeta.size() == -1) {
+								if (bal.size() > dataBlock.getId()) {
+									bal.set(dataBlock.getId(), true);
+								} else if (bal.size() == dataBlock.getId()) {
+									bal.add(true);
+								} else {
+									throw new IndexOutOfBoundsException(
+											"Trying to write a block too much far from the last block. Previous total blocks: "
+													+ bal.size() + " Current block id: " + dataBlock.getId());
+								}
+							} else {
+								bal.set(dataBlock.getId(), true);
+							}
+							result = new DiskMetadata(prevMeta.size(), bal);
+						} else {
+							result = prevMeta;
+						}
+					} else {
+						result = null;
+					}
+					if (result != null) {
+						return diskMetadataSerializer.serialize(result);
+					} else {
+						return null;
+					}
+				}, UpdateReturnMode.NOTHING)
+				)
 				.then();
 	}

 	@Override
 	public Flux<DataBlock> requestContent(URL url) {
-		return requestDiskMetadata(url)
+		return this
+				.requestDiskMetadata(url)
 				.filter(DiskMetadata::isDownloadedFully)
 				.flatMapMany(meta -> Flux.fromStream(meta.downloadedBlocks()::stream)
 						.index()
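The null-check-or-build branch in writeMetadata is now Objects.requireNonNullElseGet, which returns its first argument when non-null and only otherwise invokes the supplier, so the initial DiskMetadata (size plus an all-false block bitmap) is serialized solely when no entry exists yet. A self-contained JDK demo of that call (the values are made up):

	import java.util.Objects;

	class RequireNonNullElseGetDemo {
		public static void main(String[] args) {
			String cached = null;
			// The supplier runs only because `cached` is null.
			String value = Objects.requireNonNullElseGet(cached, () -> "freshly built");
			System.out.println(value); // freshly built

			cached = "already present";
			// The supplier is NOT invoked; the existing value is returned as-is.
			value = Objects.requireNonNullElseGet(cached, () -> "never built");
			System.out.println(value); // already present
		}
	}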
@@ -162,14 +149,11 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
 					int blockId = Math.toIntExact(blockMeta.getT1());
 					boolean downloaded = blockMeta.getT2();
 					if (!downloaded) {
-						return Mono.<DataBlock>empty();
+						return Mono.empty();
 					}
-					return Mono
-							.using(
-									() -> getBlockKey(url, blockId),
-									key -> fileContent.get(null, key.retain()),
-									ReferenceCounted::release
-							)
+					Mono<ByteBuf> blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, blockId));
+					return fileContent
+							.get(null, blockKeyMono)
 							.map(data -> {
 								try {
 									int blockOffset = getBlockOffset(blockId);
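getBlockKey is now wrapped in Mono.fromCallable, so the key ByteBuf is allocated lazily, once per subscription, rather than eagerly at assembly time. A tiny runnable demo of that deferred, per-subscription behavior (plain strings stand in for ByteBuf keys):

	import java.util.concurrent.atomic.AtomicInteger;
	import reactor.core.publisher.Mono;

	class FromCallableDemo {
		public static void main(String[] args) {
			AtomicInteger allocations = new AtomicInteger();
			// Nothing runs yet: fromCallable is lazy (cold).
			Mono<String> keyMono = Mono.fromCallable(() -> "key-" + allocations.incrementAndGet());
			System.out.println(keyMono.block()); // key-1
			System.out.println(keyMono.block()); // key-2 — a fresh value per subscription
		}
	}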
@@ -189,7 +173,8 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
 									data.release();
 								}
 							});
-				}));
+				})
+		);
 	}

 	private ByteBuf getBlockKey(URL url, int blockId) {
@@ -205,12 +190,9 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
 	@Override
 	public Mono<DiskMetadata> requestDiskMetadata(URL url) {
-		return Mono
-				.using(
-						() -> url.getSerializer(db.getAllocator()).serialize(url),
-						key -> fileMetadata.get(null, key.retain()),
-						ReferenceCounted::release
-				)
+		Mono<ByteBuf> urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url));
+		return fileMetadata
+				.get(null, urlKeyMono)
 				.map(diskMetadataSerializer::deserialize);
 	}
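Caller code is unaffected by the requestDiskMetadata rewrite; a hedged usage fragment (cache and url are assumed to exist, and a missing entry is assumed to complete empty):

	// Hypothetical caller: prints whether the cached file is complete.
	cache.requestDiskMetadata(url)
			.map(DiskMetadata::isDownloadedFully)
			.defaultIfEmpty(false) // assumption: absent entries yield an empty Mono
			.subscribe(full -> System.out.println("fully downloaded: " + full));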
@@ -222,10 +204,11 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
 	@Override
 	public Mono<Tuple2<Metadata, Flux<DataBlock>>> request(URL url) {
+		Mono<ByteBuf> urlKeyMono = Mono.fromCallable(() -> url.getSerializer(db.getAllocator()).serialize(url));
 		return Mono
 				.using(
 						() -> url.getSerializer(db.getAllocator()).serialize(url),
-						key -> fileMetadata.get(null, key.retain()),
+						key -> fileMetadata.get(null, urlKeyMono),
 						ReferenceCounted::release
 				)
 				.map(diskMetadataSerializer::deserialize)
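Note that this last hunk is only half converted: Mono.using still allocates a key through its supplier and releases it via ReferenceCounted::release, while the lookup itself now reads urlKeyMono, leaving the supplier's buffer unused. For contrast, the Mono.using resource shape the rest of the commit retires, as a runnable Netty/Reactor demo (buffer contents are illustrative):

	import io.netty.buffer.ByteBuf;
	import io.netty.buffer.Unpooled;
	import reactor.core.publisher.Mono;

	class MonoUsingDemo {
		public static void main(String[] args) {
			Mono<Integer> size = Mono.using(
					() -> Unpooled.copiedBuffer(new byte[]{1, 2, 3}), // acquire the resource
					buf -> Mono.just(buf.readableBytes()),            // use it inside the pipeline
					ByteBuf::release                                  // released on success and error alike
			);
			System.out.println(size.block()); // 3
		}
	}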