Apply postponed pts/qts updates after each getDifference iteration.
This commit is contained in:
parent
b9b9f56b24
commit
c19a0751f0
@ -1370,10 +1370,13 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe
|
|||||||
auto old_date = get_date();
|
auto old_date = get_date();
|
||||||
auto old_qts = get_qts();
|
auto old_qts = get_qts();
|
||||||
on_get_updates_state(std::move(difference->intermediate_state_), "get difference slice");
|
on_get_updates_state(std::move(difference->intermediate_state_), "get difference slice");
|
||||||
|
|
||||||
|
process_postponed_pts_updates();
|
||||||
|
process_pending_qts_updates();
|
||||||
|
|
||||||
auto new_pts = get_pts();
|
auto new_pts = get_pts();
|
||||||
auto new_date = get_date();
|
auto new_date = get_date();
|
||||||
auto new_qts = get_qts();
|
auto new_qts = get_qts();
|
||||||
|
|
||||||
if (old_pts != std::numeric_limits<int32>::max() && new_date == old_date &&
|
if (old_pts != std::numeric_limits<int32>::max() && new_date == old_date &&
|
||||||
(new_pts == old_pts || (min_postponed_update_pts_ != 0 && new_pts >= min_postponed_update_pts_)) &&
|
(new_pts == old_pts || (min_postponed_update_pts_ != 0 && new_pts >= min_postponed_update_pts_)) &&
|
||||||
(new_qts == old_qts || (min_postponed_update_qts_ != 0 && new_qts >= min_postponed_update_qts_))) {
|
(new_qts == old_qts || (min_postponed_update_qts_ != 0 && new_qts >= min_postponed_update_qts_))) {
|
||||||
@ -1952,7 +1955,7 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr<telegram_api::Update>
|
|||||||
td_->messages_manager_->process_pts_update(std::move(update));
|
td_->messages_manager_->process_pts_update(std::move(update));
|
||||||
|
|
||||||
set_pts(accumulated_pts_, "process pending updates fast path")
|
set_pts(accumulated_pts_, "process pending updates fast path")
|
||||||
.set_value(Unit()); // TODO can't set until get messages are really stored on persistent storage
|
.set_value(Unit()); // TODO can't set until data are really stored on persistent storage
|
||||||
accumulated_pts_count_ = 0;
|
accumulated_pts_count_ = 0;
|
||||||
accumulated_pts_ = -1;
|
accumulated_pts_ = -1;
|
||||||
}
|
}
|
||||||
@ -2086,6 +2089,53 @@ void UpdatesManager::drop_all_pending_pts_updates() {
|
|||||||
pending_pts_updates_.clear();
|
pending_pts_updates_.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void UpdatesManager::process_postponed_pts_updates() {
|
||||||
|
if (postponed_pts_updates_.empty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto initial_pts = get_pts();
|
||||||
|
auto old_pts = initial_pts;
|
||||||
|
int32 skipped_update_count = 0;
|
||||||
|
int32 applied_update_count = 0;
|
||||||
|
while (!postponed_pts_updates_.empty()) {
|
||||||
|
auto update_it = postponed_pts_updates_.begin();
|
||||||
|
auto &update = update_it->second;
|
||||||
|
auto new_pts = update.pts;
|
||||||
|
auto pts_count = update.pts_count;
|
||||||
|
if (new_pts <= old_pts || (old_pts >= 1 && new_pts - 500000000 > old_pts)) {
|
||||||
|
skipped_update_count++;
|
||||||
|
td_->messages_manager_->skip_old_pending_pts_update(std::move(update.update), new_pts, old_pts, pts_count,
|
||||||
|
"process_postponed_pts_updates");
|
||||||
|
update.promise.set_value(Unit());
|
||||||
|
postponed_pts_updates_.erase(update_it);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (old_pts != new_pts - pts_count) {
|
||||||
|
// the updates will be applied or skipped later
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pts_count > 0) {
|
||||||
|
applied_update_count++;
|
||||||
|
td_->messages_manager_->process_pts_update(std::move(update.update));
|
||||||
|
old_pts = new_pts;
|
||||||
|
}
|
||||||
|
update.promise.set_value(Unit());
|
||||||
|
postponed_pts_updates_.erase(update_it);
|
||||||
|
}
|
||||||
|
if (old_pts != initial_pts) {
|
||||||
|
set_pts(old_pts, "process_postponed_pts_updates")
|
||||||
|
.set_value(Unit()); // TODO can't set until data are really stored on persistent storage
|
||||||
|
}
|
||||||
|
CHECK(!running_get_difference_);
|
||||||
|
if (skipped_update_count + applied_update_count > 0) {
|
||||||
|
VLOG(get_difference) << "After skipping " << skipped_update_count << " and applying " << applied_update_count
|
||||||
|
<< " postponed updates pts has changed from " << initial_pts << " to " << old_pts;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void UpdatesManager::process_pending_pts_updates() {
|
void UpdatesManager::process_pending_pts_updates() {
|
||||||
if (pending_pts_updates_.empty()) {
|
if (pending_pts_updates_.empty()) {
|
||||||
return;
|
return;
|
||||||
@ -2104,7 +2154,7 @@ void UpdatesManager::process_pending_pts_updates() {
|
|||||||
if (update.pts_count > 0) {
|
if (update.pts_count > 0) {
|
||||||
td_->messages_manager_->process_pts_update(std::move(update.update));
|
td_->messages_manager_->process_pts_update(std::move(update.update));
|
||||||
set_pts(update.pts, "process_pending_pts_updates")
|
set_pts(update.pts, "process_pending_pts_updates")
|
||||||
.set_value(Unit()); // TODO can't set until get messages are really stored on persistent storage
|
.set_value(Unit()); // TODO can't set until data are really stored on persistent storage
|
||||||
}
|
}
|
||||||
update.promise.set_value(Unit());
|
update.promise.set_value(Unit());
|
||||||
pending_pts_updates_.erase(update_it);
|
pending_pts_updates_.erase(update_it);
|
||||||
|
@ -276,6 +276,8 @@ class UpdatesManager final : public Actor {
|
|||||||
|
|
||||||
void drop_all_pending_pts_updates();
|
void drop_all_pending_pts_updates();
|
||||||
|
|
||||||
|
void process_postponed_pts_updates();
|
||||||
|
|
||||||
void process_pending_pts_updates();
|
void process_pending_pts_updates();
|
||||||
|
|
||||||
void process_pending_seq_updates();
|
void process_pending_seq_updates();
|
||||||
|
Loading…
Reference in New Issue
Block a user