diff --git a/td/telegram/ContactsManager.cpp b/td/telegram/ContactsManager.cpp index 96e4a739f..e619d2449 100644 --- a/td/telegram/ContactsManager.cpp +++ b/td/telegram/ContactsManager.cpp @@ -5474,11 +5474,12 @@ void ContactsManager::on_load_imported_contacts_from_database(string value) { LOG(INFO) << "Successfully loaded " << all_imported_contacts_.size() << " imported contacts from database"; } - load_imported_contact_users_multipromise_.add_promise(PromiseCreator::lambda([](Result<> result) { - if (result.is_ok()) { - send_closure_later(G()->contacts_manager(), &ContactsManager::on_load_imported_contacts_finished); - } - })); + load_imported_contact_users_multipromise_.add_promise( + PromiseCreator::lambda([actor_id = actor_id(this)](Result result) { + if (result.is_ok()) { + send_closure_later(actor_id, &ContactsManager::on_load_imported_contacts_finished); + } + })); auto lock_promise = load_imported_contact_users_multipromise_.get_promise(); @@ -7972,10 +7973,10 @@ void ContactsManager::on_load_contacts_from_database(string value) { LOG(INFO) << "Successfully loaded " << user_ids.size() << " contacts from database"; - load_contact_users_multipromise_.add_promise( - PromiseCreator::lambda([expected_contact_count = user_ids.size()](Result<> result) { + load_contact_users_multipromise_.add_promise(PromiseCreator::lambda( + [actor_id = actor_id(this), expected_contact_count = user_ids.size()](Result result) { if (result.is_ok()) { - send_closure(G()->contacts_manager(), &ContactsManager::on_get_contacts_finished, expected_contact_count); + send_closure(actor_id, &ContactsManager::on_get_contacts_finished, expected_contact_count); } })); @@ -15204,8 +15205,9 @@ void ContactsManager::on_load_dialog_administrators_from_database(DialogId dialo << " from database"; MultiPromiseActorSafe load_users_multipromise{"LoadUsersMultiPromiseActor"}; - load_users_multipromise.add_promise(PromiseCreator::lambda( - [actor_id = actor_id(this), dialog_id, administrators, promise = std::move(promise)](Result<> result) mutable { + load_users_multipromise.add_promise( + PromiseCreator::lambda([actor_id = actor_id(this), dialog_id, administrators, + promise = std::move(promise)](Result result) mutable { send_closure(actor_id, &ContactsManager::on_load_administrator_users_finished, dialog_id, std::move(administrators), std::move(result), std::move(promise)); })); diff --git a/td/telegram/FileReferenceManager.cpp b/td/telegram/FileReferenceManager.cpp index ed60fe6b2..dca2450a4 100644 --- a/td/telegram/FileReferenceManager.cpp +++ b/td/telegram/FileReferenceManager.cpp @@ -228,17 +228,7 @@ void FileReferenceManager::send_query(Destination dest, FileSourceId file_source auto promise = PromiseCreator::lambda([dest, file_source_id, actor_id = actor_id(this), file_manager_actor_id = G()->file_manager()](Result result) { - if (G()->close_flag()) { - VLOG(file_references) << "Ignore file reference repair from " << file_source_id << " during closing"; - return; - } - auto new_promise = PromiseCreator::lambda([dest, file_source_id, actor_id](Result result) { - if (G()->close_flag()) { - VLOG(file_references) << "Ignore file reference repair from " << file_source_id << " during closing"; - return; - } - Status status; if (result.is_error()) { status = result.move_as_error(); @@ -304,6 +294,11 @@ void FileReferenceManager::send_query(Destination dest, FileSourceId file_source FileReferenceManager::Destination FileReferenceManager::on_query_result(Destination dest, FileSourceId file_source_id, Status status, int32 sub) { + if (G()->close_flag()) { + VLOG(file_references) << "Ignore file reference repair from " << file_source_id << " during closing"; + return dest; + } + VLOG(file_references) << "Receive result of file reference repair query for file " << dest.node_id << " with generation " << dest.generation << " from " << file_source_id << ": " << status << " " << sub; diff --git a/td/telegram/MessagesManager.cpp b/td/telegram/MessagesManager.cpp index d759c3a9e..d77728ba5 100644 --- a/td/telegram/MessagesManager.cpp +++ b/td/telegram/MessagesManager.cpp @@ -9079,19 +9079,15 @@ void MessagesManager::after_get_difference() { dump_debug_message_op(get_dialog(dialog_id)); } if (message_id <= d->last_new_message_id) { - get_message_from_server( - it.first, PromiseCreator::lambda([this, full_message_id](Result result) { - if (G()->close_flag()) { - return; - } - if (result.is_error()) { - LOG(WARNING) << "Failed to get missing " << full_message_id << ": " << result.error(); - } else { - LOG(WARNING) << "Successfully get missing " << full_message_id << ": " - << to_string(get_message_object(full_message_id, "after_get_difference")); - } - }), - "get missing"); + get_message_from_server(it.first, PromiseCreator::lambda([full_message_id](Result result) { + if (result.is_error()) { + LOG(WARNING) + << "Failed to get missing " << full_message_id << ": " << result.error(); + } else { + LOG(WARNING) << "Successfully get missing " << full_message_id; + } + }), + "get missing"); } else if (dialog_id.get_type() == DialogType::Channel) { LOG(INFO) << "Schedule getDifference in " << dialog_id.get_channel_id(); channel_get_difference_retry_timeout_.add_timeout_in(dialog_id.get(), 0.001); @@ -11572,7 +11568,7 @@ void MessagesManager::on_get_secret_chat_total_count(DialogListId dialog_list_id } void MessagesManager::recalc_unread_count(DialogListId dialog_list_id, int32 old_dialog_total_count) { - if (td_->auth_manager_->is_bot() || !G()->parameters().use_message_db) { + if (G()->close_flag() || td_->auth_manager_->is_bot() || !G()->parameters().use_message_db) { return; } @@ -13123,12 +13119,9 @@ void MessagesManager::add_secret_message(unique_ptr pendin multipromise.set_ignore_errors(true); int64 token = pending_secret_messages_.add(std::move(pending_secret_message)); - multipromise.add_promise(PromiseCreator::lambda([token, actor_id = actor_id(this), - this](Result result) mutable { - if (result.is_ok() && !G()->close_flag()) { // if we aren't closing - this->pending_secret_messages_.finish(token, [actor_id](unique_ptr pending_secret_message) { - send_closure_later(actor_id, &MessagesManager::finish_add_secret_message, std::move(pending_secret_message)); - }); + multipromise.add_promise(PromiseCreator::lambda([actor_id = actor_id(this), token](Result result) { + if (result.is_ok()) { + send_closure(actor_id, &MessagesManager::on_add_secret_message_ready, token); } })); @@ -13138,6 +13131,17 @@ void MessagesManager::add_secret_message(unique_ptr pendin lock_promise.set_value(Unit()); } +void MessagesManager::on_add_secret_message_ready(int64 token) { + if (G()->close_flag()) { + return; + } + + pending_secret_messages_.finish( + token, [actor_id = actor_id(this)](unique_ptr pending_secret_message) { + send_closure_later(actor_id, &MessagesManager::finish_add_secret_message, std::move(pending_secret_message)); + }); +} + void MessagesManager::finish_add_secret_message(unique_ptr pending_secret_message) { if (G()->close_flag()) { return; @@ -15539,6 +15543,9 @@ void MessagesManager::on_get_recommended_dialog_filters( void MessagesManager::on_load_recommended_dialog_filters( Result &&result, vector &&filters, Promise> &&promise) { + if (G()->close_flag()) { + return promise.set_error(Status::Error(500, "Request aborted")); + } if (result.is_error()) { return promise.set_error(result.move_as_error()); } @@ -15740,6 +15747,10 @@ void MessagesManager::load_dialog_list(DialogList &list, int32 limit, Promiseclose_flag()) { + return; + } + CHECK(!td_->auth_manager_->is_bot()); auto &folder = *get_dialog_folder(folder_id); if (folder.folder_last_dialog_date_ == MAX_DIALOG_DATE) { @@ -15762,7 +15773,7 @@ void MessagesManager::load_folder_dialog_list(FolderId folder_id, int32 limit, b return; } multipromise.add_promise(PromiseCreator::lambda([actor_id = actor_id(this), folder_id](Result result) { - if (result.is_error() && !G()->close_flag()) { + if (result.is_error()) { send_closure(actor_id, &MessagesManager::on_load_folder_dialog_list_fail, folder_id, result.move_as_error()); } })); @@ -15799,6 +15810,10 @@ void MessagesManager::load_folder_dialog_list(FolderId folder_id, int32 limit, b } void MessagesManager::on_load_folder_dialog_list_fail(FolderId folder_id, Status error) { + if (G()->close_flag()) { + return; + } + LOG(WARNING) << "Failed to load chats in " << folder_id << ": " << error; CHECK(!td_->auth_manager_->is_bot()); const auto &folder = *get_dialog_folder(folder_id); @@ -17781,6 +17796,10 @@ void MessagesManager::get_message_link_info(Slice url, Promise } void MessagesManager::on_get_message_link_dialog(MessageLinkInfo &&info, Promise &&promise) { + if (G()->close_flag()) { + return promise.set_error(Status::Error(500, "Request aborted")); + } + DialogId dialog_id; if (info.username.empty()) { if (!td_->contacts_manager_->have_channel(info.channel_id)) { @@ -17813,6 +17832,10 @@ void MessagesManager::on_get_message_link_dialog(MessageLinkInfo &&info, Promise void MessagesManager::on_get_message_link_message(MessageLinkInfo &&info, DialogId dialog_id, Promise &&promise) { + if (G()->close_flag()) { + return promise.set_error(Status::Error(500, "Request aborted")); + } + Message *m = get_message_force({dialog_id, info.message_id}, "on_get_message_link_message"); if (info.comment_message_id == MessageId() || m == nullptr || !is_broadcast_channel(dialog_id) || !m->reply_info.is_comment || !is_active_message_reply_info(dialog_id, m->reply_info)) { @@ -17840,6 +17863,10 @@ void MessagesManager::on_get_message_link_message(MessageLinkInfo &&info, Dialog void MessagesManager::on_get_message_link_discussion_message(MessageLinkInfo &&info, DialogId comment_dialog_id, Promise &&promise) { + if (G()->close_flag()) { + return promise.set_error(Status::Error(500, "Request aborted")); + } + CHECK(comment_dialog_id.is_valid()); info.comment_dialog_id = comment_dialog_id; @@ -27038,8 +27065,8 @@ void MessagesManager::start_import_messages(DialogId dialog_id, int64 import_id, } while (random_id == 0 || pending_message_imports_.find(random_id) != pending_message_imports_.end()); pending_message_imports_[random_id] = std::move(pending_message_import); - multipromise.add_promise(PromiseCreator::lambda([random_id](Result result) { - send_closure_later(G()->messages_manager(), &MessagesManager::on_imported_message_attachments_uploaded, random_id, + multipromise.add_promise(PromiseCreator::lambda([actor_id = actor_id(this), random_id](Result result) { + send_closure_later(actor_id, &MessagesManager::on_imported_message_attachments_uploaded, random_id, std::move(result)); })); auto lock_promise = multipromise.get_promise(); @@ -28342,6 +28369,8 @@ bool MessagesManager::add_new_message_notification(Dialog *d, Message *m, bool f void MessagesManager::flush_pending_new_message_notifications(DialogId dialog_id, bool from_mentions, DialogId settings_dialog_id) { + // flush pending notifications even while closing + auto d = get_dialog(dialog_id); CHECK(d != nullptr); auto &pending_notifications = @@ -28361,10 +28390,8 @@ void MessagesManager::flush_pending_new_message_notifications(DialogId dialog_id auto it = pending_notifications.begin(); while (it != pending_notifications.end() && it->first == DialogId()) { auto m = get_message(d, it->second); - if (m != nullptr) { - if (add_new_message_notification(d, m, true)) { - on_message_changed(d, m, false, "flush_pending_new_message_notifications"); - } + if (m != nullptr && add_new_message_notification(d, m, true)) { + on_message_changed(d, m, false, "flush_pending_new_message_notifications"); } ++it; } @@ -33561,12 +33588,20 @@ void MessagesManager::do_delete_message_log_event(const DeleteMessageLogEvent &l } MultiPromiseActorSafe mpas{"DeleteMessageMultiPromiseActor"}; - mpas.add_promise(PromiseCreator::lambda([log_event_id](Result result) { - if (result.is_error() || G()->close_flag()) { - return; - } - binlog_erase(G()->td_db()->get_binlog(), log_event_id); - })); + mpas.add_promise( + PromiseCreator::lambda([log_event_id, context_weak_ptr = get_context_weak_ptr()](Result result) { + auto context = context_weak_ptr.lock(); + if (result.is_error() || context == nullptr) { + return; + } + CHECK(context->get_id() == Global::ID); + auto global = static_cast(context.get()); + if (global->close_flag()) { + return; + } + + binlog_erase(global->td_db()->get_binlog(), log_event_id); + })); auto lock = mpas.get_promise(); for (auto file_id : log_event.file_ids_) { diff --git a/td/telegram/MessagesManager.h b/td/telegram/MessagesManager.h index 7fe96a259..924dbf485 100644 --- a/td/telegram/MessagesManager.h +++ b/td/telegram/MessagesManager.h @@ -1743,6 +1743,8 @@ class MessagesManager final : public Actor { void add_secret_message(unique_ptr pending_secret_message, Promise lock_promise = Auto()); + void on_add_secret_message_ready(int64 token); + void finish_add_secret_message(unique_ptr pending_secret_message); void finish_delete_secret_messages(DialogId dialog_id, std::vector random_ids, Promise<> promise); diff --git a/td/telegram/RecentDialogList.cpp b/td/telegram/RecentDialogList.cpp index 5b332a009..e53196fb4 100644 --- a/td/telegram/RecentDialogList.cpp +++ b/td/telegram/RecentDialogList.cpp @@ -129,6 +129,13 @@ void RecentDialogList::on_load_dialogs(vector &&found_dialogs) { auto promises = std::move(load_list_queries_); CHECK(!promises.empty()); + if (G()->close_flag()) { + for (auto &promise : promises) { + promise.set_error(Status::Error(500, "Request aborted")); + } + return; + } + auto newly_found_dialogs = std::move(dialog_ids_); reset_to_empty(dialog_ids_); diff --git a/td/telegram/SecretChatActor.cpp b/td/telegram/SecretChatActor.cpp index b5fad2453..afcaf8820 100644 --- a/td/telegram/SecretChatActor.cpp +++ b/td/telegram/SecretChatActor.cpp @@ -1379,7 +1379,7 @@ void SecretChatActor::on_save_changes_start(ChangesProcessor::Id sa } void SecretChatActor::on_inbound_save_message_finish(uint64 state_id) { - if (close_flag_) { + if (close_flag_ || context_->close_flag()) { return; } auto *state = inbound_message_states_.get(state_id); diff --git a/td/telegram/StickersManager.cpp b/td/telegram/StickersManager.cpp index ccf14257c..d411cf317 100644 --- a/td/telegram/StickersManager.cpp +++ b/td/telegram/StickersManager.cpp @@ -5313,9 +5313,8 @@ void StickersManager::create_new_sticker_set(UserId user_id, string &title, stri } while (random_id == 0 || pending_new_sticker_sets_.find(random_id) != pending_new_sticker_sets_.end()); pending_new_sticker_sets_[random_id] = std::move(pending_new_sticker_set); - multipromise.add_promise(PromiseCreator::lambda([random_id](Result result) { - send_closure_later(G()->stickers_manager(), &StickersManager::on_new_stickers_uploaded, random_id, - std::move(result)); + multipromise.add_promise(PromiseCreator::lambda([actor_id = actor_id(this), random_id](Result result) { + send_closure_later(actor_id, &StickersManager::on_new_stickers_uploaded, random_id, std::move(result)); })); auto lock_promise = multipromise.get_promise(); @@ -5453,6 +5452,9 @@ void StickersManager::on_new_stickers_uploaded(int64 random_id, Result res pending_new_sticker_sets_.erase(it); + if (G()->close_flag()) { + result = Status::Error(500, "Request aborted"); + } if (result.is_error()) { pending_new_sticker_set->promise.set_error(result.move_as_error()); return; @@ -6811,8 +6813,8 @@ void StickersManager::on_get_emoji_keywords( const string &language_code, Result> &&result) { auto it = load_emoji_keywords_queries_.find(language_code); CHECK(it != load_emoji_keywords_queries_.end()); - CHECK(!it->second.empty()); auto promises = std::move(it->second); + CHECK(!promises.empty()); load_emoji_keywords_queries_.erase(it); if (result.is_error()) { diff --git a/td/telegram/UpdatesManager.cpp b/td/telegram/UpdatesManager.cpp index 0ba36bbe7..477477711 100644 --- a/td/telegram/UpdatesManager.cpp +++ b/td/telegram/UpdatesManager.cpp @@ -1656,7 +1656,9 @@ void UpdatesManager::on_pending_updates(vector &&result) mutable { + send_closure(actor_id, &UpdatesManager::on_pending_updates_processed, std::move(result), std::move(promise)); + }); auto lock = mpas.get_promise(); for (auto &update : updates) { @@ -1775,6 +1777,10 @@ void UpdatesManager::on_pending_updates(vector &&result, Promise &&promise) { + promise.set_result(std::move(result)); +} + void UpdatesManager::add_pending_qts_update(tl_object_ptr &&update, int32 qts, Promise &&promise) { CHECK(update != nullptr); diff --git a/td/telegram/UpdatesManager.h b/td/telegram/UpdatesManager.h index 7a3ef1312..42bf659e4 100644 --- a/td/telegram/UpdatesManager.h +++ b/td/telegram/UpdatesManager.h @@ -267,6 +267,8 @@ class UpdatesManager final : public Actor { void on_pending_updates(vector> &&updates, int32 seq_begin, int32 seq_end, int32 date, double receive_time, Promise &&promise, const char *source); + void on_pending_updates_processed(Result &&result, Promise &&promise); + void process_updates(vector> &&updates, bool force_apply, Promise &&promise); diff --git a/td/telegram/files/FileManager.cpp b/td/telegram/files/FileManager.cpp index a63a42a92..2c2316ac5 100644 --- a/td/telegram/files/FileManager.cpp +++ b/td/telegram/files/FileManager.cpp @@ -1745,6 +1745,12 @@ void FileManager::change_files_source(FileSourceId file_source_id, const vector< void FileManager::on_file_reference_repaired(FileId file_id, FileSourceId file_source_id, Result &&result, Promise &&promise) { + if (G()->close_flag()) { + VLOG(file_references) << "Ignore file reference of file " << file_id << " repair from " << file_source_id + << " during closing"; + return promise.set_error(Status::Error(500, "Request aborted")); + } + auto file_view = get_file_view(file_id); CHECK(!file_view.empty()); if (result.is_ok() && diff --git a/tdactor/td/actor/impl/Actor-decl.h b/tdactor/td/actor/impl/Actor-decl.h index 8a107375f..f8ae5f5ef 100644 --- a/tdactor/td/actor/impl/Actor-decl.h +++ b/tdactor/td/actor/impl/Actor-decl.h @@ -75,6 +75,7 @@ class Actor : public ObserverBase { void do_migrate(int32 sched_id); uint64 get_link_token(); + std::weak_ptr get_context_weak_ptr() const; std::shared_ptr set_context(std::shared_ptr context); string set_tag(string tag); diff --git a/tdactor/td/actor/impl/Actor.h b/tdactor/td/actor/impl/Actor.h index 3577093ce..b57eeb129 100644 --- a/tdactor/td/actor/impl/Actor.h +++ b/tdactor/td/actor/impl/Actor.h @@ -79,18 +79,26 @@ std::enable_if_t::value> start_migrate(ActorTy Scheduler::instance()->start_migrate_actor(&obj, sched_id); } } + template std::enable_if_t::value> finish_migrate(ActorType &obj) { if (!obj.empty()) { Scheduler::instance()->finish_migrate_actor(&obj); } } + inline uint64 Actor::get_link_token() { return Scheduler::instance()->get_link_token(this); } + +inline std::weak_ptr Actor::get_context_weak_ptr() const { + return info_->get_context_weak_ptr(); +} + inline std::shared_ptr Actor::set_context(std::shared_ptr context) { return info_->set_context(std::move(context)); } + inline string Actor::set_tag(string tag) { auto *ctx = info_->get_context(); string old_tag; @@ -105,12 +113,14 @@ inline string Actor::set_tag(string tag) { inline void Actor::init(ObjectPool::OwnerPtr &&info) { info_ = std::move(info); } + inline ActorInfo *Actor::get_info() { return &*info_; } inline const ActorInfo *Actor::get_info() const { return &*info_; } + inline ObjectPool::OwnerPtr Actor::clear() { return std::move(info_); } diff --git a/tdactor/td/actor/impl/ActorInfo-decl.h b/tdactor/td/actor/impl/ActorInfo-decl.h index ee8a78267..cb9a8e7bd 100644 --- a/tdactor/td/actor/impl/ActorInfo-decl.h +++ b/tdactor/td/actor/impl/ActorInfo-decl.h @@ -86,6 +86,7 @@ class ActorInfo final const Actor *get_actor_unsafe() const; std::shared_ptr set_context(std::shared_ptr context); + std::weak_ptr get_context_weak_ptr() const; ActorContext *get_context(); const ActorContext *get_context() const; CSlice get_name() const; diff --git a/tdactor/td/actor/impl/ActorInfo.h b/tdactor/td/actor/impl/ActorInfo.h index f4a6c088b..226490af9 100644 --- a/tdactor/td/actor/impl/ActorInfo.h +++ b/tdactor/td/actor/impl/ActorInfo.h @@ -156,6 +156,11 @@ inline std::shared_ptr ActorInfo::set_context(std::shared_ptr ActorInfo::get_context_weak_ptr() const { + return context_; +} + inline const ActorContext *ActorInfo::get_context() const { return context_.get(); }