From 894351e633f30f70b904def44ead345fa517834f Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 27 Aug 2020 00:10:07 +0200 Subject: [PATCH] Added maximum queue size to avoid outofmemory --- .../concurrency/executor/BoundedExecutorService.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java index a9c8af5..9fc7928 100644 --- a/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java +++ b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java @@ -13,6 +13,8 @@ import org.jetbrains.annotations.Nullable; public class BoundedExecutorService { + private static final int MAX_BLOCKING_QUEUE_SIZE = 50000; + private BoundedExecutorService() { } @@ -34,7 +36,7 @@ public class BoundedExecutorService { TimeUnit unit, ThreadFactory threadFactory, @Nullable BiConsumer queueSizeStatus) { - return createCustom(0, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, new LinkedBlockingQueue<>()); + return createCustom(0, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, new LinkedBlockingQueue<>(MAX_BLOCKING_QUEUE_SIZE)); } public static ExecutorService createUnbounded( @@ -67,7 +69,7 @@ public class BoundedExecutorService { TimeUnit unit, ThreadFactory threadFactory, @Nullable BiConsumer queueSizeStatus) { - return createCustom(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, new LinkedBlockingQueue<>()); + return createCustom(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus, new LinkedBlockingQueue<>(MAX_BLOCKING_QUEUE_SIZE)); } public static ExecutorService create( @@ -99,6 +101,7 @@ public class BoundedExecutorService { queue, threadFactory ); + threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return new BlockingOnFullQueueExecutorServiceDecorator(threadPoolExecutor, maxQueueSize, queueItemTtl,