Update LLLocalLuceneIndex.java

This commit is contained in:
Andrea Cavalli 2021-02-25 00:00:16 +01:00
parent c3be27c15b
commit cbcc4df690

View File

@ -19,7 +19,6 @@ import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import it.cavallium.dbengine.lucene.searcher.AdaptiveStreamSearcher; import it.cavallium.dbengine.lucene.searcher.AdaptiveStreamSearcher;
import it.cavallium.dbengine.lucene.searcher.AllowOnlyQueryParsingCollectorStreamSearcher; import it.cavallium.dbengine.lucene.searcher.AllowOnlyQueryParsingCollectorStreamSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher; import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher;
import it.cavallium.dbengine.lucene.searcher.PagedStreamSearcher;
import it.cavallium.dbengine.lucene.serializer.QueryParser; import it.cavallium.dbengine.lucene.serializer.QueryParser;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
@ -50,6 +49,8 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FSDirectory;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux; import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -66,6 +67,7 @@ import reactor.util.function.Tuples;
public class LLLocalLuceneIndex implements LLLuceneIndex { public class LLLocalLuceneIndex implements LLLuceneIndex {
protected static final Logger logger = LoggerFactory.getLogger(LLLocalLuceneIndex.class);
private static final LuceneStreamSearcher streamSearcher = new AdaptiveStreamSearcher(); private static final LuceneStreamSearcher streamSearcher = new AdaptiveStreamSearcher();
private static final AllowOnlyQueryParsingCollectorStreamSearcher allowOnlyQueryParsingCollectorStreamSearcher = new AllowOnlyQueryParsingCollectorStreamSearcher(); private static final AllowOnlyQueryParsingCollectorStreamSearcher allowOnlyQueryParsingCollectorStreamSearcher = new AllowOnlyQueryParsingCollectorStreamSearcher();
/** /**
@ -508,8 +510,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
Many<LLKeyScore> topKeysSink = Sinks Many<LLKeyScore> topKeysSink = Sinks
.many() .many()
.unicast() .unicast()
.onBackpressureBuffer(new ArrayBlockingQueue<>(PagedStreamSearcher.MAX_ITEMS_PER_PAGE)); .onBackpressureBuffer();
Schedulers.boundedElastic().schedule(() -> {
var searchFlux = Flux.<Void>push(sink -> {
try { try {
if (doDistributedPre) { if (doDistributedPre) {
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery); allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
@ -525,26 +528,50 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
keyFieldName, keyFieldName,
keyScore -> { keyScore -> {
EmitResult result = topKeysSink.tryEmitNext(fixKeyScore(keyScore, scoreDivisor)); EmitResult result = topKeysSink.tryEmitNext(fixKeyScore(keyScore, scoreDivisor));
if (result.isFailure() && result != EmitResult.FAIL_CANCELLED && result != EmitResult.FAIL_ZERO_SUBSCRIBER) { if (result.isFailure()) {
throw new EmissionException(result); if (result == EmitResult.FAIL_CANCELLED) {
logger.debug("Fail to emit next value: cancelled");
} else if (result == EmitResult.FAIL_TERMINATED) {
logger.debug("Fail to emit next value: terminated");
} else if (result == EmitResult.FAIL_ZERO_SUBSCRIBER) {
logger.error("Fail to emit next value: zero subscriber. You must subscribe to results before total hits if you specified a limit > 0!");
sink.error(new EmissionException(result));
throw new EmissionException(result);
} else {
throw new EmissionException(result);
}
} }
}, },
totalHitsCount -> { totalHitsCount -> {
EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount); EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
if (result.isFailure() && result != EmitResult.FAIL_CANCELLED && result != EmitResult.FAIL_ZERO_SUBSCRIBER) { if (result.isFailure()) {
throw new EmissionException(result); if (result == EmitResult.FAIL_CANCELLED) {
logger.debug("Fail to emit total hits count: cancelled");
} else if (result == EmitResult.FAIL_TERMINATED) {
logger.debug("Fail to emit total hits count: terminated");
} else if (result == EmitResult.FAIL_ZERO_SUBSCRIBER) {
logger.debug("Fail to emit total hits count: zero subscriber");
} else {
sink.error(new EmissionException(result));
throw new EmissionException(result);
}
} }
} }
); );
} }
topKeysSink.tryEmitComplete(); topKeysSink.tryEmitComplete();
sink.complete();
} catch (IOException e) { } catch (IOException e) {
topKeysSink.tryEmitError(e); topKeysSink.tryEmitError(e);
totalHitsCountSink.tryEmitError(e); totalHitsCountSink.tryEmitError(e);
sink.error(e);
} }
}); }).share();
return new LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux())); return new LLSearchResult(
Mono.<Long>firstWithValue(searchFlux.then(Mono.empty()), totalHitsCountSink.asMono()),
Flux.<Flux<LLKeyScore>>merge(searchFlux.then(Mono.empty()), Flux.just(topKeysSink.asFlux()))
);
}).subscribeOn(luceneBlockingScheduler) }).subscribeOn(luceneBlockingScheduler)
) )
.materialize() .materialize()