diff --git a/td/telegram/MessagesManager.cpp b/td/telegram/MessagesManager.cpp index 6037f7a0a..2cf666078 100644 --- a/td/telegram/MessagesManager.cpp +++ b/td/telegram/MessagesManager.cpp @@ -275,16 +275,9 @@ class GetDialogQuery final : public Td::ResultHandler { td->contacts_manager_->on_get_chats(std::move(result->chats_), "GetDialogQuery"); td->messages_manager_->on_get_dialogs( FolderId(), std::move(result->dialogs_), -1, std::move(result->messages_), - PromiseCreator::lambda([td = td, dialog_id = dialog_id_](Result<> result) { - if (G()->close_flag()) { - return; - } - if (result.is_ok()) { - td->messages_manager_->on_get_dialog_query_finished(dialog_id, Status::OK()); - } else { - td->messages_manager_->on_get_dialog_error(dialog_id, result.error(), "OnGetDialogs"); - td->messages_manager_->on_get_dialog_query_finished(dialog_id, result.move_as_error()); - } + PromiseCreator::lambda([actor_id = td->messages_manager_actor_.get(), dialog_id = dialog_id_](Result<> result) { + send_closure(actor_id, &MessagesManager::on_get_dialog_query_finished, dialog_id, + result.is_error() ? result.move_as_error() : Status::OK()); })); } @@ -453,12 +446,10 @@ class GetMessagesQuery final : public Td::ResultHandler { return on_error(id, result_ptr.move_as_error()); } - auto info = td->messages_manager_->on_get_messages(result_ptr.move_as_ok(), "GetMessagesQuery"); + auto info = td->messages_manager_->get_messages_info(result_ptr.move_as_ok(), "GetMessagesQuery"); LOG_IF(ERROR, info.is_channel_messages) << "Receive channel messages in GetMessagesQuery"; td->messages_manager_->on_get_messages(std::move(info.messages), info.is_channel_messages, false, - "GetMessagesQuery"); - - promise_.set_value(Unit()); + std::move(promise_), "GetMessagesQuery"); } void on_error(uint64 id, Status status) final { @@ -492,7 +483,7 @@ class GetChannelMessagesQuery final : public Td::ResultHandler { return on_error(id, result_ptr.move_as_error()); } - auto info = td->messages_manager_->on_get_messages(result_ptr.move_as_ok(), "GetChannelMessagesQuery"); + auto info = td->messages_manager_->get_messages_info(result_ptr.move_as_ok(), "GetChannelMessagesQuery"); LOG_IF(ERROR, !info.is_channel_messages) << "Receive ordinary messages in GetChannelMessagesQuery"; if (!td->auth_manager_->is_bot()) { // bots can receive messageEmpty because of their privacy mode vector empty_message_ids; @@ -508,20 +499,16 @@ class GetChannelMessagesQuery final : public Td::ResultHandler { } td->messages_manager_->get_channel_difference_if_needed( DialogId(channel_id_), std::move(info), - PromiseCreator::lambda( - [td = td, promise = std::move(promise_)](Result &&result) mutable { - if (G()->close_flag()) { - result = Status::Error(500, "Request aborted"); - } - if (result.is_error()) { - promise.set_error(result.move_as_error()); - } else { - auto info = result.move_as_ok(); - td->messages_manager_->on_get_messages(std::move(info.messages), info.is_channel_messages, false, - "GetChannelMessagesQuery"); - promise.set_value(Unit()); - } - })); + PromiseCreator::lambda([actor_id = td->messages_manager_actor_.get(), + promise = std::move(promise_)](Result &&result) mutable { + if (result.is_error()) { + promise.set_error(result.move_as_error()); + } else { + auto info = result.move_as_ok(); + send_closure(actor_id, &MessagesManager::on_get_messages, std::move(info.messages), + info.is_channel_messages, false, std::move(promise), "GetChannelMessagesQuery"); + } + })); } void on_error(uint64 id, Status status) final { @@ -555,13 +542,11 @@ class GetScheduledMessagesQuery final : public Td::ResultHandler { return on_error(id, result_ptr.move_as_error()); } - auto info = td->messages_manager_->on_get_messages(result_ptr.move_as_ok(), "GetScheduledMessagesQuery"); + auto info = td->messages_manager_->get_messages_info(result_ptr.move_as_ok(), "GetScheduledMessagesQuery"); LOG_IF(ERROR, info.is_channel_messages != (dialog_id_.get_type() == DialogType::Channel)) << "Receive wrong messages constructor in GetScheduledMessagesQuery"; td->messages_manager_->on_get_messages(std::move(info.messages), info.is_channel_messages, true, - "GetScheduledMessagesQuery"); - - promise_.set_value(Unit()); + std::move(promise_), "GetScheduledMessagesQuery"); } void on_error(uint64 id, Status status) final { @@ -1974,21 +1959,18 @@ class GetDialogMessageByDateQuery final : public Td::ResultHandler { return on_error(id, result_ptr.move_as_error()); } - auto info = td->messages_manager_->on_get_messages(result_ptr.move_as_ok(), "GetDialogMessageByDateQuery"); + auto info = td->messages_manager_->get_messages_info(result_ptr.move_as_ok(), "GetDialogMessageByDateQuery"); td->messages_manager_->get_channel_difference_if_needed( dialog_id_, std::move(info), - PromiseCreator::lambda([td = td, dialog_id = dialog_id_, date = date_, random_id = random_id_, + PromiseCreator::lambda([actor_id = td->messages_manager_actor_.get(), dialog_id = dialog_id_, date = date_, + random_id = random_id_, promise = std::move(promise_)](Result &&result) mutable { - if (G()->close_flag()) { - result = Status::Error(500, "Request aborted"); - } if (result.is_error()) { promise.set_error(result.move_as_error()); } else { auto info = result.move_as_ok(); - td->messages_manager_->on_get_dialog_message_by_date_success(dialog_id, date, random_id, - std::move(info.messages)); - promise.set_value(Unit()); + send_closure(actor_id, &MessagesManager::on_get_dialog_message_by_date_success, dialog_id, date, random_id, + std::move(info.messages), std::move(promise)); } })); } @@ -2054,23 +2036,21 @@ class GetHistoryQuery final : public Td::ResultHandler { return on_error(id, result_ptr.move_as_error()); } - auto info = td->messages_manager_->on_get_messages(result_ptr.move_as_ok(), "GetHistoryQuery"); + auto info = td->messages_manager_->get_messages_info(result_ptr.move_as_ok(), "GetHistoryQuery"); td->messages_manager_->get_channel_difference_if_needed( dialog_id_, std::move(info), - PromiseCreator::lambda([td = td, dialog_id = dialog_id_, from_message_id = from_message_id_, - old_last_new_message_id = old_last_new_message_id_, offset = offset_, limit = limit_, - from_the_end = from_the_end_, + PromiseCreator::lambda([actor_id = td->messages_manager_actor_.get(), dialog_id = dialog_id_, + from_message_id = from_message_id_, old_last_new_message_id = old_last_new_message_id_, + offset = offset_, limit = limit_, from_the_end = from_the_end_, promise = std::move(promise_)](Result &&result) mutable { - if (G()->close_flag()) { - result = Status::Error(500, "Request aborted"); - } if (result.is_error()) { promise.set_error(result.move_as_error()); } else { auto info = result.move_as_ok(); // TODO use info.total_count, info.pts - td->messages_manager_->on_get_history(dialog_id, from_message_id, old_last_new_message_id, offset, limit, - from_the_end, std::move(info.messages), std::move(promise)); + send_closure(actor_id, &MessagesManager::on_get_history, dialog_id, from_message_id, + old_last_new_message_id, offset, limit, from_the_end, std::move(info.messages), + std::move(promise)); } })); } @@ -2271,25 +2251,21 @@ class SearchMessagesQuery final : public Td::ResultHandler { return on_error(id, result_ptr.move_as_error()); } - auto info = td->messages_manager_->on_get_messages(result_ptr.move_as_ok(), "SearchMessagesQuery"); + auto info = td->messages_manager_->get_messages_info(result_ptr.move_as_ok(), "SearchMessagesQuery"); td->messages_manager_->get_channel_difference_if_needed( dialog_id_, std::move(info), - PromiseCreator::lambda([td = td, dialog_id = dialog_id_, query = std::move(query_), - sender_dialog_id = sender_dialog_id_, from_message_id = from_message_id_, - offset = offset_, limit = limit_, filter = filter_, + PromiseCreator::lambda([actor_id = td->messages_manager_actor_.get(), dialog_id = dialog_id_, + query = std::move(query_), sender_dialog_id = sender_dialog_id_, + from_message_id = from_message_id_, offset = offset_, limit = limit_, filter = filter_, top_thread_message_id = top_thread_message_id_, random_id = random_id_, promise = std::move(promise_)](Result &&result) mutable { - if (G()->close_flag()) { - result = Status::Error(500, "Request aborted"); - } if (result.is_error()) { promise.set_error(result.move_as_error()); } else { auto info = result.move_as_ok(); - td->messages_manager_->on_get_dialog_messages_search_result( - dialog_id, query, sender_dialog_id, from_message_id, offset, limit, filter, top_thread_message_id, - random_id, info.total_count, std::move(info.messages)); - promise.set_value(Unit()); + send_closure(actor_id, &MessagesManager::on_get_dialog_messages_search_result, dialog_id, query, + sender_dialog_id, from_message_id, offset, limit, filter, top_thread_message_id, random_id, + info.total_count, std::move(info.messages), std::move(promise)); } })); } @@ -2399,25 +2375,21 @@ class SearchMessagesGlobalQuery final : public Td::ResultHandler { return on_error(id, result_ptr.move_as_error()); } - auto info = td->messages_manager_->on_get_messages(result_ptr.move_as_ok(), "SearchMessagesGlobalQuery"); + auto info = td->messages_manager_->get_messages_info(result_ptr.move_as_ok(), "SearchMessagesGlobalQuery"); td->messages_manager_->get_channel_differences_if_needed( std::move(info), - PromiseCreator::lambda([td = td, query = std::move(query_), offset_date = offset_date_, - offset_dialog_id = offset_dialog_id_, offset_message_id = offset_message_id_, - limit = limit_, filter = std::move(filter_), min_date = min_date_, max_date = max_date_, - random_id = random_id_, + PromiseCreator::lambda([actor_id = td->messages_manager_actor_.get(), query = std::move(query_), + offset_date = offset_date_, offset_dialog_id = offset_dialog_id_, + offset_message_id = offset_message_id_, limit = limit_, filter = std::move(filter_), + min_date = min_date_, max_date = max_date_, random_id = random_id_, promise = std::move(promise_)](Result &&result) mutable { - if (G()->close_flag()) { - result = Status::Error(500, "Request aborted"); - } if (result.is_error()) { promise.set_error(result.move_as_error()); } else { auto info = result.move_as_ok(); - td->messages_manager_->on_get_messages_search_result(query, offset_date, offset_dialog_id, - offset_message_id, limit, filter, min_date, max_date, - random_id, info.total_count, std::move(info.messages)); - promise.set_value(Unit()); + send_closure(actor_id, &MessagesManager::on_get_messages_search_result, query, offset_date, + offset_dialog_id, offset_message_id, limit, filter, min_date, max_date, random_id, + info.total_count, std::move(info.messages), std::move(promise)); } })); } @@ -2457,7 +2429,7 @@ class GetAllScheduledMessagesQuery final : public Td::ResultHandler { if (result_ptr.ok()->get_id() == telegram_api::messages_messagesNotModified::ID) { td->messages_manager_->on_get_scheduled_server_messages(dialog_id_, generation_, Auto(), true); } else { - auto info = td->messages_manager_->on_get_messages(result_ptr.move_as_ok(), "GetAllScheduledMessagesQuery"); + auto info = td->messages_manager_->get_messages_info(result_ptr.move_as_ok(), "GetAllScheduledMessagesQuery"); td->messages_manager_->on_get_scheduled_server_messages(dialog_id_, generation_, std::move(info.messages), false); } @@ -2499,20 +2471,17 @@ class GetRecentLocationsQuery final : public Td::ResultHandler { return on_error(id, result_ptr.move_as_error()); } - auto info = td->messages_manager_->on_get_messages(result_ptr.move_as_ok(), "GetRecentLocationsQuery"); + auto info = td->messages_manager_->get_messages_info(result_ptr.move_as_ok(), "GetRecentLocationsQuery"); td->messages_manager_->get_channel_difference_if_needed( dialog_id_, std::move(info), - PromiseCreator::lambda([td = td, dialog_id = dialog_id_, limit = limit_, + PromiseCreator::lambda([actor_id = td->messages_manager_actor_.get(), dialog_id = dialog_id_, limit = limit_, promise = std::move(promise_)](Result &&result) mutable { - if (G()->close_flag()) { - result = Status::Error(500, "Request aborted"); - } if (result.is_error()) { promise.set_error(result.move_as_error()); } else { auto info = result.move_as_ok(); - td->messages_manager_->on_get_recent_locations(dialog_id, limit, info.total_count, std::move(info.messages), - std::move(promise)); + send_closure(actor_id, &MessagesManager::on_get_recent_locations, dialog_id, limit, info.total_count, + std::move(info.messages), std::move(promise)); } })); } @@ -2555,19 +2524,17 @@ class GetMessagePublicForwardsQuery final : public Td::ResultHandler { return on_error(id, result_ptr.move_as_error()); } - auto info = td->messages_manager_->on_get_messages(result_ptr.move_as_ok(), "GetMessagePublicForwardsQuery"); + auto info = td->messages_manager_->get_messages_info(result_ptr.move_as_ok(), "GetMessagePublicForwardsQuery"); td->messages_manager_->get_channel_differences_if_needed( - std::move(info), PromiseCreator::lambda([td = td, promise = std::move(promise_)]( - Result &&result) mutable { - if (G()->close_flag()) { - result = Status::Error(500, "Request aborted"); - } + std::move(info), + PromiseCreator::lambda([actor_id = td->messages_manager_actor_.get(), + promise = std::move(promise_)](Result &&result) mutable { if (result.is_error()) { promise.set_error(result.move_as_error()); } else { auto info = result.move_as_ok(); - td->messages_manager_->on_get_message_public_forwards(info.total_count, std::move(info.messages), - std::move(promise)); + send_closure(actor_id, &MessagesManager::on_get_message_public_forwards, info.total_count, + std::move(info.messages), std::move(promise)); } })); } @@ -9168,7 +9135,7 @@ void MessagesManager::on_get_empty_messages(DialogId dialog_id, vector &&messages_ptr, const char *source) { CHECK(messages_ptr != nullptr); LOG(DEBUG) << "Receive result for " << source << ": " << to_string(messages_ptr); @@ -9248,27 +9215,31 @@ void MessagesManager::get_channel_differences_if_needed(MessagesInfo &&messages_ } } // must be added after messages_info is checked - mpas.add_promise(PromiseCreator::lambda( - [messages_info = std::move(messages_info), promise = std::move(promise)](Unit ignored) mutable { - if (G()->close_flag()) { - return promise.set_error(Status::Error(500, "Request aborted")); - } - promise.set_value(std::move(messages_info)); - })); + mpas.add_promise(PromiseCreator::lambda([messages_info = std::move(messages_info), promise = std::move(promise)]( + Unit ignored) mutable { promise.set_value(std::move(messages_info)); })); lock.set_value(Unit()); } void MessagesManager::on_get_messages(vector> &&messages, bool is_channel_message, - bool is_scheduled, const char *source) { + bool is_scheduled, Promise &&promise, const char *source) { + if (G()->close_flag()) { + return promise.set_error(Status::Error(500, "Request aborted")); + } + LOG(DEBUG) << "Receive " << messages.size() << " messages"; for (auto &message : messages) { on_get_message(std::move(message), false, is_channel_message, is_scheduled, false, false, source); } + promise.set_value(Unit()); } void MessagesManager::on_get_history(DialogId dialog_id, MessageId from_message_id, MessageId old_last_new_message_id, int32 offset, int32 limit, bool from_the_end, vector> &&messages, Promise &&promise) { + if (G()->close_flag()) { + return promise.set_error(Status::Error(500, "Request aborted")); + } + LOG(INFO) << "Receive " << messages.size() << " history messages " << (from_the_end ? "from the end " : "") << "in " << dialog_id << " from " << from_message_id << " with offset " << offset << " and limit " << limit; CHECK(-limit < offset && offset <= 0); @@ -9613,14 +9584,12 @@ void MessagesManager::on_failed_public_dialogs_search(const string &query, Statu } } -void MessagesManager::on_get_dialog_messages_search_result(DialogId dialog_id, const string &query, - DialogId sender_dialog_id, MessageId from_message_id, - int32 offset, int32 limit, MessageSearchFilter filter, - MessageId top_thread_message_id, int64 random_id, - int32 total_count, - vector> &&messages) { +void MessagesManager::on_get_dialog_messages_search_result( + DialogId dialog_id, const string &query, DialogId sender_dialog_id, MessageId from_message_id, int32 offset, + int32 limit, MessageSearchFilter filter, MessageId top_thread_message_id, int64 random_id, int32 total_count, + vector> &&messages, Promise &&promise) { if (G()->close_flag()) { - return; + return promise.set_error(Status::Error(500, "Request aborted")); } LOG(INFO) << "Receive " << messages.size() << " found messages in " << dialog_id; @@ -9685,6 +9654,7 @@ void MessagesManager::on_get_dialog_messages_search_result(DialogId dialog_id, c } } it->second.first = total_count; + promise.set_value(Unit()); return; } @@ -9782,6 +9752,7 @@ void MessagesManager::on_get_dialog_messages_search_result(DialogId dialog_id, c } it->second.first = total_count; + promise.set_value(Unit()); } void MessagesManager::on_failed_dialog_messages_search(DialogId dialog_id, int64 random_id) { @@ -9834,7 +9805,12 @@ void MessagesManager::on_get_messages_search_result(const string &query, int32 o MessageId offset_message_id, int32 limit, MessageSearchFilter filter, int32 min_date, int32 max_date, int64 random_id, int32 total_count, - vector> &&messages) { + vector> &&messages, + Promise &&promise) { + if (G()->close_flag()) { + return promise.set_error(Status::Error(500, "Request aborted")); + } + LOG(INFO) << "Receive " << messages.size() << " found messages"; auto it = found_messages_.find(random_id); CHECK(it != found_messages_.end()); @@ -9858,6 +9834,7 @@ void MessagesManager::on_get_messages_search_result(const string &query, int32 o total_count = static_cast(result.size()); } it->second.first = total_count; + promise.set_value(Unit()); } void MessagesManager::on_failed_messages_search(int64 random_id) { @@ -9927,6 +9904,10 @@ void MessagesManager::on_get_scheduled_server_messages(DialogId dialog_id, uint3 void MessagesManager::on_get_recent_locations(DialogId dialog_id, int32 limit, int32 total_count, vector> &&messages, Promise> &&promise) { + if (G()->close_flag()) { + return promise.set_error(Status::Error(500, "Request aborted")); + } + LOG(INFO) << "Receive " << messages.size() << " recent locations in " << dialog_id; vector result; for (auto &message : messages) { @@ -9966,6 +9947,7 @@ void MessagesManager::on_get_message_public_forwards(int32 total_count, if (G()->close_flag()) { return promise.set_error(Status::Error(500, "Request aborted")); } + LOG(INFO) << "Receive " << messages.size() << " forwarded messages"; vector> result; FullMessageId last_full_message_id; @@ -22185,7 +22167,12 @@ void MessagesManager::get_dialog_message_by_date_from_server(const Dialog *d, in } void MessagesManager::on_get_dialog_message_by_date_success(DialogId dialog_id, int32 date, int64 random_id, - vector> &&messages) { + vector> &&messages, + Promise &&promise) { + if (G()->close_flag()) { + return promise.set_error(Status::Error(500, "Request aborted")); + } + auto it = get_dialog_message_by_date_results_.find(random_id); CHECK(it != get_dialog_message_by_date_results_.end()); auto &result = it->second; @@ -22211,10 +22198,12 @@ void MessagesManager::on_get_dialog_message_by_date_success(DialogId dialog_id, } get_dialog_message_by_date_results_[random_id] = {dialog_id, message_id}; // TODO result must be adjusted by local messages + promise.set_value(Unit()); return; } } } + promise.set_value(Unit()); } void MessagesManager::on_get_dialog_message_by_date_fail(int64 random_id) { @@ -30614,6 +30603,10 @@ void MessagesManager::send_get_dialog_query(DialogId dialog_id, Promise && } void MessagesManager::on_get_dialog_query_finished(DialogId dialog_id, Status &&status) { + if (G()->close_flag()) { + return; + } + LOG(INFO) << "Finished getting " << dialog_id << " with result " << status; auto it = get_dialog_queries_.find(dialog_id); CHECK(it != get_dialog_queries_.end()); diff --git a/td/telegram/MessagesManager.h b/td/telegram/MessagesManager.h index eb7701286..7fe96a259 100644 --- a/td/telegram/MessagesManager.h +++ b/td/telegram/MessagesManager.h @@ -187,7 +187,7 @@ class MessagesManager final : public Actor { int32 total_count = 0; bool is_channel_messages = false; }; - MessagesInfo on_get_messages(tl_object_ptr &&messages_ptr, const char *source); + MessagesInfo get_messages_info(tl_object_ptr &&messages_ptr, const char *source); void get_channel_difference_if_needed(DialogId dialog_id, MessagesInfo &&messages_info, Promise &&promise); @@ -195,7 +195,7 @@ class MessagesManager final : public Actor { void get_channel_differences_if_needed(MessagesInfo &&messages_info, Promise &&promise); void on_get_messages(vector> &&messages, bool is_channel_message, - bool is_scheduled, const char *source); + bool is_scheduled, Promise &&promise, const char *source); void on_get_history(DialogId dialog_id, MessageId from_message_id, MessageId old_last_new_message_id, int32 offset, int32 limit, bool from_the_end, vector> &&messages, @@ -209,7 +209,8 @@ class MessagesManager final : public Actor { MessageId from_message_id, int32 offset, int32 limit, MessageSearchFilter filter, MessageId top_thread_message_id, int64 random_id, int32 total_count, - vector> &&messages); + vector> &&messages, + Promise &&promise); void on_failed_dialog_messages_search(DialogId dialog_id, int64 random_id); void on_get_dialog_message_count(DialogId dialog_id, MessageSearchFilter filter, int32 total_count, @@ -218,7 +219,7 @@ class MessagesManager final : public Actor { void on_get_messages_search_result(const string &query, int32 offset_date, DialogId offset_dialog_id, MessageId offset_message_id, int32 limit, MessageSearchFilter filter, int32 min_date, int32 max_date, int64 random_id, int32 total_count, - vector> &&messages); + vector> &&messages, Promise &&promise); void on_failed_messages_search(int64 random_id); void on_get_scheduled_server_messages(DialogId dialog_id, uint32 generation, @@ -726,7 +727,8 @@ class MessagesManager final : public Actor { int64 get_dialog_message_by_date(DialogId dialog_id, int32 date, Promise &&promise); void on_get_dialog_message_by_date_success(DialogId dialog_id, int32 date, int64 random_id, - vector> &&messages); + vector> &&messages, + Promise &&promise); void on_get_dialog_message_by_date_fail(int64 random_id);