From 9a5872fe27a3b8ca386bb50165f46e83b456753c Mon Sep 17 00:00:00 2001 From: levlam Date: Sun, 15 Aug 2021 12:46:38 +0300 Subject: [PATCH] Use update receive time to calculate proper gap time. --- td/telegram/MessagesManager.cpp | 16 ++--- td/telegram/UpdatesManager.cpp | 101 ++++++++++++++++++++------------ td/telegram/UpdatesManager.h | 29 ++++++--- 3 files changed, 94 insertions(+), 52 deletions(-) diff --git a/td/telegram/MessagesManager.cpp b/td/telegram/MessagesManager.cpp index 8a1cf02ef..7c1301acb 100644 --- a/td/telegram/MessagesManager.cpp +++ b/td/telegram/MessagesManager.cpp @@ -660,7 +660,7 @@ class UnpinAllMessagesQuery final : public Td::ResultHandler { std::move(promise), "unpin all messages"); } else { td->updates_manager_->add_pending_pts_update(make_tl_object(), affected_history->pts_, - affected_history->pts_count_, std::move(promise), + affected_history->pts_count_, Time::now(), std::move(promise), "unpin all messages"); } } else if (affected_history->offset_ <= 0) { @@ -1812,7 +1812,7 @@ class ReadMessagesContentsQuery final : public Td::ResultHandler { if (affected_messages->pts_count_ > 0) { td->updates_manager_->add_pending_pts_update(make_tl_object(), affected_messages->pts_, - affected_messages->pts_count_, Promise(), + affected_messages->pts_count_, Time::now(), Promise(), "read messages content query"); } @@ -2034,7 +2034,7 @@ class ReadHistoryQuery final : public Td::ResultHandler { if (affected_messages->pts_count_ > 0) { td->updates_manager_->add_pending_pts_update(make_tl_object(), affected_messages->pts_, - affected_messages->pts_count_, Promise(), + affected_messages->pts_count_, Time::now(), Promise(), "read history query"); } @@ -2504,7 +2504,7 @@ class DeleteHistoryQuery final : public Td::ResultHandler { if (affected_history->pts_count_ > 0) { td->updates_manager_->add_pending_pts_update(make_tl_object(), affected_history->pts_, - affected_history->pts_count_, Promise(), + affected_history->pts_count_, Time::now(), Promise(), "delete history query"); } @@ -2602,7 +2602,7 @@ class DeletePhoneCallHistoryQuery final : public Td::ResultHandler { auto pts_count = affected_messages->pts_count_; auto update = make_tl_object(std::move(affected_messages->messages_), pts, pts_count); - td->updates_manager_->add_pending_pts_update(std::move(update), pts, pts_count, std::move(promise), + td->updates_manager_->add_pending_pts_update(std::move(update), pts, pts_count, Time::now(), std::move(promise), "delete phone call history query"); } else if (affected_messages->offset_ <= 0) { promise_.set_value(Unit()); @@ -2759,7 +2759,7 @@ class ReadMentionsQuery final : public Td::ResultHandler { td->updates_manager_->get_difference("Wrong messages_readMentions result"); } else { td->updates_manager_->add_pending_pts_update(make_tl_object(), affected_history->pts_, - affected_history->pts_count_, Promise(), + affected_history->pts_count_, Time::now(), Promise(), "read all mentions query"); } } @@ -2899,7 +2899,7 @@ class SendMessageActor final : public NetActorOnce { } td->updates_manager_->add_pending_pts_update(std::move(update), sent_message->pts_, sent_message->pts_count_, - Promise(), "send message actor"); + Time::now(), Promise(), "send message actor"); } void on_error(uint64 id, Status status) final { @@ -3914,7 +3914,7 @@ class DeleteMessagesQuery final : public Td::ResultHandler { if (affected_messages->pts_count_ > 0) { td->updates_manager_->add_pending_pts_update(make_tl_object(), affected_messages->pts_, - affected_messages->pts_count_, Promise(), + affected_messages->pts_count_, Time::now(), Promise(), "delete messages query"); } if (--query_count_ == 0) { diff --git a/td/telegram/UpdatesManager.cpp b/td/telegram/UpdatesManager.cpp index ea4cae858..25ef5c974 100644 --- a/td/telegram/UpdatesManager.cpp +++ b/td/telegram/UpdatesManager.cpp @@ -854,7 +854,7 @@ void UpdatesManager::on_get_updates(tl_object_ptr &&updat auto updates = move_tl_object_as(updates_ptr); td_->contacts_manager_->on_get_users(std::move(updates->users_), "updatesCombined"); td_->contacts_manager_->on_get_chats(std::move(updates->chats_), "updatesCombined"); - on_pending_updates(std::move(updates->updates_), updates->seq_start_, updates->seq_, updates->date_, + on_pending_updates(std::move(updates->updates_), updates->seq_start_, updates->seq_, updates->date_, Time::now(), std::move(promise), "telegram_api::updatesCombined"); break; } @@ -862,8 +862,8 @@ void UpdatesManager::on_get_updates(tl_object_ptr &&updat auto updates = move_tl_object_as(updates_ptr); td_->contacts_manager_->on_get_users(std::move(updates->users_), "updates"); td_->contacts_manager_->on_get_chats(std::move(updates->chats_), "updates"); - on_pending_updates(std::move(updates->updates_), updates->seq_, updates->seq_, updates->date_, std::move(promise), - "telegram_api::updates"); + on_pending_updates(std::move(updates->updates_), updates->seq_, updates->seq_, updates->date_, Time::now(), + std::move(promise), "telegram_api::updates"); break; } case telegram_api::updateShortSentMessage::ID: @@ -1409,11 +1409,12 @@ void UpdatesManager::after_get_difference() { auto updates = std::move(it->second.updates); auto updates_seq_begin = it->second.seq_begin; auto updates_seq_end = it->second.seq_end; + auto receive_time = it->second.receive_time; auto promise = std::move(it->second.promise); // ignore it->second.date, because it may be too old postponed_updates_.erase(it); auto update_count = updates.size(); - on_pending_updates(std::move(updates), updates_seq_begin, updates_seq_end, 0, std::move(promise), + on_pending_updates(std::move(updates), updates_seq_begin, updates_seq_end, 0, receive_time, std::move(promise), "postponed updates"); if (running_get_difference_) { VLOG(get_difference) << "Finish to apply postponed updates with " << postponed_updates_.size() @@ -1433,8 +1434,8 @@ void UpdatesManager::after_get_difference() { LOG(INFO) << "Begin to apply " << postponed_updates.size() << " postponed pts updates"; for (auto &postponed_update : postponed_updates) { auto &update = postponed_update.second; - add_pending_pts_update(std::move(update.update), update.pts, update.pts_count, std::move(update.promise), - "after get difference"); + add_pending_pts_update(std::move(update.update), update.pts, update.pts_count, update.receive_time, + std::move(update.promise), "after get difference"); CHECK(!running_get_difference_); } LOG(INFO) << "Finish to apply postponed pts updates, have " << postponed_pts_updates_.size() @@ -1451,7 +1452,8 @@ void UpdatesManager::after_get_difference() { } void UpdatesManager::on_pending_updates(vector> &&updates, int32 seq_begin, - int32 seq_end, int32 date, Promise &&promise, const char *source) { + int32 seq_end, int32 date, double receive_time, Promise &&promise, + const char *source) { if (get_pts() == -1) { init_state(); } @@ -1485,8 +1487,8 @@ void UpdatesManager::on_pending_updates(vector auto &pending_update = pending_qts_updates_[qts]; if (pending_update.update != nullptr) { LOG(WARNING) << "Receive duplicate update with qts = " << qts; + } else { + pending_update.receive_time = Time::now(); } pending_update.update = std::move(update); pending_update.promises.push_back(std::move(promise)); @@ -1849,7 +1853,8 @@ void UpdatesManager::process_pts_update(tl_object_ptr &&up } void UpdatesManager::add_pending_pts_update(tl_object_ptr &&update, int32 new_pts, - int32 pts_count, Promise &&promise, const char *source) { + int32 pts_count, double receive_time, Promise &&promise, + const char *source) { // do not try to run getDifference from this function CHECK(update != nullptr); CHECK(source != nullptr); @@ -1905,7 +1910,7 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr if (running_get_difference_) { CHECK(update->get_id() == dummyUpdate::ID || update->get_id() == updateSentMessage::ID); } - postpone_pts_update(std::move(update), new_pts, pts_count, std::move(promise)); + postpone_pts_update(std::move(update), new_pts, pts_count, receive_time, std::move(promise)); return; } @@ -1913,7 +1918,7 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr LOG(WARNING) << "Have old_pts (= " << old_pts << ") + pts_count (= " << pts_count << ") > new_pts (= " << new_pts << "). Logged in " << G()->shared_config().get_option_integer("authorization_date") << ". Update from " << source << " = " << oneline(to_string(update)); - postpone_pts_update(std::move(update), new_pts, pts_count, std::move(promise)); + postpone_pts_update(std::move(update), new_pts, pts_count, receive_time, std::move(promise)); set_pts_gap_timeout(0.001); return; } @@ -1929,7 +1934,7 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr << ", pts_count = " << pts_count << ". Logged in " << G()->shared_config().get_option_integer("authorization_date") << ". Update from " << source << " = " << oneline(to_string(update)); - postpone_pts_update(std::move(update), new_pts, pts_count, std::move(promise)); + postpone_pts_update(std::move(update), new_pts, pts_count, receive_time, std::move(promise)); set_pts_gap_timeout(0.001); return; } @@ -1950,10 +1955,11 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr return; } - pending_pts_updates_.emplace(new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, std::move(promise))); + pending_pts_updates_.emplace( + new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, receive_time, std::move(promise))); if (old_pts < accumulated_pts_ - accumulated_pts_count_) { - set_pts_gap_timeout(MAX_UNFILLED_GAP_TIME); + set_pts_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now()); last_pts_gap_time_ = Time::now(); return; } @@ -1963,8 +1969,9 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr } void UpdatesManager::postpone_pts_update(tl_object_ptr &&update, int32 pts, int32 pts_count, - Promise &&promise) { - postponed_pts_updates_.emplace(pts, PendingPtsUpdate(std::move(update), pts, pts_count, std::move(promise))); + double receive_time, Promise &&promise) { + postponed_pts_updates_.emplace(pts, + PendingPtsUpdate(std::move(update), pts, pts_count, receive_time, std::move(promise))); } void UpdatesManager::process_seq_updates(int32 seq_end, int32 date, @@ -2095,7 +2102,15 @@ void UpdatesManager::process_pending_seq_updates() { seq_gap_timeout_.cancel_timeout(); } else { // if after getDifference still have a gap - set_seq_gap_timeout(MAX_UNFILLED_GAP_TIME); + auto update_it = pending_seq_updates_.begin(); + double receive_time = update_it->second.receive_time; + for (size_t i = 0; i < 10; i++) { + if (++update_it == pending_seq_updates_.end()) { + break; + } + receive_time = min(receive_time, update_it->second.receive_time); + } + set_seq_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now()); } } @@ -2127,7 +2142,15 @@ void UpdatesManager::process_pending_qts_updates() { qts_gap_timeout_.cancel_timeout(); } else { // if after getDifference still have a gap - set_qts_gap_timeout(MAX_UNFILLED_GAP_TIME); + auto update_it = pending_qts_updates_.begin(); + double receive_time = update_it->second.receive_time; + for (size_t i = 0; i < 10; i++) { + if (++update_it == pending_qts_updates_.end()) { + break; + } + receive_time = min(receive_time, update_it->second.receive_time); + } + set_qts_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now()); } } @@ -2159,13 +2182,13 @@ void UpdatesManager::on_pending_update(tl_object_ptr updat const char *source) { vector> updates; updates.push_back(std::move(update)); - on_pending_updates(std::move(updates), seq, seq, 0, std::move(promise), source); + on_pending_updates(std::move(updates), seq, seq, 0, Time::now(), std::move(promise), source); } void UpdatesManager::on_update(tl_object_ptr update, Promise &&promise) { int new_pts = update->pts_; int pts_count = update->pts_count_; - add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateNewMessage"); + add_pending_pts_update(std::move(update), new_pts, pts_count, Time::now(), std::move(promise), "updateNewMessage"); } void UpdatesManager::on_update(tl_object_ptr update, Promise &&promise) { @@ -2184,36 +2207,41 @@ void UpdatesManager::on_update(tl_object_ptr &&promise) { int new_pts = update->pts_; int pts_count = update->pts_count_; - add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateReadMessagesContents"); + add_pending_pts_update(std::move(update), new_pts, pts_count, Time::now(), std::move(promise), + "updateReadMessagesContents"); } void UpdatesManager::on_update(tl_object_ptr update, Promise &&promise) { int new_pts = update->pts_; int pts_count = update->pts_count_; - add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateEditMessage"); + add_pending_pts_update(std::move(update), new_pts, pts_count, Time::now(), std::move(promise), "updateEditMessage"); } void UpdatesManager::on_update(tl_object_ptr update, Promise &&promise) { int new_pts = update->pts_; int pts_count = update->pts_count_; if (update->messages_.empty()) { - add_pending_pts_update(make_tl_object(), new_pts, pts_count, Promise(), "updateDeleteMessages"); + add_pending_pts_update(make_tl_object(), new_pts, pts_count, Time::now(), Promise(), + "updateDeleteMessages"); promise.set_value(Unit()); } else { - add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateDeleteMessages"); + add_pending_pts_update(std::move(update), new_pts, pts_count, Time::now(), std::move(promise), + "updateDeleteMessages"); } } void UpdatesManager::on_update(tl_object_ptr update, Promise &&promise) { int new_pts = update->pts_; int pts_count = update->pts_count_; - add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateReadHistoryInbox"); + add_pending_pts_update(std::move(update), new_pts, pts_count, Time::now(), std::move(promise), + "updateReadHistoryInbox"); } void UpdatesManager::on_update(tl_object_ptr update, Promise &&promise) { int new_pts = update->pts_; int pts_count = update->pts_count_; - add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateReadHistoryOutbox"); + add_pending_pts_update(std::move(update), new_pts, pts_count, Time::now(), std::move(promise), + "updateReadHistoryOutbox"); } void UpdatesManager::on_update(tl_object_ptr update, Promise &&promise) { @@ -2325,7 +2353,8 @@ void UpdatesManager::on_update(tl_object_ptr update, Promise &&promise) { int new_pts = update->pts_; int pts_count = update->pts_count_; - add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updatePinnedMessages"); + add_pending_pts_update(std::move(update), new_pts, pts_count, Time::now(), std::move(promise), + "updatePinnedMessages"); } void UpdatesManager::on_update(tl_object_ptr update, @@ -2388,7 +2417,7 @@ void UpdatesManager::on_update(tl_object_ptr up void UpdatesManager::on_update(tl_object_ptr update, Promise &&promise) { td_->web_pages_manager_->on_get_web_page(std::move(update->webpage_), DialogId()); - add_pending_pts_update(make_tl_object(), update->pts_, update->pts_count_, Promise(), + add_pending_pts_update(make_tl_object(), update->pts_, update->pts_count_, Time::now(), Promise(), "updateWebPage"); promise.set_value(Unit()); } @@ -2409,8 +2438,8 @@ void UpdatesManager::on_update(tl_object_ptr up } if (update->pts_ > 0) { - add_pending_pts_update(make_tl_object(), update->pts_, update->pts_count_, Promise(), - "updateFolderPeers"); + add_pending_pts_update(make_tl_object(), update->pts_, update->pts_count_, Time::now(), + Promise(), "updateFolderPeers"); } promise.set_value(Unit()); } diff --git a/td/telegram/UpdatesManager.h b/td/telegram/UpdatesManager.h index 4013dd882..66b6a3c77 100644 --- a/td/telegram/UpdatesManager.h +++ b/td/telegram/UpdatesManager.h @@ -95,7 +95,7 @@ class UpdatesManager final : public Actor { void on_get_updates(tl_object_ptr &&updates_ptr, Promise &&promise); void add_pending_pts_update(tl_object_ptr &&update, int32 new_pts, int32 pts_count, - Promise &&promise, const char *source); + double receive_time, Promise &&promise, const char *source); static std::unordered_set get_sent_messages_random_ids(const telegram_api::Updates *updates_ptr); @@ -134,10 +134,16 @@ class UpdatesManager final : public Actor { tl_object_ptr update; int32 pts; int32 pts_count; + double receive_time; Promise promise; - PendingPtsUpdate(tl_object_ptr &&update, int32 pts, int32 pts_count, Promise &&promise) - : update(std::move(update)), pts(pts), pts_count(pts_count), promise(std::move(promise)) { + PendingPtsUpdate(tl_object_ptr &&update, int32 pts, int32 pts_count, double receive_time, + Promise &&promise) + : update(std::move(update)) + , pts(pts) + , pts_count(pts_count) + , receive_time(receive_time) + , promise(std::move(promise)) { } }; @@ -146,17 +152,24 @@ class UpdatesManager final : public Actor { int32 seq_begin; int32 seq_end; int32 date; + double receive_time; vector> updates; Promise promise; - PendingSeqUpdates(int32 seq_begin, int32 seq_end, int32 date, vector> &&updates, - Promise &&promise) - : seq_begin(seq_begin), seq_end(seq_end), date(date), updates(std::move(updates)), promise(std::move(promise)) { + PendingSeqUpdates(int32 seq_begin, int32 seq_end, int32 date, double receive_time, + vector> &&updates, Promise &&promise) + : seq_begin(seq_begin) + , seq_end(seq_end) + , date(date) + , receive_time(receive_time) + , updates(std::move(updates)) + , promise(std::move(promise)) { } }; class PendingQtsUpdate { public: + double receive_time; tl_object_ptr update; vector> promises; }; @@ -243,13 +256,13 @@ class UpdatesManager final : public Actor { void add_pending_qts_update(tl_object_ptr &&update, int32 qts, Promise &&promise); void on_pending_updates(vector> &&updates, int32 seq_begin, int32 seq_end, - int32 date, Promise &&promise, const char *source); + int32 date, double receive_time, Promise &&promise, const char *source); void process_updates(vector> &&updates, bool force_apply, Promise &&promise); void postpone_pts_update(tl_object_ptr &&update, int32 pts, int32 pts_count, - Promise &&promise); + double receive_time, Promise &&promise); void process_pts_update(tl_object_ptr &&update);