From 249598a711feb7c1c9ad87cee306f754afb8f3e7 Mon Sep 17 00:00:00 2001 From: levlam Date: Thu, 4 Jan 2024 19:30:58 +0300 Subject: [PATCH] Move get_common_dialogs to CommonDialogManager. --- td/telegram/CommonDialogManager.cpp | 202 ++++++++++++++++++++++++++++ td/telegram/CommonDialogManager.h | 31 +++++ td/telegram/ContactsManager.cpp | 3 +- td/telegram/MessagesManager.cpp | 191 +------------------------- td/telegram/MessagesManager.h | 16 --- td/telegram/Td.cpp | 4 +- 6 files changed, 239 insertions(+), 208 deletions(-) diff --git a/td/telegram/CommonDialogManager.cpp b/td/telegram/CommonDialogManager.cpp index 8c782de54..2892f5d13 100644 --- a/td/telegram/CommonDialogManager.cpp +++ b/td/telegram/CommonDialogManager.cpp @@ -6,13 +6,215 @@ // #include "td/telegram/CommonDialogManager.h" +#include "td/telegram/ContactsManager.h" +#include "td/telegram/DialogManager.h" +#include "td/telegram/Global.h" +#include "td/telegram/Td.h" + +#include "td/utils/algorithm.h" +#include "td/utils/buffer.h" +#include "td/utils/logging.h" + +#include + namespace td { +class GetCommonDialogsQuery final : public Td::ResultHandler { + Promise promise_; + UserId user_id_; + int64 offset_chat_id_ = 0; + + public: + explicit GetCommonDialogsQuery(Promise &&promise) : promise_(std::move(promise)) { + } + + void send(UserId user_id, tl_object_ptr &&input_user, int64 offset_chat_id, int32 limit) { + user_id_ = user_id; + offset_chat_id_ = offset_chat_id; + + send_query(G()->net_query_creator().create( + telegram_api::messages_getCommonChats(std::move(input_user), offset_chat_id, limit))); + } + + void on_result(BufferSlice packet) final { + auto result_ptr = fetch_result(packet); + if (result_ptr.is_error()) { + return on_error(result_ptr.move_as_error()); + } + + auto chats_ptr = result_ptr.move_as_ok(); + LOG(INFO) << "Receive result for GetCommonDialogsQuery: " << to_string(chats_ptr); + switch (chats_ptr->get_id()) { + case telegram_api::messages_chats::ID: { + auto chats = move_tl_object_as(chats_ptr); + td_->common_dialog_manager_->on_get_common_dialogs(user_id_, offset_chat_id_, std::move(chats->chats_), + narrow_cast(chats->chats_.size())); + break; + } + case telegram_api::messages_chatsSlice::ID: { + auto chats = move_tl_object_as(chats_ptr); + td_->common_dialog_manager_->on_get_common_dialogs(user_id_, offset_chat_id_, std::move(chats->chats_), + chats->count_); + break; + } + default: + UNREACHABLE(); + } + + promise_.set_value(Unit()); + } + + void on_error(Status status) final { + promise_.set_error(std::move(status)); + } +}; + CommonDialogManager::CommonDialogManager(Td *td, ActorShared<> parent) : td_(td), parent_(std::move(parent)) { } +CommonDialogManager::~CommonDialogManager() { + Scheduler::instance()->destroy_on_scheduler(G()->get_gc_scheduler_id(), found_common_dialogs_); +} + void CommonDialogManager::tear_down() { parent_.reset(); } +void CommonDialogManager::drop_common_dialogs_cache(UserId user_id) { + auto it = found_common_dialogs_.find(user_id); + if (it != found_common_dialogs_.end()) { + it->second.is_outdated = true; + } +} + +std::pair> CommonDialogManager::get_common_dialogs(UserId user_id, DialogId offset_dialog_id, + int32 limit, bool force, + Promise &&promise) { + auto r_input_user = td_->contacts_manager_->get_input_user(user_id); + if (r_input_user.is_error()) { + promise.set_error(r_input_user.move_as_error()); + return {}; + } + + if (user_id == td_->contacts_manager_->get_my_id()) { + promise.set_error(Status::Error(400, "Can't get common chats with self")); + return {}; + } + if (limit <= 0) { + promise.set_error(Status::Error(400, "Parameter limit must be positive")); + return {}; + } + if (limit > MAX_GET_DIALOGS) { + limit = MAX_GET_DIALOGS; + } + + int64 offset_chat_id = 0; + switch (offset_dialog_id.get_type()) { + case DialogType::Chat: + offset_chat_id = offset_dialog_id.get_chat_id().get(); + break; + case DialogType::Channel: + offset_chat_id = offset_dialog_id.get_channel_id().get(); + break; + case DialogType::None: + if (offset_dialog_id == DialogId()) { + break; + } + // fallthrough + case DialogType::User: + case DialogType::SecretChat: + promise.set_error(Status::Error(400, "Wrong offset_chat_id")); + return {}; + default: + UNREACHABLE(); + break; + } + + auto it = found_common_dialogs_.find(user_id); + if (it != found_common_dialogs_.end() && !it->second.dialog_ids.empty()) { + int32 total_count = it->second.total_count; + vector &common_dialog_ids = it->second.dialog_ids; + bool use_cache = (!it->second.is_outdated && it->second.receive_time >= Time::now() - 3600) || force || + offset_chat_id != 0 || common_dialog_ids.size() >= static_cast(MAX_GET_DIALOGS); + // use cache if it is up-to-date, or we required to use it or we can't update it + if (use_cache) { + auto offset_it = common_dialog_ids.begin(); + if (offset_dialog_id != DialogId()) { + offset_it = std::find(common_dialog_ids.begin(), common_dialog_ids.end(), offset_dialog_id); + if (offset_it == common_dialog_ids.end()) { + promise.set_error(Status::Error(400, "Wrong offset_chat_id")); + return {}; + } + ++offset_it; + } + vector result; + while (result.size() < static_cast(limit)) { + if (offset_it == common_dialog_ids.end()) { + break; + } + auto dialog_id = *offset_it++; + if (dialog_id == DialogId()) { // end of the list + promise.set_value(Unit()); + return {total_count, std::move(result)}; + } + result.push_back(dialog_id); + } + if (result.size() == static_cast(limit) || force) { + promise.set_value(Unit()); + return {total_count, std::move(result)}; + } + } + } + + td_->create_handler(std::move(promise)) + ->send(user_id, r_input_user.move_as_ok(), offset_chat_id, MAX_GET_DIALOGS); + return {}; +} + +void CommonDialogManager::on_get_common_dialogs(UserId user_id, int64 offset_chat_id, + vector> &&chats, int32 total_count) { + CHECK(user_id.is_valid()); + td_->contacts_manager_->on_update_user_common_chat_count(user_id, total_count); + + auto &common_dialogs = found_common_dialogs_[user_id]; + if (common_dialogs.is_outdated && offset_chat_id == 0 && + common_dialogs.dialog_ids.size() < static_cast(MAX_GET_DIALOGS)) { + // drop outdated cache if possible + common_dialogs = CommonDialogs(); + } + if (common_dialogs.receive_time == 0) { + common_dialogs.receive_time = Time::now(); + } + common_dialogs.is_outdated = false; + auto &result = common_dialogs.dialog_ids; + if (!result.empty() && result.back() == DialogId()) { + return; + } + bool is_last = chats.empty() && offset_chat_id == 0; + for (auto &chat : chats) { + auto dialog_id = ContactsManager::get_dialog_id(chat); + if (!dialog_id.is_valid()) { + LOG(ERROR) << "Receive invalid " << to_string(chat); + continue; + } + td_->contacts_manager_->on_get_chat(std::move(chat), "on_get_common_dialogs"); + + if (!td::contains(result, dialog_id)) { + td_->dialog_manager_->force_create_dialog(dialog_id, "get common dialogs"); + result.push_back(dialog_id); + } + } + if (result.size() >= static_cast(total_count) || is_last) { + if (result.size() != static_cast(total_count)) { + LOG(ERROR) << "Fix total count of common groups with " << user_id << " from " << total_count << " to " + << result.size(); + total_count = narrow_cast(result.size()); + td_->contacts_manager_->on_update_user_common_chat_count(user_id, total_count); + } + + result.emplace_back(); + } + common_dialogs.total_count = total_count; +} + } // namespace td diff --git a/td/telegram/CommonDialogManager.h b/td/telegram/CommonDialogManager.h index 5135d698e..b4f28ddfb 100644 --- a/td/telegram/CommonDialogManager.h +++ b/td/telegram/CommonDialogManager.h @@ -6,9 +6,17 @@ // #pragma once +#include "td/telegram/DialogId.h" +#include "td/telegram/telegram_api.h" +#include "td/telegram/UserId.h" + #include "td/actor/actor.h" #include "td/utils/common.h" +#include "td/utils/FlatHashMap.h" +#include "td/utils/Promise.h" + +#include namespace td { @@ -17,10 +25,33 @@ class Td; class CommonDialogManager final : public Actor { public: CommonDialogManager(Td *td, ActorShared<> parent); + CommonDialogManager(const CommonDialogManager &) = delete; + CommonDialogManager &operator=(const CommonDialogManager &) = delete; + CommonDialogManager(CommonDialogManager &&) = delete; + CommonDialogManager &operator=(CommonDialogManager &&) = delete; + ~CommonDialogManager() final; + + void on_get_common_dialogs(UserId user_id, int64 offset_chat_id, vector> &&chats, + int32 total_count); + + void drop_common_dialogs_cache(UserId user_id); + + std::pair> get_common_dialogs(UserId user_id, DialogId offset_dialog_id, int32 limit, + bool force, Promise &&promise); private: void tear_down() final; + static constexpr int32 MAX_GET_DIALOGS = 100; // server side limit + + struct CommonDialogs { + vector dialog_ids; + double receive_time = 0; + int32 total_count = 0; + bool is_outdated = false; + }; + FlatHashMap found_common_dialogs_; + Td *td_; ActorShared<> parent_; }; diff --git a/td/telegram/ContactsManager.cpp b/td/telegram/ContactsManager.cpp index 737e85755..b66fb171d 100644 --- a/td/telegram/ContactsManager.cpp +++ b/td/telegram/ContactsManager.cpp @@ -12,6 +12,7 @@ #include "td/telegram/BlockListId.h" #include "td/telegram/BotMenuButton.h" #include "td/telegram/ChannelParticipantFilter.h" +#include "td/telegram/CommonDialogManager.h" #include "td/telegram/ConfigManager.h" #include "td/telegram/Dependencies.h" #include "td/telegram/DialogInviteLink.h" @@ -13129,7 +13130,7 @@ void ContactsManager::update_user_full(UserFull *user_full, UserId user_id, cons unavailable_user_fulls_.erase(user_id); // don't needed anymore if (user_full->is_common_chat_count_changed) { - td_->messages_manager_->drop_common_dialogs_cache(user_id); + td_->common_dialog_manager_->drop_common_dialogs_cache(user_id); user_full->is_common_chat_count_changed = false; } if (true) { diff --git a/td/telegram/MessagesManager.cpp b/td/telegram/MessagesManager.cpp index dae1ced83..c2cb80046 100644 --- a/td/telegram/MessagesManager.cpp +++ b/td/telegram/MessagesManager.cpp @@ -668,56 +668,6 @@ class SearchPublicDialogsQuery final : public Td::ResultHandler { } }; -class GetCommonDialogsQuery final : public Td::ResultHandler { - Promise promise_; - UserId user_id_; - int64 offset_chat_id_ = 0; - - public: - explicit GetCommonDialogsQuery(Promise &&promise) : promise_(std::move(promise)) { - } - - void send(UserId user_id, tl_object_ptr &&input_user, int64 offset_chat_id, int32 limit) { - user_id_ = user_id; - offset_chat_id_ = offset_chat_id; - - send_query(G()->net_query_creator().create( - telegram_api::messages_getCommonChats(std::move(input_user), offset_chat_id, limit))); - } - - void on_result(BufferSlice packet) final { - auto result_ptr = fetch_result(packet); - if (result_ptr.is_error()) { - return on_error(result_ptr.move_as_error()); - } - - auto chats_ptr = result_ptr.move_as_ok(); - LOG(INFO) << "Receive result for GetCommonDialogsQuery: " << to_string(chats_ptr); - switch (chats_ptr->get_id()) { - case telegram_api::messages_chats::ID: { - auto chats = move_tl_object_as(chats_ptr); - td_->messages_manager_->on_get_common_dialogs(user_id_, offset_chat_id_, std::move(chats->chats_), - narrow_cast(chats->chats_.size())); - break; - } - case telegram_api::messages_chatsSlice::ID: { - auto chats = move_tl_object_as(chats_ptr); - td_->messages_manager_->on_get_common_dialogs(user_id_, offset_chat_id_, std::move(chats->chats_), - chats->count_); - break; - } - default: - UNREACHABLE(); - } - - promise_.set_value(Unit()); - } - - void on_error(Status status) final { - promise_.set_error(std::move(status)); - } -}; - class GetBlockedDialogsQuery final : public Td::ResultHandler { Promise> promise_; int32 offset_; @@ -5946,8 +5896,8 @@ MessagesManager::~MessagesManager() { Scheduler::instance()->destroy_on_scheduler( G()->get_gc_scheduler_id(), ttl_nodes_, ttl_heap_, being_sent_messages_, update_message_ids_, update_scheduled_message_ids_, message_id_to_dialog_id_, last_clear_history_message_id_to_dialog_id_, dialogs_, - postponed_chat_read_inbox_updates_, found_public_dialogs_, found_on_server_dialogs_, found_common_dialogs_, - message_embedding_codes_[0], message_embedding_codes_[1], message_to_replied_media_timestamp_messages_, + postponed_chat_read_inbox_updates_, found_public_dialogs_, found_on_server_dialogs_, message_embedding_codes_[0], + message_embedding_codes_[1], message_to_replied_media_timestamp_messages_, story_to_replied_media_timestamp_messages_, notification_group_id_to_dialog_id_, pending_get_channel_differences_, active_get_channel_differences_, get_channel_difference_to_log_event_id_, channel_get_difference_retry_timeouts_, is_channel_difference_finished_, expected_channel_pts_, expected_channel_max_message_id_, resolved_usernames_, @@ -17171,143 +17121,6 @@ vector MessagesManager::search_dialogs_on_server(const string &query, return {}; } -void MessagesManager::drop_common_dialogs_cache(UserId user_id) { - auto it = found_common_dialogs_.find(user_id); - if (it != found_common_dialogs_.end()) { - it->second.is_outdated = true; - } -} - -std::pair> MessagesManager::get_common_dialogs(UserId user_id, DialogId offset_dialog_id, - int32 limit, bool force, - Promise &&promise) { - auto r_input_user = td_->contacts_manager_->get_input_user(user_id); - if (r_input_user.is_error()) { - promise.set_error(r_input_user.move_as_error()); - return {}; - } - - if (user_id == td_->contacts_manager_->get_my_id()) { - promise.set_error(Status::Error(400, "Can't get common chats with self")); - return {}; - } - if (limit <= 0) { - promise.set_error(Status::Error(400, "Parameter limit must be positive")); - return {}; - } - if (limit > MAX_GET_DIALOGS) { - limit = MAX_GET_DIALOGS; - } - - int64 offset_chat_id = 0; - switch (offset_dialog_id.get_type()) { - case DialogType::Chat: - offset_chat_id = offset_dialog_id.get_chat_id().get(); - break; - case DialogType::Channel: - offset_chat_id = offset_dialog_id.get_channel_id().get(); - break; - case DialogType::None: - if (offset_dialog_id == DialogId()) { - break; - } - // fallthrough - case DialogType::User: - case DialogType::SecretChat: - promise.set_error(Status::Error(400, "Wrong offset_chat_id")); - return {}; - default: - UNREACHABLE(); - break; - } - - auto it = found_common_dialogs_.find(user_id); - if (it != found_common_dialogs_.end() && !it->second.dialog_ids.empty()) { - int32 total_count = it->second.total_count; - vector &common_dialog_ids = it->second.dialog_ids; - bool use_cache = (!it->second.is_outdated && it->second.receive_time >= Time::now() - 3600) || force || - offset_chat_id != 0 || common_dialog_ids.size() >= static_cast(MAX_GET_DIALOGS); - // use cache if it is up-to-date, or we required to use it or we can't update it - if (use_cache) { - auto offset_it = common_dialog_ids.begin(); - if (offset_dialog_id != DialogId()) { - offset_it = std::find(common_dialog_ids.begin(), common_dialog_ids.end(), offset_dialog_id); - if (offset_it == common_dialog_ids.end()) { - promise.set_error(Status::Error(400, "Wrong offset_chat_id")); - return {}; - } - ++offset_it; - } - vector result; - while (result.size() < static_cast(limit)) { - if (offset_it == common_dialog_ids.end()) { - break; - } - auto dialog_id = *offset_it++; - if (dialog_id == DialogId()) { // end of the list - promise.set_value(Unit()); - return {total_count, std::move(result)}; - } - result.push_back(dialog_id); - } - if (result.size() == static_cast(limit) || force) { - promise.set_value(Unit()); - return {total_count, std::move(result)}; - } - } - } - - td_->create_handler(std::move(promise)) - ->send(user_id, r_input_user.move_as_ok(), offset_chat_id, MAX_GET_DIALOGS); - return {}; -} - -void MessagesManager::on_get_common_dialogs(UserId user_id, int64 offset_chat_id, - vector> &&chats, int32 total_count) { - CHECK(user_id.is_valid()); - td_->contacts_manager_->on_update_user_common_chat_count(user_id, total_count); - - auto &common_dialogs = found_common_dialogs_[user_id]; - if (common_dialogs.is_outdated && offset_chat_id == 0 && - common_dialogs.dialog_ids.size() < static_cast(MAX_GET_DIALOGS)) { - // drop outdated cache if possible - common_dialogs = CommonDialogs(); - } - if (common_dialogs.receive_time == 0) { - common_dialogs.receive_time = Time::now(); - } - common_dialogs.is_outdated = false; - auto &result = common_dialogs.dialog_ids; - if (!result.empty() && result.back() == DialogId()) { - return; - } - bool is_last = chats.empty() && offset_chat_id == 0; - for (auto &chat : chats) { - auto dialog_id = ContactsManager::get_dialog_id(chat); - if (!dialog_id.is_valid()) { - LOG(ERROR) << "Receive invalid " << to_string(chat); - continue; - } - td_->contacts_manager_->on_get_chat(std::move(chat), "on_get_common_dialogs"); - - if (!td::contains(result, dialog_id)) { - force_create_dialog(dialog_id, "get common dialogs"); - result.push_back(dialog_id); - } - } - if (result.size() >= static_cast(total_count) || is_last) { - if (result.size() != static_cast(total_count)) { - LOG(ERROR) << "Fix total count of common groups with " << user_id << " from " << total_count << " to " - << result.size(); - total_count = narrow_cast(result.size()); - td_->contacts_manager_->on_update_user_common_chat_count(user_id, total_count); - } - - result.emplace_back(); - } - common_dialogs.total_count = total_count; -} - void MessagesManager::block_message_sender_from_replies(MessageId message_id, bool need_delete_message, bool need_delete_all_messages, bool report_spam, Promise &&promise) { diff --git a/td/telegram/MessagesManager.h b/td/telegram/MessagesManager.h index 83d87445c..06031910a 100644 --- a/td/telegram/MessagesManager.h +++ b/td/telegram/MessagesManager.h @@ -238,9 +238,6 @@ class MessagesManager final : public Actor { int32 total_count, vector> &&messages, Promise &&promise); - void on_get_common_dialogs(UserId user_id, int64 offset_chat_id, vector> &&chats, - int32 total_count); - bool on_update_message_id(int64 random_id, MessageId new_message_id, const char *source); void on_update_dialog_draft_message(DialogId dialog_id, MessageId top_thread_message_id, @@ -575,11 +572,6 @@ class MessagesManager final : public Actor { vector search_dialogs_on_server(const string &query, int32 limit, Promise &&promise); - void drop_common_dialogs_cache(UserId user_id); - - std::pair> get_common_dialogs(UserId user_id, DialogId offset_dialog_id, int32 limit, - bool force, Promise &&promise); - void block_message_sender_from_replies(MessageId message_id, bool need_delete_message, bool need_delete_all_messages, bool report_spam, Promise &&promise); @@ -3367,14 +3359,6 @@ class MessagesManager final : public Actor { FlatHashMap> found_public_dialogs_; // TODO time bound cache FlatHashMap> found_on_server_dialogs_; // TODO time bound cache - struct CommonDialogs { - vector dialog_ids; - double receive_time = 0; - int32 total_count = 0; - bool is_outdated = false; - }; - FlatHashMap found_common_dialogs_; - FlatHashMap get_dialog_message_by_date_results_; FlatHashMap> found_dialog_message_calendars_; diff --git a/td/telegram/Td.cpp b/td/telegram/Td.cpp index 367e90621..45206e830 100644 --- a/td/telegram/Td.cpp +++ b/td/telegram/Td.cpp @@ -870,8 +870,8 @@ class GetGroupsInCommonRequest final : public RequestActor<> { std::pair> dialog_ids_; void do_run(Promise &&promise) final { - dialog_ids_ = td_->messages_manager_->get_common_dialogs(user_id_, offset_dialog_id_, limit_, get_tries() < 2, - std::move(promise)); + dialog_ids_ = td_->common_dialog_manager_->get_common_dialogs(user_id_, offset_dialog_id_, limit_, get_tries() < 2, + std::move(promise)); } void do_send_result() final {