From fbabebbf1366026f3fd858025bbf8ada613ce88c Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 17 Jul 2020 16:56:44 +0200 Subject: [PATCH] Added BoundedExecutor (not service) --- .../concurrency/executor/BoundedExecutor.java | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutor.java diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutor.java b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutor.java new file mode 100644 index 0000000..dab2ce9 --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutor.java @@ -0,0 +1,52 @@ +package org.warp.commonutils.concurrency.executor; + +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.locks.StampedLock; +import org.jetbrains.annotations.NotNull; + +public class BoundedExecutor { + + private final Executor executor; + private final int maxQueueSize; + private final Semaphore semaphore; + private final StampedLock drainAllLock = new StampedLock(); + + public BoundedExecutor(Executor executor, int maxQueueSize) { + this.executor = executor; + this.maxQueueSize = maxQueueSize > 0 ? maxQueueSize : Integer.MAX_VALUE; + this.semaphore = new Semaphore(maxQueueSize); + } + + public void executeButBlockIfFull(@NotNull Runnable command) throws RejectedExecutionException, InterruptedException { + var drainAllLockRead = drainAllLock.readLockInterruptibly(); + semaphore.acquire(); + try { + executor.execute(() -> { + try { + semaphore.release(); + command.run(); + } finally { + drainAllLock.unlockRead(drainAllLockRead); + } + }); + } catch (RejectedExecutionException | NullPointerException ex) { + drainAllLock.unlockRead(drainAllLockRead); + throw ex; + } + } + + public void drainAll(DrainAllMethodLambda runnableWhenDrained) throws InterruptedException { + var drainAllWriteLock = drainAllLock.writeLockInterruptibly(); + try { + runnableWhenDrained.run(); + } finally { + drainAllLock.unlockWrite(drainAllWriteLock); + } + } + + public interface DrainAllMethodLambda { + void run() throws InterruptedException; + } +}