diff --git a/CMakeLists.txt b/CMakeLists.txt index 45d5df6..41128e7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ if (POLICY CMP0065) cmake_policy(SET CMP0065 NEW) endif() -project(TelegramBotApi VERSION 6.2 LANGUAGES CXX) +project(TelegramBotApi VERSION 6.3.2 LANGUAGES CXX) if (POLICY CMP0069) option(TELEGRAM_BOT_API_ENABLE_LTO "Use \"ON\" to enable Link Time Optimization.") @@ -85,6 +85,7 @@ set(TELEGRAM_BOT_API_SOURCE telegram-bot-api/HttpStatConnection.cpp telegram-bot-api/Query.cpp telegram-bot-api/Stats.cpp + telegram-bot-api/Watchdog.cpp telegram-bot-api/WebhookActor.cpp telegram-bot-api/Client.h @@ -95,6 +96,7 @@ set(TELEGRAM_BOT_API_SOURCE telegram-bot-api/HttpStatConnection.h telegram-bot-api/Query.h telegram-bot-api/Stats.h + telegram-bot-api/Watchdog.h telegram-bot-api/WebhookActor.h ) diff --git a/build.html b/build.html index 6bb77c8..0ecaa80 100644 --- a/build.html +++ b/build.html @@ -454,7 +454,7 @@ function onOptionsChanged() { pre_text.push('Note that building requires a lot of memory, so you may need to increase allowed per-process memory usage in /etc/login.conf or build from root.'); } if (os_netbsd) { - pre_text.push('Note that the following instruction is for NetBSD 8.0 and default SH shell.'); + pre_text.push('Note that the following instruction is for NetBSD 8+ and default SH shell.'); } var terminal_name = (function () { @@ -586,8 +586,8 @@ function onOptionsChanged() { if (!use_root) { commands.push('su -'); } - commands.push('export PKG_PATH=ftp://ftp.netbsd.org/pub/pkgsrc/packages/NetBSD/i386/8.0_2019Q2/All'); - var packages = 'git gperf cmake openssl gcc5-libs'; + commands.push('export PKG_PATH=http://cdn.netbsd.org/pub/pkgsrc/packages/NetBSD/$(uname -p)/$(uname -r)/All'); + var packages = 'git gperf cmake openssl gcc12-libs mozilla-rootcerts-openssl'; commands.push('pkg_add ' + packages); if (!use_root) { commands.push('exit'); diff --git a/td b/td index d9cfcf8..7eba198 160000 --- a/td +++ b/td @@ -1 +1 @@ -Subproject commit d9cfcf88fe4ad06dae1716ce8f66bbeb7f9491d9 +Subproject commit 7eba19887ad834fd731b6b07b53c2426fe4beb59 diff --git a/telegram-bot-api/Client.cpp b/telegram-bot-api/Client.cpp index 1effe2a..79bd01f 100644 --- a/telegram-bot-api/Client.cpp +++ b/telegram-bot-api/Client.cpp @@ -273,6 +273,13 @@ bool Client::init_methods() { methods_.emplace("unpinallchatmessages", &Client::process_unpin_all_chat_messages_query); methods_.emplace("setchatstickerset", &Client::process_set_chat_sticker_set_query); methods_.emplace("deletechatstickerset", &Client::process_delete_chat_sticker_set_query); + methods_.emplace("getforumtopiciconstickers", &Client::process_get_forum_topic_icon_stickers_query); + methods_.emplace("createforumtopic", &Client::process_create_forum_topic_query); + methods_.emplace("editforumtopic", &Client::process_edit_forum_topic_query); + methods_.emplace("closeforumtopic", &Client::process_close_forum_topic_query); + methods_.emplace("reopenforumtopic", &Client::process_reopen_forum_topic_query); + methods_.emplace("deleteforumtopic", &Client::process_delete_forum_topic_query); + methods_.emplace("unpinallforumtopicmessages", &Client::process_unpin_all_forum_topic_messages_query); methods_.emplace("getchatmember", &Client::process_get_chat_member_query); methods_.emplace("getchatadministrators", &Client::process_get_chat_administrators_query); methods_.emplace("getchatmembercount", &Client::process_get_chat_member_count_query); @@ -341,6 +348,11 @@ bool Client::init_methods() { return true; } +bool Client::is_local_method(Slice method) { + return method == "close" || method == "logout" || method == "getme" || method == "getupdates" || + method == "setwebhook" || method == "deletewebhook" || method == "getwebhookinfo"; +} + class Client::JsonFile final : public Jsonable { public: JsonFile(const td_api::file *file, const Client *client, bool with_path) @@ -406,8 +418,8 @@ class Client::JsonUser final : public Jsonable { if (user_info != nullptr && !user_info->last_name.empty()) { object("last_name", user_info->last_name); } - if (user_info != nullptr && !user_info->username.empty()) { - object("username", user_info->username); + if (user_info != nullptr && !user_info->active_usernames.empty()) { + object("username", user_info->active_usernames[0]); } if (user_info != nullptr && !user_info->language_code.empty()) { object("language_code", user_info->language_code); @@ -727,8 +739,8 @@ class Client::JsonChat final : public Jsonable { if (!user_info->last_name.empty()) { object("last_name", user_info->last_name); } - if (!user_info->username.empty()) { - object("username", user_info->username); + if (!user_info->active_usernames.empty()) { + object("username", user_info->active_usernames[0]); } object("type", "private"); @@ -744,6 +756,13 @@ class Client::JsonChat final : public Jsonable { // end custom properties impl if (is_full_) { + if (!user_info->active_usernames.empty()) { + object("active_usernames", td::json_array(user_info->active_usernames, + [](Slice username) { return td::JsonString(username); })); + } + if (user_info->emoji_status_custom_emoji_id != 0) { + object("emoji_status_custom_emoji_id", td::to_string(user_info->emoji_status_custom_emoji_id)); + } if (!user_info->bio.empty()) { object("bio", user_info->bio); } @@ -787,8 +806,11 @@ class Client::JsonChat final : public Jsonable { auto supergroup_info = client_->get_supergroup_info(chat_info->supergroup_id); CHECK(supergroup_info != nullptr); - if (!supergroup_info->username.empty()) { - object("username", supergroup_info->username); + if (!supergroup_info->active_usernames.empty()) { + object("username", supergroup_info->active_usernames[0]); + } + if (supergroup_info->is_supergroup && supergroup_info->is_forum) { + object("is_forum", td::JsonTrue()); } if (supergroup_info->is_supergroup) { @@ -807,6 +829,10 @@ class Client::JsonChat final : public Jsonable { // end custom properties impl if (is_full_) { + if (!supergroup_info->active_usernames.empty()) { + object("active_usernames", td::json_array(supergroup_info->active_usernames, + [](Slice username) { return td::JsonString(username); })); + } if (!supergroup_info->description.empty()) { object("description", supergroup_info->description); } @@ -1430,6 +1456,49 @@ class Client::JsonPollAnswer final : public Jsonable { const Client *client_; }; +class Client::JsonForumTopicCreated final : public Jsonable { + public: + explicit JsonForumTopicCreated(const td_api::messageForumTopicCreated *forum_topic_created) + : forum_topic_created_(forum_topic_created) { + } + void store(JsonValueScope *scope) const { + auto object = scope->enter_object(); + object("name", forum_topic_created_->name_); + object("icon_color", forum_topic_created_->icon_->color_); + if (forum_topic_created_->icon_->custom_emoji_id_ != 0) { + object("icon_custom_emoji_id", td::to_string(forum_topic_created_->icon_->custom_emoji_id_)); + } + } + + private: + const td_api::messageForumTopicCreated *forum_topic_created_; +}; + +class Client::JsonForumTopicIsClosedToggled final : public Jsonable { + public: + void store(JsonValueScope *scope) const { + auto object = scope->enter_object(); + } +}; + +class Client::JsonForumTopicInfo final : public Jsonable { + public: + explicit JsonForumTopicInfo(const td_api::forumTopicInfo *forum_topic_info) : forum_topic_info_(forum_topic_info) { + } + void store(JsonValueScope *scope) const { + auto object = scope->enter_object(); + object("message_thread_id", as_client_message_id(forum_topic_info_->message_thread_id_)); + object("name", forum_topic_info_->name_); + object("icon_color", forum_topic_info_->icon_->color_); + if (forum_topic_info_->icon_->custom_emoji_id_ != 0) { + object("icon_custom_emoji_id", td::to_string(forum_topic_info_->icon_->custom_emoji_id_)); + } + } + + private: + const td_api::forumTopicInfo *forum_topic_info_; +}; + class Client::JsonAddress final : public Jsonable { public: explicit JsonAddress(const td_api::address *address) : address_(address) { @@ -1846,6 +1915,9 @@ void Client::JsonMessage::store(JsonValueScope *scope) const { if (message_->scheduled_at != 0) { object("scheduled_at", message_->scheduled_at); } + if (message_->message_thread_id != 0) { + object("message_thread_id", as_client_message_id(message_->message_thread_id)); + } if (message_->initial_send_date > 0) { if (message_->initial_sender_user_id != 0) { object("forward_from", JsonUser(message_->initial_sender_user_id, client_)); @@ -1882,9 +1954,6 @@ void Client::JsonMessage::store(JsonValueScope *scope) const { if (message_->media_album_id != 0) { object("media_group_id", td::to_string(message_->media_album_id)); } - if (message_->message_thread_id != 0) { - object("message_thread_id", td::to_string(message_->message_thread_id)); - } switch (message_->content->get_id()) { case td_api::messageText::ID: { auto content = static_cast(message_->content.get()); @@ -2058,6 +2127,22 @@ void Client::JsonMessage::store(JsonValueScope *scope) const { object("migrate_from_chat_id", td::JsonLong(chat_id)); break; } + case td_api::messageForumTopicCreated::ID: { + auto content = static_cast(message_->content.get()); + object("forum_topic_created", JsonForumTopicCreated(content)); + break; + } + case td_api::messageForumTopicEdited::ID: + break; + case td_api::messageForumTopicIsClosedToggled::ID: { + auto content = static_cast(message_->content.get()); + if (content->is_closed_) { + object("forum_topic_closed", JsonForumTopicIsClosedToggled()); + } else { + object("forum_topic_reopened", JsonForumTopicIsClosedToggled()); + } + break; + } case td_api::messagePinMessage::ID: { auto content = static_cast(message_->content.get()); auto message_id = content->message_id_; @@ -2171,6 +2256,9 @@ void Client::JsonMessage::store(JsonValueScope *scope) const { if (!message_->can_be_saved) { object("has_protected_content", td::JsonTrue()); } + if (message_->is_topic_message) { + object("is_topic_message", td::JsonTrue()); + } } class Client::JsonDeletedMessage final : public Jsonable { @@ -3513,6 +3601,55 @@ class Client::TdOnCheckMessageCallback final : public TdQueryCallback { OnSuccess on_success_; }; +template +class Client::TdOnCheckMessageThreadCallback final : public TdQueryCallback { + public: + TdOnCheckMessageThreadCallback(Client *client, int64 chat_id, int64 message_thread_id, int64 reply_to_message_id, + PromisedQueryPtr query, OnSuccess on_success) + : client_(client) + , chat_id_(chat_id) + , message_thread_id_(message_thread_id) + , reply_to_message_id_(reply_to_message_id) + , query_(std::move(query)) + , on_success_(std::move(on_success)) { + } + + void on_result(object_ptr result) final { + if (result->get_id() == td_api::error::ID) { + auto error = move_object_as(result); + if (error->code_ == 429) { + LOG(WARNING) << "Failed to get message thread " << message_thread_id_ << " in " << chat_id_; + } + return fail_query_with_error(std::move(query_), std::move(error), "Message thread not found"); + } + + CHECK(result->get_id() == td_api::message::ID); + auto full_message_id = client_->add_message(move_object_as(result)); + CHECK(full_message_id.chat_id == chat_id_); + CHECK(full_message_id.message_id == message_thread_id_); + + const MessageInfo *message_info = client_->get_message(chat_id_, message_thread_id_); + CHECK(message_info != nullptr); + if (message_info->message_thread_id != message_thread_id_) { + return fail_query_with_error(std::move(query_), 400, "MESSAGE_THREAD_INVALID", "Message thread not found"); + } + if (!message_info->is_topic_message) { + return fail_query_with_error(std::move(query_), 400, "MESSAGE_THREAD_INVALID", + "Message thread is not a forum topic thread"); + } + + on_success_(chat_id_, message_thread_id_, reply_to_message_id_, std::move(query_)); + } + + private: + Client *client_; + int64 chat_id_; + int64 message_thread_id_; + int64 reply_to_message_id_; + PromisedQueryPtr query_; + OnSuccess on_success_; +}; + template class Client::TdOnCheckRemoteFileIdCallback final : public TdQueryCallback { public: @@ -3843,6 +3980,25 @@ class Client::TdOnGetMyDefaultAdministratorRightsCallback final : public TdQuery PromisedQueryPtr query_; }; +class Client::TdOnGetForumTopicInfoCallback final : public TdQueryCallback { + public: + explicit TdOnGetForumTopicInfoCallback(PromisedQueryPtr query) : query_(std::move(query)) { + } + + void on_result(object_ptr result) final { + if (result->get_id() == td_api::error::ID) { + return fail_query_with_error(std::move(query_), move_object_as(result)); + } + + CHECK(result->get_id() == td_api::forumTopicInfo::ID); + auto forum_topic_info = move_object_as(result); + answer_query(JsonForumTopicInfo(forum_topic_info.get()), std::move(query_)); + } + + private: + PromisedQueryPtr query_; +}; + class Client::TdOnGetMenuButtonCallback final : public TdQueryCallback { public: explicit TdOnGetMenuButtonCallback(PromisedQueryPtr query) : query_(std::move(query)) { @@ -4433,7 +4589,7 @@ ServerBotInfo Client::get_bot_info() const { res.token_ = bot_token_; auto user_info = get_user_info(my_id_); if (user_info != nullptr) { - res.username_ = user_info->username; + res.username_ = user_info->editable_username; } else if (!was_authorized_) { res.username_ = ""; } else { @@ -4510,13 +4666,24 @@ void Client::start_up() { }; td::ClientActor::Options options; options.net_query_stats = parameters_->net_query_stats_; - td_client_ = td::create_actor("TdClientActor", td::make_unique(actor_id(this)), - std::move(options)); + td_client_ = td::create_actor_on_scheduler( + "TdClientActor", 0, td::make_unique(actor_id(this)), std::move(options)); } void Client::send(PromisedQueryPtr query) { if (!query->is_internal()) { query->set_stat_actor(stat_actor_); + if (!parameters_->local_mode_ && !is_local_method(query->method())) { + const BotStatActor *stat = stat_actor_.get_actor_unsafe(); + if (stat->get_active_request_count() > 10000) { + LOG(INFO) << "Fail a query, because there are too many active queries: " << *query; + return query->set_retry_after_error(60); + } + if (stat->get_active_file_upload_bytes() > (static_cast(1) << 33) && !query->files().empty()) { + LOG(INFO) << "Fail a query, because there are too many active file uploads: " << *query; + return query->set_retry_after_error(60); + } + } } cmd_queue_.emplace(std::move(query)); loop(); @@ -4720,7 +4887,7 @@ void Client::check_chat_access(int64 chat_id, AccessRights access_rights, const case ChatInfo::Type::Supergroup: { auto supergroup_info = get_supergroup_info(chat_info->supergroup_id); CHECK(supergroup_info != nullptr); - bool is_public = !supergroup_info->username.empty() || supergroup_info->has_location; + bool is_public = !supergroup_info->active_usernames.empty() || supergroup_info->has_location; if (supergroup_info->status->get_id() == td_api::chatMemberStatusBanned::ID) { if (supergroup_info->is_supergroup) { return fail_query(403, "Forbidden: bot was kicked from the supergroup chat", std::move(query)); @@ -4883,6 +5050,30 @@ void Client::check_message(Slice chat_id_str, int64 message_id, bool allow_empty }); } +template +void Client::check_message_thread(int64 chat_id, int64 message_thread_id, int64 reply_to_message_id, + PromisedQueryPtr query, OnSuccess on_success) { + if (message_thread_id <= 0) { + return on_success(chat_id, 0, reply_to_message_id, std::move(query)); + } + + if (reply_to_message_id != 0) { + const MessageInfo *message_info = get_message(chat_id, reply_to_message_id); + CHECK(message_info != nullptr); + if (message_info->message_thread_id != message_thread_id) { + return fail_query_with_error(std::move(query), 400, "MESSAGE_THREAD_INVALID", + "Replied message is not in the specified message thread"); + } + } + if (reply_to_message_id == message_thread_id) { + return on_success(chat_id, message_thread_id, reply_to_message_id, std::move(query)); + } + + send_request(make_object(chat_id, message_thread_id), + td::make_unique>( + this, chat_id, message_thread_id, reply_to_message_id, std::move(query), std::move(on_success))); +} + template void Client::resolve_sticker_set(const td::string &sticker_set_name, PromisedQueryPtr query, OnSuccess on_success) { if (sticker_set_name.empty()) { @@ -5126,27 +5317,23 @@ void Client::on_update_authorization_state() { parameters_->file_expiration_timeout_seconds_)), td::make_unique()); - auto parameters = make_object(); + auto request = make_object(); + request->use_test_dc_ = is_test_dc_; + request->database_directory_ = dir_; + //request->use_file_database_ = false; + //request->use_chat_info_database_ = false; + //request->use_secret_chats_ = false; + request->use_message_database_ = USE_MESSAGE_DATABASE; + request->api_id_ = parameters_->api_id_; + request->api_hash_ = parameters_->api_hash_; + request->system_language_code_ = "en"; + request->device_model_ = "server"; + request->application_version_ = parameters_->version_; + request->enable_storage_optimizer_ = true; + request->ignore_file_names_ = true; - parameters->use_test_dc_ = is_test_dc_; - parameters->database_directory_ = dir_; - parameters->use_file_database_ = false; - parameters->use_chat_info_database_ = false; - parameters->use_secret_chats_ = false; - parameters->use_message_database_ = USE_MESSAGE_DATABASE; - parameters->api_id_ = parameters_->api_id_; - parameters->api_hash_ = parameters_->api_hash_; - parameters->system_language_code_ = "en"; - parameters->device_model_ = "server"; - parameters->application_version_ = parameters_->version_; - parameters->enable_storage_optimizer_ = true; - parameters->ignore_file_names_ = true; - - return send_request(make_object(std::move(parameters)), - td::make_unique(this)); + return send_request(std::move(request), td::make_unique(this)); } - case td_api::authorizationStateWaitEncryptionKey::ID: - return send_request(make_object(), td::make_unique(this)); case td_api::authorizationStateWaitPhoneNumber::ID: send_request(make_object("online", make_object(true)), td::make_unique()); @@ -5171,7 +5358,7 @@ void Client::on_update_authorization_state() { } if (!was_authorized_) { - LOG(WARNING) << "Logged in as @" << user_info->username; + LOG(WARNING) << "Logged in as @" << user_info->editable_username; was_authorized_ = true; td::send_event(parent_, td::Event::raw(static_cast(this))); update_shared_unix_time_difference(); @@ -5283,9 +5470,14 @@ void Client::on_update(object_ptr result) { } case td_api::updateDeleteMessages::ID: { auto update = move_object_as(result); + td::vector> deleted_messages; for (auto message_id : update->message_ids_) { - delete_message(update->chat_id_, message_id, update->from_cache_); + auto deleted_message = delete_message(update->chat_id_, message_id, update->from_cache_); + if (deleted_message != nullptr) { + deleted_messages.push_back(std::move(deleted_message)); + } } + td::Scheduler::instance()->destroy_on_scheduler(get_file_gc_scheduler_id(), deleted_messages); break; } case td_api::updateFile::ID: { @@ -5582,6 +5774,9 @@ void Client::on_closed() { if (webhook_set_query_) { fail_query(http_status_code, description, std::move(webhook_set_query_)); } + if (active_webhook_set_query_) { + fail_query(http_status_code, description, std::move(active_webhook_set_query_)); + } if (!webhook_url_.empty()) { webhook_id_.reset(); } @@ -5652,16 +5847,12 @@ void Client::timeout_expired() { td::int32 Client::get_database_scheduler_id() { // the same scheduler as for database in Td - auto current_scheduler_id = td::Scheduler::instance()->sched_id(); - auto scheduler_count = td::Scheduler::instance()->sched_count(); - return td::min(current_scheduler_id + 1, scheduler_count - 1); + return 1; } td::int32 Client::get_file_gc_scheduler_id() { // the same scheduler as for file GC in Td - auto current_scheduler_id = td::Scheduler::instance()->sched_id(); - auto scheduler_count = td::Scheduler::instance()->sched_count(); - return td::min(current_scheduler_id + 2, scheduler_count - 1); + return 2; } void Client::clear_tqueue() { @@ -5802,7 +5993,7 @@ td::Result> Client::get_inline_ } } if (cur_temp_bot_user_id_ >= 100000) { - return Status::Error(400, "Too much different LoginUrl bot usernames"); + return Status::Error(400, "Too many different LoginUrl bot usernames"); } auto &user_id = bot_user_ids_[bot_username]; if (user_id == 0) { @@ -6281,7 +6472,7 @@ td::Result> Client::get_input_me need_shipping_address, send_phone_number_to_provider, send_email_address_to_provider, is_flexible), title, description, photo_url, photo_size, photo_width, photo_height, payload, provider_token, provider_data, - td::string()); + td::string(), nullptr); } if (is_input_message_content_required) { @@ -6293,7 +6484,7 @@ td::Result> Client::get_input_me td_api::object_ptr Client::get_message_send_options(bool disable_notification, bool protect_content, td_api::object_ptr &&scheduling_state) { - return make_object(disable_notification, false, protect_content, std::move(scheduling_state)); + return make_object(disable_notification, false, protect_content, false, std::move(scheduling_state)); } td::Result>> Client::get_inline_query_results( @@ -6799,12 +6990,14 @@ td::Result> Client::get_chat TRY_RESULT(can_invite_users, get_json_object_bool_field(object, "can_invite_users")); TRY_RESULT(can_restrict_members, get_json_object_bool_field(object, "can_restrict_members")); TRY_RESULT(can_pin_messages, get_json_object_bool_field(object, "can_pin_messages")); + TRY_RESULT(can_manage_topics, get_json_object_bool_field(object, "can_manage_topics")); TRY_RESULT(can_promote_members, get_json_object_bool_field(object, "can_promote_members")); TRY_RESULT(can_manage_video_chats, get_json_object_bool_field(object, "can_manage_video_chats")); TRY_RESULT(is_anonymous, get_json_object_bool_field(object, "is_anonymous")); - return make_object( - can_manage_chat, can_change_info, can_post_messages, can_edit_messages, can_delete_messages, can_invite_users, - can_restrict_members, can_pin_messages, can_promote_members, can_manage_video_chats, is_anonymous); + return make_object(can_manage_chat, can_change_info, can_post_messages, + can_edit_messages, can_delete_messages, can_invite_users, + can_restrict_members, can_pin_messages, can_manage_topics, + can_promote_members, can_manage_video_chats, is_anonymous); } td::Result> Client::get_chat_administrator_rights( @@ -7228,6 +7421,7 @@ td::Result> Client::get_chat_permiss auto can_change_info = false; auto can_invite_users = false; auto can_pin_messages = false; + auto can_manage_topics = false; if (query->has_arg("permissions")) { allow_legacy = false; @@ -7253,11 +7447,16 @@ td::Result> Client::get_chat_permiss TRY_RESULT_ASSIGN(can_change_info, get_json_object_bool_field(object, "can_change_info")); TRY_RESULT_ASSIGN(can_invite_users, get_json_object_bool_field(object, "can_invite_users")); TRY_RESULT_ASSIGN(can_pin_messages, get_json_object_bool_field(object, "can_pin_messages")); + if (has_json_object_field(object, "can_manage_topics")) { + TRY_RESULT_ASSIGN(can_manage_topics, get_json_object_bool_field(object, "can_manage_topics")); + } else { + can_manage_topics = can_pin_messages; + } return Status::OK(); }(); if (status.is_error()) { - return Status::Error(400, PSLICE() << "Can't parse chat permissions: " << status.error().message()); + return Status::Error(400, PSLICE() << "Can't parse chat permissions: " << status.message()); } } else if (allow_legacy) { allow_legacy = false; @@ -7273,6 +7472,7 @@ td::Result> Client::get_chat_permiss can_change_info = true; can_invite_users = true; can_pin_messages = true; + can_manage_topics = true; } else if (query->has_arg("can_send_messages") || query->has_arg("can_send_media_messages") || query->has_arg("can_send_other_messages") || query->has_arg("can_add_web_page_previews")) { allow_legacy = true; @@ -7284,14 +7484,14 @@ td::Result> Client::get_chat_permiss } return make_object(can_send_messages, can_send_media_messages, can_send_polls, can_send_other_messages, can_add_web_page_previews, can_change_info, - can_invite_users, can_pin_messages); + can_invite_users, can_pin_messages, can_manage_topics); } td::Result> Client::get_input_media(const Query *query, JsonValue &&input_media, bool for_album) const { if (input_media.type() != JsonValue::Type::Object) { - return Status::Error(400, "expected an Object"); + return Status::Error("expected an Object"); } auto &object = input_media.get_object(); @@ -7306,7 +7506,7 @@ td::Result> Client::get_input_me auto input_file = get_input_file(query, Slice(), media, false); if (input_file == nullptr) { - return Status::Error(400, "media not found"); + return Status::Error("media not found"); } TRY_RESULT(thumbnail, get_json_object_string_field(object, "thumb")); @@ -7335,7 +7535,7 @@ td::Result> Client::get_input_me std::move(caption), ttl); } if (for_album && type == "animation") { - return Status::Error(400, PSLICE() << "type \"" << type << "\" can't be used in sendMediaGroup"); + return Status::Error(PSLICE() << "type \"" << type << "\" can't be used in sendMediaGroup"); } if (type == "animation") { TRY_RESULT(width, get_json_object_int_field(object, "width")); @@ -7361,12 +7561,11 @@ td::Result> Client::get_input_me disable_content_type_detection || for_album, std::move(caption)); } - return Status::Error(400, PSLICE() << "type \"" << type << "\" is unsupported"); + return Status::Error(PSLICE() << "type \"" << type << "\" is unsupported"); } td::Result> Client::get_input_media(const Query *query, - Slice field_name, - bool for_album) const { + Slice field_name) const { TRY_RESULT(media, get_required_string_arg(query, field_name)); LOG(INFO) << "Parsing JSON object: " << media; @@ -7376,7 +7575,7 @@ td::Result> Client::get_input_me return Status::Error(400, "Can't parse input media JSON object"); } - auto r_input_message_content = get_input_media(query, r_value.move_as_ok(), for_album); + auto r_input_message_content = get_input_media(query, r_value.move_as_ok(), false); if (r_input_message_content.is_error()) { return Status::Error(400, PSLICE() << "Can't parse InputMedia: " << r_input_message_content.error().message()); } @@ -7405,13 +7604,17 @@ td::Result>> Client:: td::vector> contents; for (auto &input_media : value.get_array()) { - TRY_RESULT(input_message_content, get_input_media(query, std::move(input_media), true)); - contents.push_back(std::move(input_message_content)); + auto r_input_message_content = get_input_media(query, std::move(input_media), true); + if (r_input_message_content.is_error()) { + return Status::Error(400, PSLICE() << "Can't parse InputMedia: " << r_input_message_content.error().message()); + } + contents.push_back(r_input_message_content.move_as_ok()); } return std::move(contents); } -td::Result> Client::get_input_message_invoice(const Query *query) { +td::Result> Client::get_input_message_invoice( + const Query *query) const { TRY_RESULT(title, get_required_string_arg(query, "title")); TRY_RESULT(description, get_required_string_arg(query, "description")); TRY_RESULT(payload, get_required_string_arg(query, "payload")); @@ -7467,13 +7670,18 @@ td::Result> Client::get_input_me auto send_email_address_to_provider = to_bool(query->arg("send_email_to_provider")); auto is_flexible = to_bool(query->arg("is_flexible")); + object_ptr extended_media; + if (!query->arg("extended_media").empty()) { + TRY_RESULT_ASSIGN(extended_media, get_input_media(query, "extended_media")); + } + return make_object( make_object(currency.str(), std::move(prices), max_tip_amount, std::move(suggested_tip_amounts), td::string(), false, need_name, need_phone_number, need_email_address, need_shipping_address, send_phone_number_to_provider, send_email_address_to_provider, is_flexible), title.str(), description.str(), photo_url.str(), photo_size, photo_width, photo_height, payload.str(), - provider_token.str(), provider_data.str(), start_parameter.str()); + provider_token.str(), provider_data.str(), start_parameter.str(), std::move(extended_media)); } td::Result> Client::get_poll_options(const Query *query) { @@ -7748,7 +7956,7 @@ void Client::on_message_send_failed(int64 chat_id, int64 old_message_id, int64 n auto &query = *pending_send_message_queries_[query_id]; if (query.is_multisend) { if (query.error == nullptr || query.error->message_ == "Group send failed") { - if (error->code_ == 429 || error->message_ == "Group send failed") { + if (error->code_ == 429 || error->code_ >= 500 || error->message_ == "Group send failed") { query.error = std::move(error); } else { auto pos = (query.total_message_count - query.awaited_message_count + 1); @@ -7805,11 +8013,8 @@ void Client::on_cmd(PromisedQueryPtr query) { } } - if (logging_out_) { - return fail_query(LOGGING_OUT_ERROR_CODE, get_logging_out_error_description(), std::move(query)); - } - if (closing_) { - return fail_query(CLOSING_ERROR_CODE, CLOSING_ERROR_DESCRIPTION, std::move(query)); + if (logging_out_ || closing_) { + return fail_query_closing(std::move(query)); } CHECK(was_authorized_); @@ -8212,6 +8417,7 @@ td::Status Client::process_forward_message_query(PromisedQueryPtr &query) { td::Status Client::process_send_media_group_query(PromisedQueryPtr &query) { auto chat_id = query->arg("chat_id"); + auto message_thread_id = get_message_id(query.get(), "message_thread_id"); auto reply_to_message_id = get_message_id(query.get(), "reply_to_message_id"); auto allow_sending_without_reply = to_bool(query->arg("allow_sending_without_reply")); auto disable_notification = to_bool(query->arg("disable_notification")); @@ -8223,23 +8429,30 @@ td::Status Client::process_send_media_group_query(PromisedQueryPtr &query) { resolve_reply_markup_bot_usernames( std::move(reply_markup), std::move(query), - [this, chat_id = chat_id.str(), reply_to_message_id, allow_sending_without_reply, disable_notification, + [this, chat_id = chat_id.str(), message_thread_id, reply_to_message_id, allow_sending_without_reply, disable_notification, protect_content, input_message_contents = std::move(input_message_contents), send_at = std::move(send_at)]( object_ptr reply_markup, PromisedQueryPtr query) mutable { - auto on_success = [this, disable_notification, protect_content, + auto on_success = [this, message_thread_id, disable_notification, protect_content, input_message_contents = std::move(input_message_contents), reply_markup = std::move(reply_markup), send_at = std::move(send_at)](int64 chat_id, int64 reply_to_message_id, PromisedQueryPtr query) mutable { - auto it = yet_unsent_message_count_.find(chat_id); - if (it != yet_unsent_message_count_.end() && it->second > MAX_CONCURRENTLY_SENT_CHAT_MESSAGES) { - return query->set_retry_after_error(60); - } + auto on_message_thread_checked = + [this, disable_notification, protect_content, input_message_contents = std::move(input_message_contents), + reply_markup = std::move(reply_markup), send_at = std::move(send_at)](int64 chat_id, int64 message_thread_id, + int64 reply_to_message_id, PromisedQueryPtr query) mutable { + auto it = yet_unsent_message_count_.find(chat_id); + if (it != yet_unsent_message_count_.end() && it->second > MAX_CONCURRENTLY_SENT_CHAT_MESSAGES) { + return query->set_retry_after_error(60); + } - send_request( - make_object(chat_id, 0, reply_to_message_id, - get_message_send_options(disable_notification, protect_content, std::move(send_at)), - std::move(input_message_contents), false), - td::make_unique(this, std::move(query))); + send_request(make_object( + chat_id, message_thread_id, reply_to_message_id, + get_message_send_options(disable_notification, protect_content, std::move(send_at)), + std::move(input_message_contents), false), + td::make_unique(this, std::move(query))); + }; + check_message_thread(chat_id, message_thread_id, reply_to_message_id, std::move(query), + std::move(on_message_thread_checked)); }; check_message(chat_id, reply_to_message_id, reply_to_message_id <= 0 || allow_sending_without_reply, AccessRights::Write, "replied message", std::move(query), std::move(on_success)); @@ -8344,7 +8557,7 @@ td::Status Client::process_edit_message_media_query(PromisedQueryPtr &query) { auto message_id = get_message_id(query.get()); TRY_RESULT(reply_markup, get_reply_markup(query.get())); CHECK_USER_REPLY_MARKUP(); - TRY_RESULT(input_media, get_input_media(query.get(), "media", false)); + TRY_RESULT(input_media, get_input_media(query.get(), "media")); if (chat_id.empty() && message_id == 0) { TRY_RESULT(inline_message_id, get_inline_message_id(query.get())); @@ -8849,6 +9062,89 @@ td::Status Client::process_delete_chat_sticker_set_query(PromisedQueryPtr &query return Status::OK(); } +td::Status Client::process_get_forum_topic_icon_stickers_query(PromisedQueryPtr &query) { + send_request(make_object(), + td::make_unique(this, std::move(query))); + return Status::OK(); +} + +td::Status Client::process_create_forum_topic_query(PromisedQueryPtr &query) { + auto chat_id = query->arg("chat_id"); + auto name = query->arg("name"); + int32 icon_color = get_integer_arg(query.get(), "icon_color", 0); + auto icon_custom_emoji_id = td::to_integer(query->arg("icon_custom_emoji_id")); + + check_chat(chat_id, AccessRights::Write, std::move(query), + [this, name = name.str(), icon_color, icon_custom_emoji_id](int64 chat_id, PromisedQueryPtr query) { + send_request(make_object( + chat_id, name, make_object(icon_color, icon_custom_emoji_id)), + td::make_unique(std::move(query))); + }); + return Status::OK(); +} + +td::Status Client::process_edit_forum_topic_query(PromisedQueryPtr &query) { + auto chat_id = query->arg("chat_id"); + auto message_thread_id = get_message_id(query.get(), "message_thread_id"); + auto name = query->arg("name"); + auto icon_custom_emoji_id = td::to_integer(query->arg("icon_custom_emoji_id")); + + check_chat(chat_id, AccessRights::Write, std::move(query), + [this, message_thread_id, name = name.str(), icon_custom_emoji_id](int64 chat_id, PromisedQueryPtr query) { + send_request(make_object(chat_id, message_thread_id, name, icon_custom_emoji_id), + td::make_unique(std::move(query))); + }); + return Status::OK(); +} + +td::Status Client::process_close_forum_topic_query(PromisedQueryPtr &query) { + auto chat_id = query->arg("chat_id"); + auto message_thread_id = get_message_id(query.get(), "message_thread_id"); + + check_chat(chat_id, AccessRights::Write, std::move(query), + [this, message_thread_id](int64 chat_id, PromisedQueryPtr query) { + send_request(make_object(chat_id, message_thread_id, true), + td::make_unique(std::move(query))); + }); + return Status::OK(); +} + +td::Status Client::process_reopen_forum_topic_query(PromisedQueryPtr &query) { + auto chat_id = query->arg("chat_id"); + auto message_thread_id = get_message_id(query.get(), "message_thread_id"); + + check_chat(chat_id, AccessRights::Write, std::move(query), + [this, message_thread_id](int64 chat_id, PromisedQueryPtr query) { + send_request(make_object(chat_id, message_thread_id, false), + td::make_unique(std::move(query))); + }); + return Status::OK(); +} + +td::Status Client::process_delete_forum_topic_query(PromisedQueryPtr &query) { + auto chat_id = query->arg("chat_id"); + auto message_thread_id = get_message_id(query.get(), "message_thread_id"); + + check_chat(chat_id, AccessRights::Write, std::move(query), + [this, message_thread_id](int64 chat_id, PromisedQueryPtr query) { + send_request(make_object(chat_id, message_thread_id), + td::make_unique(std::move(query))); + }); + return Status::OK(); +} + +td::Status Client::process_unpin_all_forum_topic_messages_query(PromisedQueryPtr &query) { + auto chat_id = query->arg("chat_id"); + auto message_thread_id = get_message_id(query.get(), "message_thread_id"); + + check_chat(chat_id, AccessRights::Write, std::move(query), + [this, message_thread_id](int64 chat_id, PromisedQueryPtr query) { + send_request(make_object(chat_id, message_thread_id), + td::make_unique(std::move(query))); + }); + return Status::OK(); +} + td::Status Client::process_get_chat_member_query(PromisedQueryPtr &query) { auto chat_id = query->arg("chat_id"); TRY_RESULT(user_id, get_user_id(query.get())); @@ -8963,15 +9259,17 @@ td::Status Client::process_promote_chat_member_query(PromisedQueryPtr &query) { auto can_invite_users = to_bool(query->arg("can_invite_users")); auto can_restrict_members = to_bool(query->arg("can_restrict_members")); auto can_pin_messages = to_bool(query->arg("can_pin_messages")); + auto can_manage_topics = to_bool(query->arg("can_manage_topics")); auto can_promote_members = to_bool(query->arg("can_promote_members")); auto can_manage_video_chats = to_bool(query->arg("can_manage_voice_chats")) || to_bool(query->arg("can_manage_video_chats")); auto is_anonymous = to_bool(query->arg("is_anonymous")); auto status = make_object( td::string(), true, - make_object( - can_manage_chat, can_change_info, can_post_messages, can_edit_messages, can_delete_messages, can_invite_users, - can_restrict_members, can_pin_messages, can_promote_members, can_manage_video_chats, is_anonymous)); + make_object(can_manage_chat, can_change_info, can_post_messages, + can_edit_messages, can_delete_messages, can_invite_users, + can_restrict_members, can_pin_messages, can_manage_topics, + can_promote_members, can_manage_video_chats, is_anonymous)); check_chat(chat_id, AccessRights::Write, std::move(query), [this, user_id, status = std::move(status)](int64 chat_id, PromisedQueryPtr query) mutable { auto chat_info = get_chat(chat_id); @@ -9077,6 +9375,7 @@ td::Status Client::process_restrict_chat_member_query(PromisedQueryPtr &query) { permissions->can_change_info_ = old_permissions->can_change_info_; permissions->can_invite_users_ = old_permissions->can_invite_users_; permissions->can_pin_messages_ = old_permissions->can_pin_messages_; + permissions->can_manage_topics_ = old_permissions->can_manage_topics_; } send_request(make_object( @@ -9353,7 +9652,7 @@ td::Status Client::process_answer_custom_query_query(PromisedQueryPtr &query) { } td::Status Client::process_get_updates_query(PromisedQueryPtr &query) { - if (!webhook_url_.empty() || webhook_set_query_) { + if (!webhook_url_.empty() || webhook_set_query_ || active_webhook_set_query_) { fail_query_conflict( "Conflict: can't use getUpdates method while webhook is active; use deleteWebhook to delete the webhook first", std::move(query)); @@ -9369,6 +9668,9 @@ td::Status Client::process_get_updates_query(PromisedQueryPtr &query) { if (offset == previous_get_updates_offset_ && timeout < 3 && now < previous_get_updates_start_time_ + 3.0) { timeout = 3; } + if (offset == previous_get_updates_offset_ && now < previous_get_updates_start_time_ + 0.5) { + limit = 1; + } previous_get_updates_offset_ = offset; previous_get_updates_start_time_ = now; do_get_updates(offset, limit, timeout, std::move(query)); @@ -9404,6 +9706,9 @@ td::Status Client::process_set_webhook_query(PromisedQueryPtr &query) { if (webhook_set_query_) { // already updating webhook. Cancel previous request fail_query_conflict("Conflict: terminated by other setWebhook", std::move(webhook_set_query_)); + } else if (active_webhook_set_query_) { + query->set_retry_after_error(1); + return Status::OK(); } else if (webhook_url_ == new_url && !has_webhook_certificate_ && !new_has_certificate && new_max_connections == webhook_max_connections_ && new_fix_ip_address == webhook_fix_ip_address_ && new_secret_token == webhook_secret_token_ && @@ -9438,6 +9743,7 @@ td::Status Client::process_set_webhook_query(PromisedQueryPtr &query) { // wait for webhook_close callback webhook_query_type_ = WebhookQueryType::Cancel; + CHECK(!active_webhook_set_query_); webhook_set_query_ = std::move(query); return Status::OK(); } @@ -10108,6 +10414,9 @@ bool Client::get_webhook_fix_ip_address(const Query *query) { void Client::do_set_webhook(PromisedQueryPtr query, bool was_deleted) { CHECK(webhook_url_.empty()); + if (logging_out_ || closing_) { + return fail_query_closing(std::move(query)); + } if (to_bool(query->arg("drop_pending_updates"))) { clear_tqueue(); } @@ -10128,53 +10437,88 @@ void Client::do_set_webhook(PromisedQueryPtr query, bool was_deleted) { return fail_query(400, "Bad Request: secret token contains unallowed characters", std::move(query)); } - has_webhook_certificate_ = false; - auto *cert_file_ptr = get_webhook_certificate(query.get()); - if (cert_file_ptr != nullptr) { - auto size = cert_file_ptr->size; - if (size > MAX_CERTIFICATE_FILE_SIZE) { - return fail_query(400, PSLICE() << "Bad Request: certificate size is too big (" << size << " bytes)", - std::move(query)); - } - auto from_path = cert_file_ptr->temp_file_name; - auto to_path = get_webhook_certificate_path(); - auto status = td::copy_file(from_path, to_path, size); - if (status.is_error()) { - return fail_query(500, "Internal Server Error: failed to save certificate", std::move(query)); - } - has_webhook_certificate_ = true; - } else if (query->is_internal() && query->arg("certificate") == "previous") { - has_webhook_certificate_ = true; + if (active_webhook_set_query_) { + // shouldn't happen, unless the active setWebhook request took more than 1 second + return query->set_retry_after_error(1); } - webhook_url_ = new_url.str(); - webhook_set_time_ = td::Time::now(); - webhook_max_connections_ = get_webhook_max_connections(query.get()); - webhook_secret_token_ = secret_token.str(); - webhook_ip_address_ = query->arg("ip_address").str(); - webhook_fix_ip_address_ = get_webhook_fix_ip_address(query.get()); - last_webhook_error_date_ = 0; - last_webhook_error_ = Status::OK(); - - update_allowed_update_types(query.get()); - - LOG(WARNING) << "Create " << (has_webhook_certificate_ ? "" : "not ") << "self signed webhook: " << url.ok(); - auto webhook_actor_name = PSTRING() << "Webhook " << url.ok(); - webhook_id_ = td::create_actor( - webhook_actor_name, actor_shared(this, webhook_generation_), tqueue_id_, url.move_as_ok(), - has_webhook_certificate_ ? get_webhook_certificate_path() : "", webhook_max_connections_, query->is_internal(), - webhook_ip_address_, webhook_fix_ip_address_, webhook_secret_token_, parameters_); - // wait for webhook verified or webhook callback - webhook_query_type_ = WebhookQueryType::Verify; - webhook_set_query_ = std::move(query); + CHECK(!has_webhook_certificate_); + if (query->is_internal()) { + has_webhook_certificate_ = query->arg("certificate") == "previous"; + } else { + auto *cert_file_ptr = get_webhook_certificate(query.get()); + if (cert_file_ptr != nullptr) { + auto size = cert_file_ptr->size; + if (size > MAX_CERTIFICATE_FILE_SIZE) { + return fail_query(400, PSLICE() << "Bad Request: certificate size is too big (" << size << " bytes)", + std::move(query)); + } + CHECK(!webhook_set_query_); + active_webhook_set_query_ = std::move(query); + td::Scheduler::instance()->run_on_scheduler( + get_database_scheduler_id(), [actor_id = actor_id(this), from_path = cert_file_ptr->temp_file_name, + to_path = get_webhook_certificate_path(), size](td::Unit) mutable { + LOG(INFO) << "Copy certificate to " << to_path; + auto status = td::copy_file(from_path, to_path, size); + send_closure(actor_id, &Client::on_webhook_certificate_copied, std::move(status)); + }); + return; + } + } + finish_set_webhook(std::move(query)); } else { answer_query(td::JsonTrue(), std::move(query), was_deleted ? Slice("Webhook was deleted") : Slice("Webhook is already deleted")); } } +void Client::on_webhook_certificate_copied(Status status) { + CHECK(active_webhook_set_query_); + if (status.is_error()) { + return fail_query(500, "Internal Server Error: failed to save certificate", std::move(active_webhook_set_query_)); + } + has_webhook_certificate_ = true; + finish_set_webhook(std::move(active_webhook_set_query_)); +} + +void Client::finish_set_webhook(PromisedQueryPtr query) { + CHECK(!active_webhook_set_query_); + CHECK(!webhook_set_query_); + CHECK(webhook_url_.empty()); + if (logging_out_ || closing_) { + return fail_query_closing(std::move(query)); + } + Slice new_url = query->arg("url"); + CHECK(!new_url.empty()); + webhook_url_ = new_url.str(); + webhook_set_time_ = td::Time::now(); + webhook_max_connections_ = get_webhook_max_connections(query.get()); + webhook_secret_token_ = query->arg("secret_token").str(); + webhook_ip_address_ = query->arg("ip_address").str(); + webhook_fix_ip_address_ = get_webhook_fix_ip_address(query.get()); + last_webhook_error_date_ = 0; + last_webhook_error_ = Status::OK(); + + update_allowed_update_types(query.get()); + + auto url = td::parse_url(new_url, td::HttpUrl::Protocol::Https); + CHECK(url.is_ok()); + + LOG(WARNING) << "Create " << (has_webhook_certificate_ ? "self-signed " : "") << "webhook: " << new_url; + auto webhook_actor_name = PSTRING() << "Webhook " << url.ok(); + webhook_id_ = td::create_actor( + webhook_actor_name, actor_shared(this, webhook_generation_), tqueue_id_, url.move_as_ok(), + has_webhook_certificate_ ? get_webhook_certificate_path() : td::string(), webhook_max_connections_, + query->is_internal(), webhook_ip_address_, webhook_fix_ip_address_, webhook_secret_token_, parameters_); + // wait for webhook verified or webhook callback + webhook_query_type_ = WebhookQueryType::Verify; + CHECK(!active_webhook_set_query_); + webhook_set_query_ = std::move(query); +} + void Client::do_send_message(object_ptr input_message_content, PromisedQueryPtr query) { auto chat_id = query->arg("chat_id"); + auto message_thread_id = get_message_id(query.get(), "message_thread_id"); auto reply_to_message_id = get_message_id(query.get(), "reply_to_message_id"); auto allow_sending_without_reply = to_bool(query->arg("allow_sending_without_reply")); auto disable_notification = to_bool(query->arg("disable_notification")); @@ -10196,21 +10540,30 @@ void Client::do_send_message(object_ptr input_messa resolve_reply_markup_bot_usernames( std::move(reply_markup), std::move(query), - [this, chat_id = chat_id.str(), reply_to_message_id, allow_sending_without_reply, disable_notification, + [this, chat_id = chat_id.str(), message_thread_id, reply_to_message_id, allow_sending_without_reply, disable_notification, protect_content, input_message_content = std::move(input_message_content), send_at = std::move(send_at)]( object_ptr reply_markup, PromisedQueryPtr query) mutable { - auto on_success = [this, disable_notification, protect_content, + auto on_success = [this, message_thread_id, disable_notification, protect_content, input_message_content = std::move(input_message_content), reply_markup = std::move(reply_markup), send_at = std::move(send_at)](int64 chat_id, int64 reply_to_message_id, PromisedQueryPtr query) mutable { - auto it = yet_unsent_message_count_.find(chat_id); - if (it != yet_unsent_message_count_.end() && it->second > MAX_CONCURRENTLY_SENT_CHAT_MESSAGES) { - return query->set_retry_after_error(60); - } - send_request(make_object(chat_id, 0, reply_to_message_id, - get_message_send_options(disable_notification, protect_content, std::move(send_at)), - std::move(reply_markup), std::move(input_message_content)), - td::make_unique(this, std::move(query))); + auto on_message_thread_checked = + [this, disable_notification, protect_content, input_message_content = std::move(input_message_content), + reply_markup = std::move(reply_markup), send_at = std::move(send_at)](int64 chat_id, int64 message_thread_id, + int64 reply_to_message_id, PromisedQueryPtr query) mutable { + auto it = yet_unsent_message_count_.find(chat_id); + if (it != yet_unsent_message_count_.end() && it->second > MAX_CONCURRENTLY_SENT_CHAT_MESSAGES) { + return query->set_retry_after_error(60); + } + + send_request( + make_object(chat_id, message_thread_id, reply_to_message_id, + get_message_send_options(disable_notification, protect_content, std::move(send_at)), + std::move(reply_markup), std::move(input_message_content)), + td::make_unique(this, std::move(query))); + }; + check_message_thread(chat_id, message_thread_id, reply_to_message_id, std::move(query), + std::move(on_message_thread_checked)); }; check_message(chat_id, reply_to_message_id, reply_to_message_id <= 0 || allow_sending_without_reply, AccessRights::Write, "replied message", std::move(query), std::move(on_success)); @@ -10279,6 +10632,16 @@ void Client::fail_query_conflict(Slice message, PromisedQueryPtr &&query) { } } +void Client::fail_query_closing(PromisedQueryPtr &&query) const { + if (logging_out_) { + return fail_query(LOGGING_OUT_ERROR_CODE, get_logging_out_error_description(), std::move(query)); + } + if (closing_) { + return fail_query(CLOSING_ERROR_CODE, CLOSING_ERROR_DESCRIPTION, std::move(query)); + } + UNREACHABLE(); +} + class Client::JsonUpdates final : public Jsonable { public: explicit JsonUpdates(td::Span updates) : updates_(updates) { @@ -10417,8 +10780,15 @@ void Client::long_poll_wakeup(bool force_flag) { void Client::add_user(UserInfo *user_info, object_ptr &&user) { user_info->first_name = std::move(user->first_name_); user_info->last_name = std::move(user->last_name_); - user_info->username = std::move(user->username_); + if (user->usernames_ == nullptr) { + user_info->active_usernames.clear(); + user_info->editable_username.clear(); + } else { + user_info->active_usernames = std::move(user->usernames_->active_usernames_); + user_info->editable_username = std::move(user->usernames_->editable_username_); + } user_info->language_code = std::move(user->language_code_); + user_info->emoji_status_custom_emoji_id = user->emoji_status_ != nullptr ? user->emoji_status_->custom_emoji_id_ : 0; // start custom properties user_info->is_verified = user->is_verified_; @@ -10455,19 +10825,15 @@ void Client::add_user(UserInfo *user_info, object_ptr &&user) { } Client::UserInfo *Client::add_user_info(int64 user_id) { - auto emplace_result = users_.emplace(user_id, nullptr); - auto &user_info = emplace_result.first->second; - if (emplace_result.second) { + auto &user_info = users_[user_id]; + if (user_info == nullptr) { user_info = td::make_unique(); - } else { - CHECK(user_info != nullptr); } return user_info.get(); } const Client::UserInfo *Client::get_user_info(int64 user_id) const { - auto it = users_.find(user_id); - return it == users_.end() ? nullptr : it->second.get(); + return users_.get_pointer(user_id); } void Client::set_user_photo(int64 user_id, object_ptr &&photo) { @@ -10503,19 +10869,15 @@ void Client::add_group(GroupInfo *group_info, object_ptr &&g } Client::GroupInfo *Client::add_group_info(int64 group_id) { - auto emplace_result = groups_.emplace(group_id, nullptr); - auto &group_info = emplace_result.first->second; - if (emplace_result.second) { + auto &group_info = groups_[group_id]; + if (group_info == nullptr) { group_info = td::make_unique(); - } else { - CHECK(group_info != nullptr); } return group_info.get(); } const Client::GroupInfo *Client::get_group_info(int64 group_id) const { - auto it = groups_.find(group_id); - return it == groups_.end() ? nullptr : it->second.get(); + return groups_.get_pointer(group_id); } void Client::set_group_photo(int64 group_id, object_ptr &&photo) { @@ -10531,10 +10893,17 @@ void Client::set_group_invite_link(int64 group_id, td::string &&invite_link) { } void Client::add_supergroup(SupergroupInfo *supergroup_info, object_ptr &&supergroup) { - supergroup_info->username = std::move(supergroup->username_); + if (supergroup->usernames_ == nullptr) { + supergroup_info->active_usernames.clear(); + supergroup_info->editable_username.clear(); + } else { + supergroup_info->active_usernames = std::move(supergroup->usernames_->active_usernames_); + supergroup_info->editable_username = std::move(supergroup->usernames_->editable_username_); + } supergroup_info->date = supergroup->date_; supergroup_info->status = std::move(supergroup->status_); supergroup_info->is_supergroup = !supergroup->is_channel_; + supergroup_info->is_forum = supergroup->is_forum_; supergroup_info->has_location = supergroup->has_location_; supergroup_info->join_to_send_messages = supergroup->join_to_send_messages_; supergroup_info->join_by_request = supergroup->join_by_request_; @@ -10578,39 +10947,27 @@ void Client::set_supergroup_location(int64 supergroup_id, object_ptrsecond; - if (emplace_result.second) { + auto &supergroup_info = supergroups_[supergroup_id]; + if (supergroup_info == nullptr) { supergroup_info = td::make_unique(); - } else { - CHECK(supergroup_info != nullptr); } return supergroup_info.get(); } const Client::SupergroupInfo *Client::get_supergroup_info(int64 supergroup_id) const { - auto it = supergroups_.find(supergroup_id); - return it == supergroups_.end() ? nullptr : it->second.get(); + return supergroups_.get_pointer(supergroup_id); } Client::ChatInfo *Client::add_chat(int64 chat_id) { - LOG(DEBUG) << "Update chat " << chat_id; - auto emplace_result = chats_.emplace(chat_id, nullptr); - auto &chat_info = emplace_result.first->second; - if (emplace_result.second) { + auto &chat_info = chats_[chat_id]; + if (chat_info == nullptr) { chat_info = td::make_unique(); - } else { - CHECK(chat_info != nullptr); } return chat_info.get(); } const Client::ChatInfo *Client::get_chat(int64 chat_id) const { - auto it = chats_.find(chat_id); - if (it == chats_.end()) { - return nullptr; - } - return it->second.get(); + return chats_.get_pointer(chat_id); } Client::ChatType Client::get_chat_type(int64 chat_id) const { @@ -10668,7 +11025,7 @@ td::string Client::get_chat_description(int64 chat_id) const { } return PSTRING() << (supergroup_info->is_supergroup ? "supergroup" : "channel") << " chat " << chat_id << ", chat status = " << to_string(supergroup_info->status) - << ", username = " << supergroup_info->username; + << ", usernames = " << supergroup_info->active_usernames; } case ChatInfo::Type::Unknown: return PSTRING() << "unknown chat " << chat_id; @@ -10754,6 +11111,9 @@ void Client::json_store_administrator_rights(td::JsonObjectScope &object, const if (chat_type == ChatType::Group || chat_type == ChatType::Supergroup) { object("can_pin_messages", td::JsonBool(rights->can_pin_messages_)); } + if (chat_type == ChatType::Supergroup) { + object("can_manage_topics", td::JsonBool(rights->can_manage_topics_)); + } object("can_promote_members", td::JsonBool(rights->can_promote_members_)); object("can_manage_video_chats", td::JsonBool(rights->can_manage_video_chats_)); object("is_anonymous", td::JsonBool(rights->is_anonymous_)); @@ -10768,6 +11128,7 @@ void Client::json_store_permissions(td::JsonObjectScope &object, const td_api::c object("can_change_info", td::JsonBool(permissions->can_change_info_)); object("can_invite_users", td::JsonBool(permissions->can_invite_users_)); object("can_pin_messages", td::JsonBool(permissions->can_pin_messages_)); + object("can_manage_topics", td::JsonBool(permissions->can_manage_topics_)); } void Client::json_store_user_status(td::JsonObjectScope &object, const td_api::UserStatus *userStatus) { @@ -11152,6 +11513,8 @@ bool Client::need_skip_update_message(int64 chat_id, const object_ptr 0; } -Client::Slice Client::get_sticker_set_name(int64 sticker_set_id) const { - auto it = sticker_set_names_.find(sticker_set_id); - if (it == sticker_set_names_.end()) { - return Slice(); - } - return it->second; +td::string Client::get_sticker_set_name(int64 sticker_set_id) const { + return sticker_set_names_.get(sticker_set_id); } void Client::process_new_message_queue(int64 chat_id) { @@ -11511,12 +11872,36 @@ void Client::process_new_message_queue(int64 chat_id) { } int32 message_date = message->edit_date_ == 0 ? message->date_ : message->edit_date_; + if (delayed_update_count_ > 0 && (update_type != delayed_update_type_ || chat_id != delayed_chat_id_)) { + if (delayed_update_count_ == 1) { + LOG(ERROR) << "Receive very old update " << get_update_type_name(delayed_update_type_) << " sent at " + << delayed_min_date_ << " in chat " << delayed_chat_id_ << " with a delay of " << delayed_max_time_ + << " seconds"; + } else { + LOG(ERROR) << "Receive " << delayed_update_count_ << " very old updates " + << get_update_type_name(delayed_update_type_) << " sent from " << delayed_min_date_ << " to " + << delayed_max_date_ << " in chat " << delayed_chat_id_ << " with a delay up to " + << delayed_max_time_ << " seconds"; + } + delayed_update_count_ = 0; + } auto now = get_unix_time(); auto update_delay_time = now - td::max(message_date, parameters_->shared_data_->get_unix_time(webhook_set_time_)); const auto UPDATE_DELAY_WARNING_TIME = 10 * 60; - LOG_IF(ERROR, update_delay_time > UPDATE_DELAY_WARNING_TIME) - << "Receive very old update " << get_update_type_name(update_type) << " sent at " << message_date << " to chat " - << chat_id << " with a delay of " << update_delay_time << " seconds: " << to_string(message); + if (update_delay_time > UPDATE_DELAY_WARNING_TIME && message_date > last_synchronization_error_date_ + 60) { + if (delayed_update_count_ == 0) { + delayed_update_type_ = update_type; + delayed_chat_id_ = chat_id; + delayed_min_date_ = message_date; + delayed_max_date_ = message_date; + delayed_max_time_ = update_delay_time; + } else { + delayed_min_date_ = td::min(message_date, delayed_min_date_); + delayed_max_date_ = td::max(message_date, delayed_max_date_); + delayed_max_time_ = td::max(update_delay_time, delayed_max_time_); + } + delayed_update_count_++; + } auto left_time = message_date + 86400 - now; add_message(std::move(message)); @@ -11558,11 +11943,11 @@ void Client::remove_replies_to_message(int64 chat_id, int64 reply_to_message_id, reply_message_ids_.erase(it); } -void Client::delete_message(int64 chat_id, int64 message_id, bool only_from_cache) { +td::unique_ptr Client::delete_message(int64 chat_id, int64 message_id, bool only_from_cache) { remove_replies_to_message(chat_id, message_id, only_from_cache); - auto it = messages_.find({chat_id, message_id}); - if (it == messages_.end()) { + auto message_info = std::move(messages_[{chat_id, message_id}]); + if (message_info == nullptr) { if (yet_unsent_messages_.count({chat_id, message_id}) > 0) { // yet unsent message is deleted, possible only if we are trying to write to inaccessible supergroup or // sent message was deleted before added to the chat @@ -11586,14 +11971,11 @@ void Client::delete_message(int64 chat_id, int64 message_id, bool only_from_cach on_message_send_failed(chat_id, message_id, 0, std::move(error)); } - return; + } else { + set_message_reply_to_message_id(message_info.get(), 0); + messages_.erase({chat_id, message_id}); } - - auto message_info = it->second.get(); - - set_message_reply_to_message_id(message_info, 0); - - messages_.erase(it); + return message_info; } Client::FullMessageId Client::add_message(object_ptr &&message, bool force_update_content) { @@ -11604,16 +11986,14 @@ Client::FullMessageId Client::add_message(object_ptr &&message, int64 message_id = message->id_; LOG(DEBUG) << "Add message " << message_id << " to chat " << chat_id; - td::unique_ptr message_info; - auto it = messages_.find({chat_id, message_id}); - if (it == messages_.end()) { + auto &message_info = messages_[{chat_id, message_id}]; + if (message_info == nullptr) { message_info = td::make_unique(); - } else { - message_info = std::move(it->second); } message_info->id = message_id; message_info->chat_id = chat_id; + message_info->message_thread_id = message->message_thread_id_; message_info->date = message->date_; message_info->edit_date = message->edit_date_; message_info->media_album_id = message->media_album_id_; @@ -11708,6 +12088,7 @@ Client::FullMessageId Client::add_message(object_ptr &&message, } message_info->can_be_saved = message->can_be_saved_; + message_info->is_topic_message = message->is_topic_message_; message_info->author_signature = std::move(message->author_signature_); if (message->reply_in_chat_id_ != chat_id && message->reply_to_message_id_ != 0) { @@ -11729,7 +12110,6 @@ Client::FullMessageId Client::add_message(object_ptr &&message, } set_message_reply_markup(message_info.get(), std::move(message->reply_markup_)); - messages_[{chat_id, message_id}] = std::move(message_info); message = nullptr; return {chat_id, message_id}; @@ -11757,25 +12137,25 @@ void Client::on_update_message_edited(int64 chat_id, int64 message_id, int32 edi } const Client::MessageInfo *Client::get_message(int64 chat_id, int64 message_id) const { - auto it = messages_.find({chat_id, message_id}); - if (it == messages_.end()) { + auto message_info = messages_.get_pointer({chat_id, message_id}); + if (message_info == nullptr) { LOG(DEBUG) << "Not found message " << message_id << " from chat " << chat_id; return nullptr; } LOG(DEBUG) << "Found message " << message_id << " from chat " << chat_id; - return it->second.get(); + return message_info; } Client::MessageInfo *Client::get_message_editable(int64 chat_id, int64 message_id) { - auto it = messages_.find({chat_id, message_id}); - if (it == messages_.end()) { + auto message_info = messages_.get_pointer({chat_id, message_id}); + if (message_info == nullptr) { LOG(DEBUG) << "Not found message " << message_id << " from chat " << chat_id; return nullptr; } LOG(DEBUG) << "Found message " << message_id << " from chat " << chat_id; - return it->second.get(); + return message_info; } td::string Client::get_chat_member_status(const object_ptr &status) { @@ -11903,7 +12283,7 @@ td::int32 Client::as_scheduled_message_id(int64 message_id) { } td::int64 Client::get_supergroup_chat_id(int64 supergroup_id) { - return static_cast(-1000000000000ll) - supergroup_id; + return static_cast(-1000000000000ll) - supergroup_id; } td::int64 Client::get_basic_group_chat_id(int64 basic_group_id) { diff --git a/telegram-bot-api/Client.h b/telegram-bot-api/Client.h index 92bc674..40c0489 100644 --- a/telegram-bot-api/Client.h +++ b/telegram-bot-api/Client.h @@ -26,6 +26,7 @@ #include "td/utils/Promise.h" #include "td/utils/Slice.h" #include "td/utils/Status.h" +#include "td/utils/WaitFreeHashMap.h" #include #include @@ -154,6 +155,9 @@ class Client final : public WebhookActor::Callback { class JsonChatMembers; class JsonChatMemberUpdated; class JsonChatJoinRequest; + class JsonForumTopicCreated; + class JsonForumTopicIsClosedToggled; + class JsonForumTopicInfo; class JsonGameHighScore; class JsonAddress; class JsonOrderInfo; @@ -200,6 +204,7 @@ class Client final : public WebhookActor::Callback { class TdOnGetEditedMessageCallback; class TdOnGetCallbackQueryMessageCallback; class TdOnGetStickerSetCallback; + class TdOnGetForumTopicInfoCallback; class TdOnGetMenuButtonCallback; class TdOnGetMyCommandsCallback; class TdOnGetMyDefaultAdministratorRightsCallback; @@ -281,6 +286,8 @@ class Client final : public WebhookActor::Callback { template class TdOnCheckMessageCallback; template + class TdOnCheckMessageThreadCallback; + template class TdOnCheckRemoteFileIdCallback; template class TdOnGetChatMemberCallback; @@ -327,6 +334,10 @@ class Client final : public WebhookActor::Callback { void check_message(Slice chat_id_str, int64 message_id, bool allow_empty, AccessRights access_rights, Slice message_type, PromisedQueryPtr query, OnSuccess on_success); + template + void check_message_thread(int64 chat_id, int64 message_thread_id, int64 reply_to_message_id, PromisedQueryPtr query, + OnSuccess on_success); + template void resolve_sticker_set(const td::string &sticker_set_name, PromisedQueryPtr query, OnSuccess on_success); @@ -480,8 +491,7 @@ class Client final : public WebhookActor::Callback { td::Result> get_input_media(const Query *query, td::JsonValue &&input_media, bool for_album) const; - td::Result> get_input_media(const Query *query, Slice field_name, - bool for_album) const; + td::Result> get_input_media(const Query *query, Slice field_name) const; td::Result>> get_input_message_contents(const Query *query, Slice field_name) const; @@ -489,7 +499,7 @@ class Client final : public WebhookActor::Callback { td::Result>> get_input_message_contents( const Query *query, td::JsonValue &&value) const; - static td::Result> get_input_message_invoice(const Query *query); + td::Result> get_input_message_invoice(const Query *query) const; static object_ptr get_message_send_options(bool disable_notification, bool protect_content, @@ -534,6 +544,8 @@ class Client final : public WebhookActor::Callback { static bool init_methods(); + static bool is_local_method(Slice method); + void on_cmd(PromisedQueryPtr query); Status process_get_me_query(PromisedQueryPtr &query); @@ -595,6 +607,13 @@ class Client final : public WebhookActor::Callback { Status process_unpin_all_chat_messages_query(PromisedQueryPtr &query); Status process_set_chat_sticker_set_query(PromisedQueryPtr &query); Status process_delete_chat_sticker_set_query(PromisedQueryPtr &query); + Status process_get_forum_topic_icon_stickers_query(PromisedQueryPtr &query); + Status process_create_forum_topic_query(PromisedQueryPtr &query); + Status process_edit_forum_topic_query(PromisedQueryPtr &query); + Status process_close_forum_topic_query(PromisedQueryPtr &query); + Status process_reopen_forum_topic_query(PromisedQueryPtr &query); + Status process_delete_forum_topic_query(PromisedQueryPtr &query); + Status process_unpin_all_forum_topic_messages_query(PromisedQueryPtr &query); Status process_get_chat_member_query(PromisedQueryPtr &query); Status process_get_chat_administrators_query(PromisedQueryPtr &query); Status process_get_chat_member_count_query(PromisedQueryPtr &query); @@ -672,6 +691,8 @@ class Client final : public WebhookActor::Callback { int32 get_webhook_max_connections(const Query *query) const; static bool get_webhook_fix_ip_address(const Query *query); void do_set_webhook(PromisedQueryPtr query, bool was_deleted); + void on_webhook_certificate_copied(Status status); + void finish_set_webhook(PromisedQueryPtr query); void save_webhook() const; td::string get_webhook_certificate_path() const; @@ -698,6 +719,8 @@ class Client final : public WebhookActor::Callback { void abort_long_poll(bool from_set_webhook); + void fail_query_closing(PromisedQueryPtr &&query) const; + void fail_query_conflict(Slice message, PromisedQueryPtr &&query); static void fail_query_with_error(PromisedQueryPtr query, int32 error_code, Slice error_message, @@ -725,8 +748,10 @@ class Client final : public WebhookActor::Callback { td::string first_name; td::string last_name; - td::string username; + td::vector active_usernames; + td::string editable_username; td::string language_code; + int64 emoji_status_custom_emoji_id; object_ptr photo; td::string bio; @@ -775,7 +800,8 @@ class Client final : public WebhookActor::Callback { const GroupInfo *get_group_info(int64 group_id) const; struct SupergroupInfo { - td::string username; + td::vector active_usernames; + td::string editable_username; object_ptr photo; td::string description; td::string invite_link; @@ -786,6 +812,7 @@ class Client final : public WebhookActor::Callback { object_ptr location; object_ptr status; bool is_supergroup = false; + bool is_forum = false; bool can_set_sticker_set = false; bool has_location = false; bool join_to_send_messages = false; @@ -836,6 +863,7 @@ class Client final : public WebhookActor::Callback { int64 sender_user_id = 0; int64 sender_chat_id = 0; int64 chat_id = 0; + int64 message_thread_id = 0; int32 date = 0; int32 edit_date = 0; int64 initial_chat_id = 0; @@ -847,7 +875,6 @@ class Client final : public WebhookActor::Callback { td::string initial_sender_name; td::string author_signature; int64 reply_to_message_id = 0; - int64 message_thread_id = 0; int64 media_album_id = 0; int64 via_bot_user_id = 0; object_ptr content; @@ -863,6 +890,7 @@ class Client final : public WebhookActor::Callback { bool can_be_saved = false; bool is_automatic_forward = false; + bool is_topic_message = false; mutable bool is_reply_to_message_deleted = false; mutable bool is_content_changed = false; }; @@ -889,7 +917,7 @@ class Client final : public WebhookActor::Callback { bool have_sticker_set_name(int64 sticker_set_id) const; - Slice get_sticker_set_name(int64 sticker_set_id) const; + td::string get_sticker_set_name(int64 sticker_set_id) const; int64 choose_added_member_id(const td_api::messageChatAddMembers *message_add_members) const; @@ -910,7 +938,8 @@ class Client final : public WebhookActor::Callback { static void json_store_user_status(td::JsonObjectScope &object, const td_api::UserStatus *userStatus); void remove_replies_to_message(int64 chat_id, int64 reply_to_message_id, bool only_from_cache); - void delete_message(int64 chat_id, int64 message_id, bool only_from_cache); + + td::unique_ptr delete_message(int64 chat_id, int64 message_id, bool only_from_cache); void add_new_message(object_ptr &&message, bool is_edited); void process_new_message_queue(int64 chat_id); @@ -1063,11 +1092,11 @@ class Client final : public WebhookActor::Callback { static td::FlatHashMap methods_; - td::FlatHashMap, FullMessageIdHash> messages_; // message cache - td::FlatHashMap> users_; // user info cache - td::FlatHashMap> groups_; // group info cache - td::FlatHashMap> supergroups_; // supergroup info cache - td::FlatHashMap> chats_; // chat info cache + td::WaitFreeHashMap, FullMessageIdHash> messages_; + td::WaitFreeHashMap> users_; + td::WaitFreeHashMap> groups_; + td::WaitFreeHashMap> supergroups_; + td::WaitFreeHashMap> chats_; td::FlatHashMap, FullMessageIdHash> reply_message_ids_; // message -> replies to it @@ -1118,7 +1147,7 @@ class Client final : public WebhookActor::Callback { }; td::FlatHashMap new_callback_query_queues_; // sender_user_id -> queue - td::FlatHashMap sticker_set_names_; + td::WaitFreeHashMap sticker_set_names_; int64 cur_temp_bot_user_id_ = 1; td::FlatHashMap bot_user_ids_; @@ -1162,6 +1191,7 @@ class Client final : public WebhookActor::Callback { WebhookQueryType webhook_query_type_ = WebhookQueryType::Cancel; td::ActorOwn webhook_id_; PromisedQueryPtr webhook_set_query_; + PromisedQueryPtr active_webhook_set_query_; td::string webhook_url_; double webhook_set_time_ = 0; int32 webhook_max_connections_ = 0; @@ -1188,6 +1218,13 @@ class Client final : public WebhookActor::Callback { td::uint64 webhook_generation_ = 1; + UpdateType delayed_update_type_ = UpdateType::Size; + int64 delayed_chat_id_ = 0; + int32 delayed_min_date_ = 0; + int32 delayed_max_date_ = 0; + int32 delayed_max_time_ = 0; + size_t delayed_update_count_ = 0; + std::shared_ptr parameters_; td::ActorId stat_actor_; diff --git a/telegram-bot-api/ClientManager.cpp b/telegram-bot-api/ClientManager.cpp index a7bfa95..66a0a87 100644 --- a/telegram-bot-api/ClientManager.cpp +++ b/telegram-bot-api/ClientManager.cpp @@ -6,7 +6,6 @@ // #include "telegram-bot-api/ClientManager.h" -#include "telegram-bot-api/Client.h" #include "telegram-bot-api/ClientParameters.h" #include "telegram-bot-api/WebhookActor.h" #include "telegram-bot-api/StatsJson.h" @@ -31,6 +30,7 @@ #include "td/utils/Parser.h" #include "td/utils/port/IPAddress.h" #include "td/utils/port/Stat.h" +#include "td/utils/port/thread.h" #include "td/utils/Slice.h" #include "td/utils/SliceBuilder.h" #include "td/utils/StackAllocator.h" @@ -39,7 +39,10 @@ #include "td/utils/Random.h" #include "td/utils/base64.h" -#include +#include "memprof/memprof.h" + +#include +#include #include namespace telegram_bot_api { @@ -51,6 +54,8 @@ void ClientManager::close(td::Promise &&promise) { } close_flag_ = true; + watchdog_id_.reset(); + dump_statistics(); auto ids = clients_.ids(); for (auto id : ids) { auto *client_info = clients_.get(id); @@ -262,7 +267,8 @@ void ClientManager::get_stats(td::Promise promise, auto now = td::Time::now(); td::int32 active_bot_count = 0; - std::multimap top_bot_ids; + td::vector> top_bot_ids; + size_t max_bots = 50; for (auto id : clients_.ids()) { auto *client_info = clients_.get(id); CHECK(client_info); @@ -275,15 +281,17 @@ void ClientManager::get_stats(td::Promise promise, continue; } - auto stats = client_info->stat_.as_vector(now); - double score = 0.0; - for (auto &stat : stats) { - if (stat.key_ == "update_count" || stat.key_ == "request_count") { - score -= td::to_double(stat.value_); - } + auto score = static_cast(client_info->stat_.get_score(now) * -1e9); + if (score == 0 && top_bot_ids.size() >= max_bots) { + continue; } - top_bot_ids.emplace(static_cast(score * 1e9), id); + top_bot_ids.emplace_back(score, id); } + if (top_bot_ids.size() < max_bots) { + max_bots = top_bot_ids.size(); + } + std::partial_sort(top_bot_ids.begin(), top_bot_ids.begin() + max_bots, top_bot_ids.end()); + top_bot_ids.resize(max_bots); if(!as_json) { sb << stat_.get_description() << '\n'; @@ -359,10 +367,12 @@ void ClientManager::get_stats(td::Promise promise, for (std::pair top_bot_id : top_bot_ids) { auto client_info = clients_.get(top_bot_id.second); CHECK(client_info); - ServerBotInfo bot_info = client_info->client_->get_actor_unsafe()->get_bot_info(); + ServerBotInfo bot_info = client_info->client_.get_actor_unsafe()->get_bot_info(); + auto active_request_count = client_info->stat_.get_active_request_count(); + auto active_file_upload_bytes = client_info->stat_.get_active_file_upload_bytes(); auto stats = client_info->stat_.as_json_ready_vector(now); JsonStatsBotAdvanced bot( - std::move(top_bot_id), std::move(bot_info), std::move(stats), parameters_->stats_hide_sensible_data_, now + std::move(top_bot_id), std::move(bot_info), active_request_count, active_file_upload_bytes, std::move(stats), parameters_->stats_hide_sensible_data_, now ); bots.push_back(bot); } @@ -372,8 +382,9 @@ void ClientManager::get_stats(td::Promise promise, for (auto top_bot_id : top_bot_ids) { auto *client_info = clients_.get(top_bot_id.second); CHECK(client_info); - auto bot_info = client_info->client_->get_actor_unsafe()->get_bot_info(); - + auto bot_info = client_info->client_.get_actor_unsafe()->get_bot_info(); + auto active_request_count = client_info->stat_.get_active_request_count(); + auto active_file_upload_bytes = client_info->stat_.get_active_file_upload_bytes(); sb << '\n'; sb << "id\t" << bot_info.id_ << '\n'; sb << "uptime\t" << now - bot_info.start_time_ << '\n'; @@ -381,18 +392,30 @@ void ClientManager::get_stats(td::Promise promise, sb << "token\t" << bot_info.token_ << '\n'; } sb << "username\t" << bot_info.username_ << '\n'; - if (!parameters_->stats_hide_sensible_data_) { - sb << "webhook\t" << bot_info.webhook_ << '\n'; - } else if (bot_info.webhook_.empty()) { - sb << "webhook disabled" << '\n'; - } else { - sb << "webhook enabled" << '\n'; + if (active_request_count != 0) { + sb << "active_request_count\t" << active_request_count << '\n'; + } + if (active_file_upload_bytes != 0) { + sb << "active_file_upload_bytes\t" << active_file_upload_bytes << '\n'; + } + if (!bot_info.webhook_.empty()) { + if (!parameters_->stats_hide_sensible_data_) { + sb << "webhook\t" << bot_info.webhook_ << '\n'; + } else { + sb << "webhook enabled" << '\n'; + } + if (bot_info.has_webhook_certificate_) { + sb << "has_custom_certificate\t" << bot_info.has_webhook_certificate_ << '\n'; + } + if (bot_info.webhook_max_connections_ != parameters_->default_max_webhook_connections_) { + sb << "webhook_max_connections\t" << bot_info.webhook_max_connections_ << '\n'; + } } - sb << "has_custom_certificate\t" << bot_info.has_webhook_certificate_ << '\n'; sb << "head_update_id\t" << bot_info.head_update_id_ << '\n'; - sb << "tail_update_id\t" << bot_info.tail_update_id_ << '\n'; - sb << "pending_update_count\t" << bot_info.pending_update_count_ << '\n'; - sb << "webhook_max_connections\t" << bot_info.webhook_max_connections_ << '\n'; + if (bot_info.pending_update_count_ != 0) { + sb << "tail_update_id\t" << bot_info.tail_update_id_ << '\n'; + sb << "pending_update_count\t" << bot_info.pending_update_count_ << '\n'; + } auto stats = client_info->stat_.as_vector(now); for (auto &stat : stats) { @@ -419,9 +442,7 @@ td::int64 ClientManager::get_tqueue_id(td::int64 user_id, bool is_test_dc) { void ClientManager::start_up() { //NB: the same scheduler as for database in Td - auto current_scheduler_id = td::Scheduler::instance()->sched_id(); - auto scheduler_count = td::Scheduler::instance()->sched_count(); - auto scheduler_id = td::min(current_scheduler_id + 1, scheduler_count - 1); + auto scheduler_id = 1; // init tqueue { @@ -459,13 +480,14 @@ void ClientManager::start_up() { LOG(WARNING) << "Loaded " << loaded_event_count << " TQueue events in " << (td::Time::now() - load_start_time) << " seconds"; + next_tqueue_gc_time_ = td::Time::now() + 600; } // init webhook_db and user_db auto concurrent_webhook_db = td::make_unique>(); auto status = concurrent_webhook_db->init(parameters_->working_directory_ + "webhooks_db.binlog", td::DbKey::empty(), scheduler_id); - LOG_IF(FATAL, status.is_error()) << "Can't open webhooks_db.binlog " << status.error(); + LOG_IF(FATAL, status.is_error()) << "Can't open webhooks_db.binlog " << status; parameters_->shared_data_->webhook_db_ = std::move(concurrent_webhook_db); auto concurrent_user_db = td::make_unique>(); @@ -486,6 +508,10 @@ void ClientManager::start_up() { send_closure_later(actor_id(this), &ClientManager::send, std::move(query)); } + // launch watchdog + watchdog_id_ = td::create_actor_on_scheduler( + "ManagerWatchdog", td::Scheduler::instance()->sched_count() - 3, td::this_thread::get_id(), WATCHDOG_TIMEOUT); + set_timeout_in(600.0); } PromisedQueryPtr ClientManager::get_webhook_restore_query(td::Slice token, bool is_user, td::Slice webhook_info, @@ -547,6 +573,61 @@ PromisedQueryPtr ClientManager::get_webhook_restore_query(td::Slice token, bool return PromisedQueryPtr(query.release(), PromiseDeleter(td::Promise>())); } +void ClientManager::dump_statistics() { + if (is_memprof_on()) { + LOG(WARNING) << "Memory dump:"; + td::vector v; + dump_alloc([&](const AllocInfo &info) { v.push_back(info); }); + std::sort(v.begin(), v.end(), [](const AllocInfo &a, const AllocInfo &b) { return a.size > b.size; }); + size_t total_size = 0; + size_t other_size = 0; + int count = 0; + for (auto &info : v) { + if (count++ < 50) { + LOG(WARNING) << td::format::as_size(info.size) << td::format::as_array(info.backtrace); + } else { + other_size += info.size; + } + total_size += info.size; + } + LOG(WARNING) << td::tag("other", td::format::as_size(other_size)); + LOG(WARNING) << td::tag("total size", td::format::as_size(total_size)); + LOG(WARNING) << td::tag("total traces", get_ht_size()); + LOG(WARNING) << td::tag("fast_backtrace_success_rate", get_fast_backtrace_success_rate()); + } + auto r_mem_stat = td::mem_stat(); + if (r_mem_stat.is_ok()) { + auto mem_stat = r_mem_stat.move_as_ok(); + LOG(WARNING) << td::tag("rss", td::format::as_size(mem_stat.resident_size_)); + LOG(WARNING) << td::tag("vm", td::format::as_size(mem_stat.virtual_size_)); + LOG(WARNING) << td::tag("rss_peak", td::format::as_size(mem_stat.resident_size_peak_)); + LOG(WARNING) << td::tag("vm_peak", td::format::as_size(mem_stat.virtual_size_peak_)); + } + LOG(WARNING) << td::tag("buffer_mem", td::format::as_size(td::BufferAllocator::get_buffer_mem())); + LOG(WARNING) << td::tag("buffer_slice_size", td::format::as_size(td::BufferAllocator::get_buffer_slice_size())); + + const auto &shared_data = parameters_->shared_data_; + auto query_list_size = shared_data->query_list_size_.load(std::memory_order_relaxed); + auto query_count = shared_data->query_count_.load(std::memory_order_relaxed); + LOG(WARNING) << td::tag("pending queries", query_count) << td::tag("pending requests", query_list_size); + + td::uint64 i = 0; + bool was_gap = false; + for (auto end = &shared_data->query_list_, cur = end->prev; cur != end; cur = cur->prev, i++) { + if (i < 20 || i > query_list_size - 20 || i % (query_list_size / 50 + 1) == 0) { + if (was_gap) { + LOG(WARNING) << "..."; + was_gap = false; + } + LOG(WARNING) << static_cast(*cur); + } else { + was_gap = true; + } + } + + td::dump_pending_network_queries(*parameters_->net_query_stats_); +} + void ClientManager::raw_event(const td::Event::Raw &event) { auto id = get_link_token(); auto *info = clients_.get(id); @@ -563,6 +644,28 @@ void ClientManager::raw_event(const td::Event::Raw &event) { } } +void ClientManager::timeout_expired() { + send_closure(watchdog_id_, &Watchdog::kick); + set_timeout_in(WATCHDOG_TIMEOUT / 2); + + double now = td::Time::now(); + if (now > next_tqueue_gc_time_) { + auto unix_time = parameters_->shared_data_->get_unix_time(now); + LOG(INFO) << "Run TQueue GC at " << unix_time; + td::int64 deleted_events; + bool is_finished; + std::tie(deleted_events, is_finished) = parameters_->shared_data_->tqueue_->run_gc(unix_time); + LOG(INFO) << "TQueue GC deleted " << deleted_events << " events"; + next_tqueue_gc_time_ = td::Time::now() + (is_finished ? 60.0 : 1.0); + + tqueue_deleted_events_ += deleted_events; + if (tqueue_deleted_events_ > last_tqueue_deleted_events_ + 10000) { + LOG(WARNING) << "TQueue GC already deleted " << tqueue_deleted_events_ << " events since the start"; + last_tqueue_deleted_events_ = tqueue_deleted_events_; + } + } +} + void ClientManager::hangup_shared() { auto id = get_link_token(); auto *info = clients_.get(id); @@ -600,4 +703,6 @@ void ClientManager::finish_close() { stop(); } +constexpr double ClientManager::WATCHDOG_TIMEOUT; + } // namespace telegram_bot_api diff --git a/telegram-bot-api/ClientManager.h b/telegram-bot-api/ClientManager.h index 1fc2aeb..d8a2a40 100644 --- a/telegram-bot-api/ClientManager.h +++ b/telegram-bot-api/ClientManager.h @@ -9,6 +9,7 @@ #include "telegram-bot-api/Client.h" #include "telegram-bot-api/Query.h" #include "telegram-bot-api/Stats.h" +#include "telegram-bot-api/Watchdog.h" #include "td/actor/actor.h" @@ -41,6 +42,8 @@ class ClientManager final : public td::Actor { : parameters_(std::move(parameters)), token_range_(token_range) { } + void dump_statistics(); + void send(PromisedQueryPtr query); void user_login(PromisedQueryPtr query); @@ -71,6 +74,13 @@ class ClientManager final : public td::Actor { bool close_flag_ = false; td::vector> close_promises_; + td::ActorOwn watchdog_id_; + double next_tqueue_gc_time_ = 0.0; + td::int64 tqueue_deleted_events_ = 0; + td::int64 last_tqueue_deleted_events_ = 0; + + static constexpr double WATCHDOG_TIMEOUT = 0.5; + static td::int64 get_tqueue_id(td::int64 user_id, bool is_test_dc); static PromisedQueryPtr get_webhook_restore_query(td::Slice token, bool is_user, td::Slice webhook_info, @@ -78,6 +88,7 @@ class ClientManager final : public td::Actor { void start_up() final; void raw_event(const td::Event::Raw &event) final; + void timeout_expired() final; void hangup_shared() final; void close_db(); void finish_close(); diff --git a/telegram-bot-api/ClientParameters.h b/telegram-bot-api/ClientParameters.h index 97650c8..34f65dd 100644 --- a/telegram-bot-api/ClientParameters.h +++ b/telegram-bot-api/ClientParameters.h @@ -29,10 +29,10 @@ namespace telegram_bot_api { struct SharedData { std::atomic query_count_{0}; + std::atomic query_list_size_{0}; std::atomic next_verbosity_level_{-1}; - // not thread-safe - size_t query_list_size_ = 0; + // not thread-safe, must be used from a single thread td::ListNode query_list_; td::unique_ptr webhook_db_; td::unique_ptr user_db_; diff --git a/telegram-bot-api/HttpConnection.cpp b/telegram-bot-api/HttpConnection.cpp index 6c2c4e6..aa902f2 100644 --- a/telegram-bot-api/HttpConnection.cpp +++ b/telegram-bot-api/HttpConnection.cpp @@ -21,7 +21,7 @@ namespace telegram_bot_api { void HttpConnection::handle(td::unique_ptr http_query, td::ActorOwn connection) { - CHECK(connection_->empty()); + CHECK(connection_.empty()); connection_ = std::move(connection); LOG(DEBUG) << "Handle " << *http_query; diff --git a/telegram-bot-api/HttpStatConnection.cpp b/telegram-bot-api/HttpStatConnection.cpp index 02506cb..23cd293 100644 --- a/telegram-bot-api/HttpStatConnection.cpp +++ b/telegram-bot-api/HttpStatConnection.cpp @@ -15,7 +15,7 @@ namespace telegram_bot_api { void HttpStatConnection::handle(td::unique_ptr http_query, td::ActorOwn connection) { - CHECK(connection_->empty()); + CHECK(connection_.empty()); connection_ = std::move(connection); td::Parser url_path_parser(http_query->url_path_); as_json_ = url_path_parser.try_skip("/json"); diff --git a/telegram-bot-api/Query.cpp b/telegram-bot-api/Query.cpp index 875c1a0..8a109e7 100644 --- a/telegram-bot-api/Query.cpp +++ b/telegram-bot-api/Query.cpp @@ -46,9 +46,9 @@ Query::Query(td::vector &&container, td::Slice token, bool is_u start_timestamp_ = td::Time::now(); LOG(INFO) << "QUERY: create " << td::tag("ptr", this) << *this; if (shared_data_) { - shared_data_->query_count_++; + shared_data_->query_count_.fetch_add(1, std::memory_order_relaxed); if (method_ != "getupdates") { - shared_data_->query_list_size_++; + shared_data_->query_list_size_.fetch_add(1, std::memory_order_relaxed); shared_data_->query_list_.put(this); } } @@ -136,7 +136,7 @@ void Query::send_response_stat() const { return; } send_closure(stat_actor_, &BotStatActor::add_event, - ServerBotStat::Response{state_ == State::OK, answer_.size()}, now); + ServerBotStat::Response{state_ == State::OK, answer_.size(), files_size()}, now); } } // namespace telegram_bot_api diff --git a/telegram-bot-api/Query.h b/telegram-bot-api/Query.h index 7df19fc..6c360ba 100644 --- a/telegram-bot-api/Query.h +++ b/telegram-bot-api/Query.h @@ -23,6 +23,7 @@ #include "td/utils/StringBuilder.h" #include +#include #include #include @@ -112,9 +113,9 @@ class Query final : public td::ListNode { Query &operator=(Query &&) = delete; ~Query() { if (shared_data_) { - shared_data_->query_count_--; + shared_data_->query_count_.fetch_sub(1, std::memory_order_relaxed); if (!empty()) { - shared_data_->query_list_size_--; + shared_data_->query_list_size_.fetch_sub(1, std::memory_order_relaxed); } } } diff --git a/telegram-bot-api/Stats.cpp b/telegram-bot-api/Stats.cpp index fd6a43c..796e97d 100644 --- a/telegram-bot-api/Stats.cpp +++ b/telegram-bot-api/Stats.cpp @@ -185,6 +185,25 @@ td::vector BotStatActor::get_jsonable_description() const { } +double BotStatActor::get_score(double now) { + auto minute_stat = stat_[2].stat_duration(now); + double result = minute_stat.first.request_count_ + minute_stat.first.update_count_; + if (minute_stat.second != 0) { + result /= minute_stat.second; + } + result += td::max(static_cast(get_active_request_count() - 10), static_cast(0)); + result += static_cast(get_active_file_upload_bytes()) * 1e-8; + return result; +} + +td::int64 BotStatActor::get_active_request_count() const { + return active_request_count_; +} + +td::int64 BotStatActor::get_active_file_upload_bytes() const { + return active_file_upload_bytes_; +} + bool BotStatActor::is_active(double now) const { return last_activity_timestamp_ > now - 86400; } diff --git a/telegram-bot-api/Stats.h b/telegram-bot-api/Stats.h index a9cc232..234c2b7 100644 --- a/telegram-bot-api/Stats.h +++ b/telegram-bot-api/Stats.h @@ -116,15 +116,16 @@ struct ServerBotStat { struct Response { bool ok_; size_t size_; + td::int64 files_size_; }; - void on_event(const Response &answer) { + void on_event(const Response &response) { response_count_++; - if (answer.ok_) { + if (response.ok_) { response_count_ok_++; } else { response_count_error_++; } - response_bytes_ += static_cast(answer.size_); + response_bytes_ += static_cast(response.size_); } struct Request { @@ -174,6 +175,7 @@ class BotStatActor final : public td::Actor { for (auto &stat : stat_) { stat.add_event(event, now); } + on_event(event); if (!parent_.empty()) { send_closure(parent_, &BotStatActor::add_event, event, now); } @@ -184,6 +186,12 @@ class BotStatActor final : public td::Actor { td::string get_description() const; td::vector get_jsonable_description() const; + double get_score(double now); + + td::int64 get_active_request_count() const; + + td::int64 get_active_file_upload_bytes() const; + bool is_active(double now) const; static constexpr std::size_t SIZE = 4; @@ -193,6 +201,23 @@ class BotStatActor final : public td::Actor { td::TimedStat stat_[SIZE]; td::ActorId parent_; double last_activity_timestamp_ = -1e9; + td::int64 active_request_count_ = 0; + td::int64 active_file_upload_bytes_ = 0; + + void on_event(const ServerBotStat::Update &update) { + } + + void on_event(const ServerBotStat::Response &response) { + active_request_count_--; + active_file_upload_bytes_ -= response.files_size_; + CHECK(active_request_count_ >= 0); + CHECK(active_file_upload_bytes_ >= 0); + } + + void on_event(const ServerBotStat::Request &request) { + active_request_count_++; + active_file_upload_bytes_ += request.files_size_; + } }; } // namespace telegram_bot_api diff --git a/telegram-bot-api/StatsJson.h b/telegram-bot-api/StatsJson.h index 6e6ba18..d70a5c0 100644 --- a/telegram-bot-api/StatsJson.h +++ b/telegram-bot-api/StatsJson.h @@ -200,10 +200,13 @@ class JsonStatsBotAdvanced : public JsonStatsBot { public: explicit JsonStatsBotAdvanced(std::pair score_id_pair, ServerBotInfo bot, + td::int64 active_request_count, + td::int64 active_file_upload_bytes, td::vector stats, const bool hide_sensible_data, const double now) - : JsonStatsBot(std::move(score_id_pair)), bot_(std::move(bot)), stats_(std::move(stats)), + : JsonStatsBot(std::move(score_id_pair)), bot_(std::move(bot)), active_request_count_(active_request_count), + active_file_upload_bytes_(active_file_upload_bytes), stats_(std::move(stats)), hide_sensible_data_(hide_sensible_data), now_(now) { } void store(td::JsonValueScope *scope) const { @@ -235,6 +238,8 @@ class JsonStatsBotAdvanced : public JsonStatsBot { } private: ServerBotInfo bot_; + td::int64 active_request_count_; + td::int64 active_file_upload_bytes_; td::vector stats_; const bool hide_sensible_data_; const double now_; diff --git a/telegram-bot-api/Watchdog.cpp b/telegram-bot-api/Watchdog.cpp new file mode 100644 index 0000000..8b8105e --- /dev/null +++ b/telegram-bot-api/Watchdog.cpp @@ -0,0 +1,28 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "telegram-bot-api/Watchdog.h" + +#include "td/utils/logging.h" +#include "td/utils/Time.h" + +namespace telegram_bot_api { + +void Watchdog::kick() { + auto now = td::Time::now(); + if (now >= last_kick_time_ + timeout_ && last_kick_time_ > 0) { + LOG(ERROR) << get_name() << " timeout expired after " << now - last_kick_time_ << " seconds"; + td::thread::send_real_time_signal(main_thread_id_, 2); + } + last_kick_time_ = now; + set_timeout_in(timeout_); +} + +void Watchdog::timeout_expired() { + kick(); +} + +} // namespace telegram_bot_api diff --git a/telegram-bot-api/Watchdog.h b/telegram-bot-api/Watchdog.h new file mode 100644 index 0000000..5ab0115 --- /dev/null +++ b/telegram-bot-api/Watchdog.h @@ -0,0 +1,31 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/actor.h" + +#include "td/utils/port/thread.h" + +namespace telegram_bot_api { + +class Watchdog final : public td::Actor { + public: + Watchdog(td::thread::id main_thread_id, double timeout) : main_thread_id_(main_thread_id), timeout_(timeout) { + // watchdog is disabled until it is kicked for the first time + } + + void kick(); + + private: + void timeout_expired() final; + + td::thread::id main_thread_id_; + double timeout_; + double last_kick_time_ = 0.0; +}; + +} // namespace telegram_bot_api diff --git a/telegram-bot-api/telegram-bot-api.cpp b/telegram-bot-api/telegram-bot-api.cpp index 4c72ae7..bef522a 100644 --- a/telegram-bot-api/telegram-bot-api.cpp +++ b/telegram-bot-api/telegram-bot-api.cpp @@ -9,13 +9,10 @@ #include "telegram-bot-api/HttpConnection.h" #include "telegram-bot-api/HttpServer.h" #include "telegram-bot-api/HttpStatConnection.h" -#include "telegram-bot-api/Query.h" #include "telegram-bot-api/Stats.h" - -#include "td/telegram/ClientActor.h" +#include "telegram-bot-api/Watchdog.h" #include "td/db/binlog/Binlog.h" -#include "td/db/TQueue.h" #include "td/net/GetHostByNameActor.h" #include "td/net/HttpInboundConnection.h" @@ -23,13 +20,11 @@ #include "td/actor/actor.h" #include "td/actor/ConcurrentScheduler.h" -#include "td/utils/buffer.h" +#include "td/utils/AsyncFileLog.h" #include "td/utils/CombinedLog.h" #include "td/utils/common.h" #include "td/utils/crypto.h" #include "td/utils/ExitGuard.h" -#include "td/utils/FileLog.h" -#include "td/utils/format.h" //#include "td/utils/GitInfo.h" #include "td/utils/logging.h" #include "td/utils/MemoryLog.h" @@ -42,18 +37,14 @@ #include "td/utils/port/rlimit.h" #include "td/utils/port/signals.h" #include "td/utils/port/stacktrace.h" -#include "td/utils/port/Stat.h" +#include "td/utils/port/thread.h" #include "td/utils/port/user.h" #include "td/utils/Promise.h" #include "td/utils/Slice.h" #include "td/utils/SliceBuilder.h" #include "td/utils/Status.h" #include "td/utils/Time.h" -#include "td/utils/TsLog.h" -#include "memprof/memprof.h" - -#include #include #include #include @@ -78,18 +69,31 @@ static td::MemoryLog<1 << 20> memory_log; void print_log() { auto buf = memory_log.get_buffer(); auto pos = memory_log.get_pos(); + size_t tail_length = buf.size() - pos; + while (tail_length > 0 && buf[pos + tail_length - 1] == ' ') { + tail_length--; + } + if (tail_length + 100 >= buf.size() - pos) { + tail_length = buf.size() - pos; + } td::signal_safe_write("------- Log dump -------\n"); - td::signal_safe_write(buf.substr(pos), false); + td::signal_safe_write(buf.substr(pos, tail_length), false); td::signal_safe_write(buf.substr(0, pos), false); td::signal_safe_write("\n", false); td::signal_safe_write("------------------------\n"); } +static std::atomic_bool has_failed{false}; + static void dump_stacktrace_signal_handler(int sig) { + if (has_failed) { + return; + } td::Stacktrace::print_to_stderr(); } static void fail_signal_handler(int sig) { + has_failed = true; td::signal_safe_write_signal_number(sig); td::Stacktrace::PrintOptions options; options.use_gdb = true; @@ -107,6 +111,9 @@ static void change_verbosity_level_signal_handler(int sig) { static std::atomic_flag need_dump_log; static void dump_log_signal_handler(int sig) { + if (has_failed) { + return; + } need_dump_log.clear(); } @@ -115,61 +122,6 @@ static void sigsegv_signal_handler(int signum, void *addr) { fail_signal_handler(signum); } -static void dump_statistics(const std::shared_ptr &shared_data, - const std::shared_ptr &net_query_stats) { - if (is_memprof_on()) { - LOG(WARNING) << "Memory dump:"; - td::vector v; - dump_alloc([&](const AllocInfo &info) { v.push_back(info); }); - std::sort(v.begin(), v.end(), [](const AllocInfo &a, const AllocInfo &b) { return a.size > b.size; }); - size_t total_size = 0; - size_t other_size = 0; - int count = 0; - for (auto &info : v) { - if (count++ < 50) { - LOG(WARNING) << td::format::as_size(info.size) << td::format::as_array(info.backtrace); - } else { - other_size += info.size; - } - total_size += info.size; - } - LOG(WARNING) << td::tag("other", td::format::as_size(other_size)); - LOG(WARNING) << td::tag("total size", td::format::as_size(total_size)); - LOG(WARNING) << td::tag("total traces", get_ht_size()); - LOG(WARNING) << td::tag("fast_backtrace_success_rate", get_fast_backtrace_success_rate()); - } - auto r_mem_stat = td::mem_stat(); - if (r_mem_stat.is_ok()) { - auto mem_stat = r_mem_stat.move_as_ok(); - LOG(WARNING) << td::tag("rss", td::format::as_size(mem_stat.resident_size_)); - LOG(WARNING) << td::tag("vm", td::format::as_size(mem_stat.virtual_size_)); - LOG(WARNING) << td::tag("rss_peak", td::format::as_size(mem_stat.resident_size_peak_)); - LOG(WARNING) << td::tag("vm_peak", td::format::as_size(mem_stat.virtual_size_peak_)); - } - LOG(WARNING) << td::tag("buffer_mem", td::format::as_size(td::BufferAllocator::get_buffer_mem())); - LOG(WARNING) << td::tag("buffer_slice_size", td::format::as_size(td::BufferAllocator::get_buffer_slice_size())); - - auto query_list_size = shared_data->query_list_size_; - auto query_count = shared_data->query_count_.load(); - LOG(WARNING) << td::tag("pending queries", query_count) << td::tag("pending requests", query_list_size); - - td::uint64 i = 0; - bool was_gap = false; - for (auto end = &shared_data->query_list_, cur = end->prev; cur != end; cur = cur->prev, i++) { - if (i < 20 || i > query_list_size - 20 || i % (query_list_size / 50 + 1) == 0) { - if (was_gap) { - LOG(WARNING) << "..."; - was_gap = false; - } - LOG(WARNING) << static_cast(*cur); - } else { - was_gap = true; - } - } - - td::dump_pending_network_queries(*net_query_stats); -} - int main(int argc, char *argv[]) { SET_VERBOSITY_LEVEL(VERBOSITY_NAME(FATAL)); td::ExitGuard exit_guard; @@ -191,16 +143,16 @@ int main(int argc, char *argv[]) { td::set_signal_handler(td::SignalType::Other, fail_signal_handler).ensure(); td::set_extended_signal_handler(td::SignalType::Error, sigsegv_signal_handler).ensure(); - td::set_runtime_signal_handler(0, change_verbosity_level_signal_handler).ensure(); - td::set_runtime_signal_handler(1, dump_log_signal_handler).ensure(); - td::set_runtime_signal_handler(2, dump_stacktrace_signal_handler).ensure(); + td::set_real_time_signal_handler(0, change_verbosity_level_signal_handler).ensure(); + td::set_real_time_signal_handler(1, dump_log_signal_handler).ensure(); + td::set_real_time_signal_handler(2, dump_stacktrace_signal_handler).ensure(); td::init_openssl_threads(); auto start_time = td::Time::now(); auto shared_data = std::make_shared(); auto parameters = std::make_unique(); - parameters->version_ = "6.2"; + parameters->version_ = "6.3.2"; parameters->shared_data_ = shared_data; parameters->start_time_ = start_time; auto net_query_stats = td::create_net_query_stats(); @@ -222,6 +174,8 @@ int main(int argc, char *argv[]) { td::string username; td::string groupname; td::uint64 max_connections = 0; + td::uint64 cpu_affinity = 0; + td::uint64 main_thread_affinity = 0; ClientManager::TokenRange token_range{0, 1}; parameters->api_id_ = [](auto x) -> td::int32 { @@ -319,6 +273,17 @@ int main(int argc, char *argv[]) { options.add_option('g', "groupname", "effective group name to switch to", td::OptionParser::parse_string(groupname)); options.add_checked_option('c', "max-connections", "maximum number of open file descriptors", td::OptionParser::parse_integer(max_connections)); +#if TD_HAVE_THREAD_AFFINITY + options.add_checked_option('\0', "cpu-affinity", "CPU affinity as 64-bit mask (defaults to all available CPUs)", + td::OptionParser::parse_integer(cpu_affinity)); + options.add_checked_option( + '\0', "main-thread-affinity", + "CPU affinity of the main thread as 64-bit mask (defaults to the value of the option --cpu-affinity)", + td::OptionParser::parse_integer(main_thread_affinity)); +#else + (void)cpu_affinity; + (void)main_thread_affinity; +#endif options.add_checked_option('\0', "max-batch-operations", PSLICE() << "maximum number of batch operations (default: " << parameters->max_batch_operations << ")", @@ -375,10 +340,29 @@ int main(int argc, char *argv[]) { log.set_second(&memory_log); td::log_interface = &log; - td::FileLog file_log; - td::TsLog ts_log(&file_log); + td::AsyncFileLog file_log; auto init_status = [&] { +#if TD_HAVE_THREAD_AFFINITY + if (main_thread_affinity == 0) { + main_thread_affinity = cpu_affinity; + } + if (main_thread_affinity != 0) { + auto initial_mask = td::thread::get_affinity_mask(td::this_thread::get_id()); + if (initial_mask == 0) { + return td::Status::Error("Failed to get current thread affinity"); + } + if (cpu_affinity != 0) { + TRY_STATUS_PREFIX(td::thread::set_affinity_mask(td::this_thread::get_id(), cpu_affinity), + "Can't set CPU affinity mask: "); + } else { + cpu_affinity = initial_mask; + } + TRY_STATUS_PREFIX(td::thread::set_affinity_mask(td::this_thread::get_id(), main_thread_affinity), + "Can't set main thread CPU affinity mask: "); + } +#endif + if (max_connections != 0) { TRY_STATUS_PREFIX(td::set_resource_limit(td::ResourceLimitType::NoFile, max_connections), "Can't set file descriptor limit: "); @@ -392,7 +376,7 @@ int main(int argc, char *argv[]) { TRY_RESULT_PREFIX_ASSIGN(working_directory, td::realpath(working_directory, true), "Invalid working directory specified: "); if (working_directory.empty()) { - return td::Status::Error("Working directory can't be empty"); + return td::Status::Error("Empty path specified as working directory"); } if (working_directory.back() != TD_DIR_SLASH) { working_directory += TD_DIR_SLASH; @@ -448,13 +432,13 @@ int main(int argc, char *argv[]) { log_file_path = working_directory + log_file_path; } TRY_STATUS_PREFIX(file_log.init(log_file_path, log_max_file_size), "Can't open log file: "); - log.set_first(&ts_log); + log.set_first(&file_log); } return td::Status::OK(); }(); if (init_status.is_error()) { - LOG(PLAIN) << init_status.error().message(); + LOG(PLAIN) << init_status.message(); LOG(PLAIN) << options; return 1; } @@ -479,48 +463,58 @@ int main(int argc, char *argv[]) { // << (td::GitInfo::is_dirty() ? "(dirty)" : "") << " started"; LOG(WARNING) << "Bot API " << parameters->version_ << " server started"; - const int threads_n = 5; // +3 for Td, one for slow HTTP connections and one for DNS resolving - td::ConcurrentScheduler sched; - sched.init(threads_n); + // +3 threads for Td + // one thread for ClientManager and all Clients + // one thread for watchdogs + // one thread for slow HTTP connections + // one thread for DNS resolving + const int thread_count = 7; + td::ConcurrentScheduler sched(thread_count, cpu_affinity); td::GetHostByNameActor::Options get_host_by_name_options; - get_host_by_name_options.scheduler_id = threads_n; + get_host_by_name_options.scheduler_id = thread_count; parameters->get_host_by_name_actor_id_ = sched.create_actor_unsafe(0, "GetHostByName", std::move(get_host_by_name_options)) .release(); auto client_manager = - sched.create_actor_unsafe(0, "ClientManager", std::move(parameters), token_range).release(); + sched.create_actor_unsafe(thread_count - 3, "ClientManager", std::move(parameters), token_range) + .release(); + sched .create_actor_unsafe( - 0, "HttpServer", http_ip_address, http_port, + thread_count - 3, "HttpServer", http_ip_address, http_port, [client_manager, shared_data] { return td::ActorOwn( td::create_actor("HttpConnection", client_manager, shared_data)); }) .release(); + if (http_stat_port != 0) { sched .create_actor_unsafe( - 0, "HttpStatsServer", http_stat_ip_address, http_stat_port, + thread_count - 3, "HttpStatsServer", http_stat_ip_address, http_stat_port, [client_manager] { return td::ActorOwn( td::create_actor("HttpStatConnection", client_manager)); }) .release(); } + + constexpr double WATCHDOG_TIMEOUT = 0.5; + auto watchdog_id = + sched.create_actor_unsafe(thread_count - 2, "Watchdog", td::this_thread::get_id(), WATCHDOG_TIMEOUT); + sched.start(); + double next_watchdog_kick_time = start_time; double next_cron_time = start_time; double last_dump_time = start_time - 1000.0; - double last_tqueue_gc_time = start_time - 1000.0; - td::int64 tqueue_deleted_events = 0; - td::int64 last_tqueue_deleted_events = 0; bool close_flag = false; std::atomic_bool can_quit{false}; ServerCpuStat::instance(); // create ServerCpuStat instance while (true) { - sched.run_main(next_cron_time - td::Time::now()); + sched.run_main(td::min(next_cron_time, next_watchdog_kick_time) - td::Time::now()); if (!need_reopen_log.test_and_set()) { td::log_interface->after_rotation(); @@ -533,9 +527,9 @@ int main(int argc, char *argv[]) { } LOG(WARNING) << "Stopping engine with uptime " << (td::Time::now() - start_time) << " seconds by a signal"; - dump_statistics(shared_data, net_query_stats); close_flag = true; auto guard = sched.get_main_guard(); + watchdog_id.reset(); send_closure(client_manager, &ClientManager::close, td::PromiseCreator::lambda([&can_quit](td::Unit) { can_quit.store(true); td::Scheduler::instance()->yield(); @@ -562,7 +556,8 @@ int main(int argc, char *argv[]) { if (!need_dump_log.test_and_set()) { print_log(); - dump_statistics(shared_data, net_query_stats); + auto guard = sched.get_main_guard(); + send_closure(client_manager, &ClientManager::dump_statistics); } double now = td::Time::now(); @@ -574,29 +569,23 @@ int main(int argc, char *argv[]) { ServerCpuStat::update(now); } - if (now > last_tqueue_gc_time + 60.0) { - auto unix_time = shared_data->get_unix_time(now); - LOG(INFO) << "Run TQueue GC at " << unix_time; - last_tqueue_gc_time = now; + if (now >= start_time + 600) { auto guard = sched.get_main_guard(); - auto deleted_events = shared_data->tqueue_->run_gc(unix_time); - LOG(INFO) << "TQueue GC deleted " << deleted_events << " events"; - - tqueue_deleted_events += deleted_events; - if (tqueue_deleted_events > last_tqueue_deleted_events + 10000) { - LOG(WARNING) << "TQueue GC already deleted " << tqueue_deleted_events << " events since the start"; - last_tqueue_deleted_events = tqueue_deleted_events; - } + send_closure(watchdog_id, &Watchdog::kick); + next_watchdog_kick_time = now + WATCHDOG_TIMEOUT / 2; } if (now > last_dump_time + 300.0) { last_dump_time = now; - dump_statistics(shared_data, net_query_stats); + auto guard = sched.get_main_guard(); + send_closure(client_manager, &ClientManager::dump_statistics); } } LOG(WARNING) << "--------------------FINISH ENGINE--------------------"; - CHECK(net_query_stats.use_count() == 1); + if (net_query_stats.use_count() != 1) { + LOG(ERROR) << "NetQueryStats have leaked"; + } net_query_stats = nullptr; sched.finish(); SET_VERBOSITY_LEVEL(VERBOSITY_NAME(FATAL));