From 1d5dc9e5e82e31302b44afc5dcc3e34c7a61c228 Mon Sep 17 00:00:00 2001 From: levlam Date: Tue, 22 Nov 2022 21:31:04 +0300 Subject: [PATCH] Check for MAX_CONCURRENTLY_SENT_CHAT_MESSAGES as early as possible. --- telegram-bot-api/Client.cpp | 84 ++++++++++++++++++++++++------------- telegram-bot-api/Client.h | 4 +- 2 files changed, 59 insertions(+), 29 deletions(-) diff --git a/telegram-bot-api/Client.cpp b/telegram-bot-api/Client.cpp index e7a4a43..735ad75 100644 --- a/telegram-bot-api/Client.cpp +++ b/telegram-bot-api/Client.cpp @@ -2836,11 +2836,13 @@ class Client::TdOnGetUserProfilePhotosCallback final : public TdQueryCallback { class Client::TdOnSendMessageCallback final : public TdQueryCallback { public: - TdOnSendMessageCallback(Client *client, PromisedQueryPtr query) : client_(client), query_(std::move(query)) { + TdOnSendMessageCallback(Client *client, int64 chat_id, PromisedQueryPtr query) + : client_(client), chat_id_(chat_id), query_(std::move(query)) { } void on_result(object_ptr result) final { if (result->get_id() == td_api::error::ID) { + client_->decrease_yet_unsent_message_count(chat_id_, 1); return fail_query_with_error(std::move(query_), move_object_as(result)); } @@ -2851,21 +2853,27 @@ class Client::TdOnSendMessageCallback final : public TdQueryCallback { private: Client *client_; + int64 chat_id_; PromisedQueryPtr query_; }; class Client::TdOnSendMessageAlbumCallback final : public TdQueryCallback { public: - TdOnSendMessageAlbumCallback(Client *client, PromisedQueryPtr query) : client_(client), query_(std::move(query)) { + TdOnSendMessageAlbumCallback(Client *client, int64 chat_id, std::size_t message_count, PromisedQueryPtr query) + : client_(client), chat_id_(chat_id), message_count_(message_count), query_(std::move(query)) { } void on_result(object_ptr result) final { if (result->get_id() == td_api::error::ID) { + if (message_count_ > 0) { + client_->decrease_yet_unsent_message_count(chat_id_, static_cast(message_count_)); + } return fail_query_with_error(std::move(query_), move_object_as(result)); } CHECK(result->get_id() == td_api::messages::ID); auto messages = move_object_as(result); + CHECK(messages->messages_.size() == message_count_); auto query_id = client_->get_send_message_query_id(std::move(query_), true); for (auto &message : messages->messages_) { client_->on_sent_message(std::move(message), query_id); @@ -2874,6 +2882,8 @@ class Client::TdOnSendMessageAlbumCallback final : public TdQueryCallback { private: Client *client_; + int64 chat_id_; + std::size_t message_count_; PromisedQueryPtr query_; }; @@ -4062,7 +4072,7 @@ void Client::send(PromisedQueryPtr query) { query->set_stat_actor(stat_actor_); if (!parameters_->local_mode_ && !is_local_method(query->method())) { const BotStatActor *stat = stat_actor_.get_actor_unsafe(); - if (stat->get_active_request_count() > 10000) { + if (stat->get_active_request_count() > 5000) { LOG(INFO) << "Fail a query, because there are too many active queries: " << *query; return query->set_retry_after_error(60); } @@ -5140,7 +5150,7 @@ void Client::on_closed() { } on_message_send_failed(chat_id, message_id, 0, Status::Error(http_status_code, description)); } - CHECK(yet_unsent_message_count_.empty()); + yet_unsent_message_count_.clear(); while (!pending_bot_resolve_queries_.empty()) { auto it = pending_bot_resolve_queries_.begin(); @@ -7083,6 +7093,16 @@ td::Result Client::get_user_id(const Query *query, Slice field_name) return user_id; } +void Client::decrease_yet_unsent_message_count(int64 chat_id, int32 count) { + auto count_it = yet_unsent_message_count_.find(chat_id); + CHECK(count_it != yet_unsent_message_count_.end()); + CHECK(count_it->second >= count); + count_it->second -= count; + if (count_it->second == 0) { + yet_unsent_message_count_.erase(count_it); + } +} + td::int64 Client::extract_yet_unsent_message_query_id(int64 chat_id, int64 message_id, bool *is_reply_to_message_deleted) { auto yet_unsent_message_it = yet_unsent_messages_.find({chat_id, message_id}); @@ -7096,12 +7116,7 @@ td::int64 Client::extract_yet_unsent_message_query_id(int64 chat_id, int64 messa yet_unsent_messages_.erase(yet_unsent_message_it); - auto count_it = yet_unsent_message_count_.find(chat_id); - CHECK(count_it != yet_unsent_message_count_.end()); - CHECK(count_it->second > 0); - if (--count_it->second == 0) { - yet_unsent_message_count_.erase(count_it); - } + decrease_yet_unsent_message_count(chat_id, 1); if (reply_to_message_id > 0) { auto it = yet_unsent_reply_message_ids_.find({chat_id, reply_to_message_id}); @@ -7329,6 +7344,16 @@ td::Status Client::process_get_user_profile_photos_query(PromisedQueryPtr &query } td::Status Client::process_send_message_query(PromisedQueryPtr &query) { + auto r_chat_id = td::to_integer_safe(query->arg("chat_id")); + if (r_chat_id.is_ok()) { + // fast path + auto it = yet_unsent_message_count_.find(r_chat_id.ok()); + if (it != yet_unsent_message_count_.end() && it->second >= MAX_CONCURRENTLY_SENT_CHAT_MESSAGES) { + query->set_retry_after_error(60); + return Status::OK(); + } + } + TRY_RESULT(input_message_text, get_input_message_text(query.get())); do_send_message(std::move(input_message_text), std::move(query)); return Status::OK(); @@ -7622,21 +7647,24 @@ td::Status Client::process_send_media_group_query(PromisedQueryPtr &query) { input_message_contents = std::move(input_message_contents), reply_markup = std::move(reply_markup)](int64 chat_id, int64 reply_to_message_id, PromisedQueryPtr query) mutable { - auto on_message_thread_checked = - [this, disable_notification, protect_content, input_message_contents = std::move(input_message_contents), - reply_markup = std::move(reply_markup)](int64 chat_id, int64 message_thread_id, - int64 reply_to_message_id, PromisedQueryPtr query) mutable { - auto it = yet_unsent_message_count_.find(chat_id); - if (it != yet_unsent_message_count_.end() && it->second > MAX_CONCURRENTLY_SENT_CHAT_MESSAGES) { - return query->set_retry_after_error(60); - } + auto on_message_thread_checked = [this, disable_notification, protect_content, + input_message_contents = std::move(input_message_contents), + reply_markup = std::move(reply_markup)]( + int64 chat_id, int64 message_thread_id, int64 reply_to_message_id, + PromisedQueryPtr query) mutable { + auto &count = yet_unsent_message_count_[chat_id]; + if (count >= MAX_CONCURRENTLY_SENT_CHAT_MESSAGES) { + return query->set_retry_after_error(60); + } + auto message_count = input_message_contents.size(); + count += static_cast(message_count); - send_request(make_object( - chat_id, message_thread_id, reply_to_message_id, - get_message_send_options(disable_notification, protect_content), - std::move(input_message_contents), false), - td::make_unique(this, std::move(query))); - }; + send_request( + make_object(chat_id, message_thread_id, reply_to_message_id, + get_message_send_options(disable_notification, protect_content), + std::move(input_message_contents), false), + td::make_unique(this, chat_id, message_count, std::move(query))); + }; check_message_thread(chat_id, message_thread_id, reply_to_message_id, std::move(query), std::move(on_message_thread_checked)); }; @@ -9206,16 +9234,17 @@ void Client::do_send_message(object_ptr input_messa [this, disable_notification, protect_content, input_message_content = std::move(input_message_content), reply_markup = std::move(reply_markup)](int64 chat_id, int64 message_thread_id, int64 reply_to_message_id, PromisedQueryPtr query) mutable { - auto it = yet_unsent_message_count_.find(chat_id); - if (it != yet_unsent_message_count_.end() && it->second > MAX_CONCURRENTLY_SENT_CHAT_MESSAGES) { + auto &count = yet_unsent_message_count_[chat_id]; + if (count >= MAX_CONCURRENTLY_SENT_CHAT_MESSAGES) { return query->set_retry_after_error(60); } + count++; send_request( make_object(chat_id, message_thread_id, reply_to_message_id, get_message_send_options(disable_notification, protect_content), std::move(reply_markup), std::move(input_message_content)), - td::make_unique(this, std::move(query))); + td::make_unique(this, chat_id, std::move(query))); }; check_message_thread(chat_id, message_thread_id, reply_to_message_id, std::move(query), std::move(on_message_thread_checked)); @@ -9252,7 +9281,6 @@ void Client::on_sent_message(object_ptr &&message, int64 query_ yet_unsent_message.send_message_query_id = query_id; auto emplace_result = yet_unsent_messages_.emplace(yet_unsent_message_id, yet_unsent_message); CHECK(emplace_result.second); - yet_unsent_message_count_[chat_id]++; auto &query = *pending_send_message_queries_[query_id]; query.awaited_message_count++; diff --git a/telegram-bot-api/Client.h b/telegram-bot-api/Client.h index 31a95d3..5249719 100644 --- a/telegram-bot-api/Client.h +++ b/telegram-bot-api/Client.h @@ -71,7 +71,7 @@ class Client final : public WebhookActor::Callback { static constexpr int32 MAX_CERTIFICATE_FILE_SIZE = 3 << 20; static constexpr int32 MAX_DOWNLOAD_FILE_SIZE = 20 << 20; - static constexpr int32 MAX_CONCURRENTLY_SENT_CHAT_MESSAGES = 1000; // some unreasonably big value + static constexpr int32 MAX_CONCURRENTLY_SENT_CHAT_MESSAGES = 250; // some unreasonably big value static constexpr std::size_t MIN_PENDING_UPDATES_WARNING = 200; @@ -474,6 +474,8 @@ class Client final : public WebhookActor::Callback { static td::Result get_user_id(const Query *query, Slice field_name = Slice("user_id")); + void decrease_yet_unsent_message_count(int64 chat_id, int32 count); + int64 extract_yet_unsent_message_query_id(int64 chat_id, int64 message_id, bool *is_reply_to_message_deleted); void on_message_send_succeeded(object_ptr &&message, int64 old_message_id);