# Patch summary (annotation only; this text precedes the first "diff --git" header and is not part
# of any hunk).
#
# NOTE(review): this copy of the patch looks whitespace-collapsed, and generic type arguments seem
# to be missing (e.g. "Flux.>merge", bare "One"/"Many") - presumably an extraction artifact;
# confirm against the repository before applying or compiling anything from it.
#
# database/disk/LLLocalLuceneIndex.java
#   - Drops the ArrayBlockingQueue, NumberIsTooLargeException and Sinks.Empty imports; adds Lucene's
#     ConstantScoreQuery and SortField imports.
#   - More-like-this path: map entries whose value isEmpty() are removed before the existing
#     "mltDocumentFields.isEmpty()" early return.
#   - More-like-this path: the additional query is now wrapped in ConstantScoreQuery before being
#     added as an Occur.MUST clause next to mltQuery.
#   - Both search paths now delegate to the new private luceneSearch(...) helper; its body matches
#     the Flux.push-based code removed by the "-527,71" hunk.
#   - Visible differences for the more-like-this path after delegation:
#       * limit > Integer.MAX_VALUE is clamped (boundedLimit) where the removed code threw
#         NumberIsTooLargeException;
#       * the key sink uses onBackpressureBuffer() with no queue argument where the removed code
#         passed new ArrayBlockingQueue<>(1000);
#       * the sort argument is new Sort(SortField.FIELD_SCORE) where the removed code passed null;
#       * the search is no longer scheduled through Schedulers.boundedElastic().schedule(...), and
#         the ".then()" after the inner flatMap is removed.
#
# lucene/serializer/ConstantScoreQuery.java (new file)
#   - Query wrapper: stringify() serializes the wrapped query into a temporary StringBuilder and
#     passes it to StringifyUtils.writeHeader with QueryConstructorType.CONSTANT_SCORE_QUERY;
#     toString() returns "(" + query + " *1)".
#
# lucene/serializer/QueryConstructorType.java
#   - Adds CONSTANT_SCORE_QUERY between BOOST_QUERY and SORTED_SLOW_RANGE_QUERY.
#     NOTE(review): if these constants are ever persisted or exchanged by ordinal, inserting in the
#     middle shifts the later values - confirm how StringifyUtils encodes them.
#
# lucene/serializer/QueryParser.java
#   - New CONSTANT_SCORE_QUERY case: parses one nested Query and returns it wrapped in Lucene's
#     ConstantScoreQuery. The null check on the nested query is an "assert" only.
#
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index abeb53d..4f5e7aa 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -29,11 +29,9 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.math3.exception.NumberIsTooLargeException; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; @@ -42,11 +40,13 @@ import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.queries.mlt.MoreLikeThis; import org.apache.lucene.search.BooleanClause.Occur; import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.ConstantScoreQuery; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.SearcherManager; import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; import org.apache.lucene.search.similarities.Similarity; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; @@ -60,7 +60,6 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.EmissionException; import reactor.core.publisher.Sinks.EmitResult; -import reactor.core.publisher.Sinks.Empty; import reactor.core.publisher.Sinks.Many; import reactor.core.publisher.Sinks.One; import reactor.core.scheduler.Scheduler; @@ -390,6 +389,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return 
mltDocumentFieldsFlux .collectMap(Tuple2::getT1, Tuple2::getT2, HashMap::new) .flatMap(mltDocumentFields -> { + mltDocumentFields.entrySet().removeIf(entry -> entry.getValue().isEmpty()); if (mltDocumentFields.isEmpty()) { return Mono.just(LLSearchResult.empty()); } @@ -411,63 +411,28 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .subscribeOn(luceneBlockingScheduler) .flatMap(mltQuery -> Mono .fromCallable(() -> { - One totalHitsCountSink = Sinks.one(); - Many topKeysSink = Sinks - .many() - .unicast() - .onBackpressureBuffer(new ArrayBlockingQueue<>(1000)); - Empty completeSink = Sinks.empty(); - - Schedulers.boundedElastic().schedule(() -> { - Query query; + Query luceneQuery; if (luceneAdditionalQuery != null) { - query = new BooleanQuery.Builder() + luceneQuery = new BooleanQuery.Builder() .add(mltQuery, Occur.MUST) - .add(luceneAdditionalQuery, Occur.MUST) + .add(new ConstantScoreQuery(luceneAdditionalQuery), Occur.MUST) .build(); } else { - query = mltQuery; + luceneQuery = mltQuery; } - try { - if (doDistributedPre) { - allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, query); - totalHitsCountSink.tryEmitValue(0L); - } else { - if (limit > Integer.MAX_VALUE) { - throw new NumberIsTooLargeException(limit, Integer.MAX_VALUE, true); - } - streamSearcher.search(indexSearcher, - query, - (int) limit, - null, - ScoreMode.TOP_SCORES, - minCompetitiveScore, - keyFieldName, - keyScore -> { - EmitResult result = topKeysSink.tryEmitNext(fixKeyScore(keyScore, scoreDivisor)); - if (result.isFailure()) { - throw new EmissionException(result); - } - }, - totalHitsCount -> { - EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount); - if (result.isFailure()) { - throw new EmissionException(result); - } - }); - } - topKeysSink.tryEmitComplete(); - completeSink.tryEmitEmpty(); - } catch (IOException e) { - topKeysSink.tryEmitError(e); - totalHitsCountSink.tryEmitError(e); - completeSink.tryEmitError(e); - } - }); - return new 
LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux())); + return luceneSearch(doDistributedPre, + indexSearcher, + limit, + minCompetitiveScore, + keyFieldName, + scoreDivisor, + luceneQuery, + new Sort(SortField.FIELD_SCORE), + ScoreMode.TOP_SCORES + ); }).subscribeOn(luceneBlockingScheduler) - ).then() + ) .materialize() .flatMap(signal -> { if (signal.isOnComplete() || signal.isOnError()) { @@ -476,7 +441,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return Mono.just(signal); } }) - .dematerialize() + .dematerialize() ); }); } @@ -527,71 +492,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { Sort luceneSort = tuple.getT2().orElse(null); ScoreMode luceneScoreMode = tuple.getT3(); - One totalHitsCountSink = Sinks.one(); - Many topKeysSink = Sinks - .many() - .unicast() - .onBackpressureBuffer(); - - var searchFlux = Flux.push(sink -> { - try { - if (doDistributedPre) { - allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery); - totalHitsCountSink.tryEmitValue(0L); - } else { - int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit); - streamSearcher.search(indexSearcher, - luceneQuery, - boundedLimit, - luceneSort, - luceneScoreMode, - minCompetitiveScore, - keyFieldName, - keyScore -> { - EmitResult result = topKeysSink.tryEmitNext(fixKeyScore(keyScore, scoreDivisor)); - if (result.isFailure()) { - if (result == EmitResult.FAIL_CANCELLED) { - logger.debug("Fail to emit next value: cancelled"); - } else if (result == EmitResult.FAIL_TERMINATED) { - logger.debug("Fail to emit next value: terminated"); - } else if (result == EmitResult.FAIL_ZERO_SUBSCRIBER) { - logger.error("Fail to emit next value: zero subscriber. 
You must subscribe to results before total hits if you specified a limit > 0!"); - sink.error(new EmissionException(result)); - throw new EmissionException(result); - } else { - throw new EmissionException(result); - } - } - }, - totalHitsCount -> { - EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount); - if (result.isFailure()) { - if (result == EmitResult.FAIL_CANCELLED) { - logger.debug("Fail to emit total hits count: cancelled"); - } else if (result == EmitResult.FAIL_TERMINATED) { - logger.debug("Fail to emit total hits count: terminated"); - } else if (result == EmitResult.FAIL_ZERO_SUBSCRIBER) { - logger.debug("Fail to emit total hits count: zero subscriber"); - } else { - sink.error(new EmissionException(result)); - throw new EmissionException(result); - } - } - } - ); - } - topKeysSink.tryEmitComplete(); - sink.complete(); - } catch (IOException e) { - topKeysSink.tryEmitError(e); - totalHitsCountSink.tryEmitError(e); - sink.error(e); - } - }).share(); - - return new LLSearchResult( - Mono.firstWithValue(searchFlux.then(Mono.empty()), totalHitsCountSink.asMono()), - Flux.>merge(searchFlux.then(Mono.empty()), Flux.just(topKeysSink.asFlux())) + return luceneSearch(doDistributedPre, + indexSearcher, + limit, + minCompetitiveScore, + keyFieldName, + scoreDivisor, + luceneQuery, + luceneSort, + luceneScoreMode ); }).subscribeOn(luceneBlockingScheduler) ) @@ -607,6 +516,84 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { ); } + private LLSearchResult luceneSearch(boolean doDistributedPre, + IndexSearcher indexSearcher, + long limit, + @Nullable Float minCompetitiveScore, + String keyFieldName, + int scoreDivisor, + Query luceneQuery, + Sort luceneSort, + ScoreMode luceneScoreMode) { + + One totalHitsCountSink = Sinks.one(); + Many topKeysSink = Sinks + .many() + .unicast() + .onBackpressureBuffer(); + + var searchFlux = Flux.push(sink -> { + try { + if (doDistributedPre) { + 
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery); + totalHitsCountSink.tryEmitValue(0L); + } else { + int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit); + streamSearcher.search(indexSearcher, + luceneQuery, + boundedLimit, + luceneSort, + luceneScoreMode, + minCompetitiveScore, + keyFieldName, + keyScore -> { + EmitResult result = topKeysSink.tryEmitNext(fixKeyScore(keyScore, scoreDivisor)); + if (result.isFailure()) { + if (result == EmitResult.FAIL_CANCELLED) { + logger.debug("Fail to emit next value: cancelled"); + } else if (result == EmitResult.FAIL_TERMINATED) { + logger.debug("Fail to emit next value: terminated"); + } else if (result == EmitResult.FAIL_ZERO_SUBSCRIBER) { + logger.error("Fail to emit next value: zero subscriber. You must subscribe to results before total hits if you specified a limit > 0!"); + sink.error(new EmissionException(result)); + throw new EmissionException(result); + } else { + throw new EmissionException(result); + } + } + }, + totalHitsCount -> { + EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount); + if (result.isFailure()) { + if (result == EmitResult.FAIL_CANCELLED) { + logger.debug("Fail to emit total hits count: cancelled"); + } else if (result == EmitResult.FAIL_TERMINATED) { + logger.debug("Fail to emit total hits count: terminated"); + } else if (result == EmitResult.FAIL_ZERO_SUBSCRIBER) { + logger.debug("Fail to emit total hits count: zero subscriber"); + } else { + sink.error(new EmissionException(result)); + throw new EmissionException(result); + } + } + } + ); + } + topKeysSink.tryEmitComplete(); + sink.complete(); + } catch (IOException e) { + topKeysSink.tryEmitError(e); + totalHitsCountSink.tryEmitError(e); + sink.error(e); + } + }).share(); + + return new LLSearchResult( + Mono.firstWithValue(searchFlux.then(Mono.empty()), totalHitsCountSink.asMono()), + Flux.>merge(searchFlux.then(Mono.empty()), 
Flux.just(topKeysSink.asFlux())) + ); + } + @Override public Mono close() { return Mono diff --git a/src/main/java/it/cavallium/dbengine/lucene/serializer/ConstantScoreQuery.java b/src/main/java/it/cavallium/dbengine/lucene/serializer/ConstantScoreQuery.java new file mode 100644 index 0000000..c1f07ac --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/serializer/ConstantScoreQuery.java @@ -0,0 +1,22 @@ +package it.cavallium.dbengine.lucene.serializer; + +public class ConstantScoreQuery implements Query { + + private final Query query; + + public ConstantScoreQuery(Query query) { + this.query = query; + } + + @Override + public void stringify(StringBuilder output) { + StringBuilder data = new StringBuilder(); + query.stringify(data); + StringifyUtils.writeHeader(output, QueryConstructorType.CONSTANT_SCORE_QUERY, data); + } + + @Override + public String toString() { + return "(" + query + " *1)"; + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/serializer/QueryConstructorType.java b/src/main/java/it/cavallium/dbengine/lucene/serializer/QueryConstructorType.java index 3e3c7c8..21d02b3 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/serializer/QueryConstructorType.java +++ b/src/main/java/it/cavallium/dbengine/lucene/serializer/QueryConstructorType.java @@ -4,6 +4,7 @@ public enum QueryConstructorType { BOOLEAN_QUERY, BOOLEAN_QUERY_INFO, BOOST_QUERY, + CONSTANT_SCORE_QUERY, SORTED_SLOW_RANGE_QUERY, INT_POINT_EXACT_QUERY, LONG_POINT_EXACT_QUERY, diff --git a/src/main/java/it/cavallium/dbengine/lucene/serializer/QueryParser.java b/src/main/java/it/cavallium/dbengine/lucene/serializer/QueryParser.java index a9838b9..9928c57 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/serializer/QueryParser.java +++ b/src/main/java/it/cavallium/dbengine/lucene/serializer/QueryParser.java @@ -12,6 +12,7 @@ import org.apache.lucene.index.Term; import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanQuery; import 
org.apache.lucene.search.BoostQuery; +import org.apache.lucene.search.ConstantScoreQuery; import org.apache.lucene.search.DocValuesFieldExistsQuery; import org.apache.lucene.search.FuzzyQuery; import org.apache.lucene.search.MatchAllDocsQuery; @@ -79,6 +80,10 @@ public class QueryParser { assert query != null; assert numb != null; return new BoostQuery(query, numb); + case CONSTANT_SCORE_QUERY: + Query queryC = (Query) parse(completeText, position); + assert queryC != null; + return new ConstantScoreQuery(queryC); case FUZZY_QUERY: Term fqTerm = (Term) parse(completeText, position); Integer numb1 = (Integer) parse(completeText, position);