Synchronize scheduled messages with the server.

GitOrigin-RevId: e4934cf80b6fd769891a684f01ff7d76eeae9cf9
This commit is contained in:
levlam 2019-12-05 00:24:48 +03:00
parent f098a71926
commit 28f0cd5df5
2 changed files with 115 additions and 11 deletions

View File

@ -1557,6 +1557,48 @@ class SearchMessagesGlobalQuery : public Td::ResultHandler {
}
};
class GetAllScheduledMessagesQuery : public Td::ResultHandler {
Promise<Unit> promise_;
DialogId dialog_id_;
uint32 generation_;
public:
explicit GetAllScheduledMessagesQuery(Promise<Unit> &&promise) : promise_(std::move(promise)) {
}
void send(DialogId dialog_id, int32 hash, uint32 generation) {
auto input_peer = td->messages_manager_->get_input_peer(dialog_id, AccessRights::Read);
CHECK(input_peer != nullptr);
dialog_id_ = dialog_id;
generation_ = generation;
send_query(G()->net_query_creator().create(
create_storer(telegram_api::messages_getScheduledHistory(std::move(input_peer), hash))));
}
void on_result(uint64 id, BufferSlice packet) override {
auto result_ptr = fetch_result<telegram_api::messages_getScheduledHistory>(packet);
if (result_ptr.is_error()) {
return on_error(id, result_ptr.move_as_error());
}
if (result_ptr.ok()->get_id() == telegram_api::messages_messagesNotModified::ID) {
td->messages_manager_->on_get_scheduled_server_messages(dialog_id_, generation_, Auto(), true);
} else {
auto info = td->messages_manager_->on_get_messages(result_ptr.move_as_ok(), "GetAllScheduledMessagesQuery");
td->messages_manager_->on_get_scheduled_server_messages(dialog_id_, generation_, std::move(info.messages), false);
}
promise_.set_value(Unit());
}
void on_error(uint64 id, Status status) override {
td->messages_manager_->on_get_dialog_error(dialog_id_, status, "GetAllScheduledMessagesQuery");
promise_.set_error(std::move(status));
}
};
class GetRecentLocationsQuery : public Td::ResultHandler {
Promise<Unit> promise_;
DialogId dialog_id_;
@ -8096,6 +8138,60 @@ void MessagesManager::on_failed_messages_search(int64 random_id) {
found_messages_.erase(it);
}
void MessagesManager::on_get_scheduled_server_messages(DialogId dialog_id, uint32 generation,
vector<tl_object_ptr<telegram_api::Message>> &&messages,
bool is_not_modified) {
Dialog *d = get_dialog(dialog_id);
CHECK(d != nullptr);
if (generation < d->scheduled_messages_sync_generation) {
return;
}
d->scheduled_messages_sync_generation = generation;
if (is_not_modified) {
return;
}
vector<MessageId> old_message_ids;
find_old_messages(d->scheduled_messages.get(),
MessageId(ScheduledServerMessageId(), std::numeric_limits<int32>::max(), true), old_message_ids);
std::unordered_map<ScheduledServerMessageId, MessageId, ScheduledServerMessageIdHash> old_server_message_ids;
for (auto &message_id : old_message_ids) {
if (message_id.is_scheduled_server()) {
old_server_message_ids[message_id.get_scheduled_server_message_id()] = message_id;
}
}
bool is_channel_message = dialog_id.get_type() == DialogType::Channel;
for (auto &message : messages) {
auto message_dialog_id = get_message_dialog_id(message);
if (message_dialog_id != dialog_id) {
if (dialog_id.is_valid()) {
LOG(ERROR) << "Receive " << get_message_id(message, true) << " in wrong " << message_dialog_id << " instead of "
<< dialog_id << ": " << oneline(to_string(message));
}
continue;
}
auto full_message_id = on_get_message(std::move(message), false, is_channel_message, true, false, false,
"on_get_scheduled_server_messages");
auto message_id = full_message_id.get_message_id();
if (message_id.is_valid_scheduled()) {
CHECK(message_id.is_scheduled_server());
old_server_message_ids.erase(message_id.get_scheduled_server_message_id());
}
}
for (auto it : old_server_message_ids) {
auto message_id = it.second;
auto message = do_delete_scheduled_message(d, message_id, true, "on_get_scheduled_server_messages");
CHECK(message != nullptr);
send_update_delete_messages(dialog_id, {message_id.get()}, true, false);
}
send_update_chat_has_scheduled_messages(d);
}
void MessagesManager::on_get_recent_locations(DialogId dialog_id, int32 limit, int64 random_id, int32 total_count,
vector<tl_object_ptr<telegram_api::Message>> &&messages) {
LOG(INFO) << "Receive " << messages.size() << " recent locations in " << dialog_id;
@ -10739,7 +10835,7 @@ void MessagesManager::fix_message_info_dialog_id(MessageInfo &message_info) cons
}
message_info.dialog_id = DialogId(sender_user_id);
LOG_IF(ERROR, (message_info.flags & MESSAGE_FLAG_IS_OUT) != 0)
LOG_IF(ERROR, !message_info.message_id.is_scheduled() && (message_info.flags & MESSAGE_FLAG_IS_OUT) != 0)
<< "Receive message out flag for incoming " << message_info.message_id << " in " << message_info.dialog_id;
}
@ -10884,10 +10980,11 @@ std::pair<DialogId, unique_ptr<MessagesManager::Message>> MessagesManager::creat
CHECK(sender_user_id == my_id);
}
if (sender_user_id.is_valid() && (sender_user_id == my_id && dialog_id != my_dialog_id) != is_outgoing) {
bool supposed_to_be_outgoing = sender_user_id == my_id && !(dialog_id == my_dialog_id && !message_id.is_scheduled());
if (sender_user_id.is_valid() && supposed_to_be_outgoing != is_outgoing) {
LOG(ERROR) << "Receive wrong message out flag: me is " << my_id << ", message is from " << sender_user_id
<< ", flags = " << flags << " for " << message_id << " in " << dialog_id;
is_outgoing = !is_outgoing;
is_outgoing = supposed_to_be_outgoing;
if (dialog_type == DialogType::Channel && !running_get_difference_ && !running_get_channel_difference(dialog_id) &&
get_channel_difference_to_logevent_id_.count(dialog_id) == 0) {
@ -11029,7 +11126,7 @@ FullMessageId MessagesManager::on_get_message(MessageInfo &&message_info, bool f
new_message->have_previous = have_previous;
new_message->have_next = have_next;
bool need_update = from_update;
bool need_update = from_update || message_id.is_scheduled();
bool need_update_dialog_pos = false;
FullMessageId full_message_id(dialog_id, message_id);
@ -16907,8 +17004,11 @@ vector<MessageId> MessagesManager::get_dialog_scheduled_messages(DialogId dialog
}
auto hash = get_vector_hash(numbers);
// TODO reload synchronously, if there is no known server messages and (has_scheduled_server_messages == true or
// d->scheduled_messages_sync_generation == 0 && !G()->parameters().use_message_db)
if (d->has_scheduled_server_messages ||
(d->scheduled_messages_sync_generation == 0 && !G()->parameters().use_message_db)) {
load_dialog_scheduled_messages(dialog_id, false, hash, std::move(promise));
return {};
}
load_dialog_scheduled_messages(dialog_id, false, hash, Promise<Unit>());
}
@ -16931,7 +17031,8 @@ void MessagesManager::load_dialog_scheduled_messages(DialogId dialog_id, bool fr
}));
}
} else {
// TODO reload scheduled messages from server
td_->create_handler<GetAllScheduledMessagesQuery>(std::move(promise))
->send(dialog_id, hash, scheduled_messages_sync_generation_);
}
}
@ -25146,10 +25247,6 @@ MessagesManager::Message *MessagesManager::add_message_to_dialog(Dialog *d, uniq
<< ", from_update = " << from_update << ", have_previous = " << message->have_previous
<< ", have_next = " << message->have_next;
if (*need_update) {
CHECK(from_update);
}
if (!message_id.is_valid()) {
if (message_id.is_valid_scheduled()) {
return add_scheduled_message_to_dialog(d, std::move(message), from_update, need_update, source);
@ -25160,6 +25257,10 @@ MessagesManager::Message *MessagesManager::add_message_to_dialog(Dialog *d, uniq
return nullptr;
}
if (*need_update) {
CHECK(from_update);
}
if (d->deleted_message_ids.count(message_id)) {
LOG(INFO) << "Skip adding deleted " << message_id << " to " << dialog_id << " from " << source;
debug_add_message_to_dialog_fail_reason_ = "adding deleted message";

View File

@ -226,6 +226,9 @@ class MessagesManager : public Actor {
vector<tl_object_ptr<telegram_api::Message>> &&messages);
void on_failed_messages_search(int64 random_id);
void on_get_scheduled_server_messages(DialogId dialog_id, uint32 generation,
vector<tl_object_ptr<telegram_api::Message>> &&messages, bool is_not_modified);
void on_get_recent_locations(DialogId dialog_id, int32 limit, int64 random_id, int32 total_count,
vector<tl_object_ptr<telegram_api::Message>> &&messages);
void on_get_recent_locations_failed(int64 random_id);