Fix possible deadlock

This commit is contained in:
Andrea Cavalli 2021-09-07 02:08:29 +02:00
parent 936c07406e
commit f5d3474966
2 changed files with 10 additions and 2 deletions

View File

@ -178,6 +178,9 @@ public class LLMemoryDictionary implements LLDictionary {
boolean existsAlmostCertainly) { boolean existsAlmostCertainly) {
return Mono.usingWhen(keyMono, return Mono.usingWhen(keyMono,
key -> Mono.fromCallable(() -> { key -> Mono.fromCallable(() -> {
if (updateMode == UpdateMode.DISALLOW) {
throw new UnsupportedOperationException("update() is disallowed");
}
AtomicReference<Send<Buffer>> oldRef = new AtomicReference<>(null); AtomicReference<Send<Buffer>> oldRef = new AtomicReference<>(null);
var newValue = mainDb.compute(k(key), (_unused, old) -> { var newValue = mainDb.compute(k(key), (_unused, old) -> {
if (old != null) { if (old != null) {

View File

@ -25,9 +25,14 @@ import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.publisher.Sinks.Many; import reactor.core.publisher.Sinks.Many;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;
public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMultiSearcher { public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMultiSearcher {
private static final Scheduler UNSCORED_UNSORTED_EXECUTOR = Schedulers.newBoundedElastic(Runtime
.getRuntime()
.availableProcessors(), Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "UnscoredUnsortedExecutor");
@Override @Override
public Mono<LuceneShardSearcher> createShardSearcher(LocalQueryParams queryParams) { public Mono<LuceneShardSearcher> createShardSearcher(LocalQueryParams queryParams) {
return Mono return Mono
@ -62,7 +67,7 @@ public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMult
} }
@Override @Override
protected void doSetNextReader(LeafReaderContext context) throws IOException { protected void doSetNextReader(LeafReaderContext context) {
this.context = context; this.context = context;
} }
@ -109,7 +114,7 @@ public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMult
} }
collector.setShardIndex(collectorShardIndex); collector.setShardIndex(collectorShardIndex);
remainingCollectors.incrementAndGet(); remainingCollectors.incrementAndGet();
Schedulers.boundedElastic().schedule(() -> { UNSCORED_UNSORTED_EXECUTOR.schedule(() -> {
try { try {
indexSearcher.search(queryParams.query(), collector); indexSearcher.search(queryParams.query(), collector);