From a1eec93c64738faeeb9666ff72218fd30a0fa4df Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 23 Dec 2021 02:13:51 +0100 Subject: [PATCH] Rewrite unsorted lucene queries --- pom.xml | 2 +- .../disk/OptimisticRocksDBColumn.java | 6 +- .../searcher/AdaptiveLocalSearcher.java | 18 +- .../searcher/AdaptiveMultiSearcher.java | 18 +- .../lucene/searcher/LuceneGenerator.java | 161 ++++++++++++++++++ .../lucene/searcher/LuceneMultiGenerator.java | 55 ++++++ ...va => SortedByScoreFullMultiSearcher.java} | 12 +- .../UnsortedStreamingMultiSearcher.java | 85 +++++++++ ...nsortedUnscoredStreamingMultiSearcher.java | 146 ---------------- .../dbengine/TestLuceneSearches.java | 10 +- 10 files changed, 334 insertions(+), 179 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneGenerator.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneMultiGenerator.java rename src/main/java/it/cavallium/dbengine/lucene/searcher/{UnsortedScoredFullMultiSearcher.java => SortedByScoreFullMultiSearcher.java} (91%) create mode 100644 src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedStreamingMultiSearcher.java delete mode 100644 src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java diff --git a/pom.xml b/pom.xml index 1e85653..4522732 100644 --- a/pom.xml +++ b/pom.xml @@ -149,7 +149,7 @@ org.assertj assertj-core - 3.18.0 + 3.21.0 test diff --git a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java index e0d1b19..cd1f9cc 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java @@ -156,7 +156,7 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn 0) { + Thread.sleep(retryMs); + } } catch (InterruptedException e) { throw new RocksDBException("Interrupted"); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java index 778de27..7245278 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveLocalSearcher.java @@ -18,7 +18,7 @@ public class AdaptiveLocalSearcher implements LocalSearcher { private static final LocalSearcher countSearcher = new CountMultiSearcher(); - private static final MultiSearcher unsortedUnscoredContinuous = new UnsortedUnscoredStreamingMultiSearcher(); + private static final MultiSearcher unsortedUnscoredContinuous = new UnsortedStreamingMultiSearcher(); /** * Use in-memory collectors if the expected results count is lower or equal than this limit @@ -26,13 +26,13 @@ public class AdaptiveLocalSearcher implements LocalSearcher { private final int maxInMemoryResultEntries; @Nullable - private final UnsortedScoredFullMultiSearcher unsortedScoredFull; + private final SortedByScoreFullMultiSearcher sortedByScoreFull; @Nullable private final SortedScoredFullMultiSearcher sortedScoredFull; public AdaptiveLocalSearcher(LLTempLMDBEnv env, boolean useLMDB, int maxInMemoryResultEntries) { - unsortedScoredFull = useLMDB ? new UnsortedScoredFullMultiSearcher(env) : null; + sortedByScoreFull = useLMDB ? new SortedByScoreFullMultiSearcher(env) : null; sortedScoredFull = useLMDB ? new SortedScoredFullMultiSearcher(env) : null; this.maxInMemoryResultEntries = maxInMemoryResultEntries; } @@ -74,16 +74,16 @@ public class AdaptiveLocalSearcher implements LocalSearcher { if (queryParams.limitLong() == 0) { return countSearcher.collect(indexSearcher, queryParams, keyFieldName, transformer); - } else if (queryParams.isSorted() || queryParams.needsScores()) { + } else if (queryParams.isSorted()) { if (realLimit <= maxAllowedInMemoryLimit) { return scoredPaged.collect(indexSearcher, queryParams, keyFieldName, transformer); } else { - if ((queryParams.isSorted() && !queryParams.isSortedByScore())) { + if (queryParams.isSortedByScore()) { if (queryParams.limitLong() < maxInMemoryResultEntries) { throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater"); } - if (sortedScoredFull != null) { - return sortedScoredFull.collect(indexSearcher, queryParams, keyFieldName, transformer); + if (sortedByScoreFull != null) { + return sortedByScoreFull.collect(indexSearcher, queryParams, keyFieldName, transformer); } else { return scoredPaged.collect(indexSearcher, queryParams, keyFieldName, transformer); } @@ -91,8 +91,8 @@ public class AdaptiveLocalSearcher implements LocalSearcher { if (queryParams.limitLong() < maxInMemoryResultEntries) { throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater"); } - if (unsortedScoredFull != null) { - return unsortedScoredFull.collect(indexSearcher, queryParams, keyFieldName, transformer); + if (sortedScoredFull != null) { + return sortedScoredFull.collect(indexSearcher, queryParams, keyFieldName, transformer); } else { return scoredPaged.collect(indexSearcher, queryParams, keyFieldName, transformer); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java index b4fe857..b56a064 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveMultiSearcher.java @@ -16,7 +16,7 @@ public class AdaptiveMultiSearcher implements MultiSearcher { private static final MultiSearcher scoredPaged = new ScoredPagedMultiSearcher(); - private static final MultiSearcher unsortedUnscoredContinuous = new UnsortedUnscoredStreamingMultiSearcher(); + private static final MultiSearcher unsortedUnscoredContinuous = new UnsortedStreamingMultiSearcher(); /** * Use in-memory collectors if the expected results count is lower or equal than this limit @@ -24,13 +24,13 @@ public class AdaptiveMultiSearcher implements MultiSearcher { private final int maxInMemoryResultEntries; @Nullable - private final UnsortedScoredFullMultiSearcher unsortedScoredFull; + private final SortedByScoreFullMultiSearcher sortedByScoreFull; @Nullable private final SortedScoredFullMultiSearcher sortedScoredFull; public AdaptiveMultiSearcher(LLTempLMDBEnv env, boolean useLMDB, int maxInMemoryResultEntries) { - unsortedScoredFull = useLMDB ? new UnsortedScoredFullMultiSearcher(env) : null; + sortedByScoreFull = useLMDB ? new SortedByScoreFullMultiSearcher(env) : null; sortedScoredFull = useLMDB ? new SortedScoredFullMultiSearcher(env) : null; this.maxInMemoryResultEntries = maxInMemoryResultEntries; } @@ -64,16 +64,16 @@ public class AdaptiveMultiSearcher implements MultiSearcher { return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> { if (queryParams.limitLong() == 0) { return count.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); - } else if (queryParams.isSorted() || queryParams.needsScores()) { + } else if (queryParams.isSorted()) { if (realLimit <= maxAllowedInMemoryLimit) { return scoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); } else { - if ((queryParams.isSorted() && !queryParams.isSortedByScore())) { + if (queryParams.isSortedByScore()) { if (queryParams.limitLong() < maxInMemoryResultEntries) { throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater"); } - if (sortedScoredFull != null) { - return sortedScoredFull.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); + if (sortedByScoreFull != null) { + return sortedByScoreFull.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); } else { return scoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); } @@ -81,8 +81,8 @@ public class AdaptiveMultiSearcher implements MultiSearcher { if (queryParams.limitLong() < maxInMemoryResultEntries) { throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater"); } - if (unsortedScoredFull != null) { - return unsortedScoredFull.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); + if (sortedScoredFull != null) { + return sortedScoredFull.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); } else { return scoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneGenerator.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneGenerator.java new file mode 100644 index 0000000..5f098fe --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneGenerator.java @@ -0,0 +1,161 @@ +package it.cavallium.dbengine.lucene.searcher; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Iterator; +import java.util.List; +import java.util.function.Supplier; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Weight; +import org.apache.lucene.util.Bits; +import org.jetbrains.annotations.Nullable; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; + +public class LuceneGenerator implements Supplier { + + private final IndexSearcher shard; + private final int shardIndex; + private final Query query; + private final Iterator leavesIterator; + private final boolean computeScores; + private final Float minScore; + + private long remainingOffset; + private long remainingAllowedResults; + private Weight weight; + + private LeafReaderContext leaf; + private DocIdSetIterator docIdSetIterator; + private Scorer scorer; + + LuceneGenerator(IndexSearcher shard, LocalQueryParams localQueryParams, int shardIndex) { + this.shard = shard; + this.shardIndex = shardIndex; + this.query = localQueryParams.query(); + this.remainingOffset = localQueryParams.offsetLong(); + this.remainingAllowedResults = localQueryParams.limitLong(); + this.computeScores = localQueryParams.needsScores() || localQueryParams.minCompetitiveScore() != null; + this.minScore = localQueryParams.minCompetitiveScore(); + List leaves = shard.getTopReaderContext().leaves(); + this.leavesIterator = leaves.iterator(); + } + + public static Flux reactive(IndexSearcher shard, LocalQueryParams localQueryParams, int shardIndex) { + return Flux + .generate(() -> new LuceneGenerator(shard, localQueryParams, shardIndex), + (s, sink) -> { + var val = s.get(); + if (val == null) { + sink.complete(); + } else { + sink.next(val); + } + return s; + } + ) + .subscribeOn(Schedulers.boundedElastic()); + } + + @Override + public ScoreDoc get() { + while (remainingOffset > 0) { + skipNext(); + } + if (remainingAllowedResults == 0) { + return null; + } else { + remainingAllowedResults--; + } + return getNext(); + } + + public void skipNext() { + getNext(); + remainingOffset--; + } + + private Weight createWeight() throws IOException { + ScoreMode scoreMode = computeScores ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES; + return shard.createWeight(shard.rewrite(query), scoreMode, 1f); + } + + public ScoreDoc getNext() { + if (weight == null) { + try { + weight = createWeight(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + try { + return getWeightedNext(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private ScoreDoc getWeightedNext() throws IOException { + while (tryAdvanceDocIdSetIterator()) { + LeafReader reader = leaf.reader(); + Bits liveDocs = reader.getLiveDocs(); + int doc; + while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + if (docDeleted(liveDocs, doc) || belowMinScore(scorer)) { + continue; + } + return transformDoc(doc); + } + docIdSetIterator = null; + } + clearState(); + return null; + } + private boolean tryAdvanceDocIdSetIterator() throws IOException { + if (docIdSetIterator != null) { + return true; + } + while (leavesIterator.hasNext()) { + LeafReaderContext leaf = leavesIterator.next(); + Scorer scorer = weight.scorer(leaf); + if (scorer == null) { + continue; + } + this.scorer = scorer; + this.leaf = leaf; + this.docIdSetIterator = scorer.iterator(); + return true; + } + return false; + } + + private ScoreDoc transformDoc(int doc) throws IOException { + return new ScoreDoc(leaf.docBase + doc, scorer.score(), shardIndex); + } + + private static boolean docDeleted(@Nullable Bits liveDocs, int doc) { + if (liveDocs == null) { + return false; + } + return !liveDocs.get(doc); + } + + private boolean belowMinScore(Scorer currentScorer) throws IOException { + return minScore != null && currentScorer.score() < minScore; + } + + private void clearState() { + docIdSetIterator = null; + scorer = null; + leaf = null; + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneMultiGenerator.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneMultiGenerator.java new file mode 100644 index 0000000..8cef8e2 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneMultiGenerator.java @@ -0,0 +1,55 @@ +package it.cavallium.dbengine.lucene.searcher; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.ScoreDoc; + +public class LuceneMultiGenerator implements Supplier { + + private final Iterator> generators; + private Supplier luceneGenerator; + + public LuceneMultiGenerator(List shards, LocalQueryParams localQueryParams) { + this.generators = IntStream + .range(0, shards.size()) + .mapToObj(shardIndex -> { + IndexSearcher shard = shards.get(shardIndex); + return (Supplier) new LuceneGenerator(shard, localQueryParams, shardIndex); + }) + .iterator(); + tryAdvanceGenerator(); + } + + private void tryAdvanceGenerator() { + if (generators.hasNext()) { + luceneGenerator = generators.next(); + } else { + luceneGenerator = null; + } + } + + @Override + public ScoreDoc get() { + if (luceneGenerator == null) { + return null; + } + ScoreDoc item; + do { + item = luceneGenerator.get(); + if (item == null) { + tryAdvanceGenerator(); + if (luceneGenerator == null) { + return null; + } + } + } while (item == null); + return item; + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedScoredFullMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedByScoreFullMultiSearcher.java similarity index 91% rename from src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedScoredFullMultiSearcher.java rename to src/main/java/it/cavallium/dbengine/lucene/searcher/SortedByScoreFullMultiSearcher.java index 7d86447..ff0b016 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedScoredFullMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedByScoreFullMultiSearcher.java @@ -12,8 +12,6 @@ import it.cavallium.dbengine.lucene.LLScoreDoc; import it.cavallium.dbengine.lucene.collector.FullDocsCollector; import it.cavallium.dbengine.lucene.collector.LMDBFullScoreDocCollector; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; -import java.io.Closeable; -import java.io.IOException; import java.util.List; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -21,13 +19,13 @@ import org.apache.lucene.search.IndexSearcher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public class UnsortedScoredFullMultiSearcher implements MultiSearcher { +public class SortedByScoreFullMultiSearcher implements MultiSearcher { - protected static final Logger logger = LogManager.getLogger(UnsortedScoredFullMultiSearcher.class); + protected static final Logger logger = LogManager.getLogger(SortedByScoreFullMultiSearcher.class); private final LLTempLMDBEnv env; - public UnsortedScoredFullMultiSearcher(LLTempLMDBEnv env) { + public SortedByScoreFullMultiSearcher(LLTempLMDBEnv env) { this.env = env; } @@ -46,7 +44,7 @@ public class UnsortedScoredFullMultiSearcher implements MultiSearcher { return queryParamsMono.flatMap(queryParams2 -> { if (queryParams2.isSorted() && !queryParams2.isSortedByScore()) { - throw new IllegalArgumentException(UnsortedScoredFullMultiSearcher.this.getClass().getSimpleName() + throw new IllegalArgumentException(SortedByScoreFullMultiSearcher.this.getClass().getSimpleName() + " doesn't support sorted queries"); } @@ -152,6 +150,6 @@ public class UnsortedScoredFullMultiSearcher implements MultiSearcher { @Override public String getName() { - return "unsorted scored full multi"; + return "sorted by score full multi"; } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedStreamingMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedStreamingMultiSearcher.java new file mode 100644 index 0000000..370848c --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedStreamingMultiSearcher.java @@ -0,0 +1,85 @@ +package it.cavallium.dbengine.lucene.searcher; + +import static java.util.Objects.requireNonNull; + +import io.net5.buffer.api.Send; +import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; +import it.cavallium.dbengine.database.LLKeyScore; +import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.disk.LLIndexSearchers; +import it.cavallium.dbengine.lucene.LuceneUtils; +import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; +import java.util.List; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.ScoreDoc; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class UnsortedStreamingMultiSearcher implements MultiSearcher { + + @Override + public Mono collectMulti(Mono> indexSearchersMono, + LocalQueryParams queryParams, + String keyFieldName, + LLSearchTransformer transformer) { + + return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> { + Mono queryParamsMono; + if (transformer == LLSearchTransformer.NO_TRANSFORMATION) { + queryParamsMono = Mono.just(queryParams); + } else { + queryParamsMono = transformer.transform(Mono + .fromCallable(() -> new TransformerInput(indexSearchers, queryParams))); + } + + return queryParamsMono.map(queryParams2 -> { + var localQueryParams = getLocalQueryParams(queryParams2); + if (queryParams2.isSorted() && queryParams2.limitLong() > 0) { + throw new UnsupportedOperationException("Sorted queries are not supported" + + " by UnsortedContinuousLuceneMultiSearcher"); + } + var shards = indexSearchers.shards(); + + Flux scoreDocsFlux = getScoreDocs(localQueryParams, shards); + + Flux resultsFlux = LuceneUtils.convertHits(scoreDocsFlux, shards, keyFieldName, false); + + var totalHitsCount = new TotalHitsCount(0, false); + Flux mergedFluxes = resultsFlux + .skip(queryParams2.offsetLong()) + .take(queryParams2.limitLong(), true); + + return new LuceneSearchResult(totalHitsCount, mergedFluxes, indexSearchers::close); + }); + }, false); + } + + private Flux getScoreDocs(LocalQueryParams localQueryParams, List shards) { + return Flux + .fromIterable(shards) + .index() + .flatMap(tuple -> { + var shardIndex = (int) (long) tuple.getT1(); + var shard = tuple.getT2(); + return LuceneGenerator.reactive(shard, localQueryParams, shardIndex); + }); + + } + + private LocalQueryParams getLocalQueryParams(LocalQueryParams queryParams) { + return new LocalQueryParams(queryParams.query(), + 0L, + queryParams.offsetLong() + queryParams.limitLong(), + queryParams.pageLimits(), + queryParams.minCompetitiveScore(), + queryParams.sort(), + queryParams.computePreciseHitsCount(), + queryParams.timeout() + ); + } + + @Override + public String getName() { + return "unsorted streaming multi"; + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java deleted file mode 100644 index bc00dfc..0000000 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java +++ /dev/null @@ -1,146 +0,0 @@ -package it.cavallium.dbengine.lucene.searcher; - -import static it.cavallium.dbengine.lucene.LuceneUtils.withTimeout; -import static java.lang.Math.toIntExact; -import static java.util.Objects.requireNonNull; - -import io.net5.buffer.api.Send; -import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; -import it.cavallium.dbengine.database.LLKeyScore; -import it.cavallium.dbengine.database.LLUtils; -import it.cavallium.dbengine.database.disk.LLIndexSearchers; -import it.cavallium.dbengine.lucene.LuceneUtils; -import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.LockSupport; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.search.CollectionTerminatedException; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.ScoreMode; -import org.apache.lucene.search.SimpleCollector; -import org.warp.commonutils.type.ShortNamedThreadFactory; -import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink.OverflowStrategy; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; - -public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { - - private static final int SEARCH_THREADS = Math.min(Math.max(8, Runtime.getRuntime().availableProcessors()), 128); - private static final ThreadFactory THREAD_FACTORY = new ShortNamedThreadFactory("UnscoredStreamingSearcher"); - private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(THREAD_FACTORY); // Executors.newFixedThreadPool(SEARCH_THREADS, THREAD_FACTORY); - - @Override - public Mono collectMulti(Mono> indexSearchersMono, - LocalQueryParams queryParams, - String keyFieldName, - LLSearchTransformer transformer) { - - return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> { - Mono queryParamsMono; - if (transformer == LLSearchTransformer.NO_TRANSFORMATION) { - queryParamsMono = Mono.just(queryParams); - } else { - queryParamsMono = transformer.transform(Mono - .fromCallable(() -> new TransformerInput(indexSearchers, queryParams))); - } - - return queryParamsMono.map(queryParams2 -> { - var localQueryParams = getLocalQueryParams(queryParams2); - if (queryParams2.isSorted() && queryParams2.limitLong() > 0) { - throw new UnsupportedOperationException("Sorted queries are not supported" - + " by UnsortedUnscoredContinuousLuceneMultiSearcher"); - } - if (queryParams2.needsScores() && queryParams2.limitLong() > 0) { - throw new UnsupportedOperationException("Scored queries are not supported" - + " by UnsortedUnscoredContinuousLuceneMultiSearcher"); - } - var shards = indexSearchers.shards(); - - Flux scoreDocsFlux = getScoreDocs(localQueryParams, shards); - - Flux resultsFlux = LuceneUtils.convertHits(scoreDocsFlux, shards, keyFieldName, false); - - var totalHitsCount = new TotalHitsCount(0, false); - Flux mergedFluxes = resultsFlux - .skip(queryParams2.offsetLong()) - .take(queryParams2.limitLong(), true); - - return new LuceneSearchResult(totalHitsCount, mergedFluxes, indexSearchers::close); - }); - }, false); - } - - private Flux getScoreDocs(LocalQueryParams localQueryParams, List shards) { - return Flux - .create(sink -> EXECUTOR.execute(() -> { - try { - LLUtils.ensureBlocking(); - var thread = Thread.currentThread(); - sink.onRequest(lc -> LockSupport.unpark(thread)); - int shardIndexTemp = 0; - for (IndexSearcher shard : shards) { - if (sink.isCancelled()) break; - final int shardIndex = shardIndexTemp; - shard.search(localQueryParams.query(), withTimeout(new SimpleCollector() { - private LeafReaderContext leafReaderContext; - - @Override - protected void doSetNextReader(LeafReaderContext context) { - this.leafReaderContext = context; - } - - @Override - public void collect(int i) { - // Assert that this is a non-blocking context - assert !Schedulers.isInNonBlockingThread(); - var scoreDoc = new ScoreDoc(leafReaderContext.docBase + i, 0, shardIndex); - if (sink.requestedFromDownstream() <= 0 || sink.isCancelled()) { - if (sink.isCancelled()) { - throw new CollectionTerminatedException(); - } else { - // 1000ms - LockSupport.parkNanos(1000000000L); - } - } - sink.next(scoreDoc); - } - - @Override - public ScoreMode scoreMode() { - return ScoreMode.COMPLETE_NO_SCORES; - } - }, localQueryParams.timeout())); - shardIndexTemp++; - } - } catch (Throwable e) { - sink.error(e); - } - sink.complete(); - }), OverflowStrategy.BUFFER) - .publishOn(Schedulers.parallel()); - - } - - private LocalQueryParams getLocalQueryParams(LocalQueryParams queryParams) { - return new LocalQueryParams(queryParams.query(), - 0L, - queryParams.offsetLong() + queryParams.limitLong(), - queryParams.pageLimits(), - queryParams.minCompetitiveScore(), - queryParams.sort(), - queryParams.computePreciseHitsCount(), - queryParams.timeout() - ); - } - - @Override - public String getName() { - return "unsorted unscored streaming multi"; - } -} diff --git a/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java b/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java index 7ab9be2..35bb720 100644 --- a/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java +++ b/src/test/java/it/cavallium/dbengine/TestLuceneSearches.java @@ -39,8 +39,8 @@ import it.cavallium.dbengine.lucene.searcher.OfficialSearcher; import it.cavallium.dbengine.lucene.searcher.ScoredPagedMultiSearcher; import it.cavallium.dbengine.lucene.searcher.PagedLocalSearcher; import it.cavallium.dbengine.lucene.searcher.SortedScoredFullMultiSearcher; -import it.cavallium.dbengine.lucene.searcher.UnsortedScoredFullMultiSearcher; -import it.cavallium.dbengine.lucene.searcher.UnsortedUnscoredStreamingMultiSearcher; +import it.cavallium.dbengine.lucene.searcher.SortedByScoreFullMultiSearcher; +import it.cavallium.dbengine.lucene.searcher.UnsortedStreamingMultiSearcher; import java.io.IOException; import java.util.Collections; import java.util.HashMap; @@ -164,11 +164,11 @@ public class TestLuceneSearches { if (info.sorted() && !info.sortedByScore()) { sink.next(new SortedScoredFullMultiSearcher(ENV)); } else { - sink.next(new UnsortedScoredFullMultiSearcher(ENV)); + sink.next(new SortedByScoreFullMultiSearcher(ENV)); } if (!info.sorted()) { sink.next(new UnsortedUnscoredSimpleMultiSearcher(new PagedLocalSearcher())); - sink.next(new UnsortedUnscoredStreamingMultiSearcher()); + sink.next(new UnsortedStreamingMultiSearcher()); } } sink.next(new AdaptiveMultiSearcher(ENV, true, MAX_IN_MEMORY_RESULT_ENTRIES)); @@ -254,7 +254,7 @@ public class TestLuceneSearches { private boolean supportsPreciseHitsCount(LocalSearcher searcher, ClientQueryParams query) { var sorted = query.isSorted(); - if (searcher instanceof UnsortedUnscoredStreamingMultiSearcher) { + if (searcher instanceof UnsortedStreamingMultiSearcher) { return false; } else if (!sorted) { return !(searcher instanceof AdaptiveMultiSearcher) && !(searcher instanceof AdaptiveLocalSearcher);