Cancellable iteration in lucene

This commit is contained in:
Andrea Cavalli 2021-03-03 00:13:57 +01:00
parent d9e1d38390
commit 07ea61050f
11 changed files with 60 additions and 39 deletions

View File

@ -19,6 +19,7 @@ import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import it.cavallium.dbengine.lucene.searcher.AdaptiveStreamSearcher;
import it.cavallium.dbengine.lucene.searcher.AllowOnlyQueryParsingCollectorStreamSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.HandleResult;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
@ -26,6 +27,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@ -550,13 +552,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.unicast()
.onBackpressureBuffer();
var searchFlux = Flux.<Void>push(sink -> {
var searchFlux = Mono.<Void>create(sink -> {
try {
var opId = new Random().nextInt();
if (doDistributedPre) {
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
totalHitsCountSink.tryEmitValue(0L);
} else {
int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit);
logger.warn(opId + " start");
streamSearcher.search(indexSearcher,
luceneQuery,
boundedLimit,
@ -565,12 +569,17 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
minCompetitiveScore,
keyFieldName,
keyScore -> {
logger.warn(opId + " item");
EmitResult result = topKeysSink.tryEmitNext(fixKeyScore(keyScore, scoreDivisor));
if (result.isFailure()) {
if (result.isSuccess()) {
return HandleResult.CONTINUE;
} else {
if (result == EmitResult.FAIL_CANCELLED) {
logger.debug("Fail to emit next value: cancelled");
return HandleResult.HALT;
} else if (result == EmitResult.FAIL_TERMINATED) {
logger.debug("Fail to emit next value: terminated");
return HandleResult.HALT;
} else if (result == EmitResult.FAIL_ZERO_SUBSCRIBER) {
logger.error("Fail to emit next value: zero subscriber. You must subscribe to results before total hits if you specified a limit > 0!");
sink.error(new EmissionException(result));
@ -581,6 +590,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
},
totalHitsCount -> {
logger.warn(opId + " total-hits-count");
EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
if (result.isFailure()) {
if (result == EmitResult.FAIL_CANCELLED) {
@ -597,14 +607,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
);
}
logger.warn(opId + " complete");
topKeysSink.tryEmitComplete();
sink.complete();
sink.success();
} catch (IOException e) {
topKeysSink.tryEmitError(e);
totalHitsCountSink.tryEmitError(e);
sink.error(e);
}
}).share();
}).subscribeOn(luceneQueryScheduler).cache();
return new LLSearchResult(
Mono.<Long>firstWithValue(searchFlux.then(Mono.empty()), totalHitsCountSink.asMono()),

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine.lucene;
import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.HandleResult;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@ -61,7 +62,7 @@ public class LuceneParallelStreamCollector implements Collector, LeafCollector {
if (!stopped.get()) {
var score = scorer == null ? 0 : scorer.score();
if (minCompetitiveScore == null || score >= minCompetitiveScore) {
if (!streamConsumer.consume(doc, score)) {
if (streamConsumer.consume(doc, score) == HandleResult.HALT) {
stopped.set(true);
}
}

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine.lucene;
import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.HandleResult;
import java.io.IOException;
public interface LuceneParallelStreamConsumer {
@ -7,7 +8,6 @@ public interface LuceneParallelStreamConsumer {
/**
* @param docId document id
* @param score score of document
* @return true to continue, false to stop the execution
*/
boolean consume(int docId, float score) throws IOException;
HandleResult consume(int docId, float score) throws IOException;
}

View File

@ -7,10 +7,11 @@ import it.cavallium.dbengine.lucene.analyzer.NCharGramEdgeAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import it.cavallium.dbengine.lucene.analyzer.WordAnalyzer;
import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.HandleResult;
import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.ResultItemConsumer;
import it.cavallium.dbengine.lucene.similarity.NGramSimilarity;
import java.io.IOException;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.LowerCaseFilter;
import org.apache.lucene.analysis.TokenStream;
@ -188,13 +189,13 @@ public class LuceneUtils {
});
}
public static void collectTopDoc(Logger logger,
public static HandleResult collectTopDoc(Logger logger,
int docId,
float score,
Float minCompetitiveScore,
IndexSearcher indexSearcher,
String keyFieldName,
Consumer<LLKeyScore> resultsConsumer) throws IOException {
ResultItemConsumer resultsConsumer) throws IOException {
if (minCompetitiveScore == null || score >= minCompetitiveScore) {
Document d = indexSearcher.doc(docId, Set.of(keyFieldName));
if (d.getFields().isEmpty()) {
@ -211,9 +212,12 @@ public class LuceneUtils {
if (field == null) {
logger.error("Can't get key of document docId: {}, score: {}", docId, score);
} else {
resultsConsumer.accept(new LLKeyScore(field.stringValue(), score));
if (resultsConsumer.accept(new LLKeyScore(field.stringValue(), score)) == HandleResult.HALT) {
return HandleResult.HALT;
}
}
}
}
return HandleResult.CONTINUE;
}
}

View File

@ -1,8 +1,6 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.LLKeyScore;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
@ -37,7 +35,7 @@ public class AdaptiveStreamSearcher implements LuceneStreamSearcher {
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
Consumer<LLKeyScore> consumer,
ResultItemConsumer consumer,
LongConsumer totalHitsConsumer) throws IOException {
if (limit == 0) {
totalHitsConsumer.accept(countStreamSearcher.count(indexSearcher, query));

View File

@ -1,9 +1,7 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.LLKeyScore;
import java.io.IOException;
import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
@ -34,7 +32,7 @@ public class AllowOnlyQueryParsingCollectorStreamSearcher implements LuceneStrea
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
Consumer<LLKeyScore> resultsConsumer,
ResultItemConsumer resultsConsumer,
LongConsumer totalHitsConsumer) throws IOException {
if (limit > 0) {
throw new IllegalArgumentException("Limit > 0 not allowed");

View File

@ -1,8 +1,6 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.LLKeyScore;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
@ -23,7 +21,7 @@ public class CountStreamSearcher implements LuceneStreamSearcher {
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
Consumer<LLKeyScore> resultsConsumer,
ResultItemConsumer resultsConsumer,
LongConsumer totalHitsConsumer) throws IOException {
if (limit != 0) {
throw new IllegalArgumentException("CountStream doesn't support a limit different than 0");

View File

@ -2,7 +2,6 @@ package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.LLKeyScore;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
@ -36,6 +35,16 @@ public interface LuceneStreamSearcher {
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
Consumer<LLKeyScore> resultsConsumer,
ResultItemConsumer resultsConsumer,
LongConsumer totalHitsConsumer) throws IOException;
/**
 * Callback invoked by a searcher for each result item it produces.
 * <p>
 * Unlike {@code java.util.function.Consumer}, the callback returns a value that tells
 * the caller whether it should keep feeding items or stop the iteration.
 */
@FunctionalInterface
interface ResultItemConsumer {
/**
 * Handles a single result item.
 *
 * @param item the key/score pair of the matched document
 * @return {@code HandleResult.CONTINUE} to keep receiving items,
 *         {@code HandleResult.HALT} to ask the caller to stop iterating
 */
HandleResult accept(LLKeyScore item);
}
/**
 * Outcome returned by a result callback to control the iteration:
 * {@code CONTINUE} means the caller may deliver further items,
 * {@code HALT} means the caller should stop delivering items
 * (for example because the downstream sink was cancelled or terminated).
 */
enum HandleResult {
CONTINUE, HALT
}
}

View File

@ -1,9 +1,7 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.lucene.LuceneUtils;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
@ -34,7 +32,7 @@ public class PagedStreamSearcher implements LuceneStreamSearcher {
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
Consumer<LLKeyScore> resultsConsumer,
ResultItemConsumer resultsConsumer,
LongConsumer totalHitsConsumer) throws IOException {
if (limit < MAX_ITEMS_PER_PAGE) {
// Use a normal search method because the limit is low
@ -74,29 +72,32 @@ public class PagedStreamSearcher implements LuceneStreamSearcher {
}
}
private void consumeHits(IntWrapper currentAllowedResults,
private HandleResult consumeHits(IntWrapper currentAllowedResults,
ScoreDoc[] hits,
IndexSearcher indexSearcher,
@Nullable Float minCompetitiveScore,
String keyFieldName,
Consumer<LLKeyScore> resultsConsumer) throws IOException {
ResultItemConsumer resultsConsumer) throws IOException {
for (ScoreDoc hit : hits) {
int docId = hit.doc;
float score = hit.score;
if (currentAllowedResults.var-- > 0) {
LuceneUtils.collectTopDoc(logger,
if (LuceneUtils.collectTopDoc(logger,
docId,
score,
minCompetitiveScore,
indexSearcher,
keyFieldName,
resultsConsumer
);
) == HandleResult.HALT) {
return HandleResult.HALT;
}
} else {
break;
}
}
return HandleResult.CONTINUE;
}
private static ScoreDoc getLastItem(ScoreDoc[] scoreDocs) {

View File

@ -6,7 +6,6 @@ import it.cavallium.dbengine.lucene.LuceneParallelStreamCollectorResult;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexableField;
@ -29,7 +28,7 @@ public class ParallelCollectorStreamSearcher implements LuceneStreamSearcher {
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
Consumer<LLKeyScore> resultsConsumer,
ResultItemConsumer resultsConsumer,
LongConsumer totalHitsConsumer) throws IOException {
if (luceneSort != null) {
throw new IllegalArgumentException("ParallelCollectorStreamSearcher doesn't support sorted searches");
@ -39,7 +38,7 @@ public class ParallelCollectorStreamSearcher implements LuceneStreamSearcher {
LuceneParallelStreamCollectorResult result = indexSearcher.search(query, LuceneParallelStreamCollectorManager.fromConsumer(scoreMode, minCompetitiveScore, (docId, score) -> {
if (currentCount.getAndIncrement() >= limit) {
return false;
return HandleResult.HALT;
} else {
Document d = indexSearcher.doc(docId, Set.of(keyFieldName));
if (d.getFields().isEmpty()) {
@ -56,10 +55,12 @@ public class ParallelCollectorStreamSearcher implements LuceneStreamSearcher {
if (field == null) {
logger.error("Can't get key of document docId: {}", docId);
} else {
resultsConsumer.accept(new LLKeyScore(field.stringValue(), score));
if (resultsConsumer.accept(new LLKeyScore(field.stringValue(), score)) == HandleResult.HALT) {
return HandleResult.HALT;
}
}
}
return true;
return HandleResult.CONTINUE;
}
}));
//todo: check the accuracy of our hits counter!

View File

@ -1,10 +1,8 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
@ -27,7 +25,7 @@ public class SimpleStreamSearcher implements LuceneStreamSearcher {
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
Consumer<LLKeyScore> resultsConsumer,
ResultItemConsumer resultsConsumer,
LongConsumer totalHitsConsumer) throws IOException {
TopDocs topDocs;
if (luceneSort != null) {
@ -40,14 +38,16 @@ public class SimpleStreamSearcher implements LuceneStreamSearcher {
for (ScoreDoc hit : hits) {
int docId = hit.doc;
float score = hit.score;
LuceneUtils.collectTopDoc(logger,
if (LuceneUtils.collectTopDoc(logger,
docId,
score,
minCompetitiveScore,
indexSearcher,
keyFieldName,
resultsConsumer
);
) == HandleResult.HALT) {
return;
}
}
}
}