Bounded executor service

This commit is contained in:
Andrea Cavalli 2020-05-01 00:31:27 +02:00
parent bebff648b7
commit 2111b5bf34
3 changed files with 36 additions and 7 deletions

View File

@ -83,9 +83,13 @@ public class Client {
throw new IllegalThreadStateException("Thread: " + Thread.currentThread().getName() + " trying receive incoming updates but shouldn't be called simultaneously from two different threads!");
}
int resultSize;
this.receiveLock.lock();
var resultSize = nativeClientReceive(this.clientId, eventIds, events, timeout);
try {
resultSize = nativeClientReceive(this.clientId, eventIds, events, timeout);
} finally {
this.receiveLock.unlock();
}
for (int i = 0; i < resultSize; i++) {
responseList.add(new Response(eventIds[i], events[i]));

View File

@ -0,0 +1,24 @@
package it.ernytech.tdlib.utils;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.jetbrains.annotations.Nullable;
public interface BoundedExecutorService extends ExecutorService {
static BoundedExecutorService create(int maxQueueSize,
int corePoolSize,
int maxPoolSize,
long keepAliveTime,
TimeUnit unit,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
return new BoundedExecutorServiceImpl(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, queueSizeStatus);
}
<T> Future<T> submitButBlockIfFull(Callable<T> task) throws InterruptedException;
void executeButBlockIfFull(Runnable task) throws InterruptedException;
}

View File

@ -1,12 +1,11 @@
package it.ernytech.tdlib.utils;
import java.util.concurrent.locks.ReentrantLock;
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
public class BoundedExecutor extends ThreadPoolExecutor {
class BoundedExecutorServiceImpl extends ThreadPoolExecutor implements BoundedExecutorService {
private final Semaphore semaphore;
private final @Nullable BiConsumer<Boolean, Integer> queueSizeStatus;
@ -22,7 +21,7 @@ public class BoundedExecutor extends ThreadPoolExecutor {
* @param unit
* @param queueSizeStatus Status. The boolean indicates if the queue is full, the integer indicates the current queue size
*/
public BoundedExecutor(int maxQueueSize,
public BoundedExecutorServiceImpl(int maxQueueSize,
int corePoolSize,
int maxPoolSize,
long keepAliveTime,
@ -41,6 +40,7 @@ public class BoundedExecutor extends ThreadPoolExecutor {
* Submits task to execution pool, but blocks while number of running threads
* has reached the bound limit
*/
@Override
public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException {
blockIfFull();
return submit(task);
@ -50,6 +50,7 @@ public class BoundedExecutor extends ThreadPoolExecutor {
* Submits task to execution pool, but blocks while number of running threads
* has reached the bound limit
*/
@Override
public void executeButBlockIfFull(final Runnable task) throws InterruptedException {
blockIfFull();
execute(task);
@ -69,7 +70,7 @@ public class BoundedExecutor extends ThreadPoolExecutor {
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
public void beforeExecute(Thread t, Runnable r) {
semaphore.release();
super.beforeExecute(t, r);