Code cleanup
This commit is contained in:
parent
4f52b3d542
commit
6752fc8df4
@ -2,8 +2,6 @@ package it.cavallium.dbengine.lucene.searcher;
|
||||
|
||||
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
|
||||
|
||||
import it.cavallium.dbengine.lucene.MaxScoreAccumulator;
|
||||
import it.cavallium.dbengine.lucene.MaxScoreAccumulator.DocAndScore;
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.util.Iterator;
|
||||
@ -11,11 +9,9 @@ import java.util.List;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.lucene.index.LeafReader;
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.search.CustomHitsThresholdChecker;
|
||||
import org.apache.lucene.search.DocIdSetIterator;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.Scorable;
|
||||
import org.apache.lucene.search.ScoreDoc;
|
||||
import org.apache.lucene.search.ScoreMode;
|
||||
import org.apache.lucene.search.Scorer;
|
||||
@ -23,55 +19,43 @@ import org.apache.lucene.search.Weight;
|
||||
import org.apache.lucene.util.Bits;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
|
||||
public class LuceneGenerator implements Supplier<ScoreDoc> {
|
||||
|
||||
private static final Scheduler SCHED = uninterruptibleScheduler(Schedulers.boundedElastic());
|
||||
private final IndexSearcher shard;
|
||||
private final int shardIndex;
|
||||
private final Query query;
|
||||
private final Iterator<LeafReaderContext> leavesIterator;
|
||||
private final boolean computeScores;
|
||||
private final CustomHitsThresholdChecker hitsThresholdChecker;
|
||||
private final @Nullable Long limit;
|
||||
|
||||
private long remainingOffset;
|
||||
private long remainingAllowedResults;
|
||||
private Weight weight;
|
||||
private Float minCompetitiveScore;
|
||||
private long totalHits;
|
||||
|
||||
private LeafReaderContext leaf;
|
||||
private DocIdSetIterator docIdSetIterator;
|
||||
private Scorer scorer;
|
||||
|
||||
LuceneGenerator(IndexSearcher shard,
|
||||
LocalQueryParams localQueryParams,
|
||||
int shardIndex,
|
||||
CustomHitsThresholdChecker hitsThresholdChecker) {
|
||||
LuceneGenerator(IndexSearcher shard, LocalQueryParams localQueryParams, int shardIndex) {
|
||||
this.shard = shard;
|
||||
this.shardIndex = shardIndex;
|
||||
this.query = localQueryParams.query();
|
||||
this.remainingOffset = localQueryParams.offsetLong();
|
||||
this.limit = localQueryParams.limitLong() == Long.MAX_VALUE ? null : localQueryParams.limitLong();
|
||||
this.remainingAllowedResults = localQueryParams.limitLong();
|
||||
this.computeScores = localQueryParams.needsScores();
|
||||
List<LeafReaderContext> leaves = shard.getTopReaderContext().leaves();
|
||||
this.leavesIterator = leaves.iterator();
|
||||
this.hitsThresholdChecker = hitsThresholdChecker;
|
||||
}
|
||||
|
||||
public static Flux<ScoreDoc> reactive(IndexSearcher shard,
|
||||
LocalQueryParams localQueryParams,
|
||||
int shardIndex,
|
||||
CustomHitsThresholdChecker hitsThresholdChecker) {
|
||||
public static Flux<ScoreDoc> reactive(IndexSearcher shard, LocalQueryParams localQueryParams, int shardIndex) {
|
||||
if (localQueryParams.sort() != null) {
|
||||
return Flux.error(new IllegalArgumentException("Sorting is not allowed"));
|
||||
}
|
||||
return Flux
|
||||
.<ScoreDoc, LuceneGenerator>generate(() -> new LuceneGenerator(shard,
|
||||
localQueryParams,
|
||||
shardIndex,
|
||||
hitsThresholdChecker
|
||||
),
|
||||
.<ScoreDoc, LuceneGenerator>generate(() -> new LuceneGenerator(shard, localQueryParams, shardIndex),
|
||||
(s, sink) -> {
|
||||
ScoreDoc val = s.get();
|
||||
if (val == null) {
|
||||
@ -82,7 +66,7 @@ public class LuceneGenerator implements Supplier<ScoreDoc> {
|
||||
return s;
|
||||
}
|
||||
)
|
||||
.subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()), false);
|
||||
.subscribeOn(SCHED, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -90,8 +74,10 @@ public class LuceneGenerator implements Supplier<ScoreDoc> {
|
||||
while (remainingOffset > 0) {
|
||||
skipNext();
|
||||
}
|
||||
if ((limit != null && totalHits >= limit) || hitsThresholdChecker.isThresholdReached(true)) {
|
||||
if (remainingAllowedResults == 0) {
|
||||
return null;
|
||||
} else {
|
||||
remainingAllowedResults--;
|
||||
}
|
||||
return getNext();
|
||||
}
|
||||
@ -131,15 +117,6 @@ public class LuceneGenerator implements Supplier<ScoreDoc> {
|
||||
if (docDeleted(liveDocs, doc)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
float score = scorer.score();
|
||||
|
||||
// This collector relies on the fact that scorers produce positive values:
|
||||
assert score >= 0; // NOTE: false for NaN
|
||||
|
||||
totalHits++;
|
||||
hitsThresholdChecker.incrementHitCount();
|
||||
|
||||
return transformDoc(doc);
|
||||
}
|
||||
docIdSetIterator = null;
|
||||
|
Loading…
Reference in New Issue
Block a user