From ce3eedc6eb802409fd2c12ed241201848dd6c581 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 11 Jan 2021 01:29:53 +0100 Subject: [PATCH] Asynchronous --- pom.xml | 13 ++++- .../warp/filesponge/FileMirrorsManager.java | 44 ++++++++++----- .../filesponge/MirrorAvailabilityManager.java | 36 +++++++----- .../warp/filesponge/SecureFileAccessor.java | 11 ++-- .../org/warp/filesponge/api/FileAccessor.java | 15 ++--- .../org/warp/filesponge/api/FileActor.java | 18 +++--- .../org/warp/filesponge/api/FileSource.java | 13 +++-- .../warp/filesponge/api/FileSpongeClient.java | 3 +- .../org/warp/filesponge/api/FileStorage.java | 9 +-- .../value/AsyncMultiAssociation.java | 55 +++++++++++++++++++ .../org/warp/filesponge/ThreadSafety.java | 45 +++++++++++++++ 11 files changed, 201 insertions(+), 61 deletions(-) create mode 100644 src/main/lombok/org/warp/filesponge/value/AsyncMultiAssociation.java create mode 100644 src/test/lombok/org/warp/filesponge/ThreadSafety.java diff --git a/pom.xml b/pom.xml index 5c513b9..db38798 100644 --- a/pom.xml +++ b/pom.xml @@ -56,11 +56,22 @@ common-utils 1.1.1 + + io.projectreactor + reactor-core + 3.4.1 + + + io.projectreactor + reactor-test + 3.4.1 + test + src/main/lombok - src/main/lombok + src/test/lombok org.apache.maven.plugins diff --git a/src/main/lombok/org/warp/filesponge/FileMirrorsManager.java b/src/main/lombok/org/warp/filesponge/FileMirrorsManager.java index b59de44..058ecea 100644 --- a/src/main/lombok/org/warp/filesponge/FileMirrorsManager.java +++ b/src/main/lombok/org/warp/filesponge/FileMirrorsManager.java @@ -21,41 +21,57 @@ package org.warp.filesponge; import java.util.Set; import java.util.stream.Collectors; import lombok.AllArgsConstructor; -import org.warp.commonutils.type.MultiAssociation; +import org.warp.filesponge.value.AsyncMultiAssociation; import org.warp.filesponge.value.FileURI; import org.warp.filesponge.value.MirrorURI; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; @AllArgsConstructor() public class FileMirrorsManager { + private final Scheduler fileMirrorsManagerScheduler = Schedulers.single(); + private final MirrorAvailabilityManager mirrorAvailabilityManager; /** * This map must be persistent */ - private final MultiAssociation fileMirrors; + private final AsyncMultiAssociation fileMirrors; - public synchronized Set getAvailableMirrors(FileURI fileURI) { + public Mono> getAvailableMirrors(FileURI fileURI) { return fileMirrors .getLinks(fileURI) - .stream() - .filter(mirrorAvailabilityManager::isMirrorAvailable) - .collect(Collectors.toUnmodifiableSet()); + .filterWhen(mirrorAvailabilityManager::isMirrorAvailable) + .collect(Collectors.toUnmodifiableSet()) + .subscribeOn(fileMirrorsManagerScheduler); } - public synchronized boolean hasAnyAvailableMirror(FileURI uri) { - return fileMirrors.getLinks(uri).stream().anyMatch(mirrorAvailabilityManager::isMirrorAvailable); + public Mono hasAnyAvailableMirror(FileURI uri) { + return fileMirrors + .getLinks(uri) + .filterWhen(mirrorAvailabilityManager::isMirrorAvailable) + .hasElements() + .subscribeOn(fileMirrorsManagerScheduler); } - public synchronized void addMirror(FileURI uri, MirrorURI mirrorURI) { - fileMirrors.link(uri, mirrorURI); + public Mono addMirror(FileURI uri, MirrorURI mirrorURI) { + return fileMirrors + .link(uri, mirrorURI) + .then() + .subscribeOn(fileMirrorsManagerScheduler); } - public synchronized void removeMirror(FileURI uri, MirrorURI mirrorURI) { - fileMirrors.unlink(uri, mirrorURI); + public Mono removeMirror(FileURI uri, MirrorURI mirrorURI) { + return fileMirrors + .unlink(uri, mirrorURI) + .then() + .subscribeOn(fileMirrorsManagerScheduler); } - public synchronized void unsetAllFiles() { - fileMirrors.clear(); + public Mono unsetAllFiles() { + return fileMirrors.clear() + .subscribeOn(fileMirrorsManagerScheduler); } } diff --git a/src/main/lombok/org/warp/filesponge/MirrorAvailabilityManager.java b/src/main/lombok/org/warp/filesponge/MirrorAvailabilityManager.java index 2713be1..7d524a1 100644 --- a/src/main/lombok/org/warp/filesponge/MirrorAvailabilityManager.java +++ b/src/main/lombok/org/warp/filesponge/MirrorAvailabilityManager.java @@ -18,30 +18,40 @@ package org.warp.filesponge; -import java.util.HashSet; -import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import lombok.AccessLevel; import lombok.AllArgsConstructor; import org.warp.filesponge.value.MirrorURI; +import reactor.core.publisher.Mono; @AllArgsConstructor(access = AccessLevel.PUBLIC) public class MirrorAvailabilityManager { - private final Set availableMirrors = new HashSet<>(); + private static final Object NO_VALUE = new Object(); + /** + * This is a set. the value is not important + */ + private final ConcurrentHashMap availableMirrors = new ConcurrentHashMap<>(); - public synchronized void setAllMirrorsAsUnavailable() { - availableMirrors.clear(); + public Mono setAllMirrorsAsUnavailable() { + return Mono.fromCallable(() -> { + availableMirrors.clear(); + return null; + }); } - public synchronized void setMirrorAvailability(MirrorURI mirrorURI, boolean available) { - if (available) { - availableMirrors.add(mirrorURI); - } else { - availableMirrors.remove(mirrorURI); - } + public Mono setMirrorAvailability(MirrorURI mirrorURI, boolean available) { + return Mono.fromCallable(() -> { + if (available) { + availableMirrors.put(mirrorURI, NO_VALUE); + } else { + availableMirrors.remove(mirrorURI); + } + return null; + }); } - public synchronized boolean isMirrorAvailable(MirrorURI mirrorURI) { - return this.availableMirrors.contains(mirrorURI); + public Mono isMirrorAvailable(MirrorURI mirrorURI) { + return Mono.fromCallable(() -> this.availableMirrors.contains(mirrorURI)); } } diff --git a/src/main/lombok/org/warp/filesponge/SecureFileAccessor.java b/src/main/lombok/org/warp/filesponge/SecureFileAccessor.java index 6726ee7..ce7e9eb 100644 --- a/src/main/lombok/org/warp/filesponge/SecureFileAccessor.java +++ b/src/main/lombok/org/warp/filesponge/SecureFileAccessor.java @@ -18,8 +18,6 @@ package org.warp.filesponge; -import java.util.Optional; -import java.util.concurrent.CompletionStage; import lombok.AllArgsConstructor; import lombok.EqualsAndHashCode; import org.jetbrains.annotations.NotNull; @@ -27,6 +25,7 @@ import org.warp.filesponge.api.FileAccessor; import org.warp.filesponge.value.FileContent; import org.warp.filesponge.value.FileStatus; import org.warp.filesponge.value.FileURI; +import reactor.core.publisher.Mono; /** * Prevent access to other methods via casting @@ -38,17 +37,17 @@ public class SecureFileAccessor im private final FileAccessor fileAccessor; @Override - public void delete(@NotNull FURI fileURI) { - fileAccessor.delete(fileURI); + public Mono delete(@NotNull FURI fileURI) { + return fileAccessor.delete(fileURI); } @Override - public CompletionStage> getContent(@NotNull FURI fileURI, boolean offlineOnly) { + public Mono getContent(@NotNull FURI fileURI, boolean offlineOnly) { return fileAccessor.getContent(fileURI, offlineOnly); } @Override - public @NotNull FileStatus getStatus(@NotNull FURI fileURI) { + public @NotNull Mono getStatus(@NotNull FURI fileURI) { return fileAccessor.getStatus(fileURI); } diff --git a/src/main/lombok/org/warp/filesponge/api/FileAccessor.java b/src/main/lombok/org/warp/filesponge/api/FileAccessor.java index c5a0fa6..12cbbfe 100644 --- a/src/main/lombok/org/warp/filesponge/api/FileAccessor.java +++ b/src/main/lombok/org/warp/filesponge/api/FileAccessor.java @@ -18,12 +18,11 @@ package org.warp.filesponge.api; -import java.util.Optional; -import java.util.concurrent.CompletionStage; import org.jetbrains.annotations.NotNull; import org.warp.filesponge.value.FileContent; import org.warp.filesponge.value.FileStatus; import org.warp.filesponge.value.FileURI; +import reactor.core.publisher.Mono; /** * FileAccessor can be used to access files from the client side @@ -34,23 +33,25 @@ public interface FileAccessor { * Request file deletion * * @param fileURI File URI + * @return Empty. */ - void delete(@NotNull FURI fileURI); + Mono delete(@NotNull FURI fileURI); /** * Get file content * * @param fileURI File URI * @param offlineOnly true to get the file from cache - * @return content if found. If the request is offline the future will complete instantly + * @return content if found. If the request is offline the future will complete instantly. + * Can be empty */ - CompletionStage> getContent(@NotNull FURI fileURI, boolean offlineOnly); + Mono getContent(@NotNull FURI fileURI, boolean offlineOnly); /** * Get file status * * @param fileURI File URI - * @return status of this file + * @return status of this file. Cannot be empty. */ - @NotNull FileStatus getStatus(@NotNull FURI fileURI); + Mono getStatus(@NotNull FURI fileURI); } diff --git a/src/main/lombok/org/warp/filesponge/api/FileActor.java b/src/main/lombok/org/warp/filesponge/api/FileActor.java index 9e459e8..39bfa75 100644 --- a/src/main/lombok/org/warp/filesponge/api/FileActor.java +++ b/src/main/lombok/org/warp/filesponge/api/FileActor.java @@ -19,8 +19,8 @@ package org.warp.filesponge.api; import java.time.Duration; -import java.util.Optional; import org.warp.filesponge.value.FileURI; +import reactor.core.publisher.Mono; /** * FileActor sends signals to a mirror @@ -31,25 +31,25 @@ public interface FileActor { * Send a "delete file" signal * * @param fileURI File URI - * @return true if the signal can be sent + * @return true if the signal can be sent. Cannot be empty. */ - boolean deleteFile(FURI fileURI); + Mono deleteFile(FURI fileURI); /** * Send a "download file" signal * * @param fileURI File URI - * @return true if the signal can be sent + * @return true if the signal can be sent. Cannot be empty. */ - boolean downloadFile(FURI fileURI); + Mono downloadFile(FURI fileURI); /** * Check if this actor can handle signals for this file * * @param fileURI File URI - * @return true if the actor can send signals related to this file + * @return true if the actor can send signals related to this file. Cannot be empty. */ - boolean canHandleFile(FURI fileURI); + Mono canHandleFile(FURI fileURI); /** * Send a "download file" signal @@ -59,7 +59,7 @@ public interface FileActor { * @return empty if no pending download requests has been found, true if the signal can be sent, false * otherwise */ - Optional downloadNextFile(Duration timeout); + Mono downloadNextFile(Duration timeout); /** * Send a "delete file" signal @@ -68,5 +68,5 @@ public interface FileActor { * delete request has been found, or the timeout time elapsed * @return empty if no pending delete requests has been found, true if the signal can be sent, false otherwise */ - Optional deleteNextFile(Duration timeout); + Mono deleteNextFile(Duration timeout); } diff --git a/src/main/lombok/org/warp/filesponge/api/FileSource.java b/src/main/lombok/org/warp/filesponge/api/FileSource.java index 5dac83f..99a9ce8 100644 --- a/src/main/lombok/org/warp/filesponge/api/FileSource.java +++ b/src/main/lombok/org/warp/filesponge/api/FileSource.java @@ -23,6 +23,7 @@ import org.jetbrains.annotations.NotNull; import org.warp.filesponge.value.FileSourceAvailability; import org.warp.filesponge.value.FileType; import org.warp.filesponge.value.FileURI; +import reactor.core.publisher.Mono; /** * FileSource receives responses from a mirror @@ -32,24 +33,24 @@ public interface FileSource { /** * Called when the mirror is online */ - void onAvailable(); + Mono onAvailable(); /** * Called when the mirror is unreachable */ - void onUnavailable(); + Mono onUnavailable(); /** - * Called when the mirror notifies you that a new file exists + * Called when the mirror notifies you that a new file exists. Cannot be empty. */ - boolean onNewFile(@NotNull FURI fileURI, @NotNull FTYPE fileType); + Mono onNewFile(@NotNull FURI fileURI, @NotNull FTYPE fileType); /** * Called when the mirror notifies you details about a file. *

* {@link FileSource#onNewFile(FURI, FTYPE)} must have been already called */ - void onFile(@NotNull FURI fileURI, @NotNull FileSourceAvailability fileAvailability, long totalSize); + Mono onFile(@NotNull FURI fileURI, @NotNull FileSourceAvailability fileAvailability, long totalSize); /** * Called when the mirror notifies you the bytes of a part of a file. @@ -57,5 +58,5 @@ public interface FileSource { * {@link FileSource#onNewFile(FURI, FTYPE)} and {@link FileSource#onFile(FURI, FileSourceAvailability, long)} must * have been already called */ - void onFilePiece(@NotNull FURI fileURI, long offset, long size, @NotNull ByteBuffer piece); + Mono onFilePiece(@NotNull FURI fileURI, long offset, long size, @NotNull ByteBuffer piece); } diff --git a/src/main/lombok/org/warp/filesponge/api/FileSpongeClient.java b/src/main/lombok/org/warp/filesponge/api/FileSpongeClient.java index 0fb8ee5..5c08562 100644 --- a/src/main/lombok/org/warp/filesponge/api/FileSpongeClient.java +++ b/src/main/lombok/org/warp/filesponge/api/FileSpongeClient.java @@ -21,13 +21,14 @@ package org.warp.filesponge.api; import org.warp.filesponge.SecureFileAccessor; import org.warp.filesponge.value.FileContent; import org.warp.filesponge.value.FileURI; +import reactor.core.publisher.Mono; /** * FileAccessor can be used to manage FileSponge and access files from the client side */ public interface FileSpongeClient extends FileAccessor { - void optimizeStorage(); + Mono optimizeStorage(); /** * Get this instance but without special methods diff --git a/src/main/lombok/org/warp/filesponge/api/FileStorage.java b/src/main/lombok/org/warp/filesponge/api/FileStorage.java index 39fdf80..e63da88 100644 --- a/src/main/lombok/org/warp/filesponge/api/FileStorage.java +++ b/src/main/lombok/org/warp/filesponge/api/FileStorage.java @@ -24,14 +24,15 @@ import org.warp.filesponge.value.FileContent; import org.warp.filesponge.value.FileType; import org.warp.filesponge.value.FileURI; import org.warp.filesponge.value.MirrorURI; +import reactor.core.publisher.Mono; public interface FileStorage { - void newFile(@NotNull FURI fileURI, @NotNull FTYPE fileType); + Mono newFile(@NotNull FURI fileURI, @NotNull FTYPE fileType); - FC readFileData(@NotNull FURI fileURI); + Mono readFileData(@NotNull FURI fileURI); - void setFileData(@NotNull FURI fileURI, long offset, long size, ByteBuffer bytes, long totalSize); + Mono setFileData(@NotNull FURI fileURI, long offset, long size, ByteBuffer bytes, long totalSize); - boolean hasAllData(@NotNull FURI fileURI); + Mono hasAllData(@NotNull FURI fileURI); } diff --git a/src/main/lombok/org/warp/filesponge/value/AsyncMultiAssociation.java b/src/main/lombok/org/warp/filesponge/value/AsyncMultiAssociation.java new file mode 100644 index 0000000..f7eb63b --- /dev/null +++ b/src/main/lombok/org/warp/filesponge/value/AsyncMultiAssociation.java @@ -0,0 +1,55 @@ +/* + * FileSponge + * Copyright (C) 2021 Andrea Cavalli + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package org.warp.filesponge.value; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface AsyncMultiAssociation { + + Mono link(T var1, U var2); + + Mono unlink(T var1, U var2); + + Flux unlink(T var1); + + Flux unlinkFromSource(U var1); + + default Mono hasAnyLink(T src) { + return this.getLinks(src).hasElements(); + } + + default Mono hasAnyLinkSource(U dest) { + return this.getLinkSources(dest).hasElements(); + } + + Mono hasLink(T var1, U var2); + + Flux getLinks(T var1); + + Flux getLinkSources(U var1); + + Mono clear(); + + Mono size(); + + Flux getSources(); + + Flux getDestinations(); +} diff --git a/src/test/lombok/org/warp/filesponge/ThreadSafety.java b/src/test/lombok/org/warp/filesponge/ThreadSafety.java new file mode 100644 index 0000000..98c5706 --- /dev/null +++ b/src/test/lombok/org/warp/filesponge/ThreadSafety.java @@ -0,0 +1,45 @@ +package org.warp.filesponge; + +import java.util.LinkedList; +import java.util.List; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import reactor.test.StepVerifier; + +public class ThreadSafety { + + @Test + public void threadSafety() { + Scheduler schedulerSingle = Schedulers.newSingle("treadSafeScheduler"); + Scheduler schedulerParallel = Schedulers.newParallel("threadUnsafeScheduler", 20); + + int iterations = 500; + List list = new LinkedList<>(); + + var flux = Flux.range(0, iterations) + .flatMap(s -> Mono.fromCallable(() -> { + list.add(s); + return s; + }).then(Mono.fromCallable(() -> { + list.add(1); + return s; + })).subscribeOn(schedulerSingle)) + .ignoreElements() + .thenMany(Flux.defer(() -> Flux.fromIterable(list))) + .subscribeOn(schedulerParallel); + + Integer[] checks = new Integer[iterations * 2]; + for (int i = 0; i < iterations; i++) { + checks[i * 2] = i; + checks[i * 2 + 1] = 1; + } + StepVerifier + .create(flux) + .expectSubscription() + .expectNext(checks) + .verifyComplete(); + } +}