Add more examples
This commit is contained in:
parent
92eb6182ae
commit
ee72401487
@ -26,6 +26,7 @@ import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Function;
|
||||
@ -48,6 +49,8 @@ public class SpeedExample {
|
||||
.then(testPutMulti())
|
||||
.then(testPutValue(4))
|
||||
.then(testPutValue(16 * 1024))
|
||||
.then(testUpdateValue(4))
|
||||
.then(testUpdateValue(16 * 1024))
|
||||
.then(testAtPut())
|
||||
.then(test2LevelPut())
|
||||
.then(test3LevelPut())
|
||||
@ -254,6 +257,35 @@ public class SpeedExample {
|
||||
tuple -> tuple.getT1().close());
|
||||
}
|
||||
|
||||
private static Mono<Void> testUpdateValue(int valSize) {
|
||||
var ssg = new SubStageGetterSingleBytes();
|
||||
var ser = SerializerFixedBinaryLength.noop(4);
|
||||
var itemKey = new byte[]{0, 1, 2, 3};
|
||||
var newValue1 = new byte[valSize];
|
||||
var newValue2 = new byte[valSize];
|
||||
for (int i = 0; i < valSize; i++) {
|
||||
newValue1[i] = (byte) ((i * 13) % 256);
|
||||
newValue2[i] = (byte) ((i * 11) % 256);
|
||||
};
|
||||
return test("MapDictionaryDeep::updateValue (same key, alternating value, " + valSize + " bytes, " + batchSize + " times)",
|
||||
tempDb()
|
||||
.flatMap(db -> db.getDictionary("testmap", UpdateMode.ALLOW).map(dict -> Tuples.of(db, dict)))
|
||||
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ser, ssg))),
|
||||
tuple -> Flux.range(0, batchSize).flatMap(n -> Mono
|
||||
.defer(() -> tuple.getT2().updateValue(itemKey, (old) -> {
|
||||
if (old.isPresent()) {
|
||||
if (Arrays.equals(old.get(), newValue1)) {
|
||||
return Optional.of(newValue2);
|
||||
}
|
||||
}
|
||||
return Optional.of(newValue1);
|
||||
})
|
||||
))
|
||||
.then(),
|
||||
numRepeats,
|
||||
tuple -> tuple.getT1().close());
|
||||
}
|
||||
|
||||
private static Mono<Void> testPutMulti() {
|
||||
var ssg = new SubStageGetterSingleBytes();
|
||||
var ser = SerializerFixedBinaryLength.noop(4);
|
||||
|
@ -125,8 +125,8 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
long stamp;
|
||||
if (updateMode == UpdateMode.ALLOW) {
|
||||
lock = itemsLock.getAt(getLockIndex(key));
|
||||
//noinspection BlockingMethodInNonBlockingContext
|
||||
stamp = lock.readLockInterruptibly();
|
||||
|
||||
stamp = lock.readLock();
|
||||
} else {
|
||||
lock = null;
|
||||
stamp = 0;
|
||||
@ -194,8 +194,8 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
long stamp;
|
||||
if (updateMode == UpdateMode.ALLOW) {
|
||||
lock = itemsLock.getAt(getLockIndex(key));
|
||||
//noinspection BlockingMethodInNonBlockingContext
|
||||
stamp = lock.readLockInterruptibly();
|
||||
|
||||
stamp = lock.readLock();
|
||||
} else {
|
||||
lock = null;
|
||||
stamp = 0;
|
||||
@ -230,8 +230,8 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
long stamp;
|
||||
if (updateMode == UpdateMode.ALLOW) {
|
||||
lock = itemsLock.getAt(getLockIndex(key));
|
||||
//noinspection BlockingMethodInNonBlockingContext
|
||||
stamp = lock.writeLockInterruptibly();
|
||||
|
||||
stamp = lock.writeLock();
|
||||
} else {
|
||||
lock = null;
|
||||
stamp = 0;
|
||||
@ -261,8 +261,8 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
long stamp;
|
||||
if (updateMode == UpdateMode.ALLOW) {
|
||||
lock = itemsLock.getAt(getLockIndex(key));
|
||||
//noinspection BlockingMethodInNonBlockingContext
|
||||
stamp = lock.readLockInterruptibly();
|
||||
|
||||
stamp = lock.readLock();
|
||||
} else {
|
||||
lock = null;
|
||||
stamp = 0;
|
||||
@ -270,6 +270,7 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
try {
|
||||
logger.trace("Reading {}", key);
|
||||
while (true) {
|
||||
boolean changed = false;
|
||||
Optional<byte[]> prevData;
|
||||
var prevDataHolder = new Holder<byte[]>();
|
||||
if (db.keyMayExist(cfh, key, prevDataHolder)) {
|
||||
@ -282,15 +283,16 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
prevData = Optional.empty();
|
||||
}
|
||||
|
||||
boolean changed = false;
|
||||
Optional<byte[]> newData = value.apply(prevData);
|
||||
if (prevData.isPresent() && newData.isEmpty()) {
|
||||
if (updateMode == UpdateMode.ALLOW) {
|
||||
var ws = lock.tryConvertToWriteLock(stamp);
|
||||
if (ws == 0) {
|
||||
if (ws != 0) {
|
||||
stamp = ws;
|
||||
} else {
|
||||
lock.unlockRead(stamp);
|
||||
//noinspection BlockingMethodInNonBlockingContext
|
||||
stamp = lock.writeLockInterruptibly();
|
||||
|
||||
stamp = lock.writeLock();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@ -301,10 +303,12 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
&& (prevData.isEmpty() || !Arrays.equals(prevData.get(), newData.get()))) {
|
||||
if (updateMode == UpdateMode.ALLOW) {
|
||||
var ws = lock.tryConvertToWriteLock(stamp);
|
||||
if (ws == 0) {
|
||||
if (ws != 0) {
|
||||
stamp = ws;
|
||||
} else {
|
||||
lock.unlockRead(stamp);
|
||||
//noinspection BlockingMethodInNonBlockingContext
|
||||
stamp = lock.writeLockInterruptibly();
|
||||
|
||||
stamp = lock.writeLock();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@ -333,8 +337,8 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
long stamp;
|
||||
if (updateMode == UpdateMode.ALLOW) {
|
||||
lock = itemsLock.getAt(getLockIndex(key));
|
||||
//noinspection BlockingMethodInNonBlockingContext
|
||||
stamp = lock.writeLockInterruptibly();
|
||||
|
||||
stamp = lock.writeLock();
|
||||
} else {
|
||||
lock = null;
|
||||
stamp = 0;
|
||||
@ -366,8 +370,8 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
long stamp;
|
||||
if (updateMode == UpdateMode.ALLOW) {
|
||||
lock = itemsLock.getAt(getLockIndex(key));
|
||||
//noinspection BlockingMethodInNonBlockingContext
|
||||
stamp = lock.readLockInterruptibly();
|
||||
|
||||
stamp = lock.readLock();
|
||||
} else {
|
||||
lock = null;
|
||||
stamp = 0;
|
||||
@ -413,8 +417,8 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
locks = itemsLock.bulkGetAt(getLockIndices(keysWindow));
|
||||
stamps = new ArrayList<>();
|
||||
for (var lock : locks) {
|
||||
//noinspection BlockingMethodInNonBlockingContext
|
||||
stamps.add(lock.readLockInterruptibly());
|
||||
|
||||
stamps.add(lock.readLock());
|
||||
}
|
||||
} else {
|
||||
locks = null;
|
||||
@ -466,7 +470,7 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
locks = itemsLock.bulkGetAt(getLockIndicesEntries(entriesWindow));
|
||||
stamps = new ArrayList<>();
|
||||
for (var lock : locks) {
|
||||
stamps.add(lock.writeLockInterruptibly());
|
||||
stamps.add(lock.writeLock());
|
||||
}
|
||||
} else {
|
||||
locks = null;
|
||||
|
Loading…
Reference in New Issue
Block a user