diff --git a/src/main/lombok/org/warp/filesponge/FileMirrorsManager.java b/src/main/lombok/org/warp/filesponge/FileMirrorsManager.java index 058ecea..8e7dfac 100644 --- a/src/main/lombok/org/warp/filesponge/FileMirrorsManager.java +++ b/src/main/lombok/org/warp/filesponge/FileMirrorsManager.java @@ -21,7 +21,7 @@ package org.warp.filesponge; import java.util.Set; import java.util.stream.Collectors; import lombok.AllArgsConstructor; -import org.warp.filesponge.value.AsyncMultiAssociation; +import org.warp.filesponge.reactor.AsyncMultiAssociation; import org.warp.filesponge.value.FileURI; import org.warp.filesponge.value.MirrorURI; import reactor.core.publisher.Mono; diff --git a/src/main/lombok/org/warp/filesponge/MirrorAvailabilityManager.java b/src/main/lombok/org/warp/filesponge/MirrorAvailabilityManager.java index 7d524a1..09f8cff 100644 --- a/src/main/lombok/org/warp/filesponge/MirrorAvailabilityManager.java +++ b/src/main/lombok/org/warp/filesponge/MirrorAvailabilityManager.java @@ -18,40 +18,31 @@ package org.warp.filesponge; -import java.util.concurrent.ConcurrentHashMap; import lombok.AccessLevel; import lombok.AllArgsConstructor; import org.warp.filesponge.value.MirrorURI; +import org.warp.filesponge.reactor.ConcurrentAsyncSet; +import org.warp.filesponge.reactor.AsyncSet; import reactor.core.publisher.Mono; @AllArgsConstructor(access = AccessLevel.PUBLIC) public class MirrorAvailabilityManager { - private static final Object NO_VALUE = new Object(); - /** - * This is a set. the value is not important - */ - private final ConcurrentHashMap availableMirrors = new ConcurrentHashMap<>(); + private final AsyncSet availableMirrors = new ConcurrentAsyncSet<>(); public Mono setAllMirrorsAsUnavailable() { - return Mono.fromCallable(() -> { - availableMirrors.clear(); - return null; - }); + return availableMirrors.clear(); } public Mono setMirrorAvailability(MirrorURI mirrorURI, boolean available) { - return Mono.fromCallable(() -> { - if (available) { - availableMirrors.put(mirrorURI, NO_VALUE); - } else { - availableMirrors.remove(mirrorURI); - } - return null; - }); + if (available) { + return availableMirrors.add(mirrorURI).then(); + } else { + return availableMirrors.remove(mirrorURI).then(); + } } public Mono isMirrorAvailable(MirrorURI mirrorURI) { - return Mono.fromCallable(() -> this.availableMirrors.contains(mirrorURI)); + return this.availableMirrors.contains(mirrorURI); } } diff --git a/src/main/lombok/org/warp/filesponge/value/AsyncMultiAssociation.java b/src/main/lombok/org/warp/filesponge/reactor/AsyncMultiAssociation.java similarity index 97% rename from src/main/lombok/org/warp/filesponge/value/AsyncMultiAssociation.java rename to src/main/lombok/org/warp/filesponge/reactor/AsyncMultiAssociation.java index f7eb63b..1c4aff9 100644 --- a/src/main/lombok/org/warp/filesponge/value/AsyncMultiAssociation.java +++ b/src/main/lombok/org/warp/filesponge/reactor/AsyncMultiAssociation.java @@ -16,7 +16,7 @@ * along with this program. If not, see . */ -package org.warp.filesponge.value; +package org.warp.filesponge.reactor; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; diff --git a/src/main/lombok/org/warp/filesponge/reactor/AsyncSet.java b/src/main/lombok/org/warp/filesponge/reactor/AsyncSet.java new file mode 100644 index 0000000..ab9415c --- /dev/null +++ b/src/main/lombok/org/warp/filesponge/reactor/AsyncSet.java @@ -0,0 +1,80 @@ +/* + * FileSponge + * Copyright (C) 2021 Andrea Cavalli + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package org.warp.filesponge.reactor; + +import java.util.Set; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Asynchronous set + * @param value type + */ +public interface AsyncSet { + + /** + * Clear the set + * + * @return void + */ + Mono clear(); + + /** + * Add element to the set + * + * @param value value to add + * @return true if added, false if it's already present. Can't be empty. + */ + Mono add(T value); + + /** + * Remove element from the set + * + * @param value value to remove + * @return true if removed, false if it's not present. Can't be empty. + */ + Mono remove(T value); + + /** + * Find if element is present in the set + * + * @param value value to find + * @return true if found, false if not. Can't be empty. + */ + Mono contains(T value); + + /** + * Get the set size + * + * @return set size, from 0 to {@value Integer#MAX_VALUE}. Can't be empty. + */ + Mono size(); + + /** + * Get all values + * @return values, in a flux. Can be empty. + */ + Flux toFlux(); + + /** + * Get all values + * @return values, in a set. Can't be empty. + */ + Mono> toSet(); +} diff --git a/src/main/lombok/org/warp/filesponge/reactor/ConcurrentAsyncSet.java b/src/main/lombok/org/warp/filesponge/reactor/ConcurrentAsyncSet.java new file mode 100644 index 0000000..2273916 --- /dev/null +++ b/src/main/lombok/org/warp/filesponge/reactor/ConcurrentAsyncSet.java @@ -0,0 +1,81 @@ +/* + * FileSponge + * Copyright (C) 2021 Andrea Cavalli + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package org.warp.filesponge.reactor; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentHashMap.KeySetView; +import lombok.EqualsAndHashCode; +import org.warp.commonutils.concurrency.atomicity.Atomic; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@Atomic +@EqualsAndHashCode +public class ConcurrentAsyncSet implements AsyncSet { + + private final KeySetView set; + + public ConcurrentAsyncSet() { + this.set = ConcurrentHashMap.newKeySet(); + } + + @Override + public Mono clear() { + return Mono.fromCallable(() -> { + set.clear(); + return null; + }); + } + + @Override + public Mono add(T value) { + return Mono.fromCallable(() -> set.add(value)); + } + + @Override + public Mono remove(T value) { + return Mono.fromCallable(() -> set.remove(value)); + } + + @Override + public Mono contains(T value) { + return Mono.fromCallable(() -> set.contains(value)); + } + + @Override + public Mono size() { + return Mono.fromCallable(set::size); + } + + @Override + public Flux toFlux() { + return Flux.fromStream(set::stream); + } + + @Override + public Mono> toSet() { + return Mono.fromCallable(() -> Set.copyOf(set)); + } + + @Override + public String toString() { + return set.toString(); + } +} diff --git a/src/main/lombok/org/warp/filesponge/reactor/HashAsyncSet.java b/src/main/lombok/org/warp/filesponge/reactor/HashAsyncSet.java new file mode 100644 index 0000000..f228456 --- /dev/null +++ b/src/main/lombok/org/warp/filesponge/reactor/HashAsyncSet.java @@ -0,0 +1,88 @@ +/* + * FileSponge + * Copyright (C) 2021 Andrea Cavalli + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package org.warp.filesponge.reactor; + +import java.util.HashSet; +import java.util.Set; +import lombok.EqualsAndHashCode; +import org.warp.commonutils.concurrency.atomicity.NotAtomic; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@NotAtomic +@EqualsAndHashCode +public class HashAsyncSet implements AsyncSet { + + private final HashSet set; + + public HashAsyncSet() { + this.set = new HashSet<>(); + } + + public HashAsyncSet(HashSet set) { + this.set = set; + } + + @Override + public Mono clear() { + return Mono.fromCallable(() -> { + set.clear(); + return null; + }); + } + + @Override + public Mono add(T value) { + return Mono.fromCallable(() -> set.add(value)); + } + + @Override + public Mono remove(T value) { + return Mono.fromCallable(() -> set.remove(value)); + } + + @Override + public Mono contains(T value) { + return Mono.fromCallable(() -> set.contains(value)); + } + + @Override + public Mono size() { + return Mono.fromCallable(set::size); + } + + @Override + public Flux toFlux() { + return Flux.fromStream(set::stream); + } + + @Override + public Mono> toSet() { + return Mono.fromCallable(() -> Set.copyOf(set)); + } + + @Override + public String toString() { + return set.toString(); + } + + public SynchronizedHashAsyncSet synchronize() { + return new SynchronizedHashAsyncSet<>(this); + } +} diff --git a/src/main/lombok/org/warp/filesponge/reactor/SynchronizedHashAsyncSet.java b/src/main/lombok/org/warp/filesponge/reactor/SynchronizedHashAsyncSet.java new file mode 100644 index 0000000..7eb44af --- /dev/null +++ b/src/main/lombok/org/warp/filesponge/reactor/SynchronizedHashAsyncSet.java @@ -0,0 +1,83 @@ +/* + * FileSponge + * Copyright (C) 2021 Andrea Cavalli + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package org.warp.filesponge.reactor; + +import java.util.Set; +import lombok.EqualsAndHashCode; +import org.warp.commonutils.concurrency.atomicity.NotAtomic; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; + +@EqualsAndHashCode +@NotAtomic +public class SynchronizedHashAsyncSet implements AsyncSet { + + private transient final Scheduler scheduler = Schedulers.single(); + private final HashAsyncSet set; + + public SynchronizedHashAsyncSet() { + this.set = new HashAsyncSet<>(); + } + + public SynchronizedHashAsyncSet(HashAsyncSet set) { + this.set = set; + } + + @Override + public Mono clear() { + return set.clear().subscribeOn(scheduler); + } + + @Override + public Mono add(T value) { + return set.add(value).subscribeOn(scheduler); + } + + @Override + public Mono remove(T value) { + return set.remove(value).subscribeOn(scheduler); + } + + @Override + public Mono contains(T value) { + return set.contains(value).subscribeOn(scheduler); + } + + @Override + public Mono size() { + return set.size().subscribeOn(scheduler); + } + + @Override + public Flux toFlux() { + return set.toFlux().subscribeOn(scheduler); + } + + @Override + public Mono> toSet() { + return set.toSet().subscribeOn(scheduler); + } + + @Override + public String toString() { + return set.toString(); + } +}