Update Batching.java, KVSafeBatching.java, and ParallelUtils.java

This commit is contained in:
Andrea Cavalli 2020-11-10 00:43:10 +01:00
parent 8813ef3e88
commit 4b11b7fd94
3 changed files with 178 additions and 393 deletions

View File

@ -1,213 +0,0 @@
package org.warp.commonutils.batch;
import com.google.common.util.concurrent.AtomicDouble;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
public abstract class Batching<T> {
private final int pingRefreshTimeMillis;
private volatile double singleItemTransferTimeMillis;
private volatile double latencyMillis;
private final AtomicBoolean enablePacking = new AtomicBoolean(false);
private final ConcurrentLinkedQueue<ExecutorService> executors = new ConcurrentLinkedQueue<>();
private final AtomicBoolean closeRequested = new AtomicBoolean(false);
private final ReentrantLock waitingAccesLock = new ReentrantLock();
private final ConcurrentLinkedQueue<T> waitingPutItems = new ConcurrentLinkedQueue<>();
private final AtomicDouble lostTimeMillis = new AtomicDouble(0d);
private final AtomicDouble sentItems = new AtomicDouble(0);
private final double startTimeMillis = ((double) System.nanoTime()) / 1000000d;
public Batching(int pingRefreshTimeMillis) {
this.pingRefreshTimeMillis = pingRefreshTimeMillis;
refreshPing();
if (enablePacking.get()) {
ExecutorService executor = Executors.newFixedThreadPool(2);
this.executors.offer(executor);
executor.execute(this::pingRefreshExecutor);
executor.execute(new BatchSender());
}
}
private void pingRefreshExecutor() {
boolean closeReq = false;
while (!(closeReq = closeRequested.get())) {
try {
Thread.sleep(pingRefreshTimeMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
refreshPing();
}
}
private void refreshPing() {
double pingTime = ping();
this.latencyMillis = 0.9 * pingTime;
this.singleItemTransferTimeMillis = 0.1 * pingTime;
this.enablePacking.compareAndSet(false, latencyMillis > 0.1d);
}
public void offer(T action) {
if (enablePacking.get()) {
sentItems.addAndGet(1d);
waitingAccesLock.lock();
try {
waitingPutItems.offer(action);
} finally {
waitingAccesLock.unlock();
}
} else {
executeDirect(action);
}
}
public void offer(Collection<T> actions) {
if (enablePacking.get()) {
sentItems.addAndGet(actions.size());
waitingAccesLock.lock();
try {
for (T action : actions) {
waitingPutItems.offer(action);
}
} finally {
waitingAccesLock.unlock();
}
} else {
executeDirect(actions);
}
}
public void offer(T... actions) {
offer(List.of(actions));
}
protected abstract void executeBatch(Collection<T> actions);
protected void executeBatch(T action) {
executeBatch(List.of(action));
}
protected abstract void executeDirect(T action);
protected abstract void executeDirect(Collection<T> action);
protected abstract double ping();
public abstract void close();
private static final double getItemSendLongestTime(double lostTime, double latencyMillis, double waitingSize,
double singleItemTransferTimeMillis) {
return lostTime + latencyMillis + waitingSize * singleItemTransferTimeMillis;
}
private static final double getItemSendLongestTimeNext(double lostTime, double latencyMillis, double waitTime,
double waitingSize, double singleItemTransferTimeMillis, double itemsPerMillisecondIdeal) {
return lostTime + latencyMillis + waitTime + (waitingSize
+ (waitTime * itemsPerMillisecondIdeal) * singleItemTransferTimeMillis);
}
private static final double getItemsPerSecond(double waitingSize, double itemSendLongestTime) {
return waitingSize / notZero(itemSendLongestTime);
}
private static final double getAverageItemTime(double waitingSize, double itemSendLongestTime) {
return itemSendLongestTime / notZero(waitingSize);
}
private static final double getNextItemsPerSecond(double waitingSize, double nextItemSendLongestTime, double waitTime,
double itemsPerMillisecondIdeal) {
return (waitingSize + (waitTime * itemsPerMillisecondIdeal)) / notZero(nextItemSendLongestTime);
}
private static final double getNextAverageItemTime(double waitingSize, double nextItemSendLongestTime,
double waitTime, double itemsPerMillisecondIdeal) {
return nextItemSendLongestTime / notZero((waitingSize + (waitTime * itemsPerMillisecondIdeal)));
}
private static final double notZero(double input) {
if (input != 0) {
return input;
} else {
return input + 0.000000000000000000001d;
}
}
private class BatchSender implements Runnable {
@Override
public void run() {
boolean closeReq;
while ((!(closeReq = closeRequested.get())) || !waitingPutItems.isEmpty()) {
double waitTimeMillis = latencyMillis;
long waitTimeNanoMillis = (long) Math.floor(latencyMillis);
int waitTimeNanos = (int) ((waitTimeMillis - ((double) waitTimeNanoMillis)) * 1000000d);
try {
if (!closeReq) {
Thread.sleep(waitTimeNanoMillis, waitTimeNanos);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
waitingAccesLock.lock();
try {
if (!waitingPutItems.isEmpty()) {
int waitingSize = waitingPutItems.size();
double lostTime = lostTimeMillis.addAndGet(waitTimeMillis); // Get the lost time as the time
// in the middle
double idealItemsPerMillis =
sentItems.get() / notZero(((double) System.nanoTime()) / 1000000d - startTimeMillis);
double idealMillisPerItem = 1d / notZero(idealItemsPerMillis);
double itemSendLongestTime = getItemSendLongestTime(lostTime, latencyMillis, waitingSize,
singleItemTransferTimeMillis);
double itemsPerSecond = getItemsPerSecond(waitingSize, itemSendLongestTime);
double averageItemTime = getAverageItemTime(waitingSize, itemSendLongestTime);
double nextItemSendLongestTime = getItemSendLongestTimeNext(lostTime, latencyMillis, waitTimeMillis,
waitingSize, singleItemTransferTimeMillis, idealItemsPerMillis);
double nextItemsPerSecond = getNextItemsPerSecond(waitingSize, nextItemSendLongestTime, waitTimeMillis,
idealItemsPerMillis);
double nextAverageItemTime = getNextAverageItemTime(waitingSize, itemSendLongestTime, waitTimeMillis,
idealItemsPerMillis);
boolean do1 = idealMillisPerItem > latencyMillis;
boolean do2 = itemsPerSecond > nextItemsPerSecond;
boolean do3 = averageItemTime - nextAverageItemTime < latencyMillis;
boolean do4 = averageItemTime > 5;
boolean doThisTurn = do1 | do2 | do3 | do4 || closeReq;
if (doThisTurn) {
lostTimeMillis.set(0);
if (waitingSize > 1) {
executeBatch(waitingPutItems);
} else {
T pair = waitingPutItems.poll();
executeBatch(pair);
}
if ((System.nanoTime() % 100) < 1) {
System.out.printf("LATENCY=%.2f; WAITED=%.2f; PACKET_SIZE=%.2f; AVG_ITEM_TIME=%.2f; "
+ "NEXT_AVG_ITEM_TIME=%.2f; DO=%s,%s,%s\n", latencyMillis, lostTime, (double) waitingSize,
averageItemTime, nextAverageItemTime, "" + do1, "" + do2, "" + do3);
System.out.printf("idealMillisPerItem=%.2f; itemsPerSecond=%.2f; nextItemsPerSecond=%"
+ ".2f; averageItemTime-nextAverageItemTime=%.2f\n", idealItemsPerMillis, itemsPerSecond,
nextItemsPerSecond, averageItemTime - nextAverageItemTime);
}
waitingPutItems.clear();
} else {
if ((System.nanoTime() % 100) < 1) {
System.out.println("SKIPPED TURN");
}
}
}
} finally {
waitingAccesLock.unlock();
}
}
}
}
}

View File

@ -1,80 +0,0 @@
package org.warp.commonutils.batch;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
public abstract class KVSafeBatching<T, U> extends Batching<Pair<T, U>> {
public KVSafeBatching(int pingRefreshTimeMillis) {
super(pingRefreshTimeMillis);
}
@Deprecated
@Override
public void offer(Pair<T, U>... actions) {
offer(List.of(actions));
}
@Deprecated
@Override
public void offer(Collection<Pair<T, U>> actions) {
Object[] keys = new Object[actions.size()];
Object[] values = new Object[actions.size()];
int i = 0;
for (Pair<T, U> action : actions) {
keys[i] = action.getKey();
values[i] = action.getValue();
i++;
}
offer_(keys, values);
}
public void offer(T key, U value) {
this.offer_(key, value);
}
public void offer(T[] keys, U[] values) {
if (keys.length == 1 && values.length == 1) {
this.offer_(keys[0], values[0]);
} else {
this.offer_(keys, values);
}
}
private void offer_(T key, U value) {
super.offer(Pair.of(key, value));
}
private void offer_(Object[] keys, Object[] values) {
if (keys.length != values.length) {
throw new IllegalArgumentException("Keys and values count must be the same.");
}
List<Pair<T, U>> pairs = new ArrayList<>(keys.length);
for (int i = 0; i < keys.length; i++) {
pairs.add(Pair.of((T) keys[i], (U) values[i]));
}
super.offer(pairs);
}
@Override
protected void executeBatch(Collection<Pair<T, U>> actions) {
}
@Override
protected void executeDirect(Pair<T, U> action) {
}
@Override
protected void executeDirect(Collection<Pair<T, U>> action) {
}
@Override
public void close() {
}
}

View File

@ -6,6 +6,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.Consumer;
import org.warp.commonutils.concurrency.executor.BlockingOnFullQueueExecutorServiceDecorator;
import org.warp.commonutils.concurrency.executor.BoundedExecutorService; import org.warp.commonutils.concurrency.executor.BoundedExecutorService;
import org.warp.commonutils.functional.TriConsumer; import org.warp.commonutils.functional.TriConsumer;
import org.warp.commonutils.type.IntWrapper; import org.warp.commonutils.type.IntWrapper;
@ -29,7 +30,9 @@ public class ParallelUtils {
IntWrapper count = new IntWrapper(CHUNK_SIZE); IntWrapper count = new IntWrapper(CHUNK_SIZE);
VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]); VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]);
AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null); AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null);
final Object arraysAccessLock = new Object();
iterator.accept((value) -> { iterator.accept((value) -> {
synchronized (arraysAccessLock) {
var firstException = firstExceptionReference.get(); var firstException = firstExceptionReference.get();
if (firstException != null) { if (firstException != null) {
throw firstException; throw firstException;
@ -38,22 +41,7 @@ public class ParallelUtils {
values.var[CHUNK_SIZE - count.var] = value; values.var[CHUNK_SIZE - count.var] = value;
count.var--; count.var--;
if (count.var == 0) { if (count.var == 0) {
count.var = CHUNK_SIZE; sendChunkItems(values, CHUNK_SIZE, count, consumer, parallelExecutor, firstExceptionReference);
Object[] valuesCopy = values.var;
values.var = new Object[CHUNK_SIZE];
try {
parallelExecutor.execute(() -> {
for (int i = 0; i < CHUNK_SIZE; i++) {
try {
//noinspection unchecked
consumer.accept((V) valuesCopy[i]);
} catch (Exception ex) {
firstExceptionReference.compareAndSet(null, new CompletionException(ex));
}
}
});
} catch (RejectedExecutionException e) {
throw new CompletionException(e);
} }
} }
}); });
@ -63,6 +51,11 @@ public class ParallelUtils {
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException("Parallel forEach interrupted", e); throw new RuntimeException("Parallel forEach interrupted", e);
} }
synchronized (arraysAccessLock) {
if (count.var > 0) {
sendChunkItems(values, CHUNK_SIZE, count, consumer, null, firstExceptionReference);
}
}
var firstException = firstExceptionReference.get(); var firstException = firstExceptionReference.get();
if (firstException != null) { if (firstException != null) {
@ -70,10 +63,44 @@ public class ParallelUtils {
} }
} }
private static <V> void sendChunkItems(VariableWrapper<Object[]> values,
int CHUNK_SIZE,
IntWrapper count,
Consumer<V> consumer,
BlockingOnFullQueueExecutorServiceDecorator parallelExecutor,
AtomicReference<CompletionException> firstExceptionReference) {
var itemsCount = CHUNK_SIZE - count.var;
count.var = CHUNK_SIZE;
Object[] valuesCopy = values.var;
values.var = new Object[itemsCount];
try {
Runnable action = () -> {
for (int i = 0; i < itemsCount; i++) {
try {
//noinspection unchecked
consumer.accept((V) valuesCopy[i]);
} catch (Exception ex) {
firstExceptionReference.compareAndSet(null, new CompletionException(ex));
}
}
};
if (parallelExecutor != null) {
parallelExecutor.execute(action);
} else {
action.run();
}
} catch (RejectedExecutionException e) {
throw new CompletionException(e);
}
}
public static <K, V> void parallelize(Consumer<BiConsumer<K, V>> iterator, public static <K, V> void parallelize(Consumer<BiConsumer<K, V>> iterator,
int maxQueueSize, int maxQueueSize,
int parallelism, int parallelism,
int groupSize, BiConsumer<K, V> consumer) throws CompletionException { int groupSize, BiConsumer<K, V> consumer) throws CompletionException {
if (parallelism <= 1) {
iterator.accept(consumer);
} else {
var parallelExecutor = BoundedExecutorService.create(maxQueueSize, var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
parallelism, parallelism,
0, 0,
@ -86,7 +113,9 @@ public class ParallelUtils {
VariableWrapper<Object[]> keys = new VariableWrapper<>(new Object[CHUNK_SIZE]); VariableWrapper<Object[]> keys = new VariableWrapper<>(new Object[CHUNK_SIZE]);
VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]); VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]);
AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null); AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null);
final Object arraysAccessLock = new Object();
iterator.accept((key, value) -> { iterator.accept((key, value) -> {
synchronized (arraysAccessLock) {
var firstException = firstExceptionReference.get(); var firstException = firstExceptionReference.get();
if (firstException != null) { if (firstException != null) {
throw firstException; throw firstException;
@ -95,26 +124,9 @@ public class ParallelUtils {
keys.var[CHUNK_SIZE - count.var] = key; keys.var[CHUNK_SIZE - count.var] = key;
values.var[CHUNK_SIZE - count.var] = value; values.var[CHUNK_SIZE - count.var] = value;
count.var--; count.var--;
if (count.var == 0) { if (count.var == 0) {
count.var = CHUNK_SIZE; sendChunkItems(keys, values, CHUNK_SIZE, count, consumer, parallelExecutor, firstExceptionReference);
Object[] keysCopy = keys.var;
Object[] valuesCopy = values.var;
keys.var = new Object[CHUNK_SIZE];
values.var = new Object[CHUNK_SIZE];
try {
parallelExecutor.execute(() -> {
for (int i = 0; i < CHUNK_SIZE; i++) {
try {
//noinspection unchecked
consumer.accept((K) keysCopy[i], (V) valuesCopy[i]);
} catch (Exception ex) {
firstExceptionReference.compareAndSet(null, new CompletionException(ex));
break;
}
}
});
} catch (RejectedExecutionException e) {
throw new CompletionException(e);
} }
} }
}); });
@ -124,12 +136,53 @@ public class ParallelUtils {
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException("Parallel forEach interrupted", e); throw new RuntimeException("Parallel forEach interrupted", e);
} }
synchronized (arraysAccessLock) {
if (count.var > 0) {
sendChunkItems(keys, values, CHUNK_SIZE, count, consumer, null, firstExceptionReference);
}
}
var firstException = firstExceptionReference.get(); var firstException = firstExceptionReference.get();
if (firstException != null) { if (firstException != null) {
throw firstException; throw firstException;
} }
} }
}
private static <K, V> void sendChunkItems(VariableWrapper<Object[]> keys,
VariableWrapper<Object[]> values,
final int CHUNK_SIZE,
IntWrapper count,
BiConsumer<K, V> consumer,
BlockingOnFullQueueExecutorServiceDecorator parallelExecutor,
AtomicReference<CompletionException> firstExceptionReference) {
int itemsCount = CHUNK_SIZE - count.var;
count.var = CHUNK_SIZE;
Object[] keysCopy = keys.var;
Object[] valuesCopy = values.var;
keys.var = new Object[itemsCount];
values.var = new Object[itemsCount];
try {
Runnable action = () -> {
for (int i = 0; i < itemsCount; i++) {
try {
//noinspection unchecked
consumer.accept((K) keysCopy[i], (V) valuesCopy[i]);
} catch (Exception ex) {
firstExceptionReference.compareAndSet(null, new CompletionException(ex));
break;
}
}
};
if (parallelExecutor != null) {
parallelExecutor.execute(action);
} else {
action.run();
}
} catch (RejectedExecutionException e) {
throw new CompletionException(e);
}
}
public static <K1, K2, V> void parallelize(Consumer<TriConsumer<K1, K2, V>> iterator, public static <K1, K2, V> void parallelize(Consumer<TriConsumer<K1, K2, V>> iterator,
int maxQueueSize, int maxQueueSize,
@ -149,7 +202,9 @@ public class ParallelUtils {
VariableWrapper<Object[]> keys2 = new VariableWrapper<>(new Object[CHUNK_SIZE]); VariableWrapper<Object[]> keys2 = new VariableWrapper<>(new Object[CHUNK_SIZE]);
VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]); VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]);
AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null); AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null);
final Object arraysAccessLock = new Object();
iterator.accept((key1, key2, value) -> { iterator.accept((key1, key2, value) -> {
synchronized (arraysAccessLock) {
var firstException = firstExceptionReference.get(); var firstException = firstExceptionReference.get();
if (firstException != null) { if (firstException != null) {
throw firstException; throw firstException;
@ -160,26 +215,7 @@ public class ParallelUtils {
values.var[CHUNK_SIZE - count.var] = value; values.var[CHUNK_SIZE - count.var] = value;
count.var--; count.var--;
if (count.var == 0) { if (count.var == 0) {
count.var = CHUNK_SIZE; sendChunkItems(keys1, keys2, values, CHUNK_SIZE, count, consumer, parallelExecutor, firstExceptionReference);
Object[] keys1Copy = keys1.var;
Object[] keys2Copy = keys2.var;
Object[] valuesCopy = values.var;
keys1.var = new Object[CHUNK_SIZE];
keys2.var = new Object[CHUNK_SIZE];
values.var = new Object[CHUNK_SIZE];
try {
parallelExecutor.execute(() -> {
for (int i = 0; i < CHUNK_SIZE; i++) {
try {
//noinspection unchecked
consumer.accept((K1) keys1Copy[i], (K2) keys2Copy[i], (V) valuesCopy[i]);
} catch (Exception ex) {
firstExceptionReference.compareAndSet(null, new CompletionException(ex));
}
}
});
} catch (RejectedExecutionException e) {
throw new CompletionException(e);
} }
} }
}); });
@ -189,10 +225,52 @@ public class ParallelUtils {
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException("Parallel forEach interrupted", e); throw new RuntimeException("Parallel forEach interrupted", e);
} }
synchronized (arraysAccessLock) {
if (count.var > 0) {
sendChunkItems(keys1, keys2, values, CHUNK_SIZE, count, consumer, null, firstExceptionReference);
}
}
var firstException = firstExceptionReference.get(); var firstException = firstExceptionReference.get();
if (firstException != null) { if (firstException != null) {
throw firstException; throw firstException;
} }
} }
private static <K1, K2, V> void sendChunkItems(VariableWrapper<Object[]> keys1,
VariableWrapper<Object[]> keys2,
VariableWrapper<Object[]> values,
int CHUNK_SIZE,
IntWrapper count,
TriConsumer<K1, K2, V> consumer,
BlockingOnFullQueueExecutorServiceDecorator parallelExecutor,
AtomicReference<CompletionException> firstExceptionReference) {
int itemsCount = CHUNK_SIZE - count.var;
count.var = CHUNK_SIZE;
Object[] keys1Copy = keys1.var;
Object[] keys2Copy = keys2.var;
Object[] valuesCopy = values.var;
keys1.var = new Object[itemsCount];
keys2.var = new Object[itemsCount];
values.var = new Object[itemsCount];
try {
Runnable action = () -> {
for (int i = 0; i < itemsCount; i++) {
try {
//noinspection unchecked
consumer.accept((K1) keys1Copy[i], (K2) keys2Copy[i], (V) valuesCopy[i]);
} catch (Exception ex) {
firstExceptionReference.compareAndSet(null, new CompletionException(ex));
}
}
};
if (parallelExecutor != null) {
parallelExecutor.execute(action);
} else {
action.run();
}
} catch (RejectedExecutionException e) {
throw new CompletionException(e);
}
}
} }