From 92e944ad975519f4c069b49c1a01efc1eea67022 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 28 Jul 2020 23:01:43 +0200 Subject: [PATCH] Added FloatPriorityQueue --- .../future/CompletableFutureUtils.java | 140 +++++++++--- .../commonutils/type/FloatPriorityQueue.java | 205 ++++++++++++++++++ .../warp/commonutils/type/ScoredValue.java | 52 +++++ 3 files changed, 371 insertions(+), 26 deletions(-) create mode 100644 src/main/java/org/warp/commonutils/type/FloatPriorityQueue.java create mode 100644 src/main/java/org/warp/commonutils/type/ScoredValue.java diff --git a/src/main/java/org/warp/commonutils/concurrency/future/CompletableFutureUtils.java b/src/main/java/org/warp/commonutils/concurrency/future/CompletableFutureUtils.java index cfc74a6..f9c4268 100644 --- a/src/main/java/org/warp/commonutils/concurrency/future/CompletableFutureUtils.java +++ b/src/main/java/org/warp/commonutils/concurrency/future/CompletableFutureUtils.java @@ -7,6 +7,7 @@ import java.util.Comparator; import java.util.LinkedHashSet; import java.util.List; import java.util.Optional; +import java.util.PriorityQueue; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; @@ -16,6 +17,8 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import org.warp.commonutils.type.FloatPriorityQueue; +import org.warp.commonutils.type.ScoredValue; public class CompletableFutureUtils { @@ -26,7 +29,7 @@ public class CompletableFutureUtils { * @param List elements type * @return {@link CompletableFuture} list */ - public static CompletableFuture> aggregate(Collection>> futureLists) { + public static CompletableFuture> aggregateList(Collection>> futureLists) { final CompletableFuture> identityAggregatedResult = CompletableFuture.completedFuture(new ArrayList()); return futureLists.parallelStream().reduce(identityAggregatedResult, (currentAggregatedResult, futureList) -> { @@ -50,7 +53,7 @@ public class CompletableFutureUtils { * @param result * @param */ - public static void addDisaggregated( + public static void addDisaggregatedList( Collection>>> disaggregatedResults, CompletableFuture>> result) { disaggregatedResults.add(result); @@ -59,13 +62,13 @@ public class CompletableFutureUtils { /** * Add a result */ - public static void addDisaggregatedCast( + public static void addDisaggregatedListCast( Collection>>> disaggregatedResults, CompletableFuture>> result) { - addDisaggregatedCastForced(disaggregatedResults, result); + addDisaggregatedListCastForced(disaggregatedResults, result); } - public static void addDisaggregatedCastForced( + public static void addDisaggregatedListCastForced( Collection>>> disaggregatedResults, CompletableFuture>> result) { disaggregatedResults.add(result.thenApply((originalList) -> { @@ -80,61 +83,152 @@ public class CompletableFutureUtils { })); } - public static Set collectToSet(CompletableFuture>> futureList) { + /** + * Aggregate multiple {@link CompletableFuture} lists into a single {@link CompletableFuture} list + * + * @param futureFloatPriorityQueues A collection of {@link CompletableFuture} lists. + * @param List elements type + * @return {@link CompletableFuture} list + */ + public static CompletableFuture> aggregatePq(Collection>> futureFloatPriorityQueues) { + final CompletableFuture> identityAggregatedResult = CompletableFuture.completedFuture(new FloatPriorityQueue<>()); + + return futureFloatPriorityQueues.parallelStream().reduce(identityAggregatedResult, (currentAggregatedResult, futureFloatPriorityQueue) -> { + return currentAggregatedResult.thenApplyAsync((aggregatedFloatPriorityQueue) -> { + var futureFloatPriorityQueueValues = futureFloatPriorityQueue.join(); + if (futureFloatPriorityQueueValues == aggregatedFloatPriorityQueue) { + return aggregatedFloatPriorityQueue; + } + futureFloatPriorityQueueValues.forEachItem(aggregatedFloatPriorityQueue::offer); + return aggregatedFloatPriorityQueue; + }); + }); + } + + /** + * Creates a new empty collection of disaggregated future results future lists + */ + public static Collection>>> createDisaggregatedResultsPq() { + return FloatPriorityQueue.synchronizedPq(10); + } + + /** + * Add a + * @param disaggregatedResults + * @param result + * @param + */ + public static void addDisaggregatedPq( + Collection>>> disaggregatedResults, + CompletableFuture>> result) { + disaggregatedResults.add(result); + } + + /** + * Add a result + */ + public static void addDisaggregatedPqCast( + Collection>>> disaggregatedResults, + CompletableFuture>> result) { + addDisaggregatedPqCastForced(disaggregatedResults, result); + } + + public static void addDisaggregatedPqCastForced( + Collection>>> disaggregatedResults, + CompletableFuture>> result) { + disaggregatedResults.add(result.thenApply((originalFloatPriorityQueue) -> { + FloatPriorityQueue> resultFloatPriorityQueue = new FloatPriorityQueue<>(); + originalFloatPriorityQueue.forEachItem((originalFuture) -> { + resultFloatPriorityQueue.offer(ScoredValue.of(originalFuture.getScore(), + originalFuture.getValue().thenApply((originalValue) -> { + //noinspection unchecked + return (T) originalValue; + }) + )); + }); + return resultFloatPriorityQueue; + })); + } + + public static Set collectToSet(CompletableFuture>> futureList) { return futureList.join().parallelStream().map(CompletableFuture::join).collect(Collectors.toSet()); } - public static Set collectToSet(CompletableFuture>> futureList, int limit) { + public static Set collectToSet(CompletableFuture>> futureList, int limit) { return futureList.join().parallelStream().map(CompletableFuture::join).limit(10).collect(Collectors.toSet()); } - public static List collectToList(CompletableFuture>> futureList) { + public static List collectToList(CompletableFuture>> futureList) { return futureList.join().stream().map(CompletableFuture::join).collect(Collectors.toList()); } - public static List collectToList(CompletableFuture>> futureList, int limit) { + public static List collectToList(CompletableFuture>> futureList, int limit) { return futureList.join().stream().map(CompletableFuture::join).limit(limit).collect(Collectors.toList()); } - public static LinkedHashSet collectToLinkedSet(CompletableFuture>> futureList) { + public static LinkedHashSet collectToLinkedSet(CompletableFuture>> futureList) { return futureList.join().stream().map(CompletableFuture::join).collect(Collectors.toCollection(LinkedHashSet::new)); } - public static LinkedHashSet collectToLinkedSet(CompletableFuture>> futureList, + public static LinkedHashSet collectToLinkedSet(CompletableFuture>> futureList, int limit) { return futureList.join().stream().map(CompletableFuture::join).limit(limit) .collect(Collectors.toCollection(LinkedHashSet::new)); } - public static TreeSet collectToTreeSet(CompletableFuture>> futureList) { + public static FloatPriorityQueue collectToPq(CompletableFuture>> futureList) { + var internalPq = futureList.join().streamItems().map(t -> { + if (t.getValue() != null) { + return ScoredValue.of(t.getScore(), t.getValue().join()); + } else { + return ScoredValue.of(t.getScore(), (T) null); + } + }).collect(Collectors.toCollection(PriorityQueue::new)); + return new FloatPriorityQueue<>(internalPq); + } + + public static FloatPriorityQueue collectToPq(CompletableFuture>> futureList, + int limit) { + var internalPq = futureList.join().streamItems().map(t -> { + if (t.getValue() != null) { + return ScoredValue.of(t.getScore(), t.getValue().join()); + } else { + return ScoredValue.of(t.getScore(), (T) null); + } + }).limit(limit).collect(Collectors.toCollection(PriorityQueue::new)); + return new FloatPriorityQueue<>(internalPq); + } + + public static TreeSet collectToTreeSet(CompletableFuture>> futureList) { return futureList.join().stream().map(CompletableFuture::join).collect(Collectors.toCollection(TreeSet::new)); } - public static TreeSet collectToTreeSet(CompletableFuture>> futureList, int limit) { + public static TreeSet collectToTreeSet(CompletableFuture>> futureList, int limit) { return futureList.join().stream().map(CompletableFuture::join).limit(limit) .collect(Collectors.toCollection(TreeSet::new)); } - public static TreeSet collectToTreeSet(CompletableFuture>> futureList, Comparator comparator) { + public static TreeSet collectToTreeSet(CompletableFuture>> futureList, Comparator comparator) { return futureList.join().stream().map(CompletableFuture::join).collect(Collectors.toCollection(() -> new TreeSet<>(comparator))); } - public static TreeSet collectToTreeSet(CompletableFuture>> futureList, Comparator comparator, int limit) { + public static TreeSet collectToTreeSet(CompletableFuture>> futureList, Comparator comparator, int limit) { return futureList.join().stream().map(CompletableFuture::join).limit(limit) .collect(Collectors.toCollection(() -> new TreeSet<>(comparator))); } - public static Optional anyOrNull(CompletableFuture>> futureList) { + public static Optional anyOrNull(CompletableFuture>> futureList) { return futureList.join().parallelStream().map(CompletableFuture::join).findAny(); } - public static Optional firstOrNull(CompletableFuture>> futureList) { + public static Optional firstOrNull(CompletableFuture>> futureList) { return futureList.join().stream().map(CompletableFuture::join).findFirst(); } - public static void forEachOrdered(CompletableFuture>> futureList, + public static void forEachOrdered(CompletableFuture>> futureList, Consumer consumer) { - forEachOrdered(futureList, consumer, false); + var futures = futureList.join(); + futures.stream().map(CompletableFuture::join).forEachOrdered(consumer); } public static void forEachOrdered(CompletableFuture>> futureList, @@ -146,13 +240,7 @@ public class CompletableFutureUtils { futures.stream().map(CompletableFuture::join).forEachOrdered(consumer); } - public static void forEachOrderedSet(CompletableFuture>> futureList, - Consumer consumer) { - var futures = futureList.join(); - futures.stream().map(CompletableFuture::join).forEachOrdered(consumer); - } - - public static void forEach(CompletableFuture>> futureList, Consumer consumer) { + public static void forEach(CompletableFuture>> futureList, Consumer consumer) { futureList.join().parallelStream().map(CompletableFuture::join).forEach(consumer); } diff --git a/src/main/java/org/warp/commonutils/type/FloatPriorityQueue.java b/src/main/java/org/warp/commonutils/type/FloatPriorityQueue.java new file mode 100644 index 0000000..fe5a817 --- /dev/null +++ b/src/main/java/org/warp/commonutils/type/FloatPriorityQueue.java @@ -0,0 +1,205 @@ +package org.warp.commonutils.type; + +import com.google.common.collect.Queues; +import java.lang.reflect.Array; +import java.util.Collection; +import java.util.Iterator; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.jetbrains.annotations.NotNull; + +public class FloatPriorityQueue implements Queue { + + private final Queue> internalQueue; + + public static FloatPriorityQueue of() { + return new FloatPriorityQueue(0); + } + + public static FloatPriorityQueue of(T value, float score) { + var pq = new FloatPriorityQueue(1); + pq.offer(value); + return pq; + } + + public static FloatPriorityQueue of(ScoredValue value) { + var pq = new FloatPriorityQueue(1); + pq.offer(value); + return pq; + } + + public static FloatPriorityQueue of(ScoredValue... values) { + var pq = new FloatPriorityQueue(values.length); + for (ScoredValue value : values) { + pq.offer(value); + } + return pq; + } + + public FloatPriorityQueue(PriorityQueue> internalQueue) { + this.internalQueue = internalQueue; + } + + private FloatPriorityQueue(Queue> internalQueue) { + this.internalQueue = internalQueue; + } + + public FloatPriorityQueue() { + internalQueue = new PriorityQueue<>(); + } + + public FloatPriorityQueue(int initialCapacity) { + internalQueue = new PriorityQueue<>(Math.max(1, initialCapacity)); + } + + public static FloatPriorityQueue synchronize(FloatPriorityQueue queue) { + return new FloatPriorityQueue(Queues.synchronizedQueue(queue.internalQueue)); + } + + public static FloatPriorityQueue synchronizedPq(int initialCapacity) { + return new FloatPriorityQueue(Queues.synchronizedQueue(new PriorityQueue<>(Math.max(1, initialCapacity)))); + } + + @Override + public int size() { + return internalQueue.size(); + } + + @Override + public boolean isEmpty() { + return internalQueue.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return internalQueue.contains(ScoredValue.of(0, o)); + } + + @NotNull + @Override + public Iterator iterator() { + var it = internalQueue.iterator(); + return new Iterator() { + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public T next() { + return getValueOrNull(it.next()); + } + }; + } + + private T getValueOrNull(ScoredValue scoredValue) { + if (scoredValue == null) { + return null; + } else { + return scoredValue.getValue(); + } + } + + @NotNull + @Override + public Object[] toArray() { + return internalQueue.stream().map(this::getValueOrNull).toArray(Object[]::new); + } + + @NotNull + @Override + public T1[] toArray(@NotNull T1[] a) { + return internalQueue + .stream() + .map(this::getValueOrNull) + .toArray(i -> (T1[]) Array.newInstance(a.getClass().getComponentType(), i)); + } + + @Deprecated + @Override + public boolean add(T t) { + return internalQueue.add(ScoredValue.of(0, t)); + } + + public boolean addTop(T t) { + return internalQueue.add(ScoredValue.of(Integer.MAX_VALUE, t)); + } + + public boolean add(T t, float score) { + return internalQueue.add(ScoredValue.of(score, t)); + } + + @Override + public boolean remove(Object o) { + return internalQueue.remove(ScoredValue.of(0, o)); + } + + @Override + public boolean containsAll(@NotNull Collection c) { + return internalQueue.containsAll(c.stream().map(val -> ScoredValue.of(0, val)).collect(Collectors.toList())); + } + + @Override + public boolean addAll(@NotNull Collection c) { + return internalQueue.addAll(c.stream().map(val -> ScoredValue.of(0, (T) val)).collect(Collectors.toList())); + } + + @Override + public boolean removeAll(@NotNull Collection c) { + return internalQueue.removeAll(c.stream().map(val -> ScoredValue.of(0, val)).collect(Collectors.toList())); + } + + @Override + public boolean retainAll(@NotNull Collection c) { + return internalQueue.retainAll(c.stream().map(val -> ScoredValue.of(0, val)).collect(Collectors.toList())); + } + + @Override + public void clear() { + internalQueue.clear(); + } + + @Override + public boolean offer(T t) { + return offer(ScoredValue.of(0, t)); + } + + public boolean offer(T t, float score) { + return offer(ScoredValue.of(score, t)); + } + + public boolean offer(ScoredValue value) { + return this.internalQueue.offer(value); + } + + @Override + public T remove() { + return getValueOrNull(internalQueue.remove()); + } + + @Override + public T poll() { + return getValueOrNull(internalQueue.poll()); + } + + @Override + public T element() { + return getValueOrNull(internalQueue.element()); + } + + @Override + public T peek() { + return getValueOrNull(internalQueue.peek()); + } + + public void forEachItem(Consumer> action) { + internalQueue.forEach(action); + } + + public Stream> streamItems() { + return internalQueue.stream(); + } +} diff --git a/src/main/java/org/warp/commonutils/type/ScoredValue.java b/src/main/java/org/warp/commonutils/type/ScoredValue.java new file mode 100644 index 0000000..d07be69 --- /dev/null +++ b/src/main/java/org/warp/commonutils/type/ScoredValue.java @@ -0,0 +1,52 @@ +package org.warp.commonutils.type; + +import java.util.Objects; +import org.jetbrains.annotations.NotNull; + +public final class ScoredValue implements Comparable> { + private final float score; + private final T value; + + private ScoredValue(float score, T value) { + this.score = score; + this.value = value; + } + + public static ScoredValue of(float score, T value) { + return new ScoredValue(score, value); + } + + @Override + public int compareTo(@NotNull ScoredValue o) { + return Float.compare(this.score, o.score); + } + + public float getScore() { + return this.score; + } + + public T getValue() { + return this.value; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ScoredValue that = (ScoredValue) o; + return Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(value); + } + + public String toString() { + return "ScoredValue(score=" + this.getScore() + ", value=" + this.getValue() + ")"; + } +}