2021-01-30 22:14:48 +01:00
|
|
|
package it.cavallium.dbengine.lucene.searcher;
|
2020-12-07 22:15:18 +01:00
|
|
|
|
2021-01-29 17:19:01 +01:00
|
|
|
import it.cavallium.dbengine.database.LLKeyScore;
|
2021-01-30 22:14:48 +01:00
|
|
|
import it.cavallium.dbengine.lucene.LuceneParallelStreamCollectorManager;
|
2021-02-04 22:42:57 +01:00
|
|
|
import it.cavallium.dbengine.lucene.LuceneParallelStreamCollectorResult;
|
2020-12-07 22:15:18 +01:00
|
|
|
import java.io.IOException;
|
|
|
|
import java.util.Set;
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
import java.util.function.Consumer;
|
2021-01-29 17:19:01 +01:00
|
|
|
import java.util.function.LongConsumer;
|
2020-12-07 22:15:18 +01:00
|
|
|
import org.apache.lucene.document.Document;
|
|
|
|
import org.apache.lucene.index.IndexableField;
|
|
|
|
import org.apache.lucene.search.IndexSearcher;
|
|
|
|
import org.apache.lucene.search.Query;
|
2021-01-29 17:19:01 +01:00
|
|
|
import org.apache.lucene.search.ScoreMode;
|
2020-12-07 22:15:18 +01:00
|
|
|
import org.apache.lucene.search.Sort;
|
|
|
|
import org.jetbrains.annotations.Nullable;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Unsorted search (low latency and constant memory usage)
|
|
|
|
*/
|
|
|
|
public class ParallelCollectorStreamSearcher implements LuceneStreamSearcher {
|
|
|
|
|
|
|
|
@Override
|
2021-01-29 17:19:01 +01:00
|
|
|
public void search(IndexSearcher indexSearcher,
|
2020-12-07 22:15:18 +01:00
|
|
|
Query query,
|
|
|
|
int limit,
|
|
|
|
@Nullable Sort luceneSort,
|
2021-01-29 17:19:01 +01:00
|
|
|
ScoreMode scoreMode,
|
2020-12-07 22:15:18 +01:00
|
|
|
String keyFieldName,
|
2021-01-29 17:19:01 +01:00
|
|
|
Consumer<LLKeyScore> resultsConsumer,
|
|
|
|
LongConsumer totalHitsConsumer) throws IOException {
|
2020-12-07 22:15:18 +01:00
|
|
|
if (luceneSort != null) {
|
|
|
|
throw new IllegalArgumentException("ParallelCollectorStreamSearcher doesn't support sorted searches");
|
|
|
|
}
|
|
|
|
|
|
|
|
AtomicInteger currentCount = new AtomicInteger();
|
|
|
|
|
2021-02-04 22:42:57 +01:00
|
|
|
LuceneParallelStreamCollectorResult result = indexSearcher.search(query, LuceneParallelStreamCollectorManager.fromConsumer(scoreMode, (docId, score) -> {
|
2020-12-07 22:15:18 +01:00
|
|
|
if (currentCount.getAndIncrement() >= limit) {
|
|
|
|
return false;
|
|
|
|
} else {
|
2021-02-04 22:42:57 +01:00
|
|
|
Document d = indexSearcher.doc(docId, Set.of(keyFieldName));
|
|
|
|
if (d.getFields().isEmpty()) {
|
|
|
|
logger.error("The document docId: {} is empty.", docId);
|
|
|
|
var realFields = indexSearcher.doc(docId).getFields();
|
|
|
|
if (!realFields.isEmpty()) {
|
|
|
|
logger.error("Present fields:");
|
|
|
|
for (IndexableField field : realFields) {
|
|
|
|
logger.error(" - {}", field.name());
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
2021-02-04 22:42:57 +01:00
|
|
|
}
|
|
|
|
} else {
|
|
|
|
var field = d.getField(keyFieldName);
|
|
|
|
if (field == null) {
|
|
|
|
logger.error("Can't get key of document docId: {}", docId);
|
2020-12-07 22:15:18 +01:00
|
|
|
} else {
|
2021-02-04 22:42:57 +01:00
|
|
|
resultsConsumer.accept(new LLKeyScore(field.stringValue(), score));
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}));
|
|
|
|
//todo: check the accuracy of our hits counter!
|
2021-01-29 17:19:01 +01:00
|
|
|
totalHitsConsumer.accept(result.getTotalHitsCount());
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
}
|