onNewFile(@NotNull FURI fileURI, @NotNull FTYPE fileType);
/**
* Called when the mirror notifies you details about a file.
*
* {@link FileSource#onNewFile(FURI, FTYPE)} must have been already called
*/
- void onFile(@NotNull FURI fileURI, @NotNull FileSourceAvailability fileAvailability, long totalSize);
+ Mono onFile(@NotNull FURI fileURI, @NotNull FileSourceAvailability fileAvailability, long totalSize);
/**
* Called when the mirror notifies you the bytes of a part of a file.
@@ -57,5 +58,5 @@ public interface FileSource {
* {@link FileSource#onNewFile(FURI, FTYPE)} and {@link FileSource#onFile(FURI, FileSourceAvailability, long)} must
* have been already called
*/
- void onFilePiece(@NotNull FURI fileURI, long offset, long size, @NotNull ByteBuffer piece);
+ Mono onFilePiece(@NotNull FURI fileURI, long offset, long size, @NotNull ByteBuffer piece);
}
diff --git a/src/main/lombok/org/warp/filesponge/api/FileSpongeClient.java b/src/main/lombok/org/warp/filesponge/api/FileSpongeClient.java
index 0fb8ee5..5c08562 100644
--- a/src/main/lombok/org/warp/filesponge/api/FileSpongeClient.java
+++ b/src/main/lombok/org/warp/filesponge/api/FileSpongeClient.java
@@ -21,13 +21,14 @@ package org.warp.filesponge.api;
import org.warp.filesponge.SecureFileAccessor;
import org.warp.filesponge.value.FileContent;
import org.warp.filesponge.value.FileURI;
+import reactor.core.publisher.Mono;
/**
* FileAccessor can be used to manage FileSponge and access files from the client side
*/
public interface FileSpongeClient extends FileAccessor {
- void optimizeStorage();
+ Mono optimizeStorage();
/**
* Get this instance but without special methods
diff --git a/src/main/lombok/org/warp/filesponge/api/FileStorage.java b/src/main/lombok/org/warp/filesponge/api/FileStorage.java
index 39fdf80..e63da88 100644
--- a/src/main/lombok/org/warp/filesponge/api/FileStorage.java
+++ b/src/main/lombok/org/warp/filesponge/api/FileStorage.java
@@ -24,14 +24,15 @@ import org.warp.filesponge.value.FileContent;
import org.warp.filesponge.value.FileType;
import org.warp.filesponge.value.FileURI;
import org.warp.filesponge.value.MirrorURI;
+import reactor.core.publisher.Mono;
public interface FileStorage {
- void newFile(@NotNull FURI fileURI, @NotNull FTYPE fileType);
+ Mono newFile(@NotNull FURI fileURI, @NotNull FTYPE fileType);
- FC readFileData(@NotNull FURI fileURI);
+ Mono readFileData(@NotNull FURI fileURI);
- void setFileData(@NotNull FURI fileURI, long offset, long size, ByteBuffer bytes, long totalSize);
+ Mono setFileData(@NotNull FURI fileURI, long offset, long size, ByteBuffer bytes, long totalSize);
- boolean hasAllData(@NotNull FURI fileURI);
+ Mono hasAllData(@NotNull FURI fileURI);
}
diff --git a/src/main/lombok/org/warp/filesponge/value/AsyncMultiAssociation.java b/src/main/lombok/org/warp/filesponge/value/AsyncMultiAssociation.java
new file mode 100644
index 0000000..f7eb63b
--- /dev/null
+++ b/src/main/lombok/org/warp/filesponge/value/AsyncMultiAssociation.java
@@ -0,0 +1,55 @@
+/*
+ * FileSponge
+ * Copyright (C) 2021 Andrea Cavalli
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package org.warp.filesponge.value;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public interface AsyncMultiAssociation {
+
+ Mono link(T var1, U var2);
+
+ Mono unlink(T var1, U var2);
+
+ Flux unlink(T var1);
+
+ Flux unlinkFromSource(U var1);
+
+ default Mono hasAnyLink(T src) {
+ return this.getLinks(src).hasElements();
+ }
+
+ default Mono hasAnyLinkSource(U dest) {
+ return this.getLinkSources(dest).hasElements();
+ }
+
+ Mono hasLink(T var1, U var2);
+
+ Flux getLinks(T var1);
+
+ Flux getLinkSources(U var1);
+
+ Mono clear();
+
+ Mono size();
+
+ Flux getSources();
+
+ Flux getDestinations();
+}
diff --git a/src/test/lombok/org/warp/filesponge/ThreadSafety.java b/src/test/lombok/org/warp/filesponge/ThreadSafety.java
new file mode 100644
index 0000000..98c5706
--- /dev/null
+++ b/src/test/lombok/org/warp/filesponge/ThreadSafety.java
@@ -0,0 +1,45 @@
+package org.warp.filesponge;
+
+import java.util.LinkedList;
+import java.util.List;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+import reactor.test.StepVerifier;
+
+public class ThreadSafety {
+
+ @Test
+ public void threadSafety() {
+ Scheduler schedulerSingle = Schedulers.newSingle("treadSafeScheduler");
+ Scheduler schedulerParallel = Schedulers.newParallel("threadUnsafeScheduler", 20);
+
+ int iterations = 500;
+ List list = new LinkedList<>();
+
+ var flux = Flux.range(0, iterations)
+ .flatMap(s -> Mono.fromCallable(() -> {
+ list.add(s);
+ return s;
+ }).then(Mono.fromCallable(() -> {
+ list.add(1);
+ return s;
+ })).subscribeOn(schedulerSingle))
+ .ignoreElements()
+ .thenMany(Flux.defer(() -> Flux.fromIterable(list)))
+ .subscribeOn(schedulerParallel);
+
+ Integer[] checks = new Integer[iterations * 2];
+ for (int i = 0; i < iterations; i++) {
+ checks[i * 2] = i;
+ checks[i * 2 + 1] = 1;
+ }
+ StepVerifier
+ .create(flux)
+ .expectSubscription()
+ .expectNext(checks)
+ .verifyComplete();
+ }
+}