Extract pts and qts updates and apply them during getDifference.

This commit is contained in:
levlam 2021-08-20 17:21:31 +03:00
parent a7e2e85119
commit 241718eae8

View File

@ -1338,15 +1338,8 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe
auto difference = move_tl_object_as<telegram_api::updates_differenceEmpty>(difference_ptr);
set_date(difference->date_, false, "on_get_difference_empty");
seq_ = difference->seq_;
if (!pending_seq_updates_.empty()) {
LOG(WARNING) << "Drop " << pending_seq_updates_.size() << " pending seq updates after receive empty difference";
auto pending_seq_updates = std::move(pending_seq_updates_);
pending_seq_updates_.clear();
for (auto &pending_update : pending_seq_updates) {
pending_update.second.promise.set_value(Unit());
}
}
process_pending_qts_updates();
if (!pending_qts_updates_.empty()) {
LOG(WARNING) << "Drop " << pending_qts_updates_.size() << " pending qts updates after receive empty difference";
auto pending_qts_updates = std::move(pending_qts_updates_);
@ -1359,6 +1352,17 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe
}
}
}
process_pending_seq_updates();
if (!pending_seq_updates_.empty()) {
LOG(WARNING) << "Drop " << pending_seq_updates_.size() << " pending seq updates after receive empty difference";
auto pending_seq_updates = std::move(pending_seq_updates_);
pending_seq_updates_.clear();
for (auto &pending_update : pending_seq_updates) {
pending_update.second.promise.set_value(Unit());
}
}
break;
}
case telegram_api::updates_difference::ID: {
@ -1564,78 +1568,62 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
}
bool need_postpone = running_get_difference_ /*|| string(source) != string("postponed updates")*/;
if (need_postpone) {
LOG(INFO) << "Postpone " << updates.size() << " updates [" << seq_begin << ", " << seq_end
<< "] with date = " << date << " from " << source;
if (!need_postpone) {
for (auto &update : updates) {
auto pts = get_update_pts(update.get());
if (pts != 0 && (min_postponed_update_pts_ == 0 || pts < min_postponed_update_pts_)) {
min_postponed_update_pts_ = pts;
}
auto qts = get_update_qts(update.get());
if (qts != 0 && (min_postponed_update_qts_ == 0 || qts < min_postponed_update_qts_)) {
min_postponed_update_qts_ = qts;
}
}
postponed_updates_.emplace(
seq_begin, PendingSeqUpdates(seq_begin, seq_end, date, receive_time, std::move(updates), std::move(promise)));
return;
}
for (auto &update : updates) {
if (!is_acceptable_update(update.get())) {
CHECK(update != nullptr);
int32 id = update->get_id();
const tl_object_ptr<telegram_api::Message> *message_ptr = nullptr;
int32 pts = 0;
if (id == telegram_api::updateNewChannelMessage::ID) {
auto update_new_channel_message = static_cast<const telegram_api::updateNewChannelMessage *>(update.get());
message_ptr = &update_new_channel_message->message_;
pts = update_new_channel_message->pts_;
}
if (id == telegram_api::updateEditChannelMessage::ID) {
auto update_edit_channel_message = static_cast<const telegram_api::updateEditChannelMessage *>(update.get());
message_ptr = &update_edit_channel_message->message_;
pts = update_edit_channel_message->pts_;
}
// for channels we can try to replace unacceptable update with updateChannelTooLong
if (message_ptr != nullptr) {
auto dialog_id = td_->messages_manager_->get_message_dialog_id(*message_ptr);
if (dialog_id.get_type() == DialogType::Channel) {
auto channel_id = dialog_id.get_channel_id();
if (td_->contacts_manager_->have_channel_force(channel_id)) {
if (td_->messages_manager_->is_old_channel_update(dialog_id, pts)) {
// the update will be ignored anyway, so there is no reason to replace it or force get_difference
LOG(INFO) << "Allow an outdated unacceptable update from " << source;
continue;
}
if ((*message_ptr)->get_id() != telegram_api::messageService::ID) {
// don't replace service messages, because they can be about bot's kicking
LOG(INFO) << "Replace update about new message with updateChannelTooLong in " << dialog_id;
update = telegram_api::make_object<telegram_api::updateChannelTooLong>(
telegram_api::updateChannelTooLong::PTS_MASK, channel_id.get(), pts);
continue;
}
}
} else {
LOG(ERROR) << "Update is not from a channel: " << to_string(update);
if (!is_acceptable_update(update.get())) {
CHECK(update != nullptr);
int32 id = update->get_id();
const tl_object_ptr<telegram_api::Message> *message_ptr = nullptr;
int32 pts = 0;
if (id == telegram_api::updateNewChannelMessage::ID) {
auto update_new_channel_message = static_cast<const telegram_api::updateNewChannelMessage *>(update.get());
message_ptr = &update_new_channel_message->message_;
pts = update_new_channel_message->pts_;
}
if (id == telegram_api::updateEditChannelMessage::ID) {
auto update_edit_channel_message = static_cast<const telegram_api::updateEditChannelMessage *>(update.get());
message_ptr = &update_edit_channel_message->message_;
pts = update_edit_channel_message->pts_;
}
// for channels we can try to replace unacceptable update with updateChannelTooLong
if (message_ptr != nullptr) {
auto dialog_id = td_->messages_manager_->get_message_dialog_id(*message_ptr);
if (dialog_id.get_type() == DialogType::Channel) {
auto channel_id = dialog_id.get_channel_id();
if (td_->contacts_manager_->have_channel_force(channel_id)) {
if (td_->messages_manager_->is_old_channel_update(dialog_id, pts)) {
// the update will be ignored anyway, so there is no reason to replace it or force get_difference
LOG(INFO) << "Allow an outdated unacceptable update from " << source;
continue;
}
if ((*message_ptr)->get_id() != telegram_api::messageService::ID) {
// don't replace service messages, because they can be about bot's kicking
LOG(INFO) << "Replace update about new message with updateChannelTooLong in " << dialog_id;
update = telegram_api::make_object<telegram_api::updateChannelTooLong>(
telegram_api::updateChannelTooLong::PTS_MASK, channel_id.get(), pts);
continue;
}
}
} else {
LOG(ERROR) << "Update is not from a channel: " << to_string(update);
}
}
get_difference("on unacceptable updates in on_pending_updates");
return promise.set_value(Unit());
}
get_difference("on unacceptable updates in on_pending_updates");
return promise.set_value(Unit());
}
}
if (date > 0 && updates.size() == 1 && updates[0] != nullptr &&
updates[0]->get_id() == telegram_api::updateReadHistoryOutbox::ID) {
auto update = static_cast<const telegram_api::updateReadHistoryOutbox *>(updates[0].get());
DialogId dialog_id(update->peer_);
if (dialog_id.get_type() == DialogType::User) {
auto user_id = dialog_id.get_user_id();
if (user_id.is_valid()) {
td_->contacts_manager_->on_update_user_local_was_online(user_id, date);
if (date > 0 && updates.size() == 1 && updates[0] != nullptr &&
updates[0]->get_id() == telegram_api::updateReadHistoryOutbox::ID) {
auto update = static_cast<const telegram_api::updateReadHistoryOutbox *>(updates[0].get());
DialogId dialog_id(update->peer_);
if (dialog_id.get_type() == DialogType::User) {
auto user_id = dialog_id.get_user_id();
if (user_id.is_valid()) {
td_->contacts_manager_->on_update_user_local_was_online(user_id, date);
}
}
}
}
@ -1659,7 +1647,9 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
for (auto &update : updates) {
LOG(ERROR) << "Update: " << oneline(to_string(update));
}
schedule_get_difference("on_get_wrong_updates");
if (!running_get_difference_) {
schedule_get_difference("on_get_wrong_updates");
}
return promise.set_value(Unit());
}
@ -1697,27 +1687,34 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
on_update(move_tl_object_as<telegram_api::updateEncryption>(update), mpas.get_promise());
update = nullptr;
}
CHECK(!running_get_difference_);
CHECK(need_postpone || !running_get_difference_);
}
}
for (auto &update : updates) {
if (update != nullptr) {
if (is_pts_update(update.get()) || is_qts_update(update.get())) {
if (is_pts_update(update.get())) {
if (running_get_difference_) {
auto pts = get_update_pts(update.get());
if (pts != 0 && (min_postponed_update_pts_ == 0 || pts < min_postponed_update_pts_)) {
min_postponed_update_pts_ = pts;
}
}
downcast_call(*update, OnUpdate(this, update, mpas.get_promise()));
update = nullptr;
} else if (is_qts_update(update.get())) {
if (running_get_difference_) {
auto qts = get_update_qts(update.get());
if (qts != 0 && (min_postponed_update_qts_ == 0 || qts < min_postponed_update_qts_)) {
min_postponed_update_qts_ = qts;
}
}
downcast_call(*update, OnUpdate(this, update, mpas.get_promise()));
update = nullptr;
}
}
}
if (running_get_difference_) {
LOG(ERROR) << "Postpone " << updates.size() << " updates [" << seq_begin << ", " << seq_end
<< "] with date = " << date << " from " << source;
postponed_updates_.emplace(
seq_begin, PendingSeqUpdates(seq_begin, seq_end, date, receive_time, std::move(updates), mpas.get_promise()));
return lock.set_value(Unit());
}
if (seq_begin == 0 && seq_end == 0) {
bool have_updates = false;
for (auto &update : updates) {
@ -1727,11 +1724,22 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
}
}
if (!have_updates) {
LOG(INFO) << "All updates was processed";
LOG(INFO) << "All updates were processed";
return lock.set_value(Unit());
}
}
if (need_postpone || running_get_difference_) {
LOG(INFO) << "Postpone " << updates.size() << " updates [" << seq_begin << ", " << seq_end
<< "] with date = " << date << " from " << source;
if (!need_postpone) {
LOG(ERROR) << "Run get difference while applying updates from " << source;
}
postponed_updates_.emplace(
seq_begin, PendingSeqUpdates(seq_begin, seq_end, date, receive_time, std::move(updates), mpas.get_promise()));
return lock.set_value(Unit());
}
if (seq_begin == 0 || seq_begin == seq_ + 1) {
LOG(INFO) << "Process " << updates.size() << " updates [" << seq_begin << ", " << seq_end
<< "] with date = " << date << " from " << source;
@ -1792,11 +1800,9 @@ void UpdatesManager::add_pending_qts_update(tl_object_ptr<telegram_api::Update>
return;
}
CHECK(!running_get_difference_);
if (qts - 1 > old_qts && old_qts > 0) {
if (running_get_difference_ || (qts - 1 > old_qts && old_qts > 0)) {
LOG(INFO) << "Postpone update with qts = " << qts;
if (pending_qts_updates_.empty()) {
if (!running_get_difference_ && pending_qts_updates_.empty()) {
set_qts_gap_timeout(MAX_UNFILLED_GAP_TIME);
}
auto &pending_update = pending_qts_updates_[qts];
@ -1976,9 +1982,6 @@ void UpdatesManager::add_pending_pts_update(tl_object_ptr<telegram_api::Update>
if (running_get_difference_ || !postponed_pts_updates_.empty()) {
LOG(INFO) << "Save pending update got while running getDifference from " << source;
if (running_get_difference_) {
CHECK(update->get_id() == dummyUpdate::ID || update->get_id() == updateSentMessage::ID);
}
postpone_pts_update(std::move(update), new_pts, pts_count, receive_time, std::move(promise));
return;
}
@ -2360,6 +2363,7 @@ void UpdatesManager::process_pending_qts_updates() {
}
set_qts_gap_timeout(receive_time + MAX_UNFILLED_GAP_TIME - Time::now());
}
CHECK(!running_get_difference_);
}
void UpdatesManager::set_pts_gap_timeout(double timeout) {