CavalliumDBEngine/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java

861 lines
36 KiB
Java
Raw Normal View History

package it.cavallium.dbengine.lucene;
import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler;
2022-07-02 11:44:13 +02:00
import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRITE;
import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE;
import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE;
2022-01-28 19:31:25 +01:00
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
2022-06-20 11:55:41 +02:00
import it.cavallium.data.generator.nativedata.Nullabledouble;
import it.cavallium.data.generator.nativedata.Nullableint;
import it.cavallium.data.generator.nativedata.Nullablelong;
2021-03-11 14:45:45 +01:00
import it.cavallium.dbengine.client.CompositeSnapshot;
2021-07-06 01:30:37 +02:00
import it.cavallium.dbengine.client.query.QueryParser;
2022-07-02 11:44:13 +02:00
import it.cavallium.dbengine.client.query.current.data.NoSort;
2021-07-06 01:30:37 +02:00
import it.cavallium.dbengine.client.query.current.data.QueryParams;
2021-08-04 01:12:39 +02:00
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore;
2022-03-05 15:46:40 +01:00
import it.cavallium.dbengine.database.LLTerm;
2021-09-23 20:57:28 +02:00
import it.cavallium.dbengine.database.LLUtils;
2021-03-11 14:45:45 +01:00
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
2022-03-20 14:33:27 +01:00
import it.cavallium.dbengine.database.collections.DatabaseStageEntry;
import it.cavallium.dbengine.database.collections.DatabaseStageMap;
2021-07-17 11:52:08 +02:00
import it.cavallium.dbengine.database.collections.ValueGetter;
2022-07-02 11:44:13 +02:00
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
2021-09-20 12:51:27 +02:00
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
2022-07-02 11:44:13 +02:00
import it.cavallium.dbengine.database.disk.LLIndexSearchers.UnshardedIndexSearchers;
import it.cavallium.dbengine.database.disk.LuceneThreadFactory;
import it.cavallium.dbengine.lucene.LuceneConcurrentMergeScheduler.LuceneMergeThread;
2022-01-12 16:18:31 +01:00
import it.cavallium.dbengine.lucene.analyzer.LegacyWordAnalyzer;
2021-02-05 20:34:58 +01:00
import it.cavallium.dbengine.lucene.analyzer.NCharGramAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.NCharGramEdgeAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import it.cavallium.dbengine.lucene.analyzer.WordAnalyzer;
2022-03-05 15:46:40 +01:00
import it.cavallium.dbengine.lucene.directory.RocksdbDirectory;
import it.cavallium.dbengine.lucene.mlt.BigCompositeReader;
import it.cavallium.dbengine.lucene.mlt.MultiMoreLikeThis;
2022-07-02 11:44:13 +02:00
import it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite;
2021-07-06 01:30:37 +02:00
import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
2022-07-02 11:44:13 +02:00
import it.cavallium.dbengine.lucene.searcher.LocalSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneSearchResult;
import it.cavallium.dbengine.lucene.searcher.MultiSearcher;
import it.cavallium.dbengine.lucene.similarity.NGramSimilarity;
2022-03-05 15:46:40 +01:00
import it.cavallium.dbengine.rpc.current.data.ByteBuffersDirectory;
import it.cavallium.dbengine.rpc.current.data.DirectIOFSDirectory;
import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers;
import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities;
import it.cavallium.dbengine.rpc.current.data.LuceneDirectoryOptions;
import it.cavallium.dbengine.rpc.current.data.LuceneIndexStructure;
2022-06-20 11:55:41 +02:00
import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
2022-03-05 15:46:40 +01:00
import it.cavallium.dbengine.rpc.current.data.MemoryMappedFSDirectory;
import it.cavallium.dbengine.rpc.current.data.NIOFSDirectory;
import it.cavallium.dbengine.rpc.current.data.NRTCachingDirectory;
2022-07-22 13:49:03 +02:00
import it.cavallium.dbengine.rpc.current.data.RAFFSDirectory;
2022-03-05 15:46:40 +01:00
import it.cavallium.dbengine.rpc.current.data.RocksDBSharedDirectory;
2022-03-11 17:59:46 +01:00
import it.cavallium.dbengine.rpc.current.data.RocksDBStandaloneDirectory;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
2021-12-18 18:16:56 +01:00
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
2021-07-01 21:19:52 +02:00
import java.io.EOFException;
2021-02-14 13:46:11 +01:00
import java.io.IOException;
2021-07-01 21:19:52 +02:00
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
2022-03-05 15:46:40 +01:00
import java.nio.file.Path;
2021-12-12 23:40:30 +01:00
import java.time.Duration;
import java.util.ArrayList;
2022-01-28 19:31:25 +01:00
import java.util.Collection;
2021-05-28 16:04:59 +02:00
import java.util.HashMap;
import java.util.List;
2021-03-11 14:45:45 +01:00
import java.util.Map;
import java.util.Map.Entry;
2021-07-17 23:06:26 +02:00
import java.util.NoSuchElementException;
2022-03-05 15:46:40 +01:00
import java.util.Optional;
2021-07-17 23:06:26 +02:00
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
2022-01-11 22:23:07 +01:00
import org.apache.lucene.analysis.CharArraySet;
import org.apache.lucene.analysis.en.EnglishAnalyzer;
import org.apache.lucene.analysis.it.ItalianAnalyzer;
2021-05-28 16:04:59 +02:00
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
2021-02-14 13:46:11 +01:00
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
2021-02-14 13:46:11 +01:00
import org.apache.lucene.index.IndexableField;
2022-06-20 11:55:41 +02:00
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.TieredMergePolicy;
2022-03-05 15:46:40 +01:00
import org.apache.lucene.misc.store.DirectIODirectory;
2022-07-22 13:49:03 +02:00
import org.apache.lucene.misc.store.RAFDirectory;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery.Builder;
2021-12-12 23:40:30 +01:00
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.ConstantScoreQuery;
2021-09-22 11:03:39 +02:00
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
2021-12-12 23:40:30 +01:00
import org.apache.lucene.search.TimeLimitingCollector;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldDocs;
2021-08-04 01:12:39 +02:00
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.search.similarities.BooleanSimilarity;
import org.apache.lucene.search.similarities.ClassicSimilarity;
2021-05-28 16:04:59 +02:00
import org.apache.lucene.search.similarities.PerFieldSimilarityWrapper;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.search.similarities.TFIDFSimilarity;
2022-03-05 15:46:40 +01:00
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
2022-07-22 13:49:03 +02:00
import org.apache.lucene.store.MMapDirectory;
2022-03-05 15:46:40 +01:00
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.StringHelper;
2021-07-17 23:06:26 +02:00
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.novasearch.lucene.search.similarities.BM25Similarity;
import org.novasearch.lucene.search.similarities.BM25Similarity.BM25Model;
import org.novasearch.lucene.search.similarities.LdpSimilarity;
import org.novasearch.lucene.search.similarities.LtcSimilarity;
import org.novasearch.lucene.search.similarities.RobertsonSimilarity;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
2021-09-07 11:28:03 +02:00
import reactor.core.scheduler.Schedulers;
2021-09-18 18:34:21 +02:00
import reactor.util.concurrent.Queues;
public class LuceneUtils {
2021-07-17 23:06:26 +02:00
// Logger shared by all the static helpers of this class.
private static final Logger logger = LogManager.getLogger(LuceneUtils.class);
2021-07-17 23:06:26 +02:00
2022-01-11 22:23:07 +01:00
// Analyzer instances created once and handed out by getAnalyzer(TextFieldsAnalyzer).
private static final Analyzer luceneEdge4GramAnalyzerEdgeInstance = new NCharGramEdgeAnalyzer(4, 4);
private static final Analyzer lucene4GramAnalyzerInstance = new NCharGramAnalyzer(4, 4);
private static final Analyzer luceneEdge3To5GramAnalyzerEdgeInstance = new NCharGramEdgeAnalyzer(3, 5);
private static final Analyzer lucene3To5GramAnalyzerInstance = new NCharGramAnalyzer(3, 5);
private static final Analyzer luceneStandardAnalyzerInstance = new StandardAnalyzer();
2022-01-12 16:18:31 +01:00
private static final Analyzer luceneWordAnalyzerLegacy1Instance = new LegacyWordAnalyzer(false, true, true);
private static final Analyzer luceneWordAnalyzerLegacy2Instance = new LegacyWordAnalyzer(false, false, true);
2022-01-18 00:02:55 +01:00
// NOTE(review): same constructor arguments as luceneWordAnalyzerLegacy1Instance, although this instance is
// the one returned for LegacyICU -- confirm that this is intended.
private static final Analyzer luceneWordAnalyzerLegacy3Instance = new LegacyWordAnalyzer(false, true, true);
2022-01-11 22:23:07 +01:00
private static final Analyzer luceneWordAnalyzerStemInstance = new WordAnalyzer(false,true);
private static final Analyzer luceneWordAnalyzerSimpleInstance = new WordAnalyzer(false, false);
private static final Analyzer luceneICUCollationKeyInstance = new WordAnalyzer(true, true);
// Similarity instances created once and handed out by getSimilarity(TextFieldsSimilarity).
// The first one is Lucene's own BM25 implementation, the others come from the novasearch package.
private static final Similarity luceneBM25StandardSimilarityInstance = new org.apache.lucene.search.similarities.BM25Similarity();
private static final Similarity luceneBM25ClassicSimilarityInstance = new BM25Similarity(BM25Model.CLASSIC);
private static final Similarity luceneBM25PlusSimilarityInstance = new BM25Similarity(BM25Model.PLUS);
private static final Similarity luceneBM25LSimilarityInstance = new BM25Similarity(BM25Model.L);
private static final Similarity luceneBM15PlusSimilarityInstance = new BM25Similarity(1.2f, 0.0f, 0.5f, BM25Model.PLUS);
private static final Similarity luceneBM11PlusSimilarityInstance = new BM25Similarity(1.2f, 1.0f, 0.5f, BM25Model.PLUS);
private static final Similarity luceneBM25ClassicNGramSimilarityInstance = NGramSimilarity.bm25(BM25Model.CLASSIC);
private static final Similarity luceneBM25PlusNGramSimilarityInstance = NGramSimilarity.bm25(BM25Model.PLUS);
private static final Similarity luceneBM25LNGramSimilarityInstance = NGramSimilarity.bm25(BM25Model.L);
private static final Similarity luceneBM15PlusNGramSimilarityInstance = NGramSimilarity.bm15(BM25Model.PLUS);
private static final Similarity luceneBM11PlusNGramSimilarityInstance = NGramSimilarity.bm11(BM25Model.PLUS);
private static final Similarity luceneClassicSimilarityInstance = new ClassicSimilarity();
private static final Similarity luceneClassicNGramSimilarityInstance = NGramSimilarity.classic();
private static final Similarity luceneLTCSimilarityInstance = new LtcSimilarity();
private static final Similarity luceneLDPSimilarityInstance = new LdpSimilarity();
private static final Similarity luceneLDPNoLengthSimilarityInstance = new LdpSimilarity(0, 0.5f);
private static final Similarity luceneBooleanSimilarityInstance = new BooleanSimilarity();
private static final Similarity luceneRobertsonSimilarityInstance = new RobertsonSimilarity();
2021-09-20 18:20:59 +02:00
// TODO: remove this default page limits and make the limits configurable into QueryParams
// Page limits passed to every LocalQueryParams built by toLocalQueryParams.
private static final PageLimits DEFAULT_PAGE_LIMITS = new ExponentialPageLimits();
2022-01-11 22:23:07 +01:00
// Union of the English and Italian stop words; filled in by the static initializer of this class.
private static final CharArraySet ENGLISH_AND_ITALIAN_STOP_WORDS;
// Index structure built with the arguments (1, [0]); presumably "one shard, shard index 0" -- TODO confirm.
private static final LuceneIndexStructure SINGLE_STRUCTURE = new LuceneIndexStructure(1, IntList.of(0));
2022-06-20 11:55:41 +02:00
// Merge policy options with every tuning field left empty.
private static final it.cavallium.dbengine.rpc.current.data.TieredMergePolicy DEFAULT_MERGE_POLICY = new it.cavallium.dbengine.rpc.current.data.TieredMergePolicy(
Nullabledouble.empty(),
Nullabledouble.empty(),
Nullableint.empty(),
Nullablelong.empty(),
Nullablelong.empty(),
Nullabledouble.empty(),
Nullablelong.empty(),
Nullabledouble.empty()
);
2022-01-11 22:23:07 +01:00
// Bounded-elastic scheduler wrapped by uninterruptibleScheduler(...): Reactor's default thread cap and
// queue size, daemon threads created by a "lucene-common" LuceneThreadFactory in a "lucene-common"
// thread group, and a time-to-live of one hour (the last argument is expressed in seconds).
private static final Scheduler LUCENE_COMMON_SCHEDULER = uninterruptibleScheduler(Schedulers.newBoundedElastic(
DEFAULT_BOUNDED_ELASTIC_SIZE,
DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
new LuceneThreadFactory("lucene-common").setDaemon(true).withGroup(new ThreadGroup("lucene-common")),
Math.toIntExact(Duration.ofHours(1).toSeconds())
));
2022-01-11 22:23:07 +01:00
static {
var cas = new CharArraySet(
EnglishAnalyzer.ENGLISH_STOP_WORDS_SET.size() + ItalianAnalyzer.getDefaultStopSet().size(), true);
cas.addAll(EnglishAnalyzer.ENGLISH_STOP_WORDS_SET);
cas.addAll(ItalianAnalyzer.getDefaultStopSet());
ENGLISH_AND_ITALIAN_STOP_WORDS = CharArraySet.unmodifiableSet(cas);
}
2021-05-28 16:04:59 +02:00
@SuppressWarnings("DuplicatedCode")
public static Analyzer getAnalyzer(TextFieldsAnalyzer analyzer) {
2021-05-28 16:04:59 +02:00
return switch (analyzer) {
2022-01-11 22:23:07 +01:00
case N4Gram -> lucene4GramAnalyzerInstance;
case N4GramEdge -> luceneEdge4GramAnalyzerEdgeInstance;
case N3To5Gram -> lucene3To5GramAnalyzerInstance;
case N3To5GramEdge -> luceneEdge3To5GramAnalyzerEdgeInstance;
2021-05-28 16:04:59 +02:00
case Standard -> luceneStandardAnalyzerInstance;
2022-01-11 22:23:07 +01:00
case StandardMultilanguage -> luceneWordAnalyzerStemInstance;
2022-01-12 16:18:31 +01:00
case LegacyFullText -> luceneWordAnalyzerLegacy1Instance;
case LegacyWordWithStemming -> luceneWordAnalyzerLegacy2Instance;
2022-01-18 00:02:55 +01:00
case LegacyICU -> luceneWordAnalyzerLegacy3Instance;
2022-01-11 22:23:07 +01:00
case StandardSimple -> luceneWordAnalyzerSimpleInstance;
2021-05-28 16:04:59 +02:00
case ICUCollationKey -> luceneICUCollationKeyInstance;
//noinspection UnnecessaryDefault
default -> throw new UnsupportedOperationException("Unknown analyzer: " + analyzer);
};
}
2021-05-28 16:04:59 +02:00
@SuppressWarnings("DuplicatedCode")
public static Similarity getSimilarity(TextFieldsSimilarity similarity) {
2021-05-28 16:04:59 +02:00
return switch (similarity) {
2022-01-11 22:23:07 +01:00
case BM25Standard -> luceneBM25StandardSimilarityInstance;
2021-05-28 16:04:59 +02:00
case BM25Classic -> luceneBM25ClassicSimilarityInstance;
case NGramBM25Classic -> luceneBM25ClassicNGramSimilarityInstance;
case BM25L -> luceneBM25LSimilarityInstance;
case NGramBM25L -> luceneBM25LNGramSimilarityInstance;
case Classic -> luceneClassicSimilarityInstance;
case NGramClassic -> luceneClassicNGramSimilarityInstance;
case BM25Plus -> luceneBM25PlusSimilarityInstance;
case NGramBM25Plus -> luceneBM25PlusNGramSimilarityInstance;
case BM15Plus -> luceneBM15PlusSimilarityInstance;
case NGramBM15Plus -> luceneBM15PlusNGramSimilarityInstance;
case BM11Plus -> luceneBM11PlusSimilarityInstance;
case NGramBM11Plus -> luceneBM11PlusNGramSimilarityInstance;
case LTC -> luceneLTCSimilarityInstance;
case LDP -> luceneLDPSimilarityInstance;
case LDPNoLength -> luceneLDPNoLengthSimilarityInstance;
case Robertson -> luceneRobertsonSimilarityInstance;
case Boolean -> luceneBooleanSimilarityInstance;
//noinspection UnnecessaryDefault
default -> throw new IllegalStateException("Unknown similarity: " + similarity);
};
}
2021-07-17 23:06:26 +02:00
/**
* @throws NoSuchElementException when the key is not found
* @throws IOException when an error occurs when reading the document
*/
@NotNull
2022-03-10 01:43:37 +01:00
public static IndexableField keyOfTopDoc(int docId, IndexReader indexReader,
2021-07-17 23:06:26 +02:00
String keyFieldName) throws IOException, NoSuchElementException {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called keyOfTopDoc in a nonblocking thread");
}
if (docId > indexReader.maxDoc()) {
throw new IOException("Document " + docId + " > maxDoc (" +indexReader.maxDoc() + ")");
}
DocumentStoredSingleFieldVisitor visitor = new DocumentStoredSingleFieldVisitor(keyFieldName);
indexReader.document(docId, visitor);
Document d = visitor.getDocument();
if (d.getFields().isEmpty()) {
2021-07-17 23:06:26 +02:00
throw new NoSuchElementException(
"Can't get key (field \"" + keyFieldName + "\") of document docId: " + docId + ". Available fields: []");
2021-07-04 01:34:17 +02:00
} else {
var field = d.getField(keyFieldName);
if (field == null) {
2021-07-17 23:06:26 +02:00
throw new NoSuchElementException(
"Can't get key (field \"" + keyFieldName + "\") of document docId: " + docId + ". Available fields: " + d
.getFields()
.stream()
.map(IndexableField::name)
.collect(Collectors.joining(",", "[", "]")));
} else {
2022-03-10 01:43:37 +01:00
return field;
}
2021-07-04 01:34:17 +02:00
}
}
2021-03-11 14:45:45 +01:00
public static <T, U, V> ValueGetter<Entry<T, U>, V> getAsyncDbValueGetterDeep(
CompositeSnapshot snapshot,
2022-03-20 14:33:27 +01:00
DatabaseMapDictionaryDeep<T, Object2ObjectSortedMap<U, V>, ? extends DatabaseStageMap<U, V, ? extends DatabaseStageEntry<V>>> dictionaryDeep) {
2022-05-21 15:28:52 +02:00
return entry -> Mono.usingWhen(dictionaryDeep.at(snapshot, entry.getKey()),
sub -> sub.getValue(snapshot, entry.getValue()),
2022-05-21 22:41:48 +02:00
LLUtils::finalizeResource
2022-05-21 15:28:52 +02:00
);
2021-03-11 14:45:45 +01:00
}
2021-05-28 16:04:59 +02:00
/**
 * Builds a {@link PerFieldAnalyzerWrapper} from the configured analyzer types.
 *
 * @param indicizerAnalyzers the per-field analyzer types and the default analyzer type
 * @return a wrapper that uses the analyzer configured for each field, or the default analyzer
 */
public static PerFieldAnalyzerWrapper toPerFieldAnalyzerWrapper(IndicizerAnalyzers indicizerAnalyzers) {
	Map<String, Analyzer> analyzersByField = new HashMap<>();
	indicizerAnalyzers
			.fieldAnalyzer()
			.forEach((fieldName, analyzerType) -> analyzersByField.put(fieldName, LuceneUtils.getAnalyzer(analyzerType)));
	Analyzer fallbackAnalyzer = LuceneUtils.getAnalyzer(indicizerAnalyzers.defaultAnalyzer());
	return new PerFieldAnalyzerWrapper(fallbackAnalyzer, analyzersByField);
}
/**
 * Builds a {@link PerFieldSimilarityWrapper} from the configured similarity types.
 *
 * @param indicizerSimilarities the per-field similarity types and the default similarity type
 * @return a wrapper that uses the similarity configured for each field, or the default similarity
 */
public static PerFieldSimilarityWrapper toPerFieldSimilarityWrapper(IndicizerSimilarities indicizerSimilarities) {
	Map<String, Similarity> similaritiesByField = new HashMap<>();
	indicizerSimilarities
			.fieldSimilarity()
			.forEach((fieldName, similarityType) ->
					similaritiesByField.put(fieldName, LuceneUtils.getSimilarity(similarityType)));
	Similarity fallbackSimilarity = LuceneUtils.getSimilarity(indicizerSimilarities.defaultSimilarity());
	return new PerFieldSimilarityWrapper() {
		@Override
		public Similarity get(String name) {
			// Fields without an explicit configuration use the default similarity
			return similaritiesByField.getOrDefault(name, fallbackSimilarity);
		}
	};
}
2021-07-01 21:19:52 +02:00
/**
 * Aligns a non-negative number to a multiple of 4096.
 *
 * @param number the value to align; it is meant to be non-negative
 * @param expand {@code true} to round up to the next multiple, {@code false} to round down
 * @return the aligned value, or {@code number} itself if it is already a multiple of 4096
 */
public static int alignUnsigned(int number, boolean expand) {
	int remainder = number % 4096;
	if (remainder == 0) {
		// Already aligned
		return number;
	}
	return expand ? number + (4096 - remainder) : number - remainder;
}
/**
 * Aligns a non-negative number to a multiple of 4096.
 *
 * @param number the value to align; it is meant to be non-negative
 * @param expand {@code true} to round up to the next multiple, {@code false} to round down
 * @return the aligned value, or {@code number} itself if it is already a multiple of 4096
 */
public static long alignUnsigned(long number, boolean expand) {
	long remainder = number % 4096L;
	if (remainder == 0) {
		// Already aligned
		return number;
	}
	return expand ? number + (4096L - remainder) : number - remainder;
}
public static void readInternalAligned(Object ref,
FileChannel channel,
long pos,
ByteBuffer b,
int readLength,
int usefulLength,
long end) throws IOException {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called readInternalAligned in a nonblocking thread");
}
2021-07-01 21:19:52 +02:00
int startBufPosition = b.position();
int readData = 0;
int i;
for(; readLength > 0; readLength -= i) {
int toRead = readLength;
b.limit(b.position() + toRead);
assert b.remaining() == toRead;
var beforeReadBufPosition = b.position();
channel.read(b, pos);
b.limit(Math.min(startBufPosition + usefulLength, b.position() + toRead));
var afterReadBufPosition = b.position();
i = (afterReadBufPosition - beforeReadBufPosition);
readData += i;
if (i < toRead && i > 0) {
if (readData < usefulLength) {
throw new EOFException("read past EOF: " + ref + " buffer: " + b + " chunkLen: " + toRead + " end: " + end);
}
if (readData == usefulLength) {
b.limit(b.position());
// File end reached
return;
}
}
if (i < 0) {
throw new EOFException("read past EOF: " + ref + " buffer: " + b + " chunkLen: " + toRead + " end: " + end);
}
assert i > 0 : "FileChannel.read with non zero-length bb.remaining() must always read at least one byte (FileChannel is in blocking mode, see spec of ReadableByteChannel)";
2021-09-22 18:33:28 +02:00
pos += i;
2021-07-01 21:19:52 +02:00
}
assert readLength == 0;
}
/**
 * Converts a long to an int, clamping it to the range {@code [-2147483630, 2147483630]}.
 *
 * @param l the value to convert
 * @return {@code l} as an int, or the nearest bound of the range if {@code l} is outside of it
 */
public static int safeLongToInt(long l) {
	long clamped = Math.max(-2147483630L, Math.min(2147483630L, l));
	return (int) clamped;
}
/**
 * Returns the last element of the given array.
 *
 * @param scoreDocs the array to read, may be {@code null}
 * @return the last element, or {@code null} if the array is {@code null} or empty
 */
@Nullable
public static ScoreDoc getLastScoreDoc(ScoreDoc[] scoreDocs) {
	boolean hasElements = scoreDocs != null && scoreDocs.length != 0;
	return hasElements ? scoreDocs[scoreDocs.length - 1] : null;
}
2021-07-06 01:30:37 +02:00
2021-11-16 23:19:23 +01:00
public static LocalQueryParams toLocalQueryParams(QueryParams queryParams, Analyzer analyzer) {
return new LocalQueryParams(QueryParser.toQuery(queryParams.query(), analyzer),
2021-10-15 22:03:53 +02:00
queryParams.offset(),
queryParams.limit(),
2021-09-20 18:20:59 +02:00
DEFAULT_PAGE_LIMITS,
2021-07-06 01:30:37 +02:00
QueryParser.toSort(queryParams.sort()),
2021-12-12 23:40:30 +01:00
queryParams.computePreciseHitsCount(),
Duration.ofMillis(queryParams.timeoutMilliseconds())
2021-07-06 01:30:37 +02:00
);
}
2021-09-18 18:34:21 +02:00
public static Flux<LLKeyScore> convertHits(Flux<ScoreDoc> hitsFlux,
2021-09-22 11:03:39 +02:00
List<IndexSearcher> indexSearchers,
2022-02-26 03:28:20 +01:00
@Nullable String keyFieldName,
2021-07-30 14:01:12 +02:00
boolean preserveOrder) {
2021-09-18 18:34:21 +02:00
if (preserveOrder) {
return hitsFlux
.publishOn(LuceneUtils.luceneScheduler())
.mapNotNull(hit -> mapHitBlocking(hit, indexSearchers, keyFieldName))
.publishOn(Schedulers.parallel());
2021-09-18 18:34:21 +02:00
} else {
return hitsFlux
.buffer(Queues.XS_BUFFER_SIZE, () -> new ArrayList<Object>(Queues.XS_BUFFER_SIZE))
.flatMap(shardHits -> Mono.fromCallable(() -> {
2022-02-14 00:31:31 +01:00
int i2 = 0;
int size = shardHits.size();
for (int i = 0; i < size; i++) {
var el = mapHitBlocking((ScoreDoc) shardHits.get(i), indexSearchers, keyFieldName);
if (el != null) {
shardHits.set(i2, el);
i2++;
}
}
if (i2 < size) {
//noinspection unchecked
return (List<LLKeyScore>) (List<?>) shardHits.subList(0, i2);
} else {
//noinspection unchecked
return (List<LLKeyScore>) (List<?>) shardHits;
}
}).subscribeOn(luceneScheduler()))
.flatMapIterable(a -> a)
.publishOn(Schedulers.parallel());
2021-09-18 18:34:21 +02:00
}
2021-07-30 14:01:12 +02:00
}
@Nullable
private static LLKeyScore mapHitBlocking(ScoreDoc hit,
2021-09-22 11:03:39 +02:00
List<IndexSearcher> indexSearchers,
2022-02-26 03:28:20 +01:00
@Nullable String keyFieldName) {
assert !Schedulers.isInNonBlockingThread();
2021-07-30 14:01:12 +02:00
int shardDocId = hit.doc;
int shardIndex = hit.shardIndex;
float score = hit.score;
IndexSearcher indexSearcher;
2021-09-22 11:03:39 +02:00
if (shardIndex == -1 && indexSearchers.size() == 1) {
indexSearcher = indexSearchers.get(0);
} else {
indexSearcher = indexSearchers.get(shardIndex);
2021-09-22 11:03:39 +02:00
}
2021-07-30 14:01:12 +02:00
try {
2022-03-10 01:43:37 +01:00
IndexableField collectedDoc;
2022-02-26 03:28:20 +01:00
if (keyFieldName != null) {
collectedDoc = keyOfTopDoc(shardDocId, indexSearcher.getIndexReader(), keyFieldName);
} else {
collectedDoc = null;
}
2022-03-15 11:46:00 +01:00
return new LLKeyScore(shardDocId, shardIndex, score, collectedDoc);
2021-07-30 14:01:12 +02:00
} catch (NoSuchElementException ex) {
2021-08-24 11:06:25 +02:00
logger.debug("Error: document {} key is not present!", shardDocId);
2021-07-30 14:01:12 +02:00
return null;
} catch (Exception ex) {
2021-08-24 11:06:25 +02:00
logger.error("Failed to read document {}", shardDocId, ex);
2022-03-15 11:46:00 +01:00
return new LLKeyScore(shardDocId, shardIndex, score, null);
2021-07-30 14:01:12 +02:00
}
}
public static TopDocs mergeTopDocs(
@Nullable Sort sort,
@Nullable Integer startN,
@Nullable Integer topN,
2021-11-09 00:54:09 +01:00
TopDocs[] topDocs) {
2021-07-08 18:54:53 +02:00
if ((startN == null) != (topN == null)) {
throw new IllegalArgumentException("You must pass startN and topN together or nothing");
}
TopDocs result;
if (sort != null) {
if (!(topDocs instanceof TopFieldDocs[])) {
throw new IllegalStateException("Expected TopFieldDocs[], got TopDocs[]");
}
2021-07-08 18:54:53 +02:00
if (startN == null) {
int defaultTopN = 0;
for (TopDocs td : topDocs) {
int length = td.scoreDocs.length;
defaultTopN += length;
}
result = TopDocs.merge(sort, 0, defaultTopN,
2021-11-09 00:54:09 +01:00
(TopFieldDocs[]) topDocs
2021-07-08 18:54:53 +02:00
);
} else {
result = TopDocs.merge(sort, startN,
topN,
2021-11-09 00:54:09 +01:00
(TopFieldDocs[]) topDocs
2021-07-08 18:54:53 +02:00
);
}
} else {
2021-07-08 18:54:53 +02:00
if (startN == null) {
int defaultTopN = 0;
for (TopDocs td : topDocs) {
int length = td.scoreDocs.length;
defaultTopN += length;
}
result = TopDocs.merge(0,
defaultTopN,
2021-11-09 00:54:09 +01:00
topDocs
2021-07-08 18:54:53 +02:00
);
} else {
result = TopDocs.merge(startN,
topN,
2021-11-09 00:54:09 +01:00
topDocs
2021-07-08 18:54:53 +02:00
);
}
}
return result;
}
2021-07-17 23:06:26 +02:00
2022-02-11 21:08:23 +01:00
public static int totalHitsThreshold(@Nullable Boolean complete) {
return complete == null || complete ? Integer.MAX_VALUE : 1;
2021-07-17 23:06:26 +02:00
}
2021-08-04 01:12:39 +02:00
2022-02-11 21:08:23 +01:00
public static long totalHitsThresholdLong(@Nullable Boolean complete) {
return complete == null || complete ? Long.MAX_VALUE : 1;
2021-10-15 22:03:53 +02:00
}
2021-08-04 01:12:39 +02:00
/**
 * Converts Lucene's {@link TotalHits} into a {@link TotalHitsCount}.
 *
 * @param totalHits the Lucene hit count
 * @return a count with the same value, exact only if the relation is {@code EQUAL_TO}
 */
public static TotalHitsCount convertTotalHitsCount(TotalHits totalHits) {
	boolean exact = switch (totalHits.relation) {
		case EQUAL_TO -> true;
		case GREATER_THAN_OR_EQUAL_TO -> false;
	};
	return TotalHitsCount.of(totalHits.value, exact);
}
/**
 * Adds two hit counts.
 *
 * @param totalHitsCount the first count
 * @param totalHitsCount1 the second count
 * @return the sum of the two values, exact only if both counts are exact
 */
public static TotalHitsCount sum(TotalHitsCount totalHitsCount, TotalHitsCount totalHitsCount1) {
	var combinedValue = totalHitsCount.value() + totalHitsCount1.value();
	boolean bothExact = totalHitsCount.exact() && totalHitsCount1.exact();
	return TotalHitsCount.of(combinedValue, bothExact);
}
/**
 * Formats a hit count for display.
 *
 * @param totalHitsCount the count to format
 * @return the value, followed by {@code "+"} if the count is not exact
 */
@SuppressWarnings("unused")
public static String toHumanReadableString(TotalHitsCount totalHitsCount) {
	return totalHitsCount.exact()
			? Long.toString(totalHitsCount.value())
			: totalHitsCount.value() + "+";
}
2021-09-08 21:34:52 +02:00
2022-01-28 19:31:25 +01:00
public static Query getMoreLikeThisQuery(LLIndexSearchers inputIndexSearchers,
LocalQueryParams localQueryParams,
Analyzer analyzer,
Similarity similarity,
2022-01-28 19:31:25 +01:00
Multimap<String, String> mltDocumentFieldsMultimap) throws IOException {
List<IndexSearcher> indexSearchers = inputIndexSearchers.shards();
Query luceneAdditionalQuery = localQueryParams.query();
// Create the mutable version of the input
Map<String, Collection<String>> mltDocumentFields = HashMultimap.create(mltDocumentFieldsMultimap).asMap();
mltDocumentFields.entrySet().removeIf(entry -> entry.getValue().isEmpty());
if (mltDocumentFields.isEmpty()) {
return new MatchNoDocsQuery();
}
2022-01-28 19:31:25 +01:00
MultiMoreLikeThis mlt;
if (indexSearchers.size() == 1) {
mlt = new MultiMoreLikeThis(new BigCompositeReader<>(indexSearchers.get(0).getIndexReader(), IndexReader[]::new),
null
);
} else {
IndexReader[] indexReaders = new IndexReader[indexSearchers.size()];
for (int i = 0, size = indexSearchers.size(); i < size; i++) {
indexReaders[i] = indexSearchers.get(i).getIndexReader();
}
mlt = new MultiMoreLikeThis(new BigCompositeReader<>(indexReaders, new ArrayIndexComparator(indexReaders)), null);
}
mlt.setAnalyzer(analyzer);
mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new));
mlt.setMinTermFreq(1);
mlt.setMinDocFreq(3);
mlt.setMaxDocFreqPct(20);
mlt.setBoost(localQueryParams.needsScores());
mlt.setStopWords(ENGLISH_AND_ITALIAN_STOP_WORDS);
if (similarity instanceof TFIDFSimilarity tfidfSimilarity) {
mlt.setSimilarity(tfidfSimilarity);
} else {
mlt.setSimilarity(new ClassicSimilarity());
}
// Get the reference docId and apply it to MoreLikeThis, to generate the query
Query mltQuery = mlt.like(mltDocumentFields);
Query luceneQuery;
if (!(luceneAdditionalQuery instanceof MatchAllDocsQuery)) {
luceneQuery = new Builder()
.add(mltQuery, Occur.MUST)
.add(new ConstantScoreQuery(luceneAdditionalQuery), Occur.MUST)
.build();
} else {
luceneQuery = mltQuery;
}
return luceneQuery;
}
2021-12-12 23:40:30 +01:00
2021-12-16 02:38:56 +01:00
public static Collector withTimeout(Collector collector, Duration timeout) {
2021-12-12 23:40:30 +01:00
return new TimeLimitingCollector(collector, TimeLimitingCollector.getGlobalCounter(), timeout.toMillis());
}
2022-03-05 15:46:40 +01:00
/**
 * Builds the standard name of a shard: {@code <clusterName>-shard<shardIndex>}.
 *
 * @param clusterName the name of the cluster
 * @param shardIndex the index of the shard
 * @return the name of the shard
 */
public static String getStandardName(String clusterName, int shardIndex) {
	return new StringBuilder()
			.append(clusterName)
			.append("-shard")
			.append(shardIndex)
			.toString();
}
/**
 * Chooses the shard of a term from the murmur3 hash (seed 7) of its value.
 *
 * @param id the term to place
 * @param totalShards the number of shards
 * @return the absolute value of the hash modulo {@code totalShards}
 */
public static int getLuceneIndexId(LLTerm id, int totalShards) {
	int hash = StringHelper.murmurhash3_x86_32(id.getValueBytesRef(), 7);
	// The remainder has the sign of the hash, so its absolute value is taken
	return Math.abs(hash % totalShards);
}
2022-07-25 01:57:34 +02:00
/**
 * Creates the Lucene directory described by the options, wrapped into a {@link CheckOutputDirectory}.
 *
 * @param directoryOptions the description of the directory to create
 * @param directoryName the name of the directory
 * @param rocksDBManager the manager used by the RocksDB-backed directories
 * @return the wrapped directory
 * @throws IOException if the directory cannot be created
 */
public static CheckOutputDirectory createLuceneDirectory(LuceneDirectoryOptions directoryOptions,
		String directoryName,
		LuceneRocksDBManager rocksDBManager)
		throws IOException {
	Directory rawDirectory = createLuceneDirectoryInternal(directoryOptions, directoryName, rocksDBManager);
	return new CheckOutputDirectory(rawDirectory);
}
private static Directory createLuceneDirectoryInternal(LuceneDirectoryOptions directoryOptions,
2022-03-05 15:46:40 +01:00
String directoryName,
LuceneRocksDBManager rocksDBManager)
throws IOException {
Directory directory;
2022-03-05 15:46:40 +01:00
if (directoryOptions instanceof ByteBuffersDirectory) {
directory = new org.apache.lucene.store.ByteBuffersDirectory();
2022-03-05 15:46:40 +01:00
} else if (directoryOptions instanceof DirectIOFSDirectory directIOFSDirectory) {
2022-07-25 01:57:34 +02:00
FSDirectory delegateDirectory = (FSDirectory) createLuceneDirectoryInternal(directIOFSDirectory.delegate(),
2022-03-05 15:46:40 +01:00
directoryName,
rocksDBManager
);
if (Constants.LINUX || Constants.MAC_OS_X) {
try {
int mergeBufferSize = directIOFSDirectory.mergeBufferSize().orElse(DirectIODirectory.DEFAULT_MERGE_BUFFER_SIZE);
long minBytesDirect = directIOFSDirectory.minBytesDirect().orElse(DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT);
directory = new DirectIODirectory(delegateDirectory, mergeBufferSize, minBytesDirect);
2022-03-05 15:46:40 +01:00
} catch (UnsupportedOperationException ex) {
logger.warn("Failed to open FSDirectory with DIRECT flag", ex);
directory = delegateDirectory;
2022-03-05 15:46:40 +01:00
}
} else {
logger.warn("Failed to open FSDirectory with DIRECT flag because the operating system is Windows");
directory = delegateDirectory;
2022-03-05 15:46:40 +01:00
}
} else if (directoryOptions instanceof MemoryMappedFSDirectory memoryMappedFSDirectory) {
directory = new MMapDirectory(memoryMappedFSDirectory.managedPath().resolve(directoryName + ".lucene.db"));
2022-03-05 15:46:40 +01:00
} else if (directoryOptions instanceof NIOFSDirectory niofsDirectory) {
directory = new org.apache.lucene.store.NIOFSDirectory(niofsDirectory
2022-03-05 15:46:40 +01:00
.managedPath()
.resolve(directoryName + ".lucene.db"));
2022-07-22 13:49:03 +02:00
} else if (directoryOptions instanceof RAFFSDirectory rafFsDirectory) {
directory = new RAFDirectory(rafFsDirectory.managedPath().resolve(directoryName + ".lucene.db"));
2022-03-05 15:46:40 +01:00
} else if (directoryOptions instanceof NRTCachingDirectory nrtCachingDirectory) {
2022-07-25 01:57:34 +02:00
var delegateDirectory = createLuceneDirectoryInternal(nrtCachingDirectory.delegate(), directoryName, rocksDBManager);
directory = new org.apache.lucene.store.NRTCachingDirectory(delegateDirectory,
2022-06-20 11:55:41 +02:00
toMB(nrtCachingDirectory.maxMergeSizeBytes()),
toMB(nrtCachingDirectory.maxCachedBytes())
2022-03-05 15:46:40 +01:00
);
} else if (directoryOptions instanceof RocksDBSharedDirectory rocksDBSharedDirectory) {
var dbInstance = rocksDBManager.getOrCreate(rocksDBSharedDirectory.managedPath());
directory = new RocksdbDirectory(rocksDBManager.getAllocator(),
2022-03-09 02:29:38 +01:00
dbInstance.db(),
2022-03-05 15:46:40 +01:00
dbInstance.handles(),
directoryName,
rocksDBSharedDirectory.blockSize()
);
2022-03-11 17:59:46 +01:00
} else if (directoryOptions instanceof RocksDBStandaloneDirectory rocksDBStandaloneDirectory) {
var dbInstance = rocksDBManager.getOrCreate(rocksDBStandaloneDirectory.managedPath());
directory = new RocksdbDirectory(rocksDBManager.getAllocator(),
2022-03-11 17:59:46 +01:00
dbInstance.db(),
dbInstance.handles(),
directoryName,
rocksDBStandaloneDirectory.blockSize()
);
} else {
2022-03-05 15:46:40 +01:00
throw new UnsupportedOperationException("Unsupported directory: " + directoryName + ", " + directoryOptions);
}
2022-07-25 01:57:34 +02:00
return directory;
2022-03-05 15:46:40 +01:00
}
public static Optional<Path> getManagedPath(LuceneDirectoryOptions directoryOptions) {
if (directoryOptions instanceof ByteBuffersDirectory) {
return Optional.empty();
} else if (directoryOptions instanceof DirectIOFSDirectory directIOFSDirectory) {
return getManagedPath(directIOFSDirectory.delegate());
} else if (directoryOptions instanceof MemoryMappedFSDirectory memoryMappedFSDirectory) {
return Optional.of(memoryMappedFSDirectory.managedPath());
} else if (directoryOptions instanceof NIOFSDirectory niofsDirectory) {
return Optional.of(niofsDirectory.managedPath());
2022-07-22 13:49:03 +02:00
} else if (directoryOptions instanceof RAFFSDirectory raffsDirectory) {
return Optional.of(raffsDirectory.managedPath());
2022-03-05 15:46:40 +01:00
} else if (directoryOptions instanceof NRTCachingDirectory nrtCachingDirectory) {
return getManagedPath(nrtCachingDirectory.delegate());
2022-03-11 17:59:46 +01:00
} else if (directoryOptions instanceof RocksDBStandaloneDirectory rocksDBStandaloneDirectory) {
return Optional.of(rocksDBStandaloneDirectory.managedPath());
2022-03-05 15:46:40 +01:00
} else if (directoryOptions instanceof RocksDBSharedDirectory rocksDBSharedDirectory) {
return Optional.of(rocksDBSharedDirectory.managedPath());
} else {
throw new UnsupportedOperationException("Unsupported directory: " + directoryOptions);
}
}
2022-03-11 17:59:46 +01:00
public static boolean getIsFilesystemCompressed(LuceneDirectoryOptions directoryOptions) {
if (directoryOptions instanceof ByteBuffersDirectory) {
return false;
} else if (directoryOptions instanceof DirectIOFSDirectory directIOFSDirectory) {
return getIsFilesystemCompressed(directIOFSDirectory.delegate());
} else if (directoryOptions instanceof MemoryMappedFSDirectory) {
return false;
} else if (directoryOptions instanceof NIOFSDirectory) {
return false;
2022-07-22 13:49:03 +02:00
} else if (directoryOptions instanceof RAFFSDirectory) {
return false;
2022-03-11 17:59:46 +01:00
} else if (directoryOptions instanceof NRTCachingDirectory nrtCachingDirectory) {
return getIsFilesystemCompressed(nrtCachingDirectory.delegate());
} else if (directoryOptions instanceof RocksDBStandaloneDirectory) {
return true;
} else if (directoryOptions instanceof RocksDBSharedDirectory) {
return true;
} else {
throw new UnsupportedOperationException("Unsupported directory: " + directoryOptions);
}
}
/**
 * Builds a list holding the integers from {@code 0} (inclusive) up to {@code to} (exclusive),
 * in ascending order.
 *
 * @param to exclusive upper bound; it is also passed to the {@code IntArrayList} constructor
 * @return a new list containing {@code 0, 1, ..., to - 1}
 */
public static IntList intListTo(int to) {
  var result = new IntArrayList(to);
  int next = 0;
  while (next < to) {
    result.add(next);
    next++;
  }
  return result;
}
/**
 * Returns the shared {@code SINGLE_STRUCTURE} constant.
 *
 * @return the {@code SINGLE_STRUCTURE} instance (presumably the structure of a non-sharded
 *     index; confirm against the constant's definition)
 */
public static LuceneIndexStructure singleStructure() {
  return SINGLE_STRUCTURE;
}
/**
 * Creates a {@code LuceneIndexStructure} from the given count and the list
 * {@code 0, 1, ..., count - 1} produced by {@code intListTo(count)}.
 *
 * @param count value passed as first constructor argument and used as the exclusive upper bound
 *     of the generated list (presumably the number of shards; confirm against
 *     {@code LuceneIndexStructure})
 * @return a new structure instance
 */
public static LuceneIndexStructure shardsStructure(int count) {
  var indices = intListTo(count);
  return new LuceneIndexStructure(count, indices);
}
2022-06-20 11:55:41 +02:00
/**
 * Builds a {@code TieredMergePolicy} configured from {@code luceneOptions.mergePolicy()}.
 *
 * <p>Only the options that are present are applied; an absent option leaves the corresponding
 * setting of the newly created policy untouched. Byte-based options are converted with
 * {@code toMB} before being handed to the policy's megabyte-based setters.
 *
 * @param luceneOptions the options holding the merge policy settings
 * @return the configured merge policy
 */
public static MergePolicy getMergePolicy(LuceneOptions luceneOptions) {
  var policy = new TieredMergePolicy();
  var options = luceneOptions.mergePolicy();

  var deletesPctAllowed = options.deletesPctAllowed();
  if (deletesPctAllowed.isPresent()) {
    policy.setDeletesPctAllowed(deletesPctAllowed.get());
  }

  var forceMergeDeletesPctAllowed = options.forceMergeDeletesPctAllowed();
  if (forceMergeDeletesPctAllowed.isPresent()) {
    policy.setForceMergeDeletesPctAllowed(forceMergeDeletesPctAllowed.get());
  }

  var maxMergeAtOnce = options.maxMergeAtOnce();
  if (maxMergeAtOnce.isPresent()) {
    policy.setMaxMergeAtOnce(maxMergeAtOnce.get());
  }

  var maxMergedSegmentBytes = options.maxMergedSegmentBytes();
  if (maxMergedSegmentBytes.isPresent()) {
    policy.setMaxMergedSegmentMB(toMB(maxMergedSegmentBytes.get()));
  }

  var floorSegmentBytes = options.floorSegmentBytes();
  if (floorSegmentBytes.isPresent()) {
    policy.setFloorSegmentMB(toMB(floorSegmentBytes.get()));
  }

  var segmentsPerTier = options.segmentsPerTier();
  if (segmentsPerTier.isPresent()) {
    policy.setSegmentsPerTier(segmentsPerTier.get());
  }

  var maxCfsSegmentSizeBytes = options.maxCFSSegmentSizeBytes();
  if (maxCfsSegmentSizeBytes.isPresent()) {
    policy.setMaxCFSSegmentSizeMB(toMB(maxCfsSegmentSizeBytes.get()));
  }

  var noCfsRatio = options.noCFSRatio();
  if (noCfsRatio.isPresent()) {
    policy.setNoCFSRatio(noCfsRatio.get());
  }

  return policy;
}
/**
 * Converts a size in bytes to megabytes (1 MB = 1024 * 1024 bytes).
 *
 * <p>{@code Long.MAX_VALUE} is treated as a sentinel and mapped to {@code Double.MAX_VALUE}
 * instead of being divided.
 *
 * @param bytes the size in bytes
 * @return the size in megabytes, or {@code Double.MAX_VALUE} when {@code bytes} is
 *     {@code Long.MAX_VALUE}
 */
public static double toMB(long bytes) {
  boolean isSentinel = bytes == Long.MAX_VALUE;
  if (isSentinel) {
    return Double.MAX_VALUE;
  }
  double kilobytes = ((double) bytes) / 1024D;
  return kilobytes / 1024D;
}
/**
 * Returns the shared {@code DEFAULT_MERGE_POLICY} constant.
 *
 * @return the default merge policy options instance
 */
public static it.cavallium.dbengine.rpc.current.data.TieredMergePolicy getDefaultMergePolicy() {
  return DEFAULT_MERGE_POLICY;
}
2022-07-02 11:44:13 +02:00
/**
 * Builds the query parameters used to run the given query with no sorting.
 *
 * <p>NOTE(review): the positional arguments {@code 0, 0, false, Long.MAX_VALUE} are presumably
 * offset, limit, a precise-hit-count flag and a timeout; confirm against {@code QueryParams.of}.
 *
 * @param query the query to wrap
 * @return query parameters built with {@code QueryParams.of(query, 0, 0, NoSort, false, Long.MAX_VALUE)}
 */
public static QueryParams getCountQueryParams(it.cavallium.dbengine.client.query.current.data.Query query) {
  var unsorted = NoSort.of();
  return QueryParams.of(query, 0, 0, unsorted, false, Long.MAX_VALUE);
}
2022-07-02 12:22:16 +02:00
/**
* Rewrite a lucene query of a local searcher, then call the local searcher again with the rewritten query
*/
2022-07-02 11:44:13 +02:00
public static Mono<LuceneSearchResult> rewrite(LocalSearcher localSearcher,
Mono<LLIndexSearcher> indexSearcherMono,
LocalQueryParams queryParams,
String keyFieldName,
GlobalQueryRewrite transformer) {
2022-08-24 14:30:16 +02:00
return Mono.usingWhen(indexSearcherMono.map(LLIndexSearchers::unsharded), indexSearchers -> Mono
.fromCallable(() -> transformer.rewrite(indexSearchers, queryParams))
.transform(LuceneUtils::scheduleLucene)
.flatMap(queryParams2 ->
localSearcher.collect(indexSearcherMono, queryParams2, keyFieldName, NO_REWRITE)),
LLUtils::finalizeResource);
2022-07-02 11:44:13 +02:00
}
2022-07-02 12:22:16 +02:00
/**
* Rewrite a lucene query of a multi searcher, then call the multi searcher again with the rewritten query
*/
2022-07-02 11:44:13 +02:00
public static Mono<LuceneSearchResult> rewriteMulti(MultiSearcher multiSearcher,
Mono<LLIndexSearchers> indexSearchersMono,
LocalQueryParams queryParams,
String keyFieldName,
GlobalQueryRewrite transformer) {
return Mono.usingWhen(indexSearchersMono,
indexSearchers -> Mono
.fromCallable(() -> transformer.rewrite(indexSearchers, queryParams))
.transform(LuceneUtils::scheduleLucene)
2022-07-02 11:44:13 +02:00
.flatMap(queryParams2 ->
multiSearcher.collectMulti(indexSearchersMono, queryParams2, keyFieldName, NO_REWRITE)),
LLUtils::finalizeResource
);
}
/**
 * Ensures that the calling thread is a lucene thread.
 *
 * @throws IllegalStateException if {@code isLuceneThread()} is false for the calling thread;
 *     the failure is also logged as a warning
 */
public static void checkLuceneThread() {
  if (isLuceneThread()) {
    return;
  }
  throw printLuceneThreadWarning(Thread.currentThread());
}
/**
 * Logs a warning when the calling thread is not a lucene thread. Unlike
 * {@code checkLuceneThread()}, nothing is thrown.
 */
@SuppressWarnings("ThrowableNotThrown")
public static void warnLuceneThread() {
  if (isLuceneThread()) {
    return;
  }
  // The returned exception is deliberately discarded: only the log entry is wanted here
  printLuceneThreadWarning(Thread.currentThread());
}
/**
 * Logs a warning saying that the given thread is not a lucene thread and returns the matching
 * exception, leaving it to the caller to decide whether to throw it.
 *
 * @param thread the offending thread
 * @return the exception describing the problem; it is also attached to the log entry
 */
private static IllegalStateException printLuceneThreadWarning(Thread thread) {
  long threadId = thread.getId();
  String message = "Current thread is not a lucene thread: " + threadId + " " + thread
      + ". Schedule it using LuceneUtils.luceneScheduler()";
  var error = new IllegalStateException(message);
  logger.warn("Current thread is not a lucene thread: {} {}", threadId, thread, error);
  return error;
}
/**
 * Tells whether the calling thread is a lucene thread.
 *
 * @return {@code true} if the current thread is a {@code LuceneThread} or a
 *     {@code LuceneMergeThread}, {@code false} otherwise
 */
public static boolean isLuceneThread() {
  Thread current = Thread.currentThread();
  if (current instanceof LuceneThread) {
    return true;
  }
  return current instanceof LuceneMergeThread;
}
/**
 * Returns the shared {@code LUCENE_COMMON_SCHEDULER} constant.
 *
 * @return the scheduler instance used by {@code scheduleLucene}
 */
public static Scheduler luceneScheduler() {
  return LUCENE_COMMON_SCHEDULER;
}
/**
 * Makes the given mono subscribe on {@code LUCENE_COMMON_SCHEDULER} and publish its signals on
 * {@code Schedulers.parallel()}.
 *
 * @param prev the mono to schedule
 * @param <T> the element type
 * @return the scheduled mono
 */
public static <T> Mono<T> scheduleLucene(Mono<T> prev) {
  Mono<T> subscribedOnLucene = prev.subscribeOn(LUCENE_COMMON_SCHEDULER);
  return subscribedOnLucene.publishOn(Schedulers.parallel());
}
/**
 * Makes the given flux subscribe on {@code LUCENE_COMMON_SCHEDULER} and publish its signals on
 * {@code Schedulers.parallel()}.
 *
 * @param prev the flux to schedule
 * @param <T> the element type
 * @return the scheduled flux
 */
public static <T> Flux<T> scheduleLucene(Flux<T> prev) {
  Flux<T> subscribedOnLucene = prev.subscribeOn(LUCENE_COMMON_SCHEDULER);
  return subscribedOnLucene.publishOn(Schedulers.parallel());
}
}