From 256e3daf84d4e426da5f75758f096d56ac42bd2b Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 28 Feb 2023 23:11:07 +0100 Subject: [PATCH] Remove unused code and netty 5 --- pom.xml | 2 +- src/main/java/module-info.java | 2 - .../java/org/warp/filesponge/DataBlock.java | 87 +---- .../java/org/warp/filesponge/DiskCache.java | 298 ++-------------- .../org/warp/filesponge/DiskCacheImpl.java | 325 ++++++++++++++++++ .../org/warp/filesponge/DiskMetadata.java | 14 +- src/main/java/org/warp/filesponge/URL.java | 1 - .../org/warp/filesponge/URLSerializer.java | 8 +- .../warp/filesponge/URLStringSerializer.java | 9 +- .../org/warp/filesponge/ThreadSafety.java | 2 +- 10 files changed, 381 insertions(+), 367 deletions(-) create mode 100644 src/main/java/org/warp/filesponge/DiskCacheImpl.java diff --git a/pom.xml b/pom.xml index 97010b5..4e37564 100644 --- a/pom.xml +++ b/pom.xml @@ -122,7 +122,7 @@ it.cavallium dbengine - 3.0.0-SNAPSHOT + 4.0.0-SNAPSHOT diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java index 28f53bc..9137b3b 100644 --- a/src/main/java/module-info.java +++ b/src/main/java/module-info.java @@ -1,11 +1,9 @@ module filesponge { - requires io.netty5.buffer; requires org.apache.logging.log4j; requires dbengine; requires it.unimi.dsi.fastutil; requires org.jetbrains.annotations; requires reactor.core; requires org.reactivestreams; - requires io.netty5.common; exports org.warp.filesponge; } \ No newline at end of file diff --git a/src/main/java/org/warp/filesponge/DataBlock.java b/src/main/java/org/warp/filesponge/DataBlock.java index e8eedbe..dbbd639 100644 --- a/src/main/java/org/warp/filesponge/DataBlock.java +++ b/src/main/java/org/warp/filesponge/DataBlock.java @@ -20,65 +20,31 @@ package org.warp.filesponge; import static java.lang.Math.toIntExact; -import io.netty5.buffer.Buffer; -import io.netty5.buffer.Drop; -import io.netty5.buffer.Owned; -import io.netty5.util.Send; -import io.netty5.buffer.internal.ResourceSupport; +import it.cavallium.dbengine.buffers.Buf; +import java.nio.Buffer; import java.util.Objects; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public final class DataBlock extends ResourceSupport { +public final class DataBlock { - private static final Logger logger = LogManager.getLogger(DataBlock.class); + public static DataBlock EMPTY = new DataBlock(-1, -1, null); - private static final Drop DROP = new Drop<>() { - @Override - public void drop(DataBlock obj) { - try { - if (obj.data != null) { - obj.data.close(); - } - } catch (Throwable ex) { - logger.error("Failed to close data", ex); - } - } - - @Override - public Drop fork() { - return this; - } - - @Override - public void attach(DataBlock obj) { - - } - }; - - public static DataBlock of(long offset, int length, Send data) { + public static DataBlock of(long offset, int length, Buf data) { return new DataBlock(offset, length, data); } - private DataBlock(long offset, int length, Send data) { - super(DROP); - try (data) { - this.offset = offset; - this.length = length; - this.data = data.receive(); - } + private DataBlock(long offset, int length, Buf data) { + this.offset = offset; + this.length = length; + this.data = data; } private final long offset; private final int length; - private final Buffer data; + private final Buf data; - public Buffer getDataCopy() { - assert data.isAccessible(); - return data.copy(); - } - - public Buffer getDataUnsafe() { + public Buf getData() { return data; } @@ -98,22 +64,18 @@ public final class DataBlock extends ResourceSupport { if (o == this) { return true; } - if (!(o instanceof DataBlock)) { + if (!(o instanceof final DataBlock other)) { return false; } - final DataBlock other = (DataBlock) o; if (this.getOffset() != other.getOffset()) { return false; } if (this.getLength() != other.getLength()) { return false; } - final Object this$data = this.getDataUnsafe(); - final Object other$data = other.getDataUnsafe(); - if (!Objects.equals(this$data, other$data)) { - return false; - } - return true; + final Object this$data = this.getData(); + final Object other$data = other.getData(); + return Objects.equals(this$data, other$data); } public int hashCode() { @@ -122,28 +84,13 @@ public final class DataBlock extends ResourceSupport { long offset = this.getOffset(); result = result * PRIME + (int) (offset ^ (offset >>> 32)); result = result * PRIME + this.getLength(); - final Object $data = this.getDataUnsafe(); + final Object $data = this.getData(); result = result * PRIME + ($data == null ? 43 : $data.hashCode()); return result; } public String toString() { - return "DataBlock(offset=" + this.getOffset() + ", length=" + this.getLength() + ", data=" + this.getDataUnsafe() + ")"; + return "DataBlock(offset=" + this.getOffset() + ", length=" + this.getLength() + ", data=" + this.getData() + ")"; } - @Override - protected RuntimeException createResourceClosedException() { - return new IllegalStateException("Closed"); - } - - @Override - protected Owned prepareSend() { - Send dataSend; - dataSend = this.data.send(); - return drop -> { - var instance = new DataBlock(offset, length, dataSend); - drop.attach(instance); - return instance; - }; - } } diff --git a/src/main/java/org/warp/filesponge/DiskCache.java b/src/main/java/org/warp/filesponge/DiskCache.java index 332cf8b..7ce1577 100644 --- a/src/main/java/org/warp/filesponge/DiskCache.java +++ b/src/main/java/org/warp/filesponge/DiskCache.java @@ -1,6 +1,6 @@ /* * FileSponge - * Copyright (C) 2021 Andrea Cavalli + * Copyright (C) 2023 Andrea Cavalli * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -18,296 +18,50 @@ package org.warp.filesponge; -import static java.lang.Math.toIntExact; -import static org.warp.filesponge.FileSponge.BLOCK_SIZE; - -import io.netty5.buffer.Buffer; -import io.netty5.util.Resource; -import io.netty5.util.Send; import it.cavallium.dbengine.client.IBackuppable; -import it.cavallium.dbengine.database.BufSupplier; import it.cavallium.dbengine.database.ColumnUtils; import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLDictionary; -import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLKeyValueDatabase; -import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.SafeCloseable; import it.cavallium.dbengine.database.UpdateMode; -import it.cavallium.dbengine.database.UpdateReturnMode; -import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; -import it.unimi.dsi.fastutil.booleans.BooleanArrayList; import java.util.List; -import java.util.Objects; import java.util.function.Predicate; -import org.jetbrains.annotations.Nullable; -import org.warp.filesponge.DiskMetadata.DiskMetadataSerializer; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; +import java.util.stream.Stream; import reactor.util.function.Tuple2; -import reactor.util.function.Tuples; -public class DiskCache implements URLsDiskHandler, URLsWriter, IBackuppable { +public interface DiskCache extends URLsDiskHandler, URLsWriter, IBackuppable, SafeCloseable { - private final DiskMetadataSerializer diskMetadataSerializer; + void writeMetadataSync(URL url, Metadata metadata); - private final LLKeyValueDatabase db; - private final LLDictionary fileContent; - private final LLDictionary fileMetadata; - private final Predicate shouldCache; + void writeContentBlockSync(URL url, DataBlock dataBlock); - public DiskCache(LLKeyValueDatabase db, - LLDictionary fileContent, - LLDictionary fileMetadata, - Predicate shouldCache) { - this.db = db; - this.fileContent = fileContent; - this.fileMetadata = fileMetadata; - this.diskMetadataSerializer = new DiskMetadataSerializer(); - this.shouldCache = shouldCache; - } + Stream requestContentSync(URL url); - public static Mono open(LLDatabaseConnection databaseConnection, + DiskMetadata requestDiskMetadataSync(URL url); + + Metadata requestMetadataSync(URL url); + + Tuple2> requestSync(URL url); + + static DiskCache open(LLDatabaseConnection databaseConnection, String dbName, DatabaseOptions databaseOptions, Predicate shouldCache) { - return databaseConnection - .getDatabase(dbName, - List.of(ColumnUtils.dictionary("file-content"), ColumnUtils.dictionary("file-metadata")), - databaseOptions - ) - .flatMap(db -> Mono.zip( - Mono.just(db).single(), - db.getDictionary("file-content", UpdateMode.ALLOW).single(), - db.getDictionary("file-metadata", UpdateMode.ALLOW).single() - )) - .map(tuple -> new DiskCache(tuple.getT1(), tuple.getT2(), tuple.getT3(), shouldCache)) - .single(); + var db = databaseConnection.getDatabase(dbName, + List.of(ColumnUtils.dictionary("file-content"), ColumnUtils.dictionary("file-metadata")), + databaseOptions + ); + var dict1 = db.getDictionary("file-content", UpdateMode.ALLOW); + var dict2 = db.getDictionary("file-metadata", UpdateMode.ALLOW); + return new DiskCacheImpl(db, dict1, dict2, shouldCache); } - @Override - public Mono writeMetadata(URL url, Metadata metadata) { - // Check if this cache should cache the url, otherwise do nothing - if (!shouldCache.test(url)) return Mono.empty(); - - Mono keyMono = Mono.fromCallable(() -> serializeUrl(url)); - return fileMetadata - .update(keyMono, - oldValue -> Objects.requireNonNullElseGet(oldValue, - () -> serializeMetadata(new DiskMetadata(metadata.size(), - BooleanArrayList.wrap(new boolean[DiskMetadata.getBlocksCount(metadata.size(), BLOCK_SIZE)]) - )) - ), - UpdateReturnMode.NOTHING - ) - .then(); - } - - private Buffer serializeUrl(T url) { - @SuppressWarnings("unchecked") - URLSerializer urlSerializer = (URLSerializer) url.getSerializer(); - - int sizeHint = urlSerializer.getSerializedSizeHint(); - if (sizeHint == -1) sizeHint = 64; - var buffer = db.getAllocator().allocate(sizeHint); - try { - try { - urlSerializer.serialize(url, buffer); - } catch (SerializationException ex) { - throw new IllegalStateException("Failed to serialize url", ex); - } - return buffer; - } catch (Throwable ex) { - buffer.close(); - throw ex; - } - } - - private Buffer serializeMetadata(DiskMetadata diskMetadata) { - int sizeHint = diskMetadataSerializer.getSerializedSizeHint(); - if (sizeHint == -1) sizeHint = 64; - var buffer = db.getAllocator().allocate(sizeHint); - try { - try { - diskMetadataSerializer.serialize(diskMetadata, buffer); - } catch (SerializationException ex) { - throw new IllegalStateException("Failed to serialize metadata", ex); - } - return buffer; - } catch (Throwable ex) { - buffer.close(); - throw ex; - } - } - - private DiskMetadata deserializeMetadata(Buffer prevBytes) { - try { - return diskMetadataSerializer.deserialize(prevBytes); - } catch (SerializationException ex) { - throw new IllegalStateException("Failed to deserialize metadata", ex); - } - } - - @Override - public Mono writeContentBlock(URL url, DataBlock dataBlock) { - // Check if this cache should cache the url, otherwise do nothing - if (!shouldCache.test(url)) return Mono.empty(); - - Mono urlKeyMono = Mono.fromCallable(() -> serializeUrl(url)); - Mono blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, dataBlock.getId())); - return Mono.using( - () -> BufSupplier.of(dataBlock::getDataCopy), - bufSupplier -> fileContent - .put(blockKeyMono, Mono.fromSupplier(bufSupplier::get), LLDictionaryResultType.VOID) - .doOnNext(Resource::close) - .then(), - BufSupplier::close - ) - .then(fileMetadata.update(urlKeyMono, prevBytes -> { - @Nullable DiskMetadata result; - if (prevBytes != null) { - DiskMetadata prevMeta = deserializeMetadata(prevBytes); - if (!prevMeta.isDownloadedBlock(dataBlock.getId())) { - BooleanArrayList bal = prevMeta.downloadedBlocks().clone(); - if (prevMeta.size() == -1) { - if (bal.size() > dataBlock.getId()) { - bal.set(dataBlock.getId(), true); - } else if (bal.size() == dataBlock.getId()) { - bal.add(true); - } else { - throw new IndexOutOfBoundsException( - "Trying to write a block too much far from the last block. Previous total blocks: " - + bal.size() + " Current block id: " + dataBlock.getId()); - } - } else { - bal.set(dataBlock.getId(), true); - } - result = new DiskMetadata(prevMeta.size(), bal); - } else { - result = prevMeta; - } - } else { - result = null; - } - if (result != null) { - return serializeMetadata(result); - } else { - return null; - } - }, UpdateReturnMode.NOTHING) - ) - .then(); - } - - @Override - public Flux requestContent(URL url) { - return this - .requestDiskMetadata(url) - .filter(DiskMetadata::isDownloadedFully) - .flatMapMany(meta -> Flux.fromStream(meta.downloadedBlocks()::stream) - .index() - // Get only downloaded blocks - .filter(Tuple2::getT2) - .flatMapSequential(blockMeta -> { - int blockId = toIntExact(blockMeta.getT1()); - boolean downloaded = blockMeta.getT2(); - if (!downloaded) { - return Mono.empty(); - } - var blockKeyMono = Mono.fromCallable(() -> getBlockKey(url, blockId)); - return fileContent - .get(null, blockKeyMono) - .map(data -> { - try (data) { - long blockOffset = getBlockOffset(blockId); - int blockLength = data.readableBytes(); - if (meta.size() != -1) { - if (blockOffset + blockLength >= meta.size()) { - if (blockOffset + blockLength > meta.size()) { - throw new IllegalStateException("Overflowed data size"); - } - } else { - // Intermediate blocks must be of max size - assert data.readableBytes() == BLOCK_SIZE; - } - } - return DataBlock.of(blockOffset, blockLength, data.send()); - } - }); - }) - ); - } - - private Buffer getBlockKey(URL url, int blockId) { - try (var urlBytes = serializeUrl(url)) { - Buffer blockIdBytes = this.db.getAllocator().allocate(Integer.BYTES); - blockIdBytes.writeInt(blockId); - return LLUtils.compositeBuffer(db.getAllocator(), urlBytes.send(), blockIdBytes.send()); - } - } - - private static long getBlockOffset(int blockId) { - return blockId * (long) BLOCK_SIZE; - } - - @Override - public Mono requestDiskMetadata(URL url) { - Mono urlKeyMono = Mono.fromCallable(() -> serializeUrl(url)); - return fileMetadata - .get(null, urlKeyMono) - .map(prevBytes -> { - try (prevBytes) { - return deserializeMetadata(prevBytes); - } - }); - } - - @Override - public Mono requestMetadata(URL url) { - return requestDiskMetadata(url) - .map(DiskMetadata::asMetadata); - } - - @Override - public Mono>> request(URL url) { - Mono urlKeyMono = Mono.fromCallable(() -> serializeUrl(url)); - return Mono - .using( - () -> serializeUrl(url), - key -> fileMetadata.get(null, urlKeyMono), - Resource::close - ) - .map(serialized -> { - DiskMetadata diskMeta; - try (serialized) { - diskMeta = deserializeMetadata(serialized); - } - var meta = diskMeta.asMetadata(); - if (diskMeta.isDownloadedFully()) { - return Tuples.of(meta, this.requestContent(url)); - } else { - return Tuples.of(meta, Flux.empty()); - } - }); - } - - public Mono close() { - return db.close(); - } - - @Override - public Mono pauseForBackup() { - return db.pauseForBackup(); - } - - @Override - public Mono resumeAfterBackup() { - return db.resumeAfterBackup(); - } - - @Override - public boolean isPaused() { - return db.isPaused(); + static DiskCache openCustom(LLKeyValueDatabase db, + LLDictionary fileContent, + LLDictionary fileMetadata, + Predicate shouldCache) { + return new DiskCacheImpl(db, fileContent, fileMetadata, shouldCache); } } diff --git a/src/main/java/org/warp/filesponge/DiskCacheImpl.java b/src/main/java/org/warp/filesponge/DiskCacheImpl.java new file mode 100644 index 0000000..85d58d6 --- /dev/null +++ b/src/main/java/org/warp/filesponge/DiskCacheImpl.java @@ -0,0 +1,325 @@ +/* + * FileSponge + * Copyright (C) 2021 Andrea Cavalli + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package org.warp.filesponge; + +import static java.lang.Math.toIntExact; +import static org.warp.filesponge.FileSponge.BLOCK_SIZE; + +import it.cavallium.dbengine.buffers.Buf; +import it.cavallium.dbengine.buffers.BufDataInput; +import it.cavallium.dbengine.buffers.BufDataOutput; +import it.cavallium.dbengine.client.IBackuppable; +import it.cavallium.dbengine.database.ColumnUtils; +import it.cavallium.dbengine.database.LLDatabaseConnection; +import it.cavallium.dbengine.database.LLDictionary; +import it.cavallium.dbengine.database.LLDictionaryResultType; +import it.cavallium.dbengine.database.LLKeyValueDatabase; +import it.cavallium.dbengine.database.UpdateMode; +import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.serialization.SerializationException; +import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; +import it.cavallium.dbengine.utils.StreamUtils; +import it.unimi.dsi.fastutil.booleans.BooleanArrayList; +import java.util.List; +import java.util.Objects; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.jetbrains.annotations.Nullable; +import org.warp.filesponge.DiskMetadata.DiskMetadataSerializer; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; + +class DiskCacheImpl implements DiskCache { + + private final DiskMetadataSerializer diskMetadataSerializer; + + private final LLKeyValueDatabase db; + private final LLDictionary fileContent; + private final LLDictionary fileMetadata; + private final Predicate shouldCache; + + public DiskCacheImpl(LLKeyValueDatabase db, + LLDictionary fileContent, + LLDictionary fileMetadata, + Predicate shouldCache) { + this.db = db; + this.fileContent = fileContent; + this.fileMetadata = fileMetadata; + this.diskMetadataSerializer = new DiskMetadataSerializer(); + this.shouldCache = shouldCache; + } + + @Override + public Mono writeMetadata(URL url, Metadata metadata) { + return Mono.fromRunnable(() -> writeMetadataSync(url, metadata)).subscribeOn(Schedulers.boundedElastic()); + } + + @Override + public void writeMetadataSync(URL url, Metadata metadata) { + // Check if this cache should cache the url, otherwise do nothing + if (!shouldCache.test(url)) return; + + var key = serializeUrl(url); + + fileMetadata.update(key, oldValue -> { + if (oldValue != null) { + return oldValue; + } else { + return serializeMetadata(new DiskMetadata(metadata.size(), + BooleanArrayList.wrap(new boolean[DiskMetadata.getBlocksCount(metadata.size(), BLOCK_SIZE)]) + )); + } + }, UpdateReturnMode.NOTHING); + } + + private Buf serializeUrl(T url) { + @SuppressWarnings("unchecked") + URLSerializer urlSerializer = (URLSerializer) url.getSerializer(); + + int sizeHint = urlSerializer.getSerializedSizeHint(); + if (sizeHint == -1) sizeHint = 64; + var output = BufDataOutput.create(sizeHint); + try { + urlSerializer.serialize(url, output); + } catch (SerializationException ex) { + throw new IllegalStateException("Failed to serialize url", ex); + } + return output.asList(); + } + + private Buf serializeMetadata(DiskMetadata diskMetadata) { + int sizeHint = diskMetadataSerializer.getSerializedSizeHint(); + if (sizeHint == -1) sizeHint = 64; + var out = BufDataOutput.create(sizeHint); + try { + diskMetadataSerializer.serialize(diskMetadata, out); + } catch (SerializationException ex) { + throw new IllegalStateException("Failed to serialize metadata", ex); + } + return out.asList(); + } + + private DiskMetadata deserializeMetadata(Buf prevBytes) { + try { + return diskMetadataSerializer.deserialize(BufDataInput.create(prevBytes)); + } catch (SerializationException ex) { + throw new IllegalStateException("Failed to deserialize metadata", ex); + } + } + + @Override + public Mono writeContentBlock(URL url, DataBlock dataBlock) { + return Mono + .fromRunnable(() -> writeContentBlockSync(url, dataBlock)) + .subscribeOn(Schedulers.boundedElastic()); + } + + @Override + public void writeContentBlockSync(URL url, DataBlock dataBlock) { + // Check if this cache should cache the url, otherwise do nothing + if (!shouldCache.test(url)) { + return; + } + + Buf urlKey = serializeUrl(url); + Buf blockKey = getBlockKey(url, dataBlock.getId()); + + fileContent.put(blockKey, dataBlock.getData(), LLDictionaryResultType.VOID); + fileMetadata.update(urlKey, prevBytes -> { + @Nullable DiskMetadata result; + if (prevBytes != null) { + DiskMetadata prevMeta = deserializeMetadata(prevBytes); + if (!prevMeta.isDownloadedBlock(dataBlock.getId())) { + BooleanArrayList bal = prevMeta.downloadedBlocks().clone(); + if (prevMeta.size() == -1) { + if (bal.size() > dataBlock.getId()) { + bal.set(dataBlock.getId(), true); + } else if (bal.size() == dataBlock.getId()) { + bal.add(true); + } else { + throw new IndexOutOfBoundsException( + "Trying to write a block too much far from the last block. Previous total blocks: " + + bal.size() + " Current block id: " + dataBlock.getId()); + } + } else { + bal.set(dataBlock.getId(), true); + } + result = new DiskMetadata(prevMeta.size(), bal); + } else { + result = prevMeta; + } + } else { + result = null; + } + if (result != null) { + return serializeMetadata(result); + } else { + return null; + } + }, UpdateReturnMode.NOTHING); + } + + @Override + public Flux requestContent(URL url) { + return Flux.fromStream(() -> requestContentSync(url)).subscribeOn(Schedulers.boundedElastic()); + } + + @Override + public Stream requestContentSync(URL url) { + record BlockMeta(int blockId, boolean downloaded) {} + var meta = this.requestDiskMetadataSync(url); + if (meta == null || !meta.isDownloadedFully()) { + return Stream.empty(); + } + return StreamUtils.indexed(meta.downloadedBlocks().stream(), + (downloaded, blockId) -> new BlockMeta(toIntExact(blockId), downloaded) + ) + // Get only downloaded blocks + .filter(BlockMeta::downloaded).map(blockMeta -> { + if (!blockMeta.downloaded) { + return null; + } + var blockKey = getBlockKey(url, blockMeta.blockId); + var data = fileContent.get(null, blockKey); + long blockOffset = getBlockOffset(blockMeta.blockId); + int blockLength = data.size(); + if (meta.size() != -1) { + if (blockOffset + blockLength >= meta.size()) { + if (blockOffset + blockLength > meta.size()) { + throw new IllegalStateException("Overflowed data size"); + } + } else { + // Intermediate blocks must be of max size + assert data.size() == BLOCK_SIZE; + } + } + return DataBlock.of(blockOffset, blockLength, data); + }).filter(Objects::nonNull); + } + + private Buf getBlockKey(T url, int blockId) { + //noinspection unchecked + URLSerializer urlSerializer = (URLSerializer) url.getSerializer(); + + int urlSizeHint = urlSerializer.getSerializedSizeHint(); + if (urlSizeHint == -1) { + urlSizeHint = 64; + } + + var sizeHint = urlSizeHint + Integer.BYTES; + var out = BufDataOutput.create(sizeHint); + + try { + urlSerializer.serialize(url, out); + } catch (SerializationException ex) { + throw new IllegalStateException("Failed to serialize url", ex); + } + + out.writeInt(blockId); + + return out.asList(); + } + + private static long getBlockOffset(int blockId) { + return blockId * (long) BLOCK_SIZE; + } + + @Override + public Mono requestDiskMetadata(URL url) { + return Mono.fromCallable(() -> requestDiskMetadataSync(url)).subscribeOn(Schedulers.boundedElastic()); + } + + @Override + public DiskMetadata requestDiskMetadataSync(URL url) { + Buf urlKey = serializeUrl(url); + var prevBytes = fileMetadata.get(null, urlKey); + if (prevBytes != null) { + return deserializeMetadata(prevBytes); + } else { + return null; + } + } + + @Override + public Mono requestMetadata(URL url) { + return requestDiskMetadata(url).map(DiskMetadata::asMetadata); + } + + @Override + public Metadata requestMetadataSync(URL url) { + var metadata = requestDiskMetadataSync(url); + if (metadata != null) { + return metadata.asMetadata(); + } else { + return null; + } + } + + @Override + public Mono>> request(URL url) { + return Mono + .fromCallable(() -> { + var tuple = requestSync(url); + if (tuple == null) { + return null; + } + return tuple.mapT2(s -> Flux.fromStream(s).subscribeOn(Schedulers.boundedElastic())); + }) + .subscribeOn(Schedulers.boundedElastic()); + } + + @Override + public Tuple2> requestSync(URL url) { + Buf urlKey = serializeUrl(url); + var serialized = fileMetadata.get(null, urlKey); + if (serialized == null) { + return null; + } + DiskMetadata diskMeta = deserializeMetadata(serialized); + var meta = diskMeta.asMetadata(); + if (diskMeta.isDownloadedFully()) { + return Tuples.of(meta, this.requestContentSync(url)); + } else { + return Tuples.of(meta, Stream.empty()); + } + } + + @Override + public void close() { + db.close(); + } + + @Override + public void pauseForBackup() { + db.pauseForBackup(); + } + + @Override + public void resumeAfterBackup() { + db.resumeAfterBackup(); + } + + @Override + public boolean isPaused() { + return db.isPaused(); + } +} diff --git a/src/main/java/org/warp/filesponge/DiskMetadata.java b/src/main/java/org/warp/filesponge/DiskMetadata.java index e147f1c..96f8b3a 100644 --- a/src/main/java/org/warp/filesponge/DiskMetadata.java +++ b/src/main/java/org/warp/filesponge/DiskMetadata.java @@ -20,12 +20,8 @@ package org.warp.filesponge; import static java.lang.Math.toIntExact; -import io.netty5.buffer.Buffer; -import io.netty5.buffer.BufferAllocator; -import io.netty5.util.Send; -import it.cavallium.dbengine.database.serialization.BufferDataInputOwned; -import it.cavallium.dbengine.database.serialization.BufferDataInputShared; -import it.cavallium.dbengine.database.serialization.BufferDataOutput; +import it.cavallium.dbengine.buffers.BufDataInput; +import it.cavallium.dbengine.buffers.BufDataOutput; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.Serializer; import it.unimi.dsi.fastutil.booleans.BooleanArrayList; @@ -81,8 +77,7 @@ public record DiskMetadata(long size, BooleanArrayList downloadedBlocks) { public static class DiskMetadataSerializer implements Serializer { @Override - public @NotNull DiskMetadata deserialize(@NotNull Buffer serialized) throws SerializationException { - var dis = new BufferDataInputShared(serialized); + public @NotNull DiskMetadata deserialize(@NotNull BufDataInput dis) throws SerializationException { int legacySize = dis.readInt(); long size; if (legacySize == -2) { @@ -104,8 +99,7 @@ public record DiskMetadata(long size, BooleanArrayList downloadedBlocks) { } @Override - public void serialize(@NotNull DiskMetadata deserialized, Buffer output) throws SerializationException { - var dos = new BufferDataOutput(output); + public void serialize(@NotNull DiskMetadata deserialized, BufDataOutput dos) throws SerializationException { dos.writeInt(-2); dos.writeLong(deserialized.size); var blocksCount = deserialized.getBlocksCount(); diff --git a/src/main/java/org/warp/filesponge/URL.java b/src/main/java/org/warp/filesponge/URL.java index 94a7164..c86ae77 100644 --- a/src/main/java/org/warp/filesponge/URL.java +++ b/src/main/java/org/warp/filesponge/URL.java @@ -18,7 +18,6 @@ package org.warp.filesponge; -import io.netty5.buffer.BufferAllocator; import it.cavallium.dbengine.database.serialization.Serializer; public interface URL { diff --git a/src/main/java/org/warp/filesponge/URLSerializer.java b/src/main/java/org/warp/filesponge/URLSerializer.java index a0631c1..f5d015a 100644 --- a/src/main/java/org/warp/filesponge/URLSerializer.java +++ b/src/main/java/org/warp/filesponge/URLSerializer.java @@ -18,11 +18,9 @@ package org.warp.filesponge; -import io.netty5.buffer.Buffer; -import io.netty5.buffer.BufferAllocator; -import it.cavallium.dbengine.database.serialization.BufferDataOutput; +import it.cavallium.dbengine.buffers.Buf; +import it.cavallium.dbengine.buffers.BufDataOutput; import it.cavallium.dbengine.database.serialization.SerializationException; -import java.nio.charset.StandardCharsets; import org.jetbrains.annotations.NotNull; public interface URLSerializer { @@ -30,7 +28,7 @@ public interface URLSerializer { /** * @param output its writable size will be at least equal to the size hint */ - void serialize(@NotNull T url, Buffer output) throws SerializationException; + void serialize(@NotNull T url, BufDataOutput output) throws SerializationException; /** * @return hint about the expected size of the buffer diff --git a/src/main/java/org/warp/filesponge/URLStringSerializer.java b/src/main/java/org/warp/filesponge/URLStringSerializer.java index 9650d2f..5891595 100644 --- a/src/main/java/org/warp/filesponge/URLStringSerializer.java +++ b/src/main/java/org/warp/filesponge/URLStringSerializer.java @@ -18,18 +18,17 @@ package org.warp.filesponge; -import io.netty5.buffer.Buffer; -import it.cavallium.dbengine.database.serialization.BufferDataOutput; +import it.cavallium.dbengine.buffers.Buf; +import it.cavallium.dbengine.buffers.BufDataOutput; import it.cavallium.dbengine.database.serialization.SerializationException; import org.jetbrains.annotations.NotNull; public abstract class URLStringSerializer implements URLSerializer { @Override - public final void serialize(@NotNull T url, Buffer output) throws SerializationException { + public final void serialize(@NotNull T url, BufDataOutput output) throws SerializationException { var string = this.serialize(url); - var dataOut = new BufferDataOutput(output); - dataOut.writeUTF(string); + output.writeUTF(string); } public abstract @NotNull String serialize(@NotNull T url); diff --git a/src/test/java/org/warp/filesponge/ThreadSafety.java b/src/test/java/org/warp/filesponge/ThreadSafety.java index eca4bf4..9db7b12 100644 --- a/src/test/java/org/warp/filesponge/ThreadSafety.java +++ b/src/test/java/org/warp/filesponge/ThreadSafety.java @@ -28,7 +28,7 @@ public class ThreadSafety { return s; })).subscribeOn(schedulerSingle)) .ignoreElements() - .thenMany(Flux.defer(() -> Flux.fromStream(list::stream))) + .thenMany(Flux.defer(() -> Flux.fromStream(list::stream).subscribeOn(Schedulers.boundedElastic()))) .subscribeOn(schedulerParallel); Integer[] checks = new Integer[iterations * 2];