Add dedicated threads for TQueue and webhook databases and webhook certificate processing.

This commit is contained in:
levlam 2023-07-25 22:32:05 +03:00
parent c927614964
commit 9f688af4fb
4 changed files with 21 additions and 15 deletions

View File

@ -9874,7 +9874,7 @@ void Client::webhook_error(td::Status status) {
void Client::webhook_closed(td::Status status) { void Client::webhook_closed(td::Status status) {
if (has_webhook_certificate_) { if (has_webhook_certificate_) {
td::Scheduler::instance()->run_on_scheduler(SharedData::get_database_scheduler_id(), td::Scheduler::instance()->run_on_scheduler(SharedData::get_webhook_certificate_scheduler_id(),
[actor_id = actor_id(this), path = get_webhook_certificate_path(), [actor_id = actor_id(this), path = get_webhook_certificate_path(),
status = std::move(status)](td::Unit) mutable { status = std::move(status)](td::Unit) mutable {
LOG(INFO) << "Unlink certificate " << path; LOG(INFO) << "Unlink certificate " << path;
@ -9987,7 +9987,7 @@ void Client::do_set_webhook(PromisedQueryPtr query, bool was_deleted) {
CHECK(!webhook_set_query_); CHECK(!webhook_set_query_);
active_webhook_set_query_ = std::move(query); active_webhook_set_query_ = std::move(query);
td::Scheduler::instance()->run_on_scheduler( td::Scheduler::instance()->run_on_scheduler(
SharedData::get_database_scheduler_id(), SharedData::get_webhook_certificate_scheduler_id(),
[actor_id = actor_id(this), from_path = cert_file_ptr->temp_file_name, [actor_id = actor_id(this), from_path = cert_file_ptr->temp_file_name,
to_path = get_webhook_certificate_path(), size](td::Unit) mutable { to_path = get_webhook_certificate_path(), size](td::Unit) mutable {
LOG(INFO) << "Copy certificate to " << to_path; LOG(INFO) << "Copy certificate to " << to_path;

View File

@ -331,7 +331,7 @@ void ClientManager::start_up() {
} }
auto concurrent_binlog = auto concurrent_binlog =
std::make_shared<td::ConcurrentBinlog>(std::move(binlog), SharedData::get_database_scheduler_id()); std::make_shared<td::ConcurrentBinlog>(std::move(binlog), SharedData::get_binlog_scheduler_id());
auto concurrent_tqueue_binlog = td::make_unique<td::TQueueBinlog<td::BinlogInterface>>(); auto concurrent_tqueue_binlog = td::make_unique<td::TQueueBinlog<td::BinlogInterface>>();
concurrent_tqueue_binlog->set_binlog(std::move(concurrent_binlog)); concurrent_tqueue_binlog->set_binlog(std::move(concurrent_binlog));
tqueue->set_callback(std::move(concurrent_tqueue_binlog)); tqueue->set_callback(std::move(concurrent_tqueue_binlog));
@ -346,7 +346,7 @@ void ClientManager::start_up() {
// init webhook_db // init webhook_db
auto concurrent_webhook_db = td::make_unique<td::BinlogKeyValue<td::ConcurrentBinlog>>(); auto concurrent_webhook_db = td::make_unique<td::BinlogKeyValue<td::ConcurrentBinlog>>();
auto status = concurrent_webhook_db->init(parameters_->working_directory_ + "webhooks_db.binlog", td::DbKey::empty(), auto status = concurrent_webhook_db->init(parameters_->working_directory_ + "webhooks_db.binlog", td::DbKey::empty(),
SharedData::get_database_scheduler_id()); SharedData::get_binlog_scheduler_id());
LOG_IF(FATAL, status.is_error()) << "Can't open webhooks_db.binlog " << status; LOG_IF(FATAL, status.is_error()) << "Can't open webhooks_db.binlog " << status;
parameters_->shared_data_->webhook_db_ = std::move(concurrent_webhook_db); parameters_->shared_data_->webhook_db_ = std::move(concurrent_webhook_db);

View File

@ -53,11 +53,6 @@ struct SharedData {
return static_cast<td::int32>(result); return static_cast<td::int32>(result);
} }
static td::int32 get_database_scheduler_id() {
// the same scheduler as for database in Td
return 1;
}
static td::int32 get_file_gc_scheduler_id() { static td::int32 get_file_gc_scheduler_id() {
// the same scheduler as for file GC in Td // the same scheduler as for file GC in Td
return 2; return 2;
@ -88,9 +83,19 @@ struct SharedData {
return 8; return 8;
} }
static td::int32 get_thread_count() { static td::int32 get_binlog_scheduler_id() {
// the thread for TQueue and webhook binlogs
return 9; return 9;
} }
static td::int32 get_webhook_certificate_scheduler_id() {
// the thread for webhook certificate processing
return 10;
}
static td::int32 get_thread_count() {
return 11;
}
}; };
struct ClientParameters { struct ClientParameters {

View File

@ -724,9 +724,10 @@ void WebhookActor::start_up() {
if (url_.protocol_ != td::HttpUrl::Protocol::Http && !stop_flag_) { if (url_.protocol_ != td::HttpUrl::Protocol::Http && !stop_flag_) {
// asynchronously create SSL context // asynchronously create SSL context
td::Scheduler::instance()->run_on_scheduler( td::Scheduler::instance()->run_on_scheduler(SharedData::get_webhook_certificate_scheduler_id(),
SharedData::get_database_scheduler_id(), [actor_id = actor_id(this), cert_path = cert_path_](td::Unit) mutable { [actor_id = actor_id(this), cert_path = cert_path_](td::Unit) mutable {
send_closure(actor_id, &WebhookActor::on_ssl_context_created, send_closure(
actor_id, &WebhookActor::on_ssl_context_created,
td::SslCtx::create(cert_path, td::SslCtx::VerifyPeer::On)); td::SslCtx::create(cert_path, td::SslCtx::VerifyPeer::On));
}); });
} }