Async Set

This commit is contained in:
Andrea Cavalli 2021-01-11 13:18:10 +01:00
parent ce3eedc6eb
commit cf26e07904
7 changed files with 344 additions and 21 deletions

View File

@ -21,7 +21,7 @@ package org.warp.filesponge;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.warp.filesponge.value.AsyncMultiAssociation; import org.warp.filesponge.reactor.AsyncMultiAssociation;
import org.warp.filesponge.value.FileURI; import org.warp.filesponge.value.FileURI;
import org.warp.filesponge.value.MirrorURI; import org.warp.filesponge.value.MirrorURI;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;

View File

@ -18,40 +18,31 @@
package org.warp.filesponge; package org.warp.filesponge;
import java.util.concurrent.ConcurrentHashMap;
import lombok.AccessLevel; import lombok.AccessLevel;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.warp.filesponge.value.MirrorURI; import org.warp.filesponge.value.MirrorURI;
import org.warp.filesponge.reactor.ConcurrentAsyncSet;
import org.warp.filesponge.reactor.AsyncSet;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@AllArgsConstructor(access = AccessLevel.PUBLIC) @AllArgsConstructor(access = AccessLevel.PUBLIC)
public class MirrorAvailabilityManager { public class MirrorAvailabilityManager {
private static final Object NO_VALUE = new Object(); private final AsyncSet<MirrorURI> availableMirrors = new ConcurrentAsyncSet<>();
/**
* This is a set. the value is not important
*/
private final ConcurrentHashMap<MirrorURI, Object> availableMirrors = new ConcurrentHashMap<>();
public Mono<Void> setAllMirrorsAsUnavailable() { public Mono<Void> setAllMirrorsAsUnavailable() {
return Mono.fromCallable(() -> { return availableMirrors.clear();
availableMirrors.clear();
return null;
});
} }
public Mono<Void> setMirrorAvailability(MirrorURI mirrorURI, boolean available) { public Mono<Void> setMirrorAvailability(MirrorURI mirrorURI, boolean available) {
return Mono.fromCallable(() -> { if (available) {
if (available) { return availableMirrors.add(mirrorURI).then();
availableMirrors.put(mirrorURI, NO_VALUE); } else {
} else { return availableMirrors.remove(mirrorURI).then();
availableMirrors.remove(mirrorURI); }
}
return null;
});
} }
public Mono<Boolean> isMirrorAvailable(MirrorURI mirrorURI) { public Mono<Boolean> isMirrorAvailable(MirrorURI mirrorURI) {
return Mono.fromCallable(() -> this.availableMirrors.contains(mirrorURI)); return this.availableMirrors.contains(mirrorURI);
} }
} }

View File

@ -16,7 +16,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
package org.warp.filesponge.value; package org.warp.filesponge.reactor;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;

View File

@ -0,0 +1,80 @@
/*
* FileSponge
* Copyright (C) 2021 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge.reactor;
import java.util.Set;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Asynchronous set
* @param <T> value type
*/
public interface AsyncSet<T> {
/**
* Clear the set
*
* @return void
*/
Mono<Void> clear();
/**
* Add element to the set
*
* @param value value to add
* @return true if added, false if it's already present. Can't be empty.
*/
Mono<Boolean> add(T value);
/**
* Remove element from the set
*
* @param value value to remove
* @return true if removed, false if it's not present. Can't be empty.
*/
Mono<Boolean> remove(T value);
/**
* Find if element is present in the set
*
* @param value value to find
* @return true if found, false if not. Can't be empty.
*/
Mono<Boolean> contains(T value);
/**
* Get the set size
*
* @return set size, from 0 to {@value Integer#MAX_VALUE}. Can't be empty.
*/
Mono<Integer> size();
/**
* Get all values
* @return values, in a flux. Can be empty.
*/
Flux<T> toFlux();
/**
* Get all values
* @return values, in a set. Can't be empty.
*/
Mono<Set<T>> toSet();
}

View File

@ -0,0 +1,81 @@
/*
* FileSponge
* Copyright (C) 2021 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge.reactor;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentHashMap.KeySetView;
import lombok.EqualsAndHashCode;
import org.warp.commonutils.concurrency.atomicity.Atomic;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Atomic
@EqualsAndHashCode
public class ConcurrentAsyncSet<T> implements AsyncSet<T> {
private final KeySetView<T, Boolean> set;
public ConcurrentAsyncSet() {
this.set = ConcurrentHashMap.newKeySet();
}
@Override
public Mono<Void> clear() {
return Mono.fromCallable(() -> {
set.clear();
return null;
});
}
@Override
public Mono<Boolean> add(T value) {
return Mono.fromCallable(() -> set.add(value));
}
@Override
public Mono<Boolean> remove(T value) {
return Mono.fromCallable(() -> set.remove(value));
}
@Override
public Mono<Boolean> contains(T value) {
return Mono.fromCallable(() -> set.contains(value));
}
@Override
public Mono<Integer> size() {
return Mono.fromCallable(set::size);
}
@Override
public Flux<T> toFlux() {
return Flux.fromStream(set::stream);
}
@Override
public Mono<Set<T>> toSet() {
return Mono.fromCallable(() -> Set.copyOf(set));
}
@Override
public String toString() {
return set.toString();
}
}

View File

@ -0,0 +1,88 @@
/*
* FileSponge
* Copyright (C) 2021 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge.reactor;
import java.util.HashSet;
import java.util.Set;
import lombok.EqualsAndHashCode;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@NotAtomic
@EqualsAndHashCode
public class HashAsyncSet<T> implements AsyncSet<T> {
private final HashSet<T> set;
public HashAsyncSet() {
this.set = new HashSet<>();
}
public HashAsyncSet(HashSet<T> set) {
this.set = set;
}
@Override
public Mono<Void> clear() {
return Mono.fromCallable(() -> {
set.clear();
return null;
});
}
@Override
public Mono<Boolean> add(T value) {
return Mono.fromCallable(() -> set.add(value));
}
@Override
public Mono<Boolean> remove(T value) {
return Mono.fromCallable(() -> set.remove(value));
}
@Override
public Mono<Boolean> contains(T value) {
return Mono.fromCallable(() -> set.contains(value));
}
@Override
public Mono<Integer> size() {
return Mono.fromCallable(set::size);
}
@Override
public Flux<T> toFlux() {
return Flux.fromStream(set::stream);
}
@Override
public Mono<Set<T>> toSet() {
return Mono.fromCallable(() -> Set.copyOf(set));
}
@Override
public String toString() {
return set.toString();
}
public SynchronizedHashAsyncSet<T> synchronize() {
return new SynchronizedHashAsyncSet<>(this);
}
}

View File

@ -0,0 +1,83 @@
/*
* FileSponge
* Copyright (C) 2021 Andrea Cavalli
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.warp.filesponge.reactor;
import java.util.Set;
import lombok.EqualsAndHashCode;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@EqualsAndHashCode
@NotAtomic
public class SynchronizedHashAsyncSet<T> implements AsyncSet<T> {
private transient final Scheduler scheduler = Schedulers.single();
private final HashAsyncSet<T> set;
public SynchronizedHashAsyncSet() {
this.set = new HashAsyncSet<>();
}
public SynchronizedHashAsyncSet(HashAsyncSet<T> set) {
this.set = set;
}
@Override
public Mono<Void> clear() {
return set.clear().subscribeOn(scheduler);
}
@Override
public Mono<Boolean> add(T value) {
return set.add(value).subscribeOn(scheduler);
}
@Override
public Mono<Boolean> remove(T value) {
return set.remove(value).subscribeOn(scheduler);
}
@Override
public Mono<Boolean> contains(T value) {
return set.contains(value).subscribeOn(scheduler);
}
@Override
public Mono<Integer> size() {
return set.size().subscribeOn(scheduler);
}
@Override
public Flux<T> toFlux() {
return set.toFlux().subscribeOn(scheduler);
}
@Override
public Mono<Set<T>> toSet() {
return set.toSet().subscribeOn(scheduler);
}
@Override
public String toString() {
return set.toString();
}
}