Remove force_apply from add_pending_update.

This commit is contained in:
levlam 2021-01-12 01:46:13 +03:00
parent 840391b95c
commit 36b4c55927
3 changed files with 81 additions and 39 deletions

View File

@ -664,7 +664,7 @@ class UnpinAllMessagesQuery : public Td::ResultHandler {
std::move(promise), "unpin all messages");
} else {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, false, std::move(promise),
affected_history->pts_count_, std::move(promise),
"unpin all messages");
}
} else if (affected_history->offset_ <= 0) {
@ -1561,7 +1561,7 @@ class ReadMessagesContentsQuery : public Td::ResultHandler {
if (affected_messages->pts_count_ > 0) {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_messages->pts_,
affected_messages->pts_count_, false, Promise<Unit>(),
affected_messages->pts_count_, Promise<Unit>(),
"read messages content query");
}
@ -1779,8 +1779,7 @@ class ReadHistoryQuery : public Td::ResultHandler {
if (affected_messages->pts_count_ > 0) {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_messages->pts_,
affected_messages->pts_count_, false, Promise<Unit>(),
"read history query");
affected_messages->pts_count_, Promise<Unit>(), "read history query");
}
promise_.set_value(Unit());
@ -2249,8 +2248,7 @@ class DeleteHistoryQuery : public Td::ResultHandler {
if (affected_history->pts_count_ > 0) {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, false, Promise<Unit>(),
"delete history query");
affected_history->pts_count_, Promise<Unit>(), "delete history query");
}
if (affected_history->offset_ > 0) {
@ -2449,7 +2447,7 @@ class ReadAllMentionsQuery : public Td::ResultHandler {
td->updates_manager_->get_difference("Wrong messages_readMentions result");
} else {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, false, Promise<Unit>(),
affected_history->pts_count_, Promise<Unit>(),
"read all mentions query");
}
}
@ -2586,7 +2584,7 @@ class SendMessageActor : public NetActorOnce {
td->messages_manager_->add_pending_update(
make_tl_object<updateSentMessage>(random_id_, message_id, sent_message->date_), sent_message->pts_,
sent_message->pts_count_, false, Promise<Unit>(), "send message actor");
sent_message->pts_count_, Promise<Unit>(), "send message actor");
}
void on_error(uint64 id, Status status) override {
@ -3603,7 +3601,7 @@ class DeleteMessagesQuery : public Td::ResultHandler {
if (affected_messages->pts_count_ > 0) {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_messages->pts_,
affected_messages->pts_count_, false, Promise<Unit>(),
affected_messages->pts_count_, Promise<Unit>(),
"delete messages query");
}
if (--query_count_ == 0) {
@ -6192,13 +6190,30 @@ bool MessagesManager::check_pts_update(const tl_object_ptr<telegram_api::Update>
}
}
void MessagesManager::process_pts_update(tl_object_ptr<telegram_api::Update> &&update) {
CHECK(update != nullptr);
// TODO need to save all updates that can change result of running queries not associated with pts (for example
// getHistory) and apply the updates to results of the queries
if (!check_pts_update(update)) {
LOG(ERROR) << "Receive wrong pts update: " << oneline(to_string(update));
return;
}
// must be called only during getDifference
CHECK(pending_pts_updates_.empty());
CHECK(accumulated_pts_ == -1);
process_update(std::move(update));
}
void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&update, int32 new_pts, int32 pts_count,
bool force_apply, Promise<Unit> &&promise, const char *source) {
Promise<Unit> &&promise, const char *source) {
// do not try to run getDifference from this function
CHECK(update != nullptr);
CHECK(source != nullptr);
LOG(INFO) << "Receive from " << source << " pending " << to_string(update) << "new_pts = " << new_pts
<< ", pts_count = " << pts_count << ", force_apply = " << force_apply;
LOG(INFO) << "Receive from " << source << " pending " << to_string(update);
if (pts_count < 0 || new_pts <= pts_count) {
LOG(ERROR) << "Receive update with wrong pts = " << new_pts << " or pts_count = " << pts_count << " from " << source
<< ": " << oneline(to_string(update));
@ -6213,16 +6228,6 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
return promise.set_value(Unit());
}
if (force_apply) {
CHECK(pending_pts_updates_.empty());
CHECK(accumulated_pts_ == -1);
if (pts_count != 0) {
LOG(ERROR) << "Receive forced update with pts_count = " << pts_count << " from " << source;
}
process_update(std::move(update));
return promise.set_value(Unit());
}
if (DROP_UPDATES) {
set_get_difference_timeout(1.0);
return promise.set_value(Unit());
@ -8749,7 +8754,7 @@ void MessagesManager::after_get_difference() {
LOG(INFO) << "Begin to apply " << postponed_updates.size() << " postponed pts updates";
for (auto &postponed_update : postponed_updates) {
auto &update = postponed_update.second;
add_pending_update(std::move(update.update), update.pts, update.pts_count, false, std::move(update.promise),
add_pending_update(std::move(update.update), update.pts, update.pts_count, std::move(update.promise),
"after get difference");
CHECK(!td_->updates_manager_->running_get_difference());
}

View File

@ -790,8 +790,10 @@ class MessagesManager : public Actor {
int32 get_min_pending_pts() const;
void process_pts_update(tl_object_ptr<telegram_api::Update> &&update);
void add_pending_update(tl_object_ptr<telegram_api::Update> &&update, int32 new_pts, int32 pts_count,
bool force_apply, Promise<Unit> &&promise, const char *source);
Promise<Unit> &&promise, const char *source);
void add_pending_channel_update(DialogId dialog_id, tl_object_ptr<telegram_api::Update> &&update, int32 new_pts,
int32 pts_count, Promise<Unit> &&promise, const char *source,

View File

@ -1140,7 +1140,12 @@ void UpdatesManager::process_get_difference_updates(
}
if (constructor_id == telegram_api::updateFolderPeers::ID) {
on_update(move_tl_object_as<telegram_api::updateFolderPeers>(update), true, Promise<Unit>());
auto update_folder_peers = move_tl_object_as<telegram_api::updateFolderPeers>(update);
if (update_folder_peers->pts_count_ != 0) {
LOG(ERROR) << "Receive updateFolderPeers with pts_count = " << update_folder_peers->pts_count_;
}
update_folder_peers->pts_ = 0;
on_update(std::move(update_folder_peers), true, Promise<Unit>());
CHECK(!running_get_difference_);
}
@ -1799,7 +1804,11 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNewMessage> upd
Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, std::move(promise),
if (force_apply) {
td_->messages_manager_->process_pts_update(std::move(update));
return promise.set_value(Unit());
}
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, std::move(promise),
"updateNewMessage");
}
@ -1828,7 +1837,11 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadMessagesCon
Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, std::move(promise),
if (force_apply) {
td_->messages_manager_->process_pts_update(std::move(update));
return promise.set_value(Unit());
}
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, std::move(promise),
"updateReadMessagesContents");
}
@ -1836,7 +1849,11 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEditMessage> up
Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, std::move(promise),
if (force_apply) {
td_->messages_manager_->process_pts_update(std::move(update));
return promise.set_value(Unit());
}
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, std::move(promise),
"updateEditMessage");
}
@ -1844,12 +1861,16 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDeleteMessages>
Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
if (force_apply) {
td_->messages_manager_->process_pts_update(std::move(update));
return promise.set_value(Unit());
}
if (update->messages_.empty()) {
td_->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), new_pts, pts_count, force_apply,
Promise<Unit>(), "updateDeleteMessages");
td_->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), new_pts, pts_count, Promise<Unit>(),
"updateDeleteMessages");
promise.set_value(Unit());
} else {
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, std::move(promise),
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, std::move(promise),
"updateDeleteMessages");
}
}
@ -1860,8 +1881,10 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadHistoryInbo
int pts_count = update->pts_count_;
if (force_apply) {
update->still_unread_count_ = -1;
td_->messages_manager_->process_pts_update(std::move(update));
return promise.set_value(Unit());
}
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, std::move(promise),
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, std::move(promise),
"updateReadHistoryInbox");
}
@ -1869,7 +1892,11 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadHistoryOutb
Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, std::move(promise),
if (force_apply) {
td_->messages_manager_->process_pts_update(std::move(update));
return promise.set_value(Unit());
}
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, std::move(promise),
"updateReadHistoryOutbox");
}
@ -1993,7 +2020,11 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePinnedMessages>
Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, std::move(promise),
if (force_apply) {
td_->messages_manager_->process_pts_update(std::move(update));
return promise.set_value(Unit());
}
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, std::move(promise),
"updatePinnedMessages");
}
@ -2052,8 +2083,10 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePeerLocated> up
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateWebPage> update, bool force_apply,
Promise<Unit> &&promise) {
td_->web_pages_manager_->on_get_web_page(std::move(update->webpage_), DialogId());
td_->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), update->pts_, update->pts_count_,
force_apply, Promise<Unit>(), "updateWebPage");
if (!force_apply) {
td_->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), update->pts_, update->pts_count_,
Promise<Unit>(), "updateWebPage");
}
promise.set_value(Unit());
}
@ -2066,7 +2099,7 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannelWebPage>
promise.set_value(Unit());
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateFolderPeers> update, bool force_apply,
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateFolderPeers> update, bool /*force_apply*/,
Promise<Unit> &&promise) {
for (auto &folder_peer : update->folder_peers_) {
DialogId dialog_id(folder_peer->peer_);
@ -2074,8 +2107,10 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateFolderPeers> up
td_->messages_manager_->on_update_dialog_folder_id(dialog_id, folder_id);
}
td_->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), update->pts_, update->pts_count_,
force_apply, Promise<Unit>(), "updateFolderPeers");
if (update->pts_ > 0) {
td_->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), update->pts_, update->pts_count_,
Promise<Unit>(), "updateFolderPeers");
}
promise.set_value(Unit());
}