Drop pending seq/qts updates received before seq/qts overflow.

This commit is contained in:
levlam 2021-08-15 13:52:00 +03:00
parent 8ac0b02a6d
commit 6194d9ec14

View File

@ -1401,7 +1401,7 @@ void UpdatesManager::after_get_difference() {
return; return;
} }
if (postponed_updates_.size()) { if (!postponed_updates_.empty()) {
VLOG(get_difference) << "Begin to apply " << postponed_updates_.size() << " postponed update chunks"; VLOG(get_difference) << "Begin to apply " << postponed_updates_.size() << " postponed update chunks";
size_t total_update_count = 0; size_t total_update_count = 0;
while (!postponed_updates_.empty()) { while (!postponed_updates_.empty()) {
@ -1427,7 +1427,7 @@ void UpdatesManager::after_get_difference() {
VLOG(get_difference) << "Finish to apply " << total_update_count << " postponed updates"; VLOG(get_difference) << "Finish to apply " << total_update_count << " postponed updates";
} }
if (postponed_pts_updates_.size()) { // must be before td_->messages_manager_->after_get_difference() if (!postponed_pts_updates_.empty()) { // must be before td_->messages_manager_->after_get_difference()
auto postponed_updates = std::move(postponed_pts_updates_); auto postponed_updates = std::move(postponed_pts_updates_);
postponed_pts_updates_.clear(); postponed_pts_updates_.clear();
@ -2081,16 +2081,17 @@ void UpdatesManager::process_pending_seq_updates() {
while (!pending_seq_updates_.empty() && !running_get_difference_) { while (!pending_seq_updates_.empty() && !running_get_difference_) {
auto update_it = pending_seq_updates_.begin(); auto update_it = pending_seq_updates_.begin();
auto seq_begin = update_it->second.seq_begin; auto seq_begin = update_it->second.seq_begin;
if (seq_begin > seq_ + 1) { if (seq_begin - 1 > seq_ && seq_begin - 500000000 <= seq_) {
// the updates will be applied later
break; break;
} }
if (seq_begin == seq_ + 1) { if (seq_begin - 1 == seq_) {
process_seq_updates(update_it->second.seq_end, update_it->second.date, std::move(update_it->second.updates), process_seq_updates(update_it->second.seq_end, update_it->second.date, std::move(update_it->second.updates),
std::move(update_it->second.promise)); std::move(update_it->second.promise));
} else { } else {
// old update // old update
CHECK(seq_begin != 0); CHECK(seq_begin != 0);
LOG_IF(ERROR, update_it->second.seq_end > seq_) LOG_IF(ERROR, update_it->second.seq_end > seq_ && seq_begin - 1 < seq_)
<< "Strange updates coming with seq_begin = " << seq_begin << ", seq_end = " << update_it->second.seq_end << "Strange updates coming with seq_begin = " << seq_begin << ", seq_end = " << update_it->second.seq_end
<< ", but seq = " << seq_; << ", but seq = " << seq_;
update_it->second.promise.set_value(Unit()); update_it->second.promise.set_value(Unit());
@ -2117,12 +2118,15 @@ void UpdatesManager::process_pending_qts_updates() {
if (pending_qts_updates_.empty()) { if (pending_qts_updates_.empty()) {
return; return;
} }
LOG(DEBUG) << "Process " << pending_qts_updates_.size() << " pending qts updates"; LOG(DEBUG) << "Process " << pending_qts_updates_.size() << " pending qts updates";
while (!pending_qts_updates_.empty()) { while (!pending_qts_updates_.empty()) {
CHECK(!running_get_difference_); CHECK(!running_get_difference_);
auto update_it = pending_qts_updates_.begin(); auto update_it = pending_qts_updates_.begin();
auto qts = update_it->first; auto qts = update_it->first;
if (qts > get_qts() + 1) { auto old_qts = get_qts();
if (qts - 1 > old_qts && qts - 500000000 <= old_qts) {
// the update will be applied later
break; break;
} }
auto promise = PromiseCreator::lambda([promises = std::move(update_it->second.promises)](Unit) mutable { auto promise = PromiseCreator::lambda([promises = std::move(update_it->second.promises)](Unit) mutable {
@ -2130,13 +2134,14 @@ void UpdatesManager::process_pending_qts_updates() {
promise.set_value(Unit()); promise.set_value(Unit());
} }
}); });
if (qts == get_qts() + 1) { if (qts == old_qts + 1) {
process_qts_update(std::move(update_it->second.update), qts, std::move(promise)); process_qts_update(std::move(update_it->second.update), qts, std::move(promise));
} else { } else {
promise.set_value(Unit()); promise.set_value(Unit());
} }
pending_qts_updates_.erase(update_it); pending_qts_updates_.erase(update_it);
} }
if (pending_qts_updates_.empty()) { if (pending_qts_updates_.empty()) {
qts_gap_timeout_.cancel_timeout(); qts_gap_timeout_.cancel_timeout();
} else { } else {