From b9b9f56b24df0691cdaed78160feaab1b5133cf3 Mon Sep 17 00:00:00 2001 From: levlam Date: Mon, 16 Aug 2021 12:19:30 +0300 Subject: [PATCH] Process pending pts updates as fast as possible. --- td/telegram/UpdatesManager.cpp | 65 ++++++++++++++++++++++++++++------ td/telegram/UpdatesManager.h | 7 ++-- 2 files changed, 60 insertions(+), 12 deletions(-) diff --git a/td/telegram/UpdatesManager.cpp b/td/telegram/UpdatesManager.cpp index f1bc3790e..f02cb5141 100644 --- a/td/telegram/UpdatesManager.cpp +++ b/td/telegram/UpdatesManager.cpp @@ -308,7 +308,7 @@ void UpdatesManager::before_get_difference(bool is_initial) { postponed_pts_updates_.insert(std::make_move_iterator(pending_pts_updates_.begin()), std::make_move_iterator(pending_pts_updates_.end())); - drop_pending_pts_updates(); + drop_all_pending_pts_updates(); send_closure_later(td_->notification_manager_actor_, &NotificationManager::before_get_difference); } @@ -1952,7 +1952,7 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr td_->messages_manager_->process_pts_update(std::move(update)); set_pts(accumulated_pts_, "process pending updates fast path") - .set_value(Unit()); // TODO can't set until get messages really stored on persistent storage + .set_value(Unit()); // TODO can't set until get messages are really stored on persistent storage accumulated_pts_count_ = 0; accumulated_pts_ = -1; } @@ -1964,12 +1964,17 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, receive_time, std::move(promise))); if (old_pts < accumulated_pts_ - accumulated_pts_count_) { - set_pts_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now()); + if (old_pts == new_pts - pts_count) { + // can't apply all updates, but can apply this and probably some other updates + process_pending_pts_updates(); + } else { + set_pts_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now()); + } return; } CHECK(old_pts == accumulated_pts_ - accumulated_pts_count_); - process_pending_pts_updates(); + process_all_pending_pts_updates(); } void UpdatesManager::postpone_pts_update(tl_object_ptr &&update, int32 pts, int32 pts_count, @@ -2052,7 +2057,7 @@ void UpdatesManager::process_qts_update(tl_object_ptr &&up promise.set_value(Unit()); } -void UpdatesManager::process_pending_pts_updates() { +void UpdatesManager::process_all_pending_pts_updates() { auto begin_time = Time::now(); for (auto &update : pending_pts_updates_) { td_->messages_manager_->process_pts_update(std::move(update.second.update)); @@ -2069,18 +2074,58 @@ void UpdatesManager::process_pending_pts_updates() { } } - set_pts(accumulated_pts_, "process_pending_pts_updates") + set_pts(accumulated_pts_, "process_all_pending_pts_updates") .set_value(Unit()); // TODO can't set until updates are stored on persistent storage - drop_pending_pts_updates(); + drop_all_pending_pts_updates(); } -void UpdatesManager::drop_pending_pts_updates() { +void UpdatesManager::drop_all_pending_pts_updates() { accumulated_pts_count_ = 0; accumulated_pts_ = -1; pts_gap_timeout_.cancel_timeout(); pending_pts_updates_.clear(); } +void UpdatesManager::process_pending_pts_updates() { + if (pending_pts_updates_.empty()) { + return; + } + + bool processed_pending_update = false; + while (!pending_pts_updates_.empty()) { + auto update_it = pending_pts_updates_.begin(); + auto &update = update_it->second; + if (get_pts() != update.pts - update.pts_count) { + // the updates will be applied or skipped later + break; + } + + processed_pending_update = true; + if (update.pts_count > 0) { + td_->messages_manager_->process_pts_update(std::move(update.update)); + set_pts(update.pts, "process_pending_pts_updates") + .set_value(Unit()); // TODO can't set until get messages are really stored on persistent storage + } + update.promise.set_value(Unit()); + pending_pts_updates_.erase(update_it); + } + if (processed_pending_update) { + pts_gap_timeout_.cancel_timeout(); + } + if (!pending_pts_updates_.empty()) { + // if still have a gap, reset timeout + auto update_it = pending_pts_updates_.begin(); + double receive_time = update_it->second.receive_time; + for (size_t i = 0; i < GAP_TIMEOUT_UPDATE_COUNT; i++) { + if (++update_it == pending_pts_updates_.end()) { + break; + } + receive_time = min(receive_time, update_it->second.receive_time); + } + set_pts_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now()); + } +} + void UpdatesManager::process_pending_seq_updates() { if (!pending_seq_updates_.empty()) { LOG(DEBUG) << "Trying to process " << pending_seq_updates_.size() << " pending seq updates"; @@ -2119,7 +2164,7 @@ void UpdatesManager::process_pending_seq_updates() { // if still have a gap, reset timeout auto update_it = pending_seq_updates_.begin(); double receive_time = update_it->second.receive_time; - for (size_t i = 0; i < 10; i++) { + for (size_t i = 0; i < GAP_TIMEOUT_UPDATE_COUNT; i++) { if (++update_it == pending_seq_updates_.end()) { break; } @@ -2166,7 +2211,7 @@ void UpdatesManager::process_pending_qts_updates() { // if still have a gap, reset timeout auto update_it = pending_qts_updates_.begin(); double receive_time = update_it->second.receive_time; - for (size_t i = 0; i < 10; i++) { + for (size_t i = 0; i < GAP_TIMEOUT_UPDATE_COUNT; i++) { if (++update_it == pending_qts_updates_.end()) { break; } diff --git a/td/telegram/UpdatesManager.h b/td/telegram/UpdatesManager.h index b9f6a0553..11a6d57aa 100644 --- a/td/telegram/UpdatesManager.h +++ b/td/telegram/UpdatesManager.h @@ -124,6 +124,7 @@ class UpdatesManager final : public Actor { private: static constexpr int32 FORCED_GET_DIFFERENCE_PTS_DIFF = 100000; + static constexpr int32 GAP_TIMEOUT_UPDATE_COUNT = 20; static const double MAX_UNFILLED_GAP_TIME; static constexpr bool DROP_PTS_UPDATES = false; @@ -271,14 +272,16 @@ class UpdatesManager final : public Actor { void process_qts_update(tl_object_ptr &&update_ptr, int32 qts, Promise &&promise); + void process_all_pending_pts_updates(); + + void drop_all_pending_pts_updates(); + void process_pending_pts_updates(); void process_pending_seq_updates(); void process_pending_qts_updates(); - void drop_pending_pts_updates(); - static void fill_pts_gap(void *td); static void fill_seq_gap(void *td);