diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java index 1832944..b13b6a9 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java @@ -5,7 +5,6 @@ import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRIT import io.net5.buffer.api.Send; import io.net5.buffer.api.internal.ResourceSupport; -import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearcher; import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.database.disk.LLTempLMDBEnv; @@ -16,7 +15,7 @@ import reactor.core.scheduler.Schedulers; public class AdaptiveLocalSearcher implements LocalSearcher { - private static final OfficialSearcher officialSearcher = new OfficialSearcher(); + private static final StandardSearcher standardSearcher = new StandardSearcher(); private static final LocalSearcher scoredPaged = new PagedLocalSearcher(); @@ -83,6 +82,8 @@ public class AdaptiveLocalSearcher implements LocalSearcher { if (queryParams.limitLong() == 0) { return countSearcher.collect(indexSearcher, queryParams, keyFieldName, transformer); + } else if (realLimit <= maxInMemoryResultEntries) { + return standardSearcher.collect(indexSearcher, queryParams, keyFieldName, transformer); } else if (queryParams.isSorted()) { if (realLimit <= maxAllowedInMemoryLimit) { return scoredPaged.collect(indexSearcher, queryParams, keyFieldName, transformer); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java index e0cba3b..35de5ca 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java @@ -14,7 +14,7 @@ import reactor.core.scheduler.Schedulers; public class AdaptiveMultiSearcher implements MultiSearcher { - private static final OfficialSearcher officialSearcher = new OfficialSearcher(); + private static final StandardSearcher standardSearcher = new StandardSearcher(); private static final MultiSearcher count = new CountMultiSearcher(); @@ -73,6 +73,8 @@ public class AdaptiveMultiSearcher implements MultiSearcher { return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> { if (queryParams.limitLong() == 0) { return count.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); + } else if (realLimit <= maxInMemoryResultEntries) { + return standardSearcher.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); } else if (queryParams.isSorted()) { if (realLimit <= maxAllowedInMemoryLimit) { return scoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/OfficialSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/StandardSearcher.java similarity index 61% rename from src/main/java/it/cavallium/dbengine/lucene/searcher/OfficialSearcher.java rename to src/main/java/it/cavallium/dbengine/lucene/searcher/StandardSearcher.java index c460ec4..3c676f2 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/OfficialSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/StandardSearcher.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine.lucene.searcher; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; +import static java.util.Objects.requireNonNull; import io.net5.buffer.api.Send; import it.cavallium.dbengine.database.LLKeyScore; @@ -12,19 +13,22 @@ import java.util.List; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TopDocsCollector; import org.apache.lucene.search.TopFieldCollector; +import org.apache.lucene.search.TopFieldDocs; import org.apache.lucene.search.TopScoreDocCollector; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; -public class OfficialSearcher implements MultiSearcher { +public class StandardSearcher implements MultiSearcher { - protected static final Logger logger = LogManager.getLogger(OfficialSearcher.class); + protected static final Logger logger = LogManager.getLogger(StandardSearcher.class); - public OfficialSearcher() { + public StandardSearcher() { } @Override @@ -74,25 +78,51 @@ public class OfficialSearcher implements MultiSearcher { } else { return TopScoreDocCollector.createSharedManager(queryParams.limitInt(), null, totalHitsThreshold); } - }) - .flatMap(sharedManager -> Flux - .fromIterable(indexSearchers) - .flatMap(shard -> Mono.fromCallable(() -> { - LLUtils.ensureBlocking(); + }).flatMap(sharedManager -> Flux.fromIterable(indexSearchers).>handle((shard, sink) -> { + LLUtils.ensureBlocking(); + try { + var collector = sharedManager.newCollector(); + assert queryParams.computePreciseHitsCount() == null || (queryParams.computePreciseHitsCount() == collector + .scoreMode() + .isExhaustive()); - var collector = sharedManager.newCollector(); - assert queryParams.computePreciseHitsCount() == null - || (queryParams.computePreciseHitsCount() == collector.scoreMode().isExhaustive()); - - shard.search(queryParams.query(), LuceneUtils.withTimeout(collector, queryParams.timeout())); - return collector; - })) - .collectList() - .flatMap(collectors -> Mono.fromCallable(() -> { - LLUtils.ensureBlocking(); - return sharedManager.reduce((List) collectors); - })) - ); + shard.search(queryParams.query(), LuceneUtils.withTimeout(collector, queryParams.timeout())); + sink.next(collector); + } catch (IOException e) { + sink.error(e); + } + }).collectList().handle((collectors, sink) -> { + LLUtils.ensureBlocking(); + try { + if (collectors.size() <= 1) { + sink.next(sharedManager.reduce((List) collectors)); + } else if (queryParams.isSorted() && !queryParams.isSortedByScore()) { + final TopFieldDocs[] topDocs = new TopFieldDocs[collectors.size()]; + int i = 0; + for (var collector : collectors) { + var topFieldDocs = ((TopFieldCollector) collector).topDocs(); + for (ScoreDoc scoreDoc : topFieldDocs.scoreDocs) { + scoreDoc.shardIndex = i; + } + topDocs[i++] = topFieldDocs; + } + sink.next(TopDocs.merge(requireNonNull(queryParams.sort()), 0, queryParams.limitInt(), topDocs)); + } else { + final TopDocs[] topDocs = new TopDocs[collectors.size()]; + int i = 0; + for (var collector : collectors) { + var topScoreDocs = collector.topDocs(); + for (ScoreDoc scoreDoc : topScoreDocs.scoreDocs) { + scoreDoc.shardIndex = i; + } + topDocs[i++] = topScoreDocs; + } + sink.next(TopDocs.merge(0, queryParams.limitInt(), topDocs)); + } + } catch (IOException ex) { + sink.error(ex); + } + })); } /** @@ -117,6 +147,6 @@ public class OfficialSearcher implements MultiSearcher { @Override public String getName() { - return "official"; + return "standard"; } } diff --git a/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java b/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java index 06bad01..f5c1a95 100644 --- a/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java +++ b/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java @@ -35,7 +35,7 @@ import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher; import it.cavallium.dbengine.lucene.searcher.CountMultiSearcher; import it.cavallium.dbengine.lucene.searcher.LocalSearcher; import it.cavallium.dbengine.lucene.searcher.MultiSearcher; -import it.cavallium.dbengine.lucene.searcher.OfficialSearcher; +import it.cavallium.dbengine.lucene.searcher.StandardSearcher; import it.cavallium.dbengine.lucene.searcher.ScoredPagedMultiSearcher; import it.cavallium.dbengine.lucene.searcher.PagedLocalSearcher; import it.cavallium.dbengine.lucene.searcher.SortedScoredFullMultiSearcher; @@ -278,8 +278,8 @@ public class TestLuceneSearches { Assertions.assertTrue(keys.size() >= hits.value()); } - var officialSearcher = new OfficialSearcher(); - luceneIndex = getLuceneIndex(expectedQueryType.shard(), officialSearcher); + var standardSearcher = new StandardSearcher(); + luceneIndex = getLuceneIndex(expectedQueryType.shard(), standardSearcher); var officialQuery = queryParamsBuilder.limit(ELEMENTS.size() * 2L).build(); try (var officialResults = run(luceneIndex.search(officialQuery))) { var officialHits = officialResults.totalHitsCount();