From 2d82a1c9a5d08e5520f2ab09c3cb813937a1e532 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 13 Feb 2021 01:31:24 +0100 Subject: [PATCH] Add Option to disable update locks --- .../SpeedExample.java | 14 +- .../dbengine/database/LLKeyValueDatabase.java | 10 +- .../dbengine/database/UpdateMode.java | 17 ++ .../database/disk/LLLocalDictionary.java | 195 ++++++++++++------ .../disk/LLLocalKeyValueDatabase.java | 6 +- 5 files changed, 171 insertions(+), 71 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/database/UpdateMode.java diff --git a/src/example/java/it.cavallium.dbengine.client/SpeedExample.java b/src/example/java/it.cavallium.dbengine.client/SpeedExample.java index 371210a..47c404e 100644 --- a/src/example/java/it.cavallium.dbengine.client/SpeedExample.java +++ b/src/example/java/it.cavallium.dbengine.client/SpeedExample.java @@ -45,7 +45,8 @@ public class SpeedExample { rangeTestPutMultiSame() .then(rangeTestPutMultiProgressive()) .then(testPutMulti()) - .then(testPutValue()) + .then(testPutValue(4)) + .then(testPutValue(16 * 1024)) .then(testAtPut()) .then(test2LevelPut()) .then(test3LevelPut()) @@ -227,12 +228,15 @@ public class SpeedExample { tuple -> tuple.getT1().close()); } - private static Mono testPutValue() { + private static Mono testPutValue(int valSize) { var ssg = new SubStageGetterSingleBytes(); var ser = SerializerFixedBinaryLength.noop(4); var itemKey = new byte[]{0, 1, 2, 3}; - var newValue = new byte[]{4, 5, 6, 7}; - return test("MapDictionaryDeep::putValue (same key, same value, " + batchSize + " times)", + var newValue = new byte[valSize]; + for (int i = 0; i < valSize; i++) { + newValue[i] = (byte) ((i * 13) % 256); + }; + return test("MapDictionaryDeep::putValue (same key, same value, " + valSize + " bytes, " + batchSize + " times)", tempDb() .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ser, ssg))), @@ -399,7 +403,7 @@ public class SpeedExample { } public static Mono tempDb() { - var wrkspcPath = Path.of("/tmp/tempdb/"); + var wrkspcPath = Path.of("/home/ubuntu/.cache/tempdb/"); return Mono .fromCallable(() -> { if (Files.exists(wrkspcPath)) { diff --git a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java index 8cd63e0..f0ea57c 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java @@ -11,15 +11,15 @@ public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseS Mono getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue); - Mono getDictionary(byte[] columnName); + Mono getDictionary(byte[] columnName, UpdateMode updateMode); @Deprecated - default Mono getDeprecatedSet(String name) { - return getDictionary(Column.deprecatedSet(name).getName().getBytes(StandardCharsets.US_ASCII)); + default Mono getDeprecatedSet(String name, UpdateMode updateMode) { + return getDictionary(Column.deprecatedSet(name).getName().getBytes(StandardCharsets.US_ASCII), updateMode); } - default Mono getDictionary(String name) { - return getDictionary(Column.dictionary(name).getName().getBytes(StandardCharsets.US_ASCII)); + default Mono getDictionary(String name, UpdateMode updateMode) { + return getDictionary(Column.dictionary(name).getName().getBytes(StandardCharsets.US_ASCII), updateMode); } default Mono getInteger(String singletonListName, String name, int defaultValue) { diff --git a/src/main/java/it/cavallium/dbengine/database/UpdateMode.java b/src/main/java/it/cavallium/dbengine/database/UpdateMode.java new file mode 100644 index 0000000..aa9fc3f --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/UpdateMode.java @@ -0,0 +1,17 @@ +package it.cavallium.dbengine.database; + +public enum UpdateMode { + /** + * Disallow update(). This speeds up the database reads and writes (x4 single writes, x1 multi writes) + */ + DISALLOW, + /** + * Allow update(). This will slow down the database reads and writes (x1 single writes, x1 multi writes) + */ + ALLOW, + /** + * Allow update(). This is as fast as {@link UpdateMode#DISALLOW} (x4 single writes, x1 multi writes), + * but you need to lock by yourself each key, otherwise the data will not be atomic! + */ + ALLOW_UNSAFE +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index fd90268..5d921e3 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -5,6 +5,7 @@ import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.UpdateMode; import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.io.IOException; @@ -47,7 +48,7 @@ public class LLLocalDictionary implements LLDictionary { static final int MULTI_GET_WINDOW = 500; static final WriteOptions BATCH_WRITE_OPTIONS = new WriteOptions().setLowPri(true); - private static final int STRIPES = 512; + private static final int STRIPES = 65536; private static final byte[] FIRST_KEY = new byte[]{}; private static final byte[] NO_DATA = new byte[0]; private static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions(); @@ -57,12 +58,14 @@ public class LLLocalDictionary implements LLDictionary { private final Scheduler dbScheduler; private final Function snapshotResolver; private final Striped itemsLock = Striped.readWriteStampedLock(STRIPES); + private final UpdateMode updateMode; public LLLocalDictionary(@NotNull RocksDB db, @NotNull ColumnFamilyHandle columnFamilyHandle, String databaseName, Scheduler dbScheduler, - Function snapshotResolver) { + Function snapshotResolver, + UpdateMode updateMode) { Objects.requireNonNull(db); this.db = db; Objects.requireNonNull(columnFamilyHandle); @@ -70,6 +73,7 @@ public class LLLocalDictionary implements LLDictionary { this.databaseName = databaseName; this.dbScheduler = dbScheduler; this.snapshotResolver = snapshotResolver; + this.updateMode = updateMode; } @Override @@ -117,9 +121,16 @@ public class LLLocalDictionary implements LLDictionary { public Mono get(@Nullable LLSnapshot snapshot, byte[] key) { return Mono .fromCallable(() -> { - var lock = itemsLock.getAt(getLockIndex(key)); - //noinspection BlockingMethodInNonBlockingContext - var stamp = lock.readLockInterruptibly(); + StampedLock lock; + long stamp; + if (updateMode == UpdateMode.ALLOW) { + lock = itemsLock.getAt(getLockIndex(key)); + //noinspection BlockingMethodInNonBlockingContext + stamp = lock.readLockInterruptibly(); + } else { + lock = null; + stamp = 0; + } try { logger.trace("Reading {}", key); Holder data = new Holder<>(); @@ -133,7 +144,9 @@ public class LLLocalDictionary implements LLDictionary { return null; } } finally { - lock.unlockRead(stamp); + if (updateMode == UpdateMode.ALLOW) { + lock.unlockRead(stamp); + } } }) .onErrorMap(IOException::new) @@ -177,9 +190,16 @@ public class LLLocalDictionary implements LLDictionary { private Mono containsKey(@Nullable LLSnapshot snapshot, byte[] key) { return Mono .fromCallable(() -> { - var lock = itemsLock.getAt(getLockIndex(key)); - //noinspection BlockingMethodInNonBlockingContext - var stamp = lock.readLockInterruptibly(); + StampedLock lock; + long stamp; + if (updateMode == UpdateMode.ALLOW) { + lock = itemsLock.getAt(getLockIndex(key)); + //noinspection BlockingMethodInNonBlockingContext + stamp = lock.readLockInterruptibly(); + } else { + lock = null; + stamp = 0; + } try { int size = RocksDB.NOT_FOUND; Holder data = new Holder<>(); @@ -192,7 +212,9 @@ public class LLLocalDictionary implements LLDictionary { } return size != RocksDB.NOT_FOUND; } finally { - lock.unlockRead(stamp); + if (updateMode == UpdateMode.ALLOW) { + lock.unlockRead(stamp); + } } }) .onErrorMap(IOException::new) @@ -204,15 +226,24 @@ public class LLLocalDictionary implements LLDictionary { return getPrevValue(key, resultType) .concatWith(Mono .fromCallable(() -> { - var lock = itemsLock.getAt(getLockIndex(key)); - //noinspection BlockingMethodInNonBlockingContext - var stamp = lock.writeLockInterruptibly(); + StampedLock lock; + long stamp; + if (updateMode == UpdateMode.ALLOW) { + lock = itemsLock.getAt(getLockIndex(key)); + //noinspection BlockingMethodInNonBlockingContext + stamp = lock.writeLockInterruptibly(); + } else { + lock = null; + stamp = 0; + } try { logger.trace("Writing {}: {}", key, value); db.put(cfh, key, value); return null; } finally { - lock.unlockWrite(stamp); + if (updateMode == UpdateMode.ALLOW) { + lock.unlockWrite(stamp); + } } }) .onErrorMap(IOException::new) @@ -225,9 +256,17 @@ public class LLLocalDictionary implements LLDictionary { public Mono update(byte[] key, Function, Optional> value) { return Mono .fromCallable(() -> { - var lock = itemsLock.getAt(getLockIndex(key)); - //noinspection BlockingMethodInNonBlockingContext - long stamp = lock.readLockInterruptibly(); + if (updateMode == UpdateMode.DISALLOW) throw new UnsupportedOperationException("update() is disallowed"); + StampedLock lock; + long stamp; + if (updateMode == UpdateMode.ALLOW) { + lock = itemsLock.getAt(getLockIndex(key)); + //noinspection BlockingMethodInNonBlockingContext + stamp = lock.readLockInterruptibly(); + } else { + lock = null; + stamp = 0; + } try { logger.trace("Reading {}", key); while (true) { @@ -246,24 +285,28 @@ public class LLLocalDictionary implements LLDictionary { boolean changed = false; Optional newData = value.apply(prevData); if (prevData.isPresent() && newData.isEmpty()) { - var ws = lock.tryConvertToWriteLock(stamp); - if (ws == 0) { - lock.unlockRead(stamp); - //noinspection BlockingMethodInNonBlockingContext - stamp = lock.writeLockInterruptibly(); - continue; + if (updateMode == UpdateMode.ALLOW) { + var ws = lock.tryConvertToWriteLock(stamp); + if (ws == 0) { + lock.unlockRead(stamp); + //noinspection BlockingMethodInNonBlockingContext + stamp = lock.writeLockInterruptibly(); + continue; + } } logger.trace("Deleting {}", key); changed = true; db.delete(cfh, key); } else if (newData.isPresent() && (prevData.isEmpty() || !Arrays.equals(prevData.get(), newData.get()))) { - var ws = lock.tryConvertToWriteLock(stamp); - if (ws == 0) { - lock.unlockRead(stamp); - //noinspection BlockingMethodInNonBlockingContext - stamp = lock.writeLockInterruptibly(); - continue; + if (updateMode == UpdateMode.ALLOW) { + var ws = lock.tryConvertToWriteLock(stamp); + if (ws == 0) { + lock.unlockRead(stamp); + //noinspection BlockingMethodInNonBlockingContext + stamp = lock.writeLockInterruptibly(); + continue; + } } logger.trace("Writing {}: {}", key, newData.get()); changed = true; @@ -272,7 +315,9 @@ public class LLLocalDictionary implements LLDictionary { return changed; } } finally { - lock.unlock(stamp); + if (updateMode == UpdateMode.ALLOW) { + lock.unlock(stamp); + } } }) .onErrorMap(IOException::new) @@ -284,14 +329,23 @@ public class LLLocalDictionary implements LLDictionary { return getPrevValue(key, resultType) .concatWith(Mono .fromCallable(() -> { - var lock = itemsLock.getAt(getLockIndex(key)); - //noinspection BlockingMethodInNonBlockingContext - long stamp = lock.writeLockInterruptibly(); + StampedLock lock; + long stamp; + if (updateMode == UpdateMode.ALLOW) { + lock = itemsLock.getAt(getLockIndex(key)); + //noinspection BlockingMethodInNonBlockingContext + stamp = lock.writeLockInterruptibly(); + } else { + lock = null; + stamp = 0; + } try { db.delete(cfh, key); return null; } finally { - lock.unlockWrite(stamp); + if (updateMode == UpdateMode.ALLOW) { + lock.unlockWrite(stamp); + } } }) .onErrorMap(IOException::new) @@ -308,9 +362,16 @@ public class LLLocalDictionary implements LLDictionary { case PREVIOUS_VALUE: return Mono .fromCallable(() -> { - var lock = itemsLock.getAt(getLockIndex(key)); - //noinspection BlockingMethodInNonBlockingContext - long stamp = lock.readLockInterruptibly(); + StampedLock lock; + long stamp; + if (updateMode == UpdateMode.ALLOW) { + lock = itemsLock.getAt(getLockIndex(key)); + //noinspection BlockingMethodInNonBlockingContext + stamp = lock.readLockInterruptibly(); + } else { + lock = null; + stamp = 0; + } try { logger.trace("Reading {}", key); var data = new Holder(); @@ -324,7 +385,9 @@ public class LLLocalDictionary implements LLDictionary { return null; } } finally { - lock.unlockRead(stamp); + if (updateMode == UpdateMode.ALLOW) { + lock.unlockRead(stamp); + } } }) .onErrorMap(IOException::new) @@ -344,11 +407,18 @@ public class LLLocalDictionary implements LLDictionary { .flatMap(keysWindowFlux -> keysWindowFlux.collectList() .flatMapMany(keysWindow -> Mono .fromCallable(() -> { - var locks = itemsLock.bulkGetAt(getLockIndices(keysWindow)); - ArrayList stamps = new ArrayList<>(); - for (var lock : locks) { - //noinspection BlockingMethodInNonBlockingContext - stamps.add(lock.readLockInterruptibly()); + Iterable locks; + ArrayList stamps; + if (updateMode == UpdateMode.ALLOW) { + locks = itemsLock.bulkGetAt(getLockIndices(keysWindow)); + stamps = new ArrayList<>(); + for (var lock : locks) { + //noinspection BlockingMethodInNonBlockingContext + stamps.add(lock.readLockInterruptibly()); + } + } else { + locks = null; + stamps = null; } try { var handlesArray = new ColumnFamilyHandle[keysWindow.size()]; @@ -365,10 +435,12 @@ public class LLLocalDictionary implements LLDictionary { } return mappedResults; } finally { - int index = 0; - for (var lock : locks) { - lock.unlockRead(stamps.get(index)); - index++; + if (updateMode == UpdateMode.ALLOW) { + int index = 0; + for (var lock : locks) { + lock.unlockRead(stamps.get(index)); + index++; + } } } }) @@ -388,14 +460,17 @@ public class LLLocalDictionary implements LLDictionary { .getMulti(null, Flux.fromIterable(entriesWindow).map(Entry::getKey)) .publishOn(dbScheduler) .concatWith(Mono.fromCallable(() -> { - var locks = itemsLock.bulkGetAt(getLockIndicesEntries(entriesWindow)); - int i = 0; - for (StampedLock lock : locks) { - i++; - } - ArrayList stamps = new ArrayList<>(); - for (var lock : locks) { - stamps.add(lock.writeLockInterruptibly()); + Iterable locks; + ArrayList stamps; + if (updateMode == UpdateMode.ALLOW) { + locks = itemsLock.bulkGetAt(getLockIndicesEntries(entriesWindow)); + stamps = new ArrayList<>(); + for (var lock : locks) { + stamps.add(lock.writeLockInterruptibly()); + } + } else { + locks = null; + stamps = null; } try { var batch = new CappedWriteBatch(db, @@ -411,10 +486,12 @@ public class LLLocalDictionary implements LLDictionary { batch.close(); return null; } finally { - int index = 0; - for (var lock : locks) { - lock.unlockWrite(stamps.get(index)); - index++; + if (updateMode == UpdateMode.ALLOW) { + int index = 0; + for (var lock : locks) { + lock.unlockWrite(stamps.get(index)); + index++; + } } } }))); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 61cd9fd..1a93dc7 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk; import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.LLSnapshot; +import it.cavallium.dbengine.database.UpdateMode; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -290,13 +291,14 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } @Override - public Mono getDictionary(byte[] columnName) { + public Mono getDictionary(byte[] columnName, UpdateMode updateMode) { return Mono .fromCallable(() -> new LLLocalDictionary(db, handles.get(Column.special(Column.toString(columnName))), name, dbScheduler, - (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()) + (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()), + updateMode )) .subscribeOn(dbScheduler); }