Global scores across multiple Lucene shards

Andrea Cavalli 2021-02-04 22:42:57 +01:00
parent 059da90ef4
commit 151884b772
34 changed files with 941 additions and 261 deletions

View File

@@ -168,7 +168,11 @@
<artifactId>reactor-tools</artifactId>
<version>3.4.2</version>
</dependency>
<dependency>
<groupId>org.novasearch</groupId>
<artifactId>lucene-relevance</artifactId>
<version>8.0.0.0.5</version>
</dependency>
</dependencies>
<build>
<testSourceDirectory>src/test/java</testSourceDirectory>

View File

@@ -5,9 +5,11 @@ import it.cavallium.dbengine.database.LLItem;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLScoreMode;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
import it.cavallium.dbengine.lucene.serializer.TermQuery;
import it.cavallium.dbengine.lucene.serializer.Query;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -27,12 +29,12 @@ public class IndicizationExample {
.addDocument(new LLTerm("id", "123"),
new LLDocument(new LLItem[]{
LLItem.newStringField("id", "123", Store.YES),
LLItem.newStringField("name", "Mario", Store.NO),
LLItem.newTextField("name", "Mario", Store.NO),
LLItem.newStringField("surname", "Rossi", Store.NO)
})
)
.then(index.refresh())
.then(index.search(null, new TermQuery("name", "Mario"), 1, null, LLScoreMode.COMPLETE_NO_SCORES, "id"))
.then(index.search(null, Query.exactSearch(TextFieldsAnalyzer.PartialString,"name", "Mario"), 1, null, LLScoreMode.COMPLETE, "id"))
.flatMap(results -> results
.results()
.flatMap(r -> r)
@@ -45,6 +47,70 @@ public class IndicizationExample {
)
.subscribeOn(Schedulers.parallel())
.block();
tempIndex(true)
.flatMap(index ->
index
.addDocument(new LLTerm("id", "126"),
new LLDocument(new LLItem[]{
LLItem.newStringField("id", "126", Store.YES),
LLItem.newTextField("name", "Marioxq", Store.NO),
LLItem.newStringField("surname", "Rossi", Store.NO)
})
)
.then(index
.addDocument(new LLTerm("id", "123"),
new LLDocument(new LLItem[]{
LLItem.newStringField("id", "123", Store.YES),
LLItem.newTextField("name", "Mario", Store.NO),
LLItem.newStringField("surname", "Rossi", Store.NO)
})
))
.then(index
.addDocument(new LLTerm("id", "124"),
new LLDocument(new LLItem[]{
LLItem.newStringField("id", "124", Store.YES),
LLItem.newTextField("name", "Mariossi", Store.NO),
LLItem.newStringField("surname", "Rossi", Store.NO)
})
))
.then(index
.addDocument(new LLTerm("id", "125"),
new LLDocument(new LLItem[]{
LLItem.newStringField("id", "125", Store.YES),
LLItem.newTextField("name", "Mario marios", Store.NO),
LLItem.newStringField("surname", "Rossi", Store.NO)
})
))
.then(index
.addDocument(new LLTerm("id", "128"),
new LLDocument(new LLItem[]{
LLItem.newStringField("id", "128", Store.YES),
LLItem.newTextField("name", "Marion", Store.NO),
LLItem.newStringField("surname", "Rossi", Store.NO)
})
))
.then(index
.addDocument(new LLTerm("id", "127"),
new LLDocument(new LLItem[]{
LLItem.newStringField("id", "127", Store.YES),
LLItem.newTextField("name", "Mariotto", Store.NO),
LLItem.newStringField("surname", "Rossi", Store.NO)
})
))
.then(index.refresh())
.then(index.search(null, Query.exactSearch(TextFieldsAnalyzer.PartialString,"name", "Mario"), 10, MultiSort.topScore()
.getQuerySort(), LLScoreMode.COMPLETE, "id"))
.flatMap(results -> LuceneUtils.mergeStream(results
.results(), MultiSort.topScoreRaw(), 10)
.doOnNext(value -> System.out.println("Value: " + value))
.then(results.totalHitsCount())
)
.doOnNext(count -> System.out.println("Total hits: " + count))
.doOnTerminate(() -> System.out.println("Completed"))
.then(index.close())
)
.subscribeOn(Schedulers.parallel())
.block();
}
public static final class CurrentCustomType {
@@ -86,8 +152,9 @@ public class IndicizationExample {
.subscribeOn(Schedulers.boundedElastic())
.then(new LLLocalDatabaseConnection(wrkspcPath, true).connect())
.flatMap(conn -> conn.getLuceneIndex("testindices",
3,
TextFieldsAnalyzer.PartialWords,
10,
TextFieldsAnalyzer.PartialString,
TextFieldsSimilarity.NGramBM25Plus,
Duration.ofSeconds(5),
Duration.ofSeconds(5),
false

View File

@@ -7,7 +7,7 @@ import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLSnapshottable;
import it.cavallium.dbengine.database.LLSort;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.LuceneUtils;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
import it.cavallium.dbengine.lucene.serializer.Query;
import java.util.Map;

View File

@@ -1,5 +1,6 @@
package it.cavallium.dbengine.client;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLSort;
import java.util.Comparator;
import java.util.function.ToIntFunction;
@@ -63,12 +64,16 @@ public class MultiSort<T> {
return new MultiSort<>(LLSort.newRandomSortField(), (a, b) -> 0);
}
public static MultiSort<LLKeyScore> topScoreRaw() {
return new MultiSort<>(LLSort.newSortScore(), Comparator.comparingDouble(LLKeyScore::getScore).reversed());
}
public static <T> MultiSort<SearchResultKey<T>> topScore() {
return new MultiSort<>(null, Comparator.<SearchResultKey<T>>comparingDouble(SearchResultKey::getScore).reversed());
return new MultiSort<>(LLSort.newSortScore(), Comparator.<SearchResultKey<T>>comparingDouble(SearchResultKey::getScore).reversed());
}
public static <T, U> MultiSort<SearchResultItem<T, U>> topScoreWithValues() {
return new MultiSort<>(null, Comparator.<SearchResultItem<T, U>>comparingDouble(SearchResultItem::getScore).reversed());
return new MultiSort<>(LLSort.newSortScore(), Comparator.<SearchResultItem<T, U>>comparingDouble(SearchResultItem::getScore).reversed());
}
public LLSort getQuerySort() {

View File

@@ -0,0 +1,9 @@
package it.cavallium.dbengine.database;
import java.io.IOException;
import org.apache.lucene.search.CollectionStatistics;
public interface LLCollectionStatisticsGetter {
CollectionStatistics collectionStatistics(String field) throws IOException;
}

View File

@@ -1,6 +1,7 @@
package it.cavallium.dbengine.database;
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Mono;
@@ -15,6 +16,7 @@ public interface LLDatabaseConnection {
Mono<? extends LLLuceneIndex> getLuceneIndex(String name,
int instancesCount,
TextFieldsAnalyzer textFieldsAnalyzer,
TextFieldsSimilarity scorer,
Duration queryRefreshDebounceTime,
Duration commitDebounceTime,
boolean lowMemory);

View File

@@ -1,7 +1,22 @@
package it.cavallium.dbengine.database;
public enum LLScoreMode {
import org.apache.lucene.search.Scorer;
public enum LLScoreMode {
/**
* Produced scorers will allow visiting all matches and get their score.
*/
COMPLETE,
TOP_SCORES,
COMPLETE_NO_SCORES
/**
* Produced scorers will allow visiting all matches but scores won't be
* available.
* Much faster than COMPLETE on multi-shard Lucene indices, because it does not require global score calculation.
*/
COMPLETE_NO_SCORES,
/**
* Produced scorers will optionally allow skipping over non-competitive
* hits using the {@link Scorer#setMinCompetitiveScore(float)} API.
* This can significantly reduce query time when setMinCompetitiveScore is used.
*/
TOP_SCORES
}
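
These modes map to org.apache.lucene.search.ScoreMode on the Lucene side. As a minimal sketch (hypothetical helper, not part of this commit), a caller would pick the cheapest mode that still satisfies the query:

static LLScoreMode chooseScoreMode(boolean needsScores, boolean onlyTopHitsMatter) {
    if (!needsScores) {
        // Cheapest on multi-shard indices: no global score calculation at all.
        return LLScoreMode.COMPLETE_NO_SCORES;
    }
    // TOP_SCORES lets scorers skip non-competitive hits via setMinCompetitiveScore.
    return onlyTopHitsMatter ? LLScoreMode.TOP_SCORES : LLScoreMode.COMPLETE;
}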

View File

@@ -0,0 +1,13 @@
package it.cavallium.dbengine.database;
import java.io.IOException;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.IndexSearcher;
public interface LLSearchCollectionStatisticsGetter {
CollectionStatistics collectionStatistics(IndexSearcher indexSearcher,
String field,
boolean distributedPre,
long actionId) throws IOException;
}
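
Here distributedPre distinguishes the statistics-gathering pass from the actual search, and actionId keys the per-search statistics cache. As a sketch, a single-shard setup could satisfy this interface with a plain pass-through, since local statistics are already global (hypothetical, not part of this commit):

LLSearchCollectionStatisticsGetter passThrough =
        (indexSearcher, field, distributedPre, actionId) ->
                indexSearcher.collectionStatistics(field);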

View File

@@ -25,6 +25,7 @@ public class LLSearchResult {
return (a, b) -> {
var mergedTotals = a.totalHitsCount.flatMap(aL -> b.totalHitsCount.map(bL -> aL + bL));
var mergedResults = Flux.merge(a.results, b.results);
return new LLSearchResult(mergedTotals, mergedResults);
};
}
@@ -37,6 +38,10 @@
return this.results;
}
public Mono<Void> completion() {
return results.flatMap(r -> r).then();
}
public boolean equals(final Object o) {
if (o == this) {
return true;

View File

@@ -22,6 +22,14 @@ public class LLSort {
return new LLSort(null, LLSortType.RANDOM, false);
}
public static LLSort newSortScore() {
return new LLSort(null, LLSortType.SCORE, false);
}
public static LLSort newSortDoc() {
return new LLSort(null, LLSortType.DOC, false);
}
public String getFieldName() {
return fieldName;
}

View File

@@ -2,5 +2,7 @@ package it.cavallium.dbengine.database;
public enum LLSortType {
LONG,
RANDOM
RANDOM,
SCORE,
DOC
}

View File

@@ -45,6 +45,10 @@ public class LLUtils {
return new Sort(new SortedNumericSortField(sort.getFieldName(), SortField.Type.LONG, sort.isReverse()));
} else if (sort.getType() == LLSortType.RANDOM) {
return new Sort(new RandomSortField());
} else if (sort.getType() == LLSortType.SCORE) {
return new Sort(SortField.FIELD_SCORE);
} else if (sort.getType() == LLSortType.DOC) {
return new Sort(SortField.FIELD_DOC);
}
return null;
}

View File

@@ -1,88 +0,0 @@
package it.cavallium.dbengine.database;
import it.cavallium.dbengine.client.MultiSort;
import it.cavallium.dbengine.database.analyzer.N4CharGramAnalyzer;
import it.cavallium.dbengine.database.analyzer.N4CharGramEdgeAnalyzer;
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.database.analyzer.WordAnalyzer;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.LowerCaseFilter;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.en.EnglishPossessiveFilter;
import org.apache.lucene.analysis.en.KStemFilter;
import org.apache.lucene.analysis.miscellaneous.ASCIIFoldingFilter;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
public class LuceneUtils {
private static final Analyzer lucene4CharGramAnalyzerEdgeInstance = new N4CharGramEdgeAnalyzer();
private static final Analyzer lucene4CharGramAnalyzerInstance = new N4CharGramAnalyzer();
private static final Analyzer luceneWordAnalyzerStopWordsAndStemInstance = new WordAnalyzer(true, true);
private static final Analyzer luceneWordAnalyzerStopWordsInstance = new WordAnalyzer(true, false);
private static final Analyzer luceneWordAnalyzerStemInstance = new WordAnalyzer(false, true);
private static final Analyzer luceneWordAnalyzerSimpleInstance = new WordAnalyzer(false, false);
public static Analyzer getAnalyzer(TextFieldsAnalyzer analyzer) {
switch (analyzer) {
case PartialWordsEdge:
return lucene4CharGramAnalyzerEdgeInstance;
case PartialWords:
return lucene4CharGramAnalyzerInstance;
case FullText:
return luceneWordAnalyzerStopWordsAndStemInstance;
case WordWithStopwordsStripping:
return luceneWordAnalyzerStopWordsInstance;
case WordWithStemming:
return luceneWordAnalyzerStemInstance;
case WordSimple:
return luceneWordAnalyzerSimpleInstance;
default:
throw new UnsupportedOperationException("Unknown analyzer: " + analyzer);
}
}
/**
*
* @param stem Enable stem filters on words.
* Pass false if it will be used with an n-gram filter
*/
public static TokenStream newCommonFilter(TokenStream tokenStream, boolean stem) {
tokenStream = newCommonNormalizer(tokenStream);
if (stem) {
tokenStream = new KStemFilter(tokenStream);
tokenStream = new EnglishPossessiveFilter(tokenStream);
}
return tokenStream;
}
public static TokenStream newCommonNormalizer(TokenStream tokenStream) {
tokenStream = new ASCIIFoldingFilter(tokenStream);
tokenStream = new LowerCaseFilter(tokenStream);
return tokenStream;
}
/**
* Merge streams together maintaining absolute order
*/
public static <T> Flux<T> mergeStream(Flux<Flux<T>> mappedMultiResults,
@Nullable MultiSort<T> sort,
@Nullable Integer limit) {
if (limit != null && limit == 0) {
return mappedMultiResults.flatMap(f -> f).ignoreElements().flux();
}
return mappedMultiResults.collectList().flatMapMany(mappedMultiResultsList -> {
Flux<T> mergedFlux;
if (sort == null) {
mergedFlux = Flux.merge(mappedMultiResultsList);
} else {
//noinspection unchecked
mergedFlux = Flux.mergeOrdered(32, sort.getResultSort(), mappedMultiResultsList.toArray(Flux[]::new));
}
if (limit == null) {
return mergedFlux;
} else {
return mergedFlux.take(limit);
}
});
}
}

View File

@@ -0,0 +1,33 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLSearchCollectionStatisticsGetter;
import java.io.IOException;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.IndexSearcher;
public class LLIndexSearcherWithCustomCollectionStatistics extends IndexSearcher {
private final IndexSearcher indexSearcher;
private final LLSearchCollectionStatisticsGetter customCollectionStatisticsGetter;
private final boolean distributedPre;
private final long actionId;
public LLIndexSearcherWithCustomCollectionStatistics(IndexSearcher indexSearcher,
LLSearchCollectionStatisticsGetter customCollectionStatisticsGetter,
boolean distributedPre,
long actionId) {
super(indexSearcher.getIndexReader());
this.indexSearcher = indexSearcher;
this.setSimilarity(indexSearcher.getSimilarity());
this.setQueryCache(indexSearcher.getQueryCache());
this.setQueryCachingPolicy(indexSearcher.getQueryCachingPolicy());
this.customCollectionStatisticsGetter = customCollectionStatisticsGetter;
this.distributedPre = distributedPre;
this.actionId = actionId;
}
@Override
public CollectionStatistics collectionStatistics(String field) throws IOException {
return customCollectionStatisticsGetter.collectionStatistics(indexSearcher, field, distributedPre, actionId);
}
}

View File

@@ -3,7 +3,8 @@ package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
@@ -51,6 +52,7 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
public Mono<LLLuceneIndex> getLuceneIndex(String name,
int instancesCount,
TextFieldsAnalyzer textFieldsAnalyzer,
TextFieldsSimilarity textFieldsSimilarity,
Duration queryRefreshDebounceTime,
Duration commitDebounceTime,
boolean lowMemory) {
@@ -61,6 +63,7 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
name,
instancesCount,
textFieldsAnalyzer,
textFieldsSimilarity,
queryRefreshDebounceTime,
commitDebounceTime,
lowMemory
@@ -69,9 +72,11 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
return new LLLocalLuceneIndex(basePath.resolve("lucene"),
name,
textFieldsAnalyzer,
textFieldsSimilarity,
queryRefreshDebounceTime,
commitDebounceTime,
lowMemory
lowMemory,
null
);
}
})

View File

@@ -93,30 +93,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
this.handles.put(columns.get(i), handles.get(i));
}
/*
System.out.println("----Data----");
this.handles.forEach((Column column, ColumnFamilyHandle hnd) -> {
System.out.println("Column: " + column.getName());
if (!column.getName().contains("hash")) {
var val = new ArrayList<String>();
var iter = db.newIterator(hnd);
iter.seekToFirst();
while (iter.isValid()) {
val.add(Column.toString(iter.key()));
System.out.println(" " + Column.toString(iter.key()));
iter.next();
}
}
});
*/
/*
System.out.println("----Columns----");
this.handles.forEach((Column column, ColumnFamilyHandle hnd) -> {
System.out.println("Column: " + column.getName());
});
*/
flushDb(db, handles);
} catch (RocksDBException ex) {
throw new IOException(ex);

View File

@@ -4,15 +4,18 @@ import it.cavallium.dbengine.database.LLDocument;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLScoreMode;
import it.cavallium.dbengine.database.LLSearchCollectionStatisticsGetter;
import it.cavallium.dbengine.database.LLSearchResult;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLSort;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.LuceneUtils;
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.ScheduledTaskLifecycle;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import it.cavallium.dbengine.lucene.searcher.AdaptiveStreamSearcher;
import it.cavallium.dbengine.lucene.searcher.AllowOnlyQueryParsingCollectorStreamSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher;
import it.cavallium.dbengine.lucene.searcher.PagedStreamSearcher;
import it.cavallium.dbengine.lucene.serializer.QueryParser;
@@ -39,6 +42,7 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.jetbrains.annotations.Nullable;
@@ -48,6 +52,7 @@ import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmissionException;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.publisher.Sinks.Empty;
import reactor.core.publisher.Sinks.Many;
import reactor.core.publisher.Sinks.One;
import reactor.core.scheduler.Scheduler;
@@ -58,6 +63,7 @@ import reactor.util.function.Tuples;
public class LLLocalLuceneIndex implements LLLuceneIndex {
private static final LuceneStreamSearcher streamSearcher = new AdaptiveStreamSearcher();
private static final AllowOnlyQueryParsingCollectorStreamSearcher allowOnlyQueryParsingCollectorStreamSearcher = new AllowOnlyQueryParsingCollectorStreamSearcher();
/**
* Global lucene index scheduler.
* There is only a single thread globally to not overwhelm the disk with
@@ -84,15 +90,19 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
*/
private final ConcurrentHashMap<Long, LuceneIndexSnapshot> snapshots = new ConcurrentHashMap<>();
private final boolean lowMemory;
private final TextFieldsSimilarity similarity;
private final ScheduledTaskLifecycle scheduledTasksLifecycle;
private final @Nullable LLSearchCollectionStatisticsGetter distributedCollectionStatisticsGetter;
public LLLocalLuceneIndex(Path luceneBasePath,
String name,
TextFieldsAnalyzer analyzer,
TextFieldsSimilarity similarity,
Duration queryRefreshDebounceTime,
Duration commitDebounceTime,
boolean lowMemory) throws IOException {
boolean lowMemory,
@Nullable LLSearchCollectionStatisticsGetter distributedCollectionStatisticsGetter) throws IOException {
if (name.length() == 0) {
throw new IOException("Empty lucene database name");
}
@@ -101,6 +111,8 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
this.luceneIndexName = name;
this.snapshotter = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
this.lowMemory = lowMemory;
this.similarity = similarity;
this.distributedCollectionStatisticsGetter = distributedCollectionStatisticsGetter;
IndexWriterConfig indexWriterConfig = new IndexWriterConfig(LuceneUtils.getAnalyzer(analyzer));
indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
indexWriterConfig.setIndexDeletionPolicy(snapshotter);
@@ -112,6 +124,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
indexWriterConfig.setRAMBufferSizeMB(128);
indexWriterConfig.setRAMPerThreadHardLimitMB(512);
}
indexWriterConfig.setSimilarity(getSimilarity());
this.indexWriter = new IndexWriter(directory, indexWriterConfig);
this.searcherManager = new SearcherManager(indexWriter, false, false, null);
@@ -123,6 +136,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
registerScheduledFixedTask(this::scheduledQueryRefresh, queryRefreshDebounceTime);
}
private Similarity getSimilarity() {
return LuceneUtils.getSimilarity(similarity);
}
private void registerScheduledFixedTask(Runnable task, Duration duration) {
scheduledTasksLifecycle.registerScheduledTask(luceneBlockingScheduler.schedulePeriodically(() -> {
scheduledTasksLifecycle.startScheduledTask();
@@ -280,13 +297,24 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}).subscribeOn(luceneBlockingScheduler);
}
private Mono<IndexSearcher> acquireSearcherWrapper(LLSnapshot snapshot) {
private Mono<IndexSearcher> acquireSearcherWrapper(LLSnapshot snapshot, boolean distributedPre, long actionId) {
return Mono.fromCallable(() -> {
IndexSearcher indexSearcher;
if (snapshot == null) {
//noinspection BlockingMethodInNonBlockingContext
return searcherManager.acquire();
indexSearcher = searcherManager.acquire();
indexSearcher.setSimilarity(getSimilarity());
} else {
return resolveSnapshot(snapshot).getIndexSearcher();
indexSearcher = resolveSnapshot(snapshot).getIndexSearcher();
}
if (distributedCollectionStatisticsGetter != null && actionId != -1) {
return new LLIndexSearcherWithCustomCollectionStatistics(indexSearcher,
distributedCollectionStatisticsGetter,
distributedPre,
actionId
);
} else {
return indexSearcher;
}
}).subscribeOn(luceneBlockingScheduler);
}
@@ -304,12 +332,38 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}).subscribeOn(luceneBlockingScheduler);
}
@SuppressWarnings({"Convert2MethodRef", "unchecked", "rawtypes"})
@Override
public Mono<LLSearchResult> moreLikeThis(@Nullable LLSnapshot snapshot,
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux,
int limit,
String keyFieldName) {
return moreLikeThis(snapshot, mltDocumentFieldsFlux, limit, keyFieldName, false, 0, 1);
}
public Mono<LLSearchResult> distributedMoreLikeThis(@Nullable LLSnapshot snapshot,
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux,
int limit,
String keyFieldName,
long actionId,
int scoreDivisor) {
return moreLikeThis(snapshot, mltDocumentFieldsFlux, limit, keyFieldName, false, actionId, scoreDivisor);
}
public Mono<Void> distributedPreMoreLikeThis(@Nullable LLSnapshot snapshot,
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux,
String keyFieldName, long actionId) {
return moreLikeThis(snapshot, mltDocumentFieldsFlux, -1, keyFieldName, true, actionId, 1)
.flatMap(LLSearchResult::completion);
}
@SuppressWarnings({"Convert2MethodRef", "unchecked", "rawtypes"})
private Mono<LLSearchResult> moreLikeThis(@Nullable LLSnapshot snapshot,
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux,
int limit,
String keyFieldName,
boolean doDistributedPre,
long actionId,
int scoreDivisor) {
return mltDocumentFieldsFlux
.collectMap(Tuple2::getT1, Tuple2::getT2, HashMap::new)
.flatMap(mltDocumentFields -> {
@@ -317,7 +371,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return Mono.just(LLSearchResult.empty());
}
return acquireSearcherWrapper(snapshot)
return acquireSearcherWrapper(snapshot, doDistributedPre, actionId)
.flatMap(indexSearcher -> Mono
.fromCallable(() -> {
var mlt = new MoreLikeThis(indexSearcher.getIndexReader());
@@ -339,31 +393,39 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.many()
.unicast()
.onBackpressureBuffer(new ArrayBlockingQueue<>(1000));
Empty<Void> completeSink = Sinks.empty();
luceneBlockingScheduler.schedule(() -> {
Schedulers.boundedElastic().schedule(() -> {
try {
streamSearcher.search(indexSearcher,
query,
limit,
null,
ScoreMode.TOP_SCORES,
keyFieldName,
keyScore -> {
EmitResult result = topKeysSink.tryEmitNext(keyScore);
if (result.isFailure()) {
throw new EmissionException(result);
}
},
totalHitsCount -> {
EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
if (result.isFailure()) {
throw new EmissionException(result);
}
});
if (doDistributedPre) {
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, query);
totalHitsCountSink.tryEmitValue(0L);
} else {
streamSearcher.search(indexSearcher,
query,
limit,
null,
ScoreMode.TOP_SCORES,
keyFieldName,
keyScore -> {
EmitResult result = topKeysSink.tryEmitNext(fixKeyScore(keyScore, scoreDivisor));
if (result.isFailure()) {
throw new EmissionException(result);
}
},
totalHitsCount -> {
EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
if (result.isFailure()) {
throw new EmissionException(result);
}
});
}
topKeysSink.tryEmitComplete();
completeSink.tryEmitEmpty();
} catch (IOException e) {
topKeysSink.tryEmitError(e);
totalHitsCountSink.tryEmitError(e);
completeSink.tryEmitError(e);
}
});
@@ -377,12 +439,33 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
});
}
@SuppressWarnings("Convert2MethodRef")
private LLKeyScore fixKeyScore(LLKeyScore keyScore, int scoreDivisor) {
return scoreDivisor == 1 ? keyScore : new LLKeyScore(keyScore.getKey(), keyScore.getScore() / (float) scoreDivisor);
}
@Override
public Mono<LLSearchResult> search(@Nullable LLSnapshot snapshot, it.cavallium.dbengine.lucene.serializer.Query query, int limit,
@Nullable LLSort sort, LLScoreMode scoreMode, String keyFieldName) {
return search(snapshot, query, limit, sort, scoreMode, keyFieldName, false, 0, 1);
}
return acquireSearcherWrapper(snapshot)
public Mono<LLSearchResult> distributedSearch(@Nullable LLSnapshot snapshot, it.cavallium.dbengine.lucene.serializer.Query query, int limit,
@Nullable LLSort sort, LLScoreMode scoreMode, String keyFieldName, long actionId, int scoreDivisor) {
return search(snapshot, query, limit, sort, scoreMode, keyFieldName, false, actionId, scoreDivisor);
}
public Mono<Void> distributedPreSearch(@Nullable LLSnapshot snapshot, it.cavallium.dbengine.lucene.serializer.Query query,
@Nullable LLSort sort, LLScoreMode scoreMode, String keyFieldName, long actionId) {
return search(snapshot, query, -1, sort, scoreMode, keyFieldName, true, actionId, 1)
.flatMap(LLSearchResult::completion);
}
@SuppressWarnings("Convert2MethodRef")
private Mono<LLSearchResult> search(@Nullable LLSnapshot snapshot,
it.cavallium.dbengine.lucene.serializer.Query query, int limit,
@Nullable LLSort sort, LLScoreMode scoreMode, String keyFieldName,
boolean doDistributedPre, long actionId, int scoreDivisor) {
return acquireSearcherWrapper(snapshot, doDistributedPre, actionId)
.flatMap(indexSearcher -> Mono
.fromCallable(() -> {
Query luceneQuery = QueryParser.parse(query);
@@ -402,27 +485,32 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.many()
.unicast()
.onBackpressureBuffer(new ArrayBlockingQueue<>(PagedStreamSearcher.MAX_ITEMS_PER_PAGE));
luceneBlockingScheduler.schedule(() -> {
Schedulers.boundedElastic().schedule(() -> {
try {
streamSearcher.search(indexSearcher,
luceneQuery,
limit,
luceneSort,
luceneScoreMode,
keyFieldName,
keyScore -> {
EmitResult result = topKeysSink.tryEmitNext(keyScore);
if (result.isFailure()) {
throw new EmissionException(result);
if (doDistributedPre) {
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
totalHitsCountSink.tryEmitValue(0L);
} else {
streamSearcher.search(indexSearcher,
luceneQuery,
limit,
luceneSort,
luceneScoreMode,
keyFieldName,
keyScore -> {
EmitResult result = topKeysSink.tryEmitNext(fixKeyScore(keyScore, scoreDivisor));
if (result.isFailure()) {
throw new EmissionException(result);
}
},
totalHitsCount -> {
EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
if (result.isFailure()) {
throw new EmissionException(result);
}
}
},
totalHitsCount -> {
EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
if (result.isFailure()) {
throw new EmissionException(result);
}
});
);
}
topKeysSink.tryEmitComplete();
} catch (IOException e) {
topKeysSink.tryEmitError(e);

View File

@@ -1,5 +1,7 @@
package it.cavallium.dbengine.database.disk;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import it.cavallium.dbengine.database.LLDocument;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLScoreMode;
@@ -7,7 +9,8 @@ import it.cavallium.dbengine.database.LLSearchResult;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLSort;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import it.cavallium.dbengine.lucene.serializer.Query;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
@@ -16,8 +19,13 @@ import java.nio.file.Path;
import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.IndexSearcher;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.batch.ParallelUtils;
import org.warp.commonutils.functional.IOBiConsumer;
@@ -34,12 +42,17 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
private final AtomicLong nextSnapshotNumber = new AtomicLong(1);
private final LLLocalLuceneIndex[] luceneIndices;
private final AtomicLong nextActionId = new AtomicLong(0);
private final ConcurrentHashMap<Long, Cache<String, CollectionStatistics>[]> statistics = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, AtomicInteger> completedStreams = new ConcurrentHashMap<>();
private final int maxQueueSize = 1000;
public LLLocalMultiLuceneIndex(Path lucene,
String name,
int instancesCount,
TextFieldsAnalyzer textFieldsAnalyzer,
TextFieldsSimilarity textFieldsSimilarity,
Duration queryRefreshDebounceTime,
Duration commitDebounceTime,
boolean lowMemory) throws IOException {
@@ -50,6 +63,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
LLLocalLuceneIndex[] luceneIndices = new LLLocalLuceneIndex[instancesCount];
for (int i = 0; i < instancesCount; i++) {
int finalI = i;
String instanceName;
if (i == 0) {
instanceName = name;
@@ -59,14 +73,75 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
luceneIndices[i] = new LLLocalLuceneIndex(lucene,
instanceName,
textFieldsAnalyzer,
textFieldsSimilarity,
queryRefreshDebounceTime,
commitDebounceTime,
lowMemory
lowMemory,
(indexSearcher, field, distributedPre, actionId) -> distributedCustomCollectionStatistics(finalI,
indexSearcher,
field,
distributedPre,
actionId
)
);
}
this.luceneIndices = luceneIndices;
}
private long newAction() {
var statistics = new Cache[luceneIndices.length];
for (int i = 0; i < luceneIndices.length; i++) {
statistics[i] = CacheBuilder.newBuilder().build();
}
long actionId = nextActionId.getAndIncrement();
//noinspection unchecked
this.statistics.put(actionId, statistics);
this.completedStreams.put(actionId, new AtomicInteger(0));
return actionId;
}
private void completedAction(long actionId) {
var completedStreamsCount = completedStreams.get(actionId);
if (completedStreamsCount != null) {
if (completedStreamsCount.incrementAndGet() >= luceneIndices.length) {
this.statistics.remove(actionId);
this.completedStreams.remove(actionId);
}
}
}
private CollectionStatistics distributedCustomCollectionStatistics(int luceneIndex,
IndexSearcher indexSearcher, String field, boolean distributedPre, long actionId) throws IOException {
if (distributedPre) {
try {
return statistics.get(actionId)[luceneIndex].get(field, () -> indexSearcher.collectionStatistics(field));
} catch (ExecutionException e) {
throw new IOException(e);
}
} else {
long maxDoc = 0;
long docCount = 0;
long sumTotalTermFreq = 0;
long sumDocFreq = 0;
for (int i = 0; i < luceneIndices.length; i++) {
CollectionStatistics iCollStats = statistics.get(actionId)[i].getIfPresent(field);
if (iCollStats != null) {
maxDoc += iCollStats.maxDoc();
docCount += iCollStats.docCount();
sumTotalTermFreq += iCollStats.sumTotalTermFreq();
sumDocFreq += iCollStats.sumDocFreq();
}
}
return new CollectionStatistics(field,
(int) Math.max(1, Math.min(maxDoc, Integer.MAX_VALUE)),
Math.max(1, docCount),
Math.max(1, sumTotalTermFreq),
Math.max(1, sumDocFreq)
);
}
}
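
The pre phase fills each shard's cache with its local statistics; the search phase then returns the sum over all shards, so similarities see index-wide document counts and term frequencies. Illustrating why this matters with made-up numbers (a sketch, not code from this commit): a term that is rare on one shard but common on another would otherwise get a wildly different IDF on each shard.

// Illustrative only: per-shard vs. summed statistics for one term.
long[] shardDocCount = {1_000, 1_000};
long[] shardDocFreq = {2, 198}; // documents containing the term, per shard
long docCount = 0;
long docFreq = 0;
for (int i = 0; i < shardDocCount.length; i++) {
    docCount += shardDocCount[i];
    docFreq += shardDocFreq[i];
}
// BM25-style IDF: log(1 + (N - n + 0.5) / (n + 0.5)).
// Shard 0 alone: idf ≈ 6.0; shard 1 alone: idf ≈ 1.6; summed: idf ≈ 2.3 on both shards.
double globalIdf = Math.log(1 + (docCount - docFreq + 0.5) / (docFreq + 0.5));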
private LLLocalLuceneIndex getLuceneIndex(LLTerm id) {
return luceneIndices[getLuceneIndexId(id)];
}
@@ -130,14 +205,61 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
Flux<Tuple2<String, Set<String>>> mltDocumentFields,
int limit,
String keyFieldName) {
return Flux
.fromArray(luceneIndices)
.index()
.flatMap(tuple -> Mono
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
.map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot)))
.flatMap(tuple -> tuple.getT1().moreLikeThis(tuple.getT2().orElse(null), mltDocumentFields, limit, keyFieldName))
.reduce(LLSearchResult.accumulator());
long actionId;
int scoreDivisor;
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsShared;
Mono<Void> distributedPre;
if (luceneIndices.length > 1) {
actionId = newAction();
scoreDivisor = 20;
mltDocumentFieldsShared = mltDocumentFields.publish().refCount();
distributedPre = Flux
.fromArray(luceneIndices)
.index()
.flatMap(tuple -> Mono
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
.map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot))
)
.flatMap(tuple -> tuple
.getT1()
.distributedPreMoreLikeThis(tuple.getT2().orElse(null), mltDocumentFieldsShared, keyFieldName, actionId)
)
.then();
} else {
actionId = -1;
scoreDivisor = 1;
mltDocumentFieldsShared = mltDocumentFields;
distributedPre = Mono.empty();
}
return distributedPre.then(Flux
.fromArray(luceneIndices)
.index()
.flatMap(tuple -> Mono
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
.map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot)))
.flatMap(tuple -> tuple
.getT1()
.distributedMoreLikeThis(tuple.getT2().orElse(null),
mltDocumentFieldsShared,
limit,
keyFieldName,
actionId,
scoreDivisor
)
)
.reduce(LLSearchResult.accumulator())
.map(result -> {
if (actionId != -1) {
var resultsWithTermination = result
.results()
.map(flux -> flux.doOnTerminate(() -> completedAction(actionId)));
return new LLSearchResult(result.totalHitsCount(), resultsWithTermination);
} else {
return result;
}
})
);
}
@Override
@@ -147,14 +269,60 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
@Nullable LLSort sort,
LLScoreMode scoreMode,
String keyFieldName) {
return Flux
.fromArray(luceneIndices)
.index()
.flatMap(tuple -> Mono
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
.map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot)))
.flatMap(tuple -> tuple.getT1().search(tuple.getT2().orElse(null), query, limit, sort, scoreMode, keyFieldName))
.reduce(LLSearchResult.accumulator());
long actionId;
int scoreDivisor;
Mono<Void> distributedPre;
if (luceneIndices.length <= 1 || scoreMode == LLScoreMode.COMPLETE_NO_SCORES) {
actionId = -1;
scoreDivisor = 1;
distributedPre = Mono.empty();
} else {
actionId = newAction();
scoreDivisor = 20;
distributedPre = Flux
.fromArray(luceneIndices)
.index()
.flatMap(tuple -> Mono
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
.map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot))
)
.flatMap(tuple -> tuple
.getT1()
.distributedPreSearch(tuple.getT2().orElse(null), query, sort, scoreMode, keyFieldName, actionId)
)
.then();
}
return distributedPre
.then(Flux
.fromArray(luceneIndices)
.index()
.flatMap(tuple -> Mono
.fromCallable(() -> resolveSnapshotOptional(snapshot, (int) (long) tuple.getT1()))
.map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot))
)
.flatMap(tuple -> tuple
.getT1()
.distributedSearch(tuple.getT2().orElse(null),
query,
limit,
sort,
scoreMode,
keyFieldName,
actionId,
scoreDivisor
))
.reduce(LLSearchResult.accumulator())
.map(result -> {
if (actionId != -1) {
var resultsWithTermination = result
.results()
.map(flux -> flux.doOnTerminate(() -> completedAction(actionId)));
return new LLSearchResult(result.totalHitsCount(), resultsWithTermination);
} else {
return result;
}
})
);
}
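
Both moreLikeThis and search above follow the same two-phase protocol: newAction() allocates one statistics cache per shard, the distributedPre* pass populates them, the distributed* pass consumes them, and doOnTerminate releases everything once each shard's result stream finishes. A generic sketch of that last cleanup step (hypothetical names, same idea as completedAction above):

import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

// Applied to every shard's result flux, sharing one countdown per action.
static <T> Flux<T> releaseWhenAllDone(Flux<T> shardResults,
        AtomicInteger remainingShards, Runnable releaseAction) {
    return shardResults.doOnTerminate(() -> {
        if (remainingShards.decrementAndGet() == 0) {
            releaseAction.run(); // last shard finished: drop this action's caches
        }
    });
}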
@Override

View File

@@ -3,7 +3,6 @@ package it.cavallium.dbengine.lucene;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.LeafCollector;
@@ -17,22 +16,28 @@ public class LuceneParallelStreamCollector implements Collector, LeafCollector {
private final LuceneParallelStreamConsumer streamConsumer;
private final AtomicBoolean stopped;
private final AtomicLong totalHitsCounter;
private final ReentrantLock lock;
private Scorable scorer;
public LuceneParallelStreamCollector(int base, ScoreMode scoreMode, LuceneParallelStreamConsumer streamConsumer,
AtomicBoolean stopped, AtomicLong totalHitsCounter, ReentrantLock lock) {
public LuceneParallelStreamCollector(int base,
ScoreMode scoreMode,
LuceneParallelStreamConsumer streamConsumer,
AtomicBoolean stopped,
AtomicLong totalHitsCounter) {
this.base = base;
this.scoreMode = scoreMode;
this.streamConsumer = streamConsumer;
this.stopped = stopped;
this.totalHitsCounter = totalHitsCounter;
this.lock = lock;
}
@Override
public final LeafCollector getLeafCollector(LeafReaderContext context) {
return new LuceneParallelStreamCollector(context.docBase, scoreMode, streamConsumer, stopped, totalHitsCounter, lock);
return new LuceneParallelStreamCollector(context.docBase,
scoreMode,
streamConsumer,
stopped,
totalHitsCounter
);
}
@Override
@@ -44,16 +49,11 @@ public class LuceneParallelStreamCollector implements Collector, LeafCollector {
public void collect(int doc) throws IOException {
doc += base;
totalHitsCounter.incrementAndGet();
lock.lock();
try {
if (!stopped.get()) {
assert (scorer == null) || scorer.docID() == doc;
if (!streamConsumer.consume(doc, scorer == null ? 0 : scorer.score())) {
stopped.set(true);
}
if (!stopped.get()) {
assert (scorer == null) || scorer.docID() == doc;
if (!streamConsumer.consume(doc, scorer == null ? 0 : scorer.score())) {
stopped.set(true);
}
} finally {
lock.unlock();
}
}

View File

@@ -3,7 +3,6 @@ package it.cavallium.dbengine.lucene;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.ScoreMode;
@@ -14,7 +13,6 @@ public class LuceneParallelStreamCollectorManager implements
private final LuceneParallelStreamConsumer streamConsumer;
private final AtomicBoolean stopped;
private final AtomicLong totalHitsCounter;
private final ReentrantLock lock;
public static LuceneParallelStreamCollectorManager fromConsumer(
ScoreMode scoreMode,
@@ -22,17 +20,17 @@ public class LuceneParallelStreamCollectorManager implements
return new LuceneParallelStreamCollectorManager(scoreMode, streamConsumer);
}
public LuceneParallelStreamCollectorManager(ScoreMode scoreMode, LuceneParallelStreamConsumer streamConsumer) {
public LuceneParallelStreamCollectorManager(ScoreMode scoreMode,
LuceneParallelStreamConsumer streamConsumer) {
this.scoreMode = scoreMode;
this.streamConsumer = streamConsumer;
this.stopped = new AtomicBoolean();
this.totalHitsCounter = new AtomicLong();
this.lock = new ReentrantLock();
}
@Override
public LuceneParallelStreamCollector newCollector() {
return new LuceneParallelStreamCollector(0, scoreMode, streamConsumer, stopped, totalHitsCounter, lock);
return new LuceneParallelStreamCollector(0, scoreMode, streamConsumer, stopped, totalHitsCounter);
}
@Override

View File

@@ -11,4 +11,5 @@ public class LuceneParallelStreamCollectorResult {
public long getTotalHitsCount() {
return totalHitsCount;
}
}

View File

@@ -1,5 +1,7 @@
package it.cavallium.dbengine.lucene;
import java.io.IOException;
public interface LuceneParallelStreamConsumer {
/**
@@ -7,5 +9,5 @@ public interface LuceneParallelStreamConsumer {
* @param score score of document
* @return true to continue, false to stop the execution
*/
boolean consume(int docId, float score);
boolean consume(int docId, float score) throws IOException;
}

View File

@@ -0,0 +1,170 @@
package it.cavallium.dbengine.lucene;
import it.cavallium.dbengine.client.MultiSort;
import it.cavallium.dbengine.lucene.analyzer.N4CharGramAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.N4CharGramEdgeAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import it.cavallium.dbengine.lucene.analyzer.WordAnalyzer;
import it.cavallium.dbengine.lucene.similarity.NGramSimilarity;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.LowerCaseFilter;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.en.EnglishPossessiveFilter;
import org.apache.lucene.analysis.en.KStemFilter;
import org.apache.lucene.analysis.miscellaneous.ASCIIFoldingFilter;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.misc.SweetSpotSimilarity;
import org.apache.lucene.search.similarities.BooleanSimilarity;
import org.apache.lucene.search.similarities.ClassicSimilarity;
import org.apache.lucene.search.similarities.Similarity;
import org.jetbrains.annotations.Nullable;
import org.novasearch.lucene.search.similarities.BM25Similarity;
import org.novasearch.lucene.search.similarities.BM25Similarity.BM25Model;
import org.novasearch.lucene.search.similarities.LdpSimilarity;
import org.novasearch.lucene.search.similarities.LtcSimilarity;
import org.novasearch.lucene.search.similarities.RobertsonSimilarity;
import reactor.core.publisher.Flux;
public class LuceneUtils {
private static final Analyzer lucene4GramWordsAnalyzerEdgeInstance = new N4CharGramEdgeAnalyzer(true);
private static final Analyzer lucene4GramStringAnalyzerEdgeInstance = new N4CharGramEdgeAnalyzer(false);
private static final Analyzer lucene4GramWordsAnalyzerInstance = new N4CharGramAnalyzer(true);
private static final Analyzer lucene4GramStringAnalyzerInstance = new N4CharGramAnalyzer(false);
private static final Analyzer luceneStandardAnalyzerInstance = new StandardAnalyzer();
private static final Analyzer luceneWordAnalyzerStopWordsAndStemInstance = new WordAnalyzer(true, true);
private static final Analyzer luceneWordAnalyzerStopWordsInstance = new WordAnalyzer(true, false);
private static final Analyzer luceneWordAnalyzerStemInstance = new WordAnalyzer(false, true);
private static final Analyzer luceneWordAnalyzerSimpleInstance = new WordAnalyzer(false, false);
private static final Similarity luceneBM25ClassicSimilarityInstance = new BM25Similarity(BM25Model.CLASSIC);
private static final Similarity luceneBM25PlusSimilarityInstance = new BM25Similarity(BM25Model.PLUS);
private static final Similarity luceneBM25LSimilarityInstance = new BM25Similarity(BM25Model.L);
private static final Similarity luceneBM15PlusSimilarityInstance = new BM25Similarity(1.2f, 0.0f, 0.5f, BM25Model.PLUS);
private static final Similarity luceneBM11PlusSimilarityInstance = new BM25Similarity(1.2f, 1.0f, 0.5f, BM25Model.PLUS);
private static final Similarity luceneBM25ClassicNGramSimilarityInstance = NGramSimilarity.bm25(BM25Model.CLASSIC);
private static final Similarity luceneBM25PlusNGramSimilarityInstance = NGramSimilarity.bm25(BM25Model.PLUS);
private static final Similarity luceneBM25LNGramSimilarityInstance = NGramSimilarity.bm25(BM25Model.L);
private static final Similarity luceneBM15PlusNGramSimilarityInstance = NGramSimilarity.bm15(BM25Model.PLUS);
private static final Similarity luceneBM11PlusNGramSimilarityInstance = NGramSimilarity.bm11(BM25Model.PLUS);
private static final Similarity luceneClassicSimilarityInstance = new ClassicSimilarity();
private static final Similarity luceneClassicNGramSimilarityInstance = NGramSimilarity.classic();
private static final Similarity luceneSweetSpotSimilarityInstance = new SweetSpotSimilarity();
private static final Similarity luceneLTCSimilarityInstance = new LtcSimilarity();
private static final Similarity luceneLDPSimilarityInstance = new LdpSimilarity();
private static final Similarity luceneLDPNoLengthSimilarityInstance = new LdpSimilarity(0, 0.5f);
private static final Similarity luceneBooleanSimilarityInstance = new BooleanSimilarity();
private static final Similarity luceneRobertsonSimilarityInstance = new RobertsonSimilarity();
public static Analyzer getAnalyzer(TextFieldsAnalyzer analyzer) {
switch (analyzer) {
case PartialWords:
return lucene4GramWordsAnalyzerInstance;
case PartialString:
return lucene4GramStringAnalyzerInstance;
case PartialWordsEdge:
return lucene4GramWordsAnalyzerEdgeInstance;
case PartialStringEdge:
return lucene4GramStringAnalyzerEdgeInstance;
case Standard:
return luceneStandardAnalyzerInstance;
case FullText:
return luceneWordAnalyzerStopWordsAndStemInstance;
case WordWithStopwordsStripping:
return luceneWordAnalyzerStopWordsInstance;
case WordWithStemming:
return luceneWordAnalyzerStemInstance;
case WordSimple:
return luceneWordAnalyzerSimpleInstance;
default:
throw new UnsupportedOperationException("Unknown analyzer: " + analyzer);
}
}
public static Similarity getSimilarity(TextFieldsSimilarity similarity) {
switch (similarity) {
case BM25Classic:
return luceneBM25ClassicSimilarityInstance;
case NGramBM25Classic:
return luceneBM25ClassicNGramSimilarityInstance;
case BM25L:
return luceneBM25LSimilarityInstance;
case NGramBM25L:
return luceneBM25LNGramSimilarityInstance;
case Classic:
return luceneClassicSimilarityInstance;
case NGramClassic:
return luceneClassicNGramSimilarityInstance;
case BM25Plus:
return luceneBM25PlusSimilarityInstance;
case NGramBM25Plus:
return luceneBM25PlusNGramSimilarityInstance;
case BM15Plus:
return luceneBM15PlusSimilarityInstance;
case NGramBM15Plus:
return luceneBM15PlusNGramSimilarityInstance;
case BM11Plus:
return luceneBM11PlusSimilarityInstance;
case NGramBM11Plus:
return luceneBM11PlusNGramSimilarityInstance;
case SweetSpot:
return luceneSweetSpotSimilarityInstance;
case LTC:
return luceneLTCSimilarityInstance;
case LDP:
return luceneLDPSimilarityInstance;
case LDPNoLength:
return luceneLDPNoLengthSimilarityInstance;
case Robertson:
return luceneRobertsonSimilarityInstance;
case Boolean:
return luceneBooleanSimilarityInstance;
default:
throw new IllegalStateException("Unknown similarity: " + similarity);
}
}
/**
*
* @param stem Enable stem filters on words.
* Pass false if it will be used with an n-gram filter
*/
public static TokenStream newCommonFilter(TokenStream tokenStream, boolean stem) {
tokenStream = newCommonNormalizer(tokenStream);
if (stem) {
tokenStream = new KStemFilter(tokenStream);
tokenStream = new EnglishPossessiveFilter(tokenStream);
}
return tokenStream;
}
public static TokenStream newCommonNormalizer(TokenStream tokenStream) {
tokenStream = new ASCIIFoldingFilter(tokenStream);
tokenStream = new LowerCaseFilter(tokenStream);
return tokenStream;
}
/**
* Merge streams together maintaining absolute order
*/
public static <T> Flux<T> mergeStream(Flux<Flux<T>> mappedMultiResults,
@Nullable MultiSort<T> sort,
@Nullable Integer limit) {
if (limit != null && limit == 0) {
return mappedMultiResults.flatMap(f -> f).ignoreElements().flux();
}
return mappedMultiResults.collectList().flatMapMany(mappedMultiResultsList -> {
Flux<T> mergedFlux;
if (sort == null) {
mergedFlux = Flux.merge(mappedMultiResultsList);
} else {
//noinspection unchecked
mergedFlux = Flux.mergeOrdered(32, sort.getResultSort(), mappedMultiResultsList.toArray(Flux[]::new));
}
if (limit == null) {
return mergedFlux;
} else {
return mergedFlux.take(limit);
}
});
}
}
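
mergeStream assumes every inner flux is already sorted by the comparator carried by sort; Flux.mergeOrdered then interleaves the heads of the streams, so global order is preserved without buffering whole result sets. A usage sketch with hypothetical data (two shard streams, each pre-sorted by descending score):

import it.cavallium.dbengine.client.MultiSort;
import it.cavallium.dbengine.database.LLKeyScore;
import reactor.core.publisher.Flux;

static Flux<LLKeyScore> mergeExample() {
    Flux<Flux<LLKeyScore>> shards = Flux.just(
            Flux.just(new LLKeyScore("a", 0.9f), new LLKeyScore("c", 0.4f)),
            Flux.just(new LLKeyScore("b", 0.7f), new LLKeyScore("d", 0.1f)));
    // topScoreRaw() compares LLKeyScore by score, highest first (see MultiSort above).
    return LuceneUtils.mergeStream(shards, MultiSort.topScoreRaw(), 3); // -> a, b, c
}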

View File

@@ -1,24 +1,34 @@
package it.cavallium.dbengine.database.analyzer;
package it.cavallium.dbengine.lucene.analyzer;
import it.cavallium.dbengine.database.LuceneUtils;
import it.cavallium.dbengine.lucene.LuceneUtils;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.Tokenizer;
import org.apache.lucene.analysis.core.KeywordTokenizer;
import org.apache.lucene.analysis.ngram.NGramTokenFilter;
import org.apache.lucene.analysis.standard.StandardTokenizer;
public class N4CharGramAnalyzer extends Analyzer {
public N4CharGramAnalyzer() {
private final boolean words;
public N4CharGramAnalyzer(boolean words) {
this.words = words;
}
@Override
protected TokenStreamComponents createComponents(final String fieldName) {
Tokenizer tokenizer = new KeywordTokenizer();
TokenStream tokenStream = tokenizer;
tokenStream = LuceneUtils.newCommonFilter(tokenStream, false);
tokenStream = new NGramTokenFilter(tokenStream, 4, 4, false);
Tokenizer tokenizer;
TokenStream tokenStream;
if (words) {
tokenizer = new StandardTokenizer();
tokenStream = tokenizer;
} else {
tokenizer = new KeywordTokenizer();
tokenStream = tokenizer;
}
tokenStream = LuceneUtils.newCommonFilter(tokenStream, words);
tokenStream = new NGramTokenFilter(tokenStream, 3, 5, false);
return new TokenStreamComponents(tokenizer, tokenStream);
}

View File

@@ -1,24 +1,34 @@
package it.cavallium.dbengine.database.analyzer;
package it.cavallium.dbengine.lucene.analyzer;
import it.cavallium.dbengine.database.LuceneUtils;
import it.cavallium.dbengine.lucene.LuceneUtils;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.Tokenizer;
import org.apache.lucene.analysis.core.KeywordTokenizer;
import org.apache.lucene.analysis.ngram.EdgeNGramTokenFilter;
import org.apache.lucene.analysis.standard.StandardTokenizer;
public class N4CharGramEdgeAnalyzer extends Analyzer {
public N4CharGramEdgeAnalyzer() {
private final boolean words;
public N4CharGramEdgeAnalyzer(boolean words) {
this.words = words;
}
@Override
protected TokenStreamComponents createComponents(final String fieldName) {
Tokenizer tokenizer = new KeywordTokenizer();
TokenStream tokenStream = tokenizer;
tokenStream = LuceneUtils.newCommonFilter(tokenStream, false);
tokenStream = new EdgeNGramTokenFilter(tokenStream, 4, 4, false);
Tokenizer tokenizer;
TokenStream tokenStream;
if (words) {
tokenizer = new StandardTokenizer();
tokenStream = tokenizer;
} else {
tokenizer = new KeywordTokenizer();
tokenStream = tokenizer;
}
tokenStream = LuceneUtils.newCommonFilter(tokenStream, words);
tokenStream = new EdgeNGramTokenFilter(tokenStream, 3, 5, false);
return new TokenStreamComponents(tokenizer, tokenStream);
}

View File

@@ -1,8 +1,11 @@
package it.cavallium.dbengine.database.analyzer;
package it.cavallium.dbengine.lucene.analyzer;
public enum TextFieldsAnalyzer {
PartialWordsEdge,
PartialWords,
PartialWordsEdge,
PartialString,
PartialStringEdge,
Standard,
WordSimple,
WordWithStopwordsStripping,
WordWithStemming,

View File

@@ -0,0 +1,22 @@
package it.cavallium.dbengine.lucene.analyzer;
public enum TextFieldsSimilarity {
BM25Classic,
NGramBM25Classic,
BM25L,
NGramBM25L,
BM25Plus,
NGramBM25Plus,
BM15Plus,
NGramBM15Plus,
BM11Plus,
NGramBM11Plus,
Classic,
NGramClassic,
SweetSpot,
LTC,
LDP,
LDPNoLength,
Robertson,
Boolean
}

View File

@@ -1,7 +1,7 @@
package it.cavallium.dbengine.database.analyzer;
package it.cavallium.dbengine.lucene.analyzer;
import it.cavallium.dbengine.database.EnglishItalianStopFilter;
import it.cavallium.dbengine.database.LuceneUtils;
import it.cavallium.dbengine.lucene.LuceneUtils;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.Tokenizer;

View File

@@ -15,6 +15,7 @@ import org.jetbrains.annotations.Nullable;
*/
public class AdaptiveStreamSearcher implements LuceneStreamSearcher {
private static final boolean ENABLE_PARALLEL_COLLECTOR = true;
private final SimpleStreamSearcher simpleStreamSearcher;
private final ParallelCollectorStreamSearcher parallelCollectorStreamSearcher;
private final PagedStreamSearcher pagedStreamSearcher;
@@ -38,10 +39,10 @@ public class AdaptiveStreamSearcher implements LuceneStreamSearcher {
LongConsumer totalHitsConsumer) throws IOException {
if (limit == 0) {
totalHitsConsumer.accept(countStreamSearcher.count(indexSearcher, query));
} else if (luceneSort == null) {
} else if (luceneSort == null && ENABLE_PARALLEL_COLLECTOR) {
parallelCollectorStreamSearcher.search(indexSearcher, query, limit, null, scoreMode, keyFieldName, consumer, totalHitsConsumer);
} else {
if (limit > PagedStreamSearcher.MAX_ITEMS_PER_PAGE) {
if (luceneSort != null && limit > PagedStreamSearcher.MAX_ITEMS_PER_PAGE) {
pagedStreamSearcher.search(indexSearcher, query, limit, luceneSort, scoreMode, keyFieldName, consumer, totalHitsConsumer);
} else {
simpleStreamSearcher.search(indexSearcher, query, limit, luceneSort, scoreMode, keyFieldName, consumer, totalHitsConsumer);

View File

@ -0,0 +1,93 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.LLKeyScore;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.jetbrains.annotations.Nullable;
/**
* Searcher that only parses the query, without performing any actual search
*/
public class AllowOnlyQueryParsingCollectorStreamSearcher implements LuceneStreamSearcher {
public void search(IndexSearcher indexSearcher,
Query query) throws IOException {
search(indexSearcher, query, 0, null, null, null, null, null);
}
@Override
public void search(IndexSearcher indexSearcher,
Query query,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
String keyFieldName,
Consumer<LLKeyScore> resultsConsumer,
LongConsumer totalHitsConsumer) throws IOException {
if (limit > 0) {
throw new IllegalArgumentException("Limit > 0 not allowed");
}
if (luceneSort != null) {
throw new IllegalArgumentException("Lucene sort not allowed");
}
if (scoreMode != null) {
throw new IllegalArgumentException("Score mode not allowed");
}
if (keyFieldName != null) {
throw new IllegalArgumentException("Key field name not allowed");
}
if (resultsConsumer != null) {
throw new IllegalArgumentException("Results consumer not allowed");
}
if (totalHitsConsumer != null) {
throw new IllegalArgumentException("Total hits consumer not allowed");
}
indexSearcher.search(query, new CollectorManager<>() {
@Override
public Collector newCollector() {
return new Collector() {
@Override
public LeafCollector getLeafCollector(LeafReaderContext context) {
return new LeafCollector() {
@Override
public void setScorer(Scorable scorer) {
try {
scorer.setMinCompetitiveScore(Float.MAX_VALUE);
} catch (IOException e) {
throw new CompletionException(e);
}
}
@Override
public void collect(int doc) {
System.out.println(doc);
}
};
}
@Override
public ScoreMode scoreMode() {
return ScoreMode.TOP_SCORES;
}
};
}
@Override
public Object reduce(Collection<Collector> collectors) {
return null;
}
});
}
}
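
This searcher exists for the pre phase: creating the query's Weight is what makes Lucene call IndexSearcher.collectionStatistics(...) and termStatistics(...) for every scored term, which fills the per-action cache, while setMinCompetitiveScore(Float.MAX_VALUE) asks scorers to skip every hit so almost no collection work is done. The same effect can be sketched with the plain Lucene 8 API (assumption: a scoring ScoreMode is required for statistics to be gathered):

import java.io.IOException;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;

// Rewriting the query and creating a scoring Weight gathers the statistics;
// no documents need to be collected for that.
static void warmStatistics(IndexSearcher searcher, Query query) throws IOException {
    searcher.createWeight(searcher.rewrite(query), ScoreMode.TOP_SCORES, 1f);
}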

View File

@@ -2,9 +2,9 @@ package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.lucene.LuceneParallelStreamCollectorManager;
import it.cavallium.dbengine.lucene.LuceneParallelStreamCollectorResult;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
@@ -36,32 +36,27 @@ public class ParallelCollectorStreamSearcher implements LuceneStreamSearcher {
AtomicInteger currentCount = new AtomicInteger();
var result = indexSearcher.search(query, LuceneParallelStreamCollectorManager.fromConsumer(scoreMode, (docId, score) -> {
LuceneParallelStreamCollectorResult result = indexSearcher.search(query, LuceneParallelStreamCollectorManager.fromConsumer(scoreMode, (docId, score) -> {
if (currentCount.getAndIncrement() >= limit) {
return false;
} else {
try {
Document d = indexSearcher.doc(docId, Set.of(keyFieldName));
if (d.getFields().isEmpty()) {
logger.error("The document docId: {} is empty.", docId);
var realFields = indexSearcher.doc(docId).getFields();
if (!realFields.isEmpty()) {
logger.error("Present fields:");
for (IndexableField field : realFields) {
logger.error(" - {}", field.name());
}
}
} else {
var field = d.getField(keyFieldName);
if (field == null) {
logger.error("Can't get key of document docId: {}", docId);
} else {
resultsConsumer.accept(new LLKeyScore(field.stringValue(), score));
Document d = indexSearcher.doc(docId, Set.of(keyFieldName));
if (d.getFields().isEmpty()) {
logger.error("The document docId: {} is empty.", docId);
var realFields = indexSearcher.doc(docId).getFields();
if (!realFields.isEmpty()) {
logger.error("Present fields:");
for (IndexableField field : realFields) {
logger.error(" - {}", field.name());
}
}
} catch (IOException e) {
e.printStackTrace();
throw new CompletionException(e);
} else {
var field = d.getField(keyFieldName);
if (field == null) {
logger.error("Can't get key of document docId: {}", docId);
} else {
resultsConsumer.accept(new LLKeyScore(field.stringValue(), score));
}
}
return true;
}

View File

@@ -30,7 +30,12 @@ public class SimpleStreamSearcher implements LuceneStreamSearcher {
String keyFieldName,
Consumer<LLKeyScore> resultsConsumer,
LongConsumer totalHitsConsumer) throws IOException {
TopDocs topDocs = indexSearcher.search(query, limit, luceneSort, scoreMode != ScoreMode.COMPLETE_NO_SCORES);
TopDocs topDocs;
if (luceneSort != null) {
topDocs = indexSearcher.search(query, limit, luceneSort, scoreMode != ScoreMode.COMPLETE_NO_SCORES);
} else {
topDocs = indexSearcher.search(query, limit);
}
totalHitsConsumer.accept(topDocs.totalHits.value);
var hits = ObjectArrayList.wrap(topDocs.scoreDocs);
for (ScoreDoc hit : hits) {

View File

@@ -2,8 +2,8 @@ package it.cavallium.dbengine.lucene.serializer;
import static it.cavallium.dbengine.lucene.serializer.QueryParser.USE_PHRASE_QUERY;
import it.cavallium.dbengine.database.LuceneUtils;
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import java.io.IOException;
import java.io.StringReader;
import java.util.LinkedList;

View File

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package it.cavallium.dbengine.lucene.similarity;
import org.apache.lucene.search.similarities.ClassicSimilarity;
import org.apache.lucene.search.similarities.Similarity;
import org.novasearch.lucene.search.similarities.BM25Similarity;
import org.novasearch.lucene.search.similarities.BM25Similarity.BM25Model;
public class NGramSimilarity {
private NGramSimilarity() {
}
public static Similarity classic() {
var instance = new ClassicSimilarity();
instance.setDiscountOverlaps(false);
return instance;
}
public static Similarity bm25(BM25Model model) {
var instance = new BM25Similarity(model);
instance.setDiscountOverlaps(false);
return instance;
}
public static Similarity bm15(BM25Model model) {
var instance = new BM25Similarity(1.2f, 0.0f, 0.5f, model);
instance.setDiscountOverlaps(false);
return instance;
}
public static Similarity bm11(BM25Model model) {
var instance = new BM25Similarity(1.2f, 1.0f, 0.5f, model);
instance.setDiscountOverlaps(false);
return instance;
}
}
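
setDiscountOverlaps(false) makes tokens that share a position, such as the extra grams emitted by the n-gram token filters above, count toward field length, so length normalization stays meaningful for n-gram fields. A sketch of wiring one of these similarities into both sides of an index, mirroring what LLLocalLuceneIndex does in this commit (writerConfig and searcher are assumed to exist):

import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.similarities.Similarity;
import org.novasearch.lucene.search.similarities.BM25Similarity.BM25Model;

// The same Similarity must be applied at index time and at query time.
static void configureSimilarity(IndexWriterConfig writerConfig, IndexSearcher searcher) {
    Similarity similarity = NGramSimilarity.bm25(BM25Model.PLUS);
    writerConfig.setSimilarity(similarity);
    searcher.setSimilarity(similarity);
}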