From 07ea61050f626598c5ada1d5dcc0f49672fdb800 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 3 Mar 2021 00:13:57 +0100 Subject: [PATCH] Cancellable iteration in lucene --- .../database/disk/LLLocalLuceneIndex.java | 19 +++++++++++++++---- .../lucene/LuceneParallelStreamCollector.java | 3 ++- .../lucene/LuceneParallelStreamConsumer.java | 4 ++-- .../dbengine/lucene/LuceneUtils.java | 12 ++++++++---- .../searcher/AdaptiveStreamSearcher.java | 4 +--- ...lyQueryParsingCollectorStreamSearcher.java | 4 +--- .../lucene/searcher/CountStreamSearcher.java | 4 +--- .../lucene/searcher/LuceneStreamSearcher.java | 13 +++++++++++-- .../lucene/searcher/PagedStreamSearcher.java | 15 ++++++++------- .../ParallelCollectorStreamSearcher.java | 11 ++++++----- .../lucene/searcher/SimpleStreamSearcher.java | 10 +++++----- 11 files changed, 60 insertions(+), 39 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index affcb3b..211a46e 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -19,6 +19,7 @@ import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity; import it.cavallium.dbengine.lucene.searcher.AdaptiveStreamSearcher; import it.cavallium.dbengine.lucene.searcher.AllowOnlyQueryParsingCollectorStreamSearcher; import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher; +import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.HandleResult; import java.io.IOException; import java.nio.file.Path; import java.time.Duration; @@ -26,6 +27,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -550,13 +552,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .unicast() .onBackpressureBuffer(); - var searchFlux = Flux.push(sink -> { + var searchFlux = Mono.create(sink -> { try { + var opId = new Random().nextInt(); if (doDistributedPre) { allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery); totalHitsCountSink.tryEmitValue(0L); } else { int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit); + logger.warn(opId + " start"); streamSearcher.search(indexSearcher, luceneQuery, boundedLimit, @@ -565,12 +569,17 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { minCompetitiveScore, keyFieldName, keyScore -> { + logger.warn(opId + " item"); EmitResult result = topKeysSink.tryEmitNext(fixKeyScore(keyScore, scoreDivisor)); - if (result.isFailure()) { + if (result.isSuccess()) { + return HandleResult.CONTINUE; + } else { if (result == EmitResult.FAIL_CANCELLED) { logger.debug("Fail to emit next value: cancelled"); + return HandleResult.HALT; } else if (result == EmitResult.FAIL_TERMINATED) { logger.debug("Fail to emit next value: terminated"); + return HandleResult.HALT; } else if (result == EmitResult.FAIL_ZERO_SUBSCRIBER) { logger.error("Fail to emit next value: zero subscriber. You must subscribe to results before total hits if you specified a limit > 0!"); sink.error(new EmissionException(result)); @@ -581,6 +590,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } }, totalHitsCount -> { + logger.warn(opId + " total-hits-count"); EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount); if (result.isFailure()) { if (result == EmitResult.FAIL_CANCELLED) { @@ -597,14 +607,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } ); } + logger.warn(opId + " complete"); topKeysSink.tryEmitComplete(); - sink.complete(); + sink.success(); } catch (IOException e) { topKeysSink.tryEmitError(e); totalHitsCountSink.tryEmitError(e); sink.error(e); } - }).share(); + }).subscribeOn(luceneQueryScheduler).cache(); return new LLSearchResult( Mono.firstWithValue(searchFlux.then(Mono.empty()), totalHitsCountSink.asMono()), diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneParallelStreamCollector.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneParallelStreamCollector.java index 0a62ca9..eb5bf71 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneParallelStreamCollector.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneParallelStreamCollector.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.lucene; +import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.HandleResult; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -61,7 +62,7 @@ public class LuceneParallelStreamCollector implements Collector, LeafCollector { if (!stopped.get()) { var score = scorer == null ? 0 : scorer.score(); if (minCompetitiveScore == null || score >= minCompetitiveScore) { - if (!streamConsumer.consume(doc, score)) { + if (streamConsumer.consume(doc, score) == HandleResult.HALT) { stopped.set(true); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneParallelStreamConsumer.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneParallelStreamConsumer.java index f858c9e..1fb20e2 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneParallelStreamConsumer.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneParallelStreamConsumer.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.lucene; +import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.HandleResult; import java.io.IOException; public interface LuceneParallelStreamConsumer { @@ -7,7 +8,6 @@ public interface LuceneParallelStreamConsumer { /** * @param docId document id * @param score score of document - * @return true to continue, false to stop the execution */ - boolean consume(int docId, float score) throws IOException; + HandleResult consume(int docId, float score) throws IOException; } diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index ed2209e..e5ea00c 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -7,10 +7,11 @@ import it.cavallium.dbengine.lucene.analyzer.NCharGramEdgeAnalyzer; import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer; import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity; import it.cavallium.dbengine.lucene.analyzer.WordAnalyzer; +import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.HandleResult; +import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.ResultItemConsumer; import it.cavallium.dbengine.lucene.similarity.NGramSimilarity; import java.io.IOException; import java.util.Set; -import java.util.function.Consumer; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.LowerCaseFilter; import org.apache.lucene.analysis.TokenStream; @@ -188,13 +189,13 @@ public class LuceneUtils { }); } - public static void collectTopDoc(Logger logger, + public static HandleResult collectTopDoc(Logger logger, int docId, float score, Float minCompetitiveScore, IndexSearcher indexSearcher, String keyFieldName, - Consumer resultsConsumer) throws IOException { + ResultItemConsumer resultsConsumer) throws IOException { if (minCompetitiveScore == null || score >= minCompetitiveScore) { Document d = indexSearcher.doc(docId, Set.of(keyFieldName)); if (d.getFields().isEmpty()) { @@ -211,9 +212,12 @@ public class LuceneUtils { if (field == null) { logger.error("Can't get key of document docId: {}, score: {}", docId, score); } else { - resultsConsumer.accept(new LLKeyScore(field.stringValue(), score)); + if (resultsConsumer.accept(new LLKeyScore(field.stringValue(), score)) == HandleResult.HALT) { + return HandleResult.HALT; + } } } } + return HandleResult.CONTINUE; } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveStreamSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveStreamSearcher.java index 08a00ed..82122d8 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveStreamSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveStreamSearcher.java @@ -1,8 +1,6 @@ package it.cavallium.dbengine.lucene.searcher; -import it.cavallium.dbengine.database.LLKeyScore; import java.io.IOException; -import java.util.function.Consumer; import java.util.function.LongConsumer; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; @@ -37,7 +35,7 @@ public class AdaptiveStreamSearcher implements LuceneStreamSearcher { ScoreMode scoreMode, @Nullable Float minCompetitiveScore, String keyFieldName, - Consumer consumer, + ResultItemConsumer consumer, LongConsumer totalHitsConsumer) throws IOException { if (limit == 0) { totalHitsConsumer.accept(countStreamSearcher.count(indexSearcher, query)); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AllowOnlyQueryParsingCollectorStreamSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AllowOnlyQueryParsingCollectorStreamSearcher.java index 460e192..16ba4fd 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AllowOnlyQueryParsingCollectorStreamSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AllowOnlyQueryParsingCollectorStreamSearcher.java @@ -1,9 +1,7 @@ package it.cavallium.dbengine.lucene.searcher; -import it.cavallium.dbengine.database.LLKeyScore; import java.io.IOException; import java.util.Collection; -import java.util.function.Consumer; import java.util.function.LongConsumer; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.Collector; @@ -34,7 +32,7 @@ public class AllowOnlyQueryParsingCollectorStreamSearcher implements LuceneStrea ScoreMode scoreMode, @Nullable Float minCompetitiveScore, String keyFieldName, - Consumer resultsConsumer, + ResultItemConsumer resultsConsumer, LongConsumer totalHitsConsumer) throws IOException { if (limit > 0) { throw new IllegalArgumentException("Limit > 0 not allowed"); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountStreamSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountStreamSearcher.java index f45055c..e1e7521 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountStreamSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountStreamSearcher.java @@ -1,8 +1,6 @@ package it.cavallium.dbengine.lucene.searcher; -import it.cavallium.dbengine.database.LLKeyScore; import java.io.IOException; -import java.util.function.Consumer; import java.util.function.LongConsumer; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; @@ -23,7 +21,7 @@ public class CountStreamSearcher implements LuceneStreamSearcher { ScoreMode scoreMode, @Nullable Float minCompetitiveScore, String keyFieldName, - Consumer resultsConsumer, + ResultItemConsumer resultsConsumer, LongConsumer totalHitsConsumer) throws IOException { if (limit != 0) { throw new IllegalArgumentException("CountStream doesn't support a limit different than 0"); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneStreamSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneStreamSearcher.java index 29a8148..d692a75 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneStreamSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneStreamSearcher.java @@ -2,7 +2,6 @@ package it.cavallium.dbengine.lucene.searcher; import it.cavallium.dbengine.database.LLKeyScore; import java.io.IOException; -import java.util.function.Consumer; import java.util.function.LongConsumer; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; @@ -36,6 +35,16 @@ public interface LuceneStreamSearcher { ScoreMode scoreMode, @Nullable Float minCompetitiveScore, String keyFieldName, - Consumer resultsConsumer, + ResultItemConsumer resultsConsumer, LongConsumer totalHitsConsumer) throws IOException; + + @FunctionalInterface + interface ResultItemConsumer { + + HandleResult accept(LLKeyScore item); + } + + enum HandleResult { + CONTINUE, HALT + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedStreamSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedStreamSearcher.java index 5ef3e08..2371f8d 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedStreamSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedStreamSearcher.java @@ -1,9 +1,7 @@ package it.cavallium.dbengine.lucene.searcher; -import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.lucene.LuceneUtils; import java.io.IOException; -import java.util.function.Consumer; import java.util.function.LongConsumer; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; @@ -34,7 +32,7 @@ public class PagedStreamSearcher implements LuceneStreamSearcher { ScoreMode scoreMode, @Nullable Float minCompetitiveScore, String keyFieldName, - Consumer resultsConsumer, + ResultItemConsumer resultsConsumer, LongConsumer totalHitsConsumer) throws IOException { if (limit < MAX_ITEMS_PER_PAGE) { // Use a normal search method because the limit is low @@ -74,29 +72,32 @@ public class PagedStreamSearcher implements LuceneStreamSearcher { } } - private void consumeHits(IntWrapper currentAllowedResults, + private HandleResult consumeHits(IntWrapper currentAllowedResults, ScoreDoc[] hits, IndexSearcher indexSearcher, @Nullable Float minCompetitiveScore, String keyFieldName, - Consumer resultsConsumer) throws IOException { + ResultItemConsumer resultsConsumer) throws IOException { for (ScoreDoc hit : hits) { int docId = hit.doc; float score = hit.score; if (currentAllowedResults.var-- > 0) { - LuceneUtils.collectTopDoc(logger, + if (LuceneUtils.collectTopDoc(logger, docId, score, minCompetitiveScore, indexSearcher, keyFieldName, resultsConsumer - ); + ) == HandleResult.HALT) { + return HandleResult.HALT; + } } else { break; } } + return HandleResult.CONTINUE; } private static ScoreDoc getLastItem(ScoreDoc[] scoreDocs) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ParallelCollectorStreamSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ParallelCollectorStreamSearcher.java index f5a7821..1bdc5bc 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ParallelCollectorStreamSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ParallelCollectorStreamSearcher.java @@ -6,7 +6,6 @@ import it.cavallium.dbengine.lucene.LuceneParallelStreamCollectorResult; import java.io.IOException; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; import java.util.function.LongConsumer; import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexableField; @@ -29,7 +28,7 @@ public class ParallelCollectorStreamSearcher implements LuceneStreamSearcher { ScoreMode scoreMode, @Nullable Float minCompetitiveScore, String keyFieldName, - Consumer resultsConsumer, + ResultItemConsumer resultsConsumer, LongConsumer totalHitsConsumer) throws IOException { if (luceneSort != null) { throw new IllegalArgumentException("ParallelCollectorStreamSearcher doesn't support sorted searches"); @@ -39,7 +38,7 @@ public class ParallelCollectorStreamSearcher implements LuceneStreamSearcher { LuceneParallelStreamCollectorResult result = indexSearcher.search(query, LuceneParallelStreamCollectorManager.fromConsumer(scoreMode, minCompetitiveScore, (docId, score) -> { if (currentCount.getAndIncrement() >= limit) { - return false; + return HandleResult.HALT; } else { Document d = indexSearcher.doc(docId, Set.of(keyFieldName)); if (d.getFields().isEmpty()) { @@ -56,10 +55,12 @@ public class ParallelCollectorStreamSearcher implements LuceneStreamSearcher { if (field == null) { logger.error("Can't get key of document docId: {}", docId); } else { - resultsConsumer.accept(new LLKeyScore(field.stringValue(), score)); + if (resultsConsumer.accept(new LLKeyScore(field.stringValue(), score)) == HandleResult.HALT) { + return HandleResult.HALT; + } } } - return true; + return HandleResult.CONTINUE; } })); //todo: check the accuracy of our hits counter! diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleStreamSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleStreamSearcher.java index eaa691c..4886202 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleStreamSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleStreamSearcher.java @@ -1,10 +1,8 @@ package it.cavallium.dbengine.lucene.searcher; -import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.lucene.LuceneUtils; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.io.IOException; -import java.util.function.Consumer; import java.util.function.LongConsumer; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; @@ -27,7 +25,7 @@ public class SimpleStreamSearcher implements LuceneStreamSearcher { ScoreMode scoreMode, @Nullable Float minCompetitiveScore, String keyFieldName, - Consumer resultsConsumer, + ResultItemConsumer resultsConsumer, LongConsumer totalHitsConsumer) throws IOException { TopDocs topDocs; if (luceneSort != null) { @@ -40,14 +38,16 @@ public class SimpleStreamSearcher implements LuceneStreamSearcher { for (ScoreDoc hit : hits) { int docId = hit.doc; float score = hit.score; - LuceneUtils.collectTopDoc(logger, + if (LuceneUtils.collectTopDoc(logger, docId, score, minCompetitiveScore, indexSearcher, keyFieldName, resultsConsumer - ); + ) == HandleResult.HALT) { + return; + } } } }