From ee72401487b6d4534ef395da2e28472521a6deb4 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 13 Feb 2021 02:16:24 +0100 Subject: [PATCH] Add more examples --- .../SpeedExample.java | 32 +++++++++++++ .../database/disk/LLLocalDictionary.java | 48 ++++++++++--------- 2 files changed, 58 insertions(+), 22 deletions(-) diff --git a/src/example/java/it.cavallium.dbengine.client/SpeedExample.java b/src/example/java/it.cavallium.dbengine.client/SpeedExample.java index 15af975..a4ae2eb 100644 --- a/src/example/java/it.cavallium.dbengine.client/SpeedExample.java +++ b/src/example/java/it.cavallium.dbengine.client/SpeedExample.java @@ -26,6 +26,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Locale; +import java.util.Optional; import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -48,6 +49,8 @@ public class SpeedExample { .then(testPutMulti()) .then(testPutValue(4)) .then(testPutValue(16 * 1024)) + .then(testUpdateValue(4)) + .then(testUpdateValue(16 * 1024)) .then(testAtPut()) .then(test2LevelPut()) .then(test3LevelPut()) @@ -254,6 +257,35 @@ public class SpeedExample { tuple -> tuple.getT1().close()); } + private static Mono testUpdateValue(int valSize) { + var ssg = new SubStageGetterSingleBytes(); + var ser = SerializerFixedBinaryLength.noop(4); + var itemKey = new byte[]{0, 1, 2, 3}; + var newValue1 = new byte[valSize]; + var newValue2 = new byte[valSize]; + for (int i = 0; i < valSize; i++) { + newValue1[i] = (byte) ((i * 13) % 256); + newValue2[i] = (byte) ((i * 11) % 256); + }; + return test("MapDictionaryDeep::updateValue (same key, alternating value, " + valSize + " bytes, " + batchSize + " times)", + tempDb() + .flatMap(db -> db.getDictionary("testmap", UpdateMode.ALLOW).map(dict -> Tuples.of(db, dict))) + .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ser, ssg))), + tuple -> Flux.range(0, batchSize).flatMap(n -> Mono + .defer(() -> tuple.getT2().updateValue(itemKey, (old) -> { + if (old.isPresent()) { + if (Arrays.equals(old.get(), newValue1)) { + return Optional.of(newValue2); + } + } + return Optional.of(newValue1); + }) + )) + .then(), + numRepeats, + tuple -> tuple.getT1().close()); + } + private static Mono testPutMulti() { var ssg = new SubStageGetterSingleBytes(); var ser = SerializerFixedBinaryLength.noop(4); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 5d921e3..fc4b47e 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -125,8 +125,8 @@ public class LLLocalDictionary implements LLDictionary { long stamp; if (updateMode == UpdateMode.ALLOW) { lock = itemsLock.getAt(getLockIndex(key)); - //noinspection BlockingMethodInNonBlockingContext - stamp = lock.readLockInterruptibly(); + + stamp = lock.readLock(); } else { lock = null; stamp = 0; @@ -194,8 +194,8 @@ public class LLLocalDictionary implements LLDictionary { long stamp; if (updateMode == UpdateMode.ALLOW) { lock = itemsLock.getAt(getLockIndex(key)); - //noinspection BlockingMethodInNonBlockingContext - stamp = lock.readLockInterruptibly(); + + stamp = lock.readLock(); } else { lock = null; stamp = 0; @@ -230,8 +230,8 @@ public class LLLocalDictionary implements LLDictionary { long stamp; if (updateMode == UpdateMode.ALLOW) { lock = itemsLock.getAt(getLockIndex(key)); - //noinspection BlockingMethodInNonBlockingContext - stamp = lock.writeLockInterruptibly(); + + stamp = lock.writeLock(); } else { lock = null; stamp = 0; @@ -261,8 +261,8 @@ public class LLLocalDictionary implements LLDictionary { long stamp; if (updateMode == UpdateMode.ALLOW) { lock = itemsLock.getAt(getLockIndex(key)); - //noinspection BlockingMethodInNonBlockingContext - stamp = lock.readLockInterruptibly(); + + stamp = lock.readLock(); } else { lock = null; stamp = 0; @@ -270,6 +270,7 @@ public class LLLocalDictionary implements LLDictionary { try { logger.trace("Reading {}", key); while (true) { + boolean changed = false; Optional prevData; var prevDataHolder = new Holder(); if (db.keyMayExist(cfh, key, prevDataHolder)) { @@ -282,15 +283,16 @@ public class LLLocalDictionary implements LLDictionary { prevData = Optional.empty(); } - boolean changed = false; Optional newData = value.apply(prevData); if (prevData.isPresent() && newData.isEmpty()) { if (updateMode == UpdateMode.ALLOW) { var ws = lock.tryConvertToWriteLock(stamp); - if (ws == 0) { + if (ws != 0) { + stamp = ws; + } else { lock.unlockRead(stamp); - //noinspection BlockingMethodInNonBlockingContext - stamp = lock.writeLockInterruptibly(); + + stamp = lock.writeLock(); continue; } } @@ -301,10 +303,12 @@ public class LLLocalDictionary implements LLDictionary { && (prevData.isEmpty() || !Arrays.equals(prevData.get(), newData.get()))) { if (updateMode == UpdateMode.ALLOW) { var ws = lock.tryConvertToWriteLock(stamp); - if (ws == 0) { + if (ws != 0) { + stamp = ws; + } else { lock.unlockRead(stamp); - //noinspection BlockingMethodInNonBlockingContext - stamp = lock.writeLockInterruptibly(); + + stamp = lock.writeLock(); continue; } } @@ -333,8 +337,8 @@ public class LLLocalDictionary implements LLDictionary { long stamp; if (updateMode == UpdateMode.ALLOW) { lock = itemsLock.getAt(getLockIndex(key)); - //noinspection BlockingMethodInNonBlockingContext - stamp = lock.writeLockInterruptibly(); + + stamp = lock.writeLock(); } else { lock = null; stamp = 0; @@ -366,8 +370,8 @@ public class LLLocalDictionary implements LLDictionary { long stamp; if (updateMode == UpdateMode.ALLOW) { lock = itemsLock.getAt(getLockIndex(key)); - //noinspection BlockingMethodInNonBlockingContext - stamp = lock.readLockInterruptibly(); + + stamp = lock.readLock(); } else { lock = null; stamp = 0; @@ -413,8 +417,8 @@ public class LLLocalDictionary implements LLDictionary { locks = itemsLock.bulkGetAt(getLockIndices(keysWindow)); stamps = new ArrayList<>(); for (var lock : locks) { - //noinspection BlockingMethodInNonBlockingContext - stamps.add(lock.readLockInterruptibly()); + + stamps.add(lock.readLock()); } } else { locks = null; @@ -466,7 +470,7 @@ public class LLLocalDictionary implements LLDictionary { locks = itemsLock.bulkGetAt(getLockIndicesEntries(entriesWindow)); stamps = new ArrayList<>(); for (var lock : locks) { - stamps.add(lock.writeLockInterruptibly()); + stamps.add(lock.writeLock()); } } else { locks = null;