Rewrite lucene transformers and implement MoreLikeThis sharding
This commit is contained in:
parent
5cfb5f49cd
commit
ca37d1fb68
@ -399,78 +399,6 @@ public class LLUtils {
|
||||
.doOnDiscard(Send.class, Send::close);
|
||||
}
|
||||
|
||||
public static Mono<LocalQueryParams> getMoreLikeThisQuery(
|
||||
List<LLIndexSearcher> indexSearchers,
|
||||
@Nullable LLSnapshot snapshot,
|
||||
LocalQueryParams localQueryParams,
|
||||
Analyzer analyzer,
|
||||
Similarity similarity,
|
||||
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux) {
|
||||
Query luceneAdditionalQuery;
|
||||
try {
|
||||
luceneAdditionalQuery = localQueryParams.query();
|
||||
} catch (Exception e) {
|
||||
return Mono.error(e);
|
||||
}
|
||||
return mltDocumentFieldsFlux
|
||||
.collectMap(Tuple2::getT1, Tuple2::getT2, HashMap::new)
|
||||
.flatMap(mltDocumentFields -> Mono.fromCallable(() -> {
|
||||
mltDocumentFields.entrySet().removeIf(entry -> entry.getValue().isEmpty());
|
||||
if (mltDocumentFields.isEmpty()) {
|
||||
return new LocalQueryParams(new MatchNoDocsQuery(),
|
||||
localQueryParams.offset(),
|
||||
localQueryParams.limit(),
|
||||
localQueryParams.minCompetitiveScore(),
|
||||
localQueryParams.sort(),
|
||||
localQueryParams.scoreMode()
|
||||
);
|
||||
}
|
||||
MultiMoreLikeThis mlt;
|
||||
if (indexSearchers.size() == 1) {
|
||||
mlt = new MultiMoreLikeThis(indexSearchers.get(0).getIndexReader(), null);
|
||||
} else {
|
||||
IndexReader[] indexReaders = new IndexReader[indexSearchers.size()];
|
||||
for (int i = 0, size = indexSearchers.size(); i < size; i++) {
|
||||
indexReaders[i] = indexSearchers.get(i).getIndexReader();
|
||||
}
|
||||
mlt = new MultiMoreLikeThis(indexReaders, null);
|
||||
}
|
||||
mlt.setAnalyzer(analyzer);
|
||||
mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new));
|
||||
mlt.setMinTermFreq(1);
|
||||
mlt.setMinDocFreq(3);
|
||||
mlt.setMaxDocFreqPct(20);
|
||||
mlt.setBoost(localQueryParams.scoreMode().needsScores());
|
||||
mlt.setStopWords(EnglishItalianStopFilter.getStopWordsString());
|
||||
if (similarity instanceof TFIDFSimilarity tfidfSimilarity) {
|
||||
mlt.setSimilarity(tfidfSimilarity);
|
||||
} else {
|
||||
mlt.setSimilarity(new ClassicSimilarity());
|
||||
}
|
||||
|
||||
// Get the reference docId and apply it to MoreLikeThis, to generate the query
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
var mltQuery = mlt.like((Map) mltDocumentFields);
|
||||
Query luceneQuery;
|
||||
if (!(luceneAdditionalQuery instanceof MatchAllDocsQuery)) {
|
||||
luceneQuery = new BooleanQuery.Builder()
|
||||
.add(mltQuery, Occur.MUST)
|
||||
.add(new ConstantScoreQuery(luceneAdditionalQuery), Occur.MUST)
|
||||
.build();
|
||||
} else {
|
||||
luceneQuery = mltQuery;
|
||||
}
|
||||
|
||||
return new LocalQueryParams(luceneQuery,
|
||||
localQueryParams.offset(),
|
||||
localQueryParams.limit(),
|
||||
localQueryParams.minCompetitiveScore(),
|
||||
localQueryParams.sort(),
|
||||
localQueryParams.scoreMode()
|
||||
);
|
||||
}).subscribeOn(Schedulers.boundedElastic()));
|
||||
}
|
||||
|
||||
public static record DirectBuffer(@NotNull Send<Buffer> buffer, @NotNull ByteBuffer byteBuffer) {}
|
||||
|
||||
@NotNull
|
||||
|
@ -1,81 +0,0 @@
|
||||
package it.cavallium.dbengine.database.disk;
|
||||
|
||||
import io.net5.buffer.api.Drop;
|
||||
import io.net5.buffer.api.Owned;
|
||||
import io.net5.buffer.api.Send;
|
||||
import io.net5.buffer.api.internal.ResourceSupport;
|
||||
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
|
||||
public class LLIndexContext extends ResourceSupport<LLIndexContext, LLIndexContext> {
|
||||
|
||||
private LLIndexSearcher indexSearcher;
|
||||
private LLSearchTransformer indexQueryTransformer;
|
||||
|
||||
protected LLIndexContext(Send<LLIndexSearcher> indexSearcher,
|
||||
LLSearchTransformer indexQueryTransformer,
|
||||
Drop<LLIndexContext> drop) {
|
||||
super(new CloseOnDrop(drop));
|
||||
this.indexSearcher = indexSearcher.receive();
|
||||
this.indexQueryTransformer = indexQueryTransformer;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RuntimeException createResourceClosedException() {
|
||||
return new IllegalStateException("Closed");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Owned<LLIndexContext> prepareSend() {
|
||||
var indexSearcher = this.indexSearcher.send();
|
||||
var indexQueryTransformer = this.indexQueryTransformer;
|
||||
makeInaccessible();
|
||||
return drop -> new LLIndexContext(indexSearcher, indexQueryTransformer, drop);
|
||||
}
|
||||
|
||||
private void makeInaccessible() {
|
||||
this.indexSearcher = null;
|
||||
this.indexQueryTransformer = null;
|
||||
}
|
||||
|
||||
public IndexSearcher getIndexSearcher() {
|
||||
if (!isOwned()) {
|
||||
throw new UnsupportedOperationException("Closed");
|
||||
}
|
||||
return indexSearcher.getIndexSearcher();
|
||||
}
|
||||
|
||||
public IndexReader getIndexReader() {
|
||||
if (!isOwned()) {
|
||||
throw new UnsupportedOperationException("Closed");
|
||||
}
|
||||
return indexSearcher.getIndexReader();
|
||||
}
|
||||
|
||||
public LLSearchTransformer getIndexQueryTransformer() {
|
||||
if (!isOwned()) {
|
||||
throw new UnsupportedOperationException("Closed");
|
||||
}
|
||||
return indexQueryTransformer;
|
||||
}
|
||||
|
||||
private static class CloseOnDrop implements Drop<LLIndexContext> {
|
||||
|
||||
private final Drop<LLIndexContext> delegate;
|
||||
|
||||
public CloseOnDrop(Drop<LLIndexContext> drop) {
|
||||
this.delegate = drop;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void drop(LLIndexContext obj) {
|
||||
try {
|
||||
if (obj.indexSearcher != null) obj.indexSearcher.close();
|
||||
delegate.drop(obj);
|
||||
} finally {
|
||||
obj.makeInaccessible();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -20,39 +20,39 @@ import org.apache.lucene.index.MultiReader;
|
||||
import org.apache.lucene.index.StoredFieldVisitor;
|
||||
import org.apache.lucene.index.Term;
|
||||
|
||||
public interface LLIndexContexts extends Resource<LLIndexContexts> {
|
||||
public interface LLIndexSearchers extends Resource<LLIndexSearchers> {
|
||||
|
||||
static LLIndexContexts of(List<Send<LLIndexContext>> indexSearchers) {
|
||||
static LLIndexSearchers of(List<Send<LLIndexSearcher>> indexSearchers) {
|
||||
return new ShardedIndexSearchers(indexSearchers, d -> {});
|
||||
}
|
||||
|
||||
static UnshardedIndexSearchers unsharded(Send<LLIndexContext> indexSearcher) {
|
||||
static UnshardedIndexSearchers unsharded(Send<LLIndexSearcher> indexSearcher) {
|
||||
return new UnshardedIndexSearchers(indexSearcher, d -> {});
|
||||
}
|
||||
|
||||
Iterable<LLIndexContext> shards();
|
||||
Iterable<LLIndexSearcher> shards();
|
||||
|
||||
LLIndexContext shard(int shardIndex);
|
||||
LLIndexSearcher shard(int shardIndex);
|
||||
|
||||
IndexReader allShards();
|
||||
|
||||
class UnshardedIndexSearchers extends ResourceSupport<LLIndexContexts, UnshardedIndexSearchers>
|
||||
implements LLIndexContexts {
|
||||
class UnshardedIndexSearchers extends ResourceSupport<LLIndexSearchers, UnshardedIndexSearchers>
|
||||
implements LLIndexSearchers {
|
||||
|
||||
private LLIndexContext indexSearcher;
|
||||
private LLIndexSearcher indexSearcher;
|
||||
|
||||
public UnshardedIndexSearchers(Send<LLIndexContext> indexSearcher, Drop<UnshardedIndexSearchers> drop) {
|
||||
public UnshardedIndexSearchers(Send<LLIndexSearcher> indexSearcher, Drop<UnshardedIndexSearchers> drop) {
|
||||
super(new CloseOnDrop(drop));
|
||||
this.indexSearcher = indexSearcher.receive();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterable<LLIndexContext> shards() {
|
||||
public Iterable<LLIndexSearcher> shards() {
|
||||
return Collections.singleton(indexSearcher);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LLIndexContext shard(int shardIndex) {
|
||||
public LLIndexSearcher shard(int shardIndex) {
|
||||
if (!isOwned()) {
|
||||
throw attachTrace(new IllegalStateException("UnshardedIndexSearchers must be owned to be used"));
|
||||
}
|
||||
@ -67,7 +67,7 @@ public interface LLIndexContexts extends Resource<LLIndexContexts> {
|
||||
return indexSearcher.getIndexReader();
|
||||
}
|
||||
|
||||
public LLIndexContext shard() {
|
||||
public LLIndexSearcher shard() {
|
||||
return this.shard(0);
|
||||
}
|
||||
|
||||
@ -78,7 +78,7 @@ public interface LLIndexContexts extends Resource<LLIndexContexts> {
|
||||
|
||||
@Override
|
||||
protected Owned<UnshardedIndexSearchers> prepareSend() {
|
||||
Send<LLIndexContext> indexSearcher = this.indexSearcher.send();
|
||||
Send<LLIndexSearcher> indexSearcher = this.indexSearcher.send();
|
||||
this.makeInaccessible();
|
||||
return drop -> new UnshardedIndexSearchers(indexSearcher, drop);
|
||||
}
|
||||
@ -107,26 +107,26 @@ public interface LLIndexContexts extends Resource<LLIndexContexts> {
|
||||
}
|
||||
}
|
||||
|
||||
class ShardedIndexSearchers extends ResourceSupport<LLIndexContexts, ShardedIndexSearchers>
|
||||
implements LLIndexContexts {
|
||||
class ShardedIndexSearchers extends ResourceSupport<LLIndexSearchers, ShardedIndexSearchers>
|
||||
implements LLIndexSearchers {
|
||||
|
||||
private List<LLIndexContext> indexSearchers;
|
||||
private List<LLIndexSearcher> indexSearchers;
|
||||
|
||||
public ShardedIndexSearchers(List<Send<LLIndexContext>> indexSearchers, Drop<ShardedIndexSearchers> drop) {
|
||||
public ShardedIndexSearchers(List<Send<LLIndexSearcher>> indexSearchers, Drop<ShardedIndexSearchers> drop) {
|
||||
super(new CloseOnDrop(drop));
|
||||
this.indexSearchers = new ArrayList<>(indexSearchers.size());
|
||||
for (Send<LLIndexContext> indexSearcher : indexSearchers) {
|
||||
for (Send<LLIndexSearcher> indexSearcher : indexSearchers) {
|
||||
this.indexSearchers.add(indexSearcher.receive());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterable<LLIndexContext> shards() {
|
||||
public Iterable<LLIndexSearcher> shards() {
|
||||
return Collections.unmodifiableList(indexSearchers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LLIndexContext shard(int shardIndex) {
|
||||
public LLIndexSearcher shard(int shardIndex) {
|
||||
if (!isOwned()) {
|
||||
throw attachTrace(new IllegalStateException("ShardedIndexSearchers must be owned to be used"));
|
||||
}
|
||||
@ -161,8 +161,8 @@ public interface LLIndexContexts extends Resource<LLIndexContexts> {
|
||||
|
||||
@Override
|
||||
protected Owned<ShardedIndexSearchers> prepareSend() {
|
||||
List<Send<LLIndexContext>> indexSearchers = new ArrayList<>(this.indexSearchers.size());
|
||||
for (LLIndexContext indexSearcher : this.indexSearchers) {
|
||||
List<Send<LLIndexSearcher>> indexSearchers = new ArrayList<>(this.indexSearchers.size());
|
||||
for (LLIndexSearcher indexSearcher : this.indexSearchers) {
|
||||
indexSearchers.add(indexSearcher.send());
|
||||
}
|
||||
this.makeInaccessible();
|
||||
@ -185,7 +185,7 @@ public interface LLIndexContexts extends Resource<LLIndexContexts> {
|
||||
public void drop(ShardedIndexSearchers obj) {
|
||||
try {
|
||||
if (obj.indexSearchers != null) {
|
||||
for (LLIndexContext indexSearcher : obj.indexSearchers) {
|
||||
for (LLIndexSearcher indexSearcher : obj.indexSearchers) {
|
||||
indexSearcher.close();
|
||||
}
|
||||
}
|
@ -26,7 +26,6 @@ import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Phaser;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -322,7 +321,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
QueryParams queryParams,
|
||||
String keyFieldName,
|
||||
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux) {
|
||||
return LLUtils
|
||||
return LuceneUtils
|
||||
.getMoreLikeThisQuery(this, snapshot, LuceneUtils.toLocalQueryParams(queryParams), mltDocumentFieldsFlux)
|
||||
.flatMap(modifiedLocalQuery -> searcherManager
|
||||
.retrieveSearcher(snapshot)
|
||||
@ -339,7 +338,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
QueryParams queryParams,
|
||||
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux,
|
||||
LuceneMultiSearcher shardSearcher) {
|
||||
return LLUtils
|
||||
return LuceneUtils
|
||||
.getMoreLikeThisQuery(this, snapshot, LuceneUtils.toLocalQueryParams(queryParams), mltDocumentFieldsFlux)
|
||||
.flatMap(modifiedLocalQuery -> searcherManager
|
||||
.retrieveSearcher(snapshot)
|
||||
@ -363,13 +362,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
.doOnDiscard(Send.class, Send::close);
|
||||
}
|
||||
|
||||
public Mono<Send<LLIndexContext>> retrieveContext(@Nullable LLSnapshot snapshot,
|
||||
@Nullable LLSearchTransformer indexQueryTransformer) {
|
||||
public Mono<Send<LLIndexSearcher>> retrieveSearcher(@Nullable LLSnapshot snapshot) {
|
||||
return searcherManager
|
||||
.retrieveSearcher(snapshot)
|
||||
.map(indexSearcherToReceive -> new LLIndexContext(indexSearcherToReceive,
|
||||
Objects.requireNonNullElse(indexQueryTransformer, LLSearchTransformer.NO_TRANSFORMATION),
|
||||
d -> {}).send())
|
||||
.doOnDiscard(Send.class, Send::close);
|
||||
}
|
||||
|
||||
|
@ -28,7 +28,6 @@ import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Function;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
@ -37,13 +36,11 @@ import reactor.util.function.Tuple2;
|
||||
|
||||
public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
||||
|
||||
// Scheduler used to get callback values of LuceneStreamSearcher without creating deadlocks
|
||||
protected final Scheduler luceneSearcherScheduler = LuceneUtils.newLuceneSearcherScheduler(true);
|
||||
|
||||
private final ConcurrentHashMap<Long, LLSnapshot[]> registeredSnapshots = new ConcurrentHashMap<>();
|
||||
private final AtomicLong nextSnapshotNumber = new AtomicLong(1);
|
||||
private final LLLocalLuceneIndex[] luceneIndices;
|
||||
|
||||
private final IndicizerAnalyzers indicizerAnalyzers;
|
||||
private final IndicizerSimilarities indicizerSimilarities;
|
||||
|
||||
private final LuceneMultiSearcher multiSearcher = new AdaptiveLuceneMultiSearcher();
|
||||
|
||||
@ -74,6 +71,8 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
||||
);
|
||||
}
|
||||
this.luceneIndices = luceneIndices;
|
||||
this.indicizerAnalyzers = indicizerAnalyzers;
|
||||
this.indicizerSimilarities = indicizerSimilarities;
|
||||
}
|
||||
|
||||
private LLLocalLuceneIndex getLuceneIndex(LLTerm id) {
|
||||
@ -89,18 +88,19 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
||||
return luceneIndices[0].getLuceneIndexName();
|
||||
}
|
||||
|
||||
private Flux<Send<LLIndexContext>> getIndexContexts(LLSnapshot snapshot,
|
||||
Function<LLLocalLuceneIndex, LLSearchTransformer> indexQueryTransformers) {
|
||||
private Mono<Send<LLIndexSearchers>> getIndexSearchers(LLSnapshot snapshot) {
|
||||
return Flux
|
||||
.fromArray(luceneIndices)
|
||||
.index()
|
||||
// Resolve the snapshot of each shard
|
||||
.flatMap(tuple -> Mono
|
||||
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
|
||||
.flatMap(luceneSnapshot -> tuple.getT2().retrieveContext(
|
||||
luceneSnapshot.orElse(null), indexQueryTransformers.apply(tuple.getT2()))
|
||||
.flatMap(luceneSnapshot -> tuple.getT2().retrieveSearcher(
|
||||
luceneSnapshot.orElse(null))
|
||||
)
|
||||
);
|
||||
)
|
||||
.collectList()
|
||||
.map(searchers -> LLIndexSearchers.of(searchers).send());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -198,38 +198,18 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
||||
String keyFieldName,
|
||||
Flux<Tuple2<String, Set<String>>> mltDocumentFields) {
|
||||
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams);
|
||||
Flux<Send<LLIndexContext>> serchers = this
|
||||
.getIndexContexts(snapshot, luceneIndex -> LLSearchTransformer.NO_TRANSFORMATION);
|
||||
var searchers = this.getIndexSearchers(snapshot);
|
||||
var transformer = new MoreLikeThisTransformer(mltDocumentFields);
|
||||
|
||||
// Collect all the shards results into a single global result
|
||||
return multiSearcher
|
||||
.collect(serchers, localQueryParams, keyFieldName)
|
||||
.collectMulti(searchers, localQueryParams, keyFieldName, transformer)
|
||||
// Transform the result type
|
||||
.map(resultToReceive -> {
|
||||
var result = resultToReceive.receive();
|
||||
return new LLSearchResultShard(result.results(), result.totalHitsCount(),
|
||||
d -> result.close()).send();
|
||||
});
|
||||
|
||||
return multiSearcher
|
||||
// Create shard searcher
|
||||
.createShardSearcher(localQueryParams)
|
||||
.flatMap(shardSearcher -> Flux
|
||||
// Iterate the indexed shards
|
||||
.fromArray(luceneIndices).index()
|
||||
// Resolve the snapshot of each shard
|
||||
.flatMap(tuple -> Mono
|
||||
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
|
||||
.map(luceneSnapshot -> new LuceneIndexWithSnapshot(tuple.getT2(), luceneSnapshot))
|
||||
)
|
||||
// Execute the query and collect it using the shard searcher
|
||||
.flatMap(luceneIndexWithSnapshot -> luceneIndexWithSnapshot.luceneIndex()
|
||||
.distributedMoreLikeThis(luceneIndexWithSnapshot.snapshot.orElse(null), queryParams, mltDocumentFields, shardSearcher))
|
||||
// Collect all the shards results into a single global result
|
||||
.then(shardSearcher.collect(localQueryParams, keyFieldName, luceneSearcherScheduler))
|
||||
)
|
||||
// Fix the result type
|
||||
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release()));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -237,11 +217,11 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
||||
QueryParams queryParams,
|
||||
String keyFieldName) {
|
||||
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams);
|
||||
Flux<Send<LLIndexSearcher>> serchers = getIndexContexts(snapshot);
|
||||
var searchers = getIndexSearchers(snapshot);
|
||||
|
||||
// Collect all the shards results into a single global result
|
||||
return multiSearcher
|
||||
.collect(serchers, localQueryParams, keyFieldName)
|
||||
.collectMulti(searchers, localQueryParams, keyFieldName, LLSearchTransformer.NO_TRANSFORMATION)
|
||||
// Transform the result type
|
||||
.map(resultToReceive -> {
|
||||
var result = resultToReceive.receive();
|
||||
@ -310,29 +290,22 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
||||
|
||||
private class MoreLikeThisTransformer implements LLSearchTransformer {
|
||||
|
||||
private final LLLocalLuceneIndex luceneIndex;
|
||||
private final LLSnapshot snapshot;
|
||||
private final String keyFieldName;
|
||||
private final Flux<Tuple2<String, Set<String>>> mltDocumentFields;
|
||||
|
||||
public MoreLikeThisTransformer(LLLocalLuceneIndex luceneIndex,
|
||||
@Nullable LLSnapshot snapshot,
|
||||
String keyFieldName,
|
||||
Flux<Tuple2<String, Set<String>>> mltDocumentFields) {
|
||||
this.luceneIndex = luceneIndex;
|
||||
this.snapshot = snapshot;
|
||||
this.keyFieldName = keyFieldName;
|
||||
public MoreLikeThisTransformer(Flux<Tuple2<String, Set<String>>> mltDocumentFields) {
|
||||
this.mltDocumentFields = mltDocumentFields;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<LocalQueryParams> transform(Mono<LocalQueryParams> queryParamsMono) {
|
||||
return queryParamsMono
|
||||
.flatMap(queryParams -> {
|
||||
luceneIndex.getMoreLikeThisTransformer(snapshot, queryParams, mltDocumentFields, );
|
||||
});
|
||||
LLLocalMultiLuceneIndex.this.
|
||||
return null;
|
||||
public Mono<LocalQueryParams> transform(Mono<TransformerInput> inputMono) {
|
||||
return inputMono.flatMap(input -> {
|
||||
var defaultAnalyzer = LLLocalMultiLuceneIndex.this.indicizerAnalyzers.defaultAnalyzer();
|
||||
var defaultSimilarity = LLLocalMultiLuceneIndex.this.indicizerSimilarities.defaultSimilarity();
|
||||
var luceneAnalyzer = LuceneUtils.getAnalyzer(defaultAnalyzer);
|
||||
var luceneSimilarity = LuceneUtils.getSimilarity(defaultSimilarity);
|
||||
return LuceneUtils.getMoreLikeThisQuery(input.indexSearchers(), input.queryParams(),
|
||||
luceneAnalyzer, luceneSimilarity, mltDocumentFields);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -6,16 +6,20 @@ import it.cavallium.dbengine.client.IndicizerSimilarities;
|
||||
import it.cavallium.dbengine.client.query.QueryParser;
|
||||
import it.cavallium.dbengine.client.query.current.data.QueryParams;
|
||||
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
|
||||
import it.cavallium.dbengine.database.EnglishItalianStopFilter;
|
||||
import it.cavallium.dbengine.database.LLKeyScore;
|
||||
import it.cavallium.dbengine.database.LLSnapshot;
|
||||
import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
|
||||
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
|
||||
import it.cavallium.dbengine.database.collections.ValueGetter;
|
||||
import it.cavallium.dbengine.database.disk.LLIndexContexts;
|
||||
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
|
||||
import it.cavallium.dbengine.lucene.analyzer.NCharGramAnalyzer;
|
||||
import it.cavallium.dbengine.lucene.analyzer.NCharGramEdgeAnalyzer;
|
||||
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
|
||||
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
|
||||
import it.cavallium.dbengine.lucene.analyzer.WordAnalyzer;
|
||||
import it.cavallium.dbengine.lucene.mlt.MultiMoreLikeThis;
|
||||
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
|
||||
import it.cavallium.dbengine.lucene.similarity.NGramSimilarity;
|
||||
import java.io.EOFException;
|
||||
@ -24,9 +28,11 @@ import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.lucene.analysis.Analyzer;
|
||||
import org.apache.lucene.analysis.LowerCaseFilter;
|
||||
@ -39,7 +45,13 @@ import org.apache.lucene.analysis.standard.StandardAnalyzer;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.IndexableField;
|
||||
import org.apache.lucene.search.BooleanClause.Occur;
|
||||
import org.apache.lucene.search.BooleanQuery;
|
||||
import org.apache.lucene.search.ConstantScoreQuery;
|
||||
import org.apache.lucene.search.FieldDoc;
|
||||
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||
import org.apache.lucene.search.MatchNoDocsQuery;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.ScoreDoc;
|
||||
import org.apache.lucene.search.Sort;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
@ -49,6 +61,7 @@ import org.apache.lucene.search.similarities.BooleanSimilarity;
|
||||
import org.apache.lucene.search.similarities.ClassicSimilarity;
|
||||
import org.apache.lucene.search.similarities.PerFieldSimilarityWrapper;
|
||||
import org.apache.lucene.search.similarities.Similarity;
|
||||
import org.apache.lucene.search.similarities.TFIDFSimilarity;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.novasearch.lucene.search.similarities.BM25Similarity;
|
||||
@ -59,9 +72,11 @@ import org.novasearch.lucene.search.similarities.RobertsonSimilarity;
|
||||
import org.warp.commonutils.log.Logger;
|
||||
import org.warp.commonutils.log.LoggerFactory;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import reactor.util.concurrent.Queues;
|
||||
import reactor.util.function.Tuple2;
|
||||
|
||||
public class LuceneUtils {
|
||||
|
||||
@ -505,4 +520,75 @@ public class LuceneUtils {
|
||||
true
|
||||
);
|
||||
}
|
||||
|
||||
public static Mono<LocalQueryParams> getMoreLikeThisQuery(
|
||||
List<LLIndexSearcher> indexSearchers,
|
||||
LocalQueryParams localQueryParams,
|
||||
Analyzer analyzer,
|
||||
Similarity similarity,
|
||||
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux) {
|
||||
Query luceneAdditionalQuery;
|
||||
try {
|
||||
luceneAdditionalQuery = localQueryParams.query();
|
||||
} catch (Exception e) {
|
||||
return Mono.error(e);
|
||||
}
|
||||
return mltDocumentFieldsFlux
|
||||
.collectMap(Tuple2::getT1, Tuple2::getT2, HashMap::new)
|
||||
.flatMap(mltDocumentFields -> Mono.fromCallable(() -> {
|
||||
mltDocumentFields.entrySet().removeIf(entry -> entry.getValue().isEmpty());
|
||||
if (mltDocumentFields.isEmpty()) {
|
||||
return new LocalQueryParams(new MatchNoDocsQuery(),
|
||||
localQueryParams.offset(),
|
||||
localQueryParams.limit(),
|
||||
localQueryParams.minCompetitiveScore(),
|
||||
localQueryParams.sort(),
|
||||
localQueryParams.scoreMode()
|
||||
);
|
||||
}
|
||||
MultiMoreLikeThis mlt;
|
||||
if (indexSearchers.size() == 1) {
|
||||
mlt = new MultiMoreLikeThis(indexSearchers.get(0).getIndexReader(), null);
|
||||
} else {
|
||||
IndexReader[] indexReaders = new IndexReader[indexSearchers.size()];
|
||||
for (int i = 0, size = indexSearchers.size(); i < size; i++) {
|
||||
indexReaders[i] = indexSearchers.get(i).getIndexReader();
|
||||
}
|
||||
mlt = new MultiMoreLikeThis(indexReaders, null);
|
||||
}
|
||||
mlt.setAnalyzer(analyzer);
|
||||
mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new));
|
||||
mlt.setMinTermFreq(1);
|
||||
mlt.setMinDocFreq(3);
|
||||
mlt.setMaxDocFreqPct(20);
|
||||
mlt.setBoost(localQueryParams.scoreMode().needsScores());
|
||||
mlt.setStopWords(EnglishItalianStopFilter.getStopWordsString());
|
||||
if (similarity instanceof TFIDFSimilarity tfidfSimilarity) {
|
||||
mlt.setSimilarity(tfidfSimilarity);
|
||||
} else {
|
||||
mlt.setSimilarity(new ClassicSimilarity());
|
||||
}
|
||||
|
||||
// Get the reference docId and apply it to MoreLikeThis, to generate the query
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
var mltQuery = mlt.like((Map) mltDocumentFields);
|
||||
Query luceneQuery;
|
||||
if (!(luceneAdditionalQuery instanceof MatchAllDocsQuery)) {
|
||||
luceneQuery = new BooleanQuery.Builder()
|
||||
.add(mltQuery, Occur.MUST)
|
||||
.add(new ConstantScoreQuery(luceneAdditionalQuery), Occur.MUST)
|
||||
.build();
|
||||
} else {
|
||||
luceneQuery = mltQuery;
|
||||
}
|
||||
|
||||
return new LocalQueryParams(luceneQuery,
|
||||
localQueryParams.offset(),
|
||||
localQueryParams.limit(),
|
||||
localQueryParams.minCompetitiveScore(),
|
||||
localQueryParams.sort(),
|
||||
localQueryParams.scoreMode()
|
||||
);
|
||||
}).subscribeOn(Schedulers.boundedElastic()));
|
||||
}
|
||||
}
|
||||
|
@ -1,10 +1,17 @@
|
||||
package it.cavallium.dbengine.lucene.searcher;
|
||||
|
||||
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
|
||||
import java.util.List;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public interface LLSearchTransformer {
|
||||
|
||||
LLSearchTransformer NO_TRANSFORMATION = queryParamsMono -> queryParamsMono;
|
||||
LLSearchTransformer NO_TRANSFORMATION = queryParamsMono -> queryParamsMono
|
||||
.map(TransformerInput::queryParams);
|
||||
|
||||
Mono<LocalQueryParams> transform(Mono<LocalQueryParams> queryParamsMono);
|
||||
record TransformerInput(List<LLIndexSearcher> indexSearchers,
|
||||
LocalQueryParams queryParams) {}
|
||||
|
||||
Mono<LocalQueryParams> transform(Mono<TransformerInput> inputMono);
|
||||
}
|
||||
|
@ -9,10 +9,12 @@ public interface LuceneLocalSearcher {
|
||||
|
||||
/**
|
||||
* @param indexSearcherMono Lucene index searcher
|
||||
* @param queryParams the query parameters
|
||||
* @param keyFieldName the name of the key field
|
||||
* @param queryParams the query parameters
|
||||
* @param keyFieldName the name of the key field
|
||||
* @param transformer the search query transformer
|
||||
*/
|
||||
Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexContext>> indexSearcherMono,
|
||||
Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexSearcher>> indexSearcherMono,
|
||||
LocalQueryParams queryParams,
|
||||
String keyFieldName);
|
||||
String keyFieldName,
|
||||
LLSearchTransformer transformer);
|
||||
}
|
||||
|
@ -3,28 +3,35 @@ package it.cavallium.dbengine.lucene.searcher;
|
||||
import io.net5.buffer.api.Send;
|
||||
import it.cavallium.dbengine.database.disk.LLIndexContext;
|
||||
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
|
||||
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public interface LuceneMultiSearcher extends LuceneLocalSearcher {
|
||||
|
||||
/**
|
||||
* @param queryParams the query parameters
|
||||
* @param keyFieldName the name of the key field
|
||||
* @param indexSearchersMono Lucene index searcher
|
||||
* @param queryParams the query parameters
|
||||
* @param keyFieldName the name of the key field
|
||||
* @param transformer the search query transformer
|
||||
*/
|
||||
Mono<Send<LuceneSearchResult>> collect(Flux<Send<LLIndexContext>> indexSearchersFlux,
|
||||
Mono<Send<LuceneSearchResult>> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
|
||||
LocalQueryParams queryParams,
|
||||
String keyFieldName);
|
||||
String keyFieldName,
|
||||
LLSearchTransformer transformer);
|
||||
|
||||
/**
|
||||
* @param indexSearcherMono Lucene index searcher
|
||||
* @param queryParams the query parameters
|
||||
* @param keyFieldName the name of the key field
|
||||
* @param queryParams the query parameters
|
||||
* @param keyFieldName the name of the key field
|
||||
* @param transformer the search query transformer
|
||||
*/
|
||||
@Override
|
||||
default Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexContext>> indexSearcherMono,
|
||||
default Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexSearcher>> indexSearcherMono,
|
||||
LocalQueryParams queryParams,
|
||||
String keyFieldName) {
|
||||
return this.collect(indexSearcherMono.flux(), queryParams, keyFieldName);
|
||||
String keyFieldName,
|
||||
LLSearchTransformer transformer) {
|
||||
var searchers = indexSearcherMono.map(a -> LLIndexSearchers.unsharded(a).send());
|
||||
return this.collectMulti(searchers, queryParams, keyFieldName, transformer);
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user