diff --git a/CMakeLists.txt b/CMakeLists.txt index 41128e7..348a5dc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ if (POLICY CMP0065) cmake_policy(SET CMP0065 NEW) endif() -project(TelegramBotApi VERSION 6.3.2 LANGUAGES CXX) +project(TelegramBotApi VERSION 6.4 LANGUAGES CXX) if (POLICY CMP0069) option(TELEGRAM_BOT_API_ENABLE_LTO "Use \"ON\" to enable Link Time Optimization.") diff --git a/td b/td index 7eba198..93c42f6 160000 --- a/td +++ b/td @@ -1 +1 @@ -Subproject commit 7eba19887ad834fd731b6b07b53c2426fe4beb59 +Subproject commit 93c42f6d7c1209937431469f80427d48907f1b8d diff --git a/telegram-bot-api/Client.cpp b/telegram-bot-api/Client.cpp index 18fc394..56da745 100644 --- a/telegram-bot-api/Client.cpp +++ b/telegram-bot-api/Client.cpp @@ -58,15 +58,23 @@ using td::JsonValue; using td_api::make_object; using td_api::move_object_as; +int Client::get_retry_after_time(Slice error_message) { + Slice prefix = "Too Many Requests: retry after "; + if (begins_with(error_message, prefix)) { + auto r_retry_after = td::to_integer_safe(error_message.substr(prefix.size())); + if (r_retry_after.is_ok() && r_retry_after.ok() > 0) { + return r_retry_after.ok(); + } + } + return 0; +} + void Client::fail_query_with_error(PromisedQueryPtr query, int32 error_code, Slice error_message, Slice default_message) { if (error_code == 429) { - Slice prefix = "Too Many Requests: retry after "; - if (begins_with(error_message, prefix)) { - auto r_retry_after = td::to_integer_safe(error_message.substr(prefix.size())); - if (r_retry_after.is_ok() && r_retry_after.ok() > 0) { - return query->set_retry_after_error(r_retry_after.ok()); - } + auto retry_after_time = get_retry_after_time(error_message); + if (retry_after_time > 0) { + return query->set_retry_after_error(retry_after_time); } LOG(ERROR) << "Wrong error message: " << error_message << " from " << *query; return fail_query(500, error_message, std::move(query)); @@ -207,9 +215,9 @@ Client::Client(td::ActorShared<> parent, const td::string &bot_token, bool is_us } Client::~Client() { - td::Scheduler::instance()->destroy_on_scheduler(get_file_gc_scheduler_id(), messages_, users_, groups_, supergroups_, - chats_, reply_message_ids_, yet_unsent_reply_message_ids_, - sticker_set_names_); + td::Scheduler::instance()->destroy_on_scheduler(SharedData::get_file_gc_scheduler_id(), messages_, users_, groups_, + supergroups_, chats_, reply_message_ids_, + yet_unsent_reply_message_ids_, sticker_set_names_); } bool Client::init_methods() { @@ -280,6 +288,11 @@ bool Client::init_methods() { methods_.emplace("reopenforumtopic", &Client::process_reopen_forum_topic_query); methods_.emplace("deleteforumtopic", &Client::process_delete_forum_topic_query); methods_.emplace("unpinallforumtopicmessages", &Client::process_unpin_all_forum_topic_messages_query); + methods_.emplace("editgeneralforumtopic", &Client::process_edit_general_forum_topic_query); + methods_.emplace("closegeneralforumtopic", &Client::process_close_general_forum_topic_query); + methods_.emplace("reopengeneralforumtopic", &Client::process_reopen_general_forum_topic_query); + methods_.emplace("hidegeneralforumtopic", &Client::process_hide_general_forum_topic_query); + methods_.emplace("unhidegeneralforumtopic", &Client::process_unhide_general_forum_topic_query); methods_.emplace("getchatmember", &Client::process_get_chat_member_query); methods_.emplace("getchatadministrators", &Client::process_get_chat_administrators_query); methods_.emplace("getchatmembercount", &Client::process_get_chat_member_count_query); @@ -353,6 +366,13 @@ bool Client::is_local_method(Slice method) { method == "setwebhook" || method == "deletewebhook" || method == "getwebhookinfo"; } +class Client::JsonEmptyObject final : public Jsonable { + public: + void store(JsonValueScope *scope) const { + auto object = scope->enter_object(); + } +}; + class Client::JsonFile final : public Jsonable { public: JsonFile(const td_api::file *file, const Client *client, bool with_path) @@ -714,6 +734,12 @@ class Client::JsonMessage final : public Jsonable { } } } + + void add_media_spoiler(td::JsonObjectScope &object, bool has_spoiler) const { + if (has_spoiler) { + object("has_media_spoiler", td::JsonTrue()); + } + } }; class Client::JsonChat final : public Jsonable { @@ -859,6 +885,12 @@ class Client::JsonChat final : public Jsonable { if (supergroup_info->is_supergroup && supergroup_info->join_by_request) { object("join_by_request", td::JsonTrue()); } + if (supergroup_info->is_supergroup && supergroup_info->has_hidden_members) { + object("has_hidden_members", td::JsonTrue()); + } + if (supergroup_info->has_aggressive_anti_spam_enabled) { + object("has_aggressive_anti_spam_enabled", td::JsonTrue()); + } if (supergroup_info->slow_mode_delay != 0) { object("slow_mode_delay", supergroup_info->slow_mode_delay); } @@ -908,7 +940,7 @@ class Client::JsonChat final : public Jsonable { } if (pinned_message_id_ != 0) { CHECK(pinned_message_id_ != -1); - const MessageInfo *pinned_message = client_->get_message(chat_id_, pinned_message_id_); + const MessageInfo *pinned_message = client_->get_message(chat_id_, pinned_message_id_, true); if (pinned_message != nullptr) { object("pinned_message", JsonMessage(pinned_message, false, "pin in JsonChat", client_)); } else { @@ -1163,19 +1195,36 @@ class Client::JsonSticker final : public Jsonable { object("is_animated", td::JsonBool(format == td_api::stickerFormatTgs::ID)); object("is_video", td::JsonBool(format == td_api::stickerFormatWebm::ID)); - object("type", Client::get_sticker_type(sticker_->type_)); - - if (sticker_->custom_emoji_id_ != 0) { - object("custom_emoji_id", td::to_string(sticker_->custom_emoji_id_)); + switch (sticker_->full_type_->get_id()) { + case td_api::stickerFullTypeRegular::ID: { + auto full_type = static_cast(sticker_->full_type_.get()); + object("type", Client::get_sticker_type(make_object())); + if (full_type->premium_animation_ != nullptr) { + object("premium_animation", JsonFile(full_type->premium_animation_.get(), client_, false)); + } + break; + } + case td_api::stickerFullTypeMask::ID: { + auto full_type = static_cast(sticker_->full_type_.get()); + object("type", Client::get_sticker_type(make_object())); + if (full_type->mask_position_ != nullptr) { + object("mask_position", JsonMaskPosition(full_type->mask_position_.get())); + } + break; + } + case td_api::stickerFullTypeCustomEmoji::ID: { + auto full_type = static_cast(sticker_->full_type_.get()); + object("type", Client::get_sticker_type(make_object())); + if (full_type->custom_emoji_id_ != 0) { + object("custom_emoji_id", td::to_string(full_type->custom_emoji_id_)); + } + break; + } + default: + UNREACHABLE(); + break; } - const auto &mask_position = sticker_->mask_position_; - if (mask_position != nullptr) { - object("mask_position", JsonMaskPosition(mask_position.get())); - } - if (sticker_->premium_animation_ != nullptr) { - object("premium_animation", JsonFile(sticker_->premium_animation_.get(), client_, false)); - } client_->json_store_thumbnail(object, sticker_->thumbnail_.get()); client_->json_store_file(object, sticker_->sticker_.get()); } @@ -1474,11 +1523,25 @@ class Client::JsonForumTopicCreated final : public Jsonable { const td_api::messageForumTopicCreated *forum_topic_created_; }; -class Client::JsonForumTopicIsClosedToggled final : public Jsonable { +class Client::JsonForumTopicEdited final : public Jsonable { public: + explicit JsonForumTopicEdited(const td_api::messageForumTopicEdited *forum_topic_edited) + : forum_topic_edited_(forum_topic_edited) { + } void store(JsonValueScope *scope) const { auto object = scope->enter_object(); + if (!forum_topic_edited_->name_.empty()) { + object("name", forum_topic_edited_->name_); + } + if (forum_topic_edited_->edit_icon_custom_emoji_id_) { + object("icon_custom_emoji_id", forum_topic_edited_->icon_custom_emoji_id_ == 0 + ? td::string() + : td::to_string(forum_topic_edited_->icon_custom_emoji_id_)); + } } + + private: + const td_api::messageForumTopicEdited *forum_topic_edited_; }; class Client::JsonForumTopicInfo final : public Jsonable { @@ -1717,13 +1780,6 @@ class Client::JsonVideoChatScheduled final : public Jsonable { const td_api::messageVideoChatScheduled *video_chat_scheduled_; }; -class Client::JsonVideoChatStarted final : public Jsonable { - public: - void store(JsonValueScope *scope) const { - auto object = scope->enter_object(); - } -}; - class Client::JsonVideoChatEnded final : public Jsonable { public: explicit JsonVideoChatEnded(const td_api::messageVideoChatEnded *video_chat_ended) @@ -1754,24 +1810,19 @@ class Client::JsonInviteVideoChatParticipants final : public Jsonable { const Client *client_; }; -class Client::JsonChatSetTtl final : public Jsonable { +class Client::JsonChatSetMessageAutoDeleteTime final : public Jsonable { public: - explicit JsonChatSetTtl(const td_api::messageChatSetTtl *chat_set_ttl) : chat_set_ttl_(chat_set_ttl) { + explicit JsonChatSetMessageAutoDeleteTime( + const td_api::messageChatSetMessageAutoDeleteTime *chat_set_message_auto_delete_time) + : chat_set_message_auto_delete_time_(chat_set_message_auto_delete_time) { } void store(JsonValueScope *scope) const { auto object = scope->enter_object(); - object("message_auto_delete_time", chat_set_ttl_->ttl_); + object("message_auto_delete_time", chat_set_message_auto_delete_time_->message_auto_delete_time_); } private: - const td_api::messageChatSetTtl *chat_set_ttl_; -}; - -class Client::JsonCallbackGame final : public Jsonable { - public: - void store(JsonValueScope *scope) const { - auto object = scope->enter_object(); - } + const td_api::messageChatSetMessageAutoDeleteTime *chat_set_message_auto_delete_time_; }; class Client::JsonWebAppInfo final : public Jsonable { @@ -1816,7 +1867,7 @@ class Client::JsonInlineKeyboardButton final : public Jsonable { break; } case td_api::inlineKeyboardButtonTypeCallbackGame::ID: - object("callback_game", JsonCallbackGame()); + object("callback_game", JsonEmptyObject()); break; case td_api::inlineKeyboardButtonTypeSwitchInline::ID: { auto type = static_cast(button_->type_.get()); @@ -1943,7 +1994,7 @@ void Client::JsonMessage::store(JsonValueScope *scope) const { object("forward_date", message_->initial_send_date); } if (message_->reply_to_message_id > 0 && need_reply_ && !message_->is_reply_to_message_deleted) { - const MessageInfo *reply_to_message = client_->get_message(message_->chat_id, message_->reply_to_message_id); + const MessageInfo *reply_to_message = client_->get_message(message_->chat_id, message_->reply_to_message_id, true); if (reply_to_message != nullptr) { object("reply_to_message", JsonMessage(reply_to_message, false, "reply in " + source_, client_)); } else { @@ -1968,6 +2019,7 @@ void Client::JsonMessage::store(JsonValueScope *scope) const { object("animation", JsonAnimation(content->animation_.get(), false, client_)); object("document", JsonAnimation(content->animation_.get(), true, client_)); add_caption(object, content->caption_); + add_media_spoiler(object, content->has_spoiler_); break; } case td_api::messageAudio::ID: { @@ -1987,6 +2039,7 @@ void Client::JsonMessage::store(JsonValueScope *scope) const { CHECK(content->photo_ != nullptr); object("photo", JsonPhoto(content->photo_.get(), client_)); add_caption(object, content->caption_); + add_media_spoiler(object, content->has_spoiler_); break; } case td_api::messageSticker::ID: { @@ -1998,6 +2051,7 @@ void Client::JsonMessage::store(JsonValueScope *scope) const { auto content = static_cast(message_->content.get()); object("video", JsonVideo(content->video_.get(), client_)); add_caption(object, content->caption_); + add_media_spoiler(object, content->has_spoiler_); break; } case td_api::messageVideoNote::ID: { @@ -2132,14 +2186,26 @@ void Client::JsonMessage::store(JsonValueScope *scope) const { object("forum_topic_created", JsonForumTopicCreated(content)); break; } - case td_api::messageForumTopicEdited::ID: + case td_api::messageForumTopicEdited::ID: { + auto content = static_cast(message_->content.get()); + object("forum_topic_edited", JsonForumTopicEdited(content)); break; + } case td_api::messageForumTopicIsClosedToggled::ID: { auto content = static_cast(message_->content.get()); if (content->is_closed_) { - object("forum_topic_closed", JsonForumTopicIsClosedToggled()); + object("forum_topic_closed", JsonEmptyObject()); } else { - object("forum_topic_reopened", JsonForumTopicIsClosedToggled()); + object("forum_topic_reopened", JsonEmptyObject()); + } + break; + } + case td_api::messageForumTopicIsHiddenToggled::ID: { + auto content = static_cast(message_->content.get()); + if (content->is_hidden_) { + object("general_forum_topic_hidden", JsonEmptyObject()); + } else { + object("general_forum_topic_unhidden", JsonEmptyObject()); } break; } @@ -2147,7 +2213,7 @@ void Client::JsonMessage::store(JsonValueScope *scope) const { auto content = static_cast(message_->content.get()); auto message_id = content->message_id_; if (message_id > 0) { - const MessageInfo *pinned_message = client_->get_message(message_->chat_id, message_id); + const MessageInfo *pinned_message = client_->get_message(message_->chat_id, message_id, true); if (pinned_message != nullptr) { object("pinned_message", JsonMessage(pinned_message, false, "pin in " + source_, client_)); } else { @@ -2169,9 +2235,9 @@ void Client::JsonMessage::store(JsonValueScope *scope) const { break; case td_api::messageScreenshotTaken::ID: break; - case td_api::messageChatSetTtl::ID: { - auto content = static_cast(message_->content.get()); - object("message_auto_delete_timer_changed", JsonChatSetTtl(content)); + case td_api::messageChatSetMessageAutoDeleteTime::ID: { + auto content = static_cast(message_->content.get()); + object("message_auto_delete_timer_changed", JsonChatSetMessageAutoDeleteTime(content)); break; } case td_api::messageUnsupported::ID: @@ -2220,8 +2286,8 @@ void Client::JsonMessage::store(JsonValueScope *scope) const { break; } case td_api::messageVideoChatStarted::ID: - object("video_chat_started", JsonVideoChatStarted()); - object("voice_chat_started", JsonVideoChatStarted()); + object("video_chat_started", JsonEmptyObject()); + object("voice_chat_started", JsonEmptyObject()); break; case td_api::messageVideoChatEnded::ID: { auto content = static_cast(message_->content.get()); @@ -2244,6 +2310,11 @@ void Client::JsonMessage::store(JsonValueScope *scope) const { } case td_api::messageGiftedPremium::ID: break; + case td_api::messageSuggestProfilePhoto::ID: + break; + case td_api::messageBotWriteAccessAllowed::ID: + object("write_access_allowed", JsonEmptyObject()); + break; default: UNREACHABLE(); } @@ -3003,19 +3074,34 @@ class Client::JsonChatsNearby : public Jsonable { class Client::JsonMessagesArray : public Jsonable { public: - explicit JsonMessagesArray(object_ptr &messages, Client *client) : messages_(messages), client_(client) { + explicit JsonMessagesArray(td::vector> &messages, Client *client) : messages_(messages), client_(client) { } void store(JsonValueScope *scope) const { auto array = scope->enter_array(); - for (auto &message : messages_->messages_) { + for (auto &message : messages_) { auto full_message_id = client_->add_message(std::move(message)); - const MessageInfo *m = client_->get_message(full_message_id.chat_id, full_message_id.message_id); + const MessageInfo *m = client_->get_message(full_message_id.chat_id, full_message_id.message_id, true); array << JsonMessage(m, true, "search", client_); } } private: - object_ptr &messages_; + td::vector> &messages_; + Client *client_; +}; + +class Client::JsonFoundMessages : public Jsonable { + public: + explicit JsonFoundMessages(object_ptr &foundMessages, Client *client) : foundMessages_(foundMessages), client_(client) { + } + void store(JsonValueScope *scope) const { + auto object = scope->enter_object(); + object("next_offset", foundMessages_->next_offset_); + object("messages", JsonMessagesArray(foundMessages_->messages_, std::move(client_))); + } + + private: + object_ptr &foundMessages_; Client *client_; }; @@ -3101,13 +3187,12 @@ class Client::TdOnAuthorizationCallback final : public TdQueryCallback { bool was_ready = client_->authorization_state_->get_id() != td_api::authorizationStateWaitPhoneNumber::ID; if (result->get_id() == td_api::error::ID) { auto error = move_object_as(result); - if (error->code_ == 429 || error->code_ >= 500 || (error->code_ != 401 && was_ready)) { + if (error->code_ != 401 && was_ready) { // try again return client_->on_update_authorization_state(); } - LOG(WARNING) << "Logging out due to " << td::oneline(to_string(error)); - client_->log_out(error->message_ == "API_ID_INVALID"); + client_->log_out(error->code_, error->message_); } else if (was_ready) { client_->on_update_authorization_state(); } @@ -3138,8 +3223,7 @@ class Client::TdOnAuthorizationQueryCallback : public TdQueryCallback { } fail_query(401, "Unauthorized: Log in failed, logging out due to " + td::oneline(to_string(error)), std::move(query_)); - LOG(WARNING) << "Logging out due to " << td::oneline(to_string(error)); - client_->log_out(false); + client_->log_out(error->code_, error->message_); } else { if (client_->authorization_state_->get_id() == td_api::authorizationStateWaitRegistration::ID && !client_->parameters_->allow_users_registration_) { @@ -3147,7 +3231,9 @@ class Client::TdOnAuthorizationQueryCallback : public TdQueryCallback { "Unauthorized: It is not allowed to register users with this api. You can enable it with the " "command line option --allow-users-registration. Logging out", std::move(query_)); - return client_->log_out(false); + return client_->log_out(401, + "Unauthorized: It is not allowed to register users with this api. You can enable it with the " + "command line option --allow-users-registration. Logging out"); } if (send_token_) { answer_query(JsonAuthorizationState(client_->authorization_state_.get(), client_->bot_token_), std::move(query_)); @@ -3203,11 +3289,13 @@ class Client::TdOnGetUserProfilePhotosCallback final : public TdQueryCallback { class Client::TdOnSendMessageCallback final : public TdQueryCallback { public: - TdOnSendMessageCallback(Client *client, PromisedQueryPtr query) : client_(client), query_(std::move(query)) { + TdOnSendMessageCallback(Client *client, int64 chat_id, PromisedQueryPtr query) + : client_(client), chat_id_(chat_id), query_(std::move(query)) { } void on_result(object_ptr result) final { if (result->get_id() == td_api::error::ID) { + client_->decrease_yet_unsent_message_count(chat_id_, 1); return fail_query_with_error(std::move(query_), move_object_as(result)); } @@ -3218,21 +3306,27 @@ class Client::TdOnSendMessageCallback final : public TdQueryCallback { private: Client *client_; + int64 chat_id_; PromisedQueryPtr query_; }; class Client::TdOnSendMessageAlbumCallback final : public TdQueryCallback { public: - TdOnSendMessageAlbumCallback(Client *client, PromisedQueryPtr query) : client_(client), query_(std::move(query)) { + TdOnSendMessageAlbumCallback(Client *client, int64 chat_id, std::size_t message_count, PromisedQueryPtr query) + : client_(client), chat_id_(chat_id), message_count_(message_count), query_(std::move(query)) { } void on_result(object_ptr result) final { if (result->get_id() == td_api::error::ID) { + if (message_count_ > 0) { + client_->decrease_yet_unsent_message_count(chat_id_, static_cast(message_count_)); + } return fail_query_with_error(std::move(query_), move_object_as(result)); } CHECK(result->get_id() == td_api::messages::ID); auto messages = move_object_as(result); + CHECK(messages->messages_.size() == message_count_); auto query_id = client_->get_send_message_query_id(std::move(query_), true); for (auto &message : messages->messages_) { client_->on_sent_message(std::move(message), query_id); @@ -3241,6 +3335,8 @@ class Client::TdOnSendMessageAlbumCallback final : public TdQueryCallback { private: Client *client_; + int64 chat_id_; + std::size_t message_count_; PromisedQueryPtr query_; }; @@ -3265,7 +3361,7 @@ class Client::TdOnDeleteFailedToSendMessageCallback final : public TdQueryCallba } CHECK(result->get_id() == td_api::ok::ID); - if (client_->get_message(chat_id_, message_id_) != nullptr) { + if (client_->get_message(chat_id_, message_id_, true) != nullptr) { LOG(ERROR) << "Have cache for message " << message_id_ << " in the chat " << chat_id_; client_->delete_message(chat_id_, message_id_, false); } @@ -3293,7 +3389,7 @@ class Client::TdOnEditMessageCallback final : public TdQueryCallback { int64 chat_id = message->chat_id_; int64 message_id = message->id_; - auto message_info = client_->get_message(chat_id, message_id); + auto message_info = client_->get_message(chat_id, message_id, true); if (message_info == nullptr) { return fail_query_with_error(std::move(query_), 400, "message not found"); } @@ -3336,7 +3432,7 @@ class Client::TdOnStopPollCallback final : public TdQueryCallback { } CHECK(result->get_id() == td_api::ok::ID); - auto message_info = client_->get_message(chat_id_, message_id_); + auto message_info = client_->get_message(chat_id_, message_id_, true); if (message_info == nullptr) { return fail_query_with_error(std::move(query_), 400, "message not found"); } @@ -3628,7 +3724,7 @@ class Client::TdOnCheckMessageThreadCallback final : public TdQueryCallback { CHECK(full_message_id.chat_id == chat_id_); CHECK(full_message_id.message_id == message_thread_id_); - const MessageInfo *message_info = client_->get_message(chat_id_, message_thread_id_); + const MessageInfo *message_info = client_->get_message(chat_id_, message_thread_id_, true); CHECK(message_info != nullptr); if (message_info->message_thread_id != message_thread_id_) { return fail_query_with_error(std::move(query_), 400, "MESSAGE_THREAD_INVALID", "Message thread not found"); @@ -4478,7 +4574,28 @@ class Client::TdOnReturnMessagesCallback : public TdQueryCallback { CHECK(result->get_id() == td_api::messages::ID); auto messages = move_object_as(result); - answer_query(JsonMessagesArray(messages, client_), std::move(query_)); + answer_query(JsonMessagesArray(messages->messages_, client_), std::move(query_)); + } + + private: + Client *client_; + PromisedQueryPtr query_; +}; + +class Client::TdOnFoundMessagesCallback : public TdQueryCallback { + public: + explicit TdOnFoundMessagesCallback(Client *client, PromisedQueryPtr query) + : client_(client), query_(std::move(query)) { + } + + void on_result(object_ptr result) override { + if (result->get_id() == td_api::error::ID) { + return fail_query_with_error(std::move(query_), move_object_as(result)); + } + CHECK(result->get_id() == td_api::messages::ID); + + auto foundMessages = move_object_as(result); + answer_query(JsonFoundMessages(foundMessages, client_), std::move(query_)); } private: @@ -4556,8 +4673,18 @@ void Client::close() { } } -void Client::log_out(bool is_api_id_invalid) { - is_api_id_invalid_ |= is_api_id_invalid; +void Client::log_out(int32 error_code, Slice error_message) { + LOG(WARNING) << "Logging out due to error " << error_code << ": " << error_message; + if (error_message == "API_ID_INVALID") { + is_api_id_invalid_ = true; + } else if (error_code == 429) { + auto retry_after_time = get_retry_after_time(error_message); + if (retry_after_time > 0) { + next_authorization_time_ = td::max(next_authorization_time_, td::Time::now() + retry_after_time); + } + } else if (error_code >= 500) { + next_authorization_time_ = td::max(next_authorization_time_, td::Time::now() + 1); + } if (!td_client_.empty() && !logging_out_ && !closing_) { do_send_request(make_object(), td::make_unique()); } @@ -4673,14 +4800,23 @@ void Client::start_up() { void Client::send(PromisedQueryPtr query) { if (!query->is_internal()) { query->set_stat_actor(stat_actor_); - if (!parameters_->local_mode_ && !is_local_method(query->method())) { - const BotStatActor *stat = stat_actor_.get_actor_unsafe(); - if (stat->get_active_request_count() > 10000) { + if (!parameters_->local_mode_ && !is_local_method(query->method()) && + td::Time::now() > parameters_->start_time_ + 10 * 60) { + BotStatActor *stat = stat_actor_.get_actor_unsafe(); + auto update_per_minute = static_cast(stat->get_minute_update_count(td::Time::now()) * 60); + if (stat->get_active_request_count() > 500 + update_per_minute) { LOG(INFO) << "Fail a query, because there are too many active queries: " << *query; + flood_limited_query_count_++; return query->set_retry_after_error(60); } - if (stat->get_active_file_upload_bytes() > (static_cast(1) << 33) && !query->files().empty()) { + if (stat->get_active_file_upload_bytes() > (static_cast(1) << 32) && !query->files().empty()) { + LOG(INFO) << "Fail a query, because the total size of active file uploads is too big: " << *query; + flood_limited_query_count_++; + return query->set_retry_after_error(60); + } + if (stat->get_active_file_upload_count() > 100 + update_per_minute / 5 && !query->files().empty()) { LOG(INFO) << "Fail a query, because there are too many active file uploads: " << *query; + flood_limited_query_count_++; return query->set_retry_after_error(60); } } @@ -4724,7 +4860,7 @@ void Client::on_get_reply_message(int64 chat_id, object_ptr rep add_message(std::move(reply_to_message)); } - process_new_message_queue(chat_id); + process_new_message_queue(chat_id, 1); } void Client::on_get_edited_message(object_ptr edited_message) { @@ -4798,7 +4934,7 @@ void Client::on_get_sticker_set(int64 set_id, int64 new_callback_query_user_id, process_new_callback_query_queue(new_callback_query_user_id, 2); } if (new_message_chat_id != 0) { - process_new_message_queue(new_message_chat_id); + process_new_message_queue(new_message_chat_id, 2); } } @@ -5058,7 +5194,7 @@ void Client::check_message_thread(int64 chat_id, int64 message_thread_id, int64 } if (reply_to_message_id != 0) { - const MessageInfo *message_info = get_message(chat_id, reply_to_message_id); + const MessageInfo *message_info = get_message(chat_id, reply_to_message_id, true); CHECK(message_info != nullptr); if (message_info->message_thread_id != message_thread_id) { return fail_query_with_error(std::move(query), 400, "MESSAGE_THREAD_INVALID", @@ -5220,12 +5356,9 @@ void Client::get_chat_member(int64 chat_id, int64 user_id, PromisedQueryPtr quer } void Client::send_request(object_ptr &&f, td::unique_ptr handler) { - if (logging_out_) { - return handler->on_result( - make_object(LOGGING_OUT_ERROR_CODE, get_logging_out_error_description().str())); - } - if (closing_) { - return handler->on_result(make_object(CLOSING_ERROR_CODE, CLOSING_ERROR_DESCRIPTION.str())); + if (closing_ || logging_out_) { + auto error = get_closing_error(); + return handler->on_result(make_object(error.code, error.message.str())); } do_send_request(std::move(f), std::move(handler)); @@ -5258,13 +5391,12 @@ void Client::on_update_file(object_ptr file) { } if (!file->local_->is_downloading_active_ && download_started_file_ids_.count(file_id)) { // also includes all 5xx and 429 errors + if (closing_ || logging_out_) { + auto error = get_closing_error(); + return on_file_download(file_id, Status::Error(error.code, error.message)); + } + auto error = Status::Error(400, "Bad Request: wrong file_id or the file is temporarily unavailable"); - if (logging_out_) { - error = Status::Error(LOGGING_OUT_ERROR_CODE, get_logging_out_error_description()); - } - if (closing_) { - error = Status::Error(CLOSING_ERROR_CODE, CLOSING_ERROR_DESCRIPTION); - } return on_file_download(file_id, std::move(error)); } } @@ -5398,7 +5530,7 @@ void Client::on_update_authorization_state() { case td_api::authorizationStateClosed::ID: return on_closed(); default: - return log_out(false); // just in case + return log_out(500, "Unknown authorization state"); // just in case } } @@ -5477,7 +5609,7 @@ void Client::on_update(object_ptr result) { deleted_messages.push_back(std::move(deleted_message)); } } - td::Scheduler::instance()->destroy_on_scheduler(get_file_gc_scheduler_id(), deleted_messages); + td::Scheduler::instance()->destroy_on_scheduler(SharedData::get_file_gc_scheduler_id(), deleted_messages); break; } case td_api::updateFile::ID: { @@ -5536,7 +5668,7 @@ void Client::on_update(object_ptr result) { chat_info->title = std::move(chat->title_); chat_info->photo_info = std::move(chat->photo_); chat_info->permissions = std::move(chat->permissions_); - chat_info->message_auto_delete_time = chat->message_ttl_; + chat_info->message_auto_delete_time = chat->message_auto_delete_time_; chat_info->has_protected_content = chat->has_protected_content_; break; } @@ -5561,11 +5693,11 @@ void Client::on_update(object_ptr result) { chat_info->permissions = std::move(update->permissions_); break; } - case td_api::updateChatMessageTtl::ID: { - auto update = move_object_as(result); + case td_api::updateChatMessageAutoDeleteTime::ID: { + auto update = move_object_as(result); auto chat_info = add_chat(update->chat_id_); CHECK(chat_info->type != ChatInfo::Type::Unknown); - chat_info->message_auto_delete_time = update->message_ttl_; + chat_info->message_auto_delete_time = update->message_auto_delete_time_; break; } case td_api::updateChatHasProtectedContent::ID: { @@ -5585,7 +5717,8 @@ void Client::on_update(object_ptr result) { auto update = move_object_as(result); auto user_id = update->user_id_; auto full_info = update->user_full_info_.get(); - set_user_photo(user_id, std::move(full_info->photo_)); + set_user_photo(user_id, + full_info->photo_ == nullptr ? std::move(full_info->public_photo_) : std::move(full_info->photo_)); if (full_info->bio_ != nullptr) { set_user_bio(user_id, std::move(full_info->bio_->text_)); } @@ -5637,6 +5770,8 @@ void Client::on_update(object_ptr result) { set_supergroup_slow_mode_delay(supergroup_id, full_info->slow_mode_delay_); set_supergroup_linked_chat_id(supergroup_id, full_info->linked_chat_id_); set_supergroup_location(supergroup_id, std::move(full_info->location_)); + set_supergroup_has_hidden_members(supergroup_id, full_info->has_hidden_members_); + set_supergroup_has_aggressive_anti_spam_enabled(supergroup_id, full_info->has_aggressive_anti_spam_enabled_); break; } case td_api::updateOption::ID: { @@ -5748,6 +5883,12 @@ void Client::on_update(object_ptr result) { void Client::on_result(td::uint64 id, object_ptr result) { LOG(DEBUG) << "Receive from Td: " << id << " " << to_string(result); + if (flood_limited_query_count_ > 0 && td::Time::now() > next_flood_limit_warning_time_) { + LOG(WARNING) << "Flood-limited " << flood_limited_query_count_ << " queries"; + flood_limited_query_count_ = 0; + next_flood_limit_warning_time_ = td::Time::now() + 1; + } + if (id == 0) { return on_update(std::move(result)); } @@ -5759,23 +5900,17 @@ void Client::on_result(td::uint64 id, object_ptr result) { handlers_.erase(id); } -td::Slice Client::get_logging_out_error_description() const { - return is_api_id_invalid_ ? API_ID_INVALID_ERROR_DESCRIPTION : LOGGING_OUT_ERROR_DESCRIPTION; -} - void Client::on_closed() { LOG(WARNING) << "Closed"; CHECK(logging_out_ || closing_); CHECK(!td_client_.empty()); td_client_.reset(); - int http_status_code = logging_out_ ? LOGGING_OUT_ERROR_CODE : CLOSING_ERROR_CODE; - Slice description = logging_out_ ? get_logging_out_error_description() : CLOSING_ERROR_DESCRIPTION; if (webhook_set_query_) { - fail_query(http_status_code, description, std::move(webhook_set_query_)); + fail_query_closing(std::move(webhook_set_query_)); } if (active_webhook_set_query_) { - fail_query(http_status_code, description, std::move(active_webhook_set_query_)); + fail_query_closing(std::move(active_webhook_set_query_)); } if (!webhook_url_.empty()) { webhook_id_.reset(); @@ -5788,23 +5923,24 @@ void Client::on_closed() { while (!cmd_queue_.empty()) { auto query = std::move(cmd_queue_.front()); cmd_queue_.pop(); - fail_query(http_status_code, description, std::move(query)); + fail_query_closing(std::move(query)); } - while (!yet_unsent_messages_.empty()) { - auto it = yet_unsent_messages_.begin(); - auto chat_id = it->first.chat_id; - auto message_id = it->first.message_id; + while (!pending_send_message_queries_.empty()) { + auto it = pending_send_message_queries_.begin(); if (!USE_MESSAGE_DATABASE) { - LOG(ERROR) << "Doesn't receive updateMessageSendFailed for message " << message_id << " in chat " << chat_id; + LOG(ERROR) << "Doesn't receive updateMessageSendFailed for " << *it->second->query << " with " + << it->second->awaited_message_count << " awaited messages"; } - on_message_send_failed(chat_id, message_id, 0, Status::Error(http_status_code, description)); + fail_query_closing(std::move(it->second->query)); + pending_send_message_queries_.erase(it); } - CHECK(yet_unsent_message_count_.empty()); + yet_unsent_message_count_.clear(); + yet_unsent_messages_.clear(); while (!pending_bot_resolve_queries_.empty()) { auto it = pending_bot_resolve_queries_.begin(); - fail_query(http_status_code, description, std::move(it->second.query)); + fail_query_closing(std::move(it->second.query)); pending_bot_resolve_queries_.erase(it); } @@ -5812,14 +5948,19 @@ void Client::on_closed() { auto it = file_download_listeners_.begin(); auto file_id = it->first; LOG(ERROR) << "Doesn't receive updateFile for file " << file_id; - on_file_download(file_id, Status::Error(http_status_code, description)); + auto queries = std::move(it->second); + file_download_listeners_.erase(it); + for (auto &query : queries) { + fail_query_closing(std::move(query)); + } } + download_started_file_ids_.clear(); if (logging_out_) { parameters_->shared_data_->webhook_db_->erase(bot_token_with_dc_); parameters_->shared_data_->user_db_->erase(bot_token_with_dc_); - td::Scheduler::instance()->run_on_scheduler(get_file_gc_scheduler_id(), + td::Scheduler::instance()->run_on_scheduler(SharedData::get_file_gc_scheduler_id(), [actor_id = actor_id(this), dir = dir_](td::Unit) { CHECK(dir.size() >= 24); CHECK(dir.back() == TD_DIR_SLASH); @@ -5837,7 +5978,18 @@ void Client::finish_closing() { clear_tqueue(); } - set_timeout_in(need_close_ ? 0 : 600); + if (need_close_) { + return stop(); + } + + auto timeout = [&] { + if (next_authorization_time_ <= 0.0) { + return 600.0; + } + return td::min(next_authorization_time_ - td::Time::now(), 600.0); + }(); + set_timeout_in(timeout); + LOG(INFO) << "Keep client opened for " << timeout << " seconds"; } void Client::timeout_expired() { @@ -5845,28 +5997,10 @@ void Client::timeout_expired() { stop(); } -td::int32 Client::get_database_scheduler_id() { - // the same scheduler as for database in Td - return 1; -} - -td::int32 Client::get_file_gc_scheduler_id() { - // the same scheduler as for file GC in Td - return 2; -} - void Client::clear_tqueue() { CHECK(webhook_id_.empty()); auto &tqueue = parameters_->shared_data_->tqueue_; - auto size = tqueue->get_size(tqueue_id_); - if (size > 0) { - LOG(INFO) << "Removing " << size << " tqueue events"; - td::MutableSpan span; - auto r_size = tqueue->get(tqueue_id_, tqueue->get_tail(tqueue_id_), true, 0, span); - CHECK(r_size.is_ok()); - CHECK(r_size.ok() == 0); - CHECK(tqueue->get_size(tqueue_id_) == 0); - } + tqueue->clear(tqueue_id_, 0); } bool Client::to_bool(td::MutableSlice value) { @@ -6042,7 +6176,8 @@ td::Result> Client::get_reply_markup(Jso td::vector>> rows; td::vector>> inline_rows; Slice input_field_placeholder; - bool resize = false; + bool resize_keyboard = false; + bool is_persistent = false; bool one_time = false; bool remove = false; bool is_personal = false; @@ -6096,7 +6231,12 @@ td::Result> Client::get_reply_markup(Jso if (field_value.second.type() != JsonValue::Type::Boolean) { return Status::Error(400, "Field \"resize_keyboard\" of the ReplyKeyboardMarkup must be of the type Boolean"); } - resize = field_value.second.get_boolean(); + resize_keyboard = field_value.second.get_boolean(); + } else if (field_value.first == "is_persistent") { + if (field_value.second.type() != JsonValue::Type::Boolean) { + return Status::Error(400, "Field \"is_persistent\" of the ReplyKeyboardMarkup must be of the type Boolean"); + } + is_persistent = field_value.second.get_boolean(); } else if (field_value.first == "one_time_keyboard") { if (field_value.second.type() != JsonValue::Type::Boolean) { return Status::Error(400, "Field \"one_time_keyboard\" of the ReplyKeyboardMarkup must be of the type Boolean"); @@ -6127,8 +6267,8 @@ td::Result> Client::get_reply_markup(Jso object_ptr result; if (!rows.empty()) { - result = make_object(std::move(rows), resize, one_time, is_personal, - input_field_placeholder.str()); + result = make_object(std::move(rows), is_persistent, resize_keyboard, one_time, + is_personal, input_field_placeholder.str()); } else if (!inline_rows.empty()) { result = make_object(std::move(inline_rows)); } else if (remove) { @@ -6671,7 +6811,7 @@ td::Result> Client::get_inlin if (input_message_content == nullptr) { input_message_content = make_object( - nullptr, nullptr, td::vector(), gif_duration, gif_width, gif_height, std::move(caption)); + nullptr, nullptr, td::vector(), gif_duration, gif_width, gif_height, std::move(caption), false); } return make_object( id, title, thumbnail_url, thumbnail_mime_type, gif_url, "image/gif", gif_duration, gif_width, gif_height, @@ -6712,7 +6852,7 @@ td::Result> Client::get_inlin if (input_message_content == nullptr) { input_message_content = make_object( - nullptr, nullptr, td::vector(), mpeg4_duration, mpeg4_width, mpeg4_height, std::move(caption)); + nullptr, nullptr, td::vector(), mpeg4_duration, mpeg4_width, mpeg4_height, std::move(caption), false); } return make_object( id, title, thumbnail_url, thumbnail_mime_type, mpeg4_url, "video/mp4", mpeg4_duration, mpeg4_width, @@ -6730,8 +6870,8 @@ td::Result> Client::get_inlin } if (input_message_content == nullptr) { - input_message_content = - make_object(nullptr, nullptr, td::vector(), 0, 0, std::move(caption), 0); + input_message_content = make_object(nullptr, nullptr, td::vector(), 0, 0, + std::move(caption), 0, false); } return make_object(id, title, description, thumbnail_url, photo_url, photo_width, photo_height, std::move(reply_markup), @@ -6801,7 +6941,7 @@ td::Result> Client::get_inlin if (input_message_content == nullptr) { input_message_content = make_object(nullptr, nullptr, td::vector(), video_duration, video_width, - video_height, false, std::move(caption), 0); + video_height, false, std::move(caption), 0, false); } return make_object(id, title, description, thumbnail_url, video_url, mime_type, video_width, video_height, video_duration, @@ -7500,8 +7640,9 @@ td::Result> Client::get_input_me TRY_RESULT(parse_mode, get_json_object_string_field(object, "parse_mode")); auto entities = get_json_object_field_force(object, "caption_entities"); TRY_RESULT(caption, get_formatted_text(std::move(input_caption), std::move(parse_mode), std::move(entities))); - // TRY_RESULT(ttl, get_json_object_int_field(object, "ttl")); - int32 ttl = 0; + // TRY_RESULT(self_destruct_time, get_json_object_int_field(object, "self_destruct_time")); + int32 self_destruct_time = 0; + TRY_RESULT(has_spoiler, get_json_object_bool_field(object, "has_spoiler")); TRY_RESULT(media, get_json_object_string_field(object, "media", true)); auto input_file = get_input_file(query, Slice(), media, false); @@ -7519,7 +7660,8 @@ td::Result> Client::get_input_me TRY_RESULT(type, get_json_object_string_field(object, "type", false)); if (type == "photo") { return make_object(std::move(input_file), std::move(input_thumbnail), - td::vector(), 0, 0, std::move(caption), ttl); + td::vector(), 0, 0, std::move(caption), self_destruct_time, + has_spoiler); } if (type == "video") { TRY_RESULT(width, get_json_object_int_field(object, "width")); @@ -7532,7 +7674,7 @@ td::Result> Client::get_input_me return make_object(std::move(input_file), std::move(input_thumbnail), td::vector(), duration, width, height, supports_streaming, - std::move(caption), ttl); + std::move(caption), self_destruct_time, has_spoiler); } if (for_album && type == "animation") { return Status::Error(PSLICE() << "type \"" << type << "\" can't be used in sendMediaGroup"); @@ -7545,7 +7687,8 @@ td::Result> Client::get_input_me height = td::clamp(height, 0, MAX_LENGTH); duration = td::clamp(duration, 0, MAX_DURATION); return make_object(std::move(input_file), std::move(input_thumbnail), - td::vector(), duration, width, height, std::move(caption)); + td::vector(), duration, width, height, std::move(caption), + has_spoiler); } if (type == "audio") { TRY_RESULT(duration, get_json_object_int_field(object, "duration")); @@ -7753,6 +7896,16 @@ td::Result Client::get_user_id(const Query *query, Slice field_name) return user_id; } +void Client::decrease_yet_unsent_message_count(int64 chat_id, int32 count) { + auto count_it = yet_unsent_message_count_.find(chat_id); + CHECK(count_it != yet_unsent_message_count_.end()); + CHECK(count_it->second >= count); + count_it->second -= count; + if (count_it->second == 0) { + yet_unsent_message_count_.erase(count_it); + } +} + td::int64 Client::extract_yet_unsent_message_query_id(int64 chat_id, int64 message_id, bool *is_reply_to_message_deleted) { auto yet_unsent_message_it = yet_unsent_messages_.find({chat_id, message_id}); @@ -7766,12 +7919,7 @@ td::int64 Client::extract_yet_unsent_message_query_id(int64 chat_id, int64 messa yet_unsent_messages_.erase(yet_unsent_message_it); - auto count_it = yet_unsent_message_count_.find(chat_id); - CHECK(count_it != yet_unsent_message_count_.end()); - CHECK(count_it->second > 0); - if (--count_it->second == 0) { - yet_unsent_message_count_.erase(count_it); - } + decrease_yet_unsent_message_count(chat_id, 1); if (reply_to_message_id > 0) { auto it = yet_unsent_reply_message_ids_.find({chat_id, reply_to_message_id}); @@ -7919,7 +8067,7 @@ void Client::on_message_send_succeeded(object_ptr &&message, in int64 new_message_id = full_message_id.message_id; CHECK(new_message_id > 0); - auto message_info = get_message(chat_id, new_message_id); + auto message_info = get_message(chat_id, new_message_id, true); CHECK(message_info != nullptr); message_info->is_content_changed = false; @@ -7956,7 +8104,7 @@ void Client::on_message_send_failed(int64 chat_id, int64 old_message_id, int64 n auto &query = *pending_send_message_queries_[query_id]; if (query.is_multisend) { if (query.error == nullptr || query.error->message_ == "Group send failed") { - if (error->code_ == 429 || error->code_ >= 500 || error->message_ == "Group send failed") { + if (error->code_ == 401 || error->code_ == 429 || error->code_ >= 500 || error->message_ == "Group send failed") { query.error = std::move(error); } else { auto pos = (query.total_message_count - query.awaited_message_count + 1); @@ -7985,7 +8133,7 @@ void Client::on_message_send_failed(int64 chat_id, int64 old_message_id, int64 n void Client::on_cmd(PromisedQueryPtr query) { LOG(DEBUG) << "Process query " << *query; - if (!td_client_.empty()) { + if (!td_client_.empty() && was_authorized_) { if (query->method() == "close") { auto retry_after = static_cast(10 * 60 - (td::Time::now() - start_time_)); if (retry_after > 0 && start_time_ > parameters_->start_time_ + 10 * 60) { @@ -8139,6 +8287,17 @@ td::Status Client::process_get_user_profile_photos_query(PromisedQueryPtr &query } td::Status Client::process_send_message_query(PromisedQueryPtr &query) { + auto r_chat_id = td::to_integer_safe(query->arg("chat_id")); + if (r_chat_id.is_ok()) { + // fast path + auto it = yet_unsent_message_count_.find(r_chat_id.ok()); + if (it != yet_unsent_message_count_.end() && it->second >= MAX_CONCURRENTLY_SENT_CHAT_MESSAGES) { + flood_limited_query_count_++; + query->set_retry_after_error(60); + return Status::OK(); + } + } + TRY_RESULT(input_message_text, get_input_message_text(query.get())); do_send_message(std::move(input_message_text), std::move(query)); return Status::OK(); @@ -8154,9 +8313,10 @@ td::Status Client::process_send_animation_query(PromisedQueryPtr &query) { int32 width = get_integer_arg(query.get(), "width", 0, 0, MAX_LENGTH); int32 height = get_integer_arg(query.get(), "height", 0, 0, MAX_LENGTH); TRY_RESULT(caption, get_caption(query.get())); + auto has_spoiler = to_bool(query->arg("has_spoiler")); do_send_message( make_object(std::move(animation), std::move(thumbnail), td::vector(), - duration, width, height, std::move(caption)), + duration, width, height, std::move(caption), has_spoiler), std::move(query)); return Status::OK(); } @@ -8203,9 +8363,10 @@ td::Status Client::process_send_photo_query(PromisedQueryPtr &query) { return Status::Error(400, "There is no photo in the request"); } TRY_RESULT(caption, get_caption(query.get())); - auto ttl = 0; + auto self_destruct_time = 0; + auto has_spoiler = to_bool(query->arg("has_spoiler")); do_send_message(make_object(std::move(photo), nullptr, td::vector(), 0, 0, - std::move(caption), ttl), + std::move(caption), self_destruct_time, has_spoiler), std::move(query)); return Status::OK(); } @@ -8231,11 +8392,12 @@ td::Status Client::process_send_video_query(PromisedQueryPtr &query) { int32 height = get_integer_arg(query.get(), "height", 0, 0, MAX_LENGTH); bool supports_streaming = to_bool(query->arg("supports_streaming")); TRY_RESULT(caption, get_caption(query.get())); - auto ttl = 0; - do_send_message( - make_object(std::move(video), std::move(thumbnail), td::vector(), duration, - width, height, supports_streaming, std::move(caption), ttl), - std::move(query)); + auto self_destruct_time = 0; + auto has_spoiler = to_bool(query->arg("has_spoiler")); + do_send_message(make_object(std::move(video), std::move(thumbnail), td::vector(), + duration, width, height, supports_streaming, + std::move(caption), self_destruct_time, has_spoiler), + std::move(query)); return Status::OK(); } @@ -8436,21 +8598,25 @@ td::Status Client::process_send_media_group_query(PromisedQueryPtr &query) { input_message_contents = std::move(input_message_contents), reply_markup = std::move(reply_markup), send_at = std::move(send_at)](int64 chat_id, int64 reply_to_message_id, PromisedQueryPtr query) mutable { - auto on_message_thread_checked = - [this, disable_notification, protect_content, input_message_contents = std::move(input_message_contents), - reply_markup = std::move(reply_markup), send_at = std::move(send_at)](int64 chat_id, int64 message_thread_id, - int64 reply_to_message_id, PromisedQueryPtr query) mutable { - auto it = yet_unsent_message_count_.find(chat_id); - if (it != yet_unsent_message_count_.end() && it->second > MAX_CONCURRENTLY_SENT_CHAT_MESSAGES) { - return query->set_retry_after_error(60); - } + auto on_message_thread_checked = [this, disable_notification, protect_content, + input_message_contents = std::move(input_message_contents), + reply_markup = std::move(reply_markup), send_at = std::move(send_at)]( + int64 chat_id, int64 message_thread_id, int64 reply_to_message_id, + PromisedQueryPtr query) mutable { + auto &count = yet_unsent_message_count_[chat_id]; + if (count >= MAX_CONCURRENTLY_SENT_CHAT_MESSAGES) { + flood_limited_query_count_++; + return query->set_retry_after_error(60); + } + auto message_count = input_message_contents.size(); + count += static_cast(message_count); - send_request(make_object( - chat_id, message_thread_id, reply_to_message_id, - get_message_send_options(disable_notification, protect_content, std::move(send_at)), - std::move(input_message_contents), false), - td::make_unique(this, std::move(query))); - }; + send_request( + make_object(chat_id, message_thread_id, reply_to_message_id, + get_message_send_options(disable_notification, protect_content, std::move(send_at)), + std::move(input_message_contents), false), + td::make_unique(this, chat_id, message_count, std::move(query))); + }; check_message_thread(chat_id, message_thread_id, reply_to_message_id, std::move(query), std::move(on_message_thread_checked)); }; @@ -8462,14 +8628,15 @@ td::Status Client::process_send_media_group_query(PromisedQueryPtr &query) { td::Status Client::process_send_chat_action_query(PromisedQueryPtr &query) { auto chat_id = query->arg("chat_id"); + auto message_thread_id = get_message_id(query.get(), "message_thread_id"); object_ptr action = get_chat_action(query.get()); if (action == nullptr) { return Status::Error(400, "Wrong parameter action in request"); } check_chat(chat_id, AccessRights::Write, std::move(query), - [this, action = std::move(action)](int64 chat_id, PromisedQueryPtr query) mutable { - send_request(make_object(chat_id, 0, std::move(action)), + [this, message_thread_id, action = std::move(action)](int64 chat_id, PromisedQueryPtr query) mutable { + send_request(make_object(chat_id, message_thread_id, std::move(action)), td::make_unique(std::move(query))); }); return Status::OK(); @@ -8668,7 +8835,6 @@ td::Status Client::process_delete_message_query(PromisedQueryPtr &query) { check_message(chat_id, message_id, false, AccessRights::Write, "message to delete", std::move(query), [this](int64 chat_id, int64 message_id, PromisedQueryPtr query) { - delete_message(chat_id, message_id, false); send_request(make_object(chat_id, td::vector{message_id}, true), td::make_unique(std::move(query))); }); @@ -9087,11 +9253,14 @@ td::Status Client::process_edit_forum_topic_query(PromisedQueryPtr &query) { auto chat_id = query->arg("chat_id"); auto message_thread_id = get_message_id(query.get(), "message_thread_id"); auto name = query->arg("name"); + auto edit_icon_custom_emoji_id = query->has_arg("icon_custom_emoji_id"); auto icon_custom_emoji_id = td::to_integer(query->arg("icon_custom_emoji_id")); check_chat(chat_id, AccessRights::Write, std::move(query), - [this, message_thread_id, name = name.str(), icon_custom_emoji_id](int64 chat_id, PromisedQueryPtr query) { - send_request(make_object(chat_id, message_thread_id, name, icon_custom_emoji_id), + [this, message_thread_id, name = name.str(), edit_icon_custom_emoji_id, icon_custom_emoji_id]( + int64 chat_id, PromisedQueryPtr query) { + send_request(make_object(chat_id, message_thread_id, name, + edit_icon_custom_emoji_id, icon_custom_emoji_id), td::make_unique(std::move(query))); }); return Status::OK(); @@ -9145,6 +9314,58 @@ td::Status Client::process_unpin_all_forum_topic_messages_query(PromisedQueryPtr return Status::OK(); } +td::Status Client::process_edit_general_forum_topic_query(PromisedQueryPtr &query) { + auto chat_id = query->arg("chat_id"); + auto name = query->arg("name"); + + check_chat(chat_id, AccessRights::Write, std::move(query), + [this, name = name.str()](int64 chat_id, PromisedQueryPtr query) { + send_request(make_object(chat_id, GENERAL_MESSAGE_THREAD_ID, name, false, 0), + td::make_unique(std::move(query))); + }); + return Status::OK(); +} + +td::Status Client::process_close_general_forum_topic_query(PromisedQueryPtr &query) { + auto chat_id = query->arg("chat_id"); + + check_chat(chat_id, AccessRights::Write, std::move(query), [this](int64 chat_id, PromisedQueryPtr query) { + send_request(make_object(chat_id, GENERAL_MESSAGE_THREAD_ID, true), + td::make_unique(std::move(query))); + }); + return Status::OK(); +} + +td::Status Client::process_reopen_general_forum_topic_query(PromisedQueryPtr &query) { + auto chat_id = query->arg("chat_id"); + + check_chat(chat_id, AccessRights::Write, std::move(query), [this](int64 chat_id, PromisedQueryPtr query) { + send_request(make_object(chat_id, GENERAL_MESSAGE_THREAD_ID, false), + td::make_unique(std::move(query))); + }); + return Status::OK(); +} + +td::Status Client::process_hide_general_forum_topic_query(PromisedQueryPtr &query) { + auto chat_id = query->arg("chat_id"); + + check_chat(chat_id, AccessRights::Write, std::move(query), [this](int64 chat_id, PromisedQueryPtr query) { + send_request(make_object(chat_id, true), + td::make_unique(std::move(query))); + }); + return Status::OK(); +} + +td::Status Client::process_unhide_general_forum_topic_query(PromisedQueryPtr &query) { + auto chat_id = query->arg("chat_id"); + + check_chat(chat_id, AccessRights::Write, std::move(query), [this](int64 chat_id, PromisedQueryPtr query) { + send_request(make_object(chat_id, false), + td::make_unique(std::move(query))); + }); + return Status::OK(); +} + td::Status Client::process_get_chat_member_query(PromisedQueryPtr &query) { auto chat_id = query->arg("chat_id"); TRY_RESULT(user_id, get_user_id(query.get())); @@ -9727,7 +9948,7 @@ td::Status Client::process_set_webhook_query(PromisedQueryPtr &query) { if (now > next_set_webhook_logging_time_ || webhook_url_ != new_url) { next_set_webhook_logging_time_ = now + 300; LOG(WARNING) << "Set webhook to " << new_url << ", max_connections = " << new_max_connections - << ", IP address = " << new_ip_address; + << ", IP address = " << new_ip_address << ", drop_pending_updates = " << drop_pending_updates; } if (!new_url.empty()) { @@ -9773,7 +9994,7 @@ td::Status Client::process_get_message_info_query(PromisedQueryPtr &query) { auto send_reply = to_bool(query->arg("send_reply")); check_message(chat_id, message_id, false, AccessRights::Read, "message", std::move(query), [this, send_reply](int64 chat_id, int64 message_id, PromisedQueryPtr query) { - auto message = get_message(chat_id, message_id); + auto message = get_message(chat_id, message_id, true); answer_query(JsonMessage(message, send_reply, "get message info", this), std::move(query)); }); @@ -10080,16 +10301,16 @@ td::Status Client::process_create_chat_query(PromisedQueryPtr &query) { auto chat_type = query->arg("type"); auto title = query->arg("title"); auto description = query->arg("description"); - + auto message_auto_delete_time = get_integer_arg(query.get(), "message_auto_delete_time", 0); if (chat_type == "supergroup") { - send_request(make_object(title.str(), false, description.str(), nullptr, false), + send_request(make_object(title.str(), false, description.str(), nullptr, message_auto_delete_time, false), td::make_unique(this, std::move(query))); } else if (chat_type == "channel") { - send_request(make_object(title.str(), true, description.str(), nullptr, false), + send_request(make_object(title.str(), true, description.str(), nullptr, message_auto_delete_time, false), td::make_unique(this, std::move(query))); } else if (chat_type == "group") { TRY_RESULT(initial_members, get_int_array_arg(query.get(), "user_ids")) - send_request(make_object(std::move(initial_members), title.str()), + send_request(make_object(std::move(initial_members), title.str(), message_auto_delete_time), td::make_unique(this, std::move(query))); } else { return Status::Error(400, "Chat type is not specified"); @@ -10100,15 +10321,12 @@ td::Status Client::process_create_chat_query(PromisedQueryPtr &query) { td::Status Client::process_search_messages_query(PromisedQueryPtr &query) { CHECK_IS_USER(); auto query_ = query->arg("query"); - auto offset_date = get_integer_arg(query.get(), "offset_date", 0); - auto offset_chat_id = get_int64_arg(query.get(), "offset_chat_id", 0); - auto offset_message_id = get_int64_arg(query.get(), "offset_message_id", 0); + auto offset = query->arg("offset"); TRY_RESULT(filter, get_search_messages_filter(query.get())); auto min_date = get_integer_arg(query.get(), "min_date", 0); auto max_date = get_integer_arg(query.get(), "max_date", 0); - send_request(make_object(nullptr, query_.str(), offset_date, offset_chat_id, - offset_message_id, 100, std::move(filter), min_date, max_date), + send_request(make_object(nullptr, query_.str(), offset.str(), 100, std::move(filter), min_date, max_date), td::make_unique(this, std::move(query))); return Status::OK(); } @@ -10130,7 +10348,7 @@ td::Status Client::process_search_chat_messages_query(PromisedQueryPtr &query) { int64 chat_id, PromisedQueryPtr query) mutable { send_request(make_object(chat_id, query_.str(), std::move(sender), from_message_id, 0, 100, std::move(filter), 0), - td::make_unique(this, std::move(query))); + td::make_unique(this, std::move(query))); }); return Status::OK(); } @@ -10346,7 +10564,7 @@ void Client::webhook_error(Status status) { void Client::webhook_closed(Status status) { if (has_webhook_certificate_) { - td::Scheduler::instance()->run_on_scheduler(get_database_scheduler_id(), + td::Scheduler::instance()->run_on_scheduler(SharedData::get_database_scheduler_id(), [actor_id = actor_id(this), path = get_webhook_certificate_path(), status = std::move(status)](td::Unit) mutable { LOG(INFO) << "Unlink certificate " << path; @@ -10459,8 +10677,9 @@ void Client::do_set_webhook(PromisedQueryPtr query, bool was_deleted) { CHECK(!webhook_set_query_); active_webhook_set_query_ = std::move(query); td::Scheduler::instance()->run_on_scheduler( - get_database_scheduler_id(), [actor_id = actor_id(this), from_path = cert_file_ptr->temp_file_name, - to_path = get_webhook_certificate_path(), size](td::Unit) mutable { + SharedData::get_database_scheduler_id(), + [actor_id = actor_id(this), from_path = cert_file_ptr->temp_file_name, + to_path = get_webhook_certificate_path(), size](td::Unit) mutable { LOG(INFO) << "Copy certificate to " << to_path; auto status = td::copy_file(from_path, to_path, size); send_closure(actor_id, &Client::on_webhook_certificate_copied, std::move(status)); @@ -10554,16 +10773,18 @@ void Client::do_send_message(object_ptr input_messa [this, disable_notification, protect_content, input_message_content = std::move(input_message_content), reply_markup = std::move(reply_markup), send_at = std::move(send_at)](int64 chat_id, int64 message_thread_id, int64 reply_to_message_id, PromisedQueryPtr query) mutable { - auto it = yet_unsent_message_count_.find(chat_id); - if (it != yet_unsent_message_count_.end() && it->second > MAX_CONCURRENTLY_SENT_CHAT_MESSAGES) { + auto &count = yet_unsent_message_count_[chat_id]; + if (count >= MAX_CONCURRENTLY_SENT_CHAT_MESSAGES) { + flood_limited_query_count_++; return query->set_retry_after_error(60); } + count++; send_request( make_object(chat_id, message_thread_id, reply_to_message_id, get_message_send_options(disable_notification, protect_content, std::move(send_at)), std::move(reply_markup), std::move(input_message_content)), - td::make_unique(this, std::move(query))); + td::make_unique(this, chat_id, std::move(query))); }; check_message_thread(chat_id, message_thread_id, reply_to_message_id, std::move(query), std::move(on_message_thread_checked)); @@ -10600,7 +10821,6 @@ void Client::on_sent_message(object_ptr &&message, int64 query_ yet_unsent_message.send_message_query_id = query_id; auto emplace_result = yet_unsent_messages_.emplace(yet_unsent_message_id, yet_unsent_message); CHECK(emplace_result.second); - yet_unsent_message_count_[chat_id]++; auto &query = *pending_send_message_queries_[query_id]; query.awaited_message_count++; @@ -10635,14 +10855,43 @@ void Client::fail_query_conflict(Slice message, PromisedQueryPtr &&query) { } } -void Client::fail_query_closing(PromisedQueryPtr &&query) const { +void Client::fail_query_closing(PromisedQueryPtr &&query) { + auto error = get_closing_error(); + if (error.retry_after > 0) { + query->set_retry_after_error(error.retry_after); + } else { + fail_query(error.code, error.message, std::move(query)); + } +} + +Client::ClosingError Client::get_closing_error() { + ClosingError result; + result.retry_after = 0; if (logging_out_) { - return fail_query(LOGGING_OUT_ERROR_CODE, get_logging_out_error_description(), std::move(query)); + if (is_api_id_invalid_) { + result.code = 401; + result.message = Slice("Unauthorized: invalid api-id/api-hash"); + } else if (next_authorization_time_ > 0.0) { + result.code = 429; + result.retry_after = td::max(static_cast(next_authorization_time_ - td::Time::now()), 0) + 1; + if (result.retry_after != prev_retry_after) { + prev_retry_after = result.retry_after; + retry_after_error_message = PSTRING() << "Too Many Requests: retry after " << result.retry_after; + } + result.message = retry_after_error_message; + } else if (clear_tqueue_) { + result.code = 400; + result.message = Slice("Logged out"); + } else { + result.code = 401; + result.message = Slice("Unauthorized"); + } + } else { + CHECK(closing_); + result.code = 500; + result.message = Slice("Internal Server Error: restart"); } - if (closing_) { - return fail_query(CLOSING_ERROR_CODE, CLOSING_ERROR_DESCRIPTION, std::move(query)); - } - UNREACHABLE(); + return result; } class Client::JsonUpdates final : public Jsonable { @@ -10670,17 +10919,11 @@ void Client::do_get_updates(int32 offset, int32 limit, int32 timeout, PromisedQu LOG(DEBUG) << "Get updates with offset = " << offset << ", limit = " << limit << " and timeout = " << timeout; LOG(DEBUG) << "Queue head = " << tqueue->get_head(tqueue_id_) << ", queue tail = " << tqueue->get_tail(tqueue_id_); + if (offset < 0) { + tqueue->clear(tqueue_id_, -offset); + } if (offset <= 0) { - auto head = tqueue->get_head(tqueue_id_); - if (!head.empty() && offset < 0) { - // negative offset is counted from the end - auto tail = tqueue->get_tail(tqueue_id_); - CHECK(!tail.empty()); - offset += tail.value(); - } - if (offset < head.value()) { - offset = head.value(); - } + offset = tqueue->get_head(tqueue_id_).value(); } td::MutableSpan updates(parameters_->shared_data_->event_buffer_, @@ -10949,6 +11192,15 @@ void Client::set_supergroup_location(int64 supergroup_id, object_ptrlocation = std::move(location); } +void Client::set_supergroup_has_hidden_members(int64 supergroup_id, bool has_hidden_members) { + add_supergroup_info(supergroup_id)->has_hidden_members = has_hidden_members; +} + +void Client::set_supergroup_has_aggressive_anti_spam_enabled(int64 supergroup_id, + bool has_aggressive_anti_spam_enabled) { + add_supergroup_info(supergroup_id)->has_aggressive_anti_spam_enabled = has_aggressive_anti_spam_enabled; +} + Client::SupergroupInfo *Client::add_supergroup_info(int64 supergroup_id) { auto &supergroup_info = supergroups_[supergroup_id]; if (supergroup_info == nullptr) { @@ -11324,7 +11576,7 @@ void Client::add_new_message(object_ptr &&message, bool is_edit auto chat_id = message->chat_id_; CHECK(chat_id != 0); new_message_queues_[chat_id].queue_.emplace(std::move(message), is_edited); - process_new_message_queue(chat_id); + process_new_message_queue(chat_id, 0); } void Client::add_update_poll(object_ptr &&update) { @@ -11365,9 +11617,13 @@ void Client::process_new_callback_query_queue(int64 user_id, int state) { auto &queue = new_callback_query_queues_[user_id]; if (queue.has_active_request_) { CHECK(state == 0); + CHECK(!queue.queue_.empty()); + LOG(INFO) << "Have an active request in callback query queue of size " << queue.queue_.size() << " for user " + << user_id; return; } if (logging_out_ || closing_) { + LOG(INFO) << "Ignore callback query while closing for user " << user_id; new_callback_query_queues_.erase(user_id); return; } @@ -11375,8 +11631,10 @@ void Client::process_new_callback_query_queue(int64 user_id, int state) { auto &query = queue.queue_.front(); int64 chat_id = query->chat_id_; int64 message_id = query->message_id_; - auto message_info = get_message(chat_id, message_id); + auto message_info = get_message(chat_id, message_id, state > 0); // callback message can be already deleted in the bot outbox + LOG(INFO) << "Process callback query from user " << user_id << " in message " << message_id << " in chat " + << chat_id << " with state " << state; if (state == 0) { if (message_info == nullptr) { // get the message from the server @@ -11389,7 +11647,7 @@ void Client::process_new_callback_query_queue(int64 user_id, int state) { if (state == 1) { auto reply_to_message_id = message_info == nullptr || message_info->is_reply_to_message_deleted ? 0 : message_info->reply_to_message_id; - if (reply_to_message_id > 0 && get_message(chat_id, reply_to_message_id) == nullptr) { + if (reply_to_message_id > 0 && get_message(chat_id, reply_to_message_id, false) == nullptr) { queue.has_active_request_ = true; return send_request(make_object(chat_id, message_id), td::make_unique(this, user_id, state)); @@ -11406,7 +11664,7 @@ void Client::process_new_callback_query_queue(int64 user_id, int state) { auto reply_to_message_id = message_info == nullptr || message_info->is_reply_to_message_deleted ? 0 : message_info->reply_to_message_id; if (reply_to_message_id > 0) { - auto reply_to_message_info = get_message(chat_id, reply_to_message_id); + auto reply_to_message_info = get_message(chat_id, reply_to_message_id, true); auto reply_sticker_set_id = reply_to_message_info == nullptr ? 0 : get_sticker_set_id(reply_to_message_info->content); if (!have_sticker_set_name(reply_sticker_set_id)) { @@ -11517,7 +11775,9 @@ bool Client::need_skip_update_message(int64 chat_id, const object_ptrttl_ > 0 && message->ttl_expires_in_ == message->ttl_) { + if (message->self_destruct_time_ > 0) { return true; } -*/ + if (message->forward_info_ != nullptr && message->forward_info_->origin_->get_id() == td_api::messageForwardOriginMessageImport::ID) { return true; @@ -11581,7 +11840,7 @@ bool Client::need_skip_update_message(int64 chat_id, const object_ptrid_); + const MessageInfo *old_message = get_message(chat_id, message->id_, true); if (old_message != nullptr && !old_message->is_content_changed) { return true; } @@ -11818,7 +12077,7 @@ td::string Client::get_sticker_set_name(int64 sticker_set_id) const { return sticker_set_names_.get(sticker_set_id); } -void Client::process_new_message_queue(int64 chat_id) { +void Client::process_new_message_queue(int64 chat_id, int state) { auto &queue = new_message_queues_[chat_id]; if (queue.has_active_request_) { return; @@ -11832,7 +12091,7 @@ void Client::process_new_message_queue(int64 chat_id) { CHECK(chat_id == message_ref->chat_id_); int64 message_id = message_ref->id_; int64 reply_to_message_id = get_reply_to_message_id(message_ref); - if (reply_to_message_id > 0 && get_message(chat_id, reply_to_message_id) == nullptr) { + if (reply_to_message_id > 0 && get_message(chat_id, reply_to_message_id, state > 0) == nullptr) { queue.has_active_request_ = true; return send_request(make_object(chat_id, message_id), td::make_unique(this, chat_id)); @@ -11844,7 +12103,7 @@ void Client::process_new_message_queue(int64 chat_id) { td::make_unique(this, message_sticker_set_id, 0, chat_id)); } if (reply_to_message_id > 0) { - auto reply_to_message_info = get_message(chat_id, reply_to_message_id); + auto reply_to_message_info = get_message(chat_id, reply_to_message_id, true); CHECK(reply_to_message_info != nullptr); auto reply_sticker_set_id = get_sticker_set_id(reply_to_message_info->content); if (!have_sticker_set_name(reply_sticker_set_id)) { @@ -11908,7 +12167,7 @@ void Client::process_new_message_queue(int64 chat_id) { auto left_time = message_date + 86400 - now; add_message(std::move(message)); - auto message_info = get_message(chat_id, message_id); + auto message_info = get_message(chat_id, message_id, true); CHECK(message_info != nullptr); message_info->is_content_changed = false; @@ -12110,6 +12369,8 @@ Client::FullMessageId Client::add_message(object_ptr &&message, send_request(make_object(sticker_set_id), td::make_unique(this, sticker_set_id, 0, 0)); } + } else if (message->content_->get_id() == td_api::messagePoll::ID) { + message_info->content = std::move(message->content_); } set_message_reply_markup(message_info.get(), std::move(message->reply_markup_)); @@ -12139,14 +12400,18 @@ void Client::on_update_message_edited(int64 chat_id, int64 message_id, int32 edi set_message_reply_markup(message_info, std::move(reply_markup)); } -const Client::MessageInfo *Client::get_message(int64 chat_id, int64 message_id) const { +const Client::MessageInfo *Client::get_message(int64 chat_id, int64 message_id, bool force_cache) const { auto message_info = messages_.get_pointer({chat_id, message_id}); if (message_info == nullptr) { LOG(DEBUG) << "Not found message " << message_id << " from chat " << chat_id; return nullptr; } - LOG(DEBUG) << "Found message " << message_id << " from chat " << chat_id; + if (!force_cache && message_info->content->get_id() == td_api::messagePoll::ID) { + LOG(DEBUG) << "Ignore found message " << message_id << " from chat " << chat_id; + return nullptr; + } + LOG(DEBUG) << "Found message " << message_id << " from chat " << chat_id; return message_info; } @@ -12293,6 +12558,8 @@ td::int64 Client::get_basic_group_chat_id(int64 basic_group_id) { return -basic_group_id; } +constexpr Client::int64 Client::GENERAL_MESSAGE_THREAD_ID; + constexpr Client::int64 Client::GREAT_MINDS_SET_ID; constexpr Client::Slice Client::GREAT_MINDS_SET_NAME; diff --git a/telegram-bot-api/Client.h b/telegram-bot-api/Client.h index 40c0489..169e5b4 100644 --- a/telegram-bot-api/Client.h +++ b/telegram-bot-api/Client.h @@ -22,13 +22,13 @@ #include "td/utils/Container.h" #include "td/utils/FlatHashMap.h" #include "td/utils/FlatHashSet.h" +#include "td/utils/HashTableUtils.h" #include "td/utils/JsonBuilder.h" #include "td/utils/Promise.h" #include "td/utils/Slice.h" #include "td/utils/Status.h" #include "td/utils/WaitFreeHashMap.h" -#include #include #include #include @@ -68,10 +68,12 @@ class Client final : public WebhookActor::Callback { static constexpr bool USE_MESSAGE_DATABASE = false; + static constexpr int64 GENERAL_MESSAGE_THREAD_ID = 1 << 20; + static constexpr int32 MAX_CERTIFICATE_FILE_SIZE = 3 << 20; static constexpr int32 MAX_DOWNLOAD_FILE_SIZE = 20 << 20; - static constexpr int32 MAX_CONCURRENTLY_SENT_CHAT_MESSAGES = 1000; // some unreasonably big value + static constexpr int32 MAX_CONCURRENTLY_SENT_CHAT_MESSAGES = 250; // some unreasonably big value static constexpr std::size_t MIN_PENDING_UPDATES_WARNING = 200; @@ -97,6 +99,7 @@ class Client final : public WebhookActor::Callback { static constexpr int USER_ONLY_ERROR_CODE = 405; static constexpr Slice USER_ONLY_ERROR_DESCRIPTION = "Method Not Allowed: You can only use this method as a user"; + class JsonEmptyObject; class JsonFile; class JsonDatedFile; class JsonDatedFiles; @@ -132,7 +135,6 @@ class Client final : public WebhookActor::Callback { class JsonPollAnswer; class JsonEntity; class JsonVectorEntities; - class JsonCallbackGame; class JsonWebAppInfo; class JsonInlineKeyboardButton; class JsonInlineKeyboard; @@ -156,7 +158,7 @@ class Client final : public WebhookActor::Callback { class JsonChatMemberUpdated; class JsonChatJoinRequest; class JsonForumTopicCreated; - class JsonForumTopicIsClosedToggled; + class JsonForumTopicEdited; class JsonForumTopicInfo; class JsonGameHighScore; class JsonAddress; @@ -168,10 +170,9 @@ class Client final : public WebhookActor::Callback { class JsonWebAppData; class JsonProximityAlertTriggered; class JsonVideoChatScheduled; - class JsonVideoChatStarted; class JsonVideoChatEnded; class JsonInviteVideoChatParticipants; - class JsonChatSetTtl; + class JsonChatSetMessageAutoDeleteTime; class JsonUpdateTypes; class JsonWebhookInfo; class JsonStickerSet; @@ -184,6 +185,7 @@ class Client final : public WebhookActor::Callback { class JsonChats; class JsonChatsNearby; class JsonMessagesArray; + class JsonFoundMessages; class JsonProxy; class JsonProxiesArray; //stop custom Json objects @@ -236,6 +238,7 @@ class Client final : public WebhookActor::Callback { class TdOnJoinChatCallback; class TdOnReturnChatCallback; class TdOnReturnMessagesCallback; + class TdOnFoundMessagesCallback; class TdOnGetCallbackQueryAnswerCallback; class TdOnGetProxiesQueryCallback; class TdOnAddProxyQueryCallback; @@ -359,15 +362,10 @@ class Client final : public WebhookActor::Callback { void on_result(td::uint64 id, object_ptr result); void on_update_authorization_state(); - void log_out(bool is_api_id_invalid); - Slice get_logging_out_error_description() const; + void log_out(int32 error_code, Slice error_message); void on_closed(); void finish_closing(); - static int32 get_database_scheduler_id(); - - static int32 get_file_gc_scheduler_id(); - void clear_tqueue(); bool allow_update_before_authorization(const td_api::Object *update) const; @@ -519,6 +517,8 @@ class Client final : public WebhookActor::Callback { static td::Result get_user_id(const Query *query, Slice field_name = Slice("user_id")); + void decrease_yet_unsent_message_count(int64 chat_id, int32 count); + int64 extract_yet_unsent_message_query_id(int64 chat_id, int64 message_id, bool *is_reply_to_message_deleted); // start custom helper methods @@ -614,6 +614,11 @@ class Client final : public WebhookActor::Callback { Status process_reopen_forum_topic_query(PromisedQueryPtr &query); Status process_delete_forum_topic_query(PromisedQueryPtr &query); Status process_unpin_all_forum_topic_messages_query(PromisedQueryPtr &query); + Status process_edit_general_forum_topic_query(PromisedQueryPtr &query); + Status process_close_general_forum_topic_query(PromisedQueryPtr &query); + Status process_reopen_general_forum_topic_query(PromisedQueryPtr &query); + Status process_hide_general_forum_topic_query(PromisedQueryPtr &query); + Status process_unhide_general_forum_topic_query(PromisedQueryPtr &query); Status process_get_chat_member_query(PromisedQueryPtr &query); Status process_get_chat_administrators_query(PromisedQueryPtr &query); Status process_get_chat_member_count_query(PromisedQueryPtr &query); @@ -719,10 +724,19 @@ class Client final : public WebhookActor::Callback { void abort_long_poll(bool from_set_webhook); - void fail_query_closing(PromisedQueryPtr &&query) const; + void fail_query_closing(PromisedQueryPtr &&query); void fail_query_conflict(Slice message, PromisedQueryPtr &&query); + struct ClosingError { + int code; + int retry_after; + Slice message; + }; + ClosingError get_closing_error(); + + static int get_retry_after_time(Slice error_message); + static void fail_query_with_error(PromisedQueryPtr query, int32 error_code, Slice error_message, Slice default_message = Slice()); @@ -817,6 +831,8 @@ class Client final : public WebhookActor::Callback { bool has_location = false; bool join_to_send_messages = false; bool join_by_request = false; + bool has_hidden_members = false; + bool has_aggressive_anti_spam_enabled = false; // start custom properties bool is_verified = false; @@ -832,6 +848,8 @@ class Client final : public WebhookActor::Callback { void set_supergroup_slow_mode_delay(int64 supergroup_id, int32 slow_mode_delay); void set_supergroup_linked_chat_id(int64 supergroup_id, int64 linked_chat_id); void set_supergroup_location(int64 supergroup_id, object_ptr location); + void set_supergroup_has_hidden_members(int64 supergroup_id, bool has_hidden_members); + void set_supergroup_has_aggressive_anti_spam_enabled(int64 supergroup_id, bool has_aggressive_anti_spam_enabled); SupergroupInfo *add_supergroup_info(int64 supergroup_id); const SupergroupInfo *get_supergroup_info(int64 supergroup_id) const; @@ -942,7 +960,8 @@ class Client final : public WebhookActor::Callback { td::unique_ptr delete_message(int64 chat_id, int64 message_id, bool only_from_cache); void add_new_message(object_ptr &&message, bool is_edited); - void process_new_message_queue(int64 chat_id); + + void process_new_message_queue(int64 chat_id, int state); struct FullMessageId { int64 chat_id; @@ -959,14 +978,14 @@ class Client final : public WebhookActor::Callback { }; struct FullMessageIdHash { - std::size_t operator()(FullMessageId full_message_id) const { - return std::hash()(full_message_id.chat_id) * 2023654985u + - std::hash()(full_message_id.message_id); + td::uint32 operator()(FullMessageId full_message_id) const { + return td::Hash()(full_message_id.chat_id) * 2023654985u + + td::Hash()(full_message_id.message_id); } }; FullMessageId add_message(object_ptr &&message, bool force_update_content = false); - const MessageInfo *get_message(int64 chat_id, int64 message_id) const; + const MessageInfo *get_message(int64 chat_id, int64 message_id, bool force_cache) const; MessageInfo *get_message_editable(int64 chat_id, int64 message_id); void update_message_content(int64 chat_id, int64 message_id, object_ptr &&content); @@ -998,6 +1017,7 @@ class Client final : public WebhookActor::Callback { const td::string &inline_message_id); void add_new_callback_query(object_ptr &&query); + void process_new_callback_query_queue(int64 user_id, int state); void add_new_inline_callback_query(object_ptr &&query); @@ -1085,6 +1105,10 @@ class Client final : public WebhookActor::Callback { int64 my_id_ = -1; int32 authorization_date_ = -1; + double next_authorization_time_ = 0; + + int32 prev_retry_after = 0; + td::string retry_after_error_message; int64 group_anonymous_bot_user_id_ = 0; int64 channel_bot_user_id_ = 0; @@ -1216,6 +1240,9 @@ class Client final : public WebhookActor::Callback { double previous_get_updates_finish_time_ = 0; double next_get_updates_conflict_time_ = 0; + int32 flood_limited_query_count_ = 0; + double next_flood_limit_warning_time_ = 0; + td::uint64 webhook_generation_ = 1; UpdateType delayed_update_type_ = UpdateType::Size; diff --git a/telegram-bot-api/ClientManager.cpp b/telegram-bot-api/ClientManager.cpp index 66a0a87..67f06db 100644 --- a/telegram-bot-api/ClientManager.cpp +++ b/telegram-bot-api/ClientManager.cpp @@ -209,14 +209,14 @@ bool ClientManager::check_flood_limits(PromisedQueryPtr &query, bool is_user_log flood_control.add_limit(60 * 60, 600); // 600 in an hour } } - auto now = static_cast(td::Time::now()); - td::uint32 wakeup_at = flood_control.get_wakeup_at(); + auto now = td::Time::now(); + auto wakeup_at = flood_control.get_wakeup_at(); if (wakeup_at > now) { LOG(INFO) << "Failed to create Client from IP address " << ip_address; query->set_retry_after_error(static_cast(wakeup_at - now) + 1); return false; } - flood_control.add_event(static_cast(now)); + flood_control.add_event(now); } return true; } @@ -344,13 +344,13 @@ void ClientManager::get_stats(td::Promise promise, if(as_json) { jb_root("buffer_memory", JsonStatsSize(td::BufferAllocator::get_buffer_mem())); - jb_root("active_webhook_connections", td::JsonLong(WebhookActor::get_total_connections_count())); - jb_root("active_requests", td::JsonLong(parameters_->shared_data_->query_count_.load())); + jb_root("active_webhook_connections", td::JsonLong(WebhookActor::get_total_connection_count())); + jb_root("active_requests", td::JsonLong(parameters_->shared_data_->query_count_.load(std::memory_order_relaxed))); jb_root("active_network_queries", td::JsonLong(td::get_pending_network_query_count(*parameters_->net_query_stats_))); } else { sb << "buffer_memory\t" << td::format::as_size(td::BufferAllocator::get_buffer_mem()) << '\n'; - sb << "active_webhook_connections\t" << WebhookActor::get_total_connections_count() << '\n'; - sb << "active_requests\t" << parameters_->shared_data_->query_count_.load() << '\n'; + sb << "active_webhook_connections\t" << WebhookActor::get_total_connection_count() << '\n'; + sb << "active_requests\t" << parameters_->shared_data_->query_count_.load(std::memory_order_relaxed) << '\n'; sb << "active_network_queries\t" << td::get_pending_network_query_count(*parameters_->net_query_stats_) << '\n'; } if(as_json) { @@ -370,9 +370,10 @@ void ClientManager::get_stats(td::Promise promise, ServerBotInfo bot_info = client_info->client_.get_actor_unsafe()->get_bot_info(); auto active_request_count = client_info->stat_.get_active_request_count(); auto active_file_upload_bytes = client_info->stat_.get_active_file_upload_bytes(); + auto active_file_upload_count = client_info->stat_.get_active_file_upload_count(); auto stats = client_info->stat_.as_json_ready_vector(now); JsonStatsBotAdvanced bot( - std::move(top_bot_id), std::move(bot_info), active_request_count, active_file_upload_bytes, std::move(stats), parameters_->stats_hide_sensible_data_, now + std::move(top_bot_id), std::move(bot_info), active_request_count, active_file_upload_bytes, active_file_upload_count, std::move(stats), parameters_->stats_hide_sensible_data_, now ); bots.push_back(bot); } @@ -385,6 +386,7 @@ void ClientManager::get_stats(td::Promise promise, auto bot_info = client_info->client_.get_actor_unsafe()->get_bot_info(); auto active_request_count = client_info->stat_.get_active_request_count(); auto active_file_upload_bytes = client_info->stat_.get_active_file_upload_bytes(); + auto active_file_upload_count = client_info->stat_.get_active_file_upload_count(); sb << '\n'; sb << "id\t" << bot_info.id_ << '\n'; sb << "uptime\t" << now - bot_info.start_time_ << '\n'; @@ -398,6 +400,9 @@ void ClientManager::get_stats(td::Promise promise, if (active_file_upload_bytes != 0) { sb << "active_file_upload_bytes\t" << active_file_upload_bytes << '\n'; } + if (active_file_upload_count != 0) { + sb << "active_file_upload_count\t" << active_file_upload_count << '\n'; + } if (!bot_info.webhook_.empty()) { if (!parameters_->stats_hide_sensible_data_) { sb << "webhook\t" << bot_info.webhook_ << '\n'; diff --git a/telegram-bot-api/ClientParameters.h b/telegram-bot-api/ClientParameters.h index 34f65dd..7b71ad5 100644 --- a/telegram-bot-api/ClientParameters.h +++ b/telegram-bot-api/ClientParameters.h @@ -53,6 +53,16 @@ struct SharedData { } return static_cast(result); } + + static td::int32 get_database_scheduler_id() { + // the same scheduler as for database in Td + return 1; + } + + static td::int32 get_file_gc_scheduler_id() { + // the same scheduler as for file GC in Td + return 2; + } }; struct ClientParameters { diff --git a/telegram-bot-api/HttpServer.h b/telegram-bot-api/HttpServer.h index f7d0009..9699835 100644 --- a/telegram-bot-api/HttpServer.h +++ b/telegram-bot-api/HttpServer.h @@ -46,7 +46,7 @@ class HttpServer final : public td::TcpListener::Callback { set_timeout_at(wakeup_at); return; } - flood_control_.add_event(static_cast(now)); + flood_control_.add_event(now); LOG(INFO) << "Create tcp listener " << td::tag("address", ip_address_) << td::tag("port", port_); listener_ = td::create_actor( PSLICE() << "TcpListener" << td::tag("address", ip_address_) << td::tag("port", port_), port_, diff --git a/telegram-bot-api/Query.cpp b/telegram-bot-api/Query.cpp index 8a109e7..f017e6a 100644 --- a/telegram-bot-api/Query.cpp +++ b/telegram-bot-api/Query.cpp @@ -136,7 +136,7 @@ void Query::send_response_stat() const { return; } send_closure(stat_actor_, &BotStatActor::add_event, - ServerBotStat::Response{state_ == State::OK, answer_.size(), files_size()}, now); + ServerBotStat::Response{state_ == State::OK, answer_.size(), file_count(), files_size()}, now); } } // namespace telegram_bot_api diff --git a/telegram-bot-api/Stats.cpp b/telegram-bot-api/Stats.cpp index 796e97d..c9bc343 100644 --- a/telegram-bot-api/Stats.cpp +++ b/telegram-bot-api/Stats.cpp @@ -196,6 +196,15 @@ double BotStatActor::get_score(double now) { return result; } +double BotStatActor::get_minute_update_count(double now) { + auto minute_stat = stat_[2].stat_duration(now); + double result = minute_stat.first.update_count_; + if (minute_stat.second != 0) { + result /= minute_stat.second; + } + return result; +} + td::int64 BotStatActor::get_active_request_count() const { return active_request_count_; } @@ -204,6 +213,10 @@ td::int64 BotStatActor::get_active_file_upload_bytes() const { return active_file_upload_bytes_; } +td::int64 BotStatActor::get_active_file_upload_count() const { + return active_file_upload_count_; +} + bool BotStatActor::is_active(double now) const { return last_activity_timestamp_ > now - 86400; } diff --git a/telegram-bot-api/Stats.h b/telegram-bot-api/Stats.h index 234c2b7..89c3300 100644 --- a/telegram-bot-api/Stats.h +++ b/telegram-bot-api/Stats.h @@ -116,6 +116,7 @@ struct ServerBotStat { struct Response { bool ok_; size_t size_; + td::int64 file_count_; td::int64 files_size_; }; void on_event(const Response &response) { @@ -183,15 +184,20 @@ class BotStatActor final : public td::Actor { td::vector as_vector(double now); td::vector as_json_ready_vector(double now); + td::string get_description() const; td::vector get_jsonable_description() const; double get_score(double now); + double get_minute_update_count(double now); + td::int64 get_active_request_count() const; td::int64 get_active_file_upload_bytes() const; + td::int64 get_active_file_upload_count() const; + bool is_active(double now) const; static constexpr std::size_t SIZE = 4; @@ -203,12 +209,14 @@ class BotStatActor final : public td::Actor { double last_activity_timestamp_ = -1e9; td::int64 active_request_count_ = 0; td::int64 active_file_upload_bytes_ = 0; + td::int64 active_file_upload_count_ = 0; void on_event(const ServerBotStat::Update &update) { } void on_event(const ServerBotStat::Response &response) { active_request_count_--; + active_file_upload_count_ -= response.file_count_; active_file_upload_bytes_ -= response.files_size_; CHECK(active_request_count_ >= 0); CHECK(active_file_upload_bytes_ >= 0); @@ -216,6 +224,7 @@ class BotStatActor final : public td::Actor { void on_event(const ServerBotStat::Request &request) { active_request_count_++; + active_file_upload_count_ += request.file_count_; active_file_upload_bytes_ += request.files_size_; } }; diff --git a/telegram-bot-api/StatsJson.h b/telegram-bot-api/StatsJson.h index d70a5c0..92531da 100644 --- a/telegram-bot-api/StatsJson.h +++ b/telegram-bot-api/StatsJson.h @@ -202,12 +202,13 @@ class JsonStatsBotAdvanced : public JsonStatsBot { ServerBotInfo bot, td::int64 active_request_count, td::int64 active_file_upload_bytes, + td::int64 active_file_upload_count, td::vector stats, const bool hide_sensible_data, const double now) : JsonStatsBot(std::move(score_id_pair)), bot_(std::move(bot)), active_request_count_(active_request_count), - active_file_upload_bytes_(active_file_upload_bytes), stats_(std::move(stats)), - hide_sensible_data_(hide_sensible_data), now_(now) { + active_file_upload_bytes_(active_file_upload_bytes), active_file_upload_count_(active_file_upload_count), + stats_(std::move(stats)), hide_sensible_data_(hide_sensible_data), now_(now) { } void store(td::JsonValueScope *scope) const { auto object = scope->enter_object(); @@ -240,6 +241,7 @@ class JsonStatsBotAdvanced : public JsonStatsBot { ServerBotInfo bot_; td::int64 active_request_count_; td::int64 active_file_upload_bytes_; + td::int64 active_file_upload_count_; td::vector stats_; const bool hide_sensible_data_; const double now_; diff --git a/telegram-bot-api/WebhookActor.cpp b/telegram-bot-api/WebhookActor.cpp index e6b7727..7701605 100644 --- a/telegram-bot-api/WebhookActor.cpp +++ b/telegram-bot-api/WebhookActor.cpp @@ -11,11 +11,8 @@ #include "td/net/GetHostByNameActor.h" #include "td/net/HttpHeaderCreator.h" #include "td/net/HttpProxy.h" -#include "td/net/SslStream.h" #include "td/net/TransparentProxy.h" -#include "td/actor/actor.h" - #include "td/utils/base64.h" #include "td/utils/buffer.h" #include "td/utils/common.h" @@ -32,13 +29,11 @@ #include "td/utils/Span.h" #include "td/utils/Time.h" -#include - namespace telegram_bot_api { static int VERBOSITY_NAME(webhook) = VERBOSITY_NAME(DEBUG); -std::atomic WebhookActor::total_connections_count_{0}; +std::atomic WebhookActor::total_connection_count_{0}; WebhookActor::WebhookActor(td::ActorShared callback, td::int64 tqueue_id, td::HttpUrl url, td::string cert_path, td::int32 max_connections, bool from_db_flag, @@ -52,8 +47,10 @@ WebhookActor::WebhookActor(td::ActorShared callback, td::int64 tqueue_ , fix_ip_address_(fix_ip_address) , from_db_flag_(from_db_flag) , max_connections_(max_connections) - , secret_token_(std::move(secret_token)) { + , secret_token_(std::move(secret_token)) + , slow_scheduler_id_(td::Scheduler::instance()->sched_count() - 2) { CHECK(max_connections_ > 0); + CHECK(slow_scheduler_id_ > 0); if (!cached_ip_address.empty()) { auto r_ip_address = td::IPAddress::get_ip_address(cached_ip_address); @@ -74,6 +71,11 @@ WebhookActor::WebhookActor(td::ActorShared callback, td::int64 tqueue_ << ", max_connections = " << max_connections_; } +WebhookActor::~WebhookActor() { + td::Scheduler::instance()->destroy_on_scheduler(SharedData::get_file_gc_scheduler_id(), update_map_, queue_updates_, + queues_); +} + void WebhookActor::relax_wakeup_at(double wakeup_at, const char *source) { if (wakeup_at_ == 0 || wakeup_at < wakeup_at_) { VLOG(webhook) << "Wake up in " << wakeup_at - td::Time::now() << " from " << source; @@ -114,8 +116,9 @@ void WebhookActor::on_resolved_ip_address(td::Result r_ip_address return on_error(r_ip_address.move_as_error()); } auto new_ip_address = r_ip_address.move_as_ok(); - if (!check_ip_address(new_ip_address)) { - return on_error(td::Status::Error(PSLICE() << "IP address " << new_ip_address.get_ip_str() << " is reserved")); + auto check_status = check_ip_address(new_ip_address); + if (check_status.is_error()) { + return on_error(std::move(check_status)); } if (!(ip_address_ == new_ip_address)) { VLOG(webhook) << "IP address has changed: " << ip_address_ << " --> " << new_ip_address; @@ -128,24 +131,25 @@ void WebhookActor::on_resolved_ip_address(td::Result r_ip_address VLOG(webhook) << "IP address was verified"; } -td::Status WebhookActor::create_connection() { - if (!ip_address_.is_valid()) { - VLOG(webhook) << "Can't create connection: IP address is not ready"; - return td::Status::Error("IP address is not ready"); +void WebhookActor::on_ssl_context_created(td::Result r_ssl_ctx) { + if (r_ssl_ctx.is_error()) { + create_webhook_error("Can't create an SSL context", r_ssl_ctx.move_as_error(), true); + loop(); + return; } + ssl_ctx_ = r_ssl_ctx.move_as_ok(); + VLOG(webhook) << "SSL context was created"; + loop(); +} + +td::Status WebhookActor::create_connection() { + CHECK(ip_address_.is_valid()); if (parameters_->webhook_proxy_ip_address_.is_valid()) { auto r_proxy_socket_fd = td::SocketFd::open(parameters_->webhook_proxy_ip_address_); if (r_proxy_socket_fd.is_error()) { - td::Slice error_message = "Can't connect to the webhook proxy"; - auto error = td::Status::Error(PSLICE() << error_message << ": " << r_proxy_socket_fd.error()); - VLOG(webhook) << error; - on_webhook_error(error_message); - on_error(td::Status::Error(error_message)); - return error; + return create_webhook_error("Can't connect to the webhook proxy", r_proxy_socket_fd.move_as_error(), false); } if (!was_checked_) { - TRY_STATUS(create_ssl_stream()); // check certificate - // verify webhook even we can't establish connection to the webhook was_checked_ = true; on_webhook_verified(); @@ -188,29 +192,33 @@ td::Status WebhookActor::create_connection() { auto r_fd = td::SocketFd::open(ip_address_); if (r_fd.is_error()) { - td::Slice error_message = "Can't connect to the webhook"; - auto error = td::Status::Error(PSLICE() << error_message << ": " << r_fd.error()); - VLOG(webhook) << error; - on_webhook_error(error_message); - on_error(r_fd.move_as_error()); - return error; + return create_webhook_error("Can't connect to the webhook", r_fd.move_as_error(), false); } return create_connection(td::BufferedFd(r_fd.move_as_ok())); } +td::Status WebhookActor::create_webhook_error(td::Slice error_message, td::Status &&result, bool is_public) { + CHECK(result.is_error()); + auto error = td::Status::Error(PSLICE() << error_message << ": " << result); + VLOG(webhook) << error; + if (is_public) { + on_webhook_error(PSLICE() << error_message << ": " << result.public_message()); + } else { + on_webhook_error(error_message); + } + on_error(std::move(result)); + return std::move(error); +} + td::Result WebhookActor::create_ssl_stream() { if (url_.protocol_ == td::HttpUrl::Protocol::Http) { return td::SslStream(); } - auto r_ssl_stream = td::SslStream::create(url_.host_, cert_path_, td::SslStream::VerifyPeer::On, !cert_path_.empty()); + CHECK(ssl_ctx_); + auto r_ssl_stream = td::SslStream::create(url_.host_, ssl_ctx_, !cert_path_.empty()); if (r_ssl_stream.is_error()) { - td::Slice error_message = "Can't create an SSL connection"; - auto error = td::Status::Error(PSLICE() << error_message << ": " << r_ssl_stream.error()); - VLOG(webhook) << error; - on_webhook_error(PSLICE() << error_message << ": " << r_ssl_stream.error().public_message()); - on_error(r_ssl_stream.move_as_error()); - return std::move(error); + return create_webhook_error("Can't create an SSL connection", r_ssl_stream.move_as_error(), true); } return r_ssl_stream.move_as_ok(); } @@ -221,13 +229,13 @@ td::Status WebhookActor::create_connection(td::BufferedFd fd) { auto id = connections_.create(Connection()); auto *conn = connections_.get(id); conn->actor_id_ = td::create_actor( - PSLICE() << "Connect:" << id, std::move(fd), std::move(ssl_stream), std::numeric_limits::max(), 20, 60, - td::ActorShared(actor_id(this), id)); + PSLICE() << "Connect:" << id, std::move(fd), std::move(ssl_stream), 0, 20, 60, + td::ActorShared(actor_id(this), id), slow_scheduler_id_); conn->ip_generation_ = ip_generation_; conn->event_id_ = {}; conn->id_ = id; ready_connections_.put(conn->to_list_node()); - total_connections_count_.fetch_add(1, std::memory_order_relaxed); + total_connection_count_.fetch_add(1, std::memory_order_relaxed); if (!was_checked_) { was_checked_ = true; @@ -251,6 +259,15 @@ void WebhookActor::on_socket_ready_async(td::Result } void WebhookActor::create_new_connections() { + if (!ip_address_.is_valid()) { + VLOG(webhook) << "Can't create new connections: IP address is not ready"; + return; + } + if (url_.protocol_ != td::HttpUrl::Protocol::Http && !ssl_ctx_) { + VLOG(webhook) << "Can't create new connections: SSL context is not ready"; + return; + } + size_t need_connections = queue_updates_.size(); if (need_connections > static_cast(max_connections_)) { need_connections = max_connections_; @@ -287,7 +304,7 @@ void WebhookActor::create_new_connections() { << td::tag("after", td::format::as_time(wakeup_at - now)); break; } - flood->add_event(static_cast(now)); + flood->add_event(now); if (create_connection().is_error()) { relax_wakeup_at(now + 1.0, "create_new_connections error"); return; @@ -652,7 +669,7 @@ void WebhookActor::handle(td::unique_ptr response) { if (need_close || close_connection) { VLOG(webhook) << "Close connection " << connection_id; connections_.erase(connection_ptr->id_); - total_connections_count_.fetch_sub(1, std::memory_order_relaxed); + total_connection_count_.fetch_sub(1, std::memory_order_relaxed); } else { ready_connections_.put(connection_ptr->to_list_node()); } @@ -668,10 +685,10 @@ void WebhookActor::start_up() { max_loaded_updates_ = max_connections_ * 2; next_ip_address_resolve_time_ = last_success_time_ = td::Time::now() - 3600; - active_new_connection_flood_.add_limit(1, 10 * max_connections_); - active_new_connection_flood_.add_limit(5, 20 * max_connections_); - pending_new_connection_flood_.add_limit(1, 1); + active_new_connection_flood_.add_limit(1, 20); + + pending_new_connection_flood_.add_limit(2, 1); if (!parameters_->local_mode_) { if (url_.protocol_ == td::HttpUrl::Protocol::Https || (parameters_->allow_http_ && url_.protocol_ == td::HttpUrl::Protocol::Http)) { @@ -682,15 +699,14 @@ void WebhookActor::start_up() { } else { CHECK(url_.protocol_ == td::HttpUrl::Protocol::Http); VLOG(webhook) << "Can't create connection: HTTP is forbidden"; - on_error(td::Status::Error("HTTPS url must be provided for webhook")); + on_error(td::Status::Error("An HTTPS URL must be provided for webhook")); } } if (fix_ip_address_ && !stop_flag_) { - if (!ip_address_.is_valid()) { - on_error(td::Status::Error("Invalid IP address specified")); - } else if (!check_ip_address(ip_address_)) { - on_error(td::Status::Error(PSLICE() << "IP address " << ip_address_.get_ip_str() << " is reserved")); + auto check_status = check_ip_address(ip_address_); + if (check_status.is_error()) { + return on_error(std::move(check_status)); } } @@ -699,6 +715,15 @@ void WebhookActor::start_up() { on_webhook_verified(); } + if (url_.protocol_ != td::HttpUrl::Protocol::Http && !stop_flag_) { + // asynchronously create SSL context + td::Scheduler::instance()->run_on_scheduler( + SharedData::get_database_scheduler_id(), [actor_id = actor_id(this), cert_path = cert_path_](td::Unit) mutable { + send_closure(actor_id, &WebhookActor::on_ssl_context_created, + td::SslCtx::create(cert_path, td::SslCtx::VerifyPeer::On)); + }); + } + yield(); } @@ -720,7 +745,7 @@ void WebhookActor::close() { } void WebhookActor::tear_down() { - total_connections_count_.fetch_sub(connections_.size(), std::memory_order_relaxed); + total_connection_count_.fetch_sub(connections_.size(), std::memory_order_relaxed); } void WebhookActor::on_webhook_verified() { @@ -731,24 +756,26 @@ void WebhookActor::on_webhook_verified() { send_closure(callback_, &Callback::webhook_verified, std::move(ip_address_str)); } -bool WebhookActor::check_ip_address(const td::IPAddress &addr) const { +td::Status WebhookActor::check_ip_address(const td::IPAddress &addr) const { if (!addr.is_valid()) { - return false; + return td::Status::Error("Invalid IP address specified"); } if (parameters_->local_mode_) { - // allow any valid IP address - return true; + return td::Status::OK(); } if (!addr.is_ipv4()) { VLOG(webhook) << "Bad IP address (not IPv4): " << addr; - return false; + return td::Status::Error("IPv6-only addresses are not allowed"); } - return !addr.is_reserved(); + if (addr.is_reserved()) { + return td::Status::Error(PSLICE() << "IP address " << addr.get_ip_str() << " is reserved"); + } + return td::Status::OK(); } void WebhookActor::on_error(td::Status status) { VLOG(webhook) << "Receive webhook error " << status; - if (!was_checked_) { + if (!was_checked_ && !stop_flag_) { CHECK(!callback_.empty()); send_closure(std::move(callback_), &Callback::webhook_closed, std::move(status)); stop_flag_ = true; diff --git a/telegram-bot-api/WebhookActor.h b/telegram-bot-api/WebhookActor.h index 28f5989..ab3b8af 100644 --- a/telegram-bot-api/WebhookActor.h +++ b/telegram-bot-api/WebhookActor.h @@ -12,6 +12,7 @@ #include "td/net/HttpOutboundConnection.h" #include "td/net/HttpQuery.h" +#include "td/net/SslCtx.h" #include "td/net/SslStream.h" #include "td/actor/actor.h" @@ -21,6 +22,7 @@ #include "td/utils/Container.h" #include "td/utils/FlatHashMap.h" #include "td/utils/FloodControlFast.h" +#include "td/utils/HashTableUtils.h" #include "td/utils/HttpUrl.h" #include "td/utils/JsonBuilder.h" #include "td/utils/List.h" @@ -31,7 +33,6 @@ #include "td/utils/VectorQueue.h" #include -#include #include #include #include @@ -54,13 +55,18 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback { WebhookActor(td::ActorShared callback, td::int64 tqueue_id, td::HttpUrl url, td::string cert_path, td::int32 max_connections, bool from_db_flag, td::string cached_ip_address, bool fix_ip_address, td::string secret_token, std::shared_ptr parameters); + WebhookActor(const WebhookActor &) = delete; + WebhookActor &operator=(const WebhookActor &) = delete; + WebhookActor(WebhookActor &&) = delete; + WebhookActor &operator=(WebhookActor &&) = delete; + ~WebhookActor(); void update(); void close(); - static td::int64 get_total_connections_count() { - return total_connections_count_; + static td::int64 get_total_connection_count() { + return total_connection_count_; } private: @@ -69,14 +75,14 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback { static constexpr int WEBHOOK_MAX_RESEND_TIMEOUT = 60; static constexpr int WEBHOOK_DROP_TIMEOUT = 60 * 60 * 23; - static std::atomic total_connections_count_; + static std::atomic total_connection_count_; td::ActorShared callback_; td::int64 tqueue_id_; bool tqueue_empty_ = false; std::size_t last_pending_update_count_ = MIN_PENDING_UPDATES_WARNING; td::HttpUrl url_; - td::string cert_path_; + const td::string cert_path_; std::shared_ptr parameters_; double last_error_time_ = 0; @@ -122,8 +128,8 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback { td::TQueue::EventId tqueue_offset_; std::size_t max_loaded_updates_ = 0; struct EventIdHash { - std::size_t operator()(td::TQueue::EventId event_id) const { - return std::hash()(event_id.value()); + td::uint32 operator()(td::TQueue::EventId event_id) const { + return td::Hash()(event_id.value()); } }; td::FlatHashMap, EventIdHash> update_map_; @@ -133,6 +139,7 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback { double first_error_410_time_ = 0; + td::SslCtx ssl_ctx_; td::IPAddress ip_address_; td::int32 ip_generation_ = 0; double next_ip_address_resolve_time_ = 0; @@ -170,12 +177,17 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback { double last_success_time_ = 0; double wakeup_at_ = 0; bool last_update_was_successful_ = true; + td::int32 slow_scheduler_id_ = -1; void relax_wakeup_at(double wakeup_at, const char *source); void resolve_ip_address(); void on_resolved_ip_address(td::Result r_ip_address); + void on_ssl_context_created(td::Result r_ssl_ctx); + + td::Status create_webhook_error(td::Slice error_message, td::Status &&result, bool is_public); + td::Result create_ssl_stream(); td::Status create_connection() TD_WARN_UNUSED_RESULT; td::Status create_connection(td::BufferedFd fd) TD_WARN_UNUSED_RESULT; @@ -202,7 +214,7 @@ class WebhookActor final : public td::HttpOutboundConnection::Callback { void start_up() final; - bool check_ip_address(const td::IPAddress &addr) const; + td::Status check_ip_address(const td::IPAddress &addr) const; void on_error(td::Status status); void on_connection_error(td::Status error) final; diff --git a/telegram-bot-api/telegram-bot-api.cpp b/telegram-bot-api/telegram-bot-api.cpp index bef522a..7090bb6 100644 --- a/telegram-bot-api/telegram-bot-api.cpp +++ b/telegram-bot-api/telegram-bot-api.cpp @@ -67,6 +67,7 @@ static void quit_signal_handler(int sig) { static td::MemoryLog<1 << 20> memory_log; void print_log() { + td::LogGuard log_guard; auto buf = memory_log.get_buffer(); auto pos = memory_log.get_pos(); size_t tail_length = buf.size() - pos; @@ -85,19 +86,30 @@ void print_log() { static std::atomic_bool has_failed{false}; +static std::atomic_flag need_dump_statistics; + static void dump_stacktrace_signal_handler(int sig) { if (has_failed) { return; } + td::LogGuard log_guard; + if (LOG_TAG != nullptr && *LOG_TAG) { + td::signal_safe_write(td::Slice(LOG_TAG)); + td::signal_safe_write(td::Slice("\n"), false); + } td::Stacktrace::print_to_stderr(); + need_dump_statistics.clear(); } static void fail_signal_handler(int sig) { has_failed = true; - td::signal_safe_write_signal_number(sig); - td::Stacktrace::PrintOptions options; - options.use_gdb = true; - td::Stacktrace::print_to_stderr(options); + { + td::LogGuard log_guard; + td::signal_safe_write_signal_number(sig); + td::Stacktrace::PrintOptions options; + options.use_gdb = true; + td::Stacktrace::print_to_stderr(options); + } print_log(); _Exit(EXIT_FAILURE); } @@ -130,6 +142,7 @@ int main(int argc, char *argv[]) { need_reopen_log.test_and_set(); need_quit.test_and_set(); need_change_verbosity_level.test_and_set(); + need_dump_statistics.test_and_set(); need_dump_log.test_and_set(); td::Stacktrace::init(); @@ -152,7 +165,7 @@ int main(int argc, char *argv[]) { auto start_time = td::Time::now(); auto shared_data = std::make_shared(); auto parameters = std::make_unique(); - parameters->version_ = "6.3.2"; + parameters->version_ = "6.4"; parameters->shared_data_ = shared_data; parameters->start_time_ = start_time; auto net_query_stats = td::create_net_query_stats(); @@ -556,8 +569,7 @@ int main(int argc, char *argv[]) { if (!need_dump_log.test_and_set()) { print_log(); - auto guard = sched.get_main_guard(); - send_closure(client_manager, &ClientManager::dump_statistics); + need_dump_statistics.clear(); } double now = td::Time::now(); @@ -575,7 +587,7 @@ int main(int argc, char *argv[]) { next_watchdog_kick_time = now + WATCHDOG_TIMEOUT / 2; } - if (now > last_dump_time + 300.0) { + if (!need_dump_statistics.test_and_set() || now > last_dump_time + 300.0) { last_dump_time = now; auto guard = sched.get_main_guard(); send_closure(client_manager, &ClientManager::dump_statistics);