From 505de18ecb49f119bc9a7ecba942d608e4179391 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 14 Apr 2021 02:37:03 +0200 Subject: [PATCH] Fix returning a single result --- .../database/disk/LLLocalLuceneIndex.java | 131 +++++++++--------- .../disk/LLLocalMultiLuceneIndex.java | 85 +++++++----- .../lucene/searcher/SimpleStreamSearcher.java | 14 +- .../lucene/searcher/TopDocsSearcher.java | 14 ++ 4 files changed, 133 insertions(+), 111 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 65c01c1..5929372 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -429,59 +429,55 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return Mono.just(new LLSearchResult(Flux.empty())); } - return acquireSearcherWrapper(snapshot, doDistributedPre, actionId).flatMap(indexSearcher -> Mono - .fromCallable(() -> { - var mlt = new MoreLikeThis(indexSearcher.getIndexReader()); - mlt.setAnalyzer(indexWriter.getAnalyzer()); - mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new)); - mlt.setMinTermFreq(1); - mlt.setMinDocFreq(3); - mlt.setMaxDocFreqPct(20); - mlt.setBoost(QueryParser.isScoringEnabled(queryParams)); - mlt.setStopWords(EnglishItalianStopFilter.getStopWordsString()); - var similarity = getSimilarity(); - if (similarity instanceof TFIDFSimilarity) { - mlt.setSimilarity((TFIDFSimilarity) similarity); - } else { - logger.trace("Using an unsupported similarity algorithm for MoreLikeThis:" - + " {}. You must use a similarity instance based on TFIDFSimilarity!", similarity); - } + return acquireSearcherWrapper(snapshot, doDistributedPre, actionId) + .flatMap(indexSearcher -> Mono + .fromCallable(() -> { + var mlt = new MoreLikeThis(indexSearcher.getIndexReader()); + mlt.setAnalyzer(indexWriter.getAnalyzer()); + mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new)); + mlt.setMinTermFreq(1); + mlt.setMinDocFreq(3); + mlt.setMaxDocFreqPct(20); + mlt.setBoost(QueryParser.isScoringEnabled(queryParams)); + mlt.setStopWords(EnglishItalianStopFilter.getStopWordsString()); + var similarity = getSimilarity(); + if (similarity instanceof TFIDFSimilarity) { + mlt.setSimilarity((TFIDFSimilarity) similarity); + } else { + logger.trace("Using an unsupported similarity algorithm for MoreLikeThis:" + + " {}. You must use a similarity instance based on TFIDFSimilarity!", similarity); + } - // Get the reference doc and apply it to MoreLikeThis, to generate the query - //noinspection BlockingMethodInNonBlockingContext - var mltQuery = mlt.like((Map) mltDocumentFields); - Query luceneQuery; - if (luceneAdditionalQuery != null) { - luceneQuery = new BooleanQuery.Builder() - .add(mltQuery, Occur.MUST) - .add(new ConstantScoreQuery(luceneAdditionalQuery), Occur.MUST) - .build(); - } else { - luceneQuery = mltQuery; - } + // Get the reference doc and apply it to MoreLikeThis, to generate the query + //noinspection BlockingMethodInNonBlockingContext + var mltQuery = mlt.like((Map) mltDocumentFields); + Query luceneQuery; + if (luceneAdditionalQuery != null) { + luceneQuery = new BooleanQuery.Builder() + .add(mltQuery, Occur.MUST) + .add(new ConstantScoreQuery(luceneAdditionalQuery), Occur.MUST) + .build(); + } else { + luceneQuery = mltQuery; + } - return luceneQuery; - }) - .subscribeOn(luceneQueryScheduler) - .map(luceneQuery -> luceneSearch(doDistributedPre, - indexSearcher, - queryParams.getOffset(), - queryParams.getLimit(), - queryParams.getMinCompetitiveScore().getNullable(), - keyFieldName, - scoreDivisor, - luceneQuery, - QueryParser.toSort(queryParams.getSort()), - QueryParser.toScoreMode(queryParams.getScoreMode()) - )) - .materialize() - .flatMap(signal -> { - if (signal.isOnComplete() || signal.isOnError()) { - return releaseSearcherWrapper(snapshot, indexSearcher).thenReturn(signal); - } else { - return Mono.just(signal); - } - }).dematerialize()); + return luceneQuery; + }) + .subscribeOn(luceneQueryScheduler) + .map(luceneQuery -> luceneSearch(doDistributedPre, + indexSearcher, + queryParams.getOffset(), + queryParams.getLimit(), + queryParams.getMinCompetitiveScore().getNullable(), + keyFieldName, + scoreDivisor, + luceneQuery, + QueryParser.toSort(queryParams.getSort()), + QueryParser.toScoreMode(queryParams.getScoreMode()), + releaseSearcherWrapper(snapshot, indexSearcher) + )) + .onErrorResume(ex -> releaseSearcherWrapper(snapshot, indexSearcher).then(Mono.error(ex))) + ); }); } @@ -514,7 +510,8 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { private Mono search(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName, boolean doDistributedPre, long actionId, int scoreDivisor) { - return acquireSearcherWrapper(snapshot, doDistributedPre, actionId) + return this + .acquireSearcherWrapper(snapshot, doDistributedPre, actionId) .flatMap(indexSearcher -> Mono .fromCallable(() -> { Objects.requireNonNull(queryParams.getScoreMode(), "ScoreMode must not be null"); @@ -524,7 +521,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return Tuples.of(luceneQuery, Optional.ofNullable(luceneSort), luceneScoreMode); }) .subscribeOn(luceneQueryScheduler) - .flatMap(tuple -> Mono + .flatMap(tuple -> Mono .fromSupplier(() -> { Query luceneQuery = tuple.getT1(); Sort luceneSort = tuple.getT2().orElse(null); @@ -539,22 +536,18 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { scoreDivisor, luceneQuery, luceneSort, - luceneScoreMode + luceneScoreMode, + releaseSearcherWrapper(snapshot, indexSearcher) ); }) + .onErrorResume(ex -> releaseSearcherWrapper(snapshot, indexSearcher).then(Mono.error(ex))) ) - .materialize() - .flatMap(signal -> { - if (signal.isOnComplete() || signal.isOnError()) { - return releaseSearcherWrapper(snapshot, indexSearcher).thenReturn(signal); - } else { - return Mono.just(signal); - } - }) - .dematerialize() ); } + /** + * This method always returns 1 shard! Not zero, not more than one. + */ private LLSearchResult luceneSearch(boolean doDistributedPre, IndexSearcher indexSearcher, long offset, @@ -564,7 +557,8 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { int scoreDivisor, Query luceneQuery, Sort luceneSort, - ScoreMode luceneScoreMode) { + ScoreMode luceneScoreMode, + Mono successCleanup) { return new LLSearchResult(Mono.create(monoSink -> { LuceneSearchInstance luceneSearchInstance; @@ -573,7 +567,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { if (doDistributedPre) { //noinspection BlockingMethodInNonBlockingContext allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery); - monoSink.success(new LLSearchResultShard(Flux.empty(), 0)); + monoSink.success(new LLSearchResultShard(successCleanup.thenMany(Flux.empty()), 0)); return; } else { int boundedOffset = Math.max(0, offset > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) offset); @@ -642,7 +636,12 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { }, OverflowStrategy.ERROR).subscribeOn(blockingLuceneSearchScheduler).publishOn(luceneQueryScheduler); - monoSink.success(new LLSearchResultShard(resultsFlux, totalHitsCount)); + monoSink.success(new LLSearchResultShard(Flux + .usingWhen( + Mono.just(true), + b -> resultsFlux, + b -> successCleanup), + totalHitsCount)); }).subscribeOn(blockingLuceneSearchScheduler).publishOn(luceneQueryScheduler).flux()); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 83e5c90..5c7faed 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -25,6 +25,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import lombok.Value; import org.apache.lucene.search.CollectionStatistics; import org.apache.lucene.search.IndexSearcher; import org.jetbrains.annotations.Nullable; @@ -215,13 +216,10 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { if (queryParams.getOffset() != 0) { return Mono.error(new IllegalArgumentException("MultiLuceneIndex requires an offset equal to 0")); } - long actionId; - int scoreDivisor; Flux>> mltDocumentFieldsShared; - Mono distributedPre; + Mono distributedPre; if (luceneIndices.length > 1) { - actionId = newAction(); - scoreDivisor = 20; + long actionId = newAction(); mltDocumentFieldsShared = mltDocumentFields.publish().refCount(); distributedPre = Flux .fromArray(luceneIndices) @@ -239,16 +237,15 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { actionId ) ) - .then(); + .then(Mono.just(new DistributedSearch(actionId, 20))); } else { - actionId = -1; - scoreDivisor = 1; mltDocumentFieldsShared = mltDocumentFields; - distributedPre = Mono.empty(); + distributedPre = Mono.just(new DistributedSearch(-1, 1)); } //noinspection DuplicatedCode - return distributedPre.then(Flux + return distributedPre + .flatMap(distributedSearch -> Flux .fromArray(luceneIndices) .index() .flatMap(tuple -> Mono @@ -260,45 +257,55 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { queryParams, keyFieldName, mltDocumentFieldsShared, - actionId, - scoreDivisor + distributedSearch.getActionId(), + distributedSearch.getScoreDivisor() ) ) .reduce(LLSearchResult.accumulator()) .map(result -> { - if (actionId != -1) { + if (distributedSearch.getActionId() != -1) { Flux resultsWithTermination = result .getResults() - .map(flux -> new LLSearchResultShard(flux - .getResults() - .doOnTerminate(() -> completedAction(actionId)), flux.getTotalHitsCount()) + .map(flux -> new LLSearchResultShard(Flux + .using( + distributedSearch::getActionId, + actionId -> flux.getResults(), + this::completedAction + ), flux.getTotalHitsCount()) ); return new LLSearchResult(resultsWithTermination); } else { return result; } }) + .doOnError(ex -> { + if (distributedSearch.getActionId() != -1) { + completedAction(distributedSearch.getActionId()); + } + }) ); } + @Value + private static class DistributedSearch { + long actionId; + int scoreDivisor; + } + @Override public Mono search(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName) { - long actionId; - int scoreDivisor; - Mono distributedPre; if (queryParams.getOffset() != 0) { return Mono.error(new IllegalArgumentException("MultiLuceneIndex requires an offset equal to 0")); } + + Mono distributedSearchMono; if (luceneIndices.length <= 1 || !queryParams.getScoreMode().getComputeScores()) { - actionId = -1; - scoreDivisor = 1; - distributedPre = Mono.empty(); + distributedSearchMono = Mono.just(new DistributedSearch(-1, 1)); } else { - actionId = newAction(); - scoreDivisor = 20; - distributedPre = Flux + var actionId = newAction(); + distributedSearchMono = Flux .fromArray(luceneIndices) .index() .flatMap(tuple -> Mono @@ -313,11 +320,11 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { actionId ) ) - .then(); + .then(Mono.just(new DistributedSearch(actionId, 20))); } - //noinspection DuplicatedCode - return distributedPre - .then(Flux + + return distributedSearchMono + .flatMap(distributedSearch -> Flux .fromArray(luceneIndices) .index() .flatMap(tuple -> Mono @@ -329,23 +336,31 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { .distributedSearch(tuple.getT2().orElse(null), queryParams, keyFieldName, - actionId, - scoreDivisor + distributedSearch.getActionId(), + distributedSearch.getScoreDivisor() )) .reduce(LLSearchResult.accumulator()) .map(result -> { - if (actionId != -1) { + if (distributedSearch.getActionId() != -1) { Flux resultsWithTermination = result .getResults() - .map(flux -> new LLSearchResultShard(flux - .getResults() - .doOnTerminate(() -> completedAction(actionId)), flux.getTotalHitsCount()) + .map(flux -> new LLSearchResultShard(Flux + .using( + distributedSearch::getActionId, + actionId -> flux.getResults(), + this::completedAction + ), flux.getTotalHitsCount()) ); return new LLSearchResult(resultsWithTermination); } else { return result; } }) + .doOnError(ex -> { + if (distributedSearch.getActionId() != -1) { + completedAction(distributedSearch.getActionId()); + } + }) ); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleStreamSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleStreamSearcher.java index 34d17a3..195d6d0 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleStreamSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleStreamSearcher.java @@ -32,8 +32,8 @@ public class SimpleStreamSearcher implements LuceneStreamSearcher { scoreMode != ScoreMode.COMPLETE_NO_SCORES, 1000 ); - var firstTopDocs = searcher.getTopDocs(0, 1); - long totalHitsCount = firstTopDocs.totalHits.value; + var topDocs = searcher.getTopDocs(); + long totalHitsCount = topDocs.totalHits.value; return new LuceneSearchInstance() { @Override public long getTotalHitsCount() { @@ -42,14 +42,8 @@ public class SimpleStreamSearcher implements LuceneStreamSearcher { @Override public void getResults(ResultItemConsumer resultsConsumer) throws IOException { - if (firstTopDocs.scoreDocs.length > 0) { - { - var hit = firstTopDocs.scoreDocs[0]; - if (publishHit(hit, resultsConsumer) == HandleResult.HALT) { - return; - } - } - ObjectArrayList hits = ObjectArrayList.wrap(searcher.getTopDocs(offset, limit - 1).scoreDocs); + if (topDocs.scoreDocs.length > 0) { + ObjectArrayList hits = ObjectArrayList.wrap(topDocs.scoreDocs); for (ScoreDoc hit : hits) { if (publishHit(hit, resultsConsumer) == HandleResult.HALT) { return; diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java index 2627f21..edd01e8 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java @@ -35,6 +35,20 @@ class TopDocsSearcher { indexSearcher.search(query, collector); } + /** + * This method must not be called more than once! + */ + public TopDocs getTopDocs() throws IOException { + TopDocs topDocs = collector.topDocs(); + if (doDocScores) { + TopFieldCollector.populateScores(topDocs.scoreDocs, indexSearcher, query); + } + return topDocs; + } + + /** + * This method must not be called more than once! + */ public TopDocs getTopDocs(int offset, int length) throws IOException { TopDocs topDocs = collector.topDocs(offset, length); if (doDocScores) {