Use update receive time to calculate proper gap time.

This commit is contained in:
levlam 2021-08-15 12:46:38 +03:00
parent 9b801645f0
commit 9a5872fe27
3 changed files with 94 additions and 52 deletions

View File

@ -660,7 +660,7 @@ class UnpinAllMessagesQuery final : public Td::ResultHandler {
std::move(promise), "unpin all messages");
} else {
td->updates_manager_->add_pending_pts_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, std::move(promise),
affected_history->pts_count_, Time::now(), std::move(promise),
"unpin all messages");
}
} else if (affected_history->offset_ <= 0) {
@ -1812,7 +1812,7 @@ class ReadMessagesContentsQuery final : public Td::ResultHandler {
if (affected_messages->pts_count_ > 0) {
td->updates_manager_->add_pending_pts_update(make_tl_object<dummyUpdate>(), affected_messages->pts_,
affected_messages->pts_count_, Promise<Unit>(),
affected_messages->pts_count_, Time::now(), Promise<Unit>(),
"read messages content query");
}
@ -2034,7 +2034,7 @@ class ReadHistoryQuery final : public Td::ResultHandler {
if (affected_messages->pts_count_ > 0) {
td->updates_manager_->add_pending_pts_update(make_tl_object<dummyUpdate>(), affected_messages->pts_,
affected_messages->pts_count_, Promise<Unit>(),
affected_messages->pts_count_, Time::now(), Promise<Unit>(),
"read history query");
}
@ -2504,7 +2504,7 @@ class DeleteHistoryQuery final : public Td::ResultHandler {
if (affected_history->pts_count_ > 0) {
td->updates_manager_->add_pending_pts_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, Promise<Unit>(),
affected_history->pts_count_, Time::now(), Promise<Unit>(),
"delete history query");
}
@ -2602,7 +2602,7 @@ class DeletePhoneCallHistoryQuery final : public Td::ResultHandler {
auto pts_count = affected_messages->pts_count_;
auto update =
make_tl_object<telegram_api::updateDeleteMessages>(std::move(affected_messages->messages_), pts, pts_count);
td->updates_manager_->add_pending_pts_update(std::move(update), pts, pts_count, std::move(promise),
td->updates_manager_->add_pending_pts_update(std::move(update), pts, pts_count, Time::now(), std::move(promise),
"delete phone call history query");
} else if (affected_messages->offset_ <= 0) {
promise_.set_value(Unit());
@ -2759,7 +2759,7 @@ class ReadMentionsQuery final : public Td::ResultHandler {
td->updates_manager_->get_difference("Wrong messages_readMentions result");
} else {
td->updates_manager_->add_pending_pts_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, Promise<Unit>(),
affected_history->pts_count_, Time::now(), Promise<Unit>(),
"read all mentions query");
}
}
@ -2899,7 +2899,7 @@ class SendMessageActor final : public NetActorOnce {
}
td->updates_manager_->add_pending_pts_update(std::move(update), sent_message->pts_, sent_message->pts_count_,
Promise<Unit>(), "send message actor");
Time::now(), Promise<Unit>(), "send message actor");
}
void on_error(uint64 id, Status status) final {
@ -3914,7 +3914,7 @@ class DeleteMessagesQuery final : public Td::ResultHandler {
if (affected_messages->pts_count_ > 0) {
td->updates_manager_->add_pending_pts_update(make_tl_object<dummyUpdate>(), affected_messages->pts_,
affected_messages->pts_count_, Promise<Unit>(),
affected_messages->pts_count_, Time::now(), Promise<Unit>(),
"delete messages query");
}
if (--query_count_ == 0) {

View File

@ -854,7 +854,7 @@ void UpdatesManager::on_get_updates(tl_object_ptr<telegram_api::Updates> &&updat
auto updates = move_tl_object_as<telegram_api::updatesCombined>(updates_ptr);
td_->contacts_manager_->on_get_users(std::move(updates->users_), "updatesCombined");
td_->contacts_manager_->on_get_chats(std::move(updates->chats_), "updatesCombined");
on_pending_updates(std::move(updates->updates_), updates->seq_start_, updates->seq_, updates->date_,
on_pending_updates(std::move(updates->updates_), updates->seq_start_, updates->seq_, updates->date_, Time::now(),
std::move(promise), "telegram_api::updatesCombined");
break;
}
@ -862,8 +862,8 @@ void UpdatesManager::on_get_updates(tl_object_ptr<telegram_api::Updates> &&updat
auto updates = move_tl_object_as<telegram_api::updates>(updates_ptr);
td_->contacts_manager_->on_get_users(std::move(updates->users_), "updates");
td_->contacts_manager_->on_get_chats(std::move(updates->chats_), "updates");
on_pending_updates(std::move(updates->updates_), updates->seq_, updates->seq_, updates->date_, std::move(promise),
"telegram_api::updates");
on_pending_updates(std::move(updates->updates_), updates->seq_, updates->seq_, updates->date_, Time::now(),
std::move(promise), "telegram_api::updates");
break;
}
case telegram_api::updateShortSentMessage::ID:
@ -1409,11 +1409,12 @@ void UpdatesManager::after_get_difference() {
auto updates = std::move(it->second.updates);
auto updates_seq_begin = it->second.seq_begin;
auto updates_seq_end = it->second.seq_end;
auto receive_time = it->second.receive_time;
auto promise = std::move(it->second.promise);
// ignore it->second.date, because it may be too old
postponed_updates_.erase(it);
auto update_count = updates.size();
on_pending_updates(std::move(updates), updates_seq_begin, updates_seq_end, 0, std::move(promise),
on_pending_updates(std::move(updates), updates_seq_begin, updates_seq_end, 0, receive_time, std::move(promise),
"postponed updates");
if (running_get_difference_) {
VLOG(get_difference) << "Finish to apply postponed updates with " << postponed_updates_.size()
@ -1433,8 +1434,8 @@ void UpdatesManager::after_get_difference() {
LOG(INFO) << "Begin to apply " << postponed_updates.size() << " postponed pts updates";
for (auto &postponed_update : postponed_updates) {
auto &update = postponed_update.second;
add_pending_pts_update(std::move(update.update), update.pts, update.pts_count, std::move(update.promise),
"after get difference");
add_pending_pts_update(std::move(update.update), update.pts, update.pts_count, update.receive_time,
std::move(update.promise), "after get difference");
CHECK(!running_get_difference_);
}
LOG(INFO) << "Finish to apply postponed pts updates, have " << postponed_pts_updates_.size()
@ -1451,7 +1452,8 @@ void UpdatesManager::after_get_difference() {
}
void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Update>> &&updates, int32 seq_begin,
int32 seq_end, int32 date, Promise<Unit> &&promise, const char *source) {
int32 seq_end, int32 date, double receive_time, Promise<Unit> &&promise,
const char *source) {
if (get_pts() == -1) {
init_state();
}
@ -1485,8 +1487,8 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
min_postponed_update_qts_ = qts;
}
}
postponed_updates_.emplace(seq_begin,
PendingSeqUpdates(seq_begin, seq_end, date, std::move(updates), std::move(promise)));
postponed_updates_.emplace(
seq_begin, PendingSeqUpdates(seq_begin, seq_end, date, receive_time, std::move(updates), std::move(promise)));
return;
}
@ -1631,8 +1633,8 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
if (running_get_difference_) {
LOG(ERROR) << "Postpone " << updates.size() << " updates [" << seq_begin << ", " << seq_end
<< "] with date = " << date << " from " << source;
postponed_updates_.emplace(seq_begin,
PendingSeqUpdates(seq_begin, seq_end, date, std::move(updates), mpas.get_promise()));
postponed_updates_.emplace(
seq_begin, PendingSeqUpdates(seq_begin, seq_end, date, receive_time, std::move(updates), mpas.get_promise()));
return lock.set_value(Unit());
}
@ -1668,9 +1670,9 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
LOG_IF(WARNING, pending_seq_updates_.find(seq_begin) != pending_seq_updates_.end())
<< "Already have pending updates with seq = " << seq_begin << ", but receive it again from " << source;
pending_seq_updates_.emplace(seq_begin,
PendingSeqUpdates(seq_begin, seq_end, date, std::move(updates), mpas.get_promise()));
set_seq_gap_timeout(MAX_UNFILLED_GAP_TIME);
pending_seq_updates_.emplace(
seq_begin, PendingSeqUpdates(seq_begin, seq_end, date, receive_time, std::move(updates), mpas.get_promise()));
set_seq_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now());
lock.set_value(Unit());
}
@ -1711,6 +1713,8 @@ void UpdatesManager::add_pending_qts_update(tl_object_ptr<telegram_api::Update>
auto &pending_update = pending_qts_updates_[qts];
if (pending_update.update != nullptr) {
LOG(WARNING) << "Receive duplicate update with qts = " << qts;
} else {
pending_update.receive_time = Time::now();
}
pending_update.update = std::move(update);
pending_update.promises.push_back(std::move(promise));
@ -1849,7 +1853,8 @@ void UpdatesManager::process_pts_update(tl_object_ptr<telegram_api::Update> &&up
}
void UpdatesManager::add_pending_pts_update(tl_object_ptr<telegram_api::Update> &&update, int32 new_pts,
int32 pts_count, Promise<Unit> &&promise, const char *source) {
int32 pts_count, double receive_time, Promise<Unit> &&promise,
const char *source) {
// do not try to run getDifference from this function
CHECK(update != nullptr);
CHECK(source != nullptr);
@ -1905,7 +1910,7 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr<telegram_api::Update>
if (running_get_difference_) {
CHECK(update->get_id() == dummyUpdate::ID || update->get_id() == updateSentMessage::ID);
}
postpone_pts_update(std::move(update), new_pts, pts_count, std::move(promise));
postpone_pts_update(std::move(update), new_pts, pts_count, receive_time, std::move(promise));
return;
}
@ -1913,7 +1918,7 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr<telegram_api::Update>
LOG(WARNING) << "Have old_pts (= " << old_pts << ") + pts_count (= " << pts_count << ") > new_pts (= " << new_pts
<< "). Logged in " << G()->shared_config().get_option_integer("authorization_date") << ". Update from "
<< source << " = " << oneline(to_string(update));
postpone_pts_update(std::move(update), new_pts, pts_count, std::move(promise));
postpone_pts_update(std::move(update), new_pts, pts_count, receive_time, std::move(promise));
set_pts_gap_timeout(0.001);
return;
}
@ -1929,7 +1934,7 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr<telegram_api::Update>
<< ", pts_count = " << pts_count << ". Logged in "
<< G()->shared_config().get_option_integer("authorization_date") << ". Update from " << source << " = "
<< oneline(to_string(update));
postpone_pts_update(std::move(update), new_pts, pts_count, std::move(promise));
postpone_pts_update(std::move(update), new_pts, pts_count, receive_time, std::move(promise));
set_pts_gap_timeout(0.001);
return;
}
@ -1950,10 +1955,11 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr<telegram_api::Update>
return;
}
pending_pts_updates_.emplace(new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, std::move(promise)));
pending_pts_updates_.emplace(
new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, receive_time, std::move(promise)));
if (old_pts < accumulated_pts_ - accumulated_pts_count_) {
set_pts_gap_timeout(MAX_UNFILLED_GAP_TIME);
set_pts_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now());
last_pts_gap_time_ = Time::now();
return;
}
@ -1963,8 +1969,9 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr<telegram_api::Update>
}
void UpdatesManager::postpone_pts_update(tl_object_ptr<telegram_api::Update> &&update, int32 pts, int32 pts_count,
Promise<Unit> &&promise) {
postponed_pts_updates_.emplace(pts, PendingPtsUpdate(std::move(update), pts, pts_count, std::move(promise)));
double receive_time, Promise<Unit> &&promise) {
postponed_pts_updates_.emplace(pts,
PendingPtsUpdate(std::move(update), pts, pts_count, receive_time, std::move(promise)));
}
void UpdatesManager::process_seq_updates(int32 seq_end, int32 date,
@ -2095,7 +2102,15 @@ void UpdatesManager::process_pending_seq_updates() {
seq_gap_timeout_.cancel_timeout();
} else {
// if after getDifference still have a gap
set_seq_gap_timeout(MAX_UNFILLED_GAP_TIME);
auto update_it = pending_seq_updates_.begin();
double receive_time = update_it->second.receive_time;
for (size_t i = 0; i < 10; i++) {
if (++update_it == pending_seq_updates_.end()) {
break;
}
receive_time = min(receive_time, update_it->second.receive_time);
}
set_seq_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now());
}
}
@ -2127,7 +2142,15 @@ void UpdatesManager::process_pending_qts_updates() {
qts_gap_timeout_.cancel_timeout();
} else {
// if after getDifference still have a gap
set_qts_gap_timeout(MAX_UNFILLED_GAP_TIME);
auto update_it = pending_qts_updates_.begin();
double receive_time = update_it->second.receive_time;
for (size_t i = 0; i < 10; i++) {
if (++update_it == pending_qts_updates_.end()) {
break;
}
receive_time = min(receive_time, update_it->second.receive_time);
}
set_qts_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now());
}
}
@ -2159,13 +2182,13 @@ void UpdatesManager::on_pending_update(tl_object_ptr<telegram_api::Update> updat
const char *source) {
vector<tl_object_ptr<telegram_api::Update>> updates;
updates.push_back(std::move(update));
on_pending_updates(std::move(updates), seq, seq, 0, std::move(promise), source);
on_pending_updates(std::move(updates), seq, seq, 0, Time::now(), std::move(promise), source);
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNewMessage> update, Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateNewMessage");
add_pending_pts_update(std::move(update), new_pts, pts_count, Time::now(), std::move(promise), "updateNewMessage");
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNewChannelMessage> update, Promise<Unit> &&promise) {
@ -2184,36 +2207,41 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadMessagesCon
Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateReadMessagesContents");
add_pending_pts_update(std::move(update), new_pts, pts_count, Time::now(), std::move(promise),
"updateReadMessagesContents");
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEditMessage> update, Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateEditMessage");
add_pending_pts_update(std::move(update), new_pts, pts_count, Time::now(), std::move(promise), "updateEditMessage");
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDeleteMessages> update, Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
if (update->messages_.empty()) {
add_pending_pts_update(make_tl_object<dummyUpdate>(), new_pts, pts_count, Promise<Unit>(), "updateDeleteMessages");
add_pending_pts_update(make_tl_object<dummyUpdate>(), new_pts, pts_count, Time::now(), Promise<Unit>(),
"updateDeleteMessages");
promise.set_value(Unit());
} else {
add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateDeleteMessages");
add_pending_pts_update(std::move(update), new_pts, pts_count, Time::now(), std::move(promise),
"updateDeleteMessages");
}
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadHistoryInbox> update, Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateReadHistoryInbox");
add_pending_pts_update(std::move(update), new_pts, pts_count, Time::now(), std::move(promise),
"updateReadHistoryInbox");
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadHistoryOutbox> update, Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateReadHistoryOutbox");
add_pending_pts_update(std::move(update), new_pts, pts_count, Time::now(), std::move(promise),
"updateReadHistoryOutbox");
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateServiceNotification> update, Promise<Unit> &&promise) {
@ -2325,7 +2353,8 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadChannelDisc
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePinnedMessages> update, Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updatePinnedMessages");
add_pending_pts_update(std::move(update), new_pts, pts_count, Time::now(), std::move(promise),
"updatePinnedMessages");
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePinnedChannelMessages> update,
@ -2388,7 +2417,7 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePeerLocated> up
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateWebPage> update, Promise<Unit> &&promise) {
td_->web_pages_manager_->on_get_web_page(std::move(update->webpage_), DialogId());
add_pending_pts_update(make_tl_object<dummyUpdate>(), update->pts_, update->pts_count_, Promise<Unit>(),
add_pending_pts_update(make_tl_object<dummyUpdate>(), update->pts_, update->pts_count_, Time::now(), Promise<Unit>(),
"updateWebPage");
promise.set_value(Unit());
}
@ -2409,8 +2438,8 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateFolderPeers> up
}
if (update->pts_ > 0) {
add_pending_pts_update(make_tl_object<dummyUpdate>(), update->pts_, update->pts_count_, Promise<Unit>(),
"updateFolderPeers");
add_pending_pts_update(make_tl_object<dummyUpdate>(), update->pts_, update->pts_count_, Time::now(),
Promise<Unit>(), "updateFolderPeers");
}
promise.set_value(Unit());
}

View File

@ -95,7 +95,7 @@ class UpdatesManager final : public Actor {
void on_get_updates(tl_object_ptr<telegram_api::Updates> &&updates_ptr, Promise<Unit> &&promise);
void add_pending_pts_update(tl_object_ptr<telegram_api::Update> &&update, int32 new_pts, int32 pts_count,
Promise<Unit> &&promise, const char *source);
double receive_time, Promise<Unit> &&promise, const char *source);
static std::unordered_set<int64> get_sent_messages_random_ids(const telegram_api::Updates *updates_ptr);
@ -134,10 +134,16 @@ class UpdatesManager final : public Actor {
tl_object_ptr<telegram_api::Update> update;
int32 pts;
int32 pts_count;
double receive_time;
Promise<Unit> promise;
PendingPtsUpdate(tl_object_ptr<telegram_api::Update> &&update, int32 pts, int32 pts_count, Promise<Unit> &&promise)
: update(std::move(update)), pts(pts), pts_count(pts_count), promise(std::move(promise)) {
PendingPtsUpdate(tl_object_ptr<telegram_api::Update> &&update, int32 pts, int32 pts_count, double receive_time,
Promise<Unit> &&promise)
: update(std::move(update))
, pts(pts)
, pts_count(pts_count)
, receive_time(receive_time)
, promise(std::move(promise)) {
}
};
@ -146,17 +152,24 @@ class UpdatesManager final : public Actor {
int32 seq_begin;
int32 seq_end;
int32 date;
double receive_time;
vector<tl_object_ptr<telegram_api::Update>> updates;
Promise<Unit> promise;
PendingSeqUpdates(int32 seq_begin, int32 seq_end, int32 date, vector<tl_object_ptr<telegram_api::Update>> &&updates,
Promise<Unit> &&promise)
: seq_begin(seq_begin), seq_end(seq_end), date(date), updates(std::move(updates)), promise(std::move(promise)) {
PendingSeqUpdates(int32 seq_begin, int32 seq_end, int32 date, double receive_time,
vector<tl_object_ptr<telegram_api::Update>> &&updates, Promise<Unit> &&promise)
: seq_begin(seq_begin)
, seq_end(seq_end)
, date(date)
, receive_time(receive_time)
, updates(std::move(updates))
, promise(std::move(promise)) {
}
};
class PendingQtsUpdate {
public:
double receive_time;
tl_object_ptr<telegram_api::Update> update;
vector<Promise<Unit>> promises;
};
@ -243,13 +256,13 @@ class UpdatesManager final : public Actor {
void add_pending_qts_update(tl_object_ptr<telegram_api::Update> &&update, int32 qts, Promise<Unit> &&promise);
void on_pending_updates(vector<tl_object_ptr<telegram_api::Update>> &&updates, int32 seq_begin, int32 seq_end,
int32 date, Promise<Unit> &&promise, const char *source);
int32 date, double receive_time, Promise<Unit> &&promise, const char *source);
void process_updates(vector<tl_object_ptr<telegram_api::Update>> &&updates, bool force_apply,
Promise<Unit> &&promise);
void postpone_pts_update(tl_object_ptr<telegram_api::Update> &&update, int32 pts, int32 pts_count,
Promise<Unit> &&promise);
double receive_time, Promise<Unit> &&promise);
void process_pts_update(tl_object_ptr<telegram_api::Update> &&update);