Add Option to disable update locks

This commit is contained in:
Andrea Cavalli 2021-02-13 01:31:24 +01:00
parent 3f78e5fec9
commit 2d82a1c9a5
5 changed files with 171 additions and 71 deletions

View File

@ -45,7 +45,8 @@ public class SpeedExample {
rangeTestPutMultiSame() rangeTestPutMultiSame()
.then(rangeTestPutMultiProgressive()) .then(rangeTestPutMultiProgressive())
.then(testPutMulti()) .then(testPutMulti())
.then(testPutValue()) .then(testPutValue(4))
.then(testPutValue(16 * 1024))
.then(testAtPut()) .then(testAtPut())
.then(test2LevelPut()) .then(test2LevelPut())
.then(test3LevelPut()) .then(test3LevelPut())
@ -227,12 +228,15 @@ public class SpeedExample {
tuple -> tuple.getT1().close()); tuple -> tuple.getT1().close());
} }
private static Mono<Void> testPutValue() { private static Mono<Void> testPutValue(int valSize) {
var ssg = new SubStageGetterSingleBytes(); var ssg = new SubStageGetterSingleBytes();
var ser = SerializerFixedBinaryLength.noop(4); var ser = SerializerFixedBinaryLength.noop(4);
var itemKey = new byte[]{0, 1, 2, 3}; var itemKey = new byte[]{0, 1, 2, 3};
var newValue = new byte[]{4, 5, 6, 7}; var newValue = new byte[valSize];
return test("MapDictionaryDeep::putValue (same key, same value, " + batchSize + " times)", for (int i = 0; i < valSize; i++) {
newValue[i] = (byte) ((i * 13) % 256);
};
return test("MapDictionaryDeep::putValue (same key, same value, " + valSize + " bytes, " + batchSize + " times)",
tempDb() tempDb()
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict))) .flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
.map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ser, ssg))), .map(tuple -> tuple.mapT2(dict -> DatabaseMapDictionaryDeep.simple(dict, ser, ssg))),
@ -399,7 +403,7 @@ public class SpeedExample {
} }
public static <U> Mono<? extends LLKeyValueDatabase> tempDb() { public static <U> Mono<? extends LLKeyValueDatabase> tempDb() {
var wrkspcPath = Path.of("/tmp/tempdb/"); var wrkspcPath = Path.of("/home/ubuntu/.cache/tempdb/");
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
if (Files.exists(wrkspcPath)) { if (Files.exists(wrkspcPath)) {

View File

@ -11,15 +11,15 @@ public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseS
Mono<? extends LLSingleton> getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue); Mono<? extends LLSingleton> getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue);
Mono<? extends LLDictionary> getDictionary(byte[] columnName); Mono<? extends LLDictionary> getDictionary(byte[] columnName, UpdateMode updateMode);
@Deprecated @Deprecated
default Mono<? extends LLDictionary> getDeprecatedSet(String name) { default Mono<? extends LLDictionary> getDeprecatedSet(String name, UpdateMode updateMode) {
return getDictionary(Column.deprecatedSet(name).getName().getBytes(StandardCharsets.US_ASCII)); return getDictionary(Column.deprecatedSet(name).getName().getBytes(StandardCharsets.US_ASCII), updateMode);
} }
default Mono<? extends LLDictionary> getDictionary(String name) { default Mono<? extends LLDictionary> getDictionary(String name, UpdateMode updateMode) {
return getDictionary(Column.dictionary(name).getName().getBytes(StandardCharsets.US_ASCII)); return getDictionary(Column.dictionary(name).getName().getBytes(StandardCharsets.US_ASCII), updateMode);
} }
default Mono<DatabaseInt> getInteger(String singletonListName, String name, int defaultValue) { default Mono<DatabaseInt> getInteger(String singletonListName, String name, int defaultValue) {

View File

@ -0,0 +1,17 @@
package it.cavallium.dbengine.database;
public enum UpdateMode {
/**
* Disallow update(). This speeds up the database reads and writes (x4 single writes, x1 multi writes)
*/
DISALLOW,
/**
* Allow update(). This will slow down the database reads and writes (x1 single writes, x1 multi writes)
*/
ALLOW,
/**
* Allow update(). This is as fast as {@link UpdateMode#DISALLOW} (x4 single writes, x1 multi writes),
* but you need to lock by yourself each key, otherwise the data will not be atomic!
*/
ALLOW_UNSAFE
}

View File

@ -5,6 +5,7 @@ import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList; import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException; import java.io.IOException;
@ -47,7 +48,7 @@ public class LLLocalDictionary implements LLDictionary {
static final int MULTI_GET_WINDOW = 500; static final int MULTI_GET_WINDOW = 500;
static final WriteOptions BATCH_WRITE_OPTIONS = new WriteOptions().setLowPri(true); static final WriteOptions BATCH_WRITE_OPTIONS = new WriteOptions().setLowPri(true);
private static final int STRIPES = 512; private static final int STRIPES = 65536;
private static final byte[] FIRST_KEY = new byte[]{}; private static final byte[] FIRST_KEY = new byte[]{};
private static final byte[] NO_DATA = new byte[0]; private static final byte[] NO_DATA = new byte[0];
private static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions(); private static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions();
@ -57,12 +58,14 @@ public class LLLocalDictionary implements LLDictionary {
private final Scheduler dbScheduler; private final Scheduler dbScheduler;
private final Function<LLSnapshot, Snapshot> snapshotResolver; private final Function<LLSnapshot, Snapshot> snapshotResolver;
private final Striped<StampedLock> itemsLock = Striped.readWriteStampedLock(STRIPES); private final Striped<StampedLock> itemsLock = Striped.readWriteStampedLock(STRIPES);
private final UpdateMode updateMode;
public LLLocalDictionary(@NotNull RocksDB db, public LLLocalDictionary(@NotNull RocksDB db,
@NotNull ColumnFamilyHandle columnFamilyHandle, @NotNull ColumnFamilyHandle columnFamilyHandle,
String databaseName, String databaseName,
Scheduler dbScheduler, Scheduler dbScheduler,
Function<LLSnapshot, Snapshot> snapshotResolver) { Function<LLSnapshot, Snapshot> snapshotResolver,
UpdateMode updateMode) {
Objects.requireNonNull(db); Objects.requireNonNull(db);
this.db = db; this.db = db;
Objects.requireNonNull(columnFamilyHandle); Objects.requireNonNull(columnFamilyHandle);
@ -70,6 +73,7 @@ public class LLLocalDictionary implements LLDictionary {
this.databaseName = databaseName; this.databaseName = databaseName;
this.dbScheduler = dbScheduler; this.dbScheduler = dbScheduler;
this.snapshotResolver = snapshotResolver; this.snapshotResolver = snapshotResolver;
this.updateMode = updateMode;
} }
@Override @Override
@ -117,9 +121,16 @@ public class LLLocalDictionary implements LLDictionary {
public Mono<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key) { public Mono<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key) {
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
var lock = itemsLock.getAt(getLockIndex(key)); StampedLock lock;
//noinspection BlockingMethodInNonBlockingContext long stamp;
var stamp = lock.readLockInterruptibly(); if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
//noinspection BlockingMethodInNonBlockingContext
stamp = lock.readLockInterruptibly();
} else {
lock = null;
stamp = 0;
}
try { try {
logger.trace("Reading {}", key); logger.trace("Reading {}", key);
Holder<byte[]> data = new Holder<>(); Holder<byte[]> data = new Holder<>();
@ -133,7 +144,9 @@ public class LLLocalDictionary implements LLDictionary {
return null; return null;
} }
} finally { } finally {
lock.unlockRead(stamp); if (updateMode == UpdateMode.ALLOW) {
lock.unlockRead(stamp);
}
} }
}) })
.onErrorMap(IOException::new) .onErrorMap(IOException::new)
@ -177,9 +190,16 @@ public class LLLocalDictionary implements LLDictionary {
private Mono<Boolean> containsKey(@Nullable LLSnapshot snapshot, byte[] key) { private Mono<Boolean> containsKey(@Nullable LLSnapshot snapshot, byte[] key) {
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
var lock = itemsLock.getAt(getLockIndex(key)); StampedLock lock;
//noinspection BlockingMethodInNonBlockingContext long stamp;
var stamp = lock.readLockInterruptibly(); if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
//noinspection BlockingMethodInNonBlockingContext
stamp = lock.readLockInterruptibly();
} else {
lock = null;
stamp = 0;
}
try { try {
int size = RocksDB.NOT_FOUND; int size = RocksDB.NOT_FOUND;
Holder<byte[]> data = new Holder<>(); Holder<byte[]> data = new Holder<>();
@ -192,7 +212,9 @@ public class LLLocalDictionary implements LLDictionary {
} }
return size != RocksDB.NOT_FOUND; return size != RocksDB.NOT_FOUND;
} finally { } finally {
lock.unlockRead(stamp); if (updateMode == UpdateMode.ALLOW) {
lock.unlockRead(stamp);
}
} }
}) })
.onErrorMap(IOException::new) .onErrorMap(IOException::new)
@ -204,15 +226,24 @@ public class LLLocalDictionary implements LLDictionary {
return getPrevValue(key, resultType) return getPrevValue(key, resultType)
.concatWith(Mono .concatWith(Mono
.fromCallable(() -> { .fromCallable(() -> {
var lock = itemsLock.getAt(getLockIndex(key)); StampedLock lock;
//noinspection BlockingMethodInNonBlockingContext long stamp;
var stamp = lock.writeLockInterruptibly(); if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
//noinspection BlockingMethodInNonBlockingContext
stamp = lock.writeLockInterruptibly();
} else {
lock = null;
stamp = 0;
}
try { try {
logger.trace("Writing {}: {}", key, value); logger.trace("Writing {}: {}", key, value);
db.put(cfh, key, value); db.put(cfh, key, value);
return null; return null;
} finally { } finally {
lock.unlockWrite(stamp); if (updateMode == UpdateMode.ALLOW) {
lock.unlockWrite(stamp);
}
} }
}) })
.onErrorMap(IOException::new) .onErrorMap(IOException::new)
@ -225,9 +256,17 @@ public class LLLocalDictionary implements LLDictionary {
public Mono<Boolean> update(byte[] key, Function<Optional<byte[]>, Optional<byte[]>> value) { public Mono<Boolean> update(byte[] key, Function<Optional<byte[]>, Optional<byte[]>> value) {
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
var lock = itemsLock.getAt(getLockIndex(key)); if (updateMode == UpdateMode.DISALLOW) throw new UnsupportedOperationException("update() is disallowed");
//noinspection BlockingMethodInNonBlockingContext StampedLock lock;
long stamp = lock.readLockInterruptibly(); long stamp;
if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
//noinspection BlockingMethodInNonBlockingContext
stamp = lock.readLockInterruptibly();
} else {
lock = null;
stamp = 0;
}
try { try {
logger.trace("Reading {}", key); logger.trace("Reading {}", key);
while (true) { while (true) {
@ -246,24 +285,28 @@ public class LLLocalDictionary implements LLDictionary {
boolean changed = false; boolean changed = false;
Optional<byte[]> newData = value.apply(prevData); Optional<byte[]> newData = value.apply(prevData);
if (prevData.isPresent() && newData.isEmpty()) { if (prevData.isPresent() && newData.isEmpty()) {
var ws = lock.tryConvertToWriteLock(stamp); if (updateMode == UpdateMode.ALLOW) {
if (ws == 0) { var ws = lock.tryConvertToWriteLock(stamp);
lock.unlockRead(stamp); if (ws == 0) {
//noinspection BlockingMethodInNonBlockingContext lock.unlockRead(stamp);
stamp = lock.writeLockInterruptibly(); //noinspection BlockingMethodInNonBlockingContext
continue; stamp = lock.writeLockInterruptibly();
continue;
}
} }
logger.trace("Deleting {}", key); logger.trace("Deleting {}", key);
changed = true; changed = true;
db.delete(cfh, key); db.delete(cfh, key);
} else if (newData.isPresent() } else if (newData.isPresent()
&& (prevData.isEmpty() || !Arrays.equals(prevData.get(), newData.get()))) { && (prevData.isEmpty() || !Arrays.equals(prevData.get(), newData.get()))) {
var ws = lock.tryConvertToWriteLock(stamp); if (updateMode == UpdateMode.ALLOW) {
if (ws == 0) { var ws = lock.tryConvertToWriteLock(stamp);
lock.unlockRead(stamp); if (ws == 0) {
//noinspection BlockingMethodInNonBlockingContext lock.unlockRead(stamp);
stamp = lock.writeLockInterruptibly(); //noinspection BlockingMethodInNonBlockingContext
continue; stamp = lock.writeLockInterruptibly();
continue;
}
} }
logger.trace("Writing {}: {}", key, newData.get()); logger.trace("Writing {}: {}", key, newData.get());
changed = true; changed = true;
@ -272,7 +315,9 @@ public class LLLocalDictionary implements LLDictionary {
return changed; return changed;
} }
} finally { } finally {
lock.unlock(stamp); if (updateMode == UpdateMode.ALLOW) {
lock.unlock(stamp);
}
} }
}) })
.onErrorMap(IOException::new) .onErrorMap(IOException::new)
@ -284,14 +329,23 @@ public class LLLocalDictionary implements LLDictionary {
return getPrevValue(key, resultType) return getPrevValue(key, resultType)
.concatWith(Mono .concatWith(Mono
.fromCallable(() -> { .fromCallable(() -> {
var lock = itemsLock.getAt(getLockIndex(key)); StampedLock lock;
//noinspection BlockingMethodInNonBlockingContext long stamp;
long stamp = lock.writeLockInterruptibly(); if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
//noinspection BlockingMethodInNonBlockingContext
stamp = lock.writeLockInterruptibly();
} else {
lock = null;
stamp = 0;
}
try { try {
db.delete(cfh, key); db.delete(cfh, key);
return null; return null;
} finally { } finally {
lock.unlockWrite(stamp); if (updateMode == UpdateMode.ALLOW) {
lock.unlockWrite(stamp);
}
} }
}) })
.onErrorMap(IOException::new) .onErrorMap(IOException::new)
@ -308,9 +362,16 @@ public class LLLocalDictionary implements LLDictionary {
case PREVIOUS_VALUE: case PREVIOUS_VALUE:
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
var lock = itemsLock.getAt(getLockIndex(key)); StampedLock lock;
//noinspection BlockingMethodInNonBlockingContext long stamp;
long stamp = lock.readLockInterruptibly(); if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
//noinspection BlockingMethodInNonBlockingContext
stamp = lock.readLockInterruptibly();
} else {
lock = null;
stamp = 0;
}
try { try {
logger.trace("Reading {}", key); logger.trace("Reading {}", key);
var data = new Holder<byte[]>(); var data = new Holder<byte[]>();
@ -324,7 +385,9 @@ public class LLLocalDictionary implements LLDictionary {
return null; return null;
} }
} finally { } finally {
lock.unlockRead(stamp); if (updateMode == UpdateMode.ALLOW) {
lock.unlockRead(stamp);
}
} }
}) })
.onErrorMap(IOException::new) .onErrorMap(IOException::new)
@ -344,11 +407,18 @@ public class LLLocalDictionary implements LLDictionary {
.flatMap(keysWindowFlux -> keysWindowFlux.collectList() .flatMap(keysWindowFlux -> keysWindowFlux.collectList()
.flatMapMany(keysWindow -> Mono .flatMapMany(keysWindow -> Mono
.fromCallable(() -> { .fromCallable(() -> {
var locks = itemsLock.bulkGetAt(getLockIndices(keysWindow)); Iterable<StampedLock> locks;
ArrayList<Long> stamps = new ArrayList<>(); ArrayList<Long> stamps;
for (var lock : locks) { if (updateMode == UpdateMode.ALLOW) {
//noinspection BlockingMethodInNonBlockingContext locks = itemsLock.bulkGetAt(getLockIndices(keysWindow));
stamps.add(lock.readLockInterruptibly()); stamps = new ArrayList<>();
for (var lock : locks) {
//noinspection BlockingMethodInNonBlockingContext
stamps.add(lock.readLockInterruptibly());
}
} else {
locks = null;
stamps = null;
} }
try { try {
var handlesArray = new ColumnFamilyHandle[keysWindow.size()]; var handlesArray = new ColumnFamilyHandle[keysWindow.size()];
@ -365,10 +435,12 @@ public class LLLocalDictionary implements LLDictionary {
} }
return mappedResults; return mappedResults;
} finally { } finally {
int index = 0; if (updateMode == UpdateMode.ALLOW) {
for (var lock : locks) { int index = 0;
lock.unlockRead(stamps.get(index)); for (var lock : locks) {
index++; lock.unlockRead(stamps.get(index));
index++;
}
} }
} }
}) })
@ -388,14 +460,17 @@ public class LLLocalDictionary implements LLDictionary {
.getMulti(null, Flux.fromIterable(entriesWindow).map(Entry::getKey)) .getMulti(null, Flux.fromIterable(entriesWindow).map(Entry::getKey))
.publishOn(dbScheduler) .publishOn(dbScheduler)
.concatWith(Mono.fromCallable(() -> { .concatWith(Mono.fromCallable(() -> {
var locks = itemsLock.bulkGetAt(getLockIndicesEntries(entriesWindow)); Iterable<StampedLock> locks;
int i = 0; ArrayList<Long> stamps;
for (StampedLock lock : locks) { if (updateMode == UpdateMode.ALLOW) {
i++; locks = itemsLock.bulkGetAt(getLockIndicesEntries(entriesWindow));
} stamps = new ArrayList<>();
ArrayList<Long> stamps = new ArrayList<>(); for (var lock : locks) {
for (var lock : locks) { stamps.add(lock.writeLockInterruptibly());
stamps.add(lock.writeLockInterruptibly()); }
} else {
locks = null;
stamps = null;
} }
try { try {
var batch = new CappedWriteBatch(db, var batch = new CappedWriteBatch(db,
@ -411,10 +486,12 @@ public class LLLocalDictionary implements LLDictionary {
batch.close(); batch.close();
return null; return null;
} finally { } finally {
int index = 0; if (updateMode == UpdateMode.ALLOW) {
for (var lock : locks) { int index = 0;
lock.unlockWrite(stamps.get(index)); for (var lock : locks) {
index++; lock.unlockWrite(stamps.get(index));
index++;
}
} }
} }
}))); })));

View File

@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.UpdateMode;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@ -290,13 +291,14 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
} }
@Override @Override
public Mono<LLLocalDictionary> getDictionary(byte[] columnName) { public Mono<LLLocalDictionary> getDictionary(byte[] columnName, UpdateMode updateMode) {
return Mono return Mono
.fromCallable(() -> new LLLocalDictionary(db, .fromCallable(() -> new LLLocalDictionary(db,
handles.get(Column.special(Column.toString(columnName))), handles.get(Column.special(Column.toString(columnName))),
name, name,
dbScheduler, dbScheduler,
(snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()) (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()),
updateMode
)) ))
.subscribeOn(dbScheduler); .subscribeOn(dbScheduler);
} }