From c927614964b56755b002d4a7323d87052a132a5d Mon Sep 17 00:00:00 2001 From: levlam Date: Tue, 25 Jul 2023 22:26:12 +0300 Subject: [PATCH] Improve threads usage. --- telegram-bot-api/ClientManager.cpp | 12 +++++------ telegram-bot-api/ClientParameters.h | 29 +++++++++++++++++++++++++++ telegram-bot-api/HttpServer.h | 9 +++------ telegram-bot-api/WebhookActor.cpp | 7 +++---- telegram-bot-api/WebhookActor.h | 1 - telegram-bot-api/telegram-bot-api.cpp | 25 +++++++++-------------- 6 files changed, 50 insertions(+), 33 deletions(-) diff --git a/telegram-bot-api/ClientManager.cpp b/telegram-bot-api/ClientManager.cpp index 210d215..1bad19d 100644 --- a/telegram-bot-api/ClientManager.cpp +++ b/telegram-bot-api/ClientManager.cpp @@ -303,9 +303,6 @@ td::int64 ClientManager::get_tqueue_id(td::int64 user_id, bool is_test_dc) { } void ClientManager::start_up() { - //NB: the same scheduler as for database in Td - auto scheduler_id = 1; - // init tqueue { auto load_start_time = td::Time::now(); @@ -333,7 +330,8 @@ void ClientManager::start_up() { } } - auto concurrent_binlog = std::make_shared(std::move(binlog), scheduler_id); + auto concurrent_binlog = + std::make_shared(std::move(binlog), SharedData::get_database_scheduler_id()); auto concurrent_tqueue_binlog = td::make_unique>(); concurrent_tqueue_binlog->set_binlog(std::move(concurrent_binlog)); tqueue->set_callback(std::move(concurrent_tqueue_binlog)); @@ -348,7 +346,7 @@ void ClientManager::start_up() { // init webhook_db auto concurrent_webhook_db = td::make_unique>(); auto status = concurrent_webhook_db->init(parameters_->working_directory_ + "webhooks_db.binlog", td::DbKey::empty(), - scheduler_id); + SharedData::get_database_scheduler_id()); LOG_IF(FATAL, status.is_error()) << "Can't open webhooks_db.binlog " << status; parameters_->shared_data_->webhook_db_ = std::move(concurrent_webhook_db); @@ -365,8 +363,8 @@ void ClientManager::start_up() { } // launch watchdog - watchdog_id_ = td::create_actor_on_scheduler( - "ManagerWatchdog", td::Scheduler::instance()->sched_count() - 3, td::this_thread::get_id(), WATCHDOG_TIMEOUT); + watchdog_id_ = td::create_actor_on_scheduler("ManagerWatchdog", SharedData::get_watchdog_scheduler_id(), + td::this_thread::get_id(), WATCHDOG_TIMEOUT); set_timeout_in(600.0); } diff --git a/telegram-bot-api/ClientParameters.h b/telegram-bot-api/ClientParameters.h index 6956388..b233138 100644 --- a/telegram-bot-api/ClientParameters.h +++ b/telegram-bot-api/ClientParameters.h @@ -62,6 +62,35 @@ struct SharedData { // the same scheduler as for file GC in Td return 2; } + + static td::int32 get_client_scheduler_id() { + // the thread for ClientManager and all Clients + return 4; + } + + static td::int32 get_watchdog_scheduler_id() { + // the thread for watchdogs + return 5; + } + + static td::int32 get_slow_incoming_http_scheduler_id() { + // the thread for slow incoming HTTP connections + return 6; + } + + static td::int32 get_slow_outgoing_http_scheduler_id() { + // the thread for slow outgoing HTTP connections + return 7; + } + + static td::int32 get_dns_resolver_scheduler_id() { + // the thread for DNS resolving + return 8; + } + + static td::int32 get_thread_count() { + return 9; + } }; struct ClientParameters { diff --git a/telegram-bot-api/HttpServer.h b/telegram-bot-api/HttpServer.h index 8686a3f..c2398ee 100644 --- a/telegram-bot-api/HttpServer.h +++ b/telegram-bot-api/HttpServer.h @@ -6,6 +6,8 @@ // #pragma once +#include "telegram-bot-api/ClientParameters.h" + #include "td/net/HttpInboundConnection.h" #include "td/net/TcpListener.h" @@ -61,13 +63,8 @@ class HttpServer final : public td::TcpListener::Callback { } void accept(td::SocketFd fd) final { - auto scheduler_count = td::Scheduler::instance()->sched_count(); - auto scheduler_id = scheduler_count - 1; - if (scheduler_id > 0) { - scheduler_id--; - } td::create_actor("HttpInboundConnection", td::BufferedFd(std::move(fd)), 0, - 50, 500, creator_(), scheduler_id) + 50, 500, creator_(), SharedData::get_slow_incoming_http_scheduler_id()) .release(); } diff --git a/telegram-bot-api/WebhookActor.cpp b/telegram-bot-api/WebhookActor.cpp index b07a867..1cd3661 100644 --- a/telegram-bot-api/WebhookActor.cpp +++ b/telegram-bot-api/WebhookActor.cpp @@ -47,10 +47,8 @@ WebhookActor::WebhookActor(td::ActorShared callback, td::int64 tqueue_ , fix_ip_address_(fix_ip_address) , from_db_flag_(from_db_flag) , max_connections_(max_connections) - , secret_token_(std::move(secret_token)) - , slow_scheduler_id_(td::Scheduler::instance()->sched_count() - 2) { + , secret_token_(std::move(secret_token)) { CHECK(max_connections_ > 0); - CHECK(slow_scheduler_id_ > 0); if (!cached_ip_address.empty()) { auto r_ip_address = td::IPAddress::get_ip_address(cached_ip_address); @@ -230,7 +228,8 @@ td::Status WebhookActor::create_connection(td::BufferedFd fd) { auto *conn = connections_.get(id); conn->actor_id_ = td::create_actor( PSLICE() << "Connect:" << id, std::move(fd), std::move(ssl_stream), 0, 50, 60, - td::ActorShared(actor_id(this), id), slow_scheduler_id_); + td::ActorShared(actor_id(this), id), + SharedData::get_slow_outgoing_http_scheduler_id()); conn->ip_generation_ = ip_generation_; conn->event_id_ = {}; conn->id_ = id; diff --git a/telegram-bot-api/WebhookActor.h b/telegram-bot-api/WebhookActor.h index 926a914..4c101a3 100644 --- a/telegram-bot-api/WebhookActor.h +++ b/telegram-bot-api/WebhookActor.h @@ -177,7 +177,6 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback { double last_success_time_ = 0; double wakeup_at_ = 0; bool last_update_was_successful_ = true; - td::int32 slow_scheduler_id_ = -1; void relax_wakeup_at(double wakeup_at, const char *source); diff --git a/telegram-bot-api/telegram-bot-api.cpp b/telegram-bot-api/telegram-bot-api.cpp index 36bde52..11bae39 100644 --- a/telegram-bot-api/telegram-bot-api.cpp +++ b/telegram-bot-api/telegram-bot-api.cpp @@ -459,27 +459,22 @@ int main(int argc, char *argv[]) { // << (td::GitInfo::is_dirty() ? "(dirty)" : "") << " started"; LOG(WARNING) << "Bot API " << parameters->version_ << " server started"; - // +3 threads for Td - // one thread for ClientManager and all Clients - // one thread for watchdogs - // one thread for slow HTTP connections - // one thread for DNS resolving - const int thread_count = 7; - td::ConcurrentScheduler sched(thread_count, cpu_affinity); + td::ConcurrentScheduler sched(SharedData::get_thread_count() - 1, cpu_affinity); td::GetHostByNameActor::Options get_host_by_name_options; - get_host_by_name_options.scheduler_id = thread_count; + get_host_by_name_options.scheduler_id = SharedData::get_dns_resolver_scheduler_id(); parameters->get_host_by_name_actor_id_ = sched.create_actor_unsafe(0, "GetHostByName", std::move(get_host_by_name_options)) .release(); - auto client_manager = - sched.create_actor_unsafe(thread_count - 3, "ClientManager", std::move(parameters), token_range) - .release(); + auto client_manager = sched + .create_actor_unsafe(SharedData::get_client_scheduler_id(), "ClientManager", + std::move(parameters), token_range) + .release(); sched .create_actor_unsafe( - thread_count - 3, "HttpServer", http_ip_address, http_port, + SharedData::get_client_scheduler_id(), "HttpServer", http_ip_address, http_port, [client_manager, shared_data] { return td::ActorOwn( td::create_actor("HttpConnection", client_manager, shared_data)); @@ -489,7 +484,7 @@ int main(int argc, char *argv[]) { if (http_stat_port != 0) { sched .create_actor_unsafe( - thread_count - 3, "HttpStatsServer", http_stat_ip_address, http_stat_port, + SharedData::get_client_scheduler_id(), "HttpStatsServer", http_stat_ip_address, http_stat_port, [client_manager] { return td::ActorOwn( td::create_actor("HttpStatConnection", client_manager)); @@ -498,8 +493,8 @@ int main(int argc, char *argv[]) { } constexpr double WATCHDOG_TIMEOUT = 0.25; - auto watchdog_id = - sched.create_actor_unsafe(thread_count - 2, "Watchdog", td::this_thread::get_id(), WATCHDOG_TIMEOUT); + auto watchdog_id = sched.create_actor_unsafe(SharedData::get_watchdog_scheduler_id(), "Watchdog", + td::this_thread::get_id(), WATCHDOG_TIMEOUT); sched.start();