Add new locks

This commit is contained in:
Andrea Cavalli 2021-01-22 03:15:14 +01:00
parent 97b7246ead
commit ff44ed16ba
4 changed files with 291 additions and 0 deletions

View File

@ -0,0 +1,28 @@
package org.warp.commonutils.locks;
import java.util.concurrent.Phaser;
public class FlexibleCountDownLatch {
private final Phaser phaser;
public FlexibleCountDownLatch(int initialSize) {
this.phaser = new Phaser(initialSize + 1);
}
public void await() {
phaser.arriveAndAwaitAdvance();
}
public void grow() {
phaser.register();
}
public void grow(int n) {
phaser.bulkRegister(n);
}
public void countDown() {
phaser.arriveAndDeregister();
}
}

View File

@ -0,0 +1,41 @@
package org.warp.commonutils.locks;
import java.util.concurrent.Semaphore;
public class TransitLock {
private final FlexibleCountDownLatch unfinishedTransits = new FlexibleCountDownLatch(0);
private final Semaphore permits = new Semaphore(Integer.MAX_VALUE);
private final Object synchronization = new Object();
public TransitLock() {
}
public void allowTransit() {
var permitsToRelease = Math.max(0, Integer.MAX_VALUE - (permits.availablePermits() + 10000));
permits.release(permitsToRelease);
}
public void disallowTransit() {
synchronized (synchronization) {
unfinishedTransits.await();
permits.drainPermits();
}
}
public void transit() {
startTransit();
endTransit();
}
public void startTransit() {
synchronized (synchronization) {
unfinishedTransits.grow();
permits.acquireUninterruptibly();
}
}
public void endTransit() {
unfinishedTransits.countDown();
}
}

View File

@ -0,0 +1,69 @@
package org.warp.commonutils.locks;
public class TransitReadWriteLock {
private final TransitLock writeLock;
private final TransitLock readLock;
private final Object synchronization = new Object();
public TransitReadWriteLock() {
this.writeLock = new TransitLock();
this.readLock = new TransitLock();
}
public void reAllowTransit() {
this.readLock.allowTransit();
this.writeLock.allowTransit();
}
public void disallowTransit() {
synchronized (synchronization) {
this.writeLock.disallowTransit();
this.readLock.disallowTransit();
}
}
public void reAllowTransitRead() {
this.readLock.allowTransit();
}
public void disallowTransitRead() {
this.readLock.disallowTransit();
}
public void reAllowTransitWrite() {
this.writeLock.allowTransit();
}
public void disallowTransitWrite() {
this.writeLock.disallowTransit();
}
public void transitWrite() {
this.writeLock.transit();
}
public void transitRead() {
this.readLock.transit();
}
public void startTransitWrite() {
synchronized (synchronization) {
this.writeLock.startTransit();
}
}
public void startTransitRead() {
synchronized (synchronization) {
this.readLock.startTransit();
}
}
public void endTransitWrite() {
writeLock.endTransit();
}
public void endTransitRead() {
readLock.endTransit();
}
}

View File

@ -0,0 +1,153 @@
package org.warp.commonutils.locks;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class TransitLockTest {
@Test
public void testTransit() {
var lock = new TransitLock();
lock.transit();
lock.transit();
lock.transit();
}
@Test
public void testPartialTransit() {
var lock = new TransitLock();
lock.startTransit();
lock.endTransit();
lock.startTransit();
lock.endTransit();
lock.startTransit();
lock.endTransit();
}
@Test
public void testAllowTransit() {
var lock = new TransitLock();
lock.allowTransit();
lock.transit();
}
@Test
public void testMultiAllowTransit() {
var lock = new TransitLock();
lock.allowTransit();
lock.allowTransit();
lock.allowTransit();
lock.transit();
}
@Test
public void testMultiDisallowAllowTransit() {
var lock = new TransitLock();
lock.disallowTransit();
lock.allowTransit();
lock.disallowTransit();
lock.disallowTransit();
lock.allowTransit();
lock.transit();
}
@Test
public void testDisallowTransit() {
var lock = new TransitLock();
lock.disallowTransit();
for (int i = 0; i < 10; i++) {
Assertions.assertThrows(TimeoutException.class, () -> {
CompletableFuture.runAsync(lock::transit).get(5, TimeUnit.MILLISECONDS);
}, "Disallowed transit didn't block a transit");
}
}
@Test
public void testMultiDisallowTransit() {
var lock = new TransitLock();
lock.disallowTransit();
lock.disallowTransit();
lock.disallowTransit();
for (int i = 0; i < 10; i++) {
Assertions.assertThrows(TimeoutException.class, () -> {
CompletableFuture.runAsync(lock::transit).get(5, TimeUnit.MILLISECONDS);
}, "Disallowed transit didn't block a transit");
}
}
@Test
public void testDeadlocks() {
var lock = new TransitLock();
AtomicInteger alreadyRunningTransits = new AtomicInteger();
var pool = Executors.newFixedThreadPool(100);
AtomicReference<AssertionError> failure = new AtomicReference<>();
for (int i = 0; i < 100; i++) {
int iF = i;
pool.submit(() -> {
try {
for (int j = 0; j < 100; j++) {
if (iF % 2 == 0) {
lock.startTransit();
alreadyRunningTransits.getAndIncrement();
alreadyRunningTransits.decrementAndGet();
lock.endTransit();
} else {
lock.disallowTransit();
Assertions.assertEquals(0, alreadyRunningTransits.get());
lock.allowTransit();
}
}
} catch (AssertionError e) {
e.printStackTrace();
failure.set(e);
}
});
}
lock.allowTransit();
pool.shutdown();
if (failure.get() != null) throw failure.get();
Assertions.assertDoesNotThrow(() -> pool.awaitTermination(10, TimeUnit.SECONDS));
Assertions.assertTrue(pool.isTerminated());
}
@Test
public void testParallelTransit() {
var lock = new TransitLock();
AtomicInteger alreadyRunningTransits = new AtomicInteger();
var pool = Executors.newFixedThreadPool(100);
AtomicReference<AssertionError> failure = new AtomicReference<>();
for (int i = 0; i < 100; i++) {
pool.submit(() -> {
try {
lock.startTransit();
alreadyRunningTransits.getAndIncrement();
try {
Thread.sleep(20);
} catch (InterruptedException e) {
Assertions.fail(e);
}
alreadyRunningTransits.decrementAndGet();
lock.endTransit();
} catch (AssertionError e) {
e.printStackTrace();
failure.set(e);
}
});
}
lock.disallowTransit();
Assertions.assertEquals(0, alreadyRunningTransits.get());
lock.allowTransit();
pool.shutdown();
if (failure.get() != null) throw failure.get();
Assertions.assertDoesNotThrow(() -> pool.awaitTermination(10, TimeUnit.SECONDS));
Assertions.assertTrue(pool.isTerminated());
}
}