From 17a9b497556ae8a17a70ab2c820412822ba60fe0 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 15 Oct 2021 00:03:41 +0200 Subject: [PATCH] Partial sorted implementation --- .../dbengine/lucene/FieldValueHitQueue.java | 2 +- .../cavallium/dbengine/lucene/FullDocs.java | 18 +- .../lucene/{LLDocElement.java => LLDoc.java} | 2 +- .../lucene/LLDocElementScoreComparator.java | 6 +- .../cavallium/dbengine/lucene/LLFieldDoc.java | 4 +- .../cavallium/dbengine/lucene/LLScoreDoc.java | 2 +- .../cavallium/dbengine/lucene/LLSlotDoc.java | 3 +- .../dbengine/lucene/LLSlotDocCodec.java | 12 +- .../{PqFullDocs.java => LazyFullDocs.java} | 6 +- .../lucene/collector/FullDocsCollector.java | 18 +- .../lucene/collector/FullFieldDocs.java | 38 ++ .../collector/LMDBFullFieldDocCollector.java | 559 ++++++++++++------ .../collector/LMDBFullScoreDocCollector.java | 8 +- .../ScoringShardsCollectorManager.java | 1 - .../search/MultiLeafFieldComparator.java | 106 ++++ 15 files changed, 581 insertions(+), 204 deletions(-) rename src/main/java/it/cavallium/dbengine/lucene/{LLDocElement.java => LLDoc.java} (52%) rename src/main/java/it/cavallium/dbengine/lucene/{PqFullDocs.java => LazyFullDocs.java} (70%) create mode 100644 src/main/java/it/cavallium/dbengine/lucene/collector/FullFieldDocs.java create mode 100644 src/main/java/org/apache/lucene/search/MultiLeafFieldComparator.java diff --git a/src/main/java/it/cavallium/dbengine/lucene/FieldValueHitQueue.java b/src/main/java/it/cavallium/dbengine/lucene/FieldValueHitQueue.java index bc85535..0d51311 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/FieldValueHitQueue.java +++ b/src/main/java/it/cavallium/dbengine/lucene/FieldValueHitQueue.java @@ -16,7 +16,7 @@ public interface FieldValueHitQueue { LeafFieldComparator[] getComparators(LeafReaderContext context) throws IOException; - FieldDoc fillFields(Entry entry); + LLFieldDoc fillFields(LLSlotDoc entry); SortField[] getFields(); } diff --git 
a/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java b/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java index 053e1ec..dc6a1f9 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java +++ b/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java @@ -12,11 +12,11 @@ import org.apache.lucene.search.TotalHits.Relation; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; -public interface FullDocs extends ResourceIterable { +public interface FullDocs extends ResourceIterable { - Comparator SHARD_INDEX_TIE_BREAKER = Comparator.comparingInt(LLDocElement::shardIndex); - Comparator DOC_ID_TIE_BREAKER = Comparator.comparingInt(LLDocElement::doc); - Comparator DEFAULT_TIE_BREAKER = SHARD_INDEX_TIE_BREAKER.thenComparing(DOC_ID_TIE_BREAKER); + Comparator SHARD_INDEX_TIE_BREAKER = Comparator.comparingInt(LLDoc::shardIndex); + Comparator DOC_ID_TIE_BREAKER = Comparator.comparingInt(LLDoc::doc); + Comparator DEFAULT_TIE_BREAKER = SHARD_INDEX_TIE_BREAKER.thenComparing(DOC_ID_TIE_BREAKER); @Override Flux iterate(); @@ -26,7 +26,7 @@ public interface FullDocs extends ResourceIterable { TotalHits totalHits(); - static FullDocs merge(@Nullable Sort sort, FullDocs[] fullDocs) { + static FullDocs merge(@Nullable Sort sort, FullDocs[] fullDocs) { ResourceIterable mergedIterable = mergeResourceIterable(sort, fullDocs); TotalHits mergedTotalHits = mergeTotalHits(fullDocs); return new FullDocs<>() { @@ -47,7 +47,7 @@ public interface FullDocs extends ResourceIterable { }; } - static int tieBreakCompare( + static int tieBreakCompare( T firstDoc, T secondDoc, Comparator tieBreaker) { @@ -61,7 +61,7 @@ public interface FullDocs extends ResourceIterable { } } - static ResourceIterable mergeResourceIterable( + static ResourceIterable mergeResourceIterable( @Nullable Sort sort, FullDocs[] fullDocs) { return () -> { @@ -73,7 +73,7 @@ public interface FullDocs extends ResourceIterable { iterables[i] = singleFullDocs; } - Comparator comp; + 
Comparator comp; if (sort == null) { // Merge maintaining sorting order (Algorithm taken from TopDocs.ScoreMergeSortQueue) @@ -132,7 +132,7 @@ public interface FullDocs extends ResourceIterable { }; } - static TotalHits mergeTotalHits(FullDocs[] fullDocs) { + static TotalHits mergeTotalHits(FullDocs[] fullDocs) { long totalCount = 0; Relation totalRelation = EQUAL_TO; for (FullDocs fullDoc : fullDocs) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/LLDocElement.java b/src/main/java/it/cavallium/dbengine/lucene/LLDoc.java similarity index 52% rename from src/main/java/it/cavallium/dbengine/lucene/LLDocElement.java rename to src/main/java/it/cavallium/dbengine/lucene/LLDoc.java index 06779b4..95162fb 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LLDocElement.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LLDoc.java @@ -1,6 +1,6 @@ package it.cavallium.dbengine.lucene; -public sealed interface LLDocElement permits LLSlotDoc, LLFieldDoc, LLScoreDoc { +public sealed interface LLDoc permits LLSlotDoc, LLFieldDoc, LLScoreDoc { int doc(); diff --git a/src/main/java/it/cavallium/dbengine/lucene/LLDocElementScoreComparator.java b/src/main/java/it/cavallium/dbengine/lucene/LLDocElementScoreComparator.java index ba7f2a4..16f424c 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LLDocElementScoreComparator.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LLDocElementScoreComparator.java @@ -2,12 +2,12 @@ package it.cavallium.dbengine.lucene; import java.util.Comparator; -class LLDocElementScoreComparator implements Comparator { +class LLDocElementScoreComparator implements Comparator { - public static final Comparator SCORE_DOC_SCORE_ELEM_COMPARATOR = new LLDocElementScoreComparator(); + public static final Comparator SCORE_DOC_SCORE_ELEM_COMPARATOR = new LLDocElementScoreComparator(); @Override - public int compare(LLDocElement hitA, LLDocElement hitB) { + public int compare(LLDoc hitA, LLDoc hitB) { return Float.compare(hitA.score(), 
hitB.score()); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/LLFieldDoc.java b/src/main/java/it/cavallium/dbengine/lucene/LLFieldDoc.java index c5c4db8..cc10b7b 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LLFieldDoc.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LLFieldDoc.java @@ -1,12 +1,10 @@ package it.cavallium.dbengine.lucene; -import java.util.Arrays; import java.util.List; import java.util.Objects; -import java.util.StringJoiner; import java.util.stream.Collectors; -public record LLFieldDoc(int doc, float score, int shardIndex, List fields) implements LLDocElement { +public record LLFieldDoc(int doc, float score, int shardIndex, List fields) implements LLDoc { @Override public String toString() { diff --git a/src/main/java/it/cavallium/dbengine/lucene/LLScoreDoc.java b/src/main/java/it/cavallium/dbengine/lucene/LLScoreDoc.java index 77a0262..71ba36d 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LLScoreDoc.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LLScoreDoc.java @@ -2,7 +2,7 @@ package it.cavallium.dbengine.lucene; import org.apache.lucene.search.ScoreDoc; -public record LLScoreDoc(int doc, float score, int shardIndex) implements LLDocElement { +public record LLScoreDoc(int doc, float score, int shardIndex) implements LLDoc { public ScoreDoc toScoreDoc() { return new ScoreDoc(doc, score, shardIndex); diff --git a/src/main/java/it/cavallium/dbengine/lucene/LLSlotDoc.java b/src/main/java/it/cavallium/dbengine/lucene/LLSlotDoc.java index 43fb13a..79dc5b3 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LLSlotDoc.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LLSlotDoc.java @@ -1,12 +1,11 @@ package it.cavallium.dbengine.lucene; import org.apache.lucene.search.FieldComparator; -import org.apache.lucene.search.FieldValueHitQueue; import org.apache.lucene.search.FieldValueHitQueue.Entry; import org.apache.lucene.search.ScoreDoc; /** Extension of ScoreDoc to also store the {@link FieldComparator} 
slot. */ -public record LLSlotDoc(int doc, float score, int shardIndex, int slot) implements LLDocElement { +public record LLSlotDoc(int doc, float score, int shardIndex, int slot) implements LLDoc { public ScoreDoc toScoreDoc() { return new ScoreDoc(doc, score, shardIndex); diff --git a/src/main/java/it/cavallium/dbengine/lucene/LLSlotDocCodec.java b/src/main/java/it/cavallium/dbengine/lucene/LLSlotDocCodec.java index bfa29aa..f9e12a0 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LLSlotDocCodec.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LLSlotDocCodec.java @@ -3,6 +3,8 @@ package it.cavallium.dbengine.lucene; import io.net5.buffer.ByteBuf; import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.function.Function; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.FieldComparator; @@ -155,14 +157,14 @@ public class LLSlotDocCodec implements LMDBSortedCodec, FieldValueHit * @see IndexSearcher#search(Query,int, Sort) */ @Override - public FieldDoc fillFields(final Entry entry) { + public LLFieldDoc fillFields(final LLSlotDoc entry) { final int n = comparators.length; - final Object[] fields = new Object[n]; - for (int i = 0; i < n; ++i) { - fields[i] = comparators[i].value(entry.slot); + final List fields = new ArrayList<>(n); + for (FieldComparator comparator : comparators) { + fields.add(comparator.value(entry.slot())); } // if (maxscore > 1.0f) doc.score /= maxscore; // normalize scores - return new FieldDoc(entry.doc, entry.score, fields); + return new LLFieldDoc(entry.doc(), entry.score(), entry.shardIndex(), fields); } /** Returns the SortFields being used by this hit queue. 
*/ diff --git a/src/main/java/it/cavallium/dbengine/lucene/PqFullDocs.java b/src/main/java/it/cavallium/dbengine/lucene/LazyFullDocs.java similarity index 70% rename from src/main/java/it/cavallium/dbengine/lucene/PqFullDocs.java rename to src/main/java/it/cavallium/dbengine/lucene/LazyFullDocs.java index 11fff6e..994fd5b 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/PqFullDocs.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LazyFullDocs.java @@ -3,12 +3,12 @@ package it.cavallium.dbengine.lucene; import org.apache.lucene.search.TotalHits; import reactor.core.publisher.Flux; -public class PqFullDocs implements FullDocs { +public class LazyFullDocs implements FullDocs { - private final PriorityQueue pq; + private final ResourceIterable pq; private final TotalHits totalHits; - public PqFullDocs(PriorityQueue pq, TotalHits totalHits) { + public LazyFullDocs(ResourceIterable pq, TotalHits totalHits) { this.pq = pq; this.totalHits = totalHits; } diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/FullDocsCollector.java b/src/main/java/it/cavallium/dbengine/lucene/collector/FullDocsCollector.java index ead2590..fed8509 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/collector/FullDocsCollector.java +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/FullDocsCollector.java @@ -16,11 +16,11 @@ */ package it.cavallium.dbengine.lucene.collector; -import it.cavallium.dbengine.lucene.EmptyPriorityQueue; import it.cavallium.dbengine.lucene.FullDocs; -import it.cavallium.dbengine.lucene.LLDocElement; -import it.cavallium.dbengine.lucene.PqFullDocs; +import it.cavallium.dbengine.lucene.LLDoc; +import it.cavallium.dbengine.lucene.LazyFullDocs; import it.cavallium.dbengine.lucene.PriorityQueue; +import it.cavallium.dbengine.lucene.ResourceIterable; import org.apache.lucene.search.Collector; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHits; @@ -34,7 +34,7 @@ import org.apache.lucene.search.TotalHits; * 
#FullDocsCollector(PriorityQueue)}. In that case however, you might want to consider overriding * all methods, in order to avoid a NullPointerException. */ -public abstract class FullDocsCollector implements Collector, AutoCloseable { +public abstract class FullDocsCollector implements Collector, AutoCloseable { /** * The priority queue which holds the top documents. Note that different implementations of @@ -42,7 +42,7 @@ public abstract class FullDocsCollector implements Colle * top scoring documents, while other PQ implementations may hold documents sorted by other * criteria. */ - protected final PriorityQueue pq; + protected final PriorityQueue pq; /** The total number of documents that the collector encountered. */ protected int totalHits; @@ -50,7 +50,7 @@ public abstract class FullDocsCollector implements Colle /** Whether {@link #totalHits} is exact or a lower bound. */ protected TotalHits.Relation totalHitsRelation = TotalHits.Relation.EQUAL_TO; - protected FullDocsCollector(PriorityQueue pq) { + protected FullDocsCollector(PriorityQueue pq) { this.pq = pq; } @@ -60,10 +60,12 @@ public abstract class FullDocsCollector implements Colle } /** Returns the top docs that were collected by this collector. 
*/ - public FullDocs fullDocs() { - return new PqFullDocs<>(this.pq, new TotalHits(totalHits, totalHitsRelation)); + public FullDocs fullDocs() { + return new LazyFullDocs<>(mapResults(this.pq), new TotalHits(totalHits, totalHitsRelation)); } + public abstract ResourceIterable mapResults(ResourceIterable it); + @Override public void close() throws Exception { pq.close(); diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/FullFieldDocs.java b/src/main/java/it/cavallium/dbengine/lucene/collector/FullFieldDocs.java new file mode 100644 index 0000000..687b72f --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/FullFieldDocs.java @@ -0,0 +1,38 @@ +package it.cavallium.dbengine.lucene.collector; + +import it.cavallium.dbengine.lucene.FullDocs; +import it.cavallium.dbengine.lucene.LLDoc; +import it.cavallium.dbengine.lucene.LLFieldDoc; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.TotalHits; +import reactor.core.publisher.Flux; + +public class FullFieldDocs implements FullDocs { + + private final FullDocs fullDocs; + private final SortField[] fields; + + public FullFieldDocs(FullDocs fullDocs, SortField[] fields) { + this.fullDocs = fullDocs; + this.fields = fields; + } + + @Override + public Flux iterate() { + return fullDocs.iterate(); + } + + @Override + public Flux iterate(long skips) { + return fullDocs.iterate(skips); + } + + @Override + public TotalHits totalHits() { + return fullDocs.totalHits(); + } + + public SortField[] fields() { + return fields; + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullFieldDocCollector.java b/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullFieldDocCollector.java index 86d7feb..2805235 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullFieldDocCollector.java +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullFieldDocCollector.java @@ -19,234 +19,461 @@ package 
it.cavallium.dbengine.lucene.collector; import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; import it.cavallium.dbengine.lucene.FieldValueHitQueue; import it.cavallium.dbengine.lucene.FullDocs; -import it.cavallium.dbengine.lucene.LLDocElement; +import it.cavallium.dbengine.lucene.LLDoc; +import it.cavallium.dbengine.lucene.LLFieldDoc; import it.cavallium.dbengine.lucene.LLScoreDoc; +import it.cavallium.dbengine.lucene.LLScoreDocCodec; import it.cavallium.dbengine.lucene.LLSlotDoc; import it.cavallium.dbengine.lucene.LLSlotDocCodec; import it.cavallium.dbengine.lucene.LMDBPriorityQueue; import it.cavallium.dbengine.lucene.MaxScoreAccumulator; -import it.cavallium.dbengine.lucene.MaxScoreAccumulator.DocAndScore; +import it.cavallium.dbengine.lucene.PriorityQueue; +import it.cavallium.dbengine.lucene.ResourceIterable; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.ReaderUtil; +import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.CollectorManager; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.FieldComparator; +import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.FieldValueHitQueue.Entry; import org.apache.lucene.search.LeafCollector; +import org.apache.lucene.search.LeafFieldComparator; +import org.apache.lucene.search.MultiLeafFieldComparator; import org.apache.lucene.search.Scorable; import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; import org.apache.lucene.search.TotalHits; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; +import org.apache.lucene.search.TotalHits.Relation; +import reactor.core.publisher.Flux; -public abstract class 
LMDBFullFieldDocCollector extends FullDocsCollector { +/** + * A {@link org.apache.lucene.search.Collector} that sorts by {@link SortField} using {@link FieldComparator}s. + * + *

See the {@link #create(LLTempLMDBEnv, Sort, int, int)} method for instantiating a + * TopFieldCollector. + * + */ +public abstract class LMDBFullFieldDocCollector extends FullDocsCollector { - private final FieldValueHitQueue fieldValueHitQueue; + // TODO: one optimization we could do is to pre-fill + // the queue with sentinel value that guaranteed to + // always compare lower than a real hit; this would + // save having to check queueFull on each insert - public abstract static class ScorerLeafCollector implements LeafCollector { + private abstract class TopFieldLeafCollector implements LeafCollector { - protected Scorable scorer; + final LeafFieldComparator comparator; + final int reverseMul; + Scorable scorer; + boolean collectedAllCompetitiveHits = false; + + TopFieldLeafCollector(PriorityQueue queue, + FieldValueHitQueue fieldValueHitQueue, + Sort sort, + LeafReaderContext context) + throws IOException { + // as all segments are sorted in the same way, enough to check only the 1st segment for + // indexSort + if (searchSortPartOfIndexSort == null) { + final Sort indexSort = context.reader().getMetaData().getSort(); + searchSortPartOfIndexSort = canEarlyTerminate(sort, indexSort); + if (searchSortPartOfIndexSort) { + firstComparator.disableSkipping(); + } + } + LeafFieldComparator[] comparators = fieldValueHitQueue.getComparators(context); + int[] reverseMuls = fieldValueHitQueue.getReverseMul(); + if (comparators.length == 1) { + this.reverseMul = reverseMuls[0]; + this.comparator = comparators[0]; + } else { + this.reverseMul = 1; + this.comparator = new MultiLeafFieldComparator(comparators, reverseMuls); + } + } + + void countHit(int doc) throws IOException { + ++totalHits; + hitsThresholdChecker.incrementHitCount(); + + if (minScoreAcc != null && (totalHits & minScoreAcc.modInterval) == 0) { + updateGlobalMinCompetitiveScore(scorer); + } + if (scoreMode.isExhaustive() == false + && totalHitsRelation == 
TotalHits.Relation.EQUAL_TO + && hitsThresholdChecker.isThresholdReached()) { + // for the first time hitsThreshold is reached, notify comparator about this + comparator.setHitsThresholdReached(); + totalHitsRelation = TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO; + } + } + + boolean thresholdCheck(int doc) throws IOException { + if (collectedAllCompetitiveHits || reverseMul * comparator.compareBottom(doc) <= 0) { + // since docs are visited in doc Id order, if compare is 0, it means + // this document is larger than anything else in the queue, and + // therefore not competitive. + if (searchSortPartOfIndexSort) { + if (hitsThresholdChecker.isThresholdReached()) { + totalHitsRelation = Relation.GREATER_THAN_OR_EQUAL_TO; + throw new CollectionTerminatedException(); + } else { + collectedAllCompetitiveHits = true; + } + } else if (totalHitsRelation == TotalHits.Relation.EQUAL_TO) { + // we can start setting the min competitive score if the + // threshold is reached for the first time here. 
+ updateMinCompetitiveScore(scorer); + } + return true; + } + return false; + } + + void collectCompetitiveHit(int doc) throws IOException { + // This hit is competitive - replace bottom element in queue & adjustTop + comparator.copy(pq.top().slot(), doc); + updateBottom(doc); + comparator.setBottom(pq.top().slot()); + updateMinCompetitiveScore(scorer); + } + + void collectAnyHit(int doc, int hitsCollected) throws IOException { + // Startup transient: queue hasn't gathered numHits yet + int slot = hitsCollected - 1; + // Copy hit into queue + comparator.copy(slot, doc); + add(slot, doc); + if (queueFull) { + comparator.setBottom(pq.top().slot()); + updateMinCompetitiveScore(scorer); + } + } @Override public void setScorer(Scorable scorer) throws IOException { this.scorer = scorer; - } - } - - private static class SimpleLMDBFullScoreDocCollector extends LMDBFullFieldDocCollector { - - SimpleLMDBFullScoreDocCollector(LLTempLMDBEnv env, @Nullable Long limit, - HitsThresholdChecker hitsThresholdChecker, MaxScoreAccumulator minScoreAcc) { - super(env, limit, hitsThresholdChecker, minScoreAcc); + comparator.setScorer(scorer); + minCompetitiveScore = 0f; + updateMinCompetitiveScore(scorer); + if (minScoreAcc != null) { + updateGlobalMinCompetitiveScore(scorer); + } } @Override - public LeafCollector getLeafCollector(LeafReaderContext context) { - docBase = context.docBase; - return new ScorerLeafCollector() { + public DocIdSetIterator competitiveIterator() throws IOException { + return comparator.competitiveIterator(); + } + } - @Override - public void setScorer(Scorable scorer) throws IOException { - super.setScorer(scorer); - minCompetitiveScore = 0f; - updateMinCompetitiveScore(scorer); - if (minScoreAcc != null) { - updateGlobalMinCompetitiveScore(scorer); - } - } + static boolean canEarlyTerminate(Sort searchSort, Sort indexSort) { + return canEarlyTerminateOnDocId(searchSort) || canEarlyTerminateOnPrefix(searchSort, indexSort); + } + + private static boolean 
canEarlyTerminateOnDocId(Sort searchSort) { + final SortField[] fields1 = searchSort.getSort(); + return SortField.FIELD_DOC.equals(fields1[0]); + } + + private static boolean canEarlyTerminateOnPrefix(Sort searchSort, Sort indexSort) { + if (indexSort != null) { + final SortField[] fields1 = searchSort.getSort(); + final SortField[] fields2 = indexSort.getSort(); + // early termination is possible if fields1 is a prefix of fields2 + if (fields1.length > fields2.length) { + return false; + } + return Arrays.asList(fields1).equals(Arrays.asList(fields2).subList(0, fields1.length)); + } else { + return false; + } + } + + /* + * Implements a TopFieldCollector over one SortField criteria, with tracking + * document scores and maxScore. + */ + private static class SimpleFieldCollector extends LMDBFullFieldDocCollector { + final Sort sort; + final PriorityQueue queue; + private final FieldValueHitQueue fieldValueHitQueue; + + public SimpleFieldCollector( + Sort sort, + PriorityQueue queue, + FieldValueHitQueue fieldValueHitQueue, + int numHits, + HitsThresholdChecker hitsThresholdChecker, + MaxScoreAccumulator minScoreAcc) { + super(queue, fieldValueHitQueue, numHits, hitsThresholdChecker, sort.needsScores(), minScoreAcc); + this.sort = sort; + this.queue = queue; + this.fieldValueHitQueue = fieldValueHitQueue; + } + + @Override + public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException { + docBase = context.docBase; + + return new TopFieldLeafCollector(queue, fieldValueHitQueue, sort, context) { @Override public void collect(int doc) throws IOException { - float score = scorer.score(); - - assert score >= 0; - - totalHits++; - hitsThresholdChecker.incrementHitCount(); - - if (minScoreAcc != null && (totalHits & minScoreAcc.modInterval) == 0) { - updateGlobalMinCompetitiveScore(scorer); - } - - if (limit != null && pq.size() >= limit) { - var pqTop = pq.top(); - if (pqTop != null && score <= pqTop.score()) { - if (totalHitsRelation == 
TotalHits.Relation.EQUAL_TO) { - updateMinCompetitiveScore(scorer); - } + countHit(doc); + if (queueFull) { + if (thresholdCheck(doc)) { return; - } else { - pq.replaceTop(new LLScoreDoc(doc + docBase, score, -1)); } + collectCompetitiveHit(doc); } else { - pq.add(new LLScoreDoc(doc + docBase, score, -1)); + collectAnyHit(doc, totalHits); } - updateMinCompetitiveScore(scorer); + } + }; + } + + @Override + public ResourceIterable mapResults(ResourceIterable it) { + return new ResourceIterable<>() { + @Override + public Flux iterate() { + return it.iterate().map(fieldValueHitQueue::fillFields); + } + + @Override + public Flux iterate(long skips) { + return it.iterate(skips).map(fieldValueHitQueue::fillFields); } }; } } - public static LMDBFullFieldDocCollector create(LLTempLMDBEnv env, long numHits, int totalHitsThreshold) { - return create(env, numHits, HitsThresholdChecker.create(totalHitsThreshold), null); - } + private static final LLFieldDoc[] EMPTY_SCOREDOCS = new LLFieldDoc[0]; - public static LMDBFullFieldDocCollector create(LLTempLMDBEnv env, int totalHitsThreshold) { - return create(env, HitsThresholdChecker.create(totalHitsThreshold), null); - } - - static LMDBFullFieldDocCollector create( - LLTempLMDBEnv env, - HitsThresholdChecker hitsThresholdChecker, - MaxScoreAccumulator minScoreAcc) { - - if (hitsThresholdChecker == null) { - throw new IllegalArgumentException("hitsThresholdChecker must be non null"); - } - - return new SimpleLMDBFullScoreDocCollector(env, null, hitsThresholdChecker, minScoreAcc); - } - - static LMDBFullFieldDocCollector create( - LLTempLMDBEnv env, - @NotNull Long numHits, - HitsThresholdChecker hitsThresholdChecker, - MaxScoreAccumulator minScoreAcc) { - - if (hitsThresholdChecker == null) { - throw new IllegalArgumentException("hitsThresholdChecker must be non null"); - } - - return new SimpleLMDBFullScoreDocCollector(env, - (numHits < 0 || numHits >= 2147483630L) ? 
null : numHits, - hitsThresholdChecker, - minScoreAcc - ); - } - - public static CollectorManager> createSharedManager( - LLTempLMDBEnv env, - long numHits, - int totalHitsThreshold) { - return new CollectorManager<>() { - - private final HitsThresholdChecker hitsThresholdChecker = - HitsThresholdChecker.createShared(totalHitsThreshold); - private final MaxScoreAccumulator minScoreAcc = new MaxScoreAccumulator(); - - @Override - public LMDBFullFieldDocCollector newCollector() { - return LMDBFullFieldDocCollector.create(env, numHits, hitsThresholdChecker, minScoreAcc); - } - - @Override - public FullDocs reduce(Collection collectors) { - return reduceShared(collectors); - } - }; - } - - public static CollectorManager> createSharedManager( - LLTempLMDBEnv env, - int totalHitsThreshold) { - return new CollectorManager<>() { - - private final HitsThresholdChecker hitsThresholdChecker = - HitsThresholdChecker.createShared(totalHitsThreshold); - private final MaxScoreAccumulator minScoreAcc = new MaxScoreAccumulator(); - - @Override - public LMDBFullFieldDocCollector newCollector() { - return LMDBFullFieldDocCollector.create(env, hitsThresholdChecker, minScoreAcc); - } - - @Override - public FullDocs reduce(Collection collectors) { - return reduceShared(collectors); - } - }; - } - - private static FullDocs reduceShared(Collection collectors) { - @SuppressWarnings("unchecked") - final FullDocs[] fullDocs = new FullDocs[collectors.size()]; - int i = 0; - for (LMDBFullFieldDocCollector collector : collectors) { - fullDocs[i++] = collector.fullDocs(); - } - return FullDocs.merge(null, fullDocs); - } - - int docBase; - final @Nullable Long limit; + final int numHits; final HitsThresholdChecker hitsThresholdChecker; + final FieldComparator firstComparator; + final boolean canSetMinScore; + + Boolean searchSortPartOfIndexSort = null; // shows if Search Sort is a part of the Index Sort + + // an accumulator that maintains the maximum of the segment's minimum competitive scores 
final MaxScoreAccumulator minScoreAcc; + // the current local minimum competitive score already propagated to the underlying scorer float minCompetitiveScore; - // prevents instantiation - LMDBFullFieldDocCollector(LLTempLMDBEnv env, SortField[] fields, @Nullable Long limit, - HitsThresholdChecker hitsThresholdChecker, MaxScoreAccumulator minScoreAcc) { - //noinspection unchecked - super(new LMDBPriorityQueue(env, new LLSlotDocCodec(env, fields))); - this.fieldValueHitQueue = ((FieldValueHitQueue) ((LMDBPriorityQueue) pq).getCodec()); - assert hitsThresholdChecker != null; - this.limit = limit; + final int numComparators; + boolean queueFull; + int docBase; + final boolean needsScores; + final ScoreMode scoreMode; + + // Declaring the constructor private prevents extending this class by anyone + // else. Note that the class cannot be final since it's extended by the + // internal versions. If someone will define a constructor with any other + // visibility, then anyone will be able to extend the class, which is not what + // we want. 
+ private LMDBFullFieldDocCollector( + PriorityQueue pq, + FieldValueHitQueue fieldValueHitQueue, + int numHits, + HitsThresholdChecker hitsThresholdChecker, + boolean needsScores, + MaxScoreAccumulator minScoreAcc) { + super(pq); + this.needsScores = needsScores; + this.numHits = numHits; this.hitsThresholdChecker = hitsThresholdChecker; + this.numComparators = fieldValueHitQueue.getComparators().length; + this.firstComparator = fieldValueHitQueue.getComparators()[0]; + int reverseMul = fieldValueHitQueue.getReverseMul()[0]; + + if (firstComparator.getClass().equals(FieldComparator.RelevanceComparator.class) + && reverseMul == 1 // if the natural sort is preserved (sort by descending relevance) + && hitsThresholdChecker.getHitsThreshold() != Integer.MAX_VALUE) { + scoreMode = ScoreMode.TOP_SCORES; + canSetMinScore = true; + } else { + canSetMinScore = false; + if (hitsThresholdChecker.getHitsThreshold() != Integer.MAX_VALUE) { + scoreMode = needsScores ? ScoreMode.TOP_DOCS_WITH_SCORES : ScoreMode.TOP_DOCS; + } else { + scoreMode = needsScores ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES; + } + } this.minScoreAcc = minScoreAcc; } @Override public ScoreMode scoreMode() { - return hitsThresholdChecker.scoreMode(); + return scoreMode; } protected void updateGlobalMinCompetitiveScore(Scorable scorer) throws IOException { assert minScoreAcc != null; - DocAndScore maxMinScore = minScoreAcc.get(); - if (maxMinScore != null) { - float score = docBase > maxMinScore.docID ? Math.nextUp(maxMinScore.score) : maxMinScore.score; - if (score > minCompetitiveScore) { - assert hitsThresholdChecker.isThresholdReached(); - scorer.setMinCompetitiveScore(score); - minCompetitiveScore = score; + if (canSetMinScore && hitsThresholdChecker.isThresholdReached()) { + // we can start checking the global maximum score even + // if the local queue is not full because the threshold + // is reached. 
+ MaxScoreAccumulator.DocAndScore maxMinScore = minScoreAcc.get(); + if (maxMinScore != null && maxMinScore.score > minCompetitiveScore) { + scorer.setMinCompetitiveScore(maxMinScore.score); + minCompetitiveScore = maxMinScore.score; totalHitsRelation = TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO; } } } protected void updateMinCompetitiveScore(Scorable scorer) throws IOException { - var pqTop = pq.top(); - if (hitsThresholdChecker.isThresholdReached() - && pqTop != null - && pqTop.score() != Float.NEGATIVE_INFINITY) { - float localMinScore = Math.nextUp(pqTop.score()); - if (localMinScore > minCompetitiveScore) { - scorer.setMinCompetitiveScore(localMinScore); + if (canSetMinScore && queueFull && hitsThresholdChecker.isThresholdReached()) { + assert pq.top() != null; + float minScore = (float) firstComparator.value(pq.top().slot()); + if (minScore > minCompetitiveScore) { + scorer.setMinCompetitiveScore(minScore); + minCompetitiveScore = minScore; totalHitsRelation = TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO; - minCompetitiveScore = localMinScore; if (minScoreAcc != null) { - minScoreAcc.accumulate(pqTop.doc(), pqTop.score()); + minScoreAcc.accumulate(pq.top().doc(), minScore); } } } } -} \ No newline at end of file + + /** + * Creates a new {@link LMDBFullFieldDocCollector} from the given arguments. + * + *

NOTE: The instances returned by this method pre-allocate a full array of length + * numHits. + * + * @param sort the sort criteria (SortFields). + * @param numHits the number of results to collect. + * @param totalHitsThreshold the number of docs to count accurately. If the query matches more + * than {@code totalHitsThreshold} hits then its hit count will be a lower bound. On the other + * hand if the query matches less than or exactly {@code totalHitsThreshold} hits then the hit + * count of the result will be accurate. {@link Integer#MAX_VALUE} may be used to make the hit + * count accurate, but this will also make query processing slower. + * @return a {@link LMDBFullFieldDocCollector} instance which will sort the results by the sort criteria. + */ + public static LMDBFullFieldDocCollector create(LLTempLMDBEnv env, Sort sort, int numHits, int totalHitsThreshold) { + if (totalHitsThreshold < 0) { + throw new IllegalArgumentException( + "totalHitsThreshold must be >= 0, got " + totalHitsThreshold); + } + + return create( + env, + sort, + numHits, + HitsThresholdChecker.create(Math.max(totalHitsThreshold, numHits)), + null /* minScoreAcc */); + } + + /** + * Same as above with additional parameters to allow passing in the threshold checker and the max + * score accumulator. 
+ */ + static LMDBFullFieldDocCollector create( + LLTempLMDBEnv env, + Sort sort, + int numHits, + HitsThresholdChecker hitsThresholdChecker, + MaxScoreAccumulator minScoreAcc) { + + if (sort.getSort().length == 0) { + throw new IllegalArgumentException("Sort must contain at least one field"); + } + + if (numHits <= 0) { + throw new IllegalArgumentException( + "numHits must be > 0; please use TotalHitCountCollector if you just need the total hit count"); + } + + if (hitsThresholdChecker == null) { + throw new IllegalArgumentException("hitsThresholdChecker should not be null"); + } + + var fieldValueHitQueue = new LLSlotDocCodec(env, sort.getSort()); + var queue = new LMDBPriorityQueue<>(env, fieldValueHitQueue); + + // inform a comparator that sort is based on this single field + // to enable some optimizations for skipping over non-competitive documents + // We can't set single sort when the `after` parameter is non-null as it's + // an implicit sort over the document id. + if (fieldValueHitQueue.getComparators().length == 1) { + fieldValueHitQueue.getComparators()[0].setSingleSort(); + } + return new SimpleFieldCollector(sort, queue, fieldValueHitQueue, numHits, hitsThresholdChecker, minScoreAcc); + } + + /** + * Create a CollectorManager which uses a shared hit counter to maintain number of hits and a + * shared {@link MaxScoreAccumulator} to propagate the minimum score across segments if the + * primary sort is by relevancy. 
+ */ + public static CollectorManager> createSharedManager( + LLTempLMDBEnv env, Sort sort, int numHits, FieldDoc after, int totalHitsThreshold) { + return new CollectorManager<>() { + + private final HitsThresholdChecker hitsThresholdChecker = + HitsThresholdChecker.createShared(Math.max(totalHitsThreshold, numHits)); + private final MaxScoreAccumulator minScoreAcc = new MaxScoreAccumulator(); + + @Override + public LMDBFullFieldDocCollector newCollector() throws IOException { + return create(env, sort, numHits, hitsThresholdChecker, minScoreAcc); + } + + @Override + public FullFieldDocs reduce(Collection collectors) throws IOException { + return reduceShared(sort, collectors); + } + }; + } + + private static FullFieldDocs reduceShared(Sort sort, Collection collectors) { + @SuppressWarnings("unchecked") + final FullDocs[] fullDocs = new FullDocs[collectors.size()]; + int i = 0; + for (var collector : collectors) { + fullDocs[i++] = collector.fullDocs(); + } + return (FullFieldDocs) FullDocs.merge(sort, fullDocs); + + } + + final void add(int slot, int doc) { + pq.add(new LLSlotDoc(docBase + doc, Float.NaN, -1, slot)); + + // The queue is full either when totalHits == numHits (in SimpleFieldCollector), in which case + // slot = totalHits - 1, or when hitsCollected == numHits (in PagingFieldCollector this is hits + // on the current page) and slot = hitsCollected - 1. + assert slot < numHits; + queueFull = slot == numHits - 1; + } + + //todo: check if this part is efficient and not redundant + final void updateBottom(int doc) { + // bottom.score is already set to Float.NaN in add(). + var bottom = pq.top(); + pq.replaceTop(new LLSlotDoc(docBase + doc, bottom.score(), bottom.shardIndex(), bottom.slot())); + } + + /* + * Only the following callback methods need to be overridden since + * topDocs(int, int) calls them to return the results. + */ + + /** Return whether collection terminated early. 
*/ + public boolean isEarlyTerminated() { + return totalHitsRelation == Relation.GREATER_THAN_OR_EQUAL_TO; + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullScoreDocCollector.java b/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullScoreDocCollector.java index 04fb4ec..e25885b 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullScoreDocCollector.java +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/LMDBFullScoreDocCollector.java @@ -24,6 +24,7 @@ import it.cavallium.dbengine.lucene.LLScoreDocCodec; import it.cavallium.dbengine.lucene.LMDBPriorityQueue; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.MaxScoreAccumulator; +import it.cavallium.dbengine.lucene.ResourceIterable; import java.io.IOException; import java.util.Collection; import org.apache.lucene.index.LeafReaderContext; @@ -48,7 +49,7 @@ import org.jetbrains.annotations.Nullable; *

NOTE: The values {@link Float#NaN} and {@link Float#NEGATIVE_INFINITY} are not valid * scores. This collector will not properly collect hits with such scores. */ -public abstract class LMDBFullScoreDocCollector extends FullDocsCollector { +public abstract class LMDBFullScoreDocCollector extends FullDocsCollector { /** Scorable leaf collector */ public abstract static class ScorerLeafCollector implements LeafCollector { @@ -126,6 +127,11 @@ public abstract class LMDBFullScoreDocCollector extends FullDocsCollector mapResults(ResourceIterable it) { + return it; + } } /** diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/ScoringShardsCollectorManager.java b/src/main/java/it/cavallium/dbengine/lucene/collector/ScoringShardsCollectorManager.java index 61306bd..d80aefb 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/collector/ScoringShardsCollectorManager.java +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/ScoringShardsCollectorManager.java @@ -2,7 +2,6 @@ package it.cavallium.dbengine.lucene.collector; import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.TIE_BREAKER; -import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.lucene.LuceneUtils; import java.io.IOException; import java.util.Collection; diff --git a/src/main/java/org/apache/lucene/search/MultiLeafFieldComparator.java b/src/main/java/org/apache/lucene/search/MultiLeafFieldComparator.java new file mode 100644 index 0000000..7eeddd8 --- /dev/null +++ b/src/main/java/org/apache/lucene/search/MultiLeafFieldComparator.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.search; + +import java.io.IOException; + +public final class MultiLeafFieldComparator implements LeafFieldComparator { + + private final LeafFieldComparator[] comparators; + private final int[] reverseMul; + // we extract the first comparator to avoid array access in the common case + // that the first comparator compares worse than the bottom entry in the queue + private final LeafFieldComparator firstComparator; + private final int firstReverseMul; + + public MultiLeafFieldComparator(LeafFieldComparator[] comparators, int[] reverseMul) { + if (comparators.length != reverseMul.length) { + throw new IllegalArgumentException( + "Must have the same number of comparators and reverseMul, got " + + comparators.length + + " and " + + reverseMul.length); + } + this.comparators = comparators; + this.reverseMul = reverseMul; + this.firstComparator = comparators[0]; + this.firstReverseMul = reverseMul[0]; + } + + @Override + public void setBottom(int slot) throws IOException { + for (LeafFieldComparator comparator : comparators) { + comparator.setBottom(slot); + } + } + + @Override + public int compareBottom(int doc) throws IOException { + int cmp = firstReverseMul * firstComparator.compareBottom(doc); + if (cmp != 0) { + return cmp; + } + for (int i = 1; i < comparators.length; ++i) { + cmp = reverseMul[i] * comparators[i].compareBottom(doc); + if (cmp != 0) { + return cmp; + } + } + return 0; + } + + @Override + public int compareTop(int doc) throws IOException { + int cmp = firstReverseMul * firstComparator.compareTop(doc); + 
if (cmp != 0) { + return cmp; + } + for (int i = 1; i < comparators.length; ++i) { + cmp = reverseMul[i] * comparators[i].compareTop(doc); + if (cmp != 0) { + return cmp; + } + } + return 0; + } + + @Override + public void copy(int slot, int doc) throws IOException { + for (LeafFieldComparator comparator : comparators) { + comparator.copy(slot, doc); + } + } + + @Override + public void setScorer(Scorable scorer) throws IOException { + for (LeafFieldComparator comparator : comparators) { + comparator.setScorer(scorer); + } + } + + @Override + public void setHitsThresholdReached() throws IOException { + // this is needed for skipping functionality that is only relevant for the 1st comparator + firstComparator.setHitsThresholdReached(); + } + + @Override + public DocIdSetIterator competitiveIterator() throws IOException { + // this is needed for skipping functionality that is only relevant for the 1st comparator + return firstComparator.competitiveIterator(); + } +}