From 241718eae8451d9053d013b81322d3b02e7eb13b Mon Sep 17 00:00:00 2001 From: levlam Date: Fri, 20 Aug 2021 17:21:31 +0300 Subject: [PATCH] Extract pts and qts updates and apply them during getDifference. --- td/telegram/UpdatesManager.cpp | 192 +++++++++++++++++---------------- 1 file changed, 98 insertions(+), 94 deletions(-) diff --git a/td/telegram/UpdatesManager.cpp b/td/telegram/UpdatesManager.cpp index d753993da..95f327dc8 100644 --- a/td/telegram/UpdatesManager.cpp +++ b/td/telegram/UpdatesManager.cpp @@ -1338,15 +1338,8 @@ void UpdatesManager::on_get_difference(tl_object_ptr(difference_ptr); set_date(difference->date_, false, "on_get_difference_empty"); seq_ = difference->seq_; - if (!pending_seq_updates_.empty()) { - LOG(WARNING) << "Drop " << pending_seq_updates_.size() << " pending seq updates after receive empty difference"; - auto pending_seq_updates = std::move(pending_seq_updates_); - pending_seq_updates_.clear(); - for (auto &pending_update : pending_seq_updates) { - pending_update.second.promise.set_value(Unit()); - } - } + process_pending_qts_updates(); if (!pending_qts_updates_.empty()) { LOG(WARNING) << "Drop " << pending_qts_updates_.size() << " pending qts updates after receive empty difference"; auto pending_qts_updates = std::move(pending_qts_updates_); @@ -1359,6 +1352,17 @@ void UpdatesManager::on_get_difference(tl_object_ptrget_id(); - const tl_object_ptr *message_ptr = nullptr; - int32 pts = 0; - if (id == telegram_api::updateNewChannelMessage::ID) { - auto update_new_channel_message = static_cast(update.get()); - message_ptr = &update_new_channel_message->message_; - pts = update_new_channel_message->pts_; - } - if (id == telegram_api::updateEditChannelMessage::ID) { - auto update_edit_channel_message = static_cast(update.get()); - message_ptr = &update_edit_channel_message->message_; - pts = update_edit_channel_message->pts_; - } - - // for channels we can try to replace unacceptable update with updateChannelTooLong - if (message_ptr != nullptr) { - auto dialog_id = td_->messages_manager_->get_message_dialog_id(*message_ptr); - if (dialog_id.get_type() == DialogType::Channel) { - auto channel_id = dialog_id.get_channel_id(); - if (td_->contacts_manager_->have_channel_force(channel_id)) { - if (td_->messages_manager_->is_old_channel_update(dialog_id, pts)) { - // the update will be ignored anyway, so there is no reason to replace it or force get_difference - LOG(INFO) << "Allow an outdated unacceptable update from " << source; - continue; - } - if ((*message_ptr)->get_id() != telegram_api::messageService::ID) { - // don't replace service messages, because they can be about bot's kicking - LOG(INFO) << "Replace update about new message with updateChannelTooLong in " << dialog_id; - update = telegram_api::make_object( - telegram_api::updateChannelTooLong::PTS_MASK, channel_id.get(), pts); - continue; - } - } - } else { - LOG(ERROR) << "Update is not from a channel: " << to_string(update); + if (!is_acceptable_update(update.get())) { + CHECK(update != nullptr); + int32 id = update->get_id(); + const tl_object_ptr *message_ptr = nullptr; + int32 pts = 0; + if (id == telegram_api::updateNewChannelMessage::ID) { + auto update_new_channel_message = static_cast(update.get()); + message_ptr = &update_new_channel_message->message_; + pts = update_new_channel_message->pts_; } + if (id == telegram_api::updateEditChannelMessage::ID) { + auto update_edit_channel_message = static_cast(update.get()); + message_ptr = &update_edit_channel_message->message_; + pts = update_edit_channel_message->pts_; + } + + // for channels we can try to replace unacceptable update with updateChannelTooLong + if (message_ptr != nullptr) { + auto dialog_id = td_->messages_manager_->get_message_dialog_id(*message_ptr); + if (dialog_id.get_type() == DialogType::Channel) { + auto channel_id = dialog_id.get_channel_id(); + if (td_->contacts_manager_->have_channel_force(channel_id)) { + if (td_->messages_manager_->is_old_channel_update(dialog_id, pts)) { + // the update will be ignored anyway, so there is no reason to replace it or force get_difference + LOG(INFO) << "Allow an outdated unacceptable update from " << source; + continue; + } + if ((*message_ptr)->get_id() != telegram_api::messageService::ID) { + // don't replace service messages, because they can be about bot's kicking + LOG(INFO) << "Replace update about new message with updateChannelTooLong in " << dialog_id; + update = telegram_api::make_object( + telegram_api::updateChannelTooLong::PTS_MASK, channel_id.get(), pts); + continue; + } + } + } else { + LOG(ERROR) << "Update is not from a channel: " << to_string(update); + } + } + + get_difference("on unacceptable updates in on_pending_updates"); + return promise.set_value(Unit()); } - - get_difference("on unacceptable updates in on_pending_updates"); - return promise.set_value(Unit()); } - } - if (date > 0 && updates.size() == 1 && updates[0] != nullptr && - updates[0]->get_id() == telegram_api::updateReadHistoryOutbox::ID) { - auto update = static_cast(updates[0].get()); - DialogId dialog_id(update->peer_); - if (dialog_id.get_type() == DialogType::User) { - auto user_id = dialog_id.get_user_id(); - if (user_id.is_valid()) { - td_->contacts_manager_->on_update_user_local_was_online(user_id, date); + if (date > 0 && updates.size() == 1 && updates[0] != nullptr && + updates[0]->get_id() == telegram_api::updateReadHistoryOutbox::ID) { + auto update = static_cast(updates[0].get()); + DialogId dialog_id(update->peer_); + if (dialog_id.get_type() == DialogType::User) { + auto user_id = dialog_id.get_user_id(); + if (user_id.is_valid()) { + td_->contacts_manager_->on_update_user_local_was_online(user_id, date); + } } } } @@ -1659,7 +1647,9 @@ void UpdatesManager::on_pending_updates(vector(update), mpas.get_promise()); update = nullptr; } - CHECK(!running_get_difference_); + CHECK(need_postpone || !running_get_difference_); } } for (auto &update : updates) { if (update != nullptr) { - if (is_pts_update(update.get()) || is_qts_update(update.get())) { + if (is_pts_update(update.get())) { + if (running_get_difference_) { + auto pts = get_update_pts(update.get()); + if (pts != 0 && (min_postponed_update_pts_ == 0 || pts < min_postponed_update_pts_)) { + min_postponed_update_pts_ = pts; + } + } + downcast_call(*update, OnUpdate(this, update, mpas.get_promise())); + update = nullptr; + } else if (is_qts_update(update.get())) { + if (running_get_difference_) { + auto qts = get_update_qts(update.get()); + if (qts != 0 && (min_postponed_update_qts_ == 0 || qts < min_postponed_update_qts_)) { + min_postponed_update_qts_ = qts; + } + } downcast_call(*update, OnUpdate(this, update, mpas.get_promise())); update = nullptr; } } } - if (running_get_difference_) { - LOG(ERROR) << "Postpone " << updates.size() << " updates [" << seq_begin << ", " << seq_end - << "] with date = " << date << " from " << source; - postponed_updates_.emplace( - seq_begin, PendingSeqUpdates(seq_begin, seq_end, date, receive_time, std::move(updates), mpas.get_promise())); - return lock.set_value(Unit()); - } - if (seq_begin == 0 && seq_end == 0) { bool have_updates = false; for (auto &update : updates) { @@ -1727,11 +1724,22 @@ void UpdatesManager::on_pending_updates(vector return; } - CHECK(!running_get_difference_); - - if (qts - 1 > old_qts && old_qts > 0) { + if (running_get_difference_ || (qts - 1 > old_qts && old_qts > 0)) { LOG(INFO) << "Postpone update with qts = " << qts; - if (pending_qts_updates_.empty()) { + if (!running_get_difference_ && pending_qts_updates_.empty()) { set_qts_gap_timeout(MAX_UNFILLED_GAP_TIME); } auto &pending_update = pending_qts_updates_[qts]; @@ -1976,9 +1982,6 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr if (running_get_difference_ || !postponed_pts_updates_.empty()) { LOG(INFO) << "Save pending update got while running getDifference from " << source; - if (running_get_difference_) { - CHECK(update->get_id() == dummyUpdate::ID || update->get_id() == updateSentMessage::ID); - } postpone_pts_update(std::move(update), new_pts, pts_count, receive_time, std::move(promise)); return; } @@ -2360,6 +2363,7 @@ void UpdatesManager::process_pending_qts_updates() { } set_qts_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now()); } + CHECK(!running_get_difference_); } void UpdatesManager::set_pts_gap_timeout(double timeout) {