This commit is contained in:
Andrea Cavalli 2021-03-07 15:50:11 +01:00
parent e25ad84ac2
commit 91694960e4
31 changed files with 564 additions and 1085 deletions

18
pom.xml
View File

@ -34,6 +34,19 @@
<maven.compiler.>11</maven.compiler.> <maven.compiler.>11</maven.compiler.>
</properties> </properties>
<repositories>
<repository>
<id>mchv-release</id>
<name>MCHV Release Apache Maven Packages</name>
<url>https://mvn.mchv.eu/repository/mchv</url>
</repository>
<repository>
<id>mchv-snapshot</id>
<name>MCHV Snapshot Apache Maven Packages</name>
<url>https://mvn.mchv.eu/repository/mchv-snapshot</url>
</repository>
</repositories>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.junit.jupiter</groupId> <groupId>org.junit.jupiter</groupId>
@ -67,6 +80,11 @@
<version>3.4.2</version> <version>3.4.2</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>it.cavallium</groupId>
<artifactId>dbengine</artifactId>
<version>3.0.0-SNAPSHOT</version>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -1,35 +0,0 @@
/*
* FileSponge
* Copyright (C) 2020 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import org.warp.filesponge.api.FileSource;
import org.warp.filesponge.value.FileType;
import org.warp.filesponge.value.FileURI;
import org.warp.filesponge.value.MirrorURI;
@AllArgsConstructor(access = AccessLevel.PROTECTED)
public abstract class BaseMirrorFileSource<FURI extends FileURI, FTYPE extends FileType> implements
FileSource<FURI, FTYPE> {
protected final MirrorAvailabilityManager receiverAvailabilityManager;
protected final MirrorURI mirrorURI;
}

View File

@ -0,0 +1,35 @@
/*
* FileSponge
* Copyright (C) 2021 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge;
import java.nio.ByteBuffer;
import lombok.Value;
@Value
public class DataBlock {
int offset;
int length;
ByteBuffer data;
public int getId() {
return offset / Web.BLOCK_SIZE;
}
}

View File

@ -0,0 +1,182 @@
/*
* FileSponge
* Copyright (C) 2021 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge;
import static org.warp.filesponge.Web.BLOCK_SIZE;
import com.google.common.primitives.Ints;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.UpdateMode;
import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import org.warp.filesponge.DiskMetadata.DiskMetadataSerializer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class DiskCache implements URLsDiskHandler, URLsWriter {
private static final DiskMetadataSerializer diskMetadataSerializer = new DiskMetadataSerializer();
private final LLKeyValueDatabase db;
private final LLDictionary fileContent;
private final LLDictionary fileMetadata;
public static Mono<DiskCache> open(LLDatabaseConnection databaseConnection, String dbName, boolean lowMemory) {
return databaseConnection
.getDatabase(dbName, List.of(Column.dictionary("file-content"), Column.dictionary("file-metadata")), lowMemory)
.flatMap(db -> Mono.zip(
Mono.just(db).single(),
db.getDictionary("file-content", UpdateMode.ALLOW).single(),
db.getDictionary("file-metadata", UpdateMode.ALLOW).single()
))
.map(tuple -> new DiskCache(tuple.getT1(), tuple.getT2(), tuple.getT3()))
.single();
}
@Override
public Mono<Void> writeMetadata(URL url, Metadata metadata) {
return fileMetadata
.update(url.getSerializer().serialize(url), oldValue -> {
if (oldValue.isPresent()) {
return oldValue;
} else {
return Optional
.of(new DiskMetadata(
metadata.getSize(),
new BooleanArrayList(DiskMetadata.getBlocksCount(metadata.getSize(), BLOCK_SIZE))
))
.map(diskMetadataSerializer::serialize);
}
})
.then();
}
@Override
public Mono<Void> writeContentBlock(URL url, DataBlock dataBlock) {
return Mono
.fromCallable(() -> {
byte[] bytes = new byte[dataBlock.getLength()];
dataBlock.getData().get(bytes);
return bytes;
}).subscribeOn(Schedulers.boundedElastic())
.flatMap(bytes -> fileContent.put(getBlockKey(url, dataBlock.getId()), bytes, LLDictionaryResultType.VOID))
.then(fileMetadata
.update(url.getSerializer().serialize(url), prevBytes -> prevBytes
.map(diskMetadataSerializer::deserialize)
.map(prevMeta -> {
if (!prevMeta.getDownloadedBlocks().getBoolean(dataBlock.getId())) {
BooleanArrayList bal = prevMeta.getDownloadedBlocks().clone();
bal.add(dataBlock.getId(), true);
return new DiskMetadata(prevMeta.getSize(), bal);
} else {
return prevMeta;
}
})
.map(diskMetadataSerializer::serialize)
)
)
.then();
}
@Override
public Flux<DataBlock> requestContent(URL url) {
return requestDiskMetadata(url)
.filter(DiskMetadata::isDownloadedFully)
.flatMapMany(meta -> Flux.fromIterable(meta.getDownloadedBlocks()))
.index()
// Get only downloaded blocks
.filter(Tuple2::getT2)
.flatMapSequential(blockMeta -> {
int blockId = Math.toIntExact(blockMeta.getT1());
boolean downloaded = blockMeta.getT2();
if (!downloaded) {
return Mono.empty();
}
return fileContent.get(null, getBlockKey(url, blockId)).map(data -> {
int blockOffset = getBlockOffset(blockId);
int blockLength = data.length;
if (blockOffset + blockLength >= blockMeta.size()) {
if (blockOffset + blockLength > blockMeta.size()) {
throw new IllegalStateException("Overflowed data size");
}
} else if (data.length != BLOCK_SIZE) {
throw new IllegalStateException("Block data length != block length");
}
return new DataBlock(blockOffset, blockLength, ByteBuffer.wrap(data, 0, blockLength));
});
});
}
private byte[] getBlockKey(URL url, int blockId) {
byte[] urlBytes = url.getSerializer().serialize(url);
byte[] blockIdBytes = Ints.toByteArray(blockId);
byte[] resultBytes = Arrays.copyOf(urlBytes, urlBytes.length);
System.arraycopy(blockIdBytes, 0, resultBytes, urlBytes.length, blockIdBytes.length);
return resultBytes;
}
private static int getBlockOffset(int blockId) {
return blockId * BLOCK_SIZE;
}
@Override
public Mono<DiskMetadata> requestDiskMetadata(URL url) {
return fileMetadata
.get(null, url.getSerializer().serialize(url))
.map(diskMetadataSerializer::deserialize);
}
@Override
public Mono<Metadata> requestMetadata(URL url) {
return requestDiskMetadata(url)
.map(DiskMetadata::asMetadata);
}
@Override
public Mono<Tuple2<Metadata, Flux<DataBlock>>> request(URL url) {
return fileMetadata
.get(null, url.getSerializer().serialize(url))
.map(diskMetadataSerializer::deserialize)
.map(diskMeta -> {
var meta = diskMeta.asMetadata();
if (diskMeta.isDownloadedFully()) {
return Tuples.of(meta, this.requestContent(url));
} else {
return Tuples.of(meta, Flux.empty());
}
});
}
public Mono<Void> close() {
return db.close();
}
}

View File

@ -0,0 +1,103 @@
/*
* FileSponge
* Copyright (C) 2021 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import lombok.Data;
import lombok.SneakyThrows;
import org.jetbrains.annotations.NotNull;
@Data
public class DiskMetadata {
/**
* -1 = unknown size
*/
private final int size;
private final BooleanArrayList downloadedBlocks;
private Boolean downloadedFully;
public boolean isDownloadedFully() {
if (downloadedFully == null) {
// Ensure blocks count is valid by calling getBlocksCount()
getBlocksCount();
// It's fully downloaded if every block is true
downloadedFully = !this.getDownloadedBlocks().contains(false);
}
return downloadedFully;
}
private int getBlocksCount() {
var expectedBlocksCount = getBlocksCount(size, Web.BLOCK_SIZE);
if (this.getDownloadedBlocks().size() != expectedBlocksCount) {
throw new IllegalStateException("Blocks array length != expected blocks count");
}
return expectedBlocksCount;
}
public static int getBlocksCount(int size, int blockSize) {
return (size + (blockSize - size % blockSize)) / blockSize;
}
public Metadata asMetadata() {
return new Metadata(size);
}
public static class DiskMetadataSerializer implements Serializer<DiskMetadata, byte[]> {
@SneakyThrows
@Override
public @NotNull DiskMetadata deserialize(byte @NotNull [] serialized) {
var bais = new ByteArrayInputStream(serialized);
var dis = new DataInputStream(bais);
int size = dis.readInt();
int blocksCount = getBlocksCount(size, Web.BLOCK_SIZE);
boolean[] downloadedBlocks = new boolean[blocksCount];
for (int i = 0; i < blocksCount; i++) {
downloadedBlocks[i] = dis.readBoolean();
}
return new DiskMetadata(size, BooleanArrayList.wrap(downloadedBlocks, blocksCount));
}
@SneakyThrows
@Override
public byte @NotNull [] serialize(@NotNull DiskMetadata deserialized) {
try (var bos = new ByteArrayOutputStream(Integer.BYTES * 2)) {
try (var dos = new DataOutputStream(bos)) {
dos.writeInt(deserialized.getSize());
if (deserialized.getDownloadedBlocks().size() != deserialized.getBlocksCount()) {
throw new IllegalStateException("Blocks array length != expected blocks count");
}
for (boolean downloadedBlock : deserialized.getDownloadedBlocks()) {
dos.writeBoolean(downloadedBlock);
}
}
return bos.toByteArray();
}
}
}
}

View File

@ -1,77 +0,0 @@
/*
* FileSponge
* Copyright (C) 2020 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import org.warp.filesponge.reactor.AsyncMultiAssociation;
import org.warp.filesponge.value.FileURI;
import org.warp.filesponge.value.MirrorURI;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@AllArgsConstructor()
public class FileMirrorsManager {
private final Scheduler fileMirrorsManagerScheduler = Schedulers.single();
private final MirrorAvailabilityManager mirrorAvailabilityManager;
/**
* This map must be persistent
*/
private final AsyncMultiAssociation<FileURI, MirrorURI> fileMirrors;
public Mono<Set<MirrorURI>> getAvailableMirrors(FileURI fileURI) {
return fileMirrors
.getLinks(fileURI)
.filterWhen(mirrorAvailabilityManager::isMirrorAvailable)
.collect(Collectors.toUnmodifiableSet())
.subscribeOn(fileMirrorsManagerScheduler);
}
public Mono<Boolean> hasAnyAvailableMirror(FileURI uri) {
return fileMirrors
.getLinks(uri)
.filterWhen(mirrorAvailabilityManager::isMirrorAvailable)
.hasElements()
.subscribeOn(fileMirrorsManagerScheduler);
}
public Mono<Void> addMirror(FileURI uri, MirrorURI mirrorURI) {
return fileMirrors
.link(uri, mirrorURI)
.then()
.subscribeOn(fileMirrorsManagerScheduler);
}
public Mono<Void> removeMirror(FileURI uri, MirrorURI mirrorURI) {
return fileMirrors
.unlink(uri, mirrorURI)
.then()
.subscribeOn(fileMirrorsManagerScheduler);
}
public Mono<Void> unsetAllFiles() {
return fileMirrors.clear()
.subscribeOn(fileMirrorsManagerScheduler);
}
}

View File

@ -1,6 +1,6 @@
/* /*
* FileSponge * FileSponge
* Copyright (C) 2020 Andrea Cavalli * Copyright (C) 2021 Andrea Cavalli
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
@ -16,12 +16,16 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
package org.warp.filesponge.value; package org.warp.filesponge;
import lombok.Value;
@Value
public class Metadata {
/**
* -1 = unknown size
*/
int size;
public enum FileAvailability {
UNKNOWN,
DOWNLOADABLE,
DOWNLOADING,
DOWNLOADED,
FAILED
} }

View File

@ -1,48 +0,0 @@
/*
* FileSponge
* Copyright (C) 2020 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import org.warp.filesponge.value.MirrorURI;
import org.warp.filesponge.reactor.ConcurrentAsyncSet;
import org.warp.filesponge.reactor.AsyncSet;
import reactor.core.publisher.Mono;
@AllArgsConstructor(access = AccessLevel.PUBLIC)
public class MirrorAvailabilityManager {
private final AsyncSet<MirrorURI> availableMirrors = new ConcurrentAsyncSet<>();
public Mono<Void> setAllMirrorsAsUnavailable() {
return availableMirrors.clear();
}
public Mono<Void> setMirrorAvailability(MirrorURI mirrorURI, boolean available) {
if (available) {
return availableMirrors.add(mirrorURI).then();
} else {
return availableMirrors.remove(mirrorURI).then();
}
}
public Mono<Boolean> isMirrorAvailable(MirrorURI mirrorURI) {
return this.availableMirrors.contains(mirrorURI);
}
}

View File

@ -1,58 +0,0 @@
/*
* FileSponge
* Copyright (C) 2020 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge;
import java.nio.ByteBuffer;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import org.jetbrains.annotations.NotNull;
import org.warp.filesponge.api.FileAccessor;
import org.warp.filesponge.value.FileStatus;
import org.warp.filesponge.value.FileURI;
import reactor.core.publisher.Mono;
/**
* Prevent access to other methods via casting
*/
@AllArgsConstructor
@EqualsAndHashCode
public class SecureFileAccessor<FURI extends FileURI> implements FileAccessor<FURI> {
private final FileAccessor<FURI> fileAccessor;
@Override
public Mono<Void> delete(@NotNull FURI fileURI) {
return fileAccessor.delete(fileURI);
}
@Override
public Mono<ByteBuffer> getContent(@NotNull FURI fileURI, boolean offlineOnly) {
return fileAccessor.getContent(fileURI, offlineOnly);
}
@Override
public @NotNull Mono<FileStatus> getStatus(@NotNull FURI fileURI) {
return fileAccessor.getStatus(fileURI);
}
@Override
public String toString() {
return fileAccessor.toString();
}
}

View File

@ -1,6 +1,6 @@
/* /*
* FileSponge * FileSponge
* Copyright (C) 2020 Andrea Cavalli * Copyright (C) 2021 Andrea Cavalli
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
@ -16,10 +16,12 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
package org.warp.filesponge.value; package org.warp.filesponge;
import it.cavallium.dbengine.database.serialization.Serializer;
public interface URL {
Serializer<URL, byte[]> getSerializer();
public enum FileDataAvailability {
UNAVAILABLE,
PARTIAL,
FULL
} }

View File

@ -1,6 +1,6 @@
/* /*
* FileSponge * FileSponge
* Copyright (C) 2020 Andrea Cavalli * Copyright (C) 2021 Andrea Cavalli
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
@ -16,11 +16,12 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
package org.warp.filesponge.value; package org.warp.filesponge;
import reactor.core.publisher.Mono;
public interface URLDiskHandler extends URLHandler {
Mono<DiskMetadata> requestDiskMetadata();
public enum FileSourceAvailability {
DOWNLOADABLE,
DOWNLOADING,
DOWNLOADED,
FAILED
} }

View File

@ -16,23 +16,21 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
package org.warp.filesponge.value; package org.warp.filesponge;
public class AlreadyAssignedException extends Exception { import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public AlreadyAssignedException() { public interface URLHandler {
super();
Flux<DataBlock> requestContent();
Mono<Metadata> requestMetadata();
default Mono<Tuple2<Metadata, Flux<DataBlock>>> request() {
return requestMetadata().map(metadata -> Tuples.of(metadata, requestContent()));
} }
public AlreadyAssignedException(String message) {
super(message);
}
public AlreadyAssignedException(String message, Throwable cause) {
super(message, cause);
}
public AlreadyAssignedException(Throwable cause) {
super(cause);
}
} }

View File

@ -1,6 +1,6 @@
/* /*
* FileSponge * FileSponge
* Copyright (C) 2020 Andrea Cavalli * Copyright (C) 2021 Andrea Cavalli
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
@ -16,6 +16,14 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
package org.warp.filesponge.value; package org.warp.filesponge;
public interface FileType {} import reactor.core.publisher.Mono;
public interface URLWriter {
Mono<Void> writeMetadata(Metadata metadata);
Mono<Void> writeContentBlock(DataBlock dataBlock);
}

View File

@ -16,40 +16,32 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
package org.warp.filesponge.reactor; package org.warp.filesponge;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
public interface AsyncMultiAssociation<T, U> { public interface URLsDiskHandler extends URLsHandler {
Mono<Boolean> link(T var1, U var2); Mono<DiskMetadata> requestDiskMetadata(URL url);
Mono<Boolean> unlink(T var1, U var2); default URLDiskHandler asURLDiskHandler(URL url) {
return new URLDiskHandler() {
@Override
public Mono<DiskMetadata> requestDiskMetadata() {
return URLsDiskHandler.this.requestDiskMetadata(url);
}
Flux<U> unlink(T var1); @Override
public Flux<DataBlock> requestContent() {
return URLsDiskHandler.this.requestContent(url);
}
Flux<T> unlinkFromSource(U var1); @Override
public Mono<Metadata> requestMetadata() {
default Mono<Boolean> hasAnyLink(T src) { return URLsDiskHandler.this.requestMetadata(url);
return this.getLinks(src).hasElements(); }
};
} }
default Mono<Boolean> hasAnyLinkSource(U dest) {
return this.getLinkSources(dest).hasElements();
}
Mono<Boolean> hasLink(T var1, U var2);
Flux<U> getLinks(T var1);
Flux<T> getLinkSources(U var1);
Mono<Void> clear();
Mono<Integer> size();
Flux<T> getSources();
Flux<U> getDestinations();
} }

View File

@ -0,0 +1,50 @@
/*
* FileSponge
* Copyright (C) 2021 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public interface URLsHandler {
Flux<DataBlock> requestContent(URL url);
Mono<Metadata> requestMetadata(URL url);
default Mono<Tuple2<Metadata, Flux<DataBlock>>> request(URL url) {
return requestMetadata(url).map(metadata -> Tuples.of(metadata, requestContent(url)));
}
default URLHandler asURLHandler(URL url) {
return new URLHandler() {
@Override
public Flux<DataBlock> requestContent() {
return URLsHandler.this.requestContent(url);
}
@Override
public Mono<Metadata> requestMetadata() {
return URLsHandler.this.requestMetadata(url);
}
};
}
}

View File

@ -1,6 +1,6 @@
/* /*
* FileSponge * FileSponge
* Copyright (C) 2020 Andrea Cavalli * Copyright (C) 2021 Andrea Cavalli
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
@ -16,25 +16,28 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
package org.warp.filesponge.api; package org.warp.filesponge;
import org.warp.filesponge.SecureFileAccessor;
import org.warp.filesponge.value.FileURI;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
/** public interface URLsWriter {
* FileAccessor can be used to manage FileSponge and access files from the client side
*/
public interface FileSpongeClient<FURI extends FileURI> extends FileAccessor<FURI> {
Mono<Void> optimizeStorage(); Mono<Void> writeMetadata(URL url, Metadata metadata);
/** Mono<Void> writeContentBlock(URL url, DataBlock dataBlock);
* Get this instance but without special methods
* default URLWriter getUrlWriter(URL url) {
* @return limited instance of itself return new URLWriter() {
*/ @Override
default FileAccessor<FURI> asFileAccessor() { public Mono<Void> writeMetadata(Metadata metadata) {
return new SecureFileAccessor<>(this); return URLsWriter.this.writeMetadata(url, metadata);
}
@Override
public Mono<Void> writeContentBlock(DataBlock dataBlock) {
return URLsWriter.this.writeContentBlock(url, dataBlock);
}
};
} }
} }

View File

@ -0,0 +1,87 @@
/*
* FileSponge
* Copyright (C) 2021 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class Web implements URLsHandler {
public static final int BLOCK_SIZE = 8 * 1024 * 1024; // 8 MiB
public static final int MAX_BLOCKS = 256; // 2 GiB
private final Set<URLsHandler> urlsHandlers = new ConcurrentHashMap<URLsHandler, Object>().keySet(new Object());
private final Set<URLsDiskHandler> cacheAccess = new ConcurrentHashMap<URLsDiskHandler, Object>().keySet(new Object());
private final Set<URLsWriter> cacheWrite = new ConcurrentHashMap<URLsWriter, Object>().keySet(new Object());
public Web() {
}
public Mono<Void> registerSource(URLsHandler urLsHandler) {
return Mono.fromRunnable(() -> urlsHandlers.add(urLsHandler));
}
public <T extends URLsDiskHandler & URLsWriter> Mono<Void> registerCache(T urlsCache) {
return Mono.fromRunnable(() -> {
cacheAccess.add(urlsCache);
cacheWrite.add(urlsCache);
});
}
@Override
public Flux<DataBlock> requestContent(URL url) {
return Flux
.fromIterable(cacheAccess)
.flatMap(urlsHandler -> urlsHandler.requestContent(url))
.switchIfEmpty(Flux
.fromIterable(urlsHandlers)
.flatMap(urlsHandler -> urlsHandler
.requestContent(url)
.flatMapSequential(dataBlock -> Flux
.fromIterable(cacheWrite)
.flatMapSequential(cw -> cw.writeContentBlock(url, dataBlock))
.then(Mono.just(dataBlock))
)
)
)
.distinct(DataBlock::getId);
}
@Override
public Mono<Metadata> requestMetadata(URL url) {
return Mono.from(Flux
.fromIterable(cacheAccess)
.flatMap(urlsHandler -> urlsHandler.requestMetadata(url))
.switchIfEmpty(Flux
.fromIterable(urlsHandlers)
.flatMap(urlsHandler -> urlsHandler
.requestMetadata(url)
.flatMap(dataBlock -> Flux
.fromIterable(cacheWrite)
.flatMapSequential(cw -> cw.writeMetadata(url, dataBlock))
.then(Mono.just(dataBlock))
)
)
));
}
}

View File

@ -1,57 +0,0 @@
/*
* FileSponge
* Copyright (C) 2020 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge.api;
import java.nio.ByteBuffer;
import org.jetbrains.annotations.NotNull;
import org.warp.filesponge.value.FileStatus;
import org.warp.filesponge.value.FileURI;
import reactor.core.publisher.Mono;
/**
* FileAccessor can be used to access files from the client side
*/
public interface FileAccessor<FURI extends FileURI> {
/**
* Request file deletion
*
* @param fileURI File URI
* @return Empty.
*/
Mono<Void> delete(@NotNull FURI fileURI);
/**
* Get file content
*
* @param fileURI File URI
* @param offlineOnly true to get the file from cache
* @return content if found. If the request is offline the future will complete instantly.
* Can be empty
*/
Mono<ByteBuffer> getContent(@NotNull FURI fileURI, boolean offlineOnly);
/**
* Get file status
*
* @param fileURI File URI
* @return status of this file. Cannot be empty.
*/
Mono<FileStatus> getStatus(@NotNull FURI fileURI);
}

View File

@ -1,72 +0,0 @@
/*
* FileSponge
* Copyright (C) 2020 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge.api;
import java.time.Duration;
import org.warp.filesponge.value.FileURI;
import reactor.core.publisher.Mono;
/**
* FileActor sends signals to a mirror
*/
public interface FileActor<FURI extends FileURI> {
/**
* Send a "delete file" signal
*
* @param fileURI File URI
* @return true if the signal can be sent. Cannot be empty.
*/
Mono<Boolean> deleteFile(FURI fileURI);
/**
* Send a "download file" signal
*
* @param fileURI File URI
* @return true if the signal can be sent. Cannot be empty.
*/
Mono<Boolean> downloadFile(FURI fileURI);
/**
* Check if this actor can handle signals for this file
*
* @param fileURI File URI
* @return true if the actor can send signals related to this file. Cannot be empty.
*/
Mono<Boolean> canHandleFile(FURI fileURI);
/**
* Send a "download file" signal
*
* @param timeout if it's 0 the method will return immediately, if it's set the method will wait until a file
* <b>download request</b> has been found, or the timeout time elapsed
* @return empty if no pending <b>download requests</b> has been found, true if the signal can be sent, false
* otherwise
*/
Mono<Boolean> downloadNextFile(Duration timeout);
/**
* Send a "delete file" signal
*
* @param timeout if it's 0 the method will return immediately, if it's set the method will wait until a file
* <b>delete request</b> has been found, or the timeout time elapsed
* @return empty if no pending <b>delete requests</b> has been found, true if the signal can be sent, false otherwise
*/
Mono<Boolean> deleteNextFile(Duration timeout);
}

View File

@ -1,62 +0,0 @@
/*
* FileSponge
* Copyright (C) 2020 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge.api;
import java.nio.ByteBuffer;
import org.jetbrains.annotations.NotNull;
import org.warp.filesponge.value.FileSourceAvailability;
import org.warp.filesponge.value.FileType;
import org.warp.filesponge.value.FileURI;
import reactor.core.publisher.Mono;
/**
* FileSource receives responses from a mirror
*/
public interface FileSource<FURI extends FileURI, FTYPE extends FileType> {
/**
* Called when the mirror is online
*/
Mono<Void> onAvailable();
/**
* Called when the mirror is unreachable
*/
Mono<Void> onUnavailable();
/**
* Called when the mirror notifies you that a new file exists. Cannot be empty.
*/
Mono<Boolean> onNewFile(@NotNull FURI fileURI, @NotNull FTYPE fileType);
/**
* Called when the mirror notifies you details about a file.
* <p>
* {@link FileSource#onNewFile(FURI, FTYPE)} must have been already called
*/
Mono<Void> onFile(@NotNull FURI fileURI, @NotNull FileSourceAvailability fileAvailability, long totalSize);
/**
* Called when the mirror notifies you the bytes of a part of a file.
* <p>
* {@link FileSource#onNewFile(FURI, FTYPE)} and {@link FileSource#onFile(FURI, FileSourceAvailability, long)} must
* have been already called
*/
Mono<Void> onFilePiece(@NotNull FURI fileURI, long offset, long size, @NotNull ByteBuffer piece);
}

View File

@ -1,76 +0,0 @@
/*
* FileSponge
* Copyright (C) 2020 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge.api;
import java.nio.ByteBuffer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.warp.filesponge.value.FileType;
import org.warp.filesponge.value.FileURI;
import org.warp.filesponge.value.MirrorURI;
import reactor.core.publisher.Mono;
public interface FileStorage<FURI extends FileURI, FTYPE extends FileType, MURI extends MirrorURI> {
Mono<Void> newFile(@NotNull FURI fileURI, @NotNull FTYPE fileType);
/**
* Read file data.
* Fails if not all the file data is available.
* @param fileURI File URI
* @return read-only file data
* @throws java.util.NoSuchElementException if file is not existing, or some requested data is missing
*/
Mono<ByteBuffer> readFileData(@NotNull FURI fileURI);
/**
* Read a part of file data.
* Fails if not all the requested file data is available.
* @param fileURI File URI
* @param offset offset of the current data segment
* @param size current data segment size
* @return read-only file data part
* @throws java.util.NoSuchElementException if file is not existing, or some requested data is missing
* @throws org.warp.commonutils.error.IndexOutOfBoundsException if requested offset or size is not valid
*/
Mono<ByteBuffer> readFileDataPart(@NotNull FURI fileURI, long offset, long size);
/**
* Set a part of file data.
* If file size is 0, the file will be deleted.
* @param fileURI File URI
* @param offset offset of the current data segment
* @param size current data segment size
* @param bytes data segment, can be null if totalSize is 0
* @param totalSize total file size
* @return nothing
*/
Mono<Void> setFileData(@NotNull FURI fileURI, long offset, long size, @Nullable ByteBuffer bytes, long totalSize);
Mono<Boolean> hasAllData(@NotNull FURI fileURI);
/**
* Delete a file
* @param fileURI File URI
* @return nothing
*/
default Mono<Void> deleteFile(@NotNull FURI fileURI) {
return setFileData(fileURI, 0, 0, null, 0);
}
}

View File

@ -1,44 +0,0 @@
/*
* FileSponge
* Copyright (C) 2020 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge.extra.api;
import java.util.Optional;
import org.warp.filesponge.value.AlreadyAssignedException;
import org.warp.filesponge.value.FileURI;
/**
* Translate File URIs to "fileId" and back
*/
public interface FileURITranslator<FURI extends FileURI, FID> {
Optional<FURI> getURI(FID fileId);
Optional<FID> getFileId(FURI fileURI);
/**
* @throws AlreadyAssignedException Throw if the uri has another fileId assigned
*/
void setFileId(FURI fileURI, FID fileId) throws AlreadyAssignedException;
Optional<FURI> delete(FID fileId);
Optional<FID> delete(FURI fileURI);
void clear();
}

View File

@ -1,39 +0,0 @@
/*
* FileSponge
* Copyright (C) 2020 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge.extra.api;
import org.warp.filesponge.value.FileURI;
/**
* Create an unique "fileId" for each File URI
*/
public interface URIObfuscator<FURI extends FileURI, FID> {
FURI deobfuscateFileId(FID fileId);
void getFileId(FURI fileURI);
void setURIValue(FURI fileURI, FID fileId);
FURI delete(FID fileId);
FID delete(FURI fileURI);
void clear();
}

View File

@ -1,22 +0,0 @@
/*
* FileSponge
* Copyright (C) 2020 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
/**
* Extra functionalities and utilities for FileSponge
*/
package org.warp.filesponge.extra;

View File

@ -1,80 +0,0 @@
/*
* FileSponge
* Copyright (C) 2021 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge.reactor;
import java.util.Set;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Asynchronous set
* @param <T> value type
*/
public interface AsyncSet<T> {
/**
* Clear the set
*
* @return void
*/
Mono<Void> clear();
/**
* Add element to the set
*
* @param value value to add
* @return true if added, false if it's already present. Can't be empty.
*/
Mono<Boolean> add(T value);
/**
* Remove element from the set
*
* @param value value to remove
* @return true if removed, false if it's not present. Can't be empty.
*/
Mono<Boolean> remove(T value);
/**
* Find if element is present in the set
*
* @param value value to find
* @return true if found, false if not. Can't be empty.
*/
Mono<Boolean> contains(T value);
/**
* Get the set size
*
* @return set size, from 0 to {@value Integer#MAX_VALUE}. Can't be empty.
*/
Mono<Integer> size();
/**
* Get all values
* @return values, in a flux. Can be empty.
*/
Flux<T> toFlux();
/**
* Get all values
* @return values, in a set. Can't be empty.
*/
Mono<Set<T>> toSet();
}

View File

@ -1,81 +0,0 @@
/*
* FileSponge
* Copyright (C) 2021 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge.reactor;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentHashMap.KeySetView;
import lombok.EqualsAndHashCode;
import org.warp.commonutils.concurrency.atomicity.Atomic;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Atomic
@EqualsAndHashCode
public class ConcurrentAsyncSet<T> implements AsyncSet<T> {
private final KeySetView<T, Boolean> set;
public ConcurrentAsyncSet() {
this.set = ConcurrentHashMap.newKeySet();
}
@Override
public Mono<Void> clear() {
return Mono.fromCallable(() -> {
set.clear();
return null;
});
}
@Override
public Mono<Boolean> add(T value) {
return Mono.fromCallable(() -> set.add(value));
}
@Override
public Mono<Boolean> remove(T value) {
return Mono.fromCallable(() -> set.remove(value));
}
@Override
public Mono<Boolean> contains(T value) {
return Mono.fromCallable(() -> set.contains(value));
}
@Override
public Mono<Integer> size() {
return Mono.fromCallable(set::size);
}
@Override
public Flux<T> toFlux() {
return Flux.fromStream(set::stream);
}
@Override
public Mono<Set<T>> toSet() {
return Mono.fromCallable(() -> Set.copyOf(set));
}
@Override
public String toString() {
return set.toString();
}
}

View File

@ -1,88 +0,0 @@
/*
* FileSponge
* Copyright (C) 2021 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge.reactor;
import java.util.HashSet;
import java.util.Set;
import lombok.EqualsAndHashCode;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@NotAtomic
@EqualsAndHashCode
public class HashAsyncSet<T> implements AsyncSet<T> {
private final HashSet<T> set;
public HashAsyncSet() {
this.set = new HashSet<>();
}
public HashAsyncSet(HashSet<T> set) {
this.set = set;
}
@Override
public Mono<Void> clear() {
return Mono.fromCallable(() -> {
set.clear();
return null;
});
}
@Override
public Mono<Boolean> add(T value) {
return Mono.fromCallable(() -> set.add(value));
}
@Override
public Mono<Boolean> remove(T value) {
return Mono.fromCallable(() -> set.remove(value));
}
@Override
public Mono<Boolean> contains(T value) {
return Mono.fromCallable(() -> set.contains(value));
}
@Override
public Mono<Integer> size() {
return Mono.fromCallable(set::size);
}
@Override
public Flux<T> toFlux() {
return Flux.fromStream(set::stream);
}
@Override
public Mono<Set<T>> toSet() {
return Mono.fromCallable(() -> Set.copyOf(set));
}
@Override
public String toString() {
return set.toString();
}
public SynchronizedHashAsyncSet<T> synchronize() {
return new SynchronizedHashAsyncSet<>(this);
}
}

View File

@ -1,83 +0,0 @@
/*
* FileSponge
* Copyright (C) 2021 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge.reactor;
import java.util.Set;
import lombok.EqualsAndHashCode;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@EqualsAndHashCode
@NotAtomic
public class SynchronizedHashAsyncSet<T> implements AsyncSet<T> {
private transient final Scheduler scheduler = Schedulers.single();
private final HashAsyncSet<T> set;
public SynchronizedHashAsyncSet() {
this.set = new HashAsyncSet<>();
}
public SynchronizedHashAsyncSet(HashAsyncSet<T> set) {
this.set = set;
}
@Override
public Mono<Void> clear() {
return set.clear().subscribeOn(scheduler);
}
@Override
public Mono<Boolean> add(T value) {
return set.add(value).subscribeOn(scheduler);
}
@Override
public Mono<Boolean> remove(T value) {
return set.remove(value).subscribeOn(scheduler);
}
@Override
public Mono<Boolean> contains(T value) {
return set.contains(value).subscribeOn(scheduler);
}
@Override
public Mono<Integer> size() {
return set.size().subscribeOn(scheduler);
}
@Override
public Flux<T> toFlux() {
return set.toFlux().subscribeOn(scheduler);
}
@Override
public Mono<Set<T>> toSet() {
return set.toSet().subscribeOn(scheduler);
}
@Override
public String toString() {
return set.toString();
}
}

View File

@ -1,40 +0,0 @@
/*
* FileSponge
* Copyright (C) 2020 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge.value;
import java.util.Optional;
import lombok.Value;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@Value
public class FileStatus {
@NotNull FileAvailability availability;
@NotNull FileDataAvailability dataAvailability;
@Nullable Integer totalSize;
@Nullable Integer downloadedSize;
public Optional<Integer> getTotalSize() {
return Optional.ofNullable(totalSize);
}
public Optional<Integer> getDownloadedSize() {
return Optional.ofNullable(downloadedSize);
}
}

View File

@ -1,21 +0,0 @@
/*
* FileSponge
* Copyright (C) 2020 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge.value;
public interface FileURI {}

View File

@ -1,21 +0,0 @@
/*
* FileSponge
* Copyright (C) 2020 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge.value;
public interface MirrorURI {}