Use multiset to store pending updates and improve PendingPtsUpdate comparison.

This commit is contained in:
levlam 2024-01-26 14:59:07 +03:00
parent aa6be7d9bf
commit 73dc2b9889
2 changed files with 72 additions and 60 deletions

View File

@ -92,7 +92,6 @@
#include "td/utils/StringBuilder.h"
#include "td/utils/Time.h"
#include <iterator>
#include <limits>
namespace td {
@ -334,7 +333,7 @@ void UpdatesManager::repair_pts_gap() {
return;
}
auto it = pending_pts_updates_.begin();
if (it->second.pts != pts + it->second.pts_count) {
if (it->pts != pts + it->pts_count) {
return;
}
if (last_fetched_pts_ == pts) {
@ -366,22 +365,22 @@ void UpdatesManager::fill_pts_gap(void *td) {
const telegram_api::Update *first_update = nullptr;
auto max_pts = 0;
if (!updates_manager->pending_pts_updates_.empty()) {
auto &min_update = updates_manager->pending_pts_updates_.begin()->second;
auto &min_update = *updates_manager->pending_pts_updates_.begin();
if (min_update.pts < min_pts) {
min_pts = min_update.pts;
min_pts_count = min_update.pts_count;
first_update = min_update.update.get();
}
max_pts = max(max_pts, updates_manager->pending_pts_updates_.rbegin()->first);
max_pts = max(max_pts, updates_manager->pending_pts_updates_.rbegin()->pts);
}
if (!updates_manager->postponed_pts_updates_.empty()) {
auto &min_update = updates_manager->postponed_pts_updates_.begin()->second;
auto &min_update = *updates_manager->postponed_pts_updates_.begin();
if (min_update.pts < min_pts) {
min_pts = min_update.pts;
min_pts_count = min_update.pts_count;
first_update = min_update.update.get();
}
max_pts = max(max_pts, updates_manager->postponed_pts_updates_.rbegin()->first);
max_pts = max(max_pts, updates_manager->postponed_pts_updates_.rbegin()->pts);
}
string source = PSTRING() << "PTS from " << updates_manager->get_pts() << " to " << min_pts << "(-" << min_pts_count
<< ")-" << max_pts << ' '
@ -400,8 +399,8 @@ void UpdatesManager::fill_seq_gap(void *td) {
auto min_seq = std::numeric_limits<int32>::max();
auto max_seq = 0;
if (!updates_manager->pending_seq_updates_.empty()) {
min_seq = updates_manager->pending_seq_updates_.begin()->first;
max_seq = updates_manager->pending_seq_updates_.rbegin()->second.seq_end;
min_seq = updates_manager->pending_seq_updates_.begin()->seq_begin;
max_seq = updates_manager->pending_seq_updates_.rbegin()->seq_end;
}
string source = PSTRING() << "seq from " << updates_manager->seq_ << " to " << min_seq << '-' << max_seq;
fill_gap(td, source.c_str());
@ -518,14 +517,18 @@ void UpdatesManager::before_get_difference(bool is_initial) {
// may be called many times before after_get_difference is called
send_closure(G()->state_manager(), &StateManager::on_synchronized, false);
vector<Promise<Unit>> promises;
if (can_postpone_updates()) {
postponed_pts_updates_.insert(std::make_move_iterator(pending_pts_updates_.begin()),
std::make_move_iterator(pending_pts_updates_.end()));
for (auto &update : pending_pts_updates_) {
postponed_pts_updates_.emplace(std::move(update.update), update.pts, update.pts_count, update.receive_time,
std::move(update.promise));
}
} else {
for (auto &update : pending_pts_updates_) {
update.second.promise.set_value(Unit());
promises.push_back(std::move(update.promise));
}
}
set_promises(promises);
drop_all_pending_pts_updates();
@ -1909,7 +1912,7 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe
pending_seq_updates_.clear();
for (auto &pending_update : pending_seq_updates) {
pending_update.second.promise.set_value(Unit());
pending_update.promise.set_value(Unit());
}
}
break;
@ -2030,8 +2033,8 @@ void UpdatesManager::on_get_pts_update(int32 pts,
}
LOG(DEBUG) << "Receive update with PTS " << pts << ": " << to_string(difference_ptr);
if (get_pts() != pts - 1 || running_get_difference_ || !postponed_pts_updates_.empty() ||
pending_pts_updates_.empty() || pending_pts_updates_.begin()->first > pts + 1 ||
pending_pts_updates_.begin()->first != pts + pending_pts_updates_.begin()->second.pts_count) {
pending_pts_updates_.empty() || pending_pts_updates_.begin()->pts > pts + 1 ||
pending_pts_updates_.begin()->pts != pts + pending_pts_updates_.begin()->pts_count) {
return;
}
@ -2142,12 +2145,12 @@ void UpdatesManager::after_get_difference() {
size_t total_update_count = 0;
while (!postponed_updates_.empty()) {
auto it = postponed_updates_.begin();
auto updates = std::move(it->second.updates);
auto updates_seq_begin = it->second.seq_begin;
auto updates_seq_end = it->second.seq_end;
auto receive_time = it->second.receive_time;
auto promise = std::move(it->second.promise);
// ignore it->second.date, because it may be too old
auto updates = std::move(it->updates);
auto updates_seq_begin = it->seq_begin;
auto updates_seq_end = it->seq_end;
auto receive_time = it->receive_time;
auto promise = std::move(it->promise);
// ignore it->date, because it may be too old
postponed_updates_.erase(it);
auto update_count = updates.size();
on_pending_updates(std::move(updates), updates_seq_begin, updates_seq_end, 0, receive_time, std::move(promise),
@ -2178,7 +2181,7 @@ void UpdatesManager::after_get_difference() {
VLOG(get_difference) << "Begin to apply " << postponed_updates.size()
<< " postponed PTS updates with PTS = " << get_pts();
for (auto &postponed_update : postponed_updates) {
auto &update = postponed_update.second;
auto &update = postponed_update;
add_pending_pts_update(std::move(update.update), update.pts, update.pts_count, update.receive_time,
std::move(update.promise), AFTER_GET_DIFFERENCE_SOURCE);
CHECK(!running_get_difference_);
@ -2559,8 +2562,7 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
LOG(ERROR) << "Run get difference while applying updates from " << source;
}
if (can_postpone) {
postponed_updates_.emplace(
seq_begin, PendingSeqUpdates(seq_begin, seq_end, date, receive_time, std::move(updates), std::move(lock)));
postponed_updates_.emplace(seq_begin, seq_end, date, receive_time, std::move(updates), std::move(lock));
} else {
lock.set_value(Unit());
}
@ -2591,12 +2593,9 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
LOG(INFO) << "Gap in seq has found. Receive " << updates.size() << " updates [" << seq_begin << ", " << seq_end
<< "] from " << source << ", but seq = " << seq_;
LOG_IF(WARNING, pending_seq_updates_.count(seq_begin) > 0)
<< "Already have pending updates with seq = " << seq_begin << ", but receive it again from " << source;
if (can_postpone_updates()) {
pending_seq_updates_.emplace(
seq_begin, PendingSeqUpdates(seq_begin, seq_end, date, receive_time, std::move(updates), std::move(lock)));
pending_seq_updates_.emplace(seq_begin, seq_end, date, receive_time, std::move(updates), std::move(lock));
} else {
lock.set_value(Unit());
}
@ -2910,8 +2909,7 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr<telegram_api::Update>
return;
}
pending_pts_updates_.emplace(
new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, receive_time, std::move(promise)));
pending_pts_updates_.emplace(std::move(update), new_pts, pts_count, receive_time, std::move(promise));
if (old_pts < accumulated_pts_ - accumulated_pts_count_) {
if (old_pts == new_pts - pts_count) {
@ -2935,8 +2933,7 @@ void UpdatesManager::postpone_pts_update(tl_object_ptr<telegram_api::Update> &&u
if (!can_postpone_updates() || (pts_count > 1 && td_->option_manager_->get_option_integer("session_count") <= 1)) {
return promise.set_value(Unit());
}
postponed_pts_updates_.emplace(pts,
PendingPtsUpdate(std::move(update), pts, pts_count, receive_time, std::move(promise)));
postponed_pts_updates_.emplace(std::move(update), pts, pts_count, receive_time, std::move(promise));
}
void UpdatesManager::process_seq_updates(int32 seq_end, int32 date,
@ -3087,8 +3084,8 @@ void UpdatesManager::process_qts_update(tl_object_ptr<telegram_api::Update> &&up
void UpdatesManager::process_all_pending_pts_updates() {
auto begin_time = Time::now();
for (auto &update : pending_pts_updates_) {
td_->messages_manager_->process_pts_update(std::move(update.second.update));
update.second.promise.set_value(Unit());
td_->messages_manager_->process_pts_update(std::move(update.update));
update.promise.set_value(Unit());
}
if (last_pts_gap_time_ != 0) {
@ -3126,13 +3123,13 @@ void UpdatesManager::process_postponed_pts_updates() {
int32 applied_update_count = 0;
auto update_it = postponed_pts_updates_.begin();
while (update_it != postponed_pts_updates_.end()) {
auto new_pts = update_it->second.pts;
auto pts_count = update_it->second.pts_count;
auto new_pts = update_it->pts;
auto pts_count = update_it->pts_count;
if (new_pts <= old_pts || (old_pts >= 1 && new_pts - (1 << 30) > old_pts)) {
skipped_update_count++;
td_->messages_manager_->skip_old_pending_pts_update(std::move(update_it->second.update), new_pts, old_pts,
pts_count, "process_postponed_pts_updates");
update_it->second.promise.set_value(Unit());
td_->messages_manager_->skip_old_pending_pts_update(std::move(update_it->update), new_pts, old_pts, pts_count,
"process_postponed_pts_updates");
update_it->promise.set_value(Unit());
update_it = postponed_pts_updates_.erase(update_it);
continue;
}
@ -3152,15 +3149,15 @@ void UpdatesManager::process_postponed_pts_updates() {
if (old_pts > new_pts - pts_count || last_update_it == postponed_pts_updates_.end() ||
i == GAP_TIMEOUT_UPDATE_COUNT) {
// the updates can't be applied
VLOG(get_difference) << "Can't apply " << i << " next postponed updates with PTS " << update_it->second.pts
<< '-' << new_pts << ", because their pts_count is " << pts_count
<< " instead of expected " << new_pts - old_pts;
VLOG(get_difference) << "Can't apply " << i << " next postponed updates with PTS " << update_it->pts << '-'
<< new_pts << ", because their pts_count is " << pts_count << " instead of expected "
<< new_pts - old_pts;
last_update_it = update_it;
break;
}
new_pts = last_update_it->second.pts;
pts_count += last_update_it->second.pts_count;
new_pts = last_update_it->pts;
pts_count += last_update_it->pts_count;
}
if (last_update_it == update_it) {
@ -3170,11 +3167,11 @@ void UpdatesManager::process_postponed_pts_updates() {
CHECK(old_pts == new_pts - pts_count);
while (update_it != last_update_it) {
if (update_it->second.pts_count > 0) {
if (update_it->pts_count > 0) {
applied_update_count++;
td_->messages_manager_->process_pts_update(std::move(update_it->second.update));
td_->messages_manager_->process_pts_update(std::move(update_it->update));
}
update_it->second.promise.set_value(Unit());
update_it->promise.set_value(Unit());
update_it = postponed_pts_updates_.erase(update_it);
}
old_pts = new_pts;
@ -3209,7 +3206,7 @@ void UpdatesManager::process_pending_pts_updates() {
int32 applied_update_count = 0;
while (!pending_pts_updates_.empty()) {
auto update_it = pending_pts_updates_.begin();
auto &update = update_it->second;
auto &update = *update_it;
if (get_pts() != update.pts - update.pts_count) {
// the updates will be applied or skipped later
break;
@ -3237,12 +3234,12 @@ void UpdatesManager::process_pending_pts_updates() {
if (!pending_pts_updates_.empty()) {
// if still have a gap, reset timeout
auto update_it = pending_pts_updates_.begin();
double receive_time = update_it->second.receive_time;
double receive_time = update_it->receive_time;
for (size_t i = 0; i < GAP_TIMEOUT_UPDATE_COUNT; i++) {
if (++update_it == pending_pts_updates_.end()) {
break;
}
receive_time = min(receive_time, update_it->second.receive_time);
receive_time = min(receive_time, update_it->receive_time);
}
set_pts_gap_timeout(max(receive_time + MAX_UNFILLED_GAP_TIME - Time::now(), 0.001));
}
@ -3266,7 +3263,7 @@ void UpdatesManager::process_pending_seq_updates() {
int32 applied_update_count = 0;
while (!pending_seq_updates_.empty() && !running_get_difference_) {
auto update_it = pending_seq_updates_.begin();
auto &update = update_it->second;
auto &update = *update_it;
auto seq_begin = update.seq_begin;
if (seq_begin - 1 > seq_ && seq_begin - (1 << 30) <= seq_) {
// the updates will be applied later
@ -3294,12 +3291,12 @@ void UpdatesManager::process_pending_seq_updates() {
if (!pending_seq_updates_.empty()) {
// if still have a gap, reset timeout
auto update_it = pending_seq_updates_.begin();
double receive_time = update_it->second.receive_time;
double receive_time = update_it->receive_time;
for (size_t i = 0; i < GAP_TIMEOUT_UPDATE_COUNT; i++) {
if (++update_it == pending_seq_updates_.end()) {
break;
}
receive_time = min(receive_time, update_it->second.receive_time);
receive_time = min(receive_time, update_it->receive_time);
}
set_seq_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now());
}

View File

@ -29,6 +29,7 @@
#include "td/utils/TlStorerToString.h"
#include <map>
#include <set>
#include <utility>
namespace td {
@ -153,11 +154,11 @@ class UpdatesManager final : public Actor {
class PendingPtsUpdate {
public:
tl_object_ptr<telegram_api::Update> update;
mutable tl_object_ptr<telegram_api::Update> update;
int32 pts;
int32 pts_count;
double receive_time;
Promise<Unit> promise;
mutable Promise<Unit> promise;
PendingPtsUpdate(tl_object_ptr<telegram_api::Update> &&update, int32 pts, int32 pts_count, double receive_time,
Promise<Unit> &&promise)
@ -167,6 +168,13 @@ class UpdatesManager final : public Actor {
, receive_time(receive_time)
, promise(std::move(promise)) {
}
bool operator<(const PendingPtsUpdate &other) const {
if (pts != other.pts) {
return pts < other.pts;
}
return other.pts_count < pts_count;
}
};
class PendingSeqUpdates {
@ -175,8 +183,8 @@ class UpdatesManager final : public Actor {
int32 seq_end;
int32 date;
double receive_time;
vector<tl_object_ptr<telegram_api::Update>> updates;
Promise<Unit> promise;
mutable vector<tl_object_ptr<telegram_api::Update>> updates;
mutable Promise<Unit> promise;
PendingSeqUpdates(int32 seq_begin, int32 seq_end, int32 date, double receive_time,
vector<tl_object_ptr<telegram_api::Update>> &&updates, Promise<Unit> &&promise)
@ -187,6 +195,13 @@ class UpdatesManager final : public Actor {
, updates(std::move(updates))
, promise(std::move(promise)) {
}
bool operator<(const PendingSeqUpdates &other) const {
if (seq_begin != other.seq_begin) {
return seq_begin < other.seq_begin;
}
return other.seq_end < seq_end;
}
};
class PendingQtsUpdate {
@ -229,11 +244,11 @@ class UpdatesManager final : public Actor {
double last_pts_jump_warning_time_ = 0;
double last_pts_gap_time_ = 0;
std::multimap<int32, PendingPtsUpdate> pending_pts_updates_;
std::multimap<int32, PendingPtsUpdate> postponed_pts_updates_;
std::multiset<PendingPtsUpdate> pending_pts_updates_;
std::multiset<PendingPtsUpdate> postponed_pts_updates_;
std::multimap<int32, PendingSeqUpdates> postponed_updates_; // updates received during getDifference
std::multimap<int32, PendingSeqUpdates> pending_seq_updates_; // updates with too big seq
std::multiset<PendingSeqUpdates> postponed_updates_; // updates received during getDifference
std::multiset<PendingSeqUpdates> pending_seq_updates_; // updates with too big seq
std::map<int32, PendingQtsUpdate> pending_qts_updates_; // updates with too big QTS