From 5f3c8a251546c0637d7321ebed6616b61b3ecad2 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 8 Jul 2021 18:54:53 +0200 Subject: [PATCH] Avoid calling reduce() multiple times --- .../dbengine/lucene/LuceneUtils.java | 63 +++++++++--- .../searcher/ScoredLuceneMultiSearcher.java | 7 +- .../ScoredSimpleLuceneShardSearcher.java | 84 +++++----------- .../ScoringShardsCollectorManager.java | 99 +++++++++++++++++++ .../searcher/SimpleLuceneLocalSearcher.java | 5 +- .../lucene/searcher/TopDocsSearcher.java | 9 +- .../searcher/UnscoredCollectorManager.java | 10 +- .../searcher/UnscoredLuceneShardSearcher.java | 60 +++-------- 8 files changed, 201 insertions(+), 136 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/lucene/searcher/ScoringShardsCollectorManager.java diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index e79120f..7dd0fd4 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -22,6 +22,7 @@ import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; import java.util.Map; @@ -365,32 +366,62 @@ public class LuceneUtils { * Transform a flux of results to take elements while the minimum competitive score is valid */ public static Flux filterTopDoc(Flux flux, LocalQueryParams queryParams) { - return flux; - /* - if (queryParams.sort() != null && queryParams.sort().needsScores() && queryParams.minCompetitiveScore() != null) { - return flux.takeWhile(entry -> LuceneUtils.filterTopDoc(entry.score(), queryParams.minCompetitiveScore())); + if (queryParams.scoreMode().needsScores() && queryParams.minCompetitiveScore() != null) { + if (queryParams.sort() != null && queryParams.sort().needsScores()) { + return flux.takeWhile(entry -> LuceneUtils.filterTopDoc(entry.score(), queryParams.minCompetitiveScore())); + } else { + return flux.filter(entry -> LuceneUtils.filterTopDoc(entry.score(), queryParams.minCompetitiveScore())); + } } else { return flux; - }*/ + } } - public static TopDocs mergeTopDocs(Sort sort, int startN, int topN, TopDocs[] topDocs, Comparator tieBreaker) { + public static TopDocs mergeTopDocs(Sort sort, @Nullable Integer startN, @Nullable Integer topN, TopDocs[] topDocs, Comparator tieBreaker) { + if ((startN == null) != (topN == null)) { + throw new IllegalArgumentException("You must pass startN and topN together or nothing"); + } TopDocs result; if (sort != null) { if (!(topDocs instanceof TopFieldDocs[])) { throw new IllegalStateException("Expected TopFieldDocs[], got TopDocs[]"); } - result = TopDocs.merge(sort, startN, - topN, - (TopFieldDocs[]) topDocs, - tieBreaker - ); + if (startN == null) { + int defaultTopN = 0; + for (TopDocs td : topDocs) { + int length = td.scoreDocs.length; + defaultTopN += length; + } + result = TopDocs.merge(sort, 0, defaultTopN, + (TopFieldDocs[]) topDocs, + tieBreaker + ); + } else { + result = TopDocs.merge(sort, startN, + topN, + (TopFieldDocs[]) topDocs, + tieBreaker + ); + } } else { - result = TopDocs.merge(startN, - topN, - topDocs, - tieBreaker - ); + if (startN == null) { + int defaultTopN = 0; + for (TopDocs td : topDocs) { + int length = td.scoreDocs.length; + defaultTopN += length; + } + result = TopDocs.merge(0, + defaultTopN, + topDocs, + tieBreaker + ); + } else { + result = TopDocs.merge(startN, + topN, + topDocs, + tieBreaker + ); + } } return result; } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredLuceneMultiSearcher.java index ed789f0..3e14402 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredLuceneMultiSearcher.java @@ -6,6 +6,7 @@ import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SE import it.cavallium.dbengine.lucene.LuceneUtils; import org.apache.lucene.search.CollectorManager; import org.apache.lucene.search.Sort; +import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopFieldCollector; import org.apache.lucene.search.TopFieldDocs; import reactor.core.publisher.Mono; @@ -26,8 +27,10 @@ public class ScoredLuceneMultiSearcher implements LuceneMultiSearcher { } else { paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, false); } - CollectorManager sharedManager = TopFieldCollector - .createSharedManager(luceneSort, LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()), null, 1000); + CollectorManager sharedManager = new ScoringShardsCollectorManager(luceneSort, + LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()), + null, 1000, LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset()), + LuceneUtils.safeLongToInt(paginationInfo.firstPageLimit())); return new ScoredSimpleLuceneShardSearcher(sharedManager, queryParams.query(), paginationInfo); }); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java index ed51e25..ea24923 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java @@ -28,13 +28,13 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { private final Object lock = new Object(); private final List indexSearchersArray = new ArrayList<>(); private final List collectors = new ArrayList<>(); - private final CollectorManager sharedManager; + private final CollectorManager firstPageSharedManager; private final Query luceneQuery; private final PaginationInfo paginationInfo; - public ScoredSimpleLuceneShardSearcher(CollectorManager sharedManager, + public ScoredSimpleLuceneShardSearcher(CollectorManager firstPageSharedManager, Query luceneQuery, PaginationInfo paginationInfo) { - this.sharedManager = sharedManager; + this.firstPageSharedManager = firstPageSharedManager; this.luceneQuery = luceneQuery; this.paginationInfo = paginationInfo; } @@ -45,7 +45,7 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { TopFieldCollector collector; synchronized (lock) { //noinspection BlockingMethodInNonBlockingContext - collector = sharedManager.newCollector(); + collector = firstPageSharedManager.newCollector(); indexSearchersArray.add(indexSearcher); collectors.add(collector); } @@ -65,42 +65,9 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { return Mono .fromCallable(() -> { TopDocs result; - if (queryParams.isSorted()) { - TopFieldDocs[] topDocs; - synchronized (lock) { - topDocs = new TopFieldDocs[collectors.size()]; - var i = 0; - for (TopFieldCollector collector : collectors) { - topDocs[i] = collector.topDocs(); - for (ScoreDoc scoreDoc : topDocs[i].scoreDocs) { - scoreDoc.shardIndex = i; - } - i++; - } - } - result = LuceneUtils.mergeTopDocs(queryParams.sort(), LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset()), - LuceneUtils.safeLongToInt(paginationInfo.firstPageLimit()), - topDocs, - TIE_BREAKER - ); - } else { - TopDocs[] topDocs; - synchronized (lock) { - topDocs = new TopDocs[collectors.size()]; - var i = 0; - for (TopFieldCollector collector : collectors) { - topDocs[i] = collector.topDocs(); - for (ScoreDoc scoreDoc : topDocs[i].scoreDocs) { - scoreDoc.shardIndex = i; - } - i++; - } - } - result = TopDocs.merge(LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset()), - LuceneUtils.safeLongToInt(paginationInfo.firstPageLimit()), - topDocs, - TIE_BREAKER - ); + synchronized (lock) { + //noinspection BlockingMethodInNonBlockingContext + result = firstPageSharedManager.reduce(collectors); } IndexSearchers indexSearchers; synchronized (lock) { @@ -120,13 +87,13 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1), (s, sink) -> { if (s.last() != null && s.remainingLimit() > 0) { - CollectorManager sharedManager; Sort luceneSort = queryParams.sort(); if (luceneSort == null) { luceneSort = Sort.RELEVANCE; } - sharedManager = TopFieldCollector.createSharedManager(luceneSort, s.currentPageLimit(), - (FieldDoc) s.last(), 1000); + CollectorManager sharedManager + = new ScoringShardsCollectorManager(luceneSort, s.currentPageLimit(), + (FieldDoc) s.last(), 1000, 0, s.currentPageLimit()); //noinspection BlockingMethodInNonBlockingContext TopDocs pageTopDocs = Flux .fromIterable(indexSearchersArray) @@ -136,28 +103,17 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { long shardIndex = tuple.getT1(); IndexSearcher indexSearcher = tuple.getT2(); //noinspection BlockingMethodInNonBlockingContext - var results = indexSearcher.search(luceneQuery, sharedManager); - for (ScoreDoc scoreDoc : results.scoreDocs) { - scoreDoc.shardIndex = LuceneUtils.safeLongToInt(shardIndex); - } - return results; + TopFieldCollector collector = sharedManager.newCollector(); + //noinspection BlockingMethodInNonBlockingContext + indexSearcher.search(luceneQuery, collector); + return collector; }) .subscribeOn(scheduler) ) .collect(Collectors.toCollection(ObjectArrayList::new)) - .map(topFieldDocs -> topFieldDocs.toArray(TopFieldDocs[]::new)) - .flatMap(topFieldDocs -> Mono.fromCallable(() -> { - if (queryParams.isSorted()) { - return LuceneUtils.mergeTopDocs(queryParams.sort(), 0, s.currentPageLimit(), - topFieldDocs, - TIE_BREAKER - ); - } else { - return TopDocs.merge(0, s.currentPageLimit(), - topFieldDocs, - TIE_BREAKER - ); - } + .flatMap(collectors -> Mono.fromCallable(() -> { + //noinspection BlockingMethodInNonBlockingContext + return sharedManager.reduce(collectors); }).subscribeOn(scheduler)) .subscribeOn(Schedulers.immediate()) .blockOptional().orElseThrow(); @@ -177,7 +133,11 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { ); }); - return new LuceneSearchResult(result.totalHits.value, firstPageHits.concatWith(nextHits)); + return new LuceneSearchResult(result.totalHits.value, + firstPageHits + .concatWith(nextHits) + .transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)) + ); }) .subscribeOn(scheduler); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoringShardsCollectorManager.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoringShardsCollectorManager.java new file mode 100644 index 0000000..76adb57 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoringShardsCollectorManager.java @@ -0,0 +1,99 @@ +package it.cavallium.dbengine.lucene.searcher; + +import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.TIE_BREAKER; + +import it.cavallium.dbengine.lucene.LuceneUtils; +import java.io.IOException; +import java.util.Collection; +import org.apache.lucene.search.CollectorManager; +import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TopFieldCollector; +import org.apache.lucene.search.TopFieldDocs; +import org.jetbrains.annotations.Nullable; + +public class ScoringShardsCollectorManager implements CollectorManager { + + private final Sort sort; + private final int numHits; + private final FieldDoc after; + private final int totalHitsThreshold; + private final @Nullable Integer startN; + private final @Nullable Integer topN; + private final CollectorManager sharedCollectorManager; + + public ScoringShardsCollectorManager(final Sort sort, + final int numHits, + final FieldDoc after, + final int totalHitsThreshold, + int startN, + int topN) { + this(sort, numHits, after, totalHitsThreshold, (Integer) startN, (Integer) topN); + } + + public ScoringShardsCollectorManager(final Sort sort, + final int numHits, + final FieldDoc after, + final int totalHitsThreshold) { + this(sort, numHits, after, totalHitsThreshold, null, null); + } + + private ScoringShardsCollectorManager(final Sort sort, + final int numHits, + final FieldDoc after, + final int totalHitsThreshold, + @Nullable Integer startN, + @Nullable Integer topN) { + this.sort = sort; + this.numHits = numHits; + this.after = after; + this.totalHitsThreshold = totalHitsThreshold; + this.startN = startN; + this.topN = topN; + this.sharedCollectorManager = TopFieldCollector.createSharedManager(sort, numHits, after, totalHitsThreshold); + } + + @Override + public TopFieldCollector newCollector() throws IOException { + return sharedCollectorManager.newCollector(); + } + + @Override + public TopDocs reduce(Collection collectors) throws IOException { + TopDocs result; + if (sort != null) { + TopFieldDocs[] topDocs = new TopFieldDocs[collectors.size()]; + var i = 0; + for (TopFieldCollector collector : collectors) { + topDocs[i] = collector.topDocs(); + for (ScoreDoc scoreDoc : topDocs[i].scoreDocs) { + scoreDoc.shardIndex = i; + } + i++; + } + result = LuceneUtils.mergeTopDocs(sort, startN, + topN, + topDocs, + TIE_BREAKER + ); + } else { + TopDocs[] topDocs = new TopDocs[collectors.size()]; + var i = 0; + for (TopFieldCollector collector : collectors) { + topDocs[i] = collector.topDocs(); + for (ScoreDoc scoreDoc : topDocs[i].scoreDocs) { + scoreDoc.shardIndex = i; + } + i++; + } + result = LuceneUtils.mergeTopDocs(null, startN, + topN, + topDocs, + TIE_BREAKER + ); + } + return result; + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java index 0badb9b..4a71bf0 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java @@ -83,7 +83,10 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { ); }); - return new LuceneSearchResult(firstPageTopDocs.totalHits.value, firstPageMono.concatWith(nextHits)); + return new LuceneSearchResult(firstPageTopDocs.totalHits.value, firstPageMono + .concatWith(nextHits) + .transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)) + ); }) .subscribeOn(scheduler); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java index ed0da8b..4906294 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java @@ -91,11 +91,12 @@ class TopDocsSearcher { return topDocs; } - public static TopDocsCollector getTopDocsCollector(Sort luceneSort, + @SuppressWarnings({"unchecked", "rawtypes"}) + public static TopDocsCollector getTopDocsCollector(Sort luceneSort, int limit, ScoreDoc after, int totalHitsThreshold) { - TopDocsCollector collector; + TopDocsCollector collector; if (luceneSort == null) { if (after == null) { collector = TopScoreDocCollector.create(limit, totalHitsThreshold); @@ -104,9 +105,9 @@ class TopDocsSearcher { } } else { if (after == null) { - collector = TopFieldCollector.create(luceneSort, limit, totalHitsThreshold); + collector = (TopDocsCollector) (TopDocsCollector) TopFieldCollector.create(luceneSort, limit, totalHitsThreshold); } else if (after instanceof FieldDoc afterFieldDoc) { - collector = TopFieldCollector.create(luceneSort, limit, afterFieldDoc, totalHitsThreshold); + collector = (TopDocsCollector) (TopDocsCollector) TopFieldCollector.create(luceneSort, limit, afterFieldDoc, totalHitsThreshold); } else { throw new UnsupportedOperationException("GetTopDocs with \"luceneSort\" != null requires \"after\" to be a FieldDoc"); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredCollectorManager.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredCollectorManager.java index 458409d..50229f9 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredCollectorManager.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredCollectorManager.java @@ -16,14 +16,14 @@ import org.apache.lucene.search.TopFieldDocs; import org.jetbrains.annotations.Nullable; public class UnscoredCollectorManager implements - CollectorManager, TopDocs> { + CollectorManager, TopDocs> { - private final Supplier> collectorSupplier; + private final Supplier> collectorSupplier; private final long offset; private final long limit; private final Sort sort; - public UnscoredCollectorManager(Supplier> collectorSupplier, + public UnscoredCollectorManager(Supplier> collectorSupplier, long offset, long limit, @Nullable Sort sort) { @@ -34,12 +34,12 @@ public class UnscoredCollectorManager implements } @Override - public TopDocsCollector newCollector() throws IOException { + public TopDocsCollector newCollector() throws IOException { return collectorSupplier.get(); } @Override - public TopDocs reduce(Collection> collection) throws IOException { + public TopDocs reduce(Collection> collection) throws IOException { int i = 0; TopDocs[] topDocsArray; if (sort != null) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneShardSearcher.java index 4bd0e08..ea440ae 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneShardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneShardSearcher.java @@ -26,12 +26,12 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher { private final Object lock = new Object(); private final List indexSearchersArray = new ArrayList<>(); - private final List> collectors = new ArrayList<>(); - private final CollectorManager, ? extends TopDocs> firstPageUnsortedCollectorManager; + private final List> collectors = new ArrayList<>(); + private final CollectorManager, TopDocs> firstPageUnsortedCollectorManager; private final Query luceneQuery; private final PaginationInfo paginationInfo; - public UnscoredLuceneShardSearcher(CollectorManager, ? extends TopDocs> unsortedCollectorManager, + public UnscoredLuceneShardSearcher(CollectorManager, TopDocs> unsortedCollectorManager, Query luceneQuery, PaginationInfo paginationInfo) { this.firstPageUnsortedCollectorManager = unsortedCollectorManager; @@ -42,7 +42,7 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher { @Override public Mono searchOn(IndexSearcher indexSearcher, LocalQueryParams queryParams, Scheduler scheduler) { return Mono.fromCallable(() -> { - TopDocsCollector collector; + TopDocsCollector collector; synchronized (lock) { //noinspection BlockingMethodInNonBlockingContext collector = firstPageUnsortedCollectorManager.newCollector(); @@ -59,28 +59,11 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher { public Mono collect(LocalQueryParams queryParams, String keyFieldName, Scheduler scheduler) { return Mono .fromCallable(() -> { - TopDocs[] topDocs; + TopDocs result; synchronized (lock) { - if (queryParams.isSorted()) { - topDocs = new TopFieldDocs[collectors.size()]; - } else { - topDocs = new TopDocs[collectors.size()]; - } - var i = 0; - for (TopDocsCollector collector : collectors) { - topDocs[i] = collector.topDocs(); - for (ScoreDoc scoreDoc : topDocs[i].scoreDocs) { - scoreDoc.shardIndex = i; - } - i++; - } + //noinspection BlockingMethodInNonBlockingContext + result = firstPageUnsortedCollectorManager.reduce(collectors); } - TopDocs result = LuceneUtils.mergeTopDocs(queryParams.sort(), - LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset()), - LuceneUtils.safeLongToInt(paginationInfo.firstPageLimit()), - topDocs, - TIE_BREAKER - ); IndexSearchers indexSearchers; synchronized (lock) { indexSearchers = IndexSearchers.of(indexSearchersArray); @@ -106,34 +89,19 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher { //noinspection BlockingMethodInNonBlockingContext TopDocs pageTopDocs = Flux .fromIterable(indexSearchersArray) - .index() - .flatMapSequential(tuple -> Mono + .flatMapSequential(indexSearcher -> Mono .fromCallable(() -> { - long shardIndex = tuple.getT1(); - IndexSearcher indexSearcher = tuple.getT2(); //noinspection BlockingMethodInNonBlockingContext - var results = indexSearcher.search(luceneQuery, currentPageUnsortedCollectorManager); - for (ScoreDoc scoreDoc : results.scoreDocs) { - scoreDoc.shardIndex = LuceneUtils.safeLongToInt(shardIndex); - } - return results; + var collector = currentPageUnsortedCollectorManager.newCollector(); + //noinspection BlockingMethodInNonBlockingContext + indexSearcher.search(luceneQuery, collector); + return collector; }) .subscribeOn(scheduler) ) .collect(Collectors.toCollection(ObjectArrayList::new)) - .map(topFieldDocs -> { - if (queryParams.isSorted()) { - @SuppressWarnings("SuspiciousToArrayCall") - TopFieldDocs[] topFieldDocsArray = topFieldDocs.toArray(TopFieldDocs[]::new); - return topFieldDocsArray; - } else { - return topFieldDocs.toArray(TopDocs[]::new); - } - }) - .flatMap(topFieldDocs -> Mono - .fromCallable(() -> LuceneUtils - .mergeTopDocs(queryParams.sort(), 0, s.currentPageLimit(), topFieldDocs, TIE_BREAKER) - ) + .flatMap(collectors -> Mono + .fromCallable(() -> currentPageUnsortedCollectorManager.reduce(collectors)) .subscribeOn(scheduler) ) .blockOptional().orElseThrow();