From 7cebcd7e92c9df2c841ab6aff2729425306ce064 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 6 Jul 2021 01:52:12 +0200 Subject: [PATCH] Sort shards during merge --- .../FieldSimpleLuceneShardSearcher.java | 68 +++++++++++++------ .../SharedSortedLuceneMultiSearcher.java | 2 +- .../searcher/SimpleLuceneLocalSearcher.java | 4 +- .../searcher/UnscoredLuceneMultiSearcher.java | 2 +- .../searcher/UnscoredLuceneShardSearcher.java | 2 +- 5 files changed, 54 insertions(+), 24 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/FieldSimpleLuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/FieldSimpleLuceneShardSearcher.java index 4fb9eb8..26fbd85 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/FieldSimpleLuceneShardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/FieldSimpleLuceneShardSearcher.java @@ -62,23 +62,44 @@ class FieldSimpleLuceneShardSearcher implements LuceneShardSearcher { public Mono collect(LocalQueryParams queryParams, String keyFieldName, Scheduler scheduler) { return Mono .fromCallable(() -> { - TopDocs[] topDocs; - synchronized (lock) { - topDocs = new TopDocs[collectors.size()]; - var i = 0; - for (TopFieldCollector collector : collectors) { - topDocs[i] = collector.topDocs(); - for (ScoreDoc scoreDoc : topDocs[i].scoreDocs) { - scoreDoc.shardIndex = i; + TopDocs result; + if (queryParams.sort() != null) { + TopFieldDocs[] topDocs; + synchronized (lock) { + topDocs = new TopFieldDocs[collectors.size()]; + var i = 0; + for (TopFieldCollector collector : collectors) { + topDocs[i] = collector.topDocs(); + for (ScoreDoc scoreDoc : topDocs[i].scoreDocs) { + scoreDoc.shardIndex = i; + } + i++; } - i++; } + result = TopDocs.merge(queryParams.sort(), LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset()), + LuceneUtils.safeLongToInt(paginationInfo.firstPageLimit()), + topDocs, + TIE_BREAKER + ); + } else { + TopDocs[] topDocs; + synchronized (lock) { + topDocs = new TopDocs[collectors.size()]; + var i = 0; + for (TopFieldCollector collector : collectors) { + topDocs[i] = collector.topDocs(); + for (ScoreDoc scoreDoc : topDocs[i].scoreDocs) { + scoreDoc.shardIndex = i; + } + i++; + } + } + result = TopDocs.merge(LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset()), + LuceneUtils.safeLongToInt(paginationInfo.firstPageLimit()), + topDocs, + TIE_BREAKER + ); } - var result = TopDocs.merge(LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset()), - LuceneUtils.safeLongToInt(paginationInfo.firstPageLimit()), - topDocs, - TIE_BREAKER - ); IndexSearchers indexSearchers; synchronized (lock) { indexSearchers = IndexSearchers.of(indexSearchersArray); @@ -87,7 +108,7 @@ class FieldSimpleLuceneShardSearcher implements LuceneShardSearcher { .convertHits(result.scoreDocs, indexSearchers, keyFieldName, scheduler); Flux nextHits = Flux.defer(() -> { - if (paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) { + if (paginationInfo.forceSinglePage() || paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) { return Flux.empty(); } return Flux @@ -120,10 +141,19 @@ class FieldSimpleLuceneShardSearcher implements LuceneShardSearcher { ) .collect(Collectors.toCollection(ObjectArrayList::new)) .map(topFieldDocs -> topFieldDocs.toArray(TopFieldDocs[]::new)) - .map(topFieldDocs -> TopDocs.merge(0, s.currentPageLimit(), - topFieldDocs, - TIE_BREAKER - )) + .map(topFieldDocs -> { + if (queryParams.sort() != null) { + return TopDocs.merge(queryParams.sort(), 0, s.currentPageLimit(), + topFieldDocs, + TIE_BREAKER + ); + } else { + return TopDocs.merge(0, s.currentPageLimit(), + topFieldDocs, + TIE_BREAKER + ); + } + }) .blockOptional().orElseThrow(); var pageLastDoc = LuceneUtils.getLastFieldDoc(pageTopDocs.scoreDocs); sink.next(pageTopDocs); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SharedSortedLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SharedSortedLuceneMultiSearcher.java index 8b45593..4fe4134 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SharedSortedLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SharedSortedLuceneMultiSearcher.java @@ -29,7 +29,7 @@ public class SharedSortedLuceneMultiSearcher implements LuceneMultiSearcher { if (queryParams.limit() <= MAX_SINGLE_SEARCH_LIMIT) { paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), queryParams.limit(), true); } else { - paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, true); + paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, false); } CollectorManager sharedManager = TopFieldCollector .createSharedManager(luceneSort, LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()), null, 1000); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java index 1bdf305..a64f8c1 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java @@ -42,7 +42,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { if (queryParams.limit() <= MAX_SINGLE_SEARCH_LIMIT) { paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), queryParams.limit(), true); } else { - paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, true); + paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, false); } //noinspection BlockingMethodInNonBlockingContext TopDocs firstPageTopDocs = TopDocsSearcher.getTopDocs(indexSearcher, @@ -64,7 +64,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher { Flux nextHits = Flux.defer(() -> { - if (paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) { + if (paginationInfo.forceSinglePage() || paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) { return Flux.empty(); } return Flux diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneMultiSearcher.java index 0066a75..baae9c7 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneMultiSearcher.java @@ -28,7 +28,7 @@ public class UnscoredLuceneMultiSearcher implements LuceneMultiSearcher { if (queryParams.limit() <= MAX_SINGLE_SEARCH_LIMIT) { paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), queryParams.limit(), true); } else { - paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, true); + paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, false); } UnsortedCollectorManager unsortedCollectorManager = new UnsortedCollectorManager(() -> TopDocsSearcher.getTopDocsCollector(null, LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()), diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneShardSearcher.java index 19be847..2cc5647 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneShardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnscoredLuceneShardSearcher.java @@ -84,7 +84,7 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher { .convertHits(result.scoreDocs, indexSearchers, keyFieldName, scheduler); Flux nextHits = Flux.defer(() -> { - if (paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) { + if (paginationInfo.forceSinglePage() || paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) { return Flux.empty(); } return Flux