From 54f09d35acbca2bec0a447309128c2afb714d612 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 18 Sep 2020 00:43:35 +0200 Subject: [PATCH] Fix queue auto-resizing --- .../warp/commonutils/batch/ParallelUtils.java | 3 - .../AsyncStackTraceExecutorDecorator.java | 229 ------------------ ...yncStackTraceExecutorServiceDecorator.java | 33 --- ...ngOnFullQueueExecutorServiceDecorator.java | 61 +---- .../concurrency/executor/BoundedExecutor.java | 52 ---- .../executor/BoundedExecutorService.java | 49 ++-- .../executor/ExecutorDecorator.java | 12 - .../executor/ExecutorServiceDecorator.java | 22 -- .../PermitReleasingCallableDecorator.java | 11 +- .../PermitReleasingRunnableDecorator.java | 12 +- .../warp/commonutils/BoundedQueueTest.java | 57 ----- .../executor/BoundedQueueTest.java | 124 ++++++++++ 12 files changed, 152 insertions(+), 513 deletions(-) delete mode 100644 src/main/java/org/warp/commonutils/concurrency/executor/AsyncStackTraceExecutorDecorator.java delete mode 100644 src/main/java/org/warp/commonutils/concurrency/executor/AsyncStackTraceExecutorServiceDecorator.java delete mode 100644 src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutor.java delete mode 100644 src/test/java/org/warp/commonutils/BoundedQueueTest.java create mode 100644 src/test/java/org/warp/commonutils/concurrency/executor/BoundedQueueTest.java diff --git a/src/main/java/org/warp/commonutils/batch/ParallelUtils.java b/src/main/java/org/warp/commonutils/batch/ParallelUtils.java index de52de2..96af0c7 100644 --- a/src/main/java/org/warp/commonutils/batch/ParallelUtils.java +++ b/src/main/java/org/warp/commonutils/batch/ParallelUtils.java @@ -19,7 +19,6 @@ public class ParallelUtils { int parallelism, int groupSize, Consumer consumer) throws CompletionException { var parallelExecutor = BoundedExecutorService.create(maxQueueSize, - parallelism, parallelism, 0, TimeUnit.MILLISECONDS, @@ -76,7 +75,6 @@ public class ParallelUtils { int parallelism, int groupSize, BiConsumer consumer) throws CompletionException { var parallelExecutor = BoundedExecutorService.create(maxQueueSize, - parallelism, parallelism, 0, TimeUnit.MILLISECONDS, @@ -139,7 +137,6 @@ public class ParallelUtils { int groupSize, TriConsumer consumer) throws CompletionException { var parallelExecutor = BoundedExecutorService.create(maxQueueSize, - parallelism, parallelism, 0, TimeUnit.MILLISECONDS, diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/AsyncStackTraceExecutorDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/AsyncStackTraceExecutorDecorator.java deleted file mode 100644 index fc347ea..0000000 --- a/src/main/java/org/warp/commonutils/concurrency/executor/AsyncStackTraceExecutorDecorator.java +++ /dev/null @@ -1,229 +0,0 @@ -package org.warp.commonutils.concurrency.executor; - -import java.lang.StackWalker.StackFrame; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Executor; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.jetbrains.annotations.NotNull; -import org.warp.commonutils.type.IntWrapper; - -public class AsyncStackTraceExecutorDecorator extends ExecutorDecorator { - - private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private static final Map threadToTask = new HashMap<>(); - private static final Map threadToExecutor = new HashMap<>(); - - public AsyncStackTraceExecutorDecorator(Executor executor) { - super(executor); - } - - @Override - public void execute(@NotNull Runnable command) { - var currentThread = Thread.currentThread(); - List frames = new ArrayList<>(); - - LSTTask lstTask; - - lock.readLock().lock(); - try { - lstTask = threadToTask.getOrDefault(currentThread, null); - - // Add the current stack frames - addCurrentStackTrace(frames, lstTask != null); - - if (lstTask != null) { - frames.addAll(lstTask.frames); - } - lstTask = new LSTTask(command, frames); - - //System.out.println("execute(): THREAD-" + Thread.currentThread().hashCode() + " TASK-" + lstTask.hashCode()); - } finally { - lock.readLock().unlock(); - } - - super.execute(lstTask); - } - - private static void addCurrentStackTrace(List frames, boolean isFromAsyncCal) { - StackWalker.getInstance().walk(a -> { - IntWrapper count = new IntWrapper(0); - int STACK_MAX_SIZE = 10; - a.filter(x -> { - var cn = x.getClassName(); - return !cn.equals("java.util.concurrent.CompletableFuture") - && !cn.equals("java.util.concurrent.CompletableFuture$AsyncRun") - && !cn.equals("java.util.concurrent.ThreadPoolExecutor") - && !cn.equals("java.util.concurrent.ThreadPoolExecutor$Worker") - && !cn.equals("java.lang.Thread") - && !cn.equals(LSTTask.class.getName()) - && !cn.equals(AsyncStackTraceExecutorDecorator.class.getName()); - }).skip(0).limit(STACK_MAX_SIZE + 1).peek(x -> count.var++).forEachOrdered(frames::add); - if (count.var > STACK_MAX_SIZE) { - frames.remove(frames.size() - 1); - frames.add(new TextStackFrame("AndMoreFrames")); - } - return null; - }); - if (isFromAsyncCal) { - frames.add(new TextStackFrame("AsyncCall")); - } - } - - - class LSTTask implements Runnable { - private final Runnable runnable; - List frames; - - LSTTask(Runnable runnable, List frames) { - this.runnable = runnable; - this.frames = frames; - } - - @Override - public void run() { - var currentThread = Thread.currentThread(); - - lock.writeLock().lock(); - try { - threadToTask.put(currentThread, LSTTask.this); - threadToExecutor.put(currentThread, AsyncStackTraceExecutorDecorator.this); - } finally { - lock.writeLock().unlock(); - } - try { - //System.out.println(" run(): THREAD-" + Thread.currentThread().hashCode() + " TASK-" + this.hashCode()); - runnable.run(); - } catch (Throwable t) { - RuntimeException e = new RuntimeException(t); - e.setStackTrace(frames.stream().map(StackFrame::toStackTraceElement).toArray(StackTraceElement[]::new)); - throw e; - } - lock.writeLock().lock(); - try { - threadToExecutor.remove(currentThread, AsyncStackTraceExecutorDecorator.this); - threadToTask.remove(currentThread, LSTTask.this); - } finally { - lock.writeLock().unlock(); - } - } - } - - public static void fixStackTrace(Exception ex) { - List result = new ArrayList<>(); - var currentThread = Thread.currentThread(); - - lock.readLock().lock(); - try { - var executor = threadToExecutor.getOrDefault(currentThread, null); - if (executor != null) { - LSTTask lstTask = threadToTask.getOrDefault(currentThread, null); - if (lstTask != null) { - var currentStackFrames = new ArrayList(); - addCurrentStackTrace(currentStackFrames, true); - for (var frame : currentStackFrames) { - result.add(frame.toStackTraceElement()); - } - - for (var frame : lstTask.frames) { - result.add(frame.toStackTraceElement()); - } - ex.setStackTrace(result.toArray(StackTraceElement[]::new)); - } - } - } finally { - lock.readLock().unlock(); - } - } - - public static void dumpStack() { - var currentThread = Thread.currentThread(); - - lock.readLock().lock(); - try { - var executor = threadToExecutor.getOrDefault(currentThread, null); - if (executor != null) { - LSTTask lstTask = threadToTask.getOrDefault(currentThread, null); - if (lstTask != null) { - StringBuilder sb = new StringBuilder(); - sb.append(new Exception("Stack trace").toString()).append('\n'); - var currentStackFrames = new ArrayList(); - addCurrentStackTrace(currentStackFrames, true); - for (var frame : currentStackFrames) { - printStackFrame(sb, frame); - } - - for (var frame : lstTask.frames) { - printStackFrame(sb, frame); - } - System.err.println(sb.toString()); - return; - } - } - Thread.dumpStack(); - } finally { - lock.readLock().unlock(); - } - } - - private static void printStackFrame(StringBuilder sb, StackFrame frame) { - if(frame.getClassName().equals("AsyncCall")) { - sb.append("\t(async call)\n"); - } else if(frame.getClassName().equals("AndMoreFrames")) { - sb.append("\t... omitted more frames\n"); - } else { - sb.append("\tat ").append(frame.toString()).append('\n'); - } - } - - private static class TextStackFrame implements StackFrame { - - private final String text; - - public TextStackFrame(String text) { - this.text = text; - } - - @Override - public String getClassName() { - return text; - } - - @Override - public String getMethodName() { - return ".."; - } - - @Override - public Class getDeclaringClass() { - return Object.class; - } - - @Override - public int getByteCodeIndex() { - return 0; - } - - @Override - public String getFileName() { - return null; - } - - @Override - public int getLineNumber() { - return 0; - } - - @Override - public boolean isNativeMethod() { - return false; - } - - @Override - public StackTraceElement toStackTraceElement() { - return new StackTraceElement(getClassName(), getMethodName(), getFileName(), getLineNumber()); - } - } -} \ No newline at end of file diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/AsyncStackTraceExecutorServiceDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/AsyncStackTraceExecutorServiceDecorator.java deleted file mode 100644 index d4be997..0000000 --- a/src/main/java/org/warp/commonutils/concurrency/executor/AsyncStackTraceExecutorServiceDecorator.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.warp.commonutils.concurrency.executor; - -import java.util.concurrent.ExecutorService; -import org.jetbrains.annotations.NotNull; - -public class AsyncStackTraceExecutorServiceDecorator extends SimplerExecutorServiceDecorator { - - // todo: Fix async stacktrace performance and memory problems - private static final boolean DISABLE_ASYNC_STACKTRACES_GLOBALLY = true; - - public AsyncStackTraceExecutorServiceDecorator(ExecutorService executorService) { - super(executorService, (executor) -> { - if (DISABLE_ASYNC_STACKTRACES_GLOBALLY) { - return executor; - } - - // Do nothing if it has already the asyncstacktrace executor service decorator - if (executorService instanceof ExecutorServiceDecorator) { - var decorators = ((ExecutorServiceDecorator) executorService).getExecutorServiceDecorators(); - if (decorators.contains(AsyncStackTraceExecutorServiceDecorator.class)) { - return new ExecutorDecorator(executorService) { - @Override - public void execute(@NotNull Runnable runnable) { - super.execute(runnable); - } - }; - } - } - - return new AsyncStackTraceExecutorDecorator(executor); - }); - } -} \ No newline at end of file diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java index 9eb0500..fe1d8d2 100644 --- a/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java +++ b/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java @@ -1,7 +1,5 @@ package org.warp.commonutils.concurrency.executor; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - import java.time.Duration; import java.util.Collection; import java.util.List; @@ -9,10 +7,7 @@ import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.StampedLock; import java.util.function.BiConsumer; import java.util.function.Supplier; import javax.annotation.Nonnull; @@ -23,11 +18,6 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService private volatile boolean ignoreTaskLimit; - private final StampedLock drainAllLock = new StampedLock(); - - @Nonnull - private final Semaphore taskLimit; - @Nonnull private final Duration timeout; @@ -38,12 +28,8 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService private final @Nullable BiConsumer queueSizeStatus; - @Nonnull - private final Object queueSizeStatusLock; - public BlockingOnFullQueueExecutorServiceDecorator(@Nonnull final ExecutorService executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout, @Nonnull Supplier queueSizeSupplier, @Nullable BiConsumer queueSizeStatus) { super(executor); - ExecutorServiceDecorator.hasDecorator(executor, this.getClass()); if (maximumTaskNumber < 0) { throw new IllegalArgumentException(String.format("At least zero tasks must be permitted, not '%d'", maximumTaskNumber)); } else if (maximumTaskNumber == 0) { @@ -56,8 +42,6 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService this.maximumTaskNumber = maximumTaskNumber; this.queueSizeSupplier = queueSizeSupplier; this.queueSizeStatus = queueSizeStatus; - this.queueSizeStatusLock = new Object(); - this.taskLimit = new Semaphore(maximumTaskNumber); } public BlockingOnFullQueueExecutorServiceDecorator(@Nonnull final ExecutorService executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout, @Nonnull Supplier queueSizeSupplier) { @@ -67,33 +51,13 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService private void preExecute(Object command) { Objects.requireNonNull(command, "'command' must not be null"); - if (!ignoreTaskLimit) { - try { - if (this.taskLimit.availablePermits() == 0) { - synchronized (queueSizeStatusLock) { - if (queueSizeStatus != null) - queueSizeStatus.accept(true, - maximumTaskNumber + (taskLimit.hasQueuedThreads() ? taskLimit.getQueueLength() : 0) - ); - } - } - // attempt to acquire permit for task execution - if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) { - throw new RejectedExecutionException(String.format("Executor '%s' busy", super.toString())); - } - } catch (final InterruptedException e) { - // restore interrupt status - Thread.currentThread().interrupt(); - throw new RejectedExecutionException(e); - } - } } @Override public final void execute(final @NotNull Runnable command) { preExecute(command); - super.execute(new PermitReleasingRunnableDecorator(command, this::updateQueue, this.taskLimit)); + super.execute(new PermitReleasingRunnableDecorator(command, this::updateQueue)); } @NotNull @@ -101,7 +65,7 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService public Future submit(@NotNull Callable task) { preExecute(task); - return super.submit(new PermitReleasingCallableDecorator<>(task, this::updateQueue, this.taskLimit)); + return super.submit(new PermitReleasingCallableDecorator<>(task, this::updateQueue)); } @NotNull @@ -109,7 +73,7 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService public Future submit(@NotNull Runnable task, T result) { preExecute(task); - return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit), result); + return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue), result); } @NotNull @@ -117,23 +81,23 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService public Future submit(@NotNull Runnable task) { preExecute(task); - return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit)); + return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue)); } private void updateQueue(boolean beforeRunning) { var queueSize = queueSizeSupplier.get() + (beforeRunning ? 1 : 0); - synchronized (queueSizeStatusLock) { - if (queueSizeStatus != null) queueSizeStatus.accept(!ignoreTaskLimit && queueSize >= maximumTaskNumber, queueSize); - } + var full = !ignoreTaskLimit && queueSize >= maximumTaskNumber; + if (queueSizeStatus != null) queueSizeStatus.accept(full, queueSize); } @Override public void shutdown() { this.ignoreTaskLimit = true; - while (this.taskLimit.hasQueuedThreads()) { - this.taskLimit.release(10); - } + super.shutdown(); + } + + void testShutdown() { super.shutdown(); } @@ -141,9 +105,6 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService @Override public List shutdownNow() { this.ignoreTaskLimit = true; - while (this.taskLimit.hasQueuedThreads()) { - this.taskLimit.release(10); - } return super.shutdownNow(); } @@ -189,7 +150,7 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService @Override public final String toString() { - return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(), + return String.format("%s[timeout='%s',delegate='%s']", getClass().getSimpleName(), this.timeout, super.toString()); } } \ No newline at end of file diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutor.java b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutor.java deleted file mode 100644 index dab2ce9..0000000 --- a/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutor.java +++ /dev/null @@ -1,52 +0,0 @@ -package org.warp.commonutils.concurrency.executor; - -import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.Semaphore; -import java.util.concurrent.locks.StampedLock; -import org.jetbrains.annotations.NotNull; - -public class BoundedExecutor { - - private final Executor executor; - private final int maxQueueSize; - private final Semaphore semaphore; - private final StampedLock drainAllLock = new StampedLock(); - - public BoundedExecutor(Executor executor, int maxQueueSize) { - this.executor = executor; - this.maxQueueSize = maxQueueSize > 0 ? maxQueueSize : Integer.MAX_VALUE; - this.semaphore = new Semaphore(maxQueueSize); - } - - public void executeButBlockIfFull(@NotNull Runnable command) throws RejectedExecutionException, InterruptedException { - var drainAllLockRead = drainAllLock.readLockInterruptibly(); - semaphore.acquire(); - try { - executor.execute(() -> { - try { - semaphore.release(); - command.run(); - } finally { - drainAllLock.unlockRead(drainAllLockRead); - } - }); - } catch (RejectedExecutionException | NullPointerException ex) { - drainAllLock.unlockRead(drainAllLockRead); - throw ex; - } - } - - public void drainAll(DrainAllMethodLambda runnableWhenDrained) throws InterruptedException { - var drainAllWriteLock = drainAllLock.writeLockInterruptibly(); - try { - runnableWhenDrained.run(); - } finally { - drainAllLock.unlockWrite(drainAllWriteLock); - } - } - - public interface DrainAllMethodLambda { - void run() throws InterruptedException; - } -} diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java index 368e87f..5b60e73 100644 --- a/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java +++ b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java @@ -2,7 +2,6 @@ package org.warp.commonutils.concurrency.executor; import java.time.Duration; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; @@ -20,76 +19,55 @@ public class BoundedExecutorService { } @Deprecated - public static ExecutorService createUnbounded( - int corePoolSize, + public static BlockingOnFullQueueExecutorServiceDecorator createUnbounded( int maxPoolSize, long keepAliveTime, TimeUnit unit, @Nullable BiConsumer queueSizeStatus) { - return create(0, corePoolSize, maxPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus); + return create(0, maxPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus); } - public static ExecutorService createUnbounded( - int corePoolSize, + public static BlockingOnFullQueueExecutorServiceDecorator createUnbounded( int maxPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, @Nullable BiConsumer queueSizeStatus) { - return createCustom(0, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, new LinkedBlockingQueue<>(MAX_BLOCKING_QUEUE_SIZE)); + return createCustom(0, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, new LinkedBlockingQueue<>()); } - public static ExecutorService createUnbounded( - int corePoolSize, + public static BlockingOnFullQueueExecutorServiceDecorator createUnbounded( int maxPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, @Nullable BiConsumer queueSizeStatus, BlockingQueue queue) { - return createCustom(0, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, queue); + return createCustom(0, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, queue); } @Deprecated - public static ExecutorService create( + public static BlockingOnFullQueueExecutorServiceDecorator create( int maxQueueSize, - int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, @Nullable BiConsumer queueSizeStatus) { - if (corePoolSize <= 0) throw new IllegalArgumentException("Core pool size must be >=1 if the executor service is bounded"); - return create(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus); + return create(maxQueueSize, maxPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus); } - public static ExecutorService create( + public static BlockingOnFullQueueExecutorServiceDecorator create( int maxQueueSize, - int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, @Nullable BiConsumer queueSizeStatus) { - if (corePoolSize <= 0) throw new IllegalArgumentException("Core pool size must be >=1 if the executor service is bounded"); - return createCustom(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, new LinkedBlockingQueue<>(MAX_BLOCKING_QUEUE_SIZE)); + return createCustom(maxQueueSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, new LinkedBlockingQueue<>(maxQueueSize)); } - public static ExecutorService create( + public static BlockingOnFullQueueExecutorServiceDecorator createCustom( int maxQueueSize, - int corePoolSize, - int maxPoolSize, - long keepAliveTime, - TimeUnit unit, - ThreadFactory threadFactory, - @Nullable BiConsumer queueSizeStatus, - BlockingQueue queue) { - if (corePoolSize <= 0) throw new IllegalArgumentException("Core pool size must be >=1 if the executor service is bounded"); - return createCustom(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, queue); - } - - public static ExecutorService createCustom( - int maxQueueSize, - int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, @@ -97,13 +75,16 @@ public class BoundedExecutorService { Duration queueItemTtl, @Nullable BiConsumer queueSizeStatus, BlockingQueue queue) { - ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(maxPoolSize, maxPoolSize, keepAliveTime, unit, queue, threadFactory ); + if (keepAliveTime > 0) { + threadPoolExecutor.allowCoreThreadTimeOut(true); + } threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return new BlockingOnFullQueueExecutorServiceDecorator(threadPoolExecutor, maxQueueSize, diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/ExecutorDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/ExecutorDecorator.java index 4d779a5..021b02e 100644 --- a/src/main/java/org/warp/commonutils/concurrency/executor/ExecutorDecorator.java +++ b/src/main/java/org/warp/commonutils/concurrency/executor/ExecutorDecorator.java @@ -1,8 +1,6 @@ package org.warp.commonutils.concurrency.executor; -import java.util.HashSet; import java.util.Objects; -import java.util.Set; import java.util.concurrent.Executor; import org.jetbrains.annotations.NotNull; @@ -13,16 +11,6 @@ public abstract class ExecutorDecorator implements Executor { this.executor = Objects.requireNonNull(executor); } - public final Set> getExecutorDecorators() { - if (executor instanceof ExecutorDecorator) { - var decorators = ((ExecutorDecorator) executor).getExecutorDecorators(); - decorators.add(this.getClass()); - return decorators; - } else { - return new HashSet<>(); - } - } - @Override public void execute(@NotNull Runnable runnable) { executor.execute(runnable); diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/ExecutorServiceDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/ExecutorServiceDecorator.java index c5309b0..6ad3093 100644 --- a/src/main/java/org/warp/commonutils/concurrency/executor/ExecutorServiceDecorator.java +++ b/src/main/java/org/warp/commonutils/concurrency/executor/ExecutorServiceDecorator.java @@ -1,10 +1,8 @@ package org.warp.commonutils.concurrency.executor; import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.Objects; -import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -20,26 +18,6 @@ public abstract class ExecutorServiceDecorator implements ExecutorService { this.executorService = Objects.requireNonNull(executorService); } - protected static boolean hasDecorator(ExecutorService executor, - Class decoratorClass) { - if (executor instanceof ExecutorServiceDecorator) { - var executorServiceDecoratorImpl = (ExecutorServiceDecorator) executor; - var executorServiceDecorators = executorServiceDecoratorImpl.getExecutorServiceDecorators(); - return executorServiceDecorators.contains(decoratorClass); - } - return false; - } - - public final Set> getExecutorServiceDecorators() { - if (executorService instanceof ExecutorServiceDecorator) { - var decorators = ((ExecutorServiceDecorator) executorService).getExecutorServiceDecorators(); - decorators.add(this.getClass()); - return decorators; - } else { - return new HashSet<>(); - } - } - @Override public void shutdown() { executorService.shutdown(); diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingCallableDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingCallableDecorator.java index 8d5eabb..90c7055 100644 --- a/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingCallableDecorator.java +++ b/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingCallableDecorator.java @@ -1,7 +1,6 @@ package org.warp.commonutils.concurrency.executor; import java.util.concurrent.Callable; -import java.util.concurrent.Semaphore; import javax.annotation.Nonnull; public final class PermitReleasingCallableDecorator extends CallableDecorator { @@ -9,15 +8,10 @@ public final class PermitReleasingCallableDecorator extends CallableDecorator @Nonnull private final QueueSizeUpdater queueSizeUpdater; - @Nonnull - private final Semaphore semaphore; - PermitReleasingCallableDecorator(@Nonnull final Callable task, - @Nonnull final QueueSizeUpdater queueSizeUpdater, - @Nonnull final Semaphore semaphoreToRelease) { + @Nonnull final QueueSizeUpdater queueSizeUpdater) { super(task); this.queueSizeUpdater = queueSizeUpdater; - this.semaphore = semaphoreToRelease; } @Override @@ -25,9 +19,6 @@ public final class PermitReleasingCallableDecorator extends CallableDecorator try { queueSizeUpdater.update(true); } finally { - // however execution goes, release permit for next task - this.semaphore.release(); - try { return super.call(); } finally { diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingRunnableDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingRunnableDecorator.java index fb6b988..349b21f 100644 --- a/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingRunnableDecorator.java +++ b/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingRunnableDecorator.java @@ -1,22 +1,15 @@ package org.warp.commonutils.concurrency.executor; -import java.util.concurrent.Semaphore; import javax.annotation.Nonnull; public final class PermitReleasingRunnableDecorator extends RunnableDecorator { @Nonnull private final QueueSizeUpdater queueSizeUpdater; - - @Nonnull - private final Semaphore semaphore; - PermitReleasingRunnableDecorator(@Nonnull final Runnable task, - @Nonnull final QueueSizeUpdater queueSizeUpdater, - @Nonnull final Semaphore semaphoreToRelease) { + @Nonnull final QueueSizeUpdater queueSizeUpdater) { super(task); this.queueSizeUpdater = queueSizeUpdater; - this.semaphore = semaphoreToRelease; } @Override @@ -24,9 +17,6 @@ public final class PermitReleasingRunnableDecorator extends RunnableDecorator { try { queueSizeUpdater.update(true); } finally { - // however execution goes, release permit for next task - this.semaphore.release(); - try { super.run(); } finally { diff --git a/src/test/java/org/warp/commonutils/BoundedQueueTest.java b/src/test/java/org/warp/commonutils/BoundedQueueTest.java deleted file mode 100644 index 88a7cbd..0000000 --- a/src/test/java/org/warp/commonutils/BoundedQueueTest.java +++ /dev/null @@ -1,57 +0,0 @@ -package org.warp.commonutils; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.opentest4j.AssertionFailedError; -import org.warp.commonutils.concurrency.executor.BoundedExecutorService; -import org.warp.commonutils.type.ShortNamedThreadFactory; - -public class BoundedQueueTest { - - @Test - public void testBoundedQueue() throws InterruptedException { - testBoundedQueue(1, 1); - testBoundedQueue(1, 10); - testBoundedQueue(4, 10); - } - - public void testBoundedQueue(int corePoolSize, int maxPoolSize) throws InterruptedException { - int maxQueueSize = 2; - AtomicInteger queueSize = new AtomicInteger(); - AtomicReference failedError = new AtomicReference<>(); - var executor = BoundedExecutorService.create(maxQueueSize, - corePoolSize, - maxPoolSize, - 0L, - TimeUnit.MILLISECONDS, - new ShortNamedThreadFactory("test"), - (isQueueFull, currentQueueSize) -> { - try { - if (currentQueueSize >= maxQueueSize) { - Assertions.assertTrue(isQueueFull); - } else { - Assertions.assertFalse(isQueueFull); - } - } catch (AssertionFailedError ex) { - if (failedError.get() == null) { - failedError.set(ex); - } - ex.printStackTrace(); - } - } - ); - - for (int i = 0; i < 10000; i++) { - queueSize.incrementAndGet(); - executor.execute(queueSize::decrementAndGet); - } - - executor.shutdown(); - executor.awaitTermination(10, TimeUnit.SECONDS); - - Assertions.assertNull(failedError.get()); - } -} diff --git a/src/test/java/org/warp/commonutils/concurrency/executor/BoundedQueueTest.java b/src/test/java/org/warp/commonutils/concurrency/executor/BoundedQueueTest.java new file mode 100644 index 0000000..e8de834 --- /dev/null +++ b/src/test/java/org/warp/commonutils/concurrency/executor/BoundedQueueTest.java @@ -0,0 +1,124 @@ +package org.warp.commonutils.concurrency.executor; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.opentest4j.AssertionFailedError; +import org.warp.commonutils.type.ShortNamedThreadFactory; + +public class BoundedQueueTest { + + @Test + public void testBoundedQueue() throws InterruptedException, ExecutionException { + testBoundedQueue(1, 1); + testBoundedQueue(1, 10); + testBoundedQueue(4, 10); + testBoundedQueue(0, 10); + } + + public void testBoundedQueue(int corePoolSize, int maxPoolSize) throws InterruptedException, ExecutionException { + int maxQueueSize = 2; + AtomicInteger queueSize = new AtomicInteger(); + AtomicReference failedError = new AtomicReference<>(); + AtomicInteger maxRecordedCurrentQueueSize = new AtomicInteger(0); + var executor = BoundedExecutorService.create(maxQueueSize, + maxPoolSize, + 0L, + TimeUnit.MILLISECONDS, + new ShortNamedThreadFactory("test"), + (isQueueFull, currentQueueSize) -> { + try { + if (currentQueueSize >= maxQueueSize) { + Assertions.assertTrue(isQueueFull); + } else { + Assertions.assertFalse(isQueueFull); + } + } catch (AssertionFailedError ex) { + if (failedError.get() == null) { + failedError.set(ex); + } + ex.printStackTrace(); + } + } + ); + + for (int i = 0; i < 10000; i++) { + queueSize.incrementAndGet(); + executor.execute(queueSize::decrementAndGet); + } + + executor.testShutdown(); + if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { + Assertions.fail("Not terminated"); + } + + Assertions.assertNull(failedError.get()); + } + + @Test + public void testBoundedQueueMaxPoolSize1_1() throws InterruptedException, ExecutionException { + testBoundedQueueMaxPoolSize( 1, 1); + } + + @Test + public void testBoundedQueueMaxPoolSize10_10() throws InterruptedException, ExecutionException { + testBoundedQueueMaxPoolSize( 10, 10); + } + + @Test + public void testBoundedQueueMaxPoolSize10_1() throws InterruptedException, ExecutionException { + testBoundedQueueMaxPoolSize( 10, 1); + } + + @Test + public void testBoundedQueueMaxPoolSize1_10() throws InterruptedException, ExecutionException { + testBoundedQueueMaxPoolSize( 1, 10); + } + + @Test + public void testBoundedQueueMaxPoolSize4_10() throws InterruptedException, ExecutionException { + testBoundedQueueMaxPoolSize( 4, 10); + } + + public void testBoundedQueueMaxPoolSize(int maxPoolSize, int maxQueueSize) throws InterruptedException, ExecutionException { + CountDownLatch allFilled = new CountDownLatch(maxPoolSize); + var executor = BoundedExecutorService.create(maxQueueSize, + maxPoolSize, + 0L, + TimeUnit.MILLISECONDS, + new ShortNamedThreadFactory("test"), + (isQueueFull, currentQueueSize) -> { + + } + ); + + AtomicReference failedError = new AtomicReference<>(); + for (int i = 0; i < maxPoolSize; i++) { + executor.execute(() -> { + allFilled.countDown(); + try { + allFilled.await(); + } catch (InterruptedException ex) { + if (failedError.get() == null) { + failedError.set(ex); + } + } + }); + } + + if (!allFilled.await(10, TimeUnit.SECONDS)) { + Assertions.fail("Not reached max pool size"); + } + + executor.testShutdown(); + if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { + Assertions.fail("Not terminated"); + } + + Assertions.assertNull(failedError.get()); + } +}