Bugfixes
This commit is contained in:
parent
b5ccf315be
commit
6c97b0f068
@ -21,6 +21,7 @@ import it.cavallium.dbengine.lucene.LuceneUtils;
|
||||
import it.cavallium.dbengine.lucene.ScheduledTaskLifecycle;
|
||||
import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneLocalSearcher;
|
||||
import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneMultiSearcher;
|
||||
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
|
||||
import it.cavallium.dbengine.lucene.searcher.LuceneLocalSearcher;
|
||||
import it.cavallium.dbengine.lucene.searcher.LuceneMultiSearcher;
|
||||
import it.cavallium.dbengine.lucene.searcher.LuceneShardSearcher;
|
||||
@ -36,7 +37,6 @@ import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
import org.apache.lucene.index.ConcurrentMergeScheduler;
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
@ -50,6 +50,8 @@ import org.apache.lucene.search.BooleanClause.Occur;
|
||||
import org.apache.lucene.search.BooleanQuery;
|
||||
import org.apache.lucene.search.ConstantScoreQuery;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||
import org.apache.lucene.search.MatchNoDocsQuery;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.ScoreMode;
|
||||
import org.apache.lucene.search.SearcherManager;
|
||||
@ -462,15 +464,109 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
QueryParams queryParams,
|
||||
String keyFieldName,
|
||||
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux) {
|
||||
throw new NotImplementedException();
|
||||
return getMoreLikeThisQuery(snapshot, LuceneUtils.toLocalQueryParams(queryParams), mltDocumentFieldsFlux)
|
||||
.flatMap(modifiedLocalQuery -> Mono
|
||||
.usingWhen(
|
||||
this.acquireSearcherWrapper(snapshot),
|
||||
indexSearcher -> localSearcher.collect(indexSearcher, modifiedLocalQuery, keyFieldName, luceneSearcherScheduler),
|
||||
indexSearcher -> releaseSearcherWrapper(snapshot, indexSearcher)
|
||||
)
|
||||
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount()))
|
||||
);
|
||||
}
|
||||
|
||||
public Mono<Void> distributedMoreLikeThis(@Nullable LLSnapshot snapshot,
|
||||
QueryParams queryParams,
|
||||
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux,
|
||||
LuceneShardSearcher shardSearcher) {
|
||||
return getMoreLikeThisQuery(snapshot, LuceneUtils.toLocalQueryParams(queryParams), mltDocumentFieldsFlux)
|
||||
.flatMap(modifiedLocalQuery -> Mono
|
||||
.usingWhen(
|
||||
this.acquireSearcherWrapper(snapshot),
|
||||
indexSearcher -> shardSearcher.searchOn(indexSearcher, modifiedLocalQuery, luceneSearcherScheduler),
|
||||
indexSearcher -> releaseSearcherWrapper(snapshot, indexSearcher)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
public Mono<LocalQueryParams> getMoreLikeThisQuery(@Nullable LLSnapshot snapshot,
|
||||
LocalQueryParams localQueryParams,
|
||||
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux) {
|
||||
Query luceneAdditionalQuery;
|
||||
try {
|
||||
luceneAdditionalQuery = localQueryParams.query();
|
||||
} catch (Exception e) {
|
||||
return Mono.error(e);
|
||||
}
|
||||
return mltDocumentFieldsFlux
|
||||
.collectMap(Tuple2::getT1, Tuple2::getT2, HashMap::new)
|
||||
.flatMap(mltDocumentFields -> {
|
||||
mltDocumentFields.entrySet().removeIf(entry -> entry.getValue().isEmpty());
|
||||
if (mltDocumentFields.isEmpty()) {
|
||||
return Mono.just(new LocalQueryParams(new MatchNoDocsQuery(),
|
||||
localQueryParams.offset(),
|
||||
localQueryParams.limit(),
|
||||
localQueryParams.minCompetitiveScore(),
|
||||
localQueryParams.sort(),
|
||||
localQueryParams.scoreMode()
|
||||
));
|
||||
}
|
||||
return Mono
|
||||
.usingWhen(
|
||||
this.acquireSearcherWrapper(snapshot),
|
||||
indexSearcher -> Mono
|
||||
.fromCallable(() -> {
|
||||
var mlt = new MoreLikeThis(indexSearcher.getIndexReader());
|
||||
mlt.setAnalyzer(indexWriter.getAnalyzer());
|
||||
mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new));
|
||||
mlt.setMinTermFreq(1);
|
||||
mlt.setMinDocFreq(3);
|
||||
mlt.setMaxDocFreqPct(20);
|
||||
mlt.setBoost(localQueryParams.scoreMode().needsScores());
|
||||
mlt.setStopWords(EnglishItalianStopFilter.getStopWordsString());
|
||||
var similarity = getSimilarity();
|
||||
if (similarity instanceof TFIDFSimilarity) {
|
||||
mlt.setSimilarity((TFIDFSimilarity) similarity);
|
||||
} else {
|
||||
logger.trace("Using an unsupported similarity algorithm for MoreLikeThis:"
|
||||
+ " {}. You must use a similarity instance based on TFIDFSimilarity!", similarity);
|
||||
}
|
||||
|
||||
// Get the reference docId and apply it to MoreLikeThis, to generate the query
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
var mltQuery = mlt.like((Map) mltDocumentFields);
|
||||
Query luceneQuery;
|
||||
if (!(luceneAdditionalQuery instanceof MatchAllDocsQuery)) {
|
||||
luceneQuery = new BooleanQuery.Builder()
|
||||
.add(mltQuery, Occur.MUST)
|
||||
.add(new ConstantScoreQuery(luceneAdditionalQuery), Occur.MUST)
|
||||
.build();
|
||||
} else {
|
||||
luceneQuery = mltQuery;
|
||||
}
|
||||
|
||||
return luceneQuery;
|
||||
})
|
||||
.subscribeOn(Schedulers.boundedElastic())
|
||||
.map(luceneQuery -> new LocalQueryParams(luceneQuery,
|
||||
localQueryParams.offset(),
|
||||
localQueryParams.limit(),
|
||||
localQueryParams.minCompetitiveScore(),
|
||||
localQueryParams.sort(),
|
||||
localQueryParams.scoreMode()
|
||||
)),
|
||||
indexSearcher -> releaseSearcherWrapper(snapshot, indexSearcher)
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<LLSearchResultShard> search(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName) {
|
||||
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams);
|
||||
return Mono
|
||||
.usingWhen(
|
||||
this.acquireSearcherWrapper(snapshot),
|
||||
indexSearcher -> localSearcher.collect(indexSearcher, queryParams, keyFieldName, luceneSearcherScheduler),
|
||||
indexSearcher -> localSearcher.collect(indexSearcher, localQueryParams, keyFieldName, luceneSearcherScheduler),
|
||||
indexSearcher -> releaseSearcherWrapper(snapshot, indexSearcher)
|
||||
)
|
||||
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount()));
|
||||
@ -479,10 +575,11 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
public Mono<Void> distributedSearch(@Nullable LLSnapshot snapshot,
|
||||
QueryParams queryParams,
|
||||
LuceneShardSearcher shardSearcher) {
|
||||
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams);
|
||||
return Mono
|
||||
.usingWhen(
|
||||
this.acquireSearcherWrapper(snapshot),
|
||||
indexSearcher -> shardSearcher.searchOn(indexSearcher, queryParams, luceneSearcherScheduler),
|
||||
indexSearcher -> shardSearcher.searchOn(indexSearcher, localQueryParams, luceneSearcherScheduler),
|
||||
indexSearcher -> releaseSearcherWrapper(snapshot, indexSearcher)
|
||||
);
|
||||
}
|
||||
|
@ -14,9 +14,11 @@ import it.cavallium.dbengine.database.LLSearchResult;
|
||||
import it.cavallium.dbengine.database.LLSearchResultShard;
|
||||
import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import it.cavallium.dbengine.database.LLTerm;
|
||||
import it.cavallium.dbengine.lucene.LuceneUtils;
|
||||
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
|
||||
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
|
||||
import it.cavallium.dbengine.lucene.searcher.AdaptiveLuceneMultiSearcher;
|
||||
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
|
||||
import it.cavallium.dbengine.lucene.searcher.LuceneMultiSearcher;
|
||||
import it.cavallium.dbengine.lucene.searcher.LuceneShardSearcher;
|
||||
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
|
||||
@ -41,7 +43,6 @@ import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
import org.apache.lucene.search.CollectionStatistics;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
@ -199,19 +200,40 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
||||
QueryParams queryParams,
|
||||
String keyFieldName,
|
||||
Flux<Tuple2<String, Set<String>>> mltDocumentFields) {
|
||||
throw new NotImplementedException();
|
||||
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams);
|
||||
record LuceneIndexWithSnapshot(LLLocalLuceneIndex luceneIndex, Optional<LLSnapshot> snapshot) {}
|
||||
|
||||
return multiSearcher
|
||||
// Create shard searcher
|
||||
.createShardSearcher(localQueryParams)
|
||||
.flatMap(shardSearcher -> Flux
|
||||
// Iterate the indexed shards
|
||||
.fromArray(luceneIndices).index()
|
||||
// Resolve the snapshot of each shard
|
||||
.flatMap(tuple -> Mono
|
||||
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
|
||||
.map(luceneSnapshot -> new LuceneIndexWithSnapshot(tuple.getT2(), luceneSnapshot))
|
||||
)
|
||||
// Execute the query and collect it using the shard searcher
|
||||
.flatMap(luceneIndexWithSnapshot -> luceneIndexWithSnapshot.luceneIndex()
|
||||
.distributedMoreLikeThis(luceneIndexWithSnapshot.snapshot.orElse(null), queryParams, mltDocumentFields, shardSearcher))
|
||||
// Collect all the shards results into a single global result
|
||||
.then(shardSearcher.collect(localQueryParams, keyFieldName, Schedulers.boundedElastic()))
|
||||
)
|
||||
// Fix the result type
|
||||
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<LLSearchResultShard> search(@Nullable LLSnapshot snapshot,
|
||||
QueryParams queryParams,
|
||||
String keyFieldName) {
|
||||
|
||||
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams);
|
||||
record LuceneIndexWithSnapshot(LLLocalLuceneIndex luceneIndex, Optional<LLSnapshot> snapshot) {}
|
||||
|
||||
return multiSearcher
|
||||
// Create shard searcher
|
||||
.createShardSearcher(queryParams)
|
||||
.createShardSearcher(localQueryParams)
|
||||
.flatMap(shardSearcher -> Flux
|
||||
// Iterate the indexed shards
|
||||
.fromArray(luceneIndices).index()
|
||||
@ -224,7 +246,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
||||
.flatMap(luceneIndexWithSnapshot -> luceneIndexWithSnapshot.luceneIndex()
|
||||
.distributedSearch(luceneIndexWithSnapshot.snapshot.orElse(null), queryParams, shardSearcher))
|
||||
// Collect all the shards results into a single global result
|
||||
.then(shardSearcher.collect(queryParams, keyFieldName, Schedulers.boundedElastic()))
|
||||
.then(shardSearcher.collect(localQueryParams, keyFieldName, Schedulers.boundedElastic()))
|
||||
)
|
||||
// Fix the result type
|
||||
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount()));
|
||||
|
@ -1,18 +1,10 @@
|
||||
package it.cavallium.dbengine.lucene;
|
||||
|
||||
import com.ibm.icu.text.Collator;
|
||||
import com.ibm.icu.util.ULocale;
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.client.IndicizerAnalyzers;
|
||||
import it.cavallium.dbengine.client.IndicizerSimilarities;
|
||||
import it.cavallium.dbengine.client.MultiSort;
|
||||
import it.cavallium.dbengine.client.SearchResult;
|
||||
import it.cavallium.dbengine.client.SearchResultItem;
|
||||
import it.cavallium.dbengine.client.SearchResultKey;
|
||||
import it.cavallium.dbengine.client.SearchResultKeys;
|
||||
import it.cavallium.dbengine.database.LLKeyScore;
|
||||
import it.cavallium.dbengine.database.LLSearchResultShard;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.client.query.QueryParser;
|
||||
import it.cavallium.dbengine.client.query.current.data.QueryParams;
|
||||
import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
|
||||
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
|
||||
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
|
||||
@ -21,12 +13,12 @@ import it.cavallium.dbengine.lucene.analyzer.NCharGramEdgeAnalyzer;
|
||||
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
|
||||
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
|
||||
import it.cavallium.dbengine.lucene.analyzer.WordAnalyzer;
|
||||
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
|
||||
import it.cavallium.dbengine.lucene.similarity.NGramSimilarity;
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
@ -36,7 +28,6 @@ import org.apache.lucene.analysis.LowerCaseFilter;
|
||||
import org.apache.lucene.analysis.TokenStream;
|
||||
import org.apache.lucene.analysis.en.EnglishPossessiveFilter;
|
||||
import org.apache.lucene.analysis.en.KStemFilter;
|
||||
import org.apache.lucene.analysis.icu.ICUCollationKeyAnalyzer;
|
||||
import org.apache.lucene.analysis.miscellaneous.ASCIIFoldingFilter;
|
||||
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
|
||||
import org.apache.lucene.analysis.standard.StandardAnalyzer;
|
||||
@ -44,7 +35,6 @@ import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.IndexableField;
|
||||
import org.apache.lucene.search.FieldDoc;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.ScoreDoc;
|
||||
import org.apache.lucene.search.similarities.BooleanSimilarity;
|
||||
import org.apache.lucene.search.similarities.ClassicSimilarity;
|
||||
@ -57,10 +47,6 @@ import org.novasearch.lucene.search.similarities.LdpSimilarity;
|
||||
import org.novasearch.lucene.search.similarities.LtcSimilarity;
|
||||
import org.novasearch.lucene.search.similarities.RobertsonSimilarity;
|
||||
import org.warp.commonutils.log.Logger;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.function.Tuple2;
|
||||
import reactor.util.function.Tuples;
|
||||
|
||||
public class LuceneUtils {
|
||||
private static final Analyzer lucene4GramWordsAnalyzerEdgeInstance = new NCharGramEdgeAnalyzer(true, 4, 4);
|
||||
@ -333,4 +319,14 @@ public class LuceneUtils {
|
||||
}
|
||||
return scoreDocs[scoreDocs.length - 1];
|
||||
}
|
||||
|
||||
public static LocalQueryParams toLocalQueryParams(QueryParams queryParams) {
|
||||
return new LocalQueryParams(QueryParser.toQuery(queryParams.query()),
|
||||
safeLongToInt(queryParams.offset()),
|
||||
safeLongToInt(queryParams.limit()),
|
||||
queryParams.minCompetitiveScore().getNullable(),
|
||||
QueryParser.toSort(queryParams.sort()),
|
||||
QueryParser.toScoreMode(queryParams.scoreMode())
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -9,11 +9,17 @@ public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher {
|
||||
|
||||
private static final LuceneLocalSearcher localSearcher = new SimpleLuceneLocalSearcher();
|
||||
|
||||
private static final LuceneLocalSearcher countSearcher = new CountLuceneLocalSearcher();
|
||||
|
||||
@Override
|
||||
public Mono<LuceneSearchResult> collect(IndexSearcher indexSearcher,
|
||||
QueryParams queryParams,
|
||||
LocalQueryParams queryParams,
|
||||
String keyFieldName,
|
||||
Scheduler scheduler) {
|
||||
return localSearcher.collect(indexSearcher, queryParams, keyFieldName, scheduler);
|
||||
if (queryParams.limit() == 0) {
|
||||
return countSearcher.collect(indexSearcher, queryParams, keyFieldName, scheduler);
|
||||
} else {
|
||||
return localSearcher.collect(indexSearcher, queryParams, keyFieldName, scheduler);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -14,11 +14,10 @@ public class AdaptiveLuceneMultiSearcher implements LuceneMultiSearcher {
|
||||
private static final LuceneMultiSearcher countLuceneMultiSearcher = new CountLuceneMultiSearcher();
|
||||
|
||||
@Override
|
||||
public Mono<LuceneShardSearcher> createShardSearcher(QueryParams queryParams) {
|
||||
Sort luceneSort = QueryParser.toSort(queryParams.sort());
|
||||
public Mono<LuceneShardSearcher> createShardSearcher(LocalQueryParams queryParams) {
|
||||
if (queryParams.limit() <= 0) {
|
||||
return countLuceneMultiSearcher.createShardSearcher(queryParams);
|
||||
} else if ((luceneSort != null && luceneSort != Sort.RELEVANCE) || queryParams.scoreMode().computeScores()) {
|
||||
} else if ((queryParams.sort() != null && queryParams.sort() != Sort.RELEVANCE) || queryParams.scoreMode().needsScores()) {
|
||||
return sharedSortedLuceneMultiSearcher.createShardSearcher(queryParams);
|
||||
} else {
|
||||
return unscoredLuceneMultiSearcher.createShardSearcher(queryParams);
|
||||
|
@ -0,0 +1,25 @@
|
||||
package it.cavallium.dbengine.lucene.searcher;
|
||||
|
||||
import it.cavallium.dbengine.client.query.QueryParser;
|
||||
import it.cavallium.dbengine.client.query.current.data.QueryParams;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
|
||||
public class CountLuceneLocalSearcher implements LuceneLocalSearcher {
|
||||
|
||||
@Override
|
||||
public Mono<LuceneSearchResult> collect(IndexSearcher indexSearcher,
|
||||
LocalQueryParams queryParams,
|
||||
String keyFieldName,
|
||||
Scheduler scheduler) {
|
||||
//noinspection BlockingMethodInNonBlockingContext
|
||||
return Mono
|
||||
.fromCallable(() -> new LuceneSearchResult(
|
||||
indexSearcher.count(queryParams.query()),
|
||||
Flux.empty())
|
||||
)
|
||||
.subscribeOn(scheduler);
|
||||
}
|
||||
}
|
@ -12,25 +12,24 @@ import reactor.core.scheduler.Scheduler;
|
||||
public class CountLuceneMultiSearcher implements LuceneMultiSearcher {
|
||||
|
||||
@Override
|
||||
public Mono<LuceneShardSearcher> createShardSearcher(QueryParams queryParams) {
|
||||
public Mono<LuceneShardSearcher> createShardSearcher(LocalQueryParams queryParams) {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
AtomicLong totalHits = new AtomicLong(0);
|
||||
return new LuceneShardSearcher() {
|
||||
@Override
|
||||
public Mono<Void> searchOn(IndexSearcher indexSearcher, QueryParams queryParams, Scheduler scheduler) {
|
||||
public Mono<Void> searchOn(IndexSearcher indexSearcher, LocalQueryParams queryParams, Scheduler scheduler) {
|
||||
return Mono
|
||||
.<Void>fromCallable(() -> {
|
||||
Query luceneQuery = QueryParser.toQuery(queryParams.query());
|
||||
//noinspection BlockingMethodInNonBlockingContext
|
||||
totalHits.addAndGet(indexSearcher.count(luceneQuery));
|
||||
totalHits.addAndGet(indexSearcher.count(queryParams.query()));
|
||||
return null;
|
||||
})
|
||||
.subscribeOn(scheduler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<LuceneSearchResult> collect(QueryParams queryParams, String keyFieldName, Scheduler scheduler) {
|
||||
public Mono<LuceneSearchResult> collect(LocalQueryParams queryParams, String keyFieldName, Scheduler scheduler) {
|
||||
return Mono.fromCallable(() -> new LuceneSearchResult(totalHits.get(), Flux.empty()));
|
||||
}
|
||||
};
|
||||
|
@ -43,7 +43,7 @@ class FieldSimpleLuceneShardSearcher implements LuceneShardSearcher {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> searchOn(IndexSearcher indexSearcher, QueryParams queryParams, Scheduler scheduler) {
|
||||
public Mono<Void> searchOn(IndexSearcher indexSearcher, LocalQueryParams queryParams, Scheduler scheduler) {
|
||||
return Mono.<Void>fromCallable(() -> {
|
||||
TopFieldCollector collector;
|
||||
synchronized (lock) {
|
||||
@ -59,7 +59,7 @@ class FieldSimpleLuceneShardSearcher implements LuceneShardSearcher {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<LuceneSearchResult> collect(QueryParams queryParams, String keyFieldName, Scheduler scheduler) {
|
||||
public Mono<LuceneSearchResult> collect(LocalQueryParams queryParams, String keyFieldName, Scheduler scheduler) {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
TopDocs[] topDocs;
|
||||
@ -95,11 +95,8 @@ class FieldSimpleLuceneShardSearcher implements LuceneShardSearcher {
|
||||
() -> new CurrentPageInfo(LuceneUtils.getLastFieldDoc(result.scoreDocs), paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1),
|
||||
(s, sink) -> {
|
||||
if (s.last() != null && s.remainingLimit() > 0) {
|
||||
Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null");
|
||||
Query luceneQuery = QueryParser.toQuery(queryParams.query());
|
||||
Sort luceneSort = QueryParser.toSort(queryParams.sort());
|
||||
ScoreMode luceneScoreMode = QueryParser.toScoreMode(queryParams.scoreMode());
|
||||
if (luceneSort == null && luceneScoreMode.needsScores()) {
|
||||
Sort luceneSort = queryParams.sort();
|
||||
if (luceneSort == null && queryParams.scoreMode().needsScores()) {
|
||||
luceneSort = Sort.RELEVANCE;
|
||||
}
|
||||
CollectorManager<TopFieldCollector, TopFieldDocs> sharedManager = TopFieldCollector
|
||||
|
@ -0,0 +1,11 @@
|
||||
package it.cavallium.dbengine.lucene.searcher;
|
||||
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.ScoreMode;
|
||||
import org.apache.lucene.search.Sort;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
public record LocalQueryParams(@NotNull Query query, int offset, int limit,
|
||||
@Nullable Float minCompetitiveScore, @Nullable Sort sort,
|
||||
@NotNull ScoreMode scoreMode) {}
|
@ -14,7 +14,7 @@ public interface LuceneLocalSearcher {
|
||||
* @param scheduler a blocking scheduler
|
||||
*/
|
||||
Mono<LuceneSearchResult> collect(IndexSearcher indexSearcher,
|
||||
QueryParams queryParams,
|
||||
LocalQueryParams queryParams,
|
||||
String keyFieldName,
|
||||
Scheduler scheduler);
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ public interface LuceneMultiSearcher {
|
||||
* Do a lucene query, receiving the single results using a consumer
|
||||
* @param queryParams the query parameters
|
||||
*/
|
||||
Mono<LuceneShardSearcher> createShardSearcher(QueryParams queryParams);
|
||||
Mono<LuceneShardSearcher> createShardSearcher(LocalQueryParams queryParams);
|
||||
|
||||
static Flux<LLKeyScore> convertHits(
|
||||
ScoreDoc[] hits,
|
||||
|
@ -13,7 +13,7 @@ public interface LuceneShardSearcher {
|
||||
* @param scheduler a blocking scheduler
|
||||
*/
|
||||
Mono<Void> searchOn(IndexSearcher indexSearcher,
|
||||
QueryParams queryParams,
|
||||
LocalQueryParams queryParams,
|
||||
Scheduler scheduler);
|
||||
|
||||
/**
|
||||
@ -21,5 +21,5 @@ public interface LuceneShardSearcher {
|
||||
* @param keyFieldName the name of the key field
|
||||
* @param scheduler a blocking scheduler
|
||||
*/
|
||||
Mono<LuceneSearchResult> collect(QueryParams queryParams, String keyFieldName, Scheduler scheduler);
|
||||
Mono<LuceneSearchResult> collect(LocalQueryParams queryParams, String keyFieldName, Scheduler scheduler);
|
||||
}
|
||||
|
@ -18,14 +18,11 @@ import reactor.core.publisher.Mono;
|
||||
public class SharedSortedLuceneMultiSearcher implements LuceneMultiSearcher {
|
||||
|
||||
@Override
|
||||
public Mono<LuceneShardSearcher> createShardSearcher(QueryParams queryParams) {
|
||||
public Mono<LuceneShardSearcher> createShardSearcher(LocalQueryParams queryParams) {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null");
|
||||
Query luceneQuery = QueryParser.toQuery(queryParams.query());
|
||||
Sort luceneSort = QueryParser.toSort(queryParams.sort());
|
||||
ScoreMode luceneScoreMode = QueryParser.toScoreMode(queryParams.scoreMode());
|
||||
if (luceneSort == null && luceneScoreMode.needsScores()) {
|
||||
Sort luceneSort = queryParams.sort();
|
||||
if (luceneSort == null && queryParams.scoreMode().needsScores()) {
|
||||
luceneSort = Sort.RELEVANCE;
|
||||
}
|
||||
PaginationInfo paginationInfo;
|
||||
@ -36,7 +33,7 @@ public class SharedSortedLuceneMultiSearcher implements LuceneMultiSearcher {
|
||||
}
|
||||
CollectorManager<TopFieldCollector, TopFieldDocs> sharedManager = TopFieldCollector
|
||||
.createSharedManager(luceneSort, LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()), null, 1000);
|
||||
return new FieldSimpleLuceneShardSearcher(sharedManager, luceneQuery, paginationInfo);
|
||||
return new FieldSimpleLuceneShardSearcher(sharedManager, queryParams.query(), paginationInfo);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -1,16 +1,29 @@
|
||||
package it.cavallium.dbengine.lucene.searcher;
|
||||
|
||||
import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.EMPTY_STATUS;
|
||||
import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.TIE_BREAKER;
|
||||
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.FIRST_PAGE_LIMIT;
|
||||
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT;
|
||||
|
||||
import it.cavallium.dbengine.client.query.QueryParser;
|
||||
import it.cavallium.dbengine.client.query.current.data.QueryParams;
|
||||
import it.cavallium.dbengine.database.LLKeyScore;
|
||||
import it.cavallium.dbengine.lucene.LuceneUtils;
|
||||
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.lucene.search.CollectorManager;
|
||||
import org.apache.lucene.search.FieldDoc;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.ScoreDoc;
|
||||
import org.apache.lucene.search.ScoreMode;
|
||||
import org.apache.lucene.search.Sort;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.apache.lucene.search.TopFieldCollector;
|
||||
import org.apache.lucene.search.TopFieldDocs;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
@ -19,32 +32,72 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
|
||||
|
||||
@Override
|
||||
public Mono<LuceneSearchResult> collect(IndexSearcher indexSearcher,
|
||||
QueryParams queryParams,
|
||||
LocalQueryParams queryParams,
|
||||
String keyFieldName,
|
||||
Scheduler scheduler) {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null");
|
||||
Query luceneQuery = QueryParser.toQuery(queryParams.query());
|
||||
Sort luceneSort = QueryParser.toSort(queryParams.sort());
|
||||
ScoreMode luceneScoreMode = QueryParser.toScoreMode(queryParams.scoreMode());
|
||||
TopDocs topDocs = TopDocsSearcher.getTopDocs(indexSearcher,
|
||||
luceneQuery,
|
||||
luceneSort,
|
||||
LuceneUtils.safeLongToInt(queryParams.offset() + queryParams.limit()),
|
||||
PaginationInfo paginationInfo;
|
||||
if (queryParams.limit() <= MAX_SINGLE_SEARCH_LIMIT) {
|
||||
paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), queryParams.limit(), true);
|
||||
} else {
|
||||
paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, true);
|
||||
}
|
||||
//noinspection BlockingMethodInNonBlockingContext
|
||||
TopDocs firstPageTopDocs = TopDocsSearcher.getTopDocs(indexSearcher,
|
||||
queryParams.query(),
|
||||
queryParams.sort(),
|
||||
LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()),
|
||||
null,
|
||||
luceneScoreMode.needsScores(),
|
||||
queryParams.scoreMode().needsScores(),
|
||||
1000,
|
||||
LuceneUtils.safeLongToInt(queryParams.offset()), LuceneUtils.safeLongToInt(queryParams.limit()));
|
||||
Flux<LLKeyScore> hitsMono = LuceneMultiSearcher
|
||||
LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset()), LuceneUtils.safeLongToInt(paginationInfo.firstPageLimit()));
|
||||
Flux<LLKeyScore> firstPageMono = LuceneMultiSearcher
|
||||
.convertHits(
|
||||
topDocs.scoreDocs,
|
||||
firstPageTopDocs.scoreDocs,
|
||||
IndexSearchers.unsharded(indexSearcher),
|
||||
keyFieldName,
|
||||
scheduler
|
||||
)
|
||||
.take(queryParams.limit(), true);
|
||||
return new LuceneSearchResult(topDocs.totalHits.value, hitsMono);
|
||||
|
||||
|
||||
Flux<LLKeyScore> nextHits = Flux.defer(() -> {
|
||||
if (paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) {
|
||||
return Flux.empty();
|
||||
}
|
||||
return Flux
|
||||
.<TopDocs, CurrentPageInfo>generate(
|
||||
() -> new CurrentPageInfo(LuceneUtils.getLastScoreDoc(firstPageTopDocs.scoreDocs), paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1),
|
||||
(s, sink) -> {
|
||||
if (s.last() != null && s.remainingLimit() > 0) {
|
||||
TopDocs pageTopDocs;
|
||||
try {
|
||||
//noinspection BlockingMethodInNonBlockingContext
|
||||
pageTopDocs = TopDocsSearcher.getTopDocs(indexSearcher, queryParams.query(),
|
||||
queryParams.sort(), s.currentPageLimit(), s.last(), queryParams.scoreMode().needsScores(), 1000);
|
||||
} catch (IOException e) {
|
||||
sink.error(e);
|
||||
return EMPTY_STATUS;
|
||||
}
|
||||
var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs);
|
||||
sink.next(pageTopDocs);
|
||||
return new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(), s.pageIndex() + 1);
|
||||
} else {
|
||||
sink.complete();
|
||||
return EMPTY_STATUS;
|
||||
}
|
||||
},
|
||||
s -> {}
|
||||
)
|
||||
.subscribeOn(scheduler)
|
||||
.concatMap(topFieldDoc -> LuceneMultiSearcher
|
||||
.convertHits(topFieldDoc.scoreDocs, IndexSearchers.unsharded(indexSearcher), keyFieldName, scheduler)
|
||||
);
|
||||
});
|
||||
|
||||
return new LuceneSearchResult(firstPageTopDocs.totalHits.value, firstPageMono.concatWith(nextHits));
|
||||
})
|
||||
.subscribeOn(scheduler);
|
||||
}
|
||||
|
@ -15,17 +15,13 @@ import reactor.core.publisher.Mono;
|
||||
public class UnscoredLuceneMultiSearcher implements LuceneMultiSearcher {
|
||||
|
||||
@Override
|
||||
public Mono<LuceneShardSearcher> createShardSearcher(QueryParams queryParams) {
|
||||
public Mono<LuceneShardSearcher> createShardSearcher(LocalQueryParams queryParams) {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null");
|
||||
Query luceneQuery = QueryParser.toQuery(queryParams.query());
|
||||
Sort luceneSort = QueryParser.toSort(queryParams.sort());
|
||||
ScoreMode luceneScoreMode = QueryParser.toScoreMode(queryParams.scoreMode());
|
||||
if (luceneScoreMode.needsScores()) {
|
||||
if (queryParams.scoreMode().needsScores()) {
|
||||
throw new UnsupportedOperationException("Can't use the unscored searcher to do a scored query");
|
||||
}
|
||||
if (luceneSort != null && luceneSort != Sort.RELEVANCE) {
|
||||
if (queryParams.sort() != null && queryParams.sort() != Sort.RELEVANCE) {
|
||||
throw new UnsupportedOperationException("Can't use the unscored searcher to do a sorted query");
|
||||
}
|
||||
PaginationInfo paginationInfo;
|
||||
@ -39,7 +35,7 @@ public class UnscoredLuceneMultiSearcher implements LuceneMultiSearcher {
|
||||
null,
|
||||
1000
|
||||
), queryParams.offset(), queryParams.limit());
|
||||
return new UnscoredLuceneShardSearcher(unsortedCollectorManager, luceneQuery, paginationInfo);
|
||||
return new UnscoredLuceneShardSearcher(unsortedCollectorManager, queryParams.query(), paginationInfo);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -40,7 +40,7 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> searchOn(IndexSearcher indexSearcher, QueryParams queryParams, Scheduler scheduler) {
|
||||
public Mono<Void> searchOn(IndexSearcher indexSearcher, LocalQueryParams queryParams, Scheduler scheduler) {
|
||||
return Mono.<Void>fromCallable(() -> {
|
||||
TopDocsCollector<? extends ScoreDoc> collector;
|
||||
synchronized (lock) {
|
||||
@ -56,7 +56,7 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<LuceneSearchResult> collect(QueryParams queryParams, String keyFieldName, Scheduler scheduler) {
|
||||
public Mono<LuceneSearchResult> collect(LocalQueryParams queryParams, String keyFieldName, Scheduler scheduler) {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
TopDocs[] topDocs;
|
||||
@ -93,7 +93,7 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher {
|
||||
(s, sink) -> {
|
||||
if (s.last() != null && s.remainingLimit() > 0 && s.currentPageLimit() > 0) {
|
||||
Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null");
|
||||
Query luceneQuery = QueryParser.toQuery(queryParams.query());
|
||||
Query luceneQuery = queryParams.query();
|
||||
UnsortedCollectorManager currentPageUnsortedCollectorManager = new UnsortedCollectorManager(() -> TopDocsSearcher.getTopDocsCollector(null,
|
||||
s.currentPageLimit(),
|
||||
s.last(),
|
||||
|
Loading…
Reference in New Issue
Block a user