From 0b255f0260533071e252a35de968c8aecc0dd149 Mon Sep 17 00:00:00 2001 From: levlam Date: Thu, 13 Jun 2024 15:09:20 +0300 Subject: [PATCH] Use request promise in searchCallMessages. --- td/telegram/MessagesManager.cpp | 285 +++++++++++++++++--------------- td/telegram/MessagesManager.h | 14 +- td/telegram/Td.cpp | 31 +--- 3 files changed, 159 insertions(+), 171 deletions(-) diff --git a/td/telegram/MessagesManager.cpp b/td/telegram/MessagesManager.cpp index 2d7b10228..0bc5d9cc7 100644 --- a/td/telegram/MessagesManager.cpp +++ b/td/telegram/MessagesManager.cpp @@ -1899,8 +1899,7 @@ class SearchMessagesQuery final : public Td::ResultHandler { void send(DialogId dialog_id, SavedMessagesTopicId saved_messages_topic_id, const string &query, DialogId sender_dialog_id, MessageId from_message_id, int32 offset, int32 limit, MessageSearchFilter filter, MessageId top_thread_message_id, const ReactionType &tag, int64 random_id) { - auto input_peer = dialog_id.is_valid() ? td_->dialog_manager_->get_input_peer(dialog_id, AccessRights::Read) - : make_tl_object(); + auto input_peer = td_->dialog_manager_->get_input_peer(dialog_id, AccessRights::Read); CHECK(input_peer != nullptr); dialog_id_ = dialog_id; @@ -2019,6 +2018,57 @@ class SearchMessagesQuery final : public Td::ResultHandler { } }; +class SearchCallMessagesQuery final : public Td::ResultHandler { + Promise> promise_; + MessageId from_message_id_; + int32 limit_; + MessageSearchFilter filter_; + + public: + explicit SearchCallMessagesQuery(Promise> &&promise) + : promise_(std::move(promise)) { + } + + void send(MessageId from_message_id, int32 limit, MessageSearchFilter filter) { + from_message_id_ = from_message_id; + limit_ = limit; + filter_ = filter; + + auto offset_id = from_message_id.get_server_message_id().get(); + send_query(G()->net_query_creator().create(telegram_api::messages_search( + 0, telegram_api::make_object(), string(), nullptr, nullptr, + vector>(), 0, get_input_messages_filter(filter), 0, + std::numeric_limits::max(), offset_id, 0, limit, std::numeric_limits::max(), 0, 0))); + } + + void on_result(BufferSlice packet) final { + auto result_ptr = fetch_result(packet); + if (result_ptr.is_error()) { + return on_error(result_ptr.move_as_error()); + } + + auto info = get_messages_info(td_, DialogId(), result_ptr.move_as_ok(), "SearchCallMessagesQuery"); + td_->messages_manager_->get_channel_differences_if_needed( + std::move(info), + PromiseCreator::lambda([actor_id = td_->messages_manager_actor_.get(), from_message_id = from_message_id_, + limit = limit_, filter = filter_, + promise = std::move(promise_)](Result &&result) mutable { + if (result.is_error()) { + promise.set_error(result.move_as_error()); + } else { + auto info = result.move_as_ok(); + send_closure(actor_id, &MessagesManager::on_get_call_messages, from_message_id, limit, filter, + info.total_count, std::move(info.messages), std::move(promise)); + } + }), + "SearchCallMessagesQuery"); + } + + void on_error(Status status) final { + promise_.set_error(std::move(status)); + } +}; + class GetSearchResultPositionsQuery final : public Td::ResultHandler { Promise> promise_; DialogId dialog_id_; @@ -9092,6 +9142,80 @@ void MessagesManager::on_get_message_search_result_calendar( promise.set_value(td_api::make_object(total_count, std::move(days))); } +void MessagesManager::on_get_call_messages(MessageId from_message_id, int32 limit, MessageSearchFilter filter, + int32 total_count, + vector> &&messages, + Promise> &&promise) { + TRY_STATUS_PROMISE(promise, G()->close_status()); + + LOG(INFO) << "Receive " << messages.size() << " found call messages"; + MessageId first_added_message_id; + if (messages.empty()) { + // messages may be empty because there are no more messages or they can't be found due to global limit + // anyway pretend that there are no more messages + first_added_message_id = MessageId::min(); + } + + FoundMessages found_messages; + auto &result = found_messages.message_full_ids; + int32 added_message_count = 0; + MessageId next_offset_message_id; + for (auto &message : messages) { + auto message_id = MessageId::get_message_id(message, false); + if (message_id.is_valid() && (!next_offset_message_id.is_valid() || message_id < next_offset_message_id)) { + next_offset_message_id = message_id; + } + auto new_message_full_id = on_get_message(std::move(message), false, false, false, "on_get_call_messages"); + if (new_message_full_id == MessageFullId()) { + continue; + } + + result.push_back(new_message_full_id); + added_message_count++; + + CHECK(message_id == new_message_full_id.get_message_id()); + CHECK(message_id.is_valid()); + if (message_id < first_added_message_id || !first_added_message_id.is_valid()) { + first_added_message_id = message_id; + } + } + if (total_count < added_message_count) { + LOG(ERROR) << "Receive total_count = " << total_count << ", but added " << added_message_count + << " messages out of " << messages.size(); + total_count = added_message_count; + } + if (G()->use_message_database()) { + bool update_state = false; + + auto &old_message_count = calls_db_state_.message_count_by_index[call_message_search_filter_index(filter)]; + if (old_message_count != total_count) { + LOG(INFO) << "Update calls database message count to " << total_count; + old_message_count = total_count; + update_state = true; + } + + auto &old_first_db_message_id = + calls_db_state_.first_calls_database_message_id_by_index[call_message_search_filter_index(filter)]; + bool from_the_end = !from_message_id.is_valid() || from_message_id >= MessageId::max(); + LOG(INFO) << "Have from_the_end = " << from_the_end << ", old_first_db_message_id = " << old_first_db_message_id + << ", first_added_message_id = " << first_added_message_id << ", from_message_id = " << from_message_id; + if ((from_the_end || (old_first_db_message_id.is_valid() && old_first_db_message_id <= from_message_id)) && + (!old_first_db_message_id.is_valid() || first_added_message_id < old_first_db_message_id)) { + LOG(INFO) << "Update calls database first message to " << first_added_message_id; + old_first_db_message_id = first_added_message_id; + update_state = true; + } + if (update_state) { + save_calls_db_state(); + } + } + found_messages.total_count = total_count; + if (next_offset_message_id.is_valid()) { + found_messages.next_offset = PSTRING() << next_offset_message_id.get_server_message_id().get(); + } + promise.set_value(get_found_messages_object(found_messages, "on_get_call_messages")); +} + void MessagesManager::on_get_dialog_messages_search_result( DialogId dialog_id, SavedMessagesTopicId saved_messages_topic_id, const string &query, DialogId sender_dialog_id, MessageId from_message_id, int32 offset, int32 limit, MessageSearchFilter filter, MessageId top_thread_message_id, @@ -9100,82 +9224,6 @@ void MessagesManager::on_get_dialog_messages_search_result( TRY_STATUS_PROMISE(promise, G()->close_status()); LOG(INFO) << "Receive " << messages.size() << " found messages in " << dialog_id; - if (!dialog_id.is_valid()) { - CHECK(query.empty()); - CHECK(!sender_dialog_id.is_valid()); - CHECK(!top_thread_message_id.is_valid()); - CHECK(!saved_messages_topic_id.is_valid()); - CHECK(tag.is_empty()); - auto it = found_call_messages_.find(random_id); - CHECK(it != found_call_messages_.end()); - - MessageId first_added_message_id; - if (messages.empty()) { - // messages may be empty because there are no more messages or they can't be found due to global limit - // anyway pretend that there are no more messages - first_added_message_id = MessageId::min(); - } - - auto &result = it->second.message_full_ids; - CHECK(result.empty()); - int32 added_message_count = 0; - MessageId next_offset_message_id; - for (auto &message : messages) { - auto message_id = MessageId::get_message_id(message, false); - if (message_id.is_valid() && (!next_offset_message_id.is_valid() || message_id < next_offset_message_id)) { - next_offset_message_id = message_id; - } - auto new_message_full_id = on_get_message(std::move(message), false, false, false, "search call messages"); - if (new_message_full_id == MessageFullId()) { - continue; - } - - result.push_back(new_message_full_id); - added_message_count++; - - CHECK(message_id == new_message_full_id.get_message_id()); - CHECK(message_id.is_valid()); - if (message_id < first_added_message_id || !first_added_message_id.is_valid()) { - first_added_message_id = message_id; - } - } - if (total_count < added_message_count) { - LOG(ERROR) << "Receive total_count = " << total_count << ", but added " << added_message_count - << " messages out of " << messages.size(); - total_count = added_message_count; - } - if (G()->use_message_database()) { - bool update_state = false; - - auto &old_message_count = calls_db_state_.message_count_by_index[call_message_search_filter_index(filter)]; - if (old_message_count != total_count) { - LOG(INFO) << "Update calls database message count to " << total_count; - old_message_count = total_count; - update_state = true; - } - - auto &old_first_db_message_id = - calls_db_state_.first_calls_database_message_id_by_index[call_message_search_filter_index(filter)]; - bool from_the_end = !from_message_id.is_valid() || from_message_id >= MessageId::max(); - LOG(INFO) << "Have from_the_end = " << from_the_end << ", old_first_db_message_id = " << old_first_db_message_id - << ", first_added_message_id = " << first_added_message_id << ", from_message_id = " << from_message_id; - if ((from_the_end || (old_first_db_message_id.is_valid() && old_first_db_message_id <= from_message_id)) && - (!old_first_db_message_id.is_valid() || first_added_message_id < old_first_db_message_id)) { - LOG(INFO) << "Update calls database first message to " << first_added_message_id; - old_first_db_message_id = first_added_message_id; - update_state = true; - } - if (update_state) { - save_calls_db_state(); - } - } - it->second.total_count = total_count; - if (next_offset_message_id.is_valid()) { - it->second.next_offset = PSTRING() << next_offset_message_id.get_server_message_id().get(); - } - promise.set_value(Unit()); - return; - } auto it = found_dialog_messages_.find(random_id); CHECK(it != found_dialog_messages_.end()); @@ -9298,13 +9346,6 @@ void MessagesManager::on_get_dialog_messages_search_result( } void MessagesManager::on_failed_dialog_messages_search(DialogId dialog_id, int64 random_id) { - if (!dialog_id.is_valid()) { - auto it = found_call_messages_.find(random_id); - CHECK(it != found_call_messages_.end()); - found_call_messages_.erase(it); - return; - } - auto it = found_dialog_messages_.find(random_id); CHECK(it != found_dialog_messages_.end()); found_dialog_messages_.erase(it); @@ -20498,26 +20539,10 @@ MessagesManager::FoundDialogMessages MessagesManager::search_dialog_messages( return result; } -MessagesManager::FoundMessages MessagesManager::search_call_messages(const string &offset, int32 limit, - bool only_missed, int64 &random_id, bool use_db, - Promise &&promise) { - if (random_id != 0) { - // request has already been sent before - auto it = found_call_messages_.find(random_id); - if (it != found_call_messages_.end()) { - auto result = std::move(it->second); - found_call_messages_.erase(it); - promise.set_value(Unit()); - return result; - } - random_id = 0; - } - LOG(INFO) << "Search call messages from " << offset << " with limit " << limit; - - FoundMessages result; +void MessagesManager::search_call_messages(const string &offset, int32 limit, bool only_missed, + Promise> &&promise) { if (limit <= 0) { - promise.set_error(Status::Error(400, "Parameter limit must be positive")); - return result; + return promise.set_error(Status::Error(400, "Parameter limit must be positive")); } if (limit > MAX_SEARCH_MESSAGES) { limit = MAX_SEARCH_MESSAGES; @@ -20527,21 +20552,14 @@ MessagesManager::FoundMessages MessagesManager::search_call_messages(const strin if (!offset.empty()) { auto r_offset_server_message_id = to_integer_safe(offset); if (r_offset_server_message_id.is_error()) { - promise.set_error(Status::Error(400, "Invalid offset specified")); - return result; + return promise.set_error(Status::Error(400, "Invalid offset specified")); } offset_message_id = MessageId(ServerMessageId(r_offset_server_message_id.ok())); } - do { - random_id = Random::secure_int64(); - } while (random_id == 0 || found_call_messages_.count(random_id) > 0); - found_call_messages_[random_id]; // reserve place for result - auto filter = only_missed ? MessageSearchFilter::MissedCall : MessageSearchFilter::Call; - - if (use_db && G()->use_message_database()) { + if (G()->use_message_database()) { // try to use database MessageId first_db_message_id = calls_db_state_.first_calls_database_message_id_by_index[call_message_search_filter_index(filter)]; @@ -20561,19 +20579,17 @@ MessagesManager::FoundMessages MessagesManager::search_call_messages(const strin db_query.from_unique_message_id = fixed_from_message_id.get_server_message_id().get(); db_query.limit = limit; G()->td_db()->get_message_db_async()->get_calls( - db_query, PromiseCreator::lambda([random_id, first_db_message_id, filter, promise = std::move(promise)]( - Result calls_result) mutable { + db_query, + PromiseCreator::lambda([first_db_message_id, offset_message_id, limit, filter, + promise = std::move(promise)](Result calls_result) mutable { send_closure(G()->messages_manager(), &MessagesManager::on_message_db_calls_result, std::move(calls_result), - random_id, first_db_message_id, filter, std::move(promise)); + first_db_message_id, offset_message_id, limit, filter, std::move(promise)); })); - return result; + return; } } - td_->create_handler(std::move(promise)) - ->send(DialogId(), SavedMessagesTopicId(), string(), DialogId(), offset_message_id, 0, limit, filter, MessageId(), - ReactionType(), random_id); - return result; + td_->create_handler(std::move(promise))->send(offset_message_id, limit, filter); } void MessagesManager::search_outgoing_document_messages(const string &query, int32 limit, @@ -21200,19 +21216,14 @@ void MessagesManager::on_message_db_fts_result(Result result promise.set_value(get_found_messages_object(found_messages, "on_message_db_fts_result")); } -void MessagesManager::on_message_db_calls_result(Result result, int64 random_id, - MessageId first_db_message_id, MessageSearchFilter filter, - Promise &&promise) { - G()->ignore_result_if_closing(result); - if (result.is_error()) { - found_call_messages_.erase(random_id); - return promise.set_error(result.move_as_error()); - } - auto calls_result = result.move_as_ok(); +void MessagesManager::on_message_db_calls_result(Result result, MessageId first_db_message_id, + MessageId offset_message_id, int32 limit, MessageSearchFilter filter, + Promise> &&promise) { + TRY_STATUS_PROMISE(promise, G()->close_status()); + TRY_RESULT_PROMISE(promise, calls_result, std::move(result)); - auto it = found_call_messages_.find(random_id); - CHECK(it != found_call_messages_.end()); - auto &res = it->second.message_full_ids; + FoundMessages found_messages; + auto &res = found_messages.message_full_ids; CHECK(!first_db_message_id.is_scheduled()); res.reserve(calls_result.messages.size()); @@ -21226,17 +21237,17 @@ void MessagesManager::on_message_db_calls_result(Result re res.emplace_back(message.dialog_id, m->message_id); } } - it->second.total_count = calls_db_state_.message_count_by_index[call_message_search_filter_index(filter)]; + found_messages.total_count = calls_db_state_.message_count_by_index[call_message_search_filter_index(filter)]; if (next_offset_message_id.is_valid()) { - it->second.next_offset = PSTRING() << next_offset_message_id.get_server_message_id().get(); + found_messages.next_offset = PSTRING() << next_offset_message_id.get_server_message_id().get(); } if (res.empty() && first_db_message_id != MessageId::min()) { LOG(INFO) << "No messages found in database"; - found_call_messages_.erase(it); + return td_->create_handler(std::move(promise))->send(offset_message_id, limit, filter); } - promise.set_value(Unit()); + promise.set_value(get_found_messages_object(found_messages, "on_message_db_calls_result")); } void MessagesManager::search_messages(DialogListId dialog_list_id, bool ignore_folder_id, bool broadcasts_only, diff --git a/td/telegram/MessagesManager.h b/td/telegram/MessagesManager.h index 15daebd50..b212454c2 100644 --- a/td/telegram/MessagesManager.h +++ b/td/telegram/MessagesManager.h @@ -182,6 +182,10 @@ class MessagesManager final : public Actor { vector> &&periods, Promise> &&promise); + void on_get_call_messages(MessageId from_message_id, int32 limit, MessageSearchFilter filter, int32 total_count, + vector> &&messages, + Promise> &&promise); + void on_get_dialog_messages_search_result(DialogId dialog_id, SavedMessagesTopicId saved_messages_topic_id, const string &query, DialogId sender_dialog_id, MessageId from_message_id, int32 offset, int32 limit, MessageSearchFilter filter, @@ -728,8 +732,8 @@ class MessagesManager final : public Actor { const string &offset_str, int32 limit, MessageSearchFilter filter, int32 min_date, int32 max_date, Promise> &&promise); - FoundMessages search_call_messages(const string &offset, int32 limit, bool only_missed, int64 &random_id, bool use_db, - Promise &&promise); + void search_call_messages(const string &offset, int32 limit, bool only_missed, + Promise> &&promise); void search_outgoing_document_messages(const string &query, int32 limit, Promise> &&promise); @@ -2837,8 +2841,9 @@ class MessagesManager final : public Actor { void on_message_db_fts_result(Result result, string offset, int32 limit, Promise> &&promise); - void on_message_db_calls_result(Result result, int64 random_id, MessageId first_db_message_id, - MessageSearchFilter filter, Promise &&promise); + void on_message_db_calls_result(Result result, MessageId first_db_message_id, + MessageId offset_message_id, int32 limit, MessageSearchFilter filter, + Promise> &&promise); void on_load_active_live_location_message_full_ids_from_database(string value); @@ -3252,7 +3257,6 @@ class MessagesManager final : public Actor { FlatHashMap found_dialog_messages_; // random_id -> FoundDialogMessages FlatHashMap found_dialog_messages_dialog_id_; // random_id -> dialog_id - FlatHashMap found_call_messages_; // random_id -> FoundMessages struct MessageEmbeddingCodes { FlatHashMap embedding_codes_; diff --git a/td/telegram/Td.cpp b/td/telegram/Td.cpp index cd6a0dc5d..a607f29f6 100644 --- a/td/telegram/Td.cpp +++ b/td/telegram/Td.cpp @@ -1437,34 +1437,6 @@ class SearchChatMessagesRequest final : public RequestActor<> { } }; -class SearchCallMessagesRequest final : public RequestActor<> { - string offset_; - int32 limit_; - bool only_missed_; - int64 random_id_; - - MessagesManager::FoundMessages messages_; - - void do_run(Promise &&promise) final { - messages_ = td_->messages_manager_->search_call_messages(offset_, limit_, only_missed_, random_id_, - get_tries() == 3, std::move(promise)); - } - - void do_send_result() final { - send_result(td_->messages_manager_->get_found_messages_object(messages_, "SearchCallMessagesRequest")); - } - - public: - SearchCallMessagesRequest(ActorShared td, uint64 request_id, string offset, int32 limit, bool only_missed) - : RequestActor(std::move(td), request_id) - , offset_(std::move(offset)) - , limit_(limit) - , only_missed_(only_missed) - , random_id_(0) { - set_tries(3); - } -}; - class GetActiveLiveLocationMessagesRequest final : public RequestActor<> { vector message_full_ids_; @@ -5323,7 +5295,8 @@ void Td::on_request(uint64 id, td_api::searchSavedMessages &request) { void Td::on_request(uint64 id, const td_api::searchCallMessages &request) { CHECK_IS_USER(); - CREATE_REQUEST(SearchCallMessagesRequest, std::move(request.offset_), request.limit_, request.only_missed_); + CREATE_REQUEST_PROMISE(); + messages_manager_->search_call_messages(request.offset_, request.limit_, request.only_missed_, std::move(promise)); } void Td::on_request(uint64 id, td_api::searchOutgoingDocumentMessages &request) {