diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 8dee63d..cc28cbc 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -89,11 +89,11 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { "lucene-" + name, 60 ); - private static final Supplier lowMemorySchedulerSupplier = Suppliers.memoize(() -> + private final Supplier lowMemorySchedulerSupplier = Suppliers.memoize(() -> Schedulers.newBoundedElastic(1, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "lucene-low-memory", Integer.MAX_VALUE))::get; - private static final Supplier querySchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("query"))::get; - private static final Supplier blockingSchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("blocking"))::get; - private static final Supplier blockingLuceneSearchSchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("search-blocking"))::get; + private final Supplier querySchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("query"))::get; + private final Supplier blockingSchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("blocking"))::get; + private final Supplier blockingLuceneSearchSchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("search-blocking"))::get; /** * Lucene query scheduler. */ @@ -475,7 +475,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } else { return Mono.just(signal); } - }).dematerialize()); + }).dematerialize()); }); } @@ -537,7 +537,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return Mono.just(signal); } }) - .dematerialize() + .dematerialize() ); } @@ -568,73 +568,69 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { }); try { - blockingLuceneSearchScheduler.schedule(() -> { - try { - if (!cancelled.get()) { - if (doDistributedPre) { - allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery); - sink.next(new LLTotalHitsCount(0L)); - } else { - int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit); - streamSearcher.search(indexSearcher, - luceneQuery, - boundedLimit, - luceneSort, - luceneScoreMode, - minCompetitiveScore, - keyFieldName, - keyScore -> { - try { - if (cancelled.get()) { - return HandleResult.HALT; - } - while (requests.decrementAndGet() < 0) { - requests.incrementAndGet(); - requestsAvailable.acquire(); - if (cancelled.get()) { - return HandleResult.HALT; - } - } - sink.next(fixKeyScore(keyScore, scoreDivisor)); - return HandleResult.CONTINUE; - } catch (Exception ex) { - sink.error(ex); - cancelled.set(true); - requestsAvailable.release(); + if (!cancelled.get()) { + if (doDistributedPre) { + //noinspection BlockingMethodInNonBlockingContext + allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery); + sink.next(new LLTotalHitsCount(0L)); + } else { + int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit); + //noinspection BlockingMethodInNonBlockingContext + streamSearcher.search(indexSearcher, + luceneQuery, + boundedLimit, + luceneSort, + luceneScoreMode, + minCompetitiveScore, + keyFieldName, + keyScore -> { + try { + if (cancelled.get()) { + return HandleResult.HALT; + } + while (requests.decrementAndGet() < 0) { + requests.incrementAndGet(); + requestsAvailable.acquire(); + if (cancelled.get()) { return HandleResult.HALT; } - }, - totalHitsCount -> { - try { - if (cancelled.get()) { - return; - } - while (requests.decrementAndGet() < 0) { - requests.incrementAndGet(); - requestsAvailable.acquire(); - if (cancelled.get()) { - return; - } - } - sink.next(new LLTotalHitsCount(totalHitsCount)); - } catch (Exception ex) { - sink.error(ex); - cancelled.set(true); - requestsAvailable.release(); + } + sink.next(fixKeyScore(keyScore, scoreDivisor)); + return HandleResult.CONTINUE; + } catch (Exception ex) { + sink.error(ex); + cancelled.set(true); + requestsAvailable.release(); + return HandleResult.HALT; + } + }, + totalHitsCount -> { + try { + if (cancelled.get()) { + return; + } + while (requests.decrementAndGet() < 0) { + requests.incrementAndGet(); + requestsAvailable.acquire(); + if (cancelled.get()) { + return; } } - ); - } - sink.complete(); - } - } catch (Exception ex) { - sink.error(ex); + sink.next(new LLTotalHitsCount(totalHitsCount)); + } catch (Exception ex) { + sink.error(ex); + cancelled.set(true); + requestsAvailable.release(); + } + } + ); } - }); + sink.complete(); + } } catch (Exception ex) { sink.error(ex); } - }, OverflowStrategy.BUFFER).subscribeOn(luceneQueryScheduler)))); + }, OverflowStrategy.ERROR).subscribeOn(blockingLuceneSearchScheduler).publishOn(luceneQueryScheduler)))); } @Override