Merge remote-tracking branch 'td/master'

This commit is contained in:
Andrea Cavalli 2021-08-19 16:15:03 +02:00
commit a9d2c286ee
2 changed files with 199 additions and 77 deletions

View File

@ -181,9 +181,18 @@ void UpdatesManager::fill_pts_gap(void *td) {
return;
}
auto td_ptr = static_cast<Td *>(td);
string source = PSTRING() << "pts from " << td_ptr->updates_manager_->get_pts() << " to "
<< td_ptr->updates_manager_->get_min_pending_pts();
auto updates_manager = static_cast<Td *>(td)->updates_manager_.get();
auto min_pts = std::numeric_limits<int32>::max();
auto max_pts = 0;
if (!updates_manager->pending_pts_updates_.empty()) {
min_pts = min(min_pts, updates_manager->pending_pts_updates_.begin()->first);
max_pts = max(max_pts, updates_manager->pending_pts_updates_.rbegin()->first);
}
if (!updates_manager->postponed_pts_updates_.empty()) {
min_pts = min(min_pts, updates_manager->postponed_pts_updates_.begin()->first);
max_pts = max(max_pts, updates_manager->postponed_pts_updates_.rbegin()->first);
}
string source = PSTRING() << "pts from " << updates_manager->get_pts() << " to " << min_pts << '-' << max_pts;
fill_gap(td, source.c_str());
}
@ -193,12 +202,14 @@ void UpdatesManager::fill_seq_gap(void *td) {
return;
}
auto td_ptr = static_cast<Td *>(td);
auto seq = std::numeric_limits<int32>::max();
if (!td_ptr->updates_manager_->pending_seq_updates_.empty()) {
seq = td_ptr->updates_manager_->pending_seq_updates_.begin()->first;
auto updates_manager = static_cast<Td *>(td)->updates_manager_.get();
auto min_seq = std::numeric_limits<int32>::max();
auto max_seq = 0;
if (!updates_manager->pending_seq_updates_.empty()) {
min_seq = updates_manager->pending_seq_updates_.begin()->first;
max_seq = updates_manager->pending_seq_updates_.rbegin()->second.seq_end;
}
string source = PSTRING() << "seq from " << td_ptr->updates_manager_->seq_ << " to " << seq;
string source = PSTRING() << "seq from " << updates_manager->seq_ << " to " << min_seq << '-' << max_seq;
fill_gap(td, source.c_str());
}
@ -208,12 +219,14 @@ void UpdatesManager::fill_qts_gap(void *td) {
return;
}
auto td_ptr = static_cast<Td *>(td);
auto qts = std::numeric_limits<int32>::max();
if (!td_ptr->updates_manager_->pending_qts_updates_.empty()) {
qts = td_ptr->updates_manager_->pending_qts_updates_.begin()->first;
auto updates_manager = static_cast<Td *>(td)->updates_manager_.get();
auto min_qts = std::numeric_limits<int32>::max();
auto max_qts = 0;
if (!updates_manager->pending_qts_updates_.empty()) {
min_qts = updates_manager->pending_qts_updates_.begin()->first;
max_qts = updates_manager->pending_qts_updates_.rbegin()->first;
}
string source = PSTRING() << "qts from " << td_ptr->updates_manager_->get_qts() << " to " << qts;
string source = PSTRING() << "qts from " << updates_manager->get_qts() << " to " << min_qts << '-' << max_qts;
fill_gap(td, source.c_str());
}
@ -295,7 +308,7 @@ void UpdatesManager::before_get_difference(bool is_initial) {
postponed_pts_updates_.insert(std::make_move_iterator(pending_pts_updates_.begin()),
std::make_move_iterator(pending_pts_updates_.end()));
drop_pending_pts_updates();
drop_all_pending_pts_updates();
send_closure_later(td_->notification_manager_actor_, &NotificationManager::before_get_difference);
}
@ -1353,19 +1366,25 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe
}
CHECK(!is_pts_changed);
auto state = std::move(difference->intermediate_state_);
if (get_pts() != std::numeric_limits<int32>::max() && state->date_ == get_date() &&
(state->pts_ == get_pts() ||
(min_postponed_update_pts_ != 0 && state->pts_ - 500 >= min_postponed_update_pts_)) &&
(state->qts_ == get_qts() ||
(min_postponed_update_qts_ != 0 && state->qts_ - 500 >= min_postponed_update_qts_))) {
on_get_updates_state(std::move(state), "get difference final slice");
VLOG(get_difference) << "Trying to switch back from getDifference to update processing";
auto old_pts = get_pts();
auto old_date = get_date();
auto old_qts = get_qts();
on_get_updates_state(std::move(difference->intermediate_state_), "get difference slice");
process_postponed_pts_updates();
process_pending_qts_updates();
auto new_pts = get_pts();
auto new_date = get_date();
auto new_qts = get_qts();
if (old_pts != std::numeric_limits<int32>::max() && new_date == old_date &&
(new_pts == old_pts || (min_postponed_update_pts_ != 0 && new_pts >= min_postponed_update_pts_)) &&
(new_qts == old_qts || (min_postponed_update_qts_ != 0 && new_qts >= min_postponed_update_qts_))) {
VLOG(get_difference) << "Switch back from getDifference to update processing";
break;
}
on_get_updates_state(std::move(state), "get difference slice");
if (get_pts() != -1) { // just in case
if (new_pts != -1) { // just in case
run_get_difference(true, "on updates_differenceSlice");
}
break;
@ -1393,10 +1412,14 @@ void UpdatesManager::after_get_difference() {
retry_timeout_.cancel_timeout();
retry_time_ = 1;
// cancels qts_gap_timeout_ if needed, can apply some updates received during getDifference,
// but missed in getDifference
process_pending_qts_updates();
process_pending_seq_updates(); // cancels seq_gap_timeout_, may apply some updates received before getDifference,
// but not returned in getDifference
// cancels seq_gap_timeout_ if needed, can apply some updates received during getDifference,
// but missed in getDifference
process_pending_seq_updates();
if (running_get_difference_) {
return;
}
@ -1431,15 +1454,17 @@ void UpdatesManager::after_get_difference() {
auto postponed_updates = std::move(postponed_pts_updates_);
postponed_pts_updates_.clear();
LOG(INFO) << "Begin to apply " << postponed_updates.size() << " postponed pts updates";
VLOG(get_difference) << "Begin to apply " << postponed_updates.size()
<< " postponed pts updates with pts = " << get_pts();
for (auto &postponed_update : postponed_updates) {
auto &update = postponed_update.second;
add_pending_pts_update(std::move(update.update), update.pts, update.pts_count, update.receive_time,
std::move(update.promise), "after get difference");
CHECK(!running_get_difference_);
}
LOG(INFO) << "Finish to apply postponed pts updates, have " << postponed_pts_updates_.size()
<< " left postponed updates";
VLOG(get_difference) << "After applying postponed pts updates have pts = " << get_pts()
<< ", max_pts = " << accumulated_pts_ << " and " << pending_pts_updates_.size() << " + "
<< postponed_pts_updates_.size() << " pending pts updates";
}
td_->animations_manager_->after_get_difference();
@ -1656,11 +1681,11 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
set_seq_gap_timeout(0.001);
}
if (seq_end > seq_) {
LOG(ERROR) << "Strange updates from " << source << " coming with seq_begin = " << seq_begin
<< ", seq_end = " << seq_end << ", but seq = " << seq_;
LOG(ERROR) << "Receive updates with seq_begin = " << seq_begin << ", seq_end = " << seq_end
<< ", but seq = " << seq_ << " from " << source;
} else {
LOG(INFO) << "Old updates from " << source << " coming with seq_begin = " << seq_begin
<< ", seq_end = " << seq_end << ", but seq = " << seq_;
LOG(INFO) << "Receive old updates with seq_begin = " << seq_begin << ", seq_end = " << seq_end
<< ", but seq = " << seq_ << " from " << source;
}
return lock.set_value(Unit());
}
@ -1817,23 +1842,6 @@ void UpdatesManager::process_updates(vector<tl_object_ptr<telegram_api::Update>>
lock.set_value(Unit());
}
int32 UpdatesManager::get_min_pending_pts() const {
int32 result = std::numeric_limits<int32>::max();
if (!pending_pts_updates_.empty()) {
auto pts = pending_pts_updates_.begin()->first;
if (pts < result) {
result = pts;
}
}
if (!postponed_pts_updates_.empty()) {
auto pts = postponed_pts_updates_.begin()->first;
if (pts < result) {
result = pts;
}
}
return result;
}
void UpdatesManager::process_pts_update(tl_object_ptr<telegram_api::Update> &&update) {
CHECK(update != nullptr);
@ -1947,7 +1955,7 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr<telegram_api::Update>
td_->messages_manager_->process_pts_update(std::move(update));
set_pts(accumulated_pts_, "process pending updates fast path")
.set_value(Unit()); // TODO can't set until get messages really stored on persistent storage
.set_value(Unit()); // TODO can't set until data are really stored on persistent storage
accumulated_pts_count_ = 0;
accumulated_pts_ = -1;
}
@ -1959,12 +1967,17 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr<telegram_api::Update>
new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, receive_time, std::move(promise)));
if (old_pts < accumulated_pts_ - accumulated_pts_count_) {
if (old_pts == new_pts - pts_count) {
// can't apply all updates, but can apply this and probably some other updates
process_pending_pts_updates();
} else {
set_pts_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now());
}
return;
}
CHECK(old_pts == accumulated_pts_ - accumulated_pts_count_);
process_pending_pts_updates();
process_all_pending_pts_updates();
}
void UpdatesManager::postpone_pts_update(tl_object_ptr<telegram_api::Update> &&update, int32 pts, int32 pts_count,
@ -2047,64 +2060,167 @@ void UpdatesManager::process_qts_update(tl_object_ptr<telegram_api::Update> &&up
promise.set_value(Unit());
}
void UpdatesManager::process_pending_pts_updates() {
void UpdatesManager::process_all_pending_pts_updates() {
auto begin_time = Time::now();
for (auto &update : pending_pts_updates_) {
td_->messages_manager_->process_pts_update(std::move(update.second.update));
update.second.promise.set_value(Unit());
}
if (last_pts_gap_time_ != 0) {
auto begin_diff = begin_time - last_pts_gap_time_;
auto diff = Time::now() - last_pts_gap_time_;
last_pts_gap_time_ = 0;
if (diff > 0.1) {
VLOG(get_difference) << "Gap in pts from " << accumulated_pts_ - accumulated_pts_count_ << " to "
<< accumulated_pts_ << " has been filled in " << diff << " seconds";
<< accumulated_pts_ << " has been filled in " << begin_diff << '-' << diff << " seconds";
}
}
set_pts(accumulated_pts_, "postpone_pending_pts_update")
set_pts(accumulated_pts_, "process_all_pending_pts_updates")
.set_value(Unit()); // TODO can't set until updates are stored on persistent storage
drop_pending_pts_updates();
drop_all_pending_pts_updates();
}
void UpdatesManager::drop_pending_pts_updates() {
void UpdatesManager::drop_all_pending_pts_updates() {
accumulated_pts_count_ = 0;
accumulated_pts_ = -1;
pts_gap_timeout_.cancel_timeout();
pending_pts_updates_.clear();
}
void UpdatesManager::process_postponed_pts_updates() {
if (postponed_pts_updates_.empty()) {
return;
}
auto initial_pts = get_pts();
auto old_pts = initial_pts;
int32 skipped_update_count = 0;
int32 applied_update_count = 0;
while (!postponed_pts_updates_.empty()) {
auto update_it = postponed_pts_updates_.begin();
auto &update = update_it->second;
auto new_pts = update.pts;
auto pts_count = update.pts_count;
if (new_pts <= old_pts || (old_pts >= 1 && new_pts - 500000000 > old_pts)) {
skipped_update_count++;
td_->messages_manager_->skip_old_pending_pts_update(std::move(update.update), new_pts, old_pts, pts_count,
"process_postponed_pts_updates");
update.promise.set_value(Unit());
postponed_pts_updates_.erase(update_it);
continue;
}
if (old_pts != new_pts - pts_count) {
// the updates will be applied or skipped later
break;
}
if (pts_count > 0) {
applied_update_count++;
td_->messages_manager_->process_pts_update(std::move(update.update));
old_pts = new_pts;
}
update.promise.set_value(Unit());
postponed_pts_updates_.erase(update_it);
}
if (old_pts != initial_pts) {
set_pts(old_pts, "process_postponed_pts_updates")
.set_value(Unit()); // TODO can't set until data are really stored on persistent storage
}
CHECK(!running_get_difference_);
if (skipped_update_count + applied_update_count > 0) {
VLOG(get_difference) << "After skipping " << skipped_update_count << " and applying " << applied_update_count
<< " postponed updates pts has changed from " << initial_pts << " to " << old_pts;
}
}
void UpdatesManager::process_pending_pts_updates() {
if (pending_pts_updates_.empty()) {
return;
}
bool processed_pending_update = false;
while (!pending_pts_updates_.empty()) {
auto update_it = pending_pts_updates_.begin();
auto &update = update_it->second;
if (get_pts() != update.pts - update.pts_count) {
// the updates will be applied or skipped later
break;
}
processed_pending_update = true;
if (update.pts_count > 0) {
td_->messages_manager_->process_pts_update(std::move(update.update));
set_pts(update.pts, "process_pending_pts_updates")
.set_value(Unit()); // TODO can't set until data are really stored on persistent storage
if (accumulated_pts_ != -1) {
CHECK(update.pts <= accumulated_pts_);
CHECK(accumulated_pts_count_ >= update.pts_count);
accumulated_pts_count_ -= update.pts_count;
}
}
update.promise.set_value(Unit());
pending_pts_updates_.erase(update_it);
}
if (processed_pending_update) {
pts_gap_timeout_.cancel_timeout();
}
if (!pending_pts_updates_.empty()) {
// if still have a gap, reset timeout
auto update_it = pending_pts_updates_.begin();
double receive_time = update_it->second.receive_time;
for (size_t i = 0; i < GAP_TIMEOUT_UPDATE_COUNT; i++) {
if (++update_it == pending_pts_updates_.end()) {
break;
}
receive_time = min(receive_time, update_it->second.receive_time);
}
set_pts_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now());
}
}
void UpdatesManager::process_pending_seq_updates() {
if (!pending_seq_updates_.empty()) {
LOG(DEBUG) << "Trying to process " << pending_seq_updates_.size() << " pending seq updates";
// must not return, because in case of seq overflow there are no pending seq updates
}
bool processed_pending_update = false;
while (!pending_seq_updates_.empty() && !running_get_difference_) {
auto update_it = pending_seq_updates_.begin();
auto seq_begin = update_it->second.seq_begin;
auto &update = update_it->second;
auto seq_begin = update.seq_begin;
if (seq_begin - 1 > seq_ && seq_begin - 500000000 <= seq_) {
// the updates will be applied later
break;
}
processed_pending_update = true;
auto seq_end = update.seq_end;
if (seq_begin - 1 == seq_) {
process_seq_updates(update_it->second.seq_end, update_it->second.date, std::move(update_it->second.updates),
std::move(update_it->second.promise));
process_seq_updates(seq_end, update.date, std::move(update.updates), std::move(update.promise));
} else {
// old update
CHECK(seq_begin != 0);
LOG_IF(ERROR, update_it->second.seq_end > seq_ && seq_begin - 1 < seq_)
<< "Strange updates coming with seq_begin = " << seq_begin << ", seq_end = " << update_it->second.seq_end
if (seq_begin <= seq_ && seq_ < seq_end) {
LOG(ERROR) << "Receive updates with seq_begin = " << seq_begin << ", seq_end = " << seq_end
<< ", but seq = " << seq_;
update_it->second.promise.set_value(Unit());
}
update.promise.set_value(Unit());
}
pending_seq_updates_.erase(update_it);
}
if (pending_seq_updates_.empty()) {
if (pending_seq_updates_.empty() || processed_pending_update) {
seq_gap_timeout_.cancel_timeout();
} else {
// if after getDifference still have a gap
}
if (!pending_seq_updates_.empty()) {
// if still have a gap, reset timeout
auto update_it = pending_seq_updates_.begin();
double receive_time = update_it->second.receive_time;
for (size_t i = 0; i < 10; i++) {
for (size_t i = 0; i < GAP_TIMEOUT_UPDATE_COUNT; i++) {
if (++update_it == pending_seq_updates_.end()) {
break;
}
@ -2120,6 +2236,7 @@ void UpdatesManager::process_pending_qts_updates() {
}
LOG(DEBUG) << "Process " << pending_qts_updates_.size() << " pending qts updates";
bool processed_pending_update = false;
while (!pending_qts_updates_.empty()) {
CHECK(!running_get_difference_);
auto update_it = pending_qts_updates_.begin();
@ -2134,6 +2251,7 @@ void UpdatesManager::process_pending_qts_updates() {
promise.set_value(Unit());
}
});
processed_pending_update = true;
if (qts == old_qts + 1) {
process_qts_update(std::move(update_it->second.update), qts, std::move(promise));
} else {
@ -2142,13 +2260,14 @@ void UpdatesManager::process_pending_qts_updates() {
pending_qts_updates_.erase(update_it);
}
if (pending_qts_updates_.empty()) {
if (processed_pending_update) {
qts_gap_timeout_.cancel_timeout();
} else {
// if after getDifference still have a gap
}
if (!pending_qts_updates_.empty()) {
// if still have a gap, reset timeout
auto update_it = pending_qts_updates_.begin();
double receive_time = update_it->second.receive_time;
for (size_t i = 0; i < 10; i++) {
for (size_t i = 0; i < GAP_TIMEOUT_UPDATE_COUNT; i++) {
if (++update_it == pending_qts_updates_.end()) {
break;
}

View File

@ -124,6 +124,7 @@ class UpdatesManager final : public Actor {
private:
static constexpr int32 FORCED_GET_DIFFERENCE_PTS_DIFF = 100000;
static constexpr int32 GAP_TIMEOUT_UPDATE_COUNT = 20;
static const double MAX_UNFILLED_GAP_TIME;
static constexpr bool DROP_PTS_UPDATES = false;
@ -271,14 +272,18 @@ class UpdatesManager final : public Actor {
void process_qts_update(tl_object_ptr<telegram_api::Update> &&update_ptr, int32 qts, Promise<Unit> &&promise);
void process_all_pending_pts_updates();
void drop_all_pending_pts_updates();
void process_postponed_pts_updates();
void process_pending_pts_updates();
void process_pending_seq_updates();
void process_pending_qts_updates();
void drop_pending_pts_updates();
static void fill_pts_gap(void *td);
static void fill_seq_gap(void *td);
@ -305,8 +310,6 @@ class UpdatesManager final : public Actor {
void after_get_difference();
int32 get_min_pending_pts() const;
static bool have_update_pts_changed(const vector<tl_object_ptr<telegram_api::Update>> &updates);
static bool check_pts_update_dialog_id(DialogId dialog_id);