From a15a8c341d9df27ce65ed1391219af62e1cd9567 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 1 May 2020 00:31:27 +0200 Subject: [PATCH] Bounded executor service --- src/main/java/it/ernytech/tdlib/Client.java | 10 +++++--- .../tdlib/utils/BoundedExecutorService.java | 24 +++++++++++++++++++ ...r.java => BoundedExecutorServiceImpl.java} | 9 +++---- 3 files changed, 36 insertions(+), 7 deletions(-) create mode 100644 src/main/java/it/ernytech/tdlib/utils/BoundedExecutorService.java rename src/main/java/it/ernytech/tdlib/utils/{BoundedExecutor.java => BoundedExecutorServiceImpl.java} (90%) diff --git a/src/main/java/it/ernytech/tdlib/Client.java b/src/main/java/it/ernytech/tdlib/Client.java index 0f2c8e3..36690ad 100644 --- a/src/main/java/it/ernytech/tdlib/Client.java +++ b/src/main/java/it/ernytech/tdlib/Client.java @@ -83,9 +83,13 @@ public class Client { throw new IllegalThreadStateException("Thread: " + Thread.currentThread().getName() + " trying receive incoming updates but shouldn't be called simultaneously from two different threads!"); } - this.receiveLock.lock(); - var resultSize = nativeClientReceive(this.clientId, eventIds, events, timeout); - this.receiveLock.unlock(); + int resultSize; + this.receiveLock.lock(); + try { + resultSize = nativeClientReceive(this.clientId, eventIds, events, timeout); + } finally { + this.receiveLock.unlock(); + } for (int i = 0; i < resultSize; i++) { responseList.add(new Response(eventIds[i], events[i])); diff --git a/src/main/java/it/ernytech/tdlib/utils/BoundedExecutorService.java b/src/main/java/it/ernytech/tdlib/utils/BoundedExecutorService.java new file mode 100644 index 0000000..793c025 --- /dev/null +++ b/src/main/java/it/ernytech/tdlib/utils/BoundedExecutorService.java @@ -0,0 +1,24 @@ +package it.ernytech.tdlib.utils; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import org.jetbrains.annotations.Nullable; + +public interface BoundedExecutorService extends ExecutorService { + + static BoundedExecutorService create(int maxQueueSize, + int corePoolSize, + int maxPoolSize, + long keepAliveTime, + TimeUnit unit, + @Nullable BiConsumer queueSizeStatus) { + return new BoundedExecutorServiceImpl(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, queueSizeStatus); + } + + Future submitButBlockIfFull(Callable task) throws InterruptedException; + + void executeButBlockIfFull(Runnable task) throws InterruptedException; +} diff --git a/src/main/java/it/ernytech/tdlib/utils/BoundedExecutor.java b/src/main/java/it/ernytech/tdlib/utils/BoundedExecutorServiceImpl.java similarity index 90% rename from src/main/java/it/ernytech/tdlib/utils/BoundedExecutor.java rename to src/main/java/it/ernytech/tdlib/utils/BoundedExecutorServiceImpl.java index 9e4509b..2d667e6 100644 --- a/src/main/java/it/ernytech/tdlib/utils/BoundedExecutor.java +++ b/src/main/java/it/ernytech/tdlib/utils/BoundedExecutorServiceImpl.java @@ -1,12 +1,11 @@ package it.ernytech.tdlib.utils; -import java.util.concurrent.locks.ReentrantLock; import org.jetbrains.annotations.Nullable; import java.util.concurrent.*; import java.util.function.BiConsumer; -public class BoundedExecutor extends ThreadPoolExecutor { +class BoundedExecutorServiceImpl extends ThreadPoolExecutor implements BoundedExecutorService { private final Semaphore semaphore; private final @Nullable BiConsumer queueSizeStatus; @@ -22,7 +21,7 @@ public class BoundedExecutor extends ThreadPoolExecutor { * @param unit * @param queueSizeStatus Status. The boolean indicates if the queue is full, the integer indicates the current queue size */ - public BoundedExecutor(int maxQueueSize, + public BoundedExecutorServiceImpl(int maxQueueSize, int corePoolSize, int maxPoolSize, long keepAliveTime, @@ -41,6 +40,7 @@ public class BoundedExecutor extends ThreadPoolExecutor { * Submits task to execution pool, but blocks while number of running threads * has reached the bound limit */ + @Override public Future submitButBlockIfFull(final Callable task) throws InterruptedException { blockIfFull(); return submit(task); @@ -50,6 +50,7 @@ public class BoundedExecutor extends ThreadPoolExecutor { * Submits task to execution pool, but blocks while number of running threads * has reached the bound limit */ + @Override public void executeButBlockIfFull(final Runnable task) throws InterruptedException { blockIfFull(); execute(task); @@ -69,7 +70,7 @@ public class BoundedExecutor extends ThreadPoolExecutor { } @Override - protected void beforeExecute(Thread t, Runnable r) { + public void beforeExecute(Thread t, Runnable r) { semaphore.release(); super.beforeExecute(t, r);