Rewrite unsorted lucene queries

parent a5666dd5b4
commit a1eec93c64

pom.xml
@@ -149,7 +149,7 @@
  <dependency>
    <groupId>org.assertj</groupId>
    <artifactId>assertj-core</artifactId>
-   <version>3.18.0</version>
+   <version>3.21.0</version>
    <scope>test</scope>
  </dependency>
  <!-- This will get hamcrest-core automatically -->
OptimisticRocksDBColumn.java
@@ -156,7 +156,7 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
  retries++;

  if (retries == 1) {
-   retryTime = new ExponentialPageLimits(0, 5, 2000);
+   retryTime = new ExponentialPageLimits(0, 1, 2000);
  }
  long retryMs = retryTime.getPageLimit(retries);

@@ -172,7 +172,9 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
  }
  // Wait for n milliseconds
  try {
+   if (retryMs > 0) {
    Thread.sleep(retryMs);
+   }
  } catch (InterruptedException e) {
    throw new RocksDBException("Interrupted");
  }
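Aside: a minimal, self-contained sketch of the retry backoff the hunks above tune, assuming ExponentialPageLimits(0, 1, 2000) behaves roughly like a capped doubling delay that starts near 1 ms and tops out at 2000 ms (the class itself is not part of this diff); BackoffSketch and its fields are hypothetical stand-ins, not the project's code.

// Hypothetical stand-in for ExponentialPageLimits: capped doubling backoff.
// Assumption: new ExponentialPageLimits(0, 1, 2000) ~ first delay 1 ms, max delay 2000 ms.
final class BackoffSketch {
  private final long firstDelayMs;
  private final long maxDelayMs;

  BackoffSketch(long firstDelayMs, long maxDelayMs) {
    this.firstDelayMs = firstDelayMs;
    this.maxDelayMs = maxDelayMs;
  }

  long delayMs(int retries) {
    // retries = 1 -> firstDelayMs, then doubles, never exceeding maxDelayMs
    long delay = firstDelayMs << Math.min(retries - 1, 30);
    return Math.min(delay, maxDelayMs);
  }

  public static void main(String[] args) throws InterruptedException {
    var backoff = new BackoffSketch(1, 2000);
    for (int retries = 1; retries <= 6; retries++) {
      long retryMs = backoff.delayMs(retries);
      System.out.println("retry " + retries + " -> " + retryMs + " ms");
      if (retryMs > 0) {
        Thread.sleep(retryMs); // same guarded sleep the second hunk adds
      }
    }
  }
}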
AdaptiveLocalSearcher.java
@@ -18,7 +18,7 @@ public class AdaptiveLocalSearcher implements LocalSearcher {

  private static final LocalSearcher countSearcher = new CountMultiSearcher();

- private static final MultiSearcher unsortedUnscoredContinuous = new UnsortedUnscoredStreamingMultiSearcher();
+ private static final MultiSearcher unsortedUnscoredContinuous = new UnsortedStreamingMultiSearcher();

  /**
   * Use in-memory collectors if the expected results count is lower or equal than this limit
@@ -26,13 +26,13 @@ public class AdaptiveLocalSearcher implements LocalSearcher {
  private final int maxInMemoryResultEntries;

  @Nullable
- private final UnsortedScoredFullMultiSearcher unsortedScoredFull;
+ private final SortedByScoreFullMultiSearcher sortedByScoreFull;

  @Nullable
  private final SortedScoredFullMultiSearcher sortedScoredFull;

  public AdaptiveLocalSearcher(LLTempLMDBEnv env, boolean useLMDB, int maxInMemoryResultEntries) {
-   unsortedScoredFull = useLMDB ? new UnsortedScoredFullMultiSearcher(env) : null;
+   sortedByScoreFull = useLMDB ? new SortedByScoreFullMultiSearcher(env) : null;
    sortedScoredFull = useLMDB ? new SortedScoredFullMultiSearcher(env) : null;
    this.maxInMemoryResultEntries = maxInMemoryResultEntries;
  }
@@ -74,11 +74,20 @@ public class AdaptiveLocalSearcher implements LocalSearcher {

    if (queryParams.limitLong() == 0) {
      return countSearcher.collect(indexSearcher, queryParams, keyFieldName, transformer);
-   } else if (queryParams.isSorted() || queryParams.needsScores()) {
+   } else if (queryParams.isSorted()) {
      if (realLimit <= maxAllowedInMemoryLimit) {
        return scoredPaged.collect(indexSearcher, queryParams, keyFieldName, transformer);
      } else {
-       if ((queryParams.isSorted() && !queryParams.isSortedByScore())) {
+       if (queryParams.isSortedByScore()) {
          if (queryParams.limitLong() < maxInMemoryResultEntries) {
            throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater");
          }
+         if (sortedByScoreFull != null) {
+           return sortedByScoreFull.collect(indexSearcher, queryParams, keyFieldName, transformer);
+         } else {
+           return scoredPaged.collect(indexSearcher, queryParams, keyFieldName, transformer);
+         }
+       } else {
+         if (queryParams.limitLong() < maxInMemoryResultEntries) {
+           throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater");
+         }
@@ -87,15 +96,6 @@ public class AdaptiveLocalSearcher implements LocalSearcher {
          } else {
            return scoredPaged.collect(indexSearcher, queryParams, keyFieldName, transformer);
          }
-       } else {
-         if (queryParams.limitLong() < maxInMemoryResultEntries) {
-           throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater");
-         }
-         if (unsortedScoredFull != null) {
-           return unsortedScoredFull.collect(indexSearcher, queryParams, keyFieldName, transformer);
-         } else {
-           return scoredPaged.collect(indexSearcher, queryParams, keyFieldName, transformer);
-         }
        }
      }
    } else {
AdaptiveMultiSearcher.java
@@ -16,7 +16,7 @@ public class AdaptiveMultiSearcher implements MultiSearcher {

  private static final MultiSearcher scoredPaged = new ScoredPagedMultiSearcher();

- private static final MultiSearcher unsortedUnscoredContinuous = new UnsortedUnscoredStreamingMultiSearcher();
+ private static final MultiSearcher unsortedUnscoredContinuous = new UnsortedStreamingMultiSearcher();

  /**
   * Use in-memory collectors if the expected results count is lower or equal than this limit
@@ -24,13 +24,13 @@ public class AdaptiveMultiSearcher implements MultiSearcher {
  private final int maxInMemoryResultEntries;

  @Nullable
- private final UnsortedScoredFullMultiSearcher unsortedScoredFull;
+ private final SortedByScoreFullMultiSearcher sortedByScoreFull;

  @Nullable
  private final SortedScoredFullMultiSearcher sortedScoredFull;

  public AdaptiveMultiSearcher(LLTempLMDBEnv env, boolean useLMDB, int maxInMemoryResultEntries) {
-   unsortedScoredFull = useLMDB ? new UnsortedScoredFullMultiSearcher(env) : null;
+   sortedByScoreFull = useLMDB ? new SortedByScoreFullMultiSearcher(env) : null;
    sortedScoredFull = useLMDB ? new SortedScoredFullMultiSearcher(env) : null;
    this.maxInMemoryResultEntries = maxInMemoryResultEntries;
  }
@@ -64,11 +64,20 @@ public class AdaptiveMultiSearcher implements MultiSearcher {
    return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> {
      if (queryParams.limitLong() == 0) {
        return count.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
-     } else if (queryParams.isSorted() || queryParams.needsScores()) {
+     } else if (queryParams.isSorted()) {
        if (realLimit <= maxAllowedInMemoryLimit) {
          return scoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
        } else {
-         if ((queryParams.isSorted() && !queryParams.isSortedByScore())) {
+         if (queryParams.isSortedByScore()) {
            if (queryParams.limitLong() < maxInMemoryResultEntries) {
              throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater");
            }
+           if (sortedByScoreFull != null) {
+             return sortedByScoreFull.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
+           } else {
+             return scoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
+           }
+         } else {
+           if (queryParams.limitLong() < maxInMemoryResultEntries) {
+             throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater");
+           }
@@ -77,15 +86,6 @@ public class AdaptiveMultiSearcher implements MultiSearcher {
          } else {
            return scoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
          }
-       } else {
-         if (queryParams.limitLong() < maxInMemoryResultEntries) {
-           throw new UnsupportedOperationException("Allowed limit is " + maxInMemoryResultEntries + " or greater");
-         }
-         if (unsortedScoredFull != null) {
-           return unsortedScoredFull.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
-         } else {
-           return scoredPaged.collectMulti(indexSearchersMono, queryParams, keyFieldName, transformer);
-         }
        }
      }
    } else {
LuceneGenerator.java (new file)
@@ -0,0 +1,161 @@
package it.cavallium.dbengine.lucene.searcher;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Bits;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class LuceneGenerator implements Supplier<ScoreDoc> {

  private final IndexSearcher shard;
  private final int shardIndex;
  private final Query query;
  private final Iterator<LeafReaderContext> leavesIterator;
  private final boolean computeScores;
  private final Float minScore;

  private long remainingOffset;
  private long remainingAllowedResults;
  private Weight weight;

  private LeafReaderContext leaf;
  private DocIdSetIterator docIdSetIterator;
  private Scorer scorer;

  LuceneGenerator(IndexSearcher shard, LocalQueryParams localQueryParams, int shardIndex) {
    this.shard = shard;
    this.shardIndex = shardIndex;
    this.query = localQueryParams.query();
    this.remainingOffset = localQueryParams.offsetLong();
    this.remainingAllowedResults = localQueryParams.limitLong();
    this.computeScores = localQueryParams.needsScores() || localQueryParams.minCompetitiveScore() != null;
    this.minScore = localQueryParams.minCompetitiveScore();
    List<LeafReaderContext> leaves = shard.getTopReaderContext().leaves();
    this.leavesIterator = leaves.iterator();
  }

  public static Flux<ScoreDoc> reactive(IndexSearcher shard, LocalQueryParams localQueryParams, int shardIndex) {
    return Flux
        .<ScoreDoc, LuceneGenerator>generate(() -> new LuceneGenerator(shard, localQueryParams, shardIndex),
            (s, sink) -> {
              var val = s.get();
              if (val == null) {
                sink.complete();
              } else {
                sink.next(val);
              }
              return s;
            }
        )
        .subscribeOn(Schedulers.boundedElastic());
  }

  @Override
  public ScoreDoc get() {
    while (remainingOffset > 0) {
      skipNext();
    }
    if (remainingAllowedResults == 0) {
      return null;
    } else {
      remainingAllowedResults--;
    }
    return getNext();
  }

  public void skipNext() {
    getNext();
    remainingOffset--;
  }

  private Weight createWeight() throws IOException {
    ScoreMode scoreMode = computeScores ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
    return shard.createWeight(shard.rewrite(query), scoreMode, 1f);
  }

  public ScoreDoc getNext() {
    if (weight == null) {
      try {
        weight = createWeight();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }

    try {
      return getWeightedNext();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  private ScoreDoc getWeightedNext() throws IOException {
    while (tryAdvanceDocIdSetIterator()) {
      LeafReader reader = leaf.reader();
      Bits liveDocs = reader.getLiveDocs();
      int doc;
      while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
        if (docDeleted(liveDocs, doc) || belowMinScore(scorer)) {
          continue;
        }
        return transformDoc(doc);
      }
      docIdSetIterator = null;
    }
    clearState();
    return null;
  }

  private boolean tryAdvanceDocIdSetIterator() throws IOException {
    if (docIdSetIterator != null) {
      return true;
    }
    while (leavesIterator.hasNext()) {
      LeafReaderContext leaf = leavesIterator.next();
      Scorer scorer = weight.scorer(leaf);
      if (scorer == null) {
        continue;
      }
      this.scorer = scorer;
      this.leaf = leaf;
      this.docIdSetIterator = scorer.iterator();
      return true;
    }
    return false;
  }

  private ScoreDoc transformDoc(int doc) throws IOException {
    return new ScoreDoc(leaf.docBase + doc, scorer.score(), shardIndex);
  }

  private static boolean docDeleted(@Nullable Bits liveDocs, int doc) {
    if (liveDocs == null) {
      return false;
    }
    return !liveDocs.get(doc);
  }

  private boolean belowMinScore(Scorer currentScorer) throws IOException {
    return minScore != null && currentScorer.score() < minScore;
  }

  private void clearState() {
    docIdSetIterator = null;
    scorer = null;
    leaf = null;
  }
}
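Aside: LuceneGenerator walks each shard leaf-by-leaf through the standard Weight/Scorer API instead of collecting into a queue. The stand-alone sketch below shows that same iteration pattern against a throwaway in-memory index, using only plain Lucene types (no dbengine classes); the field name, document contents, and query are made up for illustration.

// Stand-alone sketch of the leaf-by-leaf iteration pattern used by LuceneGenerator.
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.ByteBuffersDirectory;

public class LeafIterationSketch {
  public static void main(String[] args) throws Exception {
    var dir = new ByteBuffersDirectory();
    try (var writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
      var doc = new Document();
      doc.add(new TextField("body", "hello lucene", Store.NO));
      writer.addDocument(doc);
    }
    try (var reader = DirectoryReader.open(dir)) {
      var searcher = new IndexSearcher(reader);
      var query = new TermQuery(new Term("body", "lucene"));
      Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE, 1f);
      for (LeafReaderContext leaf : reader.leaves()) {
        Scorer scorer = weight.scorer(leaf);
        if (scorer == null) {
          continue; // this leaf has no matches
        }
        DocIdSetIterator it = scorer.iterator();
        int doc;
        while ((doc = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
          // Global doc id = leaf.docBase + doc, same idea as LuceneGenerator.transformDoc
          System.out.println((leaf.docBase + doc) + " score=" + scorer.score());
        }
      }
    }
  }
}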
LuceneMultiGenerator.java (new file)
@@ -0,0 +1,55 @@
package it.cavallium.dbengine.lucene.searcher;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;

public class LuceneMultiGenerator implements Supplier<ScoreDoc> {

  private final Iterator<Supplier<ScoreDoc>> generators;
  private Supplier<ScoreDoc> luceneGenerator;

  public LuceneMultiGenerator(List<IndexSearcher> shards, LocalQueryParams localQueryParams) {
    this.generators = IntStream
        .range(0, shards.size())
        .mapToObj(shardIndex -> {
          IndexSearcher shard = shards.get(shardIndex);
          return (Supplier<ScoreDoc>) new LuceneGenerator(shard, localQueryParams, shardIndex);
        })
        .iterator();
    tryAdvanceGenerator();
  }

  private void tryAdvanceGenerator() {
    if (generators.hasNext()) {
      luceneGenerator = generators.next();
    } else {
      luceneGenerator = null;
    }
  }

  @Override
  public ScoreDoc get() {
    if (luceneGenerator == null) {
      return null;
    }
    ScoreDoc item;
    do {
      item = luceneGenerator.get();
      if (item == null) {
        tryAdvanceGenerator();
        if (luceneGenerator == null) {
          return null;
        }
      }
    } while (item == null);
    return item;
  }
}
UnsortedScoredFullMultiSearcher.java → SortedByScoreFullMultiSearcher.java
@@ -12,8 +12,6 @@ import it.cavallium.dbengine.lucene.LLScoreDoc;
import it.cavallium.dbengine.lucene.collector.FullDocsCollector;
import it.cavallium.dbengine.lucene.collector.LMDBFullScoreDocCollector;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -21,13 +19,13 @@ import org.apache.lucene.search.IndexSearcher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

-public class UnsortedScoredFullMultiSearcher implements MultiSearcher {
+public class SortedByScoreFullMultiSearcher implements MultiSearcher {

- protected static final Logger logger = LogManager.getLogger(UnsortedScoredFullMultiSearcher.class);
+ protected static final Logger logger = LogManager.getLogger(SortedByScoreFullMultiSearcher.class);

  private final LLTempLMDBEnv env;

- public UnsortedScoredFullMultiSearcher(LLTempLMDBEnv env) {
+ public SortedByScoreFullMultiSearcher(LLTempLMDBEnv env) {
    this.env = env;
  }

@@ -46,7 +44,7 @@ public class UnsortedScoredFullMultiSearcher implements MultiSearcher {

    return queryParamsMono.flatMap(queryParams2 -> {
      if (queryParams2.isSorted() && !queryParams2.isSortedByScore()) {
-       throw new IllegalArgumentException(UnsortedScoredFullMultiSearcher.this.getClass().getSimpleName()
+       throw new IllegalArgumentException(SortedByScoreFullMultiSearcher.this.getClass().getSimpleName()
            + " doesn't support sorted queries");
      }

@@ -152,6 +150,6 @@ public class UnsortedScoredFullMultiSearcher implements MultiSearcher {

  @Override
  public String getName() {
-   return "unsorted scored full multi";
+   return "sorted by score full multi";
  }
}
UnsortedStreamingMultiSearcher.java (new file)
@@ -0,0 +1,85 @@
package it.cavallium.dbengine.lucene.searcher;

import static java.util.Objects.requireNonNull;

import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.util.List;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class UnsortedStreamingMultiSearcher implements MultiSearcher {

  @Override
  public Mono<LuceneSearchResult> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
      LocalQueryParams queryParams,
      String keyFieldName,
      LLSearchTransformer transformer) {

    return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> {
      Mono<LocalQueryParams> queryParamsMono;
      if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
        queryParamsMono = Mono.just(queryParams);
      } else {
        queryParamsMono = transformer.transform(Mono
            .fromCallable(() -> new TransformerInput(indexSearchers, queryParams)));
      }

      return queryParamsMono.map(queryParams2 -> {
        var localQueryParams = getLocalQueryParams(queryParams2);
        if (queryParams2.isSorted() && queryParams2.limitLong() > 0) {
          throw new UnsupportedOperationException("Sorted queries are not supported"
              + " by UnsortedContinuousLuceneMultiSearcher");
        }
        var shards = indexSearchers.shards();

        Flux<ScoreDoc> scoreDocsFlux = getScoreDocs(localQueryParams, shards);

        Flux<LLKeyScore> resultsFlux = LuceneUtils.convertHits(scoreDocsFlux, shards, keyFieldName, false);

        var totalHitsCount = new TotalHitsCount(0, false);
        Flux<LLKeyScore> mergedFluxes = resultsFlux
            .skip(queryParams2.offsetLong())
            .take(queryParams2.limitLong(), true);

        return new LuceneSearchResult(totalHitsCount, mergedFluxes, indexSearchers::close);
      });
    }, false);
  }

  private Flux<ScoreDoc> getScoreDocs(LocalQueryParams localQueryParams, List<IndexSearcher> shards) {
    return Flux
        .fromIterable(shards)
        .index()
        .flatMap(tuple -> {
          var shardIndex = (int) (long) tuple.getT1();
          var shard = tuple.getT2();
          return LuceneGenerator.reactive(shard, localQueryParams, shardIndex);
        });
  }

  private LocalQueryParams getLocalQueryParams(LocalQueryParams queryParams) {
    return new LocalQueryParams(queryParams.query(),
        0L,
        queryParams.offsetLong() + queryParams.limitLong(),
        queryParams.pageLimits(),
        queryParams.minCompetitiveScore(),
        queryParams.sort(),
        queryParams.computePreciseHitsCount(),
        queryParams.timeout()
    );
  }

  @Override
  public String getName() {
    return "unsorted streaming multi";
  }
}
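Aside: offset and limit are not pushed into the per-shard generators; the shard-level query above runs with offset 0 and limit offset+limit, and the merged flux is then trimmed with skip and take. A minimal Reactor sketch of that behaviour, with plain integers standing in for ScoreDoc values and two hard-coded "shards":

import reactor.core.publisher.Flux;

public class SkipTakeSketch {
  public static void main(String[] args) {
    long offset = 2;
    long limit = 3;
    // Two pretend shard streams; in the searcher these come from LuceneGenerator.reactive per shard.
    Flux<Integer> shard0 = Flux.just(1, 3, 5, 7);
    Flux<Integer> shard1 = Flux.just(2, 4, 6, 8);
    Flux.merge(shard0, shard1)
        .skip(offset)          // drop the requested offset from the merged stream
        .take(limit, true)     // then keep at most `limit` elements, limiting upstream demand
        .doOnNext(System.out::println)
        .blockLast();
  }
}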
UnsortedUnscoredStreamingMultiSearcher.java (deleted)
@@ -1,146 +0,0 @@
package it.cavallium.dbengine.lucene.searcher;

import static it.cavallium.dbengine.lucene.LuceneUtils.withTimeout;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.SimpleCollector;
import org.warp.commonutils.type.ShortNamedThreadFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {

  private static final int SEARCH_THREADS = Math.min(Math.max(8, Runtime.getRuntime().availableProcessors()), 128);
  private static final ThreadFactory THREAD_FACTORY = new ShortNamedThreadFactory("UnscoredStreamingSearcher");
  private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(THREAD_FACTORY); // Executors.newFixedThreadPool(SEARCH_THREADS, THREAD_FACTORY);

  @Override
  public Mono<LuceneSearchResult> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
      LocalQueryParams queryParams,
      String keyFieldName,
      LLSearchTransformer transformer) {

    return LLUtils.usingSendResource(indexSearchersMono, indexSearchers -> {
      Mono<LocalQueryParams> queryParamsMono;
      if (transformer == LLSearchTransformer.NO_TRANSFORMATION) {
        queryParamsMono = Mono.just(queryParams);
      } else {
        queryParamsMono = transformer.transform(Mono
            .fromCallable(() -> new TransformerInput(indexSearchers, queryParams)));
      }

      return queryParamsMono.map(queryParams2 -> {
        var localQueryParams = getLocalQueryParams(queryParams2);
        if (queryParams2.isSorted() && queryParams2.limitLong() > 0) {
          throw new UnsupportedOperationException("Sorted queries are not supported"
              + " by UnsortedUnscoredContinuousLuceneMultiSearcher");
        }
        if (queryParams2.needsScores() && queryParams2.limitLong() > 0) {
          throw new UnsupportedOperationException("Scored queries are not supported"
              + " by UnsortedUnscoredContinuousLuceneMultiSearcher");
        }
        var shards = indexSearchers.shards();

        Flux<ScoreDoc> scoreDocsFlux = getScoreDocs(localQueryParams, shards);

        Flux<LLKeyScore> resultsFlux = LuceneUtils.convertHits(scoreDocsFlux, shards, keyFieldName, false);

        var totalHitsCount = new TotalHitsCount(0, false);
        Flux<LLKeyScore> mergedFluxes = resultsFlux
            .skip(queryParams2.offsetLong())
            .take(queryParams2.limitLong(), true);

        return new LuceneSearchResult(totalHitsCount, mergedFluxes, indexSearchers::close);
      });
    }, false);
  }

  private Flux<ScoreDoc> getScoreDocs(LocalQueryParams localQueryParams, List<IndexSearcher> shards) {
    return Flux
        .<ScoreDoc>create(sink -> EXECUTOR.execute(() -> {
          try {
            LLUtils.ensureBlocking();
            var thread = Thread.currentThread();
            sink.onRequest(lc -> LockSupport.unpark(thread));
            int shardIndexTemp = 0;
            for (IndexSearcher shard : shards) {
              if (sink.isCancelled()) break;
              final int shardIndex = shardIndexTemp;
              shard.search(localQueryParams.query(), withTimeout(new SimpleCollector() {
                private LeafReaderContext leafReaderContext;

                @Override
                protected void doSetNextReader(LeafReaderContext context) {
                  this.leafReaderContext = context;
                }

                @Override
                public void collect(int i) {
                  // Assert that this is a non-blocking context
                  assert !Schedulers.isInNonBlockingThread();
                  var scoreDoc = new ScoreDoc(leafReaderContext.docBase + i, 0, shardIndex);
                  if (sink.requestedFromDownstream() <= 0 || sink.isCancelled()) {
                    if (sink.isCancelled()) {
                      throw new CollectionTerminatedException();
                    } else {
                      // 1000ms
                      LockSupport.parkNanos(1000000000L);
                    }
                  }
                  sink.next(scoreDoc);
                }

                @Override
                public ScoreMode scoreMode() {
                  return ScoreMode.COMPLETE_NO_SCORES;
                }
              }, localQueryParams.timeout()));
              shardIndexTemp++;
            }
          } catch (Throwable e) {
            sink.error(e);
          }
          sink.complete();
        }), OverflowStrategy.BUFFER)
        .publishOn(Schedulers.parallel());
  }

  private LocalQueryParams getLocalQueryParams(LocalQueryParams queryParams) {
    return new LocalQueryParams(queryParams.query(),
        0L,
        queryParams.offsetLong() + queryParams.limitLong(),
        queryParams.pageLimits(),
        queryParams.minCompetitiveScore(),
        queryParams.sort(),
        queryParams.computePreciseHitsCount(),
        queryParams.timeout()
    );
  }

  @Override
  public String getName() {
    return "unsorted unscored streaming multi";
  }
}
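Aside: the deleted searcher pushed hits from a dedicated executor and handled backpressure by hand with LockSupport park/unpark; its replacement pulls through Flux.generate, which only asks the state object for the next element when there is downstream demand. A generic sketch of that pull model, with a simple counter standing in for the project's ScoreDoc supplier:

import java.util.function.Supplier;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class GenerateSketch {
  public static void main(String[] args) {
    // A Supplier that yields 0..4 and then null, the way LuceneGenerator yields ScoreDocs and then null.
    Supplier<Integer> source = new Supplier<>() {
      private int next = 0;

      @Override
      public Integer get() {
        return next < 5 ? next++ : null;
      }
    };
    Flux.<Integer, Supplier<Integer>>generate(() -> source, (s, sink) -> {
          Integer val = s.get();
          if (val == null) {
            sink.complete(); // supplier exhausted: end the stream
          } else {
            sink.next(val);  // emit exactly one element per demand signal
          }
          return s;
        })
        .subscribeOn(Schedulers.boundedElastic())
        .doOnNext(System.out::println)
        .blockLast();
  }
}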
TestLuceneSearches.java
@@ -39,8 +39,8 @@ import it.cavallium.dbengine.lucene.searcher.OfficialSearcher;
import it.cavallium.dbengine.lucene.searcher.ScoredPagedMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.PagedLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.SortedScoredFullMultiSearcher;
-import it.cavallium.dbengine.lucene.searcher.UnsortedScoredFullMultiSearcher;
-import it.cavallium.dbengine.lucene.searcher.UnsortedUnscoredStreamingMultiSearcher;
+import it.cavallium.dbengine.lucene.searcher.SortedByScoreFullMultiSearcher;
+import it.cavallium.dbengine.lucene.searcher.UnsortedStreamingMultiSearcher;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -164,11 +164,11 @@ public class TestLuceneSearches {
        if (info.sorted() && !info.sortedByScore()) {
          sink.next(new SortedScoredFullMultiSearcher(ENV));
        } else {
-         sink.next(new UnsortedScoredFullMultiSearcher(ENV));
+         sink.next(new SortedByScoreFullMultiSearcher(ENV));
        }
        if (!info.sorted()) {
          sink.next(new UnsortedUnscoredSimpleMultiSearcher(new PagedLocalSearcher()));
-         sink.next(new UnsortedUnscoredStreamingMultiSearcher());
+         sink.next(new UnsortedStreamingMultiSearcher());
        }
      }
      sink.next(new AdaptiveMultiSearcher(ENV, true, MAX_IN_MEMORY_RESULT_ENTRIES));
@@ -254,7 +254,7 @@ public class TestLuceneSearches {
  private boolean supportsPreciseHitsCount(LocalSearcher searcher,
      ClientQueryParams query) {
    var sorted = query.isSorted();
-   if (searcher instanceof UnsortedUnscoredStreamingMultiSearcher) {
+   if (searcher instanceof UnsortedStreamingMultiSearcher) {
      return false;
    } else if (!sorted) {
      return !(searcher instanceof AdaptiveMultiSearcher) && !(searcher instanceof AdaptiveLocalSearcher);