Repeat channels.getDifference if expected PTS/message_id wasn't reached.

This commit is contained in:
levlam 2023-05-22 19:19:36 +03:00
parent 398fbeadf1
commit 7bc891aa0b
4 changed files with 107 additions and 39 deletions

View File

@ -26,7 +26,7 @@ MessageId::MessageId(ScheduledServerMessageId server_message_id, int32 send_date
}
MessageId MessageId::get_message_id(const telegram_api::Message *message_ptr, bool is_scheduled) {
CHECK(message_ptr != nullptr)
CHECK(message_ptr != nullptr);
switch (message_ptr->get_id()) {
case telegram_api::messageEmpty::ID: {
auto message = static_cast<const telegram_api::messageEmpty *>(message_ptr);
@ -52,6 +52,17 @@ MessageId MessageId::get_message_id(const tl_object_ptr<telegram_api::Message> &
return get_message_id(message_ptr.get(), is_scheduled);
}
MessageId MessageId::get_max_message_id(const vector<telegram_api::object_ptr<telegram_api::Message>> &messages) {
MessageId max_message_id;
for (auto &message : messages) {
auto message_id = get_message_id(message, false);
if (message_id > max_message_id) {
max_message_id = message_id;
}
}
return max_message_id;
}
vector<MessageId> MessageId::get_message_ids(const vector<int64> &input_message_ids) {
return transform(input_message_ids, [](int64 input_message_id) { return MessageId(input_message_id); });
}

View File

@ -78,6 +78,8 @@ class MessageId {
static MessageId get_message_id(const tl_object_ptr<telegram_api::Message> &message_ptr, bool is_scheduled);
static MessageId get_max_message_id(const vector<telegram_api::object_ptr<telegram_api::Message>> &messages);
static vector<MessageId> get_message_ids(const vector<int64> &input_message_ids);
static vector<int32> get_server_message_ids(const vector<MessageId> &message_ids);

View File

@ -6835,11 +6835,11 @@ void MessagesManager::on_update_channel_too_long(tl_object_ptr<telegram_api::upd
if (d != nullptr) {
if (update->pts_ == 0 || update->pts_ > d->pts) {
get_channel_difference(dialog_id, d->pts, true, "on_update_channel_too_long 1");
get_channel_difference(dialog_id, d->pts, update->pts_, MessageId(), true, "on_update_channel_too_long 1");
}
} else {
if (force_apply) {
get_channel_difference(dialog_id, -1, true, "on_update_channel_too_long 2");
get_channel_difference(dialog_id, -1, update->pts_, MessageId(), true, "on_update_channel_too_long 2");
} else {
td_->updates_manager_->schedule_get_difference("on_update_channel_too_long 3");
}
@ -6999,7 +6999,7 @@ void MessagesManager::update_message_interaction_info(FullMessageId full_message
LOG(INFO) << "Ignore message interaction info about unknown " << full_message_id;
if (!message_id.is_scheduled() && message_id > d->last_new_message_id && d->last_new_message_id.is_valid() &&
dialog_id.get_type() == DialogType::Channel) {
get_channel_difference(dialog_id, d->pts, true, "update_message_interaction_info");
get_channel_difference(dialog_id, d->pts, 0, message_id, true, "update_message_interaction_info");
}
return;
}
@ -7516,7 +7516,7 @@ void MessagesManager::on_read_channel_inbox(ChannelId channel_id, MessageId max_
// update from the future, keep it until it can be applied
if (pts >= d->pending_read_channel_inbox_pts) {
if (d->pending_read_channel_inbox_pts == 0) {
schedule_get_channel_difference(dialog_id, 0.001);
schedule_get_channel_difference(dialog_id, pts, MessageId(), 0.001);
}
d->pending_read_channel_inbox_pts = pts;
d->pending_read_channel_inbox_max_message_id = max_message_id;
@ -7866,7 +7866,7 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
LOG(INFO) << "Found a gap in unknown " << dialog_id << " with PTS = " << pts << ". new_pts = " << new_pts
<< ", pts_count = " << pts_count << " in update from " << source;
add_postponed_channel_update(dialog_id, std::move(update), new_pts, pts_count, std::move(promise));
get_channel_difference(dialog_id, pts, true, "add_pending_channel_update 3");
get_channel_difference(dialog_id, pts, new_pts, MessageId(), true, "add_pending_channel_update 3");
return;
}
@ -7932,7 +7932,8 @@ void MessagesManager::add_pending_channel_update(DialogId dialog_id, tl_object_p
if (d->was_opened || td_->contacts_manager_->get_channel_status(channel_id).is_member() ||
is_dialog_sponsored(d)) {
add_postponed_channel_update(dialog_id, std::move(update), new_pts, pts_count, std::move(promise));
get_channel_difference(dialog_id, old_pts, true, "add_pending_channel_update PTS mismatch");
get_channel_difference(dialog_id, old_pts, new_pts, MessageId(), true,
"add_pending_channel_update PTS mismatch");
} else {
promise.set_value(Unit());
}
@ -9702,7 +9703,7 @@ void MessagesManager::after_get_difference() {
}),
"get missing");
} else if (dialog_id.get_type() == DialogType::Channel) {
schedule_get_channel_difference(dialog_id, 0.001);
schedule_get_channel_difference(dialog_id, 0, message_id, 0.001);
}
break;
}
@ -9768,7 +9769,7 @@ void MessagesManager::get_channel_difference_if_needed(DialogId dialog_id, Messa
for (auto &message : messages_info.messages) {
if (need_channel_difference_to_add_message(dialog_id, message)) {
return run_after_channel_difference(
dialog_id,
dialog_id, MessageId::get_max_message_id(messages_info.messages),
PromiseCreator::lambda([messages_info = std::move(messages_info), promise = std::move(promise)](
Unit ignored) mutable { promise.set_value(std::move(messages_info)); }));
}
@ -9788,7 +9789,8 @@ void MessagesManager::get_channel_differences_if_needed(MessagesInfo &&messages_
auto dialog_id = DialogId::get_message_dialog_id(message);
if (need_channel_difference_to_add_message(dialog_id, message)) {
run_after_channel_difference(dialog_id, mpas.get_promise());
run_after_channel_difference(dialog_id, MessageId::get_max_message_id(messages_info.messages),
mpas.get_promise());
}
}
// must be added after messages_info is checked
@ -12334,7 +12336,7 @@ void MessagesManager::read_channel_message_content_from_updates(Dialog *d, Messa
return;
}
if (message_id > d->last_new_message_id && d->last_new_message_id.is_valid()) {
get_channel_difference(d->dialog_id, d->pts, true, "read_channel_message_content_from_updates");
get_channel_difference(d->dialog_id, d->pts, 0, message_id, true, "read_channel_message_content_from_updates");
} else {
// there is no message, so the update can be ignored
if (d->unread_mention_count > 0) {
@ -12606,7 +12608,7 @@ void MessagesManager::read_history_inbox(Dialog *d, MessageId max_message_id, in
}
if (max_message_id > d->last_new_message_id && dialog_id.get_type() == DialogType::Channel) {
schedule_get_channel_difference(dialog_id, 0.001);
schedule_get_channel_difference(dialog_id, 0, max_message_id, 0.001);
}
int32 server_unread_count = calc_new_unread_count(d, max_message_id, MessageType::Server, unread_count);
@ -14559,11 +14561,11 @@ std::pair<DialogId, unique_ptr<MessagesManager::Message>> MessagesManager::creat
/*
// it is useless to call getChannelDifference, because the channel PTS will be increased already
if (dialog_type == DialogType::Channel && !running_get_difference_ && !running_get_channel_difference(dialog_id) &&
get_channel_difference_to_log_event_id_.count(dialog_id) == 0) {
!message_id.is_scheduled() && get_channel_difference_to_log_event_id_.count(dialog_id) == 0) {
// it is safer to completely ignore the message and re-get it through getChannelDifference
Dialog *d = get_dialog(dialog_id);
if (d != nullptr) {
schedule_get_channel_difference(dialog_id, 0.001);
schedule_get_channel_difference(dialog_id, 0, message_id, 0.001);
return {DialogId(), nullptr};
}
}
@ -14832,7 +14834,7 @@ FullMessageId MessagesManager::on_get_message(MessageInfo &&message_info, const
LOG(INFO) << "Ignore " << old_message_id << "/" << message_id << " received not through update from " << source
<< ": " << oneline(to_string(get_message_object(dialog_id, new_message.get(), "on_get_message")));
if (dialog_id.get_type() == DialogType::Channel && have_input_peer(dialog_id, AccessRights::Read)) {
schedule_get_channel_difference(dialog_id, 0.001);
schedule_get_channel_difference(dialog_id, 0, message_id, 0.001);
}
return FullMessageId();
}
@ -15811,7 +15813,7 @@ void MessagesManager::on_get_dialogs(FolderId folder_id, vector<tl_object_ptr<te
send_update_chat_last_message(d, source);
}
} else if (dialog_id.get_type() == DialogType::Channel) {
get_channel_difference(dialog_id, d->pts, true, source);
get_channel_difference(dialog_id, d->pts, dialog->pts_, last_message_id, true, source);
}
}
@ -17947,9 +17949,10 @@ void MessagesManager::process_discussion_message(
for (auto &message : result->messages_) {
if (need_channel_difference_to_add_message(expected_dialog_id, message)) {
return run_after_channel_difference(
expected_dialog_id, PromiseCreator::lambda([actor_id = actor_id(this), result = std::move(result), dialog_id,
message_id, expected_dialog_id, expected_message_id,
promise = std::move(promise)](Unit ignored) mutable {
expected_dialog_id, MessageId::get_max_message_id(result->messages_),
PromiseCreator::lambda([actor_id = actor_id(this), result = std::move(result), dialog_id, message_id,
expected_dialog_id, expected_message_id,
promise = std::move(promise)](Unit ignored) mutable {
send_closure(actor_id, &MessagesManager::process_discussion_message_impl, std::move(result), dialog_id,
message_id, expected_dialog_id, expected_message_id, std::move(promise));
}));
@ -20476,7 +20479,7 @@ void MessagesManager::open_dialog(Dialog *d) {
channel_id, td_api::make_object<td_api::supergroupMembersFilterRecent>(), string(), 0, 200, 200, Auto());
}
}
get_channel_difference(dialog_id, d->pts, true, "open_dialog");
get_channel_difference(dialog_id, d->pts, 0, MessageId(), true, "open_dialog");
reget_dialog_action_bar(dialog_id, "open_dialog", false);
if (td_->contacts_manager_->get_channel_has_linked_channel(channel_id)) {
@ -30763,7 +30766,7 @@ void MessagesManager::check_send_message_result(int64 random_id, DialogId dialog
Dialog *d = get_dialog(dialog_id);
CHECK(d != nullptr);
if (dialog_id.get_type() == DialogType::Channel) {
get_channel_difference(dialog_id, d->pts, true, "check_send_message_result");
get_channel_difference(dialog_id, d->pts, 0, MessageId(), true, "check_send_message_result");
} else {
td_->updates_manager_->schedule_get_difference("check_send_message_result");
}
@ -34502,7 +34505,7 @@ MessagesManager::Message *MessagesManager::add_message_to_dialog(Dialog *d, uniq
<< to_string(get_message_object(dialog_id, message.get(), "add_message_to_dialog"));
if (need_channel_difference_to_add_message(dialog_id, nullptr)) {
schedule_get_channel_difference(dialog_id, 0.001);
schedule_get_channel_difference(dialog_id, 0, MessageId(), 0.001);
}
} else {
LOG(INFO) << "Ignore " << message_id << " in " << dialog_id << " received not through update from " << source;
@ -36848,7 +36851,7 @@ void MessagesManager::fix_new_dialog(Dialog *d, unique_ptr<Message> &&last_datab
d->pending_read_channel_inbox_pts = 0;
on_dialog_updated(dialog_id, "fix_new_dialog 14");
} else {
schedule_get_channel_difference(dialog_id, 0.001);
schedule_get_channel_difference(dialog_id, d->pending_read_channel_inbox_pts, MessageId(), 0.001);
}
} else {
d->pending_read_channel_inbox_pts = 0;
@ -37198,7 +37201,7 @@ bool MessagesManager::set_dialog_order(Dialog *d, int64 new_order, bool need_sen
auto dialog_type = dialog_id.get_type();
if (dialog_type == DialogType::Channel && is_added && being_added_dialog_id_ != dialog_id) {
repair_channel_server_unread_count(d);
schedule_get_channel_difference(dialog_id, 0.001);
schedule_get_channel_difference(dialog_id, 0, MessageId(), 0.001);
}
if (dialog_type == DialogType::Channel && is_removed) {
remove_all_dialog_notifications(d, false, source);
@ -37902,15 +37905,16 @@ bool MessagesManager::need_channel_difference_to_add_message(DialogId dialog_id,
return MessageId::get_message_id(message_ptr, false) > d->last_new_message_id;
}
void MessagesManager::run_after_channel_difference(DialogId dialog_id, Promise<Unit> &&promise) {
void MessagesManager::run_after_channel_difference(DialogId dialog_id, MessageId expected_max_message_id,
Promise<Unit> &&promise) {
CHECK(dialog_id.get_type() == DialogType::Channel);
CHECK(have_input_peer(dialog_id, AccessRights::Read));
run_after_get_channel_difference_[dialog_id].push_back(std::move(promise));
const Dialog *d = get_dialog(dialog_id);
get_channel_difference(dialog_id, d == nullptr ? load_channel_pts(dialog_id) : d->pts, true,
"run_after_channel_difference");
get_channel_difference(dialog_id, d == nullptr ? load_channel_pts(dialog_id) : d->pts, 0, expected_max_message_id,
true, "run_after_channel_difference");
}
bool MessagesManager::running_get_channel_difference(DialogId dialog_id) const {
@ -37925,7 +37929,7 @@ void MessagesManager::on_channel_get_difference_timeout(DialogId dialog_id) {
CHECK(dialog_id.get_type() == DialogType::Channel);
auto d = get_dialog(dialog_id);
CHECK(d != nullptr);
get_channel_difference(dialog_id, d->pts, true, "on_channel_get_difference_timeout");
get_channel_difference(dialog_id, d->pts, 0, MessageId(), true, "on_channel_get_difference_timeout");
}
class MessagesManager::GetChannelDifferenceLogEvent {
@ -37953,13 +37957,40 @@ class MessagesManager::GetChannelDifferenceLogEvent {
}
};
void MessagesManager::schedule_get_channel_difference(DialogId dialog_id, double delay) {
void MessagesManager::update_expected_channel_pts(DialogId dialog_id, int32 expected_pts) {
if (expected_pts <= 0) {
return;
}
auto &old_pts = expected_channel_pts_[dialog_id];
if (old_pts < expected_pts) {
old_pts = expected_pts;
}
}
void MessagesManager::update_expected_channel_max_message_id(DialogId dialog_id, MessageId expected_max_message_id) {
if (expected_max_message_id == MessageId() || td_->auth_manager_->is_bot()) {
return;
}
auto &old_max_message_id = expected_channel_max_message_id_[dialog_id];
if (old_max_message_id < expected_max_message_id) {
old_max_message_id = expected_max_message_id;
}
}
void MessagesManager::schedule_get_channel_difference(DialogId dialog_id, int32 expected_pts,
MessageId expected_max_message_id, double delay) {
LOG(INFO) << "Schedule getDifference in " << dialog_id;
update_expected_channel_pts(dialog_id, expected_pts);
update_expected_channel_max_message_id(dialog_id, expected_max_message_id);
channel_get_difference_retry_timeout_.add_timeout_in(dialog_id.get(), delay);
}
void MessagesManager::get_channel_difference(DialogId dialog_id, int32 pts, bool force, const char *source,
void MessagesManager::get_channel_difference(DialogId dialog_id, int32 pts, int32 expected_pts,
MessageId expected_max_message_id, bool force, const char *source,
bool is_old) {
update_expected_channel_pts(dialog_id, expected_pts);
update_expected_channel_max_message_id(dialog_id, expected_max_message_id);
if (channel_get_difference_retry_timeout_.has_timeout(dialog_id.get())) {
LOG(INFO) << "Skip running channels.getDifference for " << dialog_id << " from " << source
<< " because it is scheduled for later time";
@ -38344,7 +38375,7 @@ void MessagesManager::on_get_channel_difference(
if (delay == 0) {
delay = 1;
}
schedule_get_channel_difference(dialog_id, Random::fast(delay * 1000, delay * 1500) * 1e-3);
schedule_get_channel_difference(dialog_id, 0, MessageId(), Random::fast(delay * 1000, delay * 1500) * 1e-3);
delay *= 2;
if (delay > 60) {
delay = Random::fast(60, 80);
@ -38547,7 +38578,7 @@ void MessagesManager::on_get_channel_difference(
if (!is_final) {
LOG_IF(ERROR, timeout > 0) << "Have timeout in non-final ChannelDifference in " << dialog_id;
get_channel_difference(dialog_id, d->pts, true, "on_get_channel_difference", is_old);
get_channel_difference(dialog_id, d->pts, 0, MessageId(), true, "on_get_channel_difference", is_old);
return;
}
@ -38650,11 +38681,11 @@ void MessagesManager::after_get_channel_difference(DialogId dialog_id, bool succ
set_promises(promises);
}
auto it = pending_channel_on_get_dialogs_.find(dialog_id);
if (it != pending_channel_on_get_dialogs_.end()) {
auto on_get_dialogs_it = pending_channel_on_get_dialogs_.find(dialog_id);
if (on_get_dialogs_it != pending_channel_on_get_dialogs_.end()) {
LOG(INFO) << "Apply postponed results of channel getDialogs for " << dialog_id;
PendingOnGetDialogs res = std::move(it->second);
pending_channel_on_get_dialogs_.erase(it);
PendingOnGetDialogs res = std::move(on_get_dialogs_it->second);
pending_channel_on_get_dialogs_.erase(on_get_dialogs_it);
on_get_dialogs(res.folder_id, std::move(res.dialogs), res.total_count, std::move(res.messages),
std::move(res.promise));
@ -38664,6 +38695,22 @@ void MessagesManager::after_get_channel_difference(DialogId dialog_id, bool succ
(d->order != DEFAULT_ORDER || is_dialog_sponsored(d))) {
get_history_from_the_end_impl(d, true, false, Auto(), "after_get_channel_difference");
}
auto expected_channel_pts_it = expected_channel_pts_.find(dialog_id);
if (expected_channel_pts_it != expected_channel_pts_.end()) {
if (success && expected_channel_pts_it->second > pts) {
schedule_get_channel_difference(dialog_id, 0, MessageId(), 1.0);
}
expected_channel_pts_.erase(expected_channel_pts_it);
}
auto expected_channel_max_message_id_it = expected_channel_max_message_id_.find(dialog_id);
if (expected_channel_max_message_id_it != expected_channel_max_message_id_.end()) {
if (success && d != nullptr && expected_channel_max_message_id_it->second > d->last_new_message_id) {
schedule_get_channel_difference(dialog_id, 0, MessageId(), 1.0);
}
expected_channel_max_message_id_.erase(expected_channel_max_message_id_it);
}
}
void MessagesManager::reget_message_from_server_if_needed(DialogId dialog_id, const Message *m) {

View File

@ -2991,15 +2991,21 @@ class MessagesManager final : public Actor {
bool need_channel_difference_to_add_message(DialogId dialog_id,
const tl_object_ptr<telegram_api::Message> &message_ptr);
void run_after_channel_difference(DialogId dialog_id, Promise<Unit> &&promise);
void run_after_channel_difference(DialogId dialog_id, MessageId expected_max_message_id, Promise<Unit> &&promise);
bool running_get_channel_difference(DialogId dialog_id) const;
void on_channel_get_difference_timeout(DialogId dialog_id);
void schedule_get_channel_difference(DialogId dialog_id, double delay);
void update_expected_channel_pts(DialogId dialog_id, int32 expected_pts);
void get_channel_difference(DialogId dialog_id, int32 pts, bool force, const char *source, bool is_old = false);
void update_expected_channel_max_message_id(DialogId dialog_id, MessageId expected_max_message_id);
void schedule_get_channel_difference(DialogId dialog_id, int32 expected_pts, MessageId expected_max_message_id,
double delay);
void get_channel_difference(DialogId dialog_id, int32 pts, int32 expected_pts, MessageId expected_max_message_id,
bool force, const char *source, bool is_old = false);
void do_get_channel_difference(DialogId dialog_id, int32 pts, bool force,
tl_object_ptr<telegram_api::InputChannel> &&input_channel, bool is_old,
@ -3437,6 +3443,8 @@ class MessagesManager final : public Actor {
FlatHashMap<DialogId, int32, DialogIdHash> channel_get_difference_retry_timeouts_;
FlatHashMap<DialogId, std::multimap<int32, PendingPtsUpdate>, DialogIdHash> postponed_channel_updates_;
FlatHashSet<DialogId, DialogIdHash> is_channel_difference_finished_;
FlatHashMap<DialogId, int32, DialogIdHash> expected_channel_pts_;
FlatHashMap<DialogId, MessageId, DialogIdHash> expected_channel_max_message_id_;
MultiTimeout channel_get_difference_timeout_{"ChannelGetDifferenceTimeout"};
MultiTimeout channel_get_difference_retry_timeout_{"ChannelGetDifferenceRetryTimeout"};