Fix deadlock

This commit is contained in:
Andrea Cavalli 2021-03-06 17:28:33 +01:00
parent 08434d475c
commit 99f3686eab

View File

@ -89,11 +89,11 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
"lucene-" + name,
60
);
private static final Supplier<Scheduler> lowMemorySchedulerSupplier = Suppliers.memoize(() ->
private final Supplier<Scheduler> lowMemorySchedulerSupplier = Suppliers.memoize(() ->
Schedulers.newBoundedElastic(1, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "lucene-low-memory", Integer.MAX_VALUE))::get;
private static final Supplier<Scheduler> querySchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("query"))::get;
private static final Supplier<Scheduler> blockingSchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("blocking"))::get;
private static final Supplier<Scheduler> blockingLuceneSearchSchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("search-blocking"))::get;
private final Supplier<Scheduler> querySchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("query"))::get;
private final Supplier<Scheduler> blockingSchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("blocking"))::get;
private final Supplier<Scheduler> blockingLuceneSearchSchedulerSupplier = Suppliers.memoize(() -> boundedSchedulerSupplier.apply("search-blocking"))::get;
/**
* Lucene query scheduler.
*/
@ -475,7 +475,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
} else {
return Mono.just(signal);
}
}).<LLSearchResult>dematerialize());
}).dematerialize());
});
}
@ -537,7 +537,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return Mono.just(signal);
}
})
.<LLSearchResult>dematerialize()
.dematerialize()
);
}
@ -568,73 +568,69 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
});
try {
blockingLuceneSearchScheduler.schedule(() -> {
try {
if (!cancelled.get()) {
if (doDistributedPre) {
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
sink.next(new LLTotalHitsCount(0L));
} else {
int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit);
streamSearcher.search(indexSearcher,
luceneQuery,
boundedLimit,
luceneSort,
luceneScoreMode,
minCompetitiveScore,
keyFieldName,
keyScore -> {
try {
if (cancelled.get()) {
return HandleResult.HALT;
}
while (requests.decrementAndGet() < 0) {
requests.incrementAndGet();
requestsAvailable.acquire();
if (cancelled.get()) {
return HandleResult.HALT;
}
}
sink.next(fixKeyScore(keyScore, scoreDivisor));
return HandleResult.CONTINUE;
} catch (Exception ex) {
sink.error(ex);
cancelled.set(true);
requestsAvailable.release();
if (!cancelled.get()) {
if (doDistributedPre) {
//noinspection BlockingMethodInNonBlockingContext
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
sink.next(new LLTotalHitsCount(0L));
} else {
int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit);
//noinspection BlockingMethodInNonBlockingContext
streamSearcher.search(indexSearcher,
luceneQuery,
boundedLimit,
luceneSort,
luceneScoreMode,
minCompetitiveScore,
keyFieldName,
keyScore -> {
try {
if (cancelled.get()) {
return HandleResult.HALT;
}
while (requests.decrementAndGet() < 0) {
requests.incrementAndGet();
requestsAvailable.acquire();
if (cancelled.get()) {
return HandleResult.HALT;
}
},
totalHitsCount -> {
try {
if (cancelled.get()) {
return;
}
while (requests.decrementAndGet() < 0) {
requests.incrementAndGet();
requestsAvailable.acquire();
if (cancelled.get()) {
return;
}
}
sink.next(new LLTotalHitsCount(totalHitsCount));
} catch (Exception ex) {
sink.error(ex);
cancelled.set(true);
requestsAvailable.release();
}
sink.next(fixKeyScore(keyScore, scoreDivisor));
return HandleResult.CONTINUE;
} catch (Exception ex) {
sink.error(ex);
cancelled.set(true);
requestsAvailable.release();
return HandleResult.HALT;
}
},
totalHitsCount -> {
try {
if (cancelled.get()) {
return;
}
while (requests.decrementAndGet() < 0) {
requests.incrementAndGet();
requestsAvailable.acquire();
if (cancelled.get()) {
return;
}
}
);
}
sink.complete();
}
} catch (Exception ex) {
sink.error(ex);
sink.next(new LLTotalHitsCount(totalHitsCount));
} catch (Exception ex) {
sink.error(ex);
cancelled.set(true);
requestsAvailable.release();
}
}
);
}
});
sink.complete();
}
} catch (Exception ex) {
sink.error(ex);
}
}, OverflowStrategy.BUFFER).subscribeOn(luceneQueryScheduler))));
}, OverflowStrategy.ERROR).subscribeOn(blockingLuceneSearchScheduler).publishOn(luceneQueryScheduler))));
}
@Override