Update LLLuceneIndex.java and LLLocalLuceneIndex.java

This commit is contained in:
Andrea Cavalli 2021-01-30 19:57:50 +01:00
parent 68bd86567c
commit ad2ac618aa
2 changed files with 86 additions and 66 deletions

View File

@@ -1,9 +1,11 @@
package it.cavallium.dbengine.database; package it.cavallium.dbengine.database;
import java.util.Map;
import java.util.Set; import java.util.Set;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
public interface LLLuceneIndex extends LLSnapshottable { public interface LLLuceneIndex extends LLSnapshottable {
@@ -11,13 +13,13 @@ public interface LLLuceneIndex extends LLSnapshottable {
Mono<Void> addDocument(LLTerm id, LLDocument doc); Mono<Void> addDocument(LLTerm id, LLDocument doc);
Mono<Void> addDocuments(Iterable<LLTerm> keys, Iterable<LLDocument> documents); Mono<Void> addDocuments(Flux<GroupedFlux<LLTerm, LLDocument>> documents);
Mono<Void> deleteDocument(LLTerm id); Mono<Void> deleteDocument(LLTerm id);
Mono<Void> updateDocument(LLTerm id, LLDocument document); Mono<Void> updateDocument(LLTerm id, LLDocument document);
Mono<Void> updateDocuments(Iterable<LLTerm> ids, Iterable<LLDocument> documents); Mono<Void> updateDocuments(Flux<GroupedFlux<LLTerm, LLDocument>> documents);
Mono<Void> deleteAll(); Mono<Void> deleteAll();
@@ -29,7 +31,7 @@ public interface LLLuceneIndex extends LLSnapshottable {
* @return the collection has one or more flux * @return the collection has one or more flux
*/ */
Mono<LLSearchResult> moreLikeThis(@Nullable LLSnapshot snapshot, Mono<LLSearchResult> moreLikeThis(@Nullable LLSnapshot snapshot,
Map<String, Set<String>> mltDocumentFields, Flux<Tuple2<String, Set<String>>> mltDocumentFields,
int limit, int limit,
String keyFieldName); String keyFieldName);

View File

@@ -18,6 +18,7 @@ import it.cavallium.luceneserializer.luceneserializer.QueryParser;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.Duration; import java.time.Duration;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
@@ -45,6 +46,7 @@ import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.concurrency.executor.ScheduledTaskLifecycle; import org.warp.commonutils.concurrency.executor.ScheduledTaskLifecycle;
import org.warp.commonutils.type.ShortNamedThreadFactory; import org.warp.commonutils.type.ShortNamedThreadFactory;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmissionException; import reactor.core.publisher.Sinks.EmissionException;
@@ -52,6 +54,7 @@ import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.publisher.Sinks.Many; import reactor.core.publisher.Sinks.Many;
import reactor.core.publisher.Sinks.One; import reactor.core.publisher.Sinks.One;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples; import reactor.util.function.Tuples;
public class LLLocalLuceneIndex implements LLLuceneIndex { public class LLLocalLuceneIndex implements LLLuceneIndex {
@@ -196,13 +199,21 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
} }
@Override @Override
public Mono<Void> addDocuments(Iterable<LLTerm> keys, Iterable<LLDocument> docs) { public Mono<Void> addDocuments(Flux<GroupedFlux<LLTerm, LLDocument>> documents) {
return Mono.<Void>fromCallable(() -> { return documents
indexWriter.addDocuments(LLUtils.toDocuments(docs)); .flatMap(group -> group
return null; .collectList()
}).subscribeOn(Schedulers.boundedElastic()); .flatMap(docs -> Mono
.<Void>fromCallable(() -> {
indexWriter.addDocuments(LLUtils.toDocuments(docs));
return null;
})
.subscribeOn(Schedulers.boundedElastic()))
)
.then();
} }
@Override @Override
public Mono<Void> deleteDocument(LLTerm id) { public Mono<Void> deleteDocument(LLTerm id) {
return Mono.<Void>fromCallable(() -> { return Mono.<Void>fromCallable(() -> {
@@ -220,18 +231,21 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
} }
@Override @Override
public Mono<Void> updateDocuments(Iterable<LLTerm> ids, Iterable<LLDocument> documents) { public Mono<Void> updateDocuments(Flux<GroupedFlux<LLTerm, LLDocument>> documents) {
return Mono.<Void>fromCallable(() -> { return documents.flatMap(this::updateDocuments).then();
var idIt = ids.iterator(); }
var docIt = documents.iterator();
while (idIt.hasNext()) {
var id = idIt.next();
var doc = docIt.next();
indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(doc)); private Mono<Void> updateDocuments(GroupedFlux<LLTerm, LLDocument> documents) {
} return documents
return null; .map(LLUtils::toDocument)
}).subscribeOn(Schedulers.boundedElastic()); .collectList()
.flatMap(luceneDocuments -> Mono
.<Void>fromCallable(() -> {
indexWriter.updateDocuments(LLUtils.toTerm(documents.key()), luceneDocuments);
return null;
})
.subscribeOn(Schedulers.boundedElastic())
);
} }
@Override @Override
@@ -271,58 +285,62 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@SuppressWarnings({"Convert2MethodRef", "unchecked", "rawtypes"}) @SuppressWarnings({"Convert2MethodRef", "unchecked", "rawtypes"})
@Override @Override
public Mono<LLSearchResult> moreLikeThis(@Nullable LLSnapshot snapshot, public Mono<LLSearchResult> moreLikeThis(@Nullable LLSnapshot snapshot,
Map<String, Set<String>> mltDocumentFields, Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux,
int limit, int limit,
String keyFieldName) { String keyFieldName) {
if (mltDocumentFields.isEmpty()) { return mltDocumentFieldsFlux
return Mono.just(LLSearchResult.empty()); .collectMap(Tuple2::getT1, Tuple2::getT2, HashMap::new)
} .flatMap(mltDocumentFields -> {
if (mltDocumentFields.isEmpty()) {
return Mono.just(LLSearchResult.empty());
}
return acquireSearcherWrapper(snapshot) return acquireSearcherWrapper(snapshot)
.flatMap(indexSearcher -> Mono .flatMap(indexSearcher -> Mono
.fromCallable(() -> { .fromCallable(() -> {
var mlt = new MoreLikeThis(indexSearcher.getIndexReader()); var mlt = new MoreLikeThis(indexSearcher.getIndexReader());
mlt.setAnalyzer(indexWriter.getAnalyzer()); mlt.setAnalyzer(indexWriter.getAnalyzer());
mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new)); mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new));
mlt.setMinTermFreq(1); mlt.setMinTermFreq(1);
//mlt.setMinDocFreq(1); //mlt.setMinDocFreq(1);
mlt.setBoost(true); mlt.setBoost(true);
// Get the reference doc and apply it to MoreLikeThis, to generate the query // Get the reference doc and apply it to MoreLikeThis, to generate the query
return mlt.like((Map) mltDocumentFields); return mlt.like((Map) mltDocumentFields);
}) })
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(Schedulers.boundedElastic())
.flatMap(query -> Mono .flatMap(query -> Mono
.fromCallable(() -> { .fromCallable(() -> {
One<Long> totalHitsCountSink = Sinks.one(); One<Long> totalHitsCountSink = Sinks.one();
Many<LLKeyScore> topKeysSink = Sinks.many().unicast().onBackpressureBuffer(new ArrayBlockingQueue<>(1000)); Many<LLKeyScore> topKeysSink = Sinks.many().unicast().onBackpressureBuffer(new ArrayBlockingQueue<>(1000));
streamSearcher.search(indexSearcher, streamSearcher.search(indexSearcher,
query, query,
limit, limit,
null, null,
ScoreMode.COMPLETE, ScoreMode.COMPLETE,
keyFieldName, keyFieldName,
keyScore -> { keyScore -> {
EmitResult result = topKeysSink.tryEmitNext(keyScore); EmitResult result = topKeysSink.tryEmitNext(keyScore);
if (result.isFailure()) { if (result.isFailure()) {
throw new EmissionException(result); throw new EmissionException(result);
} }
}, },
totalHitsCount -> { totalHitsCount -> {
EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount); EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
if (result.isFailure()) { if (result.isFailure()) {
throw new EmissionException(result); throw new EmissionException(result);
} }
}); });
return new LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux())); return new LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux()));
}).subscribeOn(Schedulers.boundedElastic()) }).subscribeOn(Schedulers.boundedElastic())
).then() ).then()
.materialize() .materialize()
.flatMap(value -> releaseSearcherWrapper(snapshot, indexSearcher).thenReturn(value)) .flatMap(value -> releaseSearcherWrapper(snapshot, indexSearcher).thenReturn(value))
.dematerialize() .dematerialize()
); );
});
} }
@SuppressWarnings("Convert2MethodRef") @SuppressWarnings("Convert2MethodRef")