mirror of
https://github.com/tdlight-team/tdlight-telegram-bot-api.git
synced 2024-11-05 19:47:20 +01:00
Improve threads usage.
This commit is contained in:
parent
ec8e44de5a
commit
c927614964
@ -303,9 +303,6 @@ td::int64 ClientManager::get_tqueue_id(td::int64 user_id, bool is_test_dc) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void ClientManager::start_up() {
|
void ClientManager::start_up() {
|
||||||
//NB: the same scheduler as for database in Td
|
|
||||||
auto scheduler_id = 1;
|
|
||||||
|
|
||||||
// init tqueue
|
// init tqueue
|
||||||
{
|
{
|
||||||
auto load_start_time = td::Time::now();
|
auto load_start_time = td::Time::now();
|
||||||
@ -333,7 +330,8 @@ void ClientManager::start_up() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto concurrent_binlog = std::make_shared<td::ConcurrentBinlog>(std::move(binlog), scheduler_id);
|
auto concurrent_binlog =
|
||||||
|
std::make_shared<td::ConcurrentBinlog>(std::move(binlog), SharedData::get_database_scheduler_id());
|
||||||
auto concurrent_tqueue_binlog = td::make_unique<td::TQueueBinlog<td::BinlogInterface>>();
|
auto concurrent_tqueue_binlog = td::make_unique<td::TQueueBinlog<td::BinlogInterface>>();
|
||||||
concurrent_tqueue_binlog->set_binlog(std::move(concurrent_binlog));
|
concurrent_tqueue_binlog->set_binlog(std::move(concurrent_binlog));
|
||||||
tqueue->set_callback(std::move(concurrent_tqueue_binlog));
|
tqueue->set_callback(std::move(concurrent_tqueue_binlog));
|
||||||
@ -348,7 +346,7 @@ void ClientManager::start_up() {
|
|||||||
// init webhook_db
|
// init webhook_db
|
||||||
auto concurrent_webhook_db = td::make_unique<td::BinlogKeyValue<td::ConcurrentBinlog>>();
|
auto concurrent_webhook_db = td::make_unique<td::BinlogKeyValue<td::ConcurrentBinlog>>();
|
||||||
auto status = concurrent_webhook_db->init(parameters_->working_directory_ + "webhooks_db.binlog", td::DbKey::empty(),
|
auto status = concurrent_webhook_db->init(parameters_->working_directory_ + "webhooks_db.binlog", td::DbKey::empty(),
|
||||||
scheduler_id);
|
SharedData::get_database_scheduler_id());
|
||||||
LOG_IF(FATAL, status.is_error()) << "Can't open webhooks_db.binlog " << status;
|
LOG_IF(FATAL, status.is_error()) << "Can't open webhooks_db.binlog " << status;
|
||||||
parameters_->shared_data_->webhook_db_ = std::move(concurrent_webhook_db);
|
parameters_->shared_data_->webhook_db_ = std::move(concurrent_webhook_db);
|
||||||
|
|
||||||
@ -365,8 +363,8 @@ void ClientManager::start_up() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// launch watchdog
|
// launch watchdog
|
||||||
watchdog_id_ = td::create_actor_on_scheduler<Watchdog>(
|
watchdog_id_ = td::create_actor_on_scheduler<Watchdog>("ManagerWatchdog", SharedData::get_watchdog_scheduler_id(),
|
||||||
"ManagerWatchdog", td::Scheduler::instance()->sched_count() - 3, td::this_thread::get_id(), WATCHDOG_TIMEOUT);
|
td::this_thread::get_id(), WATCHDOG_TIMEOUT);
|
||||||
set_timeout_in(600.0);
|
set_timeout_in(600.0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,6 +62,35 @@ struct SharedData {
|
|||||||
// the same scheduler as for file GC in Td
|
// the same scheduler as for file GC in Td
|
||||||
return 2;
|
return 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static td::int32 get_client_scheduler_id() {
|
||||||
|
// the thread for ClientManager and all Clients
|
||||||
|
return 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
static td::int32 get_watchdog_scheduler_id() {
|
||||||
|
// the thread for watchdogs
|
||||||
|
return 5;
|
||||||
|
}
|
||||||
|
|
||||||
|
static td::int32 get_slow_incoming_http_scheduler_id() {
|
||||||
|
// the thread for slow incoming HTTP connections
|
||||||
|
return 6;
|
||||||
|
}
|
||||||
|
|
||||||
|
static td::int32 get_slow_outgoing_http_scheduler_id() {
|
||||||
|
// the thread for slow outgoing HTTP connections
|
||||||
|
return 7;
|
||||||
|
}
|
||||||
|
|
||||||
|
static td::int32 get_dns_resolver_scheduler_id() {
|
||||||
|
// the thread for DNS resolving
|
||||||
|
return 8;
|
||||||
|
}
|
||||||
|
|
||||||
|
static td::int32 get_thread_count() {
|
||||||
|
return 9;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
struct ClientParameters {
|
struct ClientParameters {
|
||||||
|
@ -6,6 +6,8 @@
|
|||||||
//
|
//
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include "telegram-bot-api/ClientParameters.h"
|
||||||
|
|
||||||
#include "td/net/HttpInboundConnection.h"
|
#include "td/net/HttpInboundConnection.h"
|
||||||
#include "td/net/TcpListener.h"
|
#include "td/net/TcpListener.h"
|
||||||
|
|
||||||
@ -61,13 +63,8 @@ class HttpServer final : public td::TcpListener::Callback {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void accept(td::SocketFd fd) final {
|
void accept(td::SocketFd fd) final {
|
||||||
auto scheduler_count = td::Scheduler::instance()->sched_count();
|
|
||||||
auto scheduler_id = scheduler_count - 1;
|
|
||||||
if (scheduler_id > 0) {
|
|
||||||
scheduler_id--;
|
|
||||||
}
|
|
||||||
td::create_actor<td::HttpInboundConnection>("HttpInboundConnection", td::BufferedFd<td::SocketFd>(std::move(fd)), 0,
|
td::create_actor<td::HttpInboundConnection>("HttpInboundConnection", td::BufferedFd<td::SocketFd>(std::move(fd)), 0,
|
||||||
50, 500, creator_(), scheduler_id)
|
50, 500, creator_(), SharedData::get_slow_incoming_http_scheduler_id())
|
||||||
.release();
|
.release();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -47,10 +47,8 @@ WebhookActor::WebhookActor(td::ActorShared<Callback> callback, td::int64 tqueue_
|
|||||||
, fix_ip_address_(fix_ip_address)
|
, fix_ip_address_(fix_ip_address)
|
||||||
, from_db_flag_(from_db_flag)
|
, from_db_flag_(from_db_flag)
|
||||||
, max_connections_(max_connections)
|
, max_connections_(max_connections)
|
||||||
, secret_token_(std::move(secret_token))
|
, secret_token_(std::move(secret_token)) {
|
||||||
, slow_scheduler_id_(td::Scheduler::instance()->sched_count() - 2) {
|
|
||||||
CHECK(max_connections_ > 0);
|
CHECK(max_connections_ > 0);
|
||||||
CHECK(slow_scheduler_id_ > 0);
|
|
||||||
|
|
||||||
if (!cached_ip_address.empty()) {
|
if (!cached_ip_address.empty()) {
|
||||||
auto r_ip_address = td::IPAddress::get_ip_address(cached_ip_address);
|
auto r_ip_address = td::IPAddress::get_ip_address(cached_ip_address);
|
||||||
@ -230,7 +228,8 @@ td::Status WebhookActor::create_connection(td::BufferedFd<td::SocketFd> fd) {
|
|||||||
auto *conn = connections_.get(id);
|
auto *conn = connections_.get(id);
|
||||||
conn->actor_id_ = td::create_actor<td::HttpOutboundConnection>(
|
conn->actor_id_ = td::create_actor<td::HttpOutboundConnection>(
|
||||||
PSLICE() << "Connect:" << id, std::move(fd), std::move(ssl_stream), 0, 50, 60,
|
PSLICE() << "Connect:" << id, std::move(fd), std::move(ssl_stream), 0, 50, 60,
|
||||||
td::ActorShared<td::HttpOutboundConnection::Callback>(actor_id(this), id), slow_scheduler_id_);
|
td::ActorShared<td::HttpOutboundConnection::Callback>(actor_id(this), id),
|
||||||
|
SharedData::get_slow_outgoing_http_scheduler_id());
|
||||||
conn->ip_generation_ = ip_generation_;
|
conn->ip_generation_ = ip_generation_;
|
||||||
conn->event_id_ = {};
|
conn->event_id_ = {};
|
||||||
conn->id_ = id;
|
conn->id_ = id;
|
||||||
|
@ -177,7 +177,6 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback {
|
|||||||
double last_success_time_ = 0;
|
double last_success_time_ = 0;
|
||||||
double wakeup_at_ = 0;
|
double wakeup_at_ = 0;
|
||||||
bool last_update_was_successful_ = true;
|
bool last_update_was_successful_ = true;
|
||||||
td::int32 slow_scheduler_id_ = -1;
|
|
||||||
|
|
||||||
void relax_wakeup_at(double wakeup_at, const char *source);
|
void relax_wakeup_at(double wakeup_at, const char *source);
|
||||||
|
|
||||||
|
@ -459,27 +459,22 @@ int main(int argc, char *argv[]) {
|
|||||||
// << (td::GitInfo::is_dirty() ? "(dirty)" : "") << " started";
|
// << (td::GitInfo::is_dirty() ? "(dirty)" : "") << " started";
|
||||||
LOG(WARNING) << "Bot API " << parameters->version_ << " server started";
|
LOG(WARNING) << "Bot API " << parameters->version_ << " server started";
|
||||||
|
|
||||||
// +3 threads for Td
|
td::ConcurrentScheduler sched(SharedData::get_thread_count() - 1, cpu_affinity);
|
||||||
// one thread for ClientManager and all Clients
|
|
||||||
// one thread for watchdogs
|
|
||||||
// one thread for slow HTTP connections
|
|
||||||
// one thread for DNS resolving
|
|
||||||
const int thread_count = 7;
|
|
||||||
td::ConcurrentScheduler sched(thread_count, cpu_affinity);
|
|
||||||
|
|
||||||
td::GetHostByNameActor::Options get_host_by_name_options;
|
td::GetHostByNameActor::Options get_host_by_name_options;
|
||||||
get_host_by_name_options.scheduler_id = thread_count;
|
get_host_by_name_options.scheduler_id = SharedData::get_dns_resolver_scheduler_id();
|
||||||
parameters->get_host_by_name_actor_id_ =
|
parameters->get_host_by_name_actor_id_ =
|
||||||
sched.create_actor_unsafe<td::GetHostByNameActor>(0, "GetHostByName", std::move(get_host_by_name_options))
|
sched.create_actor_unsafe<td::GetHostByNameActor>(0, "GetHostByName", std::move(get_host_by_name_options))
|
||||||
.release();
|
.release();
|
||||||
|
|
||||||
auto client_manager =
|
auto client_manager = sched
|
||||||
sched.create_actor_unsafe<ClientManager>(thread_count - 3, "ClientManager", std::move(parameters), token_range)
|
.create_actor_unsafe<ClientManager>(SharedData::get_client_scheduler_id(), "ClientManager",
|
||||||
|
std::move(parameters), token_range)
|
||||||
.release();
|
.release();
|
||||||
|
|
||||||
sched
|
sched
|
||||||
.create_actor_unsafe<HttpServer>(
|
.create_actor_unsafe<HttpServer>(
|
||||||
thread_count - 3, "HttpServer", http_ip_address, http_port,
|
SharedData::get_client_scheduler_id(), "HttpServer", http_ip_address, http_port,
|
||||||
[client_manager, shared_data] {
|
[client_manager, shared_data] {
|
||||||
return td::ActorOwn<td::HttpInboundConnection::Callback>(
|
return td::ActorOwn<td::HttpInboundConnection::Callback>(
|
||||||
td::create_actor<HttpConnection>("HttpConnection", client_manager, shared_data));
|
td::create_actor<HttpConnection>("HttpConnection", client_manager, shared_data));
|
||||||
@ -489,7 +484,7 @@ int main(int argc, char *argv[]) {
|
|||||||
if (http_stat_port != 0) {
|
if (http_stat_port != 0) {
|
||||||
sched
|
sched
|
||||||
.create_actor_unsafe<HttpServer>(
|
.create_actor_unsafe<HttpServer>(
|
||||||
thread_count - 3, "HttpStatsServer", http_stat_ip_address, http_stat_port,
|
SharedData::get_client_scheduler_id(), "HttpStatsServer", http_stat_ip_address, http_stat_port,
|
||||||
[client_manager] {
|
[client_manager] {
|
||||||
return td::ActorOwn<td::HttpInboundConnection::Callback>(
|
return td::ActorOwn<td::HttpInboundConnection::Callback>(
|
||||||
td::create_actor<HttpStatConnection>("HttpStatConnection", client_manager));
|
td::create_actor<HttpStatConnection>("HttpStatConnection", client_manager));
|
||||||
@ -498,8 +493,8 @@ int main(int argc, char *argv[]) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
constexpr double WATCHDOG_TIMEOUT = 0.25;
|
constexpr double WATCHDOG_TIMEOUT = 0.25;
|
||||||
auto watchdog_id =
|
auto watchdog_id = sched.create_actor_unsafe<Watchdog>(SharedData::get_watchdog_scheduler_id(), "Watchdog",
|
||||||
sched.create_actor_unsafe<Watchdog>(thread_count - 2, "Watchdog", td::this_thread::get_id(), WATCHDOG_TIMEOUT);
|
td::this_thread::get_id(), WATCHDOG_TIMEOUT);
|
||||||
|
|
||||||
sched.start();
|
sched.start();
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user