Clean search code

This commit is contained in:
Andrea Cavalli 2021-02-27 19:05:13 +01:00
parent 3c715affcf
commit 050d77d359
4 changed files with 134 additions and 119 deletions

View File

@ -29,11 +29,9 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.math3.exception.NumberIsTooLargeException;
import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.IndexWriterConfig;
@ -42,11 +40,13 @@ import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.queries.mlt.MoreLikeThis; import org.apache.lucene.queries.mlt.MoreLikeThis;
import org.apache.lucene.search.BooleanClause.Occur; import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query; import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.SearcherManager; import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.Sort; import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.similarities.Similarity; import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FSDirectory;
@ -60,7 +60,6 @@ import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmissionException; import reactor.core.publisher.Sinks.EmissionException;
import reactor.core.publisher.Sinks.EmitResult; import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.publisher.Sinks.Empty;
import reactor.core.publisher.Sinks.Many; import reactor.core.publisher.Sinks.Many;
import reactor.core.publisher.Sinks.One; import reactor.core.publisher.Sinks.One;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
@ -390,6 +389,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return mltDocumentFieldsFlux return mltDocumentFieldsFlux
.collectMap(Tuple2::getT1, Tuple2::getT2, HashMap::new) .collectMap(Tuple2::getT1, Tuple2::getT2, HashMap::new)
.flatMap(mltDocumentFields -> { .flatMap(mltDocumentFields -> {
mltDocumentFields.entrySet().removeIf(entry -> entry.getValue().isEmpty());
if (mltDocumentFields.isEmpty()) { if (mltDocumentFields.isEmpty()) {
return Mono.just(LLSearchResult.empty()); return Mono.just(LLSearchResult.empty());
} }
@ -411,63 +411,28 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.subscribeOn(luceneBlockingScheduler) .subscribeOn(luceneBlockingScheduler)
.flatMap(mltQuery -> Mono .flatMap(mltQuery -> Mono
.fromCallable(() -> { .fromCallable(() -> {
One<Long> totalHitsCountSink = Sinks.one(); Query luceneQuery;
Many<LLKeyScore> topKeysSink = Sinks
.many()
.unicast()
.onBackpressureBuffer(new ArrayBlockingQueue<>(1000));
Empty<Void> completeSink = Sinks.empty();
Schedulers.boundedElastic().schedule(() -> {
Query query;
if (luceneAdditionalQuery != null) { if (luceneAdditionalQuery != null) {
query = new BooleanQuery.Builder() luceneQuery = new BooleanQuery.Builder()
.add(mltQuery, Occur.MUST) .add(mltQuery, Occur.MUST)
.add(luceneAdditionalQuery, Occur.MUST) .add(new ConstantScoreQuery(luceneAdditionalQuery), Occur.MUST)
.build(); .build();
} else { } else {
query = mltQuery; luceneQuery = mltQuery;
} }
try {
if (doDistributedPre) {
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, query);
totalHitsCountSink.tryEmitValue(0L);
} else {
if (limit > Integer.MAX_VALUE) {
throw new NumberIsTooLargeException(limit, Integer.MAX_VALUE, true);
}
streamSearcher.search(indexSearcher,
query,
(int) limit,
null,
ScoreMode.TOP_SCORES,
minCompetitiveScore,
keyFieldName,
keyScore -> {
EmitResult result = topKeysSink.tryEmitNext(fixKeyScore(keyScore, scoreDivisor));
if (result.isFailure()) {
throw new EmissionException(result);
}
},
totalHitsCount -> {
EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
if (result.isFailure()) {
throw new EmissionException(result);
}
});
}
topKeysSink.tryEmitComplete();
completeSink.tryEmitEmpty();
} catch (IOException e) {
topKeysSink.tryEmitError(e);
totalHitsCountSink.tryEmitError(e);
completeSink.tryEmitError(e);
}
});
return new LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux())); return luceneSearch(doDistributedPre,
indexSearcher,
limit,
minCompetitiveScore,
keyFieldName,
scoreDivisor,
luceneQuery,
new Sort(SortField.FIELD_SCORE),
ScoreMode.TOP_SCORES
);
}).subscribeOn(luceneBlockingScheduler) }).subscribeOn(luceneBlockingScheduler)
).then() )
.materialize() .materialize()
.flatMap(signal -> { .flatMap(signal -> {
if (signal.isOnComplete() || signal.isOnError()) { if (signal.isOnComplete() || signal.isOnError()) {
@ -476,7 +441,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return Mono.just(signal); return Mono.just(signal);
} }
}) })
.dematerialize() .<LLSearchResult>dematerialize()
); );
}); });
} }
@ -527,71 +492,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
Sort luceneSort = tuple.getT2().orElse(null); Sort luceneSort = tuple.getT2().orElse(null);
ScoreMode luceneScoreMode = tuple.getT3(); ScoreMode luceneScoreMode = tuple.getT3();
One<Long> totalHitsCountSink = Sinks.one(); return luceneSearch(doDistributedPre,
Many<LLKeyScore> topKeysSink = Sinks indexSearcher,
.many() limit,
.unicast() minCompetitiveScore,
.onBackpressureBuffer(); keyFieldName,
scoreDivisor,
var searchFlux = Flux.<Void>push(sink -> { luceneQuery,
try { luceneSort,
if (doDistributedPre) { luceneScoreMode
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
totalHitsCountSink.tryEmitValue(0L);
} else {
int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit);
streamSearcher.search(indexSearcher,
luceneQuery,
boundedLimit,
luceneSort,
luceneScoreMode,
minCompetitiveScore,
keyFieldName,
keyScore -> {
EmitResult result = topKeysSink.tryEmitNext(fixKeyScore(keyScore, scoreDivisor));
if (result.isFailure()) {
if (result == EmitResult.FAIL_CANCELLED) {
logger.debug("Fail to emit next value: cancelled");
} else if (result == EmitResult.FAIL_TERMINATED) {
logger.debug("Fail to emit next value: terminated");
} else if (result == EmitResult.FAIL_ZERO_SUBSCRIBER) {
logger.error("Fail to emit next value: zero subscriber. You must subscribe to results before total hits if you specified a limit > 0!");
sink.error(new EmissionException(result));
throw new EmissionException(result);
} else {
throw new EmissionException(result);
}
}
},
totalHitsCount -> {
EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
if (result.isFailure()) {
if (result == EmitResult.FAIL_CANCELLED) {
logger.debug("Fail to emit total hits count: cancelled");
} else if (result == EmitResult.FAIL_TERMINATED) {
logger.debug("Fail to emit total hits count: terminated");
} else if (result == EmitResult.FAIL_ZERO_SUBSCRIBER) {
logger.debug("Fail to emit total hits count: zero subscriber");
} else {
sink.error(new EmissionException(result));
throw new EmissionException(result);
}
}
}
);
}
topKeysSink.tryEmitComplete();
sink.complete();
} catch (IOException e) {
topKeysSink.tryEmitError(e);
totalHitsCountSink.tryEmitError(e);
sink.error(e);
}
}).share();
return new LLSearchResult(
Mono.<Long>firstWithValue(searchFlux.then(Mono.empty()), totalHitsCountSink.asMono()),
Flux.<Flux<LLKeyScore>>merge(searchFlux.then(Mono.empty()), Flux.just(topKeysSink.asFlux()))
); );
}).subscribeOn(luceneBlockingScheduler) }).subscribeOn(luceneBlockingScheduler)
) )
@ -607,6 +516,84 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
); );
} }
// Runs a Lucene query and adapts the callback-based stream searcher to the reactive
// LLSearchResult contract: a Mono carrying the total hits count plus a Flux of key/score hits.
//
// Parameters:
//   doDistributedPre      - when true, only parse/pre-process the query (no real search is
//                           executed and the total hits count is reported as 0). NOTE(review):
//                           exact "distributed pre" semantics live in
//                           allowOnlyQueryParsingCollectorStreamSearcher — confirm there.
//   indexSearcher         - searcher to execute the query against
//   limit                 - maximum number of hits requested (clamped to int range below)
//   minCompetitiveScore   - optional score floor forwarded to the stream searcher
//   keyFieldName          - stored field used to extract the key of each hit
//   scoreDivisor          - divisor applied to every hit score via fixKeyScore
//   luceneQuery           - the query to run
//   luceneSort            - sort order for the results
//   luceneScoreMode       - Lucene score mode for the search
private LLSearchResult luceneSearch(boolean doDistributedPre,
IndexSearcher indexSearcher,
long limit,
@Nullable Float minCompetitiveScore,
String keyFieldName,
int scoreDivisor,
Query luceneQuery,
Sort luceneSort,
ScoreMode luceneScoreMode) {
// Sinks bridging the searcher's synchronous callbacks to the two reactive outputs.
One<Long> totalHitsCountSink = Sinks.one();
Many<LLKeyScore> topKeysSink = Sinks
.many()
.unicast()
.onBackpressureBuffer();
// The actual search runs inside this push Flux when it is first subscribed.
var searchFlux = Flux.<Void>push(sink -> {
try {
if (doDistributedPre) {
// Pre-phase: only parse the query; no hits are produced, total count is 0.
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
totalHitsCountSink.tryEmitValue(0L);
} else {
// Clamp the long limit into [0, Integer.MAX_VALUE] as required by the int-based API.
int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit);
streamSearcher.search(indexSearcher,
luceneQuery,
boundedLimit,
luceneSort,
luceneScoreMode,
minCompetitiveScore,
keyFieldName,
keyScore -> {
// Forward each hit (with the score divided by scoreDivisor) to the keys sink.
EmitResult result = topKeysSink.tryEmitNext(fixKeyScore(keyScore, scoreDivisor));
if (result.isFailure()) {
// Cancellation/termination of the downstream is tolerated silently;
// a missing subscriber is a caller bug, so it aborts the search.
if (result == EmitResult.FAIL_CANCELLED) {
logger.debug("Fail to emit next value: cancelled");
} else if (result == EmitResult.FAIL_TERMINATED) {
logger.debug("Fail to emit next value: terminated");
} else if (result == EmitResult.FAIL_ZERO_SUBSCRIBER) {
logger.error("Fail to emit next value: zero subscriber. You must subscribe to results before total hits if you specified a limit > 0!");
sink.error(new EmissionException(result));
throw new EmissionException(result);
} else {
throw new EmissionException(result);
}
}
},
totalHitsCount -> {
EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
if (result.isFailure()) {
// Unlike the per-hit callback, a zero-subscriber total-hits count is only
// logged: the caller may legitimately ignore the count.
if (result == EmitResult.FAIL_CANCELLED) {
logger.debug("Fail to emit total hits count: cancelled");
} else if (result == EmitResult.FAIL_TERMINATED) {
logger.debug("Fail to emit total hits count: terminated");
} else if (result == EmitResult.FAIL_ZERO_SUBSCRIBER) {
logger.debug("Fail to emit total hits count: zero subscriber");
} else {
sink.error(new EmissionException(result));
throw new EmissionException(result);
}
}
}
);
}
topKeysSink.tryEmitComplete();
sink.complete();
} catch (IOException e) {
// Propagate the failure to every outlet so no subscriber hangs.
topKeysSink.tryEmitError(e);
totalHitsCountSink.tryEmitError(e);
sink.error(e);
}
}).share();
// share() ensures the two subscriptions below trigger a single search execution.
// searchFlux.then(Mono.empty()) never emits a value; it is combined in only so that the
// search is started and its errors are propagated, while the sinks supply the actual data.
return new LLSearchResult(
Mono.<Long>firstWithValue(searchFlux.then(Mono.empty()), totalHitsCountSink.asMono()),
Flux.<Flux<LLKeyScore>>merge(searchFlux.then(Mono.empty()), Flux.just(topKeysSink.asFlux()))
);
}
@Override @Override
public Mono<Void> close() { public Mono<Void> close() {
return Mono return Mono

View File

@ -0,0 +1,22 @@
package it.cavallium.dbengine.lucene.serializer;

/**
 * Serializable wrapper that marks an inner {@link Query} as constant-score when the query
 * string is rebuilt by the parser (serialized under
 * {@code QueryConstructorType.CONSTANT_SCORE_QUERY}).
 */
public class ConstantScoreQuery implements Query {

	private final Query query;

	public ConstantScoreQuery(Query query) {
		this.query = query;
	}

	@Override
	public void stringify(StringBuilder output) {
		// Serialize the wrapped query first, then frame it with the constant-score header.
		var serializedChild = new StringBuilder();
		query.stringify(serializedChild);
		StringifyUtils.writeHeader(output, QueryConstructorType.CONSTANT_SCORE_QUERY, serializedChild);
	}

	@Override
	public String toString() {
		return "(" + query + " *1)";
	}
}

View File

@ -4,6 +4,7 @@ public enum QueryConstructorType {
BOOLEAN_QUERY, BOOLEAN_QUERY,
BOOLEAN_QUERY_INFO, BOOLEAN_QUERY_INFO,
BOOST_QUERY, BOOST_QUERY,
CONSTANT_SCORE_QUERY,
SORTED_SLOW_RANGE_QUERY, SORTED_SLOW_RANGE_QUERY,
INT_POINT_EXACT_QUERY, INT_POINT_EXACT_QUERY,
LONG_POINT_EXACT_QUERY, LONG_POINT_EXACT_QUERY,

View File

@ -12,6 +12,7 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.BoostQuery; import org.apache.lucene.search.BoostQuery;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.DocValuesFieldExistsQuery; import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.FuzzyQuery; import org.apache.lucene.search.FuzzyQuery;
import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.MatchAllDocsQuery;
@ -79,6 +80,10 @@ public class QueryParser {
assert query != null; assert query != null;
assert numb != null; assert numb != null;
return new BoostQuery(query, numb); return new BoostQuery(query, numb);
case CONSTANT_SCORE_QUERY:
Query queryC = (Query) parse(completeText, position);
assert queryC != null;
return new ConstantScoreQuery(queryC);
case FUZZY_QUERY: case FUZZY_QUERY:
Term fqTerm = (Term) parse(completeText, position); Term fqTerm = (Term) parse(completeText, position);
Integer numb1 = (Integer) parse(completeText, position); Integer numb1 = (Integer) parse(completeText, position);