diff --git a/td/telegram/UpdatesManager.cpp b/td/telegram/UpdatesManager.cpp index 837101d02..997860517 100644 --- a/td/telegram/UpdatesManager.cpp +++ b/td/telegram/UpdatesManager.cpp @@ -181,9 +181,18 @@ void UpdatesManager::fill_pts_gap(void *td) { return; } - auto td_ptr = static_cast(td); - string source = PSTRING() << "pts from " << td_ptr->updates_manager_->get_pts() << " to " - << td_ptr->updates_manager_->get_min_pending_pts(); + auto updates_manager = static_cast(td)->updates_manager_.get(); + auto min_pts = std::numeric_limits::max(); + auto max_pts = 0; + if (!updates_manager->pending_pts_updates_.empty()) { + min_pts = min(min_pts, updates_manager->pending_pts_updates_.begin()->first); + max_pts = max(max_pts, updates_manager->pending_pts_updates_.rbegin()->first); + } + if (!updates_manager->postponed_pts_updates_.empty()) { + min_pts = min(min_pts, updates_manager->postponed_pts_updates_.begin()->first); + max_pts = max(max_pts, updates_manager->postponed_pts_updates_.rbegin()->first); + } + string source = PSTRING() << "pts from " << updates_manager->get_pts() << " to " << min_pts << '-' << max_pts; fill_gap(td, source.c_str()); } @@ -193,12 +202,14 @@ void UpdatesManager::fill_seq_gap(void *td) { return; } - auto td_ptr = static_cast(td); - auto seq = std::numeric_limits::max(); - if (!td_ptr->updates_manager_->pending_seq_updates_.empty()) { - seq = td_ptr->updates_manager_->pending_seq_updates_.begin()->first; + auto updates_manager = static_cast(td)->updates_manager_.get(); + auto min_seq = std::numeric_limits::max(); + auto max_seq = 0; + if (!updates_manager->pending_seq_updates_.empty()) { + min_seq = updates_manager->pending_seq_updates_.begin()->first; + max_seq = updates_manager->pending_seq_updates_.rbegin()->second.seq_end; } - string source = PSTRING() << "seq from " << td_ptr->updates_manager_->seq_ << " to " << seq; + string source = PSTRING() << "seq from " << updates_manager->seq_ << " to " << min_seq << '-' << max_seq; fill_gap(td, source.c_str()); } @@ -208,12 +219,14 @@ void UpdatesManager::fill_qts_gap(void *td) { return; } - auto td_ptr = static_cast(td); - auto qts = std::numeric_limits::max(); - if (!td_ptr->updates_manager_->pending_qts_updates_.empty()) { - qts = td_ptr->updates_manager_->pending_qts_updates_.begin()->first; + auto updates_manager = static_cast(td)->updates_manager_.get(); + auto min_qts = std::numeric_limits::max(); + auto max_qts = 0; + if (!updates_manager->pending_qts_updates_.empty()) { + min_qts = updates_manager->pending_qts_updates_.begin()->first; + max_qts = updates_manager->pending_qts_updates_.rbegin()->first; } - string source = PSTRING() << "qts from " << td_ptr->updates_manager_->get_qts() << " to " << qts; + string source = PSTRING() << "qts from " << updates_manager->get_qts() << " to " << min_qts << '-' << max_qts; fill_gap(td, source.c_str()); } @@ -295,7 +308,7 @@ void UpdatesManager::before_get_difference(bool is_initial) { postponed_pts_updates_.insert(std::make_move_iterator(pending_pts_updates_.begin()), std::make_move_iterator(pending_pts_updates_.end())); - drop_pending_pts_updates(); + drop_all_pending_pts_updates(); send_closure_later(td_->notification_manager_actor_, &NotificationManager::before_get_difference); } @@ -1353,19 +1366,25 @@ void UpdatesManager::on_get_difference(tl_object_ptrintermediate_state_); - if (get_pts() != std::numeric_limits::max() && state->date_ == get_date() && - (state->pts_ == get_pts() || - (min_postponed_update_pts_ != 0 && state->pts_ - 500 >= min_postponed_update_pts_)) && - (state->qts_ == get_qts() || - (min_postponed_update_qts_ != 0 && state->qts_ - 500 >= min_postponed_update_qts_))) { - on_get_updates_state(std::move(state), "get difference final slice"); - VLOG(get_difference) << "Trying to switch back from getDifference to update processing"; + auto old_pts = get_pts(); + auto old_date = get_date(); + auto old_qts = get_qts(); + on_get_updates_state(std::move(difference->intermediate_state_), "get difference slice"); + + process_postponed_pts_updates(); + process_pending_qts_updates(); + + auto new_pts = get_pts(); + auto new_date = get_date(); + auto new_qts = get_qts(); + if (old_pts != std::numeric_limits::max() && new_date == old_date && + (new_pts == old_pts || (min_postponed_update_pts_ != 0 && new_pts >= min_postponed_update_pts_)) && + (new_qts == old_qts || (min_postponed_update_qts_ != 0 && new_qts >= min_postponed_update_qts_))) { + VLOG(get_difference) << "Switch back from getDifference to update processing"; break; } - on_get_updates_state(std::move(state), "get difference slice"); - if (get_pts() != -1) { // just in case + if (new_pts != -1) { // just in case run_get_difference(true, "on updates_differenceSlice"); } break; @@ -1393,10 +1412,14 @@ void UpdatesManager::after_get_difference() { retry_timeout_.cancel_timeout(); retry_time_ = 1; + // cancels qts_gap_timeout_ if needed, can apply some updates received during getDifference, + // but missed in getDifference process_pending_qts_updates(); - process_pending_seq_updates(); // cancels seq_gap_timeout_, may apply some updates received before getDifference, - // but not returned in getDifference + // cancels seq_gap_timeout_ if needed, can apply some updates received during getDifference, + // but missed in getDifference + process_pending_seq_updates(); + if (running_get_difference_) { return; } @@ -1431,15 +1454,17 @@ void UpdatesManager::after_get_difference() { auto postponed_updates = std::move(postponed_pts_updates_); postponed_pts_updates_.clear(); - LOG(INFO) << "Begin to apply " << postponed_updates.size() << " postponed pts updates"; + VLOG(get_difference) << "Begin to apply " << postponed_updates.size() + << " postponed pts updates with pts = " << get_pts(); for (auto &postponed_update : postponed_updates) { auto &update = postponed_update.second; add_pending_pts_update(std::move(update.update), update.pts, update.pts_count, update.receive_time, std::move(update.promise), "after get difference"); CHECK(!running_get_difference_); } - LOG(INFO) << "Finish to apply postponed pts updates, have " << postponed_pts_updates_.size() - << " left postponed updates"; + VLOG(get_difference) << "After applying postponed pts updates have pts = " << get_pts() + << ", max_pts = " << accumulated_pts_ << " and " << pending_pts_updates_.size() << " + " + << postponed_pts_updates_.size() << " pending pts updates"; } td_->animations_manager_->after_get_difference(); @@ -1656,11 +1681,11 @@ void UpdatesManager::on_pending_updates(vector seq_) { - LOG(ERROR) << "Strange updates from " << source << " coming with seq_begin = " << seq_begin - << ", seq_end = " << seq_end << ", but seq = " << seq_; + LOG(ERROR) << "Receive updates with seq_begin = " << seq_begin << ", seq_end = " << seq_end + << ", but seq = " << seq_ << " from " << source; } else { - LOG(INFO) << "Old updates from " << source << " coming with seq_begin = " << seq_begin - << ", seq_end = " << seq_end << ", but seq = " << seq_; + LOG(INFO) << "Receive old updates with seq_begin = " << seq_begin << ", seq_end = " << seq_end + << ", but seq = " << seq_ << " from " << source; } return lock.set_value(Unit()); } @@ -1817,23 +1842,6 @@ void UpdatesManager::process_updates(vector> lock.set_value(Unit()); } -int32 UpdatesManager::get_min_pending_pts() const { - int32 result = std::numeric_limits::max(); - if (!pending_pts_updates_.empty()) { - auto pts = pending_pts_updates_.begin()->first; - if (pts < result) { - result = pts; - } - } - if (!postponed_pts_updates_.empty()) { - auto pts = postponed_pts_updates_.begin()->first; - if (pts < result) { - result = pts; - } - } - return result; -} - void UpdatesManager::process_pts_update(tl_object_ptr &&update) { CHECK(update != nullptr); @@ -1947,7 +1955,7 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr td_->messages_manager_->process_pts_update(std::move(update)); set_pts(accumulated_pts_, "process pending updates fast path") - .set_value(Unit()); // TODO can't set until get messages really stored on persistent storage + .set_value(Unit()); // TODO can't set until data are really stored on persistent storage accumulated_pts_count_ = 0; accumulated_pts_ = -1; } @@ -1959,12 +1967,17 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, receive_time, std::move(promise))); if (old_pts < accumulated_pts_ - accumulated_pts_count_) { - set_pts_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now()); + if (old_pts == new_pts - pts_count) { + // can't apply all updates, but can apply this and probably some other updates + process_pending_pts_updates(); + } else { + set_pts_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now()); + } return; } CHECK(old_pts == accumulated_pts_ - accumulated_pts_count_); - process_pending_pts_updates(); + process_all_pending_pts_updates(); } void UpdatesManager::postpone_pts_update(tl_object_ptr &&update, int32 pts, int32 pts_count, @@ -2047,64 +2060,167 @@ void UpdatesManager::process_qts_update(tl_object_ptr &&up promise.set_value(Unit()); } -void UpdatesManager::process_pending_pts_updates() { +void UpdatesManager::process_all_pending_pts_updates() { + auto begin_time = Time::now(); for (auto &update : pending_pts_updates_) { td_->messages_manager_->process_pts_update(std::move(update.second.update)); update.second.promise.set_value(Unit()); } if (last_pts_gap_time_ != 0) { + auto begin_diff = begin_time - last_pts_gap_time_; auto diff = Time::now() - last_pts_gap_time_; last_pts_gap_time_ = 0; if (diff > 0.1) { VLOG(get_difference) << "Gap in pts from " << accumulated_pts_ - accumulated_pts_count_ << " to " - << accumulated_pts_ << " has been filled in " << diff << " seconds"; + << accumulated_pts_ << " has been filled in " << begin_diff << '-' << diff << " seconds"; } } - set_pts(accumulated_pts_, "postpone_pending_pts_update") + set_pts(accumulated_pts_, "process_all_pending_pts_updates") .set_value(Unit()); // TODO can't set until updates are stored on persistent storage - drop_pending_pts_updates(); + drop_all_pending_pts_updates(); } -void UpdatesManager::drop_pending_pts_updates() { +void UpdatesManager::drop_all_pending_pts_updates() { accumulated_pts_count_ = 0; accumulated_pts_ = -1; pts_gap_timeout_.cancel_timeout(); pending_pts_updates_.clear(); } +void UpdatesManager::process_postponed_pts_updates() { + if (postponed_pts_updates_.empty()) { + return; + } + + auto initial_pts = get_pts(); + auto old_pts = initial_pts; + int32 skipped_update_count = 0; + int32 applied_update_count = 0; + while (!postponed_pts_updates_.empty()) { + auto update_it = postponed_pts_updates_.begin(); + auto &update = update_it->second; + auto new_pts = update.pts; + auto pts_count = update.pts_count; + if (new_pts <= old_pts || (old_pts >= 1 && new_pts - 500000000 > old_pts)) { + skipped_update_count++; + td_->messages_manager_->skip_old_pending_pts_update(std::move(update.update), new_pts, old_pts, pts_count, + "process_postponed_pts_updates"); + update.promise.set_value(Unit()); + postponed_pts_updates_.erase(update_it); + continue; + } + + if (old_pts != new_pts - pts_count) { + // the updates will be applied or skipped later + break; + } + + if (pts_count > 0) { + applied_update_count++; + td_->messages_manager_->process_pts_update(std::move(update.update)); + old_pts = new_pts; + } + update.promise.set_value(Unit()); + postponed_pts_updates_.erase(update_it); + } + if (old_pts != initial_pts) { + set_pts(old_pts, "process_postponed_pts_updates") + .set_value(Unit()); // TODO can't set until data are really stored on persistent storage + } + CHECK(!running_get_difference_); + if (skipped_update_count + applied_update_count > 0) { + VLOG(get_difference) << "After skipping " << skipped_update_count << " and applying " << applied_update_count + << " postponed updates pts has changed from " << initial_pts << " to " << old_pts; + } +} + +void UpdatesManager::process_pending_pts_updates() { + if (pending_pts_updates_.empty()) { + return; + } + + bool processed_pending_update = false; + while (!pending_pts_updates_.empty()) { + auto update_it = pending_pts_updates_.begin(); + auto &update = update_it->second; + if (get_pts() != update.pts - update.pts_count) { + // the updates will be applied or skipped later + break; + } + + processed_pending_update = true; + if (update.pts_count > 0) { + td_->messages_manager_->process_pts_update(std::move(update.update)); + set_pts(update.pts, "process_pending_pts_updates") + .set_value(Unit()); // TODO can't set until data are really stored on persistent storage + + if (accumulated_pts_ != -1) { + CHECK(update.pts <= accumulated_pts_); + CHECK(accumulated_pts_count_ >= update.pts_count); + accumulated_pts_count_ -= update.pts_count; + } + } + update.promise.set_value(Unit()); + pending_pts_updates_.erase(update_it); + } + if (processed_pending_update) { + pts_gap_timeout_.cancel_timeout(); + } + if (!pending_pts_updates_.empty()) { + // if still have a gap, reset timeout + auto update_it = pending_pts_updates_.begin(); + double receive_time = update_it->second.receive_time; + for (size_t i = 0; i < GAP_TIMEOUT_UPDATE_COUNT; i++) { + if (++update_it == pending_pts_updates_.end()) { + break; + } + receive_time = min(receive_time, update_it->second.receive_time); + } + set_pts_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now()); + } +} + void UpdatesManager::process_pending_seq_updates() { if (!pending_seq_updates_.empty()) { LOG(DEBUG) << "Trying to process " << pending_seq_updates_.size() << " pending seq updates"; + // must not return, because in case of seq overflow there are no pending seq updates } + + bool processed_pending_update = false; while (!pending_seq_updates_.empty() && !running_get_difference_) { auto update_it = pending_seq_updates_.begin(); - auto seq_begin = update_it->second.seq_begin; + auto &update = update_it->second; + auto seq_begin = update.seq_begin; if (seq_begin - 1 > seq_ && seq_begin - 500000000 <= seq_) { // the updates will be applied later break; } + + processed_pending_update = true; + auto seq_end = update.seq_end; if (seq_begin - 1 == seq_) { - process_seq_updates(update_it->second.seq_end, update_it->second.date, std::move(update_it->second.updates), - std::move(update_it->second.promise)); + process_seq_updates(seq_end, update.date, std::move(update.updates), std::move(update.promise)); } else { // old update CHECK(seq_begin != 0); - LOG_IF(ERROR, update_it->second.seq_end > seq_ && seq_begin - 1 < seq_) - << "Strange updates coming with seq_begin = " << seq_begin << ", seq_end = " << update_it->second.seq_end - << ", but seq = " << seq_; - update_it->second.promise.set_value(Unit()); + if (seq_begin <= seq_ && seq_ < seq_end) { + LOG(ERROR) << "Receive updates with seq_begin = " << seq_begin << ", seq_end = " << seq_end + << ", but seq = " << seq_; + } + update.promise.set_value(Unit()); } pending_seq_updates_.erase(update_it); } - if (pending_seq_updates_.empty()) { + if (pending_seq_updates_.empty() || processed_pending_update) { seq_gap_timeout_.cancel_timeout(); - } else { - // if after getDifference still have a gap + } + if (!pending_seq_updates_.empty()) { + // if still have a gap, reset timeout auto update_it = pending_seq_updates_.begin(); double receive_time = update_it->second.receive_time; - for (size_t i = 0; i < 10; i++) { + for (size_t i = 0; i < GAP_TIMEOUT_UPDATE_COUNT; i++) { if (++update_it == pending_seq_updates_.end()) { break; } @@ -2120,6 +2236,7 @@ void UpdatesManager::process_pending_qts_updates() { } LOG(DEBUG) << "Process " << pending_qts_updates_.size() << " pending qts updates"; + bool processed_pending_update = false; while (!pending_qts_updates_.empty()) { CHECK(!running_get_difference_); auto update_it = pending_qts_updates_.begin(); @@ -2134,6 +2251,7 @@ void UpdatesManager::process_pending_qts_updates() { promise.set_value(Unit()); } }); + processed_pending_update = true; if (qts == old_qts + 1) { process_qts_update(std::move(update_it->second.update), qts, std::move(promise)); } else { @@ -2142,13 +2260,14 @@ void UpdatesManager::process_pending_qts_updates() { pending_qts_updates_.erase(update_it); } - if (pending_qts_updates_.empty()) { + if (processed_pending_update) { qts_gap_timeout_.cancel_timeout(); - } else { - // if after getDifference still have a gap + } + if (!pending_qts_updates_.empty()) { + // if still have a gap, reset timeout auto update_it = pending_qts_updates_.begin(); double receive_time = update_it->second.receive_time; - for (size_t i = 0; i < 10; i++) { + for (size_t i = 0; i < GAP_TIMEOUT_UPDATE_COUNT; i++) { if (++update_it == pending_qts_updates_.end()) { break; } diff --git a/td/telegram/UpdatesManager.h b/td/telegram/UpdatesManager.h index 66b6a3c77..8a280fb5f 100644 --- a/td/telegram/UpdatesManager.h +++ b/td/telegram/UpdatesManager.h @@ -124,6 +124,7 @@ class UpdatesManager final : public Actor { private: static constexpr int32 FORCED_GET_DIFFERENCE_PTS_DIFF = 100000; + static constexpr int32 GAP_TIMEOUT_UPDATE_COUNT = 20; static const double MAX_UNFILLED_GAP_TIME; static constexpr bool DROP_PTS_UPDATES = false; @@ -271,14 +272,18 @@ class UpdatesManager final : public Actor { void process_qts_update(tl_object_ptr &&update_ptr, int32 qts, Promise &&promise); + void process_all_pending_pts_updates(); + + void drop_all_pending_pts_updates(); + + void process_postponed_pts_updates(); + void process_pending_pts_updates(); void process_pending_seq_updates(); void process_pending_qts_updates(); - void drop_pending_pts_updates(); - static void fill_pts_gap(void *td); static void fill_seq_gap(void *td); @@ -305,8 +310,6 @@ class UpdatesManager final : public Actor { void after_get_difference(); - int32 get_min_pending_pts() const; - static bool have_update_pts_changed(const vector> &updates); static bool check_pts_update_dialog_id(DialogId dialog_id);