Update BoundedExecutorServiceImpl.java

This commit is contained in:
Andrea Cavalli 2020-06-24 12:12:31 +02:00
parent fd426938f5
commit 46dcf99277

View File

@ -1,9 +1,14 @@
package org.warp.commonutils.concurrency.executor; package org.warp.commonutils.concurrency.executor;
import org.jetbrains.annotations.Nullable; import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.*; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import org.jetbrains.annotations.Nullable;
class BoundedExecutorServiceImpl extends ThreadPoolExecutor implements BoundedExecutorService { class BoundedExecutorServiceImpl extends ThreadPoolExecutor implements BoundedExecutorService {
@ -43,8 +48,7 @@ class BoundedExecutorServiceImpl extends ThreadPoolExecutor implements BoundedEx
*/ */
@Override @Override
public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException { public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException {
blockIfFull(); return super.submit(task);
return submit(task);
} }
/** /**
@ -53,8 +57,7 @@ class BoundedExecutorServiceImpl extends ThreadPoolExecutor implements BoundedEx
*/ */
@Override @Override
public void executeButBlockIfFull(final Runnable task) throws InterruptedException { public void executeButBlockIfFull(final Runnable task) throws InterruptedException {
blockIfFull(); super.execute(task);
execute(task);
} }
private void blockIfFull() throws InterruptedException { private void blockIfFull() throws InterruptedException {
@ -68,6 +71,12 @@ class BoundedExecutorServiceImpl extends ThreadPoolExecutor implements BoundedEx
@Override @Override
public void beforeExecute(Thread t, Runnable r) { public void beforeExecute(Thread t, Runnable r) {
try {
blockIfFull();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
var queueSize = getQueue().size(); var queueSize = getQueue().size();
synchronized (queueSizeStatusLock) { synchronized (queueSizeStatusLock) {
if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maxQueueSize, queueSize); if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maxQueueSize, queueSize);