Added warp utils

This commit is contained in:
Andrea Cavalli 2020-05-31 17:19:05 +02:00
parent a9f596b7ea
commit 8bac77db70
3 changed files with 5 additions and 118 deletions

View File

@ -29,6 +29,11 @@
<version>17.0.0</version> <version>17.0.0</version>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>org.warp</groupId>
<artifactId>common-utils</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) --> <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->

View File

@ -1,37 +0,0 @@
package it.ernytech.tdlib.utils;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.jetbrains.annotations.Nullable;
public interface BoundedExecutorService extends ExecutorService {
@Deprecated
static BoundedExecutorService create(int maxQueueSize,
int corePoolSize,
int maxPoolSize,
long keepAliveTime,
TimeUnit unit,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
return new BoundedExecutorServiceImpl(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit,
Executors.defaultThreadFactory(), queueSizeStatus);
}
static BoundedExecutorService create(int maxQueueSize,
int corePoolSize,
int maxPoolSize,
long keepAliveTime,
TimeUnit unit,
ThreadFactory threadFactory,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
return new BoundedExecutorServiceImpl(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, queueSizeStatus);
}
<T> Future<T> submitButBlockIfFull(Callable<T> task) throws InterruptedException;
void executeButBlockIfFull(Runnable task) throws InterruptedException;
}

View File

@ -1,81 +0,0 @@
package it.ernytech.tdlib.utils;
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
class BoundedExecutorServiceImpl extends ThreadPoolExecutor implements BoundedExecutorService {
private final Semaphore semaphore;
private final @Nullable BiConsumer<Boolean, Integer> queueSizeStatus;
private final int maxQueueSize;
private final Object queueSizeStatusLock = new Object();
/**
*
* @param maxQueueSize
* @param corePoolSize
* @param maxPoolSize
* @param keepAliveTime
* @param unit
* @param queueSizeStatus Status. The boolean indicates if the queue is full, the integer indicates the current queue size
*/
public BoundedExecutorServiceImpl(int maxQueueSize,
int corePoolSize,
int maxPoolSize,
long keepAliveTime,
TimeUnit unit,
ThreadFactory threadFactory,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
super(corePoolSize, maxPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<>(), threadFactory);
if (maxQueueSize < 0) {
throw new IllegalArgumentException();
}
this.maxQueueSize = maxQueueSize;
this.queueSizeStatus = queueSizeStatus;
semaphore = new Semaphore(maxQueueSize);
}
/**
* Submits task to execution pool, but blocks while number of running threads
* has reached the bound limit
*/
@Override
public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException {
blockIfFull();
return submit(task);
}
/**
* Submits task to execution pool, but blocks while number of running threads
* has reached the bound limit
*/
@Override
public void executeButBlockIfFull(final Runnable task) throws InterruptedException {
blockIfFull();
execute(task);
}
private void blockIfFull() throws InterruptedException {
if (semaphore.availablePermits() == 0) {
synchronized (queueSizeStatusLock) {
if (queueSizeStatus != null) queueSizeStatus.accept(true, maxQueueSize + (semaphore.hasQueuedThreads() ? semaphore.getQueueLength() : 0));
}
}
semaphore.acquire();
}
@Override
public void beforeExecute(Thread t, Runnable r) {
var queueSize = getQueue().size();
synchronized (queueSizeStatusLock) {
if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maxQueueSize, queueSize);
}
semaphore.release();
super.beforeExecute(t, r);
}
}