Deduplicate code
This commit is contained in:
parent
28614db942
commit
74fdb752b4
@ -152,33 +152,7 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
|
||||
@Override
|
||||
public Mono<byte[]> put(byte[] key, byte[] value, LLDictionaryResultType resultType) {
|
||||
Mono<byte[]> response = null;
|
||||
switch (resultType) {
|
||||
case VALUE_CHANGED:
|
||||
response = containsKey(null, key).single().map(LLUtils::booleanToResponse);
|
||||
break;
|
||||
case PREVIOUS_VALUE:
|
||||
response = Mono
|
||||
.fromCallable(() -> {
|
||||
var data = new Holder<byte[]>();
|
||||
if (db.keyMayExist(cfh, key, data)) {
|
||||
if (data.getValue() != null) {
|
||||
return data.getValue();
|
||||
} else {
|
||||
return db.get(cfh, key);
|
||||
}
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
})
|
||||
.onErrorMap(IOException::new)
|
||||
.subscribeOn(Schedulers.boundedElastic());
|
||||
break;
|
||||
case VOID:
|
||||
response = Mono.empty();
|
||||
break;
|
||||
}
|
||||
|
||||
Mono<byte[]> response = getPrevValue(key, resultType);
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
db.put(cfh, key, value);
|
||||
@ -191,13 +165,23 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
|
||||
@Override
|
||||
public Mono<byte[]> remove(byte[] key, LLDictionaryResultType resultType) {
|
||||
Mono<byte[]> response = null;
|
||||
Mono<byte[]> response = getPrevValue(key, resultType);
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
db.delete(cfh, key);
|
||||
return null;
|
||||
})
|
||||
.onErrorMap(IOException::new)
|
||||
.subscribeOn(Schedulers.boundedElastic())
|
||||
.then(response);
|
||||
}
|
||||
|
||||
private Mono<byte[]> getPrevValue(byte[] key, LLDictionaryResultType resultType) {
|
||||
switch (resultType) {
|
||||
case VALUE_CHANGED:
|
||||
response = containsKey(null, key).single().map(LLUtils::booleanToResponse);
|
||||
break;
|
||||
return containsKey(null, key).single().map(LLUtils::booleanToResponse);
|
||||
case PREVIOUS_VALUE:
|
||||
response = Mono
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
var data = new Holder<byte[]>();
|
||||
if (db.keyMayExist(cfh, key, data)) {
|
||||
@ -212,20 +196,11 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
})
|
||||
.onErrorMap(IOException::new)
|
||||
.subscribeOn(Schedulers.boundedElastic());
|
||||
break;
|
||||
case VOID:
|
||||
response = Mono.empty();
|
||||
break;
|
||||
return Mono.empty();
|
||||
default:
|
||||
return Mono.error(new IllegalStateException("Unexpected value: " + resultType));
|
||||
}
|
||||
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
db.delete(cfh, key);
|
||||
return null;
|
||||
})
|
||||
.onErrorMap(IOException::new)
|
||||
.subscribeOn(Schedulers.boundedElastic())
|
||||
.then(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -268,7 +243,7 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
private Mono<Entry<byte[], byte[]>> putEntryToWriteBatch(Entry<byte[], byte[]> newEntry, boolean getOldValues,
|
||||
CappedWriteBatch writeBatch) {
|
||||
return Mono.from(Mono
|
||||
.<byte[]>defer(() -> {
|
||||
.defer(() -> {
|
||||
if (getOldValues) {
|
||||
return get(null, newEntry.getKey());
|
||||
} else {
|
||||
|
Loading…
x
Reference in New Issue
Block a user