diff --git a/td/telegram/UpdatesManager.cpp b/td/telegram/UpdatesManager.cpp index f02cb5141..3918141f6 100644 --- a/td/telegram/UpdatesManager.cpp +++ b/td/telegram/UpdatesManager.cpp @@ -1370,10 +1370,13 @@ void UpdatesManager::on_get_difference(tl_object_ptrintermediate_state_), "get difference slice"); + + process_postponed_pts_updates(); + process_pending_qts_updates(); + auto new_pts = get_pts(); auto new_date = get_date(); auto new_qts = get_qts(); - if (old_pts != std::numeric_limits::max() && new_date == old_date && (new_pts == old_pts || (min_postponed_update_pts_ != 0 && new_pts >= min_postponed_update_pts_)) && (new_qts == old_qts || (min_postponed_update_qts_ != 0 && new_qts >= min_postponed_update_qts_))) { @@ -1952,7 +1955,7 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr td_->messages_manager_->process_pts_update(std::move(update)); set_pts(accumulated_pts_, "process pending updates fast path") - .set_value(Unit()); // TODO can't set until get messages are really stored on persistent storage + .set_value(Unit()); // TODO can't set until data are really stored on persistent storage accumulated_pts_count_ = 0; accumulated_pts_ = -1; } @@ -2086,6 +2089,53 @@ void UpdatesManager::drop_all_pending_pts_updates() { pending_pts_updates_.clear(); } +void UpdatesManager::process_postponed_pts_updates() { + if (postponed_pts_updates_.empty()) { + return; + } + + auto initial_pts = get_pts(); + auto old_pts = initial_pts; + int32 skipped_update_count = 0; + int32 applied_update_count = 0; + while (!postponed_pts_updates_.empty()) { + auto update_it = postponed_pts_updates_.begin(); + auto &update = update_it->second; + auto new_pts = update.pts; + auto pts_count = update.pts_count; + if (new_pts <= old_pts || (old_pts >= 1 && new_pts - 500000000 > old_pts)) { + skipped_update_count++; + td_->messages_manager_->skip_old_pending_pts_update(std::move(update.update), new_pts, old_pts, pts_count, + "process_postponed_pts_updates"); + update.promise.set_value(Unit()); + postponed_pts_updates_.erase(update_it); + continue; + } + + if (old_pts != new_pts - pts_count) { + // the updates will be applied or skipped later + break; + } + + if (pts_count > 0) { + applied_update_count++; + td_->messages_manager_->process_pts_update(std::move(update.update)); + old_pts = new_pts; + } + update.promise.set_value(Unit()); + postponed_pts_updates_.erase(update_it); + } + if (old_pts != initial_pts) { + set_pts(old_pts, "process_postponed_pts_updates") + .set_value(Unit()); // TODO can't set until data are really stored on persistent storage + } + CHECK(!running_get_difference_); + if (skipped_update_count + applied_update_count > 0) { + VLOG(get_difference) << "After skipping " << skipped_update_count << " and applying " << applied_update_count + << " postponed updates pts has changed from " << initial_pts << " to " << old_pts; + } +} + void UpdatesManager::process_pending_pts_updates() { if (pending_pts_updates_.empty()) { return; @@ -2104,7 +2154,7 @@ void UpdatesManager::process_pending_pts_updates() { if (update.pts_count > 0) { td_->messages_manager_->process_pts_update(std::move(update.update)); set_pts(update.pts, "process_pending_pts_updates") - .set_value(Unit()); // TODO can't set until get messages are really stored on persistent storage + .set_value(Unit()); // TODO can't set until data are really stored on persistent storage } update.promise.set_value(Unit()); pending_pts_updates_.erase(update_it); diff --git a/td/telegram/UpdatesManager.h b/td/telegram/UpdatesManager.h index 11a6d57aa..8a280fb5f 100644 --- a/td/telegram/UpdatesManager.h +++ b/td/telegram/UpdatesManager.h @@ -276,6 +276,8 @@ class UpdatesManager final : public Actor { void drop_all_pending_pts_updates(); + void process_postponed_pts_updates(); + void process_pending_pts_updates(); void process_pending_seq_updates();