From 3a1aaa11486f319aec3dbc691cc48e8eabd47da8 Mon Sep 17 00:00:00 2001 From: levlam Date: Thu, 16 Mar 2023 17:54:33 +0300 Subject: [PATCH] Move suffix load out of Dialog. --- td/telegram/MessagesManager.cpp | 114 +++++++++++++++++++------------- td/telegram/MessagesManager.h | 25 ++++--- 2 files changed, 83 insertions(+), 56 deletions(-) diff --git a/td/telegram/MessagesManager.cpp b/td/telegram/MessagesManager.cpp index 24bfbf519..4e0cb447f 100644 --- a/td/telegram/MessagesManager.cpp +++ b/td/telegram/MessagesManager.cpp @@ -15142,8 +15142,11 @@ void MessagesManager::set_dialog_last_message_id(Dialog *d, MessageId last_messa } } if (!last_message_id.is_valid()) { - d->suffix_load_first_message_id_ = MessageId(); - d->suffix_load_done_ = false; + auto it = dialog_suffix_load_queries_.find(d->dialog_id); + if (it != dialog_suffix_load_queries_.end()) { + it->second->suffix_load_first_message_id_ = MessageId(); + it->second->suffix_load_done_ = false; + } } if (last_message_id.is_valid() && d->delete_last_message_date != 0) { d->delete_last_message_date = 0; @@ -16198,11 +16201,17 @@ bool MessagesManager::can_unload_message(const Dialog *d, const Message *m) cons // don't want to unload messages from the last album // can't unload from memory last dialog, last database messages, yet unsent messages, being edited media messages and active live locations // can't unload messages in dialog with active suffix load query + { + auto it = dialog_suffix_load_queries_.find(d->dialog_id); + if (it != dialog_suffix_load_queries_.end() && !it->second->suffix_load_queries_.empty()) { + return false; + } + } return d->open_count == 0 && m->message_id != d->last_message_id && m->message_id != d->last_database_message_id && !m->message_id.is_yet_unsent() && active_live_location_full_message_ids_.count(full_message_id) == 0 && replied_by_yet_unsent_messages_.count(full_message_id) == 0 && m->edited_content == nullptr && - d->suffix_load_queries_.empty() && m->message_id != d->reply_markup_message_id && - m->message_id != d->last_pinned_message_id && m->message_id != d->last_edited_message_id && + m->message_id != d->reply_markup_message_id && m->message_id != d->last_pinned_message_id && + m->message_id != d->last_edited_message_id && (m->media_album_id != d->last_media_album_id || m->media_album_id == 0); } @@ -16547,22 +16556,24 @@ unique_ptr MessagesManager::do_delete_message(Dialog * set_dialog_first_database_message_id(d, MessageId(), "do_delete_message"); } - if (message_id == d->suffix_load_first_message_id_) { + auto suffix_load_queries_it = dialog_suffix_load_queries_.find(d->dialog_id); + if (suffix_load_queries_it != dialog_suffix_load_queries_.end() && + message_id == suffix_load_queries_it->second->suffix_load_first_message_id_) { MessagesConstIterator it(d, message_id); CHECK(*it == m); if ((*it)->have_previous) { --it; if (*it != nullptr) { - d->suffix_load_first_message_id_ = (*it)->message_id; + suffix_load_queries_it->second->suffix_load_first_message_id_ = (*it)->message_id; } else { LOG(ERROR) << "Have have_previous is true, but there is no previous for " << full_message_id << " from " << source; - d->suffix_load_first_message_id_ = MessageId(); - d->suffix_load_done_ = false; + suffix_load_queries_it->second->suffix_load_first_message_id_ = MessageId(); + suffix_load_queries_it->second->suffix_load_done_ = false; } } else { - d->suffix_load_first_message_id_ = MessageId(); - d->suffix_load_done_ = false; + suffix_load_queries_it->second->suffix_load_first_message_id_ = MessageId(); + suffix_load_queries_it->second->suffix_load_done_ = false; } } } else { @@ -16570,10 +16581,13 @@ unique_ptr MessagesManager::do_delete_message(Dialog * CHECK(td_->auth_manager_->is_bot() && !G()->use_message_database()); set_dialog_last_message_id(d, MessageId(), "do_delete_message"); } - } - if (only_from_memory && message_id >= d->suffix_load_first_message_id_) { - d->suffix_load_first_message_id_ = MessageId(); - d->suffix_load_done_ = false; + + auto suffix_load_queries_it = dialog_suffix_load_queries_.find(d->dialog_id); + if (suffix_load_queries_it != dialog_suffix_load_queries_.end() && + message_id >= suffix_load_queries_it->second->suffix_load_first_message_id_) { + suffix_load_queries_it->second->suffix_load_first_message_id_ = MessageId(); + suffix_load_queries_it->second->suffix_load_done_ = false; + } } if (m->have_previous && (only_from_memory || !m->have_next)) { @@ -41117,24 +41131,25 @@ void MessagesManager::clear_recently_found_dialogs() { recently_found_dialogs_.clear_dialogs(); } -void MessagesManager::suffix_load_loop(Dialog *d) { - if (d->suffix_load_has_query_) { +void MessagesManager::suffix_load_loop(const Dialog *d, SuffixLoadQueries *queries) { + CHECK(queries != nullptr); + if (queries->suffix_load_has_query_) { return; } - - if (d->suffix_load_queries_.empty()) { + if (queries->suffix_load_queries_.empty()) { return; } - CHECK(!d->suffix_load_done_); + CHECK(!queries->suffix_load_done_); + CHECK(d != nullptr); auto dialog_id = d->dialog_id; - auto from_message_id = d->suffix_load_first_message_id_; + auto from_message_id = queries->suffix_load_first_message_id_; LOG(INFO) << "Send suffix load query in " << dialog_id << " from " << from_message_id; auto promise = PromiseCreator::lambda([actor_id = actor_id(this), dialog_id](Result result) { send_closure(actor_id, &MessagesManager::suffix_load_query_ready, dialog_id); }); - d->suffix_load_has_query_ = true; - d->suffix_load_query_message_id_ = from_message_id; + queries->suffix_load_has_query_ = true; + queries->suffix_load_query_message_id_ = from_message_id; if (from_message_id.is_valid()) { get_history_impl(d, from_message_id, -1, 100, true, true, std::move(promise)); } else { @@ -41143,56 +41158,65 @@ void MessagesManager::suffix_load_loop(Dialog *d) { } } -void MessagesManager::suffix_load_update_first_message_id(Dialog *d) { - if (!d->suffix_load_first_message_id_.is_valid()) { +void MessagesManager::suffix_load_update_first_message_id(const Dialog *d, SuffixLoadQueries *queries) { + CHECK(d != nullptr); + CHECK(queries != nullptr); + if (!queries->suffix_load_first_message_id_.is_valid()) { if (!d->last_message_id.is_valid()) { return; } - d->suffix_load_first_message_id_ = d->last_message_id; + queries->suffix_load_first_message_id_ = d->last_message_id; } - auto it = MessagesConstIterator(d, d->suffix_load_first_message_id_); + auto it = MessagesConstIterator(d, queries->suffix_load_first_message_id_); CHECK(*it != nullptr); - CHECK((*it)->message_id == d->suffix_load_first_message_id_); + CHECK((*it)->message_id == queries->suffix_load_first_message_id_); while ((*it)->have_previous) { --it; } - d->suffix_load_first_message_id_ = (*it)->message_id; + queries->suffix_load_first_message_id_ = (*it)->message_id; } void MessagesManager::suffix_load_query_ready(DialogId dialog_id) { + auto *queries = dialog_suffix_load_queries_[dialog_id].get(); + CHECK(queries != nullptr); + CHECK(queries->suffix_load_has_query_); + LOG(INFO) << "Finished suffix load query in " << dialog_id; auto *d = get_dialog(dialog_id); - CHECK(d != nullptr); - bool is_unchanged = d->suffix_load_first_message_id_ == d->suffix_load_query_message_id_; - suffix_load_update_first_message_id(d); - if (is_unchanged && d->suffix_load_first_message_id_ == d->suffix_load_query_message_id_) { + bool is_unchanged = queries->suffix_load_first_message_id_ == queries->suffix_load_query_message_id_; + suffix_load_update_first_message_id(d, queries); + if (is_unchanged && queries->suffix_load_first_message_id_ == queries->suffix_load_query_message_id_) { LOG(INFO) << "Finished suffix load in " << dialog_id; - d->suffix_load_done_ = true; + queries->suffix_load_done_ = true; } - d->suffix_load_has_query_ = false; + queries->suffix_load_has_query_ = false; // Remove ready queries - auto *m = get_message_force(d, d->suffix_load_first_message_id_, "suffix_load_query_ready"); - auto ready_it = std::partition(d->suffix_load_queries_.begin(), d->suffix_load_queries_.end(), - [&](auto &value) { return !(d->suffix_load_done_ || value.second(m)); }); - for (auto it = ready_it; it != d->suffix_load_queries_.end(); ++it) { + auto *m = get_message_force(d, queries->suffix_load_first_message_id_, "suffix_load_query_ready"); + auto ready_it = std::partition(queries->suffix_load_queries_.begin(), queries->suffix_load_queries_.end(), + [&](auto &value) { return !(queries->suffix_load_done_ || value.second(m)); }); + for (auto it = ready_it; it != queries->suffix_load_queries_.end(); ++it) { it->first.set_value(Unit()); } - d->suffix_load_queries_.erase(ready_it, d->suffix_load_queries_.end()); + queries->suffix_load_queries_.erase(ready_it, queries->suffix_load_queries_.end()); - suffix_load_loop(d); + suffix_load_loop(d, queries); } void MessagesManager::suffix_load_add_query(Dialog *d, std::pair, std::function> query) { - suffix_load_update_first_message_id(d); - auto *m = get_message_force(d, d->suffix_load_first_message_id_, "suffix_load_add_query"); - if (d->suffix_load_done_ || query.second(m)) { + auto &queries = dialog_suffix_load_queries_[d->dialog_id]; + if (queries == nullptr) { + queries = make_unique(); + } + suffix_load_update_first_message_id(d, queries.get()); + auto *m = get_message_force(d, queries->suffix_load_first_message_id_, "suffix_load_add_query"); + if (queries->suffix_load_done_ || query.second(m)) { query.first.set_value(Unit()); } else { - d->suffix_load_queries_.emplace_back(std::move(query)); - suffix_load_loop(d); + queries->suffix_load_queries_.emplace_back(std::move(query)); + suffix_load_loop(d, queries.get()); } } diff --git a/td/telegram/MessagesManager.h b/td/telegram/MessagesManager.h index 9a649a6ac..114bc0107 100644 --- a/td/telegram/MessagesManager.h +++ b/td/telegram/MessagesManager.h @@ -1403,9 +1403,6 @@ class MessagesManager final : public Actor { bool has_unload_timeout = false; bool is_channel_difference_finished = false; - bool suffix_load_done_ = false; - bool suffix_load_has_query_ = false; - int32 pts = 0; // for channels only int32 pending_read_channel_inbox_pts = 0; // for channels only int32 pending_read_channel_inbox_server_unread_count = 0; // for channels only @@ -1431,12 +1428,6 @@ class MessagesManager final : public Actor { string client_data; - // Load from newest to oldest message - MessageId suffix_load_first_message_id_; // identifier of some message such all suffix messages in range - // [suffix_load_first_message_id_, last_message_id] are loaded - MessageId suffix_load_query_message_id_; - std::vector, std::function>> suffix_load_queries_; - unique_ptr messages; unique_ptr scheduled_messages; @@ -1736,6 +1727,16 @@ class MessagesManager final : public Actor { } }; + struct SuffixLoadQueries { + bool suffix_load_done_ = false; + bool suffix_load_has_query_ = false; + + MessageId suffix_load_first_message_id_; // identifier of some message such all suffix messages in range + // [suffix_load_first_message_id_, last_message_id] are loaded + MessageId suffix_load_query_message_id_; + vector, std::function>> suffix_load_queries_; + }; + class BlockMessageSenderFromRepliesOnServerLogEvent; class DeleteAllCallMessagesOnServerLogEvent; class DeleteAllChannelMessagesFromSenderOnServerLogEvent; @@ -3346,8 +3347,8 @@ class MessagesManager final : public Actor { static uint64 save_unpin_all_dialog_messages_on_server_log_event(DialogId dialog_id); - void suffix_load_loop(Dialog *d); - static void suffix_load_update_first_message_id(Dialog *d); + void suffix_load_loop(const Dialog *d, SuffixLoadQueries *queries); + void suffix_load_update_first_message_id(const Dialog *d, SuffixLoadQueries *queries); void suffix_load_query_ready(DialogId dialog_id); void suffix_load_add_query(Dialog *d, std::pair, std::function> query); void suffix_load_till_date(Dialog *d, int32 date, Promise promise); @@ -3752,6 +3753,8 @@ class MessagesManager final : public Actor { FlatHashMap, FullMessageIdHash> yet_unsent_thread_message_ids_; // {dialog_id, top_thread_message_id} -> yet unsent message IDs + FlatHashMap, DialogIdHash> dialog_suffix_load_queries_; + struct PendingMessageView { FlatHashSet message_ids_; bool increment_view_counter_ = false;