From 2e2ec6ff68cb2b4493ab871ea66b70313d2acf56 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 12 Dec 2020 22:08:14 +0100 Subject: [PATCH] Add scheduled task lifecycle --- pom.xml | 2 +- .../executor/ScheduledTaskLifecycle.java | 37 ++++++++++++++++ .../executor/TestScheduledTaskLifecycle.java | 42 +++++++++++++++++++ 3 files changed, 80 insertions(+), 1 deletion(-) create mode 100644 src/main/java/org/warp/commonutils/concurrency/executor/ScheduledTaskLifecycle.java create mode 100644 src/test/java/org/warp/commonutils/concurrency/executor/TestScheduledTaskLifecycle.java diff --git a/pom.xml b/pom.xml index cdc2516..9278ebc 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ common-utils org.warp - 1.1.1 + 1.1.2 UTF-8 diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/ScheduledTaskLifecycle.java b/src/main/java/org/warp/commonutils/concurrency/executor/ScheduledTaskLifecycle.java new file mode 100644 index 0000000..a6189c3 --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/executor/ScheduledTaskLifecycle.java @@ -0,0 +1,37 @@ +package org.warp.commonutils.concurrency.executor; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.locks.StampedLock; +import org.warp.commonutils.concurrency.atomicity.Atomic; + +@Atomic +public class ScheduledTaskLifecycle { + + private final StampedLock lock; + private final ConcurrentHashMap, Object> tasks = new ConcurrentHashMap<>(); + + public ScheduledTaskLifecycle() { + this.lock = new StampedLock(); + } + + public void registerScheduledTask(ScheduledFuture task) { + this.tasks.put(task, new Object()); + } + + public void startScheduledTask() { + this.lock.readLock(); + } + + public void endScheduledTask() { + this.lock.tryUnlockRead(); + } + + public void cancelAndWait() { + tasks.forEach((task, obj) -> { + task.cancel(false); + }); + + lock.unlockWrite(lock.writeLock()); + } +} diff --git a/src/test/java/org/warp/commonutils/concurrency/executor/TestScheduledTaskLifecycle.java b/src/test/java/org/warp/commonutils/concurrency/executor/TestScheduledTaskLifecycle.java new file mode 100644 index 0000000..5ff5acb --- /dev/null +++ b/src/test/java/org/warp/commonutils/concurrency/executor/TestScheduledTaskLifecycle.java @@ -0,0 +1,42 @@ +package org.warp.commonutils.concurrency.executor; + +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestScheduledTaskLifecycle { + + @Test + public void testScheduledTaskLifecycle() throws InterruptedException { + var scheduler = Executors.newScheduledThreadPool(100); + + var lifecycle = new ScheduledTaskLifecycle(); + + AtomicInteger runningTasks = new AtomicInteger(); + + for (int i = 0; i < 49; i++) { + lifecycle.registerScheduledTask(scheduler.scheduleAtFixedRate(() -> { + lifecycle.startScheduledTask(); + runningTasks.incrementAndGet(); + try { + Thread.sleep(33); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + runningTasks.decrementAndGet(); + lifecycle.endScheduledTask(); + } + }, 0, 1, TimeUnit.MICROSECONDS)); + } + + Thread.sleep(96); + + lifecycle.cancelAndWait(); + + System.out.println("stopped"); + + Assertions.assertEquals(0, runningTasks.get()); + } +}