Use stamped locks

This commit is contained in:
Andrea Cavalli 2021-02-13 00:18:57 +01:00
parent 9df1bda11d
commit c796459e1c

View File

@ -1,6 +1,5 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.concurrentlocks.ReadWriteUpdateLock;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLRange;
@ -16,7 +15,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -48,7 +47,7 @@ public class LLLocalDictionary implements LLDictionary {
static final int MULTI_GET_WINDOW = 500;
static final WriteOptions BATCH_WRITE_OPTIONS = new WriteOptions().setLowPri(true);
private static final int STRIPES = 65536;
private static final int STRIPES = 256;
private static final byte[] FIRST_KEY = new byte[]{};
private static final byte[] NO_DATA = new byte[0];
private static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions();
@ -57,7 +56,7 @@ public class LLLocalDictionary implements LLDictionary {
private final String databaseName;
private final Scheduler dbScheduler;
private final Function<LLSnapshot, Snapshot> snapshotResolver;
private final Striped<ReadWriteUpdateLock> itemsLock = Striped.readWriteUpdateLock(STRIPES);
private final Striped<StampedLock> itemsLock = Striped.readWriteStampedLock(STRIPES);
public LLLocalDictionary(@NotNull RocksDB db,
@NotNull ColumnFamilyHandle columnFamilyHandle,
@ -118,8 +117,9 @@ public class LLLocalDictionary implements LLDictionary {
public Mono<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key) {
return Mono
.fromCallable(() -> {
var lock = itemsLock.getAt(getLockIndex(key)).readLock();
lock.lock();
var lock = itemsLock.getAt(getLockIndex(key));
//noinspection BlockingMethodInNonBlockingContext
var stamp = lock.readLockInterruptibly();
try {
logger.trace("Reading {}", key);
Holder<byte[]> data = new Holder<>();
@ -133,7 +133,7 @@ public class LLLocalDictionary implements LLDictionary {
return null;
}
} finally {
lock.unlock();
lock.unlockRead(stamp);
}
})
.onErrorMap(IOException::new)
@ -177,8 +177,9 @@ public class LLLocalDictionary implements LLDictionary {
private Mono<Boolean> containsKey(@Nullable LLSnapshot snapshot, byte[] key) {
return Mono
.fromCallable(() -> {
var lock = itemsLock.getAt(getLockIndex(key)).readLock();
lock.lock();
var lock = itemsLock.getAt(getLockIndex(key));
//noinspection BlockingMethodInNonBlockingContext
var stamp = lock.readLockInterruptibly();
try {
int size = RocksDB.NOT_FOUND;
Holder<byte[]> data = new Holder<>();
@ -191,7 +192,7 @@ public class LLLocalDictionary implements LLDictionary {
}
return size != RocksDB.NOT_FOUND;
} finally {
lock.unlock();
lock.unlockRead(stamp);
}
})
.onErrorMap(IOException::new)
@ -203,14 +204,15 @@ public class LLLocalDictionary implements LLDictionary {
return getPrevValue(key, resultType)
.concatWith(Mono
.fromCallable(() -> {
var lock = itemsLock.getAt(getLockIndex(key)).writeLock();
lock.lock();
var lock = itemsLock.getAt(getLockIndex(key));
//noinspection BlockingMethodInNonBlockingContext
var stamp = lock.writeLockInterruptibly();
try {
logger.trace("Writing {}: {}", key, value);
db.put(cfh, key, value);
return null;
} finally {
lock.unlock();
lock.unlockWrite(stamp);
}
})
.onErrorMap(IOException::new)
@ -223,10 +225,12 @@ public class LLLocalDictionary implements LLDictionary {
public Mono<Boolean> update(byte[] key, Function<Optional<byte[]>, Optional<byte[]>> value) {
return Mono
.fromCallable(() -> {
var rwuLock = itemsLock.getAt(getLockIndex(key));
rwuLock.updateLock().lock();
var lock = itemsLock.getAt(getLockIndex(key));
//noinspection BlockingMethodInNonBlockingContext
long stamp = lock.readLockInterruptibly();
try {
logger.trace("Reading {}", key);
while (true) {
Optional<byte[]> prevData;
var prevDataHolder = new Holder<byte[]>();
if (db.keyMayExist(cfh, key, prevDataHolder)) {
@ -242,28 +246,33 @@ public class LLLocalDictionary implements LLDictionary {
boolean changed = false;
Optional<byte[]> newData = value.apply(prevData);
if (prevData.isPresent() && newData.isEmpty()) {
rwuLock.writeLock().lock();
try {
var ws = lock.tryConvertToWriteLock(stamp);
if (ws == 0) {
lock.unlockRead(stamp);
//noinspection BlockingMethodInNonBlockingContext
stamp = lock.writeLockInterruptibly();
continue;
}
logger.trace("Deleting {}", key);
changed = true;
db.delete(cfh, key);
} finally {
rwuLock.writeLock().unlock();
}
} else if (newData.isPresent()
&& (prevData.isEmpty() || !Arrays.equals(prevData.get(), newData.get()))) {
rwuLock.writeLock().lock();
try {
var ws = lock.tryConvertToWriteLock(stamp);
if (ws == 0) {
lock.unlockRead(stamp);
//noinspection BlockingMethodInNonBlockingContext
stamp = lock.writeLockInterruptibly();
continue;
}
logger.trace("Writing {}: {}", key, newData.get());
changed = true;
db.put(cfh, key, newData.get());
} finally {
rwuLock.writeLock().unlock();
}
}
return changed;
}
} finally {
rwuLock.updateLock().unlock();
lock.unlock(stamp);
}
})
.onErrorMap(IOException::new)
@ -275,13 +284,14 @@ public class LLLocalDictionary implements LLDictionary {
return getPrevValue(key, resultType)
.concatWith(Mono
.fromCallable(() -> {
var lock = itemsLock.getAt(getLockIndex(key)).writeLock();
lock.lock();
var lock = itemsLock.getAt(getLockIndex(key));
//noinspection BlockingMethodInNonBlockingContext
long stamp = lock.writeLockInterruptibly();
try {
db.delete(cfh, key);
return null;
} finally {
lock.unlock();
lock.unlockWrite(stamp);
}
})
.onErrorMap(IOException::new)
@ -298,8 +308,9 @@ public class LLLocalDictionary implements LLDictionary {
case PREVIOUS_VALUE:
return Mono
.fromCallable(() -> {
var lock = itemsLock.getAt(getLockIndex(key)).readLock();
lock.lock();
var lock = itemsLock.getAt(getLockIndex(key));
//noinspection BlockingMethodInNonBlockingContext
long stamp = lock.readLockInterruptibly();
try {
logger.trace("Reading {}", key);
var data = new Holder<byte[]>();
@ -313,7 +324,7 @@ public class LLLocalDictionary implements LLDictionary {
return null;
}
} finally {
lock.unlock();
lock.unlockRead(stamp);
}
})
.onErrorMap(IOException::new)
@ -334,8 +345,10 @@ public class LLLocalDictionary implements LLDictionary {
.flatMapMany(keysWindow -> Mono
.fromCallable(() -> {
var locks = itemsLock.bulkGetAt(getLockIndices(keysWindow));
for (ReadWriteLock lock : locks) {
lock.readLock().lock();
ArrayList<Long> stamps = new ArrayList<>();
for (var lock : locks) {
//noinspection BlockingMethodInNonBlockingContext
stamps.add(lock.readLockInterruptibly());
}
try {
var handlesArray = new ColumnFamilyHandle[keysWindow.size()];
@ -352,8 +365,10 @@ public class LLLocalDictionary implements LLDictionary {
}
return mappedResults;
} finally {
for (ReadWriteLock lock : locks) {
lock.readLock().unlock();
int index = 0;
for (var lock : locks) {
lock.unlockRead(stamps.get(index));
index++;
}
}
})
@ -374,8 +389,9 @@ public class LLLocalDictionary implements LLDictionary {
.publishOn(dbScheduler)
.concatWith(Mono.fromCallable(() -> {
var locks = itemsLock.bulkGetAt(getLockIndicesEntries(entriesWindow));
for (ReadWriteLock lock : locks) {
lock.writeLock().lock();
ArrayList<Long> stamps = new ArrayList<>();
for (var lock : locks) {
stamps.add(lock.writeLockInterruptibly());
}
try {
var batch = new CappedWriteBatch(db,
@ -391,8 +407,10 @@ public class LLLocalDictionary implements LLDictionary {
batch.close();
return null;
} finally {
for (ReadWriteLock lock : locks) {
lock.writeLock().unlock();
int index = 0;
for (var lock : locks) {
lock.unlockWrite(stamps.get(index));
index++;
}
}
})));