From 58943b5e08da6b8c97b0fc7e400fa87d3457bf24 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 28 Jan 2022 19:31:25 +0100 Subject: [PATCH] Simplify query transformations --- .../cavallium/dbengine/client/Indicizer.java | 7 +- .../dbengine/client/LuceneIndexImpl.java | 2 +- .../dbengine/database/LLLuceneIndex.java | 3 +- .../database/disk/LLLocalLuceneIndex.java | 24 +--- .../disk/LLLocalMultiLuceneIndex.java | 28 +--- .../dbengine/lucene/LuceneUtils.java | 121 ++++++++---------- .../lucene/mlt/MoreLikeThisTransformer.java | 55 ++++++++ .../lucene/mlt/MultiMoreLikeThis.java | 6 +- .../UnsortedUnscoredSimpleMultiSearcher.java | 26 ++-- 9 files changed, 138 insertions(+), 134 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/lucene/mlt/MoreLikeThisTransformer.java diff --git a/src/main/java/it/cavallium/dbengine/client/Indicizer.java b/src/main/java/it/cavallium/dbengine/client/Indicizer.java index 7fdd0a2..74c7ebd 100644 --- a/src/main/java/it/cavallium/dbengine/client/Indicizer.java +++ b/src/main/java/it/cavallium/dbengine/client/Indicizer.java @@ -1,11 +1,14 @@ package it.cavallium.dbengine.client; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; import it.cavallium.dbengine.database.LLIndexRequest; import it.cavallium.dbengine.database.LLSoftUpdateDocument; import it.cavallium.dbengine.database.LLUpdateDocument; import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.LLUpdateFields; import it.cavallium.dbengine.database.LLUtils; +import java.util.Map; import java.util.Set; import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Flux; @@ -43,7 +46,7 @@ public abstract class Indicizer { public abstract IndicizerSimilarities getPerFieldSimilarity(); - public Flux>> getMoreLikeThisDocumentFields(T key, U value) { - return Flux.empty(); + public Multimap getMoreLikeThisDocumentFields(T key, U value) { + return Multimaps.forMap(Map.of()); } } diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java index 43c035c..5e4de64 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java @@ -90,7 +90,7 @@ public class LuceneIndexImpl implements LuceneIndex { public Mono>> moreLikeThis(ClientQueryParams queryParams, T key, U mltDocumentValue) { - Flux>> mltDocumentFields + var mltDocumentFields = indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue); return luceneIndex diff --git a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java index 30a7535..7f62143 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.database; +import com.google.common.collect.Multimap; import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; import it.cavallium.data.generator.nativedata.Nullablefloat; @@ -51,7 +52,7 @@ public interface LLLuceneIndex extends LLSnapshottable { Mono moreLikeThis(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName, - Flux>> mltDocumentFields); + Multimap mltDocumentFields); /** * @param queryParams the limit is valid for each lucene instance. If you have 15 instances, the number of elements diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 92259aa..3e42cef 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -6,10 +6,10 @@ import static it.cavallium.dbengine.database.LLUtils.toDocument; import static it.cavallium.dbengine.database.LLUtils.toFields; import static it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.NO_TRANSFORMATION; +import com.google.common.collect.Multimap; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Timer; -import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.DirectIOOptions; import it.cavallium.dbengine.client.IndicizerAnalyzers; @@ -32,10 +32,10 @@ import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory; import it.cavallium.dbengine.lucene.LuceneHacks; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.collector.Buckets; +import it.cavallium.dbengine.lucene.mlt.MoreLikeThisTransformer; import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher; import it.cavallium.dbengine.lucene.searcher.BucketParams; import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher; -import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer; import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; import it.cavallium.dbengine.lucene.searcher.LocalSearcher; import java.io.IOException; @@ -45,7 +45,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; -import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; @@ -80,7 +79,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; -import reactor.util.function.Tuple2; public class LLLocalLuceneIndex implements LLLuceneIndex { @@ -442,10 +440,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { public Mono moreLikeThis(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName, - Flux>> mltDocumentFieldsFlux) { + Multimap mltDocumentFieldsFlux) { LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer); var searcher = this.searcherManager.retrieveSearcher(snapshot); - var transformer = new MoreLikeThisTransformer(mltDocumentFieldsFlux); + var transformer = new MoreLikeThisTransformer(mltDocumentFieldsFlux, luceneAnalyzer, luceneSimilarity); return localSearcher .collect(searcher, localQueryParams, keyFieldName, transformer) @@ -601,18 +599,4 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return lowMemory; } - private class MoreLikeThisTransformer implements LLSearchTransformer { - - private final Flux>> mltDocumentFieldsFlux; - - public MoreLikeThisTransformer(Flux>> mltDocumentFieldsFlux) { - this.mltDocumentFieldsFlux = mltDocumentFieldsFlux; - } - - @Override - public Mono transform(Mono inputMono) { - return inputMono.flatMap(input -> LuceneUtils.getMoreLikeThisQuery(input.indexSearchers(), input.queryParams(), - luceneAnalyzer, luceneSimilarity, mltDocumentFieldsFlux)); - } - } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index c834f0c..1af956c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -2,8 +2,8 @@ package it.cavallium.dbengine.database.disk; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; +import com.google.common.collect.Multimap; import io.micrometer.core.instrument.MeterRegistry; -import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; @@ -12,23 +12,22 @@ import it.cavallium.dbengine.client.query.QueryParser; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.database.LLIndexRequest; -import it.cavallium.dbengine.database.LLUpdateDocument; -import it.cavallium.dbengine.database.LLItem; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLSearchResultShard; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; +import it.cavallium.dbengine.database.LLUpdateDocument; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.lucene.LuceneHacks; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.collector.Buckets; +import it.cavallium.dbengine.lucene.mlt.MoreLikeThisTransformer; import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher; import it.cavallium.dbengine.lucene.searcher.BucketParams; import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer; import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; import it.cavallium.dbengine.lucene.searcher.MultiSearcher; -import it.unimi.dsi.fastutil.doubles.DoubleArrayList; import java.io.Closeable; import java.io.IOException; import java.nio.file.Path; @@ -39,7 +38,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper; @@ -49,7 +47,6 @@ import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; -import reactor.util.function.Tuple2; public class LLLocalMultiLuceneIndex implements LLLuceneIndex { @@ -233,10 +230,10 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { public Mono moreLikeThis(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName, - Flux>> mltDocumentFields) { + Multimap mltDocumentFields) { LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer); var searchers = this.getIndexSearchers(snapshot); - var transformer = new MultiMoreLikeThisTransformer(mltDocumentFields); + var transformer = new MoreLikeThisTransformer(mltDocumentFields, luceneAnalyzer, luceneSimilarity); // Collect all the shards results into a single global result return multiSearcher @@ -340,19 +337,4 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { public boolean isLowMemoryMode() { return luceneIndices[0].isLowMemoryMode(); } - - private class MultiMoreLikeThisTransformer implements LLSearchTransformer { - - private final Flux>> mltDocumentFields; - - public MultiMoreLikeThisTransformer(Flux>> mltDocumentFields) { - this.mltDocumentFields = mltDocumentFields; - } - - @Override - public Mono transform(Mono inputMono) { - return inputMono.flatMap(input -> LuceneUtils.getMoreLikeThisQuery(input.indexSearchers(), input.queryParams(), - luceneAnalyzer, luceneSimilarity, mltDocumentFields)); - } - } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index c2c5725..294adda 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -2,6 +2,8 @@ package it.cavallium.dbengine.lucene; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; @@ -31,12 +33,12 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; -import java.util.Set; import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -49,6 +51,7 @@ import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexableField; +import org.apache.lucene.queries.mlt.MoreLikeThisQuery; import org.apache.lucene.search.BooleanClause.Occur; import org.apache.lucene.search.BooleanQuery.Builder; import org.apache.lucene.search.Collector; @@ -79,7 +82,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.util.concurrent.Queues; -import reactor.util.function.Tuple2; public class LuceneUtils { @@ -470,80 +472,57 @@ public class LuceneUtils { } } - public static Mono getMoreLikeThisQuery( - LLIndexSearchers inputIndexSearchers, + public static Query getMoreLikeThisQuery(LLIndexSearchers inputIndexSearchers, LocalQueryParams localQueryParams, Analyzer analyzer, Similarity similarity, - Flux>> mltDocumentFieldsFlux) { - var indexSearchers = inputIndexSearchers.shards(); - Query luceneAdditionalQuery; - try { - luceneAdditionalQuery = localQueryParams.query(); - } catch (Exception e) { - return Mono.error(e); + Multimap mltDocumentFieldsMultimap) throws IOException { + List indexSearchers = inputIndexSearchers.shards(); + Query luceneAdditionalQuery = localQueryParams.query(); + // Create the mutable version of the input + Map> mltDocumentFields = HashMultimap.create(mltDocumentFieldsMultimap).asMap(); + + mltDocumentFields.entrySet().removeIf(entry -> entry.getValue().isEmpty()); + if (mltDocumentFields.isEmpty()) { + return new MatchNoDocsQuery(); + } + MultiMoreLikeThis mlt; + if (indexSearchers.size() == 1) { + mlt = new MultiMoreLikeThis(new BigCompositeReader<>(indexSearchers.get(0).getIndexReader(), IndexReader[]::new), + null + ); + } else { + IndexReader[] indexReaders = new IndexReader[indexSearchers.size()]; + for (int i = 0, size = indexSearchers.size(); i < size; i++) { + indexReaders[i] = indexSearchers.get(i).getIndexReader(); + } + mlt = new MultiMoreLikeThis(new BigCompositeReader<>(indexReaders, new ArrayIndexComparator(indexReaders)), null); + } + mlt.setAnalyzer(analyzer); + mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new)); + mlt.setMinTermFreq(1); + mlt.setMinDocFreq(3); + mlt.setMaxDocFreqPct(20); + mlt.setBoost(localQueryParams.needsScores()); + mlt.setStopWords(ENGLISH_AND_ITALIAN_STOP_WORDS); + if (similarity instanceof TFIDFSimilarity tfidfSimilarity) { + mlt.setSimilarity(tfidfSimilarity); + } else { + mlt.setSimilarity(new ClassicSimilarity()); } - return mltDocumentFieldsFlux - .collectMap(Tuple2::getT1, Tuple2::getT2, HashMap::new) - .flatMap(mltDocumentFields -> Mono.fromCallable(() -> { - mltDocumentFields.entrySet().removeIf(entry -> entry.getValue().isEmpty()); - if (mltDocumentFields.isEmpty()) { - return new LocalQueryParams(new MatchNoDocsQuery(), - localQueryParams.offsetLong(), - localQueryParams.limitLong(), - DEFAULT_PAGE_LIMITS, - localQueryParams.minCompetitiveScore(), - localQueryParams.sort(), - localQueryParams.computePreciseHitsCount(), - localQueryParams.timeout() - ); - } - MultiMoreLikeThis mlt; - if (indexSearchers.size() == 1) { - mlt = new MultiMoreLikeThis(new BigCompositeReader<>(indexSearchers.get(0).getIndexReader(), IndexReader[]::new), null); - } else { - IndexReader[] indexReaders = new IndexReader[indexSearchers.size()]; - for (int i = 0, size = indexSearchers.size(); i < size; i++) { - indexReaders[i] = indexSearchers.get(i).getIndexReader(); - } - mlt = new MultiMoreLikeThis(new BigCompositeReader<>(indexReaders, new ArrayIndexComparator(indexReaders)), null); - } - mlt.setAnalyzer(analyzer); - mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new)); - mlt.setMinTermFreq(1); - mlt.setMinDocFreq(3); - mlt.setMaxDocFreqPct(20); - mlt.setBoost(localQueryParams.needsScores()); - mlt.setStopWords(ENGLISH_AND_ITALIAN_STOP_WORDS); - if (similarity instanceof TFIDFSimilarity tfidfSimilarity) { - mlt.setSimilarity(tfidfSimilarity); - } else { - mlt.setSimilarity(new ClassicSimilarity()); - } - // Get the reference docId and apply it to MoreLikeThis, to generate the query - @SuppressWarnings({"unchecked", "rawtypes"}) - var mltQuery = mlt.like((Map) mltDocumentFields); - Query luceneQuery; - if (!(luceneAdditionalQuery instanceof MatchAllDocsQuery)) { - luceneQuery = new Builder() - .add(mltQuery, Occur.MUST) - .add(new ConstantScoreQuery(luceneAdditionalQuery), Occur.MUST) - .build(); - } else { - luceneQuery = mltQuery; - } - - return new LocalQueryParams(luceneQuery, - localQueryParams.offsetLong(), - localQueryParams.limitLong(), - DEFAULT_PAGE_LIMITS, - localQueryParams.minCompetitiveScore(), - localQueryParams.sort(), - localQueryParams.computePreciseHitsCount(), - localQueryParams.timeout()); - }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))) - .publishOn(Schedulers.parallel()); + // Get the reference docId and apply it to MoreLikeThis, to generate the query + Query mltQuery = mlt.like(mltDocumentFields); + Query luceneQuery; + if (!(luceneAdditionalQuery instanceof MatchAllDocsQuery)) { + luceneQuery = new Builder() + .add(mltQuery, Occur.MUST) + .add(new ConstantScoreQuery(luceneAdditionalQuery), Occur.MUST) + .build(); + } else { + luceneQuery = mltQuery; + } + return luceneQuery; } public static Collector withTimeout(Collector collector, Duration timeout) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/mlt/MoreLikeThisTransformer.java b/src/main/java/it/cavallium/dbengine/lucene/mlt/MoreLikeThisTransformer.java new file mode 100644 index 0000000..5731674 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/mlt/MoreLikeThisTransformer.java @@ -0,0 +1,55 @@ +package it.cavallium.dbengine.lucene.mlt; + +import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; + +import com.google.common.collect.Multimap; +import it.cavallium.dbengine.lucene.LuceneUtils; +import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer; +import it.cavallium.dbengine.lucene.searcher.LocalQueryParams; +import java.io.IOException; +import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper; +import org.apache.lucene.search.similarities.PerFieldSimilarityWrapper; +import org.apache.lucene.search.similarities.Similarity; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public class MoreLikeThisTransformer implements LLSearchTransformer { + + private final Multimap mltDocumentFields; + private final PerFieldAnalyzerWrapper luceneAnalyzer; + private final Similarity luceneSimilarity; + + public MoreLikeThisTransformer(Multimap mltDocumentFields, + PerFieldAnalyzerWrapper luceneAnalyzer, + Similarity luceneSimilarity) { + this.mltDocumentFields = mltDocumentFields; + this.luceneAnalyzer = luceneAnalyzer; + this.luceneSimilarity = luceneSimilarity; + } + + @Override + public Mono transform(Mono inputMono) { + return inputMono.publishOn(uninterruptibleScheduler(Schedulers.boundedElastic())).handle((input, sink) -> { + try { + var rewrittenQuery = LuceneUtils.getMoreLikeThisQuery(input.indexSearchers(), + input.queryParams(), + luceneAnalyzer, + luceneSimilarity, + mltDocumentFields + ); + var queryParams = input.queryParams(); + sink.next(new LocalQueryParams(rewrittenQuery, + queryParams.offsetLong(), + queryParams.limitLong(), + queryParams.pageLimits(), + queryParams.minCompetitiveScore(), + queryParams.sort(), + queryParams.computePreciseHitsCount(), + queryParams.timeout() + )); + } catch (IOException ex) { + sink.error(ex); + } + }); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/mlt/MultiMoreLikeThis.java b/src/main/java/it/cavallium/dbengine/lucene/mlt/MultiMoreLikeThis.java index 4446ee7..848f82e 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/mlt/MultiMoreLikeThis.java +++ b/src/main/java/it/cavallium/dbengine/lucene/mlt/MultiMoreLikeThis.java @@ -564,7 +564,7 @@ public final class MultiMoreLikeThis { * @param filteredDocument Document with field values extracted for selected fields. * @return More Like This query for the passed document. */ - public Query like(Map> filteredDocument) throws IOException { + public Query like(Map> filteredDocument) throws IOException { if (fieldNames == null) { // gather list of valid fields from lucene Collection fields = BigCompositeReader.getIndexedFields(ir); @@ -743,11 +743,11 @@ public final class MultiMoreLikeThis { } } - private PriorityQueue retrieveTerms(Map> field2fieldValues) + private PriorityQueue retrieveTerms(Map> field2fieldValues) throws IOException { Map> field2termFreqMap = new HashMap<>(); for (String fieldName : fieldNames) { - Collection fieldValues = field2fieldValues.get(fieldName); + Collection fieldValues = field2fieldValues.get(fieldName); if (fieldValues == null) { continue; } diff --git a/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java b/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java index 863b966..e5482d5 100644 --- a/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java +++ b/src/test/java/it/cavallium/dbengine/UnsortedUnscoredSimpleMultiSearcher.java @@ -43,19 +43,8 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher { return queryParamsMono.flatMap(queryParams2 -> { var localQueryParams = getLocalQueryParams(queryParams2); - return Mono - .fromRunnable(() -> { - LLUtils.ensureBlocking(); - if (queryParams2.isSorted() && queryParams2.limitLong() > 0) { - throw new UnsupportedOperationException("Sorted queries are not supported" - + " by SimpleUnsortedUnscoredLuceneMultiSearcher"); - } - if (queryParams2.needsScores() && queryParams2.limitLong() > 0) { - throw new UnsupportedOperationException("Scored queries are not supported" - + " by SimpleUnsortedUnscoredLuceneMultiSearcher"); - } - }) - .thenMany(Flux.fromIterable(indexSearchers.shards())) + return Flux + .fromIterable(indexSearchers.shards()) .flatMap(searcher -> { var llSearcher = Mono.fromCallable(() -> new LLIndexSearcher(searcher, false, null).send()); return localSearcher.collect(llSearcher, localQueryParams, keyFieldName, transformer); @@ -85,6 +74,17 @@ public class UnsortedUnscoredSimpleMultiSearcher implements MultiSearcher { } indexSearchers.close(); }); + }) + .doFirst(() -> { + LLUtils.ensureBlocking(); + if (queryParams2.isSorted() && queryParams2.limitLong() > 0) { + throw new UnsupportedOperationException("Sorted queries are not supported" + + " by SimpleUnsortedUnscoredLuceneMultiSearcher"); + } + if (queryParams2.needsScores() && queryParams2.limitLong() > 0) { + throw new UnsupportedOperationException("Scored queries are not supported" + + " by SimpleUnsortedUnscoredLuceneMultiSearcher"); + } }); } );