Manage discards

Andrea Cavalli 2021-05-05 17:31:50 +02:00
parent e2774d55f2
commit 3bf2b96892
3 changed files with 94 additions and 33 deletions
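The diff below threads Netty reference counting through FileSponge's Reactor pipelines: DataBlock becomes an explicitly reference-counted value, DiskCache releases borrowed buffers on every code path, and FileSponge registers doOnDiscard hooks so blocks dropped by operators such as distinct are still released. A minimal, hypothetical sketch of that last pattern follows; the example class and sample buffers are illustrative, not part of this commit.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCounted;
import reactor.core.publisher.Flux;

public final class DiscardExample {

	public static void main(String[] args) {
		// Two buffers that map to the same key: distinct() drops the second
		// one without ever handing it to a subscriber.
		Flux<ByteBuf> blocks = Flux
				.just(Unpooled.copiedBuffer(new byte[] {1}), Unpooled.copiedBuffer(new byte[] {1}))
				.distinct(buf -> buf.getByte(0))
				// Without a discard hook the dropped buffer would leak; with it,
				// Reactor calls release() on every element it silently drops.
				.doOnDiscard(ReferenceCounted.class, ReferenceCounted::release);

		blocks.doOnNext(ReferenceCounted::release).blockLast();
	}
}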

View File

@@ -19,29 +19,82 @@
 package org.warp.filesponge;

 import io.netty.buffer.ByteBuf;
 import java.nio.ByteBuffer;
-import lombok.Value;
+import java.util.Objects;

-@Value
-public class DataBlock {
+public final class DataBlock {

 	public DataBlock(int offset, int length, ByteBuf data) {
-		this.offset = offset;
-		this.length = length;
-		this.data = data;
+		assert data.isDirect();
+		try {
+			this.offset = offset;
+			this.length = length;
+			this.data = data.retain();
+		} finally {
+			data.release();
+		}
 	}

-	int offset;
-	int length;
-	ByteBuf data;
+	private final int offset;
+	private final int length;
+	private final ByteBuf data;

 	public ByteBuf getData() {
 		assert data.isReadable();
-		return data;
+		return data.retain();
 	}

 	public int getId() {
 		return offset / FileSponge.BLOCK_SIZE;
 	}
+
+	public int getOffset() {
+		return this.offset;
+	}
+
+	public int getLength() {
+		return this.length;
+	}
+
+	public boolean equals(final Object o) {
+		if (o == this) {
+			return true;
+		}
+		if (!(o instanceof DataBlock)) {
+			return false;
+		}
+		final DataBlock other = (DataBlock) o;
+		if (this.getOffset() != other.getOffset()) {
+			return false;
+		}
+		if (this.getLength() != other.getLength()) {
+			return false;
+		}
+		final Object this$data = this.getData();
+		final Object other$data = other.getData();
+		if (!Objects.equals(this$data, other$data)) {
+			return false;
+		}
+		return true;
+	}
+
+	public int hashCode() {
+		final int PRIME = 59;
+		int result = 1;
+		result = result * PRIME + this.getOffset();
+		result = result * PRIME + this.getLength();
+		final Object $data = this.getData();
+		result = result * PRIME + ($data == null ? 43 : $data.hashCode());
+		return result;
+	}
+
+	public String toString() {
+		return "DataBlock(offset=" + this.getOffset() + ", length=" + this.getLength() + ", data=" + this.getData() + ")";
+	}
+
+	public void retain() {
+		this.data.retain();
+	}
+
+	public void release() {
+		this.data.release();
+	}
 }
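With Lombok's @Value removed, the ownership contract is now explicit: the constructor retains its own reference and releases the argument (so the block takes over the caller's reference), getData() hands back a retained buffer that the caller must release, and retain()/release() forward to the wrapped ByteBuf. A usage sketch under those assumptions; the wrapper class and values are illustrative:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.warp.filesponge.DataBlock;

public final class DataBlockOwnership {

	public static void main(String[] args) {
		ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(4);
		buf.writeInt(42);

		// The constructor retains for itself and releases its argument, so the
		// block takes over the caller's reference: do not release buf here.
		DataBlock block = new DataBlock(0, 4, buf);

		ByteBuf view = block.getData(); // now returns a retained reference
		try {
			System.out.println(view.getInt(0));
		} finally {
			view.release(); // balance the retain() inside getData()
		}

		block.release(); // drop the block's own reference when done
	}
}

One caveat worth noting when reading the generated methods above: equals(), hashCode(), and toString() also go through getData(), so each of those calls retains a reference that nothing releases.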

View File

@@ -92,18 +92,13 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
 	@Override
 	public Mono<Void> writeContentBlock(URL url, DataBlock dataBlock) {
 		return Mono
-				.fromCallable(() -> {
-					return dataBlock.getData();
-					/*
-					ByteBuf bytes = PooledByteBufAllocator.DEFAULT.directBuffer(dataBlock.getLength());
-					bytes.writeBytes(dataBlock.getData().slice());
-					return bytes;
-					*/
-				})
+				.fromCallable(dataBlock::getData)
 				.subscribeOn(Schedulers.boundedElastic())
-				.flatMap(bytes -> fileContent.put(getBlockKey(url, dataBlock.getId()), bytes, LLDictionaryResultType.VOID))
-				.doOnNext(ReferenceCounted::release)
+				.flatMap(bytes -> fileContent
+						.put(getBlockKey(url, dataBlock.getId()), bytes, LLDictionaryResultType.VOID)
+						.doOnNext(ReferenceCounted::release)
+						.then()
+				)
 				.then(fileMetadata.update(url.getSerializer(db.getAllocator()).serialize(url), prevBytes -> {
 					@Nullable DiskMetadata result;
 					if (prevBytes != null) {
@@ -142,17 +137,21 @@ public class DiskCache implements URLsDiskHandler, URLsWriter {
 					return Mono.<DataBlock>empty();
 				}
 				return fileContent.get(null, getBlockKey(url, blockId)).map(data -> {
-					int blockOffset = getBlockOffset(blockId);
-					int blockLength = data.readableBytes();
-					if (blockOffset + blockLength >= meta.getSize()) {
-						if (blockOffset + blockLength > meta.getSize()) {
-							throw new IllegalStateException("Overflowed data size");
-						}
-					} else {
-						// Intermediate blocks must be of max size
-						assert data.readableBytes() == BLOCK_SIZE;
-					}
-					return new DataBlock(blockOffset, blockLength, data);
+					try {
+						int blockOffset = getBlockOffset(blockId);
+						int blockLength = data.readableBytes();
+						if (blockOffset + blockLength >= meta.getSize()) {
+							if (blockOffset + blockLength > meta.getSize()) {
+								throw new IllegalStateException("Overflowed data size");
+							}
+						} else {
+							// Intermediate blocks must be of max size
+							assert data.readableBytes() == BLOCK_SIZE;
+						}
+						return new DataBlock(blockOffset, blockLength, data.retain());
+					} finally {
+						data.release();
+					}
 				});
 			}));
 	}
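The requestContentBlock fix above follows a borrow/retain/release shape: the mapper borrows the buffer returned by fileContent.get(), retains a fresh reference for the DataBlock it produces, and releases the borrowed one in finally, so even the IllegalStateException path cannot leak. A self-contained sketch of the same shape with plain Reactor and Netty types; the example class is illustrative:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCounted;
import reactor.core.publisher.Mono;

public final class BorrowReleaseExample {

	public static void main(String[] args) {
		Mono<ByteBuf> source = Mono.fromCallable(() -> Unpooled.copiedBuffer(new byte[] {7}));

		// The mapper borrows data, retains a new reference for the value it
		// returns, and releases the borrowed one in finally, so a throwing
		// mapper body cannot leak the buffer.
		Mono<ByteBuf> firstByte = source.map(data -> {
			try {
				return data.retainedSlice(0, 1);
			} finally {
				data.release();
			}
		});

		firstByte.doOnNext(ReferenceCounted::release).block();
	}
}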

View File

@@ -18,6 +18,7 @@
 package org.warp.filesponge;

+import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSlice;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;

 public class FileSponge implements URLsHandler {
@@ -59,6 +61,7 @@ public class FileSponge implements URLsHandler {
 				.fromIterable(cacheAccess)
 				.map(urlsHandler -> urlsHandler.requestContent(url))
 				.collectList()
+				.doOnDiscard(DataBlock.class, DataBlock::release)
 				.flatMapMany(monos -> FileSpongeUtils.firstWithValueFlux(monos))
 				.doOnNext(dataBlock -> {
 					if (alreadyPrintedDebug.compareAndSet(false, true)) {
@@ -77,10 +80,16 @@
 						)
 				)
 				.collectList()
+				.doOnDiscard(Flux.class, f -> {
+					//noinspection unchecked
+					Flux<DataBlock> flux = (Flux<DataBlock>) f;
+					flux.doOnNext(DataBlock::release).subscribeOn(Schedulers.single()).subscribe();
+				})
 				.flatMapMany(monos -> FileSpongeUtils.firstWithValueFlux(monos))
 				.doOnComplete(() -> logger.debug("Downloaded file \"{}\" content", url))
 		)
-				.distinct(DataBlock::getId);
+				.distinct(DataBlock::getId)
+				.doOnDiscard(DataBlock.class, DataBlock::release);
 	}

 	@Override
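The Flux-typed discard hook added above covers the case where the collected list of per-source publishers is dropped before flatMapMany subscribes: each discarded publisher is subscribed once, on Schedulers.single(), purely to drain and release its blocks. In isolation, that drain step might look like the following sketch; the helper class is illustrative:

import org.warp.filesponge.DataBlock;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

final class DiscardDrain {

	// Subscribe to a discarded publisher purely to release its elements.
	static void drainAndRelease(Flux<DataBlock> discarded) {
		discarded
				.doOnNext(DataBlock::release)     // free each block as it arrives
				.subscribeOn(Schedulers.single()) // drain off the discarding thread
				.subscribe();                     // fire-and-forget
	}
}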