Use better overflow strategy

This commit is contained in:
Andrea Cavalli 2021-03-03 21:32:45 +01:00
parent b71f3dceed
commit e3fcf7f74f

View File

@ -59,6 +59,7 @@ import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
@ -563,62 +564,74 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
requestsAvailable.release();
});
luceneQueryScheduler.schedule(() -> {
try {
requestsAvailable.acquire();
if (!cancelled.get()) {
if (doDistributedPre) {
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
sink.next(new LLTotalHitsCount(0L));
} else {
int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit);
streamSearcher.search(indexSearcher,
luceneQuery,
boundedLimit,
luceneSort,
luceneScoreMode,
minCompetitiveScore,
keyFieldName,
keyScore -> {
try {
if (requests.get() <= 0 && !cancelled.get()) {
requestsAvailable.acquire();
}
if (cancelled.get()) {
try {
luceneQueryScheduler.schedule(() -> {
try {
if (!cancelled.get()) {
if (doDistributedPre) {
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
sink.next(new LLTotalHitsCount(0L));
} else {
int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit);
streamSearcher.search(indexSearcher,
luceneQuery,
boundedLimit,
luceneSort,
luceneScoreMode,
minCompetitiveScore,
keyFieldName,
keyScore -> {
try {
if (cancelled.get()) {
return HandleResult.HALT;
}
while (requests.get() <= 0) {
requestsAvailable.acquire();
if (cancelled.get()) {
return HandleResult.HALT;
}
}
requests.decrementAndGet();
sink.next(fixKeyScore(keyScore, scoreDivisor));
return HandleResult.CONTINUE;
} catch (Exception ex) {
sink.error(ex);
cancelled.set(true);
requestsAvailable.release();
return HandleResult.HALT;
}
requests.updateAndGet(n -> n > 0 ? n - 1 : 0);
sink.next(fixKeyScore(keyScore, scoreDivisor));
return HandleResult.CONTINUE;
} catch (Exception ex) {
sink.error(ex);
cancelled.set(true);
requestsAvailable.release();
return HandleResult.HALT;
}
},
totalHitsCount -> {
try {
if (requests.get() <= 0 && !cancelled.get()) {
requestsAvailable.acquire();
},
totalHitsCount -> {
try {
if (cancelled.get()) {
return;
}
while (requests.get() <= 0) {
requestsAvailable.acquire();
if (cancelled.get()) {
return;
}
}
requests.decrementAndGet();
sink.next(new LLTotalHitsCount(totalHitsCount));
} catch (Exception ex) {
sink.error(ex);
cancelled.set(true);
requestsAvailable.release();
}
requests.updateAndGet(n -> n > 0 ? n - 1 : 0);
sink.next(new LLTotalHitsCount(totalHitsCount));
} catch (Exception ex) {
sink.error(ex);
cancelled.set(true);
requestsAvailable.release();
}
}
);
);
}
sink.complete();
}
sink.complete();
} catch (Exception ex) {
sink.error(ex);
}
} catch (Exception ex) {
sink.error(ex);
}
});
}).subscribeOn(Schedulers.boundedElastic()))));
});
} catch (Exception ex) {
sink.error(ex);
}
}, OverflowStrategy.ERROR).subscribeOn(Schedulers.boundedElastic()))));
}
@Override