Return true if update() data changed
This commit is contained in:
parent
871159641c
commit
46bd61e817
@ -17,7 +17,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
|
|||||||
|
|
||||||
Mono<byte[]> put(byte[] key, byte[] value, LLDictionaryResultType resultType);
|
Mono<byte[]> put(byte[] key, byte[] value, LLDictionaryResultType resultType);
|
||||||
|
|
||||||
Mono<Void> update(byte[] key, Function<Optional<byte[]>, Optional<byte[]>> updater);
|
Mono<Boolean> update(byte[] key, Function<Optional<byte[]>, Optional<byte[]>> updater);
|
||||||
|
|
||||||
Mono<byte[]> remove(byte[] key, LLDictionaryResultType resultType);
|
Mono<byte[]> remove(byte[] key, LLDictionaryResultType resultType);
|
||||||
|
|
||||||
|
@ -11,6 +11,7 @@ import java.util.HashMap;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import org.jetbrains.annotations.Nullable;
|
import org.jetbrains.annotations.Nullable;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
@ -109,12 +110,11 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Mono<Void> updateValue(T keySuffix, Function<Optional<U>, Optional<U>> updater) {
|
public Mono<Boolean> updateValue(T keySuffix, Function<Optional<U>, Optional<U>> updater) {
|
||||||
return dictionary
|
AtomicBoolean changed = new AtomicBoolean(false);
|
||||||
.update(toKey(serializeSuffix(keySuffix)),
|
return dictionary.update(toKey(serializeSuffix(keySuffix)),
|
||||||
oldSerialized -> updater.apply(oldSerialized.map(this::deserialize)).map(this::serialize)
|
oldSerialized -> updater.apply(oldSerialized.map(this::deserialize)).map(this::serialize)
|
||||||
)
|
);
|
||||||
.then();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -42,7 +42,7 @@ public class DatabaseSingle<U> implements DatabaseStageEntry<U> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Mono<Void> update(Function<Optional<U>, Optional<U>> updater) {
|
public Mono<Boolean> update(Function<Optional<U>, Optional<U>> updater) {
|
||||||
return dictionary.update(key,
|
return dictionary.update(key,
|
||||||
(oldValueSer) -> updater.apply(oldValueSer.map(this::deserialize)).map(this::serialize)
|
(oldValueSer) -> updater.apply(oldValueSer.map(this::deserialize)).map(this::serialize)
|
||||||
);
|
);
|
||||||
|
@ -43,7 +43,7 @@ public class DatabaseSingleMapped<U> implements DatabaseStageEntry<U> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Mono<Void> update(Function<Optional<U>, Optional<U>> updater) {
|
public Mono<Boolean> update(Function<Optional<U>, Optional<U>> updater) {
|
||||||
return serializedSingle.update(oldValue -> updater.apply(oldValue.map(this::deserialize)).map(this::serialize));
|
return serializedSingle.update(oldValue -> updater.apply(oldValue.map(this::deserialize)).map(this::serialize));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -25,7 +25,7 @@ public interface DatabaseStage<T> extends DatabaseStageWithEntry<T> {
|
|||||||
return setAndGetPrevious(value).map(oldValue -> !Objects.equals(oldValue, value)).defaultIfEmpty(false);
|
return setAndGetPrevious(value).map(oldValue -> !Objects.equals(oldValue, value)).defaultIfEmpty(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
Mono<Void> update(Function<Optional<T>, Optional<T>> updater);
|
Mono<Boolean> update(Function<Optional<T>, Optional<T>> updater);
|
||||||
|
|
||||||
default Mono<Void> clear() {
|
default Mono<Void> clear() {
|
||||||
return clearAndGetStatus().then();
|
return clearAndGetStatus().then();
|
||||||
|
@ -29,7 +29,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
|
|||||||
return at(null, key).single().flatMap(v -> v.set(value));
|
return at(null, key).single().flatMap(v -> v.set(value));
|
||||||
}
|
}
|
||||||
|
|
||||||
default Mono<Void> updateValue(T key, Function<Optional<U>, Optional<U>> updater) {
|
default Mono<Boolean> updateValue(T key, Function<Optional<U>, Optional<U>> updater) {
|
||||||
return at(null, key).single().flatMap(v -> v.update(updater));
|
return at(null, key).single().flatMap(v -> v.update(updater));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -110,7 +110,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
default Mono<Void> update(Function<Optional<Map<T, U>>, Optional<Map<T, U>>> updater) {
|
default Mono<Boolean> update(Function<Optional<Map<T, U>>, Optional<Map<T, U>>> updater) {
|
||||||
return this
|
return this
|
||||||
.getAllValues(null)
|
.getAllValues(null)
|
||||||
.collectMap(Entry::getKey, Entry::getValue, HashMap::new)
|
.collectMap(Entry::getKey, Entry::getValue, HashMap::new)
|
||||||
@ -119,7 +119,9 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
|
|||||||
.map(updater)
|
.map(updater)
|
||||||
.filter(Optional::isPresent)
|
.filter(Optional::isPresent)
|
||||||
.map(Optional::get)
|
.map(Optional::get)
|
||||||
.flatMap(values -> this.setAllValues(Flux.fromIterable(values.entrySet())));
|
.flatMap(values -> this.setAllValues(Flux.fromIterable(values.entrySet())))
|
||||||
|
//todo: can be optimized by calculating the correct return value
|
||||||
|
.thenReturn(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -220,9 +220,9 @@ public class LLLocalDictionary implements LLDictionary {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Mono<Void> update(byte[] key, Function<Optional<byte[]>, Optional<byte[]>> value) {
|
public Mono<Boolean> update(byte[] key, Function<Optional<byte[]>, Optional<byte[]>> value) {
|
||||||
return Mono
|
return Mono
|
||||||
.<Void>fromCallable(() -> {
|
.fromCallable(() -> {
|
||||||
var rwuLock = itemsLock.getAt(getLockIndex(key));
|
var rwuLock = itemsLock.getAt(getLockIndex(key));
|
||||||
rwuLock.updateLock().lock();
|
rwuLock.updateLock().lock();
|
||||||
try {
|
try {
|
||||||
@ -238,10 +238,12 @@ public class LLLocalDictionary implements LLDictionary {
|
|||||||
prevData = Optional.empty();
|
prevData = Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean changed = false;
|
||||||
Optional<byte[]> newData = value.apply(prevData);
|
Optional<byte[]> newData = value.apply(prevData);
|
||||||
if (prevData.isPresent() && newData.isEmpty()) {
|
if (prevData.isPresent() && newData.isEmpty()) {
|
||||||
rwuLock.writeLock().lock();
|
rwuLock.writeLock().lock();
|
||||||
try {
|
try {
|
||||||
|
changed = true;
|
||||||
db.delete(cfh, key);
|
db.delete(cfh, key);
|
||||||
} finally {
|
} finally {
|
||||||
rwuLock.writeLock().unlock();
|
rwuLock.writeLock().unlock();
|
||||||
@ -250,12 +252,13 @@ public class LLLocalDictionary implements LLDictionary {
|
|||||||
&& (prevData.isEmpty() || !Arrays.equals(prevData.get(), newData.get()))) {
|
&& (prevData.isEmpty() || !Arrays.equals(prevData.get(), newData.get()))) {
|
||||||
rwuLock.writeLock().lock();
|
rwuLock.writeLock().lock();
|
||||||
try {
|
try {
|
||||||
|
changed = true;
|
||||||
db.put(cfh, key, newData.get());
|
db.put(cfh, key, newData.get());
|
||||||
} finally {
|
} finally {
|
||||||
rwuLock.writeLock().unlock();
|
rwuLock.writeLock().unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return changed;
|
||||||
} finally {
|
} finally {
|
||||||
rwuLock.updateLock().unlock();
|
rwuLock.updateLock().unlock();
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user