This commit is contained in:
Andrea Cavalli 2021-03-03 20:00:58 +01:00
parent a06d448182
commit b71f3dceed
2 changed files with 68 additions and 68 deletions

View File

@ -451,7 +451,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
luceneQuery = mltQuery; luceneQuery = mltQuery;
} }
return luceneSearch(doDistributedPre, return luceneQuery;
})
.subscribeOn(luceneQueryScheduler)
.map(luceneQuery -> luceneSearch(doDistributedPre,
indexSearcher, indexSearcher,
queryParams.getLimit(), queryParams.getLimit(),
queryParams.getMinCompetitiveScore().getNullable(), queryParams.getMinCompetitiveScore().getNullable(),
@ -460,9 +463,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
luceneQuery, luceneQuery,
QueryParser.toSort(queryParams.getSort()), QueryParser.toSort(queryParams.getSort()),
QueryParser.toScoreMode(queryParams.getScoreMode()) QueryParser.toScoreMode(queryParams.getScoreMode())
); ))
})
.subscribeOn(luceneQueryScheduler)
.materialize() .materialize()
.flatMap(signal -> { .flatMap(signal -> {
if (signal.isOnComplete() || signal.isOnError()) { if (signal.isOnComplete() || signal.isOnError()) {
@ -507,7 +508,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}) })
.subscribeOn(luceneQueryScheduler) .subscribeOn(luceneQueryScheduler)
.flatMap(tuple -> Mono .flatMap(tuple -> Mono
.fromCallable(() -> { .fromSupplier(() -> {
Query luceneQuery = tuple.getT1(); Query luceneQuery = tuple.getT1();
Sort luceneSort = tuple.getT2().orElse(null); Sort luceneSort = tuple.getT2().orElse(null);
ScoreMode luceneScoreMode = tuple.getT3(); ScoreMode luceneScoreMode = tuple.getT3();
@ -522,7 +523,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
luceneSort, luceneSort,
luceneScoreMode luceneScoreMode
); );
}).subscribeOn(luceneQueryScheduler) })
) )
.materialize() .materialize()
.flatMap(signal -> { .flatMap(signal -> {
@ -545,7 +546,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
Query luceneQuery, Query luceneQuery,
Sort luceneSort, Sort luceneSort,
ScoreMode luceneScoreMode) { ScoreMode luceneScoreMode) {
var searchFlux = Flux.<LLSignal>create(sink -> { return new LLSearchResult(Flux.just(Flux.defer(() -> Flux.<LLSignal>create(sink -> {
AtomicBoolean cancelled = new AtomicBoolean(); AtomicBoolean cancelled = new AtomicBoolean();
AtomicLong requests = new AtomicLong(); AtomicLong requests = new AtomicLong();
Semaphore requestsAvailable = new Semaphore(0); Semaphore requestsAvailable = new Semaphore(0);
@ -553,23 +554,24 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
cancelled.set(true); cancelled.set(true);
requestsAvailable.release(); requestsAvailable.release();
}); });
sink.onCancel(() -> {
cancelled.set(true);
requestsAvailable.release();
});
sink.onRequest(delta -> { sink.onRequest(delta -> {
requests.addAndGet(delta); requests.addAndGet(delta);
requestsAvailable.release(); requestsAvailable.release();
}); });
luceneQueryScheduler.schedule(() -> {
try { try {
//noinspection BlockingMethodInNonBlockingContext
requestsAvailable.acquire(); requestsAvailable.acquire();
requestsAvailable.release();
if (!cancelled.get()) { if (!cancelled.get()) {
if (doDistributedPre) { if (doDistributedPre) {
//noinspection BlockingMethodInNonBlockingContext
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery); allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
sink.next(new LLTotalHitsCount(0L)); sink.next(new LLTotalHitsCount(0L));
} else { } else {
int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit); int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit);
//noinspection BlockingMethodInNonBlockingContext
streamSearcher.search(indexSearcher, streamSearcher.search(indexSearcher,
luceneQuery, luceneQuery,
boundedLimit, boundedLimit,
@ -579,48 +581,44 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
keyFieldName, keyFieldName,
keyScore -> { keyScore -> {
try { try {
while (requests.get() <= 0 && !cancelled.get()) { if (requests.get() <= 0 && !cancelled.get()) {
requestsAvailable.acquire(); requestsAvailable.acquire();
} }
if (!cancelled.get()) { if (cancelled.get()) {
requests.decrementAndGet();
sink.next(fixKeyScore(keyScore, scoreDivisor));
return HandleResult.CONTINUE;
} else {
return HandleResult.HALT; return HandleResult.HALT;
} }
requests.updateAndGet(n -> n > 0 ? n - 1 : 0);
sink.next(fixKeyScore(keyScore, scoreDivisor));
return HandleResult.CONTINUE;
} catch (Exception ex) { } catch (Exception ex) {
sink.error(ex); sink.error(ex);
cancelled.set(true); cancelled.set(true);
requestsAvailable.release();
return HandleResult.HALT; return HandleResult.HALT;
} }
}, },
totalHitsCount -> { totalHitsCount -> {
try { try {
while (requests.get() <= 0 && !cancelled.get()) { if (requests.get() <= 0 && !cancelled.get()) {
requestsAvailable.acquire(); requestsAvailable.acquire();
} }
if (!cancelled.get()) { requests.updateAndGet(n -> n > 0 ? n - 1 : 0);
requests.decrementAndGet();
sink.next(new LLTotalHitsCount(totalHitsCount)); sink.next(new LLTotalHitsCount(totalHitsCount));
}
} catch (Exception ex) { } catch (Exception ex) {
sink.error(ex); sink.error(ex);
cancelled.set(true); cancelled.set(true);
requestsAvailable.release();
} }
} }
); );
} }
if (!cancelled.get()) {
sink.complete(); sink.complete();
} }
}
} catch (Exception ex) { } catch (Exception ex) {
sink.error(ex); sink.error(ex);
} }
}).subscribeOn(luceneQueryScheduler); });
}).subscribeOn(Schedulers.boundedElastic()))));
return new LLSearchResult(Flux.just(searchFlux));
} }
@Override @Override

View File

@ -238,13 +238,14 @@ public class LuceneUtils {
.map(LuceneSignal::getTotalHitsCount) .map(LuceneSignal::getTotalHitsCount)
.reduce(Long::sum) .reduce(Long::sum)
.map(sum -> LuceneSignal.totalHitsCount(sum)); .map(sum -> LuceneSignal.totalHitsCount(sum));
return sortedValues.mergeWith(sortedTotalSize); return Flux.merge(sortedValues, sortedTotalSize);
} }
public static Flux<LLSignal> mergeSignalStreamRaw(Flux<Flux<LLSignal>> mappedKeys, public static Flux<LLSignal> mergeSignalStreamRaw(Flux<Flux<LLSignal>> mappedKeys,
MultiSort<LLSignal> mappedSort, MultiSort<LLSignal> mappedSort,
Long limit) { Long limit) {
Flux<Flux<LLSignal>> sharedMappedSignals = mappedKeys.publish().refCount(2); Flux<Flux<LLSignal>> sharedMappedSignals = mappedKeys.publish().refCount(2);
Flux<LLSignal> sortedValues = LuceneUtils Flux<LLSignal> sortedValues = LuceneUtils
.mergeStream(sharedMappedSignals.map(sub -> sub.filter(LLSignal::isValue)), mappedSort, limit); .mergeStream(sharedMappedSignals.map(sub -> sub.filter(LLSignal::isValue)), mappedSort, limit);
//noinspection Convert2MethodRef //noinspection Convert2MethodRef
@ -254,6 +255,7 @@ public class LuceneUtils {
.map(LLSignal::getTotalHitsCount) .map(LLSignal::getTotalHitsCount)
.reduce(Long::sum) .reduce(Long::sum)
.map(sum -> new LLTotalHitsCount(sum)); .map(sum -> new LLTotalHitsCount(sum));
return sortedValues.mergeWith(sortedTotalSize);
return Flux.merge(sortedValues, sortedTotalSize);
} }
} }