Add scoring shard multi manager

This commit is contained in:
Andrea Cavalli 2021-11-09 00:54:09 +01:00
parent 47aac33b22
commit 50b3c897ff
9 changed files with 224 additions and 167 deletions

View File

@ -403,8 +403,7 @@ public class LuceneUtils {
@Nullable Sort sort,
@Nullable Integer startN,
@Nullable Integer topN,
TopDocs[] topDocs,
Comparator<ScoreDoc> tieBreaker) {
TopDocs[] topDocs) {
if ((startN == null) != (topN == null)) {
throw new IllegalArgumentException("You must pass startN and topN together or nothing");
}
@ -420,14 +419,12 @@ public class LuceneUtils {
defaultTopN += length;
}
result = TopDocs.merge(sort, 0, defaultTopN,
(TopFieldDocs[]) topDocs,
tieBreaker
(TopFieldDocs[]) topDocs
);
} else {
result = TopDocs.merge(sort, startN,
topN,
(TopFieldDocs[]) topDocs,
tieBreaker
(TopFieldDocs[]) topDocs
);
}
} else {
@ -439,14 +436,12 @@ public class LuceneUtils {
}
result = TopDocs.merge(0,
defaultTopN,
topDocs,
tieBreaker
topDocs
);
} else {
result = TopDocs.merge(startN,
topN,
topDocs,
tieBreaker
topDocs
);
}
}

View File

@ -1,8 +1,12 @@
package it.cavallium.dbengine.lucene.collector;
import java.util.List;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.TopDocs;
public interface CollectorMultiManager {
/**
 * Contract for a manager that combines several partial results into one.
 *
 * @param <T> type of each partial result handed to {@link #reduce(List)}
 * @param <U> type of the combined result returned by {@link #reduce(List)}
 */
public interface CollectorMultiManager<T, U> {
// Score mode reported by this manager.
ScoreMode scoreMode();
// Combines the given partial results into a single result.
U reduce(List<T> results);
}

View File

@ -1,19 +1,24 @@
package it.cavallium.dbengine.lucene.searcher;
package it.cavallium.dbengine.lucene.collector;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.ALLOW_UNSCORED_PAGINATION_MODE;
import it.cavallium.dbengine.lucene.collector.UnscoredCollector;
import java.io.IOException;
import java.util.Collection;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopDocsCollector;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopScoreDocCollector;
class TopDocsCollectorUtils {
public class OptimizedTopDocsCollector {
@SuppressWarnings({"unchecked", "rawtypes"})
public static TopDocsCollector<ScoreDoc> getTopDocsCollector(Sort luceneSort,
public static TopDocsCollector<ScoreDoc> create(Sort luceneSort,
int limit,
ScoreDoc after,
int totalHitsThreshold,

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.lucene.collector;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
@ -11,7 +12,7 @@ import org.apache.lucene.search.ScoreMode;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks.Many;
public class ReactiveCollectorMultiManager implements CollectorMultiManager {
public class ReactiveCollectorMultiManager implements CollectorMultiManager<Void, Void> {
private final FluxSink<ScoreDoc> scoreDocsSink;
@ -49,4 +50,9 @@ public class ReactiveCollectorMultiManager implements CollectorMultiManager {
// Always reports ScoreMode.COMPLETE_NO_SCORES.
public ScoreMode scoreMode() {
return ScoreMode.COMPLETE_NO_SCORES;
}
// This manager produces no aggregate value: the partial results are ignored and null is always returned.
@Override
public Void reduce(List<Void> results) {
return null;
}
}

View File

@ -1,132 +0,0 @@
package it.cavallium.dbengine.lucene.collector;
import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.TIE_BREAKER;
import it.cavallium.dbengine.lucene.LuceneUtils;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopFieldDocs;
import org.jetbrains.annotations.Nullable;
import reactor.core.scheduler.Schedulers;
/**
 * {@link CollectorManager} that hands out collectors from a single shared {@link TopFieldCollector} manager and
 * merges their results through {@link LuceneUtils#mergeTopDocs}.
 *
 * <p>During {@link #reduce(Collection)} every hit is tagged with the position of its collector as
 * {@code shardIndex}. When scores have to be populated, the searcher at that same position of the list given to
 * {@link #setIndexSearchers(List)} is used.
 */
public class ScoringShardsCollectorManager implements CollectorManager<TopFieldCollector, TopDocs> {

  private final Query query;
  @Nullable
  private final Sort sort;
  private final int numHits;
  private final FieldDoc after;
  private final int totalHitsThreshold;
  private final @Nullable Integer startN;
  private final @Nullable Integer topN;
  private final CollectorManager<TopFieldCollector, TopFieldDocs> sharedCollectorManager;
  private List<IndexSearcher> indexSearchers;

  /** Creates a manager whose merged result is the window {@code [startN, startN + topN)}. */
  public ScoringShardsCollectorManager(Query query,
      @Nullable final Sort sort,
      final int numHits,
      final FieldDoc after,
      final int totalHitsThreshold,
      int startN,
      int topN) {
    this(query, sort, numHits, after, totalHitsThreshold, (Integer) startN, (Integer) topN);
  }

  /** Creates a manager whose merged result starts at {@code startN} and uses the maximum window size. */
  public ScoringShardsCollectorManager(Query query,
      @Nullable final Sort sort,
      final int numHits,
      final FieldDoc after,
      final int totalHitsThreshold,
      int startN) {
    this(query, sort, numHits, after, totalHitsThreshold, (Integer) startN, (Integer) 2147483630);
  }

  /** Creates a manager that passes neither a start offset nor a window size to the merge. */
  public ScoringShardsCollectorManager(Query query,
      @Nullable final Sort sort,
      final int numHits,
      final FieldDoc after,
      final int totalHitsThreshold) {
    this(query, sort, numHits, after, totalHitsThreshold, null, null);
  }

  private ScoringShardsCollectorManager(Query query,
      @Nullable final Sort sort,
      final int numHits,
      final FieldDoc after,
      final int totalHitsThreshold,
      @Nullable Integer startN,
      @Nullable Integer topN) {
    this.query = query;
    this.sort = sort;
    this.numHits = numHits;
    this.after = after;
    this.totalHitsThreshold = totalHitsThreshold;
    this.startN = startN;

    // Cap the window so that startN + topN never exceeds 2147483630.
    Integer cappedTopN = topN;
    if (topN != null) {
      if (startN != null && (long) topN + (long) startN > 2147483630) {
        cappedTopN = 2147483630 - startN;
      } else if (topN > 2147483630) {
        cappedTopN = 2147483630;
      }
    }
    this.topN = cappedTopN;

    final Sort effectiveSort = sort == null ? Sort.RELEVANCE : sort;
    this.sharedCollectorManager = TopFieldCollector.createSharedManager(effectiveSort, numHits, after,
        totalHitsThreshold);
  }

  @Override
  public TopFieldCollector newCollector() throws IOException {
    return sharedCollectorManager.newCollector();
  }

  /** Stores the searchers used by {@link #reduce(Collection)} when scores must be populated. */
  public void setIndexSearchers(List<IndexSearcher> indexSearcher) {
    this.indexSearchers = indexSearcher;
  }

  /**
   * Merges the top docs of all collectors.
   *
   * @throws UnsupportedOperationException if invoked from a non-blocking thread
   */
  @Override
  public TopDocs reduce(Collection<TopFieldCollector> collectors) throws IOException {
    if (Schedulers.isInNonBlockingThread()) {
      throw new UnsupportedOperationException("Called reduce in a nonblocking thread");
    }
    return sort == null ? reduceUnsorted(collectors) : reduceSorted(collectors);
  }

  /** Merge path used when an explicit sort is configured. */
  private TopDocs reduceSorted(Collection<TopFieldCollector> collectors) throws IOException {
    final TopFieldDocs[] perCollector = new TopFieldDocs[collectors.size()];
    int position = 0;
    for (TopFieldCollector collector : collectors) {
      final TopFieldDocs docs = collector.topDocs();
      perCollector[position] = docs;
      final ScoreDoc[] hits = docs.scoreDocs;
      // TopFieldCollector does not populate scores by default: fill them in when the sort needs them.
      if (hits.length > 0 && Float.isNaN(hits[0].score) && sort.needsScores()) {
        Objects.requireNonNull(indexSearchers, "You must call setIndexSearchers before calling reduce!");
        TopFieldCollector.populateScores(hits, indexSearchers.get(position), query);
      }
      tagHits(hits, position);
      position++;
    }
    return LuceneUtils.mergeTopDocs(sort, startN, topN, perCollector, TIE_BREAKER);
  }

  /** Merge path used when no sort is configured. */
  private TopDocs reduceUnsorted(Collection<TopFieldCollector> collectors) {
    final TopDocs[] perCollector = new TopDocs[collectors.size()];
    int position = 0;
    for (TopFieldCollector collector : collectors) {
      final TopDocs docs = collector.topDocs();
      perCollector[position] = docs;
      tagHits(docs.scoreDocs, position);
      position++;
    }
    return LuceneUtils.mergeTopDocs(null, startN, topN, perCollector, TIE_BREAKER);
  }

  /** Writes {@code position} into the {@code shardIndex} of every hit. */
  private static void tagHits(ScoreDoc[] hits, int position) {
    for (ScoreDoc hit : hits) {
      hit.shardIndex = position;
    }
  }
}

View File

@ -0,0 +1,183 @@
package it.cavallium.dbengine.lucene.collector;
import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.TIE_BREAKER;
import it.cavallium.dbengine.lucene.LuceneUtils;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopFieldDocs;
import org.jetbrains.annotations.Nullable;
import reactor.core.scheduler.Schedulers;
/**
 * Two-level top-docs manager for searches that span several shards.
 *
 * <p>{@link #get(IndexSearcher, int)} returns a per-shard {@link CollectorManager}; all of them draw their
 * collectors from one shared {@link TopFieldCollector} manager. Each per-shard manager merges the collectors of
 * its shard into a single {@link TopDocs}, tagging every hit with the shard index. {@link #reduce(List)} then
 * merges the per-shard results into the requested {@code [startN, startN + topN)} window.
 *
 * <p>The per-shard merge keeps the first {@code startN + topN} hits (offset 0), so that the final merge still has
 * every candidate available when it applies the real offset.
 */
public class ScoringShardsCollectorMultiManager implements CollectorMultiManager<TopDocs, TopDocs> {

  /** Upper bound applied to {@code topN} and to {@code startN + topN}. */
  private static final int MAX_WINDOW = 2147483630;

  private static final boolean USE_CLASSIC_REDUCE = false;

  private final Query query;
  @Nullable
  private final Sort sort;
  private final int numHits;
  private final FieldDoc after;
  private final int totalHitsThreshold;
  /** Offset and size used by the final, cross-shard merge. */
  private final @Nullable Integer startN;
  private final @Nullable Integer topN;
  /** Offset and size used by the per-shard merge. */
  private final @Nullable Integer internalStartN;
  private final @Nullable Integer internalTopN;
  private final CollectorManager<TopFieldCollector, TopFieldDocs> sharedCollectorManager;

  /** Creates a manager whose final result is the window {@code [startN, startN + topN)}. */
  public ScoringShardsCollectorMultiManager(Query query,
      @Nullable final Sort sort,
      final int numHits,
      final FieldDoc after,
      final int totalHitsThreshold,
      int startN,
      int topN) {
    this(query, sort, numHits, after, totalHitsThreshold, (Integer) startN, (Integer) topN);
  }

  /** Creates a manager whose final result starts at {@code startN} and uses the maximum window size. */
  public ScoringShardsCollectorMultiManager(Query query,
      @Nullable final Sort sort,
      final int numHits,
      final FieldDoc after,
      final int totalHitsThreshold,
      int startN) {
    this(query, sort, numHits, after, totalHitsThreshold, (Integer) startN, (Integer) MAX_WINDOW);
  }

  /** Creates a manager that passes neither a start offset nor a window size to the merges. */
  public ScoringShardsCollectorMultiManager(Query query,
      @Nullable final Sort sort,
      final int numHits,
      final FieldDoc after,
      final int totalHitsThreshold) {
    this(query, sort, numHits, after, totalHitsThreshold, null, null);
  }

  private ScoringShardsCollectorMultiManager(Query query,
      @Nullable final Sort sort,
      final int numHits,
      final FieldDoc after,
      final int totalHitsThreshold,
      @Nullable Integer startN,
      @Nullable Integer topN) {
    this.query = query;
    this.sort = sort;
    this.numHits = numHits;
    this.after = after;
    this.totalHitsThreshold = totalHitsThreshold;
    this.startN = startN;

    // Cap the window so that startN + topN never exceeds MAX_WINDOW.
    // NOTE(review): a startN above MAX_WINDOW yields a negative topN here; callers are assumed not to pass one.
    if (topN != null && startN != null && (long) topN + (long) startN > MAX_WINDOW) {
      this.topN = MAX_WINDOW - startN;
    } else if (topN != null && topN > MAX_WINDOW) {
      this.topN = MAX_WINDOW;
    } else {
      this.topN = topN;
    }

    // Each shard must keep the first startN + topN hits. In every other case (no offset, no size, or a size
    // already at the cap) the per-shard size is just topN.
    if (this.topN != null && this.startN != null && this.topN < MAX_WINDOW) {
      this.internalTopN = this.startN + this.topN;
    } else {
      this.internalTopN = this.topN;
    }
    // mergeTopDocs requires offset and size to be both set or both absent.
    if (this.internalTopN != null) {
      this.internalStartN = 0;
    } else {
      this.internalStartN = null;
    }

    this.sharedCollectorManager = TopFieldCollector.createSharedManager(sort == null ? Sort.RELEVANCE : sort,
        numHits, after, totalHitsThreshold);
  }

  /**
   * Returns the collector manager to use for one shard.
   *
   * @param indexSearcher searcher of the shard, used to populate missing scores
   * @param shardIndex value written into the {@code shardIndex} of every hit of this shard
   * @return a manager whose {@code reduce} yields the merged top docs of the shard; that {@code reduce} throws
   *     {@link UnsupportedOperationException} when invoked from a non-blocking thread
   */
  public CollectorManager<TopFieldCollector, TopDocs> get(IndexSearcher indexSearcher, int shardIndex) {
    return new CollectorManager<>() {
      @Override
      public TopFieldCollector newCollector() throws IOException {
        return sharedCollectorManager.newCollector();
      }

      @Override
      public TopDocs reduce(Collection<TopFieldCollector> collectors) throws IOException {
        return reduceShard(collectors, indexSearcher, shardIndex);
      }
    };
  }

  /** Merges the collectors of one shard into a single {@link TopDocs}. */
  private TopDocs reduceShard(Collection<TopFieldCollector> collectors, IndexSearcher indexSearcher, int shardIndex)
      throws IOException {
    if (Schedulers.isInNonBlockingThread()) {
      throw new UnsupportedOperationException("Called reduce in a nonblocking thread");
    }
    if (USE_CLASSIC_REDUCE) {
      return reduceShardClassic(collectors, indexSearcher);
    }

    // With a sort, LuceneUtils.mergeTopDocs casts the array to TopFieldDocs[], so the runtime array type matters.
    final TopDocs[] shardDocs = sort != null
        ? new TopFieldDocs[collectors.size()]
        : new TopDocs[collectors.size()];
    int i = 0;
    for (TopFieldCollector collector : collectors) {
      final TopFieldDocs docs = collector.topDocs();
      // TopFieldCollector does not populate scores by default: fill them in when the sort needs them.
      if (sort != null
          && docs.scoreDocs.length > 0
          && Float.isNaN(docs.scoreDocs[0].score)
          && sort.needsScores()) {
        TopFieldCollector.populateScores(docs.scoreDocs, indexSearcher, query);
      }
      for (ScoreDoc scoreDoc : docs.scoreDocs) {
        scoreDoc.shardIndex = shardIndex;
      }
      shardDocs[i++] = docs;
    }
    return LuceneUtils.mergeTopDocs(sort, internalStartN, internalTopN, shardDocs);
  }

  /** Alternative per-shard merge, selected by {@link #USE_CLASSIC_REDUCE}: merge first, populate scores after. */
  private TopDocs reduceShardClassic(Collection<TopFieldCollector> collectors, IndexSearcher indexSearcher)
      throws IOException {
    final TopFieldDocs[] topDocs = new TopFieldDocs[collectors.size()];
    int i = 0;
    for (TopFieldCollector collector : collectors) {
      topDocs[i++] = collector.topDocs();
    }
    var result = LuceneUtils.mergeTopDocs(sort, 0, numHits, topDocs);
    if (sort != null && sort.needsScores()) {
      TopFieldCollector.populateScores(result.scoreDocs, indexSearcher, query);
    }
    return result;
  }

  /**
   * Not supported by this manager.
   *
   * @throws NotImplementedException always
   */
  @Override
  public ScoreMode scoreMode() {
    throw new NotImplementedException();
  }

  /**
   * Merges the per-shard results into the final {@code [startN, startN + topN)} window.
   *
   * @param topDocs one entry per shard, as produced by the managers returned from {@link #get(IndexSearcher, int)}
   * @return the merged top docs
   */
  @SuppressWarnings({"SuspiciousToArrayCall", "IfStatementWithIdenticalBranches"})
  @Override
  public TopDocs reduce(List<TopDocs> topDocs) {
    TopDocs[] arr;
    if (sort != null) {
      // With a sort, LuceneUtils.mergeTopDocs casts the array to TopFieldDocs[], so build it with that type.
      arr = topDocs.toArray(TopFieldDocs[]::new);
    } else {
      arr = topDocs.toArray(TopDocs[]::new);
    }
    return LuceneUtils.mergeTopDocs(sort, startN, topN, arr);
  }
}

View File

@ -10,15 +10,14 @@ import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.collector.OptimizedTopDocsCollector;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopDocsCollector;
import org.apache.lucene.search.TopFieldCollector;
@ -185,7 +184,7 @@ public class PagedLocalSearcher implements LocalSearcher {
} else if (s.pageIndex() == 0 || (s.last() != null && s.remainingLimit() > 0)) {
TopDocs pageTopDocs;
try {
TopDocsCollector<ScoreDoc> collector = TopDocsCollectorUtils.getTopDocsCollector(queryParams.sort(),
TopDocsCollector<ScoreDoc> collector = OptimizedTopDocsCollector.create(queryParams.sort(),
currentPageLimit, s.last(), queryParams.getTotalHitsThresholdInt(),
allowPagination, queryParams.needsScores());
assert queryParams.complete() == collector.scoreMode().isExhaustive();

View File

@ -8,7 +8,7 @@ import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.PageLimits;
import it.cavallium.dbengine.lucene.collector.ScoringShardsCollectorManager;
import it.cavallium.dbengine.lucene.collector.ScoringShardsCollectorMultiManager;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.util.Arrays;
import java.util.List;
@ -176,32 +176,29 @@ public class ScoredPagedMultiSearcher implements MultiSearcher {
var pageLimit = pageLimits.getPageLimit(s.pageIndex());
var after = (FieldDoc) s.last();
var totalHitsThreshold = queryParams.getTotalHitsThresholdInt();
return new ScoringShardsCollectorManager(query, sort, pageLimit, after, totalHitsThreshold,
return new ScoringShardsCollectorMultiManager(query, sort, pageLimit, after, totalHitsThreshold,
resultsOffset, pageLimit);
} else {
return null;
}
})
.flatMap(sharedManager -> Flux
.flatMap(cmm -> Flux
.fromIterable(indexSearchers)
.flatMap(shard -> Mono.fromCallable(() -> {
.index()
.flatMap(shardWithIndex -> Mono.fromCallable(() -> {
LLUtils.ensureBlocking();
var collector = sharedManager.newCollector();
assert queryParams.complete() == collector.scoreMode().isExhaustive();
assert pageLimits.getPageLimit(s.pageIndex()) < Integer.MAX_VALUE || queryParams
.getScoreModeOptional()
.map(scoreMode -> scoreMode == collector.scoreMode())
.orElse(true);
var index = (int) (long) shardWithIndex.getT1();
var shard = shardWithIndex.getT2();
shard.search(queryParams.query(), collector);
return collector;
var cm = cmm.get(shard, index);
return shard.search(queryParams.query(), cm);
}))
.collectList()
.flatMap(collectors -> Mono.fromCallable(() -> {
.flatMap(results -> Mono.fromCallable(() -> {
LLUtils.ensureBlocking();
sharedManager.setIndexSearchers(indexSearchers);
var pageTopDocs = sharedManager.reduce(collectors);
var pageTopDocs = cmm.reduce(results);
var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs);
long nextRemainingLimit;

View File

@ -38,8 +38,8 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher {
if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
queryParamsMono = Mono.just(queryParams);
} else {
queryParamsMono = transformer.transform(Mono
.fromSupplier(() -> new TransformerInput(indexSearchers, queryParams)));
var transformerInput = Mono.just(new TransformerInput(indexSearchers, queryParams));
queryParamsMono = transformer.transform(transformerInput);
}
return queryParamsMono.flatMap(queryParams2 -> {