Update LLLocalMultiLuceneIndex.java

This commit is contained in:
Andrea Cavalli 2021-01-30 20:01:22 +01:00
parent ad2ac618aa
commit abe1f35544

View File

@ -17,7 +17,6 @@ import java.nio.file.Path;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -26,8 +25,10 @@ import org.warp.commonutils.batch.ParallelUtils;
import org.warp.commonutils.functional.IOBiConsumer; import org.warp.commonutils.functional.IOBiConsumer;
import org.warp.commonutils.functional.TriFunction; import org.warp.commonutils.functional.TriFunction;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples; import reactor.util.function.Tuples;
public class LLLocalMultiLuceneIndex implements LLLuceneIndex { public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
@ -88,8 +89,8 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
} }
@Override @Override
public Mono<Void> addDocuments(Iterable<LLTerm> keys, Iterable<LLDocument> documents) { public Mono<Void> addDocuments(Flux<GroupedFlux<LLTerm, LLDocument>> documents) {
return runPerInstance(keys, documents, LLLuceneIndex::addDocuments); return documents.flatMap(docs -> getLuceneIndex(docs.key()).addDocuments(documents)).then();
} }
private Mono<Void> runPerInstance(Iterable<LLTerm> keys, private Mono<Void> runPerInstance(Iterable<LLTerm> keys,
@ -131,8 +132,8 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
} }
@Override @Override
public Mono<Void> updateDocuments(Iterable<LLTerm> keys, Iterable<LLDocument> documents) { public Mono<Void> updateDocuments(Flux<GroupedFlux<LLTerm, LLDocument>> documents) {
return runPerInstance(keys, documents, LLLuceneIndex::updateDocuments); return documents.flatMap(docs -> getLuceneIndex(docs.key()).updateDocuments(documents)).then();
} }
@Override @Override
@ -153,7 +154,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
@Override @Override
public Mono<LLSearchResult> moreLikeThis(@Nullable LLSnapshot snapshot, public Mono<LLSearchResult> moreLikeThis(@Nullable LLSnapshot snapshot,
Map<String, Set<String>> mltDocumentFields, Flux<Tuple2<String, Set<String>>> mltDocumentFields,
int limit, int limit,
String keyFieldName) { String keyFieldName) {
return Flux return Flux