Fix results ordering

Andrea Cavalli 2021-07-30 14:01:12 +02:00
parent db54773cda
commit 4e782403f5
5 changed files with 50 additions and 29 deletions
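
The change adds a preserveOrder flag to LuceneUtils.convertHits. When the flag is true, hits are mapped on a single publishOn pipeline, which keeps the input (score) order; when it is false, the previous parallel().runOn(scheduler) pipeline is kept, and its sequential() step merges the rails in completion order rather than input order. The following is a minimal, self-contained Reactor sketch of that difference; the OrderingSketch class and the lookupKey stand-in for the blocking key lookup are hypothetical and not part of this commit.

import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class OrderingSketch {

	// Hypothetical stand-in for the blocking per-hit key lookup.
	static String lookupKey(int docId) {
		try {
			Thread.sleep(ThreadLocalRandom.current().nextInt(1, 5));
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
		return "key-" + docId;
	}

	public static void main(String[] args) {
		Scheduler scheduler = Schedulers.boundedElastic();

		// preserveOrder == false (the old behaviour everywhere): parallel rails are
		// merged by sequential() as they complete, so the output order can differ
		// from the input order of the hits.
		Flux.range(0, 10)
				.parallel()
				.runOn(scheduler)
				.map(OrderingSketch::lookupKey)
				.sequential()
				.doOnNext(key -> System.out.println("parallel: " + key))
				.blockLast(Duration.ofSeconds(10));

		// preserveOrder == true: a single publishOn keeps the original order
		// while still moving the blocking work off the caller's thread.
		Flux.range(0, 10)
				.publishOn(scheduler)
				.map(OrderingSketch::lookupKey)
				.doOnNext(key -> System.out.println("ordered: " + key))
				.blockLast(Duration.ofSeconds(10));
	}
}

In the files below, the scored and simple local searchers request order preservation (true), while the unscored searchers keep the parallel path (false).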

View File

@@ -30,7 +30,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.LowerCaseFilter;
@@ -349,32 +351,49 @@ public class LuceneUtils {
	public static Flux<LLKeyScore> convertHits(ScoreDoc[] hits,
			IndexSearchers indexSearchers,
			String keyFieldName,
-			Scheduler scheduler) {
+			Scheduler scheduler,
+			boolean preserveOrder) {
		return Flux
				.fromArray(hits)
-				.parallel()
-				.runOn(scheduler)
-				.map(hit -> {
-					int shardDocId = hit.doc;
-					int shardIndex = hit.shardIndex;
-					float score = hit.score;
-					var indexSearcher = indexSearchers.shard(shardIndex);
-					try {
-						//noinspection BlockingMethodInNonBlockingContext
-						String collectedDoc = keyOfTopDoc(shardDocId, indexSearcher.getIndexReader(), keyFieldName);
-						return new LLKeyScore(shardDocId, score, Mono.just(collectedDoc));
-					} catch (NoSuchElementException ex) {
-						logger.debug("Error: document " + shardDocId + " key is not present!");
-						// Errored key score, to filter out next
-						return new LLKeyScore(-1, -1, Mono.empty());
-					} catch (Exception ex) {
-						return new LLKeyScore(shardDocId, score, Mono.error(ex));
+				.transform(hitsFlux -> {
+					if (preserveOrder) {
+						return hitsFlux
+								.publishOn(scheduler)
+								.mapNotNull(hit -> mapHitBlocking(hit, indexSearchers, keyFieldName));
+					} else {
+						return hitsFlux
+								.parallel()
+								.runOn(scheduler)
+								.map(hit -> {
+									var result = mapHitBlocking(hit, indexSearchers, keyFieldName);
+									// The "else" value is an errored key score, to filter out next
+									return Objects.requireNonNullElseGet(result, () -> new LLKeyScore(-1, -1, Mono.empty()));
+								})
+								.sequential()
+								// Filter out the errored key scores
+								.filter(ks -> !(ks.docId() == -1 && ks.score() == -1));
					}
-				})
-				// Filter out the errored key scores
-				.filter(ks -> !(ks.docId() == -1 && ks.score() == -1))
-				.sequential();
+				});
	}
+	@Nullable
+	private static LLKeyScore mapHitBlocking(ScoreDoc hit,
+			IndexSearchers indexSearchers,
+			String keyFieldName) {
+		int shardDocId = hit.doc;
+		int shardIndex = hit.shardIndex;
+		float score = hit.score;
+		var indexSearcher = indexSearchers.shard(shardIndex);
+		try {
+			String collectedDoc = keyOfTopDoc(shardDocId, indexSearcher.getIndexReader(), keyFieldName);
+			return new LLKeyScore(shardDocId, score, Mono.just(collectedDoc));
+		} catch (NoSuchElementException ex) {
+			logger.debug("Error: document " + shardDocId + " key is not present!");
+			return null;
+		} catch (Exception ex) {
+			return new LLKeyScore(shardDocId, score, Mono.error(ex));
+		}
+	}
	/**

View File

@@ -81,7 +81,7 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
indexSearchers = IndexSearchers.of(indexSearchersArray);
}
Flux<LLKeyScore> firstPageHits = LuceneUtils
-.convertHits(result.scoreDocs, indexSearchers, keyFieldName, scheduler);
+.convertHits(result.scoreDocs, indexSearchers, keyFieldName, scheduler, true);
Flux<LLKeyScore> nextHits = Flux.defer(() -> {
if (paginationInfo.forceSinglePage()
@@ -136,7 +136,7 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
)
.subscribeOn(scheduler)
.flatMapSequential(topFieldDoc -> LuceneUtils
-.convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, scheduler)
+.convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, scheduler, true)
);
});

View File

@@ -53,7 +53,8 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
firstPageTopDocs.scoreDocs,
IndexSearchers.unsharded(indexSearcher),
keyFieldName,
-scheduler
+scheduler,
+true
)
.take(queryParams.limit(), true);
@@ -96,7 +97,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
)
.subscribeOn(scheduler)
.flatMapSequential(topFieldDoc -> LuceneUtils
-.convertHits(topFieldDoc.scoreDocs, IndexSearchers.unsharded(indexSearcher), keyFieldName, scheduler)
+.convertHits(topFieldDoc.scoreDocs, IndexSearchers.unsharded(indexSearcher), keyFieldName, scheduler, true)
);
});
}

View File

@@ -73,7 +73,7 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher {
indexSearchers = IndexSearchers.of(indexSearchersArray);
}
Flux<LLKeyScore> firstPageHits = LuceneUtils
-.convertHits(result.scoreDocs, indexSearchers, keyFieldName, scheduler);
+.convertHits(result.scoreDocs, indexSearchers, keyFieldName, scheduler, false);
Flux<LLKeyScore> nextHits = Flux.defer(() -> {
if (paginationInfo.forceSinglePage() || paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) {
@@ -124,7 +124,7 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher {
)
.subscribeOn(scheduler)
.flatMapSequential(topFieldDoc -> LuceneUtils
-.convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, scheduler)
+.convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, scheduler, false)
);
});

View File

@@ -148,7 +148,8 @@ public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMult
.flatMap(scoreDocs -> LuceneUtils.convertHits(scoreDocs.toArray(ScoreDoc[]::new),
indexSearchers,
keyFieldName,
-scheduler
+scheduler,
+false
));
return new LuceneSearchResult(1, resultsFlux, release);