diff --git a/td/telegram/MessagesManager.cpp b/td/telegram/MessagesManager.cpp index 343831125..8ec6f86aa 100644 --- a/td/telegram/MessagesManager.cpp +++ b/td/telegram/MessagesManager.cpp @@ -610,10 +610,16 @@ class UpdateDialogPinnedMessageQuery final : public Td::ResultHandler { }; class UnpinAllMessagesQuery final : public Td::ResultHandler { - Promise promise_; + Promise promise_; DialogId dialog_id_; - void send_request() { + public: + explicit UnpinAllMessagesQuery(Promise &&promise) : promise_(std::move(promise)) { + } + + void send(DialogId dialog_id) { + dialog_id_ = dialog_id; + auto input_peer = td_->messages_manager_->get_input_peer(dialog_id_, AccessRights::Write); if (input_peer == nullptr) { LOG(INFO) << "Can't unpin all messages in " << dialog_id_; @@ -623,45 +629,13 @@ class UnpinAllMessagesQuery final : public Td::ResultHandler { send_query(G()->net_query_creator().create(telegram_api::messages_unpinAllMessages(std::move(input_peer)))); } - public: - explicit UnpinAllMessagesQuery(Promise &&promise) : promise_(std::move(promise)) { - } - - void send(DialogId dialog_id) { - dialog_id_ = dialog_id; - - send_request(); - } - void on_result(BufferSlice packet) final { auto result_ptr = fetch_result(packet); if (result_ptr.is_error()) { return on_error(result_ptr.move_as_error()); } - auto affected_history = result_ptr.move_as_ok(); - CHECK(affected_history->get_id() == telegram_api::messages_affectedHistory::ID); - - if (affected_history->pts_count_ > 0) { - affected_history->pts_count_ = 0; // force receiving real updates from the server - auto promise = affected_history->offset_ > 0 ? Promise() : std::move(promise_); - if (dialog_id_.get_type() == DialogType::Channel) { - td_->messages_manager_->add_pending_channel_update(dialog_id_, make_tl_object(), - affected_history->pts_, affected_history->pts_count_, - std::move(promise), "unpin all messages"); - } else { - td_->updates_manager_->add_pending_pts_update(make_tl_object(), affected_history->pts_, - affected_history->pts_count_, Time::now(), std::move(promise), - "unpin all messages"); - } - } else if (affected_history->offset_ <= 0) { - promise_.set_value(Unit()); - } - - if (affected_history->offset_ > 0) { - send_request(); - return; - } + promise_.set_value(result_ptr.move_as_ok()); } void on_error(Status status) final { @@ -2655,42 +2629,32 @@ class HidePromoDataQuery final : public Td::ResultHandler { }; class DeleteHistoryQuery final : public Td::ResultHandler { - Promise promise_; + Promise promise_; DialogId dialog_id_; - MessageId max_message_id_; - bool remove_from_dialog_list_; - bool revoke_; - void send_request() { + public: + explicit DeleteHistoryQuery(Promise &&promise) : promise_(std::move(promise)) { + } + + void send(DialogId dialog_id, MessageId max_message_id, bool remove_from_dialog_list, bool revoke) { + dialog_id_ = dialog_id; + auto input_peer = td_->messages_manager_->get_input_peer(dialog_id_, AccessRights::Read); if (input_peer == nullptr) { return promise_.set_error(Status::Error(400, "Chat is not accessible")); } int32 flags = 0; - if (!remove_from_dialog_list_) { + if (!remove_from_dialog_list) { flags |= telegram_api::messages_deleteHistory::JUST_CLEAR_MASK; } - if (revoke_) { + if (revoke) { flags |= telegram_api::messages_deleteHistory::REVOKE_MASK; } send_query(G()->net_query_creator().create( telegram_api::messages_deleteHistory(flags, false /*ignored*/, false /*ignored*/, std::move(input_peer), - max_message_id_.get_server_message_id().get(), 0, 0))); - } - - public: - explicit DeleteHistoryQuery(Promise &&promise) : promise_(std::move(promise)) { - } - - void send(DialogId dialog_id, MessageId max_message_id, bool remove_from_dialog_list, bool revoke) { - dialog_id_ = dialog_id; - max_message_id_ = max_message_id; - remove_from_dialog_list_ = remove_from_dialog_list; - revoke_ = revoke; - - send_request(); + max_message_id.get_server_message_id().get(), 0, 0))); } void on_result(BufferSlice packet) final { @@ -2699,21 +2663,7 @@ class DeleteHistoryQuery final : public Td::ResultHandler { return on_error(result_ptr.move_as_error()); } - auto affected_history = result_ptr.move_as_ok(); - CHECK(affected_history->get_id() == telegram_api::messages_affectedHistory::ID); - - if (affected_history->pts_count_ > 0) { - td_->updates_manager_->add_pending_pts_update(make_tl_object(), affected_history->pts_, - affected_history->pts_count_, Time::now(), Promise(), - "delete history query"); - } - - if (affected_history->offset_ > 0) { - send_request(); - return; - } - - promise_.set_value(Unit()); + promise_.set_value(result_ptr.move_as_ok()); } void on_error(Status status) final { @@ -2765,13 +2715,16 @@ class DeleteChannelHistoryQuery final : public Td::ResultHandler { }; class DeleteMessagesByDateQuery final : public Td::ResultHandler { - Promise promise_; + Promise promise_; DialogId dialog_id_; - int32 min_date_; - int32 max_date_; - bool revoke_; - void send_request() { + public: + explicit DeleteMessagesByDateQuery(Promise &&promise) : promise_(std::move(promise)) { + } + + void send(DialogId dialog_id, int32 min_date, int32 max_date, bool revoke) { + dialog_id_ = dialog_id; + auto input_peer = td_->messages_manager_->get_input_peer(dialog_id_, AccessRights::Read); if (input_peer == nullptr) { return promise_.set_error(Status::Error(400, "Chat is not accessible")); @@ -2780,25 +2733,12 @@ class DeleteMessagesByDateQuery final : public Td::ResultHandler { int32 flags = telegram_api::messages_deleteHistory::JUST_CLEAR_MASK | telegram_api::messages_deleteHistory::MIN_DATE_MASK | telegram_api::messages_deleteHistory::MAX_DATE_MASK; - if (revoke_) { + if (revoke) { flags |= telegram_api::messages_deleteHistory::REVOKE_MASK; } send_query(G()->net_query_creator().create(telegram_api::messages_deleteHistory( - flags, false /*ignored*/, false /*ignored*/, std::move(input_peer), 0, min_date_, max_date_))); - } - - public: - explicit DeleteMessagesByDateQuery(Promise &&promise) : promise_(std::move(promise)) { - } - - void send(DialogId dialog_id, int32 min_date, int32 max_date, bool revoke) { - dialog_id_ = dialog_id; - min_date_ = min_date; - max_date_ = max_date; - revoke_ = revoke; - - send_request(); + flags, false /*ignored*/, false /*ignored*/, std::move(input_peer), 0, min_date, max_date))); } void on_result(BufferSlice packet) final { @@ -2807,23 +2747,7 @@ class DeleteMessagesByDateQuery final : public Td::ResultHandler { return on_error(result_ptr.move_as_error()); } - auto affected_history = result_ptr.move_as_ok(); - CHECK(affected_history->get_id() == telegram_api::messages_affectedHistory::ID); - - if (affected_history->pts_count_ > 0) { - affected_history->pts_count_ = 0; // force receiving real updates from the server - auto promise = affected_history->offset_ > 0 ? Promise() : std::move(promise_); - td_->updates_manager_->add_pending_pts_update(make_tl_object(), affected_history->pts_, - affected_history->pts_count_, Time::now(), std::move(promise), - "DeleteMessagesByDateQuery"); - } else if (affected_history->offset_ <= 0) { - promise_.set_value(Unit()); - } - - if (affected_history->offset_ > 0) { - send_request(); - return; - } + promise_.set_value(result_ptr.move_as_ok()); } void on_error(Status status) final { @@ -2926,16 +2850,21 @@ class BlockFromRepliesQuery final : public Td::ResultHandler { }; class DeleteUserHistoryQuery final : public Td::ResultHandler { - Promise promise_; + Promise promise_; ChannelId channel_id_; - UserId user_id_; - void send_request() { + public: + explicit DeleteUserHistoryQuery(Promise &&promise) : promise_(std::move(promise)) { + } + + void send(ChannelId channel_id, UserId user_id) { + channel_id_ = channel_id; + auto input_channel = td_->contacts_manager_->get_input_channel(channel_id_); if (input_channel == nullptr) { return promise_.set_error(Status::Error(400, "Chat is not accessible")); } - auto input_user = td_->contacts_manager_->get_input_user(user_id_); + auto input_user = td_->contacts_manager_->get_input_user(user_id); if (input_user == nullptr) { return promise_.set_error(Status::Error(400, "Usat is not accessible")); } @@ -2944,38 +2873,13 @@ class DeleteUserHistoryQuery final : public Td::ResultHandler { telegram_api::channels_deleteUserHistory(std::move(input_channel), std::move(input_user)))); } - public: - explicit DeleteUserHistoryQuery(Promise &&promise) : promise_(std::move(promise)) { - } - - void send(ChannelId channel_id, UserId user_id) { - channel_id_ = channel_id; - user_id_ = user_id; - - send_request(); - } - void on_result(BufferSlice packet) final { auto result_ptr = fetch_result(packet); if (result_ptr.is_error()) { return on_error(result_ptr.move_as_error()); } - auto affected_history = result_ptr.move_as_ok(); - CHECK(affected_history->get_id() == telegram_api::messages_affectedHistory::ID); - - if (affected_history->pts_count_ > 0) { - td_->messages_manager_->add_pending_channel_update( - DialogId(channel_id_), make_tl_object(), affected_history->pts_, affected_history->pts_count_, - affected_history->offset_ > 0 ? Promise() : std::move(promise_), "delete user history query"); - } else if (affected_history->offset_ <= 0) { - promise_.set_value(Unit()); - } - - if (affected_history->offset_ > 0) { - send_request(); - return; - } + promise_.set_value(result_ptr.move_as_ok()); } void on_error(Status status) final { @@ -2985,10 +2889,16 @@ class DeleteUserHistoryQuery final : public Td::ResultHandler { }; class ReadMentionsQuery final : public Td::ResultHandler { - Promise promise_; + Promise promise_; DialogId dialog_id_; - void send_request() { + public: + explicit ReadMentionsQuery(Promise &&promise) : promise_(std::move(promise)) { + } + + void send(DialogId dialog_id) { + dialog_id_ = dialog_id; + auto input_peer = td_->messages_manager_->get_input_peer(dialog_id_, AccessRights::Read); if (input_peer == nullptr) { return promise_.set_error(Status::Error(400, "Chat is not accessible")); @@ -2997,43 +2907,13 @@ class ReadMentionsQuery final : public Td::ResultHandler { send_query(G()->net_query_creator().create(telegram_api::messages_readMentions(std::move(input_peer)))); } - public: - explicit ReadMentionsQuery(Promise &&promise) : promise_(std::move(promise)) { - } - - void send(DialogId dialog_id) { - dialog_id_ = dialog_id; - - send_request(); - } - void on_result(BufferSlice packet) final { auto result_ptr = fetch_result(packet); if (result_ptr.is_error()) { return on_error(result_ptr.move_as_error()); } - auto affected_history = result_ptr.move_as_ok(); - CHECK(affected_history->get_id() == telegram_api::messages_affectedHistory::ID); - - if (affected_history->pts_count_ > 0) { - if (dialog_id_.get_type() == DialogType::Channel) { - LOG(ERROR) << "Receive pts_count " << affected_history->pts_count_ << " in result of ReadMentionsQuery in " - << dialog_id_; - td_->updates_manager_->get_difference("Wrong messages_readMentions result"); - } else { - td_->updates_manager_->add_pending_pts_update(make_tl_object(), affected_history->pts_, - affected_history->pts_count_, Time::now(), Promise(), - "read all mentions query"); - } - } - - if (affected_history->offset_ > 0) { - send_request(); - return; - } - - promise_.set_value(Unit()); + promise_.set_value(result_ptr.move_as_ok()); } void on_error(Status status) final { @@ -10795,10 +10675,15 @@ void MessagesManager::delete_dialog_history_on_server(DialogId dialog_id, Messag switch (dialog_id.get_type()) { case DialogType::User: - case DialogType::Chat: - td_->create_handler(std::move(promise)) - ->send(dialog_id, max_message_id, remove_from_dialog_list, revoke); + case DialogType::Chat: { + AffectedHistoryQuery query = [td = td_, max_message_id, remove_from_dialog_list, revoke]( + DialogId dialog_id, Promise &&query_promise) { + td->create_handler(std::move(query_promise)) + ->send(dialog_id, max_message_id, remove_from_dialog_list, revoke); + }; + run_affected_history_query_until_complete(dialog_id, std::move(query), false, std::move(promise)); break; + } case DialogType::Channel: td_->create_handler(std::move(promise)) ->send(dialog_id.get_channel_id(), max_message_id, allow_error); @@ -11023,8 +10908,11 @@ void MessagesManager::delete_all_channel_messages_from_user_on_server(ChannelId log_event_id = save_delete_all_channel_messages_from_user_on_server_log_event(channel_id, user_id); } - td_->create_handler(get_erase_log_event_promise(log_event_id, std::move(promise))) - ->send(channel_id, user_id); + AffectedHistoryQuery query = [td = td_, user_id](DialogId dialog_id, Promise &&query_promise) { + td->create_handler(std::move(query_promise))->send(dialog_id.get_channel_id(), user_id); + }; + run_affected_history_query_until_complete(DialogId(channel_id), std::move(query), false, + get_erase_log_event_promise(log_event_id, std::move(promise))); } void MessagesManager::delete_dialog_messages_by_date(DialogId dialog_id, int32 min_date, int32 max_date, bool revoke, @@ -11143,8 +11031,13 @@ void MessagesManager::delete_dialog_messages_by_date_on_server(DialogId dialog_i log_event_id = save_delete_dialog_messages_by_date_on_server_log_event(dialog_id, min_date, max_date, revoke); } - td_->create_handler(get_erase_log_event_promise(log_event_id, std::move(promise))) - ->send(dialog_id, min_date, max_date, revoke); + AffectedHistoryQuery query = [td = td_, min_date, max_date, revoke](DialogId dialog_id, + Promise &&query_promise) { + td->create_handler(std::move(query_promise)) + ->send(dialog_id, min_date, max_date, revoke); + }; + run_affected_history_query_until_complete(dialog_id, std::move(query), true, + get_erase_log_event_promise(log_event_id, std::move(promise))); } int32 MessagesManager::get_unload_dialog_delay() const { @@ -11404,9 +11297,11 @@ void MessagesManager::read_all_dialog_mentions_on_server(DialogId dialog_id, uin log_event_id = save_read_all_dialog_mentions_on_server_log_event(dialog_id); } - LOG(INFO) << "Read all mentions on server in " << dialog_id; - td_->create_handler(get_erase_log_event_promise(log_event_id, std::move(promise))) - ->send(dialog_id); + AffectedHistoryQuery query = [td = td_](DialogId dialog_id, Promise &&query_promise) { + td->create_handler(std::move(query_promise))->send(dialog_id); + }; + run_affected_history_query_until_complete(dialog_id, std::move(query), false, + get_erase_log_event_promise(log_event_id, std::move(promise))); } void MessagesManager::read_message_content_from_updates(MessageId message_id) { @@ -22548,6 +22443,49 @@ int64 MessagesManager::get_dialog_message_by_date(DialogId dialog_id, int32 date return random_id; } +void MessagesManager::run_affected_history_query_until_complete(DialogId dialog_id, AffectedHistoryQuery query, + bool get_affected_messages, Promise &&promise) { + CHECK(!G()->close_flag()); + auto query_promise = PromiseCreator::lambda([actor_id = actor_id(this), dialog_id, query, get_affected_messages, + promise = std::move(promise)](Result &&result) mutable { + if (result.is_error()) { + return promise.set_error(result.move_as_error()); + } + + send_closure(actor_id, &MessagesManager::on_get_affected_history, dialog_id, query, get_affected_messages, + result.move_as_ok(), std::move(promise)); + }); + query(dialog_id, std::move(query_promise)); +} + +void MessagesManager::on_get_affected_history(DialogId dialog_id, AffectedHistoryQuery query, + bool get_affected_messages, AffectedHistory affected_history, + Promise &&promise) { + TRY_STATUS_PROMISE(promise, G()->close_status()); + + if (affected_history->pts_count_ > 0) { + if (get_affected_messages) { + affected_history->pts_count_ = 0; + } + auto update_promise = affected_history->offset_ > 0 ? Promise() : std::move(promise); + if (dialog_id.get_type() == DialogType::Channel) { + td_->messages_manager_->add_pending_channel_update(dialog_id, make_tl_object(), + affected_history->pts_, affected_history->pts_count_, + std::move(update_promise), "on_get_affected_history"); + } else { + td_->updates_manager_->add_pending_pts_update(make_tl_object(), affected_history->pts_, + affected_history->pts_count_, Time::now(), + std::move(update_promise), "on_get_affected_history"); + } + } else if (affected_history->offset_ <= 0) { + promise.set_value(Unit()); + } + + if (affected_history->offset_ > 0) { + run_affected_history_query_until_complete(dialog_id, std::move(query), get_affected_messages, std::move(promise)); + } +} + MessageId MessagesManager::find_message_by_date(const Message *m, int32 date) { if (m == nullptr) { return MessageId(); @@ -32255,8 +32193,11 @@ void MessagesManager::unpin_all_dialog_messages_on_server(DialogId dialog_id, ui log_event_id = save_unpin_all_dialog_messages_on_server_log_event(dialog_id); } - td_->create_handler(get_erase_log_event_promise(log_event_id, std::move(promise))) - ->send(dialog_id); + AffectedHistoryQuery query = [td = td_](DialogId dialog_id, Promise &&query_promise) { + td->create_handler(std::move(query_promise))->send(dialog_id); + }; + run_affected_history_query_until_complete(dialog_id, std::move(query), true, + get_erase_log_event_promise(log_event_id, std::move(promise))); } unique_ptr *MessagesManager::treap_find_message(unique_ptr *v, diff --git a/td/telegram/MessagesManager.h b/td/telegram/MessagesManager.h index 81312d929..3118e33df 100644 --- a/td/telegram/MessagesManager.h +++ b/td/telegram/MessagesManager.h @@ -92,6 +92,8 @@ class MessageContent; class MultiSequenceDispatcher; class Td; +using AffectedHistory = tl_object_ptr; + class MessagesManager final : public Actor { public: // static constexpr int32 MESSAGE_FLAG_IS_UNREAD = 1 << 0; @@ -2027,6 +2029,14 @@ class MessagesManager final : public Actor { void unpin_all_dialog_messages_on_server(DialogId dialog_id, uint64 log_event_id, Promise &&promise); + using AffectedHistoryQuery = std::function)>; + + void run_affected_history_query_until_complete(DialogId dialog_id, AffectedHistoryQuery query, + bool get_affected_messages, Promise &&promise); + + void on_get_affected_history(DialogId dialog_id, AffectedHistoryQuery query, bool get_affected_messages, + AffectedHistory affected_history, Promise &&promise); + static MessageId find_message_by_date(const Message *m, int32 date); static void find_messages_by_date(const Message *m, int32 min_date, int32 max_date, vector &message_ids);