Deduplicate code

This commit is contained in:
Andrea Cavalli 2021-01-30 00:33:36 +01:00
parent 538e5c51ac
commit 28614db942

View File

@ -6,10 +6,8 @@ import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Objects; import java.util.Objects;
@ -246,25 +244,7 @@ public class LLLocalDictionary implements LLDictionary {
)) ))
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(Schedulers.boundedElastic())
.flatMapMany(writeBatch -> entries .flatMapMany(writeBatch -> entries
.flatMap(newEntry -> Mono .flatMap(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, writeBatch))
.defer(() -> {
if (getOldValues) {
return get(null, newEntry.getKey());
} else {
return Mono.empty();
}
})
.concatWith(Mono
.<byte[]>fromCallable(() -> {
synchronized (writeBatch) {
writeBatch.put(cfh, newEntry.getKey(), newEntry.getValue());
}
return null;
})
.subscribeOn(Schedulers.boundedElastic())
)
.map(oldValue -> Map.entry(newEntry.getKey(), oldValue))
)
.concatWith(Mono .concatWith(Mono
.<Entry<byte[], byte[]>>fromCallable(() -> { .<Entry<byte[], byte[]>>fromCallable(() -> {
synchronized (writeBatch) { synchronized (writeBatch) {
@ -284,12 +264,25 @@ public class LLLocalDictionary implements LLDictionary {
.onErrorMap(IOException::new); .onErrorMap(IOException::new);
} }
private static List<ColumnFamilyHandle> newCfhList(ColumnFamilyHandle cfh, int size) { @NotNull
var list = new ArrayList<ColumnFamilyHandle>(size); private Mono<Entry<byte[], byte[]>> putEntryToWriteBatch(Entry<byte[], byte[]> newEntry, boolean getOldValues,
for (int i = 0; i < size; i++) { CappedWriteBatch writeBatch) {
list.add(cfh); return Mono.from(Mono
} .<byte[]>defer(() -> {
return list; if (getOldValues) {
return get(null, newEntry.getKey());
} else {
return Mono.empty();
}
})
.concatWith(Mono.<byte[]>fromCallable(() -> {
synchronized (writeBatch) {
writeBatch.put(cfh, newEntry.getKey(), newEntry.getValue());
}
return null;
})
.subscribeOn(Schedulers.boundedElastic()))
.map(oldValue -> Map.entry(newEntry.getKey(), oldValue)));
} }
@Override @Override
@ -396,29 +389,12 @@ public class LLLocalDictionary implements LLDictionary {
}) })
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(Schedulers.boundedElastic())
.thenMany(entries) .thenMany(entries)
.flatMap(newEntry -> Mono .flatMap(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, writeBatch))
.defer(() -> {
if (getOldValues) {
return get(null, newEntry.getKey());
} else {
return Mono.empty();
}
})
.concatWith(Mono
.<byte[]>fromCallable(() -> {
synchronized (writeBatch) {
writeBatch.put(cfh, newEntry.getKey(), newEntry.getValue());
}
return null;
})
.subscribeOn(Schedulers.boundedElastic())
)
.map(oldValue -> Map.entry(newEntry.getKey(), oldValue))
)
.concatWith(Mono .concatWith(Mono
.<Entry<byte[], byte[]>>fromCallable(() -> { .<Entry<byte[], byte[]>>fromCallable(() -> {
synchronized (writeBatch) { synchronized (writeBatch) {
writeBatch.writeToDbAndClose(); writeBatch.writeToDbAndClose();
writeBatch.close();
} }
return null; return null;
}) })