Added FloatPriorityQueue

This commit is contained in:
Andrea Cavalli 2020-07-28 23:01:43 +02:00
parent 1e22e016bf
commit 92e944ad97
3 changed files with 371 additions and 26 deletions

View File

@ -7,6 +7,7 @@ import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
@ -16,6 +17,8 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.warp.commonutils.type.FloatPriorityQueue;
import org.warp.commonutils.type.ScoredValue;
public class CompletableFutureUtils {
@ -26,7 +29,7 @@ public class CompletableFutureUtils {
* @param <T> List elements type
* @return {@link CompletableFuture} list
*/
public static <T> CompletableFuture<List<T>> aggregate(Collection<CompletableFuture<List<T>>> futureLists) {
public static <T> CompletableFuture<List<T>> aggregateList(Collection<CompletableFuture<List<T>>> futureLists) {
final CompletableFuture<List<T>> identityAggregatedResult = CompletableFuture.completedFuture(new ArrayList<T>());
return futureLists.parallelStream().reduce(identityAggregatedResult, (currentAggregatedResult, futureList) -> {
@ -50,7 +53,7 @@ public class CompletableFutureUtils {
* @param result
* @param <T>
*/
public static <T> void addDisaggregated(
public static <T> void addDisaggregatedList(
Collection<CompletableFuture<List<CompletableFuture<T>>>> disaggregatedResults,
CompletableFuture<List<CompletableFuture<T>>> result) {
disaggregatedResults.add(result);
@ -59,13 +62,13 @@ public class CompletableFutureUtils {
/**
* Add a result
*/
public static <T, U extends T> void addDisaggregatedCast(
public static <T, U extends T> void addDisaggregatedListCast(
Collection<CompletableFuture<List<CompletableFuture<T>>>> disaggregatedResults,
CompletableFuture<List<CompletableFuture<U>>> result) {
addDisaggregatedCastForced(disaggregatedResults, result);
addDisaggregatedListCastForced(disaggregatedResults, result);
}
public static <T, U> void addDisaggregatedCastForced(
public static <T, U> void addDisaggregatedListCastForced(
Collection<CompletableFuture<List<CompletableFuture<T>>>> disaggregatedResults,
CompletableFuture<List<CompletableFuture<U>>> result) {
disaggregatedResults.add(result.thenApply((originalList) -> {
@ -80,61 +83,152 @@ public class CompletableFutureUtils {
}));
}
public static <T> Set<T> collectToSet(CompletableFuture<List<CompletableFuture<T>>> futureList) {
/**
* Aggregate multiple {@link CompletableFuture} lists into a single {@link CompletableFuture} list
*
* @param futureFloatPriorityQueues A collection of {@link CompletableFuture} lists.
* @param <T> List elements type
* @return {@link CompletableFuture} list
*/
public static <T> CompletableFuture<FloatPriorityQueue<T>> aggregatePq(Collection<CompletableFuture<FloatPriorityQueue<T>>> futureFloatPriorityQueues) {
final CompletableFuture<FloatPriorityQueue<T>> identityAggregatedResult = CompletableFuture.completedFuture(new FloatPriorityQueue<>());
return futureFloatPriorityQueues.parallelStream().reduce(identityAggregatedResult, (currentAggregatedResult, futureFloatPriorityQueue) -> {
return currentAggregatedResult.thenApplyAsync((aggregatedFloatPriorityQueue) -> {
var futureFloatPriorityQueueValues = futureFloatPriorityQueue.join();
if (futureFloatPriorityQueueValues == aggregatedFloatPriorityQueue) {
return aggregatedFloatPriorityQueue;
}
futureFloatPriorityQueueValues.forEachItem(aggregatedFloatPriorityQueue::offer);
return aggregatedFloatPriorityQueue;
});
});
}
/**
* Creates a new empty collection of disaggregated future results future lists
*/
public static <T> Collection<CompletableFuture<FloatPriorityQueue<CompletableFuture<T>>>> createDisaggregatedResultsPq() {
return FloatPriorityQueue.synchronizedPq(10);
}
/**
* Add a
* @param disaggregatedResults
* @param result
* @param <T>
*/
public static <T> void addDisaggregatedPq(
Collection<CompletableFuture<FloatPriorityQueue<CompletableFuture<T>>>> disaggregatedResults,
CompletableFuture<FloatPriorityQueue<CompletableFuture<T>>> result) {
disaggregatedResults.add(result);
}
/**
* Add a result
*/
public static <T, U extends T> void addDisaggregatedPqCast(
Collection<CompletableFuture<FloatPriorityQueue<CompletableFuture<T>>>> disaggregatedResults,
CompletableFuture<FloatPriorityQueue<CompletableFuture<U>>> result) {
addDisaggregatedPqCastForced(disaggregatedResults, result);
}
public static <T, U> void addDisaggregatedPqCastForced(
Collection<CompletableFuture<FloatPriorityQueue<CompletableFuture<T>>>> disaggregatedResults,
CompletableFuture<FloatPriorityQueue<CompletableFuture<U>>> result) {
disaggregatedResults.add(result.thenApply((originalFloatPriorityQueue) -> {
FloatPriorityQueue<CompletableFuture<T>> resultFloatPriorityQueue = new FloatPriorityQueue<>();
originalFloatPriorityQueue.forEachItem((originalFuture) -> {
resultFloatPriorityQueue.offer(ScoredValue.of(originalFuture.getScore(),
originalFuture.getValue().thenApply((originalValue) -> {
//noinspection unchecked
return (T) originalValue;
})
));
});
return resultFloatPriorityQueue;
}));
}
public static <T> Set<T> collectToSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList) {
return futureList.join().parallelStream().map(CompletableFuture::join).collect(Collectors.toSet());
}
public static <T> Set<T> collectToSet(CompletableFuture<List<CompletableFuture<T>>> futureList, int limit) {
public static <T> Set<T> collectToSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList, int limit) {
return futureList.join().parallelStream().map(CompletableFuture::join).limit(10).collect(Collectors.toSet());
}
public static <T> List<T> collectToList(CompletableFuture<List<CompletableFuture<T>>> futureList) {
public static <T> List<T> collectToList(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList) {
return futureList.join().stream().map(CompletableFuture::join).collect(Collectors.toList());
}
public static <T> List<T> collectToList(CompletableFuture<List<CompletableFuture<T>>> futureList, int limit) {
public static <T> List<T> collectToList(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList, int limit) {
return futureList.join().stream().map(CompletableFuture::join).limit(limit).collect(Collectors.toList());
}
public static <T> LinkedHashSet<T> collectToLinkedSet(CompletableFuture<List<CompletableFuture<T>>> futureList) {
public static <T> LinkedHashSet<T> collectToLinkedSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList) {
return futureList.join().stream().map(CompletableFuture::join).collect(Collectors.toCollection(LinkedHashSet::new));
}
public static <T> LinkedHashSet<T> collectToLinkedSet(CompletableFuture<List<CompletableFuture<T>>> futureList,
public static <T> LinkedHashSet<T> collectToLinkedSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList,
int limit) {
return futureList.join().stream().map(CompletableFuture::join).limit(limit)
.collect(Collectors.toCollection(LinkedHashSet::new));
}
public static <T> TreeSet<T> collectToTreeSet(CompletableFuture<List<CompletableFuture<T>>> futureList) {
public static <T> FloatPriorityQueue<T> collectToPq(CompletableFuture<? extends FloatPriorityQueue<CompletableFuture<T>>> futureList) {
var internalPq = futureList.join().streamItems().map(t -> {
if (t.getValue() != null) {
return ScoredValue.of(t.getScore(), t.getValue().join());
} else {
return ScoredValue.of(t.getScore(), (T) null);
}
}).collect(Collectors.toCollection(PriorityQueue::new));
return new FloatPriorityQueue<>(internalPq);
}
public static <T> FloatPriorityQueue<T> collectToPq(CompletableFuture<? extends FloatPriorityQueue<CompletableFuture<T>>> futureList,
int limit) {
var internalPq = futureList.join().streamItems().map(t -> {
if (t.getValue() != null) {
return ScoredValue.of(t.getScore(), t.getValue().join());
} else {
return ScoredValue.of(t.getScore(), (T) null);
}
}).limit(limit).collect(Collectors.toCollection(PriorityQueue::new));
return new FloatPriorityQueue<>(internalPq);
}
public static <T> TreeSet<T> collectToTreeSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList) {
return futureList.join().stream().map(CompletableFuture::join).collect(Collectors.toCollection(TreeSet::new));
}
public static <T> TreeSet<T> collectToTreeSet(CompletableFuture<List<CompletableFuture<T>>> futureList, int limit) {
public static <T> TreeSet<T> collectToTreeSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList, int limit) {
return futureList.join().stream().map(CompletableFuture::join).limit(limit)
.collect(Collectors.toCollection(TreeSet::new));
}
public static <T> TreeSet<T> collectToTreeSet(CompletableFuture<List<CompletableFuture<T>>> futureList, Comparator<T> comparator) {
public static <T> TreeSet<T> collectToTreeSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList, Comparator<T> comparator) {
return futureList.join().stream().map(CompletableFuture::join).collect(Collectors.toCollection(() -> new TreeSet<>(comparator)));
}
public static <T> TreeSet<T> collectToTreeSet(CompletableFuture<List<CompletableFuture<T>>> futureList, Comparator<T> comparator, int limit) {
public static <T> TreeSet<T> collectToTreeSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList, Comparator<T> comparator, int limit) {
return futureList.join().stream().map(CompletableFuture::join).limit(limit)
.collect(Collectors.toCollection(() -> new TreeSet<>(comparator)));
}
public static <T> Optional<T> anyOrNull(CompletableFuture<List<CompletableFuture<T>>> futureList) {
public static <T> Optional<T> anyOrNull(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList) {
return futureList.join().parallelStream().map(CompletableFuture::join).findAny();
}
public static <T> Optional<T> firstOrNull(CompletableFuture<List<CompletableFuture<T>>> futureList) {
public static <T> Optional<T> firstOrNull(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList) {
return futureList.join().stream().map(CompletableFuture::join).findFirst();
}
public static <T> void forEachOrdered(CompletableFuture<List<CompletableFuture<T>>> futureList,
public static <T> void forEachOrdered(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList,
Consumer<T> consumer) {
forEachOrdered(futureList, consumer, false);
var futures = futureList.join();
futures.stream().map(CompletableFuture::join).forEachOrdered(consumer);
}
public static <T> void forEachOrdered(CompletableFuture<List<CompletableFuture<T>>> futureList,
@ -146,13 +240,7 @@ public class CompletableFutureUtils {
futures.stream().map(CompletableFuture::join).forEachOrdered(consumer);
}
public static <T> void forEachOrderedSet(CompletableFuture<LinkedHashSet<CompletableFuture<T>>> futureList,
Consumer<T> consumer) {
var futures = futureList.join();
futures.stream().map(CompletableFuture::join).forEachOrdered(consumer);
}
public static <T> void forEach(CompletableFuture<List<CompletableFuture<T>>> futureList, Consumer<T> consumer) {
public static <T> void forEach(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList, Consumer<T> consumer) {
futureList.join().parallelStream().map(CompletableFuture::join).forEach(consumer);
}

View File

@ -0,0 +1,205 @@
package org.warp.commonutils.type;
import com.google.common.collect.Queues;
import java.lang.reflect.Array;
import java.util.Collection;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;
public class FloatPriorityQueue<T> implements Queue<T> {
private final Queue<ScoredValue<T>> internalQueue;
public static <T> FloatPriorityQueue<T> of() {
return new FloatPriorityQueue<T>(0);
}
public static <T> FloatPriorityQueue<T> of(T value, float score) {
var pq = new FloatPriorityQueue<T>(1);
pq.offer(value);
return pq;
}
public static <T> FloatPriorityQueue<T> of(ScoredValue<T> value) {
var pq = new FloatPriorityQueue<T>(1);
pq.offer(value);
return pq;
}
public static <T> FloatPriorityQueue<T> of(ScoredValue<T>... values) {
var pq = new FloatPriorityQueue<T>(values.length);
for (ScoredValue<T> value : values) {
pq.offer(value);
}
return pq;
}
public FloatPriorityQueue(PriorityQueue<ScoredValue<T>> internalQueue) {
this.internalQueue = internalQueue;
}
private FloatPriorityQueue(Queue<ScoredValue<T>> internalQueue) {
this.internalQueue = internalQueue;
}
public FloatPriorityQueue() {
internalQueue = new PriorityQueue<>();
}
public FloatPriorityQueue(int initialCapacity) {
internalQueue = new PriorityQueue<>(Math.max(1, initialCapacity));
}
public static <T> FloatPriorityQueue<T> synchronize(FloatPriorityQueue<T> queue) {
return new FloatPriorityQueue<T>(Queues.synchronizedQueue(queue.internalQueue));
}
public static <T> FloatPriorityQueue<T> synchronizedPq(int initialCapacity) {
return new FloatPriorityQueue<T>(Queues.synchronizedQueue(new PriorityQueue<>(Math.max(1, initialCapacity))));
}
@Override
public int size() {
return internalQueue.size();
}
@Override
public boolean isEmpty() {
return internalQueue.isEmpty();
}
@Override
public boolean contains(Object o) {
return internalQueue.contains(ScoredValue.of(0, o));
}
@NotNull
@Override
public Iterator<T> iterator() {
var it = internalQueue.iterator();
return new Iterator<T>() {
@Override
public boolean hasNext() {
return it.hasNext();
}
@Override
public T next() {
return getValueOrNull(it.next());
}
};
}
private T getValueOrNull(ScoredValue<T> scoredValue) {
if (scoredValue == null) {
return null;
} else {
return scoredValue.getValue();
}
}
@NotNull
@Override
public Object[] toArray() {
return internalQueue.stream().map(this::getValueOrNull).toArray(Object[]::new);
}
@NotNull
@Override
public <T1> T1[] toArray(@NotNull T1[] a) {
return internalQueue
.stream()
.map(this::getValueOrNull)
.toArray(i -> (T1[]) Array.newInstance(a.getClass().getComponentType(), i));
}
@Deprecated
@Override
public boolean add(T t) {
return internalQueue.add(ScoredValue.of(0, t));
}
public boolean addTop(T t) {
return internalQueue.add(ScoredValue.of(Integer.MAX_VALUE, t));
}
public boolean add(T t, float score) {
return internalQueue.add(ScoredValue.of(score, t));
}
@Override
public boolean remove(Object o) {
return internalQueue.remove(ScoredValue.of(0, o));
}
@Override
public boolean containsAll(@NotNull Collection<?> c) {
return internalQueue.containsAll(c.stream().map(val -> ScoredValue.of(0, val)).collect(Collectors.toList()));
}
@Override
public boolean addAll(@NotNull Collection<? extends T> c) {
return internalQueue.addAll(c.stream().map(val -> ScoredValue.of(0, (T) val)).collect(Collectors.toList()));
}
@Override
public boolean removeAll(@NotNull Collection<?> c) {
return internalQueue.removeAll(c.stream().map(val -> ScoredValue.of(0, val)).collect(Collectors.toList()));
}
@Override
public boolean retainAll(@NotNull Collection<?> c) {
return internalQueue.retainAll(c.stream().map(val -> ScoredValue.of(0, val)).collect(Collectors.toList()));
}
@Override
public void clear() {
internalQueue.clear();
}
@Override
public boolean offer(T t) {
return offer(ScoredValue.of(0, t));
}
public boolean offer(T t, float score) {
return offer(ScoredValue.of(score, t));
}
public boolean offer(ScoredValue<T> value) {
return this.internalQueue.offer(value);
}
@Override
public T remove() {
return getValueOrNull(internalQueue.remove());
}
@Override
public T poll() {
return getValueOrNull(internalQueue.poll());
}
@Override
public T element() {
return getValueOrNull(internalQueue.element());
}
@Override
public T peek() {
return getValueOrNull(internalQueue.peek());
}
public void forEachItem(Consumer<ScoredValue<T>> action) {
internalQueue.forEach(action);
}
public Stream<ScoredValue<T>> streamItems() {
return internalQueue.stream();
}
}

View File

@ -0,0 +1,52 @@
package org.warp.commonutils.type;
import java.util.Objects;
import org.jetbrains.annotations.NotNull;
public final class ScoredValue<T> implements Comparable<ScoredValue<T>> {
private final float score;
private final T value;
private ScoredValue(float score, T value) {
this.score = score;
this.value = value;
}
public static <T> ScoredValue<T> of(float score, T value) {
return new ScoredValue<T>(score, value);
}
@Override
public int compareTo(@NotNull ScoredValue<T> o) {
return Float.compare(this.score, o.score);
}
public float getScore() {
return this.score;
}
public T getValue() {
return this.value;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ScoredValue<?> that = (ScoredValue<?>) o;
return Objects.equals(value, that.value);
}
@Override
public int hashCode() {
return Objects.hash(value);
}
public String toString() {
return "ScoredValue(score=" + this.getScore() + ", value=" + this.getValue() + ")";
}
}