From 435fb8eb0fcc1eb910272355827080f62e0199a0 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 24 Aug 2020 23:48:05 +0200 Subject: [PATCH] Update to 1.0.4 --- pom.xml | 2 +- .../warp/commonutils/batch/ParallelUtils.java | 21 +- .../AsyncStackTraceExecutorDecorator.java | 229 ++++++++++++++++++ ...ngOnFullQueueExecutorServiceDecorator.java | 136 +++-------- .../executor/BoundedExecutorService.java | 48 +++- .../executor/CallableDecorator.java | 18 ++ .../executor/ExecutorDecorator.java | 30 +++ .../executor/ExecutorServiceDecorator.java | 119 +++++++++ .../PermitReleasingCallableDecorator.java | 39 +++ .../PermitReleasingRunnableDecorator.java | 38 +++ .../executor/RunnableDecorator.java | 17 ++ .../SimplerExecutorServiceDecorator.java | 101 ++++++++ 12 files changed, 681 insertions(+), 117 deletions(-) create mode 100644 src/main/java/org/warp/commonutils/concurrency/executor/AsyncStackTraceExecutorDecorator.java create mode 100644 src/main/java/org/warp/commonutils/concurrency/executor/CallableDecorator.java create mode 100644 src/main/java/org/warp/commonutils/concurrency/executor/ExecutorDecorator.java create mode 100644 src/main/java/org/warp/commonutils/concurrency/executor/ExecutorServiceDecorator.java create mode 100644 src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingCallableDecorator.java create mode 100644 src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingRunnableDecorator.java create mode 100644 src/main/java/org/warp/commonutils/concurrency/executor/RunnableDecorator.java create mode 100644 src/main/java/org/warp/commonutils/concurrency/executor/SimplerExecutorServiceDecorator.java diff --git a/pom.xml b/pom.xml index c752809..b7397fa 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ common-utils org.warp - 1.0.3 + 1.0.4 UTF-8 diff --git a/src/main/java/org/warp/commonutils/batch/ParallelUtils.java b/src/main/java/org/warp/commonutils/batch/ParallelUtils.java index 3076fa3..74a91f8 100644 --- a/src/main/java/org/warp/commonutils/batch/ParallelUtils.java +++ b/src/main/java/org/warp/commonutils/batch/ParallelUtils.java @@ -16,9 +16,15 @@ public class ParallelUtils { public static void parallelize(Consumer> iterator, int maxQueueSize, int parallelism, - int groupSize, - BiConsumer consumer) { - BoundedExecutorService parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, parallelism, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {}); + int groupSize, BiConsumer consumer) { + var parallelExecutor = BoundedExecutorService.create(maxQueueSize, + parallelism, + parallelism, + 0, + TimeUnit.MILLISECONDS, + new ShortNamedThreadFactory("ForEachParallel"), + (a, b) -> {} + ); final int CHUNK_SIZE = groupSize; IntWrapper count = new IntWrapper(CHUNK_SIZE); VariableWrapper keys = new VariableWrapper<>(new Object[CHUNK_SIZE]); @@ -58,7 +64,14 @@ public class ParallelUtils { int parallelism, int groupSize, TriConsumer consumer) { - BoundedExecutorService parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, parallelism, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {}); + var parallelExecutor = BoundedExecutorService.create(maxQueueSize, + parallelism, + parallelism, + 0, + TimeUnit.MILLISECONDS, + new ShortNamedThreadFactory("ForEachParallel"), + (a, b) -> {} + ); final int CHUNK_SIZE = groupSize; IntWrapper count = new IntWrapper(CHUNK_SIZE); VariableWrapper keys1 = new VariableWrapper<>(new Object[CHUNK_SIZE]); diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/AsyncStackTraceExecutorDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/AsyncStackTraceExecutorDecorator.java new file mode 100644 index 0000000..fc347ea --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/executor/AsyncStackTraceExecutorDecorator.java @@ -0,0 +1,229 @@ +package org.warp.commonutils.concurrency.executor; + +import java.lang.StackWalker.StackFrame; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.jetbrains.annotations.NotNull; +import org.warp.commonutils.type.IntWrapper; + +public class AsyncStackTraceExecutorDecorator extends ExecutorDecorator { + + private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private static final Map threadToTask = new HashMap<>(); + private static final Map threadToExecutor = new HashMap<>(); + + public AsyncStackTraceExecutorDecorator(Executor executor) { + super(executor); + } + + @Override + public void execute(@NotNull Runnable command) { + var currentThread = Thread.currentThread(); + List frames = new ArrayList<>(); + + LSTTask lstTask; + + lock.readLock().lock(); + try { + lstTask = threadToTask.getOrDefault(currentThread, null); + + // Add the current stack frames + addCurrentStackTrace(frames, lstTask != null); + + if (lstTask != null) { + frames.addAll(lstTask.frames); + } + lstTask = new LSTTask(command, frames); + + //System.out.println("execute(): THREAD-" + Thread.currentThread().hashCode() + " TASK-" + lstTask.hashCode()); + } finally { + lock.readLock().unlock(); + } + + super.execute(lstTask); + } + + private static void addCurrentStackTrace(List frames, boolean isFromAsyncCal) { + StackWalker.getInstance().walk(a -> { + IntWrapper count = new IntWrapper(0); + int STACK_MAX_SIZE = 10; + a.filter(x -> { + var cn = x.getClassName(); + return !cn.equals("java.util.concurrent.CompletableFuture") + && !cn.equals("java.util.concurrent.CompletableFuture$AsyncRun") + && !cn.equals("java.util.concurrent.ThreadPoolExecutor") + && !cn.equals("java.util.concurrent.ThreadPoolExecutor$Worker") + && !cn.equals("java.lang.Thread") + && !cn.equals(LSTTask.class.getName()) + && !cn.equals(AsyncStackTraceExecutorDecorator.class.getName()); + }).skip(0).limit(STACK_MAX_SIZE + 1).peek(x -> count.var++).forEachOrdered(frames::add); + if (count.var > STACK_MAX_SIZE) { + frames.remove(frames.size() - 1); + frames.add(new TextStackFrame("AndMoreFrames")); + } + return null; + }); + if (isFromAsyncCal) { + frames.add(new TextStackFrame("AsyncCall")); + } + } + + + class LSTTask implements Runnable { + private final Runnable runnable; + List frames; + + LSTTask(Runnable runnable, List frames) { + this.runnable = runnable; + this.frames = frames; + } + + @Override + public void run() { + var currentThread = Thread.currentThread(); + + lock.writeLock().lock(); + try { + threadToTask.put(currentThread, LSTTask.this); + threadToExecutor.put(currentThread, AsyncStackTraceExecutorDecorator.this); + } finally { + lock.writeLock().unlock(); + } + try { + //System.out.println(" run(): THREAD-" + Thread.currentThread().hashCode() + " TASK-" + this.hashCode()); + runnable.run(); + } catch (Throwable t) { + RuntimeException e = new RuntimeException(t); + e.setStackTrace(frames.stream().map(StackFrame::toStackTraceElement).toArray(StackTraceElement[]::new)); + throw e; + } + lock.writeLock().lock(); + try { + threadToExecutor.remove(currentThread, AsyncStackTraceExecutorDecorator.this); + threadToTask.remove(currentThread, LSTTask.this); + } finally { + lock.writeLock().unlock(); + } + } + } + + public static void fixStackTrace(Exception ex) { + List result = new ArrayList<>(); + var currentThread = Thread.currentThread(); + + lock.readLock().lock(); + try { + var executor = threadToExecutor.getOrDefault(currentThread, null); + if (executor != null) { + LSTTask lstTask = threadToTask.getOrDefault(currentThread, null); + if (lstTask != null) { + var currentStackFrames = new ArrayList(); + addCurrentStackTrace(currentStackFrames, true); + for (var frame : currentStackFrames) { + result.add(frame.toStackTraceElement()); + } + + for (var frame : lstTask.frames) { + result.add(frame.toStackTraceElement()); + } + ex.setStackTrace(result.toArray(StackTraceElement[]::new)); + } + } + } finally { + lock.readLock().unlock(); + } + } + + public static void dumpStack() { + var currentThread = Thread.currentThread(); + + lock.readLock().lock(); + try { + var executor = threadToExecutor.getOrDefault(currentThread, null); + if (executor != null) { + LSTTask lstTask = threadToTask.getOrDefault(currentThread, null); + if (lstTask != null) { + StringBuilder sb = new StringBuilder(); + sb.append(new Exception("Stack trace").toString()).append('\n'); + var currentStackFrames = new ArrayList(); + addCurrentStackTrace(currentStackFrames, true); + for (var frame : currentStackFrames) { + printStackFrame(sb, frame); + } + + for (var frame : lstTask.frames) { + printStackFrame(sb, frame); + } + System.err.println(sb.toString()); + return; + } + } + Thread.dumpStack(); + } finally { + lock.readLock().unlock(); + } + } + + private static void printStackFrame(StringBuilder sb, StackFrame frame) { + if(frame.getClassName().equals("AsyncCall")) { + sb.append("\t(async call)\n"); + } else if(frame.getClassName().equals("AndMoreFrames")) { + sb.append("\t... omitted more frames\n"); + } else { + sb.append("\tat ").append(frame.toString()).append('\n'); + } + } + + private static class TextStackFrame implements StackFrame { + + private final String text; + + public TextStackFrame(String text) { + this.text = text; + } + + @Override + public String getClassName() { + return text; + } + + @Override + public String getMethodName() { + return ".."; + } + + @Override + public Class getDeclaringClass() { + return Object.class; + } + + @Override + public int getByteCodeIndex() { + return 0; + } + + @Override + public String getFileName() { + return null; + } + + @Override + public int getLineNumber() { + return 0; + } + + @Override + public boolean isNativeMethod() { + return false; + } + + @Override + public StackTraceElement toStackTraceElement() { + return new StackTraceElement(getClassName(), getMethodName(), getFileName(), getLineNumber()); + } + } +} \ No newline at end of file diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java index 05d3c3d..5eea9f7 100644 --- a/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java +++ b/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java @@ -7,100 +7,24 @@ import java.util.Collection; import java.util.List; import java.util.Objects; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.StampedLock; import java.util.function.BiConsumer; import java.util.function.Supplier; import javax.annotation.Nonnull; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecutorService { - - private final void updateQueue() { - var queueSize = queueSizeSupplier.get(); - synchronized (queueSizeStatusLock) { - if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maximumTaskNumber, queueSize); - } - }; - - private static final class PermitReleasingRunnableDecorator implements Runnable { - - @Nonnull - private final Runnable delegate; - - @Nonnull - private final Runnable queueSizeUpdater; - - @Nonnull - private final Semaphore semaphore; - - private PermitReleasingRunnableDecorator(@Nonnull final Runnable task, @Nonnull final Runnable queueSizeUpdater, @Nonnull final Semaphore semaphoreToRelease) { - this.delegate = task; - this.queueSizeUpdater = queueSizeUpdater; - this.semaphore = semaphoreToRelease; - } - - @Override - public void run() { - try { - queueSizeUpdater.run(); - } finally { - // however execution goes, release permit for next task - this.semaphore.release(); - - this.delegate.run(); - } - } - - @Override - public final String toString() { - return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate); - } - } - - private static final class PermitReleasingCallableDecorator implements Callable { - - @Nonnull - private final Callable delegate; - - @Nonnull - private final Runnable queueSizeUpdater; - - @Nonnull - private final Semaphore semaphore; - - private PermitReleasingCallableDecorator(@Nonnull final Callable task, @Nonnull final Runnable queueSizeUpdater, @Nonnull final Semaphore semaphoreToRelease) { - this.delegate = task; - this.queueSizeUpdater = queueSizeUpdater; - this.semaphore = semaphoreToRelease; - } - - @Override - public T call() throws Exception { - try { - queueSizeUpdater.run(); - } finally { - // however execution goes, release permit for next task - this.semaphore.release(); - - return this.delegate.call(); - } - } - - @Override - public final String toString() { - return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate); - } - } +public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorServiceDecorator { private volatile boolean ignoreTaskLimit; + private final StampedLock drainAllLock = new StampedLock(); + @Nonnull private final Semaphore taskLimit; @@ -117,11 +41,9 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu @Nonnull private final Object queueSizeStatusLock; - @Nonnull - private final ExecutorService delegate; - public BlockingOnFullQueueExecutorServiceDecorator(@Nonnull final ExecutorService executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout, @Nonnull Supplier queueSizeSupplier, @Nullable BiConsumer queueSizeStatus) { - this.delegate = Objects.requireNonNull(executor, "'executor' must not be null"); + super(executor); + ExecutorServiceDecorator.hasDecorator(executor, this.getClass()); if (maximumTaskNumber < 0) { throw new IllegalArgumentException(String.format("At least zero tasks must be permitted, not '%d'", maximumTaskNumber)); } else if (maximumTaskNumber == 0) { @@ -138,6 +60,18 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu this.taskLimit = new Semaphore(maximumTaskNumber); } + public BlockingOnFullQueueExecutorServiceDecorator(@Nonnull final ExecutorService executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout, @Nonnull Supplier queueSizeSupplier) { + this(executor, maximumTaskNumber, maximumTimeout, queueSizeSupplier, null); + } + + private void updateQueue() { + var queueSize = queueSizeSupplier.get(); + synchronized (queueSizeStatusLock) { + if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maximumTaskNumber, queueSize); + } + } + + private void preExecute(Object command) { Objects.requireNonNull(command, "'command' must not be null"); if (!ignoreTaskLimit) { @@ -152,7 +86,7 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu } // attempt to acquire permit for task execution if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) { - throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate)); + throw new RejectedExecutionException(String.format("Executor '%s' busy", super.toString())); } } catch (final InterruptedException e) { // restore interrupt status @@ -163,10 +97,10 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu } @Override - public final void execute(final Runnable command) { + public final void execute(final @NotNull Runnable command) { preExecute(command); - this.delegate.execute(new PermitReleasingRunnableDecorator(command, () -> { + super.execute(new PermitReleasingRunnableDecorator(command, () -> { var queueSize = queueSizeSupplier.get(); synchronized (queueSizeStatusLock) { if (queueSizeStatus != null) queueSizeStatus.accept(!ignoreTaskLimit && queueSize >= maximumTaskNumber, queueSize); @@ -181,7 +115,7 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu while (this.taskLimit.hasQueuedThreads()) { this.taskLimit.release(10); } - this.delegate.shutdown(); + super.shutdown(); } @NotNull @@ -191,22 +125,22 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu while (this.taskLimit.hasQueuedThreads()) { this.taskLimit.release(10); } - return this.delegate.shutdownNow(); + return super.shutdownNow(); } @Override public boolean isShutdown() { - return this.delegate.isShutdown(); + return super.isShutdown(); } @Override public boolean isTerminated() { - return this.delegate.isTerminated(); + return super.isTerminated(); } @Override public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException { - return this.delegate.awaitTermination(timeout, unit); + return super.awaitTermination(timeout, unit); } @NotNull @@ -214,7 +148,7 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu public Future submit(@NotNull Callable task) { preExecute(task); - return this.delegate.submit(new PermitReleasingCallableDecorator<>(task, this::updateQueue, this.taskLimit)); + return super.submit(new PermitReleasingCallableDecorator<>(task, this::updateQueue, this.taskLimit)); } @NotNull @@ -222,7 +156,7 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu public Future submit(@NotNull Runnable task, T result) { preExecute(task); - return this.delegate.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit), result); + return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit), result); } @NotNull @@ -230,12 +164,12 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu public Future submit(@NotNull Runnable task) { preExecute(task); - return this.delegate.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit)); + return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit)); } @NotNull @Override - public List> invokeAll(@NotNull Collection> tasks) throws InterruptedException { + public List> invokeAll(@NotNull Collection> tasks) { throw new UnsupportedOperationException("invokeAll(tasks) is not supported"); } @@ -243,26 +177,24 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu @Override public List> invokeAll(@NotNull Collection> tasks, long timeout, - @NotNull TimeUnit unit) throws InterruptedException { + @NotNull TimeUnit unit) { throw new UnsupportedOperationException("invokeAll(tasks, timeout, unit) is not supported"); } @NotNull @Override - public T invokeAny(@NotNull Collection> tasks) - throws InterruptedException, ExecutionException { + public T invokeAny(@NotNull Collection> tasks) { throw new UnsupportedOperationException("invokeAny(tasks) is not supported"); } @Override - public T invokeAny(@NotNull Collection> tasks, long timeout, @NotNull TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { + public T invokeAny(@NotNull Collection> tasks, long timeout, @NotNull TimeUnit unit) { throw new UnsupportedOperationException("invokeAny(tasks, timeout, unit) is not supported"); } @Override public final String toString() { return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(), - this.timeout, this.delegate); + this.timeout, super.toString()); } } \ No newline at end of file diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java index bb8aa85..a9c8af5 100644 --- a/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java +++ b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java @@ -1,6 +1,7 @@ package org.warp.commonutils.concurrency.executor; import java.time.Duration; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -10,10 +11,14 @@ import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import org.jetbrains.annotations.Nullable; -public interface BoundedExecutorService extends ExecutorService { +public class BoundedExecutorService { + + private BoundedExecutorService() { + + } @Deprecated - static ExecutorService createUnbounded( + public static ExecutorService createUnbounded( int corePoolSize, int maxPoolSize, long keepAliveTime, @@ -22,18 +27,29 @@ public interface BoundedExecutorService extends ExecutorService { return create(0, corePoolSize, maxPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus); } - static ExecutorService createUnbounded( + public static ExecutorService createUnbounded( int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, @Nullable BiConsumer queueSizeStatus) { - return create(0, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus); + return createCustom(0, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, new LinkedBlockingQueue<>()); + } + + public static ExecutorService createUnbounded( + int corePoolSize, + int maxPoolSize, + long keepAliveTime, + TimeUnit unit, + ThreadFactory threadFactory, + @Nullable BiConsumer queueSizeStatus, + BlockingQueue queue) { + return createCustom(0, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, queue); } @Deprecated - static BoundedExecutorService create( + public static ExecutorService create( int maxQueueSize, int corePoolSize, int maxPoolSize, @@ -43,7 +59,7 @@ public interface BoundedExecutorService extends ExecutorService { return create(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus); } - static BoundedExecutorService create( + public static ExecutorService create( int maxQueueSize, int corePoolSize, int maxPoolSize, @@ -51,10 +67,22 @@ public interface BoundedExecutorService extends ExecutorService { TimeUnit unit, ThreadFactory threadFactory, @Nullable BiConsumer queueSizeStatus) { - return create(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus); + return createCustom(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, new LinkedBlockingQueue<>()); } - static BoundedExecutorService create( + public static ExecutorService create( + int maxQueueSize, + int corePoolSize, + int maxPoolSize, + long keepAliveTime, + TimeUnit unit, + ThreadFactory threadFactory, + @Nullable BiConsumer queueSizeStatus, + BlockingQueue queue) { + return createCustom(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, queue); + } + + public static ExecutorService createCustom( int maxQueueSize, int corePoolSize, int maxPoolSize, @@ -62,8 +90,8 @@ public interface BoundedExecutorService extends ExecutorService { TimeUnit unit, ThreadFactory threadFactory, Duration queueItemTtl, - @Nullable BiConsumer queueSizeStatus) { - var queue = new LinkedBlockingQueue(); + @Nullable BiConsumer queueSizeStatus, + BlockingQueue queue) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/CallableDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/CallableDecorator.java new file mode 100644 index 0000000..dbc0f69 --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/executor/CallableDecorator.java @@ -0,0 +1,18 @@ +package org.warp.commonutils.concurrency.executor; + +import java.util.Objects; +import java.util.concurrent.Callable; + +public abstract class CallableDecorator implements Callable { + + private final Callable callable; + + public CallableDecorator(Callable callable) { + this.callable = Objects.requireNonNull(callable); + } + + @Override + public T call() throws Exception { + return callable.call(); + } +} diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/ExecutorDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/ExecutorDecorator.java new file mode 100644 index 0000000..4d779a5 --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/executor/ExecutorDecorator.java @@ -0,0 +1,30 @@ +package org.warp.commonutils.concurrency.executor; + +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Executor; +import org.jetbrains.annotations.NotNull; + +public abstract class ExecutorDecorator implements Executor { + private final Executor executor; + + public ExecutorDecorator(Executor executor) { + this.executor = Objects.requireNonNull(executor); + } + + public final Set> getExecutorDecorators() { + if (executor instanceof ExecutorDecorator) { + var decorators = ((ExecutorDecorator) executor).getExecutorDecorators(); + decorators.add(this.getClass()); + return decorators; + } else { + return new HashSet<>(); + } + } + + @Override + public void execute(@NotNull Runnable runnable) { + executor.execute(runnable); + } +} diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/ExecutorServiceDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/ExecutorServiceDecorator.java new file mode 100644 index 0000000..c5309b0 --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/executor/ExecutorServiceDecorator.java @@ -0,0 +1,119 @@ +package org.warp.commonutils.concurrency.executor; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.jetbrains.annotations.NotNull; + +public abstract class ExecutorServiceDecorator implements ExecutorService { + private ExecutorService executorService; + + public ExecutorServiceDecorator(ExecutorService executorService) { + this.executorService = Objects.requireNonNull(executorService); + } + + protected static boolean hasDecorator(ExecutorService executor, + Class decoratorClass) { + if (executor instanceof ExecutorServiceDecorator) { + var executorServiceDecoratorImpl = (ExecutorServiceDecorator) executor; + var executorServiceDecorators = executorServiceDecoratorImpl.getExecutorServiceDecorators(); + return executorServiceDecorators.contains(decoratorClass); + } + return false; + } + + public final Set> getExecutorServiceDecorators() { + if (executorService instanceof ExecutorServiceDecorator) { + var decorators = ((ExecutorServiceDecorator) executorService).getExecutorServiceDecorators(); + decorators.add(this.getClass()); + return decorators; + } else { + return new HashSet<>(); + } + } + + @Override + public void shutdown() { + executorService.shutdown(); + } + + @NotNull + @Override + public List shutdownNow() { + return executorService.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return executorService.isShutdown(); + } + + @Override + public boolean isTerminated() { + return executorService.isTerminated(); + } + + @Override + public boolean awaitTermination(long l, @NotNull TimeUnit timeUnit) throws InterruptedException { + return executorService.awaitTermination(l, timeUnit); + } + + @NotNull + @Override + public Future submit(@NotNull Callable callable) { + return executorService.submit(callable); + } + + @NotNull + @Override + public Future submit(@NotNull Runnable runnable, T t) { + return executorService.submit(runnable, t); + } + + @NotNull + @Override + public Future submit(@NotNull Runnable runnable) { + return executorService.submit(runnable); + } + + @NotNull + @Override + public List> invokeAll(@NotNull Collection> collection) + throws InterruptedException { + return executorService.invokeAll(collection); + } + + @NotNull + @Override + public List> invokeAll(@NotNull Collection> collection, + long l, + @NotNull TimeUnit timeUnit) throws InterruptedException { + return executorService.invokeAll(collection, l, timeUnit); + } + + @NotNull + @Override + public T invokeAny(@NotNull Collection> collection) + throws InterruptedException, ExecutionException { + return executorService.invokeAny(collection); + } + + @Override + public T invokeAny(@NotNull Collection> collection, long l, @NotNull TimeUnit timeUnit) + throws InterruptedException, ExecutionException, TimeoutException { + return executorService.invokeAny(collection, l, timeUnit); + } + + @Override + public void execute(@NotNull Runnable runnable) { + executorService.execute(runnable); + } +} diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingCallableDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingCallableDecorator.java new file mode 100644 index 0000000..99dc9f6 --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingCallableDecorator.java @@ -0,0 +1,39 @@ +package org.warp.commonutils.concurrency.executor; + +import java.util.concurrent.Callable; +import java.util.concurrent.Semaphore; +import javax.annotation.Nonnull; + +public final class PermitReleasingCallableDecorator extends CallableDecorator { + + @Nonnull + private final Runnable queueSizeUpdater; + + @Nonnull + private final Semaphore semaphore; + + PermitReleasingCallableDecorator(@Nonnull final Callable task, + @Nonnull final Runnable queueSizeUpdater, + @Nonnull final Semaphore semaphoreToRelease) { + super(task); + this.queueSizeUpdater = queueSizeUpdater; + this.semaphore = semaphoreToRelease; + } + + @Override + public T call() throws Exception { + try { + queueSizeUpdater.run(); + } finally { + // however execution goes, release permit for next task + this.semaphore.release(); + + return super.call(); + } + } + + @Override + public final String toString() { + return String.format("%s[delegate='%s']", getClass().getSimpleName(), super.toString()); + } +} diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingRunnableDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingRunnableDecorator.java new file mode 100644 index 0000000..ee77a57 --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingRunnableDecorator.java @@ -0,0 +1,38 @@ +package org.warp.commonutils.concurrency.executor; + +import java.util.concurrent.Semaphore; +import javax.annotation.Nonnull; + +public final class PermitReleasingRunnableDecorator extends RunnableDecorator { + + @Nonnull + private final Runnable queueSizeUpdater; + + @Nonnull + private final Semaphore semaphore; + + PermitReleasingRunnableDecorator(@Nonnull final Runnable task, + @Nonnull final Runnable queueSizeUpdater, + @Nonnull final Semaphore semaphoreToRelease) { + super(task); + this.queueSizeUpdater = queueSizeUpdater; + this.semaphore = semaphoreToRelease; + } + + @Override + public void run() { + try { + queueSizeUpdater.run(); + } finally { + // however execution goes, release permit for next task + this.semaphore.release(); + + super.run(); + } + } + + @Override + public final String toString() { + return String.format("%s[delegate='%s']", getClass().getSimpleName(), super.toString()); + } +} diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/RunnableDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/RunnableDecorator.java new file mode 100644 index 0000000..ce95485 --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/executor/RunnableDecorator.java @@ -0,0 +1,17 @@ +package org.warp.commonutils.concurrency.executor; + +import java.util.Objects; + +public abstract class RunnableDecorator implements Runnable { + + private final Runnable runnable; + + public RunnableDecorator(Runnable runnable) { + this.runnable = Objects.requireNonNull(runnable); + } + + @Override + public void run() { + runnable.run(); + } +} diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/SimplerExecutorServiceDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/SimplerExecutorServiceDecorator.java new file mode 100644 index 0000000..1f42fe3 --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/executor/SimplerExecutorServiceDecorator.java @@ -0,0 +1,101 @@ +package org.warp.commonutils.concurrency.executor; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; +import org.jetbrains.annotations.NotNull; + +public class SimplerExecutorServiceDecorator extends ExecutorServiceDecorator { + + private final ExecutorDecorator executorDecorator; + + public SimplerExecutorServiceDecorator(ExecutorService executorService, + Function executorDecoratorInitializer) { + super(executorService); + this.executorDecorator = executorDecoratorInitializer.apply(executorService); + } + + @Override + public void shutdown() { + super.shutdown(); + } + + @NotNull + @Override + public List shutdownNow() { + return super.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return super.isShutdown(); + } + + @Override + public boolean isTerminated() { + return super.isTerminated(); + } + + @Override + public boolean awaitTermination(long l, @NotNull TimeUnit timeUnit) throws InterruptedException { + return super.awaitTermination(l, timeUnit); + } + + @NotNull + @Override + public Future submit(@NotNull Callable callable) { + return super.submit(callable); + } + + @NotNull + @Override + public Future submit(@NotNull Runnable runnable, T t) { + return super.submit(runnable, t); + } + + @NotNull + @Override + public Future submit(@NotNull Runnable runnable) { + return super.submit(runnable); + } + + @NotNull + @Override + public List> invokeAll(@NotNull Collection> collection) + throws InterruptedException { + return super.invokeAll(collection); + } + + @NotNull + @Override + public List> invokeAll(@NotNull Collection> collection, + long l, + @NotNull TimeUnit timeUnit) throws InterruptedException { + return super.invokeAll(collection, l, timeUnit); + } + + @NotNull + @Override + public T invokeAny(@NotNull Collection> collection) + throws InterruptedException, ExecutionException { + return super.invokeAny(collection); + } + + @Override + public T invokeAny(@NotNull Collection> collection, long l, @NotNull TimeUnit timeUnit) + throws InterruptedException, ExecutionException, TimeoutException { + return super.invokeAny(collection, l, timeUnit); + } + + @Override + public void execute(@NotNull Runnable runnable) { + executorDecorator.execute(runnable); + } +}