Get channel difference if it is needed to add messages.

This commit is contained in:
levlam 2020-11-07 22:28:45 +03:00
parent ed25a03dda
commit 00382f825b
2 changed files with 99 additions and 28 deletions

View File

@ -504,10 +504,19 @@ class GetChannelMessagesQuery : public Td::ResultHandler {
}
}
td->messages_manager_->on_get_empty_messages(DialogId(channel_id_), std::move(empty_message_ids));
td->messages_manager_->on_get_messages(std::move(info.messages), info.is_channel_messages, false,
"GetChannelMessagesQuery");
promise_.set_value(Unit());
td->messages_manager_->get_channel_difference_if_needed(
DialogId(channel_id_), std::move(info),
PromiseCreator::lambda(
[td = td, promise = std::move(promise_)](Result<MessagesManager::MessagesInfo> &&result) mutable {
if (result.is_error()) {
promise.set_error(result.move_as_error());
} else {
auto info = result.move_as_ok();
td->messages_manager_->on_get_messages(std::move(info.messages), info.is_channel_messages, false,
"GetChannelMessagesQuery");
promise.set_value(Unit());
}
}));
}
void on_error(uint64 id, Status status) override {
@ -1641,9 +1650,19 @@ class GetDialogMessageByDateQuery : public Td::ResultHandler {
}
auto info = td->messages_manager_->on_get_messages(result_ptr.move_as_ok(), "GetDialogMessageByDateQuery");
td->messages_manager_->on_get_dialog_message_by_date_success(dialog_id_, date_, random_id_,
std::move(info.messages));
promise_.set_value(Unit());
td->messages_manager_->get_channel_difference_if_needed(
dialog_id_, std::move(info),
PromiseCreator::lambda([td = td, dialog_id = dialog_id_, date = date_, random_id = random_id_,
promise = std::move(promise_)](Result<MessagesManager::MessagesInfo> &&result) mutable {
if (result.is_error()) {
promise.set_error(result.move_as_error());
} else {
auto info = result.move_as_ok();
td->messages_manager_->on_get_dialog_message_by_date_success(dialog_id, date, random_id,
std::move(info.messages));
promise.set_value(Unit());
}
}));
}
void on_error(uint64 id, Status status) override {
@ -1706,11 +1725,21 @@ class GetHistoryQuery : public Td::ResultHandler {
}
auto info = td->messages_manager_->on_get_messages(result_ptr.move_as_ok(), "GetHistoryQuery");
// TODO use info.total_count, info.pts
td->messages_manager_->on_get_history(dialog_id_, from_message_id_, offset_, limit_, from_the_end_,
std::move(info.messages));
promise_.set_value(Unit());
td->messages_manager_->get_channel_difference_if_needed(
dialog_id_, std::move(info),
PromiseCreator::lambda([td = td, dialog_id = dialog_id_, from_message_id = from_message_id_, offset = offset_,
limit = limit_, from_the_end = from_the_end_,
promise = std::move(promise_)](Result<MessagesManager::MessagesInfo> &&result) mutable {
if (result.is_error()) {
promise.set_error(result.move_as_error());
} else {
auto info = result.move_as_ok();
// TODO use info.total_count, info.pts
td->messages_manager_->on_get_history(dialog_id, from_message_id, offset, limit, from_the_end,
std::move(info.messages));
promise.set_value(Unit());
}
}));
}
void on_error(uint64 id, Status status) override {
@ -1909,11 +1938,23 @@ class SearchMessagesQuery : public Td::ResultHandler {
}
auto info = td->messages_manager_->on_get_messages(result_ptr.move_as_ok(), "SearchMessagesQuery");
td->messages_manager_->on_get_dialog_messages_search_result(dialog_id_, query_, sender_dialog_id_, from_message_id_,
offset_, limit_, filter_, top_thread_message_id_,
random_id_, info.total_count, std::move(info.messages));
promise_.set_value(Unit());
td->messages_manager_->get_channel_difference_if_needed(
dialog_id_, std::move(info),
PromiseCreator::lambda([td = td, dialog_id = dialog_id_, query = std::move(query_),
sender_dialog_id = sender_dialog_id_, from_message_id = from_message_id_,
offset = offset_, limit = limit_, filter = filter_,
top_thread_message_id = top_thread_message_id_, random_id = random_id_,
promise = std::move(promise_)](Result<MessagesManager::MessagesInfo> &&result) mutable {
if (result.is_error()) {
promise.set_error(result.move_as_error());
} else {
auto info = result.move_as_ok();
td->messages_manager_->on_get_dialog_messages_search_result(
dialog_id, query, sender_dialog_id, from_message_id, offset, limit, filter, top_thread_message_id,
random_id, info.total_count, std::move(info.messages));
promise.set_value(Unit());
}
}));
}
void on_error(uint64 id, Status status) override {
@ -2060,10 +2101,19 @@ class GetRecentLocationsQuery : public Td::ResultHandler {
}
auto info = td->messages_manager_->on_get_messages(result_ptr.move_as_ok(), "GetRecentLocationsQuery");
td->messages_manager_->on_get_recent_locations(dialog_id_, limit_, random_id_, info.total_count,
std::move(info.messages));
promise_.set_value(Unit());
td->messages_manager_->get_channel_difference_if_needed(
dialog_id_, std::move(info),
PromiseCreator::lambda([td = td, dialog_id = dialog_id_, limit = limit_, random_id = random_id_,
promise = std::move(promise_)](Result<MessagesManager::MessagesInfo> &&result) mutable {
if (result.is_error()) {
promise.set_error(result.move_as_error());
} else {
auto info = result.move_as_ok();
td->messages_manager_->on_get_recent_locations(dialog_id, limit, random_id, info.total_count,
std::move(info.messages));
promise.set_value(Unit());
}
}));
}
void on_error(uint64 id, Status status) override {
@ -8844,6 +8894,23 @@ MessagesManager::MessagesInfo MessagesManager::on_get_messages(
return result;
}
void MessagesManager::get_channel_difference_if_needed(DialogId dialog_id, MessagesInfo &&messages_info,
Promise<MessagesInfo> &&promise) {
for (auto &message : messages_info.messages) {
if (need_channel_difference_to_add_message(dialog_id, message)) {
return run_after_channel_difference(dialog_id,
PromiseCreator::lambda([messages_info = std::move(messages_info),
promise = std::move(promise)](Unit ignored) mutable {
if (G()->close_flag()) {
return promise.set_error(Status::Error(500, "Request aborted"));
}
promise.set_value(std::move(messages_info));
}));
}
}
promise.set_value(std::move(messages_info));
}
void MessagesManager::on_get_messages(vector<tl_object_ptr<telegram_api::Message>> &&messages, bool is_channel_message,
bool is_scheduled, const char *source) {
LOG(DEBUG) << "Receive " << messages.size() << " messages";
@ -31044,11 +31111,9 @@ MessagesManager::Message *MessagesManager::add_message_to_dialog(Dialog *d, uniq
}
if (max_message_id != MessageId() && message_id > max_message_id) {
if (!message->from_database) {
if (!message->is_pinned) {
LOG(ERROR) << "Ignore " << message_id << " in " << dialog_id << " received not through update from " << source
<< ". Last is " << max_message_id << ", channel difference " << debug_channel_difference_dialog_
<< " " << to_string(get_message_object(dialog_id, message.get()));
}
LOG(ERROR) << "Ignore " << message_id << " in " << dialog_id << " received not through update from " << source
<< ". Last is " << max_message_id << ", channel difference " << debug_channel_difference_dialog_
<< " " << to_string(get_message_object(dialog_id, message.get()));
dump_debug_message_op(d, 3);
// keep consistent with need_channel_difference_to_add_message
@ -34308,6 +34373,9 @@ bool MessagesManager::need_channel_difference_to_add_message(DialogId dialog_id,
dialog_id == debug_channel_difference_dialog_) {
return false;
}
if (get_message_dialog_id(message_ptr) != dialog_id) {
return false;
}
Dialog *d = get_dialog_force(dialog_id);
if (d == nullptr || d->last_new_message_id == MessageId()) {

View File

@ -234,6 +234,9 @@ class MessagesManager : public Actor {
};
MessagesInfo on_get_messages(tl_object_ptr<telegram_api::messages_Messages> &&messages_ptr, const char *source);
void get_channel_difference_if_needed(DialogId dialog_id, MessagesInfo &&messages_info,
Promise<MessagesInfo> &&promise);
void on_get_messages(vector<tl_object_ptr<telegram_api::Message>> &&messages, bool is_channel_message,
bool is_scheduled, const char *source);
@ -806,7 +809,7 @@ class MessagesManager : public Actor {
void on_resolved_username(const string &username, DialogId dialog_id);
void drop_username(const string &username);
tl_object_ptr<telegram_api::InputNotifyPeer> get_input_notify_peer(DialogId dialogId) const;
tl_object_ptr<telegram_api::InputNotifyPeer> get_input_notify_peer(DialogId dialog_id) const;
void on_update_dialog_notify_settings(DialogId dialog_id,
tl_object_ptr<telegram_api::peerNotifySettings> &&peer_notify_settings,
@ -2070,7 +2073,7 @@ class MessagesManager : public Actor {
void load_folder_dialog_list_from_database(FolderId folder_id, int32 limit, Promise<Unit> &&promise);
void preload_folder_dialog_list(FolderId folderId);
void preload_folder_dialog_list(FolderId folder_id);
static void invalidate_message_indexes(Dialog *d);