Partial sorted implementation

Andrea Cavalli 2021-10-15 00:03:41 +02:00
parent 36df18796b
commit 17a9b49755
15 changed files with 581 additions and 204 deletions

View File

@@ -16,7 +16,7 @@ public interface FieldValueHitQueue {
LeafFieldComparator[] getComparators(LeafReaderContext context) throws IOException;
FieldDoc fillFields(Entry entry);
LLFieldDoc fillFields(LLSlotDoc entry);
SortField[] getFields();
}

View File

@@ -12,11 +12,11 @@ import org.apache.lucene.search.TotalHits.Relation;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
public interface FullDocs<T extends LLDocElement> extends ResourceIterable<T> {
public interface FullDocs<T extends LLDoc> extends ResourceIterable<T> {
Comparator<LLDocElement> SHARD_INDEX_TIE_BREAKER = Comparator.comparingInt(LLDocElement::shardIndex);
Comparator<LLDocElement> DOC_ID_TIE_BREAKER = Comparator.comparingInt(LLDocElement::doc);
Comparator<LLDocElement> DEFAULT_TIE_BREAKER = SHARD_INDEX_TIE_BREAKER.thenComparing(DOC_ID_TIE_BREAKER);
Comparator<LLDoc> SHARD_INDEX_TIE_BREAKER = Comparator.comparingInt(LLDoc::shardIndex);
Comparator<LLDoc> DOC_ID_TIE_BREAKER = Comparator.comparingInt(LLDoc::doc);
Comparator<LLDoc> DEFAULT_TIE_BREAKER = SHARD_INDEX_TIE_BREAKER.thenComparing(DOC_ID_TIE_BREAKER);
@Override
Flux<T> iterate();
@@ -26,7 +26,7 @@ public interface FullDocs<T extends LLDocElement> extends ResourceIterable<T> {
TotalHits totalHits();
static <T extends LLDocElement> FullDocs<T> merge(@Nullable Sort sort, FullDocs<T>[] fullDocs) {
static <T extends LLDoc> FullDocs<T> merge(@Nullable Sort sort, FullDocs<T>[] fullDocs) {
ResourceIterable<T> mergedIterable = mergeResourceIterable(sort, fullDocs);
TotalHits mergedTotalHits = mergeTotalHits(fullDocs);
return new FullDocs<>() {
@@ -47,7 +47,7 @@ public interface FullDocs<T extends LLDocElement> extends ResourceIterable<T> {
};
}
static <T extends LLDocElement> int tieBreakCompare(
static <T extends LLDoc> int tieBreakCompare(
T firstDoc,
T secondDoc,
Comparator<T> tieBreaker) {
@@ -61,7 +61,7 @@ public interface FullDocs<T extends LLDocElement> extends ResourceIterable<T> {
}
}
static <T extends LLDocElement> ResourceIterable<T> mergeResourceIterable(
static <T extends LLDoc> ResourceIterable<T> mergeResourceIterable(
@Nullable Sort sort,
FullDocs<T>[] fullDocs) {
return () -> {
@@ -73,7 +73,7 @@ public interface FullDocs<T extends LLDocElement> extends ResourceIterable<T> {
iterables[i] = singleFullDocs;
}
Comparator<LLDocElement> comp;
Comparator<LLDoc> comp;
if (sort == null) {
// Merge maintaining sorting order (Algorithm taken from TopDocs.ScoreMergeSortQueue)
@@ -132,7 +132,7 @@ public interface FullDocs<T extends LLDocElement> extends ResourceIterable<T> {
};
}
static <T extends LLDocElement> TotalHits mergeTotalHits(FullDocs<T>[] fullDocs) {
static <T extends LLDoc> TotalHits mergeTotalHits(FullDocs<T>[] fullDocs) {
long totalCount = 0;
Relation totalRelation = EQUAL_TO;
for (FullDocs<T> fullDoc : fullDocs) {
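
As an editorial aside (not part of this commit), here is a minimal usage sketch of the merge API above. It assumes two per-shard results, shard0 and shard1, both of type FullDocs<LLScoreDoc>, already exist:

// Hedged sketch: merge the per-shard results and stream the first hits.
@SuppressWarnings("unchecked")
FullDocs<LLScoreDoc>[] shards = new FullDocs[] {shard0, shard1};
FullDocs<LLScoreDoc> merged = FullDocs.merge(null, shards); // null sort: merge by descending score
merged.iterate()                                            // Flux<LLScoreDoc>, already in merged order
    .take(10)
    .doOnNext(hit -> System.out.println(hit.doc() + " " + hit.score()))
    .blockLast();

Note that DEFAULT_TIE_BREAKER above orders hits with equal sort values by shard index first and doc id second.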

View File

@@ -1,6 +1,6 @@
package it.cavallium.dbengine.lucene;
public sealed interface LLDocElement permits LLSlotDoc, LLFieldDoc, LLScoreDoc {
public sealed interface LLDoc permits LLSlotDoc, LLFieldDoc, LLScoreDoc {
int doc();

View File

@@ -2,12 +2,12 @@ package it.cavallium.dbengine.lucene;
import java.util.Comparator;
class LLDocElementScoreComparator implements Comparator<LLDocElement> {
class LLDocElementScoreComparator implements Comparator<LLDoc> {
public static final Comparator<LLDocElement> SCORE_DOC_SCORE_ELEM_COMPARATOR = new LLDocElementScoreComparator();
public static final Comparator<LLDoc> SCORE_DOC_SCORE_ELEM_COMPARATOR = new LLDocElementScoreComparator();
@Override
public int compare(LLDocElement hitA, LLDocElement hitB) {
public int compare(LLDoc hitA, LLDoc hitB) {
return Float.compare(hitA.score(), hitB.score());
}
}

View File

@@ -1,12 +1,10 @@
package it.cavallium.dbengine.lucene;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.stream.Collectors;
public record LLFieldDoc(int doc, float score, int shardIndex, List<Object> fields) implements LLDocElement {
public record LLFieldDoc(int doc, float score, int shardIndex, List<Object> fields) implements LLDoc {
@Override
public String toString() {

View File

@@ -2,7 +2,7 @@ package it.cavallium.dbengine.lucene;
import org.apache.lucene.search.ScoreDoc;
public record LLScoreDoc(int doc, float score, int shardIndex) implements LLDocElement {
public record LLScoreDoc(int doc, float score, int shardIndex) implements LLDoc {
public ScoreDoc toScoreDoc() {
return new ScoreDoc(doc, score, shardIndex);

View File

@@ -1,12 +1,11 @@
package it.cavallium.dbengine.lucene;
import org.apache.lucene.search.FieldComparator;
import org.apache.lucene.search.FieldValueHitQueue;
import org.apache.lucene.search.FieldValueHitQueue.Entry;
import org.apache.lucene.search.ScoreDoc;
/** Extension of ScoreDoc to also store the {@link FieldComparator} slot. */
public record LLSlotDoc(int doc, float score, int shardIndex, int slot) implements LLDocElement {
public record LLSlotDoc(int doc, float score, int shardIndex, int slot) implements LLDoc {
public ScoreDoc toScoreDoc() {
return new ScoreDoc(doc, score, shardIndex);

View File

@@ -3,6 +3,8 @@ package it.cavallium.dbengine.lucene;
import io.net5.buffer.ByteBuf;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.FieldComparator;
@@ -155,14 +157,14 @@ public class LLSlotDocCodec implements LMDBSortedCodec<LLSlotDoc>, FieldValueHit
* @see IndexSearcher#search(Query,int, Sort)
*/
@Override
public FieldDoc fillFields(final Entry entry) {
public LLFieldDoc fillFields(final LLSlotDoc entry) {
final int n = comparators.length;
final Object[] fields = new Object[n];
for (int i = 0; i < n; ++i) {
fields[i] = comparators[i].value(entry.slot);
final List<Object> fields = new ArrayList<>(n);
for (FieldComparator<?> comparator : comparators) {
fields.add(comparator.value(entry.slot()));
}
// if (maxscore > 1.0f) doc.score /= maxscore; // normalize scores
return new FieldDoc(entry.doc, entry.score, fields);
return new LLFieldDoc(entry.doc(), entry.score(), entry.shardIndex(), fields);
}
/** Returns the SortFields being used by this hit queue. */

View File

@@ -3,12 +3,12 @@ package it.cavallium.dbengine.lucene;
import org.apache.lucene.search.TotalHits;
import reactor.core.publisher.Flux;
public class PqFullDocs<T extends LLDocElement> implements FullDocs<T> {
public class LazyFullDocs<T extends LLDoc> implements FullDocs<T> {
private final PriorityQueue<T> pq;
private final ResourceIterable<T> pq;
private final TotalHits totalHits;
public PqFullDocs(PriorityQueue<T> pq, TotalHits totalHits) {
public LazyFullDocs(ResourceIterable<T> pq, TotalHits totalHits) {
this.pq = pq;
this.totalHits = totalHits;
}
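
For illustration only (not part of the commit), the renamed class can now wrap any lazy source of hits, not just a priority queue. A hedged sketch, assuming ResourceIterable's single abstract method is iterate():

// Hedged sketch: expose an in-memory Flux as a FullDocs via LazyFullDocs.
ResourceIterable<LLScoreDoc> hits = () -> Flux.just(
    new LLScoreDoc(3, 1.5f, 0),
    new LLScoreDoc(7, 1.1f, 0));
FullDocs<LLScoreDoc> docs = new LazyFullDocs<>(hits, new TotalHits(2, TotalHits.Relation.EQUAL_TO));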

View File

@@ -16,11 +16,11 @@
*/
package it.cavallium.dbengine.lucene.collector;
import it.cavallium.dbengine.lucene.EmptyPriorityQueue;
import it.cavallium.dbengine.lucene.FullDocs;
import it.cavallium.dbengine.lucene.LLDocElement;
import it.cavallium.dbengine.lucene.PqFullDocs;
import it.cavallium.dbengine.lucene.LLDoc;
import it.cavallium.dbengine.lucene.LazyFullDocs;
import it.cavallium.dbengine.lucene.PriorityQueue;
import it.cavallium.dbengine.lucene.ResourceIterable;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
@@ -34,7 +34,7 @@ import org.apache.lucene.search.TotalHits;
* #FullDocsCollector(PriorityQueue)}. In that case, however, you might want to override
* all methods in order to avoid a NullPointerException.
*/
public abstract class FullDocsCollector<T extends LLDocElement> implements Collector, AutoCloseable {
public abstract class FullDocsCollector<INTERNAL extends LLDoc, EXTERNAL extends LLDoc> implements Collector, AutoCloseable {
/**
* The priority queue which holds the top documents. Note that different implementations of
@@ -42,7 +42,7 @@ public abstract class FullDocsCollector<T extends LLDocElement> implements Colle
* top scoring documents, while other PQ implementations may hold documents sorted by other
* criteria.
*/
protected final PriorityQueue<T> pq;
protected final PriorityQueue<INTERNAL> pq;
/** The total number of documents that the collector encountered. */
protected int totalHits;
@@ -50,7 +50,7 @@ public abstract class FullDocsCollector<T extends LLDocElement> implements Colle
/** Whether {@link #totalHits} is exact or a lower bound. */
protected TotalHits.Relation totalHitsRelation = TotalHits.Relation.EQUAL_TO;
protected FullDocsCollector(PriorityQueue<T> pq) {
protected FullDocsCollector(PriorityQueue<INTERNAL> pq) {
this.pq = pq;
}
@@ -60,10 +60,12 @@ public abstract class FullDocsCollector<T extends LLDocElement> implements Colle
}
/** Returns the top docs that were collected by this collector. */
public FullDocs<T> fullDocs() {
return new PqFullDocs<>(this.pq, new TotalHits(totalHits, totalHitsRelation));
public FullDocs<EXTERNAL> fullDocs() {
return new LazyFullDocs<>(mapResults(this.pq), new TotalHits(totalHits, totalHitsRelation));
}
public abstract ResourceIterable<EXTERNAL> mapResults(ResourceIterable<INTERNAL> it);
@Override
public void close() throws Exception {
pq.close();
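
To make the new split between INTERNAL and EXTERNAL concrete, here is an editorial usage sketch (not from the commit): the queue stores INTERNAL entries, mapResults converts them, and fullDocs() publishes the EXTERNAL view. searcher, query and the createCollector() factory are assumptions made for the example:

// Hedged sketch: collect with any concrete FullDocsCollector, then stream the mapped results.
try (FullDocsCollector<LLSlotDoc, LLFieldDoc> collector = createCollector()) { // createCollector() is hypothetical
    searcher.search(query, collector);                // IndexSearcher#search(Query, Collector)
    FullDocs<LLFieldDoc> docs = collector.fullDocs(); // LazyFullDocs over mapResults(pq)
    docs.iterate()
        .doOnNext(hit -> System.out.println(hit.fields()))
        .blockLast();                                 // drain before the collector is closed
}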

View File

@@ -0,0 +1,38 @@
package it.cavallium.dbengine.lucene.collector;
import it.cavallium.dbengine.lucene.FullDocs;
import it.cavallium.dbengine.lucene.LLDoc;
import it.cavallium.dbengine.lucene.LLFieldDoc;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TotalHits;
import reactor.core.publisher.Flux;
public class FullFieldDocs<T extends LLDoc> implements FullDocs<T> {
private final FullDocs<T> fullDocs;
private final SortField[] fields;
public FullFieldDocs(FullDocs<T> fullDocs, SortField[] fields) {
this.fullDocs = fullDocs;
this.fields = fields;
}
@Override
public Flux<T> iterate() {
return fullDocs.iterate();
}
@Override
public Flux<T> iterate(long skips) {
return fullDocs.iterate(skips);
}
@Override
public TotalHits totalHits() {
return fullDocs.totalHits();
}
public SortField[] fields() {
return fields;
}
}
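
An illustrative note (not part of the commit): the wrapper simply pairs a FullDocs with the SortFields that produced it, mirroring Lucene's TopFieldDocs. A hedged sketch, assuming mergedDocs and sort already exist:

// Hedged sketch: attach sort metadata to an already merged FullDocs<LLFieldDoc>.
FullFieldDocs<LLFieldDoc> withFields = new FullFieldDocs<>(mergedDocs, sort.getSort());
SortField[] usedFields = withFields.fields(); // the SortFields the hits are ordered by
TotalHits count = withFields.totalHits();     // delegated to the wrapped FullDocs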

View File

@@ -19,234 +19,461 @@ package it.cavallium.dbengine.lucene.collector;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.FieldValueHitQueue;
import it.cavallium.dbengine.lucene.FullDocs;
import it.cavallium.dbengine.lucene.LLDocElement;
import it.cavallium.dbengine.lucene.LLDoc;
import it.cavallium.dbengine.lucene.LLFieldDoc;
import it.cavallium.dbengine.lucene.LLScoreDoc;
import it.cavallium.dbengine.lucene.LLScoreDocCodec;
import it.cavallium.dbengine.lucene.LLSlotDoc;
import it.cavallium.dbengine.lucene.LLSlotDocCodec;
import it.cavallium.dbengine.lucene.LMDBPriorityQueue;
import it.cavallium.dbengine.lucene.MaxScoreAccumulator;
import it.cavallium.dbengine.lucene.MaxScoreAccumulator.DocAndScore;
import it.cavallium.dbengine.lucene.PriorityQueue;
import it.cavallium.dbengine.lucene.ResourceIterable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.FieldComparator;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.FieldValueHitQueue.Entry;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.LeafFieldComparator;
import org.apache.lucene.search.MultiLeafFieldComparator;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TotalHits;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.apache.lucene.search.TotalHits.Relation;
import reactor.core.publisher.Flux;
public abstract class LMDBFullFieldDocCollector extends FullDocsCollector<LLDocElement> {
/**
* A {@link org.apache.lucene.search.Collector} that sorts by {@link SortField} using {@link FieldComparator}s.
*
* <p>See the {@link #create(LLTempLMDBEnv, Sort, int, int)} method for instantiating an
* LMDBFullFieldDocCollector.
*
*/
public abstract class LMDBFullFieldDocCollector extends FullDocsCollector<LLSlotDoc, LLFieldDoc> {
private final FieldValueHitQueue fieldValueHitQueue;
// TODO: one optimization we could do is to pre-fill
// the queue with a sentinel value that is guaranteed to
// always compare lower than a real hit; this would
// save having to check queueFull on each insert
public abstract static class ScorerLeafCollector implements LeafCollector {
private abstract class TopFieldLeafCollector implements LeafCollector {
protected Scorable scorer;
final LeafFieldComparator comparator;
final int reverseMul;
Scorable scorer;
boolean collectedAllCompetitiveHits = false;
TopFieldLeafCollector(PriorityQueue<LLSlotDoc> queue,
FieldValueHitQueue fieldValueHitQueue,
Sort sort,
LeafReaderContext context)
throws IOException {
// as all segments are sorted in the same way, enough to check only the 1st segment for
// indexSort
if (searchSortPartOfIndexSort == null) {
final Sort indexSort = context.reader().getMetaData().getSort();
searchSortPartOfIndexSort = canEarlyTerminate(sort, indexSort);
if (searchSortPartOfIndexSort) {
firstComparator.disableSkipping();
}
}
LeafFieldComparator[] comparators = fieldValueHitQueue.getComparators(context);
int[] reverseMuls = fieldValueHitQueue.getReverseMul();
if (comparators.length == 1) {
this.reverseMul = reverseMuls[0];
this.comparator = comparators[0];
} else {
this.reverseMul = 1;
this.comparator = new MultiLeafFieldComparator(comparators, reverseMuls);
}
}
void countHit(int doc) throws IOException {
++totalHits;
hitsThresholdChecker.incrementHitCount();
if (minScoreAcc != null && (totalHits & minScoreAcc.modInterval) == 0) {
updateGlobalMinCompetitiveScore(scorer);
}
if (scoreMode.isExhaustive() == false
&& totalHitsRelation == TotalHits.Relation.EQUAL_TO
&& hitsThresholdChecker.isThresholdReached()) {
// for the first time hitsThreshold is reached, notify comparator about this
comparator.setHitsThresholdReached();
totalHitsRelation = TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO;
}
}
boolean thresholdCheck(int doc) throws IOException {
if (collectedAllCompetitiveHits || reverseMul * comparator.compareBottom(doc) <= 0) {
// since docs are visited in doc Id order, if compare is 0, it means
// this document is larger than anything else in the queue, and
// therefore not competitive.
if (searchSortPartOfIndexSort) {
if (hitsThresholdChecker.isThresholdReached()) {
totalHitsRelation = Relation.GREATER_THAN_OR_EQUAL_TO;
throw new CollectionTerminatedException();
} else {
collectedAllCompetitiveHits = true;
}
} else if (totalHitsRelation == TotalHits.Relation.EQUAL_TO) {
// we can start setting the min competitive score if the
// threshold is reached for the first time here.
updateMinCompetitiveScore(scorer);
}
return true;
}
return false;
}
void collectCompetitiveHit(int doc) throws IOException {
// This hit is competitive - replace bottom element in queue & adjustTop
comparator.copy(pq.top().slot(), doc);
updateBottom(doc);
comparator.setBottom(pq.top().slot());
updateMinCompetitiveScore(scorer);
}
void collectAnyHit(int doc, int hitsCollected) throws IOException {
// Startup transient: queue hasn't gathered numHits yet
int slot = hitsCollected - 1;
// Copy hit into queue
comparator.copy(slot, doc);
add(slot, doc);
if (queueFull) {
comparator.setBottom(pq.top().slot());
updateMinCompetitiveScore(scorer);
}
}
@Override
public void setScorer(Scorable scorer) throws IOException {
this.scorer = scorer;
}
}
private static class SimpleLMDBFullScoreDocCollector extends LMDBFullFieldDocCollector {
SimpleLMDBFullScoreDocCollector(LLTempLMDBEnv env, @Nullable Long limit,
HitsThresholdChecker hitsThresholdChecker, MaxScoreAccumulator minScoreAcc) {
super(env, limit, hitsThresholdChecker, minScoreAcc);
comparator.setScorer(scorer);
minCompetitiveScore = 0f;
updateMinCompetitiveScore(scorer);
if (minScoreAcc != null) {
updateGlobalMinCompetitiveScore(scorer);
}
}
@Override
public LeafCollector getLeafCollector(LeafReaderContext context) {
docBase = context.docBase;
return new ScorerLeafCollector() {
public DocIdSetIterator competitiveIterator() throws IOException {
return comparator.competitiveIterator();
}
}
@Override
public void setScorer(Scorable scorer) throws IOException {
super.setScorer(scorer);
minCompetitiveScore = 0f;
updateMinCompetitiveScore(scorer);
if (minScoreAcc != null) {
updateGlobalMinCompetitiveScore(scorer);
}
}
static boolean canEarlyTerminate(Sort searchSort, Sort indexSort) {
return canEarlyTerminateOnDocId(searchSort) || canEarlyTerminateOnPrefix(searchSort, indexSort);
}
private static boolean canEarlyTerminateOnDocId(Sort searchSort) {
final SortField[] fields1 = searchSort.getSort();
return SortField.FIELD_DOC.equals(fields1[0]);
}
private static boolean canEarlyTerminateOnPrefix(Sort searchSort, Sort indexSort) {
if (indexSort != null) {
final SortField[] fields1 = searchSort.getSort();
final SortField[] fields2 = indexSort.getSort();
// early termination is possible if fields1 is a prefix of fields2
if (fields1.length > fields2.length) {
return false;
}
return Arrays.asList(fields1).equals(Arrays.asList(fields2).subList(0, fields1.length));
} else {
return false;
}
}
/*
* Implements a TopFieldCollector-style collector over one SortField criterion, tracking
* document scores and maxScore.
*/
private static class SimpleFieldCollector extends LMDBFullFieldDocCollector {
final Sort sort;
final PriorityQueue<LLSlotDoc> queue;
private final FieldValueHitQueue fieldValueHitQueue;
public SimpleFieldCollector(
Sort sort,
PriorityQueue<LLSlotDoc> queue,
FieldValueHitQueue fieldValueHitQueue,
int numHits,
HitsThresholdChecker hitsThresholdChecker,
MaxScoreAccumulator minScoreAcc) {
super(queue, fieldValueHitQueue, numHits, hitsThresholdChecker, sort.needsScores(), minScoreAcc);
this.sort = sort;
this.queue = queue;
this.fieldValueHitQueue = fieldValueHitQueue;
}
@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
docBase = context.docBase;
return new TopFieldLeafCollector(queue, fieldValueHitQueue, sort, context) {
@Override
public void collect(int doc) throws IOException {
float score = scorer.score();
assert score >= 0;
totalHits++;
hitsThresholdChecker.incrementHitCount();
if (minScoreAcc != null && (totalHits & minScoreAcc.modInterval) == 0) {
updateGlobalMinCompetitiveScore(scorer);
}
if (limit != null && pq.size() >= limit) {
var pqTop = pq.top();
if (pqTop != null && score <= pqTop.score()) {
if (totalHitsRelation == TotalHits.Relation.EQUAL_TO) {
updateMinCompetitiveScore(scorer);
}
countHit(doc);
if (queueFull) {
if (thresholdCheck(doc)) {
return;
} else {
pq.replaceTop(new LLScoreDoc(doc + docBase, score, -1));
}
collectCompetitiveHit(doc);
} else {
pq.add(new LLScoreDoc(doc + docBase, score, -1));
collectAnyHit(doc, totalHits);
}
updateMinCompetitiveScore(scorer);
}
};
}
@Override
public ResourceIterable<LLFieldDoc> mapResults(ResourceIterable<LLSlotDoc> it) {
return new ResourceIterable<>() {
@Override
public Flux<LLFieldDoc> iterate() {
return it.iterate().map(fieldValueHitQueue::fillFields);
}
@Override
public Flux<LLFieldDoc> iterate(long skips) {
return it.iterate(skips).map(fieldValueHitQueue::fillFields);
}
};
}
}
public static LMDBFullFieldDocCollector create(LLTempLMDBEnv env, long numHits, int totalHitsThreshold) {
return create(env, numHits, HitsThresholdChecker.create(totalHitsThreshold), null);
}
private static final LLFieldDoc[] EMPTY_SCOREDOCS = new LLFieldDoc[0];
public static LMDBFullFieldDocCollector create(LLTempLMDBEnv env, int totalHitsThreshold) {
return create(env, HitsThresholdChecker.create(totalHitsThreshold), null);
}
static LMDBFullFieldDocCollector create(
LLTempLMDBEnv env,
HitsThresholdChecker hitsThresholdChecker,
MaxScoreAccumulator minScoreAcc) {
if (hitsThresholdChecker == null) {
throw new IllegalArgumentException("hitsThresholdChecker must be non null");
}
return new SimpleLMDBFullScoreDocCollector(env, null, hitsThresholdChecker, minScoreAcc);
}
static LMDBFullFieldDocCollector create(
LLTempLMDBEnv env,
@NotNull Long numHits,
HitsThresholdChecker hitsThresholdChecker,
MaxScoreAccumulator minScoreAcc) {
if (hitsThresholdChecker == null) {
throw new IllegalArgumentException("hitsThresholdChecker must be non null");
}
return new SimpleLMDBFullScoreDocCollector(env,
(numHits < 0 || numHits >= 2147483630L) ? null : numHits,
hitsThresholdChecker,
minScoreAcc
);
}
public static CollectorManager<LMDBFullFieldDocCollector, FullDocs<LLScoreDoc>> createSharedManager(
LLTempLMDBEnv env,
long numHits,
int totalHitsThreshold) {
return new CollectorManager<>() {
private final HitsThresholdChecker hitsThresholdChecker =
HitsThresholdChecker.createShared(totalHitsThreshold);
private final MaxScoreAccumulator minScoreAcc = new MaxScoreAccumulator();
@Override
public LMDBFullFieldDocCollector newCollector() {
return LMDBFullFieldDocCollector.create(env, numHits, hitsThresholdChecker, minScoreAcc);
}
@Override
public FullDocs<LLScoreDoc> reduce(Collection<LMDBFullFieldDocCollector> collectors) {
return reduceShared(collectors);
}
};
}
public static CollectorManager<LMDBFullFieldDocCollector, FullDocs<LLScoreDoc>> createSharedManager(
LLTempLMDBEnv env,
int totalHitsThreshold) {
return new CollectorManager<>() {
private final HitsThresholdChecker hitsThresholdChecker =
HitsThresholdChecker.createShared(totalHitsThreshold);
private final MaxScoreAccumulator minScoreAcc = new MaxScoreAccumulator();
@Override
public LMDBFullFieldDocCollector newCollector() {
return LMDBFullFieldDocCollector.create(env, hitsThresholdChecker, minScoreAcc);
}
@Override
public FullDocs<LLScoreDoc> reduce(Collection<LMDBFullFieldDocCollector> collectors) {
return reduceShared(collectors);
}
};
}
private static FullDocs<LLSlotDoc> reduceShared(Collection<LMDBFullFieldDocCollector> collectors) {
@SuppressWarnings("unchecked")
final FullDocs<LLSlotDoc>[] fullDocs = new FullDocs[collectors.size()];
int i = 0;
for (LMDBFullFieldDocCollector collector : collectors) {
fullDocs[i++] = collector.fullDocs();
}
return FullDocs.merge(null, fullDocs);
}
int docBase;
final @Nullable Long limit;
final int numHits;
final HitsThresholdChecker hitsThresholdChecker;
final FieldComparator<?> firstComparator;
final boolean canSetMinScore;
Boolean searchSortPartOfIndexSort = null; // shows if the Search Sort is a part of the Index Sort
// an accumulator that maintains the maximum of the segment's minimum competitive scores
final MaxScoreAccumulator minScoreAcc;
// the current local minimum competitive score already propagated to the underlying scorer
float minCompetitiveScore;
// prevents instantiation
LMDBFullFieldDocCollector(LLTempLMDBEnv env, SortField[] fields, @Nullable Long limit,
HitsThresholdChecker hitsThresholdChecker, MaxScoreAccumulator minScoreAcc) {
//noinspection unchecked
super(new LMDBPriorityQueue(env, new LLSlotDocCodec(env, fields)));
this.fieldValueHitQueue = ((FieldValueHitQueue) ((LMDBPriorityQueue<LLDocElement>) pq).getCodec());
assert hitsThresholdChecker != null;
this.limit = limit;
final int numComparators;
boolean queueFull;
int docBase;
final boolean needsScores;
final ScoreMode scoreMode;
// Declaring the constructor private prevents extending this class by anyone
// else. Note that the class cannot be final since it's extended by the
// internal versions. If someone will define a constructor with any other
// visibility, then anyone will be able to extend the class, which is not what
// we want.
private LMDBFullFieldDocCollector(
PriorityQueue<LLSlotDoc> pq,
FieldValueHitQueue fieldValueHitQueue,
int numHits,
HitsThresholdChecker hitsThresholdChecker,
boolean needsScores,
MaxScoreAccumulator minScoreAcc) {
super(pq);
this.needsScores = needsScores;
this.numHits = numHits;
this.hitsThresholdChecker = hitsThresholdChecker;
this.numComparators = fieldValueHitQueue.getComparators().length;
this.firstComparator = fieldValueHitQueue.getComparators()[0];
int reverseMul = fieldValueHitQueue.getReverseMul()[0];
if (firstComparator.getClass().equals(FieldComparator.RelevanceComparator.class)
&& reverseMul == 1 // if the natural sort is preserved (sort by descending relevance)
&& hitsThresholdChecker.getHitsThreshold() != Integer.MAX_VALUE) {
scoreMode = ScoreMode.TOP_SCORES;
canSetMinScore = true;
} else {
canSetMinScore = false;
if (hitsThresholdChecker.getHitsThreshold() != Integer.MAX_VALUE) {
scoreMode = needsScores ? ScoreMode.TOP_DOCS_WITH_SCORES : ScoreMode.TOP_DOCS;
} else {
scoreMode = needsScores ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
}
}
this.minScoreAcc = minScoreAcc;
}
@Override
public ScoreMode scoreMode() {
return hitsThresholdChecker.scoreMode();
return scoreMode;
}
protected void updateGlobalMinCompetitiveScore(Scorable scorer) throws IOException {
assert minScoreAcc != null;
DocAndScore maxMinScore = minScoreAcc.get();
if (maxMinScore != null) {
float score = docBase > maxMinScore.docID ? Math.nextUp(maxMinScore.score) : maxMinScore.score;
if (score > minCompetitiveScore) {
assert hitsThresholdChecker.isThresholdReached();
scorer.setMinCompetitiveScore(score);
minCompetitiveScore = score;
if (canSetMinScore && hitsThresholdChecker.isThresholdReached()) {
// we can start checking the global maximum score even
// if the local queue is not full because the threshold
// is reached.
MaxScoreAccumulator.DocAndScore maxMinScore = minScoreAcc.get();
if (maxMinScore != null && maxMinScore.score > minCompetitiveScore) {
scorer.setMinCompetitiveScore(maxMinScore.score);
minCompetitiveScore = maxMinScore.score;
totalHitsRelation = TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO;
}
}
}
protected void updateMinCompetitiveScore(Scorable scorer) throws IOException {
var pqTop = pq.top();
if (hitsThresholdChecker.isThresholdReached()
&& pqTop != null
&& pqTop.score() != Float.NEGATIVE_INFINITY) {
float localMinScore = Math.nextUp(pqTop.score());
if (localMinScore > minCompetitiveScore) {
scorer.setMinCompetitiveScore(localMinScore);
if (canSetMinScore && queueFull && hitsThresholdChecker.isThresholdReached()) {
assert pq.top() != null;
float minScore = (float) firstComparator.value(pq.top().slot());
if (minScore > minCompetitiveScore) {
scorer.setMinCompetitiveScore(minScore);
minCompetitiveScore = minScore;
totalHitsRelation = TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO;
minCompetitiveScore = localMinScore;
if (minScoreAcc != null) {
minScoreAcc.accumulate(pqTop.doc(), pqTop.score());
minScoreAcc.accumulate(pq.top().doc(), minScore);
}
}
}
}
}
/**
* Creates a new {@link LMDBFullFieldDocCollector} from the given arguments.
*
* <p><b>NOTE</b>: The instances returned by this method pre-allocate a full array of length
* <code>numHits</code>.
*
* @param sort the sort criteria (SortFields).
* @param numHits the number of results to collect.
* @param totalHitsThreshold the number of docs to count accurately. If the query matches more
* than {@code totalHitsThreshold} hits then its hit count will be a lower bound. On the other
* hand if the query matches less than or exactly {@code totalHitsThreshold} hits then the hit
* count of the result will be accurate. {@link Integer#MAX_VALUE} may be used to make the hit
* count accurate, but this will also make query processing slower.
* @return a {@link LMDBFullFieldDocCollector} instance which will sort the results by the sort criteria.
*/
public static LMDBFullFieldDocCollector create(LLTempLMDBEnv env, Sort sort, int numHits, int totalHitsThreshold) {
if (totalHitsThreshold < 0) {
throw new IllegalArgumentException(
"totalHitsThreshold must be >= 0, got " + totalHitsThreshold);
}
return create(
env,
sort,
numHits,
HitsThresholdChecker.create(Math.max(totalHitsThreshold, numHits)),
null /* bottomValueChecker */);
}
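
For context, an editorial usage sketch of this factory (not part of the commit). env, searcher and query are assumed to exist, the sort field name "timestamp" is made up, and the enclosing method is assumed to declare throws Exception so the collector's close() can release the LMDB-backed queue:

// Hedged sketch: single-threaded collection with the factory documented above.
Sort sort = new Sort(new SortField("timestamp", SortField.Type.LONG));
try (var collector = LMDBFullFieldDocCollector.create(env, sort, 10, 100)) {
    searcher.search(query, collector);                // IndexSearcher#search(Query, Collector)
    FullDocs<LLFieldDoc> docs = collector.fullDocs(); // field values filled in by the comparators
    docs.iterate()
        .doOnNext(hit -> System.out.println(hit.fields()))
        .blockLast();                                 // drain before close() releases the queue
}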
/**
* Same as above with additional parameters to allow passing in the threshold checker and the max
* score accumulator.
*/
static LMDBFullFieldDocCollector create(
LLTempLMDBEnv env,
Sort sort,
int numHits,
HitsThresholdChecker hitsThresholdChecker,
MaxScoreAccumulator minScoreAcc) {
if (sort.getSort().length == 0) {
throw new IllegalArgumentException("Sort must contain at least one field");
}
if (numHits <= 0) {
throw new IllegalArgumentException(
"numHits must be > 0; please use TotalHitCountCollector if you just need the total hit count");
}
if (hitsThresholdChecker == null) {
throw new IllegalArgumentException("hitsThresholdChecker should not be null");
}
var fieldValueHitQueue = new LLSlotDocCodec(env, sort.getSort());
var queue = new LMDBPriorityQueue<>(env, fieldValueHitQueue);
// inform a comparator that sort is based on this single field
// to enable some optimizations for skipping over non-competitive documents
// We can't set single sort when the `after` parameter is non-null as it's
// an implicit sort over the document id.
if (fieldValueHitQueue.getComparators().length == 1) {
fieldValueHitQueue.getComparators()[0].setSingleSort();
}
return new SimpleFieldCollector(sort, queue, fieldValueHitQueue, numHits, hitsThresholdChecker, minScoreAcc);
}
/**
* Create a CollectorManager which uses a shared hit counter to maintain the number of hits and a
* shared {@link MaxScoreAccumulator} to propagate the minimum score across segments if the
* primary sort is by relevancy.
*/
public static CollectorManager<LMDBFullFieldDocCollector, FullFieldDocs<LLFieldDoc>> createSharedManager(
LLTempLMDBEnv env, Sort sort, int numHits, FieldDoc after, int totalHitsThreshold) {
return new CollectorManager<>() {
private final HitsThresholdChecker hitsThresholdChecker =
HitsThresholdChecker.createShared(Math.max(totalHitsThreshold, numHits));
private final MaxScoreAccumulator minScoreAcc = new MaxScoreAccumulator();
@Override
public LMDBFullFieldDocCollector newCollector() throws IOException {
return create(env, sort, numHits, hitsThresholdChecker, minScoreAcc);
}
@Override
public FullFieldDocs<LLFieldDoc> reduce(Collection<LMDBFullFieldDocCollector> collectors) throws IOException {
return reduceShared(sort, collectors);
}
};
}
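
And its parallel counterpart, again an editorial sketch rather than part of the commit, relying on IndexSearcher#search(Query, CollectorManager); env, searcher and query are assumed, and the "timestamp" field is made up:

// Hedged sketch: concurrent per-segment collection, merged by reduceShared below.
Sort sort = new Sort(new SortField("timestamp", SortField.Type.LONG));
var manager = LMDBFullFieldDocCollector.createSharedManager(env, sort, 10, null, 100);
FullFieldDocs<LLFieldDoc> merged = searcher.search(query, manager);
System.out.println(merged.totalHits() + " total hits over " + merged.fields().length + " sort fields");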
private static FullFieldDocs<LLFieldDoc> reduceShared(Sort sort, Collection<LMDBFullFieldDocCollector> collectors) {
@SuppressWarnings("unchecked")
final FullDocs<LLFieldDoc>[] fullDocs = new FullDocs[collectors.size()];
int i = 0;
for (var collector : collectors) {
fullDocs[i++] = collector.fullDocs();
}
return (FullFieldDocs<LLFieldDoc>) FullDocs.merge(sort, fullDocs);
}
final void add(int slot, int doc) {
pq.add(new LLSlotDoc(docBase + doc, Float.NaN, -1, slot));
// The queue is full either when totalHits == numHits (in SimpleFieldCollector), in which case
// slot = totalHits - 1, or when hitsCollected == numHits (in PagingFieldCollector this is hits
// on the current page) and slot = hitsCollected - 1.
assert slot < numHits;
queueFull = slot == numHits - 1;
}
//todo: check if this part is efficient and not redundant
final void updateBottom(int doc) {
// bottom.score is already set to Float.NaN in add().
var bottom = pq.top();
pq.replaceTop(new LLSlotDoc(docBase + doc, bottom.score(), bottom.shardIndex(), bottom.slot()));
}
/*
* Only the following callback methods need to be overridden since
* topDocs(int, int) calls them to return the results.
*/
/** Return whether collection terminated early. */
public boolean isEarlyTerminated() {
return totalHitsRelation == Relation.GREATER_THAN_OR_EQUAL_TO;
}
}

View File

@@ -24,6 +24,7 @@ import it.cavallium.dbengine.lucene.LLScoreDocCodec;
import it.cavallium.dbengine.lucene.LMDBPriorityQueue;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.MaxScoreAccumulator;
import it.cavallium.dbengine.lucene.ResourceIterable;
import java.io.IOException;
import java.util.Collection;
import org.apache.lucene.index.LeafReaderContext;
@@ -48,7 +49,7 @@ import org.jetbrains.annotations.Nullable;
* <p><b>NOTE</b>: The values {@link Float#NaN} and {@link Float#NEGATIVE_INFINITY} are not valid
* scores. This collector will not properly collect hits with such scores.
*/
public abstract class LMDBFullScoreDocCollector extends FullDocsCollector<LLScoreDoc> {
public abstract class LMDBFullScoreDocCollector extends FullDocsCollector<LLScoreDoc, LLScoreDoc> {
/** Scorable leaf collector */
public abstract static class ScorerLeafCollector implements LeafCollector {
@@ -126,6 +127,11 @@ public abstract class LMDBFullScoreDocCollector extends FullDocsCollector<LLScor
}
};
}
@Override
public ResourceIterable<LLScoreDoc> mapResults(ResourceIterable<LLScoreDoc> it) {
return it;
}
}
/**

View File

@@ -2,7 +2,6 @@ package it.cavallium.dbengine.lucene.collector;
import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.TIE_BREAKER;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.lucene.LuceneUtils;
import java.io.IOException;
import java.util.Collection;

View File

@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.search;
import java.io.IOException;
public final class MultiLeafFieldComparator implements LeafFieldComparator {
private final LeafFieldComparator[] comparators;
private final int[] reverseMul;
// we extract the first comparator to avoid array access in the common case
// that the first comparator compares worse than the bottom entry in the queue
private final LeafFieldComparator firstComparator;
private final int firstReverseMul;
public MultiLeafFieldComparator(LeafFieldComparator[] comparators, int[] reverseMul) {
if (comparators.length != reverseMul.length) {
throw new IllegalArgumentException(
"Must have the same number of comparators and reverseMul, got "
+ comparators.length
+ " and "
+ reverseMul.length);
}
this.comparators = comparators;
this.reverseMul = reverseMul;
this.firstComparator = comparators[0];
this.firstReverseMul = reverseMul[0];
}
@Override
public void setBottom(int slot) throws IOException {
for (LeafFieldComparator comparator : comparators) {
comparator.setBottom(slot);
}
}
@Override
public int compareBottom(int doc) throws IOException {
int cmp = firstReverseMul * firstComparator.compareBottom(doc);
if (cmp != 0) {
return cmp;
}
for (int i = 1; i < comparators.length; ++i) {
cmp = reverseMul[i] * comparators[i].compareBottom(doc);
if (cmp != 0) {
return cmp;
}
}
return 0;
}
@Override
public int compareTop(int doc) throws IOException {
int cmp = firstReverseMul * firstComparator.compareTop(doc);
if (cmp != 0) {
return cmp;
}
for (int i = 1; i < comparators.length; ++i) {
cmp = reverseMul[i] * comparators[i].compareTop(doc);
if (cmp != 0) {
return cmp;
}
}
return 0;
}
@Override
public void copy(int slot, int doc) throws IOException {
for (LeafFieldComparator comparator : comparators) {
comparator.copy(slot, doc);
}
}
@Override
public void setScorer(Scorable scorer) throws IOException {
for (LeafFieldComparator comparator : comparators) {
comparator.setScorer(scorer);
}
}
@Override
public void setHitsThresholdReached() throws IOException {
// this is needed for skipping functionality that is only relevant for the 1st comparator
firstComparator.setHitsThresholdReached();
}
@Override
public DocIdSetIterator competitiveIterator() throws IOException {
// this is needed for skipping functionality that is only relevant for the 1st comparator
return firstComparator.competitiveIterator();
}
}