Fix returning a single result

This commit is contained in:
Andrea Cavalli 2021-04-14 02:37:03 +02:00
parent 17594f66bd
commit 505de18ecb
4 changed files with 133 additions and 111 deletions

View File

@ -429,59 +429,55 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return Mono.just(new LLSearchResult(Flux.empty()));
}
return acquireSearcherWrapper(snapshot, doDistributedPre, actionId).flatMap(indexSearcher -> Mono
.fromCallable(() -> {
var mlt = new MoreLikeThis(indexSearcher.getIndexReader());
mlt.setAnalyzer(indexWriter.getAnalyzer());
mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new));
mlt.setMinTermFreq(1);
mlt.setMinDocFreq(3);
mlt.setMaxDocFreqPct(20);
mlt.setBoost(QueryParser.isScoringEnabled(queryParams));
mlt.setStopWords(EnglishItalianStopFilter.getStopWordsString());
var similarity = getSimilarity();
if (similarity instanceof TFIDFSimilarity) {
mlt.setSimilarity((TFIDFSimilarity) similarity);
} else {
logger.trace("Using an unsupported similarity algorithm for MoreLikeThis:"
+ " {}. You must use a similarity instance based on TFIDFSimilarity!", similarity);
}
return acquireSearcherWrapper(snapshot, doDistributedPre, actionId)
.flatMap(indexSearcher -> Mono
.fromCallable(() -> {
var mlt = new MoreLikeThis(indexSearcher.getIndexReader());
mlt.setAnalyzer(indexWriter.getAnalyzer());
mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new));
mlt.setMinTermFreq(1);
mlt.setMinDocFreq(3);
mlt.setMaxDocFreqPct(20);
mlt.setBoost(QueryParser.isScoringEnabled(queryParams));
mlt.setStopWords(EnglishItalianStopFilter.getStopWordsString());
var similarity = getSimilarity();
if (similarity instanceof TFIDFSimilarity) {
mlt.setSimilarity((TFIDFSimilarity) similarity);
} else {
logger.trace("Using an unsupported similarity algorithm for MoreLikeThis:"
+ " {}. You must use a similarity instance based on TFIDFSimilarity!", similarity);
}
// Get the reference doc and apply it to MoreLikeThis, to generate the query
//noinspection BlockingMethodInNonBlockingContext
var mltQuery = mlt.like((Map) mltDocumentFields);
Query luceneQuery;
if (luceneAdditionalQuery != null) {
luceneQuery = new BooleanQuery.Builder()
.add(mltQuery, Occur.MUST)
.add(new ConstantScoreQuery(luceneAdditionalQuery), Occur.MUST)
.build();
} else {
luceneQuery = mltQuery;
}
// Get the reference doc and apply it to MoreLikeThis, to generate the query
//noinspection BlockingMethodInNonBlockingContext
var mltQuery = mlt.like((Map) mltDocumentFields);
Query luceneQuery;
if (luceneAdditionalQuery != null) {
luceneQuery = new BooleanQuery.Builder()
.add(mltQuery, Occur.MUST)
.add(new ConstantScoreQuery(luceneAdditionalQuery), Occur.MUST)
.build();
} else {
luceneQuery = mltQuery;
}
return luceneQuery;
})
.subscribeOn(luceneQueryScheduler)
.map(luceneQuery -> luceneSearch(doDistributedPre,
indexSearcher,
queryParams.getOffset(),
queryParams.getLimit(),
queryParams.getMinCompetitiveScore().getNullable(),
keyFieldName,
scoreDivisor,
luceneQuery,
QueryParser.toSort(queryParams.getSort()),
QueryParser.toScoreMode(queryParams.getScoreMode())
))
.materialize()
.flatMap(signal -> {
if (signal.isOnComplete() || signal.isOnError()) {
return releaseSearcherWrapper(snapshot, indexSearcher).thenReturn(signal);
} else {
return Mono.just(signal);
}
}).dematerialize());
return luceneQuery;
})
.subscribeOn(luceneQueryScheduler)
.map(luceneQuery -> luceneSearch(doDistributedPre,
indexSearcher,
queryParams.getOffset(),
queryParams.getLimit(),
queryParams.getMinCompetitiveScore().getNullable(),
keyFieldName,
scoreDivisor,
luceneQuery,
QueryParser.toSort(queryParams.getSort()),
QueryParser.toScoreMode(queryParams.getScoreMode()),
releaseSearcherWrapper(snapshot, indexSearcher)
))
.onErrorResume(ex -> releaseSearcherWrapper(snapshot, indexSearcher).then(Mono.error(ex)))
);
});
}
@ -514,7 +510,8 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private Mono<LLSearchResult> search(@Nullable LLSnapshot snapshot,
QueryParams queryParams, String keyFieldName,
boolean doDistributedPre, long actionId, int scoreDivisor) {
return acquireSearcherWrapper(snapshot, doDistributedPre, actionId)
return this
.acquireSearcherWrapper(snapshot, doDistributedPre, actionId)
.flatMap(indexSearcher -> Mono
.fromCallable(() -> {
Objects.requireNonNull(queryParams.getScoreMode(), "ScoreMode must not be null");
@ -524,7 +521,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return Tuples.of(luceneQuery, Optional.ofNullable(luceneSort), luceneScoreMode);
})
.subscribeOn(luceneQueryScheduler)
.flatMap(tuple -> Mono
.<LLSearchResult>flatMap(tuple -> Mono
.fromSupplier(() -> {
Query luceneQuery = tuple.getT1();
Sort luceneSort = tuple.getT2().orElse(null);
@ -539,22 +536,18 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
scoreDivisor,
luceneQuery,
luceneSort,
luceneScoreMode
luceneScoreMode,
releaseSearcherWrapper(snapshot, indexSearcher)
);
})
.onErrorResume(ex -> releaseSearcherWrapper(snapshot, indexSearcher).then(Mono.error(ex)))
)
.materialize()
.flatMap(signal -> {
if (signal.isOnComplete() || signal.isOnError()) {
return releaseSearcherWrapper(snapshot, indexSearcher).thenReturn(signal);
} else {
return Mono.just(signal);
}
})
.dematerialize()
);
}
/**
* This method always returns 1 shard! Not zero, not more than one.
*/
private LLSearchResult luceneSearch(boolean doDistributedPre,
IndexSearcher indexSearcher,
long offset,
@ -564,7 +557,8 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
int scoreDivisor,
Query luceneQuery,
Sort luceneSort,
ScoreMode luceneScoreMode) {
ScoreMode luceneScoreMode,
Mono<Void> successCleanup) {
return new LLSearchResult(Mono.<LLSearchResultShard>create(monoSink -> {
LuceneSearchInstance luceneSearchInstance;
@ -573,7 +567,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
if (doDistributedPre) {
//noinspection BlockingMethodInNonBlockingContext
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
monoSink.success(new LLSearchResultShard(Flux.empty(), 0));
monoSink.success(new LLSearchResultShard(successCleanup.thenMany(Flux.empty()), 0));
return;
} else {
int boundedOffset = Math.max(0, offset > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) offset);
@ -642,7 +636,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}, OverflowStrategy.ERROR).subscribeOn(blockingLuceneSearchScheduler).publishOn(luceneQueryScheduler);
monoSink.success(new LLSearchResultShard(resultsFlux, totalHitsCount));
monoSink.success(new LLSearchResultShard(Flux
.usingWhen(
Mono.just(true),
b -> resultsFlux,
b -> successCleanup),
totalHitsCount));
}).subscribeOn(blockingLuceneSearchScheduler).publishOn(luceneQueryScheduler).flux());
}

View File

@ -25,6 +25,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Value;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.IndexSearcher;
import org.jetbrains.annotations.Nullable;
@ -215,13 +216,10 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
if (queryParams.getOffset() != 0) {
return Mono.error(new IllegalArgumentException("MultiLuceneIndex requires an offset equal to 0"));
}
long actionId;
int scoreDivisor;
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsShared;
Mono<Void> distributedPre;
Mono<DistributedSearch> distributedPre;
if (luceneIndices.length > 1) {
actionId = newAction();
scoreDivisor = 20;
long actionId = newAction();
mltDocumentFieldsShared = mltDocumentFields.publish().refCount();
distributedPre = Flux
.fromArray(luceneIndices)
@ -239,16 +237,15 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
actionId
)
)
.then();
.then(Mono.just(new DistributedSearch(actionId, 20)));
} else {
actionId = -1;
scoreDivisor = 1;
mltDocumentFieldsShared = mltDocumentFields;
distributedPre = Mono.empty();
distributedPre = Mono.just(new DistributedSearch(-1, 1));
}
//noinspection DuplicatedCode
return distributedPre.then(Flux
return distributedPre
.flatMap(distributedSearch -> Flux
.fromArray(luceneIndices)
.index()
.flatMap(tuple -> Mono
@ -260,45 +257,55 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
queryParams,
keyFieldName,
mltDocumentFieldsShared,
actionId,
scoreDivisor
distributedSearch.getActionId(),
distributedSearch.getScoreDivisor()
)
)
.reduce(LLSearchResult.accumulator())
.map(result -> {
if (actionId != -1) {
if (distributedSearch.getActionId() != -1) {
Flux<LLSearchResultShard> resultsWithTermination = result
.getResults()
.map(flux -> new LLSearchResultShard(flux
.getResults()
.doOnTerminate(() -> completedAction(actionId)), flux.getTotalHitsCount())
.map(flux -> new LLSearchResultShard(Flux
.using(
distributedSearch::getActionId,
actionId -> flux.getResults(),
this::completedAction
), flux.getTotalHitsCount())
);
return new LLSearchResult(resultsWithTermination);
} else {
return result;
}
})
.doOnError(ex -> {
if (distributedSearch.getActionId() != -1) {
completedAction(distributedSearch.getActionId());
}
})
);
}
/**
* Immutable pair of parameters shared by every per-shard call of one multi-index search.
* Callers read the fields through the getters generated by Lombok's {@code @Value}.
*/
@Value
private static class DistributedSearch {
// Identifier obtained from newAction() when a distributed pre-pass is run; -1 when it is skipped,
// in which case callers do not invoke completedAction.
long actionId;
// Divisor forwarded to each shard's search call: 20 alongside a real action id, 1 otherwise.
// NOTE(review): the exact effect on scores is not visible here — confirm in the shard implementation.
int scoreDivisor;
}
@Override
public Mono<LLSearchResult> search(@Nullable LLSnapshot snapshot,
QueryParams queryParams,
String keyFieldName) {
long actionId;
int scoreDivisor;
Mono<Void> distributedPre;
if (queryParams.getOffset() != 0) {
return Mono.error(new IllegalArgumentException("MultiLuceneIndex requires an offset equal to 0"));
}
Mono<DistributedSearch> distributedSearchMono;
if (luceneIndices.length <= 1 || !queryParams.getScoreMode().getComputeScores()) {
actionId = -1;
scoreDivisor = 1;
distributedPre = Mono.empty();
distributedSearchMono = Mono.just(new DistributedSearch(-1, 1));
} else {
actionId = newAction();
scoreDivisor = 20;
distributedPre = Flux
var actionId = newAction();
distributedSearchMono = Flux
.fromArray(luceneIndices)
.index()
.flatMap(tuple -> Mono
@ -313,11 +320,11 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
actionId
)
)
.then();
.then(Mono.just(new DistributedSearch(actionId, 20)));
}
//noinspection DuplicatedCode
return distributedPre
.then(Flux
return distributedSearchMono
.flatMap(distributedSearch -> Flux
.fromArray(luceneIndices)
.index()
.flatMap(tuple -> Mono
@ -329,23 +336,31 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
.distributedSearch(tuple.getT2().orElse(null),
queryParams,
keyFieldName,
actionId,
scoreDivisor
distributedSearch.getActionId(),
distributedSearch.getScoreDivisor()
))
.reduce(LLSearchResult.accumulator())
.map(result -> {
if (actionId != -1) {
if (distributedSearch.getActionId() != -1) {
Flux<LLSearchResultShard> resultsWithTermination = result
.getResults()
.map(flux -> new LLSearchResultShard(flux
.getResults()
.doOnTerminate(() -> completedAction(actionId)), flux.getTotalHitsCount())
.map(flux -> new LLSearchResultShard(Flux
.using(
distributedSearch::getActionId,
actionId -> flux.getResults(),
this::completedAction
), flux.getTotalHitsCount())
);
return new LLSearchResult(resultsWithTermination);
} else {
return result;
}
})
.doOnError(ex -> {
if (distributedSearch.getActionId() != -1) {
completedAction(distributedSearch.getActionId());
}
})
);
}

View File

@ -32,8 +32,8 @@ public class SimpleStreamSearcher implements LuceneStreamSearcher {
scoreMode != ScoreMode.COMPLETE_NO_SCORES,
1000
);
var firstTopDocs = searcher.getTopDocs(0, 1);
long totalHitsCount = firstTopDocs.totalHits.value;
var topDocs = searcher.getTopDocs();
long totalHitsCount = topDocs.totalHits.value;
return new LuceneSearchInstance() {
@Override
public long getTotalHitsCount() {
@ -42,14 +42,8 @@ public class SimpleStreamSearcher implements LuceneStreamSearcher {
@Override
public void getResults(ResultItemConsumer resultsConsumer) throws IOException {
if (firstTopDocs.scoreDocs.length > 0) {
{
var hit = firstTopDocs.scoreDocs[0];
if (publishHit(hit, resultsConsumer) == HandleResult.HALT) {
return;
}
}
ObjectArrayList<ScoreDoc> hits = ObjectArrayList.wrap(searcher.getTopDocs(offset, limit - 1).scoreDocs);
if (topDocs.scoreDocs.length > 0) {
ObjectArrayList<ScoreDoc> hits = ObjectArrayList.wrap(topDocs.scoreDocs);
for (ScoreDoc hit : hits) {
if (publishHit(hit, resultsConsumer) == HandleResult.HALT) {
return;

View File

@ -35,6 +35,20 @@ class TopDocsSearcher {
indexSearcher.search(query, collector);
}
/**
* Returns the top documents gathered by {@code collector}, using its no-argument
* {@code topDocs()} call (no offset/length window, unlike {@link #getTopDocs(int, int)}).
* When {@code doDocScores} is set, the returned hits are first passed to
* {@code TopFieldCollector.populateScores} together with {@code indexSearcher} and {@code query}.
* <p>
* This method must not be called more than once!
* NOTE(review): presumably because the collector hands out its hits only once; confirm against
* the collector type.
*
* @return the collected top documents
* @throws IOException declared by this method; presumably raised while populating scores (confirm)
*/
public TopDocs getTopDocs() throws IOException {
TopDocs topDocs = collector.topDocs();
if (doDocScores) {
TopFieldCollector.populateScores(topDocs.scoreDocs, indexSearcher, query);
}
return topDocs;
}
/**
* This method must not be called more than once!
*/
public TopDocs getTopDocs(int offset, int length) throws IOException {
TopDocs topDocs = collector.topDocs(offset, length);
if (doDocScores) {