From 370197c6e15fdc2567f085054d2bd03dd6a3bf6d Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 6 Feb 2021 19:21:31 +0100 Subject: [PATCH] Add update method --- .../IndicizationExample.java | 6 +- .../dbengine/database/LLDictionary.java | 3 + .../collections/DatabaseMapDictionary.java | 11 + .../database/collections/DatabaseSingle.java | 9 + .../collections/DatabaseSingleMapped.java | 7 + .../database/collections/DatabaseStage.java | 4 + .../collections/DatabaseStageMap.java | 18 ++ .../database/disk/LLLocalDictionary.java | 225 ++++++++++++++---- .../disk/LLLocalKeyValueDatabase.java | 1 - 9 files changed, 229 insertions(+), 55 deletions(-) diff --git a/src/example/java/it.cavallium.dbengine.client/IndicizationExample.java b/src/example/java/it.cavallium.dbengine.client/IndicizationExample.java index ae95fe3..d7c2429 100644 --- a/src/example/java/it.cavallium.dbengine.client/IndicizationExample.java +++ b/src/example/java/it.cavallium.dbengine.client/IndicizationExample.java @@ -34,7 +34,7 @@ public class IndicizationExample { }) ) .then(index.refresh()) - .then(index.search(null, Query.exactSearch(TextFieldsAnalyzer.NGramPartialString,"name", "Mario"), 1, null, LLScoreMode.COMPLETE, "id")) + .then(index.search(null, Query.exactSearch(TextFieldsAnalyzer.N4GramPartialString,"name", "Mario"), 1, null, LLScoreMode.COMPLETE, "id")) .flatMap(results -> results .results() .flatMap(r -> r) @@ -98,7 +98,7 @@ public class IndicizationExample { }) )) .then(index.refresh()) - .then(index.search(null, Query.exactSearch(TextFieldsAnalyzer.NGramPartialString,"name", "Mario"), 10, MultiSort.topScore() + .then(index.search(null, Query.exactSearch(TextFieldsAnalyzer.N4GramPartialString,"name", "Mario"), 10, MultiSort.topScore() .getQuerySort(), LLScoreMode.COMPLETE, "id")) .flatMap(results -> LuceneUtils.mergeStream(results .results(), MultiSort.topScoreRaw(), 10) @@ -153,7 +153,7 @@ public class IndicizationExample { .then(new LLLocalDatabaseConnection(wrkspcPath, true).connect()) .flatMap(conn -> conn.getLuceneIndex("testindices", 10, - TextFieldsAnalyzer.NGramPartialString, + TextFieldsAnalyzer.N4GramPartialString, TextFieldsSimilarity.NGramBM25Plus, Duration.ofSeconds(5), Duration.ofSeconds(5), diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java index cd6d2d3..9db376e 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.database; import java.util.List; import java.util.Map.Entry; +import java.util.Optional; import java.util.function.Function; import org.jetbrains.annotations.Nullable; import org.warp.commonutils.concurrency.atomicity.NotAtomic; @@ -16,6 +17,8 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { Mono put(byte[] key, byte[] value, LLDictionaryResultType resultType); + Mono update(byte[] key, Function, Optional> updater); + Mono remove(byte[] key, LLDictionaryResultType resultType); Flux> getMulti(@Nullable LLSnapshot snapshot, Flux keys); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 9403749..d516d02 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -10,6 +10,8 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; +import java.util.function.Function; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -106,6 +108,15 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep updateValue(T keySuffix, Function, Optional> updater) { + return dictionary + .update(toKey(serializeSuffix(keySuffix)), + oldSerialized -> updater.apply(oldSerialized.map(this::deserialize)).map(this::serialize) + ) + .then(); + } + @Override public Mono putValueAndGetPrevious(T keySuffix, U value) { return dictionary diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java index bcac399..9da588b 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java @@ -6,6 +6,8 @@ import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.serialization.Serializer; +import java.util.Optional; +import java.util.function.Function; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; @@ -39,6 +41,13 @@ public class DatabaseSingle implements DatabaseStageEntry { return dictionary.put(key, serialize(value), LLDictionaryResultType.PREVIOUS_VALUE).map(this::deserialize); } + @Override + public Mono update(Function, Optional> updater) { + return dictionary.update(key, + (oldValueSer) -> updater.apply(oldValueSer.map(this::deserialize)).map(this::serialize) + ); + } + @Override public Mono clearAndGetPrevious() { return dictionary.remove(key, LLDictionaryResultType.PREVIOUS_VALUE).map(this::deserialize); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java index ea50d7b..3c64f0c 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java @@ -2,6 +2,8 @@ package it.cavallium.dbengine.database.collections; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.serialization.Serializer; +import java.util.Optional; +import java.util.function.Function; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; @@ -40,6 +42,11 @@ public class DatabaseSingleMapped implements DatabaseStageEntry { return serializedSingle.setAndGetStatus(serialize(value)); } + @Override + public Mono update(Function, Optional> updater) { + return serializedSingle.update(oldValue -> updater.apply(oldValue.map(this::deserialize)).map(this::serialize)); + } + @Override public Mono clear() { return serializedSingle.clear(); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java index f3e7191..85eed18 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java @@ -2,6 +2,8 @@ package it.cavallium.dbengine.database.collections; import it.cavallium.dbengine.client.CompositeSnapshot; import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; @@ -23,6 +25,8 @@ public interface DatabaseStage extends DatabaseStageWithEntry { return setAndGetPrevious(value).map(oldValue -> !Objects.equals(oldValue, value)).defaultIfEmpty(false); } + Mono update(Function, Optional> updater); + default Mono clear() { return clearAndGetStatus().then(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index 6a23d10..dc8bcc1 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -6,6 +6,7 @@ import it.cavallium.dbengine.database.collections.JoinerBlocking.ValueGetterBloc import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.function.Function; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; @@ -28,6 +29,10 @@ public interface DatabaseStageMap> extends Dat return at(null, key).single().flatMap(v -> v.set(value)); } + default Mono updateValue(T key, Function, Optional> updater) { + return at(null, key).single().flatMap(v -> v.update(updater)); + } + default Mono putValueAndGetPrevious(T key, U value) { return at(null, key).single().flatMap(v -> v.setAndGetPrevious(value)); } @@ -104,6 +109,19 @@ public interface DatabaseStageMap> extends Dat .collectMap(Entry::getKey, Entry::getValue, HashMap::new); } + @Override + default Mono update(Function>, Optional>> updater) { + return this + .getAllValues(null) + .collectMap(Entry::getKey, Entry::getValue, HashMap::new) + .single() + .map(v -> v.isEmpty() ? Optional.>empty() : Optional.of(v)) + .map(updater) + .filter(Optional::isPresent) + .map(Optional::get) + .flatMap(values -> this.setAllValues(Flux.fromIterable(values.entrySet()))); + } + @Override default Mono> clearAndGetPrevious() { return this.setAndGetPrevious(Map.of()); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 58b5fcd..86451e5 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -1,10 +1,12 @@ package it.cavallium.dbengine.database.disk; +import it.cavallium.concurrentlocks.ReadWriteUpdateLock; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; +import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.io.IOException; import java.util.ArrayList; @@ -13,6 +15,8 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.locks.ReadWriteLock; import java.util.function.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -28,6 +32,7 @@ import org.rocksdb.WriteOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.warp.commonutils.concurrency.atomicity.NotAtomic; +import org.warp.commonutils.locks.Striped; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -43,6 +48,7 @@ public class LLLocalDictionary implements LLDictionary { static final int MULTI_GET_WINDOW = 500; static final WriteOptions BATCH_WRITE_OPTIONS = new WriteOptions().setLowPri(true); + private static final int STRIPES = 65536; private static final byte[] FIRST_KEY = new byte[]{}; private static final byte[] NO_DATA = new byte[0]; private static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions(); @@ -51,6 +57,7 @@ public class LLLocalDictionary implements LLDictionary { private final String databaseName; private final Scheduler dbScheduler; private final Function snapshotResolver; + private final Striped itemsLock = Striped.readWriteUpdateLock(STRIPES); public LLLocalDictionary(@NotNull RocksDB db, @NotNull ColumnFamilyHandle columnFamilyHandle, @@ -87,20 +94,46 @@ public class LLLocalDictionary implements LLDictionary { } } + private int getLockIndex(byte[] key) { + return Arrays.hashCode(key) % STRIPES; + } + + private IntArrayList getLockIndices(List keys) { + var list = new IntArrayList(keys.size()); + for (byte[] key : keys) { + list.add(getLockIndex(key)); + } + return list; + } + + private IntArrayList getLockIndicesEntries(List> keys) { + var list = new IntArrayList(keys.size()); + for (Entry key : keys) { + list.add(getLockIndex(key.getKey())); + } + return list; + } + @Override public Mono get(@Nullable LLSnapshot snapshot, byte[] key) { return Mono .fromCallable(() -> { - logger.trace("Reading {}", key); - Holder data = new Holder<>(); - if (db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) { - if (data.getValue() != null) { - return data.getValue(); + var lock = itemsLock.getAt(getLockIndex(key)).readLock(); + lock.lock(); + try { + logger.trace("Reading {}", key); + Holder data = new Holder<>(); + if (db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) { + if (data.getValue() != null) { + return data.getValue(); + } else { + return db.get(cfh, resolveSnapshot(snapshot), key); + } } else { - return db.get(cfh, resolveSnapshot(snapshot), key); + return null; } - } else { - return null; + } finally { + lock.unlock(); } }) .onErrorMap(IOException::new) @@ -144,16 +177,22 @@ public class LLLocalDictionary implements LLDictionary { private Mono containsKey(@Nullable LLSnapshot snapshot, byte[] key) { return Mono .fromCallable(() -> { - int size = RocksDB.NOT_FOUND; - Holder data = new Holder<>(); - if (db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) { - if (data.getValue() != null) { - size = data.getValue().length; - } else { - size = db.get(cfh, resolveSnapshot(snapshot), key, NO_DATA); + var lock = itemsLock.getAt(getLockIndex(key)).readLock(); + lock.lock(); + try { + int size = RocksDB.NOT_FOUND; + Holder data = new Holder<>(); + if (db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) { + if (data.getValue() != null) { + size = data.getValue().length; + } else { + size = db.get(cfh, resolveSnapshot(snapshot), key, NO_DATA); + } } + return size != RocksDB.NOT_FOUND; + } finally { + lock.unlock(); } - return size != RocksDB.NOT_FOUND; }) .onErrorMap(IOException::new) .subscribeOn(dbScheduler); @@ -164,9 +203,15 @@ public class LLLocalDictionary implements LLDictionary { return getPrevValue(key, resultType) .concatWith(Mono .fromCallable(() -> { - logger.trace("Writing {}: {}", key, value); - db.put(cfh, key, value); - return null; + var lock = itemsLock.getAt(getLockIndex(key)).writeLock(); + lock.lock(); + try { + logger.trace("Writing {}: {}", key, value); + db.put(cfh, key, value); + return null; + } finally { + lock.unlock(); + } }) .onErrorMap(IOException::new) .subscribeOn(dbScheduler) @@ -174,13 +219,64 @@ public class LLLocalDictionary implements LLDictionary { ).singleOrEmpty(); } + @Override + public Mono update(byte[] key, Function, Optional> value) { + return Mono + .fromCallable(() -> { + var rwuLock = itemsLock.getAt(getLockIndex(key)); + rwuLock.updateLock().lock(); + try { + Optional prevData; + var prevDataHolder = new Holder(); + if (db.keyMayExist(cfh, key, prevDataHolder)) { + if (prevDataHolder.getValue() != null) { + prevData = Optional.ofNullable(prevDataHolder.getValue()); + } else { + prevData = Optional.ofNullable(db.get(cfh, key)); + } + } else { + prevData = Optional.empty(); + } + + Optional newData = value.apply(prevData); + if (prevData.isPresent() && newData.isEmpty()) { + rwuLock.writeLock().lock(); + try { + db.delete(cfh, key); + } finally { + rwuLock.writeLock().unlock(); + } + } else if (newData.isPresent() + && (prevData.isEmpty() || !Arrays.equals(prevData.get(), newData.get()))) { + rwuLock.writeLock().lock(); + try { + db.put(cfh, key, newData.get()); + } finally { + rwuLock.writeLock().unlock(); + } + } + return null; + } finally { + rwuLock.updateLock().unlock(); + } + }) + .onErrorMap(IOException::new) + .subscribeOn(dbScheduler); + } + @Override public Mono remove(byte[] key, LLDictionaryResultType resultType) { return getPrevValue(key, resultType) .concatWith(Mono .fromCallable(() -> { - db.delete(cfh, key); - return null; + var lock = itemsLock.getAt(getLockIndex(key)).writeLock(); + lock.lock(); + try { + db.delete(cfh, key); + return null; + } finally { + lock.unlock(); + } }) .onErrorMap(IOException::new) .subscribeOn(dbScheduler) @@ -196,16 +292,22 @@ public class LLLocalDictionary implements LLDictionary { case PREVIOUS_VALUE: return Mono .fromCallable(() -> { - logger.trace("Reading {}", key); - var data = new Holder(); - if (db.keyMayExist(cfh, key, data)) { - if (data.getValue() != null) { - return data.getValue(); + var lock = itemsLock.getAt(getLockIndex(key)).readLock(); + lock.lock(); + try { + logger.trace("Reading {}", key); + var data = new Holder(); + if (db.keyMayExist(cfh, key, data)) { + if (data.getValue() != null) { + return data.getValue(); + } else { + return db.get(cfh, key); + } } else { - return db.get(cfh, key); + return null; } - } else { - return null; + } finally { + lock.unlock(); } }) .onErrorMap(IOException::new) @@ -225,19 +327,29 @@ public class LLLocalDictionary implements LLDictionary { .flatMap(keysWindowFlux -> keysWindowFlux.collectList() .flatMapMany(keysWindow -> Mono .fromCallable(() -> { - var handlesArray = new ColumnFamilyHandle[keysWindow.size()]; - Arrays.fill(handlesArray, cfh); - var handles = ObjectArrayList.wrap(handlesArray, handlesArray.length); - var results = db.multiGetAsList(resolveSnapshot(snapshot), handles, keysWindow); - var mappedResults = new ArrayList>(results.size()); - for (int i = 0; i < results.size(); i++) { - var val = results.get(i); - if (val != null) { - results.set(i, null); - mappedResults.add(Map.entry(keysWindow.get(i), val)); + var locks = itemsLock.bulkGetAt(getLockIndices(keysWindow)); + for (ReadWriteLock lock : locks) { + lock.readLock().lock(); + } + try { + var handlesArray = new ColumnFamilyHandle[keysWindow.size()]; + Arrays.fill(handlesArray, cfh); + var handles = ObjectArrayList.wrap(handlesArray, handlesArray.length); + var results = db.multiGetAsList(resolveSnapshot(snapshot), handles, keysWindow); + var mappedResults = new ArrayList>(results.size()); + for (int i = 0; i < results.size(); i++) { + var val = results.get(i); + if (val != null) { + results.set(i, null); + mappedResults.add(Map.entry(keysWindow.get(i), val)); + } + } + return mappedResults; + } finally { + for (ReadWriteLock lock : locks) { + lock.readLock().unlock(); } } - return mappedResults; }) .subscribeOn(dbScheduler) .flatMapMany(Flux::fromIterable) @@ -255,18 +367,28 @@ public class LLLocalDictionary implements LLDictionary { .getMulti(null, Flux.fromIterable(entriesWindow).map(Entry::getKey)) .publishOn(dbScheduler) .concatWith(Mono.fromCallable(() -> { - var batch = new CappedWriteBatch(db, - CAPPED_WRITE_BATCH_CAP, - RESERVED_WRITE_BATCH_SIZE, - MAX_WRITE_BATCH_SIZE, - BATCH_WRITE_OPTIONS - ); - for (Entry entry : entriesWindow) { - batch.put(entry.getKey(), entry.getValue()); + var locks = itemsLock.bulkGetAt(getLockIndicesEntries(entriesWindow)); + for (ReadWriteLock lock : locks) { + lock.writeLock().lock(); + } + try { + var batch = new CappedWriteBatch(db, + CAPPED_WRITE_BATCH_CAP, + RESERVED_WRITE_BATCH_SIZE, + MAX_WRITE_BATCH_SIZE, + BATCH_WRITE_OPTIONS + ); + for (Entry entry : entriesWindow) { + batch.put(entry.getKey(), entry.getValue()); + } + batch.writeToDbAndClose(); + batch.close(); + return null; + } finally { + for (ReadWriteLock lock : locks) { + lock.writeLock().unlock(); + } } - batch.writeToDbAndClose(); - batch.close(); - return null; }))); } @@ -476,6 +598,7 @@ public class LLLocalDictionary implements LLDictionary { .subscribeOn(dbScheduler); } + //todo: replace implementation with a simple Flux.push @Override public Flux> setRange(LLRange range, Flux> entries, diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 205dee4..fb52bf9 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -55,7 +55,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { private final ConcurrentHashMap snapshotsHandles = new ConcurrentHashMap<>(); private final AtomicLong nextSnapshotNumbers = new AtomicLong(1); - @SuppressWarnings("CommentedOutCode") public LLLocalKeyValueDatabase(String name, Path path, List columns, List handles, boolean crashIfWalError, boolean lowMemory) throws IOException { Options options = openRocksDb(path, crashIfWalError, lowMemory);