diff --git a/pom.xml b/pom.xml
index 97010b5..4e37564 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,7 +122,7 @@
<groupId>it.cavallium</groupId>
<artifactId>dbengine</artifactId>
- <version>3.0.0-SNAPSHOT</version>
+ <version>4.0.0-SNAPSHOT</version>
diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java
index 28f53bc..9137b3b 100644
--- a/src/main/java/module-info.java
+++ b/src/main/java/module-info.java
@@ -1,11 +1,9 @@
module filesponge {
- requires io.netty5.buffer;
requires org.apache.logging.log4j;
requires dbengine;
requires it.unimi.dsi.fastutil;
requires org.jetbrains.annotations;
requires reactor.core;
requires org.reactivestreams;
- requires io.netty5.common;
exports org.warp.filesponge;
}
\ No newline at end of file
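
With both Netty modules dropped, the resolved module descriptor is worth seeing whole; this is simply the file as it stands after the hunk above:

    module filesponge {
        requires org.apache.logging.log4j;
        requires dbengine;
        requires it.unimi.dsi.fastutil;
        requires org.jetbrains.annotations;
        requires reactor.core;
        requires org.reactivestreams;
        exports org.warp.filesponge;
    }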
diff --git a/src/main/java/org/warp/filesponge/DataBlock.java b/src/main/java/org/warp/filesponge/DataBlock.java
index e8eedbe..dbbd639 100644
--- a/src/main/java/org/warp/filesponge/DataBlock.java
+++ b/src/main/java/org/warp/filesponge/DataBlock.java
@@ -20,65 +20,31 @@ package org.warp.filesponge;
import static java.lang.Math.toIntExact;
-import io.netty5.buffer.Buffer;
-import io.netty5.buffer.Drop;
-import io.netty5.buffer.Owned;
-import io.netty5.util.Send;
-import io.netty5.buffer.internal.ResourceSupport;
+import it.cavallium.dbengine.buffers.Buf;
import java.util.Objects;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-public final class DataBlock extends ResourceSupport<DataBlock, DataBlock> {
+public final class DataBlock {
- private static final Logger logger = LogManager.getLogger(DataBlock.class);
+ public static DataBlock EMPTY = new DataBlock(-1, -1, null);
- private static final Drop<DataBlock> DROP = new Drop<>() {
- @Override
- public void drop(DataBlock obj) {
- try {
- if (obj.data != null) {
- obj.data.close();
- }
- } catch (Throwable ex) {
- logger.error("Failed to close data", ex);
- }
- }
-
- @Override
- public Drop<DataBlock> fork() {
- return this;
- }
-
- @Override
- public void attach(DataBlock obj) {
-
- }
- };
-
- public static DataBlock of(long offset, int length, Send<Buffer> data) {
+ public static DataBlock of(long offset, int length, Buf data) {
return new DataBlock(offset, length, data);
}
- private DataBlock(long offset, int length, Send<Buffer> data) {
- super(DROP);
- try (data) {
- this.offset = offset;
- this.length = length;
- this.data = data.receive();
- }
+ private DataBlock(long offset, int length, Buf data) {
+ this.offset = offset;
+ this.length = length;
+ this.data = data;
}
private final long offset;
private final int length;
- private final Buffer data;
+ private final Buf data;
- public Buffer getDataCopy() {
- assert data.isAccessible();
- return data.copy();
- }
-
- public Buffer getDataUnsafe() {
+ public Buf getData() {
return data;
}
@@ -98,22 +64,18 @@ public final class DataBlock extends ResourceSupport<DataBlock, DataBlock> {
if (o == this) {
return true;
}
- if (!(o instanceof DataBlock)) {
+ if (!(o instanceof final DataBlock other)) {
return false;
}
- final DataBlock other = (DataBlock) o;
if (this.getOffset() != other.getOffset()) {
return false;
}
if (this.getLength() != other.getLength()) {
return false;
}
- final Object this$data = this.getDataUnsafe();
- final Object other$data = other.getDataUnsafe();
- if (!Objects.equals(this$data, other$data)) {
- return false;
- }
- return true;
+ final Object this$data = this.getData();
+ final Object other$data = other.getData();
+ return Objects.equals(this$data, other$data);
}
public int hashCode() {
@@ -122,28 +84,13 @@ public final class DataBlock extends ResourceSupport<DataBlock, DataBlock> {
long offset = this.getOffset();
result = result * PRIME + (int) (offset ^ (offset >>> 32));
result = result * PRIME + this.getLength();
- final Object $data = this.getDataUnsafe();
+ final Object $data = this.getData();
result = result * PRIME + ($data == null ? 43 : $data.hashCode());
return result;
}
public String toString() {
- return "DataBlock(offset=" + this.getOffset() + ", length=" + this.getLength() + ", data=" + this.getDataUnsafe() + ")";
+ return "DataBlock(offset=" + this.getOffset() + ", length=" + this.getLength() + ", data=" + this.getData() + ")";
}
- @Override
- protected RuntimeException createResourceClosedException() {
- return new IllegalStateException("Closed");
- }
-
- @Override
- protected Owned<DataBlock> prepareSend() {
- Send<Buffer> dataSend;
- dataSend = this.data.send();
- return drop -> {
- var instance = new DataBlock(offset, length, dataSend);
- drop.attach(instance);
- return instance;
- };
- }
}
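
DataBlock is now a plain immutable value: no Drop, no send/receive, no close(). A minimal caller-side sketch; `Buf.wrap(byte[])` is an assumption about dbengine's buffer API (any other Buf source, such as a BufDataOutput's asList(), works the same way):

    import it.cavallium.dbengine.buffers.Buf;

    class DataBlockSketch {
        static DataBlock firstBlock(byte[] payload) {
            Buf data = Buf.wrap(payload); // assumed factory; not part of this patch
            // Block 0 starts at offset 0; no ownership transfer is needed anymore.
            return DataBlock.of(0, payload.length, data);
        }
    }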
diff --git a/src/main/java/org/warp/filesponge/DiskCache.java b/src/main/java/org/warp/filesponge/DiskCache.java
index 332cf8b..7ce1577 100644
--- a/src/main/java/org/warp/filesponge/DiskCache.java
+++ b/src/main/java/org/warp/filesponge/DiskCache.java
@@ -1,6 +1,6 @@
/*
* FileSponge
- * Copyright (C) 2021 Andrea Cavalli
+ * Copyright (C) 2023 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -18,296 +18,50 @@
package org.warp.filesponge;
-import static java.lang.Math.toIntExact;
-import static org.warp.filesponge.FileSponge.BLOCK_SIZE;
-
-import io.netty5.buffer.Buffer;
-import io.netty5.util.Resource;
-import io.netty5.util.Send;
import it.cavallium.dbengine.client.IBackuppable;
-import it.cavallium.dbengine.database.BufSupplier;
import it.cavallium.dbengine.database.ColumnUtils;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLDictionary;
-import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
-import it.cavallium.dbengine.database.LLUtils;
+import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.UpdateMode;
-import it.cavallium.dbengine.database.UpdateReturnMode;
-import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
-import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
import java.util.List;
-import java.util.Objects;
import java.util.function.Predicate;
-import org.jetbrains.annotations.Nullable;
-import org.warp.filesponge.DiskMetadata.DiskMetadataSerializer;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
+import java.util.stream.Stream;
import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
-public class DiskCache implements URLsDiskHandler, URLsWriter, IBackuppable {
+public interface DiskCache extends URLsDiskHandler, URLsWriter, IBackuppable, SafeCloseable {
- private final DiskMetadataSerializer diskMetadataSerializer;
+ void writeMetadataSync(URL url, Metadata metadata);
- private final LLKeyValueDatabase db;
- private final LLDictionary fileContent;
- private final LLDictionary fileMetadata;
- private final Predicate<URL> shouldCache;
+ void writeContentBlockSync(URL url, DataBlock dataBlock);
- public DiskCache(LLKeyValueDatabase db,
- LLDictionary fileContent,
- LLDictionary fileMetadata,
- Predicate<URL> shouldCache) {
- this.db = db;
- this.fileContent = fileContent;
- this.fileMetadata = fileMetadata;
- this.diskMetadataSerializer = new DiskMetadataSerializer();
- this.shouldCache = shouldCache;
- }
+ Stream<DataBlock> requestContentSync(URL url);
- public static Mono<DiskCache> open(LLDatabaseConnection databaseConnection,
+ DiskMetadata requestDiskMetadataSync(URL url);
+
+ Metadata requestMetadataSync(URL url);
+
+ Tuple2<Metadata, Stream<DataBlock>> requestSync(URL url);
+
+ static DiskCache open(LLDatabaseConnection databaseConnection,
String dbName,
DatabaseOptions databaseOptions,
Predicate<URL> shouldCache) {
- return databaseConnection
- .getDatabase(dbName,
- List.of(ColumnUtils.dictionary("file-content"), ColumnUtils.dictionary("file-metadata")),
- databaseOptions
- )
- .flatMap(db -> Mono.zip(
- Mono.just(db).single(),
- db.getDictionary("file-content", UpdateMode.ALLOW).single(),
- db.getDictionary("file-metadata", UpdateMode.ALLOW).single()
- ))
- .map(tuple -> new DiskCache(tuple.getT1(), tuple.getT2(), tuple.getT3(), shouldCache))
- .single();
+ var db = databaseConnection.getDatabase(dbName,
+ List.of(ColumnUtils.dictionary("file-content"), ColumnUtils.dictionary("file-metadata")),
+ databaseOptions
+ );
+ var dict1 = db.getDictionary("file-content", UpdateMode.ALLOW);
+ var dict2 = db.getDictionary("file-metadata", UpdateMode.ALLOW);
+ return new DiskCacheImpl(db, dict1, dict2, shouldCache);
}
- @Override
- public Mono<Void> writeMetadata(URL url, Metadata metadata) {
- // Check if this cache should cache the url, otherwise do nothing
- if (!shouldCache.test(url)) return Mono.empty();
-
- Mono<Buffer> keyMono = Mono.fromCallable(() -> serializeUrl(url));
- return fileMetadata
- .update(keyMono,
- oldValue -> Objects.requireNonNullElseGet(oldValue,
- () -> serializeMetadata(new DiskMetadata(metadata.size(),
- BooleanArrayList.wrap(new boolean[DiskMetadata.getBlocksCount(metadata.size(), BLOCK_SIZE)])
- ))
- ),
- UpdateReturnMode.NOTHING
- )
- .then();
- }
-
- private <T extends URL> Buffer serializeUrl(T url) {
- @SuppressWarnings("unchecked")
- URLSerializer<T> urlSerializer = (URLSerializer<T>) url.getSerializer();
-
- int sizeHint = urlSerializer.getSerializedSizeHint();
- if (sizeHint == -1) sizeHint = 64;
- var buffer = db.getAllocator().allocate(sizeHint);
- try {
- try {
- urlSerializer.serialize(url, buffer);
- } catch (SerializationException ex) {
- throw new IllegalStateException("Failed to serialize url", ex);
- }
- return buffer;
- } catch (Throwable ex) {
- buffer.close();
- throw ex;
- }
- }
-
- private Buffer serializeMetadata(DiskMetadata diskMetadata) {
- int sizeHint = diskMetadataSerializer.getSerializedSizeHint();
- if (sizeHint == -1) sizeHint = 64;
- var buffer = db.getAllocator().allocate(sizeHint);
- try {
- try {
- diskMetadataSerializer.serialize(diskMetadata, buffer);
- } catch (SerializationException ex) {
- throw new IllegalStateException("Failed to serialize metadata", ex);
- }
- return buffer;
- } catch (Throwable ex) {
- buffer.close();
- throw ex;
- }
- }
-
- private DiskMetadata deserializeMetadata(Buffer prevBytes) {
- try {
- return diskMetadataSerializer.deserialize(prevBytes);
- } catch (SerializationException ex) {
- throw new IllegalStateException("Failed to deserialize metadata", ex);
- }
- }
-
- @Override
- public Mono<Void> writeContentBlock(URL url, DataBlock dataBlock) {
- // Check if this cache should cache the url, otherwise do nothing
- if (!shouldCache.test(url)) return Mono.empty();
-
- Mono<Buffer> urlKeyMono = Mono.fromCallable(() -> serializeUrl(url));
- Mono<Buffer> blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, dataBlock.getId()));
- return Mono.using(
- () -> BufSupplier.of(dataBlock::getDataCopy),
- bufSupplier -> fileContent
- .put(blockKeyMono, Mono.fromSupplier(bufSupplier::get), LLDictionaryResultType.VOID)
- .doOnNext(Resource::close)
- .then(),
- BufSupplier::close
- )
- .then(fileMetadata.update(urlKeyMono, prevBytes -> {
- @Nullable DiskMetadata result;
- if (prevBytes != null) {
- DiskMetadata prevMeta = deserializeMetadata(prevBytes);
- if (!prevMeta.isDownloadedBlock(dataBlock.getId())) {
- BooleanArrayList bal = prevMeta.downloadedBlocks().clone();
- if (prevMeta.size() == -1) {
- if (bal.size() > dataBlock.getId()) {
- bal.set(dataBlock.getId(), true);
- } else if (bal.size() == dataBlock.getId()) {
- bal.add(true);
- } else {
- throw new IndexOutOfBoundsException(
- "Trying to write a block too much far from the last block. Previous total blocks: "
- + bal.size() + " Current block id: " + dataBlock.getId());
- }
- } else {
- bal.set(dataBlock.getId(), true);
- }
- result = new DiskMetadata(prevMeta.size(), bal);
- } else {
- result = prevMeta;
- }
- } else {
- result = null;
- }
- if (result != null) {
- return serializeMetadata(result);
- } else {
- return null;
- }
- }, UpdateReturnMode.NOTHING)
- )
- .then();
- }
-
- @Override
- public Flux<DataBlock> requestContent(URL url) {
- return this
- .requestDiskMetadata(url)
- .filter(DiskMetadata::isDownloadedFully)
- .flatMapMany(meta -> Flux.fromStream(meta.downloadedBlocks()::stream)
- .index()
- // Get only downloaded blocks
- .filter(Tuple2::getT2)
- .flatMapSequential(blockMeta -> {
- int blockId = toIntExact(blockMeta.getT1());
- boolean downloaded = blockMeta.getT2();
- if (!downloaded) {
- return Mono.empty();
- }
- var blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, blockId));
- return fileContent
- .get(null, blockKeyMono)
- .map(data -> {
- try (data) {
- long blockOffset = getBlockOffset(blockId);
- int blockLength = data.readableBytes();
- if (meta.size() != -1) {
- if (blockOffset + blockLength >= meta.size()) {
- if (blockOffset + blockLength > meta.size()) {
- throw new IllegalStateException("Overflowed data size");
- }
- } else {
- // Intermediate blocks must be of max size
- assert data.readableBytes() == BLOCK_SIZE;
- }
- }
- return DataBlock.of(blockOffset, blockLength, data.send());
- }
- });
- })
- );
- }
-
- private Buffer getBlockKey(URL url, int blockId) {
- try (var urlBytes = serializeUrl(url)) {
- Buffer blockIdBytes = this.db.getAllocator().allocate(Integer.BYTES);
- blockIdBytes.writeInt(blockId);
- return LLUtils.compositeBuffer(db.getAllocator(), urlBytes.send(), blockIdBytes.send());
- }
- }
-
- private static long getBlockOffset(int blockId) {
- return blockId * (long) BLOCK_SIZE;
- }
-
- @Override
- public Mono<DiskMetadata> requestDiskMetadata(URL url) {
- Mono urlKeyMono = Mono.fromCallable(() -> serializeUrl(url));
- return fileMetadata
- .get(null, urlKeyMono)
- .map(prevBytes -> {
- try (prevBytes) {
- return deserializeMetadata(prevBytes);
- }
- });
- }
-
- @Override
- public Mono<Metadata> requestMetadata(URL url) {
- return requestDiskMetadata(url)
- .map(DiskMetadata::asMetadata);
- }
-
- @Override
- public Mono<Tuple2<Metadata, Flux<DataBlock>>> request(URL url) {
- Mono urlKeyMono = Mono.fromCallable(() -> serializeUrl(url));
- return Mono
- .using(
- () -> serializeUrl(url),
- key -> fileMetadata.get(null, urlKeyMono),
- Resource::close
- )
- .map(serialized -> {
- DiskMetadata diskMeta;
- try (serialized) {
- diskMeta = deserializeMetadata(serialized);
- }
- var meta = diskMeta.asMetadata();
- if (diskMeta.isDownloadedFully()) {
- return Tuples.of(meta, this.requestContent(url));
- } else {
- return Tuples.of(meta, Flux.empty());
- }
- });
- }
-
- public Mono<Void> close() {
- return db.close();
- }
-
- @Override
- public Mono<Void> pauseForBackup() {
- return db.pauseForBackup();
- }
-
- @Override
- public Mono<Void> resumeAfterBackup() {
- return db.resumeAfterBackup();
- }
-
- @Override
- public boolean isPaused() {
- return db.isPaused();
+ static DiskCache openCustom(LLKeyValueDatabase db,
+ LLDictionary fileContent,
+ LLDictionary fileMetadata,
+ Predicate<URL> shouldCache) {
+ return new DiskCacheImpl(db, fileContent, fileMetadata, shouldCache);
}
}
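
DiskCache.open is now synchronous and the interface extends SafeCloseable, so a caller can drive the whole lifecycle without Reactor. A hypothetical usage sketch; `connection`, `options`, `myUrl`, and `metadata` are placeholders, not part of this patch:

    // Hypothetical caller; the placeholder identifiers above are assumed to exist.
    DiskCache cache = DiskCache.open(connection, "files-cache", options, url -> true);
    try {
        cache.writeMetadataSync(myUrl, metadata);
        Tuple2<Metadata, Stream<DataBlock>> result = cache.requestSync(myUrl);
        if (result != null) {
            // getT2() is the block stream; empty unless the file is fully downloaded.
            result.getT2().forEach(block -> System.out.println(block.getOffset()));
        }
    } finally {
        cache.close(); // SafeCloseable: no Mono to subscribe to
    }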
diff --git a/src/main/java/org/warp/filesponge/DiskCacheImpl.java b/src/main/java/org/warp/filesponge/DiskCacheImpl.java
new file mode 100644
index 0000000..85d58d6
--- /dev/null
+++ b/src/main/java/org/warp/filesponge/DiskCacheImpl.java
@@ -0,0 +1,325 @@
+/*
+ * FileSponge
+ * Copyright (C) 2021 Andrea Cavalli
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package org.warp.filesponge;
+
+import static java.lang.Math.toIntExact;
+import static org.warp.filesponge.FileSponge.BLOCK_SIZE;
+
+import it.cavallium.dbengine.buffers.Buf;
+import it.cavallium.dbengine.buffers.BufDataInput;
+import it.cavallium.dbengine.buffers.BufDataOutput;
+import it.cavallium.dbengine.database.LLDictionary;
+import it.cavallium.dbengine.database.LLDictionaryResultType;
+import it.cavallium.dbengine.database.LLKeyValueDatabase;
+import it.cavallium.dbengine.database.UpdateReturnMode;
+import it.cavallium.dbengine.database.serialization.SerializationException;
+import it.cavallium.dbengine.utils.StreamUtils;
+import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
+import java.util.Objects;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import org.jetbrains.annotations.Nullable;
+import org.warp.filesponge.DiskMetadata.DiskMetadataSerializer;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
+
+class DiskCacheImpl implements DiskCache {
+
+ private final DiskMetadataSerializer diskMetadataSerializer;
+
+ private final LLKeyValueDatabase db;
+ private final LLDictionary fileContent;
+ private final LLDictionary fileMetadata;
+ private final Predicate<URL> shouldCache;
+
+ public DiskCacheImpl(LLKeyValueDatabase db,
+ LLDictionary fileContent,
+ LLDictionary fileMetadata,
+ Predicate<URL> shouldCache) {
+ this.db = db;
+ this.fileContent = fileContent;
+ this.fileMetadata = fileMetadata;
+ this.diskMetadataSerializer = new DiskMetadataSerializer();
+ this.shouldCache = shouldCache;
+ }
+
+ @Override
+ public Mono<Void> writeMetadata(URL url, Metadata metadata) {
+ return Mono.fromRunnable(() -> writeMetadataSync(url, metadata)).subscribeOn(Schedulers.boundedElastic());
+ }
+
+ @Override
+ public void writeMetadataSync(URL url, Metadata metadata) {
+ // Check if this cache should cache the url, otherwise do nothing
+ if (!shouldCache.test(url)) return;
+
+ var key = serializeUrl(url);
+
+ fileMetadata.update(key, oldValue -> {
+ if (oldValue != null) {
+ return oldValue;
+ } else {
+ return serializeMetadata(new DiskMetadata(metadata.size(),
+ BooleanArrayList.wrap(new boolean[DiskMetadata.getBlocksCount(metadata.size(), BLOCK_SIZE)])
+ ));
+ }
+ }, UpdateReturnMode.NOTHING);
+ }
+
+ private <T extends URL> Buf serializeUrl(T url) {
+ @SuppressWarnings("unchecked")
+ URLSerializer<T> urlSerializer = (URLSerializer<T>) url.getSerializer();
+
+ int sizeHint = urlSerializer.getSerializedSizeHint();
+ if (sizeHint == -1) sizeHint = 64;
+ var output = BufDataOutput.create(sizeHint);
+ try {
+ urlSerializer.serialize(url, output);
+ } catch (SerializationException ex) {
+ throw new IllegalStateException("Failed to serialize url", ex);
+ }
+ return output.asList();
+ }
+
+ private Buf serializeMetadata(DiskMetadata diskMetadata) {
+ int sizeHint = diskMetadataSerializer.getSerializedSizeHint();
+ if (sizeHint == -1) sizeHint = 64;
+ var out = BufDataOutput.create(sizeHint);
+ try {
+ diskMetadataSerializer.serialize(diskMetadata, out);
+ } catch (SerializationException ex) {
+ throw new IllegalStateException("Failed to serialize metadata", ex);
+ }
+ return out.asList();
+ }
+
+ private DiskMetadata deserializeMetadata(Buf prevBytes) {
+ try {
+ return diskMetadataSerializer.deserialize(BufDataInput.create(prevBytes));
+ } catch (SerializationException ex) {
+ throw new IllegalStateException("Failed to deserialize metadata", ex);
+ }
+ }
+
+ @Override
+ public Mono<Void> writeContentBlock(URL url, DataBlock dataBlock) {
+ return Mono
+ .fromRunnable(() -> writeContentBlockSync(url, dataBlock))
+ .subscribeOn(Schedulers.boundedElastic());
+ }
+
+ @Override
+ public void writeContentBlockSync(URL url, DataBlock dataBlock) {
+ // Check if this cache should cache the url, otherwise do nothing
+ if (!shouldCache.test(url)) {
+ return;
+ }
+
+ Buf urlKey = serializeUrl(url);
+ Buf blockKey = getBlockKey(url, dataBlock.getId());
+
+ fileContent.put(blockKey, dataBlock.getData(), LLDictionaryResultType.VOID);
+ fileMetadata.update(urlKey, prevBytes -> {
+ @Nullable DiskMetadata result;
+ if (prevBytes != null) {
+ DiskMetadata prevMeta = deserializeMetadata(prevBytes);
+ if (!prevMeta.isDownloadedBlock(dataBlock.getId())) {
+ BooleanArrayList bal = prevMeta.downloadedBlocks().clone();
+ if (prevMeta.size() == -1) {
+ if (bal.size() > dataBlock.getId()) {
+ bal.set(dataBlock.getId(), true);
+ } else if (bal.size() == dataBlock.getId()) {
+ bal.add(true);
+ } else {
+ throw new IndexOutOfBoundsException(
+ "Trying to write a block too much far from the last block. Previous total blocks: "
+ + bal.size() + " Current block id: " + dataBlock.getId());
+ }
+ } else {
+ bal.set(dataBlock.getId(), true);
+ }
+ result = new DiskMetadata(prevMeta.size(), bal);
+ } else {
+ result = prevMeta;
+ }
+ } else {
+ result = null;
+ }
+ if (result != null) {
+ return serializeMetadata(result);
+ } else {
+ return null;
+ }
+ }, UpdateReturnMode.NOTHING);
+ }
+
+ @Override
+ public Flux<DataBlock> requestContent(URL url) {
+ return Flux.fromStream(() -> requestContentSync(url)).subscribeOn(Schedulers.boundedElastic());
+ }
+
+ @Override
+ public Stream<DataBlock> requestContentSync(URL url) {
+ record BlockMeta(int blockId, boolean downloaded) {}
+ var meta = this.requestDiskMetadataSync(url);
+ if (meta == null || !meta.isDownloadedFully()) {
+ return Stream.empty();
+ }
+ return StreamUtils.indexed(meta.downloadedBlocks().stream(),
+ (downloaded, blockId) -> new BlockMeta(toIntExact(blockId), downloaded)
+ )
+ // Get only downloaded blocks
+ .filter(BlockMeta::downloaded).map(blockMeta -> {
+ if (!blockMeta.downloaded) {
+ return null;
+ }
+ var blockKey = getBlockKey(url, blockMeta.blockId);
+ var data = fileContent.get(null, blockKey);
+ long blockOffset = getBlockOffset(blockMeta.blockId);
+ int blockLength = data.size();
+ if (meta.size() != -1) {
+ if (blockOffset + blockLength >= meta.size()) {
+ if (blockOffset + blockLength > meta.size()) {
+ throw new IllegalStateException("Overflowed data size");
+ }
+ } else {
+ // Intermediate blocks must be of max size
+ assert data.size() == BLOCK_SIZE;
+ }
+ }
+ return DataBlock.of(blockOffset, blockLength, data);
+ }).filter(Objects::nonNull);
+ }
+
+ private <T extends URL> Buf getBlockKey(T url, int blockId) {
+ //noinspection unchecked
+ URLSerializer<T> urlSerializer = (URLSerializer<T>) url.getSerializer();
+
+ int urlSizeHint = urlSerializer.getSerializedSizeHint();
+ if (urlSizeHint == -1) {
+ urlSizeHint = 64;
+ }
+
+ var sizeHint = urlSizeHint + Integer.BYTES;
+ var out = BufDataOutput.create(sizeHint);
+
+ try {
+ urlSerializer.serialize(url, out);
+ } catch (SerializationException ex) {
+ throw new IllegalStateException("Failed to serialize url", ex);
+ }
+
+ out.writeInt(blockId);
+
+ return out.asList();
+ }
+
+ private static long getBlockOffset(int blockId) {
+ return blockId * (long) BLOCK_SIZE;
+ }
+
+ @Override
+ public Mono<DiskMetadata> requestDiskMetadata(URL url) {
+ return Mono.fromCallable(() -> requestDiskMetadataSync(url)).subscribeOn(Schedulers.boundedElastic());
+ }
+
+ @Override
+ public DiskMetadata requestDiskMetadataSync(URL url) {
+ Buf urlKey = serializeUrl(url);
+ var prevBytes = fileMetadata.get(null, urlKey);
+ if (prevBytes != null) {
+ return deserializeMetadata(prevBytes);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public Mono<Metadata> requestMetadata(URL url) {
+ return requestDiskMetadata(url).map(DiskMetadata::asMetadata);
+ }
+
+ @Override
+ public Metadata requestMetadataSync(URL url) {
+ var metadata = requestDiskMetadataSync(url);
+ if (metadata != null) {
+ return metadata.asMetadata();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public Mono<Tuple2<Metadata, Flux<DataBlock>>> request(URL url) {
+ return Mono
+ .fromCallable(() -> {
+ var tuple = requestSync(url);
+ if (tuple == null) {
+ return null;
+ }
+ return tuple.mapT2(s -> Flux.fromStream(s).subscribeOn(Schedulers.boundedElastic()));
+ })
+ .subscribeOn(Schedulers.boundedElastic());
+ }
+
+ @Override
+ public Tuple2<Metadata, Stream<DataBlock>> requestSync(URL url) {
+ Buf urlKey = serializeUrl(url);
+ var serialized = fileMetadata.get(null, urlKey);
+ if (serialized == null) {
+ return null;
+ }
+ DiskMetadata diskMeta = deserializeMetadata(serialized);
+ var meta = diskMeta.asMetadata();
+ if (diskMeta.isDownloadedFully()) {
+ return Tuples.of(meta, this.requestContentSync(url));
+ } else {
+ return Tuples.of(meta, Stream.empty());
+ }
+ }
+
+ @Override
+ public void close() {
+ db.close();
+ }
+
+ @Override
+ public void pauseForBackup() {
+ db.pauseForBackup();
+ }
+
+ @Override
+ public void resumeAfterBackup() {
+ db.resumeAfterBackup();
+ }
+
+ @Override
+ public boolean isPaused() {
+ return db.isPaused();
+ }
+}
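
Every reactive override above is a thin shim over its *Sync counterpart, always through the same bridge: build the value lazily, then shift it onto boundedElastic so blocking database I/O never runs on a parallel scheduler. The pattern in isolation:

    import java.util.concurrent.Callable;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;

    final class BlockingBridge {
        // Same shape as writeMetadata/requestDiskMetadata in DiskCacheImpl.
        static <T> Mono<T> toMono(Callable<T> blockingCall) {
            return Mono.fromCallable(blockingCall)
                    .subscribeOn(Schedulers.boundedElastic());
        }
    }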
diff --git a/src/main/java/org/warp/filesponge/DiskMetadata.java b/src/main/java/org/warp/filesponge/DiskMetadata.java
index e147f1c..96f8b3a 100644
--- a/src/main/java/org/warp/filesponge/DiskMetadata.java
+++ b/src/main/java/org/warp/filesponge/DiskMetadata.java
@@ -20,12 +20,8 @@ package org.warp.filesponge;
import static java.lang.Math.toIntExact;
-import io.netty5.buffer.Buffer;
-import io.netty5.buffer.BufferAllocator;
-import io.netty5.util.Send;
-import it.cavallium.dbengine.database.serialization.BufferDataInputOwned;
-import it.cavallium.dbengine.database.serialization.BufferDataInputShared;
-import it.cavallium.dbengine.database.serialization.BufferDataOutput;
+import it.cavallium.dbengine.buffers.BufDataInput;
+import it.cavallium.dbengine.buffers.BufDataOutput;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
@@ -81,8 +77,7 @@ public record DiskMetadata(long size, BooleanArrayList downloadedBlocks) {
public static class DiskMetadataSerializer implements Serializer<DiskMetadata> {
@Override
- public @NotNull DiskMetadata deserialize(@NotNull Buffer serialized) throws SerializationException {
- var dis = new BufferDataInputShared(serialized);
+ public @NotNull DiskMetadata deserialize(@NotNull BufDataInput dis) throws SerializationException {
int legacySize = dis.readInt();
long size;
if (legacySize == -2) {
@@ -104,8 +99,7 @@ public record DiskMetadata(long size, BooleanArrayList downloadedBlocks) {
}
@Override
- public void serialize(@NotNull DiskMetadata deserialized, Buffer output) throws SerializationException {
- var dos = new BufferDataOutput(output);
+ public void serialize(@NotNull DiskMetadata deserialized, BufDataOutput dos) throws SerializationException {
dos.writeInt(-2);
dos.writeLong(deserialized.size);
var blocksCount = deserialized.getBlocksCount();
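
The serializer now reads and writes dbengine's BufDataInput/BufDataOutput directly instead of wrapping a Netty Buffer. The header convention visible above: a leading int of -2 marks the current format, with the real size following as a long (-1 while the total size is unknown). A round-trip sketch of just that header; every call appears in this patch except readLong, which is assumed as the mirror of writeLong:

    static long roundTripHeader(long size) {
        var out = BufDataOutput.create(Integer.BYTES + Long.BYTES);
        out.writeInt(-2);      // sentinel: current format, size stored as a long
        out.writeLong(size);   // -1 when the total size is not yet known
        Buf serialized = out.asList();

        var in = BufDataInput.create(serialized);
        int marker = in.readInt();   // -2 selects the long path in deserialize()
        assert marker == -2;
        return in.readLong();        // assumed API, mirroring writeLong above
    }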
diff --git a/src/main/java/org/warp/filesponge/URL.java b/src/main/java/org/warp/filesponge/URL.java
index 94a7164..c86ae77 100644
--- a/src/main/java/org/warp/filesponge/URL.java
+++ b/src/main/java/org/warp/filesponge/URL.java
@@ -18,7 +18,6 @@
package org.warp.filesponge;
-import io.netty5.buffer.BufferAllocator;
import it.cavallium.dbengine.database.serialization.Serializer;
public interface URL {
diff --git a/src/main/java/org/warp/filesponge/URLSerializer.java b/src/main/java/org/warp/filesponge/URLSerializer.java
index a0631c1..f5d015a 100644
--- a/src/main/java/org/warp/filesponge/URLSerializer.java
+++ b/src/main/java/org/warp/filesponge/URLSerializer.java
@@ -18,11 +18,9 @@
package org.warp.filesponge;
-import io.netty5.buffer.Buffer;
-import io.netty5.buffer.BufferAllocator;
-import it.cavallium.dbengine.database.serialization.BufferDataOutput;
+import it.cavallium.dbengine.buffers.Buf;
+import it.cavallium.dbengine.buffers.BufDataOutput;
import it.cavallium.dbengine.database.serialization.SerializationException;
-import java.nio.charset.StandardCharsets;
import org.jetbrains.annotations.NotNull;
public interface URLSerializer<T extends URL> {
@@ -30,7 +28,7 @@ public interface URLSerializer {
/**
* @param output its writable size will be at least equal to the size hint
*/
- void serialize(@NotNull T url, Buffer output) throws SerializationException;
+ void serialize(@NotNull T url, BufDataOutput output) throws SerializationException;
/**
* @return hint about the expected size of the buffer
diff --git a/src/main/java/org/warp/filesponge/URLStringSerializer.java b/src/main/java/org/warp/filesponge/URLStringSerializer.java
index 9650d2f..5891595 100644
--- a/src/main/java/org/warp/filesponge/URLStringSerializer.java
+++ b/src/main/java/org/warp/filesponge/URLStringSerializer.java
@@ -18,18 +18,17 @@
package org.warp.filesponge;
-import io.netty5.buffer.Buffer;
-import it.cavallium.dbengine.database.serialization.BufferDataOutput;
+import it.cavallium.dbengine.buffers.Buf;
+import it.cavallium.dbengine.buffers.BufDataOutput;
import it.cavallium.dbengine.database.serialization.SerializationException;
import org.jetbrains.annotations.NotNull;
public abstract class URLStringSerializer<T extends URL> implements URLSerializer<T> {
@Override
- public final void serialize(@NotNull T url, Buffer output) throws SerializationException {
+ public final void serialize(@NotNull T url, BufDataOutput output) throws SerializationException {
var string = this.serialize(url);
- var dataOut = new BufferDataOutput(output);
- dataOut.writeUTF(string);
+ output.writeUTF(string);
}
public abstract @NotNull String serialize(@NotNull T url);
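
Subclasses no longer touch buffers at all: the UTF encoding goes straight through BufDataOutput.writeUTF. A hypothetical serializer for an example URL type (ExampleUrl and its asString() are illustrative, not part of FileSponge):

    // ExampleUrl is a hypothetical class implementing the URL interface.
    public class ExampleUrlSerializer extends URLStringSerializer<ExampleUrl> {

        @Override
        public @NotNull String serialize(@NotNull ExampleUrl url) {
            return url.asString(); // hypothetical accessor
        }

        @Override
        public int getSerializedSizeHint() {
            return -1; // unknown; DiskCacheImpl then falls back to a 64-byte buffer
        }
    }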
diff --git a/src/test/java/org/warp/filesponge/ThreadSafety.java b/src/test/java/org/warp/filesponge/ThreadSafety.java
index eca4bf4..9db7b12 100644
--- a/src/test/java/org/warp/filesponge/ThreadSafety.java
+++ b/src/test/java/org/warp/filesponge/ThreadSafety.java
@@ -28,7 +28,7 @@ public class ThreadSafety {
return s;
})).subscribeOn(schedulerSingle))
.ignoreElements()
- .thenMany(Flux.defer(() -> Flux.fromStream(list::stream)))
+ .thenMany(Flux.defer(() -> Flux.fromStream(list::stream).subscribeOn(Schedulers.boundedElastic())))
.subscribeOn(schedulerParallel);
Integer[] checks = new Integer[iterations * 2];
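
The test fix applies the same bridge as the main code: Flux.fromStream wraps a lazy, single-use Stream, so the read is deferred and shifted to boundedElastic rather than consumed on the parallel scheduler. In isolation, with `list` as in the test:

    // Each subscription creates a fresh, single-use Stream and consumes it on
    // boundedElastic instead of the parallel scheduler.
    Flux<Integer> safeRead = Flux.defer(() ->
        Flux.fromStream(list::stream).subscribeOn(Schedulers.boundedElastic()));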