Asynchronous

This commit is contained in:
Andrea Cavalli 2021-01-11 01:29:53 +01:00
parent 6af215a2c9
commit ce3eedc6eb
11 changed files with 201 additions and 61 deletions

13
pom.xml
View File

@ -56,11 +56,22 @@
<artifactId>common-utils</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/lombok</sourceDirectory>
<testSourceDirectory>src/main/lombok</testSourceDirectory>
<testSourceDirectory>src/test/lombok</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>

View File

@ -21,41 +21,57 @@ package org.warp.filesponge;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import org.warp.commonutils.type.MultiAssociation;
import org.warp.filesponge.value.AsyncMultiAssociation;
import org.warp.filesponge.value.FileURI;
import org.warp.filesponge.value.MirrorURI;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@AllArgsConstructor()
public class FileMirrorsManager {
private final Scheduler fileMirrorsManagerScheduler = Schedulers.single();
private final MirrorAvailabilityManager mirrorAvailabilityManager;
/**
* This map must be persistent
*/
private final MultiAssociation<FileURI, MirrorURI> fileMirrors;
private final AsyncMultiAssociation<FileURI, MirrorURI> fileMirrors;
public synchronized Set<MirrorURI> getAvailableMirrors(FileURI fileURI) {
public Mono<Set<MirrorURI>> getAvailableMirrors(FileURI fileURI) {
return fileMirrors
.getLinks(fileURI)
.stream()
.filter(mirrorAvailabilityManager::isMirrorAvailable)
.collect(Collectors.toUnmodifiableSet());
.filterWhen(mirrorAvailabilityManager::isMirrorAvailable)
.collect(Collectors.toUnmodifiableSet())
.subscribeOn(fileMirrorsManagerScheduler);
}
public synchronized boolean hasAnyAvailableMirror(FileURI uri) {
return fileMirrors.getLinks(uri).stream().anyMatch(mirrorAvailabilityManager::isMirrorAvailable);
public Mono<Boolean> hasAnyAvailableMirror(FileURI uri) {
return fileMirrors
.getLinks(uri)
.filterWhen(mirrorAvailabilityManager::isMirrorAvailable)
.hasElements()
.subscribeOn(fileMirrorsManagerScheduler);
}
public synchronized void addMirror(FileURI uri, MirrorURI mirrorURI) {
fileMirrors.link(uri, mirrorURI);
public Mono<Void> addMirror(FileURI uri, MirrorURI mirrorURI) {
return fileMirrors
.link(uri, mirrorURI)
.then()
.subscribeOn(fileMirrorsManagerScheduler);
}
public synchronized void removeMirror(FileURI uri, MirrorURI mirrorURI) {
fileMirrors.unlink(uri, mirrorURI);
public Mono<Void> removeMirror(FileURI uri, MirrorURI mirrorURI) {
return fileMirrors
.unlink(uri, mirrorURI)
.then()
.subscribeOn(fileMirrorsManagerScheduler);
}
public synchronized void unsetAllFiles() {
fileMirrors.clear();
public Mono<Void> unsetAllFiles() {
return fileMirrors.clear()
.subscribeOn(fileMirrorsManagerScheduler);
}
}

View File

@ -18,30 +18,40 @@
package org.warp.filesponge;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import org.warp.filesponge.value.MirrorURI;
import reactor.core.publisher.Mono;
@AllArgsConstructor(access = AccessLevel.PUBLIC)
public class MirrorAvailabilityManager {
private final Set<MirrorURI> availableMirrors = new HashSet<>();
private static final Object NO_VALUE = new Object();
/**
* This is a set. the value is not important
*/
private final ConcurrentHashMap<MirrorURI, Object> availableMirrors = new ConcurrentHashMap<>();
public synchronized void setAllMirrorsAsUnavailable() {
availableMirrors.clear();
public Mono<Void> setAllMirrorsAsUnavailable() {
return Mono.fromCallable(() -> {
availableMirrors.clear();
return null;
});
}
public synchronized void setMirrorAvailability(MirrorURI mirrorURI, boolean available) {
if (available) {
availableMirrors.add(mirrorURI);
} else {
availableMirrors.remove(mirrorURI);
}
public Mono<Void> setMirrorAvailability(MirrorURI mirrorURI, boolean available) {
return Mono.fromCallable(() -> {
if (available) {
availableMirrors.put(mirrorURI, NO_VALUE);
} else {
availableMirrors.remove(mirrorURI);
}
return null;
});
}
public synchronized boolean isMirrorAvailable(MirrorURI mirrorURI) {
return this.availableMirrors.contains(mirrorURI);
public Mono<Boolean> isMirrorAvailable(MirrorURI mirrorURI) {
return Mono.fromCallable(() -> this.availableMirrors.contains(mirrorURI));
}
}

View File

@ -18,8 +18,6 @@
package org.warp.filesponge;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import org.jetbrains.annotations.NotNull;
@ -27,6 +25,7 @@ import org.warp.filesponge.api.FileAccessor;
import org.warp.filesponge.value.FileContent;
import org.warp.filesponge.value.FileStatus;
import org.warp.filesponge.value.FileURI;
import reactor.core.publisher.Mono;
/**
* Prevent access to other methods via casting
@ -38,17 +37,17 @@ public class SecureFileAccessor<FURI extends FileURI, FC extends FileContent> im
private final FileAccessor<FURI, FC> fileAccessor;
@Override
public void delete(@NotNull FURI fileURI) {
fileAccessor.delete(fileURI);
public Mono<Void> delete(@NotNull FURI fileURI) {
return fileAccessor.delete(fileURI);
}
@Override
public CompletionStage<Optional<FC>> getContent(@NotNull FURI fileURI, boolean offlineOnly) {
public Mono<FC> getContent(@NotNull FURI fileURI, boolean offlineOnly) {
return fileAccessor.getContent(fileURI, offlineOnly);
}
@Override
public @NotNull FileStatus getStatus(@NotNull FURI fileURI) {
public @NotNull Mono<FileStatus> getStatus(@NotNull FURI fileURI) {
return fileAccessor.getStatus(fileURI);
}

View File

@ -18,12 +18,11 @@
package org.warp.filesponge.api;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import org.jetbrains.annotations.NotNull;
import org.warp.filesponge.value.FileContent;
import org.warp.filesponge.value.FileStatus;
import org.warp.filesponge.value.FileURI;
import reactor.core.publisher.Mono;
/**
* FileAccessor can be used to access files from the client side
@ -34,23 +33,25 @@ public interface FileAccessor<FURI extends FileURI, FC extends FileContent> {
* Request file deletion
*
* @param fileURI File URI
* @return Empty.
*/
void delete(@NotNull FURI fileURI);
Mono<Void> delete(@NotNull FURI fileURI);
/**
* Get file content
*
* @param fileURI File URI
* @param offlineOnly true to get the file from cache
* @return content if found. If the request is offline the future will complete instantly
* @return content if found. If the request is offline the future will complete instantly.
* Can be empty
*/
CompletionStage<Optional<FC>> getContent(@NotNull FURI fileURI, boolean offlineOnly);
Mono<FC> getContent(@NotNull FURI fileURI, boolean offlineOnly);
/**
* Get file status
*
* @param fileURI File URI
* @return status of this file
* @return status of this file. Cannot be empty.
*/
@NotNull FileStatus getStatus(@NotNull FURI fileURI);
Mono<FileStatus> getStatus(@NotNull FURI fileURI);
}

View File

@ -19,8 +19,8 @@
package org.warp.filesponge.api;
import java.time.Duration;
import java.util.Optional;
import org.warp.filesponge.value.FileURI;
import reactor.core.publisher.Mono;
/**
* FileActor sends signals to a mirror
@ -31,25 +31,25 @@ public interface FileActor<FURI extends FileURI> {
* Send a "delete file" signal
*
* @param fileURI File URI
* @return true if the signal can be sent
* @return true if the signal can be sent. Cannot be empty.
*/
boolean deleteFile(FURI fileURI);
Mono<Boolean> deleteFile(FURI fileURI);
/**
* Send a "download file" signal
*
* @param fileURI File URI
* @return true if the signal can be sent
* @return true if the signal can be sent. Cannot be empty.
*/
boolean downloadFile(FURI fileURI);
Mono<Boolean> downloadFile(FURI fileURI);
/**
* Check if this actor can handle signals for this file
*
* @param fileURI File URI
* @return true if the actor can send signals related to this file
* @return true if the actor can send signals related to this file. Cannot be empty.
*/
boolean canHandleFile(FURI fileURI);
Mono<Boolean> canHandleFile(FURI fileURI);
/**
* Send a "download file" signal
@ -59,7 +59,7 @@ public interface FileActor<FURI extends FileURI> {
* @return empty if no pending <b>download requests</b> has been found, true if the signal can be sent, false
* otherwise
*/
Optional<Boolean> downloadNextFile(Duration timeout);
Mono<Boolean> downloadNextFile(Duration timeout);
/**
* Send a "delete file" signal
@ -68,5 +68,5 @@ public interface FileActor<FURI extends FileURI> {
* <b>delete request</b> has been found, or the timeout time elapsed
* @return empty if no pending <b>delete requests</b> has been found, true if the signal can be sent, false otherwise
*/
Optional<Boolean> deleteNextFile(Duration timeout);
Mono<Boolean> deleteNextFile(Duration timeout);
}

View File

@ -23,6 +23,7 @@ import org.jetbrains.annotations.NotNull;
import org.warp.filesponge.value.FileSourceAvailability;
import org.warp.filesponge.value.FileType;
import org.warp.filesponge.value.FileURI;
import reactor.core.publisher.Mono;
/**
* FileSource receives responses from a mirror
@ -32,24 +33,24 @@ public interface FileSource<FURI extends FileURI, FTYPE extends FileType> {
/**
* Called when the mirror is online
*/
void onAvailable();
Mono<Void> onAvailable();
/**
* Called when the mirror is unreachable
*/
void onUnavailable();
Mono<Void> onUnavailable();
/**
* Called when the mirror notifies you that a new file exists
* Called when the mirror notifies you that a new file exists. Cannot be empty.
*/
boolean onNewFile(@NotNull FURI fileURI, @NotNull FTYPE fileType);
Mono<Boolean> onNewFile(@NotNull FURI fileURI, @NotNull FTYPE fileType);
/**
* Called when the mirror notifies you details about a file.
* <p>
* {@link FileSource#onNewFile(FURI, FTYPE)} must have been already called
*/
void onFile(@NotNull FURI fileURI, @NotNull FileSourceAvailability fileAvailability, long totalSize);
Mono<Void> onFile(@NotNull FURI fileURI, @NotNull FileSourceAvailability fileAvailability, long totalSize);
/**
* Called when the mirror notifies you the bytes of a part of a file.
@ -57,5 +58,5 @@ public interface FileSource<FURI extends FileURI, FTYPE extends FileType> {
* {@link FileSource#onNewFile(FURI, FTYPE)} and {@link FileSource#onFile(FURI, FileSourceAvailability, long)} must
* have been already called
*/
void onFilePiece(@NotNull FURI fileURI, long offset, long size, @NotNull ByteBuffer piece);
Mono<Void> onFilePiece(@NotNull FURI fileURI, long offset, long size, @NotNull ByteBuffer piece);
}

View File

@ -21,13 +21,14 @@ package org.warp.filesponge.api;
import org.warp.filesponge.SecureFileAccessor;
import org.warp.filesponge.value.FileContent;
import org.warp.filesponge.value.FileURI;
import reactor.core.publisher.Mono;
/**
* FileAccessor can be used to manage FileSponge and access files from the client side
*/
public interface FileSpongeClient<FURI extends FileURI, FC extends FileContent> extends FileAccessor<FURI, FC> {
void optimizeStorage();
Mono<Void> optimizeStorage();
/**
* Get this instance but without special methods

View File

@ -24,14 +24,15 @@ import org.warp.filesponge.value.FileContent;
import org.warp.filesponge.value.FileType;
import org.warp.filesponge.value.FileURI;
import org.warp.filesponge.value.MirrorURI;
import reactor.core.publisher.Mono;
public interface FileStorage<FURI extends FileURI, FTYPE extends FileType, MURI extends MirrorURI, FC extends FileContent> {
void newFile(@NotNull FURI fileURI, @NotNull FTYPE fileType);
Mono<Void> newFile(@NotNull FURI fileURI, @NotNull FTYPE fileType);
FC readFileData(@NotNull FURI fileURI);
Mono<FC> readFileData(@NotNull FURI fileURI);
void setFileData(@NotNull FURI fileURI, long offset, long size, ByteBuffer bytes, long totalSize);
Mono<Void> setFileData(@NotNull FURI fileURI, long offset, long size, ByteBuffer bytes, long totalSize);
boolean hasAllData(@NotNull FURI fileURI);
Mono<Boolean> hasAllData(@NotNull FURI fileURI);
}

View File

@ -0,0 +1,55 @@
/*
* FileSponge
* Copyright (C) 2021 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge.value;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface AsyncMultiAssociation<T, U> {
Mono<Boolean> link(T var1, U var2);
Mono<Boolean> unlink(T var1, U var2);
Flux<U> unlink(T var1);
Flux<T> unlinkFromSource(U var1);
default Mono<Boolean> hasAnyLink(T src) {
return this.getLinks(src).hasElements();
}
default Mono<Boolean> hasAnyLinkSource(U dest) {
return this.getLinkSources(dest).hasElements();
}
Mono<Boolean> hasLink(T var1, U var2);
Flux<U> getLinks(T var1);
Flux<T> getLinkSources(U var1);
Mono<Void> clear();
Mono<Integer> size();
Flux<T> getSources();
Flux<U> getDestinations();
}

View File

@ -0,0 +1,45 @@
package org.warp.filesponge;
import java.util.LinkedList;
import java.util.List;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
public class ThreadSafety {
@Test
public void threadSafety() {
Scheduler schedulerSingle = Schedulers.newSingle("treadSafeScheduler");
Scheduler schedulerParallel = Schedulers.newParallel("threadUnsafeScheduler", 20);
int iterations = 500;
List<Integer> list = new LinkedList<>();
var flux = Flux.range(0, iterations)
.flatMap(s -> Mono.fromCallable(() -> {
list.add(s);
return s;
}).then(Mono.fromCallable(() -> {
list.add(1);
return s;
})).subscribeOn(schedulerSingle))
.ignoreElements()
.thenMany(Flux.defer(() -> Flux.fromIterable(list)))
.subscribeOn(schedulerParallel);
Integer[] checks = new Integer[iterations * 2];
for (int i = 0; i < iterations; i++) {
checks[i * 2] = i;
checks[i * 2 + 1] = 1;
}
StepVerifier
.create(flux)
.expectSubscription()
.expectNext(checks)
.verifyComplete();
}
}