Avoid G usage in lambda promises.

This commit is contained in:
levlam 2021-10-07 15:28:52 +03:00
parent c85f993de8
commit 3b794126d8
14 changed files with 133 additions and 59 deletions

View File

@ -5474,11 +5474,12 @@ void ContactsManager::on_load_imported_contacts_from_database(string value) {
LOG(INFO) << "Successfully loaded " << all_imported_contacts_.size() << " imported contacts from database";
}
load_imported_contact_users_multipromise_.add_promise(PromiseCreator::lambda([](Result<> result) {
if (result.is_ok()) {
send_closure_later(G()->contacts_manager(), &ContactsManager::on_load_imported_contacts_finished);
}
}));
load_imported_contact_users_multipromise_.add_promise(
PromiseCreator::lambda([actor_id = actor_id(this)](Result<Unit> result) {
if (result.is_ok()) {
send_closure_later(actor_id, &ContactsManager::on_load_imported_contacts_finished);
}
}));
auto lock_promise = load_imported_contact_users_multipromise_.get_promise();
@ -7972,10 +7973,10 @@ void ContactsManager::on_load_contacts_from_database(string value) {
LOG(INFO) << "Successfully loaded " << user_ids.size() << " contacts from database";
load_contact_users_multipromise_.add_promise(
PromiseCreator::lambda([expected_contact_count = user_ids.size()](Result<> result) {
load_contact_users_multipromise_.add_promise(PromiseCreator::lambda(
[actor_id = actor_id(this), expected_contact_count = user_ids.size()](Result<Unit> result) {
if (result.is_ok()) {
send_closure(G()->contacts_manager(), &ContactsManager::on_get_contacts_finished, expected_contact_count);
send_closure(actor_id, &ContactsManager::on_get_contacts_finished, expected_contact_count);
}
}));
@ -15204,8 +15205,9 @@ void ContactsManager::on_load_dialog_administrators_from_database(DialogId dialo
<< " from database";
MultiPromiseActorSafe load_users_multipromise{"LoadUsersMultiPromiseActor"};
load_users_multipromise.add_promise(PromiseCreator::lambda(
[actor_id = actor_id(this), dialog_id, administrators, promise = std::move(promise)](Result<> result) mutable {
load_users_multipromise.add_promise(
PromiseCreator::lambda([actor_id = actor_id(this), dialog_id, administrators,
promise = std::move(promise)](Result<Unit> result) mutable {
send_closure(actor_id, &ContactsManager::on_load_administrator_users_finished, dialog_id,
std::move(administrators), std::move(result), std::move(promise));
}));

View File

@ -228,17 +228,7 @@ void FileReferenceManager::send_query(Destination dest, FileSourceId file_source
auto promise = PromiseCreator::lambda([dest, file_source_id, actor_id = actor_id(this),
file_manager_actor_id = G()->file_manager()](Result<Unit> result) {
if (G()->close_flag()) {
VLOG(file_references) << "Ignore file reference repair from " << file_source_id << " during closing";
return;
}
auto new_promise = PromiseCreator::lambda([dest, file_source_id, actor_id](Result<Unit> result) {
if (G()->close_flag()) {
VLOG(file_references) << "Ignore file reference repair from " << file_source_id << " during closing";
return;
}
Status status;
if (result.is_error()) {
status = result.move_as_error();
@ -304,6 +294,11 @@ void FileReferenceManager::send_query(Destination dest, FileSourceId file_source
FileReferenceManager::Destination FileReferenceManager::on_query_result(Destination dest, FileSourceId file_source_id,
Status status, int32 sub) {
if (G()->close_flag()) {
VLOG(file_references) << "Ignore file reference repair from " << file_source_id << " during closing";
return dest;
}
VLOG(file_references) << "Receive result of file reference repair query for file " << dest.node_id
<< " with generation " << dest.generation << " from " << file_source_id << ": " << status << " "
<< sub;

View File

@ -9079,19 +9079,15 @@ void MessagesManager::after_get_difference() {
dump_debug_message_op(get_dialog(dialog_id));
}
if (message_id <= d->last_new_message_id) {
get_message_from_server(
it.first, PromiseCreator::lambda([this, full_message_id](Result<Unit> result) {
if (G()->close_flag()) {
return;
}
if (result.is_error()) {
LOG(WARNING) << "Failed to get missing " << full_message_id << ": " << result.error();
} else {
LOG(WARNING) << "Successfully get missing " << full_message_id << ": "
<< to_string(get_message_object(full_message_id, "after_get_difference"));
}
}),
"get missing");
get_message_from_server(it.first, PromiseCreator::lambda([full_message_id](Result<Unit> result) {
if (result.is_error()) {
LOG(WARNING)
<< "Failed to get missing " << full_message_id << ": " << result.error();
} else {
LOG(WARNING) << "Successfully get missing " << full_message_id;
}
}),
"get missing");
} else if (dialog_id.get_type() == DialogType::Channel) {
LOG(INFO) << "Schedule getDifference in " << dialog_id.get_channel_id();
channel_get_difference_retry_timeout_.add_timeout_in(dialog_id.get(), 0.001);
@ -11572,7 +11568,7 @@ void MessagesManager::on_get_secret_chat_total_count(DialogListId dialog_list_id
}
void MessagesManager::recalc_unread_count(DialogListId dialog_list_id, int32 old_dialog_total_count) {
if (td_->auth_manager_->is_bot() || !G()->parameters().use_message_db) {
if (G()->close_flag() || td_->auth_manager_->is_bot() || !G()->parameters().use_message_db) {
return;
}
@ -13123,12 +13119,9 @@ void MessagesManager::add_secret_message(unique_ptr<PendingSecretMessage> pendin
multipromise.set_ignore_errors(true);
int64 token = pending_secret_messages_.add(std::move(pending_secret_message));
multipromise.add_promise(PromiseCreator::lambda([token, actor_id = actor_id(this),
this](Result<Unit> result) mutable {
if (result.is_ok() && !G()->close_flag()) { // if we aren't closing
this->pending_secret_messages_.finish(token, [actor_id](unique_ptr<PendingSecretMessage> pending_secret_message) {
send_closure_later(actor_id, &MessagesManager::finish_add_secret_message, std::move(pending_secret_message));
});
multipromise.add_promise(PromiseCreator::lambda([actor_id = actor_id(this), token](Result<Unit> result) {
if (result.is_ok()) {
send_closure(actor_id, &MessagesManager::on_add_secret_message_ready, token);
}
}));
@ -13138,6 +13131,17 @@ void MessagesManager::add_secret_message(unique_ptr<PendingSecretMessage> pendin
lock_promise.set_value(Unit());
}
void MessagesManager::on_add_secret_message_ready(int64 token) {
if (G()->close_flag()) {
return;
}
pending_secret_messages_.finish(
token, [actor_id = actor_id(this)](unique_ptr<PendingSecretMessage> pending_secret_message) {
send_closure_later(actor_id, &MessagesManager::finish_add_secret_message, std::move(pending_secret_message));
});
}
void MessagesManager::finish_add_secret_message(unique_ptr<PendingSecretMessage> pending_secret_message) {
if (G()->close_flag()) {
return;
@ -15539,6 +15543,9 @@ void MessagesManager::on_get_recommended_dialog_filters(
void MessagesManager::on_load_recommended_dialog_filters(
Result<Unit> &&result, vector<RecommendedDialogFilter> &&filters,
Promise<td_api::object_ptr<td_api::recommendedChatFilters>> &&promise) {
if (G()->close_flag()) {
return promise.set_error(Status::Error(500, "Request aborted"));
}
if (result.is_error()) {
return promise.set_error(result.move_as_error());
}
@ -15740,6 +15747,10 @@ void MessagesManager::load_dialog_list(DialogList &list, int32 limit, Promise<Un
}
void MessagesManager::load_folder_dialog_list(FolderId folder_id, int32 limit, bool only_local) {
if (G()->close_flag()) {
return;
}
CHECK(!td_->auth_manager_->is_bot());
auto &folder = *get_dialog_folder(folder_id);
if (folder.folder_last_dialog_date_ == MAX_DIALOG_DATE) {
@ -15762,7 +15773,7 @@ void MessagesManager::load_folder_dialog_list(FolderId folder_id, int32 limit, b
return;
}
multipromise.add_promise(PromiseCreator::lambda([actor_id = actor_id(this), folder_id](Result<Unit> result) {
if (result.is_error() && !G()->close_flag()) {
if (result.is_error()) {
send_closure(actor_id, &MessagesManager::on_load_folder_dialog_list_fail, folder_id, result.move_as_error());
}
}));
@ -15799,6 +15810,10 @@ void MessagesManager::load_folder_dialog_list(FolderId folder_id, int32 limit, b
}
void MessagesManager::on_load_folder_dialog_list_fail(FolderId folder_id, Status error) {
if (G()->close_flag()) {
return;
}
LOG(WARNING) << "Failed to load chats in " << folder_id << ": " << error;
CHECK(!td_->auth_manager_->is_bot());
const auto &folder = *get_dialog_folder(folder_id);
@ -17781,6 +17796,10 @@ void MessagesManager::get_message_link_info(Slice url, Promise<MessageLinkInfo>
}
void MessagesManager::on_get_message_link_dialog(MessageLinkInfo &&info, Promise<MessageLinkInfo> &&promise) {
if (G()->close_flag()) {
return promise.set_error(Status::Error(500, "Request aborted"));
}
DialogId dialog_id;
if (info.username.empty()) {
if (!td_->contacts_manager_->have_channel(info.channel_id)) {
@ -17813,6 +17832,10 @@ void MessagesManager::on_get_message_link_dialog(MessageLinkInfo &&info, Promise
void MessagesManager::on_get_message_link_message(MessageLinkInfo &&info, DialogId dialog_id,
Promise<MessageLinkInfo> &&promise) {
if (G()->close_flag()) {
return promise.set_error(Status::Error(500, "Request aborted"));
}
Message *m = get_message_force({dialog_id, info.message_id}, "on_get_message_link_message");
if (info.comment_message_id == MessageId() || m == nullptr || !is_broadcast_channel(dialog_id) ||
!m->reply_info.is_comment || !is_active_message_reply_info(dialog_id, m->reply_info)) {
@ -17840,6 +17863,10 @@ void MessagesManager::on_get_message_link_message(MessageLinkInfo &&info, Dialog
void MessagesManager::on_get_message_link_discussion_message(MessageLinkInfo &&info, DialogId comment_dialog_id,
Promise<MessageLinkInfo> &&promise) {
if (G()->close_flag()) {
return promise.set_error(Status::Error(500, "Request aborted"));
}
CHECK(comment_dialog_id.is_valid());
info.comment_dialog_id = comment_dialog_id;
@ -27038,8 +27065,8 @@ void MessagesManager::start_import_messages(DialogId dialog_id, int64 import_id,
} while (random_id == 0 || pending_message_imports_.find(random_id) != pending_message_imports_.end());
pending_message_imports_[random_id] = std::move(pending_message_import);
multipromise.add_promise(PromiseCreator::lambda([random_id](Result<Unit> result) {
send_closure_later(G()->messages_manager(), &MessagesManager::on_imported_message_attachments_uploaded, random_id,
multipromise.add_promise(PromiseCreator::lambda([actor_id = actor_id(this), random_id](Result<Unit> result) {
send_closure_later(actor_id, &MessagesManager::on_imported_message_attachments_uploaded, random_id,
std::move(result));
}));
auto lock_promise = multipromise.get_promise();
@ -28342,6 +28369,8 @@ bool MessagesManager::add_new_message_notification(Dialog *d, Message *m, bool f
void MessagesManager::flush_pending_new_message_notifications(DialogId dialog_id, bool from_mentions,
DialogId settings_dialog_id) {
// flush pending notifications even while closing
auto d = get_dialog(dialog_id);
CHECK(d != nullptr);
auto &pending_notifications =
@ -28361,10 +28390,8 @@ void MessagesManager::flush_pending_new_message_notifications(DialogId dialog_id
auto it = pending_notifications.begin();
while (it != pending_notifications.end() && it->first == DialogId()) {
auto m = get_message(d, it->second);
if (m != nullptr) {
if (add_new_message_notification(d, m, true)) {
on_message_changed(d, m, false, "flush_pending_new_message_notifications");
}
if (m != nullptr && add_new_message_notification(d, m, true)) {
on_message_changed(d, m, false, "flush_pending_new_message_notifications");
}
++it;
}
@ -33561,12 +33588,20 @@ void MessagesManager::do_delete_message_log_event(const DeleteMessageLogEvent &l
}
MultiPromiseActorSafe mpas{"DeleteMessageMultiPromiseActor"};
mpas.add_promise(PromiseCreator::lambda([log_event_id](Result<Unit> result) {
if (result.is_error() || G()->close_flag()) {
return;
}
binlog_erase(G()->td_db()->get_binlog(), log_event_id);
}));
mpas.add_promise(
PromiseCreator::lambda([log_event_id, context_weak_ptr = get_context_weak_ptr()](Result<Unit> result) {
auto context = context_weak_ptr.lock();
if (result.is_error() || context == nullptr) {
return;
}
CHECK(context->get_id() == Global::ID);
auto global = static_cast<Global *>(context.get());
if (global->close_flag()) {
return;
}
binlog_erase(global->td_db()->get_binlog(), log_event_id);
}));
auto lock = mpas.get_promise();
for (auto file_id : log_event.file_ids_) {

View File

@ -1743,6 +1743,8 @@ class MessagesManager final : public Actor {
void add_secret_message(unique_ptr<PendingSecretMessage> pending_secret_message, Promise<Unit> lock_promise = Auto());
void on_add_secret_message_ready(int64 token);
void finish_add_secret_message(unique_ptr<PendingSecretMessage> pending_secret_message);
void finish_delete_secret_messages(DialogId dialog_id, std::vector<int64> random_ids, Promise<> promise);

View File

@ -129,6 +129,13 @@ void RecentDialogList::on_load_dialogs(vector<string> &&found_dialogs) {
auto promises = std::move(load_list_queries_);
CHECK(!promises.empty());
if (G()->close_flag()) {
for (auto &promise : promises) {
promise.set_error(Status::Error(500, "Request aborted"));
}
return;
}
auto newly_found_dialogs = std::move(dialog_ids_);
reset_to_empty(dialog_ids_);

View File

@ -1379,7 +1379,7 @@ void SecretChatActor::on_save_changes_start(ChangesProcessor<StateChange>::Id sa
}
void SecretChatActor::on_inbound_save_message_finish(uint64 state_id) {
if (close_flag_) {
if (close_flag_ || context_->close_flag()) {
return;
}
auto *state = inbound_message_states_.get(state_id);

View File

@ -5313,9 +5313,8 @@ void StickersManager::create_new_sticker_set(UserId user_id, string &title, stri
} while (random_id == 0 || pending_new_sticker_sets_.find(random_id) != pending_new_sticker_sets_.end());
pending_new_sticker_sets_[random_id] = std::move(pending_new_sticker_set);
multipromise.add_promise(PromiseCreator::lambda([random_id](Result<Unit> result) {
send_closure_later(G()->stickers_manager(), &StickersManager::on_new_stickers_uploaded, random_id,
std::move(result));
multipromise.add_promise(PromiseCreator::lambda([actor_id = actor_id(this), random_id](Result<Unit> result) {
send_closure_later(actor_id, &StickersManager::on_new_stickers_uploaded, random_id, std::move(result));
}));
auto lock_promise = multipromise.get_promise();
@ -5453,6 +5452,9 @@ void StickersManager::on_new_stickers_uploaded(int64 random_id, Result<Unit> res
pending_new_sticker_sets_.erase(it);
if (G()->close_flag()) {
result = Status::Error(500, "Request aborted");
}
if (result.is_error()) {
pending_new_sticker_set->promise.set_error(result.move_as_error());
return;
@ -6811,8 +6813,8 @@ void StickersManager::on_get_emoji_keywords(
const string &language_code, Result<telegram_api::object_ptr<telegram_api::emojiKeywordsDifference>> &&result) {
auto it = load_emoji_keywords_queries_.find(language_code);
CHECK(it != load_emoji_keywords_queries_.end());
CHECK(!it->second.empty());
auto promises = std::move(it->second);
CHECK(!promises.empty());
load_emoji_keywords_queries_.erase(it);
if (result.is_error()) {

View File

@ -1656,7 +1656,9 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
}
MultiPromiseActorSafe mpas{"OnPendingUpdatesMultiPromiseActor"};
mpas.add_promise(std::move(promise));
mpas.add_promise([actor_id = actor_id(this), promise = std::move(promise)](Result<Unit> &&result) mutable {
send_closure(actor_id, &UpdatesManager::on_pending_updates_processed, std::move(result), std::move(promise));
});
auto lock = mpas.get_promise();
for (auto &update : updates) {
@ -1775,6 +1777,10 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
lock.set_value(Unit());
}
void UpdatesManager::on_pending_updates_processed(Result<Unit> &&result, Promise<Unit> &&promise) {
promise.set_result(std::move(result));
}
void UpdatesManager::add_pending_qts_update(tl_object_ptr<telegram_api::Update> &&update, int32 qts,
Promise<Unit> &&promise) {
CHECK(update != nullptr);

View File

@ -267,6 +267,8 @@ class UpdatesManager final : public Actor {
void on_pending_updates(vector<tl_object_ptr<telegram_api::Update>> &&updates, int32 seq_begin, int32 seq_end,
int32 date, double receive_time, Promise<Unit> &&promise, const char *source);
void on_pending_updates_processed(Result<Unit> &&result, Promise<Unit> &&promise);
void process_updates(vector<tl_object_ptr<telegram_api::Update>> &&updates, bool force_apply,
Promise<Unit> &&promise);

View File

@ -1745,6 +1745,12 @@ void FileManager::change_files_source(FileSourceId file_source_id, const vector<
void FileManager::on_file_reference_repaired(FileId file_id, FileSourceId file_source_id, Result<Unit> &&result,
Promise<Unit> &&promise) {
if (G()->close_flag()) {
VLOG(file_references) << "Ignore file reference of file " << file_id << " repair from " << file_source_id
<< " during closing";
return promise.set_error(Status::Error(500, "Request aborted"));
}
auto file_view = get_file_view(file_id);
CHECK(!file_view.empty());
if (result.is_ok() &&

View File

@ -75,6 +75,7 @@ class Actor : public ObserverBase {
void do_migrate(int32 sched_id);
uint64 get_link_token();
std::weak_ptr<ActorContext> get_context_weak_ptr() const;
std::shared_ptr<ActorContext> set_context(std::shared_ptr<ActorContext> context);
string set_tag(string tag);

View File

@ -79,18 +79,26 @@ std::enable_if_t<std::is_base_of<Actor, ActorType>::value> start_migrate(ActorTy
Scheduler::instance()->start_migrate_actor(&obj, sched_id);
}
}
template <class ActorType>
std::enable_if_t<std::is_base_of<Actor, ActorType>::value> finish_migrate(ActorType &obj) {
if (!obj.empty()) {
Scheduler::instance()->finish_migrate_actor(&obj);
}
}
inline uint64 Actor::get_link_token() {
return Scheduler::instance()->get_link_token(this);
}
inline std::weak_ptr<ActorContext> Actor::get_context_weak_ptr() const {
return info_->get_context_weak_ptr();
}
inline std::shared_ptr<ActorContext> Actor::set_context(std::shared_ptr<ActorContext> context) {
return info_->set_context(std::move(context));
}
inline string Actor::set_tag(string tag) {
auto *ctx = info_->get_context();
string old_tag;
@ -105,12 +113,14 @@ inline string Actor::set_tag(string tag) {
inline void Actor::init(ObjectPool<ActorInfo>::OwnerPtr &&info) {
info_ = std::move(info);
}
inline ActorInfo *Actor::get_info() {
return &*info_;
}
inline const ActorInfo *Actor::get_info() const {
return &*info_;
}
inline ObjectPool<ActorInfo>::OwnerPtr Actor::clear() {
return std::move(info_);
}

View File

@ -86,6 +86,7 @@ class ActorInfo final
const Actor *get_actor_unsafe() const;
std::shared_ptr<ActorContext> set_context(std::shared_ptr<ActorContext> context);
std::weak_ptr<ActorContext> get_context_weak_ptr() const;
ActorContext *get_context();
const ActorContext *get_context() const;
CSlice get_name() const;

View File

@ -156,6 +156,11 @@ inline std::shared_ptr<ActorContext> ActorInfo::set_context(std::shared_ptr<Acto
Scheduler::on_context_updated();
return context;
}
inline std::weak_ptr<ActorContext> ActorInfo::get_context_weak_ptr() const {
return context_;
}
inline const ActorContext *ActorInfo::get_context() const {
return context_.get();
}