From 28614db9428d4df68367dbbd38c567939e2cd8b9 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 30 Jan 2021 00:33:36 +0100 Subject: [PATCH] Deduplicate code --- .../database/disk/LLLocalDictionary.java | 68 ++++++------------- 1 file changed, 22 insertions(+), 46 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 15fbf1c..4df101a 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -6,10 +6,8 @@ import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; @@ -246,25 +244,7 @@ public class LLLocalDictionary implements LLDictionary { )) .subscribeOn(Schedulers.boundedElastic()) .flatMapMany(writeBatch -> entries - .flatMap(newEntry -> Mono - .defer(() -> { - if (getOldValues) { - return get(null, newEntry.getKey()); - } else { - return Mono.empty(); - } - }) - .concatWith(Mono - .fromCallable(() -> { - synchronized (writeBatch) { - writeBatch.put(cfh, newEntry.getKey(), newEntry.getValue()); - } - return null; - }) - .subscribeOn(Schedulers.boundedElastic()) - ) - .map(oldValue -> Map.entry(newEntry.getKey(), oldValue)) - ) + .flatMap(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, writeBatch)) .concatWith(Mono .>fromCallable(() -> { synchronized (writeBatch) { @@ -284,12 +264,25 @@ public class LLLocalDictionary implements LLDictionary { .onErrorMap(IOException::new); } - private static List newCfhList(ColumnFamilyHandle cfh, int size) { - var list = new ArrayList(size); - for (int i = 0; i < size; i++) { - list.add(cfh); - } - return list; + @NotNull + private Mono> putEntryToWriteBatch(Entry newEntry, boolean getOldValues, + CappedWriteBatch writeBatch) { + return Mono.from(Mono + .defer(() -> { + if (getOldValues) { + return get(null, newEntry.getKey()); + } else { + return Mono.empty(); + } + }) + .concatWith(Mono.fromCallable(() -> { + synchronized (writeBatch) { + writeBatch.put(cfh, newEntry.getKey(), newEntry.getValue()); + } + return null; + }) + .subscribeOn(Schedulers.boundedElastic())) + .map(oldValue -> Map.entry(newEntry.getKey(), oldValue))); } @Override @@ -396,29 +389,12 @@ public class LLLocalDictionary implements LLDictionary { }) .subscribeOn(Schedulers.boundedElastic()) .thenMany(entries) - .flatMap(newEntry -> Mono - .defer(() -> { - if (getOldValues) { - return get(null, newEntry.getKey()); - } else { - return Mono.empty(); - } - }) - .concatWith(Mono - .fromCallable(() -> { - synchronized (writeBatch) { - writeBatch.put(cfh, newEntry.getKey(), newEntry.getValue()); - } - return null; - }) - .subscribeOn(Schedulers.boundedElastic()) - ) - .map(oldValue -> Map.entry(newEntry.getKey(), oldValue)) - ) + .flatMap(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, writeBatch)) .concatWith(Mono .>fromCallable(() -> { synchronized (writeBatch) { writeBatch.writeToDbAndClose(); + writeBatch.close(); } return null; })