Remove unused code and Netty 5

Andrea Cavalli 2023-02-28 23:11:07 +01:00
parent 5f50b0bd05
commit 256e3daf84
10 changed files with 381 additions and 367 deletions

View File: pom.xml

@@ -122,7 +122,7 @@
<dependency> <dependency>
<groupId>it.cavallium</groupId> <groupId>it.cavallium</groupId>
<artifactId>dbengine</artifactId> <artifactId>dbengine</artifactId>
<version>3.0.0-SNAPSHOT</version> <version>4.0.0-SNAPSHOT</version>
</dependency> </dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>

View File: module-info.java

@@ -1,11 +1,9 @@
module filesponge { module filesponge {
requires io.netty5.buffer;
requires org.apache.logging.log4j; requires org.apache.logging.log4j;
requires dbengine; requires dbengine;
requires it.unimi.dsi.fastutil; requires it.unimi.dsi.fastutil;
requires org.jetbrains.annotations; requires org.jetbrains.annotations;
requires reactor.core; requires reactor.core;
requires org.reactivestreams; requires org.reactivestreams;
requires io.netty5.common;
exports org.warp.filesponge; exports org.warp.filesponge;
} }

View File: DataBlock.java

@@ -20,65 +20,31 @@ package org.warp.filesponge;
import static java.lang.Math.toIntExact; import static java.lang.Math.toIntExact;
import io.netty5.buffer.Buffer; import it.cavallium.dbengine.buffers.Buf;
import io.netty5.buffer.Drop; import java.nio.Buffer;
import io.netty5.buffer.Owned;
import io.netty5.util.Send;
import io.netty5.buffer.internal.ResourceSupport;
import java.util.Objects; import java.util.Objects;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
public final class DataBlock extends ResourceSupport<DataBlock, DataBlock> { public final class DataBlock {
private static final Logger logger = LogManager.getLogger(DataBlock.class); public static DataBlock EMPTY = new DataBlock(-1, -1, null);
private static final Drop<DataBlock> DROP = new Drop<>() { public static DataBlock of(long offset, int length, Buf data) {
@Override
public void drop(DataBlock obj) {
try {
if (obj.data != null) {
obj.data.close();
}
} catch (Throwable ex) {
logger.error("Failed to close data", ex);
}
}
@Override
public Drop<DataBlock> fork() {
return this;
}
@Override
public void attach(DataBlock obj) {
}
};
public static DataBlock of(long offset, int length, Send<Buffer> data) {
return new DataBlock(offset, length, data); return new DataBlock(offset, length, data);
} }
private DataBlock(long offset, int length, Send<Buffer> data) { private DataBlock(long offset, int length, Buf data) {
super(DROP);
try (data) {
this.offset = offset; this.offset = offset;
this.length = length; this.length = length;
this.data = data.receive(); this.data = data;
}
} }
private final long offset; private final long offset;
private final int length; private final int length;
private final Buffer data; private final Buf data;
public Buffer getDataCopy() { public Buf getData() {
assert data.isAccessible();
return data.copy();
}
public Buffer getDataUnsafe() {
return data; return data;
} }
@@ -98,22 +64,18 @@ public final class DataBlock extends ResourceSupport<DataBlock, DataBlock> {
if (o == this) { if (o == this) {
return true; return true;
} }
if (!(o instanceof DataBlock)) { if (!(o instanceof final DataBlock other)) {
return false; return false;
} }
final DataBlock other = (DataBlock) o;
if (this.getOffset() != other.getOffset()) { if (this.getOffset() != other.getOffset()) {
return false; return false;
} }
if (this.getLength() != other.getLength()) { if (this.getLength() != other.getLength()) {
return false; return false;
} }
final Object this$data = this.getDataUnsafe(); final Object this$data = this.getData();
final Object other$data = other.getDataUnsafe(); final Object other$data = other.getData();
if (!Objects.equals(this$data, other$data)) { return Objects.equals(this$data, other$data);
return false;
}
return true;
} }
public int hashCode() { public int hashCode() {
@@ -122,28 +84,13 @@ public final class DataBlock extends ResourceSupport<DataBlock, DataBlock> {
long offset = this.getOffset(); long offset = this.getOffset();
result = result * PRIME + (int) (offset ^ (offset >>> 32)); result = result * PRIME + (int) (offset ^ (offset >>> 32));
result = result * PRIME + this.getLength(); result = result * PRIME + this.getLength();
final Object $data = this.getDataUnsafe(); final Object $data = this.getData();
result = result * PRIME + ($data == null ? 43 : $data.hashCode()); result = result * PRIME + ($data == null ? 43 : $data.hashCode());
return result; return result;
} }
public String toString() { public String toString() {
return "DataBlock(offset=" + this.getOffset() + ", length=" + this.getLength() + ", data=" + this.getDataUnsafe() + ")"; return "DataBlock(offset=" + this.getOffset() + ", length=" + this.getLength() + ", data=" + this.getData() + ")";
} }
@Override
protected RuntimeException createResourceClosedException() {
return new IllegalStateException("Closed");
}
@Override
protected Owned<DataBlock> prepareSend() {
Send<Buffer> dataSend;
dataSend = this.data.send();
return drop -> {
var instance = new DataBlock(offset, length, dataSend);
drop.attach(instance);
return instance;
};
}
} }
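
The new DataBlock is a plain heap value around a dbengine Buf, with no Send/Drop lifecycle left to manage. A minimal sketch of how a block could be constructed, reusing only calls visible elsewhere in this commit (BufDataOutput.create, writeInt, asList, Buf.size, DataBlock.of); the payload values and the use of BufDataOutput to materialize a Buf are purely illustrative:

    // Illustrative payload built with the same helpers the serializers below use
    BufDataOutput out = BufDataOutput.create(Integer.BYTES * 2);
    out.writeInt(0xCAFE);
    out.writeInt(0xBABE);
    Buf payload = out.asList();
    // The block is an ordinary immutable holder: no send()/close() pair to balance,
    // and getData() hands back the Buf directly instead of a reference-counted copy
    DataBlock block = DataBlock.of(0L, payload.size(), payload);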

View File: DiskCache.java

@@ -1,6 +1,6 @@
/* /*
* FileSponge * FileSponge
* Copyright (C) 2021 Andrea Cavalli * Copyright (C) 2023 Andrea Cavalli
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
@@ -18,296 +18,50 @@
package org.warp.filesponge; package org.warp.filesponge;
import static java.lang.Math.toIntExact;
import static org.warp.filesponge.FileSponge.BLOCK_SIZE;
import io.netty5.buffer.Buffer;
import io.netty5.util.Resource;
import io.netty5.util.Send;
import it.cavallium.dbengine.client.IBackuppable; import it.cavallium.dbengine.client.IBackuppable;
import it.cavallium.dbengine.database.BufSupplier;
import it.cavallium.dbengine.database.ColumnUtils; import it.cavallium.dbengine.database.ColumnUtils;
import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.function.Predicate; import java.util.function.Predicate;
import org.jetbrains.annotations.Nullable; import java.util.stream.Stream;
import org.warp.filesponge.DiskMetadata.DiskMetadataSerializer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2; import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public class DiskCache implements URLsDiskHandler, URLsWriter, IBackuppable { public interface DiskCache extends URLsDiskHandler, URLsWriter, IBackuppable, SafeCloseable {
private final DiskMetadataSerializer diskMetadataSerializer; void writeMetadataSync(URL url, Metadata metadata);
private final LLKeyValueDatabase db; void writeContentBlockSync(URL url, DataBlock dataBlock);
private final LLDictionary fileContent;
private final LLDictionary fileMetadata;
private final Predicate<URL> shouldCache;
public DiskCache(LLKeyValueDatabase db, Stream<DataBlock> requestContentSync(URL url);
LLDictionary fileContent,
LLDictionary fileMetadata,
Predicate<URL> shouldCache) {
this.db = db;
this.fileContent = fileContent;
this.fileMetadata = fileMetadata;
this.diskMetadataSerializer = new DiskMetadataSerializer();
this.shouldCache = shouldCache;
}
public static Mono<DiskCache> open(LLDatabaseConnection databaseConnection, DiskMetadata requestDiskMetadataSync(URL url);
Metadata requestMetadataSync(URL url);
Tuple2<Metadata, Stream<DataBlock>> requestSync(URL url);
static DiskCache open(LLDatabaseConnection databaseConnection,
String dbName, String dbName,
DatabaseOptions databaseOptions, DatabaseOptions databaseOptions,
Predicate<URL> shouldCache) { Predicate<URL> shouldCache) {
return databaseConnection var db = databaseConnection.getDatabase(dbName,
.getDatabase(dbName,
List.of(ColumnUtils.dictionary("file-content"), ColumnUtils.dictionary("file-metadata")), List.of(ColumnUtils.dictionary("file-content"), ColumnUtils.dictionary("file-metadata")),
databaseOptions databaseOptions
)
.flatMap(db -> Mono.zip(
Mono.just(db).single(),
db.getDictionary("file-content", UpdateMode.ALLOW).single(),
db.getDictionary("file-metadata", UpdateMode.ALLOW).single()
))
.map(tuple -> new DiskCache(tuple.getT1(), tuple.getT2(), tuple.getT3(), shouldCache))
.single();
}
@Override
public Mono<Void> writeMetadata(URL url, Metadata metadata) {
// Check if this cache should cache the url, otherwise do nothing
if (!shouldCache.test(url)) return Mono.empty();
Mono<Buffer> keyMono = Mono.fromCallable(() -> serializeUrl(url));
return fileMetadata
.update(keyMono,
oldValue -> Objects.requireNonNullElseGet(oldValue,
() -> serializeMetadata(new DiskMetadata(metadata.size(),
BooleanArrayList.wrap(new boolean[DiskMetadata.getBlocksCount(metadata.size(), BLOCK_SIZE)])
))
),
UpdateReturnMode.NOTHING
)
.then();
}
private <T extends URL> Buffer serializeUrl(T url) {
@SuppressWarnings("unchecked")
URLSerializer<T> urlSerializer = (URLSerializer<T>) url.getSerializer();
int sizeHint = urlSerializer.getSerializedSizeHint();
if (sizeHint == -1) sizeHint = 64;
var buffer = db.getAllocator().allocate(sizeHint);
try {
try {
urlSerializer.serialize(url, buffer);
} catch (SerializationException ex) {
throw new IllegalStateException("Failed to serialize url", ex);
}
return buffer;
} catch (Throwable ex) {
buffer.close();
throw ex;
}
}
private Buffer serializeMetadata(DiskMetadata diskMetadata) {
int sizeHint = diskMetadataSerializer.getSerializedSizeHint();
if (sizeHint == -1) sizeHint = 64;
var buffer = db.getAllocator().allocate(sizeHint);
try {
try {
diskMetadataSerializer.serialize(diskMetadata, buffer);
} catch (SerializationException ex) {
throw new IllegalStateException("Failed to serialize metadata", ex);
}
return buffer;
} catch (Throwable ex) {
buffer.close();
throw ex;
}
}
private DiskMetadata deserializeMetadata(Buffer prevBytes) {
try {
return diskMetadataSerializer.deserialize(prevBytes);
} catch (SerializationException ex) {
throw new IllegalStateException("Failed to deserialize metadata", ex);
}
}
@Override
public Mono<Void> writeContentBlock(URL url, DataBlock dataBlock) {
// Check if this cache should cache the url, otherwise do nothing
if (!shouldCache.test(url)) return Mono.empty();
Mono<Buffer> urlKeyMono = Mono.fromCallable(() -> serializeUrl(url));
Mono<Buffer> blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, dataBlock.getId()));
return Mono.using(
() -> BufSupplier.of(dataBlock::getDataCopy),
bufSupplier -> fileContent
.put(blockKeyMono, Mono.fromSupplier(bufSupplier::get), LLDictionaryResultType.VOID)
.doOnNext(Resource::close)
.then(),
BufSupplier::close
)
.then(fileMetadata.update(urlKeyMono, prevBytes -> {
@Nullable DiskMetadata result;
if (prevBytes != null) {
DiskMetadata prevMeta = deserializeMetadata(prevBytes);
if (!prevMeta.isDownloadedBlock(dataBlock.getId())) {
BooleanArrayList bal = prevMeta.downloadedBlocks().clone();
if (prevMeta.size() == -1) {
if (bal.size() > dataBlock.getId()) {
bal.set(dataBlock.getId(), true);
} else if (bal.size() == dataBlock.getId()) {
bal.add(true);
} else {
throw new IndexOutOfBoundsException(
"Trying to write a block too much far from the last block. Previous total blocks: "
+ bal.size() + " Current block id: " + dataBlock.getId());
}
} else {
bal.set(dataBlock.getId(), true);
}
result = new DiskMetadata(prevMeta.size(), bal);
} else {
result = prevMeta;
}
} else {
result = null;
}
if (result != null) {
return serializeMetadata(result);
} else {
return null;
}
}, UpdateReturnMode.NOTHING)
)
.then();
}
@Override
public Flux<DataBlock> requestContent(URL url) {
return this
.requestDiskMetadata(url)
.filter(DiskMetadata::isDownloadedFully)
.flatMapMany(meta -> Flux.fromStream(meta.downloadedBlocks()::stream)
.index()
// Get only downloaded blocks
.filter(Tuple2::getT2)
.flatMapSequential(blockMeta -> {
int blockId = toIntExact(blockMeta.getT1());
boolean downloaded = blockMeta.getT2();
if (!downloaded) {
return Mono.empty();
}
var blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, blockId));
return fileContent
.get(null, blockKeyMono)
.map(data -> {
try (data) {
long blockOffset = getBlockOffset(blockId);
int blockLength = data.readableBytes();
if (meta.size() != -1) {
if (blockOffset + blockLength >= meta.size()) {
if (blockOffset + blockLength > meta.size()) {
throw new IllegalStateException("Overflowed data size");
}
} else {
// Intermediate blocks must be of max size
assert data.readableBytes() == BLOCK_SIZE;
}
}
return DataBlock.of(blockOffset, blockLength, data.send());
}
});
})
); );
var dict1 = db.getDictionary("file-content", UpdateMode.ALLOW);
var dict2 = db.getDictionary("file-metadata", UpdateMode.ALLOW);
return new DiskCacheImpl(db, dict1, dict2, shouldCache);
} }
private Buffer getBlockKey(URL url, int blockId) { static DiskCache openCustom(LLKeyValueDatabase db,
try (var urlBytes = serializeUrl(url)) { LLDictionary fileContent,
Buffer blockIdBytes = this.db.getAllocator().allocate(Integer.BYTES); LLDictionary fileMetadata,
blockIdBytes.writeInt(blockId); Predicate<URL> shouldCache) {
return LLUtils.compositeBuffer(db.getAllocator(), urlBytes.send(), blockIdBytes.send()); return new DiskCacheImpl(db, fileContent, fileMetadata, shouldCache);
}
}
private static long getBlockOffset(int blockId) {
return blockId * (long) BLOCK_SIZE;
}
@Override
public Mono<DiskMetadata> requestDiskMetadata(URL url) {
Mono<Buffer> urlKeyMono = Mono.fromCallable(() -> serializeUrl(url));
return fileMetadata
.get(null, urlKeyMono)
.map(prevBytes -> {
try (prevBytes) {
return deserializeMetadata(prevBytes);
}
});
}
@Override
public Mono<Metadata> requestMetadata(URL url) {
return requestDiskMetadata(url)
.map(DiskMetadata::asMetadata);
}
@Override
public Mono<Tuple2<Metadata, Flux<DataBlock>>> request(URL url) {
Mono<Buffer> urlKeyMono = Mono.fromCallable(() -> serializeUrl(url));
return Mono
.using(
() -> serializeUrl(url),
key -> fileMetadata.get(null, urlKeyMono),
Resource::close
)
.map(serialized -> {
DiskMetadata diskMeta;
try (serialized) {
diskMeta = deserializeMetadata(serialized);
}
var meta = diskMeta.asMetadata();
if (diskMeta.isDownloadedFully()) {
return Tuples.of(meta, this.requestContent(url));
} else {
return Tuples.of(meta, Flux.empty());
}
});
}
public Mono<Void> close() {
return db.close();
}
@Override
public Mono<Void> pauseForBackup() {
return db.pauseForBackup();
}
@Override
public Mono<Void> resumeAfterBackup() {
return db.resumeAfterBackup();
}
@Override
public boolean isPaused() {
return db.isPaused();
} }
} }
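
DiskCache is now a synchronous interface; the reactive wrappers live in DiskCacheImpl below. A hedged usage sketch of the new blocking entry points, assuming an already-built LLDatabaseConnection (connection), DatabaseOptions (options), a URL instance (myUrl) with its Metadata (myMetadata) and a DataBlock (someBlock); the database name "filesponge" is also illustrative:

    DiskCache cache = DiskCache.open(connection, "filesponge", options, url -> true);
    try {
        cache.writeMetadataSync(myUrl, myMetadata);
        cache.writeContentBlockSync(myUrl, someBlock);
        Tuple2<Metadata, Stream<DataBlock>> result = cache.requestSync(myUrl);
        result.getT2().forEach(block -> System.out.println(block.getOffset()));
    } finally {
        cache.close(); // DiskCache now extends SafeCloseable; closing releases the underlying database
    }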

View File: DiskCacheImpl.java

@@ -0,0 +1,325 @@
/*
* FileSponge
* Copyright (C) 2021 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge;
import static java.lang.Math.toIntExact;
import static org.warp.filesponge.FileSponge.BLOCK_SIZE;
import it.cavallium.dbengine.buffers.Buf;
import it.cavallium.dbengine.buffers.BufDataInput;
import it.cavallium.dbengine.buffers.BufDataOutput;
import it.cavallium.dbengine.client.IBackuppable;
import it.cavallium.dbengine.database.ColumnUtils;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
import it.cavallium.dbengine.utils.StreamUtils;
import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.jetbrains.annotations.Nullable;
import org.warp.filesponge.DiskMetadata.DiskMetadataSerializer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
class DiskCacheImpl implements DiskCache {
private final DiskMetadataSerializer diskMetadataSerializer;
private final LLKeyValueDatabase db;
private final LLDictionary fileContent;
private final LLDictionary fileMetadata;
private final Predicate<URL> shouldCache;
public DiskCacheImpl(LLKeyValueDatabase db,
LLDictionary fileContent,
LLDictionary fileMetadata,
Predicate<URL> shouldCache) {
this.db = db;
this.fileContent = fileContent;
this.fileMetadata = fileMetadata;
this.diskMetadataSerializer = new DiskMetadataSerializer();
this.shouldCache = shouldCache;
}
@Override
public Mono<Void> writeMetadata(URL url, Metadata metadata) {
return Mono.<Void>fromRunnable(() -> writeMetadataSync(url, metadata)).subscribeOn(Schedulers.boundedElastic());
}
@Override
public void writeMetadataSync(URL url, Metadata metadata) {
// Check if this cache should cache the url, otherwise do nothing
if (!shouldCache.test(url)) return;
var key = serializeUrl(url);
fileMetadata.update(key, oldValue -> {
if (oldValue != null) {
return oldValue;
} else {
return serializeMetadata(new DiskMetadata(metadata.size(),
BooleanArrayList.wrap(new boolean[DiskMetadata.getBlocksCount(metadata.size(), BLOCK_SIZE)])
));
}
}, UpdateReturnMode.NOTHING);
}
private <T extends URL> Buf serializeUrl(T url) {
@SuppressWarnings("unchecked")
URLSerializer<T> urlSerializer = (URLSerializer<T>) url.getSerializer();
int sizeHint = urlSerializer.getSerializedSizeHint();
if (sizeHint == -1) sizeHint = 64;
var output = BufDataOutput.create(sizeHint);
try {
urlSerializer.serialize(url, output);
} catch (SerializationException ex) {
throw new IllegalStateException("Failed to serialize url", ex);
}
return output.asList();
}
private Buf serializeMetadata(DiskMetadata diskMetadata) {
int sizeHint = diskMetadataSerializer.getSerializedSizeHint();
if (sizeHint == -1) sizeHint = 64;
var out = BufDataOutput.create(sizeHint);
try {
diskMetadataSerializer.serialize(diskMetadata, out);
} catch (SerializationException ex) {
throw new IllegalStateException("Failed to serialize metadata", ex);
}
return out.asList();
}
private DiskMetadata deserializeMetadata(Buf prevBytes) {
try {
return diskMetadataSerializer.deserialize(BufDataInput.create(prevBytes));
} catch (SerializationException ex) {
throw new IllegalStateException("Failed to deserialize metadata", ex);
}
}
@Override
public Mono<Void> writeContentBlock(URL url, DataBlock dataBlock) {
return Mono
.<Void>fromRunnable(() -> writeContentBlockSync(url, dataBlock))
.subscribeOn(Schedulers.boundedElastic());
}
@Override
public void writeContentBlockSync(URL url, DataBlock dataBlock) {
// Check if this cache should cache the url, otherwise do nothing
if (!shouldCache.test(url)) {
return;
}
Buf urlKey = serializeUrl(url);
Buf blockKey = getBlockKey(url, dataBlock.getId());
fileContent.put(blockKey, dataBlock.getData(), LLDictionaryResultType.VOID);
fileMetadata.update(urlKey, prevBytes -> {
@Nullable DiskMetadata result;
if (prevBytes != null) {
DiskMetadata prevMeta = deserializeMetadata(prevBytes);
if (!prevMeta.isDownloadedBlock(dataBlock.getId())) {
BooleanArrayList bal = prevMeta.downloadedBlocks().clone();
if (prevMeta.size() == -1) {
if (bal.size() > dataBlock.getId()) {
bal.set(dataBlock.getId(), true);
} else if (bal.size() == dataBlock.getId()) {
bal.add(true);
} else {
throw new IndexOutOfBoundsException(
"Trying to write a block too much far from the last block. Previous total blocks: "
+ bal.size() + " Current block id: " + dataBlock.getId());
}
} else {
bal.set(dataBlock.getId(), true);
}
result = new DiskMetadata(prevMeta.size(), bal);
} else {
result = prevMeta;
}
} else {
result = null;
}
if (result != null) {
return serializeMetadata(result);
} else {
return null;
}
}, UpdateReturnMode.NOTHING);
}
@Override
public Flux<DataBlock> requestContent(URL url) {
return Flux.fromStream(() -> requestContentSync(url)).subscribeOn(Schedulers.boundedElastic());
}
@Override
public Stream<DataBlock> requestContentSync(URL url) {
record BlockMeta(int blockId, boolean downloaded) {}
var meta = this.requestDiskMetadataSync(url);
if (meta == null || !meta.isDownloadedFully()) {
return Stream.empty();
}
return StreamUtils.indexed(meta.downloadedBlocks().stream(),
(downloaded, blockId) -> new BlockMeta(toIntExact(blockId), downloaded)
)
// Get only downloaded blocks
.filter(BlockMeta::downloaded).map(blockMeta -> {
if (!blockMeta.downloaded) {
return null;
}
var blockKey = getBlockKey(url, blockMeta.blockId);
var data = fileContent.get(null, blockKey);
long blockOffset = getBlockOffset(blockMeta.blockId);
int blockLength = data.size();
if (meta.size() != -1) {
if (blockOffset + blockLength >= meta.size()) {
if (blockOffset + blockLength > meta.size()) {
throw new IllegalStateException("Overflowed data size");
}
} else {
// Intermediate blocks must be of max size
assert data.size() == BLOCK_SIZE;
}
}
return DataBlock.of(blockOffset, blockLength, data);
}).filter(Objects::nonNull);
}
private <T extends URL> Buf getBlockKey(T url, int blockId) {
//noinspection unchecked
URLSerializer<T> urlSerializer = (URLSerializer<T>) url.getSerializer();
int urlSizeHint = urlSerializer.getSerializedSizeHint();
if (urlSizeHint == -1) {
urlSizeHint = 64;
}
var sizeHint = urlSizeHint + Integer.BYTES;
var out = BufDataOutput.create(sizeHint);
try {
urlSerializer.serialize(url, out);
} catch (SerializationException ex) {
throw new IllegalStateException("Failed to serialize url", ex);
}
out.writeInt(blockId);
return out.asList();
}
private static long getBlockOffset(int blockId) {
return blockId * (long) BLOCK_SIZE;
}
@Override
public Mono<DiskMetadata> requestDiskMetadata(URL url) {
return Mono.fromCallable(() -> requestDiskMetadataSync(url)).subscribeOn(Schedulers.boundedElastic());
}
@Override
public DiskMetadata requestDiskMetadataSync(URL url) {
Buf urlKey = serializeUrl(url);
var prevBytes = fileMetadata.get(null, urlKey);
if (prevBytes != null) {
return deserializeMetadata(prevBytes);
} else {
return null;
}
}
@Override
public Mono<Metadata> requestMetadata(URL url) {
return requestDiskMetadata(url).map(DiskMetadata::asMetadata);
}
@Override
public Metadata requestMetadataSync(URL url) {
var metadata = requestDiskMetadataSync(url);
if (metadata != null) {
return metadata.asMetadata();
} else {
return null;
}
}
@Override
public Mono<Tuple2<Metadata, Flux<DataBlock>>> request(URL url) {
return Mono
.fromCallable(() -> {
var tuple = requestSync(url);
if (tuple == null) {
return null;
}
return tuple.mapT2(s -> Flux.fromStream(s).subscribeOn(Schedulers.boundedElastic()));
})
.subscribeOn(Schedulers.boundedElastic());
}
@Override
public Tuple2<Metadata, Stream<DataBlock>> requestSync(URL url) {
Buf urlKey = serializeUrl(url);
var serialized = fileMetadata.get(null, urlKey);
if (serialized == null) {
return null;
}
DiskMetadata diskMeta = deserializeMetadata(serialized);
var meta = diskMeta.asMetadata();
if (diskMeta.isDownloadedFully()) {
return Tuples.of(meta, this.requestContentSync(url));
} else {
return Tuples.of(meta, Stream.empty());
}
}
@Override
public void close() {
db.close();
}
@Override
public void pauseForBackup() {
db.pauseForBackup();
}
@Override
public void resumeAfterBackup() {
db.resumeAfterBackup();
}
@Override
public boolean isPaused() {
return db.isPaused();
}
}
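
Every reactive method above is kept by bridging the new blocking call onto the bounded-elastic scheduler. The same pattern in isolation, with illustrative names (cache, myUrl):

    Mono<DiskMetadata> metaMono = Mono
        .fromCallable(() -> cache.requestDiskMetadataSync(myUrl)) // a null result becomes an empty Mono
        .subscribeOn(Schedulers.boundedElastic());                // keep blocking database I/O off the parallel schedulers

    Flux<DataBlock> blocks = Flux
        .fromStream(() -> cache.requestContentSync(myUrl))        // lazy: a fresh Stream is created per subscriber
        .subscribeOn(Schedulers.boundedElastic());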

View File: DiskMetadata.java

@@ -20,12 +20,8 @@ package org.warp.filesponge;
import static java.lang.Math.toIntExact; import static java.lang.Math.toIntExact;
import io.netty5.buffer.Buffer; import it.cavallium.dbengine.buffers.BufDataInput;
import io.netty5.buffer.BufferAllocator; import it.cavallium.dbengine.buffers.BufDataOutput;
import io.netty5.util.Send;
import it.cavallium.dbengine.database.serialization.BufferDataInputOwned;
import it.cavallium.dbengine.database.serialization.BufferDataInputShared;
import it.cavallium.dbengine.database.serialization.BufferDataOutput;
import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.Serializer;
import it.unimi.dsi.fastutil.booleans.BooleanArrayList; import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
@@ -81,8 +77,7 @@ public record DiskMetadata(long size, BooleanArrayList downloadedBlocks) {
public static class DiskMetadataSerializer implements Serializer<DiskMetadata> { public static class DiskMetadataSerializer implements Serializer<DiskMetadata> {
@Override @Override
public @NotNull DiskMetadata deserialize(@NotNull Buffer serialized) throws SerializationException { public @NotNull DiskMetadata deserialize(@NotNull BufDataInput dis) throws SerializationException {
var dis = new BufferDataInputShared(serialized);
int legacySize = dis.readInt(); int legacySize = dis.readInt();
long size; long size;
if (legacySize == -2) { if (legacySize == -2) {
@@ -104,8 +99,7 @@ public record DiskMetadata(long size, BooleanArrayList downloadedBlocks) {
} }
@Override @Override
public void serialize(@NotNull DiskMetadata deserialized, Buffer output) throws SerializationException { public void serialize(@NotNull DiskMetadata deserialized, BufDataOutput dos) throws SerializationException {
var dos = new BufferDataOutput(output);
dos.writeInt(-2); dos.writeInt(-2);
dos.writeLong(deserialized.size); dos.writeLong(deserialized.size);
var blocksCount = deserialized.getBlocksCount(); var blocksCount = deserialized.getBlocksCount();
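
DiskMetadataSerializer now works directly against BufDataInput/BufDataOutput instead of wrapping a Netty Buffer. A hedged round-trip sketch; the size hint of 64 mirrors the fallback used in DiskCacheImpl, and the sample metadata (unknown size, one downloaded block) is illustrative only:

    var serializer = new DiskMetadata.DiskMetadataSerializer();
    var meta = new DiskMetadata(-1L, BooleanArrayList.wrap(new boolean[]{true})); // -1 = unknown size, as in writeContentBlockSync
    BufDataOutput out = BufDataOutput.create(64);
    serializer.serialize(meta, out);
    DiskMetadata roundTripped = serializer.deserialize(BufDataInput.create(out.asList()));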

View File: URL.java

@@ -18,7 +18,6 @@
package org.warp.filesponge; package org.warp.filesponge;
import io.netty5.buffer.BufferAllocator;
import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.Serializer;
public interface URL { public interface URL {

View File: URLSerializer.java

@@ -18,11 +18,9 @@
package org.warp.filesponge; package org.warp.filesponge;
import io.netty5.buffer.Buffer; import it.cavallium.dbengine.buffers.Buf;
import io.netty5.buffer.BufferAllocator; import it.cavallium.dbengine.buffers.BufDataOutput;
import it.cavallium.dbengine.database.serialization.BufferDataOutput;
import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationException;
import java.nio.charset.StandardCharsets;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
public interface URLSerializer<T extends URL> { public interface URLSerializer<T extends URL> {
@@ -30,7 +28,7 @@ public interface URLSerializer<T extends URL> {
/** /**
* @param output its writable size will be at least equal to the size hint * @param output its writable size will be at least equal to the size hint
*/ */
void serialize(@NotNull T url, Buffer output) throws SerializationException; void serialize(@NotNull T url, BufDataOutput output) throws SerializationException;
/** /**
* @return hint about the expected size of the buffer * @return hint about the expected size of the buffer

View File: URLStringSerializer.java

@@ -18,18 +18,17 @@
package org.warp.filesponge; package org.warp.filesponge;
import io.netty5.buffer.Buffer; import it.cavallium.dbengine.buffers.Buf;
import it.cavallium.dbengine.database.serialization.BufferDataOutput; import it.cavallium.dbengine.buffers.BufDataOutput;
import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationException;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
public abstract class URLStringSerializer<T extends URL> implements URLSerializer<T> { public abstract class URLStringSerializer<T extends URL> implements URLSerializer<T> {
@Override @Override
public final void serialize(@NotNull T url, Buffer output) throws SerializationException { public final void serialize(@NotNull T url, BufDataOutput output) throws SerializationException {
var string = this.serialize(url); var string = this.serialize(url);
var dataOut = new BufferDataOutput(output); output.writeUTF(string);
dataOut.writeUTF(string);
} }
public abstract @NotNull String serialize(@NotNull T url); public abstract @NotNull String serialize(@NotNull T url);

View File: ThreadSafety.java

@@ -28,7 +28,7 @@ public class ThreadSafety {
return s; return s;
})).subscribeOn(schedulerSingle)) })).subscribeOn(schedulerSingle))
.ignoreElements() .ignoreElements()
.thenMany(Flux.defer(() -> Flux.fromStream(list::stream))) .thenMany(Flux.defer(() -> Flux.fromStream(list::stream).subscribeOn(Schedulers.boundedElastic())))
.subscribeOn(schedulerParallel); .subscribeOn(schedulerParallel);
Integer[] checks = new Integer[iterations * 2]; Integer[] checks = new Integer[iterations * 2];