Pass promise to MessagesManager::add_pending_update.

This commit is contained in:
levlam 2020-12-24 21:56:15 +03:00
parent c1a1fc881c
commit 4ea10b3de6
3 changed files with 67 additions and 52 deletions

View File

@ -660,7 +660,8 @@ class UnpinAllMessagesQuery : public Td::ResultHandler {
"unpin all messages");
} else {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, false, "unpin all messages");
affected_history->pts_count_, false, Promise<Unit>(),
"unpin all messages");
}
}
@ -1556,7 +1557,8 @@ class ReadMessagesContentsQuery : public Td::ResultHandler {
if (affected_messages->pts_count_ > 0) {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_messages->pts_,
affected_messages->pts_count_, false, "read messages content query");
affected_messages->pts_count_, false, Promise<Unit>(),
"read messages content query");
}
promise_.set_value(Unit());
@ -1773,7 +1775,8 @@ class ReadHistoryQuery : public Td::ResultHandler {
if (affected_messages->pts_count_ > 0) {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_messages->pts_,
affected_messages->pts_count_, false, "read history query");
affected_messages->pts_count_, false, Promise<Unit>(),
"read history query");
}
promise_.set_value(Unit());
@ -2242,7 +2245,8 @@ class DeleteHistoryQuery : public Td::ResultHandler {
if (affected_history->pts_count_ > 0) {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, false, "delete history query");
affected_history->pts_count_, false, Promise<Unit>(),
"delete history query");
}
if (affected_history->offset_ > 0) {
@ -2441,7 +2445,8 @@ class ReadAllMentionsQuery : public Td::ResultHandler {
td->updates_manager_->get_difference("Wrong messages_readMentions result");
} else {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_history->pts_,
affected_history->pts_count_, false, "read all mentions query");
affected_history->pts_count_, false, Promise<Unit>(),
"read all mentions query");
}
}
@ -2577,7 +2582,7 @@ class SendMessageActor : public NetActorOnce {
td->messages_manager_->add_pending_update(
make_tl_object<updateSentMessage>(random_id_, message_id, sent_message->date_), sent_message->pts_,
sent_message->pts_count_, false, "send message actor");
sent_message->pts_count_, false, Promise<Unit>(), "send message actor");
}
void on_error(uint64 id, Status status) override {
@ -3582,7 +3587,8 @@ class DeleteMessagesQuery : public Td::ResultHandler {
if (affected_messages->pts_count_ > 0) {
td->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), affected_messages->pts_,
affected_messages->pts_count_, false, "delete messages query");
affected_messages->pts_count_, false, Promise<Unit>(),
"delete messages query");
}
if (--query_count_ == 0) {
promise_.set_value(Unit());
@ -6122,7 +6128,7 @@ void MessagesManager::skip_old_pending_update(tl_object_ptr<telegram_api::Update
}
void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&update, int32 new_pts, int32 pts_count,
bool force_apply, const char *source) {
bool force_apply, Promise<Unit> &&promise, const char *source) {
// do not try to run getDifference from this function
CHECK(update != nullptr);
CHECK(source != nullptr);
@ -6131,7 +6137,7 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
if (pts_count < 0 || new_pts <= pts_count) {
LOG(ERROR) << "Receive update with wrong pts = " << new_pts << " or pts_count = " << pts_count << " from " << source
<< ": " << oneline(to_string(update));
return;
return promise.set_value(Unit());
}
// TODO need to save all updates that can change result of running queries not associated with pts (for example
@ -6148,7 +6154,7 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
auto update_new_message = static_cast<const telegram_api::updateNewMessage *>(update.get());
DialogId dialog_id = get_message_dialog_id(update_new_message->message_);
if (!check_update_dialog_id(update, dialog_id)) {
return;
return promise.set_value(Unit());
}
break;
}
@ -6156,7 +6162,7 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
auto update_read_history_inbox = static_cast<const telegram_api::updateReadHistoryInbox *>(update.get());
auto dialog_id = DialogId(update_read_history_inbox->peer_);
if (!check_update_dialog_id(update, dialog_id)) {
return;
return promise.set_value(Unit());
}
break;
}
@ -6164,7 +6170,7 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
auto update_read_history_outbox = static_cast<const telegram_api::updateReadHistoryOutbox *>(update.get());
auto dialog_id = DialogId(update_read_history_outbox->peer_);
if (!check_update_dialog_id(update, dialog_id)) {
return;
return promise.set_value(Unit());
}
break;
}
@ -6172,7 +6178,7 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
auto update_edit_message = static_cast<const telegram_api::updateEditMessage *>(update.get());
DialogId dialog_id = get_message_dialog_id(update_edit_message->message_);
if (!check_update_dialog_id(update, dialog_id)) {
return;
return promise.set_value(Unit());
}
break;
}
@ -6180,7 +6186,7 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
auto update_pinned_messages = static_cast<const telegram_api::updatePinnedMessages *>(update.get());
auto dialog_id = DialogId(update_pinned_messages->peer_);
if (!check_update_dialog_id(update, dialog_id)) {
return;
return promise.set_value(Unit());
}
break;
}
@ -6190,17 +6196,18 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
}
if (force_apply) {
CHECK(pending_updates_.empty());
CHECK(pending_pts_updates_.empty());
CHECK(accumulated_pts_ == -1);
if (pts_count != 0) {
LOG(ERROR) << "Receive forced update with pts_count = " << pts_count << " from " << source;
}
process_update(std::move(update));
return;
return promise.set_value(Unit());
}
if (DROP_UPDATES) {
return set_get_difference_timeout(1.0);
set_get_difference_timeout(1.0);
return promise.set_value(Unit());
}
int32 old_pts = td_->updates_manager_->get_pts();
@ -6226,14 +6233,17 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
if (new_pts <= old_pts) {
skip_old_pending_update(std::move(update), new_pts, old_pts, pts_count, source);
return;
return promise.set_value(Unit());
}
if (td_->updates_manager_->running_get_difference()) {
LOG(INFO) << "Save pending update got while running getDifference from " << source;
CHECK(update->get_id() == dummyUpdate::ID || update->get_id() == updateSentMessage::ID);
if (pts_count > 0) {
postponed_pts_updates_.emplace(new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count));
postponed_pts_updates_.emplace(new_pts,
PendingPtsUpdate(std::move(update), new_pts, pts_count, std::move(promise)));
} else {
promise.set_value(Unit());
}
return;
}
@ -6242,6 +6252,7 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
LOG(WARNING) << "Have old_pts (= " << old_pts << ") + pts_count (= " << pts_count << ") > new_pts (= " << new_pts
<< "). Logged in " << G()->shared_config().get_option_integer("authorization_date") << ". Update from "
<< source << " = " << oneline(to_string(update));
promise.set_value(Unit()); // TODO postpone
set_get_difference_timeout(0.001);
return;
}
@ -6257,13 +6268,14 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
<< ", pts_count = " << pts_count << ". Logged in "
<< G()->shared_config().get_option_integer("authorization_date") << ". Update from " << source << " = "
<< oneline(to_string(update));
promise.set_value(Unit()); // TODO postpone
set_get_difference_timeout(0.001);
return;
}
LOG_IF(INFO, pts_count == 0 && update->get_id() != dummyUpdate::ID) << "Skip useless update " << to_string(update);
if (pending_updates_.empty() && old_pts + accumulated_pts_count_ == accumulated_pts_ &&
if (pending_pts_updates_.empty() && old_pts + accumulated_pts_count_ == accumulated_pts_ &&
!pts_gap_timeout_.has_timeout()) {
if (pts_count > 0) {
process_update(std::move(update));
@ -6273,11 +6285,12 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
accumulated_pts_count_ = 0;
accumulated_pts_ = -1;
}
promise.set_value(Unit());
return;
}
if (pts_count > 0) {
pending_updates_.emplace(new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count));
pending_pts_updates_.emplace(new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count, std::move(promise)));
}
if (old_pts + accumulated_pts_count_ < accumulated_pts_) {
@ -6286,7 +6299,7 @@ void MessagesManager::add_pending_update(tl_object_ptr<telegram_api::Update> &&u
}
CHECK(old_pts + accumulated_pts_count_ == accumulated_pts_);
if (!pending_updates_.empty()) {
if (!pending_pts_updates_.empty()) {
process_pending_updates();
}
}
@ -7193,7 +7206,8 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
if (running_get_channel_difference(dialog_id)) {
if (pts_count > 0) {
d->postponed_channel_updates.emplace(new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count));
d->postponed_channel_updates.emplace(new_pts,
PendingPtsUpdate(std::move(update), new_pts, pts_count, Promise<Unit>()));
}
LOG(INFO) << "Postpone channel update, because getChannelDifference is run";
return;
@ -7204,7 +7218,8 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
<< ", pts_count = " << pts_count << " in update from " << source;
if (pts_count > 0) {
d->postponed_channel_updates.emplace(new_pts, PendingPtsUpdate(std::move(update), new_pts, pts_count));
d->postponed_channel_updates.emplace(new_pts,
PendingPtsUpdate(std::move(update), new_pts, pts_count, Promise<Unit>()));
}
get_channel_difference(dialog_id, old_pts, true, "add_pending_channel_update pts mismatch");
@ -7410,8 +7425,9 @@ void MessagesManager::on_message_edited(FullMessageId full_message_id) {
}
void MessagesManager::process_pending_updates() {
for (auto &update : pending_updates_) {
for (auto &update : pending_pts_updates_) {
process_update(std::move(update.second.update));
update.second.promise.set_value(Unit());
}
td_->updates_manager_->set_pts(accumulated_pts_, "process pending updates")
@ -7423,7 +7439,7 @@ void MessagesManager::drop_pending_updates() {
accumulated_pts_count_ = 0;
accumulated_pts_ = -1;
pts_gap_timeout_.cancel_timeout();
pending_updates_.clear();
pending_pts_updates_.clear();
}
string MessagesManager::get_notification_settings_scope_database_key(NotificationSettingsScope scope) {
@ -8749,8 +8765,8 @@ void MessagesManager::before_get_difference() {
// scheduled messages are not returned in getDifference, so we must always reget them after it
scheduled_messages_sync_generation_++;
postponed_pts_updates_.insert(std::make_move_iterator(pending_updates_.begin()),
std::make_move_iterator(pending_updates_.end()));
postponed_pts_updates_.insert(std::make_move_iterator(pending_pts_updates_.begin()),
std::make_move_iterator(pending_pts_updates_.end()));
drop_pending_updates();
}
@ -8766,9 +8782,10 @@ void MessagesManager::after_get_difference() {
if (new_pts <= old_pts) {
skip_old_pending_update(std::move(update.second.update), new_pts, old_pts, update.second.pts_count,
"after get difference");
update.second.promise.set_value(Unit());
} else {
add_pending_update(std::move(update.second.update), update.second.pts, update.second.pts_count, false,
"after get difference");
std::move(update.second.promise), "after get difference");
}
CHECK(!td_->updates_manager_->running_get_difference());
}
@ -8838,13 +8855,13 @@ void MessagesManager::after_get_difference() {
const Dialog *d = get_dialog(dialog_id);
CHECK(d != nullptr);
if (dialog_id.get_type() == DialogType::Channel || pending_updates_.empty() || message_id.is_scheduled() ||
if (dialog_id.get_type() == DialogType::Channel || pending_pts_updates_.empty() || message_id.is_scheduled() ||
message_id <= d->last_new_message_id) {
LOG(ERROR) << "Receive updateMessageId from " << it.second << " to " << full_message_id
<< " but not receive corresponding message, last_new_message_id = " << d->last_new_message_id;
}
if (dialog_id.get_type() != DialogType::Channel &&
(pending_updates_.empty() || message_id.is_scheduled() || message_id <= d->last_new_message_id)) {
(pending_pts_updates_.empty() || message_id.is_scheduled() || message_id <= d->last_new_message_id)) {
dump_debug_message_op(get_dialog(dialog_id));
}
if (message_id.is_scheduled() || message_id <= d->last_new_message_id) {
@ -24454,6 +24471,8 @@ void MessagesManager::on_message_media_edited(DialogId dialog_id, MessageId mess
FileId thumbnail_file_id, bool was_uploaded, bool was_thumbnail_uploaded,
string file_reference, int32 schedule_date, uint64 generation,
Result<Unit> &&result) {
// must not run getDifference
CHECK(message_id.is_any_server());
auto m = get_message({dialog_id, message_id});
if (m == nullptr || m->edit_generation != generation) {

View File

@ -793,7 +793,7 @@ class MessagesManager : public Actor {
bool skip_not_found);
void add_pending_update(tl_object_ptr<telegram_api::Update> &&update, int32 new_pts, int32 pts_count,
bool force_apply, const char *source);
bool force_apply, Promise<Unit> &&promise, const char *source);
void add_pending_channel_update(DialogId dialog_id, tl_object_ptr<telegram_api::Update> &&update, int32 new_pts,
int32 pts_count, const char *source, bool is_postponed_update = false);
@ -973,9 +973,10 @@ class MessagesManager : public Actor {
tl_object_ptr<telegram_api::Update> update;
int32 pts;
int32 pts_count;
Promise<Unit> promise;
PendingPtsUpdate(tl_object_ptr<telegram_api::Update> &&update, int32 pts, int32 pts_count)
: update(std::move(update)), pts(pts), pts_count(pts_count) {
PendingPtsUpdate(tl_object_ptr<telegram_api::Update> &&update, int32 pts, int32 pts_count, Promise<Unit> &&promise)
: update(std::move(update)), pts(pts), pts_count(pts_count), promise(std::move(promise)) {
}
};
@ -3107,7 +3108,7 @@ class MessagesManager : public Actor {
bool running_get_difference_ = false; // true after before_get_difference and false after after_get_difference
std::unordered_map<DialogId, unique_ptr<Dialog>, DialogIdHash> dialogs_;
std::multimap<int32, PendingPtsUpdate> pending_updates_;
std::multimap<int32, PendingPtsUpdate> pending_pts_updates_;
std::multimap<int32, PendingPtsUpdate> postponed_pts_updates_;
std::unordered_set<DialogId, DialogIdHash>

View File

@ -1695,8 +1695,8 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNewMessage> upd
Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, "on_updateNewMessage");
promise.set_value(Unit());
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, std::move(promise),
"on_updateNewMessage");
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNewChannelMessage> update, bool /*force_apply*/,
@ -1721,18 +1721,16 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadMessagesCon
Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply,
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, std::move(promise),
"on_updateReadMessagesContents");
promise.set_value(Unit());
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEditMessage> update, bool force_apply,
Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply,
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, std::move(promise),
"on_updateEditMessage");
promise.set_value(Unit());
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDeleteMessages> update, bool force_apply,
@ -1741,12 +1739,12 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDeleteMessages>
int pts_count = update->pts_count_;
if (update->messages_.empty()) {
td_->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), new_pts, pts_count, force_apply,
"on_updateDeleteMessages");
Promise<Unit>(), "on_updateDeleteMessages");
promise.set_value(Unit());
} else {
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply,
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, std::move(promise),
"on_updateDeleteMessages");
}
promise.set_value(Unit());
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadHistoryInbox> update, bool force_apply,
@ -1756,18 +1754,16 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadHistoryInbo
if (force_apply) {
update->still_unread_count_ = -1;
}
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply,
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, std::move(promise),
"on_updateReadHistoryInbox");
promise.set_value(Unit());
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateReadHistoryOutbox> update, bool force_apply,
Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply,
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, std::move(promise),
"on_updateReadHistoryOutbox");
promise.set_value(Unit());
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateServiceNotification> update, bool /*force_apply*/,
@ -1893,9 +1889,8 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePinnedMessages>
Promise<Unit> &&promise) {
int new_pts = update->pts_;
int pts_count = update->pts_count_;
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply,
td_->messages_manager_->add_pending_update(std::move(update), new_pts, pts_count, force_apply, std::move(promise),
"on_updatePinnedMessages");
promise.set_value(Unit());
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updatePinnedChannelMessages> update, bool /*force_apply*/,
@ -1960,7 +1955,7 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateWebPage> update
Promise<Unit> &&promise) {
td_->web_pages_manager_->on_get_web_page(std::move(update->webpage_), DialogId());
td_->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), update->pts_, update->pts_count_,
force_apply, "on_updateWebPage");
force_apply, Promise<Unit>(), "on_updateWebPage");
promise.set_value(Unit());
}
@ -1987,7 +1982,7 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateFolderPeers> up
}
td_->messages_manager_->add_pending_update(make_tl_object<dummyUpdate>(), update->pts_, update->pts_count_,
force_apply, "on_updateFolderPeers");
force_apply, Promise<Unit>(), "on_updateFolderPeers");
promise.set_value(Unit());
}