Fix deadlock

This commit is contained in:
Andrea Cavalli 2021-11-29 14:01:57 +01:00
parent 3d9247c969
commit d8de969bee
2 changed files with 4 additions and 13 deletions

View File

@ -358,17 +358,10 @@ public class LuceneUtils {
return hitsFlux
.mapNotNull(hit -> mapHitBlocking(hit, indexSearchers, keyFieldName));
} else {
// Compute parallelism
var availableProcessors = Runtime.getRuntime().availableProcessors();
var min = Queues.XS_BUFFER_SIZE;
var maxParallelGroups = Math.max(availableProcessors, min);
return hitsFlux
.groupBy(hit -> hit.shardIndex % maxParallelGroups) // Max n groups
.flatMap(shardHits -> shardHits
.mapNotNull(hit -> mapHitBlocking(hit, indexSearchers, keyFieldName)),
maxParallelGroups // Max n concurrency. Concurrency must be >= total groups count
);
.flatMap(hit -> Mono
.fromCallable(() -> mapHitBlocking(hit, indexSearchers, keyFieldName))
.subscribeOn(Schedulers.boundedElastic()));
}
}

View File

@ -74,9 +74,7 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {
scoreDocsSink.complete();
}
}
}, OverflowStrategy.ERROR)
.subscribeOn(Schedulers.boundedElastic(), false)
.limitRate(1024);
}, OverflowStrategy.ERROR).subscribeOn(Schedulers.boundedElastic());
Flux<LLKeyScore> resultsFlux = LuceneUtils.convertHits(scoreDocsFlux, shards, keyFieldName, false);