Add update method
This commit is contained in:
parent
2c8bb8a480
commit
370197c6e1
@ -34,7 +34,7 @@ public class IndicizationExample {
|
||||
})
|
||||
)
|
||||
.then(index.refresh())
|
||||
.then(index.search(null, Query.exactSearch(TextFieldsAnalyzer.NGramPartialString,"name", "Mario"), 1, null, LLScoreMode.COMPLETE, "id"))
|
||||
.then(index.search(null, Query.exactSearch(TextFieldsAnalyzer.N4GramPartialString,"name", "Mario"), 1, null, LLScoreMode.COMPLETE, "id"))
|
||||
.flatMap(results -> results
|
||||
.results()
|
||||
.flatMap(r -> r)
|
||||
@ -98,7 +98,7 @@ public class IndicizationExample {
|
||||
})
|
||||
))
|
||||
.then(index.refresh())
|
||||
.then(index.search(null, Query.exactSearch(TextFieldsAnalyzer.NGramPartialString,"name", "Mario"), 10, MultiSort.topScore()
|
||||
.then(index.search(null, Query.exactSearch(TextFieldsAnalyzer.N4GramPartialString,"name", "Mario"), 10, MultiSort.topScore()
|
||||
.getQuerySort(), LLScoreMode.COMPLETE, "id"))
|
||||
.flatMap(results -> LuceneUtils.mergeStream(results
|
||||
.results(), MultiSort.topScoreRaw(), 10)
|
||||
@ -153,7 +153,7 @@ public class IndicizationExample {
|
||||
.then(new LLLocalDatabaseConnection(wrkspcPath, true).connect())
|
||||
.flatMap(conn -> conn.getLuceneIndex("testindices",
|
||||
10,
|
||||
TextFieldsAnalyzer.NGramPartialString,
|
||||
TextFieldsAnalyzer.N4GramPartialString,
|
||||
TextFieldsSimilarity.NGramBM25Plus,
|
||||
Duration.ofSeconds(5),
|
||||
Duration.ofSeconds(5),
|
||||
|
@ -2,6 +2,7 @@ package it.cavallium.dbengine.database;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
|
||||
@ -16,6 +17,8 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
|
||||
|
||||
Mono<byte[]> put(byte[] key, byte[] value, LLDictionaryResultType resultType);
|
||||
|
||||
Mono<Void> update(byte[] key, Function<Optional<byte[]>, Optional<byte[]>> updater);
|
||||
|
||||
Mono<byte[]> remove(byte[] key, LLDictionaryResultType resultType);
|
||||
|
||||
Flux<Entry<byte[], byte[]>> getMulti(@Nullable LLSnapshot snapshot, Flux<byte[]> keys);
|
||||
|
@ -10,6 +10,8 @@ import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
@ -106,6 +108,15 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
return dictionary.put(toKey(serializeSuffix(keySuffix)), serialize(value), LLDictionaryResultType.VOID).then();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> updateValue(T keySuffix, Function<Optional<U>, Optional<U>> updater) {
|
||||
return dictionary
|
||||
.update(toKey(serializeSuffix(keySuffix)),
|
||||
oldSerialized -> updater.apply(oldSerialized.map(this::deserialize)).map(this::serialize)
|
||||
)
|
||||
.then();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<U> putValueAndGetPrevious(T keySuffix, U value) {
|
||||
return dictionary
|
||||
|
@ -6,6 +6,8 @@ import it.cavallium.dbengine.database.LLDictionaryResultType;
|
||||
import it.cavallium.dbengine.database.LLRange;
|
||||
import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import it.cavallium.dbengine.database.serialization.Serializer;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@ -39,6 +41,13 @@ public class DatabaseSingle<U> implements DatabaseStageEntry<U> {
|
||||
return dictionary.put(key, serialize(value), LLDictionaryResultType.PREVIOUS_VALUE).map(this::deserialize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> update(Function<Optional<U>, Optional<U>> updater) {
|
||||
return dictionary.update(key,
|
||||
(oldValueSer) -> updater.apply(oldValueSer.map(this::deserialize)).map(this::serialize)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<U> clearAndGetPrevious() {
|
||||
return dictionary.remove(key, LLDictionaryResultType.PREVIOUS_VALUE).map(this::deserialize);
|
||||
|
@ -2,6 +2,8 @@ package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.serialization.Serializer;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@ -40,6 +42,11 @@ public class DatabaseSingleMapped<U> implements DatabaseStageEntry<U> {
|
||||
return serializedSingle.setAndGetStatus(serialize(value));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> update(Function<Optional<U>, Optional<U>> updater) {
|
||||
return serializedSingle.update(oldValue -> updater.apply(oldValue.map(this::deserialize)).map(this::serialize));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> clear() {
|
||||
return serializedSingle.clear();
|
||||
|
@ -2,6 +2,8 @@ package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@ -23,6 +25,8 @@ public interface DatabaseStage<T> extends DatabaseStageWithEntry<T> {
|
||||
return setAndGetPrevious(value).map(oldValue -> !Objects.equals(oldValue, value)).defaultIfEmpty(false);
|
||||
}
|
||||
|
||||
Mono<Void> update(Function<Optional<T>, Optional<T>> updater);
|
||||
|
||||
default Mono<Void> clear() {
|
||||
return clearAndGetStatus().then();
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import it.cavallium.dbengine.database.collections.JoinerBlocking.ValueGetterBloc
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
@ -28,6 +29,10 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
|
||||
return at(null, key).single().flatMap(v -> v.set(value));
|
||||
}
|
||||
|
||||
default Mono<Void> updateValue(T key, Function<Optional<U>, Optional<U>> updater) {
|
||||
return at(null, key).single().flatMap(v -> v.update(updater));
|
||||
}
|
||||
|
||||
default Mono<U> putValueAndGetPrevious(T key, U value) {
|
||||
return at(null, key).single().flatMap(v -> v.setAndGetPrevious(value));
|
||||
}
|
||||
@ -104,6 +109,19 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
|
||||
.collectMap(Entry::getKey, Entry::getValue, HashMap::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
default Mono<Void> update(Function<Optional<Map<T, U>>, Optional<Map<T, U>>> updater) {
|
||||
return this
|
||||
.getAllValues(null)
|
||||
.collectMap(Entry::getKey, Entry::getValue, HashMap::new)
|
||||
.single()
|
||||
.map(v -> v.isEmpty() ? Optional.<Map<T, U>>empty() : Optional.of(v))
|
||||
.map(updater)
|
||||
.filter(Optional::isPresent)
|
||||
.map(Optional::get)
|
||||
.flatMap(values -> this.setAllValues(Flux.fromIterable(values.entrySet())));
|
||||
}
|
||||
|
||||
@Override
|
||||
default Mono<Map<T, U>> clearAndGetPrevious() {
|
||||
return this.setAndGetPrevious(Map.of());
|
||||
|
@ -1,10 +1,12 @@
|
||||
package it.cavallium.dbengine.database.disk;
|
||||
|
||||
import it.cavallium.concurrentlocks.ReadWriteUpdateLock;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.LLDictionaryResultType;
|
||||
import it.cavallium.dbengine.database.LLRange;
|
||||
import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.unimi.dsi.fastutil.ints.IntArrayList;
|
||||
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
@ -13,6 +15,8 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.function.Function;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
@ -28,6 +32,7 @@ import org.rocksdb.WriteOptions;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
|
||||
import org.warp.commonutils.locks.Striped;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
@ -43,6 +48,7 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
static final int MULTI_GET_WINDOW = 500;
|
||||
static final WriteOptions BATCH_WRITE_OPTIONS = new WriteOptions().setLowPri(true);
|
||||
|
||||
private static final int STRIPES = 65536;
|
||||
private static final byte[] FIRST_KEY = new byte[]{};
|
||||
private static final byte[] NO_DATA = new byte[0];
|
||||
private static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions();
|
||||
@ -51,6 +57,7 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
private final String databaseName;
|
||||
private final Scheduler dbScheduler;
|
||||
private final Function<LLSnapshot, Snapshot> snapshotResolver;
|
||||
private final Striped<ReadWriteUpdateLock> itemsLock = Striped.readWriteUpdateLock(STRIPES);
|
||||
|
||||
public LLLocalDictionary(@NotNull RocksDB db,
|
||||
@NotNull ColumnFamilyHandle columnFamilyHandle,
|
||||
@ -87,20 +94,46 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
}
|
||||
}
|
||||
|
||||
private int getLockIndex(byte[] key) {
|
||||
return Arrays.hashCode(key) % STRIPES;
|
||||
}
|
||||
|
||||
private IntArrayList getLockIndices(List<byte[]> keys) {
|
||||
var list = new IntArrayList(keys.size());
|
||||
for (byte[] key : keys) {
|
||||
list.add(getLockIndex(key));
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
private IntArrayList getLockIndicesEntries(List<Entry<byte[], byte[]>> keys) {
|
||||
var list = new IntArrayList(keys.size());
|
||||
for (Entry<byte[], byte[]> key : keys) {
|
||||
list.add(getLockIndex(key.getKey()));
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key) {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
logger.trace("Reading {}", key);
|
||||
Holder<byte[]> data = new Holder<>();
|
||||
if (db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) {
|
||||
if (data.getValue() != null) {
|
||||
return data.getValue();
|
||||
var lock = itemsLock.getAt(getLockIndex(key)).readLock();
|
||||
lock.lock();
|
||||
try {
|
||||
logger.trace("Reading {}", key);
|
||||
Holder<byte[]> data = new Holder<>();
|
||||
if (db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) {
|
||||
if (data.getValue() != null) {
|
||||
return data.getValue();
|
||||
} else {
|
||||
return db.get(cfh, resolveSnapshot(snapshot), key);
|
||||
}
|
||||
} else {
|
||||
return db.get(cfh, resolveSnapshot(snapshot), key);
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
return null;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
})
|
||||
.onErrorMap(IOException::new)
|
||||
@ -144,16 +177,22 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
private Mono<Boolean> containsKey(@Nullable LLSnapshot snapshot, byte[] key) {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
int size = RocksDB.NOT_FOUND;
|
||||
Holder<byte[]> data = new Holder<>();
|
||||
if (db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) {
|
||||
if (data.getValue() != null) {
|
||||
size = data.getValue().length;
|
||||
} else {
|
||||
size = db.get(cfh, resolveSnapshot(snapshot), key, NO_DATA);
|
||||
var lock = itemsLock.getAt(getLockIndex(key)).readLock();
|
||||
lock.lock();
|
||||
try {
|
||||
int size = RocksDB.NOT_FOUND;
|
||||
Holder<byte[]> data = new Holder<>();
|
||||
if (db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) {
|
||||
if (data.getValue() != null) {
|
||||
size = data.getValue().length;
|
||||
} else {
|
||||
size = db.get(cfh, resolveSnapshot(snapshot), key, NO_DATA);
|
||||
}
|
||||
}
|
||||
return size != RocksDB.NOT_FOUND;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
return size != RocksDB.NOT_FOUND;
|
||||
})
|
||||
.onErrorMap(IOException::new)
|
||||
.subscribeOn(dbScheduler);
|
||||
@ -164,9 +203,15 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
return getPrevValue(key, resultType)
|
||||
.concatWith(Mono
|
||||
.fromCallable(() -> {
|
||||
logger.trace("Writing {}: {}", key, value);
|
||||
db.put(cfh, key, value);
|
||||
return null;
|
||||
var lock = itemsLock.getAt(getLockIndex(key)).writeLock();
|
||||
lock.lock();
|
||||
try {
|
||||
logger.trace("Writing {}: {}", key, value);
|
||||
db.put(cfh, key, value);
|
||||
return null;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
})
|
||||
.onErrorMap(IOException::new)
|
||||
.subscribeOn(dbScheduler)
|
||||
@ -174,13 +219,64 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
).singleOrEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> update(byte[] key, Function<Optional<byte[]>, Optional<byte[]>> value) {
|
||||
return Mono
|
||||
.<Void>fromCallable(() -> {
|
||||
var rwuLock = itemsLock.getAt(getLockIndex(key));
|
||||
rwuLock.updateLock().lock();
|
||||
try {
|
||||
Optional<byte[]> prevData;
|
||||
var prevDataHolder = new Holder<byte[]>();
|
||||
if (db.keyMayExist(cfh, key, prevDataHolder)) {
|
||||
if (prevDataHolder.getValue() != null) {
|
||||
prevData = Optional.ofNullable(prevDataHolder.getValue());
|
||||
} else {
|
||||
prevData = Optional.ofNullable(db.get(cfh, key));
|
||||
}
|
||||
} else {
|
||||
prevData = Optional.empty();
|
||||
}
|
||||
|
||||
Optional<byte[]> newData = value.apply(prevData);
|
||||
if (prevData.isPresent() && newData.isEmpty()) {
|
||||
rwuLock.writeLock().lock();
|
||||
try {
|
||||
db.delete(cfh, key);
|
||||
} finally {
|
||||
rwuLock.writeLock().unlock();
|
||||
}
|
||||
} else if (newData.isPresent()
|
||||
&& (prevData.isEmpty() || !Arrays.equals(prevData.get(), newData.get()))) {
|
||||
rwuLock.writeLock().lock();
|
||||
try {
|
||||
db.put(cfh, key, newData.get());
|
||||
} finally {
|
||||
rwuLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
} finally {
|
||||
rwuLock.updateLock().unlock();
|
||||
}
|
||||
})
|
||||
.onErrorMap(IOException::new)
|
||||
.subscribeOn(dbScheduler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<byte[]> remove(byte[] key, LLDictionaryResultType resultType) {
|
||||
return getPrevValue(key, resultType)
|
||||
.concatWith(Mono
|
||||
.fromCallable(() -> {
|
||||
db.delete(cfh, key);
|
||||
return null;
|
||||
var lock = itemsLock.getAt(getLockIndex(key)).writeLock();
|
||||
lock.lock();
|
||||
try {
|
||||
db.delete(cfh, key);
|
||||
return null;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
})
|
||||
.onErrorMap(IOException::new)
|
||||
.subscribeOn(dbScheduler)
|
||||
@ -196,16 +292,22 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
case PREVIOUS_VALUE:
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
logger.trace("Reading {}", key);
|
||||
var data = new Holder<byte[]>();
|
||||
if (db.keyMayExist(cfh, key, data)) {
|
||||
if (data.getValue() != null) {
|
||||
return data.getValue();
|
||||
var lock = itemsLock.getAt(getLockIndex(key)).readLock();
|
||||
lock.lock();
|
||||
try {
|
||||
logger.trace("Reading {}", key);
|
||||
var data = new Holder<byte[]>();
|
||||
if (db.keyMayExist(cfh, key, data)) {
|
||||
if (data.getValue() != null) {
|
||||
return data.getValue();
|
||||
} else {
|
||||
return db.get(cfh, key);
|
||||
}
|
||||
} else {
|
||||
return db.get(cfh, key);
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
return null;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
})
|
||||
.onErrorMap(IOException::new)
|
||||
@ -225,19 +327,29 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
.flatMap(keysWindowFlux -> keysWindowFlux.collectList()
|
||||
.flatMapMany(keysWindow -> Mono
|
||||
.fromCallable(() -> {
|
||||
var handlesArray = new ColumnFamilyHandle[keysWindow.size()];
|
||||
Arrays.fill(handlesArray, cfh);
|
||||
var handles = ObjectArrayList.wrap(handlesArray, handlesArray.length);
|
||||
var results = db.multiGetAsList(resolveSnapshot(snapshot), handles, keysWindow);
|
||||
var mappedResults = new ArrayList<Entry<byte[], byte[]>>(results.size());
|
||||
for (int i = 0; i < results.size(); i++) {
|
||||
var val = results.get(i);
|
||||
if (val != null) {
|
||||
results.set(i, null);
|
||||
mappedResults.add(Map.entry(keysWindow.get(i), val));
|
||||
var locks = itemsLock.bulkGetAt(getLockIndices(keysWindow));
|
||||
for (ReadWriteLock lock : locks) {
|
||||
lock.readLock().lock();
|
||||
}
|
||||
try {
|
||||
var handlesArray = new ColumnFamilyHandle[keysWindow.size()];
|
||||
Arrays.fill(handlesArray, cfh);
|
||||
var handles = ObjectArrayList.wrap(handlesArray, handlesArray.length);
|
||||
var results = db.multiGetAsList(resolveSnapshot(snapshot), handles, keysWindow);
|
||||
var mappedResults = new ArrayList<Entry<byte[], byte[]>>(results.size());
|
||||
for (int i = 0; i < results.size(); i++) {
|
||||
var val = results.get(i);
|
||||
if (val != null) {
|
||||
results.set(i, null);
|
||||
mappedResults.add(Map.entry(keysWindow.get(i), val));
|
||||
}
|
||||
}
|
||||
return mappedResults;
|
||||
} finally {
|
||||
for (ReadWriteLock lock : locks) {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
return mappedResults;
|
||||
})
|
||||
.subscribeOn(dbScheduler)
|
||||
.flatMapMany(Flux::fromIterable)
|
||||
@ -255,18 +367,28 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
.getMulti(null, Flux.fromIterable(entriesWindow).map(Entry::getKey))
|
||||
.publishOn(dbScheduler)
|
||||
.concatWith(Mono.fromCallable(() -> {
|
||||
var batch = new CappedWriteBatch(db,
|
||||
CAPPED_WRITE_BATCH_CAP,
|
||||
RESERVED_WRITE_BATCH_SIZE,
|
||||
MAX_WRITE_BATCH_SIZE,
|
||||
BATCH_WRITE_OPTIONS
|
||||
);
|
||||
for (Entry<byte[], byte[]> entry : entriesWindow) {
|
||||
batch.put(entry.getKey(), entry.getValue());
|
||||
var locks = itemsLock.bulkGetAt(getLockIndicesEntries(entriesWindow));
|
||||
for (ReadWriteLock lock : locks) {
|
||||
lock.writeLock().lock();
|
||||
}
|
||||
try {
|
||||
var batch = new CappedWriteBatch(db,
|
||||
CAPPED_WRITE_BATCH_CAP,
|
||||
RESERVED_WRITE_BATCH_SIZE,
|
||||
MAX_WRITE_BATCH_SIZE,
|
||||
BATCH_WRITE_OPTIONS
|
||||
);
|
||||
for (Entry<byte[], byte[]> entry : entriesWindow) {
|
||||
batch.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
batch.writeToDbAndClose();
|
||||
batch.close();
|
||||
return null;
|
||||
} finally {
|
||||
for (ReadWriteLock lock : locks) {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
batch.writeToDbAndClose();
|
||||
batch.close();
|
||||
return null;
|
||||
})));
|
||||
}
|
||||
|
||||
@ -476,6 +598,7 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
.subscribeOn(dbScheduler);
|
||||
}
|
||||
|
||||
//todo: replace implementation with a simple Flux.push
|
||||
@Override
|
||||
public Flux<Entry<byte[], byte[]>> setRange(LLRange range,
|
||||
Flux<Entry<byte[], byte[]>> entries,
|
||||
|
@ -55,7 +55,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||
private final ConcurrentHashMap<Long, Snapshot> snapshotsHandles = new ConcurrentHashMap<>();
|
||||
private final AtomicLong nextSnapshotNumbers = new AtomicLong(1);
|
||||
|
||||
@SuppressWarnings("CommentedOutCode")
|
||||
public LLLocalKeyValueDatabase(String name, Path path, List<Column> columns, List<ColumnFamilyHandle> handles,
|
||||
boolean crashIfWalError, boolean lowMemory) throws IOException {
|
||||
Options options = openRocksDb(path, crashIfWalError, lowMemory);
|
||||
|
Loading…
Reference in New Issue
Block a user