diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..d1f3acc --- /dev/null +++ b/pom.xml @@ -0,0 +1,61 @@ + + + jar + 4.0.0 + + common-utils + org.warp + 1.0.0 + + + UTF-8 + + 11 + 11 + + + + + org.junit.jupiter + junit-jupiter-api + RELEASE + test + + + org.hamcrest + hamcrest-core + + + + + org.jetbrains + annotations + 17.0.0 + + + com.google.guava + guava + 28.2-jre + compile + + + it.unimi.dsi + fastutil + 8.3.0 + + + com.googlecode.concurrent-locks + concurrent-locks + 1.0.0 + + + org.apache.commons + commons-lang3 + 3.9 + + + + + \ No newline at end of file diff --git a/src/main/java/org/warp/commonutils/batch/Batching.java b/src/main/java/org/warp/commonutils/batch/Batching.java new file mode 100644 index 0000000..0cb115b --- /dev/null +++ b/src/main/java/org/warp/commonutils/batch/Batching.java @@ -0,0 +1,213 @@ +package org.warp.commonutils.batch; + +import com.google.common.util.concurrent.AtomicDouble; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; + +public abstract class Batching { + + private final int pingRefreshTimeMillis; + private volatile double singleItemTransferTimeMillis; + private volatile double latencyMillis; + private final AtomicBoolean enablePacking = new AtomicBoolean(false); + private final ConcurrentLinkedQueue executors = new ConcurrentLinkedQueue<>(); + private final AtomicBoolean closeRequested = new AtomicBoolean(false); + private final ReentrantLock waitingAccesLock = new ReentrantLock(); + private final ConcurrentLinkedQueue waitingPutItems = new ConcurrentLinkedQueue<>(); + private final AtomicDouble lostTimeMillis = new AtomicDouble(0d); + private final AtomicDouble sentItems = new AtomicDouble(0); + private final double startTimeMillis = ((double) System.nanoTime()) / 1000000d; + + public Batching(int pingRefreshTimeMillis) { + this.pingRefreshTimeMillis = pingRefreshTimeMillis; + refreshPing(); + + if (enablePacking.get()) { + ExecutorService executor = Executors.newFixedThreadPool(2); + this.executors.offer(executor); + executor.execute(this::pingRefreshExecutor); + executor.execute(new BatchSender()); + } + } + + private void pingRefreshExecutor() { + boolean closeReq = false; + while (!(closeReq = closeRequested.get())) { + try { + Thread.sleep(pingRefreshTimeMillis); + } catch (InterruptedException e) { + e.printStackTrace(); + } + refreshPing(); + } + } + + private void refreshPing() { + double pingTime = ping(); + this.latencyMillis = 0.9 * pingTime; + this.singleItemTransferTimeMillis = 0.1 * pingTime; + this.enablePacking.compareAndSet(false, latencyMillis > 0.1d); + } + + public void offer(T action) { + if (enablePacking.get()) { + sentItems.addAndGet(1d); + waitingAccesLock.lock(); + try { + waitingPutItems.offer(action); + } finally { + waitingAccesLock.unlock(); + } + } else { + executeDirect(action); + } + } + + public void offer(Collection actions) { + if (enablePacking.get()) { + sentItems.addAndGet(actions.size()); + waitingAccesLock.lock(); + try { + for (T action : actions) { + waitingPutItems.offer(action); + } + } finally { + waitingAccesLock.unlock(); + } + } else { + executeDirect(actions); + } + } + + public void offer(T... actions) { + offer(List.of(actions)); + } + + protected abstract void executeBatch(Collection actions); + + protected void executeBatch(T action) { + executeBatch(List.of(action)); + } + + protected abstract void executeDirect(T action); + + protected abstract void executeDirect(Collection action); + + protected abstract double ping(); + + public abstract void close(); + + + private static final double getItemSendLongestTime(double lostTime, double latencyMillis, double waitingSize, + double singleItemTransferTimeMillis) { + return lostTime + latencyMillis + waitingSize * singleItemTransferTimeMillis; + } + + private static final double getItemSendLongestTimeNext(double lostTime, double latencyMillis, double waitTime, + double waitingSize, double singleItemTransferTimeMillis, double itemsPerMillisecondIdeal) { + return lostTime + latencyMillis + waitTime + (waitingSize + + (waitTime * itemsPerMillisecondIdeal) * singleItemTransferTimeMillis); + } + + private static final double getItemsPerSecond(double waitingSize, double itemSendLongestTime) { + return waitingSize / notZero(itemSendLongestTime); + } + + private static final double getAverageItemTime(double waitingSize, double itemSendLongestTime) { + return itemSendLongestTime / notZero(waitingSize); + } + + private static final double getNextItemsPerSecond(double waitingSize, double nextItemSendLongestTime, double waitTime, + double itemsPerMillisecondIdeal) { + return (waitingSize + (waitTime * itemsPerMillisecondIdeal)) / notZero(nextItemSendLongestTime); + } + + private static final double getNextAverageItemTime(double waitingSize, double nextItemSendLongestTime, + double waitTime, double itemsPerMillisecondIdeal) { + return nextItemSendLongestTime / notZero((waitingSize + (waitTime * itemsPerMillisecondIdeal))); + } + + private static final double notZero(double input) { + if (input != 0) { + return input; + } else { + return input + 0.000000000000000000001d; + } + } + + private class BatchSender implements Runnable { + + @Override + public void run() { + boolean closeReq; + while ((!(closeReq = closeRequested.get())) || !waitingPutItems.isEmpty()) { + double waitTimeMillis = latencyMillis; + long waitTimeNanoMillis = (long) Math.floor(latencyMillis); + int waitTimeNanos = (int) ((waitTimeMillis - ((double) waitTimeNanoMillis)) * 1000000d); + try { + if (!closeReq) { + Thread.sleep(waitTimeNanoMillis, waitTimeNanos); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + waitingAccesLock.lock(); + try { + if (!waitingPutItems.isEmpty()) { + int waitingSize = waitingPutItems.size(); + double lostTime = lostTimeMillis.addAndGet(waitTimeMillis); // Get the lost time as the time + // in the middle + double idealItemsPerMillis = + sentItems.get() / notZero(((double) System.nanoTime()) / 1000000d - startTimeMillis); + double idealMillisPerItem = 1d / notZero(idealItemsPerMillis); + double itemSendLongestTime = getItemSendLongestTime(lostTime, latencyMillis, waitingSize, + singleItemTransferTimeMillis); + double itemsPerSecond = getItemsPerSecond(waitingSize, itemSendLongestTime); + double averageItemTime = getAverageItemTime(waitingSize, itemSendLongestTime); + double nextItemSendLongestTime = getItemSendLongestTimeNext(lostTime, latencyMillis, waitTimeMillis, + waitingSize, singleItemTransferTimeMillis, idealItemsPerMillis); + double nextItemsPerSecond = getNextItemsPerSecond(waitingSize, nextItemSendLongestTime, waitTimeMillis, + idealItemsPerMillis); + double nextAverageItemTime = getNextAverageItemTime(waitingSize, itemSendLongestTime, waitTimeMillis, + idealItemsPerMillis); + boolean do1 = idealMillisPerItem > latencyMillis; + boolean do2 = itemsPerSecond > nextItemsPerSecond; + boolean do3 = averageItemTime - nextAverageItemTime < latencyMillis; + boolean do4 = averageItemTime > 5; + boolean doThisTurn = do1 | do2 | do3 | do4 || closeReq; + + if (doThisTurn) { + lostTimeMillis.set(0); + if (waitingSize > 1) { + executeBatch(waitingPutItems); + } else { + T pair = waitingPutItems.poll(); + executeBatch(pair); + } + if ((System.nanoTime() % 100) < 1) { + System.out.printf("LATENCY=%.2f; WAITED=%.2f; PACKET_SIZE=%.2f; AVG_ITEM_TIME=%.2f; " + + "NEXT_AVG_ITEM_TIME=%.2f; DO=%s,%s,%s\n", latencyMillis, lostTime, (double) waitingSize, + averageItemTime, nextAverageItemTime, "" + do1, "" + do2, "" + do3); + System.out.printf("idealMillisPerItem=%.2f; itemsPerSecond=%.2f; nextItemsPerSecond=%" + + ".2f; averageItemTime-nextAverageItemTime=%.2f\n", idealItemsPerMillis, itemsPerSecond, + nextItemsPerSecond, averageItemTime - nextAverageItemTime); + } + waitingPutItems.clear(); + } else { + if ((System.nanoTime() % 100) < 1) { + System.out.println("SKIPPED TURN"); + } + } + } + } finally { + waitingAccesLock.unlock(); + } + } + } + } +} diff --git a/src/main/java/org/warp/commonutils/batch/KVSafeBatching.java b/src/main/java/org/warp/commonutils/batch/KVSafeBatching.java new file mode 100644 index 0000000..76a3034 --- /dev/null +++ b/src/main/java/org/warp/commonutils/batch/KVSafeBatching.java @@ -0,0 +1,80 @@ +package org.warp.commonutils.batch; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.apache.commons.lang3.tuple.Pair; + +public abstract class KVSafeBatching extends Batching> { + + public KVSafeBatching(int pingRefreshTimeMillis) { + super(pingRefreshTimeMillis); + } + + @Deprecated + @Override + public void offer(Pair... actions) { + offer(List.of(actions)); + } + + @Deprecated + @Override + public void offer(Collection> actions) { + Object[] keys = new Object[actions.size()]; + Object[] values = new Object[actions.size()]; + int i = 0; + for (Pair action : actions) { + keys[i] = action.getKey(); + values[i] = action.getValue(); + i++; + } + offer_(keys, values); + } + + public void offer(T key, U value) { + this.offer_(key, value); + } + + public void offer(T[] keys, U[] values) { + if (keys.length == 1 && values.length == 1) { + this.offer_(keys[0], values[0]); + } else { + this.offer_(keys, values); + } + } + + private void offer_(T key, U value) { + super.offer(Pair.of(key, value)); + } + + private void offer_(Object[] keys, Object[] values) { + if (keys.length != values.length) { + throw new IllegalArgumentException("Keys and values count must be the same."); + } + List> pairs = new ArrayList<>(keys.length); + for (int i = 0; i < keys.length; i++) { + pairs.add(Pair.of((T) keys[i], (U) values[i])); + } + super.offer(pairs); + } + + @Override + protected void executeBatch(Collection> actions) { + + } + + @Override + protected void executeDirect(Pair action) { + + } + + @Override + protected void executeDirect(Collection> action) { + + } + + @Override + public void close() { + + } +} diff --git a/src/main/java/org/warp/commonutils/batch/ParallelUtils.java b/src/main/java/org/warp/commonutils/batch/ParallelUtils.java new file mode 100644 index 0000000..033e022 --- /dev/null +++ b/src/main/java/org/warp/commonutils/batch/ParallelUtils.java @@ -0,0 +1,98 @@ +package org.warp.commonutils.batch; + +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import org.warp.commonutils.concurrency.executor.BoundedExecutorService; +import org.warp.commonutils.functional.TriConsumer; +import org.warp.commonutils.type.IntWrapper; +import org.warp.commonutils.type.ShortNamedThreadFactory; +import org.warp.commonutils.type.VariableWrapper; + +public class ParallelUtils { + + public static void parallelize(Consumer> iterator, + int maxQueueSize, + int parallelism, + int groupSize, + BiConsumer consumer) { + BoundedExecutorService parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, parallelism * 2, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {}); + final int CHUNK_SIZE = groupSize; + IntWrapper count = new IntWrapper(CHUNK_SIZE); + VariableWrapper keys = new VariableWrapper<>(new Object[CHUNK_SIZE]); + VariableWrapper values = new VariableWrapper<>(new Object[CHUNK_SIZE]); + iterator.accept((key, value) -> { + keys.var[CHUNK_SIZE - count.var] = key; + values.var[CHUNK_SIZE - count.var] = value; + count.var--; + if (count.var == 0) { + count.var = CHUNK_SIZE; + Object[] keysCopy = keys.var; + Object[] valuesCopy = values.var; + keys.var = new Object[CHUNK_SIZE]; + values.var = new Object[CHUNK_SIZE]; + try { + parallelExecutor.executeButBlockIfFull(() -> { + for (int i = 0; i < CHUNK_SIZE; i++) { + //noinspection unchecked + consumer.accept((K) keysCopy[i], (V) valuesCopy[i]); + } + }); + } catch (InterruptedException e) { + throw new CompletionException(e); + } + } + }); + parallelExecutor.shutdown(); + try { + parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS); + } catch (InterruptedException e) { + throw new RuntimeException("Parallel forEach interrupted", e); + } + } + + public static void parallelize(Consumer> iterator, + int maxQueueSize, + int parallelism, + int groupSize, + TriConsumer consumer) { + BoundedExecutorService parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, parallelism * 2, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {}); + final int CHUNK_SIZE = groupSize; + IntWrapper count = new IntWrapper(CHUNK_SIZE); + VariableWrapper keys1 = new VariableWrapper<>(new Object[CHUNK_SIZE]); + VariableWrapper keys2 = new VariableWrapper<>(new Object[CHUNK_SIZE]); + VariableWrapper values = new VariableWrapper<>(new Object[CHUNK_SIZE]); + iterator.accept((key1, key2, value) -> { + keys1.var[CHUNK_SIZE - count.var] = key1; + keys2.var[CHUNK_SIZE - count.var] = key2; + values.var[CHUNK_SIZE - count.var] = value; + count.var--; + if (count.var == 0) { + count.var = CHUNK_SIZE; + Object[] keys1Copy = keys1.var; + Object[] keys2Copy = keys2.var; + Object[] valuesCopy = values.var; + keys1.var = new Object[CHUNK_SIZE]; + keys2.var = new Object[CHUNK_SIZE]; + values.var = new Object[CHUNK_SIZE]; + try { + parallelExecutor.executeButBlockIfFull(() -> { + for (int i = 0; i < CHUNK_SIZE; i++) { + //noinspection unchecked + consumer.accept((K1) keys1Copy[i], (K2) keys2Copy[i], (V) valuesCopy[i]); + } + }); + } catch (InterruptedException e) { + throw new CompletionException(e); + } + } + }); + parallelExecutor.shutdown(); + try { + parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS); + } catch (InterruptedException e) { + throw new RuntimeException("Parallel forEach interrupted", e); + } + } +} diff --git a/src/main/java/org/warp/commonutils/concurrency/atomicity/Atomic.java b/src/main/java/org/warp/commonutils/concurrency/atomicity/Atomic.java new file mode 100644 index 0000000..47f1189 --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/atomicity/Atomic.java @@ -0,0 +1,13 @@ +package org.warp.commonutils.concurrency.atomicity; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * This element can be considered atomic + */ +@Retention(RetentionPolicy.SOURCE) +@Target({ElementType.FIELD, ElementType.METHOD, ElementType.TYPE}) +public @interface Atomic {} diff --git a/src/main/java/org/warp/commonutils/concurrency/atomicity/NotAtomic.java b/src/main/java/org/warp/commonutils/concurrency/atomicity/NotAtomic.java new file mode 100644 index 0000000..ffc77db --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/atomicity/NotAtomic.java @@ -0,0 +1,13 @@ +package org.warp.commonutils.concurrency.atomicity; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * This element cannot be considered atomic + */ +@Retention(RetentionPolicy.SOURCE) +@Target({ElementType.FIELD, ElementType.METHOD, ElementType.TYPE}) +public @interface NotAtomic {} diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java new file mode 100644 index 0000000..80fcdf6 --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java @@ -0,0 +1,37 @@ +package org.warp.commonutils.concurrency.executor; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import org.jetbrains.annotations.Nullable; + +public interface BoundedExecutorService extends ExecutorService { + + @Deprecated + static BoundedExecutorService create(int maxQueueSize, + int corePoolSize, + int maxPoolSize, + long keepAliveTime, + TimeUnit unit, + @Nullable BiConsumer queueSizeStatus) { + return new BoundedExecutorServiceImpl(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, + Executors.defaultThreadFactory(), queueSizeStatus); + } + static BoundedExecutorService create(int maxQueueSize, + int corePoolSize, + int maxPoolSize, + long keepAliveTime, + TimeUnit unit, + ThreadFactory threadFactory, + @Nullable BiConsumer queueSizeStatus) { + return new BoundedExecutorServiceImpl(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, queueSizeStatus); + } + + Future submitButBlockIfFull(Callable task) throws InterruptedException; + + void executeButBlockIfFull(Runnable task) throws InterruptedException; +} diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorServiceImpl.java b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorServiceImpl.java new file mode 100644 index 0000000..ba5447c --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorServiceImpl.java @@ -0,0 +1,80 @@ +package org.warp.commonutils.concurrency.executor; + +import org.jetbrains.annotations.Nullable; + +import java.util.concurrent.*; +import java.util.function.BiConsumer; + +class BoundedExecutorServiceImpl extends ThreadPoolExecutor implements BoundedExecutorService { + + private final Semaphore semaphore; + private final @Nullable BiConsumer queueSizeStatus; + private final int maxQueueSize; + private final Object queueSizeStatusLock = new Object(); + + /** + * + * @param maxQueueSize + * @param corePoolSize + * @param maxPoolSize + * @param keepAliveTime + * @param unit + * @param queueSizeStatus Status. The boolean indicates if the queue is full, the integer indicates the current queue size + */ + public BoundedExecutorServiceImpl(int maxQueueSize, + int corePoolSize, + int maxPoolSize, + long keepAliveTime, + TimeUnit unit, + ThreadFactory threadFactory, + @Nullable BiConsumer queueSizeStatus) { + super(corePoolSize, maxPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<>(), threadFactory); + if (maxQueueSize < 0) { + throw new IllegalArgumentException(); + } + this.maxQueueSize = maxQueueSize; + this.queueSizeStatus = queueSizeStatus; + semaphore = new Semaphore(maxQueueSize); + } + + /** + * Submits task to execution pool, but blocks while number of running threads + * has reached the bound limit + */ + @Override + public Future submitButBlockIfFull(final Callable task) throws InterruptedException { + blockIfFull(); + return submit(task); + } + + /** + * Submits task to execution pool, but blocks while number of running threads + * has reached the bound limit + */ + @Override + public void executeButBlockIfFull(final Runnable task) throws InterruptedException { + blockIfFull(); + execute(task); + } + + private void blockIfFull() throws InterruptedException { + if (semaphore.availablePermits() == 0) { + synchronized (queueSizeStatusLock) { + if (queueSizeStatus != null) queueSizeStatus.accept(true, maxQueueSize + (semaphore.hasQueuedThreads() ? semaphore.getQueueLength() : 0)); + } + } + semaphore.acquire(); + } + + @Override + public void beforeExecute(Thread t, Runnable r) { + var queueSize = getQueue().size(); + synchronized (queueSizeStatusLock) { + if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maxQueueSize, queueSize); + } + + semaphore.release(); + + super.beforeExecute(t, r); + } +} \ No newline at end of file diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/ConcurrencySegment.java b/src/main/java/org/warp/commonutils/concurrency/executor/ConcurrencySegment.java new file mode 100644 index 0000000..6ef35a9 --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/executor/ConcurrencySegment.java @@ -0,0 +1,40 @@ +package org.warp.commonutils.concurrency.executor; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; + +final class ConcurrencySegment { + + private final Map store = new HashMap(); + private final Supplier valuesSupplier; + + ConcurrencySegment(Supplier valuesSupplier) { + this.valuesSupplier = valuesSupplier; + } + + synchronized V getValue(K key) { + Entry current = store.get(key); + if (current == null) { + current = new Entry(); + store.put(key, current); + } else { + current.users++; + } + return current.value; + } + + synchronized void releaseKey(K key) { + Entry current = store.get(key); + if (current.users == 1) { + store.remove(key); + } else { + current.users--; + } + } + + private class Entry { + private int users = 1; + private V value = valuesSupplier.get(); + } +} \ No newline at end of file diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/PerKeyReadWriteExecutor.java b/src/main/java/org/warp/commonutils/concurrency/executor/PerKeyReadWriteExecutor.java new file mode 100644 index 0000000..bd6d802 --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/executor/PerKeyReadWriteExecutor.java @@ -0,0 +1,108 @@ + +package org.warp.commonutils.concurrency.executor; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.locks.Lock; +import java.util.function.Supplier; +import org.warp.commonutils.functional.IORunnable; +import org.warp.commonutils.functional.IOSupplier; +import org.warp.commonutils.random.HashUtil; + +/** + * An Executor which executes tasks on the caller thread. + * The tasks will be executed synchronously on a per-key basis. + * By saying per-key, we mean that thread safety is guaranteed for threads calling it with equals keys. + * When two threads calling the executor with equals keys, the executions will never overlap each other. + * On the other hand, the executor is implemented so calls from different threads, with keys that are not equals, will be executed concurrently with minimal contention between the calls. + * Calling threads might be suspended. + * Calling execute from different threads with equals keys has the same memory semantics as locking and releasing a java.util.concurrent.locks.{@link Lock}. + */ +public final class PerKeyReadWriteExecutor extends ReadWriteExecutor implements Closeable { + + private static final int BASE_CONCURRENCY_LEVEL = 32; + + private final int concurrencyLevel; + + private final ConcurrencySegment[] segments; + + private boolean closed = false; + + public PerKeyReadWriteExecutor() { + this(BASE_CONCURRENCY_LEVEL); + } + + @SuppressWarnings({"unchecked"}) + public PerKeyReadWriteExecutor(int concurrencyLevel) { + super(); + this.concurrencyLevel = concurrencyLevel; + segments = (ConcurrencySegment[]) new ConcurrencySegment[concurrencyLevel]; + for (int i = 0; i < concurrencyLevel; i++) { + segments[i] = new ConcurrencySegment<>(ReadWriteExecutor::new); + } + } + + public void execute(KEY_TYPE key, ReadWriteExecutor.LockMode lockMode, Runnable task) { + super.execute(LockMode.READ, () -> { + if (closed) throw new IllegalStateException(PerKeyReadWriteExecutor.class.getSimpleName() + " is closed"); + int segmentIndex = HashUtil.boundedHash(key, concurrencyLevel); + ConcurrencySegment s = segments[segmentIndex]; + ReadWriteExecutor executor = s.getValue(key); + try { + executor.execute(lockMode, task); + } finally { + s.releaseKey(key); + } + }); + } + + public void executeIO(KEY_TYPE key, ReadWriteExecutor.LockMode lockMode, IORunnable task) throws IOException { + super.executeIO(LockMode.READ, () -> { + if (closed) throw new IllegalStateException(PerKeyReadWriteExecutor.class.getSimpleName() + " is closed"); + int segmentIndex = HashUtil.boundedHash(key, concurrencyLevel); + ConcurrencySegment s = segments[segmentIndex]; + ReadWriteExecutor executor = s.getValue(key); + try { + executor.executeIO(lockMode, task); + } finally { + s.releaseKey(key); + } + }); + } + + public R execute(KEY_TYPE key, ReadWriteExecutor.LockMode lockMode, Supplier task) { + return super.execute(LockMode.READ, () -> { + if (closed) throw new IllegalStateException(PerKeyReadWriteExecutor.class.getSimpleName() + " is closed"); + int segmentIndex = HashUtil.boundedHash(key, concurrencyLevel); + ConcurrencySegment s = segments[segmentIndex]; + ReadWriteExecutor executor = s.getValue(key); + try { + return executor.execute(lockMode, task); + } finally { + s.releaseKey(key); + } + }); + } + + public R executeIO(KEY_TYPE key, ReadWriteExecutor.LockMode lockMode, IOSupplier task) throws IOException { + return super.executeIO(LockMode.READ, () -> { + if (closed) + throw new IllegalStateException(PerKeyReadWriteExecutor.class.getSimpleName() + " is closed"); + int segmentIndex = HashUtil.boundedHash(key, concurrencyLevel); + ConcurrencySegment s = segments[segmentIndex]; + ReadWriteExecutor executor = s.getValue(key); + try { + return executor.executeIO(lockMode, task); + } finally { + s.releaseKey(key); + } + }); + } + + @Override + public void close() { + super.execute(LockMode.WRITE, () -> { + closed = true; + }); + } +} \ No newline at end of file diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/PerKeySynchronizedExecutor.java b/src/main/java/org/warp/commonutils/concurrency/executor/PerKeySynchronizedExecutor.java new file mode 100644 index 0000000..f092c22 --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/executor/PerKeySynchronizedExecutor.java @@ -0,0 +1,106 @@ + +package org.warp.commonutils.concurrency.executor; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.locks.Lock; +import java.util.function.Supplier; +import org.warp.commonutils.functional.IORunnable; +import org.warp.commonutils.functional.IOSupplier; +import org.warp.commonutils.random.HashUtil; + +/** + * An Executor which executes tasks on the caller thread. + * The tasks will be executed synchronously on a per-key basis. + * By saying per-key, we mean that thread safety is guaranteed for threads calling it with equals keys. + * When two threads calling the executor with equals keys, the executions will never overlap each other. + * On the other hand, the executor is implemented so calls from different threads, with keys that are not equals, will be executed concurrently with minimal contention between the calls. + * Calling threads might be suspended. + * Calling execute from different threads with equals keys has the same memory semantics as locking and releasing a java.util.concurrent.locks.{@link Lock}. + */ +public final class PerKeySynchronizedExecutor extends ReadWriteExecutor implements Closeable { + + private static final int BASE_CONCURRENCY_LEVEL = 32; + + private final int concurrencyLevel; + + private final ConcurrencySegment[] segments; + + private boolean closed = false; + + public PerKeySynchronizedExecutor() { + this(BASE_CONCURRENCY_LEVEL); + } + + @SuppressWarnings({"unchecked"}) + public PerKeySynchronizedExecutor(int concurrencyLevel) { + this.concurrencyLevel = concurrencyLevel; + segments = (ConcurrencySegment[]) new ConcurrencySegment[concurrencyLevel]; + for (int i = 0; i < concurrencyLevel; i++) { + segments[i] = new ConcurrencySegment<>(SynchronizedExecutor::new); + } + } + + public void execute(KEY_TYPE key, Runnable task) { + super.execute(LockMode.READ, () -> { + if (closed) throw new IllegalStateException(PerKeySynchronizedExecutor.class.getSimpleName() + " is closed"); + int segmentIndex = HashUtil.boundedHash(key, concurrencyLevel); + ConcurrencySegment s = segments[segmentIndex]; + SynchronizedExecutor executor = s.getValue(key); + try { + executor.execute(task); + } finally { + s.releaseKey(key); + } + }); + } + + public void executeIO(KEY_TYPE key, IORunnable task) throws IOException { + super.executeIO(LockMode.READ, () -> { + if (closed) throw new IllegalStateException(PerKeySynchronizedExecutor.class.getSimpleName() + " is closed"); + int segmentIndex = HashUtil.boundedHash(key, concurrencyLevel); + ConcurrencySegment s = segments[segmentIndex]; + SynchronizedExecutor executor = s.getValue(key); + try { + executor.executeIO(task); + } finally { + s.releaseKey(key); + } + }); + } + + public R execute(KEY_TYPE key, Supplier task) { + return super.execute(LockMode.READ, () -> { + if (closed) throw new IllegalStateException(PerKeySynchronizedExecutor.class.getSimpleName() + " is closed"); + int segmentIndex = HashUtil.boundedHash(key, concurrencyLevel); + ConcurrencySegment s = segments[segmentIndex]; + SynchronizedExecutor executor = s.getValue(key); + try { + return executor.execute(task); + } finally { + s.releaseKey(key); + } + }); + } + + public R executeIO(KEY_TYPE key, IOSupplier task) throws IOException { + return super.executeIO(LockMode.READ, () -> { + if (closed) throw new IllegalStateException(PerKeySynchronizedExecutor.class.getSimpleName() + " is closed"); + int segmentIndex = HashUtil.boundedHash(key, concurrencyLevel); + ConcurrencySegment s = segments[segmentIndex]; + SynchronizedExecutor executor = s.getValue(key); + try { + return executor.executeIO(task); + } finally { + s.releaseKey(key); + } + }); + } + + @Override + public void close() { + super.execute(LockMode.WRITE, () -> { + closed = true; + }); + } +} \ No newline at end of file diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/ReadWriteExecutor.java b/src/main/java/org/warp/commonutils/concurrency/executor/ReadWriteExecutor.java new file mode 100644 index 0000000..088b702 --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/executor/ReadWriteExecutor.java @@ -0,0 +1,45 @@ +package org.warp.commonutils.concurrency.executor; + +import java.io.IOException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; +import org.warp.commonutils.functional.IORunnable; +import org.warp.commonutils.functional.IOSupplier; +import org.warp.commonutils.locks.LockUtils; + +/** + * An Executor which executes tasks on the caller thread. + * The tasks will be executed synchronously, so no overlapping between two tasks running on different threads will ever occur. + * Calling threads might be suspended. + * Executing a task has the same memory semantics as locking and releasing a java.util.concurrent.locks.{@link Lock}. + */ +public class ReadWriteExecutor { + + private final ReentrantReadWriteLock lock; + + public ReadWriteExecutor() { + this.lock = new ReentrantReadWriteLock(); + } + + public void execute(LockMode lockMode, Runnable task) { + LockUtils.lock(lockMode == LockMode.READ ? lock.readLock() : lock.writeLock(), task); + } + + public void executeIO(LockMode lockMode, IORunnable task) throws IOException { + LockUtils.lockIO(lockMode == LockMode.READ ? lock.readLock() : lock.writeLock(), task); + } + + public R execute(LockMode lockMode, Supplier task) { + return LockUtils.lock(lockMode == LockMode.READ ? lock.readLock() : lock.writeLock(), task); + } + + public R executeIO(LockMode lockMode, IOSupplier task) throws IOException { + return LockUtils.lockIO(lockMode == LockMode.READ ? lock.readLock() : lock.writeLock(), task); + } + + public enum LockMode { + READ, + WRITE + } +} \ No newline at end of file diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/SynchronizedExecutor.java b/src/main/java/org/warp/commonutils/concurrency/executor/SynchronizedExecutor.java new file mode 100644 index 0000000..1932459 --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/executor/SynchronizedExecutor.java @@ -0,0 +1,63 @@ +package org.warp.commonutils.concurrency.executor; + +import java.io.IOException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; +import org.warp.commonutils.functional.IORunnable; +import org.warp.commonutils.functional.IOSupplier; + +/** + * An Executor which executes tasks on the caller thread. + * The tasks will be executed synchronously, so no overlapping between two tasks running on different threads will ever occur. + * Calling threads might be suspended. + * Executing a task has the same memory semantics as locking and releasing a java.util.concurrent.locks.{@link Lock}. + */ +public final class SynchronizedExecutor { + + private final Lock lock; + + public SynchronizedExecutor() { + this.lock = new ReentrantLock(); + } + + SynchronizedExecutor(Lock lock) { + this.lock = lock; + } + + public void execute(Runnable task) { + lock.lock(); + try { + task.run(); + } finally { + lock.unlock(); + } + } + + public void executeIO(IORunnable task) throws IOException { + lock.lock(); + try { + task.run(); + } finally { + lock.unlock(); + } + } + + public R execute(Supplier task) { + lock.lock(); + try { + return task.get(); + } finally { + lock.unlock(); + } + } + + public R executeIO(IOSupplier task) throws IOException { + lock.lock(); + try { + return task.get(); + } finally { + lock.unlock(); + } + } +} \ No newline at end of file diff --git a/src/main/java/org/warp/commonutils/concurrency/future/CompletableFutureUtils.java b/src/main/java/org/warp/commonutils/concurrency/future/CompletableFutureUtils.java new file mode 100644 index 0000000..eb4db36 --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/future/CompletableFutureUtils.java @@ -0,0 +1,148 @@ +package org.warp.commonutils.concurrency.future; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +public class CompletableFutureUtils { + + /** + * Aggregate multiple {@link CompletableFuture} lists into a single {@link CompletableFuture} list + * + * @param futureLists A collection of {@link CompletableFuture} lists. + * @param List elements type + * @return {@link CompletableFuture} list + */ + public static CompletableFuture> aggregate(Collection>> futureLists) { + final CompletableFuture> identityAggregatedResult = CompletableFuture.completedFuture(new ArrayList()); + + return futureLists.parallelStream().reduce(identityAggregatedResult, (currentAggregatedResult, futureList) -> { + return currentAggregatedResult.thenApplyAsync((aggregatedList) -> { + aggregatedList.addAll(futureList.join()); + return aggregatedList; + }); + }); + } + + /** + * Creates a new empty collection of disaggregated future results future lists + */ + public static Collection>>> createDisaggregatedResultsList() { + return new ArrayList<>(10); + } + + /** + * Add a + * @param disaggregatedResults + * @param result + * @param + */ + public static void addDisaggregated( + Collection>>> disaggregatedResults, + CompletableFuture>> result) { + disaggregatedResults.add(result); + } + + /** + * Add a result + */ + public static void addDisaggregatedCast( + Collection>>> disaggregatedResults, + CompletableFuture>> result) { + addDisaggregatedCastForced(disaggregatedResults, result); + } + + public static void addDisaggregatedCastForced( + Collection>>> disaggregatedResults, + CompletableFuture>> result) { + disaggregatedResults.add(result.thenApply((originalList) -> { + List> resultList = new ArrayList<>(); + for (CompletableFuture originalFuture : originalList) { + resultList.add(originalFuture.thenApply((originalValue) -> { + //noinspection unchecked + return (T) originalValue; + })); + } + return resultList; + })); + } + + public static Set collectToSet(CompletableFuture>> futureList) { + return futureList.join().parallelStream().map(CompletableFuture::join).collect(Collectors.toSet()); + } + + public static Set collectToSet(CompletableFuture>> futureList, int limit) { + return futureList.join().parallelStream().map(CompletableFuture::join).limit(10).collect(Collectors.toSet()); + } + + public static List collectToList(CompletableFuture>> futureList) { + return futureList.join().stream().map(CompletableFuture::join).collect(Collectors.toList()); + } + + public static List collectToList(CompletableFuture>> futureList, int limit) { + return futureList.join().stream().map(CompletableFuture::join).limit(limit).collect(Collectors.toList()); + } + + public static LinkedHashSet collectToLinkedSet(CompletableFuture>> futureList) { + return futureList.join().stream().map(CompletableFuture::join).collect(Collectors.toCollection(LinkedHashSet::new)); + } + + public static LinkedHashSet collectToLinkedSet(CompletableFuture>> futureList, + int limit) { + return futureList.join().stream().map(CompletableFuture::join).limit(limit) + .collect(Collectors.toCollection(LinkedHashSet::new)); + } + + public static TreeSet collectToTreeSet(CompletableFuture>> futureList) { + return futureList.join().stream().map(CompletableFuture::join).collect(Collectors.toCollection(TreeSet::new)); + } + + public static TreeSet collectToTreeSet(CompletableFuture>> futureList, int limit) { + return futureList.join().stream().map(CompletableFuture::join).limit(limit) + .collect(Collectors.toCollection(TreeSet::new)); + } + + public static TreeSet collectToTreeSet(CompletableFuture>> futureList, Comparator comparator) { + return futureList.join().stream().map(CompletableFuture::join).collect(Collectors.toCollection(() -> new TreeSet<>(comparator))); + } + + public static TreeSet collectToTreeSet(CompletableFuture>> futureList, Comparator comparator, int limit) { + return futureList.join().stream().map(CompletableFuture::join).limit(limit) + .collect(Collectors.toCollection(() -> new TreeSet<>(comparator))); + } + + public static Optional anyOrNull(CompletableFuture>> futureList) { + return futureList.join().parallelStream().map(CompletableFuture::join).findAny(); + } + + public static Optional firstOrNull(CompletableFuture>> futureList) { + return futureList.join().stream().map(CompletableFuture::join).findFirst(); + } + + public static void forEachOrdered(CompletableFuture>> futureList, + Consumer consumer) { + forEachOrdered(futureList, consumer, false); + } + + public static void forEachOrdered(CompletableFuture>> futureList, + Consumer consumer, boolean reverse) { + var futures = futureList.join(); + if (reverse) { + Collections.reverse(futures); + } + futures.stream().map(CompletableFuture::join).forEachOrdered(consumer); + } + + public static void forEach(CompletableFuture>> futureList, Consumer consumer) { + futureList.join().parallelStream().map(CompletableFuture::join).forEach(consumer); + } +} diff --git a/src/main/java/org/warp/commonutils/concurrency/future/FutureUtils.java b/src/main/java/org/warp/commonutils/concurrency/future/FutureUtils.java new file mode 100644 index 0000000..33e7f25 --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/future/FutureUtils.java @@ -0,0 +1,23 @@ +package org.warp.commonutils.concurrency.future; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +public class FutureUtils { + + /** + * Waits for *all* futures to complete and returns a list of results. If *any* future completes exceptionally then the + * resulting future will also complete exceptionally. + * + * @param futures + * @param + * @return + */ + public static CompletableFuture> all(List> futures) { + CompletableFuture[] cfs = futures.toArray(CompletableFuture[]::new); + + return CompletableFuture.allOf(cfs) + .thenApply(ignored -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList())); + } +} diff --git a/src/main/java/org/warp/commonutils/concurrency/future/SizedFutureList.java b/src/main/java/org/warp/commonutils/concurrency/future/SizedFutureList.java new file mode 100644 index 0000000..716d73a --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/future/SizedFutureList.java @@ -0,0 +1,50 @@ +package org.warp.commonutils.concurrency.future; + +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; + +public class SizedFutureList { + + private final CompletableFuture>> data; + private final CompletableFuture size; + + public SizedFutureList(CompletableFuture>> data, CompletableFuture size) { + this.data = data; + this.size = size; + } + + public static SizedFutureList empty() { + return new SizedFutureList<>(CompletableFuture.completedFuture(List.of()), CompletableFuture.completedFuture(0)); + } + + public CompletableFuture>> getData() { + return data; + } + + public CompletableFuture getSize() { + return size; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SizedFutureList that = (SizedFutureList) o; + return Objects.equals(data, that.data) && Objects.equals(size, that.size); + } + + @Override + public int hashCode() { + return Objects.hash(data, size); + } + + @Override + public String toString() { + return "SizedFutureList{" + "data=" + data + ", size=" + size + '}'; + } +} diff --git a/src/main/java/org/warp/commonutils/concurrency/future/SizedFutureSet.java b/src/main/java/org/warp/commonutils/concurrency/future/SizedFutureSet.java new file mode 100644 index 0000000..9083ad0 --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/future/SizedFutureSet.java @@ -0,0 +1,65 @@ +package org.warp.commonutils.concurrency.future; + +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +public class SizedFutureSet { + + private final CompletableFuture>> data; + private final CompletableFuture size; + + public SizedFutureSet(CompletableFuture>> data, CompletableFuture size) { + this.data = data; + this.size = size; + } + + public static SizedFutureSet empty() { + return new SizedFutureSet<>(CompletableFuture.completedFuture(List.of()), CompletableFuture.completedFuture(0)); + } + + public CompletableFuture>> getFutureDataOrdered() { + return data.thenApply(LinkedHashSet::new); + } + + public CompletableFuture>> getFutureDataUnordered() { + return data.thenApply(HashSet::new); + } + + public LinkedHashSet getDataOrdered() { + return CompletableFutureUtils.collectToLinkedSet(data); + } + + public Set getDataUnordered() { + return CompletableFutureUtils.collectToSet(data); + } + + public CompletableFuture getSize() { + return size; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SizedFutureSet that = (SizedFutureSet) o; + return Objects.equals(data, that.data) && Objects.equals(size, that.size); + } + + @Override + public int hashCode() { + return Objects.hash(data, size); + } + + @Override + public String toString() { + return "SizedFutureList{" + "data=" + data + ", size=" + size + '}'; + } +} diff --git a/src/main/java/org/warp/commonutils/error/IndexOutOfBoundsException.java b/src/main/java/org/warp/commonutils/error/IndexOutOfBoundsException.java new file mode 100644 index 0000000..3454544 --- /dev/null +++ b/src/main/java/org/warp/commonutils/error/IndexOutOfBoundsException.java @@ -0,0 +1,19 @@ +package org.warp.commonutils.error; + +public class IndexOutOfBoundsException extends RuntimeException { + + public IndexOutOfBoundsException() { + } + + public IndexOutOfBoundsException(String s) { + super(s); + } + + public IndexOutOfBoundsException(long index) { + super("Index out of range: " + index); + } + + public IndexOutOfBoundsException(long index, long min, long max) { + super("Index " + index + " out of range (from " + min + " to " + max + ")"); + } +} diff --git a/src/main/java/org/warp/commonutils/error/InitializationException.java b/src/main/java/org/warp/commonutils/error/InitializationException.java new file mode 100644 index 0000000..a21a923 --- /dev/null +++ b/src/main/java/org/warp/commonutils/error/InitializationException.java @@ -0,0 +1,21 @@ +package org.warp.commonutils.error; + +import java.io.IOException; + +public class InitializationException extends IOException { + public InitializationException() { + super(); + } + + public InitializationException(String text) { + super(text); + } + + public InitializationException(String message, Throwable cause) { + super(message, cause); + } + + public InitializationException(Throwable cause) { + super(cause); + } +} diff --git a/src/main/java/org/warp/commonutils/functional/Generic.java b/src/main/java/org/warp/commonutils/functional/Generic.java new file mode 100644 index 0000000..1b3b8b7 --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/Generic.java @@ -0,0 +1,19 @@ +package org.warp.commonutils.functional; + +import java.util.function.Consumer; +import java.util.function.Function; +import org.warp.commonutils.functional.Unchecked.UncheckedConsumer; + +public class Generic { + public static Function function(Function fnc) { + return (Function) fnc; + + } + public static Consumer consumer(Consumer fnc) { + return (Consumer) fnc; + } + + public static UncheckedConsumer consumerExc(UncheckedConsumer fnc) { + return (UncheckedConsumer) fnc; + } +} diff --git a/src/main/java/org/warp/commonutils/functional/IOBooleanSupplier.java b/src/main/java/org/warp/commonutils/functional/IOBooleanSupplier.java new file mode 100644 index 0000000..c2912cc --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/IOBooleanSupplier.java @@ -0,0 +1,8 @@ +package org.warp.commonutils.functional; + +import java.io.IOException; + +public interface IOBooleanSupplier { + + boolean get() throws IOException; +} diff --git a/src/main/java/org/warp/commonutils/functional/IOConsumer.java b/src/main/java/org/warp/commonutils/functional/IOConsumer.java new file mode 100644 index 0000000..09b779a --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/IOConsumer.java @@ -0,0 +1,8 @@ +package org.warp.commonutils.functional; + +import java.io.IOException; + +public interface IOConsumer { + + void consume(T value) throws IOException; +} diff --git a/src/main/java/org/warp/commonutils/functional/IOFunction.java b/src/main/java/org/warp/commonutils/functional/IOFunction.java new file mode 100644 index 0000000..3790aa2 --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/IOFunction.java @@ -0,0 +1,8 @@ +package org.warp.commonutils.functional; + +import java.io.IOException; + +public interface IOFunction { + + U run(T data) throws IOException; +} diff --git a/src/main/java/org/warp/commonutils/functional/IOIntegerSupplier.java b/src/main/java/org/warp/commonutils/functional/IOIntegerSupplier.java new file mode 100644 index 0000000..b311899 --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/IOIntegerSupplier.java @@ -0,0 +1,8 @@ +package org.warp.commonutils.functional; + +import java.io.IOException; + +public interface IOIntegerSupplier { + + int get() throws IOException; +} diff --git a/src/main/java/org/warp/commonutils/functional/IOLongSupplier.java b/src/main/java/org/warp/commonutils/functional/IOLongSupplier.java new file mode 100644 index 0000000..78bed27 --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/IOLongSupplier.java @@ -0,0 +1,8 @@ +package org.warp.commonutils.functional; + +import java.io.IOException; + +public interface IOLongSupplier { + + long get() throws IOException; +} diff --git a/src/main/java/org/warp/commonutils/functional/IORunnable.java b/src/main/java/org/warp/commonutils/functional/IORunnable.java new file mode 100644 index 0000000..20431e6 --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/IORunnable.java @@ -0,0 +1,8 @@ +package org.warp.commonutils.functional; + +import java.io.IOException; + +public interface IORunnable { + + void run() throws IOException; +} diff --git a/src/main/java/org/warp/commonutils/functional/IOSupplier.java b/src/main/java/org/warp/commonutils/functional/IOSupplier.java new file mode 100644 index 0000000..1c26436 --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/IOSupplier.java @@ -0,0 +1,8 @@ +package org.warp.commonutils.functional; + +import java.io.IOException; + +public interface IOSupplier { + + T get() throws IOException; +} diff --git a/src/main/java/org/warp/commonutils/functional/TriConsumer.java b/src/main/java/org/warp/commonutils/functional/TriConsumer.java new file mode 100644 index 0000000..e7c19bb --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/TriConsumer.java @@ -0,0 +1,54 @@ +package org.warp.commonutils.functional; + +import java.util.Objects; +import java.util.function.Consumer; + +/** + * Represents an operation that accepts three input arguments and returns no + * result. This is the three-arity specialization of {@link Consumer}. + * Unlike most other functional interfaces, {@code TriConsumer} is expected + * to operate via side-effects. + * + *

This is a functional interface + * whose functional method is {@link #accept(Object, Object, Object)}. + * + * @param the type of the first argument to the operation + * @param the type of the second argument to the operation + * @param the type of the thord argument to the operation + * + * @see Consumer + * @since 1.8 + */ +@FunctionalInterface +public interface TriConsumer { + + /** + * Performs this operation on the given arguments. + * + * @param t the first input argument + * @param u the second input argument + * @param v the third input argument + */ + void accept(T t, U u, V v); + + /** + * Returns a composed {@code TriConsumer} that performs, in sequence, this + * operation followed by the {@code after} operation. If performing either + * operation throws an exception, it is relayed to the caller of the + * composed operation. If performing this operation throws an exception, + * the {@code after} operation will not be performed. + * + * @param after the operation to perform after this operation + * @return a composed {@code TriConsumer} that performs in sequence this + * operation followed by the {@code after} operation + * @throws NullPointerException if {@code after} is null + */ + default org.warp.commonutils.functional.TriConsumer andThen(org.warp.commonutils.functional.TriConsumer after) { + Objects.requireNonNull(after); + + return (l, r, u) -> { + accept(l, r, u); + after.accept(l, r, u); + }; + } +} diff --git a/src/main/java/org/warp/commonutils/functional/TriFunction.java b/src/main/java/org/warp/commonutils/functional/TriFunction.java new file mode 100644 index 0000000..3e5abf9 --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/TriFunction.java @@ -0,0 +1,51 @@ +package org.warp.commonutils.functional; + +import java.util.Objects; +import java.util.function.Function; + +/** + * Represents a function that accepts three arguments and produces a result. + * This is the three-arity specialization of {@link Function}. + * + *

This is a functional interface + * whose functional method is {@link #apply(Object, Object, Object)}. + * + * @param the type of the first argument to the function + * @param the type of the second argument to the function + * @param the type of the third argument to the function + * @param the type of the result of the function + * + * @see Function + * @since 1.8 + */ +@FunctionalInterface +public interface TriFunction { + + /** + * Applies this function to the given arguments. + * + * @param t the first function argument + * @param u the second function argument + * @param x the third function argument + * @return the function result + */ + R apply(T t, U u, X x); + + /** + * Returns a composed function that first applies this function to + * its input, and then applies the {@code after} function to the result. + * If evaluation of either function throws an exception, it is relayed to + * the caller of the composed function. + * + * @param the type of output of the {@code after} function, and of the + * composed function + * @param after the function to apply after this function is applied + * @return a composed function that first applies this function and then + * applies the {@code after} function + * @throws NullPointerException if after is null + */ + default org.warp.commonutils.functional.TriFunction andThen(Function after) { + Objects.requireNonNull(after); + return (T t, U u, X x) -> after.apply(apply(t, u, x)); + } +} diff --git a/src/main/java/org/warp/commonutils/functional/Unchecked.java b/src/main/java/org/warp/commonutils/functional/Unchecked.java new file mode 100644 index 0000000..2e5ea64 --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/Unchecked.java @@ -0,0 +1,30 @@ +package org.warp.commonutils.functional; + +import java.util.function.Function; + +public class Unchecked implements Function { + + private final UncheckedConsumer uncheckedConsumer; + + public Unchecked(UncheckedConsumer uncheckedConsumer) { + this.uncheckedConsumer = uncheckedConsumer; + } + + public static Unchecked wrap(UncheckedConsumer uncheckedConsumer) { + return new Unchecked<>(uncheckedConsumer); + } + + @Override + public UncheckedResult apply(T t) { + try { + uncheckedConsumer.consume(t); + return new UncheckedResult(); + } catch (Exception e) { + return new UncheckedResult(e); + } + } + + public interface UncheckedConsumer { + public void consume(T value) throws Exception; + } +} diff --git a/src/main/java/org/warp/commonutils/functional/UncheckedResult.java b/src/main/java/org/warp/commonutils/functional/UncheckedResult.java new file mode 100644 index 0000000..853f811 --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/UncheckedResult.java @@ -0,0 +1,33 @@ +package org.warp.commonutils.functional; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public class UncheckedResult { + + @Nullable + private final Exception e; + + public UncheckedResult(@NotNull Exception e) { + this.e = e; + } + + public UncheckedResult() { + this.e = null; + } + + public UncheckedResult throwException(@NotNull Class exceptionClass) throws T { + if (e != null) { + if (exceptionClass.isInstance(e)) { + throw (T) e; + } + } + return this; + } + + public void done() { + if (e != null) { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/java/org/warp/commonutils/functional/UnsafeIOUtils.java b/src/main/java/org/warp/commonutils/functional/UnsafeIOUtils.java new file mode 100644 index 0000000..ed02a16 --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/UnsafeIOUtils.java @@ -0,0 +1,52 @@ +package org.warp.commonutils.functional; + +import java.io.IOError; +import java.io.IOException; +import org.warp.commonutils.functional.IOBooleanSupplier; +import org.warp.commonutils.functional.IOIntegerSupplier; +import org.warp.commonutils.functional.IOLongSupplier; +import org.warp.commonutils.functional.IORunnable; +import org.warp.commonutils.functional.IOSupplier; + +public final class UnsafeIOUtils { + + public static T unsafe(IOSupplier expression) { + try { + return expression.get(); + } catch (IOException e) { + throw new IOError(e); + } + } + + public static int unsafe(IOIntegerSupplier expression) { + try { + return expression.get(); + } catch (IOException e) { + throw new IOError(e); + } + } + + public static boolean unsafe(IOBooleanSupplier expression) { + try { + return expression.get(); + } catch (IOException e) { + throw new IOError(e); + } + } + + public static long unsafe(IOLongSupplier expression) { + try { + return expression.get(); + } catch (IOException e) { + throw new IOError(e); + } + } + + public static void unsafe(IORunnable expression) { + try { + expression.run(); + } catch (IOException e) { + throw new IOError(e); + } + } +} diff --git a/src/main/java/org/warp/commonutils/locks/LeftRightLock.java b/src/main/java/org/warp/commonutils/locks/LeftRightLock.java new file mode 100644 index 0000000..7b8d458 --- /dev/null +++ b/src/main/java/org/warp/commonutils/locks/LeftRightLock.java @@ -0,0 +1,123 @@ +package org.warp.commonutils.locks; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; +import java.util.concurrent.locks.Lock; + +/** + * A binary mutex with the following properties: + * + * Exposes two different {@link Lock}s: LEFT, RIGHT. + * + * When LEFT is held other threads can acquire LEFT but thread trying to acquire RIGHT will be + * blocked. When RIGHT is held other threads can acquire RIGHT but thread trying to acquire LEFT + * will be blocked. + */ +public class LeftRightLock { + + public static final int ACQUISITION_FAILED = -1; + public static final int ACQUISITION_SUCCEEDED = 1; + + private final LeftRightSync sync = new LeftRightSync(); + + public void lockLeft() { + sync.acquireShared(LockSide.LEFT.getV()); + } + + public void lockRight() { + sync.acquireShared(LockSide.RIGHT.getV()); + } + + public void releaseLeft() { + sync.releaseShared(LockSide.LEFT.getV()); + } + + public void releaseRight() { + sync.releaseShared(LockSide.RIGHT.getV()); + } + + public boolean tryLockLeft() { + return sync.tryAcquireShared(LockSide.LEFT) == ACQUISITION_SUCCEEDED; + } + + public boolean tryLockRight() { + return sync.tryAcquireShared(LockSide.RIGHT) == ACQUISITION_SUCCEEDED; + } + + private enum LockSide { + LEFT(-1), NONE(0), RIGHT(1); + + private final int v; + + LockSide(int v) { + this.v = v; + } + + public int getV() { + return v; + } + } + + /** + *

+ * Keep count the count of threads holding either the LEFT or the RIGHT lock. + *

+ * + *
  • A state ({@link AbstractQueuedSynchronizer#getState()}) greater than 0 means one or more threads are holding RIGHT lock.
  • + *
  • A state ({@link AbstractQueuedSynchronizer#getState()}) lower than 0 means one or more threads are holding LEFT lock.
  • + *
  • A state ({@link AbstractQueuedSynchronizer#getState()}) equal to zero means no thread is holding any lock.
  • + */ + private static final class LeftRightSync extends AbstractQueuedSynchronizer { + + + @Override + protected int tryAcquireShared(int requiredSide) { + return (tryChangeThreadCountHoldingCurrentLock(requiredSide, ChangeType.ADD) ? ACQUISITION_SUCCEEDED : ACQUISITION_FAILED); + } + + @Override + protected boolean tryReleaseShared(int requiredSide) { + return tryChangeThreadCountHoldingCurrentLock(requiredSide, ChangeType.REMOVE); + } + + public boolean tryChangeThreadCountHoldingCurrentLock(int requiredSide, ChangeType changeType) { + if (requiredSide != 1 && requiredSide != -1) + throw new AssertionError("You can either lock LEFT or RIGHT (-1 or +1)"); + + int curState; + int newState; + do { + curState = this.getState(); + if (!sameSide(curState, requiredSide)) { + return false; + } + + if (changeType == ChangeType.ADD) { + newState = curState + requiredSide; + } else { + newState = curState - requiredSide; + } + //TODO: protect against int overflow (hopefully you won't have so many threads) + } while (!this.compareAndSetState(curState, newState)); + return true; + } + + final int tryAcquireShared(LockSide lockSide) { + return this.tryAcquireShared(lockSide.getV()); + } + + final boolean tryReleaseShared(LockSide lockSide) { + return this.tryReleaseShared(lockSide.getV()); + } + + private boolean sameSide(int curState, int requiredSide) { + return curState == 0 || sameSign(curState, requiredSide); + } + + private boolean sameSign(int a, int b) { + return (a >= 0) ^ (b < 0); + } + + public enum ChangeType { + ADD, REMOVE + } + } +} \ No newline at end of file diff --git a/src/main/java/org/warp/commonutils/locks/LockUtils.java b/src/main/java/org/warp/commonutils/locks/LockUtils.java new file mode 100644 index 0000000..7995218 --- /dev/null +++ b/src/main/java/org/warp/commonutils/locks/LockUtils.java @@ -0,0 +1,148 @@ +package org.warp.commonutils.locks; + +import java.io.IOException; +import java.util.concurrent.locks.Lock; +import java.util.function.Supplier; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.warp.commonutils.functional.IORunnable; +import org.warp.commonutils.functional.IOSupplier; + +public class LockUtils { + + public static void lock(@Nullable Lock lock, @NotNull Runnable r) { + if (lock != null) { + lock.lock(); + } + try { + r.run(); + } finally { + if (lock != null) { + lock.unlock(); + } + } + } + + public static void lock(@Nullable LeftRightLock lock, boolean right, @NotNull Runnable r) { + if (lock != null) { + if (right) { + lock.lockRight(); + } else { + lock.lockLeft(); + } + } + try { + r.run(); + } finally { + if (lock != null) { + if (right) { + lock.releaseRight(); + } else { + lock.releaseLeft(); + } + } + } + } + + public static void lockIO(@Nullable Lock lock, @NotNull IORunnable r) throws IOException { + if (lock != null) { + lock.lock(); + } + try { + r.run(); + } finally { + if (lock != null) { + lock.unlock(); + } + } + } + + public static void lockIO(@Nullable LeftRightLock lock, boolean right, @NotNull IORunnable r) throws IOException { + if (lock != null) { + if (right) { + lock.lockRight(); + } else { + lock.lockLeft(); + } + } + try { + r.run(); + } finally { + if (lock != null) { + if (right) { + lock.releaseRight(); + } else { + lock.releaseLeft(); + } + } + } + } + + public static T lock(@Nullable Lock lock, @NotNull Supplier r) { + if (lock != null) { + lock.lock(); + } + try { + return r.get(); + } finally { + if (lock != null) { + lock.unlock(); + } + } + } + + public static T lock(@Nullable LeftRightLock lock, boolean right, @NotNull Supplier r) { + if (lock != null) { + if (right) { + lock.lockRight(); + } else { + lock.lockLeft(); + } + } + try { + return r.get(); + } finally { + if (lock != null) { + if (right) { + lock.releaseRight(); + } else { + lock.releaseLeft(); + } + } + } + } + + public static T lockIO(@Nullable Lock lock, @NotNull IOSupplier r) throws IOException { + if (lock != null) { + lock.lock(); + } + try { + return r.get(); + } finally { + if (lock != null) { + lock.unlock(); + } + } + } + + public static T lockIO(@Nullable LeftRightLock lock, boolean right, @NotNull IOSupplier r) throws IOException { + if (lock != null) { + if (right) { + lock.lockRight(); + } else { + lock.lockLeft(); + } + } + try { + return r.get(); + } finally { + if (lock != null) { + if (right) { + lock.releaseRight(); + } else { + lock.releaseLeft(); + } + } + } + } +} diff --git a/src/main/java/org/warp/commonutils/locks/Striped.java b/src/main/java/org/warp/commonutils/locks/Striped.java new file mode 100644 index 0000000..bbd09ff --- /dev/null +++ b/src/main/java/org/warp/commonutils/locks/Striped.java @@ -0,0 +1,519 @@ +/* + * Copyright (C) 2011 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.warp.commonutils.locks; + +import com.google.common.annotations.Beta; +import com.google.common.annotations.GwtIncompatible; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.MapMaker; +import com.google.common.math.IntMath; +import com.google.common.primitives.Ints; +import com.googlecode.concurentlocks.ReadWriteUpdateLock; +import com.googlecode.concurentlocks.ReentrantReadWriteUpdateLock; +import java.lang.ref.Reference; +import java.lang.ref.ReferenceQueue; +import java.lang.ref.WeakReference; +import java.math.RoundingMode; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * A striped {@code Lock/Semaphore/ReadWriteLock}. This offers the underlying lock striping similar to that of {@code + * ConcurrentHashMap} in a reusable form, and extends it for semaphores and read-write locks. Conceptually, lock + * striping is the technique of dividing a lock into many + * stripes, increasing the granularity of a single lock and allowing independent operations + * to lock different stripes and proceed concurrently, instead of creating contention for a single lock. + * + *

    The guarantee provided by this class is that equal keys lead to the same lock (or semaphore), + * i.e. {@code if (key1.equals(key2))} then {@code striped.get(key1) == striped.get(key2)} (assuming {@link + * Object#hashCode()} is correctly implemented for the keys). Note that if {@code key1} is + * not equal to {@code key2}, it is not guaranteed that + * {@code striped.get(key1) != striped.get(key2)}; the elements might nevertheless be mapped to the same lock. The lower + * the number of stripes, the higher the probability of this happening. + * + *

    There are three flavors of this class: {@code Striped}, {@code Striped}, and + * {@code Striped}. For each type, two implementations are offered: {@linkplain #lock(int) strong} and + * {@linkplain #lazyWeakLock(int) weak} {@code Striped}, {@linkplain #semaphore(int, int) strong} and {@linkplain + * #lazyWeakSemaphore(int, int) weak} {@code Striped}, and {@linkplain #readWriteLock(int) strong} and + * {@linkplain #lazyWeakReadWriteLock(int) weak} {@code Striped}. Strong means that all stripes + * (locks/semaphores) are initialized eagerly, and are not reclaimed unless {@code Striped} itself is reclaimable. + * Weak means that locks/semaphores are created lazily, and they are allowed to be reclaimed if nobody is + * holding on to them. This is useful, for example, if one wants to create a {@code Striped} of many locks, but + * worries that in most cases only a small portion of these would be in use. + * + *

    Prior to this class, one might be tempted to use {@code Map}, where {@code K} + * represents the task. This maximizes concurrency by having each unique key mapped to a unique lock, but also maximizes + * memory footprint. On the other extreme, one could use a single lock for all tasks, which minimizes memory footprint + * but also minimizes concurrency. Instead of choosing either of these extremes, {@code Striped} allows the user to + * trade between required concurrency and memory footprint. For example, if a set of tasks are CPU-bound, one could + * easily create a very compact {@code Striped} of {@code availableProcessors() * 4} stripes, instead of possibly + * thousands of locks which could be created in a {@code Map} structure. + * + * @author Dimitris Andreou + * @since 13.0 + */ +@Beta +@GwtIncompatible +public abstract class Striped { + + /** + * If there are at least this many stripes, we assume the memory usage of a ConcurrentMap will be smaller than a large + * array. (This assumes that in the lazy case, most stripes are unused. As always, if many stripes are in use, a + * non-lazy striped makes more sense.) + */ + private static final int LARGE_LAZY_CUTOFF = 1024; + + private Striped() { + } + + /** + * Returns the stripe that corresponds to the passed key. It is always guaranteed that if {@code key1.equals(key2)}, + * then {@code get(key1) == get(key2)}. + * + * @param key an arbitrary, non-null key + * @return the stripe that the passed key corresponds to + */ + public abstract L get(Object key); + + /** + * Returns the stripe at the specified index. Valid indexes are 0, inclusively, to {@code size()}, exclusively. + * + * @param index the index of the stripe to return; must be in {@code [0...size())} + * @return the stripe at the specified index + */ + public abstract L getAt(int index); + + /** + * Returns the index to which the given key is mapped, so that getAt(indexFor(key)) == get(key). + */ + abstract int indexFor(Object key); + + /** + * Returns the total number of stripes in this instance. + */ + public abstract int size(); + + /** + * Returns the stripes that correspond to the passed objects, in ascending (as per {@link #getAt(int)}) order. Thus, + * threads that use the stripes in the order returned by this method are guaranteed to not deadlock each other. + * + *

    It should be noted that using a {@code Striped} with relatively few stripes, and + * {@code bulkGet(keys)} with a relative large number of keys can cause an excessive number of shared stripes (much + * like the birthday paradox, where much fewer than anticipated birthdays are needed for a pair of them to match). + * Please consider carefully the implications of the number of stripes, the intended concurrency level, and the + * typical number of keys used in a {@code bulkGet(keys)} operation. See Balls + * in Bins model for mathematical formulas that can be used to estimate the probability of collisions. + * + * @param keys arbitrary non-null keys + * @return the stripes corresponding to the objects (one per each object, derived by delegating to {@link + * #get(Object)}; may contain duplicates), in an increasing index order. + */ + public Iterable bulkGet(Iterable keys) { + // Initially using the array to store the keys, then reusing it to store the respective L's + final Object[] array = Iterables.toArray(keys, Object.class); + if (array.length == 0) { + return ImmutableList.of(); + } + int[] stripes = new int[array.length]; + for (int i = 0; i < array.length; i++) { + stripes[i] = indexFor(array[i]); + } + Arrays.sort(stripes); + // optimize for runs of identical stripes + int previousStripe = stripes[0]; + array[0] = getAt(previousStripe); + for (int i = 1; i < array.length; i++) { + int currentStripe = stripes[i]; + if (currentStripe == previousStripe) { + array[i] = array[i - 1]; + } else { + array[i] = getAt(currentStripe); + previousStripe = currentStripe; + } + } + /* + * Note that the returned Iterable holds references to the returned stripes, to avoid + * error-prone code like: + * + * Striped stripedLock = Striped.lazyWeakXXX(...)' + * Iterable locks = stripedLock.bulkGet(keys); + * for (Lock lock : locks) { + * lock.lock(); + * } + * operation(); + * for (Lock lock : locks) { + * lock.unlock(); + * } + * + * If we only held the int[] stripes, translating it on the fly to L's, the original locks might + * be garbage collected after locking them, ending up in a huge mess. + */ + @SuppressWarnings("unchecked") // we carefully replaced all keys with their respective L's + List asList = (List) Arrays.asList(array); + return Collections.unmodifiableList(asList); + } + + // Static factories + + /** + * Creates a {@code Striped} with eagerly initialized, strongly referenced locks. Every lock is reentrant. + * + * @param stripes the minimum number of stripes (locks) required + * @return a new {@code Striped} + */ + public static Striped lock(int stripes) { + return new CompactStriped(stripes, new Supplier() { + @Override + public Lock get() { + return new PaddedLock(); + } + }); + } + + /** + * Creates a {@code Striped} with lazily initialized, weakly referenced locks. Every lock is reentrant. + * + * @param stripes the minimum number of stripes (locks) required + * @return a new {@code Striped} + */ + public static Striped lazyWeakLock(int stripes) { + return lazy(stripes, new Supplier() { + @Override + public Lock get() { + return new ReentrantLock(false); + } + }); + } + + private static Striped lazy(int stripes, Supplier supplier) { + return stripes < LARGE_LAZY_CUTOFF ? new SmallLazyStriped(stripes, supplier) + : new LargeLazyStriped(stripes, supplier); + } + + /** + * Creates a {@code Striped} with eagerly initialized, strongly referenced semaphores, with the specified + * number of permits. + * + * @param stripes the minimum number of stripes (semaphores) required + * @param permits the number of permits in each semaphore + * @return a new {@code Striped} + */ + public static Striped semaphore(int stripes, final int permits) { + return new CompactStriped(stripes, new Supplier() { + @Override + public Semaphore get() { + return new PaddedSemaphore(permits); + } + }); + } + + /** + * Creates a {@code Striped} with lazily initialized, weakly referenced semaphores, with the specified + * number of permits. + * + * @param stripes the minimum number of stripes (semaphores) required + * @param permits the number of permits in each semaphore + * @return a new {@code Striped} + */ + public static Striped lazyWeakSemaphore(int stripes, final int permits) { + return lazy(stripes, new Supplier() { + @Override + public Semaphore get() { + return new Semaphore(permits, false); + } + }); + } + + /** + * Creates a {@code Striped} with eagerly initialized, strongly referenced read-write locks. Every lock + * is reentrant. + * + * @param stripes the minimum number of stripes (locks) required + * @return a new {@code Striped} + */ + public static Striped readWriteLock(int stripes) { + return new CompactStriped(stripes, READ_WRITE_LOCK_SUPPLIER); + } + + /** + * Creates a {@code Striped} with eagerly initialized, strongly referenced read-write-update locks. + * Every lock is reentrant. + * + * @param stripes the minimum number of stripes (locks) required + * @return a new {@code Striped} + */ + public static Striped readWriteUpdateLock(int stripes) { + return new CompactStriped(stripes, READ_WRITE_UPDATE_LOCK_SUPPLIER); + } + + /** + * Creates a {@code Striped} with lazily initialized, weakly referenced read-write locks. Every lock is + * reentrant. + * + * @param stripes the minimum number of stripes (locks) required + * @return a new {@code Striped} + */ + public static Striped lazyWeakReadWriteLock(int stripes) { + return lazy(stripes, READ_WRITE_LOCK_SUPPLIER); + } + + // ReentrantReadWriteLock is large enough to make padding probably unnecessary + private static final Supplier READ_WRITE_LOCK_SUPPLIER = new Supplier() { + @Override + public ReadWriteLock get() { + return new ReentrantReadWriteLock(); + } + }; + + // ReentrantReadWriteUpdateLock is large enough to make padding probably unnecessary + private static final Supplier READ_WRITE_UPDATE_LOCK_SUPPLIER = new Supplier() { + @Override + public ReadWriteUpdateLock get() { + return new ReentrantReadWriteUpdateLock(); + } + }; + + private abstract static class PowerOfTwoStriped extends Striped { + + /** + * Capacity (power of two) minus one, for fast mod evaluation + */ + final int mask; + + PowerOfTwoStriped(int stripes) { + Preconditions.checkArgument(stripes > 0, "Stripes must be positive"); + this.mask = stripes > Ints.MAX_POWER_OF_TWO ? ALL_SET : ceilToPowerOfTwo(stripes) - 1; + } + + @Override + final int indexFor(Object key) { + int hash = smear(key.hashCode()); + return hash & mask; + } + + @Override + public final L get(Object key) { + return getAt(indexFor(key)); + } + } + + /** + * Implementation of Striped where 2^k stripes are represented as an array of the same length, eagerly initialized. + */ + private static class CompactStriped extends PowerOfTwoStriped { + + /** + * Size is a power of two. + */ + private final Object[] array; + + private CompactStriped(int stripes, Supplier supplier) { + super(stripes); + Preconditions.checkArgument(stripes <= Ints.MAX_POWER_OF_TWO, "Stripes must be <= 2^30)"); + + this.array = new Object[mask + 1]; + for (int i = 0; i < array.length; i++) { + array[i] = supplier.get(); + } + } + + @SuppressWarnings("unchecked") // we only put L's in the array + @Override + public L getAt(int index) { + return (L) array[index]; + } + + @Override + public int size() { + return array.length; + } + } + + /** + * Implementation of Striped where up to 2^k stripes can be represented, using an AtomicReferenceArray of size 2^k. To + * map a user key into a stripe, we take a k-bit slice of the user key's (smeared) hashCode(). The stripes are lazily + * initialized and are weakly referenced. + */ + @VisibleForTesting + static class SmallLazyStriped extends PowerOfTwoStriped { + + final AtomicReferenceArray> locks; + final Supplier supplier; + final int size; + final ReferenceQueue queue = new ReferenceQueue(); + + SmallLazyStriped(int stripes, Supplier supplier) { + super(stripes); + this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1; + this.locks = new AtomicReferenceArray>(size); + this.supplier = supplier; + } + + @Override + public L getAt(int index) { + if (size != Integer.MAX_VALUE) { + Preconditions.checkElementIndex(index, size()); + } // else no check necessary, all index values are valid + ArrayReference existingRef = locks.get(index); + L existing = existingRef == null ? null : existingRef.get(); + if (existing != null) { + return existing; + } + L created = supplier.get(); + ArrayReference newRef = new ArrayReference(created, index, queue); + while (!locks.compareAndSet(index, existingRef, newRef)) { + // we raced, we need to re-read and try again + existingRef = locks.get(index); + existing = existingRef == null ? null : existingRef.get(); + if (existing != null) { + return existing; + } + } + drainQueue(); + return created; + } + + // N.B. Draining the queue is only necessary to ensure that we don't accumulate empty references + // in the array. We could skip this if we decide we don't care about holding on to Reference + // objects indefinitely. + private void drainQueue() { + Reference ref; + while ((ref = queue.poll()) != null) { + // We only ever register ArrayReferences with the queue so this is always safe. + ArrayReference arrayRef = (ArrayReference) ref; + // Try to clear out the array slot, n.b. if we fail that is fine, in either case the + // arrayRef will be out of the array after this step. + locks.compareAndSet(arrayRef.index, arrayRef, null); + } + } + + @Override + public int size() { + return size; + } + + private static final class ArrayReference extends WeakReference { + + final int index; + + ArrayReference(L referent, int index, ReferenceQueue queue) { + super(referent, queue); + this.index = index; + } + } + } + + /** + * Implementation of Striped where up to 2^k stripes can be represented, using a ConcurrentMap where the key domain is + * [0..2^k). To map a user key into a stripe, we take a k-bit slice of the user key's (smeared) hashCode(). The + * stripes are lazily initialized and are weakly referenced. + */ + @VisibleForTesting + static class LargeLazyStriped extends PowerOfTwoStriped { + + final ConcurrentMap locks; + final Supplier supplier; + final int size; + + LargeLazyStriped(int stripes, Supplier supplier) { + super(stripes); + this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1; + this.supplier = supplier; + this.locks = new MapMaker().weakValues().makeMap(); + } + + @Override + public L getAt(int index) { + if (size != Integer.MAX_VALUE) { + Preconditions.checkElementIndex(index, size()); + } // else no check necessary, all index values are valid + L existing = locks.get(index); + if (existing != null) { + return existing; + } + L created = supplier.get(); + existing = locks.putIfAbsent(index, created); + return MoreObjects.firstNonNull(existing, created); + } + + @Override + public int size() { + return size; + } + } + + /** + * A bit mask were all bits are set. + */ + private static final int ALL_SET = ~0; + + private static int ceilToPowerOfTwo(int x) { + return 1 << IntMath.log2(x, RoundingMode.CEILING); + } + + /* + * This method was written by Doug Lea with assistance from members of JCP JSR-166 Expert Group + * and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + * + * As of 2010/06/11, this method is identical to the (package private) hash method in OpenJDK 7's + * java.util.HashMap class. + */ + // Copied from java/com/google/common/collect/Hashing.java + private static int smear(int hashCode) { + hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12); + return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4); + } + + private static class PaddedLock extends ReentrantLock { + + /* + * Padding from 40 into 64 bytes, same size as cache line. Might be beneficial to add a fourth + * long here, to minimize chance of interference between consecutive locks, but I couldn't + * observe any benefit from that. + */ long unused1; + long unused2; + long unused3; + + PaddedLock() { + super(false); + } + } + + private static class PaddedSemaphore extends Semaphore { + + // See PaddedReentrantLock comment + long unused1; + long unused2; + long unused3; + + PaddedSemaphore(int permits) { + super(permits, false); + } + } +} diff --git a/src/main/java/org/warp/commonutils/metrics/AtomicDetailedTimeAbsoluteSamples.java b/src/main/java/org/warp/commonutils/metrics/AtomicDetailedTimeAbsoluteSamples.java new file mode 100644 index 0000000..72445a3 --- /dev/null +++ b/src/main/java/org/warp/commonutils/metrics/AtomicDetailedTimeAbsoluteSamples.java @@ -0,0 +1,91 @@ +package org.warp.commonutils.metrics; + +import java.util.HashMap; + +public class AtomicDetailedTimeAbsoluteSamples implements AtomicDetailedTimeAbsoluteSamplesSnapshot { + + private final int sampleTime; + private final int samplesCount; + private HashMap detailedAtomicTimeSamples = new HashMap<>(); + + /** + * @param sampleTime in milliseconds + * @param samplesCount + */ + public AtomicDetailedTimeAbsoluteSamples(int sampleTime, int samplesCount) { + this.sampleTime = sampleTime; + this.samplesCount = samplesCount; + } + + public AtomicDetailedTimeAbsoluteSamples(int sampleTime, int samplesCount, HashMap detailedAtomicTimeSamples) { + this.sampleTime = sampleTime; + this.samplesCount = samplesCount; + this.detailedAtomicTimeSamples = new HashMap<>(); + detailedAtomicTimeSamples.forEach((detail, sample) -> this.detailedAtomicTimeSamples.put(detail, (AtomicTimeAbsoluteSamples) sample)); + } + + private synchronized void updateSamples() { + + } + + private synchronized AtomicTimeAbsoluteSamples getDetailed(T detail) { + AtomicTimeAbsoluteSamples detailed = detailedAtomicTimeSamples.get(detail); + if (detailed == null) { + detailed = new AtomicTimeAbsoluteSamples(sampleTime, samplesCount); + detailedAtomicTimeSamples.put(detail, detailed); + } + return detailed; + } + + public synchronized void set(T detail, long count) { + updateSamples(); + getDetailed(detail).set(count); + } + + @Override + public synchronized double getAveragePerSecond(T detail, long timeRange) { + updateSamples(); + return getDetailed(detail).getAveragePerSecond(timeRange); + } + + @Override + public synchronized double getAveragePerSecond(long timeRange) { + updateSamples(); + return detailedAtomicTimeSamples.values().stream().mapToDouble((detail) -> detail.getAveragePerSecond(timeRange)).sum(); + } + + @Override + public synchronized long getCurrentCount(T detail) { + updateSamples(); + return getDetailed(detail).getCurrentCount(); + } + + @Override + public synchronized long getCurrentCount() { + updateSamples(); + return detailedAtomicTimeSamples.values().stream().mapToLong(AtomicTimeAbsoluteSamples::getCurrentCount).sum(); + } + + @Override + public synchronized double getTotalAveragePerSecond() { + updateSamples(); + return detailedAtomicTimeSamples.values().stream().mapToDouble(AtomicTimeAbsoluteSamples::getTotalAveragePerSecond).sum(); + } + + @Override + public synchronized double getTotalAveragePerSecond(T detail) { + updateSamples(); + return getDetailed(detail).getTotalAveragePerSecond(); + } + + public synchronized AtomicTimeAbsoluteSamplesSnapshot snapshot(T detail) { + return getDetailed(detail).snapshot(); + } + + public synchronized AtomicDetailedTimeAbsoluteSamples snapshot() { + var clonedDetailedAtomicTimeSamples = new HashMap(detailedAtomicTimeSamples); + clonedDetailedAtomicTimeSamples.replaceAll((key, value) -> ((AtomicTimeAbsoluteSamples) value).snapshot()); + return new AtomicDetailedTimeAbsoluteSamples<>(sampleTime, + samplesCount, clonedDetailedAtomicTimeSamples); + } +} diff --git a/src/main/java/org/warp/commonutils/metrics/AtomicDetailedTimeAbsoluteSamplesSnapshot.java b/src/main/java/org/warp/commonutils/metrics/AtomicDetailedTimeAbsoluteSamplesSnapshot.java new file mode 100644 index 0000000..9040111 --- /dev/null +++ b/src/main/java/org/warp/commonutils/metrics/AtomicDetailedTimeAbsoluteSamplesSnapshot.java @@ -0,0 +1,10 @@ +package org.warp.commonutils.metrics; + +public interface AtomicDetailedTimeAbsoluteSamplesSnapshot extends AtomicTimeAbsoluteSamplesSnapshot { + + double getAveragePerSecond(T detail, long timeRange); + + long getCurrentCount(T detail); + + double getTotalAveragePerSecond(T detail); +} diff --git a/src/main/java/org/warp/commonutils/metrics/AtomicDetailedTimeIncrementalSamples.java b/src/main/java/org/warp/commonutils/metrics/AtomicDetailedTimeIncrementalSamples.java new file mode 100644 index 0000000..e999011 --- /dev/null +++ b/src/main/java/org/warp/commonutils/metrics/AtomicDetailedTimeIncrementalSamples.java @@ -0,0 +1,81 @@ +package org.warp.commonutils.metrics; + +import java.util.Arrays; +import java.util.HashMap; + +public class AtomicDetailedTimeIncrementalSamples extends AtomicTimeIncrementalSamples implements + AtomicDetailedTimeIncrementalSamplesSnapshot { + + private HashMap detailedAtomicTimeSamples = new HashMap<>(); + + /** + * @param sampleTime in milliseconds + * @param samplesCount + */ + public AtomicDetailedTimeIncrementalSamples(int sampleTime, int samplesCount) { + super(sampleTime, samplesCount); + } + + public AtomicDetailedTimeIncrementalSamples(long startTime, long[] samples, int sampleTime, long currentSampleStartTime, long totalEvents, + HashMap detailedAtomicTimeSamples) { + super(startTime, samples, sampleTime, currentSampleStartTime, totalEvents); + this.detailedAtomicTimeSamples = new HashMap<>(); + detailedAtomicTimeSamples.forEach((detail, sample) -> detailedAtomicTimeSamples.put(detail, (AtomicTimeIncrementalSamples) sample)); + } + + private synchronized AtomicTimeIncrementalSamples getDetailed(T detail) { + AtomicTimeIncrementalSamples detailed = detailedAtomicTimeSamples.get(detail); + if (detailed == null) { + detailed = new AtomicTimeIncrementalSamples(sampleTime, samples.length); + detailedAtomicTimeSamples.put(detail, detailed); + } + return detailed; + } + + public synchronized void increment(T detail, long count) { + updateSamples(); + getDetailed(detail).increment(count); + increment(count); + } + + @Override + public synchronized double getAveragePerSecond(T detail, long timeRange) { + updateSamples(); + return getDetailed(detail).getAveragePerSecond(timeRange); + } + + @Override + public synchronized long getApproximateCount(T detail, long timeRange) { + updateSamples(); + return getDetailed(detail).getApproximateCount(timeRange); + } + + @Override + public synchronized long getTotalCount(T detail) { + updateSamples(); + return getDetailed(detail).getTotalCount(); + } + + @Override + public synchronized double getTotalAverage(T detail) { + updateSamples(); + return getDetailed(detail).getTotalAveragePerSecond(); + } + + public synchronized AtomicTimeIncrementalSamplesSnapshot snapshot(T detail) { + return getDetailed(detail).snapshot(); + } + + @Override + protected synchronized void shiftSamples(int shiftCount) { + //detailedAtomicTimeSamples.values().forEach(AtomicTimeSamples::shiftSamples); + super.shiftSamples(shiftCount); + } + + public synchronized AtomicDetailedTimeIncrementalSamples snapshot() { + var clonedDetailedAtomicTimeSamples = new HashMap(detailedAtomicTimeSamples); + clonedDetailedAtomicTimeSamples.replaceAll((key, value) -> ((AtomicTimeIncrementalSamples) value).snapshot()); + return new AtomicDetailedTimeIncrementalSamples<>(startTime, Arrays.copyOf(this.samples, this.samples.length), sampleTime, + currentSampleStartTime, totalEvents, clonedDetailedAtomicTimeSamples); + } +} diff --git a/src/main/java/org/warp/commonutils/metrics/AtomicDetailedTimeIncrementalSamplesSnapshot.java b/src/main/java/org/warp/commonutils/metrics/AtomicDetailedTimeIncrementalSamplesSnapshot.java new file mode 100644 index 0000000..2b3bca4 --- /dev/null +++ b/src/main/java/org/warp/commonutils/metrics/AtomicDetailedTimeIncrementalSamplesSnapshot.java @@ -0,0 +1,12 @@ +package org.warp.commonutils.metrics; + +public interface AtomicDetailedTimeIncrementalSamplesSnapshot extends AtomicTimeIncrementalSamplesSnapshot { + + double getAveragePerSecond(T detail, long timeRange); + + long getApproximateCount(T detail, long timeRange); + + long getTotalCount(T detail); + + double getTotalAverage(T detail); +} diff --git a/src/main/java/org/warp/commonutils/metrics/AtomicTimeAbsoluteSamples.java b/src/main/java/org/warp/commonutils/metrics/AtomicTimeAbsoluteSamples.java new file mode 100644 index 0000000..a29fc64 --- /dev/null +++ b/src/main/java/org/warp/commonutils/metrics/AtomicTimeAbsoluteSamples.java @@ -0,0 +1,112 @@ +package org.warp.commonutils.metrics; + +import java.util.Arrays; + +public class AtomicTimeAbsoluteSamples implements AtomicTimeAbsoluteSamplesSnapshot { + + protected long startTime; + protected final long[] samples; + protected final int sampleTime; + protected long currentSampleStartTime; + protected long totalSamplesSum = 0; + protected long totalSamplesCount = 1; + + /** + * + * @param sampleTime in milliseconds + * @param samplesCount + */ + public AtomicTimeAbsoluteSamples(int sampleTime, int samplesCount) { + this.samples = new long[samplesCount]; + this.sampleTime = sampleTime; + startTime = -1; + if (samplesCount < 1) throw new IndexOutOfBoundsException(); + if (sampleTime < 1) throw new IndexOutOfBoundsException(); + } + + public AtomicTimeAbsoluteSamples(long startTime, long[] samples, int sampleTime, long currentSampleStartTime, long totalSamplesSum, long totalSamplesCount) { + this.startTime = startTime; + this.samples = samples; + this.sampleTime = sampleTime; + this.currentSampleStartTime = currentSampleStartTime; + this.totalSamplesSum = totalSamplesSum; + this.totalSamplesCount = totalSamplesCount; + } + + protected synchronized void updateSamples() { + checkStarted(); + long currentTime = System.nanoTime() / 1000000L; + long timeDiff = currentTime - currentSampleStartTime; + long timeToShift = timeDiff - (timeDiff % sampleTime); + int shiftCount = (int) (timeToShift / sampleTime); + if (currentTime - (currentSampleStartTime + timeToShift) > sampleTime) { + throw new IndexOutOfBoundsException("Time sample bigger than " + sampleTime + "! It's " + (currentTime - (currentSampleStartTime + timeToShift))); + } + if (shiftCount > 0) { + shiftSamples(shiftCount); + currentSampleStartTime += timeToShift; + totalSamplesCount += shiftCount; + long lastSample = samples[0]; + totalSamplesSum += lastSample * shiftCount; + } + } + + protected synchronized void checkStarted() { + if (startTime == -1) { + this.startTime = System.nanoTime() / 1000000L; + this.currentSampleStartTime = startTime; + } + } + + protected void shiftSamples(int shiftCount) { + checkStarted(); + long lastSample = samples[0]; + if (samples.length - shiftCount > 0) { + System.arraycopy(samples, 0, samples, shiftCount, samples.length - shiftCount); + Arrays.fill(samples, 0, shiftCount, lastSample); + } else { + Arrays.fill(samples, lastSample); + } + } + + public synchronized void set(long count) { + updateSamples(); + long oldValue = samples[0]; + samples[0]=count; + totalSamplesSum += count - oldValue; + } + + @Override + public synchronized double getAveragePerSecond(long timeRange) { + updateSamples(); + + // Fix if the time range is bigger than the collected data since start + long currentTime = System.nanoTime() / 1000000L; + if (currentTime - timeRange < startTime) { + timeRange = currentTime - startTime; + } + + long samplesCount = Math.min(Math.max(timeRange / sampleTime, 1L), samples.length); + long value = 0; + for (int i = 0; i < samplesCount; i++) { + value += samples[i]; + } + return ((double) value) / ((double) samplesCount); + } + + @Override + public synchronized long getCurrentCount() { + updateSamples(); + return samples[0]; + } + + @Override + public synchronized double getTotalAveragePerSecond() { + updateSamples(); + return (double) totalSamplesSum / (double) totalSamplesCount; + } + + public synchronized AtomicTimeAbsoluteSamplesSnapshot snapshot() { + return new AtomicTimeAbsoluteSamples(startTime, Arrays.copyOf(this.samples, this.samples.length), sampleTime, currentSampleStartTime, totalSamplesSum, totalSamplesCount); + } +} diff --git a/src/main/java/org/warp/commonutils/metrics/AtomicTimeAbsoluteSamplesSnapshot.java b/src/main/java/org/warp/commonutils/metrics/AtomicTimeAbsoluteSamplesSnapshot.java new file mode 100644 index 0000000..0eed8ea --- /dev/null +++ b/src/main/java/org/warp/commonutils/metrics/AtomicTimeAbsoluteSamplesSnapshot.java @@ -0,0 +1,10 @@ +package org.warp.commonutils.metrics; + +public interface AtomicTimeAbsoluteSamplesSnapshot { + + double getAveragePerSecond(long timeRange); + + long getCurrentCount(); + + double getTotalAveragePerSecond(); +} diff --git a/src/main/java/org/warp/commonutils/metrics/AtomicTimeIncrementalSamples.java b/src/main/java/org/warp/commonutils/metrics/AtomicTimeIncrementalSamples.java new file mode 100644 index 0000000..4ed75d5 --- /dev/null +++ b/src/main/java/org/warp/commonutils/metrics/AtomicTimeIncrementalSamples.java @@ -0,0 +1,121 @@ +package org.warp.commonutils.metrics; + +import java.util.Arrays; + +public class AtomicTimeIncrementalSamples implements AtomicTimeIncrementalSamplesSnapshot { + + protected long startTime; + protected final long[] samples; + protected final int sampleTime; + protected long currentSampleStartTime; + protected long totalEvents; + + /** + * + * @param sampleTime in milliseconds + * @param samplesCount + */ + public AtomicTimeIncrementalSamples(int sampleTime, int samplesCount) { + this.samples = new long[samplesCount]; + this.sampleTime = sampleTime; + startTime = -1; + if (samplesCount < 1) throw new IndexOutOfBoundsException(); + if (sampleTime < 1) throw new IndexOutOfBoundsException(); + } + + public AtomicTimeIncrementalSamples(long startTime, long[] samples, int sampleTime, long currentSampleStartTime, long totalEvents) { + this.startTime = startTime; + this.samples = samples; + this.sampleTime = sampleTime; + this.currentSampleStartTime = currentSampleStartTime; + this.totalEvents = totalEvents; + } + + protected synchronized void updateSamples() { + checkStarted(); + long currentTime = System.nanoTime() / 1000000L; + long timeDiff = currentTime - currentSampleStartTime; + long timeToShift = timeDiff - (timeDiff % sampleTime); + int shiftCount = (int) (timeToShift / sampleTime); + if (currentTime - (currentSampleStartTime + timeToShift) > sampleTime) { + throw new IndexOutOfBoundsException("Time sample bigger than " + sampleTime + "! It's " + (currentTime - (currentSampleStartTime + timeToShift))); + } + if (shiftCount > 0) { + shiftSamples(shiftCount); + currentSampleStartTime += timeToShift; + } + } + + protected synchronized void checkStarted() { + if (startTime == -1) { + this.startTime = System.nanoTime() / 1000000L; + this.currentSampleStartTime = startTime; + } + } + + protected synchronized void shiftSamples(int shiftCount) { + checkStarted(); + if (samples.length - shiftCount > 0) { + System.arraycopy(samples, 0, samples, shiftCount, samples.length - shiftCount); + Arrays.fill(samples, 0, shiftCount, 0); + } else { + Arrays.fill(samples, 0); + } + } + + public synchronized void increment(long count) { + updateSamples(); + samples[0]+=count; + totalEvents+=count; + } + + @Override + public synchronized double getAveragePerSecond(long timeRange) { + updateSamples(); + + // Fix if the time range is bigger than the collected data since start + long currentTime = currentSampleStartTime; + if (currentTime - timeRange < startTime) { + timeRange = currentTime - startTime; + } + + long samplesCount = Math.min(Math.max(timeRange / sampleTime, 1L), samples.length); + long roundedTimeRange = samplesCount * sampleTime; + long value = 0; + for (int i = 0; i < samplesCount; i++) { + value += samples[i]; + } + return ((double) value) / ((double) roundedTimeRange / 1000D); + } + + + @Override + public synchronized long getApproximateCount(long timeRange) { + updateSamples(); + long samplesCount = Math.min(Math.max(timeRange / sampleTime, 1L), samples.length); + long value = 0; + for (int i = 0; i < samplesCount; i++) { + value += samples[i]; + } + return value; + } + + @Override + public synchronized long getTotalCount() { + updateSamples(); + return totalEvents; + } + + @Override + public synchronized double getTotalAveragePerSecond() { + updateSamples(); + if (currentSampleStartTime == startTime) { + return 0; + } + return ((double) totalEvents) / (double) ((currentSampleStartTime - startTime) / 1000D); + } + + public synchronized AtomicTimeIncrementalSamplesSnapshot snapshot() { + return new AtomicTimeIncrementalSamples(startTime, Arrays.copyOf(this.samples, this.samples.length), sampleTime, currentSampleStartTime, totalEvents); + } +} diff --git a/src/main/java/org/warp/commonutils/metrics/AtomicTimeIncrementalSamplesSnapshot.java b/src/main/java/org/warp/commonutils/metrics/AtomicTimeIncrementalSamplesSnapshot.java new file mode 100644 index 0000000..7f1bc34 --- /dev/null +++ b/src/main/java/org/warp/commonutils/metrics/AtomicTimeIncrementalSamplesSnapshot.java @@ -0,0 +1,12 @@ +package org.warp.commonutils.metrics; + +public interface AtomicTimeIncrementalSamplesSnapshot { + + double getAveragePerSecond(long timeRange); + + long getApproximateCount(long timeRange); + + long getTotalCount(); + + double getTotalAveragePerSecond(); +} diff --git a/src/main/java/org/warp/commonutils/random/HashUtil.java b/src/main/java/org/warp/commonutils/random/HashUtil.java new file mode 100644 index 0000000..245cec3 --- /dev/null +++ b/src/main/java/org/warp/commonutils/random/HashUtil.java @@ -0,0 +1,24 @@ +package org.warp.commonutils.random; + +public final class HashUtil { + + private HashUtil() { + } + + public static int boundedHash(Object o, int upperBoundExclusive) { + int h = o.hashCode(); + + // Protection against poor hash functions. + // Used by java.util.concurrent.ConcurrentHashMap + // Spread bits to regularize both segment and index locations, + // using variant of single-word Wang/Jenkins hash. + h += (h << 15) ^ 0xffffcd7d; + h ^= (h >>> 10); + h += (h << 3); + h ^= (h >>> 6); + h += (h << 2) + (h << 14); + h ^= (h >>> 16); + + return Math.abs(h % upperBoundExclusive); + } +} \ No newline at end of file diff --git a/src/main/java/org/warp/commonutils/random/LFSR.java b/src/main/java/org/warp/commonutils/random/LFSR.java new file mode 100644 index 0000000..c84505f --- /dev/null +++ b/src/main/java/org/warp/commonutils/random/LFSR.java @@ -0,0 +1,125 @@ +package org.warp.commonutils.random; + +import java.math.BigInteger; +import java.util.Iterator; +import java.util.Random; + +/** + * Linear feedback shift register + *

    + * Taps can be found at: See http://www.xilinx.com/support/documentation/application_notes/xapp052.pdf See + * http://mathoverflow.net/questions/46961/how-are-taps-proven-to-work-for-lfsrs/46983#46983 See + * http://www.newwaveinstruments.com/resources/articles/m_sequence_linear_feedback_shift_register_lfsr.htm See + * http://www.yikes.com/~ptolemy/lfsr_web/index.htm See http://seanerikoconnor.freeservers.com/Mathematics/AbstractAlgebra/PrimitivePolynomials/overview.html + * + * @author OldCurmudgeon + */ +public class LFSR implements Iterable { + + private static final Random random = new Random(); + + // Bit pattern for taps. + private final BigInteger taps; + // Where to start (and end). + private final BigInteger start; + + public static LFSR randomInt() { + return random(32, random.nextInt()); + } + + public static LFSR randomLong() { + return random(64, random.nextLong()); + } + + public static LFSR randomPositiveLong() { + return random(50, Math.abs(random.nextInt())); + } + + public static LFSR random(int bitsSize, long startNumber) { + // Build the BigInteger. + BigInteger primitive = BigInteger.ZERO; + for (int bitNumber = 0; bitNumber <= bitsSize; bitNumber++) { + if (random.nextBoolean() || bitNumber == 0 || bitNumber == bitsSize) { + primitive = primitive.or(BigInteger.ONE.shiftLeft(bitNumber)); + } + } + return new LFSR(primitive, BigInteger.valueOf(startNumber)); + } + + // The poly must be primitive to span the full sequence. + public LFSR(BigInteger primitivePoly, BigInteger start) { + // Where to start from (and stop). + this.start = start.equals(BigInteger.ZERO) ? BigInteger.ONE : start; + // Knock off the 2^0 coefficient of the polynomial for the TAP. + this.taps = primitivePoly.shiftRight(1); + } + + @Override + public Iterator iterator() { + return new LFSRIterator(start); + } + + private class LFSRIterator implements Iterator { + // The last one we returned. + + private BigInteger last = null; + // The next one to return. + private BigInteger next = null; + + public LFSRIterator(BigInteger start) { + // Do not return the seed. + last = start; + } + + @Override + public boolean hasNext() { + if (next == null) { + /* + * Uses the Galois form. + * + * Shift last right one. + * + * If the bit shifted out was a 1 - xor with the tap mask. + */ + boolean shiftedOutA1 = last.testBit(0); + // Shift right. + next = last.shiftRight(1); + if (shiftedOutA1) { + // Tap! + next = next.xor(taps); + } + // Never give them `start` again. + if (next.equals(start)) { + // Could set a finished flag here too. + next = null; + } + } + return next != null; + } + + @Override + public BigInteger next() { + // Remember this one. + last = hasNext() ? next : null; + // Don't deliver it again. + next = null; + return last; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public String toString() { + return LFSR.this.toString() + "[" + (last != null ? last.toString(16) : "") + "-" + (next != null ? next + .toString(16) : "") + "]"; + } + } + + @Override + public String toString() { + return "(" + taps.toString(32) + ")-" + start.toString(32); + } +} \ No newline at end of file diff --git a/src/main/java/org/warp/commonutils/range/MappedRanges.java b/src/main/java/org/warp/commonutils/range/MappedRanges.java new file mode 100644 index 0000000..8ca2464 --- /dev/null +++ b/src/main/java/org/warp/commonutils/range/MappedRanges.java @@ -0,0 +1,121 @@ +package org.warp.commonutils.range; + +import it.unimi.dsi.fastutil.objects.Object2ObjectMap; +import it.unimi.dsi.fastutil.objects.Object2ObjectMaps; +import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.objects.Object2ObjectRBTreeMap; +import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap; +import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; +import java.util.Comparator; +import java.util.function.Function; + +public class MappedRanges { + + private final Object2ObjectMap ranges; + + public MappedRanges(int start, int end, T value) { + if (start > end) { + throw new IndexOutOfBoundsException(); + } + this.ranges = new Object2ObjectOpenHashMap<>(); + ranges.put(new Range(start, end), value); + } + + public void deleteRange(final int start, final int end, Function replaceWhenSplitting, Function cloneWhenSplitting) { + if (start > end) { + throw new IndexOutOfBoundsException(); + } + Object2ObjectOpenHashMap rangesToAdd = new Object2ObjectOpenHashMap<>(); + ObjectOpenHashSet rangesToDelete = new ObjectOpenHashSet<>(); + ranges.forEach((range, value) -> { + if (range.start <= end && range.end >= start) { + if (range.start >= start && range.end <= end) { + // delete the range + rangesToDelete.add(range); + } else if (range.start <= start && range.end >= end) { + // cut the hole + rangesToDelete.add(range); + rangesToAdd.put(new Range(range.start, start), value); + rangesToAdd.put(new Range(end, range.end), cloneWhenSplitting.apply(value)); + } else if (range.start <= start && range.end <= end && range.end > start) { + // shrink the right border + rangesToDelete.add(range); + rangesToAdd.put(new Range(range.start, start), value); + } else if (range.start >= start && range.end >= end && range.start < end) { + // shrink the left border + rangesToDelete.add(range); + rangesToAdd.put(new Range(end, range.end), value); + } + } + }); + for (Range range : rangesToDelete) { + ranges.remove(range); + } + rangesToAdd.forEach((range, value) -> { + if (canAddRange(range)) { + ranges.put(range, replaceWhenSplitting.apply(value)); + } + }); + } + + public void transformRange(int start, int end, Function replaceWhenOverlapping, Function cloneWhenSplitting) { + if (start > end) { + throw new IndexOutOfBoundsException(); + } + Object2ObjectOpenHashMap rangesToTransform = new Object2ObjectOpenHashMap<>(); + Object2ObjectOpenHashMap rangesToAdd = new Object2ObjectOpenHashMap<>(); + ObjectOpenHashSet rangesToRemove = new ObjectOpenHashSet<>(); + ranges.forEach((range, value) -> { + if (range.start <= end && range.end >= start) { + if (range.start >= start && range.end <= end) { + // transform the range + rangesToTransform.put(range, replaceWhenOverlapping.apply(value)); + } else if (range.start <= start && range.end >= end) { + // transform the middle + rangesToRemove.add(range); + rangesToAdd.put(new Range(range.start, start), value); + rangesToTransform.put(new Range(start, end), replaceWhenOverlapping.apply(cloneWhenSplitting.apply(value))); + rangesToAdd.put(new Range(end, range.end), cloneWhenSplitting.apply(value)); + } else if (range.start <= start && range.end <= end && range.end > start) { + // transform the right + rangesToRemove.add(range); + rangesToAdd.put(new Range(range.start, start), value); + rangesToTransform.put(new Range(start, range.end), replaceWhenOverlapping.apply(cloneWhenSplitting.apply(value))); + } else if (range.start >= start && range.end >= end && range.start < end) { + // transform the left + rangesToRemove.add(range); + rangesToTransform.put(new Range(range.start, end), replaceWhenOverlapping.apply(cloneWhenSplitting.apply(value))); + rangesToAdd.put(new Range(end, range.end), value); + } else { + // do not transform + } + } + }); + + rangesToRemove.forEach((range) -> { + ranges.remove(range); + }); + rangesToAdd.forEach((range, value) -> { + if (canAddRange(range)) { + ranges.put(range, value); + } + }); + rangesToTransform.forEach((range, value) -> { + ranges.put(range, value); + }); + } + + private boolean canAddRange(UnmodifiableRange range) { + return range.getStart() != range.getEnd(); + } + + private boolean canAddRange(Range range) { + return range.getStart() != range.getEnd(); + } + + public Object2ObjectMap getRanges() { + Object2ObjectSortedMap a = new Object2ObjectRBTreeMap<>(Comparator.comparingLong(UnmodifiableRange::getStart)); + ranges.forEach((range, value) -> a.put(range.unmodifiableClone(), value)); + return Object2ObjectMaps.unmodifiable(a); + } +} diff --git a/src/main/java/org/warp/commonutils/range/Range.java b/src/main/java/org/warp/commonutils/range/Range.java new file mode 100644 index 0000000..26093a5 --- /dev/null +++ b/src/main/java/org/warp/commonutils/range/Range.java @@ -0,0 +1,60 @@ +package org.warp.commonutils.range; + +import java.util.Objects; +import java.util.StringJoiner; +import org.warp.commonutils.error.IndexOutOfBoundsException; + +public class Range { + + public long start; + public long end; + + public Range(long start, long end) { + if (start > end) { + throw new IndexOutOfBoundsException(start, 0, end); + } + this.start = start; + this.end = end; + } + + public long getStart() { + return start; + } + + public long getEnd() { + return end; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Range range = (Range) o; + return start == range.start && end == range.end; + } + + @Override + public int hashCode() { + return Objects.hash(start, end); + } + + @Override + public String toString() { + return new StringJoiner(", ", Range.class.getSimpleName() + "[", "]").add("start=" + start).add("end=" + end) + .toString(); + } + + @SuppressWarnings("MethodDoesntCallSuperMethod") + @Override + public Range clone() { + return new Range(start, end); + } + + public UnmodifiableRange unmodifiableClone() { + return new UnmodifiableRange(start, end); + } +} diff --git a/src/main/java/org/warp/commonutils/range/Ranges.java b/src/main/java/org/warp/commonutils/range/Ranges.java new file mode 100644 index 0000000..3f140cd --- /dev/null +++ b/src/main/java/org/warp/commonutils/range/Ranges.java @@ -0,0 +1,104 @@ +package org.warp.commonutils.range; + +import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import it.unimi.dsi.fastutil.objects.ObjectRBTreeSet; +import it.unimi.dsi.fastutil.objects.ObjectSortedSet; +import it.unimi.dsi.fastutil.objects.ObjectSortedSets; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import org.warp.commonutils.error.IndexOutOfBoundsException; + +public class Ranges { + + private final ObjectArrayList ranges; + + public Ranges(long start, long end) { + if (start > end) { + throw new IndexOutOfBoundsException(start, 0, end); + } + this.ranges = new ObjectArrayList<>(); + ranges.add(new Range(start, end)); + } + + public void addRange(Range range) { + addRange(range.start, range.end); + } + + public void addRange(long start, long end) { + if (start > end) { + throw new IndexOutOfBoundsException(start, 0, end); + } + long rangeStart = start; + long rangeEnd = end; + var it = ranges.iterator(); + while (it.hasNext()) { + Range range = it.next(); + if (range.start <= end && range.end >= start) { + boolean remove = false; + if (range.start < rangeStart && range.end >= rangeStart) { + rangeStart = range.start; + remove = true; + } + if (range.end > rangeEnd && range.start <= rangeEnd) { + rangeEnd = range.end; + remove = true; + } + if (remove) { + it.remove(); + } + } + } + addRangeIfNotZero(new Range(rangeStart, rangeEnd)); + } + + public void deleteRange(final long start, final long end) { + if (start > end) { + throw new IndexOutOfBoundsException(start); + } + List rangesToAdd = new ArrayList<>(ranges.size()); + var it = ranges.iterator(); + while (it.hasNext()) { + Range range = it.next(); + if (range.start <= end && range.end >= start) { + if (range.start >= start && range.end <= end) { + // delete the range + it.remove(); + } else if (range.start <= start && range.end >= end) { + // cut the hole + it.remove(); + rangesToAdd.add(new Range(range.start, start)); + rangesToAdd.add(new Range(end, range.end)); + } else if (range.start <= start && range.end <= end && range.end > start) { + // shrink the right border + it.remove(); + rangesToAdd.add(new Range(range.start, start)); + } else if (range.start >= start && range.end >= end && range.start < end) { + // shrink the left border + it.remove(); + rangesToAdd.add(new Range(end, range.end)); + } + } + } + for (Range rangeToAdd : rangesToAdd) { + addRangeIfNotZero(rangeToAdd); + } + } + + /** + * This methods does not check overlapping ranges! It's used only internally to skip empty ranges + * + * @param range + */ + private void addRangeIfNotZero(Range range) { + if (range.start != range.end) { + ranges.add(range); + } + } + + public ObjectSortedSet getRanges() { + ObjectSortedSet a = new ObjectRBTreeSet<>(Comparator.comparingLong(UnmodifiableRange::getStart)); + ranges.forEach((range) -> a.add(range.unmodifiableClone())); + return ObjectSortedSets.unmodifiable(a); + } +} diff --git a/src/main/java/org/warp/commonutils/range/UnmodifiableRange.java b/src/main/java/org/warp/commonutils/range/UnmodifiableRange.java new file mode 100644 index 0000000..bce0655 --- /dev/null +++ b/src/main/java/org/warp/commonutils/range/UnmodifiableRange.java @@ -0,0 +1,54 @@ +package org.warp.commonutils.range; + +import java.util.Objects; +import java.util.StringJoiner; +import org.warp.commonutils.error.IndexOutOfBoundsException; + +public class UnmodifiableRange { + + private final long start; + private final long end; + + public UnmodifiableRange(long start, long end) { + if (start > end) { + throw new IndexOutOfBoundsException(start, 0, end); + } + this.start = start; + this.end = end; + } + + public long getStart() { + return start; + } + + public long getEnd() { + return end; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UnmodifiableRange that = (UnmodifiableRange) o; + return start == that.start && end == that.end; + } + + @Override + public int hashCode() { + return Objects.hash(start, end); + } + + @Override + public String toString() { + return new StringJoiner(", ", UnmodifiableRange.class.getSimpleName() + "[", "]").add("start=" + start) + .add("end=" + end).toString(); + } + + public Range toRange() { + return new Range(start, end); + } +} diff --git a/src/main/java/org/warp/commonutils/serialization/UTFUtils.java b/src/main/java/org/warp/commonutils/serialization/UTFUtils.java new file mode 100644 index 0000000..d790a63 --- /dev/null +++ b/src/main/java/org/warp/commonutils/serialization/UTFUtils.java @@ -0,0 +1,19 @@ +package org.warp.commonutils.serialization; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +public class UTFUtils { + public static final void writeUTF(DataOutputStream out, String utf) throws IOException { + byte[] bytes = utf.getBytes(StandardCharsets.UTF_8); + out.writeInt(bytes.length); + out.write(bytes); + } + + public static final String readUTF(DataInputStream in) throws IOException { + int len = in.readInt(); + return new String(in.readNBytes(len), StandardCharsets.UTF_8); + } +} diff --git a/src/main/java/org/warp/commonutils/stream/DataInputOutput.java b/src/main/java/org/warp/commonutils/stream/DataInputOutput.java new file mode 100644 index 0000000..7435511 --- /dev/null +++ b/src/main/java/org/warp/commonutils/stream/DataInputOutput.java @@ -0,0 +1,11 @@ +package org.warp.commonutils.stream; + +import java.io.DataInput; +import java.io.DataOutput; + +public interface DataInputOutput extends DataInput, DataOutput { + + DataInput getIn(); + + DataOutput getOut(); +} diff --git a/src/main/java/org/warp/commonutils/stream/DataInputOutputImpl.java b/src/main/java/org/warp/commonutils/stream/DataInputOutputImpl.java new file mode 100644 index 0000000..c567214 --- /dev/null +++ b/src/main/java/org/warp/commonutils/stream/DataInputOutputImpl.java @@ -0,0 +1,173 @@ +package org.warp.commonutils.stream; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.jetbrains.annotations.NotNull; + +public class DataInputOutputImpl implements DataInputOutput { + + private final DataInput in; + private final DataOutput out; + + public DataInputOutputImpl(DataInput in, DataOutput out) { + this.in = in; + this.out = out; + } + + @Override + public DataInput getIn() { + return this; + } + + @Override + public DataOutput getOut() { + return this; + } + + @Override + public void readFully(byte[] bytes) throws IOException { + in.readFully(bytes); + } + + @Override + public void readFully(byte[] bytes, int i, int i1) throws IOException { + in.readFully(bytes, i, i1); + } + + @Override + public int skipBytes(int i) throws IOException { + return in.skipBytes(i); + } + + @Override + public boolean readBoolean() throws IOException { + return in.readBoolean(); + } + + @Override + public byte readByte() throws IOException { + return in.readByte(); + } + + @Override + public int readUnsignedByte() throws IOException { + return in.readUnsignedByte(); + } + + @Override + public short readShort() throws IOException { + return in.readShort(); + } + + @Override + public int readUnsignedShort() throws IOException { + return in.readUnsignedShort(); + } + + @Override + public char readChar() throws IOException { + return in.readChar(); + } + + @Override + public int readInt() throws IOException { + return in.readInt(); + } + + @Override + public long readLong() throws IOException { + return in.readLong(); + } + + @Override + public float readFloat() throws IOException { + return in.readFloat(); + } + + @Override + public double readDouble() throws IOException { + return in.readDouble(); + } + + @Override + public String readLine() throws IOException { + return in.readLine(); + } + + @NotNull + @Override + public String readUTF() throws IOException { + return in.readUTF(); + } + + @Override + public void write(int i) throws IOException { + out.write(i); + } + + @Override + public void write(byte[] bytes) throws IOException { + out.write(bytes); + } + + @Override + public void write(byte[] bytes, int i, int i1) throws IOException { + out.write(bytes, i, i1); + } + + @Override + public void writeBoolean(boolean b) throws IOException { + out.writeBoolean(b); + } + + @Override + public void writeByte(int i) throws IOException { + out.writeByte(i); + } + + @Override + public void writeShort(int i) throws IOException { + out.writeShort(i); + } + + @Override + public void writeChar(int i) throws IOException { + out.writeChar(i); + } + + @Override + public void writeInt(int i) throws IOException { + out.writeInt(i); + } + + @Override + public void writeLong(long l) throws IOException { + out.writeLong(l); + } + + @Override + public void writeFloat(float v) throws IOException { + out.writeFloat(v); + } + + @Override + public void writeDouble(double v) throws IOException { + out.writeDouble(v); + } + + @Override + public void writeBytes(@NotNull String s) throws IOException { + out.writeBytes(s); + } + + @Override + public void writeChars(@NotNull String s) throws IOException { + out.writeChars(s); + } + + @Override + public void writeUTF(@NotNull String s) throws IOException { + out.writeUTF(s); + } +} diff --git a/src/main/java/org/warp/commonutils/stream/DataInputOutputStream.java b/src/main/java/org/warp/commonutils/stream/DataInputOutputStream.java new file mode 100644 index 0000000..5a17e7f --- /dev/null +++ b/src/main/java/org/warp/commonutils/stream/DataInputOutputStream.java @@ -0,0 +1,103 @@ +package org.warp.commonutils.stream; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import org.jetbrains.annotations.NotNull; + +public class DataInputOutputStream extends DataOutputStream implements DataInputOutput { + + private final DataInputStream in; + + public DataInputOutputStream(DataInputStream in, DataOutputStream out) { + super(out); + this.in = in; + } + + @Override + public DataInputStream getIn() { + return in; + } + + @Override + public DataOutputStream getOut() { + return this; + } + + @Override + public void readFully(byte[] bytes) throws IOException { + in.readFully(bytes); + } + + @Override + public void readFully(byte[] bytes, int i, int i1) throws IOException { + in.readFully(bytes, i, i1); + } + + @Override + public int skipBytes(int i) throws IOException { + return in.skipBytes(i); + } + + @Override + public boolean readBoolean() throws IOException { + return in.readBoolean(); + } + + @Override + public byte readByte() throws IOException { + return in.readByte(); + } + + @Override + public int readUnsignedByte() throws IOException { + return in.readUnsignedByte(); + } + + @Override + public short readShort() throws IOException { + return in.readShort(); + } + + @Override + public int readUnsignedShort() throws IOException { + return in.readUnsignedShort(); + } + + @Override + public char readChar() throws IOException { + return in.readChar(); + } + + @Override + public int readInt() throws IOException { + return in.readInt(); + } + + @Override + public long readLong() throws IOException { + return in.readLong(); + } + + @Override + public float readFloat() throws IOException { + return in.readFloat(); + } + + @Override + public double readDouble() throws IOException { + return in.readDouble(); + } + + @Deprecated + @Override + public String readLine() throws IOException { + return in.readLine(); + } + + @NotNull + @Override + public String readUTF() throws IOException { + return in.readUTF(); + } +} diff --git a/src/main/java/org/warp/commonutils/type/Bytes.java b/src/main/java/org/warp/commonutils/type/Bytes.java new file mode 100644 index 0000000..2ed35cf --- /dev/null +++ b/src/main/java/org/warp/commonutils/type/Bytes.java @@ -0,0 +1,93 @@ +package org.warp.commonutils.type; + +import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; +import it.unimi.dsi.fastutil.objects.ObjectSets.UnmodifiableSet; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.jetbrains.annotations.NotNull; + +public class Bytes { + public final byte[] data; + + public Bytes(@NotNull byte[] data) { + this.data = data; + } + + public static Map ofMap(Map oldMap) { + var newMap = new HashMap(oldMap.size()); + oldMap.forEach((key, value) -> newMap.put(new Bytes(key), new Bytes(value))); + return newMap; + } + + public static UnmodifiableMap ofMap(UnmodifiableIterableMap oldMap) { + Bytes[] keys = new Bytes[oldMap.size()]; + Bytes[] values = new Bytes[oldMap.size()]; + IntWrapper i = new IntWrapper(0); + oldMap.forEach((key, value) -> { + keys[i.var] = new Bytes(key); + values[i.var] = new Bytes(value); + i.var++; + }); + return UnmodifiableMap.of(keys, values); + } + + public static List ofList(List oldList) { + var newList = new ArrayList(oldList.size()); + oldList.forEach((item) -> newList.add(new Bytes(item))); + return newList; + } + + public static Set ofSet(Set oldSet) { + var newSet = new ObjectOpenHashSet(oldSet.size()); + oldSet.forEach((item) -> newSet.add(new Bytes(item))); + return newSet; + } + + public static UnmodifiableIterableSet toIterableSet(UnmodifiableSet set) { + byte[][] resultItems = new byte[set.size()][]; + var it = set.iterator(); + int i = 0; + while (it.hasNext()) { + var item = it.next(); + resultItems[i] = item.data; + i++; + } + return UnmodifiableIterableSet.of(resultItems); + } + + public static byte[][] toByteArray(Collection value) { + Bytes[] valueBytesArray = value.toArray(Bytes[]::new); + byte[][] convertedResult = new byte[valueBytesArray.length][]; + for (int i = 0; i < valueBytesArray.length; i++) { + convertedResult[i] = valueBytesArray[i].data; + } + return convertedResult; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Bytes that = (Bytes) o; + return Arrays.equals(data, that.data); + } + + @Override + public int hashCode() { + return Arrays.hashCode(data); + } + + @Override + public String toString() { + return Arrays.toString(data); + } +} diff --git a/src/main/java/org/warp/commonutils/type/IntWrapper.java b/src/main/java/org/warp/commonutils/type/IntWrapper.java new file mode 100644 index 0000000..109bd49 --- /dev/null +++ b/src/main/java/org/warp/commonutils/type/IntWrapper.java @@ -0,0 +1,10 @@ +package org.warp.commonutils.type; + +public class IntWrapper { + + public int var; + + public IntWrapper(int value) { + this.var = value; + } +} diff --git a/src/main/java/org/warp/commonutils/type/ShortNamedThreadFactory.java b/src/main/java/org/warp/commonutils/type/ShortNamedThreadFactory.java new file mode 100644 index 0000000..595c423 --- /dev/null +++ b/src/main/java/org/warp/commonutils/type/ShortNamedThreadFactory.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.warp.commonutils.type; + + +import java.util.Locale; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A default {@link ThreadFactory} implementation that accepts the name prefix + * of the created threads as a constructor argument. Otherwise, this factory + * yields the same semantics as the thread factory returned by + * {@link Executors#defaultThreadFactory()}. + */ +public class ShortNamedThreadFactory implements ThreadFactory { + + private static int POOL_NUMBERS_COUNT = 50; + private static final AtomicInteger[] threadPoolNumber = new AtomicInteger[POOL_NUMBERS_COUNT]; + static { + for (int i = 0; i < threadPoolNumber.length; i++) { + threadPoolNumber[i] = new AtomicInteger(1); + } + } + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private static final String NAME_PATTERN = "%s-%d"; + private final String threadNamePrefix; + + /** + * Creates a new {@link ShortNamedThreadFactory} instance + * + * @param threadNamePrefix the name prefix assigned to each thread created. + */ + public ShortNamedThreadFactory(String threadNamePrefix) { + final SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : Thread.currentThread() + .getThreadGroup(); + this.threadNamePrefix = String.format(Locale.ROOT, NAME_PATTERN, + checkPrefix(threadNamePrefix), threadPoolNumber[(threadNamePrefix.hashCode() % POOL_NUMBERS_COUNT / 2) + POOL_NUMBERS_COUNT / 2].getAndIncrement()); + } + + private static String checkPrefix(String prefix) { + return prefix == null || prefix.length() == 0 ? "Unnamed" : prefix; + } + + /** + * Creates a new {@link Thread} + * + * @see java.util.concurrent.ThreadFactory#newThread(java.lang.Runnable) + */ + @Override + public Thread newThread(Runnable r) { + final Thread t = new Thread(group, r, String.format(Locale.ROOT, "%s-%d", + this.threadNamePrefix, threadNumber.getAndIncrement()), 0); + t.setDaemon(false); + t.setPriority(Thread.NORM_PRIORITY); + return t; + } + +} diff --git a/src/main/java/org/warp/commonutils/type/UnmodifiableIterableMap.java b/src/main/java/org/warp/commonutils/type/UnmodifiableIterableMap.java new file mode 100644 index 0000000..83fe3c5 --- /dev/null +++ b/src/main/java/org/warp/commonutils/type/UnmodifiableIterableMap.java @@ -0,0 +1,202 @@ +package org.warp.commonutils.type; + +import com.google.common.collect.Streams; +import it.unimi.dsi.fastutil.objects.Object2ObjectMaps; +import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap; +import java.lang.reflect.Array; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.function.BiConsumer; +import java.util.function.IntFunction; +import java.util.stream.Stream; +import org.jetbrains.annotations.NotNull; + +public interface UnmodifiableIterableMap extends Iterable> { + + /** + * Returns the number of key-value mappings in this map. If the + * map contains more than {@code Integer.MAX_VALUE} elements, returns + * {@code Integer.MAX_VALUE}. + * + * @return the number of key-value mappings in this map + */ + int size(); + + /** + * Returns {@code true} if this map contains no key-value mappings. + * + * @return {@code true} if this map contains no key-value mappings + */ + boolean isEmpty(); + + /** + * Performs the given action for each entry in this map until all entries + * have been processed or the action throws an exception. Unless + * otherwise specified by the implementing class, actions are performed in + * the order of entry set iteration (if an iteration order is specified.) + * Exceptions thrown by the action are relayed to the caller. + * + * @implSpec + * The default implementation is equivalent to, for this {@code map}: + *

     {@code
    +	 * for (Map.Entry entry : map.entrySet())
    +	 *     action.accept(entry.getKey(), entry.getValue());
    +	 * }
    + * + * The default implementation makes no guarantees about synchronization + * or atomicity properties of this method. Any implementation providing + * atomicity guarantees must override this method and document its + * concurrency properties. + * + * @param action The action to be performed for each entry + * @throws NullPointerException if the specified action is null + * @throws ConcurrentModificationException if an entry is found to be + * removed during iteration + * @since 1.8 + */ + void forEach(BiConsumer action); + + Map toUnmodifiableMap(); + + Stream> stream(); + + UnmodifiableIterableSet toUnmodifiableIterableKeysSet(IntFunction generator); + + @SuppressWarnings("SuspiciousSystemArraycopy") + static UnmodifiableIterableMap ofObjects(Object[] keys, Object[] values) { + if (keys == null || values == null || (keys.length == 0 && values.length == 0)) { + return UnmodifiableIterableMap.of(null, null); + } else if (keys.length == values.length) { + //noinspection unchecked + K[] keysArray = (K[]) Array.newInstance(keys[0].getClass(), keys.length); + System.arraycopy(keys, 0, keysArray, 0, keys.length); + //noinspection unchecked + V[] valuesArray = (V[]) Array.newInstance(values[0].getClass(), keys.length); + System.arraycopy(values, 0, valuesArray, 0, values.length); + return UnmodifiableIterableMap.of(keysArray, valuesArray); + } else { + throw new IllegalArgumentException("The number of keys doesn't match the number of values."); + } + } + + static UnmodifiableIterableMap of(K[] keys, V[] values) { + int keysSize = (keys != null) ? keys.length : 0; + int valuesSize = (values != null) ? values.length : 0; + + if (keysSize == 0 && valuesSize == 0) { + // return mutable map + return new EmptyUnmodifiableIterableMap<>(); + } + + if (keysSize != valuesSize) { + throw new IllegalArgumentException("The number of keys doesn't match the number of values."); + } + + return new ArrayUnmodifiableIterableMap<>(keys, values, keysSize); + } + + class EmptyUnmodifiableIterableMap implements UnmodifiableIterableMap { + + private EmptyUnmodifiableIterableMap() {} + + @NotNull + @Override + public Iterator> iterator() { + return new Iterator<>() { + @Override + public boolean hasNext() { + return false; + } + + @Override + public Entry next() { + throw new NoSuchElementException(); + } + }; + } + + @Override + public int size() { + return 0; + } + + @Override + public boolean isEmpty() { + return true; + } + + @Override + public void forEach(BiConsumer action) {} + + @Override + public Map toUnmodifiableMap() { + //noinspection unchecked + return Object2ObjectMaps.EMPTY_MAP; + } + + @Override + public Stream> stream() { + return Stream.empty(); + } + + @Override + public UnmodifiableIterableSet toUnmodifiableIterableKeysSet(IntFunction generator) { + return UnmodifiableIterableSet.of(null); + } + } + + class ArrayUnmodifiableIterableMap implements UnmodifiableIterableMap { + + private final K[] keys; + private final V[] values; + private final int keysSize; + + private ArrayUnmodifiableIterableMap(K[] keys, V[] values, int keysSize) { + this.keys = keys; + this.values = values; + this.keysSize = keysSize; + } + + @NotNull + @Override + public Iterator> iterator() { + return new Object2ObjectOpenHashMap(keys, values, 1.0f).entrySet().iterator(); + } + + @Override + public int size() { + return keysSize; + } + + @Override + public boolean isEmpty() { + return false; + } + + @Override + public void forEach(BiConsumer action) { + for (int i = 0; i < keys.length; i++) { + action.accept(keys[i], values[i]); + } + } + + @Override + public Map toUnmodifiableMap() { + return Object2ObjectMaps.unmodifiable(new Object2ObjectOpenHashMap<>(keys, values, 1.0f)); + } + + @Override + public Stream> stream() { + //noinspection UnstableApiUsage + return Streams.zip(Stream.of(keys), Stream.of(values), Map::entry); + } + + @Override + public UnmodifiableIterableSet toUnmodifiableIterableKeysSet(IntFunction generator) { + return UnmodifiableIterableSet.of(keys); + } + } +} diff --git a/src/main/java/org/warp/commonutils/type/UnmodifiableIterableSet.java b/src/main/java/org/warp/commonutils/type/UnmodifiableIterableSet.java new file mode 100644 index 0000000..25d58bc --- /dev/null +++ b/src/main/java/org/warp/commonutils/type/UnmodifiableIterableSet.java @@ -0,0 +1,195 @@ +package org.warp.commonutils.type; + +import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet; +import it.unimi.dsi.fastutil.objects.ObjectSets; +import java.util.Collections; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.IntFunction; +import org.jetbrains.annotations.NotNull; + +public interface UnmodifiableIterableSet extends Iterable { + + int size(); + + boolean isEmpty(); + + void forEach(Consumer action); + + Set toUnmodifiableSet(); + + UnmodifiableIterableMap toUnmodifiableIterableMapSetValues(V[] values); + + UnmodifiableIterableMap toUnmodifiableIterableMapSetKeys(K2[] keys); + + UnmodifiableMap toUnmodifiableMapSetValues(V[] values); + + UnmodifiableMap toUnmodifiableMapSetKeys(K2[] keys); + + static UnmodifiableIterableSet of(K[] items) { + int keysSize = (items != null) ? items.length : 0; + + if (keysSize == 0) { + // return mutable map + return new UnmodifiableIterableSet() { + @NotNull + @Override + public Iterator iterator() { + return new Iterator<>() { + @Override + public boolean hasNext() { + return false; + } + + @Override + public K next() { + throw new NoSuchElementException(); + } + }; + } + + @Override + public int size() { + return 0; + } + + @Override + public boolean isEmpty() { + return true; + } + + @Override + public void forEach(Consumer action) {} + + @Override + public Set toUnmodifiableSet() { + //noinspection unchecked + return ObjectSets.EMPTY_SET; + } + + @Override + public UnmodifiableIterableMap toUnmodifiableIterableMapSetValues(V[] values) { + return UnmodifiableIterableMap.of(null, values); + } + + @Override + public UnmodifiableIterableMap toUnmodifiableIterableMapSetKeys(K2[] keys) { + return UnmodifiableIterableMap.of(keys, null); + } + + @Override + public UnmodifiableMap toUnmodifiableMapSetValues(V[] values) { + return UnmodifiableMap.of(null, values); + } + + @Override + public UnmodifiableMap toUnmodifiableMapSetKeys(K2[] keys) { + return UnmodifiableMap.of(keys, null); + } + }; + } + + return new UnmodifiableIterableSet() { + @Override + public int size() { + return keysSize; + } + + @Override + public boolean isEmpty() { + return false; + } + + @Override + public void forEach(Consumer action) { + for (int i = 0; i < items.length; i++) { + action.accept(items[i]); + } + } + + @Override + public Set toUnmodifiableSet() { + return ObjectSets.unmodifiable(new ObjectOpenHashSet<>(items, 1.0f)); + } + + @Override + public UnmodifiableIterableMap toUnmodifiableIterableMapSetValues(V[] values) { + return UnmodifiableIterableMap.of(items, values); + } + + @Override + public UnmodifiableIterableMap toUnmodifiableIterableMapSetKeys(K2[] keys) { + return UnmodifiableIterableMap.of(keys, items); + } + + @Override + public UnmodifiableMap toUnmodifiableMapSetValues(V[] values) { + return UnmodifiableMap.of(items, values); + } + + @Override + public UnmodifiableMap toUnmodifiableMapSetKeys(K2[] keys) { + return UnmodifiableMap.of(keys, items); + } + + @NotNull + @Override + public Iterator iterator() { + return new ObjectOpenHashSet(items, 1.0f).iterator(); + } + }; + } + + static UnmodifiableIterableSet of(Set items, IntFunction generator) { + + return new UnmodifiableIterableSet() { + @Override + public int size() { + return items.size(); + } + + @Override + public boolean isEmpty() { + return items.isEmpty(); + } + + @Override + public void forEach(Consumer action) { + items.forEach(action); + } + + @Override + public Set toUnmodifiableSet() { + return Collections.unmodifiableSet(items); + } + + @Override + public UnmodifiableIterableMap toUnmodifiableIterableMapSetValues(V[] values) { + return UnmodifiableIterableMap.of(items.toArray(generator), values); + } + + @Override + public UnmodifiableIterableMap toUnmodifiableIterableMapSetKeys(K2[] keys) { + return UnmodifiableIterableMap.of(keys, items.toArray(generator)); + } + + @Override + public UnmodifiableMap toUnmodifiableMapSetValues(V[] values) { + return UnmodifiableMap.of(items.toArray(generator), values); + } + + @Override + public UnmodifiableMap toUnmodifiableMapSetKeys(K2[] keys) { + return UnmodifiableMap.of(keys, items.toArray(generator)); + } + + @NotNull + @Override + public Iterator iterator() { + return items.iterator(); + } + }; + } +} diff --git a/src/main/java/org/warp/commonutils/type/UnmodifiableMap.java b/src/main/java/org/warp/commonutils/type/UnmodifiableMap.java new file mode 100644 index 0000000..a0e86fb --- /dev/null +++ b/src/main/java/org/warp/commonutils/type/UnmodifiableMap.java @@ -0,0 +1,317 @@ +package org.warp.commonutils.type; + +import it.unimi.dsi.fastutil.objects.Object2ObjectMap; +import it.unimi.dsi.fastutil.objects.Object2ObjectMaps; +import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.objects.ObjectIterator; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.function.BiConsumer; +import java.util.function.IntFunction; +import java.util.stream.Stream; +import org.jetbrains.annotations.NotNull; + +public interface UnmodifiableMap extends UnmodifiableIterableMap { + + /** + * Returns {@code true} if this map contains a mapping for the specified + * key. More formally, returns {@code true} if and only if + * this map contains a mapping for a key {@code k} such that + * {@code Objects.equals(key, k)}. (There can be + * at most one such mapping.) + * + * @param key key whose presence in this map is to be tested + * @return {@code true} if this map contains a mapping for the specified + * key + * @throws ClassCastException if the key is of an inappropriate type for + * this map + * (optional) + * @throws NullPointerException if the specified key is null and this map + * does not permit null keys + * (optional) + */ + boolean containsKey(Object key); + + /** + * Returns the value to which the specified key is mapped, + * or {@code null} if this map contains no mapping for the key. + * + *

    More formally, if this map contains a mapping from a key + * {@code k} to a value {@code v} such that + * {@code Objects.equals(key, k)}, + * then this method returns {@code v}; otherwise + * it returns {@code null}. (There can be at most one such mapping.) + * + *

    If this map permits null values, then a return value of + * {@code null} does not necessarily indicate that the map + * contains no mapping for the key; it's also possible that the map + * explicitly maps the key to {@code null}. The {@link #containsKey + * containsKey} operation may be used to distinguish these two cases. + * + * @param key the key whose associated value is to be returned + * @return the value to which the specified key is mapped, or + * {@code null} if this map contains no mapping for the key + * @throws ClassCastException if the key is of an inappropriate type for + * this map + * (optional) + * @throws NullPointerException if the specified key is null and this map + * does not permit null keys + * (optional) + */ + V get(Object key); + + /** + * Returns the value to which the specified key is mapped, or + * {@code defaultValue} if this map contains no mapping for the key. + * + * @implSpec + * The default implementation makes no guarantees about synchronization + * or atomicity properties of this method. Any implementation providing + * atomicity guarantees must override this method and document its + * concurrency properties. + * + * @param key the key whose associated value is to be returned + * @param defaultValue the default mapping of the key + * @return the value to which the specified key is mapped, or + * {@code defaultValue} if this map contains no mapping for the key + * @throws ClassCastException if the key is of an inappropriate type for + * this map + * (optional) + * @throws NullPointerException if the specified key is null and this map + * does not permit null keys + * (optional) + * @since 1.8 + */ + default V getOrDefault(Object key, V defaultValue) { + V v; + return (((v = get(key)) != null) || containsKey(key)) + ? v + : defaultValue; + } + + @NotNull + ObjectIterator> fastIterator(); + + /** + * Performs the given action for each entry in this map until all entries + * have been processed or the action throws an exception. Unless + * otherwise specified by the implementing class, actions are performed in + * the order of entry set iteration (if an iteration order is specified.) + * Exceptions thrown by the action are relayed to the caller. + * + * @implSpec + * The default implementation is equivalent to, for this {@code map}: + *

     {@code
    +	 * for (Map.Entry entry : map.entrySet())
    +	 *     action.accept(entry.getKey(), entry.getValue());
    +	 * }
    + * + * The default implementation makes no guarantees about synchronization + * or atomicity properties of this method. Any implementation providing + * atomicity guarantees must override this method and document its + * concurrency properties. + * + * @param action The action to be performed for each entry + * @throws NullPointerException if the specified action is null + * @throws ConcurrentModificationException if an entry is found to be + * removed during iteration + * @since 1.8 + */ + void forEach(BiConsumer action); + + static UnmodifiableMap of(K[] keys, V[] values) { + int keysSize = (keys != null) ? keys.length : 0; + int valuesSize = (values != null) ? values.length : 0; + + if (keysSize == 0 && valuesSize == 0) { + // return mutable map + return new EmptyUnmodifiableMap<>(); + } + + return new MappedUnmodifiableMap<>(new Object2ObjectOpenHashMap<>(keys, values, 1.0f)); + } + + static UnmodifiableMap of(Map map) { + return new MappedUnmodifiableMap(map); + } + + class EmptyUnmodifiableMap implements UnmodifiableMap { + + private EmptyUnmodifiableMap() {} + + @Override + public int size() { + return 0; + } + + @Override + public boolean isEmpty() { + return true; + } + + @Override + public boolean containsKey(Object key) { + return false; + } + + @Override + public V get(Object key) { + return null; + } + + @Override + public void forEach(BiConsumer action) { + + } + + @NotNull + @Override + public Iterator> iterator() { + return new Iterator<>() { + @Override + public boolean hasNext() { + return false; + } + + @Override + public Entry next() { + throw new NoSuchElementException(); + } + }; + } + + @NotNull + @Override + public ObjectIterator> fastIterator() { + return new ObjectIterator<>() { + @Override + public boolean hasNext() { + return false; + } + + @Override + public Object2ObjectMap.Entry next() { + throw new NoSuchElementException(); + } + }; + } + + @Override + public Map toUnmodifiableMap() { + //noinspection unchecked + return Object2ObjectMaps.EMPTY_MAP; + } + + @Override + public Stream> stream() { + return Stream.empty(); + } + + @Override + public UnmodifiableIterableSet toUnmodifiableIterableKeysSet(IntFunction generator) { + return UnmodifiableIterableSet.of(null); + } + } + + class MappedUnmodifiableMap implements UnmodifiableMap { + + private final Map map; + + private MappedUnmodifiableMap(@NotNull Map map) { + this.map = map; + } + + @Override + public int size() { + return map.size(); + } + + @Override + public boolean isEmpty() { + return map.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return map.containsKey(key); + } + + @Override + public V get(Object key) { + return map.get(key); + } + + @Override + public void forEach(BiConsumer action) { + map.forEach(action); + } + + @NotNull + @Override + public Iterator> iterator() { + return map.entrySet().iterator(); + } + + @NotNull + @Override + public ObjectIterator> fastIterator() { + if (map instanceof Object2ObjectMap) { + return Object2ObjectMaps.fastIterator((Object2ObjectMap) map); + } else { + var iterator = map.entrySet().iterator(); + var reusableEntry = new Object2ObjectMap.Entry() { + private K key; + private V val; + @Override + public K getKey() { + return key; + } + + @Override + public V getValue() { + return val; + } + + @Override + public V setValue(V value) { + throw new UnsupportedOperationException(); + } + }; + return new ObjectIterator<>() { + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public Object2ObjectMap.Entry next() { + var next = iterator.next(); + reusableEntry.key = next.getKey(); + reusableEntry.val = next.getValue(); + return reusableEntry; + } + }; + } + } + + @Override + public Map toUnmodifiableMap() { + return Collections.unmodifiableMap(map); + } + + @Override + public Stream> stream() { + return map.entrySet().stream(); + } + + @Override + public UnmodifiableIterableSet toUnmodifiableIterableKeysSet(IntFunction generator) { + return UnmodifiableIterableSet.of(map.keySet().toArray(generator)); + } + } +} diff --git a/src/main/java/org/warp/commonutils/type/VariableWrapper.java b/src/main/java/org/warp/commonutils/type/VariableWrapper.java new file mode 100644 index 0000000..b0b5857 --- /dev/null +++ b/src/main/java/org/warp/commonutils/type/VariableWrapper.java @@ -0,0 +1,10 @@ +package org.warp.commonutils.type; + +public class VariableWrapper { + + public volatile T var; + + public VariableWrapper(T value) { + this.var = value; + } +} diff --git a/src/test/java/org/warp/commonutils/functional/TestGenericExceptions.java b/src/test/java/org/warp/commonutils/functional/TestGenericExceptions.java new file mode 100644 index 0000000..8ad0982 --- /dev/null +++ b/src/test/java/org/warp/commonutils/functional/TestGenericExceptions.java @@ -0,0 +1,40 @@ +package org.warp.commonutils.functional; + +import java.io.IOException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.warp.commonutils.functional.Unchecked.UncheckedConsumer; + +public class TestGenericExceptions { + + @Test + public void testGenericExceptions() { + testFunction((number) -> { + Assertions.assertEquals(number, 1); + }).done(); + + boolean thrown = false; + try { + testFunction((number) -> { + throw new IOException("Test"); + }).throwException(IOException.class); + } catch (IOException e) { + thrown = true; + } + Assertions.assertEquals(true, thrown, "IOException not thrown"); + + boolean thrown2 = false; + try { + testFunction((number) -> { + throw new IOException("Test"); + }).throwException(Exception.class); + } catch (Exception e) { + thrown2 = true; + } + Assertions.assertEquals(true, thrown2, "Exception not thrown"); + } + + private UncheckedResult testFunction(UncheckedConsumer uncheckedConsumer) { + return Unchecked.wrap(uncheckedConsumer).apply(1); + } +} diff --git a/src/test/java/org/warp/commonutils/functional/org/warp/commonutils/locks/LeftRightLockTest.java b/src/test/java/org/warp/commonutils/functional/org/warp/commonutils/locks/LeftRightLockTest.java new file mode 100644 index 0000000..64a6647 --- /dev/null +++ b/src/test/java/org/warp/commonutils/functional/org/warp/commonutils/locks/LeftRightLockTest.java @@ -0,0 +1,185 @@ +package org.warp.commonutils.functional.org.warp.commonutils.locks; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.warp.commonutils.locks.LeftRightLock; + +public class LeftRightLockTest { + + int logLineSequenceNumber = 0; + private LeftRightLock sut = new LeftRightLock(); + + @Timeout(2000) + @Test() + public void acquiringLeftLockExcludeAcquiringRightLock() throws Exception { + sut.lockLeft(); + + + Future task = Executors.newSingleThreadExecutor().submit(() -> sut.tryLockRight()); + assertFalse(task.get(), "I shouldn't be able to acquire the RIGHT lock!"); + } + + @Timeout(2000) + @Test() + public void acquiringRightLockExcludeAcquiringLeftLock() throws Exception { + sut.lockRight(); + Future task = Executors.newSingleThreadExecutor().submit(() -> sut.tryLockLeft()); + assertFalse(task.get(), "I shouldn't be able to acquire the LEFT lock!"); + } + + @Timeout(2000) + @Test() + public void theLockShouldBeReentrant() throws Exception { + sut.lockLeft(); + assertTrue(sut.tryLockLeft()); + } + + @Timeout(2000) + @Test() + public void multipleThreadShouldBeAbleToAcquireTheSameLock_Right() throws Exception { + sut.lockRight(); + Future task = Executors.newSingleThreadExecutor().submit(() -> sut.tryLockRight()); + assertTrue(task.get()); + } + + @Timeout(2000) + @Test() + public void multipleThreadShouldBeAbleToAcquireTheSameLock_left() throws Exception { + sut.lockLeft(); + Future task = Executors.newSingleThreadExecutor().submit(() -> sut.tryLockLeft()); + assertTrue(task.get()); + } + + @Timeout(2000) + @Test() + public void shouldKeepCountOfAllTheThreadsHoldingTheSide() throws Exception { + + CountDownLatch latchA = new CountDownLatch(1); + CountDownLatch latchB = new CountDownLatch(1); + + + Thread threadA = spawnThreadToAcquireLeftLock(latchA, sut); + Thread threadB = spawnThreadToAcquireLeftLock(latchB, sut); + + System.out.println("Both threads have acquired the left lock."); + + try { + latchA.countDown(); + threadA.join(); + boolean acqStatus = sut.tryLockRight(); + System.out.println("The right lock was " + (acqStatus ? "" : "not") + " acquired"); + assertFalse(acqStatus, "There is still a thread holding the left lock. This shouldn't succeed."); + } finally { + latchB.countDown(); + threadB.join(); + } + + } + + @Timeout(2000) + @Test() + public void shouldBlockThreadsTryingToAcquireLeftIfRightIsHeld() throws Exception { + sut.lockLeft(); + + CountDownLatch taskStartedLatch = new CountDownLatch(1); + + final Future task = Executors.newSingleThreadExecutor().submit(() -> { + taskStartedLatch.countDown(); + sut.lockRight(); + return false; + }); + + taskStartedLatch.await(); + Thread.sleep(100); + + assertFalse(task.isDone()); + } + + @Test + public void shouldBeFreeAfterRelease() throws Exception { + sut.lockLeft(); + sut.releaseLeft(); + assertTrue(sut.tryLockRight()); + } + + @Test + public void shouldBeFreeAfterMultipleThreadsReleaseIt() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + + final Thread thread1 = spawnThreadToAcquireLeftLock(latch, sut); + final Thread thread2 = spawnThreadToAcquireLeftLock(latch, sut); + + latch.countDown(); + + thread1.join(); + thread2.join(); + + assertTrue(sut.tryLockRight()); + + } + + @Timeout(2000) + @Test() + public void lockShouldBeReleasedIfNoThreadIsHoldingIt() throws Exception { + CountDownLatch releaseLeftLatch = new CountDownLatch(1); + CountDownLatch rightLockTaskIsRunning = new CountDownLatch(1); + + Thread leftLockThread1 = spawnThreadToAcquireLeftLock(releaseLeftLatch, sut); + Thread leftLockThread2 = spawnThreadToAcquireLeftLock(releaseLeftLatch, sut); + + Future acquireRightLockTask = Executors.newSingleThreadExecutor().submit(() -> { + if (sut.tryLockRight()) + throw new AssertionError("The left lock should be still held, I shouldn't be able to acquire right a this point."); + printSynchronously("Going to be blocked on right lock"); + rightLockTaskIsRunning.countDown(); + sut.lockRight(); + printSynchronously("Lock acquired!"); + return true; + }); + + rightLockTaskIsRunning.await(); + + releaseLeftLatch.countDown(); + leftLockThread1.join(); + leftLockThread2.join(); + + assertTrue(acquireRightLockTask.get()); + } + + private synchronized void printSynchronously(String str) { + + System.out.println(logLineSequenceNumber++ + ")" + str); + System.out.flush(); + } + + private Thread spawnThreadToAcquireLeftLock(CountDownLatch releaseLockLatch, LeftRightLock lock) throws InterruptedException { + CountDownLatch lockAcquiredLatch = new CountDownLatch(1); + final Thread thread = spawnThreadToAcquireLeftLock(releaseLockLatch, lockAcquiredLatch, lock); + lockAcquiredLatch.await(); + return thread; + } + + private Thread spawnThreadToAcquireLeftLock(CountDownLatch releaseLockLatch, CountDownLatch lockAcquiredLatch, LeftRightLock lock) { + final Thread thread = new Thread(() -> { + lock.lockLeft(); + printSynchronously("Thread " + Thread.currentThread() + " Acquired left lock"); + try { + lockAcquiredLatch.countDown(); + releaseLockLatch.await(); + } catch (InterruptedException ignore) { + } finally { + lock.releaseLeft(); + } + + printSynchronously("Thread " + Thread.currentThread() + " RELEASED left lock"); + }); + thread.start(); + return thread; + } +} \ No newline at end of file