Support gaps in qts updates.

GitOrigin-RevId: afcae4aa4ac456f5b8d8b2e46b92126a606bdca9
This commit is contained in:
levlam 2020-08-03 16:57:30 +03:00
parent 3be95dd6e8
commit e9d3b4881e
2 changed files with 111 additions and 37 deletions

View File

@ -182,6 +182,10 @@ void UpdatesManager::fill_seq_gap(void *td) {
fill_gap(td, "seq");
}
void UpdatesManager::fill_qts_gap(void *td) {
fill_gap(td, "qts");
}
void UpdatesManager::fill_get_difference_gap(void *td) {
fill_gap(td, "getDifference");
}
@ -298,22 +302,6 @@ Promise<> UpdatesManager::set_pts(int32 pts, const char *source) {
return result;
}
Promise<> UpdatesManager::set_qts(int32 qts) {
Promise<> result;
if (qts > get_qts() || (0 < qts && qts < get_qts() - 399999)) { // qts can only go up or drop cardinally
if (qts < get_qts() - 399999) {
LOG(WARNING) << "Qts decreases from " << get_qts() << " to " << qts;
} else {
LOG(INFO) << "Update qts from " << get_qts() << " to " << qts;
}
result = add_qts(qts);
} else if (qts < get_qts()) {
LOG(ERROR) << "Receive wrong qts = " << qts << " less than current qts = " << get_qts();
}
return result;
}
void UpdatesManager::set_date(int32 date, bool from_update, string date_source) {
if (date > date_) {
LOG(INFO) << "Update date to " << date;
@ -811,7 +799,7 @@ void UpdatesManager::on_get_updates_state(tl_object_ptr<telegram_api::updates_st
string full_source = "on_get_updates_state " + oneline(to_string(state)) + " from " + source;
set_pts(state->pts_, full_source.c_str()).set_value(Unit());
set_date(state->date_, false, std::move(full_source));
// set_qts(state->qts_).set_value(Unit());
add_qts(state->qts_).set_value(Unit());
seq_ = state->seq_;
}
@ -993,7 +981,7 @@ void UpdatesManager::on_server_pong(tl_object_ptr<telegram_api::updates_state> &
void UpdatesManager::process_get_difference_updates(
vector<tl_object_ptr<telegram_api::Message>> &&new_messages,
vector<tl_object_ptr<telegram_api::EncryptedMessage>> &&new_encrypted_messages, int32 qts,
vector<tl_object_ptr<telegram_api::EncryptedMessage>> &&new_encrypted_messages,
vector<tl_object_ptr<telegram_api::Update>> &&other_updates) {
VLOG(get_difference) << "In get difference receive " << new_messages.size() << " messages, "
<< new_encrypted_messages.size() << " encrypted messages and " << other_updates.size()
@ -1038,8 +1026,6 @@ void UpdatesManager::process_get_difference_updates(
}
process_updates(std::move(other_updates), true);
set_qts(qts).set_value(Unit());
}
void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Difference> &&difference_ptr) {
@ -1068,7 +1054,7 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe
td_->contacts_manager_->on_get_chats(std::move(difference->chats_), "updates.difference");
process_get_difference_updates(std::move(difference->new_messages_),
std::move(difference->new_encrypted_messages_), difference->state_->qts_,
std::move(difference->new_encrypted_messages_),
std::move(difference->other_updates_));
if (running_get_difference_) {
LOG(ERROR) << "Get difference has run while processing get difference updates";
@ -1092,7 +1078,7 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe
process_get_difference_updates(std::move(difference->new_messages_),
std::move(difference->new_encrypted_messages_),
difference->intermediate_state_->qts_, std::move(difference->other_updates_));
std::move(difference->other_updates_));
if (running_get_difference_) {
LOG(ERROR) << "Get difference has run while processing get difference updates";
break;
@ -1315,7 +1301,8 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
if (id == telegram_api::updateNewMessage::ID || id == telegram_api::updateReadMessagesContents::ID ||
id == telegram_api::updateEditMessage::ID || id == telegram_api::updateDeleteMessages::ID ||
id == telegram_api::updateReadHistoryInbox::ID || id == telegram_api::updateReadHistoryOutbox::ID ||
id == telegram_api::updateWebPage::ID) {
id == telegram_api::updateWebPage::ID || id == telegram_api::updateNewEncryptedMessage::ID ||
id == telegram_api::updateChannelParticipant::ID) {
if (!downcast_call(*update, OnUpdate(this, update, false))) {
LOG(ERROR) << "Can't call on some update received from " << source;
}
@ -1370,6 +1357,44 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
set_seq_gap_timeout(MAX_UNFILLED_GAP_TIME);
}
void UpdatesManager::add_pending_qts_update(tl_object_ptr<telegram_api::Update> &&update, int32 qts) {
CHECK(update != nullptr);
if (qts <= 1) {
LOG(ERROR) << "Receive wrong qts " << qts << " in " << oneline(to_string(update));
return;
}
int32 old_qts = get_qts();
if (qts < old_qts - 1000001) {
LOG(WARNING) << "Restore qts after qts overflow from " << old_qts << " to " << qts << " by "
<< oneline(to_string(update));
add_qts(qts - 1).set_value(Unit());
CHECK(get_qts() == qts - 1);
old_qts = qts - 1;
}
if (qts <= old_qts) {
LOG(INFO) << "Skip already applied update with qts = " << qts;
return;
}
CHECK(!running_get_difference_);
if (qts > old_qts + 1) {
if (pending_qts_updates_.empty()) {
set_qts_gap_timeout(MAX_UNFILLED_GAP_TIME);
}
bool is_inserted = pending_qts_updates_.emplace(qts, std::move(update)).second;
if (!is_inserted) {
LOG(INFO) << "Receive duplicate update with qts = " << qts;
}
return;
}
process_qts_update(std::move(update), qts);
process_pending_qts_updates();
}
void UpdatesManager::process_updates(vector<tl_object_ptr<telegram_api::Update>> &&updates, bool force_apply) {
tl_object_ptr<telegram_api::updatePtsChanged> update_pts_changed;
/*
@ -1435,6 +1460,22 @@ void UpdatesManager::process_seq_updates(int32 seq_end, int32 date,
}
}
void UpdatesManager::process_qts_update(tl_object_ptr<telegram_api::Update> &&update, int32 qts) {
switch (update->get_id()) {
case telegram_api::updateNewEncryptedMessage::ID: {
auto message = std::move(move_tl_object_as<telegram_api::updateNewEncryptedMessage>(update)->message_);
send_closure(td_->secret_chats_manager_, &SecretChatsManager::on_new_message, std::move(message), add_qts(qts));
break;
}
case telegram_api::updateChannelParticipant::ID:
// TODO
break;
default:
UNREACHABLE();
break;
}
}
void UpdatesManager::process_pending_seq_updates() {
while (!pending_seq_updates_.empty() && !running_get_difference_) {
auto update_it = pending_seq_updates_.begin();
@ -1458,6 +1499,27 @@ void UpdatesManager::process_pending_seq_updates() {
}
}
void UpdatesManager::process_pending_qts_updates() {
if (pending_qts_updates_.empty()) {
return;
}
while (!pending_qts_updates_.empty()) {
CHECK(!running_get_difference_);
auto update_it = pending_qts_updates_.begin();
auto qts = update_it->first;
if (qts > get_qts() + 1) {
return;
}
if (qts == get_qts() + 1) {
process_qts_update(std::move(update_it->second), qts);
}
pending_qts_updates_.erase(update_it);
}
if (pending_qts_updates_.empty()) {
qts_gap_timeout_.cancel_timeout();
}
}
void UpdatesManager::set_seq_gap_timeout(double timeout) {
if (!seq_gap_timeout_.has_timeout()) {
seq_gap_timeout_.set_callback(std::move(fill_seq_gap));
@ -1466,6 +1528,13 @@ void UpdatesManager::set_seq_gap_timeout(double timeout) {
}
}
void UpdatesManager::set_qts_gap_timeout(double timeout) {
CHECK(!qts_gap_timeout_.has_timeout());
qts_gap_timeout_.set_callback(std::move(fill_qts_gap));
qts_gap_timeout_.set_callback_data(static_cast<void *>(td_));
qts_gap_timeout_.set_timeout_in(timeout);
}
void UpdatesManager::on_pending_update(tl_object_ptr<telegram_api::Update> update, int32 seq, const char *source) {
vector<tl_object_ptr<telegram_api::Update>> updates;
updates.push_back(std::move(update));
@ -1941,19 +2010,12 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEncryption> upd
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNewEncryptedMessage> update, bool force_apply) {
if (!force_apply) {
if (update->qts_ <= get_qts()) {
LOG(INFO) << "Ignore already processed update with qts " << update->qts_;
return;
}
if (update->qts_ != get_qts() + 1) {
// TODO fill gap
return;
}
if (force_apply) {
return process_qts_update(std::move(update), 0);
}
send_closure(td_->secret_chats_manager_, &SecretChatsManager::on_new_message, std::move(update->message_),
add_qts(update->qts_));
auto qts = update->qts_;
add_pending_qts_update(std::move(update), qts);
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEncryptedMessagesRead> update, bool /*force_apply*/) {

View File

@ -71,8 +71,6 @@ class UpdatesManager : public Actor {
Promise<> set_pts(int32 pts, const char *source) TD_WARN_UNUSED_RESULT;
Promise<> set_qts(int32 qts) TD_WARN_UNUSED_RESULT;
static const double MAX_UNFILLED_GAP_TIME;
static void fill_pts_gap(void *td);
@ -112,8 +110,12 @@ class UpdatesManager : public Actor {
std::multimap<int32, PendingUpdates> postponed_updates_; // updates received during getDifference
std::multimap<int32, PendingUpdates> pending_seq_updates_; // updates with too big seq
std::map<int32, tl_object_ptr<telegram_api::Update>> pending_qts_updates_; // updates with too big qts
Timeout seq_gap_timeout_;
Timeout qts_gap_timeout_;
int32 retry_time_ = 1;
Timeout retry_timeout_;
@ -139,10 +141,12 @@ class UpdatesManager : public Actor {
void process_get_difference_updates(vector<tl_object_ptr<telegram_api::Message>> &&new_messages,
vector<tl_object_ptr<telegram_api::EncryptedMessage>> &&new_encrypted_messages,
int32 qts, vector<tl_object_ptr<telegram_api::Update>> &&other_updates);
vector<tl_object_ptr<telegram_api::Update>> &&other_updates);
void on_pending_update(tl_object_ptr<telegram_api::Update> update, int32 seq, const char *source);
void add_pending_qts_update(tl_object_ptr<telegram_api::Update> &&update, int32 qts);
void on_pending_updates(vector<tl_object_ptr<telegram_api::Update>> &&updates, int32 seq_begin, int32 seq_end,
int32 date, const char *source);
@ -150,16 +154,24 @@ class UpdatesManager : public Actor {
void process_seq_updates(int32 seq_end, int32 date, vector<tl_object_ptr<telegram_api::Update>> &&updates);
void process_qts_update(tl_object_ptr<telegram_api::Update> &&update, int32 qts);
void process_pending_seq_updates();
void process_pending_qts_updates();
static void fill_seq_gap(void *td);
static void fill_qts_gap(void *td);
static void fill_get_difference_gap(void *td);
static void fill_gap(void *td, const char *source);
void set_seq_gap_timeout(double timeout);
void set_qts_gap_timeout(double timeout);
void on_failed_get_difference();
void before_get_difference(bool is_initial);