Allow specifying an affinity mask for concurrent scheduler threads.

This commit is contained in:
levlam 2022-09-14 14:49:48 +03:00
parent 0eddd8d405
commit 70e3586626
7 changed files with 21 additions and 16 deletions

View File

@ -75,7 +75,7 @@ class Server final : public td::TcpListener::Callback {
int main() {
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
auto scheduler = td::make_unique<td::ConcurrentScheduler>();
scheduler->init(N);
scheduler->init(N, 0);
scheduler->create_actor_unsafe<Server>(0, "Server").release();
scheduler->start();
while (scheduler->run_main(10)) {

View File

@ -122,7 +122,7 @@ class Server final : public td::TcpListener::Callback {
int main() {
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
auto scheduler = td::make_unique<td::ConcurrentScheduler>();
scheduler->init(N);
scheduler->init(N, 0);
scheduler->create_actor_unsafe<Server>(0, "Server").release();
scheduler->start();
while (scheduler->run_main(10)) {

View File

@ -107,7 +107,7 @@ class Server final : public td::TcpListener::Callback {
int main() {
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
auto scheduler = td::make_unique<td::ConcurrentScheduler>();
scheduler->init(N);
scheduler->init(N, 0);
scheduler->create_actor_unsafe<Server>(0, "Server").release();
scheduler->start();
while (scheduler->run_main(10)) {

View File

@ -99,7 +99,7 @@ class ClientManager::Impl final {
CHECK(options_.net_query_stats == nullptr);
options_.net_query_stats = std::make_shared<NetQueryStats>();
concurrent_scheduler_ = make_unique<ConcurrentScheduler>();
concurrent_scheduler_->init(0);
concurrent_scheduler_->init(0, 0);
concurrent_scheduler_->start();
}
tds_[client_id] =
@ -355,7 +355,7 @@ class MultiImpl {
explicit MultiImpl(std::shared_ptr<NetQueryStats> net_query_stats) {
concurrent_scheduler_ = std::make_shared<ConcurrentScheduler>();
concurrent_scheduler_->init(ADDITIONAL_THREAD_COUNT);
concurrent_scheduler_->init(ADDITIONAL_THREAD_COUNT, 0);
concurrent_scheduler_->start();
{

View File

@ -37,7 +37,7 @@ class MainActor final : public td::Actor {
int main() {
td::ConcurrentScheduler scheduler;
scheduler.init(4 /*threads_count*/);
scheduler.init(4 /*thread_count*/, 0);
scheduler.start();
{
auto guard = scheduler.get_main_guard();

View File

@ -15,18 +15,19 @@
namespace td {
void ConcurrentScheduler::init(int32 threads_n) {
void ConcurrentScheduler::init(int32 additional_thread_count, uint64 thread_affinity_mask) {
#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
threads_n = 0;
additional_thread_count = 0;
#endif
threads_n++;
std::vector<std::shared_ptr<MpscPollableQueue<EventFull>>> outbound(threads_n);
additional_thread_count++;
std::vector<std::shared_ptr<MpscPollableQueue<EventFull>>> outbound(additional_thread_count);
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
for (int32 i = 0; i < threads_n; i++) {
for (int32 i = 0; i < additional_thread_count; i++) {
auto queue = std::make_shared<MpscPollableQueue<EventFull>>();
queue->init();
outbound[i] = queue;
}
thread_affinity_mask_ = thread_affinity_mask;
#endif
// +1 for extra scheduler for IOCP and send_closure from unrelated threads
@ -37,13 +38,13 @@ void ConcurrentScheduler::init(int32 threads_n) {
extra_scheduler_ = 0;
#endif
schedulers_.resize(threads_n + extra_scheduler_);
for (int32 i = 0; i < threads_n + extra_scheduler_; i++) {
schedulers_.resize(additional_thread_count + extra_scheduler_);
for (int32 i = 0; i < additional_thread_count + extra_scheduler_; i++) {
auto &sched = schedulers_[i];
sched = make_unique<Scheduler>();
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
if (i >= threads_n) {
if (i >= additional_thread_count) {
auto queue = std::make_shared<MpscPollableQueue<EventFull>>();
queue->init();
outbound.push_back(std::move(queue));
@ -75,10 +76,13 @@ void ConcurrentScheduler::start() {
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
for (size_t i = 1; i + extra_scheduler_ < schedulers_.size(); i++) {
auto &sched = schedulers_[i];
threads_.push_back(td::thread([&] {
threads_.push_back(td::thread([&, thread_affinity_mask = thread_affinity_mask_] {
#if TD_PORT_WINDOWS
detail::Iocp::Guard iocp_guard(iocp_.get());
#endif
if (thread_affinity_mask != 0) {
thread::set_affinity_mask(this_thread::get_id(), thread_affinity_mask);
}
while (!is_finished()) {
sched->run(Timestamp::in(10));
}

View File

@ -26,7 +26,7 @@ namespace td {
class ConcurrentScheduler final : private Scheduler::Callback {
public:
void init(int32 threads_n);
void init(int32 additional_thread_count, uint64 thread_affinity_mask = 0);
void finish_async() {
schedulers_[0]->finish();
@ -90,6 +90,7 @@ class ConcurrentScheduler final : private Scheduler::Callback {
std::atomic<bool> is_finished_{false};
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
vector<td::thread> threads_;
uint64 thread_affinity_mask_ = 0;
#endif
#if TD_PORT_WINDOWS
unique_ptr<detail::Iocp> iocp_;