diff --git a/td/telegram/MessagesManager.cpp b/td/telegram/MessagesManager.cpp index 490494a72..7a410f641 100644 --- a/td/telegram/MessagesManager.cpp +++ b/td/telegram/MessagesManager.cpp @@ -663,9 +663,9 @@ class UnpinAllMessagesQuery : public Td::ResultHandler { affected_history->pts_, affected_history->pts_count_, std::move(promise), "unpin all messages"); } else { - td->messages_manager_->add_pending_update(make_tl_object(), affected_history->pts_, - affected_history->pts_count_, std::move(promise), - "unpin all messages"); + td->updates_manager_->add_pending_pts_update(make_tl_object(), affected_history->pts_, + affected_history->pts_count_, std::move(promise), + "unpin all messages"); } } else if (affected_history->offset_ <= 0) { promise_.set_value(Unit()); @@ -1560,9 +1560,9 @@ class ReadMessagesContentsQuery : public Td::ResultHandler { CHECK(affected_messages->get_id() == telegram_api::messages_affectedMessages::ID); if (affected_messages->pts_count_ > 0) { - td->messages_manager_->add_pending_update(make_tl_object(), affected_messages->pts_, - affected_messages->pts_count_, Promise(), - "read messages content query"); + td->updates_manager_->add_pending_pts_update(make_tl_object(), affected_messages->pts_, + affected_messages->pts_count_, Promise(), + "read messages content query"); } promise_.set_value(Unit()); @@ -1778,8 +1778,9 @@ class ReadHistoryQuery : public Td::ResultHandler { LOG(INFO) << "Receive result for ReadHistoryQuery: " << to_string(affected_messages); if (affected_messages->pts_count_ > 0) { - td->messages_manager_->add_pending_update(make_tl_object(), affected_messages->pts_, - affected_messages->pts_count_, Promise(), "read history query"); + td->updates_manager_->add_pending_pts_update(make_tl_object(), affected_messages->pts_, + affected_messages->pts_count_, Promise(), + "read history query"); } promise_.set_value(Unit()); @@ -2247,8 +2248,9 @@ class DeleteHistoryQuery : public Td::ResultHandler { CHECK(affected_history->get_id() == telegram_api::messages_affectedHistory::ID); if (affected_history->pts_count_ > 0) { - td->messages_manager_->add_pending_update(make_tl_object(), affected_history->pts_, - affected_history->pts_count_, Promise(), "delete history query"); + td->updates_manager_->add_pending_pts_update(make_tl_object(), affected_history->pts_, + affected_history->pts_count_, Promise(), + "delete history query"); } if (affected_history->offset_ > 0) { @@ -2446,9 +2448,9 @@ class ReadAllMentionsQuery : public Td::ResultHandler { << dialog_id_; td->updates_manager_->get_difference("Wrong messages_readMentions result"); } else { - td->messages_manager_->add_pending_update(make_tl_object(), affected_history->pts_, - affected_history->pts_count_, Promise(), - "read all mentions query"); + td->updates_manager_->add_pending_pts_update(make_tl_object(), affected_history->pts_, + affected_history->pts_count_, Promise(), + "read all mentions query"); } } @@ -2582,7 +2584,7 @@ class SendMessageActor : public NetActorOnce { return; } - td->messages_manager_->add_pending_update( + td->updates_manager_->add_pending_pts_update( make_tl_object(random_id_, message_id, sent_message->date_), sent_message->pts_, sent_message->pts_count_, Promise(), "send message actor"); } @@ -3600,9 +3602,9 @@ class DeleteMessagesQuery : public Td::ResultHandler { CHECK(affected_messages->get_id() == telegram_api::messages_affectedMessages::ID); if (affected_messages->pts_count_ > 0) { - td->messages_manager_->add_pending_update(make_tl_object(), affected_messages->pts_, - affected_messages->pts_count_, Promise(), - "delete messages query"); + td->updates_manager_->add_pending_pts_update(make_tl_object(), affected_messages->pts_, + affected_messages->pts_count_, Promise(), + "delete messages query"); } if (--query_count_ == 0) { promise_.set_value(Unit()); @@ -6070,7 +6072,7 @@ tl_object_ptr MessagesManager::get_input_encry } } -bool MessagesManager::is_allowed_useless_update(const tl_object_ptr &update) const { +bool MessagesManager::is_allowed_useless_update(const tl_object_ptr &update) { auto constructor_id = update->get_id(); if (constructor_id == dummyUpdate::ID) { // allow dummyUpdate just in case @@ -6085,8 +6087,8 @@ bool MessagesManager::is_allowed_useless_update(const tl_object_ptr &&update, int32 new_pts, - int32 old_pts, int32 pts_count, const char *source) { +void MessagesManager::skip_old_pending_pts_update(tl_object_ptr &&update, int32 new_pts, + int32 old_pts, int32 pts_count, const char *source) { if (update->get_id() == telegram_api::updateNewMessage::ID) { auto update_new_message = static_cast(update.get()); auto full_message_id = get_full_message_id(update_new_message->message_, false); @@ -6125,156 +6127,6 @@ void MessagesManager::skip_old_pending_update(tl_object_ptr::max(); - if (!pending_pts_updates_.empty()) { - auto pts = pending_pts_updates_.begin()->first; - if (pts < result) { - result = pts; - } - } - if (!postponed_pts_updates_.empty()) { - auto pts = postponed_pts_updates_.begin()->first; - if (pts < result) { - result = pts; - } - } - return result; -} - -void MessagesManager::process_pts_update(tl_object_ptr &&update) { - CHECK(update != nullptr); - - // TODO need to save all updates that can change result of running queries not associated with pts (for example - // getHistory) and apply the updates to results of the queries - - if (!UpdatesManager::check_pts_update(update)) { - LOG(ERROR) << "Receive wrong pts update: " << oneline(to_string(update)); - return; - } - - // must be called only during getDifference - CHECK(pending_pts_updates_.empty()); - CHECK(accumulated_pts_ == -1); - - process_update(std::move(update)); -} - -void MessagesManager::add_pending_update(tl_object_ptr &&update, int32 new_pts, int32 pts_count, - Promise &&promise, const char *source) { - // do not try to run getDifference from this function - CHECK(update != nullptr); - CHECK(source != nullptr); - LOG(INFO) << "Receive from " << source << " pending " << to_string(update); - if (pts_count < 0 || new_pts <= pts_count) { - LOG(ERROR) << "Receive update with wrong pts = " << new_pts << " or pts_count = " << pts_count << " from " << source - << ": " << oneline(to_string(update)); - return promise.set_value(Unit()); - } - - // TODO need to save all updates that can change result of running queries not associated with pts (for example - // getHistory) and apply them to result of this queries - - if (!UpdatesManager::check_pts_update(update)) { - LOG(ERROR) << "Receive wrong pts update from " << source << ": " << oneline(to_string(update)); - return promise.set_value(Unit()); - } - - if (DROP_UPDATES) { - set_get_difference_timeout(1.0); - return promise.set_value(Unit()); - } - - int32 old_pts = td_->updates_manager_->get_pts(); - if (new_pts < old_pts - 99 && Slice(source) != "after get difference") { - bool need_restore_pts = new_pts < old_pts - 19999; - auto now = Time::now(); - if (now > last_pts_jump_warning_time_ + 1 && (need_restore_pts || now < last_pts_jump_warning_time_ + 5)) { - LOG(ERROR) << "Restore pts after delete_first_messages from " << old_pts << " to " << new_pts - << " is disabled, pts_count = " << pts_count << ", update is from " << source << ": " - << oneline(to_string(update)); - last_pts_jump_warning_time_ = now; - } - if (need_restore_pts) { - set_get_difference_timeout(0.001); - - /* - LOG(WARNING) << "Restore pts after delete_first_messages"; - td_->updates_manager_->set_pts(new_pts - 1, "restore pts after delete_first_messages"); - old_pts = td_->updates_manager_->get_pts(); - CHECK(old_pts == new_pts - 1); - */ - } - } - - if (new_pts <= old_pts || (old_pts >= 1 && new_pts > old_pts + 500000000)) { - skip_old_pending_update(std::move(update), new_pts, old_pts, pts_count, source); - return promise.set_value(Unit()); - } - - if (td_->updates_manager_->running_get_difference() || !postponed_pts_updates_.empty()) { - LOG(INFO) << "Save pending update got while running getDifference from " << source; - if (td_->updates_manager_->running_get_difference()) { - CHECK(update->get_id() == dummyUpdate::ID || update->get_id() == updateSentMessage::ID); - } - postpone_pts_update(std::move(update), new_pts, pts_count, std::move(promise)); - return; - } - - if (old_pts + pts_count > new_pts) { - LOG(WARNING) << "Have old_pts (= " << old_pts << ") + pts_count (= " << pts_count << ") > new_pts (= " << new_pts - << "). Logged in " << G()->shared_config().get_option_integer("authorization_date") << ". Update from " - << source << " = " << oneline(to_string(update)); - postpone_pts_update(std::move(update), new_pts, pts_count, std::move(promise)); - set_get_difference_timeout(0.001); - return; - } - - accumulated_pts_count_ += pts_count; - if (new_pts > accumulated_pts_) { - accumulated_pts_ = new_pts; - } - - if (old_pts + accumulated_pts_count_ > accumulated_pts_) { - LOG(WARNING) << "Have old_pts (= " << old_pts << ") + accumulated_pts_count (= " << accumulated_pts_count_ - << ") > accumulated_pts (= " << accumulated_pts_ << "). new_pts = " << new_pts - << ", pts_count = " << pts_count << ". Logged in " - << G()->shared_config().get_option_integer("authorization_date") << ". Update from " << source << " = " - << oneline(to_string(update)); - postpone_pts_update(std::move(update), new_pts, pts_count, std::move(promise)); - set_get_difference_timeout(0.001); - return; - } - - LOG_IF(INFO, pts_count == 0 && update->get_id() != dummyUpdate::ID) << "Skip useless update " << to_string(update); - - if (pending_pts_updates_.empty() && old_pts + accumulated_pts_count_ == accumulated_pts_ && - !pts_gap_timeout_.has_timeout()) { - if (pts_count > 0) { - process_update(std::move(update)); - - td_->updates_manager_->set_pts(accumulated_pts_, "process pending updates fast path") - .set_value(Unit()); // TODO can't set until get messages really stored on persistent storage - accumulated_pts_count_ = 0; - accumulated_pts_ = -1; - } - promise.set_value(Unit()); - return; - } - - pending_pts_updates_.emplace(new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, std::move(promise))); - - if (old_pts + accumulated_pts_count_ < accumulated_pts_) { - set_get_difference_timeout(UpdatesManager::MAX_UNFILLED_GAP_TIME); - return; - } - - CHECK(old_pts + accumulated_pts_count_ == accumulated_pts_); - if (!pending_pts_updates_.empty()) { - process_pending_updates(); - } -} - MessagesManager::Dialog *MessagesManager::get_service_notifications_dialog() { UserId service_notifications_user_id = td_->contacts_manager_->add_service_notifications_user(); DialogId service_notifications_dialog_id(service_notifications_user_id); @@ -7084,11 +6936,11 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p if (new_pts < old_pts - 19999 && !is_postponed_update) { // restore channel pts after delete_first_messages auto now = Time::now(); - if (now > last_pts_jump_warning_time_ + 1) { + if (now > last_channel_pts_jump_warning_time_ + 1) { LOG(ERROR) << "Restore pts in " << d->dialog_id << " from " << source << " after delete_first_messages from " << old_pts << " to " << new_pts << " is temporarily disabled, pts_count = " << pts_count << ", update is from " << source << ": " << oneline(to_string(update)); - last_pts_jump_warning_time_ = now; + last_channel_pts_jump_warning_time_ = now; } get_channel_difference(dialog_id, old_pts, true, "add_pending_channel_update old"); } @@ -7171,16 +7023,7 @@ bool MessagesManager::is_old_channel_update(DialogId dialog_id, int32 new_pts) { return new_pts <= (d == nullptr ? load_channel_pts(dialog_id) : d->pts); } -void MessagesManager::set_get_difference_timeout(double timeout) { - if (!pts_gap_timeout_.has_timeout()) { - LOG(INFO) << "Gap in pts has found, current pts is " << td_->updates_manager_->get_pts(); - pts_gap_timeout_.set_callback(std::move(UpdatesManager::fill_pts_gap)); - pts_gap_timeout_.set_callback_data(static_cast(td_)); - pts_gap_timeout_.set_timeout_in(timeout); - } -} - -void MessagesManager::process_update(tl_object_ptr &&update) { +void MessagesManager::process_pts_update(tl_object_ptr &&update) { switch (update->get_id()) { case dummyUpdate::ID: LOG(INFO) << "Process dummyUpdate"; @@ -7345,29 +7188,6 @@ void MessagesManager::on_message_edited(FullMessageId full_message_id, int32 pts update_used_hashtags(dialog_id, m); } -void MessagesManager::process_pending_updates() { - for (auto &update : pending_pts_updates_) { - process_update(std::move(update.second.update)); - update.second.promise.set_value(Unit()); - } - - td_->updates_manager_->set_pts(accumulated_pts_, "process pending updates") - .set_value(Unit()); // TODO can't set until get messages really stored on persistent storage - drop_pending_updates(); -} - -void MessagesManager::drop_pending_updates() { - accumulated_pts_count_ = 0; - accumulated_pts_ = -1; - pts_gap_timeout_.cancel_timeout(); - pending_pts_updates_.clear(); -} - -void MessagesManager::postpone_pts_update(tl_object_ptr &&update, int32 pts, int32 pts_count, - Promise &&promise) { - postponed_pts_updates_.emplace(pts, PendingPtsUpdate(std::move(update), pts, pts_count, std::move(promise))); -} - string MessagesManager::get_notification_settings_scope_database_key(NotificationSettingsScope scope) { switch (scope) { case NotificationSettingsScope::Private: @@ -8690,31 +8510,11 @@ void MessagesManager::before_get_difference() { // scheduled messages are not returned in getDifference, so we must always reget them after it scheduled_messages_sync_generation_++; - - postponed_pts_updates_.insert(std::make_move_iterator(pending_pts_updates_.begin()), - std::make_move_iterator(pending_pts_updates_.end())); - - drop_pending_updates(); } void MessagesManager::after_get_difference() { CHECK(!td_->updates_manager_->running_get_difference()); - if (postponed_pts_updates_.size()) { - auto postponed_updates = std::move(postponed_pts_updates_); - postponed_pts_updates_.clear(); - - LOG(INFO) << "Begin to apply " << postponed_updates.size() << " postponed pts updates"; - for (auto &postponed_update : postponed_updates) { - auto &update = postponed_update.second; - add_pending_update(std::move(update.update), update.pts, update.pts_count, std::move(update.promise), - "after get difference"); - CHECK(!td_->updates_manager_->running_get_difference()); - } - LOG(INFO) << "Finish to apply postponed pts updates, have " << postponed_pts_updates_.size() - << " left postponed updates"; - } - running_get_difference_ = false; if (!pending_on_get_dialogs_.empty()) { @@ -8777,13 +8577,13 @@ void MessagesManager::after_get_difference() { const Dialog *d = get_dialog(dialog_id); CHECK(d != nullptr); - if (dialog_id.get_type() == DialogType::Channel || pending_pts_updates_.empty() || message_id.is_scheduled() || + if (dialog_id.get_type() == DialogType::Channel || message_id.is_scheduled() || message_id <= d->last_new_message_id) { LOG(ERROR) << "Receive updateMessageId from " << it.second << " to " << full_message_id << " but not receive corresponding message, last_new_message_id = " << d->last_new_message_id; } if (dialog_id.get_type() != DialogType::Channel && - (pending_pts_updates_.empty() || message_id.is_scheduled() || message_id <= d->last_new_message_id)) { + (message_id.is_scheduled() || message_id <= d->last_new_message_id)) { dump_debug_message_op(get_dialog(dialog_id)); } if (message_id.is_scheduled() || message_id <= d->last_new_message_id) { @@ -11749,7 +11549,7 @@ void MessagesManager::init() { always_wait_for_mailbox(); start_time_ = Time::now(); - last_pts_jump_warning_time_ = start_time_ - 3600; + last_channel_pts_jump_warning_time_ = start_time_ - 3600; bool is_authorized = td_->auth_manager_->is_authorized(); bool was_authorized_user = td_->auth_manager_->was_authorized() && !td_->auth_manager_->is_bot(); @@ -27908,7 +27708,7 @@ FullMessageId MessagesManager::on_send_message_success(int64 random_id, MessageI FileId new_file_id, const char *source) { CHECK(source != nullptr); // do not try to run getDifference from this function - if (DROP_UPDATES) { + if (DROP_SEND_MESSAGE_UPDATES) { return {}; } if (!new_message_id.is_valid()) { diff --git a/td/telegram/MessagesManager.h b/td/telegram/MessagesManager.h index 599533366..eb72a145c 100644 --- a/td/telegram/MessagesManager.h +++ b/td/telegram/MessagesManager.h @@ -788,12 +788,10 @@ class MessagesManager : public Actor { tl_object_ptr get_messages_object(int32 total_count, const vector &full_message_ids, bool skip_not_found); - int32 get_min_pending_pts() const; - void process_pts_update(tl_object_ptr &&update); - void add_pending_update(tl_object_ptr &&update, int32 new_pts, int32 pts_count, - Promise &&promise, const char *source); + void skip_old_pending_pts_update(tl_object_ptr &&update, int32 new_pts, int32 old_pts, + int32 pts_count, const char *source); void add_pending_channel_update(DialogId dialog_id, tl_object_ptr &&update, int32 new_pts, int32 pts_count, Promise &&promise, const char *source, @@ -1730,7 +1728,7 @@ class MessagesManager : public Actor { static constexpr const char *DELETE_MESSAGE_USER_REQUEST_SOURCE = "user request"; - static constexpr bool DROP_UPDATES = false; + static constexpr bool DROP_SEND_MESSAGE_UPDATES = false; static FullMessageId get_full_message_id(const tl_object_ptr &message_ptr, bool is_scheduled); @@ -1828,8 +1826,6 @@ class MessagesManager : public Actor { bool can_set_game_score(DialogId dialog_id, const Message *m) const; - void process_update(tl_object_ptr &&update); - void process_channel_update(tl_object_ptr &&update); void on_message_edited(FullMessageId full_message_id, int32 pts); @@ -2062,7 +2058,7 @@ class MessagesManager : public Actor { static void set_message_id(unique_ptr &message, MessageId message_id); - bool is_allowed_useless_update(const tl_object_ptr &update) const; + static bool is_allowed_useless_update(const tl_object_ptr &update); bool is_message_auto_read(DialogId dialog_id, bool is_outgoing) const; @@ -2789,18 +2785,6 @@ class MessagesManager : public Actor { void load_notification_settings(); - void set_get_difference_timeout(double timeout); - - void skip_old_pending_update(tl_object_ptr &&update, int32 new_pts, int32 old_pts, - int32 pts_count, const char *source); - - void process_pending_updates(); - - void drop_pending_updates(); - - void postpone_pts_update(tl_object_ptr &&update, int32 pts, int32 pts_count, - Promise &&promise); - static string get_channel_pts_key(DialogId dialog_id); int32 load_channel_pts(DialogId dialog_id) const; @@ -3013,10 +2997,7 @@ class MessagesManager : public Actor { std::shared_ptr upload_thumbnail_callback_; std::shared_ptr upload_dialog_photo_callback_; - int32 accumulated_pts_count_ = 0; - int32 accumulated_pts_ = -1; - Timeout pts_gap_timeout_; - double last_pts_jump_warning_time_ = 0; + double last_channel_pts_jump_warning_time_ = 0; std::unordered_map, FileIdHash> being_uploaded_files_; // file_id -> message, thumbnail_file_id @@ -3115,8 +3096,6 @@ class MessagesManager : public Actor { bool running_get_difference_ = false; // true after before_get_difference and false after after_get_difference std::unordered_map, DialogIdHash> dialogs_; - std::multimap pending_pts_updates_; - std::multimap postponed_pts_updates_; std::unordered_set loaded_dialogs_; // dialogs loaded from database, but not added to dialogs_ diff --git a/td/telegram/UpdatesManager.cpp b/td/telegram/UpdatesManager.cpp index f65547218..48e275cad 100644 --- a/td/telegram/UpdatesManager.cpp +++ b/td/telegram/UpdatesManager.cpp @@ -16,6 +16,7 @@ #include "td/telegram/ChannelId.h" #include "td/telegram/ChatId.h" #include "td/telegram/ConfigManager.h" +#include "td/telegram/ConfigShared.h" #include "td/telegram/ContactsManager.h" #include "td/telegram/DialogAction.h" #include "td/telegram/DialogId.h" @@ -188,7 +189,7 @@ void UpdatesManager::fill_pts_gap(void *td) { auto td_ptr = static_cast(td); string source = PSTRING() << "pts from " << td_ptr->updates_manager_->get_pts() << " to " - << td_ptr->messages_manager_->get_min_pending_pts(); + << td_ptr->updates_manager_->get_min_pending_pts(); fill_gap(td, source.c_str()); } @@ -270,7 +271,10 @@ void UpdatesManager::before_get_difference(bool is_initial) { // may be called many times before after_get_difference is called send_closure(G()->state_manager(), &StateManager::on_synchronized, false); - td_->messages_manager_->before_get_difference(); + postponed_pts_updates_.insert(std::make_move_iterator(pending_pts_updates_.begin()), + std::make_move_iterator(pending_pts_updates_.end())); + + drop_pending_pts_updates(); send_closure_later(td_->notification_manager_actor_, &NotificationManager::before_get_difference); } @@ -1160,7 +1164,7 @@ void UpdatesManager::process_get_difference_updates( // process updateReadHistoryInbox before new messages if (constructor_id == telegram_api::updateReadHistoryInbox::ID) { static_cast(update.get())->still_unread_count_ = -1; - td_->messages_manager_->process_pts_update(std::move(update)); + process_pts_update(std::move(update)); CHECK(!running_get_difference_); } */ @@ -1340,6 +1344,21 @@ void UpdatesManager::after_get_difference() { VLOG(get_difference) << "Finish to apply " << total_update_count << " postponed updates"; } + if (postponed_pts_updates_.size()) { // must be before td_->messages_manager_->after_get_difference() + auto postponed_updates = std::move(postponed_pts_updates_); + postponed_pts_updates_.clear(); + + LOG(INFO) << "Begin to apply " << postponed_updates.size() << " postponed pts updates"; + for (auto &postponed_update : postponed_updates) { + auto &update = postponed_update.second; + add_pending_pts_update(std::move(update.update), update.pts, update.pts_count, std::move(update.promise), + "after get difference"); + CHECK(!running_get_difference_); + } + LOG(INFO) << "Finish to apply postponed pts updates, have " << postponed_pts_updates_.size() + << " left postponed updates"; + } + td_->animations_manager_->after_get_difference(); td_->contacts_manager_->after_get_difference(); td_->inline_queries_manager_->after_get_difference(); @@ -1385,7 +1404,7 @@ void UpdatesManager::on_pending_updates(vector> Promise &&promise) { tl_object_ptr update_pts_changed; - MultiPromiseActorSafe mpas{"OnPendingUpdatesMultiPromiseActor"}; + MultiPromiseActorSafe mpas{"OnProcessUpdatesMultiPromiseActor"}; mpas.add_promise(std::move(promise)); auto lock = mpas.get_promise(); @@ -1678,7 +1697,7 @@ void UpdatesManager::process_updates(vector> static_cast(update.get())->still_unread_count_ = -1; } - td_->messages_manager_->process_pts_update(std::move(update)); + process_pts_update(std::move(update)); } if (update != nullptr && is_qts_update(update.get())) { process_qts_update(std::move(update), 0, mpas.get_promise()); @@ -1705,6 +1724,161 @@ void UpdatesManager::process_updates(vector> lock.set_value(Unit()); } +int32 UpdatesManager::get_min_pending_pts() const { + int32 result = std::numeric_limits::max(); + if (!pending_pts_updates_.empty()) { + auto pts = pending_pts_updates_.begin()->first; + if (pts < result) { + result = pts; + } + } + if (!postponed_pts_updates_.empty()) { + auto pts = postponed_pts_updates_.begin()->first; + if (pts < result) { + result = pts; + } + } + return result; +} + +void UpdatesManager::process_pts_update(tl_object_ptr &&update) { + CHECK(update != nullptr); + + // TODO need to save all updates that can change result of running queries not associated with pts (for example + // getHistory) and apply the updates to results of the queries + + if (!check_pts_update(update)) { + LOG(ERROR) << "Receive wrong pts update: " << oneline(to_string(update)); + return; + } + + // must be called only during getDifference + CHECK(pending_pts_updates_.empty()); + CHECK(accumulated_pts_ == -1); + + td_->messages_manager_->process_pts_update(std::move(update)); +} + +void UpdatesManager::add_pending_pts_update(tl_object_ptr &&update, int32 new_pts, + int32 pts_count, Promise &&promise, const char *source) { + // do not try to run getDifference from this function + CHECK(update != nullptr); + CHECK(source != nullptr); + LOG(INFO) << "Receive from " << source << " pending " << to_string(update); + if (pts_count < 0 || new_pts <= pts_count) { + LOG(ERROR) << "Receive update with wrong pts = " << new_pts << " or pts_count = " << pts_count << " from " << source + << ": " << oneline(to_string(update)); + return promise.set_value(Unit()); + } + + // TODO need to save all updates that can change result of running queries not associated with pts (for example + // getHistory) and apply them to result of this queries + + if (!check_pts_update(update)) { + LOG(ERROR) << "Receive wrong pts update from " << source << ": " << oneline(to_string(update)); + return promise.set_value(Unit()); + } + + if (DROP_PTS_UPDATES) { + set_pts_gap_timeout(1.0); + return promise.set_value(Unit()); + } + + int32 old_pts = get_pts(); + if (new_pts < old_pts - 99 && Slice(source) != "after get difference") { + bool need_restore_pts = new_pts < old_pts - 19999; + auto now = Time::now(); + if (now > last_pts_jump_warning_time_ + 1 && (need_restore_pts || now < last_pts_jump_warning_time_ + 5)) { + LOG(ERROR) << "Restore pts after delete_first_messages from " << old_pts << " to " << new_pts + << " is disabled, pts_count = " << pts_count << ", update is from " << source << ": " + << oneline(to_string(update)); + last_pts_jump_warning_time_ = now; + } + if (need_restore_pts) { + set_pts_gap_timeout(0.001); + + /* + LOG(WARNING) << "Restore pts after delete_first_messages"; + set_pts(new_pts - 1, "restore pts after delete_first_messages"); + old_pts = get_pts(); + CHECK(old_pts == new_pts - 1); + */ + } + } + + if (new_pts <= old_pts || (old_pts >= 1 && new_pts > old_pts + 500000000)) { + td_->messages_manager_->skip_old_pending_pts_update(std::move(update), new_pts, old_pts, pts_count, source); + return promise.set_value(Unit()); + } + + if (running_get_difference_ || !postponed_pts_updates_.empty()) { + LOG(INFO) << "Save pending update got while running getDifference from " << source; + if (running_get_difference_) { + CHECK(update->get_id() == dummyUpdate::ID || update->get_id() == updateSentMessage::ID); + } + postpone_pts_update(std::move(update), new_pts, pts_count, std::move(promise)); + return; + } + + if (old_pts + pts_count > new_pts) { + LOG(WARNING) << "Have old_pts (= " << old_pts << ") + pts_count (= " << pts_count << ") > new_pts (= " << new_pts + << "). Logged in " << G()->shared_config().get_option_integer("authorization_date") << ". Update from " + << source << " = " << oneline(to_string(update)); + postpone_pts_update(std::move(update), new_pts, pts_count, std::move(promise)); + set_pts_gap_timeout(0.001); + return; + } + + accumulated_pts_count_ += pts_count; + if (new_pts > accumulated_pts_) { + accumulated_pts_ = new_pts; + } + + if (old_pts + accumulated_pts_count_ > accumulated_pts_) { + LOG(WARNING) << "Have old_pts (= " << old_pts << ") + accumulated_pts_count (= " << accumulated_pts_count_ + << ") > accumulated_pts (= " << accumulated_pts_ << "). new_pts = " << new_pts + << ", pts_count = " << pts_count << ". Logged in " + << G()->shared_config().get_option_integer("authorization_date") << ". Update from " << source << " = " + << oneline(to_string(update)); + postpone_pts_update(std::move(update), new_pts, pts_count, std::move(promise)); + set_pts_gap_timeout(0.001); + return; + } + + LOG_IF(INFO, pts_count == 0 && update->get_id() != dummyUpdate::ID) << "Skip useless update " << to_string(update); + + if (pending_pts_updates_.empty() && old_pts + accumulated_pts_count_ == accumulated_pts_ && + !pts_gap_timeout_.has_timeout()) { + if (pts_count > 0) { + td_->messages_manager_->process_pts_update(std::move(update)); + + set_pts(accumulated_pts_, "process pending updates fast path") + .set_value(Unit()); // TODO can't set until get messages really stored on persistent storage + accumulated_pts_count_ = 0; + accumulated_pts_ = -1; + } + promise.set_value(Unit()); + return; + } + + pending_pts_updates_.emplace(new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, std::move(promise))); + + if (old_pts + accumulated_pts_count_ < accumulated_pts_) { + set_pts_gap_timeout(MAX_UNFILLED_GAP_TIME); + return; + } + + CHECK(old_pts + accumulated_pts_count_ == accumulated_pts_); + if (!pending_pts_updates_.empty()) { + process_pending_pts_updates(); + } +} + +void UpdatesManager::postpone_pts_update(tl_object_ptr &&update, int32 pts, int32 pts_count, + Promise &&promise) { + postponed_pts_updates_.emplace(pts, PendingPtsUpdate(std::move(update), pts, pts_count, std::move(promise))); +} + void UpdatesManager::process_seq_updates(int32 seq_end, int32 date, vector> &&updates, Promise &&promise) { @@ -1752,6 +1926,24 @@ void UpdatesManager::process_qts_update(tl_object_ptr &&up promise.set_value(Unit()); } +void UpdatesManager::process_pending_pts_updates() { + for (auto &update : pending_pts_updates_) { + td_->messages_manager_->process_pts_update(std::move(update.second.update)); + update.second.promise.set_value(Unit()); + } + + set_pts(accumulated_pts_, "process pending updates") + .set_value(Unit()); // TODO can't set until get messages really stored on persistent storage + drop_pending_pts_updates(); +} + +void UpdatesManager::drop_pending_pts_updates() { + accumulated_pts_count_ = 0; + accumulated_pts_ = -1; + pts_gap_timeout_.cancel_timeout(); + pending_pts_updates_.clear(); +} + void UpdatesManager::process_pending_seq_updates() { if (!pending_seq_updates_.empty()) { LOG(DEBUG) << "Trying to process " << pending_seq_updates_.size() << " pending seq updates"; @@ -1815,6 +2007,14 @@ void UpdatesManager::process_pending_qts_updates() { } } +void UpdatesManager::set_pts_gap_timeout(double timeout) { + if (!pts_gap_timeout_.has_timeout()) { + pts_gap_timeout_.set_callback(std::move(fill_pts_gap)); + pts_gap_timeout_.set_callback_data(static_cast(td_)); + pts_gap_timeout_.set_timeout_in(timeout); + } +} + void UpdatesManager::set_seq_gap_timeout(double timeout) { if (!seq_gap_timeout_.has_timeout()) { seq_gap_timeout_.set_callback(std::move(fill_seq_gap)); @@ -1841,8 +2041,7 @@ void UpdatesManager::on_pending_update(tl_object_ptr updat void UpdatesManager::on_update(tl_object_ptr update, Promise &&promise) { int new_pts = update->pts_; int pts_count = update->pts_count_; - td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, std::move(promise), - "updateNewMessage"); + add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateNewMessage"); } void UpdatesManager::on_update(tl_object_ptr update, Promise &&promise) { @@ -1861,42 +2060,36 @@ void UpdatesManager::on_update(tl_object_ptr &&promise) { int new_pts = update->pts_; int pts_count = update->pts_count_; - td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, std::move(promise), - "updateReadMessagesContents"); + add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateReadMessagesContents"); } void UpdatesManager::on_update(tl_object_ptr update, Promise &&promise) { int new_pts = update->pts_; int pts_count = update->pts_count_; - td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, std::move(promise), - "updateEditMessage"); + add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateEditMessage"); } void UpdatesManager::on_update(tl_object_ptr update, Promise &&promise) { int new_pts = update->pts_; int pts_count = update->pts_count_; if (update->messages_.empty()) { - td_->messages_manager_->add_pending_update(make_tl_object(), new_pts, pts_count, Promise(), - "updateDeleteMessages"); + add_pending_pts_update(make_tl_object(), new_pts, pts_count, Promise(), "updateDeleteMessages"); promise.set_value(Unit()); } else { - td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, std::move(promise), - "updateDeleteMessages"); + add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateDeleteMessages"); } } void UpdatesManager::on_update(tl_object_ptr update, Promise &&promise) { int new_pts = update->pts_; int pts_count = update->pts_count_; - td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, std::move(promise), - "updateReadHistoryInbox"); + add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateReadHistoryInbox"); } void UpdatesManager::on_update(tl_object_ptr update, Promise &&promise) { int new_pts = update->pts_; int pts_count = update->pts_count_; - td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, std::move(promise), - "updateReadHistoryOutbox"); + add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateReadHistoryOutbox"); } void UpdatesManager::on_update(tl_object_ptr update, Promise &&promise) { @@ -2008,8 +2201,7 @@ void UpdatesManager::on_update(tl_object_ptr update, Promise &&promise) { int new_pts = update->pts_; int pts_count = update->pts_count_; - td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, std::move(promise), - "updatePinnedMessages"); + add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updatePinnedMessages"); } void UpdatesManager::on_update(tl_object_ptr update, @@ -2063,8 +2255,8 @@ void UpdatesManager::on_update(tl_object_ptr up void UpdatesManager::on_update(tl_object_ptr update, Promise &&promise) { td_->web_pages_manager_->on_get_web_page(std::move(update->webpage_), DialogId()); - td_->messages_manager_->add_pending_update(make_tl_object(), update->pts_, update->pts_count_, - Promise(), "updateWebPage"); + add_pending_pts_update(make_tl_object(), update->pts_, update->pts_count_, Promise(), + "updateWebPage"); promise.set_value(Unit()); } @@ -2084,8 +2276,8 @@ void UpdatesManager::on_update(tl_object_ptr up } if (update->pts_ > 0) { - td_->messages_manager_->add_pending_update(make_tl_object(), update->pts_, update->pts_count_, - Promise(), "updateFolderPeers"); + add_pending_pts_update(make_tl_object(), update->pts_, update->pts_count_, Promise(), + "updateFolderPeers"); } promise.set_value(Unit()); } diff --git a/td/telegram/UpdatesManager.h b/td/telegram/UpdatesManager.h index fb32f3fa5..4fc3c8be5 100644 --- a/td/telegram/UpdatesManager.h +++ b/td/telegram/UpdatesManager.h @@ -40,6 +40,9 @@ class UpdatesManager : public Actor { void on_get_difference(tl_object_ptr &&difference_ptr); + void add_pending_pts_update(tl_object_ptr &&update, int32 new_pts, int32 pts_count, + Promise &&promise, const char *source); + static std::unordered_set get_sent_messages_random_ids(const telegram_api::Updates *updates_ptr); static vector *> get_new_messages( @@ -63,8 +66,6 @@ class UpdatesManager : public Actor { void on_server_pong(tl_object_ptr &&state); - static bool check_pts_update(const tl_object_ptr &update); - int32 get_pts() const { return pts_manager_.mem_pts(); } @@ -77,20 +78,30 @@ class UpdatesManager : public Actor { Promise<> set_pts(int32 pts, const char *source) TD_WARN_UNUSED_RESULT; - static const double MAX_UNFILLED_GAP_TIME; - - static void fill_pts_gap(void *td); - bool running_get_difference() const { return running_get_difference_; } private: static constexpr int32 FORCED_GET_DIFFERENCE_PTS_DIFF = 100000; + static const double MAX_UNFILLED_GAP_TIME; + static constexpr bool DROP_PTS_UPDATES = false; friend class OnUpdate; - class PendingUpdates { + class PendingPtsUpdate { + public: + tl_object_ptr update; + int32 pts; + int32 pts_count; + Promise promise; + + PendingPtsUpdate(tl_object_ptr &&update, int32 pts, int32 pts_count, Promise &&promise) + : update(std::move(update)), pts(pts), pts_count(pts_count), promise(std::move(promise)) { + } + }; + + class PendingSeqUpdates { public: int32 seq_begin; int32 seq_end; @@ -98,8 +109,8 @@ class UpdatesManager : public Actor { vector> updates; Promise promise; - PendingUpdates(int32 seq_begin, int32 seq_end, int32 date, vector> &&updates, - Promise &&promise) + PendingSeqUpdates(int32 seq_begin, int32 seq_end, int32 date, vector> &&updates, + Promise &&promise) : seq_begin(seq_begin), seq_end(seq_end), date(date), updates(std::move(updates)), promise(std::move(promise)) { } }; @@ -121,11 +132,20 @@ class UpdatesManager : public Actor { int32 short_update_date_ = 0; - std::multimap postponed_updates_; // updates received during getDifference - std::multimap pending_seq_updates_; // updates with too big seq + int32 accumulated_pts_count_ = 0; + int32 accumulated_pts_ = -1; + double last_pts_jump_warning_time_ = 0; + + std::multimap pending_pts_updates_; + std::multimap postponed_pts_updates_; + + std::multimap postponed_updates_; // updates received during getDifference + std::multimap pending_seq_updates_; // updates with too big seq std::map pending_qts_updates_; // updates with too big qts + Timeout pts_gap_timeout_; + Timeout seq_gap_timeout_; Timeout qts_gap_timeout_; @@ -168,15 +188,26 @@ class UpdatesManager : public Actor { void process_updates(vector> &&updates, bool force_apply, Promise &&promise); + void postpone_pts_update(tl_object_ptr &&update, int32 pts, int32 pts_count, + Promise &&promise); + + void process_pts_update(tl_object_ptr &&update); + void process_seq_updates(int32 seq_end, int32 date, vector> &&updates, Promise &&promise); void process_qts_update(tl_object_ptr &&update_ptr, int32 qts, Promise &&promise); + void process_pending_pts_updates(); + void process_pending_seq_updates(); void process_pending_qts_updates(); + void drop_pending_pts_updates(); + + static void fill_pts_gap(void *td); + static void fill_seq_gap(void *td); static void fill_qts_gap(void *td); @@ -185,6 +216,8 @@ class UpdatesManager : public Actor { static void fill_gap(void *td, const char *source); + void set_pts_gap_timeout(double timeout); + void set_seq_gap_timeout(double timeout); void set_qts_gap_timeout(double timeout); @@ -197,10 +230,14 @@ class UpdatesManager : public Actor { void after_get_difference(); + int32 get_min_pending_pts() const; + static bool have_update_pts_changed(const vector> &updates); static bool check_pts_update_dialog_id(DialogId dialog_id); + static bool check_pts_update(const tl_object_ptr &update); + static bool is_pts_update(const telegram_api::Update *update); static int32 get_update_pts(const telegram_api::Update *update);