diff --git a/pom.xml b/pom.xml index f49497d..bc2a08b 100644 --- a/pom.xml +++ b/pom.xml @@ -29,6 +29,11 @@ 17.0.0 compile + + org.warp + common-utils + 1.0.0 + diff --git a/src/main/java/it/ernytech/tdlib/utils/BoundedExecutorService.java b/src/main/java/it/ernytech/tdlib/utils/BoundedExecutorService.java deleted file mode 100644 index f8dee63..0000000 --- a/src/main/java/it/ernytech/tdlib/utils/BoundedExecutorService.java +++ /dev/null @@ -1,37 +0,0 @@ -package it.ernytech.tdlib.utils; - -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; -import org.jetbrains.annotations.Nullable; - -public interface BoundedExecutorService extends ExecutorService { - - @Deprecated - static BoundedExecutorService create(int maxQueueSize, - int corePoolSize, - int maxPoolSize, - long keepAliveTime, - TimeUnit unit, - @Nullable BiConsumer queueSizeStatus) { - return new BoundedExecutorServiceImpl(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, - Executors.defaultThreadFactory(), queueSizeStatus); - } - static BoundedExecutorService create(int maxQueueSize, - int corePoolSize, - int maxPoolSize, - long keepAliveTime, - TimeUnit unit, - ThreadFactory threadFactory, - @Nullable BiConsumer queueSizeStatus) { - return new BoundedExecutorServiceImpl(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, queueSizeStatus); - } - - Future submitButBlockIfFull(Callable task) throws InterruptedException; - - void executeButBlockIfFull(Runnable task) throws InterruptedException; -} diff --git a/src/main/java/it/ernytech/tdlib/utils/BoundedExecutorServiceImpl.java b/src/main/java/it/ernytech/tdlib/utils/BoundedExecutorServiceImpl.java deleted file mode 100644 index 09efe04..0000000 --- a/src/main/java/it/ernytech/tdlib/utils/BoundedExecutorServiceImpl.java +++ /dev/null @@ -1,81 +0,0 @@ -package it.ernytech.tdlib.utils; - -import org.jetbrains.annotations.Nullable; - -import java.util.concurrent.*; -import java.util.function.BiConsumer; - -class BoundedExecutorServiceImpl extends ThreadPoolExecutor implements BoundedExecutorService { - - private final Semaphore semaphore; - private final @Nullable BiConsumer queueSizeStatus; - private final int maxQueueSize; - private final Object queueSizeStatusLock = new Object(); - - /** - * - * @param maxQueueSize - * @param corePoolSize - * @param maxPoolSize - * @param keepAliveTime - * @param unit - * @param queueSizeStatus Status. The boolean indicates if the queue is full, the integer indicates the current queue size - */ - public BoundedExecutorServiceImpl(int maxQueueSize, - int corePoolSize, - int maxPoolSize, - long keepAliveTime, - TimeUnit unit, - ThreadFactory threadFactory, - @Nullable BiConsumer queueSizeStatus) { - super(corePoolSize, maxPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<>(), threadFactory); - if (maxQueueSize < 0) { - throw new IllegalArgumentException(); - } - this.maxQueueSize = maxQueueSize; - this.queueSizeStatus = queueSizeStatus; - semaphore = new Semaphore(maxQueueSize); - } - - /** - * Submits task to execution pool, but blocks while number of running threads - * has reached the bound limit - */ - @Override - public Future submitButBlockIfFull(final Callable task) throws InterruptedException { - blockIfFull(); - return submit(task); - } - - /** - * Submits task to execution pool, but blocks while number of running threads - * has reached the bound limit - */ - @Override - public void executeButBlockIfFull(final Runnable task) throws InterruptedException { - blockIfFull(); - execute(task); - } - - private void blockIfFull() throws InterruptedException { - if (semaphore.availablePermits() == 0) { - synchronized (queueSizeStatusLock) { - if (queueSizeStatus != null) queueSizeStatus.accept(true, maxQueueSize + (semaphore.hasQueuedThreads() ? semaphore.getQueueLength() : 0)); - } - } - semaphore.acquire(); - } - - @Override - public void beforeExecute(Thread t, Runnable r) { - - var queueSize = getQueue().size(); - synchronized (queueSizeStatusLock) { - if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maxQueueSize, queueSize); - } - - semaphore.release(); - - super.beforeExecute(t, r); - } -} \ No newline at end of file