Bugfix
This commit is contained in:
parent
ff44ed16ba
commit
827bb23038
|
@ -1,25 +1,37 @@
|
||||||
package org.warp.commonutils.locks;
|
package org.warp.commonutils.locks;
|
||||||
|
|
||||||
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.Semaphore;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
public class TransitLock {
|
public class TransitLock {
|
||||||
|
|
||||||
private final FlexibleCountDownLatch unfinishedTransits = new FlexibleCountDownLatch(0);
|
private final FlexibleCountDownLatch unfinishedTransits = new FlexibleCountDownLatch(0);
|
||||||
private final Semaphore permits = new Semaphore(Integer.MAX_VALUE);
|
private final Semaphore permits = new Semaphore(Integer.MAX_VALUE);
|
||||||
|
private final AtomicInteger allowedTransits = new AtomicInteger(1);
|
||||||
private final Object synchronization = new Object();
|
private final Object synchronization = new Object();
|
||||||
|
|
||||||
public TransitLock() {
|
public TransitLock() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void allowTransit() {
|
public void allowTransit() {
|
||||||
var permitsToRelease = Math.max(0, Integer.MAX_VALUE - (permits.availablePermits() + 10000));
|
var allowedTransits = this.allowedTransits.incrementAndGet();
|
||||||
permits.release(permitsToRelease);
|
if (allowedTransits == 1) {
|
||||||
|
var permitsToRelease = Math.max(0, Integer.MAX_VALUE - (permits.availablePermits() + 10000));
|
||||||
|
permits.release(permitsToRelease);
|
||||||
|
} else if (allowedTransits > 1) {
|
||||||
|
throw new IllegalStateException("Already allowed transit!");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void disallowTransit() {
|
public void disallowTransit() {
|
||||||
synchronized (synchronization) {
|
synchronized (synchronization) {
|
||||||
unfinishedTransits.await();
|
var allowedTransits = this.allowedTransits.decrementAndGet();
|
||||||
permits.drainPermits();
|
if (allowedTransits == 0) {
|
||||||
|
unfinishedTransits.await();
|
||||||
|
permits.drainPermits();
|
||||||
|
} else if (allowedTransits > 0) {
|
||||||
|
throw new IllegalStateException("Transits are still allowed for an unknown reason!");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -33,17 +33,20 @@ public class TransitLockTest {
|
||||||
@Test
|
@Test
|
||||||
public void testAllowTransit() {
|
public void testAllowTransit() {
|
||||||
var lock = new TransitLock();
|
var lock = new TransitLock();
|
||||||
|
lock.disallowTransit();
|
||||||
lock.allowTransit();
|
lock.allowTransit();
|
||||||
lock.transit();
|
lock.transit();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMultiAllowTransit() {
|
public void testMultiAllowTransit() {
|
||||||
var lock = new TransitLock();
|
Assertions.assertThrows(IllegalStateException.class, () -> {
|
||||||
lock.allowTransit();
|
var lock = new TransitLock();
|
||||||
lock.allowTransit();
|
lock.allowTransit();
|
||||||
lock.allowTransit();
|
lock.allowTransit();
|
||||||
lock.transit();
|
lock.allowTransit();
|
||||||
|
lock.transit();
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -54,6 +57,7 @@ public class TransitLockTest {
|
||||||
lock.disallowTransit();
|
lock.disallowTransit();
|
||||||
lock.disallowTransit();
|
lock.disallowTransit();
|
||||||
lock.allowTransit();
|
lock.allowTransit();
|
||||||
|
lock.allowTransit();
|
||||||
lock.transit();
|
lock.transit();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,13 +90,13 @@ public class TransitLockTest {
|
||||||
var lock = new TransitLock();
|
var lock = new TransitLock();
|
||||||
AtomicInteger alreadyRunningTransits = new AtomicInteger();
|
AtomicInteger alreadyRunningTransits = new AtomicInteger();
|
||||||
|
|
||||||
var pool = Executors.newFixedThreadPool(100);
|
var pool = Executors.newFixedThreadPool(500);
|
||||||
AtomicReference<AssertionError> failure = new AtomicReference<>();
|
AtomicReference<Exception> failure = new AtomicReference<>();
|
||||||
for (int i = 0; i < 100; i++) {
|
for (int i = 0; i < 100; i++) {
|
||||||
int iF = i;
|
int iF = i;
|
||||||
pool.submit(() -> {
|
pool.submit(() -> {
|
||||||
try {
|
try {
|
||||||
for (int j = 0; j < 100; j++) {
|
for (int j = 0; j < 500; j++) {
|
||||||
if (iF % 2 == 0) {
|
if (iF % 2 == 0) {
|
||||||
lock.startTransit();
|
lock.startTransit();
|
||||||
alreadyRunningTransits.getAndIncrement();
|
alreadyRunningTransits.getAndIncrement();
|
||||||
|
@ -104,15 +108,14 @@ public class TransitLockTest {
|
||||||
lock.allowTransit();
|
lock.allowTransit();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (AssertionError e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
failure.set(e);
|
failure.set(e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
lock.allowTransit();
|
|
||||||
pool.shutdown();
|
pool.shutdown();
|
||||||
if (failure.get() != null) throw failure.get();
|
if (failure.get() != null) throw new AssertionError(failure.get());
|
||||||
Assertions.assertDoesNotThrow(() -> pool.awaitTermination(10, TimeUnit.SECONDS));
|
Assertions.assertDoesNotThrow(() -> pool.awaitTermination(10, TimeUnit.SECONDS));
|
||||||
Assertions.assertTrue(pool.isTerminated());
|
Assertions.assertTrue(pool.isTerminated());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue