diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index d837e8c..0f222c5 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -17,7 +17,6 @@ import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; @@ -26,8 +25,10 @@ import org.warp.commonutils.batch.ParallelUtils; import org.warp.commonutils.functional.IOBiConsumer; import org.warp.commonutils.functional.TriFunction; import reactor.core.publisher.Flux; +import reactor.core.publisher.GroupedFlux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import reactor.util.function.Tuple2; import reactor.util.function.Tuples; public class LLLocalMultiLuceneIndex implements LLLuceneIndex { @@ -88,8 +89,8 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { } @Override - public Mono addDocuments(Iterable keys, Iterable documents) { - return runPerInstance(keys, documents, LLLuceneIndex::addDocuments); + public Mono addDocuments(Flux> documents) { + return documents.flatMap(docs -> getLuceneIndex(docs.key()).addDocuments(documents)).then(); } private Mono runPerInstance(Iterable keys, @@ -131,8 +132,8 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { } @Override - public Mono updateDocuments(Iterable keys, Iterable documents) { - return runPerInstance(keys, documents, LLLuceneIndex::updateDocuments); + public Mono updateDocuments(Flux> documents) { + return documents.flatMap(docs -> getLuceneIndex(docs.key()).updateDocuments(documents)).then(); } @Override @@ -153,7 +154,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { @Override public Mono moreLikeThis(@Nullable LLSnapshot snapshot, - Map> mltDocumentFields, + Flux>> mltDocumentFields, int limit, String keyFieldName) { return Flux