Update BoundedExecutorServiceImpl.java and BoundedQueueTest.java
This commit is contained in:
parent
a5c8f8f6a5
commit
988e104173
@ -51,6 +51,24 @@ class BoundedExecutorServiceImpl extends ThreadPoolExecutor implements BoundedEx
|
|||||||
return this.submit(task);
|
return this.submit(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<?> submit(Runnable task) {
|
||||||
|
preChecks();
|
||||||
|
return super.submit(task);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> Future<T> submit(Callable<T> task) {
|
||||||
|
preChecks();
|
||||||
|
return super.submit(task);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> Future<T> submit(Runnable task, T result) {
|
||||||
|
preChecks();
|
||||||
|
return super.submit(task, result);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Submits task to execution pool, but blocks while number of running threads
|
* Submits task to execution pool, but blocks while number of running threads
|
||||||
* has reached the bound limit
|
* has reached the bound limit
|
||||||
@ -60,6 +78,32 @@ class BoundedExecutorServiceImpl extends ThreadPoolExecutor implements BoundedEx
|
|||||||
this.execute(task);
|
this.execute(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void execute(Runnable command) {
|
||||||
|
preChecks();
|
||||||
|
super.execute(command);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void preChecks() {
|
||||||
|
try {
|
||||||
|
blockIfFull();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void beforeExecute(Thread t, Runnable r) {
|
||||||
|
var queueSize = getQueue().size();
|
||||||
|
synchronized (queueSizeStatusLock) {
|
||||||
|
if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maxQueueSize, queueSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
semaphore.release();
|
||||||
|
|
||||||
|
super.beforeExecute(t, r);
|
||||||
|
}
|
||||||
|
|
||||||
private void blockIfFull() throws InterruptedException {
|
private void blockIfFull() throws InterruptedException {
|
||||||
if (semaphore.availablePermits() == 0) {
|
if (semaphore.availablePermits() == 0) {
|
||||||
synchronized (queueSizeStatusLock) {
|
synchronized (queueSizeStatusLock) {
|
||||||
@ -68,22 +112,4 @@ class BoundedExecutorServiceImpl extends ThreadPoolExecutor implements BoundedEx
|
|||||||
}
|
}
|
||||||
semaphore.acquire();
|
semaphore.acquire();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void beforeExecute(Thread t, Runnable r) {
|
|
||||||
try {
|
|
||||||
blockIfFull();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
var queueSize = getQueue().size();
|
|
||||||
synchronized (queueSizeStatusLock) {
|
|
||||||
if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maxQueueSize, queueSize);
|
|
||||||
}
|
|
||||||
|
|
||||||
semaphore.release();
|
|
||||||
|
|
||||||
super.beforeExecute(t, r);
|
|
||||||
}
|
|
||||||
}
|
}
|
51
src/test/java/org/warp/commonutils/BoundedQueueTest.java
Normal file
51
src/test/java/org/warp/commonutils/BoundedQueueTest.java
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
package org.warp.commonutils;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.opentest4j.AssertionFailedError;
|
||||||
|
import org.warp.commonutils.concurrency.executor.BoundedExecutorService;
|
||||||
|
import org.warp.commonutils.type.ShortNamedThreadFactory;
|
||||||
|
|
||||||
|
public class BoundedQueueTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBoundedQueue() throws InterruptedException {
|
||||||
|
int maxQueueSize = 2;
|
||||||
|
AtomicInteger queueSize = new AtomicInteger();
|
||||||
|
AtomicReference<AssertionFailedError> failedError = new AtomicReference<>();
|
||||||
|
var executor = BoundedExecutorService.create(maxQueueSize,
|
||||||
|
1,
|
||||||
|
1,
|
||||||
|
0L,
|
||||||
|
TimeUnit.MILLISECONDS,
|
||||||
|
new ShortNamedThreadFactory("test"),
|
||||||
|
(isQueueFull, currentQueueSize) -> {
|
||||||
|
try {
|
||||||
|
if (currentQueueSize >= maxQueueSize) {
|
||||||
|
Assertions.assertTrue(isQueueFull);
|
||||||
|
} else {
|
||||||
|
Assertions.assertFalse(isQueueFull);
|
||||||
|
}
|
||||||
|
} catch (AssertionFailedError ex) {
|
||||||
|
if (failedError.get() == null) {
|
||||||
|
failedError.set(ex);
|
||||||
|
}
|
||||||
|
ex.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
for (int i = 0; i < 10000; i++) {
|
||||||
|
queueSize.incrementAndGet();
|
||||||
|
executor.executeButBlockIfFull(queueSize::decrementAndGet);
|
||||||
|
}
|
||||||
|
|
||||||
|
executor.shutdown();
|
||||||
|
executor.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
Assertions.assertNull(failedError.get());
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user