From 28f0cd5df544dd8d4a974f0174ec06ca7c3a1bb0 Mon Sep 17 00:00:00 2001 From: levlam Date: Thu, 5 Dec 2019 00:24:48 +0300 Subject: [PATCH] Synchronize scheduled messages with the server. GitOrigin-RevId: e4934cf80b6fd769891a684f01ff7d76eeae9cf9 --- td/telegram/MessagesManager.cpp | 123 +++++++++++++++++++++++++++++--- td/telegram/MessagesManager.h | 3 + 2 files changed, 115 insertions(+), 11 deletions(-) diff --git a/td/telegram/MessagesManager.cpp b/td/telegram/MessagesManager.cpp index d925cf3a..20f61613 100644 --- a/td/telegram/MessagesManager.cpp +++ b/td/telegram/MessagesManager.cpp @@ -1557,6 +1557,48 @@ class SearchMessagesGlobalQuery : public Td::ResultHandler { } }; +class GetAllScheduledMessagesQuery : public Td::ResultHandler { + Promise promise_; + DialogId dialog_id_; + uint32 generation_; + + public: + explicit GetAllScheduledMessagesQuery(Promise &&promise) : promise_(std::move(promise)) { + } + + void send(DialogId dialog_id, int32 hash, uint32 generation) { + auto input_peer = td->messages_manager_->get_input_peer(dialog_id, AccessRights::Read); + CHECK(input_peer != nullptr); + + dialog_id_ = dialog_id; + generation_ = generation; + + send_query(G()->net_query_creator().create( + create_storer(telegram_api::messages_getScheduledHistory(std::move(input_peer), hash)))); + } + + void on_result(uint64 id, BufferSlice packet) override { + auto result_ptr = fetch_result(packet); + if (result_ptr.is_error()) { + return on_error(id, result_ptr.move_as_error()); + } + + if (result_ptr.ok()->get_id() == telegram_api::messages_messagesNotModified::ID) { + td->messages_manager_->on_get_scheduled_server_messages(dialog_id_, generation_, Auto(), true); + } else { + auto info = td->messages_manager_->on_get_messages(result_ptr.move_as_ok(), "GetAllScheduledMessagesQuery"); + td->messages_manager_->on_get_scheduled_server_messages(dialog_id_, generation_, std::move(info.messages), false); + } + + promise_.set_value(Unit()); + } + + void on_error(uint64 id, Status status) override { + td->messages_manager_->on_get_dialog_error(dialog_id_, status, "GetAllScheduledMessagesQuery"); + promise_.set_error(std::move(status)); + } +}; + class GetRecentLocationsQuery : public Td::ResultHandler { Promise promise_; DialogId dialog_id_; @@ -8096,6 +8138,60 @@ void MessagesManager::on_failed_messages_search(int64 random_id) { found_messages_.erase(it); } +void MessagesManager::on_get_scheduled_server_messages(DialogId dialog_id, uint32 generation, + vector> &&messages, + bool is_not_modified) { + Dialog *d = get_dialog(dialog_id); + CHECK(d != nullptr); + if (generation < d->scheduled_messages_sync_generation) { + return; + } + d->scheduled_messages_sync_generation = generation; + + if (is_not_modified) { + return; + } + + vector old_message_ids; + find_old_messages(d->scheduled_messages.get(), + MessageId(ScheduledServerMessageId(), std::numeric_limits::max(), true), old_message_ids); + std::unordered_map old_server_message_ids; + for (auto &message_id : old_message_ids) { + if (message_id.is_scheduled_server()) { + old_server_message_ids[message_id.get_scheduled_server_message_id()] = message_id; + } + } + + bool is_channel_message = dialog_id.get_type() == DialogType::Channel; + for (auto &message : messages) { + auto message_dialog_id = get_message_dialog_id(message); + if (message_dialog_id != dialog_id) { + if (dialog_id.is_valid()) { + LOG(ERROR) << "Receive " << get_message_id(message, true) << " in wrong " << message_dialog_id << " instead of " + << dialog_id << ": " << oneline(to_string(message)); + } + continue; + } + + auto full_message_id = on_get_message(std::move(message), false, is_channel_message, true, false, false, + "on_get_scheduled_server_messages"); + auto message_id = full_message_id.get_message_id(); + if (message_id.is_valid_scheduled()) { + CHECK(message_id.is_scheduled_server()); + old_server_message_ids.erase(message_id.get_scheduled_server_message_id()); + } + } + + for (auto it : old_server_message_ids) { + auto message_id = it.second; + auto message = do_delete_scheduled_message(d, message_id, true, "on_get_scheduled_server_messages"); + CHECK(message != nullptr); + send_update_delete_messages(dialog_id, {message_id.get()}, true, false); + } + + send_update_chat_has_scheduled_messages(d); +} + void MessagesManager::on_get_recent_locations(DialogId dialog_id, int32 limit, int64 random_id, int32 total_count, vector> &&messages) { LOG(INFO) << "Receive " << messages.size() << " recent locations in " << dialog_id; @@ -10739,7 +10835,7 @@ void MessagesManager::fix_message_info_dialog_id(MessageInfo &message_info) cons } message_info.dialog_id = DialogId(sender_user_id); - LOG_IF(ERROR, (message_info.flags & MESSAGE_FLAG_IS_OUT) != 0) + LOG_IF(ERROR, !message_info.message_id.is_scheduled() && (message_info.flags & MESSAGE_FLAG_IS_OUT) != 0) << "Receive message out flag for incoming " << message_info.message_id << " in " << message_info.dialog_id; } @@ -10884,10 +10980,11 @@ std::pair> MessagesManager::creat CHECK(sender_user_id == my_id); } - if (sender_user_id.is_valid() && (sender_user_id == my_id && dialog_id != my_dialog_id) != is_outgoing) { + bool supposed_to_be_outgoing = sender_user_id == my_id && !(dialog_id == my_dialog_id && !message_id.is_scheduled()); + if (sender_user_id.is_valid() && supposed_to_be_outgoing != is_outgoing) { LOG(ERROR) << "Receive wrong message out flag: me is " << my_id << ", message is from " << sender_user_id << ", flags = " << flags << " for " << message_id << " in " << dialog_id; - is_outgoing = !is_outgoing; + is_outgoing = supposed_to_be_outgoing; if (dialog_type == DialogType::Channel && !running_get_difference_ && !running_get_channel_difference(dialog_id) && get_channel_difference_to_logevent_id_.count(dialog_id) == 0) { @@ -11029,7 +11126,7 @@ FullMessageId MessagesManager::on_get_message(MessageInfo &&message_info, bool f new_message->have_previous = have_previous; new_message->have_next = have_next; - bool need_update = from_update; + bool need_update = from_update || message_id.is_scheduled(); bool need_update_dialog_pos = false; FullMessageId full_message_id(dialog_id, message_id); @@ -16907,8 +17004,11 @@ vector MessagesManager::get_dialog_scheduled_messages(DialogId dialog } auto hash = get_vector_hash(numbers); - // TODO reload synchronously, if there is no known server messages and (has_scheduled_server_messages == true or - // d->scheduled_messages_sync_generation == 0 && !G()->parameters().use_message_db) + if (d->has_scheduled_server_messages || + (d->scheduled_messages_sync_generation == 0 && !G()->parameters().use_message_db)) { + load_dialog_scheduled_messages(dialog_id, false, hash, std::move(promise)); + return {}; + } load_dialog_scheduled_messages(dialog_id, false, hash, Promise()); } @@ -16931,7 +17031,8 @@ void MessagesManager::load_dialog_scheduled_messages(DialogId dialog_id, bool fr })); } } else { - // TODO reload scheduled messages from server + td_->create_handler(std::move(promise)) + ->send(dialog_id, hash, scheduled_messages_sync_generation_); } } @@ -25146,10 +25247,6 @@ MessagesManager::Message *MessagesManager::add_message_to_dialog(Dialog *d, uniq << ", from_update = " << from_update << ", have_previous = " << message->have_previous << ", have_next = " << message->have_next; - if (*need_update) { - CHECK(from_update); - } - if (!message_id.is_valid()) { if (message_id.is_valid_scheduled()) { return add_scheduled_message_to_dialog(d, std::move(message), from_update, need_update, source); @@ -25160,6 +25257,10 @@ MessagesManager::Message *MessagesManager::add_message_to_dialog(Dialog *d, uniq return nullptr; } + if (*need_update) { + CHECK(from_update); + } + if (d->deleted_message_ids.count(message_id)) { LOG(INFO) << "Skip adding deleted " << message_id << " to " << dialog_id << " from " << source; debug_add_message_to_dialog_fail_reason_ = "adding deleted message"; diff --git a/td/telegram/MessagesManager.h b/td/telegram/MessagesManager.h index 46cd2714..9b38c4fd 100644 --- a/td/telegram/MessagesManager.h +++ b/td/telegram/MessagesManager.h @@ -226,6 +226,9 @@ class MessagesManager : public Actor { vector> &&messages); void on_failed_messages_search(int64 random_id); + void on_get_scheduled_server_messages(DialogId dialog_id, uint32 generation, + vector> &&messages, bool is_not_modified); + void on_get_recent_locations(DialogId dialog_id, int32 limit, int64 random_id, int32 total_count, vector> &&messages); void on_get_recent_locations_failed(int64 random_id);