Switch back from getDifference to updates handling if there are too much of them.

This commit is contained in:
levlam 2021-01-05 16:06:51 +03:00
parent 0ee37c5e64
commit 970f65604a
2 changed files with 74 additions and 7 deletions

View File

@ -224,12 +224,26 @@ void UpdatesManager::get_difference(const char *source) {
VLOG(get_difference) << "Skip running getDifference from " << source << " because it is already running";
return;
}
run_get_difference(false, source);
}
void UpdatesManager::run_get_difference(bool is_recursive, const char *source) {
CHECK(get_pts() != -1);
CHECK(td_->auth_manager_->is_authorized());
CHECK(!running_get_difference_);
running_get_difference_ = true;
VLOG(get_difference) << "-----BEGIN GET DIFFERENCE----- from " << source;
before_get_difference(false);
if (!is_recursive) {
min_postponed_update_pts_ = 0;
min_postponed_update_qts_ = 0;
}
td_->create_handler<GetDifferenceQuery>()->send();
last_get_difference_pts_ = get_pts();
}
@ -1137,6 +1151,11 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe
VLOG(get_difference) << "----- END GET DIFFERENCE-----";
running_get_difference_ = false;
if (!td_->auth_manager_->is_authorized()) {
// just in case
return;
}
if (difference_ptr == nullptr) {
on_failed_get_difference();
return;
@ -1192,8 +1211,10 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe
}
case telegram_api::updates_differenceSlice::ID: {
auto difference = move_tl_object_as<telegram_api::updates_differenceSlice>(difference_ptr);
bool is_pts_changed = have_update_pts_changed(difference->other_updates_);
if (difference->intermediate_state_->pts_ >= get_pts() && get_pts() != std::numeric_limits<int32>::max() &&
difference->intermediate_state_->date_ >= date_ && difference->intermediate_state_->qts_ == get_qts()) {
difference->intermediate_state_->date_ >= date_ && difference->intermediate_state_->qts_ == get_qts() &&
!is_pts_changed) {
// TODO send new getDifference request and apply difference slice only after that
}
@ -1206,12 +1227,28 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe
std::move(difference->new_encrypted_messages_),
std::move(difference->other_updates_));
if (running_get_difference_) {
LOG(ERROR) << "Get difference has run while processing get difference updates";
if (!is_pts_changed) {
LOG(ERROR) << "Get difference has run while processing get difference updates";
}
break;
}
CHECK(!is_pts_changed);
auto state = std::move(difference->intermediate_state_);
if (get_pts() != std::numeric_limits<int32>::max() && state->date_ == get_date() &&
(state->pts_ == get_pts() ||
(min_postponed_update_pts_ != 0 && state->pts_ >= min_postponed_update_pts_ + 1000)) &&
(state->qts_ == get_qts() ||
(min_postponed_update_qts_ != 0 && state->qts_ >= min_postponed_update_qts_ + 1000))) {
on_get_updates_state(std::move(state), "get difference final slice");
VLOG(get_difference) << "Trying to switch back from getDifference to update processing";
break;
}
on_get_updates_state(std::move(difference->intermediate_state_), "get difference slice");
get_difference("on updates_differenceSlice");
on_get_updates_state(std::move(state), "get difference slice");
if (get_pts() != -1) { // just in case
run_get_difference(true, "on updates_differenceSlice");
}
break;
}
case telegram_api::updates_differenceTooLong::ID: {
@ -1246,7 +1283,8 @@ void UpdatesManager::after_get_difference() {
}
if (postponed_updates_.size()) {
VLOG(get_difference) << "Begin to apply " << postponed_updates_.size() << " postponed updates";
VLOG(get_difference) << "Begin to apply " << postponed_updates_.size() << " postponed update chunks";
size_t total_update_count = 0;
while (!postponed_updates_.empty()) {
auto it = postponed_updates_.begin();
auto updates = std::move(it->second.updates);
@ -1255,15 +1293,18 @@ void UpdatesManager::after_get_difference() {
auto promise = std::move(it->second.promise);
// ignore it->second.date, because it may be too old
postponed_updates_.erase(it);
auto update_count = updates.size();
on_pending_updates(std::move(updates), updates_seq_begin, updates_seq_end, 0, std::move(promise),
"postponed updates");
if (running_get_difference_) {
VLOG(get_difference) << "Finish to apply postponed updates with " << postponed_updates_.size()
<< " updates left, because forced to run getDifference";
<< " updates left after applied " << total_update_count
<< " updates, because forced to run getDifference";
return;
}
total_update_count += update_count;
}
VLOG(get_difference) << "Finish to apply postponed updates";
VLOG(get_difference) << "Finish to apply " << total_update_count << " postponed updates";
}
td_->animations_manager_->after_get_difference();
@ -1300,6 +1341,16 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
if (running_get_difference_ /*|| string(source) != string("postponed updates")*/) {
LOG(INFO) << "Postpone " << updates.size() << " updates [" << seq_begin << ", " << seq_end
<< "] with date = " << date << " from " << source;
for (auto &update : updates) {
auto pts = get_update_pts(update.get());
if (pts != 0 && (min_postponed_update_pts_ == 0 || pts < min_postponed_update_pts_)) {
min_postponed_update_pts_ = pts;
}
auto qts = get_update_qts(update.get());
if (qts != 0 && (min_postponed_update_qts_ == 0 || qts < min_postponed_update_qts_)) {
min_postponed_update_qts_ = qts;
}
}
postponed_updates_.emplace(seq_begin,
PendingUpdates(seq_begin, seq_end, date, std::move(updates), std::move(promise)));
return;
@ -2015,6 +2066,16 @@ int32 UpdatesManager::get_short_update_date() const {
return now;
}
bool UpdatesManager::have_update_pts_changed(const vector<tl_object_ptr<telegram_api::Update>> &updates) {
for (auto &update : updates) {
CHECK(update != nullptr);
if (update->get_id() == telegram_api::updatePtsChanged::ID) {
return true;
}
}
return false;
}
int32 UpdatesManager::get_update_pts(const telegram_api::Update *update) {
switch (update->get_id()) {
case telegram_api::updateNewMessage::ID:

View File

@ -133,6 +133,8 @@ class UpdatesManager : public Actor {
bool running_get_difference_ = false;
int32 last_get_difference_pts_ = 0;
int32 min_postponed_update_pts_ = 0;
int32 min_postponed_update_qts_ = 0;
void tear_down() override;
@ -184,12 +186,16 @@ class UpdatesManager : public Actor {
void set_qts_gap_timeout(double timeout);
void run_get_difference(bool is_recursive, const char *source);
void on_failed_get_difference();
void before_get_difference(bool is_initial);
void after_get_difference();
static bool have_update_pts_changed(const vector<tl_object_ptr<telegram_api::Update>> &updates);
static int32 get_update_pts(const telegram_api::Update *update);
static int32 get_update_qts(const telegram_api::Update *update);