Return unique stripes

This commit is contained in:
Andrea Cavalli 2021-02-13 01:36:00 +01:00
parent 6e07f1bace
commit d41a0d135b
2 changed files with 42 additions and 103 deletions

View File

@ -355,7 +355,7 @@ public class SnapshottableCollectionLock<T> {
public void dataUnlockMulti(Object snapshot, DataLockType dataLockMode, @NotNull Iterable<? extends T> key) {
if (snapshot == null) {
Objects.requireNonNull(key, () -> "Lock key must not be null");
for (ReadWriteUpdateLock lock : transactionLock.bulkGetInverse(key)) {
for (ReadWriteUpdateLock lock : transactionLock.bulkGet(key)) {
partialUnlockKey(dataLockMode, lock);
}
}
@ -364,7 +364,7 @@ public class SnapshottableCollectionLock<T> {
public void dataUnlockAtMulti(Object snapshot, DataLockType dataLockMode, @NotNull Iterable<Integer> key) {
if (snapshot == null) {
Objects.requireNonNull(key, () -> "Lock key must not be null");
for (ReadWriteUpdateLock lock : transactionLock.bulkGetAtInverse(key)) {
for (ReadWriteUpdateLock lock : transactionLock.bulkGetAt(key)) {
partialUnlockKey(dataLockMode, lock);
}
}

View File

@ -20,20 +20,19 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapMaker;
import com.google.common.math.IntMath;
import com.google.common.primitives.Ints;
import it.cavallium.concurrentlocks.ReadWriteUpdateLock;
import it.cavallium.concurrentlocks.ReentrantReadWriteUpdateLock;
import it.unimi.dsi.fastutil.ints.IntAVLTreeSet;
import it.unimi.dsi.fastutil.objects.ObjectLinkedOpenHashSet;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReferenceArray;
@ -41,6 +40,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.StampedLock;
/**
* A striped {@code Lock/Semaphore/ReadWriteLock}. This offers the underlying lock striping similar to that of {@code
@ -134,112 +134,32 @@ public abstract class Striped<L> {
* #get(Object)}; may contain duplicates), in an increasing index order.
*/
public Iterable<L> bulkGet(Iterable<?> keys) {
return Collections.unmodifiableList(bulkGet_(keys));
return Collections.unmodifiableCollection(bulkGet_(keys));
}
public Iterable<L> bulkGetInverse(Iterable<?> keys) {
var list = bulkGet_(keys);
Collections.reverse(list);
return Collections.unmodifiableList(list);
}
private List<L> bulkGet_(Iterable<?> keys) {
// Initially using the array to store the keys, then reusing it to store the respective L's
final Object[] array = Iterables.toArray(keys, Object.class);
if (array.length == 0) {
return ImmutableList.of();
private Collection<L> bulkGet_(Iterable<?> keys) {
var stripes = new IntAVLTreeSet(Integer::compare);
for (Object key : keys) {
stripes.add(indexFor(key));
}
int[] stripes = new int[array.length];
for (int i = 0; i < array.length; i++) {
stripes[i] = indexFor(array[i]);
}
Arrays.sort(stripes);
// optimize for runs of identical stripes
int previousStripe = stripes[0];
array[0] = getAt(previousStripe);
for (int i = 1; i < array.length; i++) {
int currentStripe = stripes[i];
if (currentStripe == previousStripe) {
array[i] = array[i - 1];
} else {
array[i] = getAt(currentStripe);
previousStripe = currentStripe;
}
}
/*
* Note that the returned Iterable holds references to the returned stripes, to avoid
* error-prone code like:
*
* Striped<Lock> stripedLock = Striped.lazyWeakXXX(...)'
* Iterable<Lock> locks = stripedLock.bulkGet(keys);
* for (Lock lock : locks) {
* lock.lock();
* }
* operation();
* for (Lock lock : locks) {
* lock.unlock();
* }
*
* If we only held the int[] stripes, translating it on the fly to L's, the original locks might
* be garbage collected after locking them, ending up in a huge mess.
*/
@SuppressWarnings("unchecked") // we carefully replaced all keys with their respective L's
List<L> asList = (List<L>) Arrays.asList(array);
return asList;
var locks = new ObjectLinkedOpenHashSet<L>();
stripes.forEach((int stripe) -> locks.add(getAt(stripe)));
return locks;
}
public Iterable<L> bulkGetAt(Iterable<Integer> keys) {
return Collections.unmodifiableList(bulkGetAt_(keys));
return Collections.unmodifiableCollection(bulkGetAt_(keys));
}
public Iterable<L> bulkGetAtInverse(Iterable<Integer> keys) {
var list = bulkGetAt_(keys);
Collections.reverse(list);
return Collections.unmodifiableList(list);
}
private List<L> bulkGetAt_(Iterable<Integer> keys) {
// Initially using the array to store the keys, then reusing it to store the respective L's
final Object[] array = Iterables.toArray(keys, Object.class);
if (array.length == 0) {
return ImmutableList.of();
private Collection<L> bulkGetAt_(Iterable<Integer> keys) {
var stripes = new IntAVLTreeSet(Integer::compare);
for (Integer key : keys) {
stripes.add((int) key);
}
int[] stripes = new int[array.length];
for (int i = 0; i < array.length; i++) {
stripes[i] = (int) array[i];
var locks = new ObjectLinkedOpenHashSet<L>();
for (Integer stripe : stripes) {
locks.add(getAt(stripe));
}
Arrays.sort(stripes);
// optimize for runs of identical stripes
int previousStripe = stripes[0];
array[0] = getAt(previousStripe);
for (int i = 1; i < array.length; i++) {
int currentStripe = stripes[i];
if (currentStripe == previousStripe) {
array[i] = array[i - 1];
} else {
array[i] = getAt(currentStripe);
previousStripe = currentStripe;
}
}
/*
* Note that the returned Iterable holds references to the returned stripes, to avoid
* error-prone code like:
*
* Striped<Lock> stripedLock = Striped.lazyWeakXXX(...)'
* Iterable<Lock> locks = stripedLock.bulkGet(keys);
* for (Lock lock : locks) {
* lock.lock();
* }
* operation();
* for (Lock lock : locks) {
* lock.unlock();
* }
*
* If we only held the int[] stripes, translating it on the fly to L's, the original locks might
* be garbage collected after locking them, ending up in a huge mess.
*/
@SuppressWarnings("unchecked") // we carefully replaced all keys with their respective L's
List<L> asList = (List<L>) Arrays.asList(array);
return asList;
return locks;
}
// Static factories
@ -324,6 +244,17 @@ public abstract class Striped<L> {
return new CompactStriped<ReadWriteLock>(stripes, READ_WRITE_LOCK_SUPPLIER);
}
/**
* Creates a {@code Striped<StampedLock>} with eagerly initialized, strongly referenced read-write locks. Every lock
* is striped.
*
* @param stripes the minimum number of stripes (locks) required
* @return a new {@code Striped<StampedLock>}
*/
public static Striped<StampedLock> readWriteStampedLock(int stripes) {
return new CompactStriped<StampedLock>(stripes, STAMPED_LOCK_SUPPLIER);
}
/**
* Creates a {@code Striped<ReadWriteLock>} with eagerly initialized, strongly referenced read-write-update locks.
* Every lock is reentrant.
@ -354,6 +285,14 @@ public abstract class Striped<L> {
}
};
// StampedLock is large enough to make padding probably unnecessary
private static final Supplier<StampedLock> STAMPED_LOCK_SUPPLIER = new Supplier<StampedLock>() {
@Override
public StampedLock get() {
return new StampedLock();
}
};
// ReentrantReadWriteUpdateLock is large enough to make padding probably unnecessary
private static final Supplier<ReadWriteUpdateLock> READ_WRITE_UPDATE_LOCK_SUPPLIER = new Supplier<ReadWriteUpdateLock>() {
@Override