Move handling of pending pts updates to UpdatesManager.

This commit is contained in:
levlam 2021-01-16 01:17:35 +03:00
parent 3c1341731c
commit ef920f297d
4 changed files with 302 additions and 294 deletions

View File

@ -663,9 +663,9 @@ class UnpinAllMessagesQuery : public Td::ResultHandler {
affected_history->pts_, affected_history->pts_count_,
std::move(promise), "unpin all messages");
} else {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, std::move(promise),
"unpin all messages");
td->updates_manager_->add_pending_pts_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, std::move(promise),
"unpin all messages");
}
} else if (affected_history->offset_ <= 0) {
promise_.set_value(Unit());
@ -1560,9 +1560,9 @@ class ReadMessagesContentsQuery : public Td::ResultHandler {
CHECK(affected_messages->get_id() == telegram_api::messages_affectedMessages::ID);
if (affected_messages->pts_count_ > 0) {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_messages->pts_,
affected_messages->pts_count_, Promise<Unit>(),
"read messages content query");
td->updates_manager_->add_pending_pts_update(make_tl_object<dummyUpdate>(), affected_messages->pts_,
affected_messages->pts_count_, Promise<Unit>(),
"read messages content query");
}
promise_.set_value(Unit());
@ -1778,8 +1778,9 @@ class ReadHistoryQuery : public Td::ResultHandler {
LOG(INFO) << "Receive result for ReadHistoryQuery: " << to_string(affected_messages);
if (affected_messages->pts_count_ > 0) {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_messages->pts_,
affected_messages->pts_count_, Promise<Unit>(), "read history query");
td->updates_manager_->add_pending_pts_update(make_tl_object<dummyUpdate>(), affected_messages->pts_,
affected_messages->pts_count_, Promise<Unit>(),
"read history query");
}
promise_.set_value(Unit());
@ -2247,8 +2248,9 @@ class DeleteHistoryQuery : public Td::ResultHandler {
CHECK(affected_history->get_id() == telegram_api::messages_affectedHistory::ID);
if (affected_history->pts_count_ > 0) {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, Promise<Unit>(), "delete history query");
td->updates_manager_->add_pending_pts_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, Promise<Unit>(),
"delete history query");
}
if (affected_history->offset_ > 0) {
@ -2446,9 +2448,9 @@ class ReadAllMentionsQuery : public Td::ResultHandler {
<< dialog_id_;
td->updates_manager_->get_difference("Wrong messages_readMentions result");
} else {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, Promise<Unit>(),
"read all mentions query");
td->updates_manager_->add_pending_pts_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, Promise<Unit>(),
"read all mentions query");
}
}
@ -2582,7 +2584,7 @@ class SendMessageActor : public NetActorOnce {
return;
}
td->messages_manager_->add_pending_update(
td->updates_manager_->add_pending_pts_update(
make_tl_object<updateSentMessage>(random_id_, message_id, sent_message->date_), sent_message->pts_,
sent_message->pts_count_, Promise<Unit>(), "send message actor");
}
@ -3600,9 +3602,9 @@ class DeleteMessagesQuery : public Td::ResultHandler {
CHECK(affected_messages->get_id() == telegram_api::messages_affectedMessages::ID);
if (affected_messages->pts_count_ > 0) {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_messages->pts_,
affected_messages->pts_count_, Promise<Unit>(),
"delete messages query");
td->updates_manager_->add_pending_pts_update(make_tl_object<dummyUpdate>(), affected_messages->pts_,
affected_messages->pts_count_, Promise<Unit>(),
"delete messages query");
}
if (--query_count_ == 0) {
promise_.set_value(Unit());
@ -6070,7 +6072,7 @@ tl_object_ptr<telegram_api::inputEncryptedChat> MessagesManager::get_input_encry
}
}
bool MessagesManager::is_allowed_useless_update(const tl_object_ptr<telegram_api::Update> &update) const {
bool MessagesManager::is_allowed_useless_update(const tl_object_ptr<telegram_api::Update> &update) {
auto constructor_id = update->get_id();
if (constructor_id == dummyUpdate::ID) {
// allow dummyUpdate just in case
@ -6085,8 +6087,8 @@ bool MessagesManager::is_allowed_useless_update(const tl_object_ptr<telegram_api
return false;
}
void MessagesManager::skip_old_pending_update(tl_object_ptr<telegram_api::Update> &&update, int32 new_pts,
int32 old_pts, int32 pts_count, const char *source) {
void MessagesManager::skip_old_pending_pts_update(tl_object_ptr<telegram_api::Update> &&update, int32 new_pts,
int32 old_pts, int32 pts_count, const char *source) {
if (update->get_id() == telegram_api::updateNewMessage::ID) {
auto update_new_message = static_cast<telegram_api::updateNewMessage *>(update.get());
auto full_message_id = get_full_message_id(update_new_message->message_, false);
@ -6125,156 +6127,6 @@ void MessagesManager::skip_old_pending_update(tl_object_ptr<telegram_api::Update
<< "Receive useless update " << oneline(to_string(update)) << " from " << source;
}
int32 MessagesManager::get_min_pending_pts() const {
int32 result = std::numeric_limits<int32>::max();
if (!pending_pts_updates_.empty()) {
auto pts = pending_pts_updates_.begin()->first;
if (pts < result) {
result = pts;
}
}
if (!postponed_pts_updates_.empty()) {
auto pts = postponed_pts_updates_.begin()->first;
if (pts < result) {
result = pts;
}
}
return result;
}
void MessagesManager::process_pts_update(tl_object_ptr<telegram_api::Update> &&update) {
CHECK(update != nullptr);
// TODO need to save all updates that can change result of running queries not associated with pts (for example
// getHistory) and apply the updates to results of the queries
if (!UpdatesManager::check_pts_update(update)) {
LOG(ERROR) << "Receive wrong pts update: " << oneline(to_string(update));
return;
}
// must be called only during getDifference
CHECK(pending_pts_updates_.empty());
CHECK(accumulated_pts_ == -1);
process_update(std::move(update));
}
void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&update, int32 new_pts, int32 pts_count,
Promise<Unit> &&promise, const char *source) {
// do not try to run getDifference from this function
CHECK(update != nullptr);
CHECK(source != nullptr);
LOG(INFO) << "Receive from " << source << " pending " << to_string(update);
if (pts_count < 0 || new_pts <= pts_count) {
LOG(ERROR) << "Receive update with wrong pts = " << new_pts << " or pts_count = " << pts_count << " from " << source
<< ": " << oneline(to_string(update));
return promise.set_value(Unit());
}
// TODO need to save all updates that can change result of running queries not associated with pts (for example
// getHistory) and apply them to result of this queries
if (!UpdatesManager::check_pts_update(update)) {
LOG(ERROR) << "Receive wrong pts update from " << source << ": " << oneline(to_string(update));
return promise.set_value(Unit());
}
if (DROP_UPDATES) {
set_get_difference_timeout(1.0);
return promise.set_value(Unit());
}
int32 old_pts = td_->updates_manager_->get_pts();
if (new_pts < old_pts - 99 && Slice(source) != "after get difference") {
bool need_restore_pts = new_pts < old_pts - 19999;
auto now = Time::now();
if (now > last_pts_jump_warning_time_ + 1 && (need_restore_pts || now < last_pts_jump_warning_time_ + 5)) {
LOG(ERROR) << "Restore pts after delete_first_messages from " << old_pts << " to " << new_pts
<< " is disabled, pts_count = " << pts_count << ", update is from " << source << ": "
<< oneline(to_string(update));
last_pts_jump_warning_time_ = now;
}
if (need_restore_pts) {
set_get_difference_timeout(0.001);
/*
LOG(WARNING) << "Restore pts after delete_first_messages";
td_->updates_manager_->set_pts(new_pts - 1, "restore pts after delete_first_messages");
old_pts = td_->updates_manager_->get_pts();
CHECK(old_pts == new_pts - 1);
*/
}
}
if (new_pts <= old_pts || (old_pts >= 1 && new_pts > old_pts + 500000000)) {
skip_old_pending_update(std::move(update), new_pts, old_pts, pts_count, source);
return promise.set_value(Unit());
}
if (td_->updates_manager_->running_get_difference() || !postponed_pts_updates_.empty()) {
LOG(INFO) << "Save pending update got while running getDifference from " << source;
if (td_->updates_manager_->running_get_difference()) {
CHECK(update->get_id() == dummyUpdate::ID || update->get_id() == updateSentMessage::ID);
}
postpone_pts_update(std::move(update), new_pts, pts_count, std::move(promise));
return;
}
if (old_pts + pts_count > new_pts) {
LOG(WARNING) << "Have old_pts (= " << old_pts << ") + pts_count (= " << pts_count << ") > new_pts (= " << new_pts
<< "). Logged in " << G()->shared_config().get_option_integer("authorization_date") << ". Update from "
<< source << " = " << oneline(to_string(update));
postpone_pts_update(std::move(update), new_pts, pts_count, std::move(promise));
set_get_difference_timeout(0.001);
return;
}
accumulated_pts_count_ += pts_count;
if (new_pts > accumulated_pts_) {
accumulated_pts_ = new_pts;
}
if (old_pts + accumulated_pts_count_ > accumulated_pts_) {
LOG(WARNING) << "Have old_pts (= " << old_pts << ") + accumulated_pts_count (= " << accumulated_pts_count_
<< ") > accumulated_pts (= " << accumulated_pts_ << "). new_pts = " << new_pts
<< ", pts_count = " << pts_count << ". Logged in "
<< G()->shared_config().get_option_integer("authorization_date") << ". Update from " << source << " = "
<< oneline(to_string(update));
postpone_pts_update(std::move(update), new_pts, pts_count, std::move(promise));
set_get_difference_timeout(0.001);
return;
}
LOG_IF(INFO, pts_count == 0 && update->get_id() != dummyUpdate::ID) << "Skip useless update " << to_string(update);
if (pending_pts_updates_.empty() && old_pts + accumulated_pts_count_ == accumulated_pts_ &&
!pts_gap_timeout_.has_timeout()) {
if (pts_count > 0) {
process_update(std::move(update));
td_->updates_manager_->set_pts(accumulated_pts_, "process pending updates fast path")
.set_value(Unit()); // TODO can't set until get messages really stored on persistent storage
accumulated_pts_count_ = 0;
accumulated_pts_ = -1;
}
promise.set_value(Unit());
return;
}
pending_pts_updates_.emplace(new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, std::move(promise)));
if (old_pts + accumulated_pts_count_ < accumulated_pts_) {
set_get_difference_timeout(UpdatesManager::MAX_UNFILLED_GAP_TIME);
return;
}
CHECK(old_pts + accumulated_pts_count_ == accumulated_pts_);
if (!pending_pts_updates_.empty()) {
process_pending_updates();
}
}
MessagesManager::Dialog *MessagesManager::get_service_notifications_dialog() {
UserId service_notifications_user_id = td_->contacts_manager_->add_service_notifications_user();
DialogId service_notifications_dialog_id(service_notifications_user_id);
@ -7084,11 +6936,11 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
if (new_pts < old_pts - 19999 && !is_postponed_update) {
// restore channel pts after delete_first_messages
auto now = Time::now();
if (now > last_pts_jump_warning_time_ + 1) {
if (now > last_channel_pts_jump_warning_time_ + 1) {
LOG(ERROR) << "Restore pts in " << d->dialog_id << " from " << source << " after delete_first_messages from "
<< old_pts << " to " << new_pts << " is temporarily disabled, pts_count = " << pts_count
<< ", update is from " << source << ": " << oneline(to_string(update));
last_pts_jump_warning_time_ = now;
last_channel_pts_jump_warning_time_ = now;
}
get_channel_difference(dialog_id, old_pts, true, "add_pending_channel_update old");
}
@ -7171,16 +7023,7 @@ bool MessagesManager::is_old_channel_update(DialogId dialog_id, int32 new_pts) {
return new_pts <= (d == nullptr ? load_channel_pts(dialog_id) : d->pts);
}
void MessagesManager::set_get_difference_timeout(double timeout) {
if (!pts_gap_timeout_.has_timeout()) {
LOG(INFO) << "Gap in pts has found, current pts is " << td_->updates_manager_->get_pts();
pts_gap_timeout_.set_callback(std::move(UpdatesManager::fill_pts_gap));
pts_gap_timeout_.set_callback_data(static_cast<void *>(td_));
pts_gap_timeout_.set_timeout_in(timeout);
}
}
void MessagesManager::process_update(tl_object_ptr<telegram_api::Update> &&update) {
void MessagesManager::process_pts_update(tl_object_ptr<telegram_api::Update> &&update) {
switch (update->get_id()) {
case dummyUpdate::ID:
LOG(INFO) << "Process dummyUpdate";
@ -7345,29 +7188,6 @@ void MessagesManager::on_message_edited(FullMessageId full_message_id, int32 pts
update_used_hashtags(dialog_id, m);
}
void MessagesManager::process_pending_updates() {
for (auto &update : pending_pts_updates_) {
process_update(std::move(update.second.update));
update.second.promise.set_value(Unit());
}
td_->updates_manager_->set_pts(accumulated_pts_, "process pending updates")
.set_value(Unit()); // TODO can't set until get messages really stored on persistent storage
drop_pending_updates();
}
void MessagesManager::drop_pending_updates() {
accumulated_pts_count_ = 0;
accumulated_pts_ = -1;
pts_gap_timeout_.cancel_timeout();
pending_pts_updates_.clear();
}
void MessagesManager::postpone_pts_update(tl_object_ptr<telegram_api::Update> &&update, int32 pts, int32 pts_count,
Promise<Unit> &&promise) {
postponed_pts_updates_.emplace(pts, PendingPtsUpdate(std::move(update), pts, pts_count, std::move(promise)));
}
string MessagesManager::get_notification_settings_scope_database_key(NotificationSettingsScope scope) {
switch (scope) {
case NotificationSettingsScope::Private:
@ -8690,31 +8510,11 @@ void MessagesManager::before_get_difference() {
// scheduled messages are not returned in getDifference, so we must always reget them after it
scheduled_messages_sync_generation_++;
postponed_pts_updates_.insert(std::make_move_iterator(pending_pts_updates_.begin()),
std::make_move_iterator(pending_pts_updates_.end()));
drop_pending_updates();
}
void MessagesManager::after_get_difference() {
CHECK(!td_->updates_manager_->running_get_difference());
if (postponed_pts_updates_.size()) {
auto postponed_updates = std::move(postponed_pts_updates_);
postponed_pts_updates_.clear();
LOG(INFO) << "Begin to apply " << postponed_updates.size() << " postponed pts updates";
for (auto &postponed_update : postponed_updates) {
auto &update = postponed_update.second;
add_pending_update(std::move(update.update), update.pts, update.pts_count, std::move(update.promise),
"after get difference");
CHECK(!td_->updates_manager_->running_get_difference());
}
LOG(INFO) << "Finish to apply postponed pts updates, have " << postponed_pts_updates_.size()
<< " left postponed updates";
}
running_get_difference_ = false;
if (!pending_on_get_dialogs_.empty()) {
@ -8777,13 +8577,13 @@ void MessagesManager::after_get_difference() {
const Dialog *d = get_dialog(dialog_id);
CHECK(d != nullptr);
if (dialog_id.get_type() == DialogType::Channel || pending_pts_updates_.empty() || message_id.is_scheduled() ||
if (dialog_id.get_type() == DialogType::Channel || message_id.is_scheduled() ||
message_id <= d->last_new_message_id) {
LOG(ERROR) << "Receive updateMessageId from " << it.second << " to " << full_message_id
<< " but not receive corresponding message, last_new_message_id = " << d->last_new_message_id;
}
if (dialog_id.get_type() != DialogType::Channel &&
(pending_pts_updates_.empty() || message_id.is_scheduled() || message_id <= d->last_new_message_id)) {
(message_id.is_scheduled() || message_id <= d->last_new_message_id)) {
dump_debug_message_op(get_dialog(dialog_id));
}
if (message_id.is_scheduled() || message_id <= d->last_new_message_id) {
@ -11749,7 +11549,7 @@ void MessagesManager::init() {
always_wait_for_mailbox();
start_time_ = Time::now();
last_pts_jump_warning_time_ = start_time_ - 3600;
last_channel_pts_jump_warning_time_ = start_time_ - 3600;
bool is_authorized = td_->auth_manager_->is_authorized();
bool was_authorized_user = td_->auth_manager_->was_authorized() && !td_->auth_manager_->is_bot();
@ -27908,7 +27708,7 @@ FullMessageId MessagesManager::on_send_message_success(int64 random_id, MessageI
FileId new_file_id, const char *source) {
CHECK(source != nullptr);
// do not try to run getDifference from this function
if (DROP_UPDATES) {
if (DROP_SEND_MESSAGE_UPDATES) {
return {};
}
if (!new_message_id.is_valid()) {

View File

@ -788,12 +788,10 @@ class MessagesManager : public Actor {
tl_object_ptr<td_api::messages> get_messages_object(int32 total_count, const vector<FullMessageId> &full_message_ids,
bool skip_not_found);
int32 get_min_pending_pts() const;
void process_pts_update(tl_object_ptr<telegram_api::Update> &&update);
void add_pending_update(tl_object_ptr<telegram_api::Update> &&update, int32 new_pts, int32 pts_count,
Promise<Unit> &&promise, const char *source);
void skip_old_pending_pts_update(tl_object_ptr<telegram_api::Update> &&update, int32 new_pts, int32 old_pts,
int32 pts_count, const char *source);
void add_pending_channel_update(DialogId dialog_id, tl_object_ptr<telegram_api::Update> &&update, int32 new_pts,
int32 pts_count, Promise<Unit> &&promise, const char *source,
@ -1730,7 +1728,7 @@ class MessagesManager : public Actor {
static constexpr const char *DELETE_MESSAGE_USER_REQUEST_SOURCE = "user request";
static constexpr bool DROP_UPDATES = false;
static constexpr bool DROP_SEND_MESSAGE_UPDATES = false;
static FullMessageId get_full_message_id(const tl_object_ptr<telegram_api::Message> &message_ptr, bool is_scheduled);
@ -1828,8 +1826,6 @@ class MessagesManager : public Actor {
bool can_set_game_score(DialogId dialog_id, const Message *m) const;
void process_update(tl_object_ptr<telegram_api::Update> &&update);
void process_channel_update(tl_object_ptr<telegram_api::Update> &&update);
void on_message_edited(FullMessageId full_message_id, int32 pts);
@ -2062,7 +2058,7 @@ class MessagesManager : public Actor {
static void set_message_id(unique_ptr<Message> &message, MessageId message_id);
bool is_allowed_useless_update(const tl_object_ptr<telegram_api::Update> &update) const;
static bool is_allowed_useless_update(const tl_object_ptr<telegram_api::Update> &update);
bool is_message_auto_read(DialogId dialog_id, bool is_outgoing) const;
@ -2789,18 +2785,6 @@ class MessagesManager : public Actor {
void load_notification_settings();
void set_get_difference_timeout(double timeout);
void skip_old_pending_update(tl_object_ptr<telegram_api::Update> &&update, int32 new_pts, int32 old_pts,
int32 pts_count, const char *source);
void process_pending_updates();
void drop_pending_updates();
void postpone_pts_update(tl_object_ptr<telegram_api::Update> &&update, int32 pts, int32 pts_count,
Promise<Unit> &&promise);
static string get_channel_pts_key(DialogId dialog_id);
int32 load_channel_pts(DialogId dialog_id) const;
@ -3013,10 +2997,7 @@ class MessagesManager : public Actor {
std::shared_ptr<UploadThumbnailCallback> upload_thumbnail_callback_;
std::shared_ptr<UploadDialogPhotoCallback> upload_dialog_photo_callback_;
int32 accumulated_pts_count_ = 0;
int32 accumulated_pts_ = -1;
Timeout pts_gap_timeout_;
double last_pts_jump_warning_time_ = 0;
double last_channel_pts_jump_warning_time_ = 0;
std::unordered_map<FileId, std::pair<FullMessageId, FileId>, FileIdHash>
being_uploaded_files_; // file_id -> message, thumbnail_file_id
@ -3115,8 +3096,6 @@ class MessagesManager : public Actor {
bool running_get_difference_ = false; // true after before_get_difference and false after after_get_difference
std::unordered_map<DialogId, unique_ptr<Dialog>, DialogIdHash> dialogs_;
std::multimap<int32, PendingPtsUpdate> pending_pts_updates_;
std::multimap<int32, PendingPtsUpdate> postponed_pts_updates_;
std::unordered_set<DialogId, DialogIdHash>
loaded_dialogs_; // dialogs loaded from database, but not added to dialogs_

View File

@ -16,6 +16,7 @@
#include "td/telegram/ChannelId.h"
#include "td/telegram/ChatId.h"
#include "td/telegram/ConfigManager.h"
#include "td/telegram/ConfigShared.h"
#include "td/telegram/ContactsManager.h"
#include "td/telegram/DialogAction.h"
#include "td/telegram/DialogId.h"
@ -188,7 +189,7 @@ void UpdatesManager::fill_pts_gap(void *td) {
auto td_ptr = static_cast<Td *>(td);
string source = PSTRING() << "pts from " << td_ptr->updates_manager_->get_pts() << " to "
<< td_ptr->messages_manager_->get_min_pending_pts();
<< td_ptr->updates_manager_->get_min_pending_pts();
fill_gap(td, source.c_str());
}
@ -270,7 +271,10 @@ void UpdatesManager::before_get_difference(bool is_initial) {
// may be called many times before after_get_difference is called
send_closure(G()->state_manager(), &StateManager::on_synchronized, false);
td_->messages_manager_->before_get_difference();
postponed_pts_updates_.insert(std::make_move_iterator(pending_pts_updates_.begin()),
std::make_move_iterator(pending_pts_updates_.end()));
drop_pending_pts_updates();
send_closure_later(td_->notification_manager_actor_, &NotificationManager::before_get_difference);
}
@ -1160,7 +1164,7 @@ void UpdatesManager::process_get_difference_updates(
// process updateReadHistoryInbox before new messages
if (constructor_id == telegram_api::updateReadHistoryInbox::ID) {
static_cast<telegram_api::updateReadHistoryInbox *>(update.get())->still_unread_count_ = -1;
td_->messages_manager_->process_pts_update(std::move(update));
process_pts_update(std::move(update));
CHECK(!running_get_difference_);
}
*/
@ -1340,6 +1344,21 @@ void UpdatesManager::after_get_difference() {
VLOG(get_difference) << "Finish to apply " << total_update_count << " postponed updates";
}
if (postponed_pts_updates_.size()) { // must be before td_->messages_manager_->after_get_difference()
auto postponed_updates = std::move(postponed_pts_updates_);
postponed_pts_updates_.clear();
LOG(INFO) << "Begin to apply " << postponed_updates.size() << " postponed pts updates";
for (auto &postponed_update : postponed_updates) {
auto &update = postponed_update.second;
add_pending_pts_update(std::move(update.update), update.pts, update.pts_count, std::move(update.promise),
"after get difference");
CHECK(!running_get_difference_);
}
LOG(INFO) << "Finish to apply postponed pts updates, have " << postponed_pts_updates_.size()
<< " left postponed updates";
}
td_->animations_manager_->after_get_difference();
td_->contacts_manager_->after_get_difference();
td_->inline_queries_manager_->after_get_difference();
@ -1385,7 +1404,7 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
}
}
postponed_updates_.emplace(seq_begin,
PendingUpdates(seq_begin, seq_end, date, std::move(updates), std::move(promise)));
PendingSeqUpdates(seq_begin, seq_end, date, std::move(updates), std::move(promise)));
return;
}
@ -1531,7 +1550,7 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
LOG(ERROR) << "Postpone " << updates.size() << " updates [" << seq_begin << ", " << seq_end
<< "] with date = " << date << " from " << source;
postponed_updates_.emplace(seq_begin,
PendingUpdates(seq_begin, seq_end, date, std::move(updates), mpas.get_promise()));
PendingSeqUpdates(seq_begin, seq_end, date, std::move(updates), mpas.get_promise()));
return lock.set_value(Unit());
}
@ -1568,7 +1587,7 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
<< "Already have pending updates with seq = " << seq_begin << ", but receive it again from " << source;
pending_seq_updates_.emplace(seq_begin,
PendingUpdates(seq_begin, seq_end, date, std::move(updates), mpas.get_promise()));
PendingSeqUpdates(seq_begin, seq_end, date, std::move(updates), mpas.get_promise()));
set_seq_gap_timeout(MAX_UNFILLED_GAP_TIME);
lock.set_value(Unit());
}
@ -1623,7 +1642,7 @@ void UpdatesManager::process_updates(vector<tl_object_ptr<telegram_api::Update>>
Promise<Unit> &&promise) {
tl_object_ptr<telegram_api::updatePtsChanged> update_pts_changed;
MultiPromiseActorSafe mpas{"OnPendingUpdatesMultiPromiseActor"};
MultiPromiseActorSafe mpas{"OnProcessUpdatesMultiPromiseActor"};
mpas.add_promise(std::move(promise));
auto lock = mpas.get_promise();
@ -1678,7 +1697,7 @@ void UpdatesManager::process_updates(vector<tl_object_ptr<telegram_api::Update>>
static_cast<telegram_api::updateReadHistoryInbox *>(update.get())->still_unread_count_ = -1;
}
td_->messages_manager_->process_pts_update(std::move(update));
process_pts_update(std::move(update));
}
if (update != nullptr && is_qts_update(update.get())) {
process_qts_update(std::move(update), 0, mpas.get_promise());
@ -1705,6 +1724,161 @@ void UpdatesManager::process_updates(vector<tl_object_ptr<telegram_api::Update>>
lock.set_value(Unit());
}
int32 UpdatesManager::get_min_pending_pts() const {
int32 result = std::numeric_limits<int32>::max();
if (!pending_pts_updates_.empty()) {
auto pts = pending_pts_updates_.begin()->first;
if (pts < result) {
result = pts;
}
}
if (!postponed_pts_updates_.empty()) {
auto pts = postponed_pts_updates_.begin()->first;
if (pts < result) {
result = pts;
}
}
return result;
}
void UpdatesManager::process_pts_update(tl_object_ptr<telegram_api::Update> &&update) {
CHECK(update != nullptr);
// TODO need to save all updates that can change result of running queries not associated with pts (for example
// getHistory) and apply the updates to results of the queries
if (!check_pts_update(update)) {
LOG(ERROR) << "Receive wrong pts update: " << oneline(to_string(update));
return;
}
// must be called only during getDifference
CHECK(pending_pts_updates_.empty());
CHECK(accumulated_pts_ == -1);
td_->messages_manager_->process_pts_update(std::move(update));
}
void UpdatesManager::add_pending_pts_update(tl_object_ptr<telegram_api::Update> &&update, int32 new_pts,
int32 pts_count, Promise<Unit> &&promise, const char *source) {
// do not try to run getDifference from this function
CHECK(update != nullptr);
CHECK(source != nullptr);
LOG(INFO) << "Receive from " << source << " pending " << to_string(update);
if (pts_count < 0 || new_pts <= pts_count) {
LOG(ERROR) << "Receive update with wrong pts = " << new_pts << " or pts_count = " << pts_count << " from " << source
<< ": " << oneline(to_string(update));
return promise.set_value(Unit());
}
// TODO need to save all updates that can change result of running queries not associated with pts (for example
// getHistory) and apply them to result of this queries
if (!check_pts_update(update)) {
LOG(ERROR) << "Receive wrong pts update from " << source << ": " << oneline(to_string(update));
return promise.set_value(Unit());
}
if (DROP_PTS_UPDATES) {
set_pts_gap_timeout(1.0);
return promise.set_value(Unit());
}
int32 old_pts = get_pts();
if (new_pts < old_pts - 99 && Slice(source) != "after get difference") {
bool need_restore_pts = new_pts < old_pts - 19999;
auto now = Time::now();
if (now > last_pts_jump_warning_time_ + 1 && (need_restore_pts || now < last_pts_jump_warning_time_ + 5)) {
LOG(ERROR) << "Restore pts after delete_first_messages from " << old_pts << " to " << new_pts
<< " is disabled, pts_count = " << pts_count << ", update is from " << source << ": "
<< oneline(to_string(update));
last_pts_jump_warning_time_ = now;
}
if (need_restore_pts) {
set_pts_gap_timeout(0.001);
/*
LOG(WARNING) << "Restore pts after delete_first_messages";
set_pts(new_pts - 1, "restore pts after delete_first_messages");
old_pts = get_pts();
CHECK(old_pts == new_pts - 1);
*/
}
}
if (new_pts <= old_pts || (old_pts >= 1 && new_pts > old_pts + 500000000)) {
td_->messages_manager_->skip_old_pending_pts_update(std::move(update), new_pts, old_pts, pts_count, source);
return promise.set_value(Unit());
}
if (running_get_difference_ || !postponed_pts_updates_.empty()) {
LOG(INFO) << "Save pending update got while running getDifference from " << source;
if (running_get_difference_) {
CHECK(update->get_id() == dummyUpdate::ID || update->get_id() == updateSentMessage::ID);
}
postpone_pts_update(std::move(update), new_pts, pts_count, std::move(promise));
return;
}
if (old_pts + pts_count > new_pts) {
LOG(WARNING) << "Have old_pts (= " << old_pts << ") + pts_count (= " << pts_count << ") > new_pts (= " << new_pts
<< "). Logged in " << G()->shared_config().get_option_integer("authorization_date") << ". Update from "
<< source << " = " << oneline(to_string(update));
postpone_pts_update(std::move(update), new_pts, pts_count, std::move(promise));
set_pts_gap_timeout(0.001);
return;
}
accumulated_pts_count_ += pts_count;
if (new_pts > accumulated_pts_) {
accumulated_pts_ = new_pts;
}
if (old_pts + accumulated_pts_count_ > accumulated_pts_) {
LOG(WARNING) << "Have old_pts (= " << old_pts << ") + accumulated_pts_count (= " << accumulated_pts_count_
<< ") > accumulated_pts (= " << accumulated_pts_ << "). new_pts = " << new_pts
<< ", pts_count = " << pts_count << ". Logged in "
<< G()->shared_config().get_option_integer("authorization_date") << ". Update from " << source << " = "
<< oneline(to_string(update));
postpone_pts_update(std::move(update), new_pts, pts_count, std::move(promise));
set_pts_gap_timeout(0.001);
return;
}
LOG_IF(INFO, pts_count == 0 && update->get_id() != dummyUpdate::ID) << "Skip useless update " << to_string(update);
if (pending_pts_updates_.empty() && old_pts + accumulated_pts_count_ == accumulated_pts_ &&
!pts_gap_timeout_.has_timeout()) {
if (pts_count > 0) {
td_->messages_manager_->process_pts_update(std::move(update));
set_pts(accumulated_pts_, "process pending updates fast path")
.set_value(Unit()); // TODO can't set until get messages really stored on persistent storage
accumulated_pts_count_ = 0;
accumulated_pts_ = -1;
}
promise.set_value(Unit());
return;
}
pending_pts_updates_.emplace(new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, std::move(promise)));
if (old_pts + accumulated_pts_count_ < accumulated_pts_) {
set_pts_gap_timeout(MAX_UNFILLED_GAP_TIME);
return;
}
CHECK(old_pts + accumulated_pts_count_ == accumulated_pts_);
if (!pending_pts_updates_.empty()) {
process_pending_pts_updates();
}
}
void UpdatesManager::postpone_pts_update(tl_object_ptr<telegram_api::Update> &&update, int32 pts, int32 pts_count,
Promise<Unit> &&promise) {
postponed_pts_updates_.emplace(pts, PendingPtsUpdate(std::move(update), pts, pts_count, std::move(promise)));
}
void UpdatesManager::process_seq_updates(int32 seq_end, int32 date,
vector<tl_object_ptr<telegram_api::Update>> &&updates,
Promise<Unit> &&promise) {
@ -1752,6 +1926,24 @@ void UpdatesManager::process_qts_update(tl_object_ptr<telegram_api::Update> &&up
promise.set_value(Unit());
}
void UpdatesManager::process_pending_pts_updates() {
for (auto &update : pending_pts_updates_) {
td_->messages_manager_->process_pts_update(std::move(update.second.update));
update.second.promise.set_value(Unit());
}
set_pts(accumulated_pts_, "process pending updates")
.set_value(Unit()); // TODO can't set until get messages really stored on persistent storage
drop_pending_pts_updates();
}
void UpdatesManager::drop_pending_pts_updates() {
accumulated_pts_count_ = 0;
accumulated_pts_ = -1;
pts_gap_timeout_.cancel_timeout();
pending_pts_updates_.clear();
}
void UpdatesManager::process_pending_seq_updates() {
if (!pending_seq_updates_.empty()) {
LOG(DEBUG) << "Trying to process " << pending_seq_updates_.size() << " pending seq updates";
@ -1815,6 +2007,14 @@ void UpdatesManager::process_pending_qts_updates() {
}
}
void UpdatesManager::set_pts_gap_timeout(double timeout) {
if (!pts_gap_timeout_.has_timeout()) {
pts_gap_timeout_.set_callback(std::move(fill_pts_gap));
pts_gap_timeout_.set_callback_data(static_cast<void *>(td_));
pts_gap_timeout_.set_timeout_in(timeout);
}
}
void UpdatesManager::set_seq_gap_timeout(double timeout) {
if (!seq_gap_timeout_.has_timeout()) {
seq_gap_timeout_.set_callback(std::move(fill_seq_gap));
@ -1841,8 +2041,7 @@ void UpdatesManager::on_pending_update(tl_object_ptr<telegram_api::Update> updat
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNewMessage> update, Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, std::move(promise),
"updateNewMessage");
add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateNewMessage");
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNewChannelMessage> update, Promise<Unit> &&promise) {
@ -1861,42 +2060,36 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadMessagesCon
Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, std::move(promise),
"updateReadMessagesContents");
add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateReadMessagesContents");
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEditMessage> update, Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, std::move(promise),
"updateEditMessage");
add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateEditMessage");
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDeleteMessages> update, Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
if (update->messages_.empty()) {
td_->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), new_pts, pts_count, Promise<Unit>(),
"updateDeleteMessages");
add_pending_pts_update(make_tl_object<dummyUpdate>(), new_pts, pts_count, Promise<Unit>(), "updateDeleteMessages");
promise.set_value(Unit());
} else {
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, std::move(promise),
"updateDeleteMessages");
add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateDeleteMessages");
}
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadHistoryInbox> update, Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, std::move(promise),
"updateReadHistoryInbox");
add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateReadHistoryInbox");
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadHistoryOutbox> update, Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, std::move(promise),
"updateReadHistoryOutbox");
add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updateReadHistoryOutbox");
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateServiceNotification> update, Promise<Unit> &&promise) {
@ -2008,8 +2201,7 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadChannelDisc
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePinnedMessages> update, Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, std::move(promise),
"updatePinnedMessages");
add_pending_pts_update(std::move(update), new_pts, pts_count, std::move(promise), "updatePinnedMessages");
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePinnedChannelMessages> update,
@ -2063,8 +2255,8 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePeerLocated> up
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateWebPage> update, Promise<Unit> &&promise) {
td_->web_pages_manager_->on_get_web_page(std::move(update->webpage_), DialogId());
td_->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), update->pts_, update->pts_count_,
Promise<Unit>(), "updateWebPage");
add_pending_pts_update(make_tl_object<dummyUpdate>(), update->pts_, update->pts_count_, Promise<Unit>(),
"updateWebPage");
promise.set_value(Unit());
}
@ -2084,8 +2276,8 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateFolderPeers> up
}
if (update->pts_ > 0) {
td_->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), update->pts_, update->pts_count_,
Promise<Unit>(), "updateFolderPeers");
add_pending_pts_update(make_tl_object<dummyUpdate>(), update->pts_, update->pts_count_, Promise<Unit>(),
"updateFolderPeers");
}
promise.set_value(Unit());
}

View File

@ -40,6 +40,9 @@ class UpdatesManager : public Actor {
void on_get_difference(tl_object_ptr<telegram_api::updates_Difference> &&difference_ptr);
void add_pending_pts_update(tl_object_ptr<telegram_api::Update> &&update, int32 new_pts, int32 pts_count,
Promise<Unit> &&promise, const char *source);
static std::unordered_set<int64> get_sent_messages_random_ids(const telegram_api::Updates *updates_ptr);
static vector<const tl_object_ptr<telegram_api::Message> *> get_new_messages(
@ -63,8 +66,6 @@ class UpdatesManager : public Actor {
void on_server_pong(tl_object_ptr<telegram_api::updates_state> &&state);
static bool check_pts_update(const tl_object_ptr<telegram_api::Update> &update);
int32 get_pts() const {
return pts_manager_.mem_pts();
}
@ -77,20 +78,30 @@ class UpdatesManager : public Actor {
Promise<> set_pts(int32 pts, const char *source) TD_WARN_UNUSED_RESULT;
static const double MAX_UNFILLED_GAP_TIME;
static void fill_pts_gap(void *td);
bool running_get_difference() const {
return running_get_difference_;
}
private:
static constexpr int32 FORCED_GET_DIFFERENCE_PTS_DIFF = 100000;
static const double MAX_UNFILLED_GAP_TIME;
static constexpr bool DROP_PTS_UPDATES = false;
friend class OnUpdate;
class PendingUpdates {
class PendingPtsUpdate {
public:
tl_object_ptr<telegram_api::Update> update;
int32 pts;
int32 pts_count;
Promise<Unit> promise;
PendingPtsUpdate(tl_object_ptr<telegram_api::Update> &&update, int32 pts, int32 pts_count, Promise<Unit> &&promise)
: update(std::move(update)), pts(pts), pts_count(pts_count), promise(std::move(promise)) {
}
};
class PendingSeqUpdates {
public:
int32 seq_begin;
int32 seq_end;
@ -98,8 +109,8 @@ class UpdatesManager : public Actor {
vector<tl_object_ptr<telegram_api::Update>> updates;
Promise<Unit> promise;
PendingUpdates(int32 seq_begin, int32 seq_end, int32 date, vector<tl_object_ptr<telegram_api::Update>> &&updates,
Promise<Unit> &&promise)
PendingSeqUpdates(int32 seq_begin, int32 seq_end, int32 date, vector<tl_object_ptr<telegram_api::Update>> &&updates,
Promise<Unit> &&promise)
: seq_begin(seq_begin), seq_end(seq_end), date(date), updates(std::move(updates)), promise(std::move(promise)) {
}
};
@ -121,11 +132,20 @@ class UpdatesManager : public Actor {
int32 short_update_date_ = 0;
std::multimap<int32, PendingUpdates> postponed_updates_; // updates received during getDifference
std::multimap<int32, PendingUpdates> pending_seq_updates_; // updates with too big seq
int32 accumulated_pts_count_ = 0;
int32 accumulated_pts_ = -1;
double last_pts_jump_warning_time_ = 0;
std::multimap<int32, PendingPtsUpdate> pending_pts_updates_;
std::multimap<int32, PendingPtsUpdate> postponed_pts_updates_;
std::multimap<int32, PendingSeqUpdates> postponed_updates_; // updates received during getDifference
std::multimap<int32, PendingSeqUpdates> pending_seq_updates_; // updates with too big seq
std::map<int32, PendingQtsUpdate> pending_qts_updates_; // updates with too big qts
Timeout pts_gap_timeout_;
Timeout seq_gap_timeout_;
Timeout qts_gap_timeout_;
@ -168,15 +188,26 @@ class UpdatesManager : public Actor {
void process_updates(vector<tl_object_ptr<telegram_api::Update>> &&updates, bool force_apply,
Promise<Unit> &&promise);
void postpone_pts_update(tl_object_ptr<telegram_api::Update> &&update, int32 pts, int32 pts_count,
Promise<Unit> &&promise);
void process_pts_update(tl_object_ptr<telegram_api::Update> &&update);
void process_seq_updates(int32 seq_end, int32 date, vector<tl_object_ptr<telegram_api::Update>> &&updates,
Promise<Unit> &&promise);
void process_qts_update(tl_object_ptr<telegram_api::Update> &&update_ptr, int32 qts, Promise<Unit> &&promise);
void process_pending_pts_updates();
void process_pending_seq_updates();
void process_pending_qts_updates();
void drop_pending_pts_updates();
static void fill_pts_gap(void *td);
static void fill_seq_gap(void *td);
static void fill_qts_gap(void *td);
@ -185,6 +216,8 @@ class UpdatesManager : public Actor {
static void fill_gap(void *td, const char *source);
void set_pts_gap_timeout(double timeout);
void set_seq_gap_timeout(double timeout);
void set_qts_gap_timeout(double timeout);
@ -197,10 +230,14 @@ class UpdatesManager : public Actor {
void after_get_difference();
int32 get_min_pending_pts() const;
static bool have_update_pts_changed(const vector<tl_object_ptr<telegram_api::Update>> &updates);
static bool check_pts_update_dialog_id(DialogId dialog_id);
static bool check_pts_update(const tl_object_ptr<telegram_api::Update> &update);
static bool is_pts_update(const telegram_api::Update *update);
static int32 get_update_pts(const telegram_api::Update *update);