From 70e3586626f8fe3edb878eb94b36abb1dafdf0cb Mon Sep 17 00:00:00 2001 From: levlam Date: Wed, 14 Sep 2022 14:49:48 +0300 Subject: [PATCH] Allow to specify affinity mask for concurrent scheduler threads. --- benchmark/bench_http_server.cpp | 2 +- benchmark/bench_http_server_cheat.cpp | 2 +- benchmark/bench_http_server_fast.cpp | 2 +- td/telegram/Client.cpp | 4 ++-- tdactor/example/example.cpp | 2 +- tdactor/td/actor/ConcurrentScheduler.cpp | 22 +++++++++++++--------- tdactor/td/actor/ConcurrentScheduler.h | 3 ++- 7 files changed, 21 insertions(+), 16 deletions(-) diff --git a/benchmark/bench_http_server.cpp b/benchmark/bench_http_server.cpp index ded5cfa81..7ce892a48 100644 --- a/benchmark/bench_http_server.cpp +++ b/benchmark/bench_http_server.cpp @@ -75,7 +75,7 @@ class Server final : public td::TcpListener::Callback { int main() { SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); auto scheduler = td::make_unique(); - scheduler->init(N); + scheduler->init(N, 0); scheduler->create_actor_unsafe(0, "Server").release(); scheduler->start(); while (scheduler->run_main(10)) { diff --git a/benchmark/bench_http_server_cheat.cpp b/benchmark/bench_http_server_cheat.cpp index fe145f326..8661ba4c1 100644 --- a/benchmark/bench_http_server_cheat.cpp +++ b/benchmark/bench_http_server_cheat.cpp @@ -122,7 +122,7 @@ class Server final : public td::TcpListener::Callback { int main() { SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); auto scheduler = td::make_unique(); - scheduler->init(N); + scheduler->init(N, 0); scheduler->create_actor_unsafe(0, "Server").release(); scheduler->start(); while (scheduler->run_main(10)) { diff --git a/benchmark/bench_http_server_fast.cpp b/benchmark/bench_http_server_fast.cpp index d46273cab..1c272d96b 100644 --- a/benchmark/bench_http_server_fast.cpp +++ b/benchmark/bench_http_server_fast.cpp @@ -107,7 +107,7 @@ class Server final : public td::TcpListener::Callback { int main() { SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); auto scheduler = td::make_unique(); - scheduler->init(N); + scheduler->init(N, 0); scheduler->create_actor_unsafe(0, "Server").release(); scheduler->start(); while (scheduler->run_main(10)) { diff --git a/td/telegram/Client.cpp b/td/telegram/Client.cpp index 101f54c23..00003d933 100644 --- a/td/telegram/Client.cpp +++ b/td/telegram/Client.cpp @@ -99,7 +99,7 @@ class ClientManager::Impl final { CHECK(options_.net_query_stats == nullptr); options_.net_query_stats = std::make_shared(); concurrent_scheduler_ = make_unique(); - concurrent_scheduler_->init(0); + concurrent_scheduler_->init(0, 0); concurrent_scheduler_->start(); } tds_[client_id] = @@ -355,7 +355,7 @@ class MultiImpl { explicit MultiImpl(std::shared_ptr net_query_stats) { concurrent_scheduler_ = std::make_shared(); - concurrent_scheduler_->init(ADDITIONAL_THREAD_COUNT); + concurrent_scheduler_->init(ADDITIONAL_THREAD_COUNT, 0); concurrent_scheduler_->start(); { diff --git a/tdactor/example/example.cpp b/tdactor/example/example.cpp index ef85aad6b..fb5ea6507 100644 --- a/tdactor/example/example.cpp +++ b/tdactor/example/example.cpp @@ -37,7 +37,7 @@ class MainActor final : public td::Actor { int main() { td::ConcurrentScheduler scheduler; - scheduler.init(4 /*threads_count*/); + scheduler.init(4 /*thread_count*/, 0); scheduler.start(); { auto guard = scheduler.get_main_guard(); diff --git a/tdactor/td/actor/ConcurrentScheduler.cpp b/tdactor/td/actor/ConcurrentScheduler.cpp index f944eb459..eb4557452 100644 --- a/tdactor/td/actor/ConcurrentScheduler.cpp +++ b/tdactor/td/actor/ConcurrentScheduler.cpp @@ -15,18 +15,19 @@ namespace td { -void ConcurrentScheduler::init(int32 threads_n) { +void ConcurrentScheduler::init(int32 additional_thread_count, uint64 thread_affinity_mask) { #if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED - threads_n = 0; + additional_thread_count = 0; #endif - threads_n++; - std::vector>> outbound(threads_n); + additional_thread_count++; + std::vector>> outbound(additional_thread_count); #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED - for (int32 i = 0; i < threads_n; i++) { + for (int32 i = 0; i < additional_thread_count; i++) { auto queue = std::make_shared>(); queue->init(); outbound[i] = queue; } + thread_affinity_mask_ = thread_affinity_mask; #endif // +1 for extra scheduler for IOCP and send_closure from unrelated threads @@ -37,13 +38,13 @@ void ConcurrentScheduler::init(int32 threads_n) { extra_scheduler_ = 0; #endif - schedulers_.resize(threads_n + extra_scheduler_); - for (int32 i = 0; i < threads_n + extra_scheduler_; i++) { + schedulers_.resize(additional_thread_count + extra_scheduler_); + for (int32 i = 0; i < additional_thread_count + extra_scheduler_; i++) { auto &sched = schedulers_[i]; sched = make_unique(); #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED - if (i >= threads_n) { + if (i >= additional_thread_count) { auto queue = std::make_shared>(); queue->init(); outbound.push_back(std::move(queue)); @@ -75,10 +76,13 @@ void ConcurrentScheduler::start() { #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED for (size_t i = 1; i + extra_scheduler_ < schedulers_.size(); i++) { auto &sched = schedulers_[i]; - threads_.push_back(td::thread([&] { + threads_.push_back(td::thread([&, thread_affinity_mask = thread_affinity_mask_] { #if TD_PORT_WINDOWS detail::Iocp::Guard iocp_guard(iocp_.get()); #endif + if (thread_affinity_mask != 0) { + thread::set_affinity_mask(this_thread::get_id(), thread_affinity_mask); + } while (!is_finished()) { sched->run(Timestamp::in(10)); } diff --git a/tdactor/td/actor/ConcurrentScheduler.h b/tdactor/td/actor/ConcurrentScheduler.h index 0cf8c9a2c..f833848d2 100644 --- a/tdactor/td/actor/ConcurrentScheduler.h +++ b/tdactor/td/actor/ConcurrentScheduler.h @@ -26,7 +26,7 @@ namespace td { class ConcurrentScheduler final : private Scheduler::Callback { public: - void init(int32 threads_n); + void init(int32 additional_thread_count, uint64 thread_affinity_mask = 0); void finish_async() { schedulers_[0]->finish(); @@ -90,6 +90,7 @@ class ConcurrentScheduler final : private Scheduler::Callback { std::atomic is_finished_{false}; #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED vector threads_; + uint64 thread_affinity_mask_ = 0; #endif #if TD_PORT_WINDOWS unique_ptr iocp_;