From e9d3b4881e8c9a8d2d57a11746339d3dbb8a40db Mon Sep 17 00:00:00 2001 From: levlam Date: Mon, 3 Aug 2020 16:57:30 +0300 Subject: [PATCH] Support gaps in qts updates. GitOrigin-RevId: afcae4aa4ac456f5b8d8b2e46b92126a606bdca9 --- td/telegram/UpdatesManager.cpp | 130 ++++++++++++++++++++++++--------- td/telegram/UpdatesManager.h | 18 ++++- 2 files changed, 111 insertions(+), 37 deletions(-) diff --git a/td/telegram/UpdatesManager.cpp b/td/telegram/UpdatesManager.cpp index 332f369e3..d187111c8 100644 --- a/td/telegram/UpdatesManager.cpp +++ b/td/telegram/UpdatesManager.cpp @@ -182,6 +182,10 @@ void UpdatesManager::fill_seq_gap(void *td) { fill_gap(td, "seq"); } +void UpdatesManager::fill_qts_gap(void *td) { + fill_gap(td, "qts"); +} + void UpdatesManager::fill_get_difference_gap(void *td) { fill_gap(td, "getDifference"); } @@ -298,22 +302,6 @@ Promise<> UpdatesManager::set_pts(int32 pts, const char *source) { return result; } -Promise<> UpdatesManager::set_qts(int32 qts) { - Promise<> result; - if (qts > get_qts() || (0 < qts && qts < get_qts() - 399999)) { // qts can only go up or drop cardinally - if (qts < get_qts() - 399999) { - LOG(WARNING) << "Qts decreases from " << get_qts() << " to " << qts; - } else { - LOG(INFO) << "Update qts from " << get_qts() << " to " << qts; - } - - result = add_qts(qts); - } else if (qts < get_qts()) { - LOG(ERROR) << "Receive wrong qts = " << qts << " less than current qts = " << get_qts(); - } - return result; -} - void UpdatesManager::set_date(int32 date, bool from_update, string date_source) { if (date > date_) { LOG(INFO) << "Update date to " << date; @@ -811,7 +799,7 @@ void UpdatesManager::on_get_updates_state(tl_object_ptrpts_, full_source.c_str()).set_value(Unit()); set_date(state->date_, false, std::move(full_source)); - // set_qts(state->qts_).set_value(Unit()); + add_qts(state->qts_).set_value(Unit()); seq_ = state->seq_; } @@ -993,7 +981,7 @@ void UpdatesManager::on_server_pong(tl_object_ptr & void UpdatesManager::process_get_difference_updates( vector> &&new_messages, - vector> &&new_encrypted_messages, int32 qts, + vector> &&new_encrypted_messages, vector> &&other_updates) { VLOG(get_difference) << "In get difference receive " << new_messages.size() << " messages, " << new_encrypted_messages.size() << " encrypted messages and " << other_updates.size() @@ -1038,8 +1026,6 @@ void UpdatesManager::process_get_difference_updates( } process_updates(std::move(other_updates), true); - - set_qts(qts).set_value(Unit()); } void UpdatesManager::on_get_difference(tl_object_ptr &&difference_ptr) { @@ -1068,7 +1054,7 @@ void UpdatesManager::on_get_difference(tl_object_ptrcontacts_manager_->on_get_chats(std::move(difference->chats_), "updates.difference"); process_get_difference_updates(std::move(difference->new_messages_), - std::move(difference->new_encrypted_messages_), difference->state_->qts_, + std::move(difference->new_encrypted_messages_), std::move(difference->other_updates_)); if (running_get_difference_) { LOG(ERROR) << "Get difference has run while processing get difference updates"; @@ -1092,7 +1078,7 @@ void UpdatesManager::on_get_difference(tl_object_ptrnew_messages_), std::move(difference->new_encrypted_messages_), - difference->intermediate_state_->qts_, std::move(difference->other_updates_)); + std::move(difference->other_updates_)); if (running_get_difference_) { LOG(ERROR) << "Get difference has run while processing get difference updates"; break; @@ -1315,7 +1301,8 @@ void UpdatesManager::on_pending_updates(vector &&update, int32 qts) { + CHECK(update != nullptr); + if (qts <= 1) { + LOG(ERROR) << "Receive wrong qts " << qts << " in " << oneline(to_string(update)); + return; + } + + int32 old_qts = get_qts(); + if (qts < old_qts - 1000001) { + LOG(WARNING) << "Restore qts after qts overflow from " << old_qts << " to " << qts << " by " + << oneline(to_string(update)); + add_qts(qts - 1).set_value(Unit()); + CHECK(get_qts() == qts - 1); + old_qts = qts - 1; + } + + if (qts <= old_qts) { + LOG(INFO) << "Skip already applied update with qts = " << qts; + return; + } + + CHECK(!running_get_difference_); + + if (qts > old_qts + 1) { + if (pending_qts_updates_.empty()) { + set_qts_gap_timeout(MAX_UNFILLED_GAP_TIME); + } + bool is_inserted = pending_qts_updates_.emplace(qts, std::move(update)).second; + if (!is_inserted) { + LOG(INFO) << "Receive duplicate update with qts = " << qts; + } + return; + } + + process_qts_update(std::move(update), qts); + process_pending_qts_updates(); +} + void UpdatesManager::process_updates(vector> &&updates, bool force_apply) { tl_object_ptr update_pts_changed; /* @@ -1435,6 +1460,22 @@ void UpdatesManager::process_seq_updates(int32 seq_end, int32 date, } } +void UpdatesManager::process_qts_update(tl_object_ptr &&update, int32 qts) { + switch (update->get_id()) { + case telegram_api::updateNewEncryptedMessage::ID: { + auto message = std::move(move_tl_object_as(update)->message_); + send_closure(td_->secret_chats_manager_, &SecretChatsManager::on_new_message, std::move(message), add_qts(qts)); + break; + } + case telegram_api::updateChannelParticipant::ID: + // TODO + break; + default: + UNREACHABLE(); + break; + } +} + void UpdatesManager::process_pending_seq_updates() { while (!pending_seq_updates_.empty() && !running_get_difference_) { auto update_it = pending_seq_updates_.begin(); @@ -1458,6 +1499,27 @@ void UpdatesManager::process_pending_seq_updates() { } } +void UpdatesManager::process_pending_qts_updates() { + if (pending_qts_updates_.empty()) { + return; + } + while (!pending_qts_updates_.empty()) { + CHECK(!running_get_difference_); + auto update_it = pending_qts_updates_.begin(); + auto qts = update_it->first; + if (qts > get_qts() + 1) { + return; + } + if (qts == get_qts() + 1) { + process_qts_update(std::move(update_it->second), qts); + } + pending_qts_updates_.erase(update_it); + } + if (pending_qts_updates_.empty()) { + qts_gap_timeout_.cancel_timeout(); + } +} + void UpdatesManager::set_seq_gap_timeout(double timeout) { if (!seq_gap_timeout_.has_timeout()) { seq_gap_timeout_.set_callback(std::move(fill_seq_gap)); @@ -1466,6 +1528,13 @@ void UpdatesManager::set_seq_gap_timeout(double timeout) { } } +void UpdatesManager::set_qts_gap_timeout(double timeout) { + CHECK(!qts_gap_timeout_.has_timeout()); + qts_gap_timeout_.set_callback(std::move(fill_qts_gap)); + qts_gap_timeout_.set_callback_data(static_cast(td_)); + qts_gap_timeout_.set_timeout_in(timeout); +} + void UpdatesManager::on_pending_update(tl_object_ptr update, int32 seq, const char *source) { vector> updates; updates.push_back(std::move(update)); @@ -1941,19 +2010,12 @@ void UpdatesManager::on_update(tl_object_ptr upd } void UpdatesManager::on_update(tl_object_ptr update, bool force_apply) { - if (!force_apply) { - if (update->qts_ <= get_qts()) { - LOG(INFO) << "Ignore already processed update with qts " << update->qts_; - return; - } - if (update->qts_ != get_qts() + 1) { - // TODO fill gap - return; - } + if (force_apply) { + return process_qts_update(std::move(update), 0); } - send_closure(td_->secret_chats_manager_, &SecretChatsManager::on_new_message, std::move(update->message_), - add_qts(update->qts_)); + auto qts = update->qts_; + add_pending_qts_update(std::move(update), qts); } void UpdatesManager::on_update(tl_object_ptr update, bool /*force_apply*/) { diff --git a/td/telegram/UpdatesManager.h b/td/telegram/UpdatesManager.h index 4185d8c7c..fcb898296 100644 --- a/td/telegram/UpdatesManager.h +++ b/td/telegram/UpdatesManager.h @@ -71,8 +71,6 @@ class UpdatesManager : public Actor { Promise<> set_pts(int32 pts, const char *source) TD_WARN_UNUSED_RESULT; - Promise<> set_qts(int32 qts) TD_WARN_UNUSED_RESULT; - static const double MAX_UNFILLED_GAP_TIME; static void fill_pts_gap(void *td); @@ -112,8 +110,12 @@ class UpdatesManager : public Actor { std::multimap postponed_updates_; // updates received during getDifference std::multimap pending_seq_updates_; // updates with too big seq + std::map> pending_qts_updates_; // updates with too big qts + Timeout seq_gap_timeout_; + Timeout qts_gap_timeout_; + int32 retry_time_ = 1; Timeout retry_timeout_; @@ -139,10 +141,12 @@ class UpdatesManager : public Actor { void process_get_difference_updates(vector> &&new_messages, vector> &&new_encrypted_messages, - int32 qts, vector> &&other_updates); + vector> &&other_updates); void on_pending_update(tl_object_ptr update, int32 seq, const char *source); + void add_pending_qts_update(tl_object_ptr &&update, int32 qts); + void on_pending_updates(vector> &&updates, int32 seq_begin, int32 seq_end, int32 date, const char *source); @@ -150,16 +154,24 @@ class UpdatesManager : public Actor { void process_seq_updates(int32 seq_end, int32 date, vector> &&updates); + void process_qts_update(tl_object_ptr &&update, int32 qts); + void process_pending_seq_updates(); + void process_pending_qts_updates(); + static void fill_seq_gap(void *td); + static void fill_qts_gap(void *td); + static void fill_get_difference_gap(void *td); static void fill_gap(void *td, const char *source); void set_seq_gap_timeout(double timeout); + void set_qts_gap_timeout(double timeout); + void on_failed_get_difference(); void before_get_difference(bool is_initial);