Fix queue auto-resizing

This commit is contained in:
Andrea Cavalli 2020-09-18 00:43:35 +02:00
parent 9e5b0a3688
commit 54f09d35ac
12 changed files with 152 additions and 513 deletions

View File

@ -19,7 +19,6 @@ public class ParallelUtils {
int parallelism,
int groupSize, Consumer<V> consumer) throws CompletionException {
var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
parallelism,
parallelism,
0,
TimeUnit.MILLISECONDS,
@ -76,7 +75,6 @@ public class ParallelUtils {
int parallelism,
int groupSize, BiConsumer<K, V> consumer) throws CompletionException {
var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
parallelism,
parallelism,
0,
TimeUnit.MILLISECONDS,
@ -139,7 +137,6 @@ public class ParallelUtils {
int groupSize,
TriConsumer<K1, K2, V> consumer) throws CompletionException {
var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
parallelism,
parallelism,
0,
TimeUnit.MILLISECONDS,

View File

@ -1,229 +0,0 @@
package org.warp.commonutils.concurrency.executor;
import java.lang.StackWalker.StackFrame;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.jetbrains.annotations.NotNull;
import org.warp.commonutils.type.IntWrapper;
public class AsyncStackTraceExecutorDecorator extends ExecutorDecorator {
private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private static final Map<Thread, LSTTask> threadToTask = new HashMap<>();
private static final Map<Thread, AsyncStackTraceExecutorDecorator> threadToExecutor = new HashMap<>();
public AsyncStackTraceExecutorDecorator(Executor executor) {
super(executor);
}
@Override
public void execute(@NotNull Runnable command) {
var currentThread = Thread.currentThread();
List<StackFrame> frames = new ArrayList<>();
LSTTask lstTask;
lock.readLock().lock();
try {
lstTask = threadToTask.getOrDefault(currentThread, null);
// Add the current stack frames
addCurrentStackTrace(frames, lstTask != null);
if (lstTask != null) {
frames.addAll(lstTask.frames);
}
lstTask = new LSTTask(command, frames);
//System.out.println("execute(): THREAD-" + Thread.currentThread().hashCode() + " TASK-" + lstTask.hashCode());
} finally {
lock.readLock().unlock();
}
super.execute(lstTask);
}
private static void addCurrentStackTrace(List<StackFrame> frames, boolean isFromAsyncCal) {
StackWalker.getInstance().walk(a -> {
IntWrapper count = new IntWrapper(0);
int STACK_MAX_SIZE = 10;
a.filter(x -> {
var cn = x.getClassName();
return !cn.equals("java.util.concurrent.CompletableFuture")
&& !cn.equals("java.util.concurrent.CompletableFuture$AsyncRun")
&& !cn.equals("java.util.concurrent.ThreadPoolExecutor")
&& !cn.equals("java.util.concurrent.ThreadPoolExecutor$Worker")
&& !cn.equals("java.lang.Thread")
&& !cn.equals(LSTTask.class.getName())
&& !cn.equals(AsyncStackTraceExecutorDecorator.class.getName());
}).skip(0).limit(STACK_MAX_SIZE + 1).peek(x -> count.var++).forEachOrdered(frames::add);
if (count.var > STACK_MAX_SIZE) {
frames.remove(frames.size() - 1);
frames.add(new TextStackFrame("AndMoreFrames"));
}
return null;
});
if (isFromAsyncCal) {
frames.add(new TextStackFrame("AsyncCall"));
}
}
class LSTTask implements Runnable {
private final Runnable runnable;
List<StackFrame> frames;
LSTTask(Runnable runnable, List<StackFrame> frames) {
this.runnable = runnable;
this.frames = frames;
}
@Override
public void run() {
var currentThread = Thread.currentThread();
lock.writeLock().lock();
try {
threadToTask.put(currentThread, LSTTask.this);
threadToExecutor.put(currentThread, AsyncStackTraceExecutorDecorator.this);
} finally {
lock.writeLock().unlock();
}
try {
//System.out.println(" run(): THREAD-" + Thread.currentThread().hashCode() + " TASK-" + this.hashCode());
runnable.run();
} catch (Throwable t) {
RuntimeException e = new RuntimeException(t);
e.setStackTrace(frames.stream().map(StackFrame::toStackTraceElement).toArray(StackTraceElement[]::new));
throw e;
}
lock.writeLock().lock();
try {
threadToExecutor.remove(currentThread, AsyncStackTraceExecutorDecorator.this);
threadToTask.remove(currentThread, LSTTask.this);
} finally {
lock.writeLock().unlock();
}
}
}
public static void fixStackTrace(Exception ex) {
List<StackTraceElement> result = new ArrayList<>();
var currentThread = Thread.currentThread();
lock.readLock().lock();
try {
var executor = threadToExecutor.getOrDefault(currentThread, null);
if (executor != null) {
LSTTask lstTask = threadToTask.getOrDefault(currentThread, null);
if (lstTask != null) {
var currentStackFrames = new ArrayList<StackFrame>();
addCurrentStackTrace(currentStackFrames, true);
for (var frame : currentStackFrames) {
result.add(frame.toStackTraceElement());
}
for (var frame : lstTask.frames) {
result.add(frame.toStackTraceElement());
}
ex.setStackTrace(result.toArray(StackTraceElement[]::new));
}
}
} finally {
lock.readLock().unlock();
}
}
public static void dumpStack() {
var currentThread = Thread.currentThread();
lock.readLock().lock();
try {
var executor = threadToExecutor.getOrDefault(currentThread, null);
if (executor != null) {
LSTTask lstTask = threadToTask.getOrDefault(currentThread, null);
if (lstTask != null) {
StringBuilder sb = new StringBuilder();
sb.append(new Exception("Stack trace").toString()).append('\n');
var currentStackFrames = new ArrayList<StackFrame>();
addCurrentStackTrace(currentStackFrames, true);
for (var frame : currentStackFrames) {
printStackFrame(sb, frame);
}
for (var frame : lstTask.frames) {
printStackFrame(sb, frame);
}
System.err.println(sb.toString());
return;
}
}
Thread.dumpStack();
} finally {
lock.readLock().unlock();
}
}
private static void printStackFrame(StringBuilder sb, StackFrame frame) {
if(frame.getClassName().equals("AsyncCall")) {
sb.append("\t(async call)\n");
} else if(frame.getClassName().equals("AndMoreFrames")) {
sb.append("\t... omitted more frames\n");
} else {
sb.append("\tat ").append(frame.toString()).append('\n');
}
}
private static class TextStackFrame implements StackFrame {
private final String text;
public TextStackFrame(String text) {
this.text = text;
}
@Override
public String getClassName() {
return text;
}
@Override
public String getMethodName() {
return "..";
}
@Override
public Class<?> getDeclaringClass() {
return Object.class;
}
@Override
public int getByteCodeIndex() {
return 0;
}
@Override
public String getFileName() {
return null;
}
@Override
public int getLineNumber() {
return 0;
}
@Override
public boolean isNativeMethod() {
return false;
}
@Override
public StackTraceElement toStackTraceElement() {
return new StackTraceElement(getClassName(), getMethodName(), getFileName(), getLineNumber());
}
}
}

View File

@ -1,33 +0,0 @@
package org.warp.commonutils.concurrency.executor;
import java.util.concurrent.ExecutorService;
import org.jetbrains.annotations.NotNull;
public class AsyncStackTraceExecutorServiceDecorator extends SimplerExecutorServiceDecorator {
// todo: Fix async stacktrace performance and memory problems
private static final boolean DISABLE_ASYNC_STACKTRACES_GLOBALLY = true;
public AsyncStackTraceExecutorServiceDecorator(ExecutorService executorService) {
super(executorService, (executor) -> {
if (DISABLE_ASYNC_STACKTRACES_GLOBALLY) {
return executor;
}
// Do nothing if it has already the asyncstacktrace executor service decorator
if (executorService instanceof ExecutorServiceDecorator) {
var decorators = ((ExecutorServiceDecorator) executorService).getExecutorServiceDecorators();
if (decorators.contains(AsyncStackTraceExecutorServiceDecorator.class)) {
return new ExecutorDecorator(executorService) {
@Override
public void execute(@NotNull Runnable runnable) {
super.execute(runnable);
}
};
}
}
return new AsyncStackTraceExecutorDecorator(executor);
});
}
}

View File

@ -1,7 +1,5 @@
package org.warp.commonutils.concurrency.executor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
@ -9,10 +7,7 @@ import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
@ -23,11 +18,6 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService
private volatile boolean ignoreTaskLimit;
private final StampedLock drainAllLock = new StampedLock();
@Nonnull
private final Semaphore taskLimit;
@Nonnull
private final Duration timeout;
@ -38,12 +28,8 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService
private final @Nullable BiConsumer<Boolean, Integer> queueSizeStatus;
@Nonnull
private final Object queueSizeStatusLock;
public BlockingOnFullQueueExecutorServiceDecorator(@Nonnull final ExecutorService executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout, @Nonnull Supplier<Integer> queueSizeSupplier, @Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
super(executor);
ExecutorServiceDecorator.hasDecorator(executor, this.getClass());
if (maximumTaskNumber < 0) {
throw new IllegalArgumentException(String.format("At least zero tasks must be permitted, not '%d'", maximumTaskNumber));
} else if (maximumTaskNumber == 0) {
@ -56,8 +42,6 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService
this.maximumTaskNumber = maximumTaskNumber;
this.queueSizeSupplier = queueSizeSupplier;
this.queueSizeStatus = queueSizeStatus;
this.queueSizeStatusLock = new Object();
this.taskLimit = new Semaphore(maximumTaskNumber);
}
public BlockingOnFullQueueExecutorServiceDecorator(@Nonnull final ExecutorService executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout, @Nonnull Supplier<Integer> queueSizeSupplier) {
@ -67,33 +51,13 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService
private void preExecute(Object command) {
Objects.requireNonNull(command, "'command' must not be null");
if (!ignoreTaskLimit) {
try {
if (this.taskLimit.availablePermits() == 0) {
synchronized (queueSizeStatusLock) {
if (queueSizeStatus != null)
queueSizeStatus.accept(true,
maximumTaskNumber + (taskLimit.hasQueuedThreads() ? taskLimit.getQueueLength() : 0)
);
}
}
// attempt to acquire permit for task execution
if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) {
throw new RejectedExecutionException(String.format("Executor '%s' busy", super.toString()));
}
} catch (final InterruptedException e) {
// restore interrupt status
Thread.currentThread().interrupt();
throw new RejectedExecutionException(e);
}
}
}
@Override
public final void execute(final @NotNull Runnable command) {
preExecute(command);
super.execute(new PermitReleasingRunnableDecorator(command, this::updateQueue, this.taskLimit));
super.execute(new PermitReleasingRunnableDecorator(command, this::updateQueue));
}
@NotNull
@ -101,7 +65,7 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService
public <T> Future<T> submit(@NotNull Callable<T> task) {
preExecute(task);
return super.submit(new PermitReleasingCallableDecorator<>(task, this::updateQueue, this.taskLimit));
return super.submit(new PermitReleasingCallableDecorator<>(task, this::updateQueue));
}
@NotNull
@ -109,7 +73,7 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService
public <T> Future<T> submit(@NotNull Runnable task, T result) {
preExecute(task);
return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit), result);
return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue), result);
}
@NotNull
@ -117,23 +81,23 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService
public Future<?> submit(@NotNull Runnable task) {
preExecute(task);
return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit));
return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue));
}
private void updateQueue(boolean beforeRunning) {
var queueSize = queueSizeSupplier.get() + (beforeRunning ? 1 : 0);
synchronized (queueSizeStatusLock) {
if (queueSizeStatus != null) queueSizeStatus.accept(!ignoreTaskLimit && queueSize >= maximumTaskNumber, queueSize);
}
var full = !ignoreTaskLimit && queueSize >= maximumTaskNumber;
if (queueSizeStatus != null) queueSizeStatus.accept(full, queueSize);
}
@Override
public void shutdown() {
this.ignoreTaskLimit = true;
while (this.taskLimit.hasQueuedThreads()) {
this.taskLimit.release(10);
}
super.shutdown();
}
void testShutdown() {
super.shutdown();
}
@ -141,9 +105,6 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService
@Override
public List<Runnable> shutdownNow() {
this.ignoreTaskLimit = true;
while (this.taskLimit.hasQueuedThreads()) {
this.taskLimit.release(10);
}
return super.shutdownNow();
}
@ -189,7 +150,7 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService
@Override
public final String toString() {
return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(),
return String.format("%s[timeout='%s',delegate='%s']", getClass().getSimpleName(),
this.timeout, super.toString());
}
}

View File

@ -1,52 +0,0 @@
package org.warp.commonutils.concurrency.executor;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.StampedLock;
import org.jetbrains.annotations.NotNull;
public class BoundedExecutor {
private final Executor executor;
private final int maxQueueSize;
private final Semaphore semaphore;
private final StampedLock drainAllLock = new StampedLock();
public BoundedExecutor(Executor executor, int maxQueueSize) {
this.executor = executor;
this.maxQueueSize = maxQueueSize > 0 ? maxQueueSize : Integer.MAX_VALUE;
this.semaphore = new Semaphore(maxQueueSize);
}
public void executeButBlockIfFull(@NotNull Runnable command) throws RejectedExecutionException, InterruptedException {
var drainAllLockRead = drainAllLock.readLockInterruptibly();
semaphore.acquire();
try {
executor.execute(() -> {
try {
semaphore.release();
command.run();
} finally {
drainAllLock.unlockRead(drainAllLockRead);
}
});
} catch (RejectedExecutionException | NullPointerException ex) {
drainAllLock.unlockRead(drainAllLockRead);
throw ex;
}
}
public void drainAll(DrainAllMethodLambda runnableWhenDrained) throws InterruptedException {
var drainAllWriteLock = drainAllLock.writeLockInterruptibly();
try {
runnableWhenDrained.run();
} finally {
drainAllLock.unlockWrite(drainAllWriteLock);
}
}
public interface DrainAllMethodLambda {
void run() throws InterruptedException;
}
}

View File

@ -2,7 +2,6 @@ package org.warp.commonutils.concurrency.executor;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
@ -20,76 +19,55 @@ public class BoundedExecutorService {
}
@Deprecated
public static ExecutorService createUnbounded(
int corePoolSize,
public static BlockingOnFullQueueExecutorServiceDecorator createUnbounded(
int maxPoolSize,
long keepAliveTime,
TimeUnit unit,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
return create(0, corePoolSize, maxPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus);
return create(0, maxPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus);
}
public static ExecutorService createUnbounded(
int corePoolSize,
public static BlockingOnFullQueueExecutorServiceDecorator createUnbounded(
int maxPoolSize,
long keepAliveTime,
TimeUnit unit,
ThreadFactory threadFactory,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
return createCustom(0, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, new LinkedBlockingQueue<>(MAX_BLOCKING_QUEUE_SIZE));
return createCustom(0, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, new LinkedBlockingQueue<>());
}
public static ExecutorService createUnbounded(
int corePoolSize,
public static BlockingOnFullQueueExecutorServiceDecorator createUnbounded(
int maxPoolSize,
long keepAliveTime,
TimeUnit unit,
ThreadFactory threadFactory,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus,
BlockingQueue<Runnable> queue) {
return createCustom(0, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, queue);
return createCustom(0, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, queue);
}
@Deprecated
public static ExecutorService create(
public static BlockingOnFullQueueExecutorServiceDecorator create(
int maxQueueSize,
int corePoolSize,
int maxPoolSize,
long keepAliveTime,
TimeUnit unit,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
if (corePoolSize <= 0) throw new IllegalArgumentException("Core pool size must be >=1 if the executor service is bounded");
return create(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus);
return create(maxQueueSize, maxPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus);
}
public static ExecutorService create(
public static BlockingOnFullQueueExecutorServiceDecorator create(
int maxQueueSize,
int corePoolSize,
int maxPoolSize,
long keepAliveTime,
TimeUnit unit,
ThreadFactory threadFactory,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
if (corePoolSize <= 0) throw new IllegalArgumentException("Core pool size must be >=1 if the executor service is bounded");
return createCustom(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, new LinkedBlockingQueue<>(MAX_BLOCKING_QUEUE_SIZE));
return createCustom(maxQueueSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, new LinkedBlockingQueue<>(maxQueueSize));
}
public static ExecutorService create(
public static BlockingOnFullQueueExecutorServiceDecorator createCustom(
int maxQueueSize,
int corePoolSize,
int maxPoolSize,
long keepAliveTime,
TimeUnit unit,
ThreadFactory threadFactory,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus,
BlockingQueue<Runnable> queue) {
if (corePoolSize <= 0) throw new IllegalArgumentException("Core pool size must be >=1 if the executor service is bounded");
return createCustom(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, queue);
}
public static ExecutorService createCustom(
int maxQueueSize,
int corePoolSize,
int maxPoolSize,
long keepAliveTime,
TimeUnit unit,
@ -97,13 +75,16 @@ public class BoundedExecutorService {
Duration queueItemTtl,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus,
BlockingQueue<Runnable> queue) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(maxPoolSize,
maxPoolSize,
keepAliveTime,
unit,
queue,
threadFactory
);
if (keepAliveTime > 0) {
threadPoolExecutor.allowCoreThreadTimeOut(true);
}
threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return new BlockingOnFullQueueExecutorServiceDecorator(threadPoolExecutor,
maxQueueSize,

View File

@ -1,8 +1,6 @@
package org.warp.commonutils.concurrency.executor;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import org.jetbrains.annotations.NotNull;
@ -13,16 +11,6 @@ public abstract class ExecutorDecorator implements Executor {
this.executor = Objects.requireNonNull(executor);
}
public final Set<Class<? extends ExecutorDecorator>> getExecutorDecorators() {
if (executor instanceof ExecutorDecorator) {
var decorators = ((ExecutorDecorator) executor).getExecutorDecorators();
decorators.add(this.getClass());
return decorators;
} else {
return new HashSet<>();
}
}
@Override
public void execute(@NotNull Runnable runnable) {
executor.execute(runnable);

View File

@ -1,10 +1,8 @@
package org.warp.commonutils.concurrency.executor;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -20,26 +18,6 @@ public abstract class ExecutorServiceDecorator implements ExecutorService {
this.executorService = Objects.requireNonNull(executorService);
}
protected static boolean hasDecorator(ExecutorService executor,
Class<? extends ExecutorServiceDecorator> decoratorClass) {
if (executor instanceof ExecutorServiceDecorator) {
var executorServiceDecoratorImpl = (ExecutorServiceDecorator) executor;
var executorServiceDecorators = executorServiceDecoratorImpl.getExecutorServiceDecorators();
return executorServiceDecorators.contains(decoratorClass);
}
return false;
}
public final Set<Class<? extends ExecutorServiceDecorator>> getExecutorServiceDecorators() {
if (executorService instanceof ExecutorServiceDecorator) {
var decorators = ((ExecutorServiceDecorator) executorService).getExecutorServiceDecorators();
decorators.add(this.getClass());
return decorators;
} else {
return new HashSet<>();
}
}
@Override
public void shutdown() {
executorService.shutdown();

View File

@ -1,7 +1,6 @@
package org.warp.commonutils.concurrency.executor;
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import javax.annotation.Nonnull;
public final class PermitReleasingCallableDecorator<T> extends CallableDecorator<T> {
@ -9,15 +8,10 @@ public final class PermitReleasingCallableDecorator<T> extends CallableDecorator
@Nonnull
private final QueueSizeUpdater queueSizeUpdater;
@Nonnull
private final Semaphore semaphore;
PermitReleasingCallableDecorator(@Nonnull final Callable<T> task,
@Nonnull final QueueSizeUpdater queueSizeUpdater,
@Nonnull final Semaphore semaphoreToRelease) {
@Nonnull final QueueSizeUpdater queueSizeUpdater) {
super(task);
this.queueSizeUpdater = queueSizeUpdater;
this.semaphore = semaphoreToRelease;
}
@Override
@ -25,9 +19,6 @@ public final class PermitReleasingCallableDecorator<T> extends CallableDecorator
try {
queueSizeUpdater.update(true);
} finally {
// however execution goes, release permit for next task
this.semaphore.release();
try {
return super.call();
} finally {

View File

@ -1,22 +1,15 @@
package org.warp.commonutils.concurrency.executor;
import java.util.concurrent.Semaphore;
import javax.annotation.Nonnull;
public final class PermitReleasingRunnableDecorator extends RunnableDecorator {
@Nonnull
private final QueueSizeUpdater queueSizeUpdater;
@Nonnull
private final Semaphore semaphore;
PermitReleasingRunnableDecorator(@Nonnull final Runnable task,
@Nonnull final QueueSizeUpdater queueSizeUpdater,
@Nonnull final Semaphore semaphoreToRelease) {
@Nonnull final QueueSizeUpdater queueSizeUpdater) {
super(task);
this.queueSizeUpdater = queueSizeUpdater;
this.semaphore = semaphoreToRelease;
}
@Override
@ -24,9 +17,6 @@ public final class PermitReleasingRunnableDecorator extends RunnableDecorator {
try {
queueSizeUpdater.update(true);
} finally {
// however execution goes, release permit for next task
this.semaphore.release();
try {
super.run();
} finally {

View File

@ -1,57 +0,0 @@
package org.warp.commonutils;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.opentest4j.AssertionFailedError;
import org.warp.commonutils.concurrency.executor.BoundedExecutorService;
import org.warp.commonutils.type.ShortNamedThreadFactory;
public class BoundedQueueTest {
@Test
public void testBoundedQueue() throws InterruptedException {
testBoundedQueue(1, 1);
testBoundedQueue(1, 10);
testBoundedQueue(4, 10);
}
public void testBoundedQueue(int corePoolSize, int maxPoolSize) throws InterruptedException {
int maxQueueSize = 2;
AtomicInteger queueSize = new AtomicInteger();
AtomicReference<AssertionFailedError> failedError = new AtomicReference<>();
var executor = BoundedExecutorService.create(maxQueueSize,
corePoolSize,
maxPoolSize,
0L,
TimeUnit.MILLISECONDS,
new ShortNamedThreadFactory("test"),
(isQueueFull, currentQueueSize) -> {
try {
if (currentQueueSize >= maxQueueSize) {
Assertions.assertTrue(isQueueFull);
} else {
Assertions.assertFalse(isQueueFull);
}
} catch (AssertionFailedError ex) {
if (failedError.get() == null) {
failedError.set(ex);
}
ex.printStackTrace();
}
}
);
for (int i = 0; i < 10000; i++) {
queueSize.incrementAndGet();
executor.execute(queueSize::decrementAndGet);
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
Assertions.assertNull(failedError.get());
}
}

View File

@ -0,0 +1,124 @@
package org.warp.commonutils.concurrency.executor;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.opentest4j.AssertionFailedError;
import org.warp.commonutils.type.ShortNamedThreadFactory;
public class BoundedQueueTest {
@Test
public void testBoundedQueue() throws InterruptedException, ExecutionException {
testBoundedQueue(1, 1);
testBoundedQueue(1, 10);
testBoundedQueue(4, 10);
testBoundedQueue(0, 10);
}
public void testBoundedQueue(int corePoolSize, int maxPoolSize) throws InterruptedException, ExecutionException {
int maxQueueSize = 2;
AtomicInteger queueSize = new AtomicInteger();
AtomicReference<AssertionFailedError> failedError = new AtomicReference<>();
AtomicInteger maxRecordedCurrentQueueSize = new AtomicInteger(0);
var executor = BoundedExecutorService.create(maxQueueSize,
maxPoolSize,
0L,
TimeUnit.MILLISECONDS,
new ShortNamedThreadFactory("test"),
(isQueueFull, currentQueueSize) -> {
try {
if (currentQueueSize >= maxQueueSize) {
Assertions.assertTrue(isQueueFull);
} else {
Assertions.assertFalse(isQueueFull);
}
} catch (AssertionFailedError ex) {
if (failedError.get() == null) {
failedError.set(ex);
}
ex.printStackTrace();
}
}
);
for (int i = 0; i < 10000; i++) {
queueSize.incrementAndGet();
executor.execute(queueSize::decrementAndGet);
}
executor.testShutdown();
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
Assertions.fail("Not terminated");
}
Assertions.assertNull(failedError.get());
}
@Test
public void testBoundedQueueMaxPoolSize1_1() throws InterruptedException, ExecutionException {
testBoundedQueueMaxPoolSize( 1, 1);
}
@Test
public void testBoundedQueueMaxPoolSize10_10() throws InterruptedException, ExecutionException {
testBoundedQueueMaxPoolSize( 10, 10);
}
@Test
public void testBoundedQueueMaxPoolSize10_1() throws InterruptedException, ExecutionException {
testBoundedQueueMaxPoolSize( 10, 1);
}
@Test
public void testBoundedQueueMaxPoolSize1_10() throws InterruptedException, ExecutionException {
testBoundedQueueMaxPoolSize( 1, 10);
}
@Test
public void testBoundedQueueMaxPoolSize4_10() throws InterruptedException, ExecutionException {
testBoundedQueueMaxPoolSize( 4, 10);
}
public void testBoundedQueueMaxPoolSize(int maxPoolSize, int maxQueueSize) throws InterruptedException, ExecutionException {
CountDownLatch allFilled = new CountDownLatch(maxPoolSize);
var executor = BoundedExecutorService.create(maxQueueSize,
maxPoolSize,
0L,
TimeUnit.MILLISECONDS,
new ShortNamedThreadFactory("test"),
(isQueueFull, currentQueueSize) -> {
}
);
AtomicReference<InterruptedException> failedError = new AtomicReference<>();
for (int i = 0; i < maxPoolSize; i++) {
executor.execute(() -> {
allFilled.countDown();
try {
allFilled.await();
} catch (InterruptedException ex) {
if (failedError.get() == null) {
failedError.set(ex);
}
}
});
}
if (!allFilled.await(10, TimeUnit.SECONDS)) {
Assertions.fail("Not reached max pool size");
}
executor.testShutdown();
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
Assertions.fail("Not terminated");
}
Assertions.assertNull(failedError.get());
}
}