Process pending pts updates as fast as possible.

This commit is contained in:
levlam 2021-08-16 12:19:30 +03:00
parent 0f5127602a
commit b9b9f56b24
2 changed files with 60 additions and 12 deletions

View File

@ -308,7 +308,7 @@ void UpdatesManager::before_get_difference(bool is_initial) {
postponed_pts_updates_.insert(std::make_move_iterator(pending_pts_updates_.begin()), postponed_pts_updates_.insert(std::make_move_iterator(pending_pts_updates_.begin()),
std::make_move_iterator(pending_pts_updates_.end())); std::make_move_iterator(pending_pts_updates_.end()));
drop_pending_pts_updates(); drop_all_pending_pts_updates();
send_closure_later(td_->notification_manager_actor_, &NotificationManager::before_get_difference); send_closure_later(td_->notification_manager_actor_, &NotificationManager::before_get_difference);
} }
@ -1952,7 +1952,7 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr<telegram_api::Update>
td_->messages_manager_->process_pts_update(std::move(update)); td_->messages_manager_->process_pts_update(std::move(update));
set_pts(accumulated_pts_, "process pending updates fast path") set_pts(accumulated_pts_, "process pending updates fast path")
.set_value(Unit()); // TODO can't set until get messages really stored on persistent storage .set_value(Unit()); // TODO can't set until get messages are really stored on persistent storage
accumulated_pts_count_ = 0; accumulated_pts_count_ = 0;
accumulated_pts_ = -1; accumulated_pts_ = -1;
} }
@ -1964,12 +1964,17 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr<telegram_api::Update>
new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, receive_time, std::move(promise))); new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, receive_time, std::move(promise)));
if (old_pts < accumulated_pts_ - accumulated_pts_count_) { if (old_pts < accumulated_pts_ - accumulated_pts_count_) {
set_pts_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now()); if (old_pts == new_pts - pts_count) {
// can't apply all updates, but can apply this and probably some other updates
process_pending_pts_updates();
} else {
set_pts_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now());
}
return; return;
} }
CHECK(old_pts == accumulated_pts_ - accumulated_pts_count_); CHECK(old_pts == accumulated_pts_ - accumulated_pts_count_);
process_pending_pts_updates(); process_all_pending_pts_updates();
} }
void UpdatesManager::postpone_pts_update(tl_object_ptr<telegram_api::Update> &&update, int32 pts, int32 pts_count, void UpdatesManager::postpone_pts_update(tl_object_ptr<telegram_api::Update> &&update, int32 pts, int32 pts_count,
@ -2052,7 +2057,7 @@ void UpdatesManager::process_qts_update(tl_object_ptr<telegram_api::Update> &&up
promise.set_value(Unit()); promise.set_value(Unit());
} }
void UpdatesManager::process_pending_pts_updates() { void UpdatesManager::process_all_pending_pts_updates() {
auto begin_time = Time::now(); auto begin_time = Time::now();
for (auto &update : pending_pts_updates_) { for (auto &update : pending_pts_updates_) {
td_->messages_manager_->process_pts_update(std::move(update.second.update)); td_->messages_manager_->process_pts_update(std::move(update.second.update));
@ -2069,18 +2074,58 @@ void UpdatesManager::process_pending_pts_updates() {
} }
} }
set_pts(accumulated_pts_, "process_pending_pts_updates") set_pts(accumulated_pts_, "process_all_pending_pts_updates")
.set_value(Unit()); // TODO can't set until updates are stored on persistent storage .set_value(Unit()); // TODO can't set until updates are stored on persistent storage
drop_pending_pts_updates(); drop_all_pending_pts_updates();
} }
void UpdatesManager::drop_pending_pts_updates() { void UpdatesManager::drop_all_pending_pts_updates() {
accumulated_pts_count_ = 0; accumulated_pts_count_ = 0;
accumulated_pts_ = -1; accumulated_pts_ = -1;
pts_gap_timeout_.cancel_timeout(); pts_gap_timeout_.cancel_timeout();
pending_pts_updates_.clear(); pending_pts_updates_.clear();
} }
void UpdatesManager::process_pending_pts_updates() {
if (pending_pts_updates_.empty()) {
return;
}
bool processed_pending_update = false;
while (!pending_pts_updates_.empty()) {
auto update_it = pending_pts_updates_.begin();
auto &update = update_it->second;
if (get_pts() != update.pts - update.pts_count) {
// the updates will be applied or skipped later
break;
}
processed_pending_update = true;
if (update.pts_count > 0) {
td_->messages_manager_->process_pts_update(std::move(update.update));
set_pts(update.pts, "process_pending_pts_updates")
.set_value(Unit()); // TODO can't set until get messages are really stored on persistent storage
}
update.promise.set_value(Unit());
pending_pts_updates_.erase(update_it);
}
if (processed_pending_update) {
pts_gap_timeout_.cancel_timeout();
}
if (!pending_pts_updates_.empty()) {
// if still have a gap, reset timeout
auto update_it = pending_pts_updates_.begin();
double receive_time = update_it->second.receive_time;
for (size_t i = 0; i < GAP_TIMEOUT_UPDATE_COUNT; i++) {
if (++update_it == pending_pts_updates_.end()) {
break;
}
receive_time = min(receive_time, update_it->second.receive_time);
}
set_pts_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now());
}
}
void UpdatesManager::process_pending_seq_updates() { void UpdatesManager::process_pending_seq_updates() {
if (!pending_seq_updates_.empty()) { if (!pending_seq_updates_.empty()) {
LOG(DEBUG) << "Trying to process " << pending_seq_updates_.size() << " pending seq updates"; LOG(DEBUG) << "Trying to process " << pending_seq_updates_.size() << " pending seq updates";
@ -2119,7 +2164,7 @@ void UpdatesManager::process_pending_seq_updates() {
// if still have a gap, reset timeout // if still have a gap, reset timeout
auto update_it = pending_seq_updates_.begin(); auto update_it = pending_seq_updates_.begin();
double receive_time = update_it->second.receive_time; double receive_time = update_it->second.receive_time;
for (size_t i = 0; i < 10; i++) { for (size_t i = 0; i < GAP_TIMEOUT_UPDATE_COUNT; i++) {
if (++update_it == pending_seq_updates_.end()) { if (++update_it == pending_seq_updates_.end()) {
break; break;
} }
@ -2166,7 +2211,7 @@ void UpdatesManager::process_pending_qts_updates() {
// if still have a gap, reset timeout // if still have a gap, reset timeout
auto update_it = pending_qts_updates_.begin(); auto update_it = pending_qts_updates_.begin();
double receive_time = update_it->second.receive_time; double receive_time = update_it->second.receive_time;
for (size_t i = 0; i < 10; i++) { for (size_t i = 0; i < GAP_TIMEOUT_UPDATE_COUNT; i++) {
if (++update_it == pending_qts_updates_.end()) { if (++update_it == pending_qts_updates_.end()) {
break; break;
} }

View File

@ -124,6 +124,7 @@ class UpdatesManager final : public Actor {
private: private:
static constexpr int32 FORCED_GET_DIFFERENCE_PTS_DIFF = 100000; static constexpr int32 FORCED_GET_DIFFERENCE_PTS_DIFF = 100000;
static constexpr int32 GAP_TIMEOUT_UPDATE_COUNT = 20;
static const double MAX_UNFILLED_GAP_TIME; static const double MAX_UNFILLED_GAP_TIME;
static constexpr bool DROP_PTS_UPDATES = false; static constexpr bool DROP_PTS_UPDATES = false;
@ -271,14 +272,16 @@ class UpdatesManager final : public Actor {
void process_qts_update(tl_object_ptr<telegram_api::Update> &&update_ptr, int32 qts, Promise<Unit> &&promise); void process_qts_update(tl_object_ptr<telegram_api::Update> &&update_ptr, int32 qts, Promise<Unit> &&promise);
void process_all_pending_pts_updates();
void drop_all_pending_pts_updates();
void process_pending_pts_updates(); void process_pending_pts_updates();
void process_pending_seq_updates(); void process_pending_seq_updates();
void process_pending_qts_updates(); void process_pending_qts_updates();
void drop_pending_pts_updates();
static void fill_pts_gap(void *td); static void fill_pts_gap(void *td);
static void fill_seq_gap(void *td); static void fill_seq_gap(void *td);